/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.work.foreman;

import com.carrotsearch.hppc.IntObjectHashMap;
import com.carrotsearch.hppc.predicates.IntObjectPredicate;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.netty.buffer.ByteBuf;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.common.logical.PlanProperties;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.store.TransientStore;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.GeneralRPCProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserProtos;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.work.EndpointListener;
import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.foreman.ForemanException;
import org.apache.drill.exec.work.foreman.FragmentData;
import org.apache.drill.exec.work.foreman.FragmentStatusListener;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryManager
implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(QueryManager.class);

    // One tracker per endpoint that has at least one fragment assigned (filled by addFragmentStatusTracker).
    private final Map<CoordinationProtos.DrillbitEndpoint, NodeTracker> nodeMap = Maps.newHashMap();

    // Identity of the query; stringQueryId is the QueryIdHelper string form of queryId.
    private final UserBitShared.QueryId queryId;
    private final String stringQueryId;
    private final UserProtos.RunQuery runQuery;
    private final Foreman foreman;

    // Fragment bookkeeping: major fragment id -> minor fragment id -> data, plus a flat list of the same entries.
    private final IntObjectHashMap<IntObjectHashMap<FragmentData>> fragmentDataMap = new IntObjectHashMap<>();
    private final List<FragmentData> fragmentDataSet = Lists.newArrayList();

    // Stores for the final profile and for the info of a query that is still in flight.
    private final PersistentStore<UserBitShared.QueryProfile> completedProfileStore;
    private final TransientStore<UserBitShared.QueryInfo> runningProfileStore;

    // Plan details reported in the query profile when present.
    private String planText;
    private PlanProperties planProps;

    // Timestamps in milliseconds since the epoch; startTime defaults to construction time.
    private long startTime = System.currentTimeMillis();
    private long endTime;
    private long planningEndTime;
    private long queueWaitEndTime;

    // Completion counters.
    private final AtomicInteger finishedNodes = new AtomicInteger();
    private final AtomicInteger finishedFragments = new AtomicInteger();

    // True after the query info was put into runningProfileStore and until it is removed again.
    private boolean inTransientStore;

    private double totalCost;
    private String queueName;
    /**
     * Listener that receives fragment status reports and routes them by the reported fragment state.
     * <ul>
     *   <li>AWAITING_ALLOCATION, RUNNING, CANCELLATION_REQUESTED: the stored status is refreshed.</li>
     *   <li>FAILED: a FAILED event is queued on the foreman and the fragment is then recorded as done.</li>
     *   <li>FINISHED, CANCELLED: the fragment is recorded as done.</li>
     *   <li>Any other state: an {@link UnsupportedOperationException} is thrown.</li>
     * </ul>
     */
    private final FragmentStatusListener fragmentStatusListener = new FragmentStatusListener(){

        @Override
        public void statusUpdate(BitControl.FragmentStatus status) {
            logger.debug("New fragment status was provided to QueryManager of {}", status);
            UserBitShared.FragmentState reportedState = status.getProfile().getState();
            switch (reportedState) {
                case AWAITING_ALLOCATION:
                case RUNNING:
                case CANCELLATION_REQUESTED:
                    updateFragmentStatus(status);
                    return;

                case FAILED:
                    // A failed fragment both fails the query and counts as a completed fragment.
                    foreman.addToEventQueue(UserBitShared.QueryResult.QueryState.FAILED, new UserRemoteException(status.getProfile().getError()));
                    fragmentDone(status);
                    return;

                case FINISHED:
                case CANCELLED:
                    fragmentDone(status);
                    return;

                default:
                    throw new UnsupportedOperationException(String.format("Received status of %s", status));
            }
        }
    };
    /**
     * Listener for cluster membership changes. Newly registered drillbits are ignored. When drillbits
     * unregister, every one of them that still had unfinished fragments of this query (as reported by
     * {@code NodeTracker.nodeDead()}) is collected, and if there is at least one, a FAILED event naming
     * those nodes is queued on the foreman.
     */
    private final DrillbitStatusListener drillbitStatusListener = new DrillbitStatusListener(){

        @Override
        public void drillbitRegistered(Set<CoordinationProtos.DrillbitEndpoint> registeredDrillbits) {
            // Nothing to do when drillbits join.
        }

        @Override
        public void drillbitUnregistered(Set<CoordinationProtos.DrillbitEndpoint> unregisteredDrillbits) {
            // Comma-separated "address:userPort" entries; stays empty when no relevant node was lost.
            StringBuilder failedNodeList = new StringBuilder();
            for (CoordinationProtos.DrillbitEndpoint ep : unregisteredDrillbits) {
                NodeTracker tracker = QueryManager.this.nodeMap.get(ep);
                // Skip endpoints that run no fragment of this query or whose fragments had all completed.
                if (tracker == null || !tracker.nodeDead()) {
                    continue;
                }
                if (failedNodeList.length() > 0) {
                    failedNodeList.append(", ");
                }
                failedNodeList.append(ep.getAddress());
                failedNodeList.append(":");
                failedNodeList.append(ep.getUserPort());
            }
            if (failedNodeList.length() == 0) {
                return;
            }
            logger.warn("Drillbits [{}] no longer registered in cluster.  Canceling query {}", failedNodeList, QueryIdHelper.getQueryId(QueryManager.this.queryId));
            // Message previously read "One more more nodes"; corrected wording.
            QueryManager.this.foreman.addToEventQueue(UserBitShared.QueryResult.QueryState.FAILED, new ForemanException(String.format("One or more nodes lost connectivity during query.  Identified nodes were [%s].", failedNodeList)));
        }
    };

    public QueryManager(UserBitShared.QueryId queryId, UserProtos.RunQuery runQuery, PersistentStoreProvider storeProvider, ClusterCoordinator coordinator, Foreman foreman) {
        this.queryId = queryId;
        this.runQuery = runQuery;
        this.foreman = foreman;
        this.stringQueryId = QueryIdHelper.getQueryId(queryId);
        this.completedProfileStore = foreman.getQueryContext().getProfileStoreContext().getCompletedProfileStore();
        this.runningProfileStore = foreman.getQueryContext().getProfileStoreContext().getRunningProfileStore();
    }

    /**
     * Tells whether a fragment state is final.
     *
     * @param state the fragment state to classify
     * @return {@code true} for FAILED, FINISHED or CANCELLED; {@code false} for anything else
     */
    private static boolean isTerminal(UserBitShared.FragmentState state) {
        if (state == UserBitShared.FragmentState.FAILED) {
            return true;
        }
        if (state == UserBitShared.FragmentState.FINISHED) {
            return true;
        }
        return state == UserBitShared.FragmentState.CANCELLED;
    }

    /**
     * Stores a newly reported status on the matching {@link FragmentData}, unless the report arrives
     * too late: either the fragment is already in a terminal state, or cancellation was requested and
     * the report carries a non-terminal state. Late reports are logged and dropped.
     *
     * @param fragmentStatus the reported status; its handle selects the tracked fragment
     * @return {@code true} if the status was stored and the state differs from the previous one
     */
    private boolean updateFragmentStatus(BitControl.FragmentStatus fragmentStatus) {
        ExecProtos.FragmentHandle handle = fragmentStatus.getHandle();
        IntObjectHashMap<FragmentData> minorFragments = this.fragmentDataMap.get(handle.getMajorFragmentId());
        FragmentData tracked = minorFragments.get(handle.getMinorFragmentId());

        UserBitShared.FragmentState previous = tracked.getState();
        UserBitShared.FragmentState reported = fragmentStatus.getProfile().getState();

        boolean alreadyTerminal = QueryManager.isTerminal(previous);
        boolean nonTerminalAfterCancelRequest = previous == UserBitShared.FragmentState.CANCELLATION_REQUESTED && !QueryManager.isTerminal(reported);
        if (alreadyTerminal || nonTerminalAfterCancelRequest) {
            logger.warn(String.format("Received status message for fragment %s after fragment was in state %s. New state was %s", QueryIdHelper.getQueryIdentifier(handle), previous, reported));
            return false;
        }

        tracked.setStatus(fragmentStatus);
        return previous != reported;
    }

    /**
     * Records a terminal status for a fragment. When the status actually changed the fragment's state,
     * the tracker of the endpoint named in the status profile is told that one fragment completed and
     * the finished-fragment counter is incremented.
     *
     * @param status the terminal status reported for the fragment
     */
    private void fragmentDone(BitControl.FragmentStatus status) {
        if (!this.updateFragmentStatus(status)) {
            // Duplicate or late report: nothing new to count.
            return;
        }
        NodeTracker owner = this.nodeMap.get(status.getProfile().getEndpoint());
        owner.fragmentComplete();
        this.finishedFragments.incrementAndGet();
    }

    /**
     * Registers a fragment in both lookup structures: the major/minor id map (creating the inner map
     * for a major fragment id on first use) and the flat fragment list.
     *
     * @param fragmentData the fragment to register
     */
    private void addFragment(FragmentData fragmentData) {
        ExecProtos.FragmentHandle handle = fragmentData.getHandle();
        int major = handle.getMajorFragmentId();
        int minor = handle.getMinorFragmentId();

        IntObjectHashMap<FragmentData> siblings = this.fragmentDataMap.get(major);
        if (siblings == null) {
            siblings = new IntObjectHashMap<>();
            this.fragmentDataMap.put(major, siblings);
        }
        siblings.put(minor, fragmentData);

        this.fragmentDataSet.add(fragmentData);
    }

    /**
     * Returns the {@code toString()} rendering of the major/minor fragment data map.
     *
     * @return string form of all tracked fragment data
     */
    public String getFragmentStatesAsString() {
        return this.fragmentDataMap.toString();
    }

    /**
     * Starts tracking a plan fragment: the tracker of the assigned endpoint (created on first use)
     * counts one more fragment, and a new {@link FragmentData} entry is registered for it.
     *
     * @param fragment the plan fragment; supplies the handle and the assigned endpoint
     * @param isRoot   passed through to the {@link FragmentData} constructor
     */
    void addFragmentStatusTracker(BitControl.PlanFragment fragment, boolean isRoot) {
        CoordinationProtos.DrillbitEndpoint endpoint = fragment.getAssignment();

        NodeTracker tracker = this.nodeMap.get(endpoint);
        boolean firstFragmentOnNode = tracker == null;
        if (firstFragmentOnNode) {
            tracker = new NodeTracker(endpoint);
            this.nodeMap.put(endpoint, tracker);
        }
        tracker.addFragment();

        FragmentData data = new FragmentData(fragment.getHandle(), endpoint, isRoot);
        this.addFragment(data);
    }

    /**
     * Sends a cancel request to the endpoint of every tracked fragment whose state is SENDING,
     * AWAITING_ALLOCATION or RUNNING. Fragments in any other state are left alone.
     *
     * @param drillbitContext supplies the controller used to reach the endpoints
     */
    void cancelExecutingFragments(DrillbitContext drillbitContext) {
        Controller controller = drillbitContext.getController();
        for (FragmentData fragment : this.fragmentDataSet) {
            switch (fragment.getState()) {
                case SENDING:
                case AWAITING_ALLOCATION:
                case RUNNING: {
                    ExecProtos.FragmentHandle handle = fragment.getHandle();
                    CoordinationProtos.DrillbitEndpoint endpoint = fragment.getEndpoint();
                    SignalListener listener = new SignalListener(endpoint, handle, SignalListener.Signal.CANCEL);
                    controller.getTunnel(endpoint).cancelFragment(listener, handle);
                    break;
                }
                default:
                    // No cancel request is sent for the remaining states.
                    break;
            }
        }
    }

    /**
     * Sends an unpause request to the endpoint of every tracked fragment, regardless of its state.
     *
     * @param drillbitContext supplies the controller used to reach the endpoints
     */
    void unpauseExecutingFragments(DrillbitContext drillbitContext) {
        Controller controller = drillbitContext.getController();
        for (FragmentData fragment : this.fragmentDataSet) {
            CoordinationProtos.DrillbitEndpoint endpoint = fragment.getEndpoint();
            ExecProtos.FragmentHandle handle = fragment.getHandle();
            SignalListener listener = new SignalListener(endpoint, handle, SignalListener.Signal.UNPAUSE);
            controller.getTunnel(endpoint).unpauseFragment(listener, handle);
        }
    }

    /**
     * Currently a no-op: this method body performs no cleanup.
     */
    @Override
    public void close() throws Exception {
    }

    /**
     * Mirrors the query state into the running-profile store.
     * <p>
     * Nothing happens when the query is not currently in that store and the
     * {@code ExecConstants.QUERY_TRANSIENT_STATE_UPDATE} option is off. Otherwise, for states of a
     * query still in progress the current query info is (re)written; for COMPLETED, CANCELED and FAILED
     * the entry is removed, and a failure to remove is only logged.
     *
     * @param queryState the state the query has just entered
     * @throws UserException         if the store rejects the query info with an IllegalArgumentException
     * @throws IllegalStateException if {@code queryState} is not one of the handled states
     */
    void updateEphemeralState(UserBitShared.QueryResult.QueryState queryState) {
        if (!(this.inTransientStore || this.foreman.getQueryContext().getOptions().getOption(ExecConstants.QUERY_TRANSIENT_STATE_UPDATE))) {
            return;
        }

        switch (queryState) {
            case PREPARING:
            case PLANNING:
            case ENQUEUED:
            case STARTING:
            case RUNNING:
            case CANCELLATION_REQUESTED:
                try {
                    this.runningProfileStore.put(this.stringQueryId, this.getQueryInfo());
                    this.inTransientStore = true;
                }
                catch (IllegalArgumentException e) {
                    throw UserException.executionError(e).message("Failed to persist query info. Query length is too big.", e).build(logger);
                }
                break;

            case COMPLETED:
            case CANCELED:
            case FAILED:
                try {
                    this.runningProfileStore.remove(this.stringQueryId);
                    this.inTransientStore = false;
                }
                catch (Exception e) {
                    // Removal is best effort; the flag stays unchanged when it fails.
                    logger.warn("Failure while trying to delete the stored profile for the query [{}]", this.stringQueryId, e);
                }
                break;

            default:
                throw new IllegalStateException("unrecognized queryState " + queryState);
        }
    }

    /**
     * Builds the query profile and writes it to the completed-profile store. Any exception raised
     * while building or storing is logged and not propagated.
     *
     * @param ex the error that ended the query, or {@code null} if there was none
     */
    void writeFinalProfile(UserException ex) {
        try {
            UserBitShared.QueryProfile profile = this.getQueryProfile(ex);
            this.completedProfileStore.put(this.stringQueryId, profile);
        }
        catch (Exception e) {
            logger.error("Failure while storing Query Profile", e);
        }
    }

    /**
     * Assembles the summary of the query as it currently stands: state, user, foreman endpoint, start
     * time, total cost, queue name ("-" when none is set), options as JSON and, when available, the
     * query text.
     *
     * @return the freshly built query info
     */
    private UserBitShared.QueryInfo getQueryInfo() {
        String queryText = this.foreman.getQueryText();

        UserBitShared.QueryInfo.Builder info = UserBitShared.QueryInfo.newBuilder();
        info.setState(this.foreman.getState());
        info.setUser(this.foreman.getQueryContext().getQueryUserName());
        info.setForeman(this.foreman.getQueryContext().getCurrentEndpoint());
        info.setStart(this.startTime);
        info.setTotalCost(this.totalCost);
        info.setQueueName(this.queueName == null ? "-" : this.queueName);
        info.setOptionsJson(this.getQueryOptionsAsJson());

        if (queryText != null) {
            info.setQuery(queryText);
        }
        return info.build();
    }

    /**
     * Builds the query profile without any error information.
     *
     * @return the profile produced by {@code getQueryProfile(null)}
     */
    public UserBitShared.QueryProfile getQueryProfile() {
        return this.getQueryProfile(null);
    }

    /**
     * Builds the full query profile: identity, state, timings, fragment counts, cost, queue name and
     * options, followed by the optional parts (error details, plan text, query text, scanned plugin
     * names, auto-limit row count) and finally one major-fragment profile per tracked major fragment.
     *
     * @param ex the error to record in the profile, or {@code null} for none
     * @return the freshly built query profile
     */
    private UserBitShared.QueryProfile getQueryProfile(UserException ex) {
        QueryContext queryCtx = this.foreman.getQueryContext();

        UserBitShared.QueryProfile.Builder profile = UserBitShared.QueryProfile.newBuilder();
        profile.setUser(queryCtx.getQueryUserName());
        profile.setType(this.runQuery.getType());
        profile.setId(this.queryId);
        profile.setQueryId(QueryIdHelper.getQueryId(this.queryId));
        profile.setState(this.foreman.getState());
        profile.setForeman(queryCtx.getCurrentEndpoint());
        profile.setStart(this.startTime);
        profile.setEnd(this.endTime);
        profile.setPlanEnd(this.planningEndTime);
        profile.setQueueWaitEnd(this.queueWaitEndTime);
        profile.setTotalFragments(this.fragmentDataSet.size());
        profile.setFinishedFragments(this.finishedFragments.get());
        profile.setTotalCost(this.totalCost);
        profile.setQueueName(this.queueName == null ? "-" : this.queueName);
        profile.setOptionsJson(this.getQueryOptionsAsJson());

        if (ex != null) {
            profile.setError(ex.getMessage(false));
            profile.setVerboseError(ex.getVerboseMessage(false));
            profile.setErrorId(ex.getErrorId());
            if (ex.getErrorLocation() != null) {
                profile.setErrorNode(ex.getErrorLocation());
            }
        }

        if (this.planText != null) {
            profile.setPlan(this.planText);
        }

        String queryText = this.foreman.getQueryText();
        if (queryText != null) {
            profile.setQuery(queryText);
        }

        if (this.planProps != null && this.planProps.scannedPluginNames != null) {
            profile.addAllScannedPlugins(this.planProps.scannedPluginNames);
        }

        // A positive "exec.query.max_rows" value is reported as the auto limit.
        int autoLimitRowCount = queryCtx.getOptions().getOption("exec.query.max_rows").num_val.intValue();
        if (autoLimitRowCount > 0) {
            profile.setAutoLimit(autoLimitRowCount);
            logger.debug("The query's resultset was limited to {} rows", autoLimitRowCount);
        }

        this.fragmentDataMap.forEach(new OuterIter(profile));
        return profile.build();
    }

    /**
     * Serializes the option list of the query context to a JSON string using the mapper obtained
     * from the query context.
     *
     * @return the options as JSON
     * @throws DrillRuntimeException if the option list cannot be serialized
     */
    private String getQueryOptionsAsJson() {
        QueryContext queryContext = this.foreman.getQueryContext();
        try {
            OptionList options = queryContext.getOptions().getOptionList();
            return queryContext.getLpPersistence().getMapper().writeValueAsString(options);
        }
        catch (JsonProcessingException e) {
            throw new DrillRuntimeException("Error while trying to convert option list to json string", e);
        }
    }

    /**
     * Stores the plan text; when non-null it is included in the query profile.
     *
     * @param planText textual form of the plan
     */
    void setPlanText(String planText) {
        this.planText = planText;
    }

    /**
     * Stores the plan properties; their scanned plugin names, when present, are added to the query profile.
     *
     * @param planProps properties of the plan
     */
    void setPlanProperties(PlanProperties planProps) {
        this.planProps = planProps;
    }

    /**
     * Sets the start time to the current wall-clock time, replacing the value captured when this
     * object was created.
     */
    void markStartTime() {
        this.startTime = System.currentTimeMillis();
    }

    /**
     * Sets the end time to the current wall-clock time.
     */
    void markEndTime() {
        this.endTime = System.currentTimeMillis();
    }

    /**
     * Sets the planning end time to the current wall-clock time.
     */
    void markPlanningEndTime() {
        this.planningEndTime = System.currentTimeMillis();
    }

    /**
     * Sets the queue-wait end time to the current wall-clock time.
     */
    void markQueueWaitEndTime() {
        this.queueWaitEndTime = System.currentTimeMillis();
    }

    /**
     * Stores the total cost reported in the query info and query profile.
     *
     * @param totalCost total cost of the query
     */
    public void setTotalCost(double totalCost) {
        this.totalCost = totalCost;
    }

    /**
     * Stores the queue name reported in the query info and query profile; while it is {@code null}
     * those report "-" instead.
     *
     * @param queueName name of the queue, may be {@code null}
     */
    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    /**
     * Counts one more finished node. When every tracked node has finished, a COMPLETED event is queued
     * on the foreman; otherwise the number of outstanding nodes and fragments is logged at debug level.
     *
     * @throws IllegalArgumentException if more nodes have finished than are being tracked
     */
    private void nodeComplete() {
        int done = this.finishedNodes.incrementAndGet();
        int total = this.nodeMap.size();
        Preconditions.checkArgument(done <= total, "The finished node count exceeds the total node count");

        int outstanding = total - done;
        if (outstanding != 0) {
            logger.debug("Foreman is still waiting for completion message from {} nodes containing {} fragments", outstanding, this.fragmentDataSet.size() - this.finishedFragments.get());
            return;
        }
        this.foreman.addToEventQueue(UserBitShared.QueryResult.QueryState.COMPLETED, null);
    }

    /**
     * @return the listener through which fragment status reports are delivered to this manager
     */
    public FragmentStatusListener getFragmentStatusListener() {
        return this.fragmentStatusListener;
    }

    /**
     * @return the listener through which drillbit registration changes are delivered to this manager
     */
    public DrillbitStatusListener getDrillbitStatusListener() {
        return this.drillbitStatusListener;
    }

    /**
     * Counts the fragments assigned to one endpoint and how many of them have completed. When the
     * completed count reaches the assigned count, the enclosing QueryManager's nodeComplete() is called.
     */
    private class NodeTracker {
        // Number of fragments assigned to the node.
        private final AtomicInteger totalFragments = new AtomicInteger(0);
        // Number of those fragments recorded as complete.
        private final AtomicInteger completedFragments = new AtomicInteger(0);

        /**
         * @param endpoint accepted but not stored or otherwise used by this class
         */
        public NodeTracker(CoordinationProtos.DrillbitEndpoint endpoint) {
        }

        /**
         * Registers one more fragment as assigned to the node.
         */
        public void addFragment() {
            this.totalFragments.incrementAndGet();
        }

        /**
         * Records one completed fragment and, if that makes the completed count equal to the assigned
         * count, reports the node as complete to the enclosing QueryManager.
         */
        public void fragmentComplete() {
            if (this.totalFragments.get() == this.completedFragments.incrementAndGet()) {
                QueryManager.this.nodeComplete();
            }
        }

        /**
         * Handles the loss of the node.
         *
         * @return {@code false} if all assigned fragments had already completed; otherwise {@code true},
         *         after every outstanding fragment has been marked complete
         */
        public boolean nodeDead() {
            if (this.completedFragments.get() == this.totalFragments.get()) {
                return false;
            }
            // Mark each outstanding fragment as complete; the last one triggers nodeComplete().
            while (this.completedFragments.get() < this.totalFragments.get()) {
                this.fragmentComplete();
            }
            return true;
        }
    }

    private static class SignalListener
    extends EndpointListener<GeneralRPCProtos.Ack, ExecProtos.FragmentHandle> {
        private final Signal signal;

        public SignalListener(CoordinationProtos.DrillbitEndpoint endpoint, ExecProtos.FragmentHandle handle, Signal signal) {
            super(endpoint, handle);
            this.signal = signal;
        }

        @Override
        public void failed(RpcException ex) {
            logger.error("Failure while attempting to {} fragment {} on endpoint {} with {}.", new Object[]{this.signal, this.value, this.endpoint, ex});
        }

        @Override
        public void success(GeneralRPCProtos.Ack ack, ByteBuf buf) {
            if (!ack.getOk()) {
                logger.warn("Remote node {} responded negative on {} request for fragment {} with {}.", new Object[]{this.endpoint, this.signal, this.value, ack});
            }
        }

        @Override
        public void interrupted(InterruptedException ex) {
            logger.error("Interrupted while waiting for RPC outcome of action fragment {}. Endpoint {}, Fragment handle {}", new Object[]{this.signal, this.endpoint, this.value, ex});
        }

        public static enum Signal {
            CANCEL,
            UNPAUSE;

        }
    }

    private class OuterIter
    implements IntObjectPredicate<IntObjectHashMap<FragmentData>> {
        private final UserBitShared.QueryProfile.Builder profileBuilder;

        public OuterIter(UserBitShared.QueryProfile.Builder profileBuilder) {
            this.profileBuilder = profileBuilder;
        }

        public boolean apply(int majorFragmentId, IntObjectHashMap<FragmentData> minorMap) {
            UserBitShared.MajorFragmentProfile.Builder builder = UserBitShared.MajorFragmentProfile.newBuilder().setMajorFragmentId(majorFragmentId);
            minorMap.forEach((IntObjectPredicate)new InnerIter(builder));
            this.profileBuilder.addFragmentProfile(builder);
            return true;
        }
    }

    private class InnerIter
    implements IntObjectPredicate<FragmentData> {
        private final UserBitShared.MajorFragmentProfile.Builder builder;

        public InnerIter(UserBitShared.MajorFragmentProfile.Builder fb) {
            this.builder = fb;
        }

        public boolean apply(int key, FragmentData data) {
            this.builder.addMinorFragmentProfile(data.getProfile());
            return true;
        }
    }
}

