/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorSystem;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.util.IOUtils;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Future;

/**
 * Verifies what the failure of a timed-out RPC call looks like to the caller: the exception
 * type, the RPC method named in its message, and the first frame of its stack trace.
 */
public class TimeoutCallStackTest {
    private static ActorSystem actorSystem;
    private static RpcService rpcService;

    /** Endpoints started by a test; closed again in {@link #stopTestEndpoints()}. */
    private final List<RpcEndpoint> endpointsToStop = new ArrayList<>();

    /** Creates the actor system and the RPC service shared by all tests of this class. */
    @BeforeClass
    public static void setup() {
        actorSystem = AkkaUtils.createDefaultActorSystem();
        rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
    }

    /**
     * Stops the RPC service and terminates the actor system, waiting at most ten seconds for
     * both to finish.
     *
     * @throws Exception if either shutdown fails or does not finish within the wait
     */
    @AfterClass
    public static void teardown() throws Exception {
        final CompletableFuture<Void> rpcTerminationFuture = rpcService.stopService();
        // Wildcard instead of the concrete Akka result type: only completion matters here.
        final CompletableFuture<?> actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate());

        FutureUtils.waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture))
                .get(10_000L, TimeUnit.MILLISECONDS);
    }

    /** Closes every endpoint registered by the test that just ran, ignoring close failures. */
    @After
    public void stopTestEndpoints() {
        endpointsToStop.forEach(IOUtils::closeQuietly);
    }

    /**
     * Issues an RPC call with a one millisecond timeout against an endpoint that never answers
     * and checks the resulting failure: it must be a {@link TimeoutException}, its message must
     * contain the name of the invoked RPC method, and the first element of its stack trace must
     * be this test method.
     *
     * @throws Exception if the gateway cannot be created or waiting on the call fails in an
     *     unexpected way
     */
    @Test
    public void testTimeoutException() throws Exception {
        final TestingGateway gateway = createTestingGateway();

        final CompletableFuture<Void> future = gateway.callThatTimesOut(Time.milliseconds(1L));

        Throwable failureCause = null;
        try {
            future.get();
            Assert.fail("test buggy: the call should never have completed");
        } catch (ExecutionException e) {
            // The future wraps the actual failure; unwrap it for the assertions below.
            failureCause = e.getCause();
        }

        Assert.assertThat(failureCause, Matchers.instanceOf(TimeoutException.class));
        Assert.assertThat(failureCause.getMessage(), Matchers.containsString("callThatTimesOut"));
        Assert.assertThat(failureCause.getStackTrace()[0].getMethodName(), Matchers.equalTo("testTimeoutException"));
    }

    /**
     * Starts a new {@link TestingRpcEndpoint}, registers it for cleanup and connects to it
     * through the shared RPC service.
     *
     * @return a gateway connected to the freshly started endpoint
     * @throws Exception if connecting to the endpoint fails
     */
    private TestingGateway createTestingGateway() throws Exception {
        final TestingRpcEndpoint endpoint = new TestingRpcEndpoint(rpcService, "test_name");
        // Register before starting so the endpoint is closed even if a later step throws.
        endpointsToStop.add(endpoint);
        endpoint.start();

        return rpcService.connect(endpoint.getAddress(), TestingGateway.class).get();
    }

    /** Endpoint whose only RPC method hands back a future that the endpoint never completes. */
    private static final class TestingRpcEndpoint extends RpcEndpoint implements TestingGateway {

        TestingRpcEndpoint(RpcService rpcService, String endpointId) {
            super(rpcService, endpointId);
        }

        @Override
        public CompletableFuture<Void> callThatTimesOut(@RpcTimeout Time timeout) {
            // Intentionally left incomplete so the caller only ever sees the RPC timeout.
            return new CompletableFuture<>();
        }
    }

    /** Gateway exposing the single RPC method used by this test. */
    private interface TestingGateway extends RpcGateway {

        /**
         * RPC call used to provoke a timeout on the caller side.
         *
         * @param timeout value passed as the {@link RpcTimeout} of the call
         * @return future for the result of the call
         */
        CompletableFuture<Void> callThatTimesOut(@RpcTimeout Time timeout);
    }
}

