/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingResourceActionsBuilder;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.RunnableWithException;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TaskManagerCheckInSlotManagerTest
extends TestLogger {
    private static final ResourceID resourceID = ResourceID.generate();
    private static final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
    private static final SlotID slotId = new SlotID(resourceID, 0);
    private static final ResourceProfile resourceProfile = ResourceProfile.fromResources((double)1.0, (int)1);
    private static final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
    private static final SlotReport slotReport = new SlotReport(slotStatus);
    private final AtomicReference<CompletableFuture<Boolean>> canBeReleasedFuture = new AtomicReference();
    private final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setCanBeReleasedSupplier(this.canBeReleasedFuture::get).createTestingTaskExecutorGateway();
    private final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, this.taskExecutorGateway);
    private CompletableFuture<InstanceID> releaseFuture;
    private ResourceActions resourceManagerActions;
    private ManuallyTriggeredScheduledExecutor mainThreadExecutor;
    private final AtomicInteger allocateResourceCalls = new AtomicInteger(0);
    private final AtomicInteger releaseResourceCalls = new AtomicInteger(0);

    @Before
    public void setup() {
        this.canBeReleasedFuture.set(new CompletableFuture());
        this.releaseFuture = new CompletableFuture();
        this.allocateResourceCalls.getAndSet(0);
        this.releaseResourceCalls.getAndSet(0);
        this.resourceManagerActions = new TestingResourceActionsBuilder().setReleaseResourceConsumer((instanceID, e) -> {
            this.releaseFuture.complete((InstanceID)instanceID);
            this.releaseResourceCalls.incrementAndGet();
        }).setAllocateResourceConsumer(ignored -> this.allocateResourceCalls.incrementAndGet()).build();
        this.mainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    @Test
    public void testTaskManagerTimeout() throws Exception {
        this.checkTaskManagerTimeout(0);
    }

    @Test
    public void testTaskManagerTimeoutWithRedundantTaskManager() throws Exception {
        this.checkTaskManagerTimeout(1);
    }

    @Test
    public void testTaskManagerTimeoutWithZeroRedundantTaskManager() throws Exception {
        this.registerAndCheckMultiTaskManagers(0);
        Assert.assertThat((Object)this.allocateResourceCalls.get(), (Matcher)Matchers.is((Object)0));
        Assert.assertThat((Object)this.releaseResourceCalls.get(), (Matcher)Matchers.is((Object)1));
    }

    @Test
    public void testTaskManagerTimeoutWithOneRedundantTaskManager() throws Exception {
        this.registerAndCheckMultiTaskManagers(1);
        Assert.assertThat((Object)this.allocateResourceCalls.get(), (Matcher)Matchers.is((Object)0));
        Assert.assertThat((Object)this.releaseResourceCalls.get(), (Matcher)Matchers.is((Object)1));
    }

    @Test
    public void testTaskManagerTimeoutWithTwoRedundantTaskManager() throws Exception {
        this.registerAndCheckMultiTaskManagers(2);
        Assert.assertThat((Object)this.allocateResourceCalls.get(), (Matcher)Matchers.is((Object)0));
        Assert.assertThat((Object)this.releaseResourceCalls.get(), (Matcher)Matchers.is((Object)0));
    }

    @Test
    public void testTaskManagerTimeoutWithThreeRedundantTaskManager() throws Exception {
        this.registerAndCheckMultiTaskManagers(3);
        Assert.assertThat((Object)this.allocateResourceCalls.get(), (Matcher)Matchers.is((Object)1));
        Assert.assertThat((Object)this.releaseResourceCalls.get(), (Matcher)Matchers.is((Object)0));
    }

    @Test
    public void testTaskManagerIsNotReleasedBeforeItCanBe() throws Exception {
        try (SlotManagerImpl slotManager = this.createAndStartSlotManagerWithTM();){
            this.checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, false);
            this.verifyTmReleased(false);
            this.checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true);
            this.verifyTmReleased(true);
        }
    }

    @Test
    public void testTaskManagerIsNotReleasedInCaseOfConcurrentAllocation() throws Exception {
        try (SlotManagerImpl slotManager = this.createAndStartSlotManagerWithTM();){
            this.checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true, () -> {
                AllocationID allocationID = new AllocationID();
                slotManager.registerSlotRequest(new SlotRequest(new JobID(), allocationID, resourceProfile, "foobar"));
                this.mainThreadExecutor.triggerAll();
                Thread.sleep(1L);
                slotManager.freeSlot(slotId, allocationID);
            });
            this.verifyTmReleased(false);
            this.checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true);
            this.verifyTmReleased(true);
        }
    }

    private void checkTaskManagerTimeout(int redundantTaskManagerNum) throws Exception {
        this.canBeReleasedFuture.set(CompletableFuture.completedFuture(true));
        try (SlotManagerImpl slotManager = SlotManagerBuilder.newBuilder().setTaskManagerTimeout(Time.milliseconds((long)10L)).setRedundantTaskManagerNum(redundantTaskManagerNum).buildAndStartWithDirectExec(resourceManagerId, this.resourceManagerActions);){
            slotManager.registerTaskManager(this.taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            Assert.assertThat((Object)this.releaseFuture.get(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)this.taskManagerConnection.getInstanceID())));
        }
    }

    private void registerAndCheckMultiTaskManagers(int redundantTaskManagerNum) throws Exception {
        SlotManagerImpl slotManager = this.createAndStartSlotManager(redundantTaskManagerNum, 2);
        this.registerTaskManagerWithTwoSlots(slotManager, true, true);
        this.registerTaskManagerWithTwoSlots(slotManager, false, false);
        this.registerTaskManagerWithTwoSlots(slotManager, false, true);
        this.registerTaskManagerWithTwoSlots(slotManager, true, false);
        this.checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true);
    }

    private void registerTaskManagerWithTwoSlots(SlotManagerImpl slotManager, boolean slot0Free, boolean slot1Free) {
        this.canBeReleasedFuture.set(new CompletableFuture());
        ResourceID resourceID = ResourceID.generate();
        ResourceProfile resourceProfile = ResourceProfile.fromResources((double)1.0, (int)1);
        JobID jobID = new JobID();
        SlotID slotId0 = new SlotID(resourceID, 0);
        SlotStatus slotStatus0 = slot0Free ? new SlotStatus(slotId0, resourceProfile) : new SlotStatus(slotId0, resourceProfile, jobID, new AllocationID());
        SlotID slotId1 = new SlotID(resourceID, 1);
        SlotStatus slotStatus1 = slot1Free ? new SlotStatus(slotId1, resourceProfile) : new SlotStatus(slotId1, resourceProfile, jobID, new AllocationID());
        SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus0, slotStatus1));
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setCanBeReleasedSupplier(this.canBeReleasedFuture::get).createTestingTaskExecutorGateway();
        TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, (TaskExecutorGateway)taskExecutorGateway);
        this.mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY));
    }

    private SlotManagerImpl createAndStartSlotManagerWithTM() {
        SlotManagerImpl slotManager = this.createAndStartSlotManager(0, 1);
        this.mainThreadExecutor.execute(() -> slotManager.registerTaskManager(this.taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY));
        return slotManager;
    }

    private SlotManagerImpl createAndStartSlotManager(int redundantTaskManagerNum, int numSlotsPerWorker) {
        SlotManagerImpl slotManager = SlotManagerBuilder.newBuilder().setScheduledExecutor(this.mainThreadExecutor).setTaskManagerTimeout(Time.milliseconds((long)0L)).setRedundantTaskManagerNum(redundantTaskManagerNum).setNumSlotsPerWorker(numSlotsPerWorker).build();
        slotManager.start(resourceManagerId, (Executor)((Object)this.mainThreadExecutor), this.resourceManagerActions);
        return slotManager;
    }

    private void checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(SlotManagerImpl slotManager, boolean canBeReleased) throws Exception {
        this.checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, canBeReleased, () -> {});
    }

    private void checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(SlotManagerImpl slotManager, boolean canBeReleased, RunnableWithException doAfterCheckTriggerBeforeCanBeReleasedResponse) throws Exception {
        this.canBeReleasedFuture.set(new CompletableFuture());
        this.mainThreadExecutor.execute(() -> ((SlotManagerImpl)slotManager).checkTaskManagerTimeoutsAndRedundancy());
        this.mainThreadExecutor.triggerAll();
        doAfterCheckTriggerBeforeCanBeReleasedResponse.run();
        this.canBeReleasedFuture.get().complete(canBeReleased);
        this.mainThreadExecutor.triggerAll();
    }

    private void verifyTmReleased(boolean isTmReleased) {
        Assert.assertThat((Object)this.releaseFuture.isDone(), (Matcher)Matchers.is((Object)isTmReleased));
        if (isTmReleased) {
            Assert.assertThat((Object)this.releaseFuture.join(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)this.taskManagerConnection.getInstanceID())));
        }
    }
}

