/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.runtime.io.network.netty.exception.TransportException;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PartitionRequestClientHandler
extends ChannelInboundHandlerAdapter
implements NetworkClientHandler {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestClientHandler.class);

    /** Registered input channels keyed by their ID; incoming messages are routed through this map. */
    private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels = new ConcurrentHashMap<InputChannelID, RemoteInputChannel>();

    /** The first error recorded for this handler; only ever set via compare-and-set from {@code null}. */
    private final AtomicReference<Throwable> channelError = new AtomicReference<Throwable>();

    /** Listener/task that holds a buffer response while no buffer is available for it. */
    private final BufferListenerTask bufferListener = new BufferListenerTask();

    /** Messages received while an earlier message is still waiting to be processed, in arrival order. */
    private final Queue<Object> stagedMessages = new ArrayDeque<Object>();

    /** Task that drains {@link #stagedMessages}. */
    private final StagedMessagesHandlerTask stagedMessagesHandler = new StagedMessagesHandlerTask();

    /** IDs for which a cancel request has already been written; used as a set to avoid duplicates. */
    private final ConcurrentMap<InputChannelID, InputChannelID> cancelled = Maps.newConcurrentMap();

    /** Handler context captured on the first {@code channelActive} call; {@code null} until then. */
    private volatile ChannelHandlerContext ctx;

    /** Creates a handler with no registered input channels; package-private, takes no arguments. */
    PartitionRequestClientHandler() {
    }

    /**
     * Registers the given input channel under its channel ID.
     *
     * <p>If a channel is already registered under the same ID, the existing registration is kept.
     *
     * @param listener the input channel to register
     * @throws IOException if an error has already been recorded for this handler
     */
    @Override
    public void addInputChannel(RemoteInputChannel listener) throws IOException {
        // Refuse new registrations once the handler has failed.
        checkError();

        final InputChannelID channelId = listener.getInputChannelId();
        inputChannels.putIfAbsent(channelId, listener);
    }

    /**
     * Removes the registration of the given input channel, looked up by its channel ID.
     *
     * @param listener the input channel to unregister
     */
    @Override
    public void removeInputChannel(RemoteInputChannel listener) {
        final InputChannelID channelId = listener.getInputChannelId();
        inputChannels.remove(channelId);
    }

    /**
     * Writes a {@link NettyMessage.CancelPartitionRequest} for the given channel ID, at most once
     * per ID.
     *
     * <p>Does nothing when the ID is {@code null} or when no handler context has been captured yet.
     *
     * @param inputChannelId ID of the input channel whose request should be cancelled; may be null
     */
    @Override
    public void cancelRequestFor(InputChannelID inputChannelId) {
        if (inputChannelId == null || ctx == null) {
            return;
        }

        // putIfAbsent returns null only for the first caller, so the message is written once per ID.
        final boolean firstCancellation = cancelled.putIfAbsent(inputChannelId, inputChannelId) == null;
        if (firstCancellation) {
            ctx.writeAndFlush(new NettyMessage.CancelPartitionRequest(inputChannelId));
        }
    }

    /**
     * Intentionally does nothing: the body is empty in this handler.
     *
     * <p>NOTE(review): presumably this handler does not take part in credit-based flow control —
     * confirm against the {@code NetworkClientHandler} contract.
     *
     * @param inputChannel the input channel with available credit (ignored)
     */
    @Override
    public void notifyCreditAvailable(RemoteInputChannel inputChannel) {
    }

    /**
     * Captures the handler context the first time the channel becomes active, then forwards the
     * event to the superclass.
     *
     * @param ctx the context of this handler
     * @throws Exception if the superclass implementation throws
     */
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Keep the first context seen; a later call never overwrites it.
        final boolean contextNotYetCaptured = this.ctx == null;
        if (contextNotYetCaptured) {
            this.ctx = ctx;
        }

        super.channelActive(ctx);
    }

    /**
     * Handles the channel becoming inactive.
     *
     * <p>If input channels are still registered at that point, they are all notified with a
     * {@link RemoteTransportException} and the connection is closed. The event is then forwarded to
     * the superclass in either case.
     *
     * @param ctx the context of this handler
     * @throws Exception if the superclass implementation throws
     */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        final boolean channelsStillRegistered = !inputChannels.isEmpty();
        if (channelsStillRegistered) {
            final SocketAddress remoteAddress = ctx.channel().remoteAddress();
            final String message = "Connection unexpectedly closed by remote task manager '" + remoteAddress + "'. This might indicate that the remote task manager was lost.";
            notifyAllChannelsOfErrorAndClose(new RemoteTransportException(message, remoteAddress));
        }

        super.channelInactive(ctx);
    }

    /**
     * Translates an exception raised in the pipeline into a {@link TransportException}, notifies
     * all registered input channels and closes the connection.
     *
     * <ul>
     *   <li>A {@link TransportException} is passed on unchanged.
     *   <li>An {@link IOException} whose message is exactly "Connection reset by peer" is wrapped
     *       in a {@link RemoteTransportException}.
     *   <li>Everything else is wrapped in a {@link LocalTransportException}.
     * </ul>
     *
     * <p>The exception is not forwarded to the superclass.
     *
     * @param ctx the context of this handler
     * @param cause the exception that was raised
     * @throws Exception declared by the handler interface
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof TransportException) {
            notifyAllChannelsOfErrorAndClose(cause);
            return;
        }

        final SocketAddress remoteAddr = ctx.channel().remoteAddress();
        final TransportException tex;

        // Throwable#getMessage() may return null, so compare with the literal on the left-hand
        // side; calling equals() on the message would fail with a NullPointerException inside the
        // error handler and the input channels would never be notified.
        if (cause instanceof IOException && "Connection reset by peer".equals(cause.getMessage())) {
            tex = new RemoteTransportException("Lost connection to task manager '" + remoteAddr + "'. This indicates that the remote task manager was lost.", remoteAddr, cause);
        } else {
            final SocketAddress localAddr = ctx.channel().localAddress();
            tex = new LocalTransportException(String.format("%s (connection to '%s')", cause.getMessage(), remoteAddr), localAddr, cause);
        }

        notifyAllChannelsOfErrorAndClose(tex);
    }

    /**
     * Handles an incoming message.
     *
     * <p>The message is decoded immediately unless a buffer response is currently waiting for a
     * buffer or earlier messages are already staged; in that case it is appended to the staged
     * queue so that arrival order is kept. Any failure is reported to all input channels and closes
     * the connection.
     *
     * @param ctx the context of this handler
     * @param msg the received message
     * @throws Exception declared by the handler interface
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            final boolean mustStage = bufferListener.hasStagedBufferOrEvent() || !stagedMessages.isEmpty();
            if (mustStage) {
                stagedMessages.add(msg);
            } else {
                decodeMsg(msg, false);
            }
        } catch (Throwable t) {
            notifyAllChannelsOfErrorAndClose(t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records the given error, reports it to every registered input channel and closes the
     * connection.
     *
     * <p>Only the first call has any effect: the error is stored with a compare-and-set from
     * {@code null}, and later calls return immediately.
     *
     * <p>A failure while notifying one channel is logged and does not prevent the remaining
     * channels from being notified. After notification the channel map is cleared and, if a handler
     * context has been captured, the context is closed.
     *
     * @param cause the error to report
     */
    private void notifyAllChannelsOfErrorAndClose(Throwable cause) {
        if (!channelError.compareAndSet(null, cause)) {
            return;
        }

        try {
            for (RemoteInputChannel inputChannel : inputChannels.values()) {
                // Guard each notification separately: the map is cleared below, so a channel that
                // is skipped here would never learn about the error.
                try {
                    inputChannel.onError(cause);
                } catch (Throwable t) {
                    LOG.warn("An Exception was thrown during error notification of a remote input channel.", t);
                }
            }
        } finally {
            inputChannels.clear();
            if (ctx != null) {
                ctx.close();
            }
        }
    }

    /**
     * Throws if an error has been recorded for this handler.
     *
     * @throws IOException the recorded error itself if it is an {@link IOException}, otherwise a
     *     new {@link IOException} wrapping it
     */
    private void checkError() throws IOException {
        final Throwable recorded = channelError.get();
        if (recorded == null) {
            return;
        }
        if (recorded instanceof IOException) {
            throw (IOException) recorded;
        }
        throw new IOException("There has been an error in the channel.", recorded);
    }

    /**
     * Forwards the read-complete event to the superclass without any additional work.
     *
     * @param ctx the context of this handler
     * @throws Exception if the superclass implementation throws
     */
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }

    /**
     * Dispatches a received message by its exact class.
     *
     * <ul>
     *   <li>{@link NettyMessage.BufferResponse}: handed to the addressed input channel. If no such
     *       channel is registered, the buffer is released and the request is cancelled.
     *   <li>{@link NettyMessage.ErrorResponse}: a fatal error is reported to all channels and
     *       closes the connection; otherwise only the addressed channel (if registered) is
     *       notified, with a {@link PartitionNotFoundException} cause reported as a failed
     *       partition request.
     * </ul>
     *
     * @param msg the message to decode
     * @param isStagedBuffer whether the message comes from the staged queue
     * @return {@code false} if the message could not be finished and processing of further
     *     messages has to wait; {@code true} otherwise
     * @throws IllegalStateException if the message is of any other class
     * @throws Throwable any failure raised while handling the message
     */
    private boolean decodeMsg(Object msg, boolean isStagedBuffer) throws Throwable {
        final Class<?> messageType = msg.getClass();

        if (messageType == NettyMessage.BufferResponse.class) {
            final NettyMessage.BufferResponse response = (NettyMessage.BufferResponse) msg;
            final RemoteInputChannel target = inputChannels.get(response.receiverId);
            if (target != null) {
                return decodeBufferOrEvent(target, response, isStagedBuffer);
            }
            // Nobody is listening for this data any more: drop it and tell the producer.
            response.releaseBuffer();
            cancelRequestFor(response.receiverId);
            return true;
        }

        if (messageType != NettyMessage.ErrorResponse.class) {
            throw new IllegalStateException("Received unknown message from producer: " + msg.getClass());
        }

        final NettyMessage.ErrorResponse error = (NettyMessage.ErrorResponse) msg;
        final SocketAddress remoteAddr = ctx.channel().remoteAddress();

        if (error.isFatalError()) {
            notifyAllChannelsOfErrorAndClose(new RemoteTransportException("Fatal error at remote task manager '" + remoteAddr + "'.", remoteAddr, error.cause));
            return true;
        }

        final RemoteInputChannel target = inputChannels.get(error.receiverId);
        if (target == null) {
            return true;
        }

        if (error.cause.getClass() == PartitionNotFoundException.class) {
            target.onFailedPartitionRequest();
        } else {
            target.onError(new RemoteTransportException("Error at remote task manager '" + remoteAddr + "'.", remoteAddr, error.cause));
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Delivers the payload of a buffer response to the given input channel.
     *
     * <p>Events are copied into a freshly allocated byte array and delivered right away. Buffers
     * are copied into a buffer requested from the channel's buffer provider. If no buffer is
     * available, this handler registers as a listener with the provider and the response stays
     * staged until a buffer arrives.
     *
     * <p>The Netty buffer of the response is released on every path except when the response is
     * left staged.
     *
     * <p>NOTE(review): the meaning of the constant {@code -1} passed to {@code onBuffer} and
     * {@code onEmptyBuffer} is not visible here — confirm against {@code RemoteInputChannel}.
     *
     * @param inputChannel the channel the response is addressed to
     * @param bufferOrEvent the response to deliver
     * @param isStagedBuffer whether the response comes from the staged queue
     * @return {@code true} if the response was delivered (or was an empty buffer); {@code false}
     *     if it is now waiting for a buffer; {@code isStagedBuffer} if the channel has no buffer
     *     provider or the provider is destroyed
     * @throws Throwable any failure raised while copying or delivering the data
     */
    private boolean decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent, boolean isStagedBuffer) throws Throwable {
        boolean releaseNettyBuffer = true;
        try {
            final ByteBuf payload = bufferOrEvent.getNettyBuffer();
            final int receivedSize = payload.readableBytes();

            if (!bufferOrEvent.isBuffer()) {
                // Event: copy the bytes into a heap array wrapped as a non-buffer NetworkBuffer.
                final byte[] eventBytes = new byte[receivedSize];
                payload.readBytes(eventBytes);
                final MemorySegment segment = MemorySegmentFactory.wrap(eventBytes);
                final NetworkBuffer eventBuffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, false, receivedSize);
                inputChannel.onBuffer(eventBuffer, bufferOrEvent.sequenceNumber, -1);
                return true;
            }

            // Empty buffers are reported without requesting a buffer.
            if (receivedSize == 0) {
                inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, -1);
                return true;
            }

            final BufferProvider bufferProvider = inputChannel.getBufferProvider();
            if (bufferProvider == null) {
                cancelRequestFor(bufferOrEvent.receiverId);
                return isStagedBuffer;
            }

            // Retry until a buffer is obtained, the listener is registered, or the provider is
            // found to be destroyed.
            for (;;) {
                final Buffer target = bufferProvider.requestBuffer();
                if (target != null) {
                    payload.readBytes(target.asByteBuf(), receivedSize);
                    inputChannel.onBuffer(target, bufferOrEvent.sequenceNumber, -1);
                    return true;
                }
                if (bufferListener.waitForBuffer(bufferProvider, bufferOrEvent)) {
                    // The listener task now holds the response and releases its buffer later.
                    releaseNettyBuffer = false;
                    return false;
                }
                if (bufferProvider.isDestroyed()) {
                    return isStagedBuffer;
                }
            }
        } finally {
            if (releaseNettyBuffer) {
                bufferOrEvent.releaseBuffer();
            }
        }
    }

    /**
     * Task that drains the staged message queue in order.
     *
     * <p>It stops as soon as a message cannot be finished. Once the queue is empty, auto-read is
     * switched back on and a read is requested. Any failure is reported to all input channels and
     * closes the connection.
     */
    public class StagedMessagesHandlerTask implements Runnable {

        @Override
        public void run() {
            try {
                for (Object staged = stagedMessages.poll(); staged != null; staged = stagedMessages.poll()) {
                    final boolean finished = decodeMsg(staged, true);
                    if (!finished) {
                        // The message is waiting for a buffer; the remaining ones stay queued.
                        return;
                    }
                }

                ctx.channel().config().setAutoRead(true);
                ctx.channel().read();
            } catch (Throwable t) {
                notifyAllChannelsOfErrorAndClose(t);
            }
        }
    }

    /**
     * Buffer listener that holds a single buffer response while no buffer is available for it,
     * and a task that completes the delivery once a buffer has been handed in.
     *
     * <p>{@link #notifyBufferAvailable(Buffer)} stores the buffer and schedules this object on the
     * channel's event loop; {@link #run()} then copies the staged data into that buffer.
     */
    private class BufferListenerTask implements BufferListener, Runnable {

        /** Buffer handed in by {@link #notifyBufferAvailable(Buffer)} and not yet consumed by {@link #run()}. */
        private final AtomicReference<Buffer> availableBuffer = new AtomicReference<Buffer>();

        /** The response currently waiting for a buffer, or {@code null} if none is waiting. */
        private NettyMessage.BufferResponse stagedBufferResponse;

        private BufferListenerTask() {
        }

        /**
         * Stages the given response and registers this listener with the buffer provider.
         *
         * @param bufferProvider the provider to wait on
         * @param bufferResponse the response that needs a buffer
         * @return {@code true} if the listener was registered (auto-read is then switched off);
         *     {@code false} if registration was refused, in which case nothing stays staged
         */
        private boolean waitForBuffer(BufferProvider bufferProvider, NettyMessage.BufferResponse bufferResponse) {
            // Stage first so the response is already in place if the listener is notified.
            stagedBufferResponse = bufferResponse;

            if (!bufferProvider.addBufferListener(this)) {
                stagedBufferResponse = null;
                return false;
            }

            if (ctx.channel().config().isAutoRead()) {
                ctx.channel().config().setAutoRead(false);
            }
            return true;
        }

        /** Returns whether a response is currently waiting for a buffer. */
        private boolean hasStagedBufferOrEvent() {
            return stagedBufferResponse != null;
        }

        /** Drops the staged response and lets message processing continue. */
        @Override
        public void notifyBufferDestroyed() {
            stagedBufferResponse = null;
            resumeReadingOrDrainStaged();
        }

        /**
         * Accepts a buffer for the staged response and schedules {@link #run()} on the channel's
         * event loop.
         *
         * <p>If a previously accepted buffer has not been consumed yet, or scheduling fails, the
         * error is reported through an {@link AsyncErrorNotificationTask} and the given buffer is
         * recycled.
         *
         * @param buffer the buffer that became available
         * @return always {@code false}
         */
        @Override
        public boolean notifyBufferAvailable(Buffer buffer) {
            boolean accepted = false;
            try {
                if (!availableBuffer.compareAndSet(null, buffer)) {
                    throw new IllegalStateException("Received a buffer notification,  but the previous one has not been handled yet.");
                }
                ctx.channel().eventLoop().execute(this);
                accepted = true;
            } catch (Throwable t) {
                ctx.channel().eventLoop().execute(new AsyncErrorNotificationTask(t));
            } finally {
                if (!accepted && buffer != null) {
                    buffer.recycleBuffer();
                }
            }
            return false;
        }

        /**
         * Copies the staged response into the buffer handed in earlier and delivers it to the
         * addressed input channel.
         *
         * <p>If that channel is no longer registered, the request is cancelled instead. Afterwards
         * message processing continues. Any failure is reported to all input channels and closes
         * the connection; the buffer is recycled unless it was delivered.
         */
        @Override
        public void run() {
            boolean delivered = false;
            Buffer buffer = null;
            try {
                buffer = availableBuffer.getAndSet(null);
                if (buffer == null) {
                    throw new IllegalStateException("Running buffer availability task w/o a buffer.");
                }

                final ByteBuf payload = stagedBufferResponse.getNettyBuffer();
                payload.readBytes(buffer.asByteBuf(), payload.readableBytes());
                stagedBufferResponse.releaseBuffer();

                final RemoteInputChannel target = inputChannels.get(stagedBufferResponse.receiverId);
                if (target == null) {
                    cancelRequestFor(stagedBufferResponse.receiverId);
                } else {
                    target.onBuffer(buffer, stagedBufferResponse.sequenceNumber, -1);
                    delivered = true;
                }

                stagedBufferResponse = null;
                resumeReadingOrDrainStaged();
            } catch (Throwable t) {
                notifyAllChannelsOfErrorAndClose(t);
            } finally {
                if (!delivered && buffer != null) {
                    buffer.recycleBuffer();
                }
            }
        }

        /**
         * Continues message processing: with no staged messages, switches auto-read back on and
         * requests a read; otherwise schedules the staged-messages task on the event loop.
         */
        private void resumeReadingOrDrainStaged() {
            if (!stagedMessages.isEmpty()) {
                ctx.channel().eventLoop().execute(stagedMessagesHandler);
                return;
            }
            ctx.channel().config().setAutoRead(true);
            ctx.channel().read();
        }
    }

    private class AsyncErrorNotificationTask
    implements Runnable {
        private final Throwable error;

        public AsyncErrorNotificationTask(Throwable error) {
            this.error = error;
        }

        @Override
        public void run() {
            PartitionRequestClientHandler.this.notifyAllChannelsOfErrorAndClose(this.error);
        }
    }
}

