/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.beam.runners.direct.AutoValue_BoundedReadEvaluatorFactory_BoundedSourceShard;
import org.apache.beam.runners.direct.CommittedBundle;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.RootInputProvider;
import org.apache.beam.runners.direct.SourceShard;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.TransformEvaluator;
import org.apache.beam.runners.direct.TransformEvaluatorFactory;
import org.apache.beam.runners.direct.TransformResult;
import org.apache.beam.runners.direct.UncommittedBundle;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.construction.ReadTranslation;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

final class BoundedReadEvaluatorFactory
implements TransformEvaluatorFactory {
    /**
     * Default minimum size, in bytes, used when no explicit threshold is supplied. Evaluators
     * only request a dynamic split for sources whose estimated size is strictly greater than the
     * configured minimum, so zero requests a split for every source that reports a positive size.
     */
    private static final @UnknownKeyFor @NonNull @Initialized long REQUIRED_DYNAMIC_SPLIT_ORIGINAL_SIZE = 0L;

    /** Context handed to every evaluator this factory creates. */
    private final @UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext;

    /** Pipeline options handed to every evaluator this factory creates. */
    private final @UnknownKeyFor @NonNull @Initialized PipelineOptions options;

    /**
     * Cached pool of daemon threads on which dynamic split requests are run. It is shared by all
     * evaluators created by this factory and shut down by {@link #cleanup()}.
     */
    @VisibleForTesting
    final @UnknownKeyFor @NonNull @Initialized ExecutorService executor =
        Executors.newCachedThreadPool(
            new ThreadFactoryBuilder()
                .setThreadFactory(MoreExecutors.platformThreadFactory())
                .setDaemon(true)
                .setNameFormat("direct-dynamic-split-requester")
                .build());

    /** Estimated source size, in bytes, that must be exceeded before a split is requested. */
    private final @UnknownKeyFor @NonNull @Initialized long minimumDynamicSplitSize;

    /**
     * Creates a factory that uses the default dynamic split threshold.
     *
     * @param evaluationContext context passed to created evaluators
     * @param options pipeline options passed to created evaluators
     */
    BoundedReadEvaluatorFactory(@UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext, @UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
        // Use the named constant rather than repeating its literal value.
        this(evaluationContext, options, REQUIRED_DYNAMIC_SPLIT_ORIGINAL_SIZE);
    }

    /**
     * Creates a factory with an explicit dynamic split threshold.
     *
     * @param evaluationContext context passed to created evaluators
     * @param options pipeline options passed to created evaluators
     * @param minimumDynamicSplitSize estimated size, in bytes, a source must exceed before a
     *     dynamic split is requested for it
     */
    @VisibleForTesting
    BoundedReadEvaluatorFactory(@UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext, @UnknownKeyFor @NonNull @Initialized PipelineOptions options, @UnknownKeyFor @NonNull @Initialized long minimumDynamicSplitSize) {
        this.evaluationContext = evaluationContext;
        this.options = options;
        this.minimumDynamicSplitSize = minimumDynamicSplitSize;
    }

    /**
     * Creates the evaluator that reads bounded source shards for {@code application}.
     *
     * <p>The input bundle is not consulted; only the applied transform is forwarded to the
     * evaluator.
     *
     * @param application the applied read transform to evaluate
     * @param inputBundle the bundle about to be processed; unused here
     * @return a new evaluator for the transform
     * @throws IOException declared by the factory interface; nothing in this method throws it
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override
    public <InputT> @Nullable @UnknownKeyFor @Initialized TransformEvaluator<InputT> forApplication(@UnknownKeyFor @NonNull @Initialized AppliedPTransform<?, ?, ?> application, @UnknownKeyFor @NonNull @Initialized CommittedBundle<?> inputBundle) throws @UnknownKeyFor @NonNull @Initialized IOException {
        // The wildcard-typed application cannot be proven to produce a PCollection, so it is
        // passed through a raw cast; the evaluator's element type is likewise unchecked.
        return this.createEvaluator((AppliedPTransform)application);
    }

    /**
     * Builds a {@code BoundedReadEvaluator} for {@code transform}, wiring in this factory's
     * evaluation context, pipeline options, dynamic split threshold and split executor.
     *
     * @param transform the applied transform whose single output receives the elements read
     * @return the new evaluator
     */
    private <OutputT> @UnknownKeyFor @NonNull @Initialized TransformEvaluator<?> createEvaluator(@UnknownKeyFor @NonNull @Initialized AppliedPTransform<?, @UnknownKeyFor @NonNull @Initialized PCollection<OutputT>, ?> transform) {
        return new BoundedReadEvaluator<OutputT>(transform, this.evaluationContext, this.options, this.minimumDynamicSplitSize, this.executor);
    }

    /**
     * Releases the factory's resources by shutting down the executor used for dynamic split
     * requests. Uses {@code shutdown()} rather than {@code shutdownNow()}, so tasks that were
     * already submitted are not interrupted.
     */
    @Override
    public void cleanup() {
        executor.shutdown();
    }

    /**
     * Task that asks a reader to split at the midpoint between the fraction it has already
     * consumed and the end of its source.
     */
    private static class GenerateSplitAtHalfwayPoint<@UnknownKeyFor T>
    implements Callable<BoundedSource<T>> {
        /** Reader whose remaining work is to be split. */
        private final @UnknownKeyFor @NonNull @Initialized BoundedSource.BoundedReader<T> reader;

        private GenerateSplitAtHalfwayPoint(@UnknownKeyFor @NonNull @Initialized BoundedSource.BoundedReader<T> reader) {
            this.reader = reader;
        }

        /**
         * Requests the split.
         *
         * @return whatever {@code splitAtFraction} returns for the midpoint, or {@code null}
         *     without attempting a split when the consumed fraction is unknown or exactly 1.0
         * @throws Exception if the reader fails while reporting progress or splitting
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized BoundedSource<T> call() throws @UnknownKeyFor @NonNull @Initialized Exception {
            Double consumedFraction = reader.getFractionConsumed();
            // Split only when progress is known and the reader has not reported completion.
            boolean canSplit = consumedFraction != null && consumedFraction != 1.0;
            if (!canSplit) {
                return null;
            }
            // Midpoint of the remaining range [consumedFraction, 1.0].
            double splitFraction = 0.5 + consumedFraction / 2.0;
            return reader.splitAtFraction(splitFraction);
        }
    }

    /**
     * Produces the root input bundles for a bounded read: the transform's source is split and
     * each resulting sub-source is wrapped in a {@link BoundedSourceShard} in its own committed
     * bundle.
     */
    static class InputProvider<@UnknownKeyFor T>
    implements RootInputProvider<T, BoundedSourceShard<T>, PBegin> {
        /** Used to create the root bundles that hold the shards. */
        private final @UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext;

        /** Passed to the source when estimating its size and splitting it. */
        private final @UnknownKeyFor @NonNull @Initialized PipelineOptions options;

        InputProvider(@UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext, @UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
            this.evaluationContext = evaluationContext;
            this.options = options;
        }

        /**
         * Splits the transform's bounded source and returns one committed root bundle per split.
         *
         * <p>The bundle size passed to the source is its estimated size divided by
         * {@code targetParallelism}; how many splits come back is up to the source.
         *
         * @param transform the applied bounded read transform
         * @param targetParallelism divisor for the estimated size; must be non-zero, otherwise
         *     the integer division throws {@link ArithmeticException}
         * @return an immutable collection with one single-element bundle per split, each
         *     committed at {@code BoundedWindow.TIMESTAMP_MAX_VALUE}
         * @throws Exception if the source cannot be extracted, sized or split
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized CommittedBundle<@UnknownKeyFor @NonNull @Initialized BoundedSourceShard<T>>> getInitialInputs(@UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @NonNull @Initialized PBegin, @UnknownKeyFor @NonNull @Initialized PCollection<T>, @UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PBegin, @UnknownKeyFor @NonNull @Initialized PCollection<T>>> transform, @UnknownKeyFor @NonNull @Initialized int targetParallelism) throws @UnknownKeyFor @NonNull @Initialized Exception {
            BoundedSource<T> source = ReadTranslation.boundedSourceFromTransform(transform);
            long estimatedBytes = source.getEstimatedSizeBytes(this.options);
            long bytesPerBundle = estimatedBytes / targetParallelism;
            // A wildcard list accepts sources that return a list of a BoundedSource subtype.
            List<? extends BoundedSource<T>> bundles = source.split(bytesPerBundle, this.options);
            ImmutableList.Builder<CommittedBundle<BoundedSourceShard<T>>> shards = ImmutableList.builder();
            for (BoundedSource<T> bundle : bundles) {
                // The explicit type argument pins the bundle's element type; it cannot be
                // inferred through the chained add/commit calls.
                CommittedBundle<BoundedSourceShard<T>> inputShard =
                    this.evaluationContext
                        .<BoundedSourceShard<T>>createRootBundle()
                        .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(bundle)))
                        .commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
                shards.add(inputShard);
            }
            return shards.build();
        }
    }

    /**
     * A {@link SourceShard} that carries a single {@link BoundedSource}. Instances are created
     * through {@link #of(BoundedSource)}, which returns the AutoValue-generated subclass.
     */
    @AutoValue
    abstract static class BoundedSourceShard<@UnknownKeyFor T>
    implements SourceShard<T> {
        /** Package-private constructor; obtain instances through {@link #of(BoundedSource)}. */
        BoundedSourceShard() {
        }

        /**
         * Wraps {@code source} in a shard.
         *
         * @param source the bounded source the shard should carry
         * @return a shard whose {@link #getSource()} returns {@code source}
         */
        static <T> @UnknownKeyFor @NonNull @Initialized BoundedSourceShard<T> of(@UnknownKeyFor @NonNull @Initialized BoundedSource<T> source) {
            return new AutoValue_BoundedReadEvaluatorFactory_BoundedSourceShard<>(source);
        }

        /** Returns the bounded source carried by this shard. */
        @Override
        public abstract @UnknownKeyFor @NonNull @Initialized BoundedSource<T> getSource();
    }

    /**
     * Evaluator that reads a {@link BoundedSourceShard} into a single output bundle. Where a
     * dynamic split of the shard's source yields a residual source, that residual is recorded
     * as an unprocessed element instead of being read here.
     */
    private static class BoundedReadEvaluator<@UnknownKeyFor OutputT>
    implements TransformEvaluator<BoundedSourceShard<OutputT>> {
        /** The transform's only output; every element read is added to a bundle for it. */
        private final @UnknownKeyFor @NonNull @Initialized PCollection<OutputT> outputPCollection;
        private final @UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext;
        private final @UnknownKeyFor @NonNull @Initialized PipelineOptions options;
        /** Accumulates output bundles and unprocessed residual shards until {@link #finishBundle()}. */
        private final @UnknownKeyFor @NonNull @Initialized StepTransformResult.Builder<BoundedSourceShard<OutputT>> resultBuilder;
        /** Estimated size, in bytes, a source must exceed before a split is requested. */
        private final @UnknownKeyFor @NonNull @Initialized long minimumDynamicSplitSize;
        /** Executor on which split requests are run. */
        private final @UnknownKeyFor @NonNull @Initialized ExecutorService produceSplitExecutor;

        /**
         * @param transform the applied read transform; it must have exactly one output, since
         *     {@code Iterables.getOnlyElement} is used to select it
         * @param evaluationContext used to create output bundles
         * @param options passed to the source when creating readers and estimating size
         * @param minimumDynamicSplitSize estimated size, in bytes, a source must exceed before
         *     a dynamic split is requested
         * @param executor executor on which split requests are submitted
         */
        public BoundedReadEvaluator(@UnknownKeyFor @NonNull @Initialized AppliedPTransform<?, @UnknownKeyFor @NonNull @Initialized PCollection<OutputT>, ?> transform, @UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext, @UnknownKeyFor @NonNull @Initialized PipelineOptions options, @UnknownKeyFor @NonNull @Initialized long minimumDynamicSplitSize, @UnknownKeyFor @NonNull @Initialized ExecutorService executor) {
            this.evaluationContext = evaluationContext;
            // The outputs map is wildcard-typed, so the element type has to be asserted here;
            // the transform's declared output type is PCollection<OutputT>.
            @SuppressWarnings("unchecked")
            PCollection<OutputT> onlyOutput = (PCollection<OutputT>)Iterables.getOnlyElement(transform.getOutputs().values());
            this.outputPCollection = onlyOutput;
            this.resultBuilder = StepTransformResult.withoutHold(transform);
            this.options = options;
            this.minimumDynamicSplitSize = minimumDynamicSplitSize;
            this.produceSplitExecutor = executor;
        }

        /**
         * Reads the shard's source from start to exhaustion into one output bundle.
         *
         * <p>After the reader has been started, a split request may be submitted to the split
         * executor (see {@code startDynamicSplitThread}). Once the reader is exhausted the
         * request's result is awaited; a non-null residual source is added to the result as an
         * unprocessed element carrying the input element's windowing metadata.
         *
         * @param element the shard to read
         * @throws Exception if reading fails, if waiting for the split result is interrupted,
         *     or, wrapped by {@code UserCodeException}, if the split request itself failed
         */
        @Override
        public void processElement(@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized BoundedSourceShard<OutputT>> element) throws @UnknownKeyFor @NonNull @Initialized Exception {
            BoundedSource<OutputT> source = element.getValue().getSource();
            try (BoundedSource.BoundedReader<OutputT> reader = source.createReader(this.options)) {
                boolean contentsRemaining = reader.start();
                Future<BoundedSource<OutputT>> residualFuture = this.startDynamicSplitThread(source, reader);
                UncommittedBundle<OutputT> output = this.evaluationContext.createBundle(this.outputPCollection);
                while (contentsRemaining) {
                    output.add(WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp()));
                    contentsRemaining = reader.advance();
                }
                this.resultBuilder.addOutput(output);
                try {
                    BoundedSource<OutputT> residual = residualFuture.get();
                    if (residual != null) {
                        this.resultBuilder.addUnprocessedElements(element.withValue(BoundedSourceShard.of(residual)));
                    }
                }
                catch (ExecutionException exex) {
                    // Unwrap the executor's wrapper and rethrow the split failure's cause.
                    throw UserCodeException.wrap(exex.getCause());
                }
            }
        }

        /**
         * Submits a halfway split request for {@code reader} when the source's estimated size
         * is strictly greater than the minimum dynamic split size.
         *
         * @param source the source being read; only its estimated size is consulted
         * @param reader the started reader to split
         * @return a future for the residual source, or an already-completed future holding
         *     {@code null} when the source is too small for a split to be requested
         * @throws Exception if the source's size cannot be estimated
         */
        private @UnknownKeyFor @NonNull @Initialized Future<@UnknownKeyFor @NonNull @Initialized BoundedSource<OutputT>> startDynamicSplitThread(@UnknownKeyFor @NonNull @Initialized BoundedSource<OutputT> source, @UnknownKeyFor @NonNull @Initialized BoundedSource.BoundedReader<OutputT> reader) throws @UnknownKeyFor @NonNull @Initialized Exception {
            if (source.getEstimatedSizeBytes(this.options) > this.minimumDynamicSplitSize) {
                return this.produceSplitExecutor.submit(new GenerateSplitAtHalfwayPoint<>(reader));
            }
            SettableFuture<BoundedSource<OutputT>> emptyFuture = SettableFuture.create();
            emptyFuture.set(null);
            return emptyFuture;
        }

        /** Builds and returns the result accumulated by {@code processElement}. */
        @Override
        public @UnknownKeyFor @NonNull @Initialized TransformResult<@UnknownKeyFor @NonNull @Initialized BoundedSourceShard<OutputT>> finishBundle() {
            return this.resultBuilder.build();
        }
    }
}

