/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.rpc.data;

import com.google.protobuf.MessageLite;
import io.netty.buffer.ByteBuf;
import org.apache.drill.exec.proto.BitData;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.rpc.DynamicSemaphore;
import org.apache.drill.exec.rpc.ListeningCommand;
import org.apache.drill.exec.rpc.RpcConnectionHandler;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.data.DataClientConnection;
import org.apache.drill.exec.rpc.data.DataConnectionManager;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ExecutionControls;
import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataTunnel {
    static final Logger logger = LoggerFactory.getLogger(DataTunnel.class);
    private final DataConnectionManager manager;
    private final DynamicSemaphore sendingSemaphore = new DynamicSemaphore();
    private boolean isInjectionControlSet;
    private ControlsInjector testInjector;
    private ExecutionControls testControls;
    private Logger testLogger;

    public DataTunnel(DataConnectionManager manager) {
        this.manager = manager;
    }

    public void setTestInjectionControls(ControlsInjector testInjector, ExecutionControls testControls, Logger testLogger) {
        this.isInjectionControlSet = true;
        this.testInjector = testInjector;
        this.testControls = testControls;
        this.testLogger = testLogger;
    }

    public void sendRecordBatch(RpcOutcomeListener<BitData.AckWithCredit> outcomeListener, FragmentWritableBatch batch) {
        SendBatchAsyncListen b = new SendBatchAsyncListen(outcomeListener, batch);
        try {
            if (this.isInjectionControlSet) {
                this.testInjector.injectInterruptiblePause(this.testControls, "data-tunnel-send-batch-wait-for-interrupt", this.testLogger);
            }
            this.sendingSemaphore.acquire();
            this.manager.runCommand(b);
        }
        catch (InterruptedException e) {
            for (ByteBuf buffer : batch.getBuffers()) {
                buffer.release();
            }
            outcomeListener.interrupted(e);
            Thread.currentThread().interrupt();
        }
    }

    public void sendRuntimeFilter(RpcOutcomeListener<BitData.AckWithCredit> outcomeListener, RuntimeFilterWritable runtimeFilter) {
        SendRuntimeFilterAsyncListen cmd = new SendRuntimeFilterAsyncListen(outcomeListener, runtimeFilter);
        try {
            if (this.isInjectionControlSet) {
                this.testInjector.injectInterruptiblePause(this.testControls, "data-tunnel-send-runtime_filter-wait-for-interrupt", this.testLogger);
            }
            this.manager.runCommand(cmd);
        }
        catch (InterruptedException e) {
            runtimeFilter.close();
            outcomeListener.interrupted(e);
            Thread.currentThread().interrupt();
        }
    }

    private class SendBatchAsyncListen
    extends ListeningCommand<BitData.AckWithCredit, DataClientConnection, BitData.RpcType, MessageLite> {
        final FragmentWritableBatch batch;

        public SendBatchAsyncListen(RpcOutcomeListener<BitData.AckWithCredit> listener, FragmentWritableBatch batch) {
            super(listener);
            this.batch = batch;
        }

        @Override
        public void doRpcCall(RpcOutcomeListener<BitData.AckWithCredit> outcomeListener, DataClientConnection connection) {
            connection.send(new ThrottlingOutcomeListener(outcomeListener), this.getRpcType(), this.batch.getHeader(), BitData.AckWithCredit.class, this.batch.getBuffers());
        }

        @Override
        public BitData.RpcType getRpcType() {
            return BitData.RpcType.REQ_RECORD_BATCH;
        }

        @Override
        public MessageLite getMessage() {
            return this.batch.getHeader();
        }

        public String toString() {
            return "SendBatch [batch.header=" + this.batch.getHeader() + "]";
        }

        @Override
        public void connectionFailed(RpcConnectionHandler.FailureType type, Throwable t) {
            for (ByteBuf buffer : this.batch.getBuffers()) {
                buffer.release();
            }
            super.connectionFailed(type, t);
        }
    }

    private class SendRuntimeFilterAsyncListen
    extends ListeningCommand<BitData.AckWithCredit, DataClientConnection, BitData.RpcType, MessageLite> {
        final RuntimeFilterWritable runtimeFilter;

        public SendRuntimeFilterAsyncListen(RpcOutcomeListener<BitData.AckWithCredit> listener, RuntimeFilterWritable runtimeFilter) {
            super(listener);
            this.runtimeFilter = runtimeFilter;
        }

        @Override
        public void doRpcCall(RpcOutcomeListener<BitData.AckWithCredit> outcomeListener, DataClientConnection connection) {
            connection.send(outcomeListener, BitData.RpcType.REQ_RUNTIME_FILTER, this.runtimeFilter.getRuntimeFilterBDef(), BitData.AckWithCredit.class, this.runtimeFilter.getData());
        }

        @Override
        public BitData.RpcType getRpcType() {
            return BitData.RpcType.REQ_RUNTIME_FILTER;
        }

        @Override
        public MessageLite getMessage() {
            return this.runtimeFilter.getRuntimeFilterBDef();
        }

        @Override
        public void connectionFailed(RpcConnectionHandler.FailureType type, Throwable t) {
            this.runtimeFilter.close();
            super.connectionFailed(type, t);
        }
    }

    private class ThrottlingOutcomeListener
    implements RpcOutcomeListener<BitData.AckWithCredit> {
        RpcOutcomeListener<BitData.AckWithCredit> inner;

        public ThrottlingOutcomeListener(RpcOutcomeListener<BitData.AckWithCredit> inner) {
            this.inner = inner;
        }

        @Override
        public void failed(RpcException ex) {
            DataTunnel.this.sendingSemaphore.release();
            this.inner.failed(ex);
        }

        @Override
        public void success(BitData.AckWithCredit value, ByteBuf buffer) {
            int credit = value.getAllowedCredit();
            if (credit > 0) {
                DataTunnel.this.sendingSemaphore.tryToIncreaseCredit(credit);
            }
            DataTunnel.this.sendingSemaphore.release();
            this.inner.success(value, buffer);
        }

        @Override
        public void interrupted(InterruptedException e) {
            DataTunnel.this.sendingSemaphore.release();
            this.inner.interrupted(e);
        }
    }
}

