/*
 * Decompiled with CFR 0.152.
 */
package org.apache.avro.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.avro.Protocol;
import org.apache.avro.ipc.CallFuture;
import org.apache.avro.ipc.Callback;
import org.apache.avro.ipc.NettyTransportCodec;
import org.apache.avro.ipc.Transceiver;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyTransceiver
extends Transceiver {
    /** Connection timeout, in milliseconds, used when the caller does not supply one. */
    public static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 60000L;
    /** Key of the connect-timeout entry in the Netty bootstrap options map. */
    public static final String NETTY_CONNECT_TIMEOUT_OPTION = "connectTimeoutMillis";
    /** Key of the TCP no-delay entry in the Netty bootstrap options map. */
    public static final String NETTY_TCP_NODELAY_OPTION = "tcpNoDelay";
    /** Presumably the default stored under {@link #NETTY_TCP_NODELAY_OPTION}; the default options map stores {@code true}. */
    public static final boolean DEFAULT_TCP_NODELAY_VALUE = true;
    private static final Logger LOG = LoggerFactory.getLogger((String)NettyTransceiver.class.getName());
    /** Source of the serial numbers attached to outgoing data packs. */
    private final AtomicInteger serialGenerator = new AtomicInteger(0);
    /** Callbacks of requests awaiting a response, keyed by the serial of the data pack that was sent. */
    private final Map<Integer, Callback<List<ByteBuffer>>> requests = new ConcurrentHashMap<Integer, Callback<List<ByteBuffer>>>();
    /** Factory backing the bootstrap; its external resources are released by {@code close()}. */
    private final ChannelFactory channelFactory;
    /** Upper bound, in milliseconds, for waiting on a connect (and on a channel close when awaited). */
    private final long connectTimeoutMillis;
    private final ClientBootstrap bootstrap;
    private final InetSocketAddress remoteAddr;
    /** Connection attempt currently in progress; assigned when a connect starts and cleared once it succeeds. */
    volatile ChannelFuture channelFuture;
    /** Set by {@code close()}; while true no new connection attempt is started. */
    volatile boolean stopping;
    /** Monitor held while starting, inspecting or cancelling {@link #channelFuture}. */
    private final Object channelFutureLock = new Object();
    /** Read/write lock around {@link #channel} and {@link #remote}. */
    private final ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock();
    /** Currently established channel, or {@code null} when disconnected. */
    private Channel channel;
    /** Remote protocol as last stored through {@code setRemote}; reset to {@code null} on disconnect. */
    private Protocol remote;

    /**
     * Creates an instance with no channel factory, bootstrap or remote address and
     * without attempting a connection. Package-private.
     *
     * NOTE(review): presumably intended for tests or subclasses - confirm against callers.
     */
    NettyTransceiver() {
        this.channelFactory = null;
        this.connectTimeoutMillis = 0L;
        this.bootstrap = null;
        this.remoteAddr = null;
        this.channelFuture = null;
    }

    /**
     * Connects to {@code addr} using {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS} as the
     * connection timeout.
     *
     * @param addr the remote address to connect to
     * @throws IOException if the connection cannot be established
     */
    public NettyTransceiver(InetSocketAddress addr) throws IOException {
        this(addr, DEFAULT_CONNECTION_TIMEOUT_MILLIS);
    }

    /**
     * Connects to {@code addr} over a newly created NIO client channel factory whose
     * boss and I/O worker threads come from two cached thread pools with named threads.
     *
     * @param addr the remote address to connect to
     * @param connectTimeoutMillis the connection timeout in milliseconds; {@code null}
     *        selects the default timeout
     * @throws IOException if the connection cannot be established
     */
    public NettyTransceiver(InetSocketAddress addr, Long connectTimeoutMillis) throws IOException {
        this(
            addr,
            new NioClientSocketChannelFactory(
                Executors.newCachedThreadPool(
                    new NettyTransceiverThreadFactory("Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
                Executors.newCachedThreadPool(
                    new NettyTransceiverThreadFactory("Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"))),
            connectTimeoutMillis);
    }

    /**
     * Connects to {@code addr} through the supplied channel factory using the default
     * bootstrap options (default connection timeout).
     *
     * @param addr the remote address to connect to
     * @param channelFactory the factory used to create the client channel; must not be null
     * @throws IOException if the connection cannot be established
     */
    public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory) throws IOException {
        this(addr, channelFactory, buildDefaultBootstrapOptions(null));
    }

    /**
     * Connects to {@code addr} through the supplied channel factory using the default
     * bootstrap options with the given connection timeout.
     *
     * @param addr the remote address to connect to
     * @param channelFactory the factory used to create the client channel; must not be null
     * @param connectTimeoutMillis the connection timeout in milliseconds; {@code null}
     *        selects the default timeout
     * @throws IOException if the connection cannot be established
     */
    public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory, Long connectTimeoutMillis) throws IOException {
        this(addr, channelFactory, buildDefaultBootstrapOptions(connectTimeoutMillis));
    }

    /**
     * Connects to {@code addr} through the supplied channel factory, applying the given
     * Netty client bootstrap options.
     *
     * <p>The connection timeout is read from the {@link #NETTY_CONNECT_TIMEOUT_OPTION}
     * entry of the options. When the map is {@code null} or has no such entry,
     * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS} is used for this transceiver's own waits.
     *
     * <p>If the initial connection attempt fails, the channel belonging to the failed
     * attempt is closed before the exception is propagated.
     *
     * @param addr the remote address to connect to
     * @param channelFactory the factory used to create the client channel
     * @param nettyClientBootstrapOptions options handed to the client bootstrap; may be
     *        {@code null}, in which case no options are set on the bootstrap
     * @throws NullPointerException if {@code channelFactory} is null
     * @throws NumberFormatException if the timeout option is neither a {@link Number}
     *         nor parseable as a long
     * @throws IOException if the connection cannot be established
     */
    public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory, Map<String, Object> nettyClientBootstrapOptions) throws IOException {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory is null");
        }
        this.channelFactory = channelFactory;
        this.connectTimeoutMillis = resolveConnectTimeoutMillis(nettyClientBootstrapOptions);
        this.bootstrap = new ClientBootstrap(channelFactory);
        this.remoteAddr = addr;
        this.bootstrap.setPipelineFactory(new ChannelPipelineFactory(){

            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline p = Channels.pipeline();
                p.addLast("frameDecoder", (ChannelHandler)new NettyTransportCodec.NettyFrameDecoder());
                p.addLast("frameEncoder", (ChannelHandler)new NettyTransportCodec.NettyFrameEncoder());
                p.addLast("handler", (ChannelHandler)new NettyClientAvroHandler());
                return p;
            }
        });
        if (nettyClientBootstrapOptions != null) {
            LOG.debug("Using Netty bootstrap options: " + nettyClientBootstrapOptions);
            this.bootstrap.setOptions(nettyClientBootstrapOptions);
        }
        // getChannel() expects the caller to hold the state read lock.
        this.stateLock.readLock().lock();
        try {
            boolean connected = false;
            try {
                this.getChannel();
                connected = true;
            }
            finally {
                if (!connected) {
                    // A failed attempt is still referenced by channelFuture; close its
                    // channel so a half-constructed transceiver does not keep it around.
                    ChannelFuture failedAttempt = this.channelFuture;
                    if (failedAttempt != null) {
                        failedAttempt.getChannel().close();
                    }
                }
            }
        }
        finally {
            this.stateLock.readLock().unlock();
        }
    }

    /**
     * Extracts the connection timeout from the bootstrap options.
     *
     * @param options the bootstrap options, possibly {@code null}
     * @return the configured timeout in milliseconds, or
     *         {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS} when none is configured
     */
    private static long resolveConnectTimeoutMillis(Map<String, Object> options) {
        Object configured = options == null ? null : options.get(NETTY_CONNECT_TIMEOUT_OPTION);
        if (configured == null) {
            return DEFAULT_CONNECTION_TIMEOUT_MILLIS;
        }
        if (configured instanceof Number) {
            // Accept Integer/Long/etc. rather than insisting on exactly java.lang.Long.
            return ((Number)configured).longValue();
        }
        return Long.parseLong(String.valueOf(configured));
    }

    /**
     * Builds the bootstrap options used when the caller supplies none: TCP no-delay
     * enabled plus a connection timeout.
     *
     * @param connectTimeoutMillis the connection timeout in milliseconds, or {@code null}
     *        to use {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS}
     * @return a new mutable map holding the two options
     */
    private static Map<String, Object> buildDefaultBootstrapOptions(Long connectTimeoutMillis) {
        Long effectiveTimeout = connectTimeoutMillis;
        if (effectiveTimeout == null) {
            effectiveTimeout = DEFAULT_CONNECTION_TIMEOUT_MILLIS;
        }
        Map<String, Object> defaults = new HashMap<String, Object>(2);
        defaults.put(NETTY_TCP_NODELAY_OPTION, DEFAULT_TCP_NODELAY_VALUE);
        defaults.put(NETTY_CONNECT_TIMEOUT_OPTION, effectiveTimeout);
        return defaults;
    }

    /**
     * Tells whether a channel can be used as is.
     *
     * @param channel the channel to inspect, possibly {@code null}
     * @return {@code true} only if the channel is non-null, open, bound and connected
     */
    private static boolean isChannelReady(Channel channel) {
        if (channel == null) {
            return false;
        }
        if (!channel.isOpen()) {
            return false;
        }
        if (!channel.isBound()) {
            return false;
        }
        return channel.isConnected();
    }

    /**
     * Returns the current channel, connecting first if it is not ready.
     *
     * <p>The caller must hold the read lock of {@code stateLock}. When a connection is
     * needed the read lock is released, the write lock is taken for the duration of the
     * connect, and the read lock is re-acquired before this method returns or throws.
     *
     * <p>No new connection attempt is started once {@code stopping} is set.
     *
     * @return the channel; never {@code null}
     * @throws IOException if the connection attempt does not succeed within the connect
     *         timeout, or if no channel exists because the transceiver is shutting down
     */
    private Channel getChannel() throws IOException {
        if (!NettyTransceiver.isChannelReady(this.channel)) {
            // A read lock cannot be upgraded in place: drop it, then take the write lock.
            this.stateLock.readLock().unlock();
            this.stateLock.writeLock().lock();
            try {
                // Re-check: another thread may have connected while no lock was held.
                if (!NettyTransceiver.isChannelReady(this.channel)) {
                    synchronized (this.channelFutureLock) {
                        if (!this.stopping) {
                            LOG.debug("Connecting to " + this.remoteAddr);
                            this.channelFuture = this.bootstrap.connect((SocketAddress)this.remoteAddr);
                        }
                    }
                    ChannelFuture pendingConnect = this.channelFuture;
                    if (pendingConnect != null) {
                        pendingConnect.awaitUninterruptibly(this.connectTimeoutMillis);
                        synchronized (this.channelFutureLock) {
                            if (!pendingConnect.isSuccess()) {
                                throw new IOException("Error connecting to " + this.remoteAddr, pendingConnect.getCause());
                            }
                            this.channel = pendingConnect.getChannel();
                            this.channelFuture = null;
                        }
                    }
                }
            }
            finally {
                // Downgrade: take the read lock back before letting go of the write lock.
                this.stateLock.readLock().lock();
                this.stateLock.writeLock().unlock();
            }
        }
        Channel current = this.channel;
        if (current == null) {
            // Only reachable when no connect was started (stopping) and the channel was
            // already cleared; report it instead of letting callers hit a NullPointerException.
            throw new IOException("Not connected to " + this.remoteAddr + ": " + this.getClass().getSimpleName() + " closed");
        }
        return current;
    }

    /**
     * Closes the current channel, if any, without waiting for the close to complete
     * and without cancelling pending requests.
     */
    private void disconnect() {
        this.disconnect(false, false, null);
    }

    /**
     * Closes the current channel, if any, and optionally fails all pending requests.
     *
     * <p>If the calling thread holds the state read lock, that lock is released while
     * the write lock is taken and re-acquired before the write lock is released.
     * Callbacks are notified and the channel is closed only after the write lock has
     * been released.
     *
     * @param awaitCompletion whether to wait (up to the connect timeout) for the channel
     *        close to finish
     * @param cancelPendingRequests whether pending request callbacks are removed and
     *        notified through {@code handleError}
     * @param cause the error that triggered the disconnect, or {@code null}; when null,
     *        cancelled requests receive an {@link IOException} instead
     */
    private void disconnect(boolean awaitCompletion, boolean cancelPendingRequests, Throwable cause) {
        Channel channelToClose = null;
        ConcurrentHashMap<Integer, Callback<List<ByteBuffer>>> requestsToCancel = null;
        // Remember whether this thread entered holding the read lock so it can be restored.
        boolean stateReadLockHeld = this.stateLock.getReadHoldCount() != 0;
        Object object = this.channelFutureLock;
        synchronized (object) {
            // When shutting down, abort a connection attempt that is still in progress.
            if (this.stopping && this.channelFuture != null) {
                this.channelFuture.cancel();
            }
        }
        // The write lock cannot be acquired while this thread still holds the read lock.
        if (stateReadLockHeld) {
            this.stateLock.readLock().unlock();
        }
        this.stateLock.writeLock().lock();
        try {
            if (this.channel != null) {
                if (cause != null) {
                    LOG.debug("Disconnecting from " + this.remoteAddr, cause);
                } else {
                    LOG.debug("Disconnecting from " + this.remoteAddr);
                }
                channelToClose = this.channel;
                this.channel = null;
                this.remote = null;
                if (cancelPendingRequests) {
                    // Snapshot the pending callbacks so they can be notified outside the lock.
                    requestsToCancel = new ConcurrentHashMap<Integer, Callback<List<ByteBuffer>>>(this.requests);
                    this.requests.clear();
                }
            }
        }
        finally {
            // Re-take the read lock (if it was held on entry) before releasing the write lock.
            if (stateReadLockHeld) {
                this.stateLock.readLock().lock();
            }
            this.stateLock.writeLock().unlock();
        }
        if (requestsToCancel != null && !requestsToCancel.isEmpty()) {
            LOG.debug("Removing " + requestsToCancel.size() + " pending request(s).");
            for (Callback request : requestsToCancel.values()) {
                request.handleError(cause != null ? cause : new IOException(this.getClass().getSimpleName() + " closed"));
            }
        }
        if (channelToClose != null) {
            ChannelFuture closeFuture = channelToClose.close();
            if (awaitCompletion && closeFuture != null) {
                closeFuture.awaitUninterruptibly(this.connectTimeoutMillis);
            }
        }
    }

    /**
     * Intentionally does nothing in this implementation.
     *
     * NOTE(review): presumably unnecessary because responses are matched to requests
     * by serial number - confirm against the base class contract.
     */
    @Override
    public void lockChannel() {
    }

    /**
     * Intentionally does nothing in this implementation; counterpart of the equally
     * empty {@code lockChannel()}.
     */
    @Override
    public void unlockChannel() {
    }

    /**
     * Shuts this transceiver down: marks it as stopping, closes the channel (waiting
     * for the close to complete), fails all pending requests, and finally releases the
     * channel factory's external resources.
     *
     * <p>The factory is released even if disconnecting throws. Instances created
     * without a channel factory skip the release step.
     */
    @Override
    public void close() {
        try {
            // Set before disconnecting so that no new connection attempt is started.
            this.stopping = true;
            this.disconnect(true, true, null);
        }
        finally {
            // The no-arg constructor leaves channelFactory null; avoid an NPE here.
            if (this.channelFactory != null) {
                this.channelFactory.releaseExternalResources();
            }
        }
    }

    /**
     * Returns the string form of the remote address of the channel, connecting first
     * if necessary.
     *
     * @return the channel's remote address rendered with {@code toString()}
     * @throws IOException if a connection is needed and cannot be established
     */
    @Override
    public String getRemoteName() throws IOException {
        ReentrantReadWriteLock.ReadLock readGuard = this.stateLock.readLock();
        readGuard.lock();
        try {
            Channel active = this.getChannel();
            SocketAddress peer = active.getRemoteAddress();
            return peer.toString();
        }
        finally {
            readGuard.unlock();
        }
    }

    /**
     * Sends a request and blocks until its response arrives.
     *
     * <p>If the wait is interrupted, the thread's interrupt status is restored so that
     * callers further up the stack can still observe the interruption.
     *
     * @param request the buffers making up the request
     * @return the response buffers, or {@code null} if waiting was interrupted or the
     *         call completed with an error (the failure is logged at debug level)
     * @throws IOException if the request cannot be written
     */
    @Override
    public List<ByteBuffer> transceive(List<ByteBuffer> request) throws IOException {
        try {
            CallFuture<List<ByteBuffer>> transceiverFuture = new CallFuture<List<ByteBuffer>>();
            this.transceive(request, transceiverFuture);
            return transceiverFuture.get();
        }
        catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; put it back.
            Thread.currentThread().interrupt();
            LOG.debug("failed to get the response", (Throwable)e);
            return null;
        }
        catch (ExecutionException e) {
            LOG.debug("failed to get the response", (Throwable)e);
            return null;
        }
    }

    /**
     * Sends a request asynchronously; the callback is registered under a fresh serial
     * number and invoked when the matching response (or an error) arrives.
     *
     * <p>If handing the request to the channel fails, the callback is unregistered
     * again so it does not linger in the pending-request map, and the failure is
     * propagated to the caller.
     *
     * @param request the buffers making up the request
     * @param callback receives the response or the error
     * @throws IOException if a connection is needed and cannot be established
     */
    @Override
    public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) throws IOException {
        this.stateLock.readLock().lock();
        try {
            int serial = this.serialGenerator.incrementAndGet();
            NettyTransportCodec.NettyDataPack dataPack = new NettyTransportCodec.NettyDataPack(serial, request);
            // Register before writing so a fast response always finds its callback.
            this.requests.put(serial, callback);
            boolean handedOff = false;
            try {
                this.writeDataPack(dataPack);
                handedOff = true;
            }
            finally {
                if (!handedOff) {
                    // The caller sees the exception; no response will ever match this serial.
                    this.requests.remove(Integer.valueOf(serial));
                }
            }
        }
        finally {
            this.stateLock.readLock().unlock();
        }
    }

    /**
     * Writes the given buffers as a data pack with a fresh serial number, without
     * registering a callback for a response.
     *
     * @param buffers the buffers to send
     * @throws IOException if a connection is needed and cannot be established
     */
    @Override
    public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
        ReentrantReadWriteLock.ReadLock readGuard = this.stateLock.readLock();
        readGuard.lock();
        try {
            int serial = this.serialGenerator.incrementAndGet();
            NettyTransportCodec.NettyDataPack outgoing = new NettyTransportCodec.NettyDataPack(serial, buffers);
            this.writeDataPack(outgoing);
        }
        finally {
            readGuard.unlock();
        }
    }

    /**
     * Writes a data pack to the channel, connecting first if necessary. The result of
     * the write is not awaited.
     *
     * @param dataPack the data pack to send
     * @throws IOException if a connection is needed and cannot be established
     */
    private void writeDataPack(NettyTransportCodec.NettyDataPack dataPack) throws IOException {
        Channel target = this.getChannel();
        target.write(dataPack);
    }

    /**
     * Not supported by this transceiver.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public List<ByteBuffer> readBuffers() throws IOException {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the remote protocol last stored through {@code setRemote}, read under the
     * state read lock.
     *
     * @return the remote protocol, or {@code null} if none is set
     */
    @Override
    public Protocol getRemote() {
        ReentrantReadWriteLock.ReadLock readGuard = this.stateLock.readLock();
        readGuard.lock();
        try {
            return this.remote;
        }
        finally {
            readGuard.unlock();
        }
    }

    /**
     * Reports whether a remote protocol is currently set, checked under the state read
     * lock.
     *
     * @return {@code true} if the remote protocol is non-null
     */
    @Override
    public boolean isConnected() {
        ReentrantReadWriteLock.ReadLock readGuard = this.stateLock.readLock();
        readGuard.lock();
        try {
            return this.remote != null;
        }
        finally {
            readGuard.unlock();
        }
    }

    /**
     * Stores the remote protocol under the state write lock.
     *
     * @param protocol the remote protocol to remember; may be {@code null}
     */
    @Override
    public void setRemote(Protocol protocol) {
        ReentrantReadWriteLock.WriteLock writeGuard = this.stateLock.writeLock();
        writeGuard.lock();
        try {
            this.remote = protocol;
        }
        finally {
            writeGuard.unlock();
        }
    }

    /**
     * Thread factory that names each thread {@code "<prefix> <n>"}, where {@code n}
     * counts the threads created by this factory starting at 1.
     */
    private static class NettyTransceiverThreadFactory
    implements ThreadFactory {
        /** Number of threads handed out so far. */
        private final AtomicInteger created = new AtomicInteger();
        private final String prefix;

        /**
         * @param prefix text placed in front of the sequence number in each thread name
         */
        public NettyTransceiverThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread worker = new Thread(r);
            int sequence = this.created.incrementAndGet();
            String name = this.prefix + " " + sequence;
            worker.setName(name);
            return worker;
        }
    }

    class NettyClientAvroHandler
    extends SimpleChannelUpstreamHandler {
        NettyClientAvroHandler() {
        }

        public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
            if (e instanceof ChannelStateEvent) {
                LOG.debug(e.toString());
                ChannelStateEvent cse = (ChannelStateEvent)e;
                if (cse.getState() == ChannelState.OPEN && Boolean.FALSE.equals(cse.getValue())) {
                    LOG.debug("Remote peer " + NettyTransceiver.this.remoteAddr + " closed connection.");
                    NettyTransceiver.this.disconnect(false, true, null);
                }
            }
            super.handleUpstream(ctx, e);
        }

        public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            super.channelOpen(ctx, e);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
            NettyTransportCodec.NettyDataPack dataPack = (NettyTransportCodec.NettyDataPack)e.getMessage();
            Callback callback = (Callback)NettyTransceiver.this.requests.get(dataPack.getSerial());
            if (callback == null) {
                throw new RuntimeException("Missing previous call info");
            }
            try {
                callback.handleResult(dataPack.getDatas());
            }
            finally {
                NettyTransceiver.this.requests.remove(dataPack.getSerial());
            }
        }

        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
            NettyTransceiver.this.disconnect(false, true, e.getCause());
        }
    }
}

