/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorIdentity;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Identify;
import akka.actor.Props;
import akka.dispatch.Futures;
import akka.pattern.Patterns;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter;
import org.apache.flink.runtime.rpc.FencedMainThreadExecutable;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcServer;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaBasedEndpoint;
import org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler;
import org.apache.flink.runtime.rpc.akka.AkkaRpcActor;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.rpc.akka.ControlMessages;
import org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler;
import org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.runtime.rpc.messages.HandshakeSuccessMessage;
import org.apache.flink.runtime.rpc.messages.RemoteHandshakeMessage;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.reflect.ClassTag$;

@ThreadSafe
public class AkkaRpcService
implements RpcService {
    private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcService.class);
    static final int VERSION = 1;
    private final Object lock = new Object();
    private final ActorSystem actorSystem;
    private final AkkaRpcServiceConfiguration configuration;
    @GuardedBy(value="lock")
    private final Map<ActorRef, RpcEndpoint> actors = new HashMap<ActorRef, RpcEndpoint>(4);
    private final String address;
    private final int port;
    private final ScheduledExecutor internalScheduledExecutor;
    private final CompletableFuture<Void> terminationFuture;
    private volatile boolean stopped;

    public AkkaRpcService(ActorSystem actorSystem, AkkaRpcServiceConfiguration configuration) {
        this.actorSystem = (ActorSystem)Preconditions.checkNotNull((Object)actorSystem, (String)"actor system");
        this.configuration = (AkkaRpcServiceConfiguration)Preconditions.checkNotNull((Object)configuration, (String)"akka rpc service configuration");
        Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
        this.address = actorSystemAddress.host().isDefined() ? (String)actorSystemAddress.host().get() : "";
        this.port = actorSystemAddress.port().isDefined() ? (Integer)actorSystemAddress.port().get() : -1;
        this.internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem);
        this.terminationFuture = new CompletableFuture();
        this.stopped = false;
    }

    public ActorSystem getActorSystem() {
        return this.actorSystem;
    }

    protected int getVersion() {
        return 1;
    }

    @Override
    public String getAddress() {
        return this.address;
    }

    @Override
    public int getPort() {
        return this.port;
    }

    @Override
    public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
        return this.connectInternal(address, clazz, actorRef -> {
            Tuple2<String, String> addressHostname = this.extractAddressHostname((ActorRef)actorRef);
            return new AkkaInvocationHandler((String)addressHostname.f0, (String)addressHostname.f1, (ActorRef)actorRef, this.configuration.getTimeout(), this.configuration.getMaximumFramesize(), null);
        });
    }

    @Override
    public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(String address, F fencingToken, Class<C> clazz) {
        return this.connectInternal(address, clazz, actorRef -> {
            Tuple2<String, String> addressHostname = this.extractAddressHostname((ActorRef)actorRef);
            return new FencedAkkaInvocationHandler<Serializable>((String)addressHostname.f0, (String)addressHostname.f1, (ActorRef)actorRef, this.configuration.getTimeout(), this.configuration.getMaximumFramesize(), null, () -> fencingToken);
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public <C extends RpcEndpoint> RpcServer startServer(C rpcEndpoint) {
        FencedAkkaInvocationHandler<Serializable> akkaInvocationHandler;
        ActorRef actorRef;
        Preconditions.checkNotNull(rpcEndpoint, (String)"rpc endpoint");
        CompletableFuture<Void> terminationFuture = new CompletableFuture<Void>();
        Props akkaRpcActorProps = rpcEndpoint instanceof FencedRpcEndpoint ? Props.create(FencedAkkaRpcActor.class, (Object[])new Object[]{rpcEndpoint, terminationFuture, this.getVersion(), this.configuration.getMaximumFramesize()}) : Props.create(AkkaRpcActor.class, (Object[])new Object[]{rpcEndpoint, terminationFuture, this.getVersion(), this.configuration.getMaximumFramesize()});
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState((!this.stopped ? 1 : 0) != 0, (Object)"RpcService is stopped");
            actorRef = this.actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());
            this.actors.put(actorRef, rpcEndpoint);
        }
        LOG.info("Starting RPC endpoint for {} at {} .", (Object)rpcEndpoint.getClass().getName(), (Object)actorRef.path());
        String akkaAddress = AkkaUtils.getAkkaURL(this.actorSystem, actorRef);
        Option host = actorRef.path().address().host();
        String hostname = host.isEmpty() ? "localhost" : (String)host.get();
        HashSet<Class<? extends RpcGateway>> implementedRpcGateways = new HashSet<Class<? extends RpcGateway>>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));
        implementedRpcGateways.add(RpcServer.class);
        implementedRpcGateways.add(AkkaBasedEndpoint.class);
        if (rpcEndpoint instanceof FencedRpcEndpoint) {
            akkaInvocationHandler = new FencedAkkaInvocationHandler<Serializable>(akkaAddress, hostname, actorRef, this.configuration.getTimeout(), this.configuration.getMaximumFramesize(), terminationFuture, ((FencedRpcEndpoint)rpcEndpoint)::getFencingToken);
            implementedRpcGateways.add(FencedMainThreadExecutable.class);
        } else {
            akkaInvocationHandler = new FencedAkkaInvocationHandler<Serializable>(akkaAddress, hostname, actorRef, this.configuration.getTimeout(), this.configuration.getMaximumFramesize(), terminationFuture);
        }
        ClassLoader classLoader = this.getClass().getClassLoader();
        RpcServer server = (RpcServer)Proxy.newProxyInstance(classLoader, implementedRpcGateways.toArray(new Class[implementedRpcGateways.size()]), akkaInvocationHandler);
        return server;
    }

    @Override
    public <F extends Serializable> RpcServer fenceRpcServer(RpcServer rpcServer, F fencingToken) {
        if (rpcServer instanceof AkkaBasedEndpoint) {
            FencedAkkaInvocationHandler<Serializable> fencedInvocationHandler = new FencedAkkaInvocationHandler<Serializable>(rpcServer.getAddress(), rpcServer.getHostname(), ((AkkaBasedEndpoint)((Object)rpcServer)).getActorRef(), this.configuration.getTimeout(), this.configuration.getMaximumFramesize(), null, () -> fencingToken);
            ClassLoader classLoader = this.getClass().getClassLoader();
            return (RpcServer)Proxy.newProxyInstance(classLoader, new Class[]{RpcServer.class, AkkaBasedEndpoint.class}, fencedInvocationHandler);
        }
        throw new RuntimeException("The given RpcServer must implement the AkkaGateway in order to fence it.");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stopServer(RpcServer selfGateway) {
        if (selfGateway instanceof AkkaBasedEndpoint) {
            RpcEndpoint rpcEndpoint;
            AkkaBasedEndpoint akkaClient = (AkkaBasedEndpoint)((Object)selfGateway);
            Object object = this.lock;
            synchronized (object) {
                if (this.stopped) {
                    return;
                }
                rpcEndpoint = this.actors.remove(akkaClient.getActorRef());
            }
            if (rpcEndpoint != null) {
                this.terminateAkkaRpcActor(akkaClient.getActorRef(), rpcEndpoint);
            } else {
                LOG.debug("RPC endpoint {} already stopped or from different RPC service", (Object)selfGateway.getAddress());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<Void> stopService() {
        CompletableFuture<Void> akkaRpcActorsTerminationFuture;
        Object object = this.lock;
        synchronized (object) {
            if (this.stopped) {
                return this.terminationFuture;
            }
            LOG.info("Stopping Akka RPC service.");
            this.stopped = true;
            akkaRpcActorsTerminationFuture = this.terminateAkkaRpcActors();
        }
        CompletableFuture<Void> actorSystemTerminationFuture = FutureUtils.composeAfterwards(akkaRpcActorsTerminationFuture, () -> FutureUtils.toJava(this.actorSystem.terminate()));
        actorSystemTerminationFuture.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                this.terminationFuture.completeExceptionally((Throwable)throwable);
            } else {
                this.terminationFuture.complete(null);
            }
            LOG.info("Stopped Akka RPC service.");
        });
        return this.terminationFuture;
    }

    @Nonnull
    @GuardedBy(value="lock")
    private CompletableFuture<Void> terminateAkkaRpcActors() {
        ArrayList<CompletableFuture<Void>> akkaRpcActorTerminationFutures = new ArrayList<CompletableFuture<Void>>(this.actors.size());
        for (Map.Entry<ActorRef, RpcEndpoint> actorRefRpcEndpointEntry : this.actors.entrySet()) {
            akkaRpcActorTerminationFutures.add(this.terminateAkkaRpcActor(actorRefRpcEndpointEntry.getKey(), actorRefRpcEndpointEntry.getValue()));
        }
        this.actors.clear();
        return FutureUtils.waitForAll(akkaRpcActorTerminationFutures);
    }

    private CompletableFuture<Void> terminateAkkaRpcActor(ActorRef akkaRpcActorRef, RpcEndpoint rpcEndpoint) {
        akkaRpcActorRef.tell((Object)ControlMessages.TERMINATE, ActorRef.noSender());
        return rpcEndpoint.getTerminationFuture();
    }

    @Override
    public CompletableFuture<Void> getTerminationFuture() {
        return this.terminationFuture;
    }

    @Override
    public Executor getExecutor() {
        return this.actorSystem.dispatcher();
    }

    @Override
    public ScheduledExecutor getScheduledExecutor() {
        return this.internalScheduledExecutor;
    }

    @Override
    public ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) {
        Preconditions.checkNotNull((Object)runnable, (String)"runnable");
        Preconditions.checkNotNull((Object)((Object)unit), (String)"unit");
        Preconditions.checkArgument((delay >= 0L ? 1 : 0) != 0, (Object)"delay must be zero or larger");
        return this.internalScheduledExecutor.schedule(runnable, delay, unit);
    }

    @Override
    public void execute(Runnable runnable) {
        this.actorSystem.dispatcher().execute(runnable);
    }

    @Override
    public <T> CompletableFuture<T> execute(Callable<T> callable) {
        Future scalaFuture = Futures.future(callable, (ExecutionContext)this.actorSystem.dispatcher());
        return FutureUtils.toJava(scalaFuture);
    }

    private Tuple2<String, String> extractAddressHostname(ActorRef actorRef) {
        String actorAddress = AkkaUtils.getAkkaURL(this.actorSystem, actorRef);
        Option host = actorRef.path().address().host();
        String hostname = host.isEmpty() ? "localhost" : (String)host.get();
        return Tuple2.of((Object)actorAddress, (Object)hostname);
    }

    private <C extends RpcGateway> CompletableFuture<C> connectInternal(String address, Class<C> clazz, Function<ActorRef, InvocationHandler> invocationHandlerFactory) {
        Preconditions.checkState((!this.stopped ? 1 : 0) != 0, (Object)"RpcService is stopped");
        LOG.debug("Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.", (Object)address, (Object)clazz.getName());
        ActorSelection actorSel = this.actorSystem.actorSelection(address);
        Future identify = Patterns.ask((ActorSelection)actorSel, (Object)new Identify((Object)42), (long)this.configuration.getTimeout().toMilliseconds()).mapTo(ClassTag$.MODULE$.apply(ActorIdentity.class));
        CompletableFuture identifyFuture = FutureUtils.toJava(identify);
        CompletionStage actorRefFuture = identifyFuture.thenApply(actorIdentity -> {
            if (actorIdentity.getRef() == null) {
                throw new CompletionException(new RpcConnectionException("Could not connect to rpc endpoint under address " + address + '.'));
            }
            return actorIdentity.getRef();
        });
        CompletionStage handshakeFuture = ((CompletableFuture)actorRefFuture).thenCompose(actorRef -> FutureUtils.toJava(Patterns.ask((ActorRef)actorRef, (Object)new RemoteHandshakeMessage(clazz, this.getVersion()), (long)this.configuration.getTimeout().toMilliseconds()).mapTo(ClassTag$.MODULE$.apply(HandshakeSuccessMessage.class))));
        return ((CompletableFuture)actorRefFuture).thenCombineAsync(handshakeFuture, (actorRef, ignored) -> {
            InvocationHandler invocationHandler = (InvocationHandler)invocationHandlerFactory.apply((ActorRef)actorRef);
            ClassLoader classLoader = this.getClass().getClassLoader();
            RpcGateway proxy = (RpcGateway)Proxy.newProxyInstance(classLoader, new Class[]{clazz}, invocationHandler);
            return proxy;
        }, (Executor)this.actorSystem.dispatcher());
    }
}

