/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients;

import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.SourceReverseConnectionManager;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.ReverseConnectionRequestData;
import org.apache.kafka.common.message.ReverseConnectionResponseData;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.ReverseConnectionResponse;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class SourceReverseConnectionManagerTest
implements ReverseNode.ReverseCallback {
    private final MockTime time = new MockTime();
    private NetworkClient networkClient;
    private Selector selector;
    private SourceReverseConnectionManager connManager;
    private Node remoteNode;
    private int metadataUpdatesRequested;

    @BeforeEach
    public void setUp() {
        Uuid linkId = Uuid.randomUuid();
        this.remoteNode = new Node(1, "localhost", 9092);
        this.networkClient = (NetworkClient)Mockito.mock(NetworkClient.class);
        this.selector = (Selector)Mockito.mock(Selector.class);
        ManualMetadataUpdater metadataUpdater = new ManualMetadataUpdater(Collections.singletonList(this.remoteNode)){

            public long maybeUpdate(long now) {
                SourceReverseConnectionManagerTest.this.metadataUpdatesRequested++;
                return super.maybeUpdate(now);
            }
        };
        ReverseConnectionRequestData reversalData = new ReverseConnectionRequestData().setClusterLinkId(new Uuid(linkId.getMostSignificantBits(), linkId.getLeastSignificantBits())).setSourceClusterId("sourceCluster").setTargetClusterId("targetCluster");
        this.connManager = new SourceReverseConnectionManager(this.networkClient, this.selector, (MetadataUpdater)metadataUpdater, linkId, reversalData, (ReverseNode.ReverseCallback)this, new LogContext());
    }

    @Test
    public void createReversibleConnection() {
        this.createReversibleConnection(123, this.remoteNode.id());
        Assertions.assertEquals((int)0, (int)this.metadataUpdatesRequested);
        ((NetworkClient)Mockito.verify((Object)this.networkClient, (VerificationMode)Mockito.times((int)1))).wakeup();
        Assertions.assertEquals((int)1, (int)this.connManager.reverseNodes().size());
    }

    @Test
    public void createMultipleReversibleConnectionsToSameBroker() {
        this.createReversibleConnection(12, this.remoteNode.id());
        this.createReversibleConnection(13, this.remoteNode.id());
        Assertions.assertEquals((int)2, (int)this.connManager.reverseNodes().size());
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new Integer[]{-1073741823, -1073741822}), this.connManager.reverseNodes().stream().map(Node::id).collect(Collectors.toSet()));
    }

    @Test
    public void reverseConnectionDisconnect() {
        ReverseNode node = this.createReversibleConnection(123, this.remoteNode.id());
        this.connManager.processDisconnection(node.idString());
        Assertions.assertEquals(Collections.emptyList(), (Object)this.connManager.reverseNodes());
        ExecutionException e = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> {
            Void cfr_ignored_0 = (Void)node.future().get();
        });
        Assertions.assertEquals(NetworkException.class, e.getCause().getClass());
    }

    @Test
    public void reverseRequestBrokerNotAvailable() {
        Assertions.assertThrows(NetworkException.class, () -> this.connManager.createReversibleConnection(123, 5, ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.SSL), KafkaPrincipal.ANONYMOUS, Optional.empty(), null, 0L));
        Assertions.assertEquals((int)0, (int)this.metadataUpdatesRequested);
    }

    @Test
    public void apiVersionsWithoutReversal() {
        ApiVersionsResponse response = new ApiVersionsResponse(new ApiVersionsResponseData().setErrorCode(Errors.NONE.code()));
        this.connManager.handleApiVersionsResponse(this.remoteNode.idString(), response);
        Assertions.assertEquals(Collections.emptyList(), (Object)this.connManager.reverseNodes());
        Assertions.assertEquals((int)0, (int)this.metadataUpdatesRequested);
    }

    @Test
    public void reverseRequestNotSupported() {
        ReverseNode node = this.createReversibleConnection(123, this.remoteNode.id());
        ApiVersionsResponse response = new ApiVersionsResponse(new ApiVersionsResponseData().setErrorCode(Errors.NONE.code()));
        Assertions.assertThrows(UnsupportedVersionException.class, () -> this.connManager.handleApiVersionsResponse(node.idString(), response));
        Assertions.assertEquals((int)0, (int)this.metadataUpdatesRequested);
    }

    @Test
    public void apiVersionsWithReversal() {
        ReverseNode node = this.createReversibleConnection(123, this.remoteNode.id());
        this.connManager.handleApiVersionsResponse(node.idString(), this.apiVersionsResponse());
        Assertions.assertEquals(Optional.of(123), (Object)((ReverseNode)this.connManager.reverseNodes().iterator().next()).requestId());
        long now = this.time.milliseconds();
        this.connManager.handleReverseConnectionsRequests(now);
        Assertions.assertEquals((int)0, (int)this.metadataUpdatesRequested);
        Assertions.assertEquals(Optional.of(123), (Object)((ReverseNode)this.connManager.reverseNodes().iterator().next()).requestId());
        ((NetworkClient)Mockito.verify((Object)this.networkClient, (VerificationMode)Mockito.times((int)1))).ready((Node)node, now);
    }

    @Test
    public void apiVersionsFailure() {
        ApiVersionsResponse response = new ApiVersionsResponse(new ApiVersionsResponseData().setErrorCode(Errors.INVALID_REQUEST.code()));
        this.connManager.handleApiVersionsResponse(this.remoteNode.idString(), response);
        Assertions.assertEquals(Collections.emptyList(), (Object)this.connManager.reverseNodes());
        Assertions.assertEquals((int)0, (int)this.metadataUpdatesRequested);
    }

    @Test
    public void reverseConnectionResponse() {
        ReverseNode node = this.createReversibleConnection(123, this.remoteNode.id());
        this.connManager.handleApiVersionsResponse(node.idString(), this.apiVersionsResponse());
        ReverseConnectionResponse reverseResponse = new ReverseConnectionResponse(new ReverseConnectionResponseData().setErrorCode(Errors.NONE.code()));
        KafkaChannel channel = (KafkaChannel)Mockito.mock(KafkaChannel.class);
        Mockito.when((Object)this.selector.channel(node.idString())).thenReturn((Object)channel);
        this.connManager.handleReverseConnectionResponse(node.idString(), reverseResponse);
        ((Selector)Mockito.verify((Object)this.selector, (VerificationMode)Mockito.times((int)1))).removeChannelWithoutClosing(channel);
        Assertions.assertEquals(Collections.emptyList(), (Object)this.connManager.reverseNodes());
    }

    public void onReverseConnection(KafkaChannel channel, ReverseNode reverseNode) {
    }

    private ReverseNode createReversibleConnection(int requestId, int nodeId) {
        this.connManager.createReversibleConnection(requestId, nodeId, ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.SSL), KafkaPrincipal.ANONYMOUS, Optional.empty(), null, this.time.milliseconds());
        Optional<ReverseNode> reverseNode = this.connManager.reverseNodes().stream().filter(n -> n.requestId().equals(Optional.of(requestId))).findFirst();
        Assertions.assertTrue((boolean)reverseNode.isPresent());
        return reverseNode.get();
    }

    private ApiVersionsResponse apiVersionsResponse() {
        ApiVersionsResponseData.ApiVersion key = new ApiVersionsResponseData.ApiVersion().setApiKey(ApiKeys.REVERSE_CONNECTION.id).setMaxVersion((short)0);
        return new ApiVersionsResponse(new ApiVersionsResponseData().setApiKeys(new ApiVersionsResponseData.ApiVersionCollection(Collections.singleton(key).iterator())).setErrorCode(Errors.NONE.code()));
    }
}

