/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.data;

import com.google.auto.value.AutoValue;
import java.io.Closeable;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.beam.fn.harness.HandlesSplits;
import org.apache.beam.fn.harness.data.AutoValue_PCollectionConsumerRegistry_ConsumerAndMetadata;
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.runners.core.metrics.SimpleExecutionState;
import org.apache.beam.runners.core.metrics.SimpleStateRegistry;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ListMultimap;

public class PCollectionConsumerRegistry {
    private ListMultimap<String, ConsumerAndMetadata> pCollectionIdsToConsumers;
    private Map<String, FnDataReceiver> pCollectionIdsToWrappedConsumer;
    private MetricsContainerStepMap metricsContainerRegistry;
    private ExecutionStateTracker stateTracker;
    private SimpleStateRegistry executionStates = new SimpleStateRegistry();

    public PCollectionConsumerRegistry(MetricsContainerStepMap metricsContainerRegistry, ExecutionStateTracker stateTracker) {
        this.metricsContainerRegistry = metricsContainerRegistry;
        this.stateTracker = stateTracker;
        this.pCollectionIdsToConsumers = ArrayListMultimap.create();
        this.pCollectionIdsToWrappedConsumer = new HashMap<String, FnDataReceiver>();
    }

    public <T> void register(String pCollectionId, String pTransformId, FnDataReceiver<WindowedValue<T>> consumer, Coder<T> valueCoder) {
        if (this.pCollectionIdsToWrappedConsumer.containsKey(pCollectionId)) {
            throw new RuntimeException("New consumers for a pCollectionId cannot be register()-d after calling getMultiplexingConsumer.");
        }
        HashMap<String, String> labelsMetadata = new HashMap<String, String>();
        labelsMetadata.put("PTRANSFORM", pTransformId);
        SimpleExecutionState state = new SimpleExecutionState("process", MonitoringInfoConstants.Urns.PROCESS_BUNDLE_MSECS, labelsMetadata);
        this.executionStates.register(state);
        this.pCollectionIdsToConsumers.put(pCollectionId, ConsumerAndMetadata.forConsumer(consumer, pTransformId, state, valueCoder, this.metricsContainerRegistry.getContainer(pTransformId)));
    }

    public void reset() {
        this.executionStates.reset();
    }

    public Set<String> keySet() {
        return this.pCollectionIdsToConsumers.keySet();
    }

    public FnDataReceiver<WindowedValue<?>> getMultiplexingConsumer(String pCollectionId) {
        return this.pCollectionIdsToWrappedConsumer.computeIfAbsent(pCollectionId, pcId -> {
            Collection consumerAndMetadatas = this.pCollectionIdsToConsumers.get(pcId);
            if (consumerAndMetadatas == null) {
                throw new IllegalArgumentException(String.format("Unknown PCollectionId %s", pCollectionId));
            }
            if (consumerAndMetadatas.size() == 1) {
                ConsumerAndMetadata consumerAndMetadata = (ConsumerAndMetadata)consumerAndMetadatas.get(0);
                if (consumerAndMetadata.getConsumer() instanceof HandlesSplits) {
                    return new SplittingMetricTrackingFnDataReceiver((String)pcId, consumerAndMetadata);
                }
                return new MetricTrackingFnDataReceiver((String)pcId, consumerAndMetadata);
            }
            return new MultiplexingMetricTrackingFnDataReceiver((String)pcId, (List<ConsumerAndMetadata>)ImmutableList.copyOf(consumerAndMetadatas));
        });
    }

    public List<MetricsApi.MonitoringInfo> getExecutionTimeMonitoringInfos() {
        return this.executionStates.getExecutionTimeMonitoringInfos();
    }

    public Map<String, ByteString> getExecutionTimeMonitoringData(ShortIdMap shortIds) {
        return this.executionStates.getExecutionTimeMonitoringData(shortIds);
    }

    private static class SampleByteSizeDistribution<T> {
        final Distribution distribution;
        ByteSizeObserver byteCountObserver;
        private static final int RESERVOIR_SIZE = 10;
        private static final int SAMPLING_THRESHOLD = 30;
        private long samplingToken = 0L;
        private long nextSamplingToken = 0L;
        private Random randomGenerator = new Random();

