/*
 * Decompiled with CFR 0.152.
 */
package com.amazon.ws.emr.hadoop.fs.concurrent;

import com.amazon.ws.emr.hadoop.fs.concurrent.BlockingQueueConsumer;
import com.amazon.ws.emr.hadoop.fs.concurrent.BlockingQueueProducer;
import com.amazon.ws.emr.hadoop.fs.concurrent.Consumer;
import com.amazon.ws.emr.hadoop.fs.concurrent.Producer;
import com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.base.Optional;
import com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;

public class ProducerConsumerExecutor {
    public synchronized <T> void execute(Collection<Producer<T>> producers, Collection<Consumer<T>> consumers) {
        this.doExecute(producers, consumers);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T> void doExecute(Collection<Producer<T>> producers, Collection<Consumer<T>> consumers) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(String.format("Executor:%d:thread:%%d", System.identityHashCode(this))).build();
        ExecutorService pool = Executors.newFixedThreadPool(producers.size() + consumers.size(), threadFactory);
        try {
            ArrayBlockingQueue queue = new ArrayBlockingQueue(1000);
            ArrayList<Future> producerFutures = new ArrayList<Future>();
            for (Producer<T> p : producers) {
                BlockingQueueProducer blockingQueueProducer = new BlockingQueueProducer(queue, p);
                producerFutures.add(pool.submit(blockingQueueProducer));
            }
            ArrayList<Future> consumerFutures = new ArrayList<Future>();
            for (Consumer<T> c : consumers) {
                BlockingQueueConsumer blockingQueueConsumer = new BlockingQueueConsumer(queue, c);
                consumerFutures.add(pool.submit(blockingQueueConsumer));
            }
            this.waitForExecutionToFinish(queue, producerFutures, consumerFutures);
        }
        finally {
            pool.shutdownNow();
        }
    }

    private void waitForExecutionToFinish(BlockingQueue queue, List<Future> producerFutures, List<Future> consumerFutures) {
        try {
            for (Future f : producerFutures) {
                f.get();
            }
            for (Future f : consumerFutures) {
                queue.put(Optional.absent());
            }
            for (Future f : consumerFutures) {
                f.get();
            }
        }
        catch (InterruptedException | ExecutionException e) {
            this.cancelAll(producerFutures);
            this.cancelAll(consumerFutures);
            throw new RuntimeException(e);
        }
    }

    private void cancelAll(Collection<Future> futures) {
        for (Future f : futures) {
            f.cancel(true);
        }
    }
}

