/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.RepartitionTopicConfig;
import org.apache.kafka.streams.processor.internals.UnwindowedChangelogTopicConfig;
import org.apache.kafka.streams.processor.internals.WindowedChangelogTopicConfig;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.easymock.EasyMock;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link InternalTopicManager}.
 *
 * <p>Tests run either against a {@link MockAdminClient} backed by a two-node mock cluster, or against an
 * EasyMock {@link AdminClient} whose describe/create responses are scripted per test.
 */
public class InternalTopicManagerTest {
    private final Node broker1 = new Node(0, "dummyHost-1", 1234);
    private final Node broker2 = new Node(1, "dummyHost-2", 1234);
    // Populated in init(); JUnit creates a fresh test instance (and thus fresh collections) per test method.
    private final List<Node> cluster = new ArrayList<>(2);
    private final String topic = "test_topic";
    private final String topic2 = "test_topic_2";
    private final String topic3 = "test_topic_3";
    private final List<Node> singleReplica = Collections.singletonList(broker1);
    private final Map<String, Object> config = new HashMap<>();
    private String threadName;
    private MockAdminClient mockAdminClient;
    private InternalTopicManager internalTopicManager;

    /** Builds the mock cluster, the Streams configuration and the manager under test. */
    @Before
    public void init() {
        threadName = Thread.currentThread().getName();

        cluster.add(broker1);
        cluster.add(broker2);

        config.put("application.id", "app-id");
        config.put("bootstrap.servers", broker1.host() + ":" + broker1.port());
        config.put("replication.factor", 1);
        config.put(StreamsConfig.producerPrefix("batch.size"), 16384);
        config.put(StreamsConfig.consumerPrefix("max.poll.interval.ms"), 100);
        config.put("retry.backoff.ms", 50);

        mockAdminClient = new MockAdminClient(cluster, broker1);
        internalTopicManager = new InternalTopicManager(Time.SYSTEM, mockAdminClient, new StreamsConfig(config));
    }

    @After
    public void shutdown() {
        mockAdminClient.close();
    }

    @Test
    public void shouldReturnCorrectPartitionCounts() {
        mockAdminClient.addTopic(false, topic, Collections.singletonList(partitionZero(singleReplica)), null);

        Assert.assertEquals(
            Collections.singletonMap(topic, 1),
            internalTopicManager.getNumPartitions(Collections.singleton(topic), Collections.emptySet()));
    }

    @Test
    public void shouldCreateRequiredTopics() throws Exception {
        final InternalTopicConfig topicConfig = new RepartitionTopicConfig(topic, Collections.emptyMap());
        topicConfig.setNumberOfPartitions(1);
        final InternalTopicConfig topicConfig2 = new UnwindowedChangelogTopicConfig(topic2, Collections.emptyMap());
        topicConfig2.setNumberOfPartitions(1);
        final InternalTopicConfig topicConfig3 = new WindowedChangelogTopicConfig(topic3, Collections.emptyMap());
        topicConfig3.setNumberOfPartitions(1);

        internalTopicManager.makeReady(Collections.singletonMap(topic, topicConfig));
        internalTopicManager.makeReady(Collections.singletonMap(topic2, topicConfig2));
        internalTopicManager.makeReady(Collections.singletonMap(topic3, topicConfig3));

        Assert.assertEquals(Utils.mkSet(topic, topic2, topic3), mockAdminClient.listTopics().names().get());

        // Every topic is expected to have a single partition led by, and replicated on, broker1 only.
        Assert.assertEquals(expectedSinglePartitionDescription(topic), describeTopic(topic));
        Assert.assertEquals(expectedSinglePartitionDescription(topic2), describeTopic(topic2));
        Assert.assertEquals(expectedSinglePartitionDescription(topic3), describeTopic(topic3));

        // The cleanup policy differs per topic type: repartition, unwindowed changelog, windowed changelog.
        Assert.assertEquals(new ConfigEntry("cleanup.policy", "delete"), cleanupPolicy(topic));
        Assert.assertEquals(new ConfigEntry("cleanup.policy", "compact"), cleanupPolicy(topic2));
        Assert.assertEquals(new ConfigEntry("cleanup.policy", "compact,delete"), cleanupPolicy(topic3));
    }

