/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase;
import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2;

@Deprecated
class SinkV2SinkWriterOperatorDeprecatedTest
extends SinkWriterOperatorTestBase {
    SinkV2SinkWriterOperatorDeprecatedTest() {
    }

    @Override
    SinkWriterOperatorTestBase.SinkAndSuppliers sinkWithoutCommitter() {
        TestSinkV2.DefaultSinkWriter sinkWriter = new TestSinkV2.DefaultSinkWriter();
        return new SinkWriterOperatorTestBase.SinkAndSuppliers(TestSinkV2.newBuilder().setWriter(sinkWriter).build(), () -> sinkWriter.elements, () -> sinkWriter.watermarks, () -> -1L, TestSinkV2.StringSerializer::new);
    }

    @Override
    SinkWriterOperatorTestBase.SinkAndSuppliers sinkWithCommitter() {
        TestSinkV2.DefaultCommittingSinkWriter sinkWriter = new TestSinkV2.DefaultCommittingSinkWriter();
        return new SinkWriterOperatorTestBase.SinkAndSuppliers(TestSinkV2.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build(), () -> sinkWriter.elements, () -> sinkWriter.watermarks, () -> -1L, TestSinkV2.StringSerializer::new);
    }

    @Override
    SinkWriterOperatorTestBase.SinkAndSuppliers sinkWithTimeBasedWriter() {
        TimeBasedBufferingSinkWriter sinkWriter = new TimeBasedBufferingSinkWriter();
        return new SinkWriterOperatorTestBase.SinkAndSuppliers(TestSinkV2.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build(), () -> sinkWriter.elements, () -> sinkWriter.watermarks, () -> -1L, TestSinkV2.StringSerializer::new);
    }

    @Override
    SinkWriterOperatorTestBase.SinkAndSuppliers sinkWithSnapshottingWriter(boolean withState, String stateName) {
        SnapshottingBufferingSinkWriter sinkWriter = new SnapshottingBufferingSinkWriter();
        TestSinkV2.Builder builder = TestSinkV2.newBuilder().setWriter(sinkWriter).setDefaultCommitter().setWithPostCommitTopology(true);
        if (withState) {
            builder.setWriterState(true);
        }
        if (stateName != null) {
            builder.setCompatibleStateNames(stateName);
        }
        return new SinkWriterOperatorTestBase.SinkAndSuppliers(builder.build(), () -> sinkWriter.elements, () -> sinkWriter.watermarks, () -> sinkWriter.lastCheckpointId, () -> new TestSinkV2.StringSerializer());
    }

    private static class SnapshottingBufferingSinkWriter
    extends TestSinkV2.DefaultStatefulSinkWriter {
        public static final int NOT_SNAPSHOTTED = -1;
        long lastCheckpointId = -1L;
        boolean endOfInput = false;

        private SnapshottingBufferingSinkWriter() {
        }

        @Override
        public void flush(boolean endOfInput) throws IOException, InterruptedException {
            this.endOfInput = endOfInput;
        }

        @Override
        public List<String> snapshotState(long checkpointId) throws IOException {
            this.lastCheckpointId = checkpointId;
            return super.snapshotState(checkpointId);
        }

        @Override
        public Collection<String> prepareCommit() {
            if (!this.endOfInput) {
                return ImmutableList.of();
            }
            List result = this.elements;
            this.elements = new ArrayList();
            return result;
        }
    }

    private static class TimeBasedBufferingSinkWriter
    extends TestSinkV2.DefaultCommittingSinkWriter<Integer>
    implements ProcessingTimeService.ProcessingTimeCallback {
        private final List<String> cachedCommittables = new ArrayList<String>();
        private ProcessingTimeService processingTimeService;

        private TimeBasedBufferingSinkWriter() {
        }

        @Override
        public void write(Integer element, SinkWriter.Context context) {
            this.cachedCommittables.add(Tuple3.of((Object)element, (Object)context.timestamp(), (Object)context.currentWatermark()).toString());
        }

        public void onProcessingTime(long time) {
            this.elements.addAll(this.cachedCommittables);
            this.cachedCommittables.clear();
            this.processingTimeService.registerTimer(time + 1000L, (ProcessingTimeService.ProcessingTimeCallback)this);
        }

        @Override
        public void init(Sink.InitContext context) {
            this.processingTimeService = context.getProcessingTimeService();
            this.processingTimeService.registerTimer(1000L, (ProcessingTimeService.ProcessingTimeCallback)this);
        }
    }
}

