/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.kstream.internals.TimestampedKeyValueStoreMaterializer;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorAdapter;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.ConsumerRecordUtil;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class GlobalStreamThreadTest {
    private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
    private final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer(OffsetResetStrategy.NONE);
    private final MockTime time = new MockTime();
    private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
    private GlobalStreamThread globalStreamThread;
    private StreamsConfig config;
    private String baseDirectoryName;
    private static final String GLOBAL_STORE_TOPIC_NAME = "foo";
    private static final String GLOBAL_STORE_NAME = "bar";
    private final TopicPartition topicPartition = new TopicPartition("foo", 0);

    @Before
    public void before() {
        MaterializedInternal materialized = new MaterializedInternal(Materialized.with(null, null), new InternalNameProvider(){

            public String newProcessorName(String prefix) {
                return "processorName";
            }

            public String newStoreName(String prefix) {
                return GlobalStreamThreadTest.GLOBAL_STORE_NAME;
            }
        }, "store-");
        this.builder.addGlobalStore(new TimestampedKeyValueStoreMaterializer(materialized).materialize().withLoggingDisabled(), "sourceName", null, null, null, GLOBAL_STORE_TOPIC_NAME, "processorName", () -> ProcessorAdapter.adapt((Processor)new KTableSource(GLOBAL_STORE_NAME, GLOBAL_STORE_NAME).get()));
        this.baseDirectoryName = TestUtils.tempDirectory().getAbsolutePath();
        HashMap<String, String> properties = new HashMap<String, String>();
        properties.put("bootstrap.servers", "blah");
        properties.put("application.id", "testAppId");
        properties.put("state.dir", this.baseDirectoryName);
        properties.put("default.key.serde", Serdes.ByteArraySerde.class.getName());
        properties.put("default.value.serde", Serdes.ByteArraySerde.class.getName());
        this.config = new StreamsConfig(properties);
        this.globalStreamThread = new GlobalStreamThread(this.builder.rewriteTopology(this.config).buildGlobalStateTopology(), this.config, this.mockConsumer, new StateDirectory(this.config, (Time)this.time, true), 0L, new StreamsMetricsImpl(new Metrics(), "test-client", "latest", (Time)this.time), (Time)this.time, "clientId", (StateRestoreListener)this.stateRestoreListener, e -> {});
    }

    @Test
    public void shouldThrowStreamsExceptionOnStartupIfThereIsAStreamsException() throws Exception {
        StateStore globalStore = (StateStore)this.builder.globalStateStores().get(GLOBAL_STORE_NAME);
        try {
            this.globalStreamThread.start();
            Assert.fail((String)"Should have thrown StreamsException if start up failed");
        }
        catch (StreamsException streamsException) {
            // empty catch block
        }
        this.globalStreamThread.join();
        MatcherAssert.assertThat((Object)globalStore.isOpen(), (Matcher)CoreMatchers.is((Object)false));
        Assert.assertFalse((boolean)this.globalStreamThread.stillRunning());
    }

    @Test
    public void shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() throws Exception {
        MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST){

            public List<PartitionInfo> partitionsFor(String topic) {
                throw new RuntimeException("KABOOM!");
            }
        };
        StateStore globalStore = (StateStore)this.builder.globalStateStores().get(GLOBAL_STORE_NAME);
        this.globalStreamThread = new GlobalStreamThread(this.builder.buildGlobalStateTopology(), this.config, (Consumer)mockConsumer, new StateDirectory(this.config, (Time)this.time, true), 0L, new StreamsMetricsImpl(new Metrics(), "test-client", "latest", (Time)this.time), (Time)this.time, "clientId", (StateRestoreListener)this.stateRestoreListener, e -> {});
        try {
            this.globalStreamThread.start();
            Assert.fail((String)"Should have thrown StreamsException if start up failed");
        }
        catch (StreamsException e2) {
            MatcherAssert.assertThat((Object)e2.getCause(), (Matcher)IsInstanceOf.instanceOf(RuntimeException.class));
            MatcherAssert.assertThat((Object)e2.getCause().getMessage(), (Matcher)CoreMatchers.equalTo((Object)"KABOOM!"));
        }
        this.globalStreamThread.join();
        MatcherAssert.assertThat((Object)globalStore.isOpen(), (Matcher)CoreMatchers.is((Object)false));
        Assert.assertFalse((boolean)this.globalStreamThread.stillRunning());
    }

    @Test
    public void shouldBeRunningAfterSuccessfulStart() throws Exception {
        this.initializeConsumer();
        this.startAndSwallowError();
        Assert.assertTrue((boolean)this.globalStreamThread.stillRunning());
        this.globalStreamThread.shutdown();
        this.globalStreamThread.join();
    }

    @Test(timeout=30000L)
    public void shouldStopRunningWhenClosedByUser() throws Exception {
        this.initializeConsumer();
        this.startAndSwallowError();
        this.globalStreamThread.shutdown();
        this.globalStreamThread.join();
        Assert.assertEquals((Object)GlobalStreamThread.State.DEAD, (Object)this.globalStreamThread.state());
    }

    @Test
    public void shouldCloseStateStoresOnClose() throws Exception {
        this.initializeConsumer();
        this.startAndSwallowError();
        StateStore globalStore = (StateStore)this.builder.globalStateStores().get(GLOBAL_STORE_NAME);
        Assert.assertTrue((boolean)globalStore.isOpen());
        this.globalStreamThread.shutdown();
        this.globalStreamThread.join();
        Assert.assertFalse((boolean)globalStore.isOpen());
    }

    @Test
    public void shouldStayDeadAfterTwoCloses() throws Exception {
        this.initializeConsumer();
        this.startAndSwallowError();
        this.globalStreamThread.shutdown();
        this.globalStreamThread.join();
        this.globalStreamThread.shutdown();
        Assert.assertEquals((Object)GlobalStreamThread.State.DEAD, (Object)this.globalStreamThread.state());
    }

    @Test
    public void shouldTransitionToRunningOnStart() throws Exception {
        this.initializeConsumer();
        this.startAndSwallowError();
        TestUtils.waitForCondition(() -> this.globalStreamThread.state() == GlobalStreamThread.State.RUNNING, (long)10000L, (String)"Thread never started.");
        this.globalStreamThread.shutdown();
    }

    @Test
    public void shouldDieOnInvalidOffsetExceptionDuringStartup() throws Exception {
        StateStore globalStore = (StateStore)this.builder.globalStateStores().get(GLOBAL_STORE_NAME);
        this.initializeConsumer();
        this.mockConsumer.setPollException((KafkaException)new InvalidOffsetException("Try Again!"){

            public Set<TopicPartition> partitions() {
                return Collections.singleton(GlobalStreamThreadTest.this.topicPartition);
            }
        });
        this.startAndSwallowError();
        TestUtils.waitForCondition(() -> this.globalStreamThread.state() == GlobalStreamThread.State.DEAD, (long)10000L, (String)"GlobalStreamThread should have died.");
        this.globalStreamThread.join();
        MatcherAssert.assertThat((Object)globalStore.isOpen(), (Matcher)CoreMatchers.is((Object)false));
        Assert.assertFalse((boolean)new File(this.baseDirectoryName + File.separator + "testAppId" + File.separator + "global").exists());
    }

    @Test
    public void shouldDieOnInvalidOffsetExceptionWhileRunning() throws Exception {
        StateStore globalStore = (StateStore)this.builder.globalStateStores().get(GLOBAL_STORE_NAME);
        this.initializeConsumer();
        this.startAndSwallowError();
        TestUtils.waitForCondition(() -> this.globalStreamThread.state() == GlobalStreamThread.State.RUNNING, (long)10000L, (String)"Thread never started.");
        this.mockConsumer.updateEndOffsets(Collections.singletonMap(this.topicPartition, 1L));
        this.mockConsumer.addRecord(ConsumerRecordUtil.record(GLOBAL_STORE_TOPIC_NAME, 0, 0L, "K1".getBytes(), "V1".getBytes()));
        TestUtils.waitForCondition(() -> this.mockConsumer.position(this.topicPartition) == 1L, (long)10000L, (String)"Input record never consumed");
        this.mockConsumer.setPollException((KafkaException)new InvalidOffsetException("Try Again!"){

            public Set<TopicPartition> partitions() {
                return Collections.singleton(GlobalStreamThreadTest.this.topicPartition);
            }
        });
        TestUtils.waitForCondition(() -> this.globalStreamThread.state() == GlobalStreamThread.State.DEAD, (long)10000L, (String)"GlobalStreamThread should have died.");
        this.globalStreamThread.join();
        MatcherAssert.assertThat((Object)globalStore.isOpen(), (Matcher)CoreMatchers.is((Object)false));
        Assert.assertFalse((boolean)new File(this.baseDirectoryName + File.separator + "testAppId" + File.separator + "global").exists());
    }

    private void initializeConsumer() {
        this.mockConsumer.updatePartitions(GLOBAL_STORE_TOPIC_NAME, Collections.singletonList(new PartitionInfo(GLOBAL_STORE_TOPIC_NAME, 0, null, new Node[0], new Node[0])));
        this.mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.topicPartition, 0L));
        this.mockConsumer.updateEndOffsets(Collections.singletonMap(this.topicPartition, 0L));
        this.mockConsumer.assign(Collections.singleton(this.topicPartition));
    }

    private void startAndSwallowError() {
        try {
            this.globalStreamThread.start();
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }
}

