/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.connector.presto;

import com.google.common.collect.ImmutableMap;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.connector.presto.plugin.metrics.NoopPinotMetricFactory;
import org.apache.pinot.core.transport.AsyncQueryResponse;
import org.apache.pinot.core.transport.QueryRouter;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.ServerResponse;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;

public class PinotScatterGatherQueryClient {
    private static final String PRESTO_HOST_PREFIX = "presto-pinot-";
    private final String _prestoHostId;
    private final BrokerMetrics _brokerMetrics;
    private final Queue<QueryRouter> _queryRouters = new ConcurrentLinkedQueue<QueryRouter>();
    private final Config _config;
    private final Map<String, AtomicInteger> _concurrentQueriesCountMap = new ConcurrentHashMap<String, AtomicInteger>();

    public PinotScatterGatherQueryClient(Config pinotConfig) {
        this._prestoHostId = this.getDefaultPrestoId();
        PinotMetricUtils.init((PinotConfiguration)new PinotConfiguration((Map)ImmutableMap.of((Object)"factory.className", (Object)NoopPinotMetricFactory.class.getName())));
        this._brokerMetrics = new BrokerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
        this._brokerMetrics.initializeGlobalMeters();
        TlsConfig tlsConfig = this.getTlsConfig(pinotConfig);
        for (int i = 0; i < pinotConfig.getThreadPoolSize(); ++i) {
            this._queryRouters.add(new QueryRouter(String.format("%s-%d", this._prestoHostId, i), this._brokerMetrics, null, tlsConfig));
        }
        this._config = pinotConfig;
    }

    private TlsConfig getTlsConfig(Config pinotConfig) {
        TlsConfig tlsConfig = new TlsConfig();
        tlsConfig.setClientAuthEnabled(pinotConfig.isClientAuthEnabled());
        tlsConfig.setTrustStoreType(pinotConfig.getTrustStoreType());
        tlsConfig.setTrustStorePath(pinotConfig.getTrustStorePath());
        tlsConfig.setTrustStorePassword(pinotConfig.getTrustStorePassword());
        tlsConfig.setKeyStoreType(pinotConfig.getKeyStoreType());
        tlsConfig.setKeyStorePath(pinotConfig.getKeyStorePath());
        tlsConfig.setKeyStorePassword(pinotConfig.getKeyStorePassword());
        tlsConfig.setSslProvider(pinotConfig.getSslProvider());
        return tlsConfig;
    }

    private static <T> T doWithRetries(int retries, Function<Integer, T> caller) {
        PinotException firstError = null;
        for (int i = 0; i < retries; ++i) {
            try {
                return caller.apply(i);
            }
            catch (PinotException e) {
                if (firstError == null) {
                    firstError = e;
                }
                if (e.getErrorCode().isRetriable()) continue;
                throw e;
            }
        }
        throw firstError;
    }

    private String getDefaultPrestoId() {
        String defaultBrokerId;
        try {
            defaultBrokerId = PRESTO_HOST_PREFIX + InetAddress.getLocalHost().getHostName();
        }
        catch (UnknownHostException e) {
            defaultBrokerId = PRESTO_HOST_PREFIX;
        }
        return defaultBrokerId;
    }