        public SampleByteSizeDistribution(Distribution distribution) {
            this.distribution = distribution;
            this.byteCountObserver = null;
        }

        public void tryUpdate(T value, Coder<T> coder) throws Exception {
            if (this.shouldSampleElement()) {
                this.byteCountObserver = new ByteSizeObserver();
                coder.registerByteSizeObserver(value, this.byteCountObserver);
                if (!this.byteCountObserver.getIsLazy()) {
                    this.byteCountObserver.advance();
                    this.distribution.update(this.byteCountObserver.observedSize);
                }
            } else {
                this.byteCountObserver = null;
            }
        }

        public void finishLazyUpdate() {
            if (this.byteCountObserver != null && this.byteCountObserver.getIsLazy()) {
                this.byteCountObserver.advance();
                this.distribution.update(this.byteCountObserver.observedSize);
            }
        }

        private boolean shouldSampleElement() {
            if (this.samplingToken + 1L == Long.MAX_VALUE) {
                this.samplingToken = 0L;
                this.nextSamplingToken = this.getNextSamplingToken(this.samplingToken);
            }
            ++this.samplingToken;
            if (this.nextSamplingToken == 0L) {
                if (this.samplingToken <= 10L || this.randomGenerator.nextInt((int)this.samplingToken) < 10) {
                    if (this.samplingToken > 30L) {
                        this.nextSamplingToken = this.getNextSamplingToken(this.samplingToken);
                    }
                    return true;
                }
            } else if (this.samplingToken >= this.nextSamplingToken) {
                this.nextSamplingToken = this.getNextSamplingToken(this.samplingToken);
                return true;
            }
            return false;
        }

        private long getNextSamplingToken(long samplingToken) {
            double gap = Math.log(1.0 - this.randomGenerator.nextDouble()) / Math.log(1.0 - 10.0 / (double)samplingToken);
            return samplingToken + (long)((int)gap);
        }

        private static class ByteSizeObserver
        extends ElementByteSizeObserver {
            private long observedSize = 0L;

            private ByteSizeObserver() {
            }

            @Override
            protected void reportElementSize(long elementSize) {
                this.observedSize += elementSize;
            }
        }
    }

    private class SplittingMetricTrackingFnDataReceiver<T>
    extends MetricTrackingFnDataReceiver<T>
    implements HandlesSplits {
        private final HandlesSplits delegate;

        public SplittingMetricTrackingFnDataReceiver(String pCollection, ConsumerAndMetadata consumerAndMetadata) {
            super(pCollection, consumerAndMetadata);
            this.delegate = (HandlesSplits)((Object)consumerAndMetadata.getConsumer());
        }

        @Override
        public HandlesSplits.SplitResult trySplit(double fractionOfRemainder) {
            return this.delegate.trySplit(fractionOfRemainder);
        }

        @Override
        public double getProgress() {
            return this.delegate.getProgress();
        }
    }

