/*
 * Decompiled with CFR 0.152.
 */
package io.lettuce.core.cluster;

import io.lettuce.core.ConnectionFuture;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisChannelWriter;
import io.lettuce.core.RedisCommandInterruptedException;
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.cluster.AbstractClusterNodeConnectionFactory;
import io.lettuce.core.cluster.AsyncClusterConnectionProvider;
import io.lettuce.core.cluster.ClusterConnectionProvider;
import io.lettuce.core.cluster.ClusterEventListener;
import io.lettuce.core.cluster.ClusterNodeConnectionFactory;
import io.lettuce.core.cluster.PartitionSelectorException;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.UnknownPartitionException;
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.internal.AsyncConnectionProvider;
import io.lettuce.core.internal.HostAndPort;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.models.role.RedisInstance;
import io.lettuce.core.models.role.RedisNodeDescription;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

class PooledClusterConnectionProvider<K, V>
implements ClusterConnectionProvider,
AsyncClusterConnectionProvider {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledClusterConnectionProvider.class);
    private final Object stateLock = new Object();
    private final boolean debugEnabled = logger.isDebugEnabled();
    private final CompletableFuture<StatefulRedisConnection<K, V>>[] writers = new CompletableFuture[16384];
    private final CompletableFuture<StatefulRedisConnection<K, V>>[][] readers = new CompletableFuture[16384][];
    private final RedisClusterClient redisClusterClient;
    private final ClusterNodeConnectionFactory<K, V> connectionFactory;
    private final RedisChannelWriter clusterWriter;
    private final ClusterEventListener clusterEventListener;
    private final RedisCodec<K, V> redisCodec;
    private final AsyncConnectionProvider<ClusterNodeConnectionFactory.ConnectionKey, StatefulRedisConnection<K, V>, ConnectionFuture<StatefulRedisConnection<K, V>>> connectionProvider;
    private Partitions partitions;
    private boolean autoFlushCommands = true;
    private ReadFrom readFrom;

    public PooledClusterConnectionProvider(RedisClusterClient redisClusterClient, RedisChannelWriter clusterWriter, RedisCodec<K, V> redisCodec, ClusterEventListener clusterEventListener) {
        this.redisCodec = redisCodec;
        this.redisClusterClient = redisClusterClient;
        this.clusterWriter = clusterWriter;
        this.clusterEventListener = clusterEventListener;
        this.connectionFactory = new NodeConnectionPostProcessor(this.getConnectionFactory(redisClusterClient));
        this.connectionProvider = new AsyncConnectionProvider(this.connectionFactory);
    }

    @Override
    public StatefulRedisConnection<K, V> getConnection(ClusterConnectionProvider.Intent intent, int slot) {
        try {
            return this.getConnectionAsync(intent, slot).get();
        }
        catch (RedisException e) {
            throw e;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RedisCommandInterruptedException(e);
        }
        catch (ExecutionException e) {
            throw new RedisException(e.getCause());
        }
        catch (RuntimeException e) {
            throw new RedisException(e);
        }
    }

    @Override
    public CompletableFuture<StatefulRedisConnection<K, V>> getConnectionAsync(ClusterConnectionProvider.Intent intent, int slot) {
        if (this.debugEnabled) {
            logger.debug("getConnection(" + (Object)((Object)intent) + ", " + slot + ")");
        }
        if (intent == ClusterConnectionProvider.Intent.READ && this.readFrom != null && this.readFrom != ReadFrom.MASTER) {
            return this.getReadConnection(slot);
        }
        return this.getWriteConnection(slot).toCompletableFuture();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletableFuture<StatefulRedisConnection<K, V>> getWriteConnection(int slot) {
        CompletableFuture<StatefulRedisConnection<K, V>> writer;
        Object object = this.stateLock;
        synchronized (object) {
            writer = this.writers[slot];
        }
        if (writer == null) {
            RedisClusterNode partition = this.partitions.getPartitionBySlot(slot);
            if (partition == null) {
                throw new PartitionSelectorException("Cannot determine a partition for slot " + slot + ".", this.partitions.clone());
            }
            RedisURI uri = partition.getUri();
            ClusterNodeConnectionFactory.ConnectionKey key = new ClusterNodeConnectionFactory.ConnectionKey(ClusterConnectionProvider.Intent.WRITE, uri.getHost(), uri.getPort());
            ConnectionFuture<StatefulRedisConnection<K, V>> future = this.getConnectionAsync(key);
            return future.thenApply(connection -> {
                Object object = this.stateLock;
                synchronized (object) {
                    if (this.writers[slot] == null) {
                        this.writers[slot] = CompletableFuture.completedFuture(connection);
                    }
                }
                return connection;
            }).toCompletableFuture();
        }
        return writer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletableFuture<StatefulRedisConnection<K, V>> getReadConnection(int slot) {
        CompletableFuture[] readerCandidates;
        boolean cached = true;
        Object object = this.stateLock;
        synchronized (object) {
            readerCandidates = this.readers[slot];
        }
        if (readerCandidates == null) {
            RedisClusterNode master = this.partitions.getPartitionBySlot(slot);
            if (master == null) {
                throw new PartitionSelectorException(String.format("Cannot determine a partition to read for slot %d.", slot), this.partitions.clone());
            }
            final List<RedisNodeDescription> candidates2 = this.getReadCandidates(master);
            List<RedisNodeDescription> selection = this.readFrom.select(new ReadFrom.Nodes(){

                @Override
                public List<RedisNodeDescription> getNodes() {
                    return candidates2;
                }

                @Override
                public Iterator<RedisNodeDescription> iterator() {
                    return candidates2.iterator();
                }
            });
            if (selection.isEmpty()) {
                throw new PartitionSelectorException(String.format("Cannot determine a partition to read for slot %d with setting %s.", slot, this.readFrom), this.partitions.clone());
            }
            readerCandidates = this.getReadFromConnections(selection);
            cached = false;
        }
        CompletableFuture[] selectedReaderCandidates = readerCandidates;
        if (cached) {
            return CompletableFuture.allOf(readerCandidates).thenCompose(v -> {
                for (CompletableFuture candidate : selectedReaderCandidates) {
                    if (!((StatefulRedisConnection)candidate.join()).isOpen()) continue;
                    return candidate;
                }
                return selectedReaderCandidates[0];
            });
        }
        CompletableFuture filteredReaderCandidates = new CompletableFuture();
        ((CompletableFuture)CompletableFuture.allOf(readerCandidates).thenApply(v -> selectedReaderCandidates)).whenComplete((candidates, throwable) -> {
            if (throwable == null) {
                filteredReaderCandidates.complete(this.getConnections((CompletableFuture<StatefulRedisConnection<K, V>>[])candidates));
                return;
            }
            StatefulRedisConnection<K, V>[] connections = this.getConnections(selectedReaderCandidates);
            if (connections.length == 0) {
                filteredReaderCandidates.completeExceptionally((Throwable)throwable);
                return;
            }
            filteredReaderCandidates.complete(connections);
        });
        return filteredReaderCandidates.thenApply(statefulRedisConnections -> {
            CompletableFuture[] toCache = new CompletableFuture[((StatefulRedisConnection[])statefulRedisConnections).length];
            for (int i = 0; i < toCache.length; ++i) {
                toCache[i] = CompletableFuture.completedFuture(statefulRedisConnections[i]);
            }
            StatefulRedisConnection[] statefulRedisConnectionArray = this.stateLock;
            synchronized (this.stateLock) {
                this.readers[slot] = toCache;
                // ** MonitorExit[var4_5] (shouldn't be in output)
                for (StatefulRedisConnection candidate : statefulRedisConnections) {
                    if (!candidate.isOpen()) continue;
                    return candidate;
                }
                return statefulRedisConnections[0];
            }
        });
    }

    private StatefulRedisConnection<K, V>[] getConnections(CompletableFuture<StatefulRedisConnection<K, V>>[] selectedReaderCandidates) {
        ArrayList<StatefulRedisConnection<K, V>> connections = new ArrayList<StatefulRedisConnection<K, V>>(selectedReaderCandidates.length);
        for (CompletableFuture<StatefulRedisConnection<K, V>> candidate : selectedReaderCandidates) {
            try {
                connections.add(candidate.join());
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        StatefulRedisConnection[] result = new StatefulRedisConnection[connections.size()];
        connections.toArray(result);
        return result;
    }

    private CompletableFuture<StatefulRedisConnection<K, V>>[] getReadFromConnections(List<RedisNodeDescription> selection) {
        CompletableFuture[] readerCandidates = new CompletableFuture[selection.size()];
        for (int i = 0; i < selection.size(); ++i) {
            RedisNodeDescription redisClusterNode = selection.get(i);
            RedisURI uri = redisClusterNode.getUri();
            ClusterNodeConnectionFactory.ConnectionKey key = new ClusterNodeConnectionFactory.ConnectionKey(redisClusterNode.getRole() == RedisInstance.Role.MASTER ? ClusterConnectionProvider.Intent.WRITE : ClusterConnectionProvider.Intent.READ, uri.getHost(), uri.getPort());
            readerCandidates[i] = this.getConnectionAsync(key).toCompletableFuture();
        }
        return readerCandidates;
    }

    private List<RedisNodeDescription> getReadCandidates(RedisClusterNode master) {
        return this.partitions.stream().filter(partition -> this.isReadCandidate(master, (RedisClusterNode)partition)).collect(Collectors.toList());
    }

    private boolean isReadCandidate(RedisClusterNode master, RedisClusterNode partition) {
        return master.getNodeId().equals(partition.getNodeId()) || master.getNodeId().equals(partition.getSlaveOf());
    }

    @Override
    public StatefulRedisConnection<K, V> getConnection(ClusterConnectionProvider.Intent intent, String nodeId) {
        if (this.debugEnabled) {
            logger.debug("getConnection(" + (Object)((Object)intent) + ", " + nodeId + ")");
        }
        return this.getConnection(new ClusterNodeConnectionFactory.ConnectionKey(intent, nodeId));
    }

    @Override
    public CompletableFuture<StatefulRedisConnection<K, V>> getConnectionAsync(ClusterConnectionProvider.Intent intent, String nodeId) {
        if (this.debugEnabled) {
            logger.debug("getConnection(" + (Object)((Object)intent) + ", " + nodeId + ")");
        }
        return this.getConnectionAsync(new ClusterNodeConnectionFactory.ConnectionKey(intent, nodeId)).toCompletableFuture();
    }

    protected ConnectionFuture<StatefulRedisConnection<K, V>> getConnectionAsync(ClusterNodeConnectionFactory.ConnectionKey key) {
        ConnectionFuture connectionFuture = this.connectionProvider.getConnection(key);
        CompletableFuture result = new CompletableFuture();
        connectionFuture.handle((connection, throwable) -> {
            if (throwable != null) {
                result.completeExceptionally(RedisConnectionException.create(connectionFuture.getRemoteAddress(), throwable));
            } else {
                result.complete(connection);
            }
            return null;
        });
        return ConnectionFuture.from(connectionFuture.getRemoteAddress(), result);
    }

    @Override
    public StatefulRedisConnection<K, V> getConnection(ClusterConnectionProvider.Intent intent, String host, int port) {
        try {
            this.beforeGetConnection(intent, host, port);
            return this.getConnection(new ClusterNodeConnectionFactory.ConnectionKey(intent, host, port));
        }
        catch (RedisException e) {
            throw e;
        }
        catch (RuntimeException e) {
            throw new RedisException(e);
        }
    }

    private StatefulRedisConnection<K, V> getConnection(ClusterNodeConnectionFactory.ConnectionKey key) {
        ConnectionFuture<StatefulRedisConnection<K, V>> future = this.getConnectionAsync(key);
        try {
            return future.join();
        }
        catch (CompletionException e) {
            throw RedisConnectionException.create(future.getRemoteAddress(), e.getCause());
        }
    }

    @Override
    public CompletableFuture<StatefulRedisConnection<K, V>> getConnectionAsync(ClusterConnectionProvider.Intent intent, String host, int port) {
        try {
            this.beforeGetConnection(intent, host, port);
            return this.connectionProvider.getConnection(new ClusterNodeConnectionFactory.ConnectionKey(intent, host, port)).toCompletableFuture();
        }
        catch (RedisException e) {
            throw e;
        }
        catch (RuntimeException e) {
            throw new RedisException(e);
        }
    }

    private void beforeGetConnection(ClusterConnectionProvider.Intent intent, String host, int port) {
        RedisClusterNode redisClusterNode;
        if (this.debugEnabled) {
            logger.debug("getConnection(" + (Object)((Object)intent) + ", " + host + ", " + port + ")");
        }
        if ((redisClusterNode = this.partitions.getPartition(host, port)) == null) {
            this.clusterEventListener.onUnknownNode();
            if (this.validateClusterNodeMembership()) {
                HostAndPort hostAndPort = HostAndPort.of(host, port);
                throw PooledClusterConnectionProvider.connectionAttemptRejected(hostAndPort.toString());
            }
        }
    }

    @Override
    public void close() {
        this.closeAsync().join();
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        this.resetFastConnectionCache();
        return this.connectionProvider.close();
    }

    @Override
    public void reset() {
        this.connectionProvider.forEach(StatefulConnection::reset);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void setPartitions(Partitions partitions) {
        boolean reconfigurePartitions = false;
        Object object = this.stateLock;
        synchronized (object) {
            if (this.partitions != null) {
                reconfigurePartitions = true;
            }
            this.partitions = partitions;
            this.connectionFactory.setPartitions(partitions);
        }
        if (reconfigurePartitions) {
            this.reconfigurePartitions();
        }
    }

    protected Partitions getPartitions() {
        return this.partitions;
    }

    private void reconfigurePartitions() {
        this.resetFastConnectionCache();
        if (this.redisClusterClient.expireStaleConnections()) {
            this.closeStaleConnections();
        }
    }

    @Override
    public void closeStaleConnections() {
        logger.debug("closeStaleConnections() count before expiring: {}", (Object)this.getConnectionCount());
        this.connectionProvider.forEach((key, connection) -> {
            if (this.isStale((ClusterNodeConnectionFactory.ConnectionKey)key)) {
                this.connectionProvider.close((ClusterNodeConnectionFactory.ConnectionKey)key);
            }
        });
        logger.debug("closeStaleConnections() count after expiring: {}", (Object)this.getConnectionCount());
    }

    private boolean isStale(ClusterNodeConnectionFactory.ConnectionKey connectionKey) {
        if (connectionKey.nodeId != null && this.partitions.getPartitionByNodeId(connectionKey.nodeId) != null) {
            return false;
        }
        return connectionKey.host == null || this.partitions.getPartition(connectionKey.host, connectionKey.port) == null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void setAutoFlushCommands(boolean autoFlush) {
        Object object = this.stateLock;
        synchronized (object) {
            this.autoFlushCommands = autoFlush;
        }
        this.connectionProvider.forEach(connection -> connection.setAutoFlushCommands(autoFlush));
    }

    @Override
    public void flushCommands() {
        this.connectionProvider.forEach(StatefulConnection::flushCommands);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void setReadFrom(ReadFrom readFrom) {
        Object object = this.stateLock;
        synchronized (object) {
            this.readFrom = readFrom;
            Arrays.fill(this.readers, null);
        }
    }

    @Override
    public ReadFrom getReadFrom() {
        return this.readFrom;
    }

    long getConnectionCount() {
        return this.connectionProvider.getConnectionCount();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void resetFastConnectionCache() {
        Object object = this.stateLock;
        synchronized (object) {
            Arrays.fill(this.writers, null);
            Arrays.fill(this.readers, null);
        }
    }

    private static RuntimeException connectionAttemptRejected(String message) {
        return new UnknownPartitionException("Connection to " + message + " not allowed. This partition is not known in the cluster view.");
    }

    private boolean validateClusterNodeMembership() {
        return this.redisClusterClient.getClusterClientOptions() == null || this.redisClusterClient.getClusterClientOptions().isValidateClusterNodeMembership();
    }

    protected ClusterNodeConnectionFactory<K, V> getConnectionFactory(RedisClusterClient redisClusterClient) {
        return new DefaultClusterNodeConnectionFactory<K, V>(redisClusterClient, this.redisCodec, this.clusterWriter);
    }

    static class DefaultClusterNodeConnectionFactory<K, V>
    extends AbstractClusterNodeConnectionFactory<K, V> {
        private final RedisClusterClient redisClusterClient;
        private final RedisCodec<K, V> redisCodec;
        private final RedisChannelWriter clusterWriter;

        DefaultClusterNodeConnectionFactory(RedisClusterClient redisClusterClient, RedisCodec<K, V> redisCodec, RedisChannelWriter clusterWriter) {
            super(redisClusterClient.getResources());
            this.redisClusterClient = redisClusterClient;
            this.redisCodec = redisCodec;
            this.clusterWriter = clusterWriter;
        }

        @Override
        public ConnectionFuture<StatefulRedisConnection<K, V>> apply(ClusterNodeConnectionFactory.ConnectionKey key) {
            if (key.nodeId != null) {
                return this.redisClusterClient.connectToNodeAsync(this.redisCodec, key.nodeId, null, this.getSocketAddressSupplier(key));
            }
            return this.redisClusterClient.connectToNodeAsync(this.redisCodec, key.host + ":" + key.port, this.clusterWriter, this.getSocketAddressSupplier(key));
        }
    }

    class NodeConnectionPostProcessor
    implements ClusterNodeConnectionFactory<K, V> {
        private final ClusterNodeConnectionFactory<K, V> delegate;

        NodeConnectionPostProcessor(ClusterNodeConnectionFactory<K, V> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void setPartitions(Partitions partitions) {
            this.delegate.setPartitions(partitions);
        }

        @Override
        public ConnectionFuture<StatefulRedisConnection<K, V>> apply(ClusterNodeConnectionFactory.ConnectionKey key) {
            if (key.nodeId != null && PooledClusterConnectionProvider.this.getPartitions().getPartitionByNodeId(key.nodeId) == null) {
                PooledClusterConnectionProvider.this.clusterEventListener.onUnknownNode();
                throw PooledClusterConnectionProvider.connectionAttemptRejected("node id " + key.nodeId);
            }
            if (key.host != null && PooledClusterConnectionProvider.this.partitions.getPartition(key.host, key.port) == null) {
                PooledClusterConnectionProvider.this.clusterEventListener.onUnknownNode();
                if (PooledClusterConnectionProvider.this.validateClusterNodeMembership()) {
                    throw PooledClusterConnectionProvider.connectionAttemptRejected(key.host + ":" + key.port);
                }
            }
            CompletionStage connection = (ConnectionFuture)this.delegate.apply(key);
            LettuceAssert.notNull(connection, "Connection is null. Check ConnectionKey because host and nodeId are null.");
            if (key.intent == ClusterConnectionProvider.Intent.READ) {
                connection = connection.thenCompose(c -> {
                    RedisFuture<String> stringRedisFuture = c.async().readOnly();
                    return stringRedisFuture.thenApply(s -> c).whenCompleteAsync((s, throwable) -> {
                        if (throwable != null) {
                            c.close();
                        }
                    });
                });
            }
            connection = connection.thenApply(c -> {
                Object object = PooledClusterConnectionProvider.this.stateLock;
                synchronized (object) {
                    c.setAutoFlushCommands(PooledClusterConnectionProvider.this.autoFlushCommands);
                }
                return c;
            });
            return connection;
        }
    }
}

