/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.translators;

import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
import org.apache.flink.datastream.impl.stream.AbstractDataStream;
import org.apache.flink.datastream.impl.stream.NonKeyedPartitionStreamImpl;
import org.apache.flink.datastream.impl.utils.StreamUtils;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;

@Internal
public class DataStreamV2SinkTransformationTranslator<Input, Output>
implements TransformationTranslator<Output, DataStreamV2SinkTransformation<Input, Output>> {
    private static final String COMMITTER_NAME = "Committer";
    private static final String WRITER_NAME = "Writer";

    /**
     * Translates the given sink transformation for batch execution.
     *
     * <p>Delegates to {@code translateInternal} with the batch flag set to {@code true}.
     *
     * @param transformation the sink transformation to expand
     * @param context the translation context that receives the expanded transformations
     * @return whatever {@code translateInternal} returns for this transformation
     */
    @Override
    public Collection<Integer> translateForBatch(DataStreamV2SinkTransformation<Input, Output> transformation, TransformationTranslator.Context context) {
        return this.translateInternal(transformation, context, true);
    }

    /**
     * Translates the given sink transformation for streaming execution.
     *
     * <p>Delegates to {@code translateInternal} with the batch flag set to {@code false}.
     *
     * @param transformation the sink transformation to expand
     * @param context the translation context that receives the expanded transformations
     * @return whatever {@code translateInternal} returns for this transformation
     */
    @Override
    public Collection<Integer> translateForStreaming(DataStreamV2SinkTransformation<Input, Output> transformation, TransformationTranslator.Context context) {
        return this.translateInternal(transformation, context, false);
    }

    /**
     * Shared implementation of the batch and streaming entry points.
     *
     * <p>Builds a {@code SinkExpander} from the transformation's input stream and sink and runs
     * its expansion. The expansion itself forwards the generated transformations to the
     * context, so this method always returns an empty collection.
     *
     * @param transformation the sink transformation to expand
     * @param context the translation context passed on to the expander
     * @param batch {@code true} when translating for batch execution
     * @return an empty, immutable collection
     */
    private Collection<Integer> translateInternal(DataStreamV2SinkTransformation<Input, Output> transformation, TransformationTranslator.Context context, boolean batch) {
        new SinkExpander<Input>(
                transformation.getInputStream(),
                transformation.getSink(),
                transformation,
                context,
                batch)
                .expand();
        return Collections.emptyList();
    }

    /**
     * Registers a new instance of this translator under {@link DataStreamV2SinkTransformation}
     * in the translator registry of {@link StreamGraphGenerator}.
     *
     * <p>The registry is reached reflectively in two steps: the static field
     * {@code translatorMap} of {@code StreamGraphGenerator} is read, and then the field named
     * {@code m} declared by that object's runtime class is read; the entry is put into the map
     * found there.
     *
     * <p>NOTE(review): presumably {@code translatorMap} is an unmodifiable wrapper whose backing
     * map lives in {@code m} - confirm against the JDK/Flink version in use, since this depends
     * on a private field name.
     *
     * @throws Exception if either field cannot be found or accessed reflectively
     */
    public static void registerSinkTransformationTranslator() throws Exception {
        Field translatorMapField = StreamGraphGenerator.class.getDeclaredField("translatorMap");
        translatorMapField.setAccessible(true);
        // Static field, hence the null receiver. The cast keeps the fail-fast type check.
        Map<?, ?> translatorMap = (Map<?, ?>) translatorMapField.get(null);

        Field underlyingMapField = translatorMap.getClass().getDeclaredField("m");
        underlyingMapField.setAccessible(true);
        // Reflection only yields Object; the element types cannot be verified at runtime.
        @SuppressWarnings("unchecked")
        Map<Class<?>, TransformationTranslator<?, ?>> underlyingMap =
                (Map<Class<?>, TransformationTranslator<?, ?>>) underlyingMapField.get(translatorMap);

        underlyingMap.put(DataStreamV2SinkTransformation.class, new DataStreamV2SinkTransformationTranslator<>());
    }

    private static class SinkExpander<T> {
        private final DataStreamV2SinkTransformation<T, ?> transformation;
        private final Sink<T> sink;
        private final TransformationTranslator.Context context;
        private final AbstractDataStream<T> inputStream;
        private final ExecutionEnvironmentImpl executionEnvironment;
        private final boolean isCheckpointingEnabled;
        private final boolean isBatchMode;

        public SinkExpander(AbstractDataStream<T> inputStream, Sink<T> sink, DataStreamV2SinkTransformation<T, ?> transformation, TransformationTranslator.Context context, boolean isBatchMode) {
            this.inputStream = inputStream;
            this.executionEnvironment = inputStream.getEnvironment();
            this.isCheckpointingEnabled = this.executionEnvironment.getCheckpointCfg().isCheckpointingEnabled();
            this.transformation = transformation;
            this.sink = sink;
            this.context = context;
            this.isBatchMode = isBatchMode;
        }

        /**
         * Probes whether the sink's pre-commit hook adds any transformations.
         *
         * <p>A placeholder transformation named {@code "dummy"} with no inputs is wrapped in a
         * {@link DataStream} on a fresh {@link StreamExecutionEnvironment} and handed to
         * {@code addPreCommitTopology}. The probe passes when the execution environment of the
         * returned stream has no transformations registered.
         *
         * @param sink the sink to probe; it is cast to both {@link SupportsCommitter} and
         *     {@link SupportsPreCommitTopology}
         * @return {@code true} if no transformations were registered by the pre-commit hook
         * @throws ClassCastException if {@code sink} does not implement both interfaces
         */
        private <CommT, WriteResultT> boolean checkSinkDoNotAddCommitTopology(Sink<T> sink) {
            // Not used afterwards, but the cast itself rejects sinks that are not committing sinks.
            SupportsCommitter asCommittingSink = (SupportsCommitter)sink;
            SupportsPreCommitTopology preCommitSink = (SupportsPreCommitTopology)sink;

            TypeInformation writeResultType = CommittableMessageTypeInfo.of(() -> preCommitSink.getWriteResultSerializer());

            // Source-less placeholder: it only exists so that a DataStream can be constructed.
            Transformation placeholder = new Transformation<CommittableMessage<WriteResultT>>("dummy", writeResultType, 1){

                public List<Transformation<?>> getInputs() {
                    return List.of();
                }

                protected List<Transformation<?>> getTransitivePredecessorsInternal() {
                    return List.of();
                }
            };

            DataStream probeInput = new DataStream(new StreamExecutionEnvironment(), placeholder);
            DataStream probeOutput = preCommitSink.addPreCommitTopology(probeInput);
            List<Transformation<?>> registered = probeOutput.getExecutionEnvironment().getTransformations();
            return registered.isEmpty();
        }

        /**
         * Expands the sink into its writer (and, for committing sinks, committer)
         * transformations and hands them to the translation context.
         *
         * <p>Steps, in order:
         * <ol>
         *   <li>remember how many transformations the environment currently holds;
         *   <li>reject sinks whose topology hooks are not supported;
         *   <li>add the committing topology if the sink is a {@link SupportsCommitter},
         *       otherwise add only the writer;
         *   <li>pass every transformation added since step 1 to the context;
         *   <li>remove those transformations from the environment again.
         * </ol>
         *
         * @throws UnsupportedOperationException if the sink uses an unsupported topology hook
         */
        private void expand() {
            final int sizeBefore = this.executionEnvironment.getTransformations().size();

            this.rejectUnsupportedTopologies();

            if (this.sink instanceof SupportsCommitter) {
                this.addCommittingTopology(this.sink, this.inputStream);
            } else {
                this.adjustTransformations(
                        this.inputStream,
                        input -> StreamUtils.transformOneInputOperator(DataStreamV2SinkTransformationTranslator.WRITER_NAME, input, CommittableMessageTypeInfo.noOutput(), new SinkWriterOperatorFactory(this.sink)),
                        this.sink instanceof SupportsConcurrentExecutionAttempts);
            }

            List<Transformation<?>> added = this.executionEnvironment.getTransformations().subList(sizeBefore, this.executionEnvironment.getTransformations().size());
            added.forEach(generated -> this.context.transform(generated));

            // Drop the generated transformations from the tail so the environment ends up
            // with exactly the entries it had before the expansion.
            List<Transformation<?>> registered = this.executionEnvironment.getTransformations();
            for (int last = registered.size() - 1; last >= sizeBefore; last--) {
                registered.remove(last);
            }
        }

        /**
         * Throws if the sink implements a topology hook that is not supported here.
         *
         * <p>Pre-write topologies are always rejected. Pre-commit topologies are rejected unless
         * the sink's class name is {@code org.apache.flink.connector.file.sink.FileSink} and
         * {@code checkSinkDoNotAddCommitTopology} passes. Post-commit topologies are only checked
         * when the sink has no pre-commit topology.
         */
        private void rejectUnsupportedTopologies() {
            if (this.sink instanceof SupportsPreWriteTopology) {
                throw new UnsupportedOperationException("Sink with pre-write topology is not supported for DataStream v2 atm.");
            }
            if (this.sink instanceof SupportsPreCommitTopology) {
                boolean isFileSink = this.sink.getClass().getName().equals("org.apache.flink.connector.file.sink.FileSink");
                // Short-circuit: the probe only runs for the file sink.
                if (!isFileSink || !this.checkSinkDoNotAddCommitTopology(this.sink)) {
                    throw new UnsupportedOperationException("Sink with pre-commit topology is not supported for DataStream v2 atm.");
                }
                return;
            }
            if (this.sink instanceof SupportsPostCommitTopology) {
                throw new UnsupportedOperationException("Sink with post-commit topology is not supported for DataStream v2 atm.");
            }
        }

        /**
         * Adds the writer followed by the committer for a committing sink.
         *
         * <p>The committable type information is derived from the sink's committable serializer
         * and is used both as the writer's output type and as the committer's output type. The
         * committer is added with concurrent execution attempts disabled.
         *
         * @param sink the sink; it is cast to {@link SupportsCommitter}
         * @param inputStream the stream feeding the writer
         * @throws ClassCastException if {@code sink} is not a {@code SupportsCommitter}
         */
        private <CommT> void addCommittingTopology(Sink<T> sink, AbstractDataStream<T> inputStream) {
            SupportsCommitter committer = (SupportsCommitter)sink;
            TypeInformation committableType = CommittableMessageTypeInfo.of(() -> committer.getCommittableSerializer());

            AbstractDataStream writerOutput = this.addWriter(sink, inputStream, committableType);
            this.adjustTransformations(
                    writerOutput,
                    stream -> StreamUtils.transformOneInputOperator(DataStreamV2SinkTransformationTranslator.COMMITTER_NAME, stream, committableType, new CommitterOperatorFactory(committer, this.isBatchMode, this.isCheckpointingEnabled)),
                    false);
        }

        /**
         * Adds the writer operator for the sink and appends a batch-exchange forward partition
         * behind it.
         *
         * @param sink the sink whose writer is created
         * @param inputStream the stream feeding the writer
         * @param typeInformation the output type of the writer operator
         * @return the stream returned by {@code addFailOverRegion} for the writer's output
         */
        private <WriteResultT> AbstractDataStream<CommittableMessage<WriteResultT>> addWriter(Sink<T> sink, AbstractDataStream<T> inputStream, TypeInformation<CommittableMessage<WriteResultT>> typeInformation) {
            boolean concurrentAttempts = sink instanceof SupportsConcurrentExecutionAttempts;
            AbstractDataStream writerOutput = this.adjustTransformations(
                    inputStream,
                    input -> StreamUtils.transformOneInputOperator(DataStreamV2SinkTransformationTranslator.WRITER_NAME, input, typeInformation, new SinkWriterOperatorFactory(sink)),
                    concurrentAttempts);
            return this.addFailOverRegion(writerOutput);
        }

        /**
         * Wraps the input in a new stream whose transformation is a forward partition with
         * {@link StreamExchangeMode#BATCH} exchange mode.
         *
         * @param input the stream to wrap
         * @return a {@link NonKeyedPartitionStreamImpl} on the same environment as {@code input}
         */
        private <I> AbstractDataStream<I> addFailOverRegion(AbstractDataStream<I> input) {
            ExecutionEnvironmentImpl environment = input.getEnvironment();
            PartitionTransformation batchForward = new PartitionTransformation(
                    input.getTransformation(),
                    (StreamPartitioner)new ForwardPartitioner(),
                    StreamExchangeMode.BATCH);
            return new NonKeyedPartitionStreamImpl(environment, batchForward);
        }

        /**
         * Runs {@code action} on {@code inputStream} and then applies the sink transformation's
         * settings to every transformation the action added to the environment.
         *
         * <p>For each added transformation this sets, where applicable: the uid hash (matched by
         * operator name against {@link CustomSinkOperatorUidHashes#DEFAULT}), the uid, the
         * co-location group key, name and description (each prefixed with the sink's value), the
         * slot sharing group (only if the added transformation has none), the parallelism
         * (always overwritten), the max parallelism (only if unset on the added transformation
         * and positive on the sink), and for physical transformations the chaining strategy and
         * the concurrent-execution-attempts flag.
         *
         * @param inputStream the stream passed to {@code action}
         * @param action the step that registers new transformations in the environment
         * @param supportsConcurrentExecutionAttempts flag set on every added physical
         *     transformation
         * @return the value returned by {@code action}
         */
        private <I, R> R adjustTransformations(AbstractDataStream<I> inputStream, Function<AbstractDataStream<I>, R> action, boolean supportsConcurrentExecutionAttempts) {
            final int firstAddedIndex = this.executionEnvironment.getTransformations().size();
            final R result = action.apply(inputStream);

            List<Transformation<?>> all = this.executionEnvironment.getTransformations();
            List<Transformation<?>> added = all.subList(firstAddedIndex, all.size());
            CustomSinkOperatorUidHashes uidHashes = CustomSinkOperatorUidHashes.DEFAULT;

            for (Transformation<?> current : added) {
                // Name-based matching has to happen before the name gets prefixed further down.
                this.setOperatorUidHashIfPossible(current, DataStreamV2SinkTransformationTranslator.WRITER_NAME, uidHashes.getWriterUidHash());
                this.setOperatorUidHashIfPossible(current, DataStreamV2SinkTransformationTranslator.COMMITTER_NAME, uidHashes.getCommitterUidHash());
                this.setOperatorUidHashIfPossible(current, "Global Committer", uidHashes.getGlobalCommitterUidHash());
                this.concatUid(current, Transformation::getUid, Transformation::setUid, current.getName());

                this.concatProperty(current, Transformation::getCoLocationGroupKey, Transformation::setCoLocationGroupKey);
                this.concatProperty(current, Transformation::getName, Transformation::setName);
                this.concatProperty(current, Transformation::getDescription, Transformation::setDescription);

                // Inherit the sink's slot sharing group only when none was set explicitly.
                Optional<?> sinkGroup = this.transformation.getSlotSharingGroup();
                if (sinkGroup.isPresent() && !current.getSlotSharingGroup().isPresent()) {
                    current.setSlotSharingGroup((SlotSharingGroup)sinkGroup.get());
                }

                current.setParallelism(this.transformation.getParallelism());

                boolean maxParallelismUnset = current.getMaxParallelism() < 0;
                if (maxParallelismUnset && this.transformation.getMaxParallelism() > 0) {
                    current.setMaxParallelism(this.transformation.getMaxParallelism());
                }

                if (current instanceof PhysicalTransformation) {
                    PhysicalTransformation physical = (PhysicalTransformation)current;
                    if (this.transformation.getChainingStrategy() != null) {
                        physical.setChainingStrategy(this.transformation.getChainingStrategy());
                    }
                    physical.setSupportsConcurrentExecutionAttempts(supportsConcurrentExecutionAttempts);
                }
            }
            return result;
        }

        /**
         * Sets the uid hash on the transformation if a hash is given and the transformation's
         * name equals the expected operator name.
         *
         * @param transformation the transformation to update
         * @param writerName the operator name the transformation must have (despite the
         *     parameter name, callers also pass committer names)
         * @param operatorUidHash the hash to set, or {@code null} to do nothing
         */
        private void setOperatorUidHashIfPossible(Transformation<?> transformation, String writerName, @Nullable String operatorUidHash) {
            if (operatorUidHash != null && transformation.getName().equals(writerName)) {
                transformation.setUidHash(operatorUidHash);
            }
        }

        /**
         * Derives the value to set on a sub-transformation from the sink transformation's value,
         * using a fixed pattern for the well-known operator names.
         *
         * <p>When {@code transformationName} and the sink's value are both non-null:
         * <ul>
         *   <li>{@code "Committer"}: the value becomes {@code "Sink Committer: <sink value>"};
         *   <li>{@code "Writer"}: the value becomes the sink's value unchanged;
         *   <li>{@code "Global Committer"}: the value becomes
         *       {@code "Sink <sink value> Global Committer"}.
         * </ul>
         * In every other case the call falls through to {@code concatProperty}.
         *
         * @param subTransformation the transformation to update
         * @param getter reads the property from a transformation
         * @param setter writes the property to a transformation
         * @param transformationName the operator name of {@code subTransformation}, may be
         *     {@code null}
         */
        private void concatUid(Transformation<?> subTransformation, Function<Transformation<?>, String> getter, BiConsumer<Transformation<?>, String> setter, @Nullable String transformationName) {
            if (transformationName != null) {
                // Read the sink's value once and reuse it in whichever branch matches.
                String sinkValue = getter.apply(this.transformation);
                if (sinkValue != null) {
                    if (transformationName.equals(DataStreamV2SinkTransformationTranslator.COMMITTER_NAME)) {
                        setter.accept(subTransformation, String.format("Sink Committer: %s", sinkValue));
                        return;
                    }
                    if (transformationName.equals(DataStreamV2SinkTransformationTranslator.WRITER_NAME)) {
                        setter.accept(subTransformation, sinkValue);
                        return;
                    }
                    if (transformationName.equals("Global Committer")) {
                        setter.accept(subTransformation, String.format("Sink %s Global Committer", sinkValue));
                        return;
                    }
                }
            }
            this.concatProperty(subTransformation, getter, setter);
        }

        /**
         * Prefixes a sub-transformation's property with the sink transformation's value of the
         * same property, as {@code "<sink value>: <sub value>"}.
         *
         * <p>Nothing is changed when either of the two values is {@code null}.
         *
         * @param subTransformation the transformation to update
         * @param getter reads the property from a transformation
         * @param setter writes the property to a transformation
         */
        private void concatProperty(Transformation<?> subTransformation, Function<Transformation<?>, String> getter, BiConsumer<Transformation<?>, String> setter) {
            if (getter.apply(this.transformation) == null || getter.apply(subTransformation) == null) {
                return;
            }
            String combined = getter.apply(this.transformation) + ": " + getter.apply(subTransformation);
            setter.accept(subTransformation, combined);
        }
    }
}

