/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.ipc;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.ipc.AdaptiveLifoCoDelCallQueue;
import org.apache.hadoop.hbase.ipc.CallRunner;
import org.apache.hadoop.hbase.ipc.PluggableRpcQueueNotFound;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.QueueBalancer;
import org.apache.hadoop.hbase.ipc.RandomQueueBalancer;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcHandler;
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.LimitedPrivate(value={"Coprocesssor", "Phoenix"})
@InterfaceStability.Evolving
public abstract class RpcExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(RpcExecutor.class);
    protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
    public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor";
    public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = "hbase.ipc.server.queue.max.call.delay";
    public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
    public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
    public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
    public static final String CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE = "pluggable";
    public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
    public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = "fifo";
    public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS = "hbase.ipc.server.callqueue.balancer.class";
    public static final Class<?> CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT = RandomQueueBalancer.class;
    public static final String CALL_QUEUE_CODEL_TARGET_DELAY = "hbase.ipc.server.callqueue.codel.target.delay";
    public static final String CALL_QUEUE_CODEL_INTERVAL = "hbase.ipc.server.callqueue.codel.interval";
    public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD = "hbase.ipc.server.callqueue.codel.lifo.threshold";
    public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100;
    public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
    public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;
    public static final String PLUGGABLE_CALL_QUEUE_CLASS_NAME = "hbase.ipc.server.callqueue.pluggable.queue.class.name";
    public static final String PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED = "hbase.ipc.server.callqueue.pluggable.queue.fast.path.enabled";
    private final LongAdder numGeneralCallsDropped = new LongAdder();
    private final LongAdder numLifoModeSwitches = new LongAdder();
    protected final int numCallQueues;
    protected final List<BlockingQueue<CallRunner>> queues;
    private final Class<? extends BlockingQueue> queueClass;
    private final Object[] queueInitArgs;
    protected volatile int currentQueueLimit;
    private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
    private final List<RpcHandler> handlers;
    private final int handlerCount;
    private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
    private String name;
    private final Configuration conf;
    private final Abortable abortable;
    private static final QueueBalancer ONE_QUEUE = val -> 0;

    public RpcExecutor(String name, int handlerCount, int maxQueueLength, PriorityFunction priority, Configuration conf, Abortable abortable) {
        this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY, "fifo"), maxQueueLength, priority, conf, abortable);
    }

    public RpcExecutor(String name, int handlerCount, String callQueueType, int maxQueueLength, PriorityFunction priority, Configuration conf, Abortable abortable) {
        this.name = Strings.nullToEmpty(name);
        this.conf = conf;
        this.abortable = abortable;
        float callQueuesHandlersFactor = this.conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f);
        if (Float.compare(callQueuesHandlersFactor, 1.0f) > 0 || Float.compare(0.0f, callQueuesHandlersFactor) > 0) {
            LOG.warn("hbase.ipc.server.callqueue.handler.factor is *ILLEGAL*, it should be in range [0.0, 1.0]");
            if (Float.compare(callQueuesHandlersFactor, 1.0f) > 0) {
                LOG.warn("Set hbase.ipc.server.callqueue.handler.factor 1.0f");
                callQueuesHandlersFactor = 1.0f;
            } else {
                LOG.warn("Set hbase.ipc.server.callqueue.handler.factor default value 0.0f");
            }
        }
        this.numCallQueues = this.computeNumCallQueues(handlerCount, callQueuesHandlersFactor);
        this.queues = new ArrayList<BlockingQueue<CallRunner>>(this.numCallQueues);
        this.handlerCount = Math.max(handlerCount, this.numCallQueues);
        this.handlers = new ArrayList<RpcHandler>(this.handlerCount);
        if (RpcExecutor.isDeadlineQueueType(callQueueType)) {
            this.name = this.name + ".Deadline";
            this.queueInitArgs = new Object[]{maxQueueLength, new CallPriorityComparator(conf, priority)};
            this.queueClass = BoundedPriorityBlockingQueue.class;
        } else if (RpcExecutor.isCodelQueueType(callQueueType)) {
            this.name = this.name + ".Codel";
            int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY, 100);
            int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, 100);
            double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, 0.8);
            this.queueInitArgs = new Object[]{maxQueueLength, codelTargetDelay, codelInterval, codelLifoThreshold, this.numGeneralCallsDropped, this.numLifoModeSwitches};
            this.queueClass = AdaptiveLifoCoDelCallQueue.class;
        } else if (RpcExecutor.isPluggableQueueType(callQueueType)) {
            Optional<Class<? extends BlockingQueue<CallRunner>>> pluggableQueueClass = this.getPluggableQueueClass();
            if (!pluggableQueueClass.isPresent()) {
                throw new PluggableRpcQueueNotFound("Pluggable call queue failed to load and selected call queue type required");
            }
            this.queueInitArgs = new Object[]{maxQueueLength, priority, conf};
            this.queueClass = pluggableQueueClass.get();
        } else {
            this.name = this.name + ".Fifo";
            this.queueInitArgs = new Object[]{maxQueueLength};
            this.queueClass = LinkedBlockingQueue.class;
        }
        LOG.info("Instantiated {} with queueClass={}; numCallQueues={}, maxQueueLength={}, handlerCount={}", new Object[]{this.name, this.queueClass, this.numCallQueues, maxQueueLength, this.handlerCount});
    }

    protected int computeNumCallQueues(int handlerCount, float callQueuesHandlersFactor) {
        return Math.max(1, Math.round((float)handlerCount * callQueuesHandlersFactor));
    }

    private static String getMethodName(CallRunner callRunner) {
        return Optional.ofNullable(callRunner).map(CallRunner::getRpcCall).map(RpcCall::getMethod).map(Descriptors.MethodDescriptor::getName).orElse("Unknown");
    }

    private static long getRpcCallSize(CallRunner callRunner) {
        return Optional.ofNullable(callRunner).map(CallRunner::getRpcCall).map(RpcCall::getSize).orElse(0L);
    }

    public Map<String, Long> getCallQueueCountsSummary() {
        return this.queues.stream().flatMap(Collection::stream).map(RpcExecutor::getMethodName).collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public Map<String, Long> getCallQueueSizeSummary() {
        return this.queues.stream().flatMap(Collection::stream).map(callRunner -> new Pair<String, Long>(RpcExecutor.getMethodName(callRunner), RpcExecutor.getRpcCallSize(callRunner))).collect(Collectors.groupingBy(Pair::getFirst, Collectors.summingLong(Pair::getSecond)));
    }

    protected void initializeQueues(int numQueues) {
        if (this.queueInitArgs.length > 0) {
            this.currentQueueLimit = (Integer)this.queueInitArgs[0];
            this.queueInitArgs[0] = Math.max((Integer)this.queueInitArgs[0], 250);
        }
        for (int i = 0; i < numQueues; ++i) {
            this.queues.add(ReflectionUtils.newInstance(this.queueClass, this.queueInitArgs));
        }
    }

    public void start(int port) {
        this.startHandlers(port);
    }

    public void stop() {
        for (RpcHandler handler : this.handlers) {
            handler.stopRunning();
            handler.interrupt();
        }
    }

    public abstract boolean dispatch(CallRunner var1);

    protected List<BlockingQueue<CallRunner>> getQueues() {
        return this.queues;
    }

    protected void startHandlers(int port) {
        List<BlockingQueue<CallRunner>> callQueues = this.getQueues();
        this.startHandlers(null, this.handlerCount, callQueues, 0, callQueues.size(), port, this.activeHandlerCount);
    }

    protected RpcHandler getHandler(String name, double handlerFailureThreshhold, int handlerCount, BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount, AtomicInteger failedHandlerCount, Abortable abortable) {
        return new RpcHandler(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount, failedHandlerCount, abortable);
    }

    protected void startHandlers(String nameSuffix, int numHandlers, List<BlockingQueue<CallRunner>> callQueues, int qindex, int qsize, int port, AtomicInteger activeHandlerCount) {
        String threadPrefix = this.name + Strings.nullToEmpty(nameSuffix);
        double handlerFailureThreshhold = this.conf == null ? 1.0 : this.conf.getDouble("hbase.regionserver.handler.abort.on.error.percent", 0.5);
        for (int i = 0; i < numHandlers; ++i) {
            int index = qindex + i % qsize;
            String name = "RpcServer." + threadPrefix + ".handler=" + this.handlers.size() + ",queue=" + index + ",port=" + port;
            RpcHandler handler = this.getHandler(name, handlerFailureThreshhold, this.handlerCount, callQueues.get(index), activeHandlerCount, this.failedHandlerCount, this.abortable);
            handler.start();
            this.handlers.add(handler);
        }
        LOG.debug("Started handlerCount={} with threadPrefix={}, numCallQueues={}, port={}", new Object[]{this.handlers.size(), threadPrefix, qsize, port});
    }

    public static QueueBalancer getBalancer(String executorName, Configuration conf, List<BlockingQueue<CallRunner>> queues) {
        Preconditions.checkArgument(queues.size() > 0, "Queue size is <= 0, must be at least 1");
        if (queues.size() == 1) {
            return ONE_QUEUE;
        }
        Class balancerClass = conf.getClass(CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT);
        return (QueueBalancer)ReflectionUtils.newInstance(balancerClass, conf, executorName, queues);
    }

    public static boolean isDeadlineQueueType(String callQueueType) {
        return callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
    }

    public static boolean isCodelQueueType(String callQueueType) {
        return callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
    }

    public static boolean isFifoQueueType(String callQueueType) {
        return callQueueType.equals("fifo");
    }

    public static boolean isPluggableQueueType(String callQueueType) {
        return callQueueType.equals(CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE);
    }

    public static boolean isPluggableQueueWithFastPath(String callQueueType, Configuration conf) {
        return RpcExecutor.isPluggableQueueType(callQueueType) && conf.getBoolean(PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, false);
    }

    private Optional<Class<? extends BlockingQueue<CallRunner>>> getPluggableQueueClass() {
        String queueClassName = this.conf.get(PLUGGABLE_CALL_QUEUE_CLASS_NAME);
        if (queueClassName == null) {
            LOG.error("Pluggable queue class config at hbase.ipc.server.callqueue.pluggable.queue.class.name was not found");
            return Optional.empty();
        }
        try {
            Class<?> clazz = Class.forName(queueClassName);
            if (BlockingQueue.class.isAssignableFrom(clazz)) {
                return Optional.of(clazz);
            }
            LOG.error("Pluggable Queue class " + queueClassName + " does not extend BlockingQueue<CallRunner>");
            return Optional.empty();
        }
        catch (ClassNotFoundException exception) {
            LOG.error("Could not find " + queueClassName + " on the classpath to load.");
            return Optional.empty();
        }
    }

    public long getNumGeneralCallsDropped() {
        return this.numGeneralCallsDropped.longValue();
    }

    public long getNumLifoModeSwitches() {
        return this.numLifoModeSwitches.longValue();
    }

    public int getActiveHandlerCount() {
        return this.activeHandlerCount.get();
    }

    public int getActiveWriteHandlerCount() {
        return 0;
    }

    public int getActiveReadHandlerCount() {
        return 0;
    }

    public int getActiveScanHandlerCount() {
        return 0;
    }

    public int getQueueLength() {
        int length = 0;
        for (BlockingQueue<CallRunner> queue : this.queues) {
            length += queue.size();
        }
        return length;
    }

    public int getReadQueueLength() {
        return 0;
    }

    public int getScanQueueLength() {
        return 0;
    }

    public int getWriteQueueLength() {
        return 0;
    }

    public String getName() {
        return this.name;
    }

    public void resizeQueues(Configuration conf) {
        String configKey = "hbase.ipc.server.max.callqueue.length";
        if (this.name != null) {
            if (this.name.toLowerCase(Locale.ROOT).contains("priority")) {
                configKey = "hbase.ipc.server.priority.max.callqueue.length";
            } else if (this.name.toLowerCase(Locale.ROOT).contains("replication")) {
                configKey = "hbase.ipc.server.replication.max.callqueue.length";
            }
        }
        int queueLimit = this.currentQueueLimit;
        this.currentQueueLimit = conf.getInt(configKey, queueLimit);
    }

    public void onConfigurationChange(Configuration conf) {
        int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY, 100);
        int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, 100);
        double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, 0.8);
        for (BlockingQueue<CallRunner> queue : this.queues) {
            if (queue instanceof AdaptiveLifoCoDelCallQueue) {
                ((AdaptiveLifoCoDelCallQueue)queue).updateTunables(codelTargetDelay, codelInterval, codelLifoThreshold);
                continue;
            }
            if (!(queue instanceof ConfigurationObserver)) continue;
            ((ConfigurationObserver)((Object)queue)).onConfigurationChange(conf);
        }
    }

    private static class CallPriorityComparator
    implements Comparator<CallRunner> {
        private static final int DEFAULT_MAX_CALL_DELAY = 5000;
        private final PriorityFunction priority;
        private final int maxDelay;

        public CallPriorityComparator(Configuration conf, PriorityFunction priority) {
            this.priority = priority;
            this.maxDelay = conf.getInt(RpcExecutor.QUEUE_MAX_CALL_DELAY_CONF_KEY, 5000);
        }

        @Override
        public int compare(CallRunner a, CallRunner b) {
            RpcCall callA = a.getRpcCall();
            RpcCall callB = b.getRpcCall();
            long deadlineA = this.priority.getDeadline(callA.getHeader(), callA.getParam());
            long deadlineB = this.priority.getDeadline(callB.getHeader(), callB.getParam());
            deadlineA = callA.getReceiveTime() + Math.min(deadlineA, (long)this.maxDelay);
            deadlineB = callB.getReceiveTime() + Math.min(deadlineB, (long)this.maxDelay);
            return Long.compare(deadlineA, deadlineB);
        }
    }
}

