/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.rpc.control;

import com.carrotsearch.hppc.IntObjectHashMap;
import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.concurrent.AutoCloseableLock;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.UserRpcException;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;

/**
 * Registry that maps integer custom-message type ids to the handler (plus
 * request/response serializers) responsible for that message type.
 *
 * <p>The handler table is guarded by a read/write lock: registration takes the
 * write lock, lookup takes the read lock. The handler itself is invoked after
 * the read lock has been released.
 */
public class CustomHandlerRegistry {
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final AutoCloseableLock read = new AutoCloseableLock(this.readWriteLock.readLock());
    private final AutoCloseableLock write = new AutoCloseableLock(this.readWriteLock.writeLock());
    /** Registered handlers keyed by message type id; access only while holding {@link #read} or {@link #write}. */
    private final IntObjectHashMap<ParsingHandler<?, ?>> handlers = new IntObjectHashMap<>();
    /** Endpoint attached to every {@link UserRpcException} raised by this registry. */
    private volatile CoordinationProtos.DrillbitEndpoint endpoint;

    /**
     * Sets the endpoint that is reported in exceptions thrown by this registry.
     *
     * @param endpoint endpoint to attach to subsequently created {@link UserRpcException}s
     */
    public void setEndpoint(CoordinationProtos.DrillbitEndpoint endpoint) {
        this.endpoint = endpoint;
    }

    /**
     * Registers a handler for one custom message type.
     *
     * @param messageTypeId id of the message type the handler is responsible for
     * @param handler       callback invoked with the deserialized request
     * @param requestSerde  serializer used to decode incoming request bytes
     * @param responseSerde serializer used to encode the handler's response
     * @throws NullPointerException  if {@code handler}, {@code requestSerde} or
     *                               {@code responseSerde} is {@code null}
     * @throws IllegalStateException if a handler is already registered for {@code messageTypeId}
     */
    public <REQUEST, RESPONSE> void registerCustomHandler(int messageTypeId, Controller.CustomMessageHandler<REQUEST, RESPONSE> handler, Controller.CustomSerDe<REQUEST> requestSerde, Controller.CustomSerDe<RESPONSE> responseSerde) {
        Preconditions.checkNotNull(handler);
        Preconditions.checkNotNull(requestSerde);
        Preconditions.checkNotNull(responseSerde);
        try (AutoCloseables.Closeable ignored = this.write.open()) {
            // Check-then-put is safe here because the write lock is held for both steps.
            final ParsingHandler<?, ?> existing = this.handlers.get(messageTypeId);
            if (existing != null) {
                throw new IllegalStateException(String.format("Only one handler can be registered for a given custom message type. You tried to register a handler for the %d message type but one had already been registered.", messageTypeId));
            }
            this.handlers.put(messageTypeId, new ParsingHandler<REQUEST, RESPONSE>(handler, requestSerde, responseSerde));
        }
    }

    /**
     * Dispatches a custom message to the handler registered for its type and
     * wraps the handler's result in a {@code RESP_CUSTOM} response.
     *
     * @param message incoming custom message; its type selects the handler
     * @param dBody   data body that is passed through to the handler unchanged
     * @return response carrying the serialized handler result and the handler's
     *         bodies (an empty array when the handler supplied none)
     * @throws RpcException if no handler is registered for the message type, or
     *                      if decoding/handling the message fails
     */
    public Response handle(BitControl.CustomMessage message, DrillBuf dBody) throws RpcException {
        final int messageType = message.getType();

        // Only the lookup happens under the read lock; the handler runs outside it.
        final ParsingHandler<?, ?> registered;
        try (AutoCloseables.Closeable ignored = this.read.open()) {
            registered = this.handlers.get(messageType);
        }
        if (registered == null) {
            throw new UserRpcException(this.endpoint, "Unable to handle message.", new IllegalStateException(String.format("Unable to handle message. The message type provided [%d] did not have a registered handler.", messageType)));
        }

        final Controller.CustomResponse<?> customResponse = registered.onMessage(message.getMessage(), dBody);

        // The serde and the handler were registered together with the same RESPONSE
        // type, so feeding the handler's result back into its serde is type-safe
        // even though the compiler can no longer prove it at this point.
        @SuppressWarnings("unchecked")
        final Controller.CustomSerDe<Object> responseSerde = (Controller.CustomSerDe<Object>) registered.getResponseSerDe();
        final BitControl.CustomMessage responseMessage = BitControl.CustomMessage.newBuilder()
                .setMessage(ByteString.copyFrom(responseSerde.serializeToSend(customResponse.getMessage())))
                .setType(messageType)
                .build();

        // Never hand a null array to the response.
        ByteBuf[] dBodies = customResponse.getBodies();
        if (dBodies == null) {
            dBodies = new DrillBuf[0];
        }
        return new Response(BitControl.RpcType.RESP_CUSTOM, responseMessage, dBodies);
    }

    /**
     * Bundles a message handler with the serializers for its request and
     * response types. Kept as an inner (non-static) class because failures are
     * reported with the enclosing registry's endpoint.
     */
    private class ParsingHandler<REQUEST, RESPONSE> {
        private final Controller.CustomMessageHandler<REQUEST, ?> handler;
        private final Controller.CustomSerDe<REQUEST> requestSerde;
        private final Controller.CustomSerDe<RESPONSE> responseSerde;

        public ParsingHandler(Controller.CustomMessageHandler<REQUEST, RESPONSE> handler, Controller.CustomSerDe<REQUEST> requestSerde, Controller.CustomSerDe<RESPONSE> responseSerde) {
            this.handler = handler;
            this.requestSerde = requestSerde;
            this.responseSerde = responseSerde;
        }

        /** @return the serializer for this handler's response type */
        public Controller.CustomSerDe<RESPONSE> getResponseSerDe() {
            return this.responseSerde;
        }

        /**
         * Decodes the request bytes and passes the result to the wrapped handler.
         *
         * @param pBody serialized request
         * @param dBody data body forwarded to the handler as-is
         * @return whatever the wrapped handler returns
         * @throws UserRpcException if decoding or the handler throws any exception
         */
        public Controller.CustomResponse<?> onMessage(ByteString pBody, DrillBuf dBody) throws UserRpcException {
            try {
                return this.handler.onMessage(this.requestSerde.deserializeReceived(pBody.toByteArray()), dBody);
            }
            catch (Exception failure) {
                // NOTE(review): this also wraps exceptions thrown by the handler itself,
                // not only deserialization failures, under the "parsing" message.
                throw new UserRpcException(CustomHandlerRegistry.this.endpoint, "Failure parsing message.", failure);
            }
        }
    }
}

