/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.conductor;

import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.state.AbstractStateMachine;
import com.couchbase.client.core.state.LifecycleState;
import com.couchbase.client.dcp.conductor.ConfigProvider;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.dcp.transport.netty.ChannelUtils;
import com.couchbase.client.dcp.transport.netty.ConfigPipeline;
import com.couchbase.client.dcp.util.retry.RetryBuilder;
import com.couchbase.client.deps.io.netty.bootstrap.Bootstrap;
import com.couchbase.client.deps.io.netty.buffer.PooledByteBufAllocator;
import com.couchbase.client.deps.io.netty.buffer.UnpooledByteBufAllocator;
import com.couchbase.client.deps.io.netty.channel.Channel;
import com.couchbase.client.deps.io.netty.channel.ChannelFuture;
import com.couchbase.client.deps.io.netty.channel.ChannelHandler;
import com.couchbase.client.deps.io.netty.channel.ChannelOption;
import com.couchbase.client.deps.io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import rx.Completable;
import rx.CompletableSubscriber;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action4;
import rx.functions.Func1;
import rx.subjects.BehaviorSubject;
import rx.subjects.Subject;

/**
 * {@link ConfigProvider} that keeps a connection open to the CONFIG service port of one of the
 * known cluster nodes and publishes every {@link CouchbaseBucketConfig} pushed onto the config
 * stream by the {@link ConfigPipeline} installed on that connection.
 *
 * <p>The list of candidate nodes starts out as {@link ClientEnvironment#clusterAt()} and is
 * replaced by the node list of each config that arrives. When the connection closes and the
 * provider has not been stopped, a reconnect is attempted using the retry settings from the
 * environment.
 *
 * <p>Connection state is reported through the inherited {@link AbstractStateMachine}.
 */
