/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.upserttest.sink;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.upserttest.sink.UpsertTestFileUtil;
import org.apache.flink.connector.upserttest.sink.UpsertTestSinkWriter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.TestLoggerExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(value={TestLoggerExtension.class})
class UpsertTestSinkWriterITCase {
    @RegisterExtension
    public static final MiniClusterExtension MINI_CLUSTER_RESOURCE = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(1).build());
    @TempDir
    private File tempDir;
    private File outputFile;
    private UpsertTestSinkWriter<Tuple2<String, String>> writer;
    private List<Tuple2<String, String>> expectedRecords;

    UpsertTestSinkWriterITCase() {
    }

    @BeforeEach
    void setup() {
        this.outputFile = new File(this.tempDir, "records.out");
        this.writer = this.createSinkWriter(this.outputFile);
        this.expectedRecords = this.writeTestData(this.writer);
    }

    @AfterEach
    void tearDown() throws Exception {
        this.writer.close();
    }

    @Test
    void testWrite() throws Exception {
        this.writer.close();
        this.testRecordPresence(this.outputFile, this.expectedRecords);
    }

    @Test
    void testWriteOnCheckpoint() throws Exception {
        this.writer.flush(false);
        this.testRecordPresence(this.outputFile, this.expectedRecords);
    }

    private UpsertTestSinkWriter<Tuple2<String, String>> createSinkWriter(File outputFile) {
        SerializationSchema & Serializable keySerializationSchema = (SerializationSchema & Serializable)element -> ((String)element.f0).getBytes();
        SerializationSchema & Serializable valueSerializationSchema = (SerializationSchema & Serializable)element -> ((String)element.f1).getBytes();
        return new UpsertTestSinkWriter(outputFile, (SerializationSchema)keySerializationSchema, (SerializationSchema)valueSerializationSchema);
    }

    private List<Tuple2<String, String>> writeTestData(UpsertTestSinkWriter<Tuple2<String, String>> writer) {
        ArrayList<Tuple2<String, String>> expectedRecords = new ArrayList<Tuple2<String, String>>();
        for (int i = 0; i < 10; ++i) {
            Tuple2 record = Tuple2.of((Object)("Key #" + i), (Object)("Value #" + i));
            expectedRecords.add((Tuple2<String, String>)record);
            writer.write((Object)record, null);
        }
        return expectedRecords;
    }

    private void testRecordPresence(File outputFile, List<Tuple2<String, String>> expectedRecords) throws IOException {
        SimpleStringSchema keyDeserializationSchema = new SimpleStringSchema();
        SimpleStringSchema valueDeserializationSchema = new SimpleStringSchema();
        Map resultMap = UpsertTestFileUtil.readRecords((File)outputFile, (DeserializationSchema)keyDeserializationSchema, (DeserializationSchema)valueDeserializationSchema);
        for (Tuple2<String, String> record : expectedRecords) {
            Assertions.assertThat((Map)resultMap).containsEntry(record.f0, record.f1);
        }
    }
}

