/*
 * Decompiled with CFR 0.152.
 */
package com.amazon.ws.emr.hadoop.fs.staging;

import com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.base.Preconditions;
import com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.util.concurrent.ListeningExecutorService;
import com.amazon.ws.emr.hadoop.fs.staging.Task;
import com.amazon.ws.emr.hadoop.fs.staging.TaskCoordinator;
import com.amazon.ws.emr.hadoop.fs.util.ExceptionCollector;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import lombok.NonNull;

class StagedFilesExecutor {
    private final ListeningExecutorService exec;
    private final int maxActiveTasks;

    StagedFilesExecutor(@NonNull ListeningExecutorService exec, int maxActiveTasks) {
        if (exec == null) {
            throw new NullPointerException("exec is marked non-null but is null");
        }
        Preconditions.checkArgument(maxActiveTasks > 1, "use StagedFilesExecutor only when maxActiveTask is larger than 1 one to avoid thread overhead");
        this.exec = exec;
        this.maxActiveTasks = maxActiveTasks;
    }

    void run(@NonNull TaskCoordinator taskCoordinator) throws InterruptedException, IOException {
        if (taskCoordinator == null) {
            throw new NullPointerException("taskCoordinator is marked non-null but is null");
        }
        LinkedBlockingQueue<TaskResult> results = new LinkedBlockingQueue<TaskResult>();
        ExceptionCollector exceptionCollector = ExceptionCollector.withDefaultMaxCollectedExceptions();
        long iteratedInFirstBatch = this.submitOrRunFirstBatch(taskCoordinator, results, exceptionCollector);
        try {
            this.awaitResultsAndSubmitOrRunRemaining(taskCoordinator, results, exceptionCollector, exceptionCollector.isEmpty(), iteratedInFirstBatch);
        }
        catch (InterruptedException ie) {
            exceptionCollector.forEach(ie::addSuppressed);
            throw ie;
        }
        exceptionCollector.rethrowIfNotEmpty(IOException.class);
    }

    private long submitOrRunFirstBatch(TaskCoordinator taskCoordinator, BlockingQueue<TaskResult> results, ExceptionCollector exceptionCollector) {
        int iterated;
        try {
            for (iterated = 0; iterated < this.maxActiveTasks && taskCoordinator.hasRemainingTasks(); ++iterated) {
                Task next = taskCoordinator.nextTask();
                this.submitOrRun(next, results);
            }
        }
        catch (RuntimeException exception) {
            exceptionCollector.add(exception);
        }
        return iterated;
    }

    private void submitOrRun(Task task, BlockingQueue<TaskResult> results) {
        Runnable runnable = () -> {
            try {
                task.run();
                results.offer(TaskResult.success());
            }
            catch (Throwable exception) {
                results.offer(TaskResult.fail(exception, task.abandonRemainingTasks(exception)));
            }
        };
        if (task.isBlockingTask()) {
            runnable.run();
        } else {
            try {
                this.exec.submit(runnable);
            }
            catch (RejectedExecutionException ree) {
                runnable.run();
                throw ree;
            }
        }
    }

    private void awaitResultsAndSubmitOrRunRemaining(TaskCoordinator taskCoordinator, BlockingQueue<TaskResult> results, ExceptionCollector exceptionCollector, boolean submitNewTasks, long submittedInFirstBatch) throws InterruptedException {
        long pending = submittedInFirstBatch;
        boolean shouldAbandonRemainingTasks = false;
        while (pending > 0L) {
            TaskResult oneResult = results.take();
            if (!oneResult.success) {
                exceptionCollector.add(oneResult.exception);
            }
            if (oneResult.shouldAbandonRemainingTasks) {
                shouldAbandonRemainingTasks = true;
            }
            --pending;
            try {
                if (!submitNewTasks || shouldAbandonRemainingTasks || !taskCoordinator.hasRemainingTasks()) continue;
                Task task = taskCoordinator.nextTask();
                ++pending;
                this.submitOrRun(task, results);
            }
            catch (RuntimeException re) {
                shouldAbandonRemainingTasks = true;
                exceptionCollector.add(re);
            }
        }
    }

    private static class TaskResult {
        final boolean success;
        final Throwable exception;
        final boolean shouldAbandonRemainingTasks;
        public static final TaskResult SUCCESS = new TaskResult(true, null, false);

        private TaskResult(boolean success, Throwable exception, boolean shouldAbandonRemainingTasks) {
            this.success = success;
            this.exception = exception;
            this.shouldAbandonRemainingTasks = shouldAbandonRemainingTasks;
        }

        static TaskResult success() {
            return SUCCESS;
        }

        static TaskResult fail(@NonNull Throwable exception, boolean shouldAbandonRemainingTasks) {
            if (exception == null) {
                throw new NullPointerException("exception is marked non-null but is null");
            }
            return new TaskResult(false, exception, shouldAbandonRemainingTasks);
        }
    }
}