    public Map<ServerInstance, DataTable> queryPinotServerForDataTable(String query, String serverHost, List<String> segments, long connectionTimeoutInMillis, boolean ignoreEmptyResponses, int pinotRetryCount) {
        BrokerRequest brokerRequest;
        try {
            brokerRequest = CalciteSqlCompiler.compileToBrokerRequest((String)query);
        }
        catch (Exception e) {
            throw new PinotException(ErrorCode.PINOT_INVALID_SQL_GENERATED, String.format("Parsing error with on %s, Error = %s", serverHost, e.getMessage()), e);
        }
        HashMap<ServerInstance, ArrayList<String>> routingTable = new HashMap<ServerInstance, ArrayList<String>>();
        routingTable.put(new ServerInstance(new InstanceConfig(serverHost)), new ArrayList<String>(segments));
        Map serverResponseMap = PinotScatterGatherQueryClient.doWithRetries(pinotRetryCount, requestId -> {
            int concurrentQueryNum;
            String rawTableName = TableNameBuilder.extractRawTableName((String)brokerRequest.getQuerySource().getTableName());
            if (!this._concurrentQueriesCountMap.containsKey(serverHost)) {
                this._concurrentQueriesCountMap.put(serverHost, new AtomicInteger(0));
            }
            if ((concurrentQueryNum = this._concurrentQueriesCountMap.get(serverHost).get()) > this._config.getMaxBacklogPerServer()) {
                throw new PinotException(ErrorCode.PINOT_QUERY_BACKLOG_FULL, "Reaching server query max backlog size is - " + this._config.getMaxBacklogPerServer());
            }
            this._concurrentQueriesCountMap.get(serverHost).incrementAndGet();
            QueryRouter nextAvailableQueryRouter = this.getNextAvailableQueryRouter();
            AsyncQueryResponse asyncQueryResponse = TableNameBuilder.getTableTypeFromTableName((String)brokerRequest.getQuerySource().getTableName()) == TableType.REALTIME ? nextAvailableQueryRouter.submitQuery((long)requestId.intValue(), rawTableName, null, null, brokerRequest, routingTable, connectionTimeoutInMillis) : nextAvailableQueryRouter.submitQuery((long)requestId.intValue(), rawTableName, brokerRequest, routingTable, null, null, connectionTimeoutInMillis);
            Map<ServerInstance, DataTable> serverInstanceDataTableMap = this.gatherServerResponses(ignoreEmptyResponses, routingTable, asyncQueryResponse, brokerRequest.getQuerySource().getTableName());
            this._queryRouters.offer(nextAvailableQueryRouter);
            this._concurrentQueriesCountMap.get(serverHost).decrementAndGet();
            return serverInstanceDataTableMap;
        });
        return serverResponseMap;
    }

