/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.fn.data;

import java.io.IOException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.KeyForBottom;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

public class BeamFnDataTimeBasedBufferingOutboundObserver<@UnknownKeyFor T>
extends BeamFnDataSizeBasedBufferingOutboundObserver<T> {
    @VisibleForTesting
    final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized ScheduledFuture<@UnknownKeyFor @KeyForBottom @Nullable @Initialized @NonNull @Initialized ?> flushFuture;

    BeamFnDataTimeBasedBufferingOutboundObserver(@UnknownKeyFor @NonNull @Initialized int sizeLimit, @UnknownKeyFor @NonNull @Initialized long timeLimit, @UnknownKeyFor @NonNull @Initialized LogicalEndpoint outputLocation, @UnknownKeyFor @NonNull @Initialized Coder<T> coder, @UnknownKeyFor @NonNull @Initialized StreamObserver<// Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.Elements> outboundObserver) {
        super(sizeLimit, outputLocation, coder, outboundObserver);
        this.flushFuture = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DataBufferOutboundFlusher-thread").build()).scheduleAtFixedRate(this::periodicFlush, timeLimit, timeLimit, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() throws @UnknownKeyFor @NonNull @Initialized Exception {
        this.checkFlushThreadException();
        this.flushFuture.cancel(false);
        try {
            this.flushFuture.get();
        }
        catch (ExecutionException ee) {
            this.unwrapExecutionException(ee);
        }
        catch (CancellationException cancellationException) {
            // empty catch block
        }
        super.close();
    }

    @Override
    public synchronized void flush() throws @UnknownKeyFor @NonNull @Initialized IOException {
        super.flush();
    }

    @Override
    public void accept(T t) throws @UnknownKeyFor @NonNull @Initialized IOException {
        this.checkFlushThreadException();
        super.accept(t);
    }

    private void periodicFlush() {
        try {
            this.flush();
        }
        catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }

    private void checkFlushThreadException() throws @UnknownKeyFor @NonNull @Initialized IOException {
        if (this.flushFuture.isDone()) {
            try {
                this.flushFuture.get();
                throw new IOException("Periodic flushing thread finished unexpectedly.");
            }
            catch (ExecutionException ee) {
                this.unwrapExecutionException(ee);
            }
            catch (CancellationException ce) {
                throw new IOException(ce);
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IOException(ie);
            }
        }
    }

    private void unwrapExecutionException(@UnknownKeyFor @NonNull @Initialized ExecutionException ee) throws @UnknownKeyFor @NonNull @Initialized IOException {
        RuntimeException re = (RuntimeException)ee.getCause();
        if (re.getCause() instanceof IOException) {
            throw (IOException)re.getCause();
        }
        throw new IOException(re.getCause());
    }
}

