/*
 * Decompiled with CFR 0.152.
 */
package reactor.ipc.netty.resources;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.resources.PoolResources;
import reactor.util.Logger;
import reactor.util.Loggers;

final class DefaultPoolResources
implements PoolResources {
    /** Pools created by {@code selectOrCreate}, keyed by the address they were registered under. */
    final ConcurrentMap<SocketAddress, Pool> channelPools;
    /** Label for this set of pools; it appears in the debug message logged when a pool is created. */
    final String name;
    /** Factory handed to every new {@link Pool} to build its underlying {@link ChannelPool}. */
    final PoolFactory provider;
    /** Logger shared by this class and its nested {@link Pool}. */
    static final Logger log = Loggers.getLogger(DefaultPoolResources.class);

    /**
     * Creates an instance that holds no pools yet.
     *
     * @param name     label used in the debug message logged when a pool is created
     * @param provider factory used to build the underlying channel pool of each new {@link Pool}
     */
    DefaultPoolResources(String name, PoolFactory provider) {
        this.channelPools = PlatformDependent.newConcurrentHashMap();
        this.provider = provider;
        this.name = name;
    }

    /**
     * Returns the pool registered for a remote address, creating and registering a new one when
     * none exists yet.
     *
     * <p>When {@code remote} is {@code null} the address is taken from the bootstrap's own
     * configuration, and that resolved address is used both to look up and to register the pool.
     * If several callers race to create the pool for the same address, only one pool is kept;
     * the losing pool is closed and the lookup is retried.
     *
     * @param remote          target address, or {@code null} to use the address configured on
     *                        the supplied bootstrap
     * @param bootstrap       supplier of the bootstrap used to build a new pool
     * @param onChannelCreate callback passed to the new pool, may be {@code null}
     * @param group           event loop group passed to the new pool
     * @return the existing or newly registered pool for the resolved address
     * @throws NullPointerException if {@code remote} is {@code null} and the supplied bootstrap
     *                              has no remote address configured
     */
    @Override
    public ChannelPool selectOrCreate(SocketAddress remote, Supplier<? extends Bootstrap> bootstrap, Consumer<? super Channel> onChannelCreate, EventLoopGroup group) {
        for (;;) {
            if (remote != null) {
                // Known key: look it up before paying for a bootstrap.
                Pool existing = this.channelPools.get(remote);
                if (existing != null) {
                    return existing;
                }
            }

            Bootstrap b = bootstrap.get();
            SocketAddress address;
            if (remote != null) {
                b = b.remoteAddress(remote);
                address = remote;
            } else {
                // The key is only known once the bootstrap has been obtained. The map rejects
                // null keys, so fail with a clear message before any pool is created.
                address = b.config().remoteAddress();
                if (address == null) {
                    throw new NullPointerException("remote address is null and the bootstrap has no remote address configured");
                }
                Pool existing = this.channelPools.get(address);
                if (existing != null) {
                    return existing;
                }
            }

            if (log.isDebugEnabled()) {
                log.debug("New {} client pool for {}", this.name, address);
            }

            Pool created = new Pool(b, this.provider, onChannelCreate, group);
            if (this.channelPools.putIfAbsent(address, created) == null) {
                return created;
            }
            // Another caller registered a pool first: discard ours and retry the lookup.
            created.close();
        }
    }

    /**
     * Triggers the disposal described by {@link #disposeLater()} by subscribing to it. The
     * outcome of the subscription is not tracked.
     */
    @Override
    public void dispose() {
        Mono<Void> disposal = this.disposeLater();
        disposal.subscribe();
    }

    /**
     * Builds a disposal task wrapped in a {@link Mono}. When the task runs, every key found in
     * the pool map is removed and the pool that was mapped to it, if any, is closed.
     *
     * @return a {@link Mono} wrapping the disposal task
     */
    @Override
    public Mono<Void> disposeLater() {
        return Mono.fromRunnable(() ->
            this.channelPools.keySet().forEach(address -> {
                Pool removed = this.channelPools.remove(address);
                if (removed != null) {
                    removed.close();
                }
            }));
    }

    /**
     * Reports whether there is nothing left to dispose.
     *
     * @return {@code true} when no pool is registered or every registered pool has its closed
     *         flag set; {@code false} as soon as one registered pool is still open
     */
    public boolean isDisposed() {
        for (Pool candidate : this.channelPools.values()) {
            if (!candidate.get()) {
                return false;
            }
        }
        // Also reached when the map is empty.
        return true;
    }

    /**
     * Wrapper around the {@link ChannelPool} produced by a {@link PoolFactory}.
     *
     * <p>The wrapper delegates acquire/release/close to the underlying pool, and registers itself
     * as that pool's {@link ChannelPoolHandler} and {@link ChannelHealthChecker}. As handler it
     * keeps a counter of active connections and forwards channel creation to an optional
     * callback; as health checker it reports a channel healthy when it is active.
     *
     * <p>The inherited {@link AtomicBoolean} value is the "closed" flag: it is {@code false}
     * until {@link #close()} is first called and {@code true} afterwards.
     */
    static final class Pool
    extends AtomicBoolean
    implements ChannelPoolHandler,
    ChannelPool,
    ChannelHealthChecker {
        /** Underlying pool all acquire/release/close calls are delegated to. */
        final ChannelPool pool;
        /** Optional callback invoked with every newly created channel; may be {@code null}. */
        final Consumer<? super Channel> onChannelCreate;
        /** Event loop group this pool was created with. */
        final EventLoopGroup defaultGroup;
        /** Counter adjusted by the created/acquired/released callbacks. */
        final AtomicInteger activeConnections = new AtomicInteger();
        /** Pre-built succeeded future carrying {@code true}, returned for active channels. */
        final Future<Boolean> HEALTHY;
        /** Pre-built succeeded future carrying {@code false}, returned for inactive channels. */
        final Future<Boolean> UNHEALTHY;

        /**
         * @param bootstrap       bootstrap passed to the factory to build the underlying pool
         * @param provider        factory building the underlying pool
         * @param onChannelCreate callback for newly created channels, may be {@code null}
         * @param group           event loop group used to build the health futures
         */
        Pool(Bootstrap bootstrap, PoolFactory provider, Consumer<? super Channel> onChannelCreate, EventLoopGroup group) {
            this.onChannelCreate = onChannelCreate;
            this.defaultGroup = group;
            this.HEALTHY = group.next().newSucceededFuture(true);
            this.UNHEALTHY = group.next().newSucceededFuture(false);
            // Created last: the factory receives `this` as handler and health checker, so every
            // field those callbacks read must already be assigned. This ordering also means a
            // failure above cannot leave an unreferenced, unclosed underlying pool behind.
            this.pool = provider.newPool(bootstrap, this, this);
        }

        /**
         * Health check used by the underlying pool.
         *
         * @param channel channel to inspect
         * @return {@link #HEALTHY} when the channel is active, {@link #UNHEALTHY} otherwise
         */
        public Future<Boolean> isHealthy(Channel channel) {
            return channel.isActive() ? this.HEALTHY : this.UNHEALTHY;
        }

        /** Delegates to the underlying pool. */
        public Future<Channel> acquire() {
            return this.pool.acquire();
        }

        /** Delegates to the underlying pool. */
        public Future<Channel> acquire(Promise<Channel> promise) {
            return this.pool.acquire(promise);
        }

        /** Delegates to the underlying pool. */
        public Future<Void> release(Channel channel) {
            return this.pool.release(channel);
        }

        /** Delegates to the underlying pool. */
        public Future<Void> release(Channel channel, Promise<Void> promise) {
            return this.pool.release(channel, promise);
        }

        /**
         * Closes the underlying pool the first time it is called; later calls are no-ops.
         */
        public void close() {
            // The flag flips exactly once, so the underlying pool is closed at most once.
            if (this.compareAndSet(false, true)) {
                this.pool.close();
            }
        }

        /** Handler callback: decrements the active connection counter and logs at debug level. */
        public void channelReleased(Channel ch) throws Exception {
            this.activeConnections.decrementAndGet();
            if (log.isDebugEnabled()) {
                log.debug("Released {}, now {} active connections", ch.toString(), this.activeConnections);
            }
        }

        /** Handler callback: increments the active connection counter and logs at debug level. */
        public void channelAcquired(Channel ch) throws Exception {
            this.activeConnections.incrementAndGet();
            if (log.isDebugEnabled()) {
                log.debug("Acquired {}, now {} active connections", ch.toString(), this.activeConnections);
            }
        }

        /**
         * Handler callback: increments the active connection counter, logs at debug level and
         * then hands the channel to the creation callback when one was supplied.
         */
        public void channelCreated(Channel ch) throws Exception {
            this.activeConnections.incrementAndGet();
            if (log.isDebugEnabled()) {
                log.debug("Created {}, now {} active connections", ch.toString(), this.activeConnections);
            }
            if (this.onChannelCreate != null) {
                this.onChannelCreate.accept(ch);
            }
        }

        /** Returns the underlying pool's simple class name plus the active connection count. */
        @Override
        public String toString() {
            return this.pool.getClass().getSimpleName() + "{activeConnections=" + this.activeConnections + '}';
        }
    }

    /**
     * Strategy for building the underlying {@link ChannelPool} wrapped by a {@link Pool}.
     */
    interface PoolFactory {
        /**
         * Builds a channel pool.
         *
         * @param bootstrap     bootstrap the pool is built from
         * @param handler       handler to attach to the pool
         * @param healthChecker health checker to attach to the pool
         * @return the new pool
         */
        ChannelPool newPool(Bootstrap bootstrap, ChannelPoolHandler handler, ChannelHealthChecker healthChecker);
    }
}

