/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeTest;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPoolBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPoolFactory;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.TriFunction;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Verifies how {@link DeclarativeSlotPoolBridge} adjusts the resource requirements it declares on
 * the underlying declarative slot pool.
 *
 * <p>The bridge is wired to a {@link TestingDeclarativeSlotPool} whose increase/decrease callbacks
 * are recorded by a {@link RequirementListener}; every test asserts on the net count recorded for
 * {@link ResourceProfile#UNKNOWN}.
 */
public class DeclarativeSlotPoolBridgeResourceDeclarationTest extends TestLogger {

    private static final JobMasterId JOB_MASTER_ID = JobMasterId.generate();

    private final ComponentMainThreadExecutor mainThreadExecutor =
            ComponentMainThreadExecutorServiceAdapter.forMainThread();

    private RequirementListener requirementListener;
    private DeclarativeSlotPoolBridge declarativeSlotPoolBridge;

    /**
     * Creates a fresh {@link RequirementListener} and a bridge backed by a testing slot pool that
     * forwards requirement increases/decreases to that listener.
     */
    @Before
    public void setup() throws Exception {
        requirementListener = new RequirementListener();

        // Declared with its full type so the lambda's target type is explicit.
        final TriFunction<AllocationID, Throwable, Long, ResourceCounter> freeReservedSlot =
                (allocationId, cause, timestamp) ->
                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);

        final TestingDeclarativeSlotPoolBuilder slotPoolBuilder =
                TestingDeclarativeSlotPool.builder()
                        .setIncreaseResourceRequirementsByConsumer(
                                requirementListener::increaseRequirements)
                        .setDecreaseResourceRequirementsByConsumer(
                                requirementListener::decreaseRequirements)
                        .setReserveFreeSlotFunction(
                                (allocationId, resourceProfile) ->
                                        DeclarativeSlotPoolBridgeTest.createAllocatedSlot(
                                                allocationId))
                        .setFreeReservedSlotFunction(freeReservedSlot)
                        .setReleaseSlotFunction(
                                (allocationId, cause) ->
                                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1));

        declarativeSlotPoolBridge =
                DeclarativeSlotPoolBridgeTest.createDeclarativeSlotPoolBridge(
                        new TestingDeclarativeSlotPoolFactory(slotPoolBuilder));
    }

    /** Closes the bridge, provided {@link #setup()} got far enough to create it. */
    @After
    public void teardown() throws Exception {
        if (declarativeSlotPoolBridge != null) {
            declarativeSlotPoolBridge.close();
        }
    }

    /** Requesting a new allocated slot must raise the recorded requirement to one. */
    @Test
    public void testRequirementsIncreasedOnNewAllocation() throws Exception {
        startBridge(mainThreadExecutor);

        declarativeSlotPoolBridge.requestNewAllocatedSlot(
                new SlotRequestId(), ResourceProfile.UNKNOWN, Time.minutes(5L));

        assertUnknownProfileRequirement(1);
    }

    /**
     * A slot request that times out must leave the recorded requirement at zero.
     *
     * <p>The request (5 ms timeout) and the final assertion are both submitted to a dedicated
     * single-thread main-thread executor, which is shut down when the test finishes.
     */
    @Test
    public void testRequirementsDecreasedOnAllocationTimeout() throws Exception {
        final ScheduledExecutorService scheduledExecutorService =
                Executors.newSingleThreadScheduledExecutor();
        try {
            final ComponentMainThreadExecutor singleThreadMainExecutor =
                    ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
                            scheduledExecutorService);
            startBridge(singleThreadMainExecutor);

            // Issue the request on the main-thread executor; get() yields the slot future itself.
            final CompletableFuture<PhysicalSlot> allocationFuture =
                    CompletableFuture.supplyAsync(
                                    () ->
                                            declarativeSlotPoolBridge.requestNewAllocatedSlot(
                                                    new SlotRequestId(),
                                                    ResourceProfile.UNKNOWN,
                                                    Time.milliseconds(5L)),
                                    singleThreadMainExecutor)
                            .get();

            // Wait (up to one minute) for the request to fail.
            Assert.assertThat(
                    allocationFuture,
                    FlinkMatchers.futureWillCompleteExceptionally(Duration.ofMinutes(1L)));

            // Read the listener on the same executor the bridge was started with.
            CompletableFuture.runAsync(
                            () -> assertUnknownProfileRequirement(0), singleThreadMainExecutor)
                    .join();
        } finally {
            scheduledExecutorService.shutdown();
        }
    }

    /** Merely announcing a new slot must not change the recorded requirement. */
    @Test
    public void testRequirementsUnchangedOnNewSlotsNotification() throws Exception {
        startBridge(mainThreadExecutor);

        offerFreshSlot();

        assertUnknownProfileRequirement(0);
    }

    /** Allocating an available slot must raise the recorded requirement to one. */
    @Test
    public void testRequirementsIncreasedOnSlotReservation() throws Exception {
        startBridge(mainThreadExecutor);
        final PhysicalSlot newSlot = offerFreshSlot();

        declarativeSlotPoolBridge.allocateAvailableSlot(
                new SlotRequestId(), newSlot.getAllocationId(), ResourceProfile.UNKNOWN);

        assertUnknownProfileRequirement(1);
    }

    /** Releasing a previously allocated slot must bring the recorded requirement back to zero. */
    @Test
    public void testRequirementsDecreasedOnSlotFreeing() throws Exception {
        startBridge(mainThreadExecutor);
        final PhysicalSlot newSlot = offerFreshSlot();
        final SlotRequestId slotRequestId = new SlotRequestId();

        declarativeSlotPoolBridge.allocateAvailableSlot(
                slotRequestId, newSlot.getAllocationId(), ResourceProfile.UNKNOWN);
        declarativeSlotPoolBridge.releaseSlot(
                slotRequestId, new RuntimeException("Test exception"));

        assertUnknownProfileRequirement(0);
    }

    /** Failing the allocation of an allocated slot must bring the requirement back to zero. */
    @Test
    public void testRequirementsDecreasedOnSlotAllocationFailure() throws Exception {
        startBridge(mainThreadExecutor);
        final PhysicalSlot newSlot = offerFreshSlot();

        declarativeSlotPoolBridge.allocateAvailableSlot(
                new SlotRequestId(), newSlot.getAllocationId(), ResourceProfile.UNKNOWN);
        declarativeSlotPoolBridge.failAllocation(
                newSlot.getTaskManagerLocation().getResourceID(),
                newSlot.getAllocationId(),
                new RuntimeException("Test exception"));

        assertUnknownProfileRequirement(0);
    }

    /** Starts the bridge under test for the shared job master id on the given executor. */
    private void startBridge(ComponentMainThreadExecutor executor) throws Exception {
        declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost", executor);
    }

    /**
     * Creates a slot with a new {@link AllocationID} and announces it to the bridge.
     *
     * @return the slot that was announced
     */
    private PhysicalSlot offerFreshSlot() throws Exception {
        final PhysicalSlot slot =
                DeclarativeSlotPoolBridgeTest.createAllocatedSlot(new AllocationID());
        declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(slot));
        return slot;
    }

    /**
     * Asserts the net requirement recorded for {@link ResourceProfile#UNKNOWN}.
     *
     * @param expected the expected resource count
     */
    private void assertUnknownProfileRequirement(int expected) {
        Assert.assertThat(
                requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN),
                CoreMatchers.is(expected));
    }

    /**
     * Accumulates the requirement increases and decreases reported by the testing slot pool into
     * a single {@link ResourceCounter}.
     */
    private static final class RequirementListener {

        private ResourceCounter requirements = ResourceCounter.empty();

        private RequirementListener() {}

        /** Adds the given resources to the tracked total. */
        private void increaseRequirements(ResourceCounter requirements) {
            this.requirements = this.requirements.add(requirements);
        }

        /** Subtracts the given resources from the tracked total. */
        private void decreaseRequirements(ResourceCounter requirements) {
            this.requirements = this.requirements.subtract(requirements);
        }

        /** Returns the currently tracked total. */
        public ResourceCounter getRequirements() {
            return requirements;
        }
    }
}

