/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.heartbeat;

import java.util.HashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatListener;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatListenerBuilder;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatTarget;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatTargetBuilder;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HeartbeatManagerTest
extends TestLogger {
    /** Logger that the tests in this class pass to every heartbeat manager they construct. */
    private static final Logger LOG = LoggerFactory.getLogger(HeartbeatManagerTest.class);
    /**
     * Heartbeat interval shared by the tests.
     * NOTE(review): presumably milliseconds and presumably the origin of the {@code 50L}
     * literals in the test bodies (compile-time constant inlining) - confirm.
     */
    public static final long HEARTBEAT_INTERVAL = 50L;
    /**
     * Heartbeat timeout shared by the tests.
     * NOTE(review): presumably milliseconds and presumably the origin of the {@code 200L}
     * literals in the test bodies (compile-time constant inlining) - confirm.
     */
    public static final long HEARTBEAT_TIMEOUT = 200L;

    /**
     * Exercises one heartbeat request/response round trip followed by a plain heartbeat.
     *
     * <p>A heartbeat request carrying {@code inputPayload1} must be reported to the listener and
     * the target must be handed the listener's own payload ({@code outputPayload}). A heartbeat
     * received afterwards with {@code inputPayload2} must be reported to the listener as well.
     *
     * @throws InterruptedException if the thread is interrupted while waiting on a payload queue
     */
    @Test
    public void testRegularHeartbeat() throws InterruptedException {
        final long heartbeatTimeout = 1000L;
        final ResourceID ownResourceID = new ResourceID("foobar");
        final ResourceID targetResourceID = new ResourceID("barfoo");
        final int outputPayload = 42;
        final String inputPayload1 = "foobar";
        final String inputPayload2 = "barfoo";

        // Payloads handed to the listener's report callback.
        final ArrayBlockingQueue<Object> reportedPayloads = new ArrayBlockingQueue<>(2);
        final TestingHeartbeatListener<String, Integer> heartbeatListener = new TestingHeartbeatListenerBuilder()
                .setReportPayloadConsumer((ignored, payload) -> reportedPayloads.offer(payload))
                .setRetrievePayloadFunction(ignored -> outputPayload)
                .createNewTestingHeartbeatListener();

        // Payloads delivered to the monitored target.
        final ArrayBlockingQueue<Object> reportedPayloadsHeartbeatTarget = new ArrayBlockingQueue<>(2);
        final TestingHeartbeatTarget<Integer> heartbeatTarget = new TestingHeartbeatTargetBuilder<Integer>()
                .setReceiveHeartbeatConsumer((ignoredA, payload) -> reportedPayloadsHeartbeatTarget.offer(payload))
                .createTestingHeartbeatTarget();

        final HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(heartbeatTimeout, ownResourceID, heartbeatListener, TestingUtils.defaultScheduledExecutor(), LOG);
        try {
            heartbeatManager.monitorTarget(targetResourceID, heartbeatTarget);

            heartbeatManager.requestHeartbeat(targetResourceID, inputPayload1);
            Assert.assertThat(reportedPayloads.take(), Matchers.is((Object) inputPayload1));
            Assert.assertThat(reportedPayloadsHeartbeatTarget.take(), Matchers.is((Object) outputPayload));

            heartbeatManager.receiveHeartbeat(targetResourceID, inputPayload2);
            Assert.assertThat(reportedPayloads.take(), Matchers.is((Object) inputPayload2));
        }
        finally {
            // Stop the manager like the sibling tests do, so nothing stays scheduled
            // on the shared default executor after the test.
            heartbeatManager.stop();
        }
    }

    /**
     * Checks how the manager interacts with its scheduled executor when a heartbeat arrives.
     *
     * <p>After monitoring a target and receiving one heartbeat from it, the future returned by
     * the mocked executor must have been cancelled exactly once and
     * {@code schedule(..., heartbeatTimeout, MILLISECONDS)} must have been invoked exactly twice.
     * NOTE(review): presumably one scheduling per {@code monitorTarget} and one per
     * {@code receiveHeartbeat}; the test only pins the totals.
     */
    @Test
    public void testHeartbeatMonitorUpdate() {
        final long heartbeatTimeout = 1000L;
        final ResourceID ownResourceID = new ResourceID("foobar");
        final ResourceID targetResourceID = new ResourceID("barfoo");

        final HeartbeatListener heartbeatListener = Mockito.mock(HeartbeatListener.class);
        final ScheduledExecutor scheduledExecutor = Mockito.mock(ScheduledExecutor.class);
        final ScheduledFuture scheduledFuture = Mockito.mock(ScheduledFuture.class);

        // Every scheduling request hands back the same mock future so cancellations can be counted.
        Mockito.doReturn(scheduledFuture)
                .when(scheduledExecutor)
                .schedule(org.mockito.Matchers.any(Runnable.class), org.mockito.Matchers.anyLong(), org.mockito.Matchers.any(TimeUnit.class));

        final Object expectedObject = new Object();
        // retrievePayload hands back the payload itself (see TargetDependentHeartbeatSender in
        // this file), so the stub returns the plain object rather than a future wrapping it.
        Mockito.when(heartbeatListener.retrievePayload(org.mockito.Matchers.any(ResourceID.class))).thenReturn(expectedObject);

        final HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(heartbeatTimeout, ownResourceID, heartbeatListener, scheduledExecutor, LOG);
        final HeartbeatTarget heartbeatTarget = Mockito.mock(HeartbeatTarget.class);

        heartbeatManager.monitorTarget(targetResourceID, heartbeatTarget);
        heartbeatManager.receiveHeartbeat(targetResourceID, expectedObject);

        Mockito.verify(scheduledFuture, Mockito.times(1)).cancel(true);
        Mockito.verify(scheduledExecutor, Mockito.times(2))
                .schedule(org.mockito.Matchers.any(Runnable.class), org.mockito.Matchers.eq(heartbeatTimeout), org.mockito.Matchers.eq(TimeUnit.MILLISECONDS));
    }

    /**
     * Verifies that the timeout callback fires only after heartbeats stop arriving.
     *
     * <p>While heartbeats are delivered every {@code heartbeatInterval} ms the timeout future
     * must stay incomplete. Once delivery stops, the listener must be notified with the
     * monitored target's id within twice the heartbeat timeout.
     *
     * @throws Exception if waiting for the timeout notification fails or is interrupted
     */
    @Test
    public void testHeartbeatTimeout() throws Exception {
        // Timing parameters in milliseconds; named once instead of repeating bare literals.
        final long heartbeatInterval = 50L;
        final long heartbeatTimeout = 200L;
        final int numHeartbeats = 6;
        final int payload = 42;
        final ResourceID ownResourceID = new ResourceID("foobar");
        final ResourceID targetResourceID = new ResourceID("barfoo");

        // Completed by the listener with the id of the target that timed out.
        final CompletableFuture<Object> timeoutFuture = new CompletableFuture<>();
        final TestingHeartbeatListener heartbeatListener = new TestingHeartbeatListenerBuilder()
                .setRetrievePayloadFunction(ignored -> payload)
                .setNotifyHeartbeatTimeoutConsumer(timeoutFuture::complete)
                .createNewTestingHeartbeatListener();
        final HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(heartbeatTimeout, ownResourceID, heartbeatListener, TestingUtils.defaultScheduledExecutor(), LOG);
        try {
            final TestingHeartbeatTarget heartbeatTarget = new TestingHeartbeatTargetBuilder().createTestingHeartbeatTarget();
            heartbeatManager.monitorTarget(targetResourceID, heartbeatTarget);

            // Keep the target alive well past one timeout period.
            for (int i = 0; i < numHeartbeats; ++i) {
                heartbeatManager.receiveHeartbeat(targetResourceID, payload);
                Thread.sleep(heartbeatInterval);
            }
            Assert.assertFalse(timeoutFuture.isDone());

            final Object timeoutResourceID = timeoutFuture.get(2L * heartbeatTimeout, TimeUnit.MILLISECONDS);
            Assert.assertEquals(targetResourceID, timeoutResourceID);
        }
        finally {
            // Stop the manager like the sibling tests do, so nothing stays scheduled
            // on the shared default executor after the test.
            heartbeatManager.stop();
        }
    }

    /**
     * Runs a sender manager and a target manager against each other.
     *
     * <p>While both are running, the sender must not observe a timeout for the target. After the
     * target manager is stopped, the sender's listener must be notified about the target within
     * twice the heartbeat timeout. Both listeners must have had payloads reported at least
     * {@code numberHeartbeats / 2} times.
     *
     * @throws Exception if waiting for the timeout notification fails or is interrupted
     */
    @Test
    public void testHeartbeatCluster() throws Exception {
        // Timing parameters in milliseconds; named once instead of repeating bare literals.
        final long heartbeatInterval = 50L;
        final long heartbeatTimeout = 200L;
        final ResourceID resourceIdTarget = new ResourceID("foobar");
        final ResourceID resourceIDSender = new ResourceID("barfoo");
        final int targetPayload = 42;
        final String senderPayload = "1337";

        final AtomicInteger numReportPayloadCallsTarget = new AtomicInteger(0);
        final TestingHeartbeatListener<String, Integer> heartbeatListenerTarget = new TestingHeartbeatListenerBuilder()
                .setRetrievePayloadFunction(ignored -> targetPayload)
                .setReportPayloadConsumer((ignoredA, ignoredB) -> numReportPayloadCallsTarget.incrementAndGet())
                .createNewTestingHeartbeatListener();

        // Completed by the sender's listener with the id of the target that timed out.
        final CompletableFuture<Object> targetHeartbeatTimeoutFuture = new CompletableFuture<>();
        final AtomicInteger numReportPayloadCallsSender = new AtomicInteger(0);
        final TestingHeartbeatListener<Integer, String> heartbeatListenerSender = new TestingHeartbeatListenerBuilder()
                .setRetrievePayloadFunction(ignored -> senderPayload)
                .setNotifyHeartbeatTimeoutConsumer(targetHeartbeatTimeoutFuture::complete)
                .setReportPayloadConsumer((ignoredA, ignoredB) -> numReportPayloadCallsSender.incrementAndGet())
                .createNewTestingHeartbeatListener();

        final HeartbeatManagerImpl heartbeatManagerTarget = new HeartbeatManagerImpl(heartbeatTimeout, resourceIdTarget, heartbeatListenerTarget, TestingUtils.defaultScheduledExecutor(), LOG);
        final HeartbeatManagerSenderImpl heartbeatManagerSender = new HeartbeatManagerSenderImpl(heartbeatInterval, heartbeatTimeout, resourceIDSender, heartbeatListenerSender, TestingUtils.defaultScheduledExecutor(), LOG);
        try {
            heartbeatManagerTarget.monitorTarget(resourceIDSender, (HeartbeatTarget) heartbeatManagerSender);
            heartbeatManagerSender.monitorTarget(resourceIdTarget, (HeartbeatTarget) heartbeatManagerTarget);

            // Let the two managers exchange heartbeats for two full timeout periods.
            Thread.sleep(2L * heartbeatTimeout);
            Assert.assertFalse(targetHeartbeatTimeoutFuture.isDone());

            // Stopping the target is part of the scenario: the sender must now time out on it.
            heartbeatManagerTarget.stop();

            final Object timeoutResourceID = targetHeartbeatTimeoutFuture.get(2L * heartbeatTimeout, TimeUnit.MILLISECONDS);
            Assert.assertThat(timeoutResourceID, Matchers.is((Object) resourceIdTarget));

            // Nominal number of heartbeats in the sleep window; only half is required to
            // keep the assertion tolerant of scheduling jitter.
            final int numberHeartbeats = (int) (2L * heartbeatTimeout / heartbeatInterval);
            final Matcher<Integer> numberHeartbeatsMatcher = Matchers.greaterThanOrEqualTo(numberHeartbeats / 2);
            Assert.assertThat(numReportPayloadCallsTarget.get(), Matchers.is(numberHeartbeatsMatcher));
            Assert.assertThat(numReportPayloadCallsSender.get(), Matchers.is(numberHeartbeatsMatcher));
        }
        finally {
            // The sender was never stopped before, leaving it registered with the
            // shared default executor after the test finished.
            heartbeatManagerSender.stop();
        }
    }

    /**
     * Verifies that a target which was unmonitored again never triggers the timeout callback.
     *
     * <p>The test waits for twice the heartbeat timeout and expects the wait itself to time out.
     *
     * @throws Exception if waiting on the timeout future fails for a reason other than timing out
     */
    @Test
    public void testTargetUnmonitoring() throws Exception {
        final long timeout = 50L;
        final int payload = 42;
        final ResourceID ownId = new ResourceID("foobar");
        final ResourceID monitoredId = new ResourceID("target");

        // Would be completed by the listener if a heartbeat timeout were reported.
        final CompletableFuture<Object> timeoutSignal = new CompletableFuture<>();
        final TestingHeartbeatListener listener = new TestingHeartbeatListenerBuilder()
                .setRetrievePayloadFunction(ignored -> payload)
                .setNotifyHeartbeatTimeoutConsumer(timeoutSignal::complete)
                .createNewTestingHeartbeatListener();
        final HeartbeatManagerImpl manager = new HeartbeatManagerImpl(timeout, ownId, listener, TestingUtils.defaultScheduledExecutor(), LOG);
        final TestingHeartbeatTarget target = new TestingHeartbeatTargetBuilder().createTestingHeartbeatTarget();

        manager.monitorTarget(monitoredId, target);
        manager.unmonitorTarget(monitoredId);

        boolean waitTimedOut = false;
        try {
            timeoutSignal.get(2L * timeout, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException expected) {
            // Success path: no timeout notification arrived for the unmonitored target.
            waitTimedOut = true;
        }
        if (!waitTimedOut) {
            Assert.fail("Timeout should time out.");
        }
    }

    /**
     * Verifies that asking for the last heartbeat of a resource that was never monitored
     * yields {@code -1}.
     */
    @Test
    public void testLastHeartbeatFromUnregisteredTarget() {
        final long timeout = 100L;
        final ResourceID ownId = ResourceID.generate();
        final HeartbeatListener listener = Mockito.mock(HeartbeatListener.class);
        final ScheduledExecutor executor = Mockito.mock(ScheduledExecutor.class);
        final HeartbeatManagerImpl manager = new HeartbeatManagerImpl(timeout, ownId, listener, executor, LOG);
        try {
            // A freshly generated id has not been registered with the manager.
            final long lastHeartbeat = manager.getLastHeartbeatFrom(ResourceID.generate());
            Assert.assertEquals(-1L, lastHeartbeat);
        }
        finally {
            manager.stop();
        }
    }

    /**
     * Verifies the last-heartbeat timestamp of a monitored target: {@code 0} right after
     * registration, and at least the wall-clock time sampled just before a heartbeat was
     * received once one has arrived.
     */
    @Test
    public void testLastHeartbeatFrom() {
        final long timeout = 100L;
        final ResourceID ownId = ResourceID.generate();
        final HeartbeatListener listener = Mockito.mock(HeartbeatListener.class);
        final HeartbeatTarget monitored = Mockito.mock(HeartbeatTarget.class);
        final ResourceID target = ResourceID.generate();
        final ScheduledExecutor executor = Mockito.mock(ScheduledExecutor.class);
        final HeartbeatManagerImpl manager = new HeartbeatManagerImpl(timeout, ownId, listener, executor, LOG);
        try {
            manager.monitorTarget(target, monitored);
            Assert.assertEquals(0L, manager.getLastHeartbeatFrom(target));

            // Sample the clock first so the recorded heartbeat time cannot be earlier.
            final long beforeHeartbeat = System.currentTimeMillis();
            manager.receiveHeartbeat(target, null);
            final long lastHeartbeat = manager.getLastHeartbeatFrom(target);
            Assert.assertTrue(lastHeartbeat >= beforeHeartbeat);
        }
        finally {
            manager.stop();
        }
    }

    /**
     * Verifies that the payload sent in response to a heartbeat request depends on the
     * requesting target: each of the two monitored targets must receive the value the
     * listener's payload lookup maps to its own id.
     *
     * @throws Exception if waiting for a payload future fails
     */
    @Test
    public void testHeartbeatManagerTargetPayload() throws Exception {
        final long timeout = 100L;
        final ResourceID regularId = ResourceID.generate();
        final ResourceID specialId = ResourceID.generate();

        // Per-target payloads served by the listener.
        final HashMap<ResourceID, Integer> payloadsByTarget = new HashMap<>(2);
        payloadsByTarget.put(regularId, 0);
        payloadsByTarget.put(specialId, 1);

        final CompletableFuture<Object> regularPayload = new CompletableFuture<>();
        final TestingHeartbeatTarget<Integer> regularTarget = new TestingHeartbeatTargetBuilder<Integer>()
                .setReceiveHeartbeatConsumer((ignored, payload) -> regularPayload.complete(payload))
                .createTestingHeartbeatTarget();

        final CompletableFuture<Object> specialPayload = new CompletableFuture<>();
        final TestingHeartbeatTarget<Integer> specialTarget = new TestingHeartbeatTargetBuilder<Integer>()
                .setReceiveHeartbeatConsumer((ignored, payload) -> specialPayload.complete(payload))
                .createTestingHeartbeatTarget();

        final TestingHeartbeatListener listener = new TestingHeartbeatListenerBuilder()
                .setRetrievePayloadFunction(payloadsByTarget::get)
                .createNewTestingHeartbeatListener();
        final HeartbeatManagerImpl manager = new HeartbeatManagerImpl(timeout, ResourceID.generate(), listener, TestingUtils.defaultScheduledExecutor(), LOG);
        try {
            manager.monitorTarget(regularId, regularTarget);
            manager.monitorTarget(specialId, specialTarget);

            manager.requestHeartbeat(regularId, null);
            final Object expectedRegular = payloadsByTarget.get(regularId);
            Assert.assertThat(regularPayload.get(), Matchers.is(expectedRegular));

            manager.requestHeartbeat(specialId, null);
            final Object expectedSpecial = payloadsByTarget.get(specialId);
            Assert.assertThat(specialPayload.get(), Matchers.is(expectedSpecial));
        }
        finally {
            manager.stop();
        }
    }

    /**
     * Verifies that the payload a sender manager attaches to its heartbeat requests depends
     * on the target being addressed.
     *
     * <p>Two targets are monitored. The listener answers {@code specialResponse} for the
     * special target id and {@code defaultResponse} for any other id; each target must have
     * been asked for a heartbeat with the matching payload.
     *
     * @throws Exception if a latch is not triggered within five seconds or the wait is interrupted
     */
    @Test
    public void testHeartbeatManagerSenderTargetPayload() throws Exception {
        final long heartbeatTimeout = 100L;
        final long heartbeatPeriod = 2000L;
        final int defaultResponse = 0;
        final int specialResponse = 1;

        final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
        try {
            final ResourceID someTargetId = ResourceID.generate();
            final ResourceID specialTargetId = ResourceID.generate();

            final OneShotLatch someTargetReceivedLatch = new OneShotLatch();
            final OneShotLatch specialTargetReceivedLatch = new OneShotLatch();
            final TargetDependentHeartbeatReceiver someHeartbeatTarget = new TargetDependentHeartbeatReceiver(someTargetReceivedLatch);
            final TargetDependentHeartbeatReceiver specialHeartbeatTarget = new TargetDependentHeartbeatReceiver(specialTargetReceivedLatch);

            final HeartbeatManagerSenderImpl heartbeatManager = new HeartbeatManagerSenderImpl(
                    heartbeatPeriod,
                    heartbeatTimeout,
                    ResourceID.generate(),
                    new TargetDependentHeartbeatSender(specialTargetId, specialResponse, defaultResponse),
                    (ScheduledExecutor) new ScheduledExecutorServiceAdapter(scheduledThreadPoolExecutor),
                    LOG);
            try {
                heartbeatManager.monitorTarget(someTargetId, someHeartbeatTarget);
                heartbeatManager.monitorTarget(specialTargetId, specialHeartbeatTarget);

                // Wait until each target has been contacted at least once.
                someTargetReceivedLatch.await(5L, TimeUnit.SECONDS);
                specialTargetReceivedLatch.await(5L, TimeUnit.SECONDS);

                Assert.assertEquals(defaultResponse, someHeartbeatTarget.getLastRequestedHeartbeatPayload());
                Assert.assertEquals(specialResponse, specialHeartbeatTarget.getLastRequestedHeartbeatPayload());
            }
            finally {
                heartbeatManager.stop();
            }
        }
        finally {
            // Outer finally so the pool is shut down even if construction or stop() throws.
            scheduledThreadPoolExecutor.shutdown();
        }
    }

    /**
     * Heartbeat listener whose outgoing payload depends on the resource being addressed:
     * one designated id gets {@code specialResponse}, every other id gets
     * {@code defaultResponse}. Timeout notifications and reported payloads are ignored.
     */
    private static class TargetDependentHeartbeatSender
    implements HeartbeatListener<Object, Integer> {
        private final ResourceID specialId;
        private final int specialResponse;
        private final int defaultResponse;

        /**
         * @param specialId id that is answered with {@code specialResponse}
         * @param specialResponse payload returned for {@code specialId}
         * @param defaultResponse payload returned for any other id
         */
        TargetDependentHeartbeatSender(ResourceID specialId, int specialResponse, int defaultResponse) {
            this.specialId = specialId;
            this.specialResponse = specialResponse;
            this.defaultResponse = defaultResponse;
        }

        /** Intentionally a no-op. */
        public void notifyHeartbeatTimeout(ResourceID resourceID) {
            // nothing to do
        }

        /** Intentionally a no-op. */
        public void reportPayload(ResourceID resourceID, Object payload) {
            // nothing to do
        }

        /**
         * Picks the payload for the given resource.
         *
         * @param resourceID id the payload is requested for
         * @return {@code specialResponse} if {@code resourceID} equals the special id,
         *     otherwise {@code defaultResponse}
         */
        public Integer retrievePayload(ResourceID resourceID) {
            final boolean isSpecial = resourceID.equals((Object) specialId);
            return isSpecial ? specialResponse : defaultResponse;
        }
    }

    /**
     * Heartbeat target that remembers the most recent payload of each kind of call
     * (received heartbeat / heartbeat request) and triggers a latch on every call.
     * Both remembered payloads start out as {@code -1}.
     */
    private static class TargetDependentHeartbeatReceiver
    implements HeartbeatTarget<Integer> {
        private final OneShotLatch latch;
        private volatile int lastReceivedHeartbeatPayload = -1;
        private volatile int lastRequestedHeartbeatPayload = -1;

        /** Creates a receiver backed by a fresh latch of its own. */
        public TargetDependentHeartbeatReceiver() {
            this(new OneShotLatch());
        }

        /**
         * @param latch latch triggered whenever a heartbeat or a heartbeat request arrives
         */
        public TargetDependentHeartbeatReceiver(OneShotLatch latch) {
            this.latch = latch;
        }

        /** Stores the payload of a received heartbeat, then triggers the latch. */
        public void receiveHeartbeat(ResourceID heartbeatOrigin, Integer heartbeatPayload) {
            // Unboxing happens here, so a null payload fails before the latch is triggered.
            final int received = heartbeatPayload;
            lastReceivedHeartbeatPayload = received;
            latch.trigger();
        }

        /** Stores the payload of a heartbeat request, then triggers the latch. */
        public void requestHeartbeat(ResourceID requestOrigin, Integer heartbeatPayload) {
            // Unboxing happens here, so a null payload fails before the latch is triggered.
            final int requested = heartbeatPayload;
            lastRequestedHeartbeatPayload = requested;
            latch.trigger();
        }

        /** @return payload of the latest received heartbeat, or {@code -1} if there was none */
        public int getLastReceivedHeartbeatPayload() {
            return lastReceivedHeartbeatPayload;
        }

        /** @return payload of the latest heartbeat request, or {@code -1} if there was none */
        public int getLastRequestedHeartbeatPayload() {
            return lastRequestedHeartbeatPayload;
        }
    }
}

