/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;

public class AbstractCoordinatorTest {
    // Zero-length buffer (wraps an empty byte array).
    private static final ByteBuffer EMPTY_DATA = ByteBuffer.wrap(new byte[0]);
    // Timing configuration; the _MS suffix indicates milliseconds.
    private static final int REBALANCE_TIMEOUT_MS = 60000;
    private static final int SESSION_TIMEOUT_MS = 10000;
    private static final int HEARTBEAT_INTERVAL_MS = 3000;
    private static final int RETRY_BACKOFF_MS = 100;
    private static final int REQUEST_TIMEOUT_MS = 40000;
    // Group and protocol identifiers.
    private static final String GROUP_ID = "dummy-group";
    private static final String METRIC_GROUP_PREFIX = "consumer";
    private static final String PROTOCOL_TYPE = "dummy";
    private static final String PROTOCOL_NAME = "dummy-subprotocol";
    // Fixture state: each of the following seven fields is assigned by
    // setupCoordinator(int, int, Optional).
    private Node node;
    private Metrics metrics;
    private MockTime mockTime;
    private Node coordinatorNode;
    private MockClient mockClient;
    private DummyCoordinator coordinator;
    private ConsumerNetworkClient consumerClient;
    // Canned member/leader ids and generation id for building mock responses.
    private final String memberId = "memberId";
    private final String leaderId = "leaderId";
    private final int defaultGeneration = -1;

    /**
     * Builds the test fixture with the default retry backoff, the default rebalance timeout
     * and no group instance id.
     */
    private void setupCoordinator() {
        setupCoordinator(RETRY_BACKOFF_MS, REBALANCE_TIMEOUT_MS, Optional.empty());
    }

    /**
     * Builds the test fixture with a caller-chosen retry backoff, the default rebalance
     * timeout and no group instance id.
     *
     * @param retryBackoffMs retry backoff forwarded to the three-argument overload
     */
    private void setupCoordinator(int retryBackoffMs) {
        setupCoordinator(retryBackoffMs, REBALANCE_TIMEOUT_MS, Optional.empty());
    }

    /**
     * Creates a fresh mock clock, mock client, network client, metrics registry and
     * {@code DummyCoordinator}, and stores them in the fixture fields.
     *
     * @param retryBackoffMs     backoff passed to the metadata, the network client and the rebalance config
     * @param rebalanceTimeoutMs rebalance timeout passed to the rebalance config
     * @param groupInstanceId    group instance id passed to the rebalance config; when it is present
     *                           the last boolean argument of the rebalance config is {@code false}
     */
    private void setupCoordinator(int retryBackoffMs, int rebalanceTimeoutMs, Optional<String> groupInstanceId) {
        LogContext logContext = new LogContext();
        mockTime = new MockTime();

        SubscriptionState subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST);
        ConsumerMetadata metadata = new ConsumerMetadata(retryBackoffMs, 60 * 60 * 1000L, false, false,
                subscriptions, logContext, new ClusterResourceListeners());

        mockClient = new MockClient(mockTime, metadata);
        consumerClient = new ConsumerNetworkClient(logContext, mockClient, metadata, mockTime,
                retryBackoffMs, REQUEST_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS);
        metrics = new Metrics(mockTime);

