/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.fn.stream;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link StreamObserver} that forwards every call straight to a wrapped
 * {@link CallStreamObserver} while applying a coarse form of flow control.
 *
 * <p>All three observer methods synchronize on one private lock, so calls into the wrapped
 * observer are serialized. To keep the common path cheap, {@link #onNext} does not consult
 * {@link CallStreamObserver#isReady()} on every message: the first check happens on call number
 * {@code maxMessagesBeforeCheck + 1}, and subsequent checks happen every
 * {@code maxMessagesBeforeCheck} calls. When a check finds the outbound observer not ready, the
 * calling thread blocks on the supplied {@link Phaser} until the phase advances, the phaser
 * terminates, or the observer reports ready.
 *
 * <p>NOTE(review): this class never advances the phaser itself; presumably the owner of the
 * phaser advances it from the outbound observer's on-ready callback. Confirm against the caller.
 */
@ThreadSafe
public final class DirectStreamObserver<@UnknownKeyFor T>
implements StreamObserver<T> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(DirectStreamObserver.class);
    /** Number of messages between readiness checks used by the public constructor. */
    private static final @UnknownKeyFor @NonNull @Initialized int DEFAULT_MAX_MESSAGES_BEFORE_CHECK = 100;
    /** Longest single wait, in seconds, between two re-evaluations of the ready flag. */
    private static final int MAX_WAIT_SECONDS = 60;
    /** Stalls longer than this many seconds are logged at WARN instead of DEBUG. */
    private static final int STALL_WARN_THRESHOLD_SECONDS = 60;

    private final @UnknownKeyFor @NonNull @Initialized Phaser phaser;
    private final @UnknownKeyFor @NonNull @Initialized CallStreamObserver<T> outboundObserver;
    private final @UnknownKeyFor @NonNull @Initialized int maxMessagesBeforeCheck;
    /** Guards {@link #numMessages} and serializes every call into {@link #outboundObserver}. */
    private final @UnknownKeyFor @NonNull @Initialized Object lock = new Object();
    /**
     * Messages seen since the last readiness check. It starts at -1 and is reset to 0 after each
     * check, which is why the very first window is one message longer than the later ones.
     */
    private @UnknownKeyFor @NonNull @Initialized int numMessages = -1;

    /**
     * Creates an observer that checks readiness using the default message interval.
     *
     * @param phaser phaser the sending thread waits on while the outbound observer is not ready
     * @param outboundObserver observer that receives every forwarded call
     */
    public DirectStreamObserver(@UnknownKeyFor @NonNull @Initialized Phaser phaser, @UnknownKeyFor @NonNull @Initialized CallStreamObserver<T> outboundObserver) {
        this(phaser, outboundObserver, DEFAULT_MAX_MESSAGES_BEFORE_CHECK);
    }

    /**
     * Creates an observer with an explicit readiness-check interval.
     *
     * @param phaser phaser the sending thread waits on while the outbound observer is not ready
     * @param outboundObserver observer that receives every forwarded call
     * @param maxMessagesBeforeCheck number of messages forwarded between two readiness checks
     */
    DirectStreamObserver(@UnknownKeyFor @NonNull @Initialized Phaser phaser, @UnknownKeyFor @NonNull @Initialized CallStreamObserver<T> outboundObserver, @UnknownKeyFor @NonNull @Initialized int maxMessagesBeforeCheck) {
        this.phaser = phaser;
        this.outboundObserver = outboundObserver;
        this.maxMessagesBeforeCheck = maxMessagesBeforeCheck;
    }

    /**
     * Forwards {@code value} to the outbound observer, first waiting for the observer to become
     * ready whenever the message counter reaches the configured interval.
     *
     * @param value message to forward
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
     *     interrupted while waiting; the thread's interrupt flag is restored before throwing
     */
    public void onNext(T value) {
        synchronized (lock) {
            if (++numMessages >= maxMessagesBeforeCheck) {
                // Reset before waiting so the counter is already restarted even if the wait
                // below ends with an exception.
                numMessages = 0;
                awaitOutboundReady();
            }
            outboundObserver.onNext(value);
        }
    }

    /**
     * Forwards the error to the outbound observer under the lock.
     *
     * @param t error to forward
     */
    public void onError(@UnknownKeyFor @NonNull @Initialized Throwable t) {
        synchronized (lock) {
            outboundObserver.onError(t);
        }
    }

    /** Forwards stream completion to the outbound observer under the lock. */
    public void onCompleted() {
        synchronized (lock) {
            outboundObserver.onCompleted();
        }
    }

    /**
     * Blocks until the outbound observer reports ready or the phaser terminates, then logs how
     * long the channel was stalled. Must be called while holding {@link #lock}.
     */
    private void awaitOutboundReady() {
        int waitSeconds = 1;
        int totalSecondsWaited = 0;
        // Remember the starting phase: if it never changes, nothing advanced the phaser while
        // this thread was waiting.
        final int initialPhase = phaser.getPhase();
        int phase = initialPhase;
        // A negative phase means the phaser has terminated, so there is nothing left to wait for.
        while (phase >= 0 && !outboundObserver.isReady()) {
            try {
                phase = phaser.awaitAdvanceInterruptibly(phase, waitSeconds, TimeUnit.SECONDS);
            } catch (TimeoutException timeout) {
                // Not an error: re-evaluate the ready flag, doubling the wait each time up to
                // the cap so the wait stays bounded even if the phase never advances.
                totalSecondsWaited += waitSeconds;
                waitSeconds = Math.min(waitSeconds * 2, MAX_WAIT_SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        if (totalSecondsWaited > 0) {
            logStall(totalSecondsWaited, initialPhase == phase);
        }
    }

    /**
     * Reports a stall of the output channel at a level that depends on its cause and length:
     * INFO when the phase never advanced, otherwise WARN for long stalls and DEBUG for short ones.
     */
    private static void logStall(int totalSecondsWaited, boolean phaseUnchanged) {
        String threadName = Thread.currentThread().getName();
        if (phaseUnchanged) {
            LOG.info("Output channel stalled for {}s, outbound thread {}. OnReady notification was not invoked, ensure the inbound gRPC thread is not used for output.", totalSecondsWaited, threadName);
        } else if (totalSecondsWaited > STALL_WARN_THRESHOLD_SECONDS) {
            LOG.warn("Output channel stalled for {}s, outbound thread {}.", totalSecondsWaited, threadName);
        } else {
            LOG.debug("Output channel stalled for {}s, outbound thread {}.", totalSecondsWaited, threadName);
        }
    }
}

