/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.rpc.control;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
import java.util.concurrent.TimeUnit;
import org.apache.drill.common.util.JacksonUtils;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.GeneralRPCProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.rpc.DrillRpcFuture;
import org.apache.drill.exec.rpc.FutureBitCommand;
import org.apache.drill.exec.rpc.ListeningCommand;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.control.ControlConnection;
import org.apache.drill.exec.rpc.control.ControlConnectionManager;
import org.apache.drill.exec.rpc.control.Controller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ControlTunnel {
    static final Logger logger = LoggerFactory.getLogger(ControlTunnel.class);
    private final ControlConnectionManager manager;

    /**
     * Creates a tunnel that hands every command it builds to the given
     * connection manager.
     *
     * @param manager connection manager used to run the commands created by this tunnel
     */
    public ControlTunnel(ControlConnectionManager manager) {
        this.manager = manager;
    }

    /**
     * Wraps the fragments in a {@link SendFragment} command and passes it to
     * the connection manager.
     *
     * @param outcomeListener listener given to the command for the {@code Ack} outcome
     * @param fragments       fragments to initialize
     */
    public void sendFragments(RpcOutcomeListener<GeneralRPCProtos.Ack> outcomeListener, BitControl.InitializeFragments fragments) {
        this.manager.runCommand(new SendFragment(outcomeListener, fragments));
    }

    /**
     * Runs a {@link SignalFragment} command of type
     * {@code REQ_CANCEL_FRAGMENT} for the given fragment handle.
     *
     * @param outcomeListener listener given to the command for the {@code Ack} outcome
     * @param handle          handle of the fragment to signal
     */
    public void cancelFragment(RpcOutcomeListener<GeneralRPCProtos.Ack> outcomeListener, ExecProtos.FragmentHandle handle) {
        this.manager.runCommand(new SignalFragment(outcomeListener, handle, BitControl.RpcType.REQ_CANCEL_FRAGMENT));
    }

    /**
     * Runs a {@link SignalFragment} command of type
     * {@code REQ_UNPAUSE_FRAGMENT} for the given fragment handle.
     *
     * @param outcomeListener listener given to the command for the {@code Ack} outcome
     * @param handle          handle of the fragment to signal
     */
    public void unpauseFragment(RpcOutcomeListener<GeneralRPCProtos.Ack> outcomeListener, ExecProtos.FragmentHandle handle) {
        this.manager.runCommand(new SignalFragment(outcomeListener, handle, BitControl.RpcType.REQ_UNPAUSE_FRAGMENT));
    }

    /**
     * Runs a {@link CancelQuery} command for the given query id.
     *
     * @param queryId id of the query to cancel
     * @return the future exposed by the command, typed to the {@code Ack} response
     */
    public DrillRpcFuture<GeneralRPCProtos.Ack> requestCancelQuery(UserBitShared.QueryId queryId) {
        CancelQuery command = new CancelQuery(queryId);
        this.manager.runCommand(command);
        return command.getFuture();
    }

    /**
     * Wraps the message in a {@link ReceiverFinished} command and passes it to
     * the connection manager.
     *
     * @param outcomeListener  listener given to the command for the {@code Ack} outcome
     * @param finishedReceiver message to send
     */
    public void informReceiverFinished(RpcOutcomeListener<GeneralRPCProtos.Ack> outcomeListener, BitControl.FinishedReceiver finishedReceiver) {
        this.manager.runCommand(new ReceiverFinished(outcomeListener, finishedReceiver));
    }

    /**
     * Runs a {@link SendFragmentStatus} command carrying the given status.
     *
     * @param status fragment status to send
     * @return the future exposed by the command, typed to the {@code Ack} response
     */
    public DrillRpcFuture<GeneralRPCProtos.Ack> sendFragmentStatus(BitControl.FragmentStatus status) {
        SendFragmentStatus command = new SendFragmentStatus(status);
        this.manager.runCommand(command);
        return command.getFuture();
    }

    /**
     * Runs a {@link RequestProfile} command for the given query id.
     *
     * @param queryId id of the query whose profile is requested
     * @return the future exposed by the command, typed to the {@code QueryProfile} response
     */
    public DrillRpcFuture<UserBitShared.QueryProfile> requestQueryProfile(UserBitShared.QueryId queryId) {
        RequestProfile command = new RequestProfile(queryId);
        this.manager.runCommand(command);
        return command.getFuture();
    }

    /**
     * Creates a custom tunnel whose outgoing and incoming messages are both
     * protobuf messages.
     *
     * @param messageTypeId type id stamped on every custom message sent through the tunnel
     * @param clazz         class of the outgoing message; not read by this method, it only
     *                      pins the {@code SEND} type parameter at the call site
     * @param parser        parser used to decode the response bytes
     * @return a tunnel using {@link ProtoSerDe} on both the send and the receive side
     */
    public <SEND extends MessageLite, RECEIVE extends MessageLite> CustomTunnel<SEND, RECEIVE> getCustomTunnel(int messageTypeId, Class<SEND> clazz, Parser<RECEIVE> parser) {
        // The send-side serde only ever serializes, and ProtoSerDe.serializeToSend does not
        // touch the parser, so no parser is supplied for it.
        return new CustomTunnel<SEND, RECEIVE>(messageTypeId, new ProtoSerDe<SEND>(null), new ProtoSerDe<RECEIVE>(parser));
    }

    /**
     * Creates a custom tunnel that uses caller-supplied serializers.
     *
     * @param messageTypeId type id stamped on every custom message sent through the tunnel
     * @param send          serde used to serialize outgoing messages
     * @param receive       serde used to deserialize responses
     * @return a tunnel bound to this {@code ControlTunnel}
     */
    public <SEND, RECEIVE> CustomTunnel<SEND, RECEIVE> getCustomTunnel(int messageTypeId, Controller.CustomSerDe<SEND> send, Controller.CustomSerDe<RECEIVE> receive) {
        return new CustomTunnel<SEND, RECEIVE>(messageTypeId, send, receive);
    }

    /**
     * Command that sends an {@code InitializeFragments} message as
     * {@code REQ_INITIALIZE_FRAGMENTS} and expects an {@code Ack} in response.
     */
    public static class SendFragment
    extends ListeningCommand<GeneralRPCProtos.Ack, ControlConnection, BitControl.RpcType, BitControl.InitializeFragments> {
        final BitControl.InitializeFragments fragments;

        /**
         * @param listener  listener handed to the superclass
         * @param fragments message this command sends
         */
        public SendFragment(RpcOutcomeListener<GeneralRPCProtos.Ack> listener, BitControl.InitializeFragments fragments) {
            super(listener);
            this.fragments = fragments;
        }

        /** Sends the fragments over the given connection with no data bodies attached. */
        @Override
        public void doRpcCall(RpcOutcomeListener<GeneralRPCProtos.Ack> outcomeListener, ControlConnection connection) {
            connection.send(outcomeListener, getRpcType(), fragments, GeneralRPCProtos.Ack.class);
        }

        /** @return always {@code REQ_INITIALIZE_FRAGMENTS} */
        @Override
        public BitControl.RpcType getRpcType() {
            return BitControl.RpcType.REQ_INITIALIZE_FRAGMENTS;
        }

        /** @return the message passed to the constructor */
        @Override
        public BitControl.InitializeFragments getMessage() {
            return fragments;
        }
    }

    /**
     * Command that sends a fragment handle under a caller-chosen RPC type and
     * expects an {@code Ack} in response.
     */
    public static class SignalFragment
    extends ListeningCommand<GeneralRPCProtos.Ack, ControlConnection, BitControl.RpcType, ExecProtos.FragmentHandle> {
        final ExecProtos.FragmentHandle handle;
        final BitControl.RpcType type;

        /**
         * @param listener listener handed to the superclass
         * @param handle   handle of the fragment being signalled
         * @param type     RPC type under which the handle is sent
         */
        public SignalFragment(RpcOutcomeListener<GeneralRPCProtos.Ack> listener, ExecProtos.FragmentHandle handle, BitControl.RpcType type) {
            super(listener);
            this.type = type;
            this.handle = handle;
        }

        /** Sends the handle through {@code sendUnsafe} with no data bodies attached. */
        @Override
        public void doRpcCall(RpcOutcomeListener<GeneralRPCProtos.Ack> outcomeListener, ControlConnection connection) {
            connection.sendUnsafe(outcomeListener, type, handle, GeneralRPCProtos.Ack.class);
        }

        /** @return the RPC type passed to the constructor */
        @Override
        public BitControl.RpcType getRpcType() {
            return type;
        }

        /** @return the fragment handle passed to the constructor */
        @Override
        public ExecProtos.FragmentHandle getMessage() {
            return handle;
        }
    }

    /**
     * Future-based command that sends a query id as {@code REQ_QUERY_CANCEL}
     * and expects an {@code Ack} in response.
     */
    public static class CancelQuery
    extends FutureBitCommand<GeneralRPCProtos.Ack, ControlConnection, BitControl.RpcType, UserBitShared.QueryId> {
        final UserBitShared.QueryId queryId;

        /**
         * @param queryId id of the query this command refers to
         */
        public CancelQuery(UserBitShared.QueryId queryId) {
            this.queryId = queryId;
        }

        /** Sends the query id over the given connection with no data bodies attached. */
        @Override
        public void doRpcCall(RpcOutcomeListener<GeneralRPCProtos.Ack> outcomeListener, ControlConnection connection) {
            connection.send(outcomeListener, getRpcType(), queryId, GeneralRPCProtos.Ack.class);
        }

        /** @return always {@code REQ_QUERY_CANCEL} */
        @Override
        public BitControl.RpcType getRpcType() {
            return BitControl.RpcType.REQ_QUERY_CANCEL;
        }

        /** @return the query id passed to the constructor */
        @Override
        public UserBitShared.QueryId getMessage() {
            return queryId;
        }
    }

    /**
     * Command that sends a {@code FinishedReceiver} message as
     * {@code REQ_RECEIVER_FINISHED} and expects an {@code Ack} in response.
     */
    public static class ReceiverFinished
    extends ListeningCommand<GeneralRPCProtos.Ack, ControlConnection, BitControl.RpcType, BitControl.FinishedReceiver> {
        final BitControl.FinishedReceiver finishedReceiver;

        /**
         * @param listener         listener handed to the superclass
         * @param finishedReceiver message this command sends
         */
        public ReceiverFinished(RpcOutcomeListener<GeneralRPCProtos.Ack> listener, BitControl.FinishedReceiver finishedReceiver) {
            super(listener);
            this.finishedReceiver = finishedReceiver;
        }

        /** Sends the message over the given connection with no data bodies attached. */
        @Override
        public void doRpcCall(RpcOutcomeListener<GeneralRPCProtos.Ack> outcomeListener, ControlConnection connection) {
            connection.send(outcomeListener, getRpcType(), finishedReceiver, GeneralRPCProtos.Ack.class);
        }

        /** @return always {@code REQ_RECEIVER_FINISHED} */
        @Override
        public BitControl.RpcType getRpcType() {
            return BitControl.RpcType.REQ_RECEIVER_FINISHED;
        }

        /** @return the message passed to the constructor */
        @Override
        public BitControl.FinishedReceiver getMessage() {
            return finishedReceiver;
        }
    }

    /**
     * Future-based command that sends a {@code FragmentStatus} as
     * {@code REQ_FRAGMENT_STATUS} and expects an {@code Ack} in response.
     */
    public static class SendFragmentStatus
    extends FutureBitCommand<GeneralRPCProtos.Ack, ControlConnection, BitControl.RpcType, BitControl.FragmentStatus> {
        final BitControl.FragmentStatus status;

        /**
         * @param status fragment status this command sends
         */
        public SendFragmentStatus(BitControl.FragmentStatus status) {
            this.status = status;
        }

        /** Sends the status through {@code sendUnsafe} with no data bodies attached. */
        @Override
        public void doRpcCall(RpcOutcomeListener<GeneralRPCProtos.Ack> outcomeListener, ControlConnection connection) {
            connection.sendUnsafe(outcomeListener, getRpcType(), status, GeneralRPCProtos.Ack.class);
        }

        /** @return always {@code REQ_FRAGMENT_STATUS} */
        @Override
        public BitControl.RpcType getRpcType() {
            return BitControl.RpcType.REQ_FRAGMENT_STATUS;
        }

        /** @return the status passed to the constructor */
        @Override
        public BitControl.FragmentStatus getMessage() {
            return status;
        }
    }

    /**
     * Future-based command that sends a query id as {@code REQ_QUERY_STATUS}
     * and expects a {@code QueryProfile} in response.
     */
    public static class RequestProfile
    extends FutureBitCommand<UserBitShared.QueryProfile, ControlConnection, BitControl.RpcType, UserBitShared.QueryId> {
        final UserBitShared.QueryId queryId;

        /**
         * @param queryId id of the query whose profile is requested
         */
        public RequestProfile(UserBitShared.QueryId queryId) {
            this.queryId = queryId;
        }

        /** Sends the query id over the given connection with no data bodies attached. */
        @Override
        public void doRpcCall(RpcOutcomeListener<UserBitShared.QueryProfile> outcomeListener, ControlConnection connection) {
            connection.send(outcomeListener, getRpcType(), queryId, UserBitShared.QueryProfile.class);
        }

        /** @return always {@code REQ_QUERY_STATUS} */
        @Override
        public BitControl.RpcType getRpcType() {
            return BitControl.RpcType.REQ_QUERY_STATUS;
        }

        /** @return the query id passed to the constructor */
        @Override
        public UserBitShared.QueryId getMessage() {
            return queryId;
        }
    }

    /**
     * Tunnel for user-defined messages. Outgoing messages are serialized with
     * the {@code send} serde, wrapped in a {@code CustomMessage} tagged with
     * this tunnel's message type id, and run through the enclosing
     * {@code ControlTunnel}'s connection manager. Responses are decoded with
     * the {@code receive} serde.
     *
     * @param <SEND>    type of the outgoing message
     * @param <RECEIVE> type of the decoded response
     */
    public class CustomTunnel<SEND, RECEIVE> {
        private final int messageTypeId;
        private final Controller.CustomSerDe<SEND> send;
        private final Controller.CustomSerDe<RECEIVE> receive;

        private CustomTunnel(int messageTypeId, Controller.CustomSerDe<SEND> send, Controller.CustomSerDe<RECEIVE> receive) {
            this.messageTypeId = messageTypeId;
            this.send = send;
            this.receive = receive;
        }

        /**
         * Sends a message and returns a future for the decoded response.
         *
         * @param messageToSend message to serialize and send
         * @param dataBodies    data bodies passed along with the message
         * @return a future that decodes the response with the receive serde
         */
        public CustomFuture<RECEIVE> send(SEND messageToSend, ByteBuf ... dataBodies) {
            SyncCustomMessageSender command = new SyncCustomMessageSender(this.toCustomMessage(messageToSend), dataBodies);
            ControlTunnel.this.manager.runCommand(command);
            DrillRpcFuture<BitControl.CustomMessage> innerFuture = command.getFuture();
            return new CustomFuture<RECEIVE>(this.receive, innerFuture);
        }

        /**
         * Sends a message and reports the decoded response to a listener.
         *
         * @param listener      listener notified with the decoded response or the failure
         * @param messageToSend message to serialize and send
         * @param dataBodies    data bodies passed along with the message
         */
        public void send(RpcOutcomeListener<RECEIVE> listener, SEND messageToSend, ByteBuf ... dataBodies) {
            BitControl.CustomMessage customMessage = this.toCustomMessage(messageToSend);
            ControlTunnel.this.manager.runCommand(new CustomMessageSender(new CustomTunnelListener(listener), customMessage, dataBodies));
        }

        /** Serializes the payload and wraps it in a CustomMessage carrying this tunnel's type id. */
        private BitControl.CustomMessage toCustomMessage(SEND messageToSend) {
            ByteString payload = ByteString.copyFrom(this.send.serializeToSend(messageToSend));
            return BitControl.CustomMessage.newBuilder()
                .setMessage(payload)
                .setType(this.messageTypeId)
                .build();
        }

        /**
         * Adapts a listener for the decoded {@code RECEIVE} type to the raw
         * {@code CustomMessage} listener used by the command.
         */
        private class CustomTunnelListener
        implements RpcOutcomeListener<BitControl.CustomMessage> {
            final RpcOutcomeListener<RECEIVE> innerListener;

            public CustomTunnelListener(RpcOutcomeListener<RECEIVE> innerListener) {
                this.innerListener = innerListener;
            }

            @Override
            public void failed(RpcException ex) {
                this.innerListener.failed(ex);
            }

            @Override
            public void success(BitControl.CustomMessage value, ByteBuf buffer) {
                try {
                    // Keep the decoded value typed as RECEIVE so it matches the inner listener's contract.
                    RECEIVE message = CustomTunnel.this.receive.deserializeReceived(value.getMessage().toByteArray());
                    this.innerListener.success(message, buffer);
                }
                catch (Exception e) {
                    // NOTE(review): this also catches exceptions thrown by innerListener.success itself,
                    // which are then reported as a parse failure. Confirm that is intended.
                    this.innerListener.failed(new RpcException("Failure while parsing message locally.", e));
                }
            }

            @Override
            public void interrupted(InterruptedException e) {
                this.innerListener.interrupted(e);
            }
        }
    }

    /**
     * {@code CustomSerDe} for protobuf messages: serializes with
     * {@code toByteArray()} and deserializes with the supplied parser.
     *
     * @param <MSG> protobuf message type handled by this serde
     */
    public static class ProtoSerDe<MSG extends MessageLite>
    implements Controller.CustomSerDe<MSG> {
        private final Parser<MSG> parser;

        /**
         * @param parser parser used by {@link #deserializeReceived(byte[])}; it is not
         *               used by {@link #serializeToSend(MessageLite)}
         */
        ProtoSerDe(Parser<MSG> parser) {
            this.parser = parser;
        }

        /** @return the message's own protobuf byte encoding */
        @Override
        public byte[] serializeToSend(MSG send) {
            return send.toByteArray();
        }

        /** @return the message parsed from {@code bytes} by the configured parser */
        @Override
        public MSG deserializeReceived(byte[] bytes) throws Exception {
            MSG parsed = parser.parseFrom(bytes);
            return parsed;
        }
    }

    /**
     * {@code CustomSerDe} that uses a Jackson {@code ObjectWriter} and
     * {@code ObjectReader} bound to a single message class.
     *
     * @param <MSG> message type handled by this serde
     */
    public static class JacksonSerDe<MSG>
    implements Controller.CustomSerDe<MSG> {
        private final ObjectWriter writer;
        private final ObjectReader reader;

        /**
         * Uses the mapper returned by {@code JacksonUtils.createObjectMapper()}.
         *
         * @param clazz message class the writer and reader are created for
         */
        public JacksonSerDe(Class<MSG> clazz) {
            this(JacksonUtils.createObjectMapper(), clazz);
        }

        /**
         * Uses a mapper with the given serializer and deserializer registered
         * for {@code clazz}.
         *
         * @param clazz        message class the writer and reader are created for
         * @param serializer   serializer registered for {@code clazz}
         * @param deserializer deserializer registered for {@code clazz}
         */
        public JacksonSerDe(Class<MSG> clazz, JsonSerializer<MSG> serializer, JsonDeserializer<MSG> deserializer) {
            this(mapperWith(clazz, serializer, deserializer), clazz);
        }

        /** Derives the writer and the reader for {@code clazz} from one mapper. */
        private JacksonSerDe(ObjectMapper mapper, Class<MSG> clazz) {
            this.writer = mapper.writerFor(clazz);
            this.reader = mapper.readerFor(clazz);
        }

        /** Builds a mapper carrying a module with the custom serializer and deserializer. */
        private static <T> ObjectMapper mapperWith(Class<T> clazz, JsonSerializer<T> serializer, JsonDeserializer<T> deserializer) {
            SimpleModule module = new SimpleModule();
            module.addSerializer(clazz, serializer);
            module.addDeserializer(clazz, deserializer);
            return JacksonUtils.createJsonMapperBuilder().addModule(module).build();
        }

        /**
         * @return the bytes written by the configured writer
         * @throws RuntimeException wrapping any {@code JsonProcessingException}
         */
        @Override
        public byte[] serializeToSend(MSG send) {
            try {
                return writer.writeValueAsBytes(send);
            }
            catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        }

        /** @return the value read from {@code bytes} by the configured reader */
        @Override
        public MSG deserializeReceived(byte[] bytes) throws Exception {
            MSG decoded = reader.readValue(bytes);
            return decoded;
        }
    }

    /**
     * Wraps a future of the raw {@code CustomMessage} response and decodes its
     * payload with a serde on retrieval.
     *
     * @param <RECEIVE> type of the decoded response
     */
    public class CustomFuture<RECEIVE> {
        private final Controller.CustomSerDe<RECEIVE> serde;
        private final DrillRpcFuture<BitControl.CustomMessage> future;

        /**
         * @param serde  serde used to decode the response payload
         * @param future future of the raw response
         */
        public CustomFuture(Controller.CustomSerDe<RECEIVE> serde, DrillRpcFuture<BitControl.CustomMessage> future) {
            this.serde = serde;
            this.future = future;
        }

        /**
         * Obtains the response via {@code checkedGet()} and decodes it.
         *
         * @return the decoded response
         */
        public RECEIVE get() throws Exception {
            return decode(future.checkedGet());
        }

        /**
         * Obtains the response via {@code checkedGet(timeout, unit)} and decodes it.
         *
         * @param timeout timeout value passed to the underlying future
         * @param unit    unit of {@code timeout}
         * @return the decoded response
         */
        public RECEIVE get(long timeout, TimeUnit unit) throws Exception, InvalidProtocolBufferException {
            return decode(future.checkedGet(timeout, unit));
        }

        /** @return the buffer reported by the underlying future */
        public DrillBuf getBuffer() throws RpcException {
            return (DrillBuf)future.getBuffer();
        }

        /** Decodes the payload bytes of a raw response with the serde. */
        private RECEIVE decode(BitControl.CustomMessage message) throws Exception {
            byte[] payload = message.getMessage().toByteArray();
            return serde.deserializeReceived(payload);
        }
    }

    /**
     * Future-based command that sends a {@code CustomMessage} with optional
     * data bodies as {@code REQ_CUSTOM} and expects a {@code CustomMessage}
     * in response.
     */
    public static class SyncCustomMessageSender
    extends FutureBitCommand<BitControl.CustomMessage, ControlConnection, BitControl.RpcType, BitControl.CustomMessage> {
        private final BitControl.CustomMessage message;
        private final ByteBuf[] dataBodies;

        /**
         * @param message    message this command sends
         * @param dataBodies data bodies sent along with the message
         */
        public SyncCustomMessageSender(BitControl.CustomMessage message, ByteBuf[] dataBodies) {
            this.dataBodies = dataBodies;
            this.message = message;
        }

        /** Sends the message and its data bodies over the given connection. */
        @Override
        public void doRpcCall(RpcOutcomeListener<BitControl.CustomMessage> outcomeListener, ControlConnection connection) {
            connection.send(outcomeListener, getRpcType(), message, BitControl.CustomMessage.class, dataBodies);
        }

        /** @return always {@code REQ_CUSTOM} */
        @Override
        public BitControl.RpcType getRpcType() {
            return BitControl.RpcType.REQ_CUSTOM;
        }

        /** @return the message passed to the constructor */
        @Override
        public BitControl.CustomMessage getMessage() {
            return message;
        }

        /** @return the same array instance that was passed to the constructor */
        public ByteBuf[] getDataBodies() {
            return dataBodies;
        }
    }

    /**
     * Listener-based command that sends a {@code CustomMessage} with optional
     * data bodies as {@code REQ_CUSTOM} and expects a {@code CustomMessage}
     * in response.
     */
    public static class CustomMessageSender
    extends ListeningCommand<BitControl.CustomMessage, ControlConnection, BitControl.RpcType, BitControl.CustomMessage> {
        private final BitControl.CustomMessage message;
        private final ByteBuf[] dataBodies;

        /**
         * @param listener   listener handed to the superclass
         * @param message    message this command sends
         * @param dataBodies data bodies sent along with the message
         */
        public CustomMessageSender(RpcOutcomeListener<BitControl.CustomMessage> listener, BitControl.CustomMessage message, ByteBuf[] dataBodies) {
            super(listener);
            this.dataBodies = dataBodies;
            this.message = message;
        }

        /** Sends the message and its data bodies over the given connection. */
        @Override
        public void doRpcCall(RpcOutcomeListener<BitControl.CustomMessage> outcomeListener, ControlConnection connection) {
            connection.send(outcomeListener, getRpcType(), message, BitControl.CustomMessage.class, dataBodies);
        }

        /** @return always {@code REQ_CUSTOM} */
        @Override
        public BitControl.RpcType getRpcType() {
            return BitControl.RpcType.REQ_CUSTOM;
        }

        /** @return the message passed to the constructor */
        @Override
        public BitControl.CustomMessage getMessage() {
            return message;
        }

        /** @return the same array instance that was passed to the constructor */
        public ByteBuf[] getDataBodies() {
            return dataBodies;
        }
    }
}

