/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.work.fragment;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.drill.common.DeferredException;
import org.apache.drill.common.EventProcessor;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.QueryCancelledException;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.RootExec;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.server.FailureUtils;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
import org.apache.drill.exec.work.fragment.StateTransitionException;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FragmentExecutor
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(FragmentExecutor.class);
    private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(FragmentExecutor.class);
    private final String fragmentName;
    private final ExecutorFragmentContext fragmentContext;
    private final FragmentStatusReporter statusReporter;
    private final DeferredException deferredException = new DeferredException();
    private final BitControl.PlanFragment fragment;
    private final FragmentRoot rootOperator;
    private volatile RootExec root;
    private final AtomicReference<UserBitShared.FragmentState> fragmentState = new AtomicReference<UserBitShared.FragmentState>(UserBitShared.FragmentState.AWAITING_ALLOCATION);
    private final Queue<ExecProtos.FragmentHandle> receiverFinishedQueue = new ConcurrentLinkedQueue<ExecProtos.FragmentHandle>();
    private final FragmentEventProcessor eventProcessor = new FragmentEventProcessor();
    private final AtomicReference<Thread> myThreadRef = new AtomicReference<Object>(null);

    public FragmentExecutor(ExecutorFragmentContext context, BitControl.PlanFragment fragment, FragmentStatusReporter statusReporter) {
        this(context, fragment, statusReporter, null);
    }

    public FragmentExecutor(ExecutorFragmentContext context, BitControl.PlanFragment fragment, FragmentStatusReporter statusReporter, FragmentRoot rootOperator) {
        this.fragmentContext = context;
        this.statusReporter = statusReporter;
        this.fragment = fragment;
        this.rootOperator = rootOperator;
        this.fragmentName = QueryIdHelper.getQueryIdentifier(context.getHandle());
        context.setExecutorState(new ExecutorStateImpl());
    }

    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("FragmentExecutor [fragmentContext=");
        builder.append(this.fragmentContext);
        builder.append(", fragmentState=");
        builder.append(this.fragmentState);
        builder.append("]");
        return builder.toString();
    }

    public BitControl.FragmentStatus getStatus() {
        if (this.fragmentState.get() == UserBitShared.FragmentState.RUNNING) {
            return this.statusReporter.getStatus(UserBitShared.FragmentState.RUNNING);
        }
        return null;
    }

    public void cancel() {
        boolean thisIsOnlyThread = this.myThreadRef.compareAndSet(null, Thread.currentThread());
        if (thisIsOnlyThread) {
            this.eventProcessor.cancelAndFinish();
            this.eventProcessor.start();
        } else {
            this.eventProcessor.cancel();
        }
    }

    private void cleanup(UserBitShared.FragmentState state) {
        this.closeOutResources();
        this.updateState(state);
        this.sendFinalState();
    }

    public synchronized void unpause() {
        this.fragmentContext.getExecutionControls().unpauseAll();
    }

    public void receivingFragmentFinished(ExecProtos.FragmentHandle handle) {
        this.eventProcessor.receiverFinished(handle);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        Thread myThread = Thread.currentThread();
        if (!this.myThreadRef.compareAndSet(null, myThread)) {
            return;
        }
        String originalThreadName = myThread.getName();
        ExecProtos.FragmentHandle fragmentHandle = this.fragmentContext.getHandle();
        ClusterCoordinator clusterCoordinator = this.fragmentContext.getClusterCoordinator();
        FragmentDrillbitStatusListener drillbitStatusListener = new FragmentDrillbitStatusListener();
        String newThreadName = QueryIdHelper.getExecutorThreadName(fragmentHandle);
        try {
            myThread.setName(newThreadName);
            FragmentRoot rootOperator = this.rootOperator != null ? this.rootOperator : this.fragmentContext.getPlanReader().readFragmentRoot(this.fragment.getFragmentJson());
            this.root = ImplCreator.getExec(this.fragmentContext, rootOperator);
            if (this.root == null) {
                return;
            }
            clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
            this.updateState(UserBitShared.FragmentState.RUNNING);
            this.eventProcessor.start();
            injector.injectPause(this.fragmentContext.getExecutionControls(), "fragment-running", logger);
            CoordinationProtos.DrillbitEndpoint endpoint = this.fragmentContext.getEndpoint();
            logger.debug("Starting fragment {}:{} on {}:{}", new Object[]{fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId(), endpoint.getAddress(), endpoint.getUserPort()});
            UserGroupInformation queryUserUgi = this.fragmentContext.isImpersonationEnabled() ? ImpersonationUtil.createProxyUgi(this.fragmentContext.getQueryUserName()) : ImpersonationUtil.getProcessUserUGI();
            queryUserUgi.doAs(() -> {
                injector.injectChecked(this.fragmentContext.getExecutionControls(), "fragment-execution", IOException.class);
                while (this.shouldContinue()) {
                    ExecProtos.FragmentHandle fragmentHandle1;
                    while ((fragmentHandle1 = this.receiverFinishedQueue.poll()) != null) {
                        this.root.receivingFragmentFinished(fragmentHandle1);
                    }
                    if (this.root.next()) continue;
                    break;
                }
                return null;
            });
        }
        catch (QueryCancelledException rootOperator) {
        }
        catch (OutOfMemoryError | OutOfMemoryException e) {
            if (FailureUtils.isDirectMemoryOOM(e)) {
                this.root.dumpBatches(e);
                this.fail(UserException.memoryError(e).build(logger));
            } else {
                FailureUtils.unrecoverableFailure(e, "Unable to handle out of memory condition in FragmentExecutor.", -1);
            }
        }
        catch (InterruptedException e) {
            logger.trace("Interrupted root: {}", (Object)this.root, (Object)e);
        }
        catch (Throwable t) {
            if (this.root != null) {
                this.root.dumpBatches(t);
            }
            this.fail(t);
        }
        finally {
            this.eventProcessor.terminate();
            Thread.interrupted();
            this.cleanup(UserBitShared.FragmentState.FINISHED);
            clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);
            myThread.setName(originalThreadName);
        }
    }

    private boolean shouldContinue() {
        return !this.isCompleted() && UserBitShared.FragmentState.CANCELLATION_REQUESTED != this.fragmentState.get();
    }

    private boolean isCompleted() {
        return this.isTerminal(this.fragmentState.get());
    }

    private void sendFinalState() {
        UserBitShared.FragmentState outcome = this.fragmentState.get();
        if (outcome == UserBitShared.FragmentState.FAILED) {
            ExecProtos.FragmentHandle handle = this.getContext().getHandle();
            UserException uex = UserException.systemError(this.deferredException.getAndClear()).addIdentity(this.getContext().getEndpoint()).addContext("Fragment", handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId()).build(logger);
            this.statusReporter.fail(uex);
        } else {
            this.statusReporter.stateChanged(outcome);
        }
        this.statusReporter.close();
    }

    private void closeOutResources() {
        try {
            if (this.root != null) {
                this.root.close();
            }
        }
        catch (Exception e) {
            this.fail(e);
        }
        this.fragmentContext.close();
    }

    private void warnStateChange(UserBitShared.FragmentState current, UserBitShared.FragmentState target) {
        logger.warn(this.fragmentName + ": Ignoring unexpected state transition {} --> {}", (Object)current.name(), (Object)target.name());
    }

    private void errorStateChange(UserBitShared.FragmentState current, UserBitShared.FragmentState target) {
        String msg = "%s: Invalid state transition %s --> %s";
        throw new StateTransitionException(String.format("%s: Invalid state transition %s --> %s", this.fragmentName, current.name(), target.name()));
    }

    /*
     * Unable to fully structure code
     */
    private synchronized boolean updateState(UserBitShared.FragmentState target) {
        current = this.fragmentState.get();
        FragmentExecutor.logger.info(this.fragmentName + ": State change requested {} --> {}", (Object)current, (Object)target);
        switch (1.$SwitchMap$org$apache$drill$exec$proto$UserBitShared$FragmentState[target.ordinal()]) {
            case 4: {
                switch (1.$SwitchMap$org$apache$drill$exec$proto$UserBitShared$FragmentState[current.ordinal()]) {
                    case 1: 
                    case 2: 
                    case 3: {
                        this.fragmentState.set(target);
                        this.statusReporter.stateChanged(target);
                        return true;
                    }
                }
                this.warnStateChange(current, target);
                return false;
            }
            case 5: {
                if (current != UserBitShared.FragmentState.CANCELLATION_REQUESTED) ** GOTO lbl16
                target = UserBitShared.FragmentState.CANCELLED;
                ** GOTO lbl18
lbl16:
                // 1 sources

                if (current == UserBitShared.FragmentState.FAILED) {
                    target = UserBitShared.FragmentState.FAILED;
                }
            }
lbl18:
            // 5 sources

            case 6: {
                if (!this.isTerminal(current)) {
                    this.fragmentState.set(target);
                    return true;
                }
                if (current == UserBitShared.FragmentState.FAILED) {
                    return false;
                }
                if (current == UserBitShared.FragmentState.CANCELLED && target == UserBitShared.FragmentState.FAILED) {
                    this.fragmentState.set(UserBitShared.FragmentState.FAILED);
                    return true;
                }
                this.warnStateChange(current, target);
                return false;
            }
            case 3: {
                if (current == UserBitShared.FragmentState.AWAITING_ALLOCATION) {
                    this.fragmentState.set(target);
                    this.statusReporter.stateChanged(target);
                    return true;
                }
                this.errorStateChange(current, target);
            }
        }
        this.errorStateChange(current, target);
        throw new IllegalStateException();
    }

    private boolean isTerminal(UserBitShared.FragmentState state) {
        return state == UserBitShared.FragmentState.CANCELLED || state == UserBitShared.FragmentState.FAILED || state == UserBitShared.FragmentState.FINISHED;
    }

    private void fail(Throwable excep) {
        this.deferredException.addThrowable(excep);
        this.updateState(UserBitShared.FragmentState.FAILED);
    }

    public ExecutorFragmentContext getContext() {
        return this.fragmentContext;
    }

    private class FragmentEventProcessor
    extends EventProcessor<FragmentEvent> {
        private final AtomicBoolean terminate = new AtomicBoolean(false);

        private FragmentEventProcessor() {
        }

        void cancel() {
            this.sendEvent(new FragmentEvent(EventType.CANCEL, null));
        }

        void cancelAndFinish() {
            this.sendEvent(new FragmentEvent(EventType.CANCEL_AND_FINISH, null));
        }

        void receiverFinished(ExecProtos.FragmentHandle handle) {
            this.sendEvent(new FragmentEvent(EventType.RECEIVER_FINISHED, handle));
        }

        public void terminate() {
            this.terminate.set(true);
        }

        @Override
        protected void processEvent(FragmentEvent event) {
            if (event.type.equals((Object)EventType.RECEIVER_FINISHED) ? this.terminate.get() : !this.terminate.compareAndSet(false, true)) {
                return;
            }
            switch (event.type) {
                case CANCEL: {
                    FragmentExecutor.this.updateState(UserBitShared.FragmentState.CANCELLATION_REQUESTED);
                    this.killThread();
                    break;
                }
                case CANCEL_AND_FINISH: {
                    FragmentExecutor.this.updateState(UserBitShared.FragmentState.CANCELLATION_REQUESTED);
                    FragmentExecutor.this.cleanup(UserBitShared.FragmentState.FINISHED);
                    break;
                }
                case RECEIVER_FINISHED: {
                    assert (event.handle != null) : "RECEIVER_FINISHED event must have a handle";
                    if (FragmentExecutor.this.root != null) {
                        logger.info("Applying request for early sender termination for {} -> {}.", (Object)QueryIdHelper.getQueryIdentifier(FragmentExecutor.this.getContext().getHandle()), (Object)QueryIdHelper.getFragmentId(event.handle));
                        FragmentExecutor.this.receiverFinishedQueue.add(event.handle);
                        break;
                    }
                    logger.warn("Dropping request for early fragment termination for path {} -> {} as no root exec exists.", (Object)QueryIdHelper.getFragmentId(FragmentExecutor.this.getContext().getHandle()), (Object)QueryIdHelper.getFragmentId(event.handle));
                }
            }
        }

        private void killThread() {
            Thread myThread = (Thread)FragmentExecutor.this.myThreadRef.get();
            logger.debug("Interrupting fragment thread {}", (Object)myThread.getName());
            myThread.interrupt();
        }
    }

    private class ExecutorStateImpl
    implements FragmentContext.ExecutorState {
        private ExecutorStateImpl() {
        }

        @Override
        public boolean shouldContinue() {
            return FragmentExecutor.this.shouldContinue();
        }

        @Override
        public void fail(Throwable t) {
            FragmentExecutor.this.fail(t);
        }

        @Override
        public boolean isFailed() {
            return FragmentExecutor.this.fragmentState.get() == UserBitShared.FragmentState.FAILED;
        }

        @Override
        public Throwable getFailureCause() {
            return FragmentExecutor.this.deferredException.getException();
        }

        @Override
        public void checkContinue() {
            if (!this.shouldContinue()) {
                throw new QueryCancelledException();
            }
        }
    }

    private class FragmentDrillbitStatusListener
    implements DrillbitStatusListener {
        private FragmentDrillbitStatusListener() {
        }

        @Override
        public void drillbitRegistered(Set<CoordinationProtos.DrillbitEndpoint> registeredDrillbits) {
        }

        @Override
        public void drillbitUnregistered(Set<CoordinationProtos.DrillbitEndpoint> unregisteredDrillbits) {
            CoordinationProtos.DrillbitEndpoint foremanEndpoint = FragmentExecutor.this.fragmentContext.getForemanEndpoint();
            if (unregisteredDrillbits.contains(foremanEndpoint)) {
                logger.warn("Foreman {} no longer active.  Cancelling fragment {}.", (Object)foremanEndpoint.getAddress(), (Object)QueryIdHelper.getQueryIdentifier(FragmentExecutor.this.fragmentContext.getHandle()));
                FragmentExecutor.this.statusReporter.close();
                FragmentExecutor.this.cancel();
            }
        }
    }

    private class FragmentEvent {
        private final EventType type;
        private final ExecProtos.FragmentHandle handle;

        FragmentEvent(EventType type, ExecProtos.FragmentHandle handle) {
            this.type = type;
            this.handle = handle;
        }
    }

    private static enum EventType {
        CANCEL,
        CANCEL_AND_FINISH,
        RECEIVER_FINISHED;

    }
}