    @Test
    public void shouldCompleteTopicValidationOnRetry() {
        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
        final InternalTopicManager topicManager = new InternalTopicManager(Time.SYSTEM, admin, new StreamsConfig(config));
        final TopicPartitionInfo partitionInfo = new TopicPartitionInfo(
            0, broker1, Collections.singletonList(broker1), Collections.singletonList(broker1));

        final KafkaFutureImpl<TopicDescription> topicDescriptionSuccessFuture = new KafkaFutureImpl<>();
        final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();
        topicDescriptionSuccessFuture.complete(
            new TopicDescription(topic, false, Collections.singletonList(partitionInfo), Collections.emptySet()));
        topicDescriptionFailFuture.completeExceptionally(new UnknownTopicOrPartitionException("KABOOM!"));

        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> topicCreationFuture = new KafkaFutureImpl<>();
        topicCreationFuture.completeExceptionally(new TopicExistsException("KABOOM!"));

        // Scripted sequence: the first describe succeeds for topic but reports topic2 as unknown; creating
        // topic2 then fails with "topic exists"; finally a describe of only topic2 succeeds.
        EasyMock.expect(admin.describeTopics(Utils.mkSet(topic, topic2)))
            .andReturn(new MockDescribeTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic, topicDescriptionSuccessFuture),
                Utils.mkEntry(topic2, topicDescriptionFailFuture))))
            .once();
        EasyMock.expect(admin.createTopics(Collections.singleton(
                new NewTopic(topic2, Optional.of(1), Optional.of((short) 1))
                    .configs(Utils.mkMap(
                        Utils.mkEntry("cleanup.policy", "compact"),
                        Utils.mkEntry("message.timestamp.type", "CreateTime"))))))
            .andReturn(new MockCreateTopicsResult(Collections.singletonMap(topic2, topicCreationFuture)))
            .once();
        EasyMock.expect(admin.describeTopics(Collections.singleton(topic2)))
            .andReturn(new MockDescribeTopicsResult(Collections.singletonMap(topic2, topicDescriptionSuccessFuture)));
        EasyMock.replay(admin);

        final InternalTopicConfig topicConfig = new UnwindowedChangelogTopicConfig(topic, Collections.emptyMap());
        topicConfig.setNumberOfPartitions(1);
        final InternalTopicConfig topic2Config = new UnwindowedChangelogTopicConfig(topic2, Collections.emptyMap());
        topic2Config.setNumberOfPartitions(1);
        topicManager.makeReady(Utils.mkMap(
            Utils.mkEntry(topic, topicConfig),
            Utils.mkEntry(topic2, topic2Config)));

        EasyMock.verify(admin);
    }

    @Test
    public void shouldNotCreateTopicIfExistsWithDifferentPartitions() {
        // The topic already exists with two partitions while only one is requested below.
        final List<TopicPartitionInfo> existingPartitions = new ArrayList<>(2);
        existingPartitions.add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()));
        existingPartitions.add(new TopicPartitionInfo(1, broker1, singleReplica, Collections.emptyList()));
        mockAdminClient.addTopic(false, topic, existingPartitions, null);

        final InternalTopicConfig internalTopicConfig = singlePartitionRepartitionTopic(topic);

        Assert.assertThrows(
            "Should have thrown StreamsException",
            StreamsException.class,
            () -> internalTopicManager.makeReady(Collections.singletonMap(topic, internalTopicConfig)));
    }

    @Test
    public void shouldNotThrowExceptionIfExistsWithDifferentReplication() {
        // The existing topic is replicated on both brokers although the configured replication factor is 1.
        mockAdminClient.addTopic(false, topic, Collections.singletonList(partitionZero(cluster)), null);

        final InternalTopicManager internalTopicManager2 =
            new InternalTopicManager(Time.SYSTEM, mockAdminClient, new StreamsConfig(config));

        internalTopicManager2.makeReady(Collections.singletonMap(topic, singlePartitionRepartitionTopic(topic)));
    }

    @Test
    public void shouldNotThrowExceptionForEmptyTopicMap() {
        internalTopicManager.makeReady(Collections.emptyMap());
    }

    @Test
    public void shouldExhaustRetriesOnTimeoutExceptionForMakeReady() {
        mockAdminClient.timeoutNextRequest(1);

        final InternalTopicConfig internalTopicConfig = singlePartitionRepartitionTopic(topic);

        final StreamsException expected = Assert.assertThrows(
            "Should have thrown StreamsException.",
            StreamsException.class,
            () -> internalTopicManager.makeReady(Collections.singletonMap(topic, internalTopicConfig)));
        Assert.assertEquals(TimeoutException.class, expected.getCause().getClass());
    }

    @Test
    public void shouldLogWhenTopicNotFoundAndNotThrowException() {
        mockAdminClient.addTopic(false, topic, Collections.singletonList(partitionZero(cluster)), null);

        // "internal-topic" is never added to the mock admin client, so describing it reports it as not found.
        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
        topicConfigMap.put(topic, singlePartitionRepartitionTopic(topic));
        topicConfigMap.put("internal-topic", singlePartitionRepartitionTopic("internal-topic"));

        LogCaptureAppender.setClassLoggerToDebug(InternalTopicManager.class);
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(InternalTopicManager.class)) {
            internalTopicManager.makeReady(topicConfigMap);

            MatcherAssert.assertThat(
                appender.getMessages(),
                Matchers.hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\nError message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found."));
        }
    }

    @Test
    public void shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound() {
        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
        final InternalTopicManager topicManager = new InternalTopicManager(Time.SYSTEM, admin, new StreamsConfig(config));

        final KafkaFutureImpl<TopicDescription> topicDescriptionLeaderNotAvailableFuture = new KafkaFutureImpl<>();
        topicDescriptionLeaderNotAvailableFuture.completeExceptionally(
            new LeaderNotAvailableException("Leader Not Available!"));
        final KafkaFutureImpl<TopicDescription> topicDescriptionUnknownTopicFuture = new KafkaFutureImpl<>();
        topicDescriptionUnknownTopicFuture.completeExceptionally(
            new UnknownTopicOrPartitionException("Unknown Topic!"));
        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> topicCreationFuture = new KafkaFutureImpl<>();
        topicCreationFuture.complete(EasyMock.createNiceMock(CreateTopicsResult.TopicMetadataAndConfig.class));

        // Scripted sequence: describe fails with leader-not-available, an empty create request follows,
        // the next describe reports the topic as unknown, and only then the topic itself is created.
        EasyMock.expect(admin.describeTopics(Collections.singleton(topic)))
            .andReturn(new MockDescribeTopicsResult(
                Collections.singletonMap(topic, topicDescriptionLeaderNotAvailableFuture)))
            .once();
        EasyMock.expect(admin.createTopics(Collections.emptySet()))
            .andReturn(new MockCreateTopicsResult(Collections.emptyMap()))
            .once();
        EasyMock.expect(admin.describeTopics(Collections.singleton(topic)))
            .andReturn(new MockDescribeTopicsResult(
                Collections.singletonMap(topic, topicDescriptionUnknownTopicFuture)))
            .once();
        EasyMock.expect(admin.createTopics(Collections.singleton(
                new NewTopic(topic, Optional.of(1), Optional.of((short) 1))
                    .configs(Utils.mkMap(
                        Utils.mkEntry("cleanup.policy", "delete"),
                        Utils.mkEntry("message.timestamp.type", "CreateTime"),
                        Utils.mkEntry("segment.bytes", "52428800"),
                        Utils.mkEntry("retention.ms", "-1"))))))
            .andReturn(new MockCreateTopicsResult(Collections.singletonMap(topic, topicCreationFuture)))
            .once();
        EasyMock.replay(admin);

        topicManager.makeReady(Collections.singletonMap(topic, singlePartitionRepartitionTopic(topic)));

        EasyMock.verify(admin);
    }

    @Test
    public void shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess() {
        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
        final InternalTopicManager topicManager = new InternalTopicManager(Time.SYSTEM, admin, new StreamsConfig(config));
        final TopicPartitionInfo partitionInfo = new TopicPartitionInfo(
            0, broker1, Collections.singletonList(broker1), Collections.singletonList(broker1));

        final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();
        topicDescriptionFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
        final KafkaFutureImpl<TopicDescription> topicDescriptionSuccessFuture = new KafkaFutureImpl<>();
        topicDescriptionSuccessFuture.complete(
            new TopicDescription(topic, false, Collections.singletonList(partitionInfo), Collections.emptySet()));

        // Scripted sequence: describe fails with leader-not-available, an empty create request follows,
        // and the second describe succeeds.
        EasyMock.expect(admin.describeTopics(Collections.singleton(topic)))
            .andReturn(new MockDescribeTopicsResult(Collections.singletonMap(topic, topicDescriptionFailFuture)))
            .once();
        EasyMock.expect(admin.createTopics(Collections.emptySet()))
            .andReturn(new MockCreateTopicsResult(Collections.emptyMap()))
            .once();
        EasyMock.expect(admin.describeTopics(Collections.singleton(topic)))
            .andReturn(new MockDescribeTopicsResult(Collections.singletonMap(topic, topicDescriptionSuccessFuture)))
            .once();
        EasyMock.replay(admin);

        topicManager.makeReady(Collections.singletonMap(topic, singlePartitionRepartitionTopic(topic)));

        EasyMock.verify(admin);
    }

    @Test
    public void shouldThrowExceptionWhenKeepsTopicLeaderNotAvailable() {
        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
        final InternalTopicManager topicManager = new InternalTopicManager(Time.SYSTEM, admin, new StreamsConfig(config));

        final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();
        topicDescriptionFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));

        // Every describe fails with leader-not-available, no matter how often it is called.
        EasyMock.expect(admin.describeTopics(Collections.singleton(topic)))
            .andReturn(new MockDescribeTopicsResult(Collections.singletonMap(topic, topicDescriptionFailFuture)))
            .anyTimes();
        EasyMock.expect(admin.createTopics(Collections.emptySet()))
            .andReturn(new MockCreateTopicsResult(Collections.emptyMap()))
            .anyTimes();
        EasyMock.replay(admin);

        final InternalTopicConfig internalTopicConfig = singlePartitionRepartitionTopic(topic);

        final TimeoutException exception = Assert.assertThrows(
            TimeoutException.class,
            () -> topicManager.makeReady(Collections.singletonMap(topic, internalTopicConfig)));
        assertTopicCreationTimedOut(exception);

        EasyMock.verify(admin);
    }

    @Test
    public void shouldExhaustRetriesOnMarkedForDeletionTopic() {
        mockAdminClient.addTopic(false, topic, Collections.singletonList(partitionZero(cluster)), null);
        mockAdminClient.markTopicForDeletion(topic);

        final InternalTopicConfig internalTopicConfig = singlePartitionRepartitionTopic(topic);

        final TimeoutException exception = Assert.assertThrows(
            TimeoutException.class,
            () -> internalTopicManager.makeReady(Collections.singletonMap(topic, internalTopicConfig)));
        assertTopicCreationTimedOut(exception);
    }

    /** Returns partition 0 led by {@code broker1}, with the given replicas and an empty in-sync replica list. */
    private TopicPartitionInfo partitionZero(final List<Node> replicas) {
        return new TopicPartitionInfo(0, broker1, replicas, Collections.emptyList());
    }

    /** Returns a repartition topic config named {@code name}, without extra configs, set to one partition. */
    private static InternalTopicConfig singlePartitionRepartitionTopic(final String name) {
        final InternalTopicConfig topicConfig = new RepartitionTopicConfig(name, Collections.emptyMap());
        topicConfig.setNumberOfPartitions(1);
        return topicConfig;
    }

    /** Returns the description expected for a non-internal topic with one partition hosted on {@code broker1}. */
    private TopicDescription expectedSinglePartitionDescription(final String name) {
        return new TopicDescription(name, false, Collections.singletonList(partitionZero(singleReplica)));
    }

    /** Describes {@code name} through the mock admin client and returns the completed result. */
    private TopicDescription describeTopic(final String name) throws Exception {
        return mockAdminClient.describeTopics(Collections.singleton(name)).values().get(name).get();
    }

    /** Returns the {@code cleanup.policy} entry the mock admin client reports for topic {@code name}. */
    private ConfigEntry cleanupPolicy(final String name) throws Exception {
        final ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, name);
        return mockAdminClient.describeConfigs(Collections.singleton(resource))
            .values()
            .get(resource)
            .get()
            .get("cleanup.policy");
    }

    /** Asserts that {@code exception} has no cause and carries the topic-creation timeout message. */
    private static void assertTopicCreationTimedOut(final TimeoutException exception) {
        Assert.assertNull(exception.getCause());
        MatcherAssert.assertThat(
            exception.getMessage(),
            Matchers.equalTo("Could not create topics within 50 milliseconds. This can happen if the Kafka cluster is temporarily not available."));
    }

    /** Exposes the superclass constructor so tests can hand out pre-completed describe futures. */
    private static class MockDescribeTopicsResult extends DescribeTopicsResult {
        MockDescribeTopicsResult(final Map<String, KafkaFuture<TopicDescription>> futures) {
            super(futures);
        }
    }

    /** Exposes the superclass constructor so tests can hand out pre-completed create futures. */
    private static class MockCreateTopicsResult extends CreateTopicsResult {
        MockCreateTopicsResult(final Map<String, KafkaFuture<CreateTopicsResult.TopicMetadataAndConfig>> futures) {
            super(futures);
        }
    }
}

