package org.xtreemfs.foundation.pbrpc.server;

import com.google.protobuf.Message;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.xtreemfs.foundation.LifeCycleThread;
import org.xtreemfs.foundation.buffer.BufferPool;
import org.xtreemfs.foundation.buffer.ReusableBuffer;
import org.xtreemfs.foundation.logging.Logging;
import org.xtreemfs.foundation.pbrpc.generatedinterfaces.RPC;
import org.xtreemfs.foundation.pbrpc.utils.PBRPCDatagramPacket;
import org.xtreemfs.foundation.pbrpc.utils.RecordMarker;
import org.xtreemfs.foundation.pbrpc.utils.ReusableBufferInputStream;

/* loaded from: input_file:org/xtreemfs/foundation/pbrpc/server/RPCUDPSocketServer.class */
/**
 * UDP endpoint for PBRPC messages, driven by a single selector thread.
 *
 * <p>The thread started via {@link #run()} receives datagrams, parses the record marker and
 * RPC header, and hands each request to the configured {@link RPCServerRequestListener}.
 * Outgoing datagrams are placed on an internal queue by {@link #sendResponse} and
 * {@link #sendRequest}; the selector thread drains that queue whenever the channel is
 * writable. Write interest is only registered while the queue is non-empty.
 */
public class RPCUDPSocketServer extends LifeCycleThread implements RPCServerInterface {
    /** Size in bytes of the buffer allocated for every incoming datagram. */
    public static final int MAX_UDP_SIZE = 2048;

    /**
     * Number of bytes that precede the RPC header in a datagram.
     * NOTE(review): presumably the serialized size of {@link RecordMarker}; confirm against
     * that class and replace with its constant if one exists.
     */
    private static final int RECORD_MARKER_LENGTH = 12;

    /** Queue length above which a warning is logged on each selector wake-up with ready keys. */
    private static final int QUEUE_WARN_THRESHOLD = 10000;

    /** Port the datagram socket is bound to. */
    public final int port;
    private final DatagramChannel channel;
    private final Selector selector;
    /** Set by {@link #shutdown()}; checked once per selector loop iteration. */
    private volatile boolean quit;
    /** Datagrams waiting to be sent by the selector thread. */
    private final LinkedBlockingQueue<UDPMessage> q;
    private final RPCServerRequestListener receiver;
    /** Source of call ids for requests sent through {@link #sendRequest}. */
    private final AtomicInteger callIdCounter;

    /**
     * Opens a non-blocking datagram channel bound to the given port and registers it with a
     * new selector for read events.
     *
     * @param i local UDP port to bind to
     * @param rPCServerRequestListener listener that receives every successfully parsed request
     * @throws IOException if the selector or channel cannot be opened, bound or registered;
     *         anything opened so far is closed again before the exception propagates
     */
    public RPCUDPSocketServer(int i, RPCServerRequestListener rPCServerRequestListener) throws IOException {
        super("UDPComStage");
        this.port = i;
        this.q = new LinkedBlockingQueue<>();
        this.receiver = rPCServerRequestListener;
        this.callIdCounter = new AtomicInteger(1);

        Selector openedSelector = Selector.open();
        DatagramChannel openedChannel = null;
        try {
            openedChannel = DatagramChannel.open();
            openedChannel.socket().setReuseAddress(true);
            openedChannel.socket().bind(new InetSocketAddress(i));
            openedChannel.configureBlocking(false);
            openedChannel.register(openedSelector, SelectionKey.OP_READ);
        } catch (IOException | RuntimeException e) {
            // Do not leak the selector / socket when initialization fails half-way.
            closeAfterFailedInit(openedSelector, openedChannel);
            throw e;
        }
        this.selector = openedSelector;
        this.channel = openedChannel;

        if (Logging.isInfo()) {
            Logging.logMessage(6, Logging.Category.net, this, "UDP socket on port %d ready", Integer.valueOf(i));
        }
    }

    /**
     * Queues the first buffer of the response for transmission to the address the request was
     * received from, and releases the request's buffers.
     *
     * <p>Only {@code getBuffers()[0]} of the response is sent.
     */
    @Override
    public void sendResponse(RPCServerRequest rPCServerRequest, RPCServerResponse rPCServerResponse) {
        InetSocketAddress destination = ((UDPMessage) rPCServerRequest.getConnection()).getAddress();
        UDPMessage outgoing = new UDPMessage(rPCServerResponse.getBuffers()[0], destination, this);
        rPCServerRequest.freeBuffers();
        send(outgoing);
    }

