/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.event;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.jobmaster.event.FileSystemJobEventStore;
import org.apache.flink.runtime.jobmaster.event.GenericJobEventSerializer;
import org.apache.flink.runtime.jobmaster.event.JobEvent;
import org.apache.flink.runtime.jobmaster.event.TestingJobEvent;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for {@link FileSystemJobEventStore}.
 *
 * <p>Each test (except the working-directory test) creates a fresh folder below the JUnit-managed
 * temporary directory, points a store at it and then writes and reads {@link TestingJobEvent}s.
 */
class FileSystemJobEventStoreTest {

    /**
     * Event type ID under which {@link GenericJobEventSerializer} is registered in these tests.
     * NOTE(review): presumably mirrors the type ID reported by {@link TestingJobEvent}; confirm.
     */
    private static final int TESTING_EVENT_TYPE_ID = 99999;

    /** Upper bound for waiting on the background writer in the multi-thread test. */
    private static final long BACKGROUND_WRITE_TIMEOUT_MS = 10_000L;

    @TempDir
    private java.nio.file.Path temporaryFolder;

    /**
     * Verifies that events come back in write order after a restart, that events written in a
     * later session are read after the earlier ones, and that nothing is readable any more once
     * the store was stopped with {@code stop(true)}.
     */
    @Test
    void testReadAndWriteEvent() throws Exception {
        FileSystemJobEventStore store = createStore(newRootPath());
        store.start();

        TestingJobEvent event0 = new TestingJobEvent(0);
        TestingJobEvent event1 = new TestingJobEvent(1);
        TestingJobEvent event2 = new TestingJobEvent(2);
        TestingJobEvent event3 = new TestingJobEvent(3);
        TestingJobEvent event4 = new TestingJobEvent(4);

        // First session: write five events, restart, and read them back in order.
        store.writeEvent(event0);
        store.writeEvent(event1);
        store.writeEvent(event2);
        store.writeEvent(event3);
        store.writeEvent(event4);
        store.stop(false);

        store.start();
        assertReadsExactly(store, event0, event1, event2, event3, event4);
        store.stop(false);

        // Second session: two more events are written; a restart must yield the events of the
        // first session followed by the events of the second one.
        store.start();
        store.writeEvent(event0);
        store.writeEvent(event1);
        store.stop(false);

        store.start();
        assertReadsExactly(store, event0, event1, event2, event3, event4, event0, event1);
        store.stop(true);

        // After stop(true) a restarted store has nothing left to read.
        store.start();
        assertReadsExactly(store);
        store.stop(true);
    }

    /**
     * Writes five events from the test thread while a second thread writes five other events,
     * then verifies that all ten events are readable (in any order) after a restart.
     */
    @Test
    void testMultiThreadWriteEvent() throws Exception {
        FileSystemJobEventStore store = createStore(newRootPath());
        store.start();

        TestingJobEvent event0 = new TestingJobEvent(0);
        TestingJobEvent event1 = new TestingJobEvent(1);
        TestingJobEvent event2 = new TestingJobEvent(2);
        TestingJobEvent event3 = new TestingJobEvent(3);
        TestingJobEvent event4 = new TestingJobEvent(4);
        TestingJobEvent event5 = new TestingJobEvent(5);
        TestingJobEvent event6 = new TestingJobEvent(6);
        TestingJobEvent event7 = new TestingJobEvent(7);
        TestingJobEvent event8 = new TestingJobEvent(8);
        TestingJobEvent event9 = new TestingJobEvent(9);

        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<?> backgroundWrites =
                    executor.schedule(
                            () -> {
                                store.writeEvent(event5);
                                store.writeEvent(event6);
                                store.writeEvent(event7);
                                store.writeEvent(event8);
                                store.writeEvent(event9);
                            },
                            0L,
                            TimeUnit.MILLISECONDS);

            store.writeEvent(event0);
            store.writeEvent(event1);
            store.writeEvent(event2);
            store.writeEvent(event3);
            store.writeEvent(event4);

            // Propagate any exception thrown by the background writer and fail on timeout
            // instead of silently continuing with a possibly incomplete set of events.
            backgroundWrites.get(BACKGROUND_WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } finally {
            // Always release the thread, even if one of the writes above failed.
            executor.shutdownNow();
        }

        store.stop(false);
        store.start();

        List<JobEvent> eventList = readAllEvents(store);
        Assertions.assertThat(eventList).hasSize(10);
        Assertions.assertThat(eventList)
                .containsExactlyInAnyOrder(
                        event0, event1, event2, event3, event4, event5, event6, event7, event8,
                        event9);
        store.stop(true);
    }

