/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.rpc.control;

import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import java.io.Closeable;
import java.util.ArrayList;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.rpc.control.ConnectionManagerRegistry;
import org.apache.drill.exec.rpc.control.ControlConnectionConfig;
import org.apache.drill.exec.rpc.control.ControlConnectionManager;
import org.apache.drill.exec.rpc.control.ControlRpcMetrics;
import org.apache.drill.exec.rpc.control.ControlServer;
import org.apache.drill.exec.rpc.control.ControlTunnel;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.rpc.control.CustomHandlerRegistry;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.work.batch.ControlMessageHandler;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;

/**
 * {@link Controller} implementation that owns the control-plane RPC server and the
 * registry of connection managers used to reach other endpoints.
 *
 * <p>The server is created lazily in {@link #start}; until then the {@code server}
 * field is {@code null}.
 */
public class ControllerImpl implements Controller {
    /** Config key for the address the control server binds to. */
    private static final String RPC_BIND_ADDR_KEY = "drill.exec.rpc.bind_addr";
    /** Config key for the initial control server port. */
    private static final String INITIAL_BIT_PORT_KEY = "drill.exec.rpc.bit.server.port";

    /** Assigned in {@link #start}; {@code null} before that. */
    private volatile ControlServer server;
    private final ConnectionManagerRegistry connectionRegistry;
    private final CustomHandlerRegistry handlerRegistry;
    private final ControlConnectionConfig config;

    /**
     * Builds the connection config and connection registry, takes the custom handler
     * registry from {@code handler}, and initializes the control RPC metrics singleton.
     *
     * @param context   bootstrap context passed to the connection config
     * @param allocator allocator passed to the connection config and to the metrics
     * @param handler   control message handler; also supplies the custom handler registry
     * @throws DrillbitStartupException declared by this constructor; presumably raised while
     *                                  building the connection config (not visible here)
     */
    public ControllerImpl(BootStrapContext context, BufferAllocator allocator, ControlMessageHandler handler) throws DrillbitStartupException {
        this.config = new ControlConnectionConfig(allocator, context, handler);
        this.connectionRegistry = new ConnectionManagerRegistry(this.config);
        this.handlerRegistry = handler.getHandlerRegistry();
        ((ControlRpcMetrics)ControlRpcMetrics.getInstance()).initialize(this.config.isEncryptionEnabled(), allocator);
    }

    /**
     * Creates the control server, binds it using the configured bind address and port,
     * and publishes the resulting endpoint to the connection and handler registries.
     *
     * @param partialEndpoint  endpoint description lacking the control port
     * @param allowPortHunting forwarded to {@code ControlServer.bind}
     * @return a copy of {@code partialEndpoint} with the control port set to the value
     *         returned by the bind call
     */
    @Override
    public CoordinationProtos.DrillbitEndpoint start(CoordinationProtos.DrillbitEndpoint partialEndpoint, boolean allowPortHunting) {
        this.server = new ControlServer(this.config, this.connectionRegistry);
        DrillConfig drillConfig = this.config.getBootstrapContext().getConfig();
        String bindAddr = drillConfig.getString(RPC_BIND_ADDR_KEY);
        int configuredPort = drillConfig.getInt(INITIAL_BIT_PORT_KEY);
        // The port reported by bind() is the one advertised, not the configured one.
        int boundPort = this.server.bind(bindAddr, configuredPort, allowPortHunting);
        CoordinationProtos.DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setControlPort(boundPort).build();
        this.connectionRegistry.setLocalEndpoint(completeEndpoint);
        this.handlerRegistry.setEndpoint(completeEndpoint);
        return completeEndpoint;
    }

    /**
     * Returns a new tunnel backed by the registry's connection manager for {@code endpoint}.
     *
     * @param endpoint the remote endpoint to reach
     * @return a freshly constructed {@link ControlTunnel}
     */
    @Override
    public ControlTunnel getTunnel(CoordinationProtos.DrillbitEndpoint endpoint) {
        return new ControlTunnel(this.connectionRegistry.getConnectionManager(endpoint));
    }

    /**
     * Registers a custom handler whose request and response are protobuf messages.
     *
     * @param messageTypeId identifier the handler is registered under
     * @param handler       handler to invoke for that message type
     * @param parser        parser used to build the request serde
     * @param <REQUEST>     protobuf request type
     * @param <RESPONSE>    protobuf response type
     */
    @Override
    public <REQUEST extends MessageLite, RESPONSE extends MessageLite> void registerCustomHandler(int messageTypeId, Controller.CustomMessageHandler<REQUEST, RESPONSE> handler, Parser<REQUEST> parser) {
        // The response serde is built with a null parser; presumably the handling side only
        // serializes responses and never parses them -- confirm against ProtoSerDe.
        this.handlerRegistry.registerCustomHandler(messageTypeId, handler, new ControlTunnel.ProtoSerDe<REQUEST>(parser), new ControlTunnel.ProtoSerDe<RESPONSE>(null));
    }

    /**
     * Registers a custom handler with caller-supplied request and response serdes.
     *
     * @param messageTypeId identifier the handler is registered under
     * @param handler       handler to invoke for that message type
     * @param requestSerde  serde for the request type
     * @param responseSerde serde for the response type
     * @param <REQUEST>     request type
     * @param <RESPONSE>    response type
     */
    @Override
    public <REQUEST, RESPONSE> void registerCustomHandler(int messageTypeId, Controller.CustomMessageHandler<REQUEST, RESPONSE> handler, Controller.CustomSerDe<REQUEST> requestSerde, Controller.CustomSerDe<RESPONSE> responseSerde) {
        this.handlerRegistry.registerCustomHandler(messageTypeId, handler, requestSerde, responseSerde);
    }

    /**
     * Closes the control server (if it was started) and every connection manager in the
     * registry via {@code AutoCloseables.close}.
     *
     * @throws Exception propagated from {@code AutoCloseables.close}
     */
    @Override
    public void close() throws Exception {
        ArrayList<Closeable> closeables = new ArrayList<>();
        // Read the volatile field once; it stays null when start() was never called.
        ControlServer startedServer = this.server;
        if (startedServer != null) {
            closeables.add(startedServer);
        }
        for (ControlConnectionManager manager : this.connectionRegistry) {
            closeables.add(manager);
        }
        AutoCloseables.close(closeables);
    }
}

