/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolImpl;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.ThrowingRunnable;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests how the slot pool treats pending slot requests when the allocation behind them
 * fails, when the resource manager rejects the request, or when the request times out.
 */
public class SlotPoolPendingRequestFailureTest extends TestLogger {

    /** Resource manager stub the slot pool talks to; recreated before every test. */
    private TestingResourceManagerGateway resourceManagerGateway;

    @Before
    public void setup() {
        this.resourceManagerGateway = new TestingResourceManagerGateway();
    }

    /**
     * Fails the allocation belonging to the only pending slot request and expects the
     * request future to complete exceptionally with exactly the supplied cause.
     */
    @Test
    public void testFailingAllocationFailsPendingSlotRequests() throws Exception {
        CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
        resourceManagerGateway.setRequestSlotConsumer(
                slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));

        try (TestingSlotPoolImpl slotPool = SlotPoolUtils.createAndSetUpSlotPool(resourceManagerGateway)) {
            CompletableFuture<PhysicalSlot> slotFuture =
                    SlotPoolUtils.requestNewAllocatedSlot(slotPool, new SlotRequestId());

            // The allocation id is captured from the request sent to the resource manager.
            AllocationID allocationId = allocationIdFuture.get();
            MatcherAssert.assertThat(slotFuture.isDone(), Matchers.is(false));

            FlinkException cause = new FlinkException("Fail pending slot request failure.");
            Optional<?> failAllocationResult = slotPool.failAllocation(allocationId, cause);

            // The test expects an empty result when only a pending request is failed.
            MatcherAssert.assertThat(failAllocationResult.isPresent(), Matchers.is(false));

            try {
                slotFuture.get();
                Assert.fail("Expected a slot allocation failure.");
            } catch (ExecutionException ee) {
                MatcherAssert.assertThat(
                        ExceptionUtils.stripExecutionException(ee),
                        Matchers.<Throwable>equalTo(cause));
            }
        }
    }

    /**
     * Issues two slot requests, fulfils the first one by offering a slot that carries the
     * second request's allocation id, then fails the first allocation id and expects the
     * second, still pending, request to fail with the supplied cause.
     */
    @Test
    public void testFailingAllocationFailsRemappedPendingSlotRequests() throws Exception {
        List<AllocationID> allocations = new ArrayList<>();
        resourceManagerGateway.setRequestSlotConsumer(
                slotRequest -> allocations.add(slotRequest.getAllocationId()));

        try (TestingSlotPoolImpl slotPool = SlotPoolUtils.createAndSetUpSlotPool(resourceManagerGateway)) {
            CompletableFuture<PhysicalSlot> slotFuture1 =
                    SlotPoolUtils.requestNewAllocatedSlot(slotPool, new SlotRequestId());
            CompletableFuture<PhysicalSlot> slotFuture2 =
                    SlotPoolUtils.requestNewAllocatedSlot(slotPool, new SlotRequestId());

            AllocationID allocationId1 = allocations.get(0);
            AllocationID allocationId2 = allocations.get(1);

            // Offer a slot under the second allocation id; the assertions below expect it
            // to complete the first request and leave the second one pending.
            LocalTaskManagerLocation location = new LocalTaskManagerLocation();
            SlotOffer slotOffer = new SlotOffer(allocationId2, 0, ResourceProfile.ANY);
            slotPool.registerTaskManager(location.getResourceID());
            slotPool.offerSlot(location, new SimpleAckingTaskManagerGateway(), slotOffer);

            MatcherAssert.assertThat(slotFuture1.isDone(), Matchers.is(true));
            MatcherAssert.assertThat(slotFuture2.isDone(), Matchers.is(false));

            FlinkException cause = new FlinkException("Fail pending slot request failure.");
            Optional<?> failAllocationResult = slotPool.failAllocation(allocationId1, cause);
            MatcherAssert.assertThat(failAllocationResult.isPresent(), Matchers.is(false));

            try {
                slotFuture2.getNow(null);
                Assert.fail("Expected a slot allocation failure.");
            } catch (CompletionException ce) {
                // Only the future's own failure is caught here, so the AssertionError raised
                // by Assert.fail above propagates with its original message.
                MatcherAssert.assertThat(
                        ExceptionUtils.stripCompletionException(ce),
                        Matchers.<Throwable>equalTo(cause));
            }
        }
    }

    /**
     * Completes the resource manager's request-slot future exceptionally and expects the
     * pending slot request to fail and the cancel-slot consumer to receive the same
     * allocation id that was sent with the request.
     */
    @Test
    public void testFailingResourceManagerRequestFailsPendingSlotRequestAndCancelsRMRequest() throws Exception {
        try (TestingSlotPoolImpl slotPool = SlotPoolUtils.createAndSetUpSlotPool(resourceManagerGateway)) {
            CompletableFuture<Acknowledge> requestSlotFuture = new CompletableFuture<>();
            CompletableFuture<AllocationID> cancelSlotFuture = new CompletableFuture<>();
            CompletableFuture<AllocationID> requestSlotFutureAllocationId = new CompletableFuture<>();

            resourceManagerGateway.setRequestSlotFuture(requestSlotFuture);
            resourceManagerGateway.setRequestSlotConsumer(
                    slotRequest -> requestSlotFutureAllocationId.complete(slotRequest.getAllocationId()));
            resourceManagerGateway.setCancelSlotConsumer(cancelSlotFuture::complete);

            CompletableFuture<PhysicalSlot> slotFuture =
                    SlotPoolUtils.requestNewAllocatedSlot(slotPool, new SlotRequestId());

            requestSlotFuture.completeExceptionally(new FlinkException("Testing exception."));

            try {
                slotFuture.get();
                Assert.fail("The slot future should not have been completed properly.");
            } catch (ExecutionException ignored) {
                // Expected: the request future must complete exceptionally. Interruption or
                // any other failure is deliberately not swallowed.
            }

            Assert.assertEquals(requestSlotFutureAllocationId.get(), cancelSlotFuture.get());
        }
    }

    /**
     * Requests a slot with a 5 ms timeout on a slot pool driven by a dedicated
     * single-thread executor and expects the request future to fail with a
     * {@link TimeoutException}.
     */
    @Test
    public void testPendingSlotRequestTimeout() throws Exception {
        ScheduledExecutorService singleThreadExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            ComponentMainThreadExecutor componentMainThreadExecutor =
                    ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(singleThreadExecutor);
            TestingSlotPoolImpl slotPool = new SlotPoolBuilder(componentMainThreadExecutor)
                    .setResourceManagerGateway(resourceManagerGateway)
                    .build();

            try {
                Time timeout = Time.milliseconds(5L);

                // The request is submitted through the component main thread executor and the
                // resulting nested future is flattened.
                CompletableFuture<PhysicalSlot> slotFuture = CompletableFuture
                        .supplyAsync(
                                () -> SlotPoolUtils.requestNewAllocatedSlot(slotPool, new SlotRequestId(), timeout),
                                componentMainThreadExecutor)
                        .thenCompose(Function.identity());

                try {
                    slotFuture.get();
                    Assert.fail("Expected that the future completes with a TimeoutException.");
                } catch (ExecutionException ee) {
                    MatcherAssert.assertThat(
                            ExceptionUtils.stripExecutionException(ee),
                            Matchers.instanceOf(TimeoutException.class));
                }
            } finally {
                // Close the pool through the same executor that ran the request.
                CompletableFuture
                        .runAsync(ThrowingRunnable.unchecked(slotPool::close), componentMainThreadExecutor)
                        .get();
            }
        } finally {
            // Always stop the executor thread, even if building or closing the pool failed.
            singleThreadExecutor.shutdownNow();
        }
    }
}