        // Seed the metadata, then take the first node it reports.
        mockClient.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.emptyMap()));
        node = metadata.fetch().nodes().get(0);
        // The coordinator node shares host and port with the node but uses a derived id.
        coordinatorNode = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());

        boolean noInstanceId = !groupInstanceId.isPresent();
        GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(SESSION_TIMEOUT_MS, rebalanceTimeoutMs,
                HEARTBEAT_INTERVAL_MS, GROUP_ID, groupInstanceId, retryBackoffMs, noInstanceId);
        coordinator = new DummyCoordinator(rebalanceConfig, consumerClient, metrics, mockTime);
    }

    /**
     * Queues successful find-coordinator, join-group (as follower, generation 1) and
     * sync-group responses, then calls {@code ensureActiveGroup()} on the coordinator.
     */
    private void joinGroup() {
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));

        // Generation id carried by the canned join-group response.
        final int generation = 1;
        mockClient.prepareResponse(joinGroupFollowerResponse(generation, memberId, "", Errors.NONE));
        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));

        coordinator.ensureActiveGroup();
    }

    /**
     * Checks that the heartbeat, join, sync and rebalance metrics are registered once the
     * coordinator is built, and that each reports the expected value after samples are
     * recorded on its sensor or the mock clock is advanced.
     */
    @Test
    public void testMetrics() {
        setupCoordinator();

        // Every metric must already be registered.
        List<String> registeredNames = Arrays.asList(
                "heartbeat-response-time-max", "heartbeat-rate", "heartbeat-total", "last-heartbeat-seconds-ago",
                "join-time-avg", "join-time-max", "join-rate", "join-total",
                "sync-time-avg", "sync-time-max", "sync-rate", "sync-total",
                "rebalance-latency-avg", "rebalance-latency-max", "rebalance-latency-total",
                "rebalance-rate-per-hour", "rebalance-total", "last-rebalance-seconds-ago",
                "failed-rebalance-rate-per-hour", "failed-rebalance-total");
        for (String metricName : registeredNames) {
            Assert.assertNotNull(getMetric(metricName));
        }

        // The same three samples are fed to every sensor below.
        double[] samples = {1.0, 6.0, 2.0};

        // --- heartbeat ---
        for (double sample : samples) {
            metrics.sensor("heartbeat-latency").record(sample);
        }
        Assert.assertEquals(6.0, getMetric("heartbeat-response-time-max").metricValue());
        Assert.assertEquals(0.1, getMetric("heartbeat-rate").metricValue());
        Assert.assertEquals(3.0, getMetric("heartbeat-total").metricValue());

        // -1 is reported until a heartbeat has been sent.
        Assert.assertEquals(-1.0, getMetric("last-heartbeat-seconds-ago").metricValue());
        coordinator.heartbeat().sentHeartbeat(mockTime.milliseconds());
        Assert.assertEquals(0.0, getMetric("last-heartbeat-seconds-ago").metricValue());
        mockTime.sleep(10000L);
        Assert.assertEquals(10.0, getMetric("last-heartbeat-seconds-ago").metricValue());

        // --- join ---
        for (double sample : samples) {
            metrics.sensor("join-latency").record(sample);
        }
        Assert.assertEquals(3.0, getMetric("join-time-avg").metricValue());
        Assert.assertEquals(6.0, getMetric("join-time-max").metricValue());
        Assert.assertEquals(0.1, getMetric("join-rate").metricValue());
        Assert.assertEquals(3.0, getMetric("join-total").metricValue());

        // --- sync ---
        for (double sample : samples) {
            metrics.sensor("sync-latency").record(sample);
        }
        Assert.assertEquals(3.0, getMetric("sync-time-avg").metricValue());
        Assert.assertEquals(6.0, getMetric("sync-time-max").metricValue());
        Assert.assertEquals(0.1, getMetric("sync-rate").metricValue());
        Assert.assertEquals(3.0, getMetric("sync-total").metricValue());

        // --- rebalance ---
        for (double sample : samples) {
            metrics.sensor("rebalance-latency").record(sample);
        }
        Assert.assertEquals(3.0, getMetric("rebalance-latency-avg").metricValue());
        Assert.assertEquals(6.0, getMetric("rebalance-latency-max").metricValue());
        Assert.assertEquals(9.0, getMetric("rebalance-latency-total").metricValue());
        Assert.assertEquals(360.0, getMetric("rebalance-rate-per-hour").metricValue());
        Assert.assertEquals(3.0, getMetric("rebalance-total").metricValue());

        // --- failed rebalance ---
        for (double sample : samples) {
            metrics.sensor("failed-rebalance").record(sample);
        }
        Assert.assertEquals(360.0, getMetric("failed-rebalance-rate-per-hour").metricValue());
        Assert.assertEquals(3.0, getMetric("failed-rebalance-total").metricValue());

        // -1 is reported until a last-rebalance time has been set.
        Assert.assertEquals(-1.0, getMetric("last-rebalance-seconds-ago").metricValue());
        coordinator.setLastRebalanceTime(mockTime.milliseconds());
        Assert.assertEquals(0.0, getMetric("last-rebalance-seconds-ago").metricValue());
        mockTime.sleep(10000L);
        Assert.assertEquals(10.0, getMetric("last-rebalance-seconds-ago").metricValue());
    }

    /**
     * Looks up a metric of the {@code consumer-coordinator-metrics} group in the test's registry.
     *
     * @param name metric name within the group
     * @return the registered metric, or {@code null} if the registry map has no such entry
     */
    private KafkaMetric getMetric(String name) {
        Map<?, KafkaMetric> registered = metrics.metrics();
        return registered.get(metrics.metricName(name, "consumer-coordinator-metrics"));
    }

    /**
     * Checks that coordinator discovery takes at least one retry backoff on the mock clock
     * when the coordinator node is blacked out for a short time.
     */
    @Test
    public void testCoordinatorDiscoveryBackoff() {
        setupCoordinator();

        // Two find-coordinator responses: one per discovery attempt.
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));

        // Make the coordinator node unavailable for 10 (mock-clock) milliseconds.
        mockClient.blackout(coordinatorNode, 10L);

        long initialTime = mockTime.milliseconds();
        coordinator.ensureCoordinatorReady(mockTime.timer(Long.MAX_VALUE));
        long elapsedMs = mockTime.milliseconds() - initialTime;

        Assert.assertTrue(elapsedMs >= RETRY_BACKOFF_MS);
    }

    /**
     * Runs {@code joinGroupIfNeeded} twice on a background thread: the first attempt must
     * return {@code false} after the mock clock has advanced by the timer duration while the
     * join-group request is still pending; the second attempt must return {@code true} once
     * the join-group and sync-group responses are supplied.
     */
    @Test
    public void testTimeoutAndRetryJoinGroupIfNeeded() throws Exception {
        setupCoordinator();
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(mockTime.timer(0L));

        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            // First attempt: no join-group response is available, so the timer runs out.
            Timer firstAttemptTimer = mockTime.timer(REQUEST_TIMEOUT_MS);
            Future<Boolean> firstAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(firstAttemptTimer));
            mockTime.sleep(REQUEST_TIMEOUT_MS);
            Assert.assertFalse(firstAttempt.get());
            Assert.assertTrue(consumerClient.hasPendingRequests(coordinatorNode));

            // Answer the pending join-group request and queue the sync-group response.
            mockClient.respond(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
            mockClient.prepareResponse(syncGroupResponse(Errors.NONE));

            // Second attempt: completes successfully.
            Timer secondAttemptTimer = mockTime.timer(REQUEST_TIMEOUT_MS);
            Future<Boolean> secondAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(secondAttemptTimer));
            Assert.assertTrue(secondAttempt.get());
        } finally {
            executor.shutdownNow();
            executor.awaitTermination(1000L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Checks that a join-group response carrying {@code GROUP_MAX_SIZE_REACHED} fails the
     * request future with the matching exception type and that the failure is not retriable.
     */
    @Test
    public void testGroupMaxSizeExceptionIsFatal() {
        setupCoordinator();
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(mockTime.timer(0L));

        mockClient.prepareResponse(
                joinGroupFollowerResponse(defaultGeneration, memberId, "", Errors.GROUP_MAX_SIZE_REACHED));

        // Wildcard type instead of the raw RequestFuture; only exception()/isRetriable() are used.
        RequestFuture<?> future = coordinator.sendJoinGroupRequest();
        Assert.assertTrue(consumerClient.poll(future, mockTime.timer(REQUEST_TIMEOUT_MS)));
        Assert.assertTrue(future.exception().getClass().isInstance(Errors.GROUP_MAX_SIZE_REACHED.exception()));
        Assert.assertFalse(future.isRetriable());
    }

    /**
     * Checks that an unanswered join-group request is still pending just after
     * {@code REQUEST_TIMEOUT_MS} has elapsed, and fails with a {@link DisconnectException}
     * after a further 25 seconds of mock time.
     */
    @Test
    public void testJoinGroupRequestTimeout() {
        setupCoordinator(RETRY_BACKOFF_MS, REBALANCE_TIMEOUT_MS, Optional.empty());
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(mockTime.timer(0L));

        // Wildcard type instead of the raw RequestFuture.
        RequestFuture<?> future = coordinator.sendJoinGroupRequest();

        // Past the default request timeout: the join-group request must not have expired yet.
        mockTime.sleep(REQUEST_TIMEOUT_MS + 1);
        Assert.assertFalse(consumerClient.poll(future, mockTime.timer(0L)));

        // NOTE(review): the 5000 ms term is presumably the extra lapse the coordinator adds
        // to the rebalance timeout for join-group requests - confirm against AbstractCoordinator.
        mockTime.sleep(REBALANCE_TIMEOUT_MS - REQUEST_TIMEOUT_MS + 5000);
        Assert.assertTrue(consumerClient.poll(future, mockTime.timer(0L)));
        Assert.assertTrue(future.exception() instanceof DisconnectException);
    }

    /**
     * With a rebalance timeout (30 s) shorter than {@code REQUEST_TIMEOUT_MS}, checks that the
     * join-group request is still pending 35 s + 1 ms after it was sent and fails with a
     * {@link DisconnectException} only once {@code REQUEST_TIMEOUT_MS} has passed.
     */
    @Test
    public void testJoinGroupRequestTimeoutLowerBoundedByDefaultRequestTimeout() {
        int rebalanceTimeoutMs = 30000;
        setupCoordinator(RETRY_BACKOFF_MS, rebalanceTimeoutMs, Optional.empty());
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(mockTime.timer(0L));

        // Wildcard type instead of the raw RequestFuture.
        RequestFuture<?> future = coordinator.sendJoinGroupRequest();
        long expectedRequestDeadline = mockTime.milliseconds() + REQUEST_TIMEOUT_MS;

        // Beyond rebalance timeout + 5000 ms, but before the default request timeout.
        mockTime.sleep(rebalanceTimeoutMs + 5000 + 1);
        Assert.assertFalse(consumerClient.poll(future, mockTime.timer(0L)));

        // Step to one millisecond past the expected deadline.
        mockTime.sleep(expectedRequestDeadline - mockTime.milliseconds() + 1L);
        Assert.assertTrue(consumerClient.poll(future, mockTime.timer(0L)));
        Assert.assertTrue(future.exception() instanceof DisconnectException);
    }

    /**
     * With the rebalance timeout set to {@code Integer.MAX_VALUE}, checks that the join-group
     * request is pending at first and completes once the mock clock has advanced by
     * {@code Integer.MAX_VALUE + 1} milliseconds.
     */
    @Test
    public void testJoinGroupRequestMaxTimeout() {
        setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE, Optional.empty());
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(mockTime.timer(0L));

        // Wildcard type instead of the raw RequestFuture.
        RequestFuture<?> future = coordinator.sendJoinGroupRequest();
        Assert.assertFalse(consumerClient.poll(future, mockTime.timer(0L)));

        // Long arithmetic: one millisecond past the largest int timeout (same value as 0x80000000L).
        mockTime.sleep(Integer.MAX_VALUE + 1L);
        Assert.assertTrue(consumerClient.poll(future, mockTime.timer(0L)));
    }

    /**
     * Checks the {@code MEMBER_ID_REQUIRED} flow: the first join-group attempt fails with that
     * error, after which the coordinator reports a valid member id with the default
     * generation, and a second join-group request carrying that member id is matched.
     */
    @Test
    public void testJoinGroupRequestWithMemberIdRequired() {
        setupCoordinator();
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(mockTime.timer(0L));

        mockClient.prepareResponse(
                joinGroupFollowerResponse(defaultGeneration, memberId, "", Errors.MEMBER_ID_REQUIRED));

        // The follow-up response is only handed out to a join-group request carrying the member id.
        mockClient.prepareResponse(
                body -> body instanceof JoinGroupRequest
                        && ((JoinGroupRequest) body).data().memberId().equals(memberId),
                joinGroupResponse(Errors.UNKNOWN_MEMBER_ID));

        // Wildcard type instead of the raw RequestFuture.
        RequestFuture<?> future = coordinator.sendJoinGroupRequest();
        Assert.assertTrue(consumerClient.poll(future, mockTime.timer(REQUEST_TIMEOUT_MS)));
        Assert.assertEquals(Errors.MEMBER_ID_REQUIRED.message(), future.exception().getMessage());
        Assert.assertTrue(coordinator.rejoinNeededOrPending());
        Assert.assertTrue(coordinator.hasValidMemberId());
        Assert.assertTrue(coordinator.hasMatchingGenerationId(defaultGeneration));

        future = coordinator.sendJoinGroupRequest();
        Assert.assertTrue(consumerClient.poll(future, mockTime.timer(REBALANCE_TIMEOUT_MS)));
    }

    /**
     * Checks that a join-group response carrying {@code FENCED_INSTANCE_ID} fails the request
     * future with that error's message and that the failure is not retriable.
     */
    @Test
    public void testJoinGroupRequestWithFencedInstanceIdException() {
        setupCoordinator();
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(mockTime.timer(0L));

        mockClient.prepareResponse(
                joinGroupFollowerResponse(defaultGeneration, memberId, "", Errors.FENCED_INSTANCE_ID));

        // Wildcard type instead of the raw RequestFuture.
        RequestFuture<?> future = coordinator.sendJoinGroupRequest();
        Assert.assertTrue(consumerClient.poll(future, mockTime.timer(REQUEST_TIMEOUT_MS)));
        Assert.assertEquals(Errors.FENCED_INSTANCE_ID.message(), future.exception().getMessage());
        Assert.assertFalse(future.isRetriable());
    }

    /**
     * Exercises {@code joinGroupWithProtocolTypeAndName} with matching, missing and mismatching
     * protocol type/name in the join-group and sync-group responses; every mismatch must
     * raise {@link InconsistentGroupProtocolException}.
     */
    @Test
    public void testJoinGroupProtocolTypeAndName() {
        final String wrongProtocolType = "wrong-type";
        final String wrongProtocolName = "wrong-name";

        // No protocol type/name in either response.
        Assert.assertTrue(joinGroupWithProtocolTypeAndName(null, null, null));

        // Expected protocol type and name in both responses.
        Assert.assertTrue(joinGroupWithProtocolTypeAndName(PROTOCOL_TYPE, PROTOCOL_TYPE, PROTOCOL_NAME));

        // Wrong protocol type in the join-group response.
        Assert.assertThrows(InconsistentGroupProtocolException.class,
                () -> joinGroupWithProtocolTypeAndName("wrong", null, null));

        // Wrong protocol type in the sync-group response.
        Assert.assertThrows(InconsistentGroupProtocolException.class,
                () -> joinGroupWithProtocolTypeAndName(PROTOCOL_TYPE, wrongProtocolType, PROTOCOL_NAME));

        // Wrong protocol name in the sync-group response.
        Assert.assertThrows(InconsistentGroupProtocolException.class,
                () -> joinGroupWithProtocolTypeAndName(PROTOCOL_TYPE, PROTOCOL_TYPE, wrongProtocolName));
    }

    /**
     * Resets the coordinator's generation while the sync-group request is being matched, then
     * returns a sync-group response with a wrong protocol name. The test passes if
     * {@code joinGroupIfNeeded} completes without throwing.
     */
    @Test
    public void testNoGenerationWillNotTriggerProtocolNameCheck() {
        final String wrongProtocolName = "wrong-name";

        setupCoordinator();
        mockClient.reset();
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(mockTime.timer(0L));

        mockClient.prepareResponse(
                body -> body instanceof JoinGroupRequest
                        && ((JoinGroupRequest) body).data().protocolType().equals(PROTOCOL_TYPE),
                joinGroupFollowerResponse(defaultGeneration, memberId, "memberid", Errors.NONE, PROTOCOL_TYPE));

        mockClient.prepareResponse(body -> {
            if (!(body instanceof SyncGroupRequest)) {
                return false;
            }
            // Side effect inside the matcher: drop the generation before the response is handled.
            coordinator.resetGenerationOnLeaveGroup();

            SyncGroupRequest syncGroupRequest = (SyncGroupRequest) body;
            return syncGroupRequest.data.protocolType().equals(PROTOCOL_TYPE)
                    && syncGroupRequest.data.protocolName().equals(PROTOCOL_NAME);
        }, syncGroupResponse(Errors.NONE, PROTOCOL_TYPE, wrongProtocolName));

        // No assertion needed: an exception here fails the test.
        coordinator.joinGroupIfNeeded(mockTime.timer(100L));
    }

    /**
     * Rebuilds the fixture and runs one join-group/sync-group round whose canned responses
     * carry the given protocol type and name.
     *
     * @param joinGroupResponseProtocolType protocol type placed in the join-group response
     * @param syncGroupResponseProtocolType protocol type placed in the sync-group response
     * @param syncGroupResponseProtocolName protocol name placed in the sync-group response
     * @return the result of {@code joinGroupIfNeeded} with a 5000 ms timer
     */
    private boolean joinGroupWithProtocolTypeAndName(String joinGroupResponseProtocolType, String syncGroupResponseProtocolType, String syncGroupResponseProtocolName) {
        setupCoordinator();
        mockClient.reset();
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(mockTime.timer(0L));

        // Only a join-group request with the expected protocol type receives the join response.
        mockClient.prepareResponse(
                request -> request instanceof JoinGroupRequest
                        && ((JoinGroupRequest) request).data().protocolType().equals(PROTOCOL_TYPE),
                joinGroupFollowerResponse(-1, "memberId", "memberid", Errors.NONE, joinGroupResponseProtocolType));

        // Only a sync-group request with the expected protocol type and name receives the sync response.
        mockClient.prepareResponse(
                request -> request instanceof SyncGroupRequest
                        && ((SyncGroupRequest) request).data.protocolType().equals(PROTOCOL_TYPE)
                        && ((SyncGroupRequest) request).data.protocolName().equals(PROTOCOL_NAME),
                syncGroupResponse(Errors.NONE, syncGroupResponseProtocolType, syncGroupResponseProtocolName));

        return coordinator.joinGroupIfNeeded(mockTime.timer(5000L));
    }

    /**
     * Checks that {@code ensureActiveGroup()} throws {@link FencedInstanceIdException} when
     * the sync-group response carries {@code FENCED_INSTANCE_ID}.
     */
    @Test
    public void testSyncGroupRequestWithFencedInstanceIdException() {
        setupCoordinator();
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));

        // Generation id carried by the canned join-group response.
        final int generation = -1;
        mockClient.prepareResponse(joinGroupFollowerResponse(generation, memberId, "", Errors.NONE));
        mockClient.prepareResponse(syncGroupResponse(Errors.FENCED_INSTANCE_ID));

        Assert.assertThrows(FencedInstanceIdException.class, () -> coordinator.ensureActiveGroup());
    }

    /**
     * Replaces the coordinator's generation while a join-group request is in flight, then
     * answers that request with {@code UNKNOWN_MEMBER_ID}. The future must fail with the
     * matching exception and the newly set generation must remain in place.
     */
    @Test
    public void testJoinGroupUnknownMemberResponseWithOldGeneration() throws InterruptedException {
        setupCoordinator();
        joinGroup();

        AbstractCoordinator.Generation currGen = coordinator.generation();

        // Wildcard type instead of the raw RequestFuture.
        RequestFuture<?> future = coordinator.sendJoinGroupRequest();
        TestUtils.waitForCondition(() -> !mockClient.requests().isEmpty(), 2000L,
                "The join-group request was not sent");

        // Same generation id, different member id.
        AbstractCoordinator.Generation newGen = new AbstractCoordinator.Generation(
                currGen.generationId, currGen.memberId + "-new", currGen.protocolName);
        coordinator.setNewGeneration(newGen);

        mockClient.respond(
                joinGroupFollowerResponse(currGen.generationId + 1, memberId, "", Errors.UNKNOWN_MEMBER_ID));

        Assert.assertTrue(consumerClient.poll(future, mockTime.timer(REQUEST_TIMEOUT_MS)));
        Assert.assertTrue(future.exception().getClass().isInstance(Errors.UNKNOWN_MEMBER_ID.exception()));

        // The stale error must not have touched the newer generation.
        Assert.assertEquals(newGen, coordinator.generation());
    }

    /**
     * Replaces the coordinator's generation while a sync-group request is in flight, then
     * answers that request with {@code UNKNOWN_MEMBER_ID}. The future must fail with the
     * matching exception and the newly set generation must remain in place.
     */
    @Test
    public void testSyncGroupUnknownMemberResponseWithOldGeneration() throws InterruptedException {
        setupCoordinator();
        joinGroup();

        AbstractCoordinator.Generation currGen = coordinator.generation();
        coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);

        // Wildcard type instead of the raw RequestFuture.
        RequestFuture<?> future = coordinator.sendJoinGroupRequest();

        TestUtils.waitForCondition(() -> {
            consumerClient.poll(mockTime.timer(REQUEST_TIMEOUT_MS));
            return !mockClient.requests().isEmpty();
        }, 2000L, "The join-group request was not sent");

        mockClient.respond(joinGroupFollowerResponse(currGen.generationId, memberId, "", Errors.NONE));
        Assert.assertTrue(mockClient.requests().isEmpty());

        TestUtils.waitForCondition(() -> {
            consumerClient.poll(mockTime.timer(REQUEST_TIMEOUT_MS));
            return !mockClient.requests().isEmpty();
        }, 2000L, "The sync-group request was not sent");

        // Same generation id, different member id.
        AbstractCoordinator.Generation newGen = new AbstractCoordinator.Generation(
                currGen.generationId, currGen.memberId + "-new", currGen.protocolName);
        coordinator.setNewGeneration(newGen);

        mockClient.respond(syncGroupResponse(Errors.UNKNOWN_MEMBER_ID));
        Assert.assertTrue(consumerClient.poll(future, mockTime.timer(REQUEST_TIMEOUT_MS)));
        Assert.assertTrue(future.exception().getClass().isInstance(Errors.UNKNOWN_MEMBER_ID.exception()));

        // The stale error must not have touched the newer generation.
        Assert.assertEquals(newGen, coordinator.generation());
    }

    /**
     * Replaces the coordinator's generation while a sync-group request is in flight, then
     * answers that request with {@code ILLEGAL_GENERATION}. The future must fail with the
     * matching exception and the newly set generation must remain in place.
     */
    @Test
    public void testSyncGroupIllegalGenerationResponseWithOldGeneration() throws InterruptedException {
        setupCoordinator();
        joinGroup();

        AbstractCoordinator.Generation currGen = coordinator.generation();
        coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);

        // Wildcard type instead of the raw RequestFuture.
        RequestFuture<?> future = coordinator.sendJoinGroupRequest();

        TestUtils.waitForCondition(() -> {
            consumerClient.poll(mockTime.timer(REQUEST_TIMEOUT_MS));
            return !mockClient.requests().isEmpty();
        }, 2000L, "The join-group request was not sent");

        mockClient.respond(joinGroupFollowerResponse(currGen.generationId, memberId, "", Errors.NONE));
        Assert.assertTrue(mockClient.requests().isEmpty());

        TestUtils.waitForCondition(() -> {
            consumerClient.poll(mockTime.timer(REQUEST_TIMEOUT_MS));
            return !mockClient.requests().isEmpty();
        }, 2000L, "The sync-group request was not sent");

        // Same generation id, different member id.
        AbstractCoordinator.Generation newGen = new AbstractCoordinator.Generation(
                currGen.generationId, currGen.memberId + "-new", currGen.protocolName);
        coordinator.setNewGeneration(newGen);

        mockClient.respond(syncGroupResponse(Errors.ILLEGAL_GENERATION));
        Assert.assertTrue(consumerClient.poll(future, mockTime.timer(REQUEST_TIMEOUT_MS)));
        Assert.assertTrue(future.exception().getClass().isInstance(Errors.ILLEGAL_GENERATION.exception()));

        // The stale error must not have touched the newer generation.
        Assert.assertEquals(newGen, coordinator.generation());
    }

    /**
     * Bumps the coordinator's generation id while a heartbeat is in flight, answers the
     * heartbeat with {@code ILLEGAL_GENERATION}, and checks that the newly set generation
     * is still in place afterwards.
     */
    @Test
    public void testHeartbeatIllegalGenerationResponseWithOldGeneration() throws InterruptedException {
        setupCoordinator();
        joinGroup();

        AbstractCoordinator.Generation joinedGeneration = coordinator.generation();

        // Advance the mock clock by one heartbeat interval and wait for a request to show up.
        mockTime.sleep(HEARTBEAT_INTERVAL_MS);
        TestUtils.waitForCondition(() -> !mockClient.requests().isEmpty(), 2000L,
                "The heartbeat request was not sent");
        Assert.assertTrue(coordinator.heartbeat().hasInflight());

        // Next generation id, same member id.
        AbstractCoordinator.Generation bumpedGeneration = new AbstractCoordinator.Generation(
                joinedGeneration.generationId + 1, joinedGeneration.memberId, joinedGeneration.protocolName);
        coordinator.setNewGeneration(bumpedGeneration);

        mockClient.respond(heartbeatResponse(Errors.ILLEGAL_GENERATION));

        TestUtils.waitForCondition(() -> {
            coordinator.pollHeartbeat(mockTime.milliseconds());
            return !coordinator.heartbeat().hasInflight();
        }, 2000L, "The heartbeat response was not received");

        Assert.assertEquals(bumpedGeneration, coordinator.generation());
    }

    /**
     * Swaps in a generation with a different member id while a heartbeat is in flight,
     * answers the heartbeat with {@code UNKNOWN_MEMBER_ID}, and checks that the newly set
     * generation is still in place afterwards.
     */
    @Test
    public void testHeartbeatUnknownMemberResponseWithOldGeneration() throws InterruptedException {
        setupCoordinator();
        joinGroup();

        AbstractCoordinator.Generation joinedGeneration = coordinator.generation();

        // Advance the mock clock by one heartbeat interval and wait for a request to show up.
        mockTime.sleep(HEARTBEAT_INTERVAL_MS);
        TestUtils.waitForCondition(() -> !mockClient.requests().isEmpty(), 2000L,
                "The heartbeat request was not sent");
        Assert.assertTrue(coordinator.heartbeat().hasInflight());

        // Same generation id, different member id.
        AbstractCoordinator.Generation replacement = new AbstractCoordinator.Generation(
                joinedGeneration.generationId, joinedGeneration.memberId + "-new", joinedGeneration.protocolName);
        coordinator.setNewGeneration(replacement);

        mockClient.respond(heartbeatResponse(Errors.UNKNOWN_MEMBER_ID));

        TestUtils.waitForCondition(() -> {
            coordinator.pollHeartbeat(mockTime.milliseconds());
            return !coordinator.heartbeat().hasInflight();
        }, 2000L, "The heartbeat response was not received");

        Assert.assertEquals(replacement, coordinator.generation());
    }

    /**
     * Answers an in-flight heartbeat with {@code UNKNOWN_MEMBER_ID} after a rejoin has been
     * requested. The generation must become {@code NO_GENERATION}, and a subsequent
     * successful join/sync round must restore the original generation.
     */
    @Test
    public void testHeartbeatUnknownMemberResponseDuringRebalancing() throws InterruptedException {
        setupCoordinator();
        joinGroup();

        AbstractCoordinator.Generation joinedGeneration = coordinator.generation();

        // Advance the mock clock by one heartbeat interval and wait for a request to show up.
        mockTime.sleep(HEARTBEAT_INTERVAL_MS);
        TestUtils.waitForCondition(() -> !mockClient.requests().isEmpty(), 2000L,
                "The heartbeat request was not sent");
        Assert.assertTrue(coordinator.heartbeat().hasInflight());

        mockClient.respond(heartbeatResponse(Errors.UNKNOWN_MEMBER_ID));
        coordinator.requestRejoin();

        // Each attempt uses a timer from its own MockTime(1L) instance rather than the shared clock.
        TestUtils.waitForCondition(() -> {
            coordinator.ensureActiveGroup(new MockTime(1L).timer(100L));
            return !coordinator.heartbeat().hasInflight();
        }, 2000L, "The heartbeat response was not received");

        // The heartbeat error reset the generation.
        Assert.assertEquals(AbstractCoordinator.Generation.NO_GENERATION, coordinator.generation());

        // Complete the rejoin with the original generation id.
        mockClient.respond(joinGroupFollowerResponse(joinedGeneration.generationId, memberId, "", Errors.NONE));
        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
        coordinator.ensureActiveGroup();

        Assert.assertEquals(joinedGeneration, coordinator.generation());
    }

    /**
     * Swaps in a generation with a different member id while a heartbeat is in flight,
     * answers the heartbeat with {@code FENCED_INSTANCE_ID}, and checks that the newly set
     * generation is still in place afterwards.
     */
    @Test
    public void testHeartbeatInstanceFencedResponseWithOldGeneration() throws InterruptedException {
        setupCoordinator();
        joinGroup();

        AbstractCoordinator.Generation joinedGeneration = coordinator.generation();

        // Advance the mock clock by one heartbeat interval and wait for a request to show up.
        mockTime.sleep(HEARTBEAT_INTERVAL_MS);
        TestUtils.waitForCondition(() -> !mockClient.requests().isEmpty(), 2000L,
                "The heartbeat request was not sent");
        Assert.assertTrue(coordinator.heartbeat().hasInflight());

        // Same generation id, different member id.
        AbstractCoordinator.Generation replacement = new AbstractCoordinator.Generation(
                joinedGeneration.generationId, joinedGeneration.memberId + "-new", joinedGeneration.protocolName);
        coordinator.setNewGeneration(replacement);

        mockClient.respond(heartbeatResponse(Errors.FENCED_INSTANCE_ID));

        TestUtils.waitForCondition(() -> {
            coordinator.pollHeartbeat(mockTime.milliseconds());
            return !coordinator.heartbeat().hasInflight();
        }, 2000L, "The heartbeat response was not received");

        Assert.assertEquals(replacement, coordinator.generation());
    }

    /**
     * After joining, a FENCED_INSTANCE_ID heartbeat response must surface from
     * {@code pollHeartbeat} as a {@link FencedInstanceIdException} within one second.
     */
    @Test
    public void testHeartbeatRequestWithFencedInstanceIdException() throws InterruptedException {
        setupCoordinator();
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));

        final int generation = -1;
        mockClient.prepareResponse(joinGroupFollowerResponse(generation, "memberId", "", Errors.NONE));
        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
        mockClient.prepareResponse(heartbeatResponse(Errors.FENCED_INSTANCE_ID));

        try {
            coordinator.ensureActiveGroup();
            mockTime.sleep(3000L);

            // Poll for up to one second of wall-clock time; the exception is expected to escape this loop.
            final long deadlineMs = System.currentTimeMillis() + 1000L;
            while (System.currentTimeMillis() < deadlineMs) {
                Thread.sleep(10L);
                coordinator.pollHeartbeat(mockTime.milliseconds());
            }
            Assert.fail("Expected pollHeartbeat to raise fenced instance id exception in 1 second");
        } catch (RuntimeException raised) {
            Assert.assertTrue(raised instanceof FencedInstanceIdException);
        }
    }

    /**
     * A JoinGroup response carrying UNKNOWN_MEMBER_ID must fail the join future with
     * that error's message and leave the coordinator needing a rejoin with an
     * unknown generation.
     */
    @Test
    public void testJoinGroupRequestWithGroupInstanceIdNotFound() {
        setupCoordinator();
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(mockTime.timer(0L));

        mockClient.prepareResponse(joinGroupFollowerResponse(-1, "memberId", "", Errors.UNKNOWN_MEMBER_ID));
        final RequestFuture<?> joinFuture = coordinator.sendJoinGroupRequest();

        Assert.assertTrue(consumerClient.poll(joinFuture, mockTime.timer(40000L)));
        final String expectedMessage = Errors.UNKNOWN_MEMBER_ID.message();
        Assert.assertEquals(expectedMessage, joinFuture.exception().getMessage());
        Assert.assertTrue(coordinator.rejoinNeededOrPending());
        Assert.assertTrue(coordinator.hasUnknownGeneration());
    }

    /**
     * Runs the leave-group check twice: once with no group instance id and once
     * with one set.
     */
    @Test
    public void testLeaveGroupSentWithGroupInstanceIdUnSet() {
        final Optional<String> withoutInstanceId = Optional.empty();
        final Optional<String> withInstanceId = Optional.of("groupInstanceId");

        checkLeaveGroupRequestSent(withoutInstanceId);
        checkLeaveGroupRequestSent(withInstanceId);
    }

    /**
     * Joins the group and closes the coordinator, checking whether a LeaveGroup
     * request is sent on close.
     *
     * <p>The mock client is armed with a matcher that throws a marker exception when it
     * sees a {@code LeaveGroupRequest}. When {@code coordinator.isDynamicMember()} is
     * true, that exact exception must propagate out of the join/close sequence. When it
     * is false, no exception may be raised.
     *
     * @param groupInstanceId the group instance id passed to {@code setupCoordinator}
     */
    private void checkLeaveGroupRequestSent(Optional<String> groupInstanceId) {
        this.setupCoordinator(100, Integer.MAX_VALUE, groupInstanceId);
        this.mockClient.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.mockClient.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        this.mockClient.prepareResponse((AbstractResponse)this.syncGroupResponse(Errors.NONE));

        // Marker exception: thrown by the matcher as soon as a LeaveGroupRequest is observed.
        RuntimeException e = new RuntimeException();
        this.mockClient.prepareResponse(body -> {
            if (body instanceof LeaveGroupRequest) {
                throw e;
            }
            return false;
        }, (AbstractResponse)this.heartbeatResponse(Errors.UNKNOWN_SERVER_ERROR));

        try {
            this.coordinator.ensureActiveGroup();
            this.coordinator.close();
            if (this.coordinator.isDynamicMember()) {
                Assert.fail((String)"Expected leavegroup to raise an error.");
            }
        }
        catch (RuntimeException exception) {
            if (this.coordinator.isDynamicMember()) {
                // Dynamic member: the marker exception is the expected outcome.
                Assert.assertEquals((Object)exception, (Object)e);
            } else {
                // Only a non-dynamic member reaching this point is a failure; previously this
                // fail() ran unconditionally and broke the dynamic-member case as well.
                Assert.fail((String)"Coordinator with group.instance.id set shouldn't send leave group request.");
            }
        }
    }

    /**
     * A LeaveGroup response with a single member entry and no error must complete
     * the leave-group future successfully.
     */
    @Test
    public void testHandleNormalLeaveGroupResponse() {
        final LeaveGroupResponseData.MemberResponse member = new LeaveGroupResponseData.MemberResponse()
                .setMemberId("memberId")
                .setErrorCode(Errors.NONE.code());
        final LeaveGroupResponse prepared = leaveGroupResponse(Collections.singletonList(member));

        final RequestFuture<Void> result = setupLeaveGroup(prepared);

        Assert.assertNotNull(result);
        Assert.assertTrue(result.succeeded());
    }

    /**
     * A LeaveGroup response listing two member entries must fail the leave-group
     * future with an {@link IllegalStateException}.
     */
    @Test
    public void testHandleMultipleMembersLeaveGroupResponse() {
        final LeaveGroupResponseData.MemberResponse member = new LeaveGroupResponseData.MemberResponse()
                .setMemberId("memberId")
                .setErrorCode(Errors.NONE.code());
        // The same entry is listed twice so the response carries two members.
        final LeaveGroupResponse prepared = leaveGroupResponse(Arrays.asList(member, member));

        final RequestFuture<Void> result = setupLeaveGroup(prepared);

        Assert.assertNotNull(result);
        Assert.assertTrue(result.exception() instanceof IllegalStateException);
    }

    /**
     * A LeaveGroup response with no member entries must still complete the
     * leave-group future successfully.
     */
    @Test
    public void testHandleLeaveGroupResponseWithEmptyMemberResponse() {
        final LeaveGroupResponse prepared = leaveGroupResponse(Collections.emptyList());

        final RequestFuture<Void> result = setupLeaveGroup(prepared);

        Assert.assertNotNull(result);
        Assert.assertTrue(result.succeeded());
    }

    /**
     * A member-level UNKNOWN_MEMBER_ID error in the LeaveGroup response must fail
     * the leave-group future with an {@link UnknownMemberIdException}.
     */
    @Test
    public void testHandleLeaveGroupResponseWithException() {
        final LeaveGroupResponseData.MemberResponse member = new LeaveGroupResponseData.MemberResponse()
                .setMemberId("memberId")
                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code());
        final LeaveGroupResponse prepared = leaveGroupResponse(Collections.singletonList(member));

        final RequestFuture<Void> result = setupLeaveGroup(prepared);

        Assert.assertNotNull(result);
        Assert.assertTrue(result.exception() instanceof UnknownMemberIdException);
    }

    /**
     * Joins the group with no group instance id, queues the supplied LeaveGroup
     * response, and asks the coordinator to leave.
     *
     * @param leaveGroupResponse the response queued on the mock client after the join/sync responses
     * @return the future returned by {@code coordinator.maybeLeaveGroup}
     */
    private RequestFuture<Void> setupLeaveGroup(LeaveGroupResponse leaveGroupResponse) {
        final Optional<String> noInstanceId = Optional.empty();
        setupCoordinator(100, Integer.MAX_VALUE, noInstanceId);

        // Responses are queued in the order the coordinator will consume them.
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
        mockClient.prepareResponse(leaveGroupResponse);

        coordinator.ensureActiveGroup();
        return coordinator.maybeLeaveGroup("test maybe leave group");
    }

    /**
     * An exception thrown while a HeartbeatRequest is being matched must be
     * re-raised, as the same instance, from {@code pollHeartbeat} within one second.
     */
    @Test
    public void testUncaughtExceptionInHeartbeatThread() throws Exception {
        setupCoordinator();
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));

        // Marker exception thrown by the matcher when it sees a heartbeat request.
        final RuntimeException marker = new RuntimeException();
        mockClient.prepareResponse(body -> {
            if (body instanceof HeartbeatRequest) {
                throw marker;
            }
            return false;
        }, heartbeatResponse(Errors.UNKNOWN_SERVER_ERROR));

        try {
            coordinator.ensureActiveGroup();
            mockTime.sleep(3000L);

            // Poll for up to one second of wall-clock time; the marker is expected to escape this loop.
            final long deadlineMs = System.currentTimeMillis() + 1000L;
            while (System.currentTimeMillis() < deadlineMs) {
                Thread.sleep(10L);
                coordinator.pollHeartbeat(mockTime.milliseconds());
            }
            Assert.fail("Expected pollHeartbeat to raise an error in 1 second");
        } catch (RuntimeException raised) {
            Assert.assertEquals(raised, marker);
        }
    }

    /**
     * With a 10000 ms retry backoff configured, calling {@code pollHeartbeat} after
     * advancing the mock clock must cause the queued heartbeat matcher to be invoked
     * within one second.
     */
    @Test
    public void testPollHeartbeatAwakesHeartbeatThread() throws Exception {
        final int longRetryBackoffMs = 10000;
        setupCoordinator(longRetryBackoffMs);

        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
        coordinator.ensureActiveGroup();

        // The latch is released as soon as the matcher is consulted for any request.
        final CountDownLatch heartbeatDone = new CountDownLatch(1);
        mockClient.prepareResponse(body -> {
            heartbeatDone.countDown();
            return body instanceof HeartbeatRequest;
        }, heartbeatResponse(Errors.NONE));

        mockTime.sleep(3000L);
        coordinator.pollHeartbeat(mockTime.milliseconds());

        final boolean matcherInvoked = heartbeatDone.await(1L, TimeUnit.SECONDS);
        if (!matcherInvoked) {
            Assert.fail("Should have received a heartbeat request after calling pollHeartbeat");
        }
    }

    /**
     * Exercises {@code lookupCoordinator}: it fails while the node is blacked out,
     * returns the same future while a lookup is pending, and returns a new future
     * once the previous lookup has completed.
     */
    @Test
    public void testLookupCoordinator() {
        setupCoordinator();

        // While the only node is blacked out, the lookup is expected to fail.
        mockClient.blackout(node, 50L);
        final RequestFuture<?> noBrokersAvailableFuture = coordinator.lookupCoordinator();
        Assert.assertTrue("Failed future expected", noBrokersAvailableFuture.failed());
        mockTime.sleep(50L);

        // After the blackout period a lookup is pending and is shared by repeated calls.
        final RequestFuture<?> pendingLookup = coordinator.lookupCoordinator();
        Assert.assertFalse("Request not sent", pendingLookup.isDone());
        Assert.assertSame("New request sent while one is in progress", pendingLookup, coordinator.lookupCoordinator());

        // Once the coordinator is ready, a further lookup must yield a different future.
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(mockTime.timer(Long.MAX_VALUE));
        Assert.assertNotSame("New request not sent after previous completed", pendingLookup, coordinator.lookupCoordinator());
    }

    /**
     * A {@link WakeupException} thrown the first time the JoinGroup request is matched
     * must abort {@code ensureActiveGroup}; a second call must finish the join without
     * invoking {@code onJoinPrepare} again.
     */
    @Test
    public void testWakeupAfterJoinGroupSent() throws Exception {
        setupCoordinator();
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        mockClient.prepareResponse(new MockClient.RequestMatcher() {
            private int invocations = 0;

            @Override
            public boolean matches(AbstractRequest body) {
                invocations++;
                if (!(body instanceof JoinGroupRequest)) {
                    return false;
                }
                // Only the very first invocation simulates a wakeup.
                if (invocations == 1) {
                    throw new WakeupException();
                }
                return true;
            }
        }, joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
        final AtomicBoolean heartbeatSeen = prepareFirstHeartbeat();

        try {
            coordinator.ensureActiveGroup();
            Assert.fail("Should have woken up from ensureActiveGroup()");
        } catch (WakeupException expected) {
            // expected: the first attempt is interrupted by the wakeup
        }

        // The join was prepared but not completed, and no heartbeat has been sent.
        Assert.assertEquals(1L, coordinator.onJoinPrepareInvokes);
        Assert.assertEquals(0L, coordinator.onJoinCompleteInvokes);
        Assert.assertFalse(heartbeatSeen.get());

        coordinator.ensureActiveGroup();

        Assert.assertEquals(1L, coordinator.onJoinPrepareInvokes);
        Assert.assertEquals(1L, coordinator.onJoinCompleteInvokes);
        awaitFirstHeartbeat(heartbeatSeen);
    }

    /**
     * Same scenario as a wakeup on the first JoinGroup match, but the network client
     * is polled directly between the interrupted and the resumed
     * {@code ensureActiveGroup} calls.
     */
    @Test
    public void testWakeupAfterJoinGroupSentExternalCompletion() throws Exception {
        setupCoordinator();
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        mockClient.prepareResponse(new MockClient.RequestMatcher() {
            private int invocations = 0;

            @Override
            public boolean matches(AbstractRequest body) {
                invocations++;
                if (!(body instanceof JoinGroupRequest)) {
                    return false;
                }
                // Only the very first invocation simulates a wakeup.
                if (invocations == 1) {
                    throw new WakeupException();
                }
                return true;
            }
        }, joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
        final AtomicBoolean heartbeatSeen = prepareFirstHeartbeat();

        try {
            coordinator.ensureActiveGroup();
            Assert.fail("Should have woken up from ensureActiveGroup()");
        } catch (WakeupException expected) {
            // expected: the first attempt is interrupted by the wakeup
        }

        // The join was prepared but not completed, and no heartbeat has been sent.
        Assert.assertEquals(1L, coordinator.onJoinPrepareInvokes);
        Assert.assertEquals(0L, coordinator.onJoinCompleteInvokes);
        Assert.assertFalse(heartbeatSeen.get());

        // Poll the client outside the coordinator before resuming the join.
        consumerClient.poll(mockTime.timer(0L));
        coordinator.ensureActiveGroup();

        Assert.assertEquals(1L, coordinator.onJoinPrepareInvokes);
        Assert.assertEquals(1L, coordinator.onJoinCompleteInvokes);
        awaitFirstHeartbeat(heartbeatSeen);
    }

    /**
     * {@code consumerClient.wakeup()} is triggered when the JoinGroup request is
     * matched; the first {@code ensureActiveGroup} must throw and the second must
     * finish the join without invoking {@code onJoinPrepare} again.
     */
    @Test
    public void testWakeupAfterJoinGroupReceived() throws Exception {
        setupCoordinator();
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        mockClient.prepareResponse(body -> {
            if (!(body instanceof JoinGroupRequest)) {
                return false;
            }
            // Request a wakeup as a side effect of matching the JoinGroup request.
            consumerClient.wakeup();
            return true;
        }, joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
        final AtomicBoolean heartbeatSeen = prepareFirstHeartbeat();

        try {
            coordinator.ensureActiveGroup();
            Assert.fail("Should have woken up from ensureActiveGroup()");
        } catch (WakeupException expected) {
            // expected: the first attempt is interrupted by the wakeup
        }

        // The join was prepared but not completed, and no heartbeat has been sent.
        Assert.assertEquals(1L, coordinator.onJoinPrepareInvokes);
        Assert.assertEquals(0L, coordinator.onJoinCompleteInvokes);
        Assert.assertFalse(heartbeatSeen.get());

        coordinator.ensureActiveGroup();

        Assert.assertEquals(1L, coordinator.onJoinPrepareInvokes);
        Assert.assertEquals(1L, coordinator.onJoinCompleteInvokes);
        awaitFirstHeartbeat(heartbeatSeen);
    }

    /**
     * Same scenario as a wakeup requested while matching the JoinGroup request, but
     * the network client is polled directly between the interrupted and the resumed
     * {@code ensureActiveGroup} calls.
     */
    @Test
    public void testWakeupAfterJoinGroupReceivedExternalCompletion() throws Exception {
        setupCoordinator();
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        mockClient.prepareResponse(body -> {
            if (!(body instanceof JoinGroupRequest)) {
                return false;
            }
            // Request a wakeup as a side effect of matching the JoinGroup request.
            consumerClient.wakeup();
            return true;
        }, joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
        final AtomicBoolean heartbeatSeen = prepareFirstHeartbeat();

        try {
            coordinator.ensureActiveGroup();
            Assert.fail("Should have woken up from ensureActiveGroup()");
        } catch (WakeupException expected) {
            // expected: the first attempt is interrupted by the wakeup
        }

        // The join was prepared but not completed, and no heartbeat has been sent.
        Assert.assertEquals(1L, coordinator.onJoinPrepareInvokes);
        Assert.assertEquals(0L, coordinator.onJoinCompleteInvokes);
        Assert.assertFalse(heartbeatSeen.get());

        // Poll the client outside the coordinator before resuming the join.
        consumerClient.poll(mockTime.timer(0L));
        coordinator.ensureActiveGroup();

        Assert.assertEquals(1L, coordinator.onJoinPrepareInvokes);
        Assert.assertEquals(1L, coordinator.onJoinCompleteInvokes);
        awaitFirstHeartbeat(heartbeatSeen);
    }

    /**
     * A {@link WakeupException} thrown the first time the SyncGroup request is matched
     * must abort {@code ensureActiveGroup}; a second call must finish the join without
     * invoking {@code onJoinPrepare} again.
     */
    @Test
    public void testWakeupAfterSyncGroupSent() throws Exception {
        setupCoordinator();
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        mockClient.prepareResponse(new MockClient.RequestMatcher() {
            private int invocations = 0;

            @Override
            public boolean matches(AbstractRequest body) {
                invocations++;
                if (!(body instanceof SyncGroupRequest)) {
                    return false;
                }
                // Only the very first invocation simulates a wakeup.
                if (invocations == 1) {
                    throw new WakeupException();
                }
                return true;
            }
        }, syncGroupResponse(Errors.NONE));
        final AtomicBoolean heartbeatSeen = prepareFirstHeartbeat();

        try {
            coordinator.ensureActiveGroup();
            Assert.fail("Should have woken up from ensureActiveGroup()");
        } catch (WakeupException expected) {
            // expected: the first attempt is interrupted by the wakeup
        }

        // The join was prepared but not completed, and no heartbeat has been sent.
        Assert.assertEquals(1L, coordinator.onJoinPrepareInvokes);
        Assert.assertEquals(0L, coordinator.onJoinCompleteInvokes);
        Assert.assertFalse(heartbeatSeen.get());

        coordinator.ensureActiveGroup();

        Assert.assertEquals(1L, coordinator.onJoinPrepareInvokes);
        Assert.assertEquals(1L, coordinator.onJoinCompleteInvokes);
        awaitFirstHeartbeat(heartbeatSeen);
    }

    /**
     * Same scenario as a wakeup on the first SyncGroup match, but the network client
     * is polled directly between the interrupted and the resumed
     * {@code ensureActiveGroup} calls.
     */
    @Test
    public void testWakeupAfterSyncGroupSentExternalCompletion() throws Exception {
        setupCoordinator();
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        mockClient.prepareResponse(new MockClient.RequestMatcher() {
            private int invocations = 0;

            @Override
            public boolean matches(AbstractRequest body) {
                invocations++;
                if (!(body instanceof SyncGroupRequest)) {
                    return false;
                }
                // Only the very first invocation simulates a wakeup.
                if (invocations == 1) {
                    throw new WakeupException();
                }
                return true;
            }
        }, syncGroupResponse(Errors.NONE));
        final AtomicBoolean heartbeatSeen = prepareFirstHeartbeat();

        try {
            coordinator.ensureActiveGroup();
            Assert.fail("Should have woken up from ensureActiveGroup()");
        } catch (WakeupException expected) {
            // expected: the first attempt is interrupted by the wakeup
        }

        // The join was prepared but not completed, and no heartbeat has been sent.
        Assert.assertEquals(1L, coordinator.onJoinPrepareInvokes);
        Assert.assertEquals(0L, coordinator.onJoinCompleteInvokes);
        Assert.assertFalse(heartbeatSeen.get());

        // Poll the client outside the coordinator before resuming the join.
        consumerClient.poll(mockTime.timer(0L));
        coordinator.ensureActiveGroup();

        Assert.assertEquals(1L, coordinator.onJoinPrepareInvokes);
        Assert.assertEquals(1L, coordinator.onJoinCompleteInvokes);
        awaitFirstHeartbeat(heartbeatSeen);
    }

    /**
     * {@code consumerClient.wakeup()} is triggered when the SyncGroup request is
     * matched; the first {@code ensureActiveGroup} must throw and the second must
     * finish the join without invoking {@code onJoinPrepare} again.
     */
    @Test
    public void testWakeupAfterSyncGroupReceived() throws Exception {
        setupCoordinator();
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        mockClient.prepareResponse(body -> {
            if (!(body instanceof SyncGroupRequest)) {
                return false;
            }
            // Request a wakeup as a side effect of matching the SyncGroup request.
            consumerClient.wakeup();
            return true;
        }, syncGroupResponse(Errors.NONE));
        final AtomicBoolean heartbeatSeen = prepareFirstHeartbeat();

        try {
            coordinator.ensureActiveGroup();
            Assert.fail("Should have woken up from ensureActiveGroup()");
        } catch (WakeupException expected) {
            // expected: the first attempt is interrupted by the wakeup
        }

        // The join was prepared but not completed, and no heartbeat has been sent.
        Assert.assertEquals(1L, coordinator.onJoinPrepareInvokes);
        Assert.assertEquals(0L, coordinator.onJoinCompleteInvokes);
        Assert.assertFalse(heartbeatSeen.get());

        coordinator.ensureActiveGroup();

        Assert.assertEquals(1L, coordinator.onJoinPrepareInvokes);
        Assert.assertEquals(1L, coordinator.onJoinCompleteInvokes);
        awaitFirstHeartbeat(heartbeatSeen);
    }

    /**
     * {@code consumerClient.wakeup()} is triggered when the SyncGroup request is
     * matched; the first {@code ensureActiveGroup} must throw and the second must
     * finish the join without invoking {@code onJoinPrepare} again.
     */
    @Test
    public void testWakeupAfterSyncGroupReceivedExternalCompletion() throws Exception {
        setupCoordinator();
        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        mockClient.prepareResponse(body -> {
            if (!(body instanceof SyncGroupRequest)) {
                return false;
            }
            // Request a wakeup as a side effect of matching the SyncGroup request.
            consumerClient.wakeup();
            return true;
        }, syncGroupResponse(Errors.NONE));
        final AtomicBoolean heartbeatSeen = prepareFirstHeartbeat();

        try {
            coordinator.ensureActiveGroup();
            Assert.fail("Should have woken up from ensureActiveGroup()");
        } catch (WakeupException expected) {
            // expected: the first attempt is interrupted by the wakeup
        }

        // The join was prepared but not completed, and no heartbeat has been sent.
        Assert.assertEquals(1L, coordinator.onJoinPrepareInvokes);
        Assert.assertEquals(0L, coordinator.onJoinCompleteInvokes);
        Assert.assertFalse(heartbeatSeen.get());

        // NOTE(review): the other *ExternalCompletion tests in this file call
        // consumerClient.poll(mockTime.timer(0L)) before resuming; this one does not.
        // Confirm whether the omission is intentional.
        coordinator.ensureActiveGroup();

        Assert.assertEquals(1L, coordinator.onJoinPrepareInvokes);
        Assert.assertEquals(1L, coordinator.onJoinCompleteInvokes);
        awaitFirstHeartbeat(heartbeatSeen);
    }

    /**
     * When {@code onJoinComplete} itself throws a {@link WakeupException}, the first
     * {@code ensureActiveGroup} must propagate it; once the flag is cleared, a second
     * call must complete the join without invoking {@code onJoinPrepare} again.
     */
    @Test
    public void testWakeupInOnJoinComplete() throws Exception {
        setupCoordinator();
        // Make the dummy coordinator throw from onJoinComplete on the first attempt.
        coordinator.wakeupOnJoinComplete = true;

        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
        final AtomicBoolean heartbeatSeen = prepareFirstHeartbeat();

        try {
            coordinator.ensureActiveGroup();
            Assert.fail("Should have woken up from ensureActiveGroup()");
        } catch (WakeupException expected) {
            // expected: onJoinComplete raised the wakeup
        }

        // The completion counter is only incremented after the wakeup check, so it stays at zero.
        Assert.assertEquals(1L, coordinator.onJoinPrepareInvokes);
        Assert.assertEquals(0L, coordinator.onJoinCompleteInvokes);
        Assert.assertFalse(heartbeatSeen.get());

        coordinator.wakeupOnJoinComplete = false;
        consumerClient.poll(mockTime.timer(0L));
        coordinator.ensureActiveGroup();

        Assert.assertEquals(1L, coordinator.onJoinPrepareInvokes);
        Assert.assertEquals(1L, coordinator.onJoinCompleteInvokes);
        awaitFirstHeartbeat(heartbeatSeen);
    }

    /**
     * With a pending authentication error registered for the node,
     * {@code ensureCoordinatorReady} must throw an {@link AuthenticationException}.
     */
    @Test
    public void testAuthenticationErrorInEnsureCoordinatorReady() {
        setupCoordinator();
        mockClient.createPendingAuthenticationError(node, 300L);

        try {
            coordinator.ensureCoordinatorReady(mockTime.timer(Long.MAX_VALUE));
            Assert.fail("Expected an authentication error.");
        } catch (AuthenticationException expected) {
            // expected: the authentication failure is surfaced to the caller
        }
    }

    /**
     * Queues an UNKNOWN_SERVER_ERROR heartbeat response whose matcher records that a
     * {@code HeartbeatRequest} was seen.
     *
     * @return a flag that becomes {@code true} once the matcher sees a heartbeat request
     */
    private AtomicBoolean prepareFirstHeartbeat() {
        final AtomicBoolean seen = new AtomicBoolean(false);
        mockClient.prepareResponse(body -> {
            if (!(body instanceof HeartbeatRequest)) {
                return false;
            }
            seen.set(true);
            return true;
        }, heartbeatResponse(Errors.UNKNOWN_SERVER_ERROR));
        return seen;
    }

    /**
     * Advances the mock clock by 3000 ms and waits up to 3000 ms for the flag to be set.
     *
     * @param heartbeatReceived flag expected to become {@code true}
     * @throws Exception if waiting for the condition fails or is interrupted
     */
    private void awaitFirstHeartbeat(AtomicBoolean heartbeatReceived) throws Exception {
        mockTime.sleep(3000L);
        TestUtils.waitForCondition(
                () -> heartbeatReceived.get(),
                3000L,
                "Should have received a heartbeat request after joining the group");
    }

    /**
     * Builds a FindCoordinator response for the given node and error.
     *
     * @param node  the node passed to {@code FindCoordinatorResponse.prepareResponse}
     * @param error the error passed to {@code FindCoordinatorResponse.prepareResponse}
     * @return the prepared response
     */
    private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
        final FindCoordinatorResponse prepared = FindCoordinatorResponse.prepareResponse(error, node);
        return prepared;
    }

    /**
     * Builds a heartbeat response carrying the given error code.
     *
     * @param error the error whose code is set on the response data
     * @return the heartbeat response
     */
    private HeartbeatResponse heartbeatResponse(Errors error) {
        final HeartbeatResponseData payload = new HeartbeatResponseData().setErrorCode(error.code());
        return new HeartbeatResponse(payload);
    }

    /**
     * Builds a JoinGroup response with a {@code null} protocol type by delegating to
     * the five-argument overload.
     *
     * @param generationId generation id to set on the response
     * @param memberId     member id to set on the response
     * @param leaderId     leader id to set on the response
     * @param error        error whose code is set on the response
     * @return the JoinGroup response
     */
    private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
        final String noProtocolType = null;
        return joinGroupFollowerResponse(generationId, memberId, leaderId, error, noProtocolType);
    }

    /**
     * Builds a JoinGroup response with an empty member list and the test's
     * {@code PROTOCOL_NAME}.
     *
     * @param generationId generation id to set on the response
     * @param memberId     member id to set on the response
     * @param leaderId     leader id to set on the response
     * @param error        error whose code is set on the response
     * @param protocolType protocol type to set on the response; may be {@code null}
     * @return the JoinGroup response
     */
    private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error, String protocolType) {
        final JoinGroupResponseData payload = new JoinGroupResponseData()
                .setErrorCode(error.code())
                .setGenerationId(generationId)
                .setProtocolType(protocolType)
                .setProtocolName(PROTOCOL_NAME)
                .setMemberId(memberId)
                .setLeader(leaderId)
                .setMembers(Collections.emptyList());
        return new JoinGroupResponse(payload);
    }

    /**
     * Builds a JoinGroup response with generation id {@code -1} and empty member and
     * leader ids.
     *
     * @param error error whose code is set on the response
     * @return the JoinGroup response
     */
    private JoinGroupResponse joinGroupResponse(Errors error) {
        final int generationId = -1;
        final String emptyId = "";
        return joinGroupFollowerResponse(generationId, emptyId, emptyId, error);
    }

    /**
     * Builds a SyncGroup response with {@code null} protocol type and name by
     * delegating to the three-argument overload.
     *
     * @param error error whose code is set on the response
     * @return the SyncGroup response
     */
    private SyncGroupResponse syncGroupResponse(Errors error) {
        final String noProtocolType = null;
        final String noProtocolName = null;
        return syncGroupResponse(error, noProtocolType, noProtocolName);
    }

    /**
     * Builds a SyncGroup response with an empty (zero-length) assignment.
     *
     * @param error        error whose code is set on the response
     * @param protocolType protocol type to set on the response; may be {@code null}
     * @param protocolName protocol name to set on the response; may be {@code null}
     * @return the SyncGroup response
     */
    private SyncGroupResponse syncGroupResponse(Errors error, String protocolType, String protocolName) {
        final SyncGroupResponseData payload = new SyncGroupResponseData()
                .setErrorCode(error.code())
                .setProtocolType(protocolType)
                .setProtocolName(protocolName)
                .setAssignment(new byte[0]);
        return new SyncGroupResponse(payload);
    }

    /**
     * Builds a LeaveGroup response whose top-level error is NONE and whose member
     * list is the one supplied.
     *
     * @param members per-member responses to attach
     * @return the LeaveGroup response
     */
    private LeaveGroupResponse leaveGroupResponse(List<LeaveGroupResponseData.MemberResponse> members) {
        final LeaveGroupResponseData payload = new LeaveGroupResponseData()
                .setErrorCode(Errors.NONE.code())
                .setMembers(members);
        return new LeaveGroupResponse(payload);
    }

    /**
     * Minimal {@link AbstractCoordinator} subclass used by the tests. It counts how
     * often the join callbacks run and can be told to throw a
     * {@link WakeupException} from {@code onJoinComplete}.
     */
    public static class DummyCoordinator extends AbstractCoordinator {
        /** Number of times {@code onJoinPrepare} has been called. */
        private int onJoinPrepareInvokes = 0;
        /** Number of times {@code onJoinComplete} has run to completion. */
        private int onJoinCompleteInvokes = 0;
        /** When {@code true}, {@code onJoinComplete} throws instead of counting. */
        private boolean wakeupOnJoinComplete = false;

        DummyCoordinator(GroupRebalanceConfig rebalanceConfig, ConsumerNetworkClient client, Metrics metrics, Time time) {
            super(rebalanceConfig, new LogContext(), client, metrics, AbstractCoordinatorTest.METRIC_GROUP_PREFIX, time);
        }

        /** Returns the test's protocol type constant. */
        protected String protocolType() {
            return AbstractCoordinatorTest.PROTOCOL_TYPE;
        }

        /** Returns a collection holding a single protocol with the test's name and empty metadata buffer. */
        protected JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() {
            JoinGroupRequestData.JoinGroupRequestProtocol onlyProtocol = new JoinGroupRequestData.JoinGroupRequestProtocol()
                    .setName(AbstractCoordinatorTest.PROTOCOL_NAME)
                    .setMetadata(EMPTY_DATA.array());
            return new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
                    Collections.singleton(onlyProtocol).iterator());
        }

        /** Maps every member id in {@code allMemberMetadata} to the shared empty buffer. */
        protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata) {
            Map<String, ByteBuffer> result = new HashMap<>();
            for (JoinGroupResponseData.JoinGroupResponseMember entry : allMemberMetadata) {
                result.put(entry.memberId(), EMPTY_DATA);
            }
            return result;
        }

        /** Records that a join is being prepared. */
        protected void onJoinPrepare(int generation, String memberId) {
            onJoinPrepareInvokes++;
        }

        /** Throws if the wakeup flag is set; otherwise records a completed join. */
        protected void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
            if (wakeupOnJoinComplete) {
                // The counter is deliberately not incremented when the wakeup fires.
                throw new WakeupException();
            }
            onJoinCompleteInvokes++;
        }
    }
}

