/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.store;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.drill.common.collections.Collectors;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.drill.shaded.guava.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.drill.shaded.guava.com.google.common.util.concurrent.MoreExecutors;
import org.apache.drill.shaded.guava.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class TimedCallable<V>
implements Callable<V> {
    private static final Logger logger = LoggerFactory.getLogger(TimedCallable.class);
    private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000L;
    private volatile long startTime = 0L;
    private volatile long executionTime = -1L;

    @Override
    public final V call() throws Exception {
        V v;
        long start;
        this.startTime = start = System.nanoTime();
        try {
            logger.debug("Started execution of '{}' task at {} ms", (Object)this, (Object)TimeUnit.MILLISECONDS.convert(start, TimeUnit.NANOSECONDS));
            v = this.runInner();
        }
        catch (InterruptedException e) {
            try {
                logger.warn("Task '{}' interrupted", (Object)this, (Object)e);
                throw e;
            }
            catch (Throwable throwable) {
                long time = System.nanoTime() - start;
                if (logger.isWarnEnabled()) {
                    long timeMillis = TimeUnit.MILLISECONDS.convert(time, TimeUnit.NANOSECONDS);
                    if (timeMillis > TIMEOUT_PER_RUNNABLE_IN_MSECS) {
                        logger.warn("Task '{}' execution time {} ms exceeds timeout {} ms.", new Object[]{this, timeMillis, TIMEOUT_PER_RUNNABLE_IN_MSECS});
                    } else {
                        logger.debug("Task '{}' execution time is {} ms", (Object)this, (Object)timeMillis);
                    }
                }
                this.executionTime = time;
                throw throwable;
            }
        }
        long time = System.nanoTime() - start;
        if (logger.isWarnEnabled()) {
            long timeMillis = TimeUnit.MILLISECONDS.convert(time, TimeUnit.NANOSECONDS);
            if (timeMillis > TIMEOUT_PER_RUNNABLE_IN_MSECS) {
                logger.warn("Task '{}' execution time {} ms exceeds timeout {} ms.", new Object[]{this, timeMillis, TIMEOUT_PER_RUNNABLE_IN_MSECS});
            } else {
                logger.debug("Task '{}' execution time is {} ms", (Object)this, (Object)timeMillis);
            }
        }
        this.executionTime = time;
        return v;
    }

    protected abstract V runInner() throws Exception;

    private long getStartTime(TimeUnit unit) {
        return unit.convert(this.startTime, TimeUnit.NANOSECONDS);
    }

    private long getExecutionTime(TimeUnit unit) {
        return unit.convert(this.executionTime, TimeUnit.NANOSECONDS);
    }

    /*
     * Loose catch block
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public static <V> List<V> run(String activity, Logger logger, List<TimedCallable<V>> tasks, int parallelism) throws IOException {
        IOException iOException;
        List list;
        Preconditions.checkArgument(!Preconditions.checkNotNull(tasks).isEmpty(), "list of tasks is empty");
        Preconditions.checkArgument(parallelism > 0);
        parallelism = Math.min(parallelism, tasks.size());
        ListeningExecutorService threadPool = parallelism == 1 ? MoreExecutors.newDirectExecutorService() : Executors.newFixedThreadPool(parallelism, new ThreadFactoryBuilder().setNameFormat(activity + "-%d").build());
        long timeout = TIMEOUT_PER_RUNNABLE_IN_MSECS * (long)((tasks.size() - 1) / parallelism + 1);
        FutureMapper futureMapper = new FutureMapper();
        Statistics<V> statistics = logger.isDebugEnabled() ? new Statistics<V>() : null;
        try {
            list = Collectors.toList(threadPool.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS), futureMapper);
        }
        catch (InterruptedException e) {
            try {
                String errMsg = String.format("Interrupted while waiting for activity '%s' tasks to be done.", activity);
                logger.error(errMsg, (Throwable)e);
                throw UserException.resourceError(e).message(errMsg, new Object[0]).build(logger);
                catch (RejectedExecutionException e2) {
                    errMsg = String.format("Failure while submitting activity '%s' tasks for execution.", activity);
                    logger.error(errMsg, (Throwable)e2);
                    throw UserException.internalError(e2).message(errMsg, new Object[0]).build(logger);
                }
            }
            catch (Throwable throwable) {
                IOException iOException2;
                List<Runnable> notStartedTasks2 = threadPool.shutdownNow();
                if (!notStartedTasks2.isEmpty()) {
                    logger.error("{} activity '{}' tasks never commenced execution.", (Object)notStartedTasks2.size(), (Object)activity);
                }
                try {
                    if (!threadPool.awaitTermination(5000L, TimeUnit.MILLISECONDS)) {
                        logger.error("Detected run away tasks in activity '{}'.", (Object)activity);
                    }
                }
                catch (InterruptedException e3) {
                    logger.warn("Interrupted while waiting for pending threads in activity '{}' to terminate.", (Object)activity);
                }
                if (statistics != null) {
                    statistics.collect(tasks).log(activity, logger, parallelism);
                }
                if (futureMapper.count != tasks.size()) {
                    String errMsg = String.format("Waited for %d ms, but only %d tasks for '%s' are complete. Total number of tasks %d, parallelism %d.", timeout, futureMapper.count, activity, tasks.size(), parallelism);
                    logger.error(errMsg, futureMapper.throwable);
                    throw UserException.resourceError(futureMapper.throwable).message(errMsg, new Object[0]).build(logger);
                }
                if (futureMapper.throwable == null) throw throwable;
                if (futureMapper.throwable instanceof IOException) {
                    iOException2 = (IOException)futureMapper.throwable;
                    throw iOException2;
                }
                iOException2 = new IOException(futureMapper.throwable);
                throw iOException2;
            }
        }
        List<Runnable> notStartedTasks = threadPool.shutdownNow();
        if (!notStartedTasks.isEmpty()) {
            logger.error("{} activity '{}' tasks never commenced execution.", (Object)notStartedTasks.size(), (Object)activity);
        }
        try {
            if (!threadPool.awaitTermination(5000L, TimeUnit.MILLISECONDS)) {
                logger.error("Detected run away tasks in activity '{}'.", (Object)activity);
            }
        }
        catch (InterruptedException e) {
            logger.warn("Interrupted while waiting for pending threads in activity '{}' to terminate.", (Object)activity);
        }
        if (statistics != null) {
            statistics.collect(tasks).log(activity, logger, parallelism);
        }
        if (futureMapper.count != tasks.size()) {
            String errMsg = String.format("Waited for %d ms, but only %d tasks for '%s' are complete. Total number of tasks %d, parallelism %d.", timeout, futureMapper.count, activity, tasks.size(), parallelism);
            logger.error(errMsg, futureMapper.throwable);
            throw UserException.resourceError(futureMapper.throwable).message(errMsg, new Object[0]).build(logger);
        }
        if (futureMapper.throwable == null) return list;
        if (futureMapper.throwable instanceof IOException) {
            iOException = (IOException)futureMapper.throwable;
            throw iOException;
        }
        iOException = new IOException(futureMapper.throwable);
        throw iOException;
    }

    private static class FutureMapper<V>
    implements Function<Future<V>, V> {
        int count;
        Throwable throwable = null;

        private FutureMapper() {
        }

        private void setThrowable(Throwable t) {
            if (this.throwable == null) {
                this.throwable = t;
            } else {
                this.throwable.addSuppressed(t);
            }
        }

        @Override
        public V apply(Future<V> future) {
            Preconditions.checkState(future.isDone());
            if (!future.isCancelled()) {
                try {
                    ++this.count;
                    return future.get();
                }
                catch (InterruptedException e) {
                    logger.error("Unexpected exception", (Throwable)e);
                    throw UserException.internalError(e).message("Unexpected exception", new Object[0]).build(logger);
                }
                catch (ExecutionException e) {
                    this.setThrowable(e.getCause());
                }
            } else {
                this.setThrowable(new CancellationException());
            }
            return null;
        }
    }

    private static class Statistics<V>
    implements Consumer<TimedCallable<V>> {
        final long start = System.nanoTime();
        final Stopwatch watch = Stopwatch.createStarted();
        long totalExecution;
        long maxExecution;
        int count;
        int startedCount;
        private int doneCount;
        long earliestStart;
        long latestStart;
        long totalStart;

        private Statistics() {
        }

        @Override
        public void accept(TimedCallable<V> task) {
            ++this.count;
            long threadStart = ((TimedCallable)task).getStartTime(TimeUnit.NANOSECONDS) - this.start;
            if (threadStart >= 0L) {
                ++this.startedCount;
                this.earliestStart = Math.min(this.earliestStart, threadStart);
                this.latestStart = Math.max(this.latestStart, threadStart);
                this.totalStart += threadStart;
                long executionTime = ((TimedCallable)task).getExecutionTime(TimeUnit.NANOSECONDS);
                if (executionTime != -1L) {
                    ++this.doneCount;
                    this.totalExecution += executionTime;
                    this.maxExecution = Math.max(this.maxExecution, executionTime);
                } else {
                    logger.info("Task {} started at {} did not finish", task, (Object)threadStart);
                }
            } else {
                logger.info("Task {} never commenced execution", task);
            }
        }

        Statistics<V> collect(List<TimedCallable<V>> tasks) {
            this.maxExecution = 0L;
            this.totalExecution = 0L;
            this.doneCount = 0;
            this.startedCount = 0;
            this.count = 0;
            this.earliestStart = Long.MAX_VALUE;
            this.totalStart = 0L;
            this.latestStart = 0L;
            tasks.forEach(this);
            return this;
        }

        void log(String activity, Logger logger, int parallelism) {
            if (this.startedCount > 0) {
                logger.debug("{}: started {} out of {} using {} threads. (start time: min {} ms, avg {} ms, max {} ms).", new Object[]{activity, this.startedCount, this.count, parallelism, TimeUnit.NANOSECONDS.toMillis(this.earliestStart), TimeUnit.NANOSECONDS.toMillis(this.totalStart) / (long)this.startedCount, TimeUnit.NANOSECONDS.toMillis(this.latestStart)});
            } else {
                logger.debug("{}: started {} out of {} using {} threads.", new Object[]{activity, this.startedCount, this.count, parallelism});
            }
            if (this.doneCount > 0) {
                logger.debug("{}: completed {} out of {} using {} threads (execution time: total {} ms, avg {} ms, max {} ms).", new Object[]{activity, this.doneCount, this.count, parallelism, this.watch.elapsed(TimeUnit.MILLISECONDS), TimeUnit.NANOSECONDS.toMillis(this.totalExecution) / (long)this.doneCount, TimeUnit.NANOSECONDS.toMillis(this.maxExecution)});
            } else {
                logger.debug("{}: completed {} out of {} using {} threads", new Object[]{activity, this.doneCount, this.count, parallelism});
            }
        }
    }
}

