/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.concurrent;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.concurrent.ExponentialBackoffRetryStrategy;
import org.apache.flink.runtime.concurrent.FixedRetryStrategy;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.concurrent.RetryStrategy;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class FutureUtilsTest
extends TestLogger {
    /**
     * Class-level JUnit rule holding a scheduled executor that is created through
     * {@link Executors#newSingleThreadScheduledExecutor()}. Tests in this class obtain the
     * executor via {@code getExecutor()}.
     *
     * <p>NOTE(review): the executor's start/shutdown lifecycle is presumably handled by
     * {@code TestExecutorResource} - confirm against that class.
     */
    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> TEST_EXECUTOR_RESOURCE = new TestExecutorResource(Executors::newSingleThreadScheduledExecutor);

    /**
     * Retries an operation that fails on every invocation except the tenth and expects the retry
     * future to yield {@code true} after exactly ten invocations.
     */
    @Test
    public void testRetrySuccess() throws Exception {
        final int retries = 10;
        final AtomicInteger attempts = new AtomicInteger(0);

        final CompletableFuture<Boolean> retryFuture =
                FutureUtils.retry(
                        () ->
                                CompletableFuture.supplyAsync(
                                        () -> {
                                            // Every attempt but the last one fails.
                                            if (attempts.incrementAndGet() != retries) {
                                                throw new CompletionException(
                                                        new FlinkException("Test exception"));
                                            }
                                            return true;
                                        },
                                        TestingUtils.defaultExecutor()),
                        retries,
                        TestingUtils.defaultExecutor());

        Assert.assertTrue(retryFuture.get());
        Assert.assertEquals(retries, attempts.get());
    }

    /**
     * Retries an operation that always fails with three retries and expects the stripped failure
     * of the retry future to be a {@link FutureUtils.RetryException}.
     */
    @Test(expected = FutureUtils.RetryException.class)
    public void testRetryFailureFixedRetries() throws Throwable {
        final int retries = 3;

        final CompletableFuture<?> retryFuture =
                FutureUtils.retry(
                        () ->
                                FutureUtils.completedExceptionally(
                                        new FlinkException("Test exception")),
                        retries,
                        TestingUtils.defaultExecutor());

        try {
            retryFuture.get();
        } catch (ExecutionException executionException) {
            // Unwrap so that the expected-exception check sees the actual cause.
            throw ExceptionUtils.stripExecutionException(executionException);
        }
    }

    /**
     * Cancels the retry future while the second attempt is parked on a latch and expects the
     * future to be cancelled with exactly two attempts having been started.
     */
    @Test
    public void testRetryCancellation() throws Exception {
        final int retries = 10;
        final AtomicInteger attempts = new AtomicInteger(0);
        final OneShotLatch secondAttemptStarted = new OneShotLatch();
        final OneShotLatch releaseSecondAttempt = new OneShotLatch();
        final AtomicReference<Throwable> operationFailure = new AtomicReference<>(null);

        final CompletableFuture<?> retryFuture =
                FutureUtils.retry(
                        () ->
                                CompletableFuture.supplyAsync(
                                        () -> {
                                            if (attempts.incrementAndGet() == 2) {
                                                // Signal the test thread, then block until it
                                                // has cancelled the retry future.
                                                secondAttemptStarted.trigger();
                                                try {
                                                    releaseSecondAttempt.await();
                                                } catch (InterruptedException e) {
                                                    operationFailure.compareAndSet(null, e);
                                                }
                                            }
                                            throw new CompletionException(
                                                    new FlinkException("Test exception"));
                                        },
                                        TestingUtils.defaultExecutor()),
                        retries,
                        TestingUtils.defaultExecutor());

        secondAttemptStarted.await();
        Assert.assertFalse(retryFuture.isDone());

        retryFuture.cancel(false);
        releaseSecondAttempt.trigger();

        Assert.assertTrue(retryFuture.isCancelled());
        Assert.assertEquals(2, attempts.get());

        // Surface an interruption that happened inside the retried operation.
        final Throwable failure = operationFailure.get();
        if (failure != null) {
            throw new FlinkException("Exception occurred in the retry operation.", failure);
        }
    }

    /**
     * Uses a retry predicate that only accepts failures containing a {@link FlinkException}. The
     * third attempt fails with a {@link FlinkRuntimeException}; the test expects that exception
     * as cause of the result and exactly three attempts.
     */
    @Test
    public void testStopAtNonRetryableException() {
        final int retries = 10;
        final int notRetry = 3;
        final AtomicInteger attempts = new AtomicInteger(0);
        final FlinkRuntimeException nonRetryableException =
                new FlinkRuntimeException("Non-retryable exception");

        final CompletableFuture<Boolean> retryFuture =
                FutureUtils.retry(
                        () ->
                                CompletableFuture.supplyAsync(
                                        () -> {
                                            if (attempts.incrementAndGet() != notRetry) {
                                                throw new CompletionException(
                                                        new FlinkException("Test exception"));
                                            }
                                            throw new CompletionException(nonRetryableException);
                                        },
                                        TestingUtils.defaultExecutor()),
                        retries,
                        throwable ->
                                ExceptionUtils.findThrowable(throwable, FlinkException.class)
                                        .isPresent(),
                        TestingUtils.defaultExecutor());

        try {
            retryFuture.get();
            Assert.fail("Exception should be thrown.");
        } catch (Exception ex) {
            Assert.assertThat(ex, FlinkMatchers.containsCause(nonRetryableException));
        }

        Assert.assertThat(attempts.get(), Matchers.is(notRetry));
    }

    /**
     * Retries an always-failing operation three times with a one millisecond delay and expects
     * the stripped failure to be a {@link FutureUtils.RetryException}.
     */
    @Test(expected = FutureUtils.RetryException.class)
    public void testRetryWithDelayFixedArgsFailure() throws Throwable {
        final CompletableFuture<?> retryFuture =
                FutureUtils.retryWithDelay(
                        () ->
                                FutureUtils.completedExceptionally(
                                        new FlinkException("Test exception")),
                        3,
                        Time.milliseconds(1L),
                        TestingUtils.defaultScheduledExecutor());

        try {
            retryFuture.get(TestingUtils.TIMEOUT().toMilliseconds(), TimeUnit.MILLISECONDS);
        } catch (ExecutionException executionException) {
            // Unwrap so that the expected-exception check sees the actual cause.
            throw ExceptionUtils.stripExecutionException(executionException);
        }
    }

    /**
     * Same scenario as the fixed-argument variant, but configured through a
     * {@link FixedRetryStrategy} (three retries, one millisecond delay); expects a
     * {@link FutureUtils.RetryException}.
     */
    @Test(expected = FutureUtils.RetryException.class)
    public void testRetryWithDelayRetryStrategyFailure() throws Throwable {
        final CompletableFuture<?> retryFuture =
                FutureUtils.retryWithDelay(
                        () ->
                                FutureUtils.completedExceptionally(
                                        new FlinkException("Test exception")),
                        new FixedRetryStrategy(3, Duration.ofMillis(1L)),
                        TestingUtils.defaultScheduledExecutor());

        try {
            retryFuture.get(TestingUtils.TIMEOUT().toMilliseconds(), TimeUnit.MILLISECONDS);
        } catch (ExecutionException executionException) {
            // Unwrap so that the expected-exception check sees the actual cause.
            throw ExceptionUtils.stripExecutionException(executionException);
        }
    }

    /**
     * Lets the operation fail four times before succeeding and expects the overall completion
     * time to be at least four times the configured delay of five milliseconds.
     */
    @Test
    public void testRetryWithDelayFixedArgs() throws Exception {
        final int retries = 4;
        final Time delay = Time.milliseconds(5L);
        final AtomicInteger failuresLeft = new AtomicInteger(retries);

        final long startMillis = System.currentTimeMillis();

        final CompletableFuture<Boolean> retryFuture =
                FutureUtils.retryWithDelay(
                        () -> {
                            if (failuresLeft.getAndDecrement() != 0) {
                                return FutureUtils.completedExceptionally(
                                        new FlinkException("Test exception."));
                            }
                            return CompletableFuture.completedFuture(true);
                        },
                        retries,
                        delay,
                        TestingUtils.defaultScheduledExecutor());

        final Boolean result = retryFuture.get();
        final long elapsedMillis = System.currentTimeMillis() - startMillis;

        Assert.assertTrue(result);
        Assert.assertTrue(
                "The completion time should be at least retries times delay between retries.",
                elapsedMillis >= retries * delay.toMilliseconds());
    }

    /**
     * Lets the operation fail four times before succeeding while retrying with an
     * {@link ExponentialBackoffRetryStrategy} (initial delay 2 ms, maximum delay 5 ms) and
     * expects the overall completion time to be at least 16 milliseconds.
     */
    @Test
    public void testRetryWithDelayRetryStrategy() throws Exception {
        final int retries = 4;
        final Time delay = Time.milliseconds(5L);
        final AtomicInteger failuresLeft = new AtomicInteger(retries);

        final long startMillis = System.currentTimeMillis();

        final CompletableFuture<Boolean> retryFuture =
                FutureUtils.retryWithDelay(
                        () -> {
                            if (failuresLeft.getAndDecrement() != 0) {
                                return FutureUtils.completedExceptionally(
                                        new FlinkException("Test exception."));
                            }
                            return CompletableFuture.completedFuture(true);
                        },
                        new ExponentialBackoffRetryStrategy(
                                retries, Duration.ofMillis(2L), Duration.ofMillis(5L)),
                        TestingUtils.defaultScheduledExecutor());

        final Boolean result = retryFuture.get();
        final long elapsedMillis = System.currentTimeMillis() - startMillis;

        Assert.assertTrue(result);
        // NOTE(review): 16 ms presumably is the sum of the backoff delays 2 + 4 + 5 + 5 -
        // confirm against ExponentialBackoffRetryStrategy.
        Assert.assertTrue(
                "The completion time should be at least retries times delay between retries.",
                elapsedMillis >= 2 + 4 + 5 + 5);
    }

    /**
     * Starts a delayed retry on a manually triggered executor, cancels the retry future and
     * expects both the retry future and the scheduled retry task to be cancelled.
     */
    @Test
    public void testRetryWithDelayCancellation() {
        final ManuallyTriggeredScheduledExecutor manualExecutor =
                new ManuallyTriggeredScheduledExecutor();

        final CompletableFuture<?> retryFuture =
                FutureUtils.retryWithDelay(
                        () ->
                                FutureUtils.completedExceptionally(
                                        new FlinkException("Test exception")),
                        1,
                        TestingUtils.infiniteTime(),
                        manualExecutor);

        Assert.assertFalse(retryFuture.isDone());

        final Collection<ScheduledFuture<?>> pendingTasks = manualExecutor.getScheduledTasks();
        Assert.assertFalse(pendingTasks.isEmpty());

        final ScheduledFuture<?> pendingRetry = pendingTasks.iterator().next();
        Assert.assertFalse(pendingRetry.isDone());

        retryFuture.cancel(false);

        Assert.assertTrue(retryFuture.isCancelled());
        Assert.assertTrue(pendingRetry.isCancelled());
    }

    /**
     * Schedules a supplier with zero delay on a manually triggered executor, triggers the
     * scheduled tasks and expects the future to carry the supplied value.
     */
    @Test
    public void testScheduleWithDelay() throws Exception {
        final ManuallyTriggeredScheduledExecutor manualExecutor =
                new ManuallyTriggeredScheduledExecutor();
        final int expectedResult = 42;

        final CompletableFuture<Integer> scheduledResult =
                FutureUtils.scheduleWithDelay(
                        () -> expectedResult, Time.milliseconds(0L), manualExecutor);

        manualExecutor.triggerScheduledTasks();

        final int actualResult = scheduledResult.get();
        Assert.assertEquals(expectedResult, actualResult);
    }

    /**
     * Schedules a no-op runnable with infinite delay, cancels the returned future and expects
     * the underlying scheduled task to be cancelled as well.
     */
    @Test
    public void testScheduleWithDelayCancellation() {
        final ManuallyTriggeredScheduledExecutor manualExecutor =
                new ManuallyTriggeredScheduledExecutor();
        final Runnable noOp = () -> {};

        final CompletableFuture<Void> scheduledResult =
                FutureUtils.scheduleWithDelay(noOp, TestingUtils.infiniteTime(), manualExecutor);

        final ScheduledFuture<?> pendingTask =
                manualExecutor.getScheduledTasks().iterator().next();

        scheduledResult.cancel(false);

        Assert.assertTrue(scheduledResult.isCancelled());
        Assert.assertTrue(pendingTask.isCancelled());
    }

    /**
     * Schedules a no-op runnable with infinite delay on the default scheduled executor and
     * expects the returned future not to be done; the future is cancelled afterwards.
     */
    @Test
    public void testScheduleWithInfiniteDelayNeverSchedulesOperation() {
        final Runnable noOp = () -> {};

        final CompletableFuture<Void> scheduledResult =
                FutureUtils.scheduleWithDelay(
                        noOp,
                        TestingUtils.infiniteTime(),
                        TestingUtils.defaultScheduledExecutor());

        Assert.assertFalse(scheduledResult.isDone());

        scheduledResult.cancel(false);
    }

    /**
     * Registers a ten millisecond timeout on a future that is never completed by the test and
     * expects {@code get()} to fail with a {@link TimeoutException} as the stripped cause.
     *
     * <p>The test fails explicitly if {@code get()} returns normally, so that it cannot pass
     * without the timeout actually having been observed.
     */
    @Test
    public void testOrTimeout() throws Exception {
        final CompletableFuture<String> future = new CompletableFuture<>();
        final long timeout = 10L;

        FutureUtils.orTimeout(future, timeout, TimeUnit.MILLISECONDS);

        try {
            future.get();
            Assert.fail("Expected the future to be completed with a TimeoutException.");
        } catch (ExecutionException e) {
            Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
        }
    }

    /**
     * Retries with a predicate that only accepts a {@link RuntimeException} whose message
     * contains "first exception". The supplier fails with that message on its first invocation
     * and with "should propagate" afterwards; the test expects the latter failure to surface.
     *
     * <p>The test fails explicitly if {@code get()} returns normally, so that it cannot pass
     * without any exception having been propagated.
     */
    @Test
    public void testRetryWithDelayAndPredicate() throws Exception {
        final ScheduledExecutorService retryExecutor = TEST_EXECUTOR_RESOURCE.getExecutor();
        final String retryableExceptionMessage = "first exception";

        /** Supplier failing with the retryable message first and a different one afterwards. */
        class TestStringSupplier implements Supplier<CompletableFuture<String>> {
            private final AtomicInteger counter = new AtomicInteger();

            @Override
            public CompletableFuture<String> get() {
                if (counter.getAndIncrement() == 0) {
                    return FutureUtils.completedExceptionally(
                            new RuntimeException(retryableExceptionMessage));
                }
                return FutureUtils.completedExceptionally(
                        new RuntimeException("should propagate"));
            }
        }

        try {
            FutureUtils.retryWithDelay(
                            new TestStringSupplier(),
                            1,
                            Time.seconds(0L),
                            throwable ->
                                    throwable instanceof RuntimeException
                                            && throwable
                                                    .getMessage()
                                                    .contains(retryableExceptionMessage),
                            new ScheduledExecutorServiceAdapter(retryExecutor))
                    .get();
            Assert.fail("Expected the non-retryable exception to be propagated.");
        } catch (ExecutionException e) {
            Assert.assertThat(e.getMessage(), CoreMatchers.containsString("should propagate"));
        }
    }

    /**
     * Expects the action passed to {@code runAfterwards} to run, and the returned future to be
     * done, only once the input future has completed normally.
     */
    @Test
    public void testRunAfterwards() throws Exception {
        final CompletableFuture<Void> inputFuture = new CompletableFuture<>();
        final OneShotLatch actionLatch = new OneShotLatch();

        final CompletableFuture<Void> runFuture =
                FutureUtils.runAfterwards(inputFuture, actionLatch::trigger);

        // Nothing may happen before the input completes.
        Assert.assertThat(actionLatch.isTriggered(), Matchers.is(false));
        Assert.assertThat(runFuture.isDone(), Matchers.is(false));

        inputFuture.complete(null);

        Assert.assertThat(actionLatch.isTriggered(), Matchers.is(true));
        Assert.assertThat(runFuture.isDone(), Matchers.is(true));

        // Must not throw.
        runFuture.get();
    }

    /**
     * Completes the input future exceptionally and expects the action of {@code runAfterwards}
     * to run anyway, with the returned future failing with the input's exception.
     */
    @Test
    public void testRunAfterwardsExceptional() throws Exception {
        final CompletableFuture<Void> inputFuture = new CompletableFuture<>();
        final OneShotLatch actionLatch = new OneShotLatch();
        final FlinkException testException = new FlinkException("Test exception");

        final CompletableFuture<Void> runFuture =
                FutureUtils.runAfterwards(inputFuture, actionLatch::trigger);

        // Nothing may happen before the input completes.
        Assert.assertThat(actionLatch.isTriggered(), Matchers.is(false));
        Assert.assertThat(runFuture.isDone(), Matchers.is(false));

        inputFuture.completeExceptionally(testException);

        Assert.assertThat(actionLatch.isTriggered(), Matchers.is(true));
        Assert.assertThat(runFuture.isDone(), Matchers.is(true));

        try {
            runFuture.get();
            Assert.fail("Expected an exceptional completion");
        } catch (ExecutionException executionException) {
            Assert.assertThat(
                    ExceptionUtils.stripExecutionException(executionException),
                    Matchers.is(testException));
        }
    }

    /**
     * Expects the composed action of {@code composeAfterwards} to run, and the returned future
     * to be done, only once the input future has completed normally.
     */
    @Test
    public void testComposeAfterwards() throws ExecutionException, InterruptedException {
        final CompletableFuture<Void> inputFuture = new CompletableFuture<>();
        final OneShotLatch actionLatch = new OneShotLatch();

        final CompletableFuture<Void> composeFuture =
                FutureUtils.composeAfterwards(
                        inputFuture,
                        () -> {
                            actionLatch.trigger();
                            return CompletableFuture.completedFuture(null);
                        });

        // Nothing may happen before the input completes.
        Assert.assertThat(actionLatch.isTriggered(), Matchers.is(false));
        Assert.assertThat(composeFuture.isDone(), Matchers.is(false));

        inputFuture.complete(null);

        Assert.assertThat(actionLatch.isTriggered(), Matchers.is(true));
        Assert.assertThat(composeFuture.isDone(), Matchers.is(true));

        // Must not throw.
        composeFuture.get();
    }

    /**
     * Completes the input future exceptionally while the composed action succeeds; expects the
     * action to run and the returned future to fail with the input's exception.
     */
    @Test
    public void testComposeAfterwardsFirstExceptional() throws InterruptedException {
        final CompletableFuture<Void> inputFuture = new CompletableFuture<>();
        final OneShotLatch actionLatch = new OneShotLatch();
        final FlinkException testException = new FlinkException("Test exception");

        final CompletableFuture<Void> composeFuture =
                FutureUtils.composeAfterwards(
                        inputFuture,
                        () -> {
                            actionLatch.trigger();
                            return CompletableFuture.completedFuture(null);
                        });

        // Nothing may happen before the input completes.
        Assert.assertThat(actionLatch.isTriggered(), Matchers.is(false));
        Assert.assertThat(composeFuture.isDone(), Matchers.is(false));

        inputFuture.completeExceptionally(testException);

        Assert.assertThat(actionLatch.isTriggered(), Matchers.is(true));
        Assert.assertThat(composeFuture.isDone(), Matchers.is(true));

        try {
            composeFuture.get();
            Assert.fail("Expected an exceptional completion");
        } catch (ExecutionException executionException) {
            Assert.assertThat(
                    ExceptionUtils.stripExecutionException(executionException),
                    Matchers.is(testException));
        }
    }

    /**
     * Completes the input future normally while the composed action fails; expects the returned
     * future to fail with the composed action's exception.
     */
    @Test
    public void testComposeAfterwardsSecondExceptional() throws InterruptedException {
        final CompletableFuture<Void> inputFuture = new CompletableFuture<>();
        final OneShotLatch actionLatch = new OneShotLatch();
        final FlinkException testException = new FlinkException("Test exception");

        final CompletableFuture<Void> composeFuture =
                FutureUtils.composeAfterwards(
                        inputFuture,
                        () -> {
                            actionLatch.trigger();
                            return FutureUtils.completedExceptionally(testException);
                        });

        // Nothing may happen before the input completes.
        Assert.assertThat(actionLatch.isTriggered(), Matchers.is(false));
        Assert.assertThat(composeFuture.isDone(), Matchers.is(false));

        inputFuture.complete(null);

        Assert.assertThat(actionLatch.isTriggered(), Matchers.is(true));
        Assert.assertThat(composeFuture.isDone(), Matchers.is(true));

        try {
            composeFuture.get();
            Assert.fail("Expected an exceptional completion");
        } catch (ExecutionException executionException) {
            Assert.assertThat(
                    ExceptionUtils.stripExecutionException(executionException),
                    Matchers.is(testException));
        }
    }

    /**
     * Lets both the input future and the composed action fail; expects the returned future to
     * fail with the input's exception carrying the action's exception as its only suppressed
     * exception.
     */
    @Test
    public void testComposeAfterwardsBothExceptional() throws InterruptedException {
        final CompletableFuture<Void> inputFuture = new CompletableFuture<>();
        final FlinkException testException1 = new FlinkException("Test exception1");
        final FlinkException testException2 = new FlinkException("Test exception2");
        final OneShotLatch actionLatch = new OneShotLatch();

        final CompletableFuture<Void> composeFuture =
                FutureUtils.composeAfterwards(
                        inputFuture,
                        () -> {
                            actionLatch.trigger();
                            return FutureUtils.completedExceptionally(testException2);
                        });

        // Nothing may happen before the input completes.
        Assert.assertThat(actionLatch.isTriggered(), Matchers.is(false));
        Assert.assertThat(composeFuture.isDone(), Matchers.is(false));

        inputFuture.completeExceptionally(testException1);

        Assert.assertThat(actionLatch.isTriggered(), Matchers.is(true));
        Assert.assertThat(composeFuture.isDone(), Matchers.is(true));

        try {
            composeFuture.get();
            Assert.fail("Expected an exceptional completion");
        } catch (ExecutionException executionException) {
            final Throwable failure = ExceptionUtils.stripExecutionException(executionException);
            Assert.assertThat(failure, Matchers.is(testException1));
            Assert.assertThat(failure.getSuppressed(), Matchers.arrayWithSize(1));
            Assert.assertThat(failure.getSuppressed()[0], Matchers.is(testException2));
        }
    }

    /**
     * Completes two input futures one after the other and checks the done flag and the
     * completed/total counters of the conjunct future returned by {@code completeAll} at each
     * step.
     */
    @Test
    public void testCompleteAll() throws Exception {
        final CompletableFuture<String> inputFuture1 = new CompletableFuture<>();
        final CompletableFuture<Integer> inputFuture2 = new CompletableFuture<>();
        final List<CompletableFuture<?>> inputs = Arrays.asList(inputFuture1, inputFuture2);

        final FutureUtils.ConjunctFuture<Void> conjunct = FutureUtils.completeAll(inputs);

        Assert.assertThat(conjunct.isDone(), Matchers.is(false));
        Assert.assertThat(conjunct.getNumFuturesCompleted(), Matchers.is(0));
        Assert.assertThat(conjunct.getNumFuturesTotal(), Matchers.is(inputs.size()));

        inputFuture2.complete(42);

        Assert.assertThat(conjunct.isDone(), Matchers.is(false));
        Assert.assertThat(conjunct.getNumFuturesCompleted(), Matchers.is(1));

        inputFuture1.complete("foobar");

        Assert.assertThat(conjunct.isDone(), Matchers.is(true));
        Assert.assertThat(conjunct.getNumFuturesCompleted(), Matchers.is(2));

        // Must not throw.
        conjunct.get();
    }

    /**
     * Fails one of two input futures and completes the other normally; expects the conjunct
     * future to become done only after both inputs finished and to fail with the single
     * exception.
     */
    @Test
    public void testCompleteAllPartialExceptional() throws Exception {
        final CompletableFuture<String> inputFuture1 = new CompletableFuture<>();
        final CompletableFuture<Integer> inputFuture2 = new CompletableFuture<>();
        final List<CompletableFuture<?>> inputs = Arrays.asList(inputFuture1, inputFuture2);

        final FutureUtils.ConjunctFuture<Void> conjunct = FutureUtils.completeAll(inputs);

        Assert.assertThat(conjunct.isDone(), Matchers.is(false));
        Assert.assertThat(conjunct.getNumFuturesCompleted(), Matchers.is(0));
        Assert.assertThat(conjunct.getNumFuturesTotal(), Matchers.is(inputs.size()));

        final FlinkException testException1 = new FlinkException("Test exception 1");
        inputFuture2.completeExceptionally(testException1);

        Assert.assertThat(conjunct.isDone(), Matchers.is(false));
        Assert.assertThat(conjunct.getNumFuturesCompleted(), Matchers.is(1));

        inputFuture1.complete("foobar");

        Assert.assertThat(conjunct.isDone(), Matchers.is(true));
        Assert.assertThat(conjunct.getNumFuturesCompleted(), Matchers.is(2));

        try {
            conjunct.get();
            Assert.fail("Expected an exceptional completion");
        } catch (ExecutionException executionException) {
            Assert.assertThat(
                    ExceptionUtils.stripExecutionException(executionException),
                    Matchers.is(testException1));
        }
    }

    /**
     * Fails both input futures; expects the conjunct future to fail with one of the two
     * exceptions and to carry the other one as its suppressed exception.
     */
    @Test
    public void testCompleteAllExceptional() throws Exception {
        final CompletableFuture<String> inputFuture1 = new CompletableFuture<>();
        final CompletableFuture<Integer> inputFuture2 = new CompletableFuture<>();
        final List<CompletableFuture<?>> inputs = Arrays.asList(inputFuture1, inputFuture2);

        final FutureUtils.ConjunctFuture<Void> conjunct = FutureUtils.completeAll(inputs);

        Assert.assertThat(conjunct.isDone(), Matchers.is(false));
        Assert.assertThat(conjunct.getNumFuturesCompleted(), Matchers.is(0));
        Assert.assertThat(conjunct.getNumFuturesTotal(), Matchers.is(inputs.size()));

        final FlinkException testException1 = new FlinkException("Test exception 1");
        inputFuture1.completeExceptionally(testException1);

        Assert.assertThat(conjunct.isDone(), Matchers.is(false));
        Assert.assertThat(conjunct.getNumFuturesCompleted(), Matchers.is(1));

        final FlinkException testException2 = new FlinkException("Test exception 2");
        inputFuture2.completeExceptionally(testException2);

        Assert.assertThat(conjunct.isDone(), Matchers.is(true));
        Assert.assertThat(conjunct.getNumFuturesCompleted(), Matchers.is(2));

        try {
            conjunct.get();
            Assert.fail("Expected an exceptional completion");
        } catch (ExecutionException executionException) {
            final Throwable failure = ExceptionUtils.stripExecutionException(executionException);
            final Throwable[] suppressed = failure.getSuppressed();

            // Whichever exception became the primary failure, the other must be suppressed.
            final FlinkException expectedSuppressed;
            if (failure.equals(testException1)) {
                expectedSuppressed = testException2;
            } else {
                expectedSuppressed = testException1;
            }

            Assert.assertThat(suppressed, Matchers.is(Matchers.not(Matchers.emptyArray())));
            Assert.assertThat(suppressed, Matchers.arrayContaining(expectedSuppressed));
        }
    }

    /**
     * Passes a supplier that throws a {@link FlinkException} to {@code supplyAsync} and expects
     * the resulting future to fail with a cause chain containing the exception message.
     */
    @Test
    public void testSupplyAsyncFailure() throws Exception {
        final String exceptionMessage = "Test exception";
        final FlinkException testException = new FlinkException(exceptionMessage);

        final CompletableFuture<Object> future =
                FutureUtils.supplyAsync(
                        () -> {
                            throw testException;
                        },
                        TestingUtils.defaultExecutor());

        try {
            future.get();
            Assert.fail("Expected an exception.");
        } catch (ExecutionException e) {
            Assert.assertThat(
                    ExceptionUtils.findThrowableWithMessage(e, exceptionMessage).isPresent(),
                    Matchers.is(true));
        }
    }

    /**
     * Expects {@code supplyAsync} with {@code Acknowledge::get} to complete with the value
     * returned by {@code Acknowledge.get()}.
     */
    @Test
    public void testSupplyAsync() throws Exception {
        final CompletableFuture<Acknowledge> future =
                FutureUtils.supplyAsync(Acknowledge::get, TestingUtils.defaultExecutor());

        final Acknowledge actual = future.get();
        Assert.assertThat(actual, Matchers.is(Acknowledge.get()));
    }

    /** Runs the shared continuation check against {@code FutureUtils.handleAsyncIfNotDone}. */
    @Test
    public void testHandleAsyncIfNotDone() {
        testFutureContinuation(
                (CompletableFuture<?> future, Executor executor) ->
                        FutureUtils.handleAsyncIfNotDone(
                                future, executor, (value, failure) -> null));
    }

    /** Runs the shared continuation check against {@code FutureUtils.thenApplyAsyncIfNotDone}. */
    @Test
    public void testApplyAsyncIfNotDone() {
        testFutureContinuation(
                (CompletableFuture<?> future, Executor executor) ->
                        FutureUtils.thenApplyAsyncIfNotDone(future, executor, value -> null));
    }

    /**
     * Runs the shared continuation check against {@code FutureUtils.thenComposeAsyncIfNotDone}.
     */
    @Test
    public void testComposeAsyncIfNotDone() {
        testFutureContinuation(
                (CompletableFuture<?> future, Executor executor) ->
                        FutureUtils.thenComposeAsyncIfNotDone(future, executor, value -> null));
    }

    /**
     * Runs the shared continuation check against {@code FutureUtils.whenCompleteAsyncIfNotDone}.
     */
    @Test
    public void testWhenCompleteAsyncIfNotDone() {
        testFutureContinuation(
                (CompletableFuture<?> future, Executor executor) ->
                        FutureUtils.whenCompleteAsyncIfNotDone(
                                future, executor, (value, failure) -> {}));
    }

    /**
     * Runs the shared continuation check against {@code FutureUtils.thenAcceptAsyncIfNotDone}.
     */
    @Test
    public void testThenAcceptAsyncIfNotDone() {
        testFutureContinuation(
                (CompletableFuture<?> future, Executor executor) ->
                        FutureUtils.thenAcceptAsyncIfNotDone(future, executor, value -> {}));
    }

    /**
     * Shared driver for the {@code *IfNotDone} tests.
     *
     * <p>First applies the continuation to a not yet completed future and expects the supplied
     * executor to have been used once that future completes. Then applies the continuation to
     * the already completed future and expects the executor not to be used while the
     * continuation is done nevertheless.
     *
     * @param testFunctionGenerator attaches the continuation under test to the given future
     *     using the given executor and returns the resulting future
     */
    private void testFutureContinuation(
            BiFunction<CompletableFuture<?>, Executor, CompletableFuture<?>> testFunctionGenerator) {
        final CompletableFuture<Object> startFuture = new CompletableFuture<>();
        final AtomicBoolean executorUsed = new AtomicBoolean(false);

        // Runs the task inline and records afterwards that the executor was involved.
        final Executor recordingExecutor =
                task -> {
                    task.run();
                    executorUsed.set(true);
                };

        // Case 1: the start future is still pending when the continuation is attached.
        final CompletableFuture<?> pendingContinuation =
                testFunctionGenerator.apply(startFuture, recordingExecutor);
        Assert.assertFalse(pendingContinuation.isDone());

        startFuture.complete(null);

        Assert.assertTrue(executorUsed.get());
        Assert.assertTrue(pendingContinuation.isDone());

        // Case 2: the start future is already done when the continuation is attached.
        executorUsed.set(false);
        final CompletableFuture<?> immediateContinuation =
                testFunctionGenerator.apply(startFuture, recordingExecutor);

        Assert.assertFalse(executorUsed.get());
        Assert.assertTrue(immediateContinuation.isDone());
    }

    /**
     * Registers the handler on an already normally completed future and expects the handler not
     * to be called.
     */
    @Test
    public void testHandleUncaughtExceptionWithCompletedFuture() {
        final CompletableFuture<String> completed = CompletableFuture.completedFuture("foobar");
        final TestingUncaughtExceptionHandler handler = new TestingUncaughtExceptionHandler();

        FutureUtils.handleUncaughtException(completed, handler);

        Assert.assertThat(handler.hasBeenCalled(), Matchers.is(false));
    }

    /**
     * Registers the handler on a pending future, completes it normally and expects the handler
     * not to be called.
     */
    @Test
    public void testHandleUncaughtExceptionWithNormalCompletion() {
        final CompletableFuture<String> pending = new CompletableFuture<>();
        final TestingUncaughtExceptionHandler handler = new TestingUncaughtExceptionHandler();

        FutureUtils.handleUncaughtException(pending, handler);
        pending.complete("barfoo");

        Assert.assertThat(handler.hasBeenCalled(), Matchers.is(false));
    }

    /**
     * Registers the handler on an already exceptionally completed future and expects the handler
     * to be called.
     */
    @Test
    public void testHandleUncaughtExceptionWithExceptionallyCompletedFuture() {
        final CompletableFuture<String> failed =
                FutureUtils.completedExceptionally(new FlinkException("foobar"));
        final TestingUncaughtExceptionHandler handler = new TestingUncaughtExceptionHandler();

        FutureUtils.handleUncaughtException(failed, handler);

        Assert.assertThat(handler.hasBeenCalled(), Matchers.is(true));
    }

    /**
     * Registers the handler on a pending future and expects it to be called only after the
     * future has been completed exceptionally.
     */
    @Test
    public void testHandleUncaughtExceptionWithExceptionallyCompletion() {
        final CompletableFuture<String> pending = new CompletableFuture<>();
        final TestingUncaughtExceptionHandler handler = new TestingUncaughtExceptionHandler();

        FutureUtils.handleUncaughtException(pending, handler);
        Assert.assertThat(handler.hasBeenCalled(), Matchers.is(false));

        pending.completeExceptionally(new FlinkException("barfoo"));
        Assert.assertThat(handler.hasBeenCalled(), Matchers.is(true));
    }

    /**
     * Forwards a source future to a target future and expects the target to mirror the source's
     * done state and, after normal completion, its value.
     */
    @Test
    public void testForwardNormal() throws Exception {
        final CompletableFuture<String> source = new CompletableFuture<>();
        final CompletableFuture<String> target = new CompletableFuture<>();

        FutureUtils.forward(source, target);
        Assert.assertThat(target.isDone(), Matchers.is(source.isDone()));

        source.complete("foobar");
        Assert.assertThat(target.isDone(), Matchers.is(source.isDone()));

        Assert.assertThat(target.get(), Matchers.is(CoreMatchers.equalTo(source.get())));
    }

    /**
     * Checks that {@code FutureUtils.forward} keeps the target's done state in step with the
     * source and that, after the source fails, both futures report equal failure causes.
     */
    @Test
    public void testForwardExceptionally() {
        final CompletableFuture origin = new CompletableFuture();
        final CompletableFuture destination = new CompletableFuture();
        FutureUtils.forward(origin, destination);

        final boolean destinationDoneBefore = destination.isDone();
        final boolean originDoneBefore = origin.isDone();
        Assert.assertThat((Object)destinationDoneBefore, (Matcher)Matchers.is((Object)originDoneBefore));

        final FlinkException failure = new FlinkException("foobar");
        origin.completeExceptionally((Throwable)failure);

        final boolean destinationDoneAfter = destination.isDone();
        final boolean originDoneAfter = origin.isDone();
        Assert.assertThat((Object)destinationDoneAfter, (Matcher)Matchers.is((Object)originDoneAfter));

        // Compare the causes extracted from each future's CompletionException.
        final Throwable forwardedCause = FutureUtilsTest.getThrowable(destination);
        final Throwable originalCause = FutureUtilsTest.getThrowable(origin);
        Assert.assertThat((Object)forwardedCause, (Matcher)Matchers.is((Matcher)CoreMatchers.equalTo((Object)originalCause)));
    }

    /**
     * Checks that {@code FutureUtils.forwardAsync} does not complete the target when the
     * source completes, but only after the manually triggered executor has run its tasks.
     */
    @Test
    public void testForwardAsync() throws Exception {
        final CompletableFuture<String> origin = new CompletableFuture<>();
        final CompletableFuture destination = new CompletableFuture();
        final ManuallyTriggeredScheduledExecutor manualExecutor = new ManuallyTriggeredScheduledExecutor();
        FutureUtils.forwardAsync(origin, destination, (Executor)((Object)manualExecutor));

        origin.complete("foobar");

        // The executor has not been triggered yet, so the target must still be pending.
        final boolean doneBeforeTrigger = destination.isDone();
        Assert.assertThat((Object)doneBeforeTrigger, (Matcher)Matchers.is((Object)false));

        manualExecutor.triggerAll();

        final Object forwardedValue = destination.get();
        final String originalValue = origin.get();
        Assert.assertThat(forwardedValue, (Matcher)Matchers.is((Matcher)CoreMatchers.equalTo(originalValue)));
    }

    /**
     * Checks that {@code FutureUtils.getWithoutException} returns the value of a future
     * that has already been completed normally.
     */
    @Test
    public void testGetWithoutException() {
        CompletableFuture<Integer> completableFuture = new CompletableFuture<Integer>();
        completableFuture.complete(1);
        // Integer.valueOf replaces the deprecated Integer(int) constructor; equality is unchanged.
        Assert.assertEquals((Object)Integer.valueOf(1), (Object)FutureUtils.getWithoutException(completableFuture));
    }

    /**
     * Checks that {@code FutureUtils.getWithoutException} yields {@code null} for a future
     * that has been completed exceptionally.
     */
    @Test
    public void testGetWithoutExceptionWithAnException() {
        final CompletableFuture failedFuture = new CompletableFuture();
        final RuntimeException failure = new RuntimeException("expected");
        failedFuture.completeExceptionally(failure);

        final Object outcome = FutureUtils.getWithoutException(failedFuture);
        Assert.assertNull((Object)outcome);
    }

    /**
     * Checks that {@code FutureUtils.getWithoutException} yields {@code null} for a future
     * that has not been completed.
     */
    @Test
    public void testGetWithoutExceptionWithoutFinishing() {
        final CompletableFuture pendingFuture = new CompletableFuture();

        final Object outcome = FutureUtils.getWithoutException(pendingFuture);
        Assert.assertNull((Object)outcome);
    }

    /**
     * Checks that a callback attached to the future returned by {@code FutureUtils.switchExecutor}
     * sees the source's value and runs on a thread with the same name as the one the given
     * executor uses for a supplied task.
     */
    @Test
    public void testSwitchExecutorForNormallyCompletedFuture() {
        CompletableFuture<String> source = new CompletableFuture<String>();
        ExecutorService singleThreadExecutor = TEST_EXECUTOR_RESOURCE.getExecutor();
        CompletableFuture resultFuture = FutureUtils.switchExecutor(source, (Executor)singleThreadExecutor);
        // Ask the executor for the name of the thread it runs tasks on.
        String expectedThreadName = (String)FutureUtils.supplyAsync(() -> Thread.currentThread().getName(), (Executor)singleThreadExecutor).join();
        // Single source of truth for the value that is completed and asserted on.
        final String expectedValue = "foobar";
        CompletionStage assertionFuture = resultFuture.handle((s, throwable) -> {
            Assert.assertThat((Object)s, (Matcher)Matchers.is((Object)expectedValue));
            Assert.assertThat((Object)Thread.currentThread().getName(), (Matcher)Matchers.is((Object)expectedThreadName));
            return null;
        });
        source.complete(expectedValue);
        // join() rethrows any assertion failure raised inside the callback.
        ((CompletableFuture)assertionFuture).join();
    }

    /**
     * Checks that a callback attached to the future returned by {@code FutureUtils.switchExecutor}
     * sees the source's failure and runs on a thread with the same name as the one the given
     * executor uses for a supplied task.
     */
    @Test
    public void testSwitchExecutorForExceptionallyCompletedFuture() {
        final CompletableFuture origin = new CompletableFuture();
        final ExecutorService targetExecutor = TEST_EXECUTOR_RESOURCE.getExecutor();
        final CompletableFuture switchedFuture = FutureUtils.switchExecutor(origin, (Executor)targetExecutor);

        // Ask the executor for the name of the thread it runs tasks on.
        final String executorThreadName = (String)FutureUtils.supplyAsync(() -> Thread.currentThread().getName(), (Executor)targetExecutor).join();
        final Exception failure = new Exception("foobar");

        final CompletionStage verification = switchedFuture.handle((ignoredValue, observedFailure) -> {
            Assert.assertThat((Object)observedFailure, (Matcher)FlinkMatchers.containsCause((Throwable)failure));
            final String callbackThreadName = Thread.currentThread().getName();
            Assert.assertThat((Object)callbackThreadName, (Matcher)Matchers.is((Object)executorThreadName));
            return null;
        });

        origin.completeExceptionally(failure);
        // join() rethrows any assertion failure raised inside the callback.
        ((CompletableFuture)verification).join();
    }

    /**
     * Joins the given future and returns the cause of the {@link CompletionException}
     * thrown by {@code join()}.
     *
     * @param completableFuture future to join
     * @return the cause carried by the {@link CompletionException}
     * @throws AssertionError if {@code join()} returns without throwing
     */
    private static Throwable getThrowable(CompletableFuture<?> completableFuture) {
        try {
            completableFuture.join();
            // Reaching this line means join() returned normally, i.e. there was no failure to extract.
            throw new AssertionError((Object)"Future has not been completed exceptionally.");
        }
        catch (CompletionException wrapper) {
            return wrapper.getCause();
        }
    }

    /**
     * Uncaught exception handler for tests that stores the throwable it is handed so that
     * a test can later ask whether the handler was invoked.
     */
    private static class TestingUncaughtExceptionHandler
    implements Thread.UncaughtExceptionHandler {
        /** Throwable from the latest {@link #uncaughtException} call; {@code null} until then. */
        private Throwable recorded;

        private TestingUncaughtExceptionHandler() {
        }

        /** Stores the given throwable, replacing any previously stored one. */
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            this.recorded = e;
        }

        /** Returns {@code true} once a non-null throwable has been stored. */
        private boolean hasBeenCalled() {
            return null != this.recorded;
        }
    }
}

