/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import kafka.controller.KafkaController;
import kafka.server.ConfigType$;
import kafka.server.ReplicaManager;
import kafka.server.link.AbstractClusterLinkMetadataManagerTest;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.ClusterLinkMetadataManagerWithZkSupport;
import kafka.server.metadata.ZkMetadataCache;
import kafka.utils.Logging;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.Set;
import scala.collection.immutable.Seq;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

@ScalaSignature(bytes="\u0006\u0005\u0005Ma\u0001B\u000b\u0017\u0001uAQA\t\u0001\u0005\u0002\rBq!\n\u0001C\u0002\u0013%a\u0005\u0003\u0004-\u0001\u0001\u0006Ia\n\u0005\b[\u0001\u0011\r\u0011\"\u0003/\u0011\u0019)\u0004\u0001)A\u0005_!9a\u0007\u0001b\u0001\n\u00139\u0004B\u0002 \u0001A\u0003%\u0001\bC\u0004@\u0001\t\u0007I\u0011\u0002!\t\r\u0015\u0003\u0001\u0015!\u0003B\u0011\u001d1\u0005A1A\u0005\n\u001dCa\u0001\u0016\u0001!\u0002\u0013A\u0005bB+\u0001\u0005\u0004%IA\u0016\u0005\u0007;\u0002\u0001\u000b\u0011B,\t\u000by\u0003A\u0011A0\t\u000bY\u0004A\u0011A<\t\u000bq\u0004A\u0011A<\t\r\u0005\r\u0001\u0001\"\u0001x\u0011\u0019\t9\u0001\u0001C\u0001o\"1\u00111\u0002\u0001\u0005\u0002]Da!a\u0004\u0001\t\u00039(aK\"mkN$XM\u001d'j].lU\r^1eCR\fW*\u00198bO\u0016\u0014x+\u001b;i5.\u001cV\u000f\u001d9peR$Vm\u001d;\u000b\u0005]A\u0012\u0001\u00027j].T!!\u0007\u000e\u0002\rM,'O^3s\u0015\u0005Y\u0012!B6bM.\f7\u0001A\n\u0003\u0001y\u0001\"a\b\u0011\u000e\u0003YI!!\t\f\u0003M\u0005\u00137\u000f\u001e:bGR\u001cE.^:uKJd\u0015N\\6NKR\fG-\u0019;b\u001b\u0006t\u0017mZ3s)\u0016\u001cH/\u0001\u0004=S:LGO\u0010\u000b\u0002IA\u0011q\u0004A\u0001\u000bG>tGO]8mY\u0016\u0014X#A\u0014\u0011\u0005!RS\"A\u0015\u000b\u0005\u0015R\u0012BA\u0016*\u0005=Y\u0015MZ6b\u0007>tGO]8mY\u0016\u0014\u0018aC2p]R\u0014x\u000e\u001c7fe\u0002\nQ\"\\3uC\u0012\fG/Y\"bG\",W#A\u0018\u0011\u0005A\u001aT\"A\u0019\u000b\u0005IB\u0012\u0001C7fi\u0006$\u0017\r^1\n\u0005Q\n$a\u0004.l\u001b\u0016$\u0018\rZ1uC\u000e\u000b7\r[3\u0002\u001d5,G/\u00193bi\u0006\u001c\u0015m\u00195fA\u0005A!p[\"mS\u0016tG/F\u00019!\tID(D\u0001;\u0015\tY$$\u0001\u0002{W&\u0011QH\u000f\u0002\u000e\u0017\u000647.\u0019.l\u00072LWM\u001c;\u0002\u0013i\\7\t\\5f]R\u0004\u0013A\u0004:fa2L7-Y'b]\u0006<WM]\u000b\u0002\u0003B\u0011!iQ\u0007\u00021%\u0011A\t\u0007\u0002\u000f%\u0016\u0004H.[2b\u001b\u0006t\u0017mZ3s\u0003=\u0011X\r\u001d7jG\u0006l\u0015M\\1hKJ\u0004\u0013AC:feZ,'/\u00138g_V\t\u0001\n\u0005\u0002J%6\t!J\u0003\u0002L\u0019\u0006Q\u0011-\u0
01e;i_JL'0\u001a:\u000b\u0005ei%BA\u000eO\u0015\ty\u0005+\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002#\u0006\u0019qN]4\n\u0005MS%\u0001F!vi\"|'/\u001b>feN+'O^3s\u0013:4w.A\u0006tKJ4XM]%oM>\u0004\u0013A\u00042s_.,'/\u00128ea>Lg\u000e^\u000b\u0002/B\u0011\u0001lW\u0007\u00023*\u0011!,T\u0001\u0007G>lWn\u001c8\n\u0005qK&\u0001C#oIB|\u0017N\u001c;\u0002\u001f\t\u0014xn[3s\u000b:$\u0007o\\5oi\u0002\nQa]3u+B$\"\u0001\u00194\u0011\u0005\u0005$W\"\u00012\u000b\u0003\r\fQa]2bY\u0006L!!\u001a2\u0003\tUs\u0017\u000e\u001e\u0005\u0006O:\u0001\r\u0001[\u0001\u0005S:4w\u000e\u0005\u0002ja6\t!N\u0003\u0002lY\u0006\u0019\u0011\r]5\u000b\u00055t\u0017a\u00026va&$XM\u001d\u0006\u0003_B\u000bQA[;oSRL!!\u001d6\u0003\u0011Q+7\u000f^%oM>D#AD:\u0011\u0005%$\u0018BA;k\u0005)\u0011UMZ8sK\u0016\u000b7\r[\u0001\ti\u0016\f'\u000fR8x]R\t\u0001\r\u000b\u0002\u0010sB\u0011\u0011N_\u0005\u0003w*\u0014\u0011\"\u00114uKJ,\u0015m\u00195\u0002UQ,7\u000f^'fi\u0006$\u0017\r^1U_BL7m\u0011:fCRLwN\\,ji\"4\u0015-\u001b7fI\u0006#H/Z7qi\"\u0012\u0001C \t\u0003S~L1!!\u0001k\u0005\u0011!Vm\u001d;\u0002cQ,7\u000f^'fi\u0006$\u0017\r^1U_BL7m\u0011:fCRLwN\\,ji\"$v\u000e]5d\u000bbL7\u000f^:Fq\u000e,\u0007\u000f^5p]\"\u0012\u0011C`\u0001$i\u0016\u001cH\u000fU1si&$\u0018n\u001c8FY\u0016\u001cG/[8o\u0003:$'+Z:jO:\fG/[8oQ\t\u0011b0A\u0013uKN$x)\u001a;U_BL7mQ8oM&<g)\u00197mg\n\u000b7m\u001b+p5.\u001cE.[3oi\"\u00121C`\u0001%i\u0016\u001cHoR3u)>\u0004\u0018nY\"p]\u001aLw-V:fgJ+\u0007\u000f\\5dC6\u000bg.Y4fe\"\u0012AC ")
/*
 * Tests for ClusterLinkMetadataManagerWithZkSupport, run against Mockito mocks of the
 * controller, metadata cache, ZooKeeper client, replica manager and authorizer server info.
 *
 * The fixture has two modes, chosen by the test's display name in setUp:
 *   - tests whose name starts with "testMetadataTopicCreation" receive a manager that has
 *     been constructed but NOT started, so they can stub topic creation before startup;
 *   - every other test receives a started manager.
 */
public class ClusterLinkMetadataManagerWithZkSupportTest
extends AbstractClusterLinkMetadataManagerTest {
    /** Name of the metadata topic whose partition count is looked up in the metadata cache. */
    private static final String LINK_METADATA_TOPIC = "_confluent-link-metadata";
    /** Display-name prefix of the tests that drive manager startup themselves. */
    private static final String TOPIC_CREATION_TEST_PREFIX = "testMetadataTopicCreation";
    /** Topic name shared by the getTopicConfig tests. */
    private static final String CONFIG_TEST_TOPIC = "topic";
    /** Config entry the getTopicConfig tests expect to get back. */
    private static final String TIMESTAMP_TYPE_KEY = "message.timestamp.type";
    private static final String TIMESTAMP_TYPE_VALUE = "CreateTime";

    private final KafkaController controller = Mockito.mock(KafkaController.class);
    private final ZkMetadataCache metadataCache = Mockito.mock(ZkMetadataCache.class);
    private final KafkaZkClient zkClient = Mockito.mock(KafkaZkClient.class);
    private final ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
    private final AuthorizerServerInfo serverInfo = Mockito.mock(AuthorizerServerInfo.class);
    private final Endpoint brokerEndpoint = new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 1234);

    // Accessors for the mocks above; the rest of the class goes through these.

    private KafkaController controller() {
        return this.controller;
    }

    private ZkMetadataCache metadataCache() {
        return this.metadataCache;
    }

    private KafkaZkClient zkClient() {
        return this.zkClient;
    }

    private ReplicaManager replicaManager() {
        return this.replicaManager;
    }

    private AuthorizerServerInfo serverInfo() {
        return this.serverInfo;
    }

    private Endpoint brokerEndpoint() {
        return this.brokerEndpoint;
    }

    /**
     * Resets the mocks, installs the default stubs and creates the manager under test.
     *
     * @param info JUnit test metadata; its display name decides whether the manager is
     *             started here or left for the test to start
     */
    @BeforeEach
    public void setUp(TestInfo info) {
        Mockito.reset(new Object[]{this.destAdmin(), this.metadataCache(), this.zkClient(), this.replicaManager(), this.serverInfo()});
        Mockito.when((Object)this.serverInfo().interBrokerEndpoint()).thenReturn((Object)this.brokerEndpoint());
        Mockito.when((Object)this.zkClient().getChildren(ArgumentMatchers.anyString())).thenReturn((Object)package$.MODULE$.Seq().empty());
        Mockito.when((Object)this.zkClient().getClusterLinks((scala.collection.immutable.Set)ArgumentMatchers.any())).thenReturn((Object)Predef$.MODULE$.Map().empty());
        Mockito.when((Object)BoxesRunTime.boxToBoolean(this.metadataCache().linkCoordinatorEnabled())).thenReturn((Object)BoxesRunTime.boxToBoolean(true));

        boolean startedByTest = info.getDisplayName().startsWith(TOPIC_CREATION_TEST_PREFIX);
        if (!startedByTest) {
            // Stubbed before the manager is constructed, exactly as in the original ordering.
            Mockito.when((Object)this.metadataCache().numPartitions(LINK_METADATA_TOPIC)).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger(50)));
        }
        this.metadataManager_$eq(this.newZkSupportMetadataManager());
        if (!startedByTest) {
            this.metadataManager().startup();
            this.waitAndCreateMetadataTopic();
        }
    }

    /** Shuts the manager down if one was created, then clears the reference. */
    @AfterEach
    public void tearDown() {
        if (this.metadataManager() != null) {
            this.metadataManager().shutdown();
        }
        this.metadataManager_$eq(null);
    }

    /**
     * The first createTopics call fails with a TopicAuthorizationException and the second
     * succeeds; the test expects two createTopics calls and two numPartitions lookups.
     */
    @Test
    public void testMetadataTopicCreationWithFailedAttempt() {
        Mockito.reset(new Object[]{this.metadataCache(), this.zkClient()});
        Mockito.when((Object)this.metadataCache().numPartitions(LINK_METADATA_TOPIC)).thenReturn((Object)None$.MODULE$);
        Mockito.reset(new Object[]{this.destAdmin()});
        Mockito.when((Object)this.destAdmin().createTopics((Collection)ArgumentMatchers.any()))
            .thenReturn((Object)this.createMetadataTopicResult((Option<Throwable>)new Some((Object)new TopicAuthorizationException(""))))
            .thenReturn((Object)this.createMetadataTopicResult((Option<Throwable>)None$.MODULE$));

        this.metadataManager().startup();
        this.waitAndCreateMetadataTopic();

        Mockito.verify(this.destAdmin(), Mockito.times(2)).createTopics((Collection)ArgumentMatchers.any());
        Mockito.verify(this.metadataCache(), Mockito.times(2)).numPartitions(LINK_METADATA_TOPIC);
    }

    /**
     * createTopics fails with TopicExistsException while the metadata cache reports no
     * partitions on the first lookup and the configured partition count on the second; the
     * test expects a single createTopics call and two numPartitions lookups.
     */
    @Test
    public void testMetadataTopicCreationWithTopicExistsException() {
        Mockito.reset(new Object[]{this.metadataCache(), this.zkClient()});
        int configuredPartitions = Predef$.MODULE$.Integer2int(this.brokerConfig().clusterLinkMetadataTopicPartitions());
        Mockito.when((Object)this.metadataCache().numPartitions(LINK_METADATA_TOPIC))
            .thenReturn((Object)None$.MODULE$, (Object[])new Option[]{new Some((Object)BoxesRunTime.boxToInteger(configuredPartitions))});
        Mockito.reset(new Object[]{this.destAdmin()});
        Mockito.when((Object)this.destAdmin().createTopics((Collection)ArgumentMatchers.any()))
            .thenReturn((Object)this.createMetadataTopicResult((Option<Throwable>)new Some((Object)new TopicExistsException(""))));

        this.metadataManager().startup();
        this.waitAndCreateMetadataTopic();

        Mockito.verify(this.destAdmin()).createTopics((Collection)ArgumentMatchers.any());
        Mockito.verify(this.metadataCache(), Mockito.times(2)).numPartitions(LINK_METADATA_TOPIC);
    }

    /**
     * Walks through a sequence of onElection / onResignation calls and checks
     * isLinkCoordinator after each step. The numeric argument is presumably a leader
     * epoch (TODO confirm against the manager); the expectations encoded here are:
     * resignation with a lower value (9 after 10) is ignored, resignation with a higher
     * value (11) takes effect, election with a lower value (9) is ignored, election with a
     * higher value (12) takes effect, and resignation with no value always takes effect.
     */
    @Test
    public void testPartitionElectionAndResignation() {
        String clusterLinkName = "testLink";
        int partitionCount = Predef$.MODULE$.Integer2int(this.brokerConfig().clusterLinkMetadataTopicPartitions());
        // Explicit charset: the hash must not depend on the JVM's default encoding.
        byte[] linkNameBytes = clusterLinkName.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        // Masking with MAX_VALUE clears the sign bit so the modulo result is non-negative.
        int partition = (Utils.murmur2(linkNameBytes) & Integer.MAX_VALUE) % partitionCount;

        this.metadataManager().onElection(partition, 10);
        Assertions.assertTrue(this.metadataManager().isLinkCoordinator(clusterLinkName), "Broker is not leader for cluster link");
        Assertions.assertFalse(this.metadataManager().isLinkCoordinator("testLink2"), "Broker is leader for cluster link");

        this.metadataManager().onResignation(partition, (Option)new Some((Object)BoxesRunTime.boxToInteger(9)));
        Assertions.assertTrue(this.metadataManager().isLinkCoordinator(clusterLinkName), "Broker is not leader for cluster link");

        this.metadataManager().onResignation(partition, (Option)new Some((Object)BoxesRunTime.boxToInteger(11)));
        Assertions.assertFalse(this.metadataManager().isLinkCoordinator(clusterLinkName), "Broker is leader for cluster link");

        this.metadataManager().onElection(partition, 9);
        Assertions.assertFalse(this.metadataManager().isLinkCoordinator(clusterLinkName), "Broker is leader for cluster link");

        this.metadataManager().onElection(partition, 12);
        Assertions.assertTrue(this.metadataManager().isLinkCoordinator(clusterLinkName), "Broker is not leader for cluster link");

        this.metadataManager().onResignation(partition, (Option)None$.MODULE$);
        Assertions.assertFalse(this.metadataManager().isLinkCoordinator(clusterLinkName), "Broker is leader for cluster link");
    }

    /**
     * When the replica manager has no log config for partition 0 of the topic, the result
     * of getTopicConfig is expected to match the entity configs stubbed on the ZK client.
     */
    @Test
    public void testGetTopicConfigFallsBackToZkClient() {
        Mockito.reset(new Object[]{this.zkClient(), this.replicaManager()});
        Properties zkConfigs = this.timestampTypeOnlyProperties();
        Mockito.when((Object)this.zkClient().getEntityConfigs(ConfigType$.MODULE$.Topic(), CONFIG_TEST_TOPIC)).thenReturn((Object)zkConfigs);
        Mockito.when((Object)this.replicaManager().getLogConfig(new TopicPartition(CONFIG_TEST_TOPIC, 0))).thenReturn((Object)None$.MODULE$);

        Properties actual = this.metadataManager().getTopicConfig(CONFIG_TEST_TOPIC);

        this.assertOnlyTimestampType(actual);
    }

    /**
     * When the replica manager has a log config for partition 0 of the topic, getTopicConfig
     * is expected to return only the entry named in the LogConfig's second constructor
     * argument (the timestamp type), not the additional "min.insync.replicas" entry.
     */
    @Test
    public void testGetTopicConfigUsesReplicaManager() {
        Mockito.reset(new Object[]{this.zkClient(), this.replicaManager()});
        Properties allProperties = new Properties();
        allProperties.putAll((Map<?, ?>)this.timestampTypeOnlyProperties());
        allProperties.put("min.insync.replicas", "1");
        LogConfig logConfig = new LogConfig((Map)allProperties, CollectionConverters$.MODULE$.SetHasAsJava((Set)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{TIMESTAMP_TYPE_KEY}))).asJava());
        Mockito.when((Object)this.replicaManager().getLogConfig(new TopicPartition(CONFIG_TEST_TOPIC, 0))).thenReturn((Object)new Some((Object)logConfig));

        Properties actual = this.metadataManager().getTopicConfig(CONFIG_TEST_TOPIC);

        this.assertOnlyTimestampType(actual);
    }

    /** Builds the manager under test from the mocks; the fourth constructor argument is null. */
    private ClusterLinkMetadataManager newZkSupportMetadataManager() {
        return new ClusterLinkMetadataManagerWithZkSupport(this.brokerConfig(), this.scheduler(), this.metadataCache(), null, this.controller(), this.zkClient(), (Function0 & Serializable)() -> this.destAdmin(), this.replicaManager(), this.serverInfo(), (Option)None$.MODULE$);
    }

    /** Returns fresh Properties holding only the timestamp-type entry. */
    private Properties timestampTypeOnlyProperties() {
        Properties props = new Properties();
        props.put(TIMESTAMP_TYPE_KEY, TIMESTAMP_TYPE_VALUE);
        return props;
    }

    /** Asserts that {@code actual} contains exactly the timestamp-type entry. */
    private void assertOnlyTimestampType(Properties actual) {
        Assertions.assertEquals(1, actual.size());
        Assertions.assertEquals((Object)TIMESTAMP_TYPE_VALUE, (Object)actual.getProperty(TIMESTAMP_TYPE_KEY));
    }
}