    /**
     * Assembles a datagram from the header and message and queues it for transmission.
     *
     * <p>The call id of the supplied header is replaced by the next value of this server's
     * call id counter before the datagram is assembled.
     *
     * @param rPCHeader header template for the request
     * @param message request payload
     * @param inetSocketAddress destination of the datagram
     * @throws IOException if the datagram cannot be assembled
     */
    public void sendRequest(RPC.RPCHeader rPCHeader, Message message, InetSocketAddress inetSocketAddress) throws IOException {
        // The call id must be set BEFORE the packet is built; protobuf messages are immutable,
        // so the rebuilt header has to be the one handed to the packet.
        RPC.RPCHeader headerWithCallId =
                rPCHeader.toBuilder().setCallId(this.callIdCounter.getAndIncrement()).build();
        PBRPCDatagramPacket packet = new PBRPCDatagramPacket(headerWithCallId, message);
        send(new UDPMessage(packet.assembleDatagramPacket(), inetSocketAddress, this));
    }

    /**
     * Enqueues an outgoing datagram and wakes the selector thread so it can register write
     * interest.
     */
    private void send(UDPMessage uDPMessage) {
        this.q.add(uDPMessage);
        // Always wake up: checking "size() == 1" is racy, because two concurrent senders can
        // both observe size 2 and then nobody wakes the selector.
        this.selector.wakeup();
    }

    /** Requests termination of the selector loop and interrupts the thread. */
    @Override
    public void shutdown() {
        this.quit = true;
        interrupt();
    }

    /**
     * Selector loop: receives datagrams and flushes the send queue until {@link #shutdown()}
     * is called or an I/O error occurs. The selector and channel are closed on every exit
     * path before the life cycle listener is notified.
     */
    @Override
    public void run() {
        Throwable crashCause = null;
        try {
            notifyStarted();
            boolean readOnly = true;
            while (!this.quit) {
                readOnly = syncInterestOps(readOnly);
                int readyKeys = this.selector.select();
                // Re-check after waking up: send() may have queued data while we were blocked.
                readOnly = syncInterestOps(readOnly);
                if (readyKeys == 0) {
                    continue;
                }
                if (this.q.size() > QUEUE_WARN_THRESHOLD) {
                    Logging.logMessage(4, Logging.Category.net, this, "QS!!!!! %d", Integer.valueOf(this.q.size()));
                    Logging.logMessage(4, Logging.Category.net, this, "is readOnly: " + readOnly, new Object[0]);
                }
                Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    // A key that is both readable and writable is only read here; the write
                    // is picked up by the next select() round.
                    if (key.isReadable()) {
                        receiveDatagram();
                    } else if (key.isWritable()) {
                        flushSendQueue();
                    } else {
                        throw new RuntimeException("strange key state: " + key);
                    }
                }
            }
        } catch (ClosedByInterruptException ignored) {
            // Raised when the thread is interrupted during channel I/O, which is what
            // shutdown() does; treated as a regular stop.
        } catch (IOException e) {
            Logging.logError(3, this, e);
        } catch (CancelledKeyException ignored) {
            // The key was cancelled underneath us; nothing left to serve, stop normally.
        } catch (Throwable t) {
            crashCause = t;
        }

