/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.broker.rsocket;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.broker.rsocket.DelegatingRSocket;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/**
 * {@link RSocket} decorator that fails in-flight request-response, request-stream and
 * request-channel subscribers with a {@link CancellationException} when this socket is
 * disposed.
 *
 * <p>{@link #dispose()} first fires an internal hook and then disposes the delegate. Every
 * subscriber created through {@link #requestResponse(Payload)}, {@link #requestStream(Payload)}
 * or {@link #requestChannel(Publisher)} listens on that hook. {@link #fireAndForget(Payload)}
 * and {@link #metadataPush(Payload)} are passed to the delegate unwrapped.
 *
 * <p>The signalled exception is a single shared instance, so its stack trace does not point
 * at the request that was interrupted.
 */
public class ErrorOnDisconnectRSocket
implements DelegatingRSocket {
    private static final CancellationException CANCELLATION_EXCEPTION = new CancellationException("Connection has closed");
    private final RSocket delegate;
    /** Emits once, from {@link #dispose()}, to notify all wrapped subscribers. */
    private final MonoProcessor<Boolean> onCancelHook = MonoProcessor.create();

    /**
     * @param source the socket every interaction is forwarded to
     */
    public ErrorOnDisconnectRSocket(RSocket source) {
        this.delegate = source;
    }

    /** Returns the delegate's close signal. */
    public Mono<Void> onClose() {
        return this.delegate.onClose();
    }

    /**
     * Notifies every wrapped subscriber that the connection is going away and then disposes
     * the delegate. The hook is fired first so subscribers observe the
     * {@link CancellationException} rather than whatever the delegate emits while closing.
     */
    public void dispose() {
        this.onCancelHook.onNext(true);
        this.onCancelHook.onComplete();
        this.delegate.dispose();
    }

    /**
     * Forwards the request to the delegate and wraps the response so it is failed on dispose.
     * If the delegate throws synchronously the payload is released and the error is returned
     * as a failed {@link Mono}.
     */
    public Mono<Payload> requestResponse(Payload payload) {
        try {
            return this.wrapMono(this.delegate.requestResponse(payload));
        }
        catch (Throwable t) {
            payload.release();
            return Mono.error(t);
        }
    }

    /**
     * Forwards the request to the delegate unwrapped. If the delegate throws synchronously
     * the payload is released and the error is returned as a failed {@link Mono}.
     */
    public Mono<Void> fireAndForget(Payload payload) {
        try {
            return this.delegate.fireAndForget(payload);
        }
        catch (Throwable t) {
            payload.release();
            return Mono.error(t);
        }
    }

    /**
     * Forwards the request to the delegate and wraps the stream so it is failed on dispose.
     * If the delegate throws synchronously the payload is released and the error is returned
     * as a failed {@link Flux}.
     */
    public Flux<Payload> requestStream(Payload payload) {
        try {
            return this.wrap(this.delegate.requestStream(payload));
        }
        catch (Throwable t) {
            payload.release();
            return Flux.error(t);
        }
    }

    /**
     * Forwards the request to the delegate unwrapped. If the delegate throws synchronously
     * the payload is released and the error is returned as a failed {@link Mono}.
     */
    public Mono<Void> metadataPush(Payload payload) {
        try {
            return this.delegate.metadataPush(payload);
        }
        catch (Throwable t) {
            payload.release();
            return Mono.error(t);
        }
    }

    /**
     * Forwards the channel to the delegate and wraps the response stream so it is failed on
     * dispose. A synchronous failure of the delegate is returned as a failed {@link Flux}.
     */
    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        try {
            return this.wrap(this.delegate.requestChannel(payloads));
        }
        catch (Throwable t) {
            return Flux.error(t);
        }
    }

    @Override
    public RSocket getDelegate() {
        return this.delegate;
    }

    /** Applies {@link #wrap(Flux)} to a {@link Mono}. */
    private <T> Mono<T> wrapMono(Mono<T> source) {
        return Mono.from(this.wrap(Flux.from(source)));
    }

    /** Decorates {@code source} so that its subscribers are failed when this socket is disposed. */
    private <T> Flux<T> wrap(Flux<T> source) {
        return Flux.from(new CancelSubscriptionPublisher<T>(source));
    }

    /** Publisher that subscribes to its source through a {@link DisconnectAwareSubscriber}. */
    private class CancelSubscriptionPublisher<T>
    implements Publisher<T> {
        private final Flux<T> delegate;

        private CancelSubscriptionPublisher(Flux<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void subscribe(Subscriber<? super T> s) {
            this.delegate.subscribe(new DisconnectAwareSubscriber<T>(s));
        }
    }

    /**
     * Sits between the upstream source and the downstream subscriber and also acts as the
     * downstream's {@link Subscription}.
     *
     * <p>Exactly one terminal event reaches the downstream subscriber: upstream
     * {@code onError}, upstream {@code onComplete}, or the disconnect hook, whichever comes
     * first. A downstream {@code cancel()} likewise suppresses everything after it.
     *
     * <p>The hook callback is not serialized with upstream {@code onNext}; only the terminal
     * decision is atomic.
     */
    private final class DisconnectAwareSubscriber<T>
    implements Subscriber<T>, Subscription {
        private final Subscriber<? super T> actual;
        /** Set by the first terminal event or by downstream cancellation. */
        private final AtomicBoolean terminated = new AtomicBoolean();
        private volatile Subscription upstream;
        /** Registration on the disconnect hook; {@code null} until {@link #onSubscribe} has run. */
        private volatile Disposable hookSubscription;

        private DisconnectAwareSubscriber(Subscriber<? super T> actual) {
            this.actual = actual;
        }

        @Override
        public void onSubscribe(Subscription s) {
            this.upstream = s;
            // The downstream must see onSubscribe before any other signal, so the hook is
            // only registered afterwards. If the socket is already disposed the hook fires
            // synchronously inside subscribe(...) below.
            this.actual.onSubscribe(this);
            if (this.terminated.get()) {
                // Finished or cancelled synchronously inside onSubscribe: nothing to guard.
                return;
            }
            Disposable registration = ErrorOnDisconnectRSocket.this.onCancelHook.subscribe(closed -> this.onDisconnect());
            this.hookSubscription = registration;
            if (this.terminated.get()) {
                // Terminated while registering; drop the registration so it is not retained
                // by the hook until the socket is disposed.
                registration.dispose();
            }
        }

        @Override
        public void onNext(T t) {
            if (this.terminated.get()) {
                // The downstream is already terminated and must not receive more items.
                // Ownership of the item passed to this subscriber, so release it here.
                if (t instanceof Payload) {
                    ((Payload)t).release();
                }
                return;
            }
            this.actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            // A losing error is dropped on purpose: the downstream has already been given
            // its terminal signal and may not receive a second one.
            if (this.terminated.compareAndSet(false, true)) {
                this.disposeHook();
                this.actual.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (this.terminated.compareAndSet(false, true)) {
                this.disposeHook();
                this.actual.onComplete();
            }
        }

        @Override
        public void request(long n) {
            this.upstream.request(n);
        }

        @Override
        public void cancel() {
            this.terminated.set(true);
            this.disposeHook();
            this.upstream.cancel();
        }

        /** Invoked by the disconnect hook: stops the upstream and fails the downstream. */
        private void onDisconnect() {
            if (this.terminated.compareAndSet(false, true)) {
                this.upstream.cancel();
                this.actual.onError(CANCELLATION_EXCEPTION);
            }
        }

        /** Unregisters from the disconnect hook if a registration exists. */
        private void disposeHook() {
            Disposable registration = this.hookSubscription;
            if (registration != null) {
                registration.dispose();
            }
        }
    }
}

