/*
 * Decompiled with CFR 0.152.
 */
package iep.io.reactivex.netty.channel;

import iep.io.reactivex.netty.channel.ChannelMetricEventProvider;
import iep.io.reactivex.netty.channel.DefaultChannelWriter;
import iep.io.reactivex.netty.channel.NewRxConnectionEvent;
import iep.io.reactivex.netty.metrics.Clock;
import iep.io.reactivex.netty.metrics.MetricEventsSubject;
import iep.io.reactivex.netty.pipeline.ReadTimeoutPipelineConfigurator;
import iep.io.reactivex.netty.util.NoOpSubscriber;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GenericFutureListener;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

public class ObservableConnection<I, O>
extends DefaultChannelWriter<O> {
    public static AttributeKey<Boolean> AUTO_RELEASE_BUFFERS = AttributeKey.valueOf((String)"rxnetty_auto_release_buffers");
    private Subject<I, I> inputSubject;
    private final MetricEventsSubject eventsSubject;
    private final ChannelMetricEventProvider metricEventProvider;
    protected volatile long closeStartTimeMillis = -1L;

    protected ObservableConnection(Channel channel, ChannelMetricEventProvider metricEventProvider, MetricEventsSubject<?> eventsSubject) {
        super(channel, eventsSubject, metricEventProvider);
        this.eventsSubject = eventsSubject;
        this.metricEventProvider = metricEventProvider;
        this.inputSubject = new SerializedSubject((Subject)PublishSubject.create());
    }

    public Observable<I> getInput() {
        return this.inputSubject;
    }

    public static <I, O> ObservableConnection<I, O> create(Channel channel, MetricEventsSubject<?> eventsSubject, ChannelMetricEventProvider metricEventProvider) {
        ObservableConnection<I, O> toReturn = new ObservableConnection<I, O>(channel, metricEventProvider, eventsSubject);
        toReturn.fireNewRxConnectionEvent();
        return toReturn;
    }

    protected void fireNewRxConnectionEvent() {
        ChannelHandlerContext firstContext = this.getChannel().pipeline().firstContext();
        firstContext.fireUserEventTriggered((Object)new NewRxConnectionEvent(this, (Observer)this.inputSubject));
    }

    @Override
    public Observable<Void> close() {
        return super.close();
    }

    @Override
    protected Observable<Void> _close(boolean flush) {
        final Subject<I, I> thisSubject = this.inputSubject;
        ReadTimeoutPipelineConfigurator.disableReadTimeout(this.getChannel().pipeline());
        if (flush) {
            Observable toReturn = this.flush().lift((Observable.Operator)new Observable.Operator<Void, Void>(){

                public Subscriber<? super Void> call(final Subscriber<? super Void> child) {
                    return new Subscriber<Void>(){

                        public void onCompleted() {
                            ObservableConnection.this._closeChannel().subscribe(child);
                            thisSubject.onCompleted();
                        }

                        public void onError(Throwable e) {
                            child.onError(e);
                        }

                        public void onNext(Void aVoid) {
                        }
                    };
                }
            });
            toReturn.subscribe(new NoOpSubscriber());
            return toReturn;
        }
        Observable<Void> toReturn = this._closeChannel();
        thisSubject.onCompleted();
        return toReturn;
    }

    protected Observable<Void> _closeChannel() {
        this.closeStartTimeMillis = Clock.newStartTimeMillis();
        this.eventsSubject.onEvent(this.metricEventProvider.getChannelCloseStartEvent());
        final ChannelFuture closeFuture = this.getChannel().close();
        closeFuture.addListener((GenericFutureListener)new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    ObservableConnection.this.eventsSubject.onEvent(ObservableConnection.this.metricEventProvider.getChannelCloseSuccessEvent(), Clock.onEndMillis(ObservableConnection.this.closeStartTimeMillis));
                } else {
                    ObservableConnection.this.eventsSubject.onEvent(ObservableConnection.this.metricEventProvider.getChannelCloseFailedEvent(), Clock.onEndMillis(ObservableConnection.this.closeStartTimeMillis), future.cause());
                }
            }
        });
        return Observable.create((Observable.OnSubscribe)new Observable.OnSubscribe<Void>(){

            public void call(final Subscriber<? super Void> subscriber) {
                closeFuture.addListener((GenericFutureListener)new ChannelFutureListener(){

                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()) {
                            subscriber.onCompleted();
                        } else {
                            subscriber.onError(future.cause());
                        }
                    }
                });
            }
        });
    }

    protected void updateInputSubject(Subject<I, I> newSubject) {
        this.inputSubject = new SerializedSubject(newSubject);
    }
}

