/*
 * Decompiled with CFR 0.152.
 */
package io.lettuce.core.cluster.topology;

import io.lettuce.core.ConnectionFuture;
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.topology.ClusterTopologyRefresh;
import io.lettuce.core.cluster.topology.Connections;
import io.lettuce.core.cluster.topology.NodeConnectionFactory;
import io.lettuce.core.cluster.topology.NodeTopologyView;
import io.lettuce.core.cluster.topology.NodeTopologyViews;
import io.lettuce.core.cluster.topology.RedisClusterNodeSnapshot;
import io.lettuce.core.cluster.topology.Requests;
import io.lettuce.core.cluster.topology.TimedAsyncCommand;
import io.lettuce.core.cluster.topology.TopologyComparators;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.internal.ExceptionFactory;
import io.lettuce.core.internal.Exceptions;
import io.lettuce.core.internal.Futures;
import io.lettuce.core.internal.LettuceStrings;
import io.lettuce.core.resource.ClientResources;
import io.netty.util.Timeout;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

class DefaultClusterTopologyRefresh
implements ClusterTopologyRefresh {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultClusterTopologyRefresh.class);
    private final NodeConnectionFactory nodeConnectionFactory;
    private final ClientResources clientResources;

    public DefaultClusterTopologyRefresh(NodeConnectionFactory nodeConnectionFactory, ClientResources clientResources) {
        this.nodeConnectionFactory = nodeConnectionFactory;
        this.clientResources = clientResources;
    }

    @Override
    public CompletionStage<Map<RedisURI, Partitions>> loadViews(Iterable<RedisURI> seed, Duration connectTimeout, boolean discovery) {
        if (!this.isEventLoopActive()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
        long commandTimeoutNs = DefaultClusterTopologyRefresh.getCommandTimeoutNs(seed);
        ConnectionTracker tracker = new ConnectionTracker();
        long connectionTimeout = commandTimeoutNs + connectTimeout.toNanos();
        this.openConnections(tracker, seed, connectionTimeout, TimeUnit.NANOSECONDS);
        CompletionStage composition = tracker.whenComplete(map -> new Connections(this.clientResources, (Map<RedisURI, StatefulRedisConnection<String, String>>)map)).thenCompose(connections -> {
            Requests requestedTopology = connections.requestTopology(commandTimeoutNs, TimeUnit.NANOSECONDS);
            Requests requestedInfo = connections.requestInfo(commandTimeoutNs, TimeUnit.NANOSECONDS);
            return ((CompletableFuture)((CompletableFuture)((CompletableFuture)((CompletableFuture)CompletableFuture.allOf(requestedTopology.allCompleted(), requestedInfo.allCompleted()).thenApplyAsync(ignore -> this.getNodeSpecificViews(requestedTopology, requestedInfo), (Executor)this.clientResources.eventExecutorGroup())).thenCompose(views -> {
                if (discovery && this.isEventLoopActive()) {
                    Set<RedisURI> allKnownUris = views.getClusterNodes();
                    Set<RedisURI> discoveredNodes = DefaultClusterTopologyRefresh.difference(allKnownUris, this.toSet(seed));
                    if (discoveredNodes.isEmpty()) {
                        return CompletableFuture.completedFuture(views);
                    }
                    this.openConnections(tracker, discoveredNodes, connectionTimeout, TimeUnit.NANOSECONDS);
                    return tracker.whenComplete(map -> new Connections(this.clientResources, (Map<RedisURI, StatefulRedisConnection<String, String>>)map).retainAll(discoveredNodes)).thenCompose(newConnections -> {
                        Requests additionalTopology = newConnections.requestTopology(commandTimeoutNs, TimeUnit.NANOSECONDS).mergeWith(requestedTopology);
                        Requests additionalInfo = newConnections.requestInfo(commandTimeoutNs, TimeUnit.NANOSECONDS).mergeWith(requestedInfo);
                        return CompletableFuture.allOf(additionalTopology.allCompleted(), additionalInfo.allCompleted()).thenApplyAsync(ignore2 -> this.getNodeSpecificViews(additionalTopology, additionalInfo), (Executor)this.clientResources.eventExecutorGroup());
                    });
                }
                return CompletableFuture.completedFuture(views);
            })).whenComplete((ignore, throwable) -> {
                if (throwable != null) {
                    try {
                        tracker.close();
                    }
                    catch (Exception e) {
                        logger.debug("Cannot close ClusterTopologyRefresh connections", (Throwable)e);
                    }
                }
            })).thenCompose(it -> tracker.close().thenApply(ignore -> it))).thenCompose(it -> {
                if (it.isEmpty()) {
                    Exception exception = this.tryFail(requestedTopology, tracker, seed);
                    return Futures.failed(exception);
                }
                return CompletableFuture.completedFuture(it);
            });
        });
        return ((CompletableFuture)composition).thenApply(NodeTopologyViews::toMap);
    }

    private Exception tryFail(Requests requestedTopology, ConnectionTracker tracker, Iterable<RedisURI> seed) {
        LinkedHashMap<RedisURI, String> failures = new LinkedHashMap<RedisURI, String>();
        CannotRetrieveClusterPartitions exception = new CannotRetrieveClusterPartitions(seed, failures);
        for (RedisURI redisURI : requestedTopology.nodes()) {
            Throwable cause;
            TimedAsyncCommand<String, String, String> request = requestedTopology.getRequest(redisURI);
            if (request == null || !request.isCompletedExceptionally() || (cause = DefaultClusterTopologyRefresh.getException(request)) == null) continue;
            failures.put(redisURI, DefaultClusterTopologyRefresh.getExceptionDetail(cause));
            exception.addSuppressed(cause);
        }
        for (Map.Entry entry : tracker.connections.entrySet()) {
            CompletableFuture future = (CompletableFuture)entry.getValue();
            if (!future.isDone() || !future.isCompletedExceptionally()) continue;
            try {
                future.join();
            }
            catch (CompletionException e) {
                Throwable cause = e.getCause();
                if (cause == null) continue;
                failures.put((RedisURI)entry.getKey(), DefaultClusterTopologyRefresh.getExceptionDetail(cause));
                exception.addSuppressed(cause);
            }
        }
        return exception;
    }

    private static String getExceptionDetail(Throwable exception) {
        if (exception instanceof RedisConnectionException && exception.getCause() instanceof IOException) {
            exception = exception.getCause();
        }
        return LettuceStrings.isNotEmpty(exception.getMessage()) ? exception.getMessage() : exception.toString();
    }

    private Set<RedisURI> toSet(Iterable<RedisURI> seed) {
        return StreamSupport.stream(seed.spliterator(), false).collect(Collectors.toCollection(HashSet::new));
    }

    NodeTopologyViews getNodeSpecificViews(Requests requestedTopology, Requests requestedInfo) {
        ArrayList allNodes = new ArrayList();
        HashMap<String, NodeTopologyView> self = new HashMap<String, NodeTopologyView>();
        Set<RedisURI> nodes = requestedTopology.nodes();
        ArrayList<NodeTopologyView> views = new ArrayList<NodeTopologyView>();
        for (RedisURI nodeUri : nodes) {
            try {
                NodeTopologyView nodeTopologyView = NodeTopologyView.from(nodeUri, requestedTopology, requestedInfo);
                if (!nodeTopologyView.isAvailable()) continue;
                RedisClusterNode node = nodeTopologyView.getOwnPartition();
                if (node.getUri() == null) {
                    node.setUri(nodeUri);
                } else {
                    node.addAlias(nodeUri);
                }
                self.put(node.getNodeId(), nodeTopologyView);
                ArrayList<RedisClusterNodeSnapshot> nodeWithStats = new ArrayList<RedisClusterNodeSnapshot>(nodeTopologyView.getPartitions().size());
                for (RedisClusterNode partition : nodeTopologyView.getPartitions()) {
                    if (!DefaultClusterTopologyRefresh.validNode(partition)) continue;
                    nodeWithStats.add(new RedisClusterNodeSnapshot(partition));
                }
                allNodes.addAll(nodeWithStats);
                Partitions partitions = new Partitions();
                partitions.addAll((Collection<? extends RedisClusterNode>)nodeWithStats);
                nodeTopologyView.setPartitions(partitions);
                views.add(nodeTopologyView);
            }
            catch (CompletionException e) {
                logger.warn(String.format("Cannot retrieve partition view from %s, error: %s", nodeUri, e));
            }
        }
        for (RedisClusterNodeSnapshot node : allNodes) {
            if (!self.containsKey(node.getNodeId())) continue;
            NodeTopologyView view = (NodeTopologyView)self.get(node.getNodeId());
            node.setConnectedClients(view.getConnectedClients());
            node.setReplOffset(view.getReplicationOffset());
            node.setLatencyNs(view.getLatency());
        }
        for (NodeTopologyView view : views) {
            view.postProcessPartitions();
        }
        return new NodeTopologyViews(views);
    }

    private static boolean validNode(RedisClusterNode redisClusterNode) {
        if (redisClusterNode.is(RedisClusterNode.NodeFlag.NOADDR)) {
            return false;
        }
        return redisClusterNode.getUri() != null && redisClusterNode.getUri().getPort() != 0 && !LettuceStrings.isEmpty(redisClusterNode.getUri().getHost());
    }

    private void openConnections(ConnectionTracker tracker, Iterable<RedisURI> redisURIs, long timeout, TimeUnit timeUnit) {
        for (RedisURI redisURI : redisURIs) {
            CompletableFuture<StatefulRedisConnection<String, String>> sync = new CompletableFuture<StatefulRedisConnection<String, String>>();
            try {
                if (redisURI.getHost() == null || tracker.contains(redisURI) || !this.isEventLoopActive()) continue;
                SocketAddress socketAddress = this.clientResources.socketAddressResolver().resolve(redisURI);
                ConnectionFuture<StatefulRedisConnection<String, String>> connectionFuture = this.nodeConnectionFactory.connectToNodeAsync(StringCodec.UTF8, socketAddress);
                Timeout cancelTimeout = this.clientResources.timer().newTimeout(it -> {
                    String message = String.format("Unable to connect to [%s]: Timeout after %s", socketAddress, ExceptionFactory.formatTimeout(Duration.ofNanos(timeUnit.toNanos(timeout))));
                    sync.completeExceptionally(new RedisConnectionException(message));
                }, timeout, timeUnit);
                connectionFuture.whenComplete((connection, throwable) -> {
                    cancelTimeout.cancel();
                    if (throwable != null) {
                        Throwable throwableToUse = Exceptions.unwrap(throwable);
                        String message = String.format("Unable to connect to [%s]: %s", socketAddress, throwableToUse.getMessage() != null ? throwableToUse.getMessage() : throwableToUse.toString());
                        if (throwableToUse instanceof RedisConnectionException || throwableToUse instanceof IOException) {
                            if (logger.isDebugEnabled()) {
                                logger.debug(message, throwableToUse);
                            } else {
                                logger.warn(message);
                            }
                        } else {
                            logger.warn(message, throwableToUse);
                        }
                        sync.completeExceptionally(new RedisConnectionException(message, throwableToUse));
                    } else {
                        connection.async().clientSetname("lettuce#ClusterTopologyRefresh");
                        if (!sync.complete((StatefulRedisConnection<String, String>)connection)) {
                            connection.close();
                        }
                    }
                });
                tracker.addConnection(redisURI, sync);
            }
            catch (RuntimeException e) {
                String message = String.format("Unable to connect to [%s]", redisURI);
                logger.warn(message, (Throwable)e);
                sync.completeExceptionally(new RedisConnectionException(message, e));
                tracker.addConnection(redisURI, sync);
            }
        }
    }

    private boolean isEventLoopActive() {
        EventExecutorGroup eventExecutors = this.clientResources.eventExecutorGroup();
        return !eventExecutors.isShuttingDown();
    }

    private static Set<RedisURI> difference(Set<RedisURI> allKnown, Set<RedisURI> seed) {
        TreeSet<RedisURI> result = new TreeSet<RedisURI>(TopologyComparators.RedisURIComparator.INSTANCE);
        for (RedisURI e : allKnown) {
            if (seed.contains(e)) continue;
            result.add(e);
        }
        return result;
    }

    private static long getCommandTimeoutNs(Iterable<RedisURI> redisURIs) {
        RedisURI redisURI = redisURIs.iterator().next();
        return redisURI.getTimeout().toNanos();
    }

    private static Throwable getException(Future<?> future) {
        try {
            future.get();
        }
        catch (Exception e) {
            return Exceptions.bubble(e);
        }
        return null;
    }

    static class CannotRetrieveClusterPartitions
    extends RedisException {
        private final Map<RedisURI, String> failure;

        public CannotRetrieveClusterPartitions(Iterable<RedisURI> seedNodes, Map<RedisURI, String> failure) {
            super(String.format("Cannot retrieve cluster partitions from %s", seedNodes));
            this.failure = failure;
        }

        @Override
        public String getMessage() {
            StringJoiner joiner = new StringJoiner(SystemPropertyUtil.get((String)"line.separator", (String)"\n"));
            if (!this.failure.isEmpty()) {
                joiner.add(super.getMessage()).add("");
                joiner.add("Details:");
                for (Map.Entry<RedisURI, String> entry : this.failure.entrySet()) {
                    joiner.add(String.format("\t[%s]: %s", entry.getKey(), entry.getValue()));
                }
                joiner.add("");
            }
            return joiner.toString();
        }

        @Override
        public Throwable fillInStackTrace() {
            return this;
        }
    }

    static class ConnectionTracker {
        private final Map<RedisURI, CompletableFuture<StatefulRedisConnection<String, String>>> connections = new LinkedHashMap<RedisURI, CompletableFuture<StatefulRedisConnection<String, String>>>();

        ConnectionTracker() {
        }

        public void addConnection(RedisURI uri, CompletableFuture<StatefulRedisConnection<String, String>> future) {
            CompletableFuture<StatefulRedisConnection<String, String>> existing = this.connections.put(uri, future);
        }

        public CompletableFuture<Void> close() {
            CompletableFuture[] futures = (CompletableFuture[])this.connections.values().stream().map(it -> ((CompletableFuture)it.thenCompose(StatefulConnection::closeAsync)).exceptionally(ignore -> null)).toArray(CompletableFuture[]::new);
            return CompletableFuture.allOf(futures);
        }

        public boolean contains(RedisURI uri) {
            return this.connections.containsKey(uri);
        }

        public <T> CompletableFuture<T> whenComplete(Function<? super Map<RedisURI, StatefulRedisConnection<String, String>>, ? extends T> mappingFunction) {
            int expectedCount = this.connections.size();
            AtomicInteger latch = new AtomicInteger();
            CompletableFuture continuation = new CompletableFuture();
            for (Map.Entry<RedisURI, CompletableFuture<StatefulRedisConnection<String, String>>> entry : this.connections.entrySet()) {
                CompletableFuture<StatefulRedisConnection<String, String>> future = entry.getValue();
                future.whenComplete((T it, U ex) -> {
                    if (latch.incrementAndGet() == expectedCount) {
                        try {
                            continuation.complete(mappingFunction.apply(this.collectConnections()));
                        }
                        catch (RuntimeException e) {
                            continuation.completeExceptionally(e);
                        }
                    }
                });
            }
            return continuation;
        }

        protected Map<RedisURI, StatefulRedisConnection<String, String>> collectConnections() {
            LinkedHashMap<RedisURI, StatefulRedisConnection<String, String>> activeConnections = new LinkedHashMap<RedisURI, StatefulRedisConnection<String, String>>();
            for (Map.Entry<RedisURI, CompletableFuture<StatefulRedisConnection<String, String>>> entry : this.connections.entrySet()) {
                CompletableFuture<StatefulRedisConnection<String, String>> future = entry.getValue();
                if (!future.isDone() || future.isCompletedExceptionally()) continue;
                activeConnections.put(entry.getKey(), future.join());
            }
            return activeConnections;
        }
    }
}

