/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.runtime.io.network.Envelope;
import org.apache.flink.runtime.io.network.NetworkConnectionManager;
import org.apache.flink.runtime.io.network.RemoteReceiver;

public class OutboundConnectionQueue
extends ChannelInboundHandlerAdapter {
    private static final Log LOG = LogFactory.getLog(OutboundConnectionQueue.class);
    private final ChannelWriteListener writeListener = new ChannelWriteListener();
    private final Channel channel;
    private final Queue<Envelope> queuedEnvelopes = new ArrayDeque<Envelope>();
    private final RemoteReceiver receiver;
    private final NetworkConnectionManager connectionManager;
    private boolean hasRequestedClose = false;

    public OutboundConnectionQueue(Channel channel, RemoteReceiver receiver, NetworkConnectionManager connectionManager, int closeAfterIdleForMs) {
        this.channel = channel;
        this.receiver = receiver;
        this.connectionManager = connectionManager;
        channel.pipeline().addFirst("Outbound Connection Queue", (ChannelHandler)this);
        channel.pipeline().addFirst("Idle State Handler", (ChannelHandler)new IdleStateHandler(0L, 0L, (long)closeAfterIdleForMs, TimeUnit.MILLISECONDS));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean enqueue(Envelope env) {
        boolean triggerWrite;
        Channel channel = this.channel;
        synchronized (channel) {
            if (this.hasRequestedClose) {
                return false;
            }
            triggerWrite = this.queuedEnvelopes.isEmpty();
            this.queuedEnvelopes.add(env);
        }
        if (triggerWrite) {
            this.channel.pipeline().fireUserEventTriggered((Object)QueueEvent.TRIGGER_WRITE);
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
        if (event.getClass() == QueueEvent.class) {
            this.writeAndFlushNextEnvelopeIfPossible();
        } else if (event.getClass() == IdleStateEvent.class) {
            boolean closeConnection = false;
            Channel channel = this.channel;
            synchronized (channel) {
                if (this.queuedEnvelopes.isEmpty() && !this.hasRequestedClose) {
                    this.hasRequestedClose = true;
                    closeConnection = true;
                    this.connectionManager.close(this.receiver);
                }
            }
            if (closeConnection) {
                ctx.close().addListener((GenericFutureListener)new ChannelCloseListener());
            }
        } else {
            throw new IllegalStateException("Triggered unknown event.");
        }
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        this.writeAndFlushNextEnvelopeIfPossible();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int getNumQueuedEnvelopes() {
        Channel channel = this.channel;
        synchronized (channel) {
            return this.queuedEnvelopes.size();
        }
    }

    public String toString() {
        return this.channel.toString();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void writeAndFlushNextEnvelopeIfPossible() {
        Envelope nextEnvelope = null;
        Channel channel = this.channel;
        synchronized (channel) {
            if (this.channel.isWritable() && !this.queuedEnvelopes.isEmpty()) {
                nextEnvelope = this.queuedEnvelopes.poll();
            }
        }
        if (nextEnvelope != null) {
            this.channel.writeAndFlush((Object)nextEnvelope).addListener((GenericFutureListener)this.writeListener);
        }
    }

    private void exceptionOccurred(Throwable t) throws Exception {
        LOG.error((Object)String.format("Exception in Channel %s: %s", this.channel, t.getMessage()));
        throw new Exception(t);
    }

    private class ChannelCloseListener
    implements ChannelFutureListener {
        private ChannelCloseListener() {
        }

        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                OutboundConnectionQueue.this.exceptionOccurred(future.cause() == null ? new Exception("Close failed.") : future.cause());
            }
        }
    }

    private class ChannelWriteListener
    implements ChannelFutureListener {
        private ChannelWriteListener() {
        }

        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                OutboundConnectionQueue.this.writeAndFlushNextEnvelopeIfPossible();
            } else {
                OutboundConnectionQueue.this.exceptionOccurred(future.cause() == null ? new Exception("Envelope send aborted.") : future.cause());
            }
        }
    }

    private static enum QueueEvent {
        TRIGGER_WRITE;

    }
}

