/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.work.foreman;

import io.netty.buffer.ByteBuf;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.drill.common.concurrent.ExtendedLatch;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ops.FragmentContextImpl;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.GeneralRPCProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.UserClientConnection;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.work.EndpointListener;
import org.apache.drill.exec.work.WorkManager;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.foreman.ForemanException;
import org.apache.drill.exec.work.foreman.QueryManager;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
import org.apache.drill.exec.work.fragment.RootFragmentManager;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
import org.apache.drill.shaded.guava.com.google.common.collect.Multimap;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sets up and launches the fragments of a query on behalf of a {@link Foreman}.
 *
 * <p>The root fragment is set up on the local drillbit; every other fragment is grouped by its
 * assigned endpoint and shipped through that endpoint's control tunnel. Intermediate fragments
 * are sent first and their acknowledgements are awaited before any leaf fragment is sent.
 *
 * <p>Typical usage is {@link #setFragmentsInfo} followed by {@link #submit()}.
 */
public class FragmentsRunner {
    private static final Logger logger = LoggerFactory.getLogger(FragmentsRunner.class);
    private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(FragmentsRunner.class);
    private final WorkManager.WorkerBee bee;
    private final UserClientConnection initiatingClient;
    private final DrillbitContext drillbitContext;
    private final Foreman foreman;
    private List<BitControl.PlanFragment> planFragments;
    private BitControl.PlanFragment rootPlanFragment;
    private FragmentRoot rootOperator;

    /**
     * Creates a runner bound to one foreman.
     *
     * @param bee worker bee that receives the root fragment executor when it can start right away
     * @param initiatingClient connection handed to the root fragment's context
     * @param drillbitContext source of the work bus, cluster coordinator, controller and options
     * @param foreman foreman whose query id, query manager and query context are used
     */
    public FragmentsRunner(WorkManager.WorkerBee bee, UserClientConnection initiatingClient, DrillbitContext drillbitContext, Foreman foreman) {
        this.bee = bee;
        this.initiatingClient = initiatingClient;
        this.drillbitContext = drillbitContext;
        this.foreman = foreman;
    }

    /**
     * @return the worker bee supplied at construction time
     */
    public WorkManager.WorkerBee getBee() {
        return this.bee;
    }

    /**
     * Stores the fragments to run. Must be called before {@link #submit()}.
     *
     * @param planFragments non-root fragments to send to their assigned endpoints
     * @param rootPlanFragment root fragment, set up on this drillbit
     * @param rootOperator root operator passed to the root fragment's executor
     */
    public void setFragmentsInfo(List<BitControl.PlanFragment> planFragments, BitControl.PlanFragment rootPlanFragment, FragmentRoot rootOperator) {
        this.planFragments = planFragments;
        this.rootPlanFragment = rootPlanFragment;
        this.rootOperator = rootOperator;
    }

    /**
     * Registers the query manager's status listeners, then sets up the root fragment followed by
     * the non-root fragments.
     *
     * @throws ExecutionSetupException if fragment setup fails
     */
    public void submit() throws ExecutionSetupException {
        assert this.planFragments != null;
        assert this.rootPlanFragment != null;
        assert this.rootOperator != null;
        UserBitShared.QueryId queryId = this.foreman.getQueryId();
        // Compare by value: an equal query id held in a different message instance is still valid.
        assert queryId != null && queryId.equals(this.rootPlanFragment.getHandle().getQueryId());

        QueryManager queryManager = this.foreman.getQueryManager();
        // Listeners are registered before anything is dispatched.
        this.drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener());
        this.drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());

        logger.debug("Submitting fragments to run.");
        this.setupRootFragment(this.rootPlanFragment, this.rootOperator);
        this.setupNonRootFragments(this.planFragments);
        logger.debug("Fragments running.");
    }

    /**
     * Builds the context, status reporter, executor and fragment manager for the root fragment and
     * registers a status tracker for it.
     *
     * @param rootFragment plan fragment describing the root
     * @param rootOperator root operator passed to the executor
     * @throws ExecutionSetupException if the fragment cannot be set up
     */
    private void setupRootFragment(BitControl.PlanFragment rootFragment, FragmentRoot rootOperator) throws ExecutionSetupException {
        QueryManager queryManager = this.foreman.getQueryManager();
        FragmentContextImpl rootContext = new FragmentContextImpl(this.drillbitContext, rootFragment, this.foreman.getQueryContext(), this.initiatingClient, this.drillbitContext.getFunctionImplementationRegistry());
        FragmentStatusReporter statusReporter = new FragmentStatusReporter(rootContext);
        FragmentExecutor rootRunner = new FragmentExecutor(rootContext, rootFragment, statusReporter, rootOperator);
        RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment, rootRunner, statusReporter);

        queryManager.addFragmentStatusTracker(rootFragment, true);

        if (rootContext.isBuffersDone()) {
            // Buffers are already done, so the executor is handed to the worker bee directly.
            this.bee.addFragmentRunner(rootRunner);
        } else {
            // Otherwise only the manager is registered with the work bus; presumably the fragment
            // is started later, once its buffers are done (not visible here, confirm in WorkBus).
            this.drillbitContext.getWorkBus().addFragmentManager(fragmentManager);
        }
    }

    /**
     * Registers a status tracker for every non-root fragment, groups the fragments by assigned
     * endpoint and sends them out. Intermediate fragments are sent first and awaited; leaf
     * fragments are sent afterwards without waiting for a reply.
     *
     * @param fragments non-root fragments; nothing happens when empty
     * @throws ExecutionSetupException declared for setup failures, including the checked
     *         {@link ForemanException} the injection site may raise
     */
    private void setupNonRootFragments(Collection<BitControl.PlanFragment> fragments) throws ExecutionSetupException {
        if (fragments.isEmpty()) {
            return;
        }

        Multimap<CoordinationProtos.DrillbitEndpoint, BitControl.PlanFragment> leafFragmentMap = ArrayListMultimap.create();
        Multimap<CoordinationProtos.DrillbitEndpoint, BitControl.PlanFragment> intFragmentMap = ArrayListMultimap.create();
        QueryManager queryManager = this.foreman.getQueryManager();

        for (BitControl.PlanFragment planFragment : fragments) {
            if (logger.isTraceEnabled()) {
                logger.trace("Tracking intermediate remote node {} with data {}", planFragment.getAssignment(), planFragment.getFragmentJson());
            }
            queryManager.addFragmentStatusTracker(planFragment, false);
            if (planFragment.getLeafFragment()) {
                leafFragmentMap.put(planFragment.getAssignment(), planFragment);
            } else {
                intFragmentMap.put(planFragment.getAssignment(), planFragment);
            }
        }

        // Blocks until every endpoint has answered, or throws on timeout or failure.
        this.scheduleIntermediateFragments(intFragmentMap);

        // Injection site from the testing package; may throw ForemanException when controls are set.
        injector.injectChecked(this.foreman.getQueryContext().getExecutionControls(), "send-fragments", ForemanException.class);

        // Leaf fragments are fire-and-forget: no latch, failures go to the foreman's event queue.
        for (CoordinationProtos.DrillbitEndpoint ep : leafFragmentMap.keySet()) {
            this.sendRemoteFragments(ep, leafFragmentMap.get(ep), null, null);
        }
    }

    /**
     * Packs the given fragments into one {@code InitializeFragments} message and sends it through
     * the endpoint's control tunnel.
     *
     * @param assignment endpoint the fragments are sent to
     * @param fragments fragments assigned to that endpoint
     * @param latch counted down once when the endpoint answers, or {@code null} for no tracking
     * @param fragmentSubmitFailures collector for failures; must be {@code null} exactly when
     *        {@code latch} is {@code null}
     */
    private void sendRemoteFragments(CoordinationProtos.DrillbitEndpoint assignment, Collection<BitControl.PlanFragment> fragments, CountDownLatch latch, FragmentSubmitFailures fragmentSubmitFailures) {
        Controller controller = this.drillbitContext.getController();
        BitControl.InitializeFragments.Builder fb = BitControl.InitializeFragments.newBuilder();
        for (BitControl.PlanFragment planFragment : fragments) {
            fb.addFragment(planFragment);
        }
        BitControl.InitializeFragments initFrags = fb.build();

        logger.debug("Sending remote fragments to node: {}\nData: {}", assignment, initFrags);
        FragmentSubmitListener listener = new FragmentSubmitListener(assignment, initFrags, latch, fragmentSubmitFailures);
        controller.getTunnel(assignment).sendFragments(listener, initFrags);
    }

    /**
     * Sends the intermediate fragments to their endpoints and waits until every endpoint has
     * answered.
     *
     * @param intermediateFragmentMap intermediate fragments keyed by assigned endpoint
     * @throws UserException (unchecked) if the wait times out or any endpoint reported a failure
     */
    private void scheduleIntermediateFragments(Multimap<CoordinationProtos.DrillbitEndpoint, BitControl.PlanFragment> intermediateFragmentMap) {
        // Despite the name, this counts distinct endpoints: one message (and one reply) per endpoint.
        int numIntFragments = intermediateFragmentMap.keySet().size();
        ExtendedLatch endpointLatch = new ExtendedLatch(numIntFragments);
        FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures();

        for (CoordinationProtos.DrillbitEndpoint ep : intermediateFragmentMap.keySet()) {
            this.sendRemoteFragments(ep, intermediateFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures);
        }

        // The configured per-endpoint timeout is scaled by the number of endpoints contacted.
        long timeout = this.drillbitContext.getOptionManager().getLong("drill.exec.rpc.fragrunner.timeout") * (long) numIntFragments;
        if (numIntFragments > 0 && !endpointLatch.awaitUninterruptibly(timeout)) {
            long numberRemaining = endpointLatch.getCount();
            throw UserException.connectionError().message("Exceeded timeout (%d) while waiting send intermediate work fragments to remote nodes. Sent %d and only heard response back from %d nodes.", timeout, numIntFragments, (long) numIntFragments - numberRemaining).build(logger);
        }

        List<FragmentSubmitFailures.SubmissionException> submissionExceptions = fragmentSubmitFailures.submissionExceptions;
        if (submissionExceptions.size() > 0) {
            // Report each failing endpoint's address once, in first-failure order.
            HashSet<CoordinationProtos.DrillbitEndpoint> endpoints = Sets.newHashSet();
            StringBuilder sb = new StringBuilder();
            boolean first = true;
            for (FragmentSubmitFailures.SubmissionException e : submissionExceptions) {
                CoordinationProtos.DrillbitEndpoint endpoint = e.drillbitEndpoint;
                if (!endpoints.add(endpoint)) {
                    continue;
                }
                if (first) {
                    first = false;
                } else {
                    sb.append(", ");
                }
                sb.append(endpoint.getAddress());
            }
            // The first recorded RPC failure is attached as the cause.
            throw UserException.connectionError(submissionExceptions.get(0).rpcException).message("Error setting up remote intermediate fragment execution").addContext("Nodes with failures", sb.toString()).build(logger);
        }
    }

    /**
     * Collects the failures reported while submitting intermediate fragments. One instance is
     * shared by the listeners of every endpoint contacted in a submission round.
     */
    private static class FragmentSubmitFailures {
        final List<SubmissionException> submissionExceptions = new LinkedList<SubmissionException>();

        private FragmentSubmitFailures() {
        }

        /**
         * Records a failed submission.
         *
         * <p>Synchronized because the backing list is not thread-safe and this is invoked from RPC
         * callbacks, which may arrive on different threads for different endpoints (assumption,
         * the threading model is not visible in this class). The submitting thread reads the list
         * only after the latch has been counted down.
         *
         * @param drillbitEndpoint endpoint whose submission failed
         * @param rpcException failure reported for that endpoint
         */
        synchronized void addFailure(CoordinationProtos.DrillbitEndpoint drillbitEndpoint, RpcException rpcException) {
            this.submissionExceptions.add(new SubmissionException(drillbitEndpoint, rpcException));
        }

        /** Pairs a failing endpoint with the RPC exception reported for it. */
        static class SubmissionException {
            final CoordinationProtos.DrillbitEndpoint drillbitEndpoint;
            final RpcException rpcException;

            SubmissionException(CoordinationProtos.DrillbitEndpoint drillbitEndpoint, RpcException rpcException) {
                this.drillbitEndpoint = drillbitEndpoint;
                this.rpcException = rpcException;
            }
        }
    }

    /**
     * RPC outcome listener for one {@code InitializeFragments} message.
     *
     * <p>With a latch (intermediate fragments) every outcome counts the latch down and failures are
     * collected for the waiting thread. Without a latch (leaf fragments) a failure is pushed to the
     * foreman's event queue as {@code FAILED}.
     */
    private class FragmentSubmitListener
    extends EndpointListener<GeneralRPCProtos.Ack, BitControl.InitializeFragments> {
        private final CountDownLatch latch;
        private final FragmentSubmitFailures fragmentSubmitFailures;

        /**
         * @param endpoint endpoint the message is sent to
         * @param value message being sent
         * @param latch latch to count down on completion, or {@code null}
         * @param fragmentSubmitFailures failure collector; {@code null} exactly when {@code latch} is
         * @throws IllegalStateException if only one of the two optional arguments is {@code null}
         */
        public FragmentSubmitListener(CoordinationProtos.DrillbitEndpoint endpoint, BitControl.InitializeFragments value, CountDownLatch latch, FragmentSubmitFailures fragmentSubmitFailures) {
            super(endpoint, value);
            Preconditions.checkState((latch == null) == (fragmentSubmitFailures == null));
            this.latch = latch;
            this.fragmentSubmitFailures = fragmentSubmitFailures;
        }

        @Override
        public void success(GeneralRPCProtos.Ack ack, ByteBuf byteBuf) {
            if (this.latch != null) {
                this.latch.countDown();
            }
        }

        @Override
        public void failed(RpcException ex) {
            if (this.latch != null) {
                // Record before counting down so the waiter sees the failure once it is released.
                this.fragmentSubmitFailures.addFailure(this.endpoint, ex);
                this.latch.countDown();
            } else {
                logger.debug("Failure while sending fragment.  Stopping query.", ex);
                FragmentsRunner.this.foreman.addToEventQueue(UserBitShared.QueryResult.QueryState.FAILED, ex);
            }
        }

        @Override
        public void interrupted(InterruptedException e) {
            String errMsg = "Interrupted while waiting for the RPC outcome of fragment submission.";
            logger.error(errMsg, e);
            // Treated as a failed submission so the latch or the foreman is always notified.
            this.failed(new RpcException(errMsg, e));
        }
    }
}

