/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.lifecycle;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.lifecycle.TestJobExecutor;
import org.apache.flink.runtime.operators.lifecycle.TestJobWithDescription;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommand;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher;
import org.apache.flink.runtime.operators.lifecycle.event.CheckpointCompletedEvent;
import org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders;
import org.apache.flink.runtime.operators.lifecycle.validation.DrainingValidator;
import org.apache.flink.runtime.operators.lifecycle.validation.FinishingValidator;
import org.apache.flink.runtime.operators.lifecycle.validation.TestJobDataFlowValidator;
import org.apache.flink.runtime.operators.lifecycle.validation.TestOperatorLifecycleValidator;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.ThrowingConsumer;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class BoundedSourceITCase
extends TestLogger {
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    @Rule
    public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(BoundedSourceITCase.configuration()).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(4).build());
    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();
    @Rule
    public Timeout timeoutRule = new Timeout(10L, TimeUnit.MINUTES);
    @Parameterized.Parameter
    public TestJobBuilders.TestingGraphBuilder graphBuilder;

    private static Configuration configuration() {
        Configuration conf = new Configuration();
        try {
            FsStateChangelogStorageFactory.configure((Configuration)conf, (File)TEMPORARY_FOLDER.newFolder(), (Duration)Duration.ofMinutes(1L), (int)10);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        return conf;
    }

    @Parameterized.Parameters(name="{0}")
    public static Object[] parameters() {
        return new Object[]{TestJobBuilders.SIMPLE_GRAPH_BUILDER, TestJobBuilders.COMPLEX_GRAPH_BUILDER};
    }

    @Test
    public void test() throws Exception {
        TestJobWithDescription testJob = this.graphBuilder.build(this.sharedObjects, (ThrowingConsumer<Configuration, Exception>)((ThrowingConsumer)cfg -> {}), (ThrowingConsumer<StreamExecutionEnvironment, Exception>)((ThrowingConsumer)env -> env.getCheckpointConfig().setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI())));
        TestJobExecutor.execute(testJob, this.miniClusterResource).waitForEvent(CheckpointCompletedEvent.class).sendBroadcastCommand(TestCommand.FINISH_SOURCES, TestCommandDispatcher.TestCommandScope.ALL_SUBTASKS).waitForTermination().assertFinishedSuccessfully();
        TestOperatorLifecycleValidator.checkOperatorsLifecycle(testJob, new DrainingValidator(), new FinishingValidator());
        TestJobDataFlowValidator.checkDataFlow(testJob, true);
    }
}

