/*
 * Decompiled with CFR 0.152.
 */
package io.activej.async.process;

import io.activej.async.function.AsyncSupplier;
import io.activej.async.process.AsyncExecutor;
import io.activej.common.Checks;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.RetryPolicy;
import io.activej.promise.SettablePromise;
import io.activej.reactor.Reactor;
import java.util.ArrayDeque;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;

public class AsyncExecutors {
    /**
     * Creates an executor that adds no scheduling of its own.
     *
     * @return an {@link AsyncExecutor} whose {@code execute} calls {@code supplier.get()}
     *         synchronously and hands back the resulting promise unchanged
     */
    public static AsyncExecutor direct() {
        return new AsyncExecutor() {
            @Override
            public <T> Promise<T> execute(AsyncSupplier<T> supplier) {
                Promise<T> promise = supplier.get();
                return promise;
            }
        };
    }

    /**
     * Creates an executor that runs suppliers on the given {@code reactor}.
     *
     * <p>When {@code execute} is called from the target reactor itself, the supplier is
     * invoked directly. Otherwise the supplier is submitted to the target reactor and its
     * outcome is handed back to the calling reactor, where the returned promise is completed.
     *
     * @param reactor the reactor on which suppliers should be invoked
     * @return an {@link AsyncExecutor} bound to {@code reactor}
     */
    public static AsyncExecutor ofReactor(final Reactor reactor) {
        return new AsyncExecutor() {
            @Override
            public <T> Promise<T> execute(AsyncSupplier<T> supplier) {
                Reactor callerReactor = Reactor.getCurrentReactor();
                if (reactor != callerReactor) {
                    return Promise.ofCallback(cb -> {
                        // Registered before hopping reactors and released once the result has been
                        // posted back. NOTE(review): presumably this keeps the calling reactor alive
                        // while the call is in flight; confirm against the Reactor documentation.
                        callerReactor.startExternalTask();
                        reactor.execute(() ->
                            supplier.get().subscribe((result, e) -> {
                                // Complete the promise on the reactor that asked for it.
                                callerReactor.execute(() -> cb.set(result, e));
                                callerReactor.completeExternalTask();
                            }));
                    });
                }
                // Already on the target reactor: no hand-off needed.
                return supplier.get();
            }
        };
    }

    /**
     * Creates an executor that spreads calls over {@code executors} in cyclic order.
     *
     * <p>The list is read on every call (it is not copied), so both the element at the
     * current position and the list size are looked up at call time.
     *
     * @param executors the executors to rotate through
     * @return an {@link AsyncExecutor} that delegates each call to the next executor in turn
     */
    public static AsyncExecutor roundRobin(final List<AsyncExecutor> executors) {
        return new AsyncExecutor() {
            /** Position in {@code executors} of the executor that receives the next call. */
            private int cursor;

            @Override
            public <T> Promise<T> execute(AsyncSupplier<T> supplier) {
                int current = cursor;
                AsyncExecutor chosen = executors.get(current);
                // Advance before delegating, so the cursor moves even if the delegate throws.
                cursor = (current + 1) % executors.size();
                return chosen.execute(supplier);
            }
        };
    }

    /**
     * Creates an executor equivalent to {@code buffered(1, Integer.MAX_VALUE)}: one call
     * in flight at a time, with the buffer limit set to {@link Integer#MAX_VALUE}.
     *
     * @return the sequential {@link AsyncExecutor}
     */
    public static AsyncExecutor sequential() {
        return buffered(1, Integer.MAX_VALUE);
    }

    /**
     * Creates a buffered executor with the given parallelism and the buffer limit set to
     * {@link Integer#MAX_VALUE}.
     *
     * @param maxParallelCalls maximum number of calls allowed to be in flight at once
     * @return the buffered {@link AsyncExecutor}
     */
    public static AsyncExecutor buffered(int maxParallelCalls) {
        return buffered(maxParallelCalls, Integer.MAX_VALUE);
    }