public class HttpStreamingConfigProvider
        extends AbstractStateMachine<LifecycleState>
        implements ConfigProvider {

    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(HttpStreamingConfigProvider.class);

    /** Candidate addresses to fetch a config from; swapped atomically on every usable config. */
    private final AtomicReference<List<InetSocketAddress>> remoteHosts;

    /** Serialized subject that receives configs from the pipeline and is exposed via {@link #configs()}. */
    private final Subject<CouchbaseBucketConfig, CouchbaseBucketConfig> configStream;

    /** Set once {@link #stop()} has been called; suppresses further (re)connect attempts. */
    private volatile boolean stopped = false;

    /** The currently open config channel, or {@code null} if there is none. */
    private volatile Channel channel;

    private final ClientEnvironment env;

    /**
     * Creates a new provider in the {@link LifecycleState#DISCONNECTED} state.
     *
     * @param env the environment supplying bootstrap hosts, SSL/pooling flags, timeouts,
     *            the event loop group and the reconnect settings.
     */
    public HttpStreamingConfigProvider(final ClientEnvironment env) {
        super(LifecycleState.DISCONNECTED);
        this.env = env;
        this.remoteHosts = new AtomicReference<List<InetSocketAddress>>(env.clusterAt());
        this.configStream = BehaviorSubject.<CouchbaseBucketConfig>create().toSerialized();

        // Internal subscription: every new config refreshes the list of hosts used for reconnects.
        this.configStream.subscribe(new Subscriber<CouchbaseBucketConfig>() {

            @Override
            public void onCompleted() {
                LOGGER.debug("Config stream completed.");
            }

            @Override
            public void onError(Throwable e) {
                LOGGER.warn("Error on config stream!", e);
            }

            @Override
            public void onNext(CouchbaseBucketConfig config) {
                List<InetSocketAddress> newNodes = new ArrayList<InetSocketAddress>();
                for (NodeInfo node : config.nodes()) {
                    Integer port = (env.sslEnabled() ? node.sslServices() : node.services()).get(ServiceType.CONFIG);
                    if (port == null) {
                        // Unboxing a missing port would throw here, which would terminate this
                        // subscription and freeze the host list for good. Skip the node instead.
                        LOGGER.debug("Node {} does not expose a config port, skipping it.", (Object) node.hostname());
                        continue;
                    }
                    newNodes.add(new InetSocketAddress(node.hostname().nameOrAddress(), port));
                }
                if (newNodes.isEmpty()) {
                    // Never replace a usable host list with an empty one.
                    LOGGER.warn("Received a config without usable config nodes, keeping previous node list.");
                    return;
                }
                LOGGER.trace("Updated config stream node list to {}.", newNodes);
                HttpStreamingConfigProvider.this.remoteHosts.set(newNodes);
            }
        });
    }

    /**
     * Returns a {@link Completable} that tries the known hosts in order and completes once a
     * config connection has been established to one of them.
     */
    @Override
    public Completable start() {
        return this.tryConnectHosts();
    }

    /**
     * Marks the provider as stopped immediately and returns a {@link Completable} that, when
     * subscribed, closes the current channel (if any).
     *
     * <p>The returned completable completes when the channel close succeeded or there was no
     * channel to close, and errors with the close failure cause otherwise. In every case the
     * state ends up as {@link LifecycleState#DISCONNECTED}.
     */
    @Override
    public Completable stop() {
        this.stopped = true;
        return Completable.create(new Completable.OnSubscribe() {

            @Override
            public void call(final CompletableSubscriber subscriber) {
                LOGGER.debug("Initiating streaming config provider shutdown on channel.");
                HttpStreamingConfigProvider.this.transitionState(LifecycleState.DISCONNECTING);

                // Read the volatile field exactly once: a concurrent close listener may null it.
                final Channel ch = HttpStreamingConfigProvider.this.channel;
                if (ch == null) {
                    // Nothing to close; do not leave the state machine stuck in DISCONNECTING.
                    HttpStreamingConfigProvider.this.transitionState(LifecycleState.DISCONNECTED);
                    subscriber.onCompleted();
                    return;
                }

                HttpStreamingConfigProvider.this.channel = null;
                ch.close().addListener(new GenericFutureListener<ChannelFuture>() {

                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        HttpStreamingConfigProvider.this.transitionState(LifecycleState.DISCONNECTED);
                        if (future.isSuccess()) {
                            LOGGER.debug("Streaming config provider channel shutdown completed.");
                            subscriber.onCompleted();
                        } else {
                            LOGGER.warn("Error during streaming config provider shutdown!", future.cause());
                            subscriber.onError(future.cause());
                        }
                    }
                });
            }
        });
    }

    /**
     * Returns the stream of bucket configs. It is backed by a serialized {@link BehaviorSubject}.
     */
    @Override
    public Observable<CouchbaseBucketConfig> configs() {
        return this.configStream;
    }

    /**
     * Builds a completable that attempts a connection to each known host in list order, falling
     * through to the next host whenever the previous attempt fails.
     *
     * @return an already-completed completable if the provider is stopped; a failed completable
     *         if no hosts are known; otherwise the chained connect attempts.
     */
    private Completable tryConnectHosts() {
        if (this.stopped) {
            LOGGER.debug("Not trying to connect to hosts, already stopped.");
            return Completable.complete();
        }
        this.transitionState(LifecycleState.CONNECTING);

        List<InetSocketAddress> hosts = this.remoteHosts.get();
        if (hosts == null || hosts.isEmpty()) {
            // Surface the problem through the Completable so that retry handling applies,
            // instead of throwing synchronously from an index lookup.
            return Completable.error(new IllegalStateException("No remote hosts available to fetch a config from."));
        }

        Completable chain = this.tryConnectHost(hosts.get(0));
        for (int i = 1; i < hosts.size(); ++i) {
            final InetSocketAddress h = hosts.get(i);
            chain = chain.onErrorResumeNext(new Func1<Throwable, Completable>() {

                @Override
                public Completable call(Throwable throwable) {
                    LOGGER.warn("Could not get config from Node, trying next in list.", throwable);
                    return HttpStreamingConfigProvider.this.tryConnectHost(h);
                }
            });
        }
        return chain;
    }

    /**
     * Builds a completable that, when subscribed, connects to the given address.
     *
     * <p>On success the channel is remembered, a close listener that triggers a reconnect is
     * installed, and the state moves to {@link LifecycleState#CONNECTED}. On failure the
     * completable errors with the connect failure cause.
     *
     * @param address the config endpoint to connect to.
     */
    private Completable tryConnectHost(InetSocketAddress address) {
        // The allocator choice is passed inline: the two DEFAULT instances are of different
        // classes, so they cannot share a local typed as either one of them.
        final Bootstrap bootstrap = new Bootstrap()
                .remoteAddress(address)
                .option(ChannelOption.ALLOCATOR,
                        this.env.poolBuffers() ? PooledByteBufAllocator.DEFAULT : UnpooledByteBufAllocator.DEFAULT)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) this.env.socketConnectTimeout())
                .channel(ChannelUtils.channelForEventLoopGroup(this.env.eventLoopGroup()))
                .handler(new ConfigPipeline(this.env, address, this.configStream))
                .group(this.env.eventLoopGroup());

        return Completable.create(new Completable.OnSubscribe() {

            @Override
            public void call(final CompletableSubscriber subscriber) {
                bootstrap.connect().addListener(new GenericFutureListener<ChannelFuture>() {

                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            subscriber.onError(future.cause());
                            return;
                        }

                        // Work on a local reference; the field may be nulled concurrently by stop().
                        final Channel ch = future.channel();
                        HttpStreamingConfigProvider.this.channel = ch;
                        ch.closeFuture().addListener(new GenericFutureListener<ChannelFuture>() {

                            @Override
                            public void operationComplete(ChannelFuture future) throws Exception {
                                HttpStreamingConfigProvider.this.transitionState(LifecycleState.DISCONNECTED);
                                HttpStreamingConfigProvider.this.channel = null;
                                HttpStreamingConfigProvider.this.triggerReconnect();
                            }
                        });

                        if (HttpStreamingConfigProvider.this.stopped) {
                            // stop() was requested while the connect was in flight: do not keep
                            // the connection open. The close listener above resets the state.
                            LOGGER.debug("Config connection to Socket {} established after stop, closing it.", (Object) ch.remoteAddress());
                            ch.close();
                        } else {
                            LOGGER.debug("Successfully established config connection to Socket {}", (Object) ch.remoteAddress());
                            HttpStreamingConfigProvider.this.transitionState(LifecycleState.CONNECTED);
                        }
                        subscriber.onCompleted();
                    }
                });
            }
        });
    }

    /**
     * Starts a new round of connect attempts after the channel closed, unless the provider has
     * been stopped. Failed rounds are retried with the delay and maximum attempt count taken
     * from the environment.
     */
    private void triggerReconnect() {
        if (this.stopped) {
            // Do not report CONNECTING for a provider that will not reconnect.
            return;
        }
        this.transitionState(LifecycleState.CONNECTING);
        this.tryConnectHosts()
                .retryWhen(RetryBuilder.any()
                        .delay(this.env.configProviderReconnectDelay())
                        .max(this.env.configProviderReconnectMaxAttempts())
                        .doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() {

                            @Override
                            public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) {
                                LOGGER.info("No host usable to fetch a config from, waiting and retrying (remote hosts: {}).", HttpStreamingConfigProvider.this.remoteHosts.get());
                            }
                        })
                        .build())
                .subscribe();
    }
}