    /**
     * Verifies that {@code stop(false)} leaves exactly one file in the root directory, whereas
     * {@code stop(true)} removes the root directory altogether.
     */
    @Test
    void testStopWithClear() throws IOException {
        Path rootPath = newRootPath();
        FileSystemJobEventStore store = createStore(rootPath);
        Assertions.assertThat(filesIn(rootPath)).isEmpty();

        store.start();
        store.writeEvent(new TestingJobEvent(0));
        store.stop(false);
        Assertions.assertThat(filesIn(rootPath)).hasSize(1);

        store.start();
        store.stop(true);
        Assertions.assertThat(new File(rootPath.getPath()).exists()).isFalse();
    }

    /**
     * Appends two raw ints to the output stream that do not match the event written right after
     * them, and verifies that the events written before the corruption and the events written
     * in a later session are all still readable.
     */
    @Test
    void testReadUnexpectedLengthEvent() throws Exception {
        FileSystemJobEventStore store = createStore(newRootPath());
        store.start();

        TestingJobEvent event0 = new TestingJobEvent(0);
        TestingJobEvent event1 = new TestingJobEvent(1);
        TestingJobEvent event2 = new TestingJobEvent(2);
        TestingJobEvent event3 = new TestingJobEvent(3);
        TestingJobEvent event4 = new TestingJobEvent(4);
        store.writeEvent(event0);
        store.writeEvent(event1);
        store.writeEvent(event2);
        store.writeEvent(event3);
        store.writeEvent(event4);

        // Runs on the store's own writer executor so that it touches the output stream from the
        // same thread the store uses. NOTE(review): the two ints presumably form a record header
        // (type ID, then a declared length of 5000) that the following event does not satisfy;
        // confirm against the store's on-disk format.
        store.getEventWriterExecutor()
                .submit(
                        () -> {
                            try {
                                store.getOutputStream().writeInt(TESTING_EVENT_TYPE_ID);
                                store.getOutputStream().writeInt(5000);
                                store.writeEvent(new TestingJobEvent(100));
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        })
                .get();
        store.stop(false);

        store.start();
        TestingJobEvent event5 = new TestingJobEvent(5);
        TestingJobEvent event6 = new TestingJobEvent(6);
        TestingJobEvent event7 = new TestingJobEvent(7);
        TestingJobEvent event8 = new TestingJobEvent(8);
        TestingJobEvent event9 = new TestingJobEvent(9);
        store.writeEvent(event5);
        store.writeEvent(event6);
        store.writeEvent(event7);
        store.writeEvent(event8);
        store.writeEvent(event9);
        store.stop(false);

        store.start();
        List<JobEvent> eventList = readAllEvents(store);
        Assertions.assertThat(eventList).hasSize(10);
        Assertions.assertThat(eventList)
                .containsExactlyInAnyOrder(
                        event0, event1, event2, event3, event4, event5, event6, event7, event8,
                        event9);
        store.stop(true);
    }

    /**
     * Verifies that a store created from a {@link JobID} and a configuration places its working
     * directory at {@code <ha-storage-path>/<cluster-id>/<job-id>/job-events}.
     */
    @Test
    void testGenerateWorkingDirCorrectly() throws IOException {
        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, "file:///tmp/flink");
        configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "cluster_id");
        JobID jobID = new JobID();