    /**
     * Creates an executor that keeps at most {@code maxParallelCalls} calls in flight and
     * queues further calls until a slot frees up.
     *
     * <p>Queued calls are started in FIFO order whenever an in-flight call completes. Each
     * queued call is represented by a promise that is completed with the outcome of its
     * supplier once that supplier has eventually been run.
     *
     * @param maxParallelCalls maximum number of calls whose promises may be pending at once
     * @param maxBufferedCalls maximum number of calls that may wait in the queue
     * @return the buffering {@link AsyncExecutor}; its {@code execute} throws
     *         {@link RejectedExecutionException} when all parallel slots are taken and
     *         {@code maxBufferedCalls} calls are already queued
     */
    public static AsyncExecutor buffered(final int maxParallelCalls, final int maxBufferedCalls) {
        return new AsyncExecutor() {
            /** Number of started calls whose promises have not completed yet. */
            private int pendingCalls;

            /**
             * Flat FIFO of (supplier, promise) pairs: every buffered call occupies TWO
             * consecutive slots, so the number of buffered calls is {@code deque.size() / 2}.
             */
            private final ArrayDeque<Object> deque = new ArrayDeque<>();

            /** Starts queued calls while there are free parallel slots. */
            @SuppressWarnings("unchecked") // entries are only ever added pairwise by execute()
            private void processBuffer() {
                while (pendingCalls < maxParallelCalls && !deque.isEmpty()) {
                    AsyncSupplier<Object> supplier = (AsyncSupplier<Object>) deque.pollFirst();
                    SettablePromise<Object> cb = (SettablePromise<Object>) deque.pollFirst();
                    pendingCalls++;
                    Promise<Object> promise = supplier.get();
                    if (promise.isComplete()) {
                        // Finished synchronously: release the slot and keep draining in this loop.
                        pendingCalls--;
                        cb.set(promise.getResult(), promise.getException());
                        continue;
                    }
                    promise.subscribe((result, e) -> {
                        pendingCalls--;
                        // Refill the freed slot before notifying the caller of this call.
                        processBuffer();
                        cb.set(result, e);
                    });
                }
            }

            @Override
            public <T> Promise<T> execute(AsyncSupplier<T> supplier) throws RejectedExecutionException {
                if (pendingCalls < maxParallelCalls) {
                    pendingCalls++;
                    return supplier.get().whenComplete(() -> {
                        pendingCalls--;
                        processBuffer();
                    });
                }
                // Compare buffered CALLS (two deque entries each) against the limit. The previous
                // check compared raw deque entries with '>', which capped the buffer at roughly
                // half the requested size and let one call through even when the limit was 0.
                if (deque.size() / 2 >= maxBufferedCalls) {
                    throw new RejectedExecutionException("Too many operations running");
                }
                SettablePromise<T> result = new SettablePromise<>();
                deque.addLast(supplier);
                deque.addLast(result);
                return result;
            }
        };
    }

    /**
     * Creates an executor that passes every supplier to
     * {@code Promises.retry(supplier, retryPolicy)}.
     *
     * @param retryPolicy the policy forwarded to {@code Promises.retry} on each call
     * @return an {@link AsyncExecutor} that returns the promise produced by {@code Promises.retry}
     */
    public static AsyncExecutor retry(final RetryPolicy<?> retryPolicy) {
        return new AsyncExecutor() {
            @Override
            public <T> Promise<T> execute(AsyncSupplier<T> supplier) {
                Promise<T> retried = Promises.retry(supplier, retryPolicy);
                return retried;
            }
        };
    }

    /**
     * Creates an executor that limits how many consecutive calls may return an
     * already-completed promise as-is.
     *
     * <p>Each call invokes the supplier immediately. A counter tracks consecutive calls whose
     * promise was already complete on return; on every {@code (maxRecursiveCalls + 1)}-th such
     * call the counter is reset and {@code promise.async()} is returned instead of the promise
     * itself. A call whose promise is not yet complete also resets the counter.
     * NOTE(review): {@code async()} presumably defers completion to a later reactor tick so
     * the call stack can unwind; confirm against the Promise documentation.
     *
     * @param maxRecursiveCalls number of consecutive completed calls allowed before one is
     *                          made asynchronous; must not be negative
     * @return the depth-limiting {@link AsyncExecutor}
     */
    public static AsyncExecutor ofMaxRecursiveCalls(final int maxRecursiveCalls) {
        // The explicit Object cast is kept from the original so the same checkArgument overload is chosen.
        Checks.checkArgument(maxRecursiveCalls >= 0, (Object) "Number of recursive calls cannot be less than 0");
        return new AsyncExecutor() {
            /** Every {@code period}-th consecutive completed call is made asynchronous. */
            private final int period = maxRecursiveCalls + 1;

            /** Consecutive already-completed promises seen since the last reset. */
            private int streak = 0;

            @Override
            public <T> Promise<T> execute(AsyncSupplier<T> supplier) {
                Promise<T> promise = supplier.get();
                if (!promise.isComplete()) {
                    // A pending promise breaks the streak.
                    streak = 0;
                    return promise;
                }
                streak++;
                if (streak % period != 0) {
                    return promise;
                }
                streak = 0;
                return promise.async();
            }
        };
    }
}