    private class MultiplexingMetricTrackingFnDataReceiver<T>
    implements FnDataReceiver<WindowedValue<T>> {
        private final List<ConsumerAndMetadata> consumerAndMetadatas;
        private final Counter unboundedElementCountCounter;
        private final SampleByteSizeDistribution<T> unboundedSampledByteSizeDistribution;

        public MultiplexingMetricTrackingFnDataReceiver(String pCollectionId, List<ConsumerAndMetadata> consumerAndMetadatas) {
            this.consumerAndMetadatas = consumerAndMetadatas;
            HashMap<String, String> labels = new HashMap<String, String>();
            labels.put("PCOLLECTION", pCollectionId);
            MetricsContainerImpl unboundMetricContainer = PCollectionConsumerRegistry.this.metricsContainerRegistry.getUnboundContainer();
            MonitoringInfoMetricName elementCountMetricName = MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.ELEMENT_COUNT, labels);
            this.unboundedElementCountCounter = unboundMetricContainer.getCounter(elementCountMetricName);
            MonitoringInfoMetricName sampledByteSizeMetricName = MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE, labels);
            this.unboundedSampledByteSizeDistribution = new SampleByteSizeDistribution(unboundMetricContainer.getDistribution(sampledByteSizeMetricName));
        }

        @Override
        public void accept(WindowedValue<T> input) throws Exception {
            this.unboundedElementCountCounter.inc(input.getWindows().size());
            for (ConsumerAndMetadata consumerAndMetadata : this.consumerAndMetadatas) {
                if (consumerAndMetadata.getValueCoder() != null) {
                    this.unboundedSampledByteSizeDistribution.tryUpdate(input.getValue(), consumerAndMetadata.getValueCoder());
                }
                try (Closeable closeable = MetricsEnvironment.scopedMetricsContainer(consumerAndMetadata.getMetricsContainer());
                     Closeable trackerCloseable = PCollectionConsumerRegistry.this.stateTracker.enterState(consumerAndMetadata.getExecutionState());){
                    consumerAndMetadata.getConsumer().accept(input);
                }
                this.unboundedSampledByteSizeDistribution.finishLazyUpdate();
            }
        }
    }

    private class MetricTrackingFnDataReceiver<T>
    implements FnDataReceiver<WindowedValue<T>> {
        private final FnDataReceiver<WindowedValue<T>> delegate;
        private final SimpleExecutionState state;
        private final Counter unboundedElementCountCounter;
        private final SampleByteSizeDistribution<T> unboundedSampledByteSizeDistribution;
        private final Coder<T> coder;
        private final MetricsContainer metricsContainer;

        public MetricTrackingFnDataReceiver(String pCollectionId, ConsumerAndMetadata consumerAndMetadata) {
            this.delegate = consumerAndMetadata.getConsumer();
            this.state = consumerAndMetadata.getExecutionState();
            HashMap<String, String> labels = new HashMap<String, String>();
            labels.put("PCOLLECTION", pCollectionId);
            MetricsContainerImpl unboundMetricContainer = PCollectionConsumerRegistry.this.metricsContainerRegistry.getUnboundContainer();
            MonitoringInfoMetricName elementCountMetricName = MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.ELEMENT_COUNT, labels);
            this.unboundedElementCountCounter = unboundMetricContainer.getCounter(elementCountMetricName);
            MonitoringInfoMetricName sampledByteSizeMetricName = MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE, labels);
            this.unboundedSampledByteSizeDistribution = new SampleByteSizeDistribution(unboundMetricContainer.getDistribution(sampledByteSizeMetricName));
            this.coder = consumerAndMetadata.getValueCoder();
            this.metricsContainer = consumerAndMetadata.getMetricsContainer();
        }

        @Override
        public void accept(WindowedValue<T> input) throws Exception {
            this.unboundedElementCountCounter.inc(input.getWindows().size());
            this.unboundedSampledByteSizeDistribution.tryUpdate(input.getValue(), this.coder);
            try (Closeable closeable = MetricsEnvironment.scopedMetricsContainer(this.metricsContainer);
                 Closeable trackerCloseable = PCollectionConsumerRegistry.this.stateTracker.enterState(this.state);){
                this.delegate.accept(input);
            }
            this.unboundedSampledByteSizeDistribution.finishLazyUpdate();
        }
    }

    @AutoValue
    @AutoValue.CopyAnnotations
    static abstract class ConsumerAndMetadata {
        ConsumerAndMetadata() {
        }

        public static ConsumerAndMetadata forConsumer(FnDataReceiver consumer, String pTransformId, SimpleExecutionState state, Coder valueCoder, MetricsContainer metricsContainer) {
            return new AutoValue_PCollectionConsumerRegistry_ConsumerAndMetadata(consumer, pTransformId, state, valueCoder, metricsContainer);
        }

        public abstract FnDataReceiver getConsumer();

        public abstract String getPTransformId();

        public abstract SimpleExecutionState getExecutionState();

        public abstract Coder getValueCoder();

        public abstract MetricsContainer getMetricsContainer();
    }
}

