/*
 * Decompiled with CFR 0.152.
 */
package kafka.link;

import io.confluent.kafka.link.ClusterLinkConfig;
import java.io.Serializable;
import java.util.Collection;
import java.util.Properties;
import kafka.link.AbstractClusterLinkIntegrationTest;
import kafka.link.ClusterLinkTestHarness;
import kafka.server.KafkaBroker;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ConnectionMode;
import kafka.server.link.ConnectionMode$;
import kafka.server.link.TopicType$;
import kafka.utils.Implicits;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.;
import scala.$less$colon$less$;
import scala.Enumeration;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.SeqOps;
import scala.collection.immutable.;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;

@Tag(value="integration")
@ScalaSignature(bytes="\u0006\u0005\u0005}h\u0001\u0002\n\u0014\u0001aAQ!\b\u0001\u0005\u0002yAQ\u0001\t\u0001\u0005B\u0005BQ\u0001\u000b\u0001\u0005\u0002%BQa\u0016\u0001\u0005\u0002aCQA\u0018\u0001\u0005\u0002}CQ!\u001a\u0001\u0005\u0002\u0019DQ\u0001\u001c\u0001\u0005\n5D\u0011\"!\u000f\u0001#\u0003%I!a\u000f\t\u0013\u0005E\u0003!%A\u0005\n\u0005M\u0003bBA,\u0001\u0011\u0005\u0011\u0011\f\u0005\n\u0003\u0007\u0003\u0011\u0013!C\u0001\u0003\u000bC\u0011\"!#\u0001#\u0003%\t!a\u000f\t\u0013\u0005-\u0005!%A\u0005\u0002\u0005M\u0003bBAG\u0001\u0011%\u0011q\u0012\u0005\b\u0003[\u0003A\u0011BAX\u0011\u001d\t9\f\u0001C\u0005\u0003sCq!!8\u0001\t#\tyN\u0001\u0011CS\u0012L'/Z2uS>t\u0017\r\u001c'j].Le\u000e^3he\u0006$\u0018n\u001c8UKN$(B\u0001\u000b\u0016\u0003\u0011a\u0017N\\6\u000b\u0003Y\tQa[1gW\u0006\u001c\u0001a\u0005\u0002\u00013A\u0011!dG\u0007\u0002'%\u0011Ad\u0005\u0002#\u0003\n\u001cHO]1di\u000ecWo\u001d;fe2Kgn[%oi\u0016<'/\u0019;j_:$Vm\u001d;\u0002\rqJg.\u001b;?)\u0005y\u0002C\u0001\u000e\u0001\u0003ei\u0017-\u001f2f+N,')\u001b3je\u0016\u001cG/[8oC2d\u0015N\\6\u0015\u0003\t\u0002\"a\t\u0014\u000e\u0003\u0011R\u0011!J\u0001\u0006g\u000e\fG.Y\u0005\u0003O\u0011\u0012A!\u00168ji\u0006aC/Z:u\u0005&$\u0017N]3di&|g.\u00197MS:\\w+\u001b;i\u001fV$(m\\;oI\u000e{gN\\3di&|gn\u001d\u000b\u0004E):\u0004\"B\u0016\u0004\u0001\u0004a\u0013AB9v_J,X\u000e\u0005\u0002.i9\u0011aF\r\t\u0003_\u0011j\u0011\u0001\r\u0006\u0003c]\ta\u0001\u0010:p_Rt\u0014BA\u001a%\u0003\u0019\u0001&/\u001a3fM&\u0011QG\u000e\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005M\"\u0003\"\u0002\u001d\u0004\u0001\u0004I\u0014aC2p_J$\u0017N\\1u_J\u0004\"a\t\u001e\n\u0005m\"#a\u0002\"p_2,\u0017M\u001c\u0015\u0005\u0007uZE\n\u0005\u0002?\u00136\tqH\u0003\u0002A\u0003\u0006A\u0001O]8wS\u0012,'O\u0003\u0002C\u0007\u00061\u0001/\u0019:b[NT!\u0001R#\u0002\u000f),\b/\u001b;fe*\u0011aiR\u0001\u0006UVt\u0017\u000e\u001e\u0006\u0002\u0011\u0006\u0019qN]4\n\u0005){$\u0001D'fi\"|GmU8ve\u000e,\u0017!\u0002<bYV,G&A'\"\u00039\u000bq\"
\u00197m\u0007>l'-\u001b8bi&|gn\u001d\u0015\u0005\u0007A#V\u000b\u0005\u0002R%6\t\u0011)\u0003\u0002T\u0003\n\t\u0002+\u0019:b[\u0016$XM]5{K\u0012$Vm\u001d;\u0002\t9\fW.Z\u0011\u0002-\u0006A3\u0010Z5ta2\f\u0017PT1nKvt\u0013/^8sk6l4\u0010M?/G>|'\u000fZ5oCR|'/P>2{\u0006yC/Z:u\u0005&$\u0017N]3di&|g.\u00197MS:\\w+\u001b;i\u001f:,7i\u001c8oK\u000e$\u0018n\u001c8J]&$\u0018.\u0019;peR\u0019!%\u0017.\t\u000b-\"\u0001\u0019\u0001\u0017\t\u000ba\"\u0001\u0019A\u001d)\t\u0011i4\n\u0018\u0017\u0002\u001b\"\"A\u0001\u0015+V\u0003\u0019\"Xm\u001d;CS\u0012L'/Z2uS>t\u0017\r\u001c'j].<\u0016\u000e\u001e5BkR|W*\u001b:s_JLgn\u001a\u000b\u0004E\u0001\f\u0007\"B\u0016\u0006\u0001\u0004a\u0003\"\u0002\u001d\u0006\u0001\u0004I\u0004\u0006B\u0003>\u0017\u000ed\u0013!\u0014\u0015\u0005\u000bA#V+\u0001\u001auKN$()\u001b3je\u0016\u001cG/[8oC2d\u0015N\\6XSRDw.\u001e;J]\u000edW\u000fZ5oOJ+Wn\u001c;f\u001b&\u0014(o\u001c:t)\r\u0011s\r\u001b\u0005\u0006W\u0019\u0001\r\u0001\f\u0005\u0006q\u0019\u0001\r!\u000f\u0015\u0005\ruZ%\u000eL\u0001NQ\u00111\u0001\u000bV+\u0002/Y,'/\u001b4z\u0005&$\u0017N]3di&|g.\u00197MS:\\GC\u0002\u0012oof\fI\u0003C\u0003p\u000f\u0001\u0007\u0001/\u0001\ffCN$H*\u001b8l\u0007>tg.Z2uS>tWj\u001c3f!\t\tX/D\u0001s\u0015\t!2O\u0003\u0002u+\u000511/\u001a:wKJL!A\u001e:\u0003\u001d\r{gN\\3di&|g.T8eK\")\u0001p\u0002a\u0001a\u00061r/Z:u\u0019&t7nQ8o]\u0016\u001cG/[8o\u001b>$W\rC\u0004{\u000fA\u0005\t\u0019A>\u0002\u0015Q|\u0007/[2UsB,7\u000fE\u0002$yzL!! 
\u0013\u0003\r=\u0003H/[8o!\u0015y\u0018\u0011BA\b\u001d\u0011\t\t!!\u0002\u000f\u0007=\n\u0019!C\u0001&\u0013\r\t9\u0001J\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\tY!!\u0004\u0003\u0007M+\u0017OC\u0002\u0002\b\u0011\u0002B!!\u0005\u0002$9!\u00111CA\u0010\u001d\u0011\t)\"!\b\u000f\t\u0005]\u00111\u0004\b\u0004_\u0005e\u0011\"\u0001\f\n\u0005Q,\u0012B\u0001\u000bt\u0013\r\t\tC]\u0001\n)>\u0004\u0018n\u0019+za\u0016LA!!\n\u0002(\tIAk\u001c9jGRK\b/\u001a\u0006\u0004\u0003C\u0011\b\"CA\u0016\u000fA\u0005\t\u0019AA\u0017\u0003=\u0019wN\u001c4jO>3XM\u001d:jI\u0016\u001c\bCBA\u0018\u0003kaC&\u0004\u0002\u00022)\u0019\u00111\u0007\u0013\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0003\u00028\u0005E\"aA'ba\u0006\tc/\u001a:jMf\u0014\u0015\u000eZ5sK\u000e$\u0018n\u001c8bY2Kgn\u001b\u0013eK\u001a\fW\u000f\u001c;%gU\u0011\u0011Q\b\u0016\u0004w\u0006}2FAA!!\u0011\t\u0019%!\u0014\u000e\u0005\u0005\u0015#\u0002BA$\u0003\u0013\n\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\u0005-C%\u0001\u0006b]:|G/\u0019;j_:LA!a\u0014\u0002F\t\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3\u0002CY,'/\u001b4z\u0005&$\u0017N]3di&|g.\u00197MS:\\G\u0005Z3gCVdG\u000f\n\u001b\u0016\u0005\u0005U#\u0006BA\u0017\u0003\u007f\t\u0011\u0002\\5oWB\u0013x\u000e]:\u0015\u0019\u0005m\u00131NA8\u0003s\ny(!!\u0011\t\u0005u\u0013qM\u0007\u0003\u0003?RA!!\u0019\u0002d\u0005!Q\u000f^5m\u0015\t\t)'\u0001\u0003kCZ\f\u0017\u0002BA5\u0003?\u0012!\u0002\u0015:pa\u0016\u0014H/[3t\u0011\u0019\tiG\u0003a\u0001a\u0006q1m\u001c8oK\u000e$\u0018n\u001c8N_\u0012,\u0007bBA9\u0015\u0001\u0007\u00111O\u0001\u000ee\u0016lw\u000e^3DYV\u001cH/\u001a:\u0011\u0007i\t)(C\u0002\u0002xM\u0011ac\u00117vgR,'\u000fT5oWR+7\u000f\u001e%be:,7o\u001d\u0005\n\u0003wR\u0001\u0013!a\u0001\u0003{\nQbY8ogVlWM]$s_V\u0004\bcA\u0012}Y!9!P\u0003I\u0001\u0002\u0004Y\b\"CA\u0016\u0015A\u0005\t\u0019AA\u0017\u0003Ma\u0017N\\6Qe>\u00048\u000f\n3fM\u0006,H\u000e\u001e\u00134+\t\t9I\u000b\u0003\u0002~\u0005}\u0012a\u00057j].\u0004&o\u001c9tI\u0011,g-Y;mi\u0012\"\u0014a\u00057j].
\u0004&o\u001c9tI\u0011,g-Y;mi\u0012*\u0014\u0001E<bSR4uN]'jeJ|'/\u001b8h)\u0015\u0011\u0013\u0011SAK\u0011\u001d\t\u0019J\u0004a\u0001\u0003g\n1\u0002Z3ti\u000ecWo\u001d;fe\"9\u0011q\u0013\bA\u0002\u0005e\u0015A\u00039beRLG/[8ogB)q0!\u0003\u0002\u001cB!\u0011QTAU\u001b\t\tyJ\u0003\u0003\u0002\"\u0006\r\u0016AB2p[6|gNC\u0002\u0017\u0003KS1!a*H\u0003\u0019\t\u0007/Y2iK&!\u00111VAP\u00059!v\u000e]5d!\u0006\u0014H/\u001b;j_:\fAB^3sS\u001aLX*\u001b:s_J$RAIAY\u0003kCq!a-\u0010\u0001\u0004\t\u0019(A\u0004dYV\u001cH/\u001a:\t\u000f\u0005]u\u00021\u0001\u0002\u001a\u0006\tb/\u001a:jMfd\u0015N\\6NKR\u0014\u0018nY:\u0015\u001f\t\nY,!2\u0002J\u00065\u0017\u0011[Ak\u00033Dq!!0\u0011\u0001\u0004\ty,\u0001\u0004mS:\\\u0017\n\u001a\t\u0005\u0003;\u000b\t-\u0003\u0003\u0002D\u0006}%\u0001B+vS\u0012Dq!a2\u0011\u0001\u0004\t\u0019(A\u0006fCN$8\t\\;ti\u0016\u0014\bbBAf!\u0001\u0007\u00111O\u0001\fo\u0016\u001cHo\u00117vgR,'\u000fC\u0004\u0002PB\u0001\r!a\u0017\u0002\u001b\u0015\f7\u000f\u001e'j].\u0004&o\u001c9t\u0011\u001d\t\u0019\u000e\u0005a\u0001\u00037\nQb^3ti2Kgn\u001b)s_B\u001c\bBBAl!\u0001\u0007A&A\u0005fCN$Hk\u001c9jG\"1\u00111\u001c\tA\u00021\n\u0011b^3tiR{\u0007/[2\u0002/\r\u0014X-\u0019;f\u0005&$\u0017N]3di&|g.\u00197MS:\\G\u0003DA`\u0003C\f)/a:\u0002j\u0006-\bBBAr#\u0001\u0007A&\u0001\u0005mS:\\g*Y7f\u0011\u001d\t9-\u0005a\u0001\u0003gBq!a3\u0012\u0001\u0004\t\u0019\bC\u0004\u0002PF\u0001\r!a\u0017\t\u000f\u0005M\u0017\u00031\u0001\u0002\\!2\u0001!a<L\u0003w\u0004B!!=\u0002x6\u0011\u00111\u001f\u0006\u0004\u0003k\u001c\u0015aA1qS&!\u0011\u0011`Az\u0005\r!\u0016mZ\u0011\u0003\u0003{\f1\"\u001b8uK\u001e\u0014\u0018\r^5p]\u0002")
public class BidirectionalLinkIntegrationTest
extends AbstractClusterLinkIntegrationTest {
    /**
     * Turns on the {@code useBidirectionalLink} flag inherited from the base test class.
     * The flag is set unconditionally for every test in this class.
     */
    @Override
    public void maybeUseBidirectionalLink() {
        this.useBidirectionalLink_$eq(true);
    }

    /**
     * Runs the bidirectional-link scenario with both the east and the west link in
     * {@code Outbound} connection mode, no topic-type restriction and no config overrides.
     *
     * @param quorum      value supplied by the {@code allCombinations} source; not read in this body
     * @param coordinator value supplied by the {@code allCombinations} source; not read in this body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testBidirectionalLinkWithOutboundConnections(String quorum, boolean coordinator) {
        ConnectionMode outbound = (ConnectionMode)ConnectionMode.Outbound$.MODULE$;
        Option<scala.collection.immutable.Seq<Enumeration.Value>> noTopicTypes = (Option<scala.collection.immutable.Seq<Enumeration.Value>>)None$.MODULE$;
        Map<String, String> noOverrides = (Map<String, String>)((Map)Map$.MODULE$.empty());
        this.verifyBidirectionalLink(outbound, outbound, noTopicTypes, noOverrides);
    }

    /**
     * Runs the bidirectional-link scenario with the east link in {@code Inbound} mode and the
     * west link in {@code Outbound} mode, no topic-type restriction and no config overrides.
     *
     * @param quorum      value supplied by the {@code allCombinations} source; not read in this body
     * @param coordinator value supplied by the {@code allCombinations} source; not read in this body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testBidirectionalLinkWithOneConnectionInitiator(String quorum, boolean coordinator) {
        ConnectionMode eastMode = (ConnectionMode)ConnectionMode.Inbound$.MODULE$;
        ConnectionMode westMode = (ConnectionMode)ConnectionMode.Outbound$.MODULE$;
        Option<scala.collection.immutable.Seq<Enumeration.Value>> noTopicTypes = (Option<scala.collection.immutable.Seq<Enumeration.Value>>)None$.MODULE$;
        Map<String, String> noOverrides = (Map<String, String>)((Map)Map$.MODULE$.empty());
        this.verifyBidirectionalLink(eastMode, westMode, noTopicTypes, noOverrides);
    }

    /**
     * Runs the bidirectional-link scenario with both links in {@code Outbound} mode and three
     * config overrides: auto mirroring set to {@code "true"}, the topic filter set to
     * {@code includeAllTopicsFilter()}, and {@code metadata.max.age.ms} set to {@code "500"}.
     *
     * @param quorum      value supplied by the {@code allCombinations} source; not read in this body
     * @param coordinator value supplied by the {@code allCombinations} source; not read in this body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testBidirectionalLinkWithAutoMirroring(String quorum, boolean coordinator) {
        // Entries are created in the same order as before so includeAllTopicsFilter() is still
        // evaluated between the first and third entry.
        Tuple2 autoMirroringEntry = new Tuple2((Object)ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp(), (Object)"true");
        Tuple2 topicFilterEntry = new Tuple2((Object)ClusterLinkConfig$.MODULE$.TopicFiltersProp(), (Object)this.includeAllTopicsFilter());
        Tuple2 metadataAgeEntry = new Tuple2((Object)"metadata.max.age.ms", (Object)"500");
        Tuple2[] entries = new Tuple2[]{autoMirroringEntry, topicFilterEntry, metadataAgeEntry};
        Map configOverrides = (Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])entries));
        ConnectionMode outbound = (ConnectionMode)ConnectionMode.Outbound$.MODULE$;
        Option<scala.collection.immutable.Seq<Enumeration.Value>> noTopicTypes = (Option<scala.collection.immutable.Seq<Enumeration.Value>>)None$.MODULE$;
        this.verifyBidirectionalLink(outbound, outbound, noTopicTypes, (Map<String, String>)configOverrides);
    }

    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testBidirectionalLinkWithoutIncludingRemoteMirrors(String quorum, boolean coordinator) {
        Some topicTypes = new Some((Object)new .colon.colon((Object)TopicType$.MODULE$.LOCAL_MIRROR(), (List)Nil$.MODULE$));
        this.verifyBidirectionalLink((ConnectionMode)ConnectionMode.Outbound$.MODULE$, (ConnectionMode)ConnectionMode.Outbound$.MODULE$, (Option<scala.collection.immutable.Seq<Enumeration.Value>>)topicTypes, (Map<String, String>)((Map)Map$.MODULE$.empty()));
    }

    /**
     * End-to-end scenario shared by all tests in this class.
     *
     * <p>Steps, in order: skip unless {@code clusterLinkPrefix()} is empty; create one topic per
     * cluster and produce a first batch to each; create the bidirectional link; link each topic
     * on the opposite cluster (unless auto mirroring is enabled through {@code configOverrides});
     * wait for mirroring; produce a second batch and wait again; commit offsets for both consumer
     * groups on their own cluster; verify offset migration in both directions; verify link
     * metrics; unlink the topics, consume the mirrors, and delete the link on both clusters.
     *
     * @param eastLinkConnectionMode connection mode for the link created on the east cluster
     * @param westLinkConnectionMode connection mode for the link created on the west cluster
     * @param topicTypes             optional topic types passed to {@code linkProps}; when defined
     *                               and lacking {@code REMOTE_MIRROR}, offset 0 is expected for the
     *                               topic that is a mirror on the committing cluster
     * @param configOverrides        extra link configs passed to {@code linkProps} for both links
     */
    private void verifyBidirectionalLink(ConnectionMode eastLinkConnectionMode, ConnectionMode westLinkConnectionMode, Option<scala.collection.immutable.Seq<Enumeration.Value>> topicTypes, Map<String, String> configOverrides) {
        Assumptions.assumeTrue((boolean)this.clusterLinkPrefix().isEmpty());

        final int recordsPerBatch = 20;
        final ClusterLinkTestHarness eastCluster = this.destCluster();
        final ClusterLinkTestHarness westCluster = this.sourceCluster();
        final String eastTopic = "east.topic";
        final String westTopic = "west.topic";
        final String eastGroup = "east.group";
        final String westGroup = "west.group";

        // Producers, topics and the first batch of records on each side.
        KafkaProducer eastProducer = eastCluster.createProducer(eastCluster.createProducer$default$1(), eastCluster.createProducer$default$2(), eastCluster.createProducer$default$3());
        KafkaProducer westProducer = westCluster.createProducer(westCluster.createProducer$default$1(), westCluster.createProducer$default$2(), westCluster.createProducer$default$3());
        westCluster.createTopic(westTopic, this.numPartitions(), this.replicationFactor(), westCluster.createTopic$default$4(), westCluster.createTopic$default$5(), westCluster.createTopic$default$6());
        eastCluster.createTopic(eastTopic, this.numPartitions(), this.replicationFactor(), eastCluster.createTopic$default$4(), eastCluster.createTopic$default$5(), eastCluster.createTopic$default$6());
        this.produceRecords(westProducer, westTopic, recordsPerBatch, this.produceRecords$default$4(), this.produceRecords$default$5());
        this.produceRecords(eastProducer, eastTopic, recordsPerBatch, this.produceRecords$default$4(), this.produceRecords$default$5());

        // Each link points at the opposite cluster and syncs the opposite cluster's group.
        Properties eastLinkProps = this.linkProps(eastLinkConnectionMode, westCluster, (Option<String>)new Some((Object)westGroup), topicTypes, configOverrides);
        Properties westLinkProps = this.linkProps(westLinkConnectionMode, eastCluster, (Option<String>)new Some((Object)eastGroup), topicTypes, configOverrides);
        Uuid linkId = this.createBidirectionalLink(this.linkName(), eastCluster, westCluster, eastLinkProps, westLinkProps);

        // Mirror topics are created explicitly only when auto mirroring was not requested.
        boolean autoMirroringRequested = configOverrides.get((Object)ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()).contains((Object)"true");
        if (!autoMirroringRequested) {
            eastCluster.linkTopic(westTopic, this.replicationFactor(), this.linkName(), eastCluster.linkTopic$default$4(), eastCluster.linkTopic$default$5());
            westCluster.linkTopic(eastTopic, this.replicationFactor(), this.linkName(), westCluster.linkTopic$default$4(), westCluster.linkTopic$default$5());
        }

        IndexedSeq westPartitions = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), this.numPartitions()).map((Function1 & Serializable)idx -> BidirectionalLinkIntegrationTest.$anonfun$verifyBidirectionalLink$1(westTopic, BoxesRunTime.unboxToInt((Object)idx)));
        IndexedSeq eastPartitions = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), this.numPartitions()).map((Function1 & Serializable)idx -> BidirectionalLinkIntegrationTest.$anonfun$verifyBidirectionalLink$2(eastTopic, BoxesRunTime.unboxToInt((Object)idx)));

        // Mirroring is checked once for the initial batch and once after a second batch.
        this.waitForMirroring(eastCluster, (scala.collection.immutable.Seq<TopicPartition>)westPartitions);
        this.waitForMirroring(westCluster, (scala.collection.immutable.Seq<TopicPartition>)eastPartitions);
        this.produceRecords(westProducer, westTopic, recordsPerBatch, this.produceRecords$default$4(), this.produceRecords$default$5());
        this.produceRecords(eastProducer, eastTopic, recordsPerBatch, this.produceRecords$default$4(), this.produceRecords$default$5());
        this.waitForMirroring(eastCluster, (scala.collection.immutable.Seq<TopicPartition>)westPartitions);
        this.waitForMirroring(westCluster, (scala.collection.immutable.Seq<TopicPartition>)eastPartitions);

        // The same offset (taken from partition 0 of the east topic) is committed for both
        // topics by both groups, each on its own cluster.
        long offset = this.nextOffset(new TopicPartition(eastTopic, 0));
        this.commitOffsets(eastCluster, eastTopic, 0, offset, eastGroup);
        this.commitOffsets(eastCluster, westTopic, 0, offset, eastGroup);
        this.commitOffsets(westCluster, westTopic, 0, offset, westGroup);
        this.commitOffsets(westCluster, eastTopic, 0, offset, westGroup);

        this.verifyOffsetMigration(westTopic, 0, offset, westGroup, eastCluster);
        boolean configuredToNotSyncRemoteMirrors = topicTypes.isDefined() && !((SeqOps)topicTypes.get()).contains((Object)TopicType$.MODULE$.REMOTE_MIRROR());
        // Offset 0 is expected for the mirrored-back topic when REMOTE_MIRROR is excluded.
        long expectedRemoteMirrorOffset = configuredToNotSyncRemoteMirrors ? 0L : offset;
        this.verifyOffsetMigration(eastTopic, 0, expectedRemoteMirrorOffset, westGroup, eastCluster);
        this.verifyOffsetMigration(eastTopic, 0, offset, eastGroup, westCluster);
        this.verifyOffsetMigration(westTopic, 0, expectedRemoteMirrorOffset, eastGroup, westCluster);

        this.verifyLinkMetrics(linkId, eastCluster, westCluster, eastLinkProps, westLinkProps, eastTopic, westTopic);

        // Tear down: stop mirroring, read the former mirrors, then remove the link on both sides.
        eastCluster.unlinkTopic(westTopic, this.linkName(), eastCluster.unlinkTopic$default$3(), eastCluster.unlinkTopic$default$4(), eastCluster.unlinkTopic$default$5(), eastCluster.unlinkTopic$default$6());
        westCluster.unlinkTopic(eastTopic, this.linkName(), westCluster.unlinkTopic$default$3(), westCluster.unlinkTopic$default$4(), westCluster.unlinkTopic$default$5(), westCluster.unlinkTopic$default$6());
        this.verifyMirror(eastCluster, (scala.collection.immutable.Seq<TopicPartition>)westPartitions);
        this.verifyMirror(westCluster, (scala.collection.immutable.Seq<TopicPartition>)eastPartitions);
        eastCluster.deleteClusterLink(this.linkName(), eastCluster.deleteClusterLink$default$2(), eastCluster.deleteClusterLink$default$3());
        westCluster.deleteClusterLink(this.linkName(), westCluster.deleteClusterLink$default$2(), westCluster.deleteClusterLink$default$3());
    }

    /**
     * Returns {@code None}. The {@code $default$3} suffix suggests this is the compiler-generated
     * default for the third parameter ({@code topicTypes}) of {@code verifyBidirectionalLink}.
     */
    private Option<scala.collection.immutable.Seq<Enumeration.Value>> verifyBidirectionalLink$default$3() {
        return None$.MODULE$;
    }

    /**
     * Returns an empty Scala map. The {@code $default$4} suffix suggests this is the
     * compiler-generated default for the fourth parameter ({@code configOverrides}) of
     * {@code verifyBidirectionalLink}.
     */
    private Map<String, String> verifyBidirectionalLink$default$4() {
        return (Map)Map$.MODULE$.empty();
    }

    /**
     * Builds the configuration for one side of a bidirectional cluster link.
     *
     * <p>For an {@code Outbound} link, link credentials are created and the remote cluster's
     * bootstrap servers, client security properties and JAAS config are added. All links then
     * receive fixed timeout/backoff settings, link mode {@code BIDIRECTIONAL} and the given
     * connection mode. When {@code consumerGroup} is defined, consumer offset sync is enabled
     * with a group filter for that group. {@code configOverrides} is applied last, so it wins
     * over any value set earlier.
     *
     * @param connectionMode  connection mode written to the link config
     * @param remoteCluster   cluster the link connects to; only read for {@code Outbound} links
     * @param consumerGroup   optional group whose offsets should be synced
     * @param topicTypes      optional topic types passed to {@code consumerGroupFilter}; only
     *                        inspected when {@code consumerGroup} is defined
     * @param configOverrides entries merged into the result after all other settings
     * @return the assembled link properties
     * @throws MatchError if {@code consumerGroup} is defined and {@code topicTypes} is neither a
     *                    {@code Some} nor {@code None}
     */
    public Properties linkProps(ConnectionMode connectionMode, ClusterLinkTestHarness remoteCluster, Option<String> consumerGroup, Option<scala.collection.immutable.Seq<Enumeration.Value>> topicTypes, Map<String, String> configOverrides) {
        Properties linkConfig = new Properties();

        boolean isOutbound = connectionMode != null && connectionMode.equals(ConnectionMode.Outbound$.MODULE$);
        if (isOutbound) {
            String jaasConfig = this.createLinkCredentials(this.linkName(), remoteCluster, this.createLinkCredentials$default$3());
            linkConfig.put("bootstrap.servers", remoteCluster.bootstrapServers(remoteCluster.bootstrapServers$default$1()));
            new Implicits.PropertiesOps(linkConfig).$plus$plus$eq(remoteCluster.clientSecurityProps(this.linkName()));
            // Set after the security props so the link-specific JAAS config takes precedence.
            linkConfig.put("sasl.jaas.config", jaasConfig);
        }

        linkConfig.setProperty(ClusterLinkConfig$.MODULE$.ReverseConnectionSetupTimeoutMsProp(), "10000");
        linkConfig.put("metadata.max.age.ms", "2000");
        linkConfig.put("reconnect.backoff.max.ms", "1000");
        linkConfig.setProperty(ClusterLinkConfig$.MODULE$.LinkModeProp(), ClusterLinkConfig.LinkMode.BIDIRECTIONAL.name());
        linkConfig.setProperty(ClusterLinkConfig$.MODULE$.ConnectionModeProp(), connectionMode.name());

        if (consumerGroup.isDefined()) {
            String group = consumerGroup.get();
            linkConfig.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
            String groupFilter;
            if (topicTypes instanceof Some) {
                scala.collection.immutable.Seq requestedTypes = (scala.collection.immutable.Seq)((Some)topicTypes).value();
                groupFilter = this.consumerGroupFilter(group, (Seq<Enumeration.Value>)requestedTypes);
            } else if (None$.MODULE$.equals(topicTypes)) {
                groupFilter = this.consumerGroupFilter(group);
            } else {
                throw new MatchError((Object)topicTypes);
            }
            linkConfig.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), groupFilter);
            linkConfig.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), "100");
        }

        new Implicits.PropertiesOps(linkConfig).$plus$plus$eq(configOverrides);
        return linkConfig;
    }

    /**
     * Returns {@code None}. The {@code $default$3} suffix suggests this is the compiler-generated
     * default for the third parameter ({@code consumerGroup}) of {@code linkProps}.
     */
    public Option<String> linkProps$default$3() {
        return None$.MODULE$;
    }

    /**
     * Returns {@code None}. The {@code $default$4} suffix suggests this is the compiler-generated
     * default for the fourth parameter ({@code topicTypes}) of {@code linkProps}.
     */
    public Option<scala.collection.immutable.Seq<Enumeration.Value>> linkProps$default$4() {
        return None$.MODULE$;
    }

    /**
     * Returns an empty Scala map. The {@code $default$5} suffix suggests this is the
     * compiler-generated default for the fifth parameter ({@code configOverrides}) of
     * {@code linkProps}.
     */
    public Map<String, String> linkProps$default$5() {
        return (Map)Map$.MODULE$.empty();
    }

    /**
     * Captures {@code nextOffset} for every given partition and passes those offsets, together
     * with the brokers of {@code destCluster}, to {@code waitForMirrorPartitions}.
     *
     * @param destCluster cluster whose brokers are handed to {@code waitForMirrorPartitions}
     * @param partitions  partitions to wait for
     */
    private void waitForMirroring(ClusterLinkTestHarness destCluster, scala.collection.immutable.Seq<TopicPartition> partitions) {
        // Pair each partition with the offset reported by nextOffset at this point in time.
        Object partitionOffsetPairs = partitions.map((Function1 & Serializable)partition -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(partition), (Object)BoxesRunTime.boxToLong((long)this.nextOffset((TopicPartition)partition))));
        scala.collection.immutable.Map expectedOffsets = ((IterableOnceOps)partitionOffsetPairs).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl());
        Seq<KafkaBroker> destinationBrokers = (Seq<KafkaBroker>)destCluster.brokers();
        this.waitForMirrorPartitions((Seq<TopicPartition>)partitions, (Map<TopicPartition, Object>)expectedOffsets, destinationBrokers, this.waitForMirrorPartitions$default$4(), this.waitForMirrorPartitions$default$5());
    }

    /**
     * Creates a consumer on {@code cluster}, assigns it the given partitions and passes it to
     * {@code consumePartitionRecords} together with the topic of the first partition.
     *
     * <p>The consumer is opened in a try-with-resources block so that it is closed even when
     * assignment or consumption throws (for example on a failed assertion or an empty
     * {@code partitions} sequence); previously it was only closed on the success path.
     *
     * @param cluster    cluster to consume from
     * @param partitions partitions to assign; the topic name is taken from the first element
     */
    private void verifyMirror(ClusterLinkTestHarness cluster, scala.collection.immutable.Seq<TopicPartition> partitions) {
        try (KafkaConsumer consumer = cluster.createConsumer(cluster.createConsumer$default$1(), cluster.createConsumer$default$2(), cluster.createConsumer$default$3(), cluster.createConsumer$default$4())) {
            consumer.assign((Collection)CollectionConverters$.MODULE$.SeqHasAsJava(partitions).asJava());
            this.consumePartitionRecords(consumer, (Set<TopicPartition>)partitions.toSet(), this.clusterLinkPrefix(), ((TopicPartition)partitions.head()).topic(), this.consumePartitionRecords$default$5());
        }
    }

    private void verifyLinkMetrics(Uuid linkId, ClusterLinkTestHarness eastCluster, ClusterLinkTestHarness westCluster, Properties eastLinkProps, Properties westLinkProps, String eastTopic, String westTopic) {
        block5: {
            ConnectionMode eastConnectionMode;
            block4: {
                new .colon.colon((Object)eastCluster, (List)new .colon.colon((Object)westCluster, (List)Nil$.MODULE$)).foreach((Function1 & Serializable)cluster -> {
                    BidirectionalLinkIntegrationTest.$anonfun$verifyLinkMetrics$1(this, cluster);
                    return BoxedUnit.UNIT;
                });
                this.destCluster_$eq(eastCluster);
                this.sourceCluster_$eq(westCluster);
                this.verifyDestinationLinkMetrics(linkId, eastLinkProps, true, ClusterLinkConfig.LinkMode.BIDIRECTIONAL, westTopic);
                this.sourceCluster_$eq(eastCluster);
                this.destCluster_$eq(westCluster);
                this.verifyDestinationLinkMetrics(linkId, westLinkProps, true, ClusterLinkConfig.LinkMode.BIDIRECTIONAL, eastTopic);
                eastConnectionMode = ConnectionMode$.MODULE$.fromString(eastLinkProps.getProperty(ClusterLinkConfig$.MODULE$.ConnectionModeProp()));
                ConnectionMode westConnectionMode = ConnectionMode$.MODULE$.fromString(westLinkProps.getProperty(ClusterLinkConfig$.MODULE$.ConnectionModeProp()));
                ConnectionMode connectionMode = eastConnectionMode;
                ConnectionMode.Inbound$ inbound$ = ConnectionMode.Inbound$.MODULE$;
                if (connectionMode != null && connectionMode.equals(inbound$)) break block4;
                ConnectionMode connectionMode2 = westConnectionMode;
                ConnectionMode.Inbound$ inbound$2 = ConnectionMode.Inbound$.MODULE$;
                if (connectionMode2 == null) {
                    return;
                }
                if (!connectionMode2.equals(inbound$2)) break block5;
            }
            ConnectionMode connectionMode = eastConnectionMode;
            ConnectionMode.Outbound$ outbound$ = ConnectionMode.Outbound$.MODULE$;
            ConnectionMode connectionMode3 = eastConnectionMode;
            ConnectionMode.Inbound$ inbound$ = ConnectionMode.Inbound$.MODULE$;
            this.verifyReverseConnectionMetrics(this.linkName(), ClusterLinkConfig.LinkMode.BIDIRECTIONAL, ClusterLinkConfig.LinkMode.BIDIRECTIONAL, connectionMode != null && connectionMode.equals(outbound$) ? eastCluster : westCluster, connectionMode3 != null && connectionMode3.equals(inbound$) ? eastCluster : westCluster);
            return;
        }
    }

    /**
     * Creates the same-named link on both clusters and asserts that both sides share one link id.
     *
     * <p>The east side is created first, passing the cluster id of the first west broker. The
     * west side is then created with the cluster id of the first east broker and the id
     * returned by the east side.
     *
     * @param linkName      name of the link on both clusters
     * @param eastCluster   cluster on which the link is created first
     * @param westCluster   cluster on which the link is created second, using the east link id
     * @param eastLinkProps link config for the east side
     * @param westLinkProps link config for the west side
     * @return the link id returned by the east cluster
     */
    public Uuid createBidirectionalLink(String linkName, ClusterLinkTestHarness eastCluster, ClusterLinkTestHarness westCluster, Properties eastLinkProps, Properties westLinkProps) {
        Option<String> westClusterId = (Option<String>)new Some((Object)((KafkaBroker)westCluster.brokers().head()).clusterId());
        Uuid eastLinkId = eastCluster.createClusterLink(linkName, eastLinkProps, westClusterId, true);

        Option<String> eastClusterId = (Option<String>)new Some((Object)((KafkaBroker)eastCluster.brokers().head()).clusterId());
        Option<Uuid> requestedLinkId = (Option<Uuid>)new Some((Object)eastLinkId);
        Uuid westLinkId = westCluster.createClusterLinkWithAllOptions(linkName, westLinkProps, eastClusterId, true, requestedLinkId, westCluster.createClusterLinkWithAllOptions$default$6());

        Assertions.assertEquals((Object)eastLinkId, (Object)westLinkId);
        return eastLinkId;
    }

    /**
     * Synthetic lambda body: builds the {@code TopicPartition} for partition {@code i} of the
     * captured west topic name.
     */
    public static final /* synthetic */ TopicPartition $anonfun$verifyBidirectionalLink$1(String westTopic$1, int i) {
        return new TopicPartition(westTopic$1, i);
    }

    /**
     * Synthetic lambda body: builds the {@code TopicPartition} for partition {@code i} of the
     * captured east topic name.
     */
    public static final /* synthetic */ TopicPartition $anonfun$verifyBidirectionalLink$2(String eastTopic$1, int i) {
        return new TopicPartition(eastTopic$1, i);
    }

    /**
     * Synthetic filter predicate: accepts metrics whose group contains {@code "cluster-link"}
     * and that carry a {@code "mode"} tag.
     */
    public static final /* synthetic */ boolean $anonfun$verifyLinkMetrics$3(MetricName metricName) {
        if (!metricName.group().contains("cluster-link")) {
            return false;
        }
        return metricName.tags().containsKey("mode");
    }

    /**
     * Synthetic filter predicate: accepts every metric whose name is not
     * {@code "active-link-count"} (a {@code null} name is accepted as well).
     */
    public static final /* synthetic */ boolean $anonfun$verifyLinkMetrics$4(MetricName x$1) {
        // Constant-first comparison treats a null name as "not equal".
        return !"active-link-count".equals(x$1.name());
    }

    /**
     * Synthetic filter predicate: accepts metrics whose {@code "mode"} tag is absent or differs
     * from {@code "bidirectional"}, that carry a {@code "link-id"} tag, and whose name does not
     * start with {@code "reverse-connection"}.
     */
    public static final /* synthetic */ boolean $anonfun$verifyLinkMetrics$5(MetricName metricName) {
        if ("bidirectional".equals(metricName.tags().get("mode"))) {
            return false;
        }
        if (!metricName.tags().containsKey("link-id")) {
            return false;
        }
        return !metricName.name().startsWith("reverse-connection");
    }

    /**
     * Synthetic lambda body run once per cluster by {@code verifyLinkMetrics}.
     *
     * <p>Verifies the {@code "active"} link-count metric for {@code BIDIRECTIONAL} mode and the
     * active-link-count metric for both {@code BIDIRECTIONAL} and {@code SOURCE} modes. It then
     * asserts that no metric on any alive server passes all three sibling predicates
     * ({@code $anonfun$verifyLinkMetrics$3}, {@code $4} and {@code $5}).
     *
     * @param $this   the test instance providing the verification helpers
     * @param cluster the cluster whose alive servers are inspected
     */
    public static final /* synthetic */ void $anonfun$verifyLinkMetrics$1(BidirectionalLinkIntegrationTest $this, ClusterLinkTestHarness cluster) {
        $this.verifyLinkCountMetric(ClusterLinkConfig.LinkMode.BIDIRECTIONAL, "active", cluster);
        $this.verifyActiveLinkCountMetric(cluster, ClusterLinkConfig.LinkMode.BIDIRECTIONAL);
        $this.verifyActiveLinkCountMetric(cluster, ClusterLinkConfig.LinkMode.SOURCE);
        // For every alive server: take all metric names, apply the three filters in order,
        // render each survivor as "<name>:<tags>", and collect the results into one set.
        Set nonBidirectionalModeMetrics = ((IterableOnceOps)cluster.aliveServers().flatMap((Function1 & Serializable)server -> ((IterableOnceOps)((IterableOps)((IterableOps)((IterableOps)CollectionConverters$.MODULE$.SetHasAsScala(server.metrics().metrics().keySet()).asScala().filter((Function1 & Serializable)metricName -> BoxesRunTime.boxToBoolean((boolean)BidirectionalLinkIntegrationTest.$anonfun$verifyLinkMetrics$3(metricName)))).filter((Function1 & Serializable)x$1 -> BoxesRunTime.boxToBoolean((boolean)BidirectionalLinkIntegrationTest.$anonfun$verifyLinkMetrics$4(x$1)))).filter((Function1 & Serializable)metricName -> BoxesRunTime.boxToBoolean((boolean)BidirectionalLinkIntegrationTest.$anonfun$verifyLinkMetrics$5(metricName)))).map((Function1 & Serializable)metricName -> new StringBuilder(1).append(metricName.name()).append(":").append(metricName.tags()).toString())).toSet())).toSet();
        // Any surviving entry is a link metric reported under a non-bidirectional mode.
        Assertions.assertEquals((Object)Predef$.MODULE$.Set().empty(), (Object)nonBidirectionalModeMetrics);
    }

    public BidirectionalLinkIntegrationTest() {
        this.useSourceInitiatedLink_$eq(false);
        this.sourceCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_SSL, (Option<SecurityProtocol>)new Some((Object)SecurityProtocol.PLAINTEXT), 0, 2));
        this.destCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_PLAINTEXT, (Option<SecurityProtocol>)new Some((Object)SecurityProtocol.PLAINTEXT), 100, 2));
    }
}

