/*
 * Decompiled with CFR 0.152.
 */
package iep.io.reactivex.netty.protocol.http.websocket;

import iep.io.reactivex.netty.channel.ObservableConnection;
import iep.io.reactivex.netty.client.ClientChannelFactory;
import iep.io.reactivex.netty.client.ClientConnectionFactory;
import iep.io.reactivex.netty.client.ClientMetricsEvent;
import iep.io.reactivex.netty.client.RxClient;
import iep.io.reactivex.netty.client.RxClientImpl;
import iep.io.reactivex.netty.metrics.MetricEventsSubject;
import iep.io.reactivex.netty.pipeline.PipelineConfigurator;
import iep.io.reactivex.netty.protocol.http.websocket.WebSocketClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineException;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;

/**
 * {@link RxClientImpl} specialisation for WebSocket connections.
 *
 * <p>{@link #connect()} lifts the parent's connection stream through a
 * {@link HandshakeOperator}, so a connection is handed to the subscriber only
 * after the {@link WebSocketClientHandler} installed in the channel pipeline
 * reports that the handshake finished successfully.
 *
 * @param <I> frame type passed as the first type argument to {@link RxClientImpl}
 * @param <O> frame type passed as the second type argument to {@link RxClientImpl}
 */
public class WebSocketClient<I extends WebSocketFrame, O extends WebSocketFrame>
extends RxClientImpl<I, O> {
    // Deliberately raw: the operator keeps no per-type state, so one shared
    // instance is lifted onto connection streams of any frame type.
    @SuppressWarnings("rawtypes")
    private static final HandshakeOperator HANDSHAKE_OPERATOR = new HandshakeOperator();

    /**
     * Creates a WebSocket client; every argument is forwarded unchanged to the
     * {@link RxClientImpl} constructor.
     */
    public WebSocketClient(String name, RxClient.ServerInfo serverInfo, Bootstrap clientBootstrap, PipelineConfigurator<O, I> pipelineConfigurator, RxClient.ClientConfig clientConfig, ClientChannelFactory<O, I> channelFactory, ClientConnectionFactory<O, I, ? extends ObservableConnection<O, I>> connectionFactory, MetricEventsSubject<ClientMetricsEvent<?>> eventsSubject) {
        super(name, serverInfo, clientBootstrap, pipelineConfigurator, clientConfig, channelFactory, connectionFactory, eventsSubject);
    }

    /**
     * Connects via the parent implementation and delays emission of the
     * connection until the WebSocket handshake has finished.
     *
     * @return an observable that emits the connection and completes once the
     *         handshake succeeds, or signals an error if the handshake fails or
     *         the pipeline contains no {@link WebSocketClientHandler}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override
    public Observable<ObservableConnection<O, I>> connect() {
        return super.connect().lift(HANDSHAKE_OPERATOR);
    }

    /**
     * Operator that holds back each upstream connection until the handshake
     * listener registered on the pipeline's {@link WebSocketClientHandler} fires.
     *
     * @param <T> WebSocket frame type of the connection
     */
    static class HandshakeOperator<T extends WebSocketFrame>
    implements Observable.Operator<ObservableConnection<T, T>, ObservableConnection<T, T>> {

        /**
         * Wraps {@code originalSubscriber} in a subscriber that gates emissions on
         * handshake completion.
         *
         * @param originalSubscriber downstream subscriber that receives the
         *                           connection after a successful handshake
         * @return the subscriber to attach to the upstream connection stream
         */
        @Override
        public Subscriber<ObservableConnection<T, T>> call(final Subscriber<? super ObservableConnection<T, T>> originalSubscriber) {
            Subscriber<ObservableConnection<T, T>> liftSubscriber = new Subscriber<ObservableConnection<T, T>>() {
                // Set once a connection has been received. Plain field: relies on
                // upstream delivering notifications serially.
                private boolean connectionSeen;

                @Override
                public void onCompleted() {
                    // When a connection was received, the terminal event is issued
                    // from onNext or the handshake listener instead. If upstream
                    // completed without emitting anything, forward completion so
                    // the downstream subscriber is not left waiting forever.
                    if (!connectionSeen) {
                        originalSubscriber.onCompleted();
                    }
                }

                @Override
                public void onError(Throwable e) {
                    originalSubscriber.onError(e);
                }

                @Override
                public void onNext(final ObservableConnection<T, T> connection) {
                    connectionSeen = true;
                    ChannelPipeline pipeline = connection.getChannel().pipeline();
                    // Single lookup: get() returns null when no such handler is
                    // installed, so presence check and retrieval cannot disagree.
                    WebSocketClientHandler handler = pipeline.get(WebSocketClientHandler.class);
                    if (handler == null) {
                        originalSubscriber.onError(new ChannelPipelineException("invalid pipeline configuration - WebSocket pipeline with no WebSocketClientHandler"));
                        return;
                    }
                    handler.addHandshakeFinishedListener(new ChannelFutureListener() {

                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (future.isSuccess()) {
                                originalSubscriber.onNext(connection);
                                originalSubscriber.onCompleted();
                            } else {
                                originalSubscriber.onError(future.cause());
                            }
                        }
                    });
                }
            };
            // Tie the lifted subscriber to the downstream subscription so that
            // unsubscribing downstream also unsubscribes from upstream.
            originalSubscriber.add(liftSubscriber);
            return liftSubscriber;
        }
    }
}