    private QueryRouter getNextAvailableQueryRouter() {
        QueryRouter queryRouter = this._queryRouters.poll();
        while (queryRouter == null) {
            try {
                Thread.sleep(200L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            queryRouter = this._queryRouters.poll();
        }
        return queryRouter;
    }

    private Map<ServerInstance, DataTable> gatherServerResponses(boolean ignoreEmptyResponses, Map<ServerInstance, List<String>> routingTable, AsyncQueryResponse asyncQueryResponse, String tableNameWithType) {
        try {
            Map queryResponses = asyncQueryResponse.getFinalResponses();
            if (!ignoreEmptyResponses && queryResponses.size() != routingTable.size()) {
                HashMap routingTableForLogging = new HashMap();
                routingTable.entrySet().forEach(entry -> {
                    String valueToPrint = ((List)entry.getValue()).size() > 10 ? String.format("%d segments", ((List)entry.getValue()).size()) : ((List)entry.getValue()).toString();
                    routingTableForLogging.put(((ServerInstance)entry.getKey()).toString(), valueToPrint);
                });
                throw new PinotException(ErrorCode.PINOT_INSUFFICIENT_SERVER_RESPONSE, String.format("%d of %d servers responded with routing table servers: %s, query stats: %s", queryResponses.size(), routingTable.size(), routingTableForLogging, asyncQueryResponse.getServerStats()));
            }
            HashMap<ServerInstance, DataTable> serverResponseMap = new HashMap<ServerInstance, DataTable>();
            queryResponses.entrySet().forEach(entry -> serverResponseMap.put(new ServerInstance(new InstanceConfig(String.format("Server_%s_%d", ((ServerRoutingInstance)entry.getKey()).getHostname(), ((ServerRoutingInstance)entry.getKey()).getPort()))), ((ServerResponse)entry.getValue()).getDataTable()));
            return serverResponseMap;
        }
        catch (InterruptedException e) {
            throw new PinotException(ErrorCode.PINOT_UNCLASSIFIED_ERROR, String.format("Caught exception while fetching responses for table: %s", tableNameWithType), e);
        }
    }

    public static class Config {
        private final int _threadPoolSize;
        private final int _maxBacklogPerServer;
        private TlsConfig _tlsConfig = new TlsConfig();
        @Deprecated
        private final long _idleTimeoutMillis;
        @Deprecated
        private final int _minConnectionsPerServer;
        @Deprecated
        private final int _maxConnectionsPerServer;

        public Config(Map<String, Object> pinotConfigs) {
            this._idleTimeoutMillis = Long.parseLong(pinotConfigs.get("idleTimeoutMillis").toString());
            this._threadPoolSize = Integer.parseInt(pinotConfigs.get("threadPoolSize").toString());
            this._minConnectionsPerServer = Integer.parseInt(pinotConfigs.get("minConnectionsPerServer").toString());
            this._maxBacklogPerServer = Integer.parseInt(pinotConfigs.get("maxBacklogPerServer").toString());
            this._maxConnectionsPerServer = Integer.parseInt(pinotConfigs.get("maxConnectionsPerServer").toString());
            this._tlsConfig.setClientAuthEnabled(Boolean.parseBoolean(pinotConfigs.get("isClientAuthEnabled").toString()));
            this._tlsConfig.setTrustStorePath(pinotConfigs.get("trustStorePath").toString());
            this._tlsConfig.setTrustStorePassword(pinotConfigs.get("trustStorePassword").toString());
            this._tlsConfig.setTrustStoreType(pinotConfigs.get("trustStoreType").toString());
            this._tlsConfig.setKeyStorePath(pinotConfigs.get("keyStorePath").toString());
            this._tlsConfig.setKeyStorePassword(pinotConfigs.get("keyStorePassword").toString());
            this._tlsConfig.setKeyStoreType(pinotConfigs.get("keyStoreType").toString());
            this._tlsConfig.setSslProvider(pinotConfigs.get("sslProvider").toString());
        }

        public Config(long idleTimeoutMillis, int threadPoolSize, int minConnectionsPerServer, int maxBacklogPerServer, int maxConnectionsPerServer) {
            this._idleTimeoutMillis = idleTimeoutMillis;
            this._threadPoolSize = threadPoolSize;
            this._minConnectionsPerServer = minConnectionsPerServer;
            this._maxBacklogPerServer = maxBacklogPerServer;
            this._maxConnectionsPerServer = maxConnectionsPerServer;
            this._tlsConfig.setClientAuthEnabled(false);
        }

        public int getThreadPoolSize() {
            return this._threadPoolSize;
        }

        public int getMaxBacklogPerServer() {
            return this._maxBacklogPerServer;
        }

        @Deprecated
        public long getIdleTimeoutMillis() {
            return this._idleTimeoutMillis;
        }

        @Deprecated
        public int getMinConnectionsPerServer() {
            return this._minConnectionsPerServer;
        }

        @Deprecated
        public int getMaxConnectionsPerServer() {
            return this._maxConnectionsPerServer;
        }

        public boolean isClientAuthEnabled() {
            return this._tlsConfig.isClientAuthEnabled();
        }

        public String getTrustStoreType() {
            return this._tlsConfig.getTrustStoreType();
        }

        public String getTrustStorePath() {
            return this._tlsConfig.getTrustStorePath();
        }

        public String getTrustStorePassword() {
            return this._tlsConfig.getTrustStorePassword();
        }

        public String getKeyStoreType() {
            return this._tlsConfig.getKeyStoreType();
        }

        public String getKeyStorePath() {
            return this._tlsConfig.getKeyStorePath();
        }

        public String getKeyStorePassword() {
            return this._tlsConfig.getKeyStorePassword();
        }

        public String getSslProvider() {
            return this._tlsConfig.getSslProvider();
        }
    }

    public static class PinotException
    extends RuntimeException {
        private final ErrorCode _errorCode;

        public PinotException(ErrorCode errorCode, String message, Throwable t) {
            super(message, t);
            this._errorCode = errorCode;
        }

        public PinotException(ErrorCode errorCode, String message) {
            this(errorCode, message, null);
        }

        public ErrorCode getErrorCode() {
            return this._errorCode;
        }
    }

    public static enum ErrorCode {
        PINOT_INSUFFICIENT_SERVER_RESPONSE(true),
        PINOT_INVALID_SQL_GENERATED(false),
        PINOT_UNCLASSIFIED_ERROR(false),
        PINOT_QUERY_BACKLOG_FULL(false);

        private final boolean _retriable;

        private ErrorCode(boolean retriable) {
            this._retriable = retriable;
        }

        public boolean isRetriable() {
            return this._retriable;
        }
    }
}