        closeResources();
        if (crashCause != null) {
            notifyCrashed(crashCause);
        } else {
            notifyStopped();
        }
    }

    /**
     * Aligns the channel's interest set with the state of the send queue.
     *
     * @param readOnly whether only read interest is currently registered
     * @return the new value of the read-only flag
     */
    private boolean syncInterestOps(boolean readOnly) {
        boolean queueEmpty = this.q.isEmpty();
        if (queueEmpty && !readOnly) {
            this.channel.keyFor(this.selector).interestOps(SelectionKey.OP_READ);
            return true;
        }
        if (!queueEmpty && readOnly) {
            this.channel.keyFor(this.selector).interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            return false;
        }
        return readOnly;
    }

    /**
     * Receives one datagram, parses its record marker and RPC header, and passes the request to
     * the listener. Malformed datagrams are logged and dropped.
     */
    private void receiveDatagram() throws IOException {
        ReusableBuffer data = BufferPool.allocate(MAX_UDP_SIZE);
        InetSocketAddress sender;
        try {
            sender = (InetSocketAddress) this.channel.receive(data.getBuffer());
        } catch (IOException e) {
            // Return the pooled buffer before the error ends the selector loop.
            BufferPool.free(data);
            throw e;
        }

        // NOTE(review): after receive(), hasRemaining() == false means the buffer was filled
        // completely (datagram of MAX_UDP_SIZE or more, possibly truncated) rather than that
        // the packet was empty, as the log text suggests. Behavior kept as is; confirm intent.
        if (sender == null || !data.hasRemaining()) {
            BufferPool.free(data);
            Logging.logMessage(4, Logging.Category.net, this, "read key for empty read/empty packet", new Object[0]);
            return;
        }

        if (Logging.isDebug()) {
            Logging.logMessage(7, Logging.Category.net, this, "read data from %s", sender.toString());
        }
        try {
            data.flip();
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.net, this, "data: %s", data.toString());
            }
            RecordMarker marker = new RecordMarker(data.getBuffer());
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.net, this, "rm: %d/%d data: %d", Integer.valueOf(marker.getRpcHeaderLength()), Integer.valueOf(marker.getMessageLength()), Integer.valueOf(data.limit()));
            }
            ReusableBufferInputStream headerStream = new ReusableBufferInputStream(data);
            int datagramLength = data.limit();
            assert datagramLength == RECORD_MARKER_LENGTH + marker.getRpcHeaderLength() + marker.getMessageLength();

            // Restrict the buffer to the header bytes so that the parser stops there.
            data.limit(RECORD_MARKER_LENGTH + marker.getRpcHeaderLength());
            RPC.RPCHeader header = ((RPC.RPCHeader.Builder) RPC.RPCHeader.newBuilder().mergeFrom((InputStream) headerStream)).build();
            // Then expose only the message payload to the request.
            data.range(RECORD_MARKER_LENGTH + marker.getRpcHeaderLength(), marker.getMessageLength());
            this.receiver.receiveRecord(new RPCServerRequest(new UDPMessage(null, sender, this), header, data));
        } catch (Throwable th) {
            // Deliberately broad: one bad datagram must not take down the server thread.
            th.printStackTrace();
            Logging.logMessage(4, Logging.Category.net, this, "received invalid UPD message: " + th, new Object[0]);
            BufferPool.free(data);
        }
    }

    /**
     * Sends queued datagrams until the queue is empty or the channel refuses further data.
     * A datagram that could not be sent is put back on the queue with its buffer intact.
     */
    private void flushSendQueue() throws IOException {
        UDPMessage outgoing;
        while ((outgoing = this.q.poll()) != null) {
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.net, this, "sent packet to %s", outgoing.getAddress().toString());
            }
            int bytesSent;
            try {
                bytesSent = this.channel.send(outgoing.getBuffer().getBuffer(), outgoing.getAddress());
            } catch (IOException e) {
                // The message is lost either way; do not leak its pooled buffer.
                BufferPool.free(outgoing.getBuffer());
                throw e;
            }
            if (bytesSent == 0) {
                if (Logging.isDebug()) {
                    Logging.logMessage(7, Logging.Category.net, this, "cannot send anymore", new Object[0]);
                }
                // Keep the buffer alive and retry on the next writable event. add() is used
                // instead of put() because the queue is unbounded and put() would throw
                // InterruptedException if shutdown() has already interrupted this thread.
                this.q.add(outgoing);
                return;
            }
            // Only release the buffer once the datagram has actually left.
            BufferPool.free(outgoing.getBuffer());
        }
    }

    /** Closes the selector and the channel, logging (not propagating) any failure. */
    private void closeResources() {
        try {
            this.selector.close();
        } catch (IOException e) {
            Logging.logError(3, this, e);
        }
        try {
            this.channel.close();
        } catch (IOException e) {
            Logging.logError(3, this, e);
        }
    }

    /**
     * Best-effort cleanup used by the constructor; secondary close failures are ignored because
     * the original initialization error is the one reported to the caller.
     */
    private static void closeAfterFailedInit(Selector openedSelector, DatagramChannel openedChannel) {
        if (openedChannel != null) {
            try {
                openedChannel.close();
            } catch (IOException ignored) {
                // The initialization failure is rethrown by the caller.
            }
        }
        try {
            openedSelector.close();
        } catch (IOException ignored) {
            // The initialization failure is rethrown by the caller.
        }
    }
}