        FileSystemJobEventStore store = new FileSystemJobEventStore(jobID, configuration);

        Assertions.assertThat(store.getWorkingDir().getPath())
                .isEqualTo(String.format("/tmp/flink/cluster_id/%s/job-events", jobID));
    }

    /**
     * Verifies the number of files in the root directory around a call to
     * {@code writeEvent(event, true)}: one file before and directly after that call, and two
     * files once a further event has been written and flushed.
     */
    @Test
    void testCutBlock() throws Exception {
        Path rootPath = newRootPath();
        FileSystemJobEventStore store = createStore(rootPath);
        Assertions.assertThat(filesIn(rootPath)).isEmpty();

        store.start();
        store.writeEvent(new TestingJobEvent(0));
        tryFlushOutputStream(store);
        Assertions.assertThat(filesIn(rootPath)).hasSize(1);

        // NOTE(review): the boolean flag presumably asks the store to cut the current block
        // after this event; confirm against FileSystemJobEventStore#writeEvent.
        store.writeEvent(new TestingJobEvent(1), true);
        Assertions.assertThat(filesIn(rootPath)).hasSize(1);

        store.writeEvent(new TestingJobEvent(2));
        tryFlushOutputStream(store);
        Assertions.assertThat(filesIn(rootPath)).hasSize(2);
        store.stop(true);
    }

    /** Creates a new folder below the JUnit temporary directory and returns it as a Flink path. */
    private Path newRootPath() throws IOException {
        return new Path(TempDirUtils.newFolder(temporaryFolder).getAbsolutePath());
    }

    /**
     * Creates a store rooted at {@code rootPath} with a default configuration and registers the
     * generic serializer for the testing event type. The store is not started.
     */
    private static FileSystemJobEventStore createStore(Path rootPath) throws IOException {
        FileSystemJobEventStore store = new FileSystemJobEventStore(rootPath, new Configuration());
        store.registerJobEventSerializer(TESTING_EVENT_TYPE_ID, new GenericJobEventSerializer());
        return store;
    }

    /**
     * Lists the entries of {@code rootPath} on the local file system.
     *
     * @return the directory entries, or {@code null} if the directory cannot be listed; callers
     *     pass the result to AssertJ so that {@code null} surfaces as an assertion failure
     *     rather than a {@link NullPointerException}
     */
    private static File[] filesIn(Path rootPath) {
        return new File(rootPath.getPath()).listFiles();
    }

    /** Reads events from {@code store} until {@code readEvent()} returns {@code null}. */
    private static List<JobEvent> readAllEvents(FileSystemJobEventStore store) throws Exception {
        List<JobEvent> events = new ArrayList<>();
        JobEvent event;
        while ((event = store.readEvent()) != null) {
            events.add(event);
        }
        return events;
    }

    /**
     * Asserts that {@code store} yields exactly {@code expected}, in order, and then signals the
     * end of the events by returning {@code null}.
     */
    private static void assertReadsExactly(FileSystemJobEventStore store, JobEvent... expected)
            throws Exception {
        for (JobEvent event : expected) {
            Assertions.assertThat(store.readEvent()).isEqualTo(event);
        }
        Assertions.assertThat(store.readEvent()).isNull();
    }

    /**
     * Flushes the store's output stream, if there is one, on the store's event writer executor
     * and waits for the flush to finish.
     *
     * @throws Exception if the flush fails; the {@link IOException} is wrapped so that the test
     *     reports the root cause instead of a misleading file-count mismatch later on
     */
    private void tryFlushOutputStream(FileSystemJobEventStore store) throws Exception {
        runInEventWriterExecutor(
                store,
                () -> {
                    try {
                        if (store.getOutputStream() != null) {
                            store.getOutputStream().flush();
                        }
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                });
    }

    /** Submits {@code runnable} to the store's event writer executor and blocks until it is done. */
    private void runInEventWriterExecutor(FileSystemJobEventStore store, Runnable runnable)
            throws Exception {
        store.getEventWriterExecutor().submit(runnable).get();
    }
}

