/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.Compressor;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class FetcherTest {
    private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
    private String topicName = "test";
    private String groupId = "test-group";
    private final String metricGroup = "consumer" + this.groupId + "-fetch-manager-metrics";
    private TopicPartition tp = new TopicPartition(this.topicName, 0);
    private int minBytes = 1;
    private int maxBytes = Integer.MAX_VALUE;
    private int maxWaitMs = 0;
    private int fetchSize = 1000;
    private long retryBackoffMs = 100L;
    private MockTime time = new MockTime(1L);
    private Metadata metadata = new Metadata(0L, Long.MAX_VALUE);
    private MockClient client = new MockClient(this.time, this.metadata);
    private Cluster cluster = TestUtils.singletonCluster(this.topicName, 1);
    private Node node = (Node)this.cluster.nodes().get(0);
    private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
    private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE);
    private Metrics metrics = new Metrics((Time)this.time);
    private static final double EPSILON = 1.0E-4;
    private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient((KafkaClient)this.client, this.metadata, (Time)this.time, 100L, 1000L);
    private MemoryRecords records = MemoryRecords.emptyRecords((ByteBuffer)ByteBuffer.allocate(1024), (CompressionType)CompressionType.NONE);
    private MemoryRecords nextRecords = MemoryRecords.emptyRecords((ByteBuffer)ByteBuffer.allocate(1024), (CompressionType)CompressionType.NONE);
    private Fetcher<byte[], byte[]> fetcher = this.createFetcher(this.subscriptions, this.metrics);
    private Metrics fetcherMetrics = new Metrics((Time)this.time);
    private Fetcher<byte[], byte[]> fetcherNoAutoReset = this.createFetcher(this.subscriptionsNoAutoReset, this.fetcherMetrics);

    @Before
    public void setup() throws Exception {
        this.metadata.update(this.cluster, this.time.milliseconds());
        this.client.setNode(this.node);
        this.records.append(1L, 0L, "key".getBytes(), "value-1".getBytes());
        this.records.append(2L, 0L, "key".getBytes(), "value-2".getBytes());
        this.records.append(3L, 0L, "key".getBytes(), "value-3".getBytes());
        this.records.close();
        this.nextRecords.append(4L, 0L, "key".getBytes(), "value-4".getBytes());
        this.nextRecords.append(5L, 0L, "key".getBytes(), "value-5".getBytes());
        this.nextRecords.close();
    }

    @After
    public void teardown() {
        this.metrics.close();
        this.fetcherMetrics.close();
    }

    @Test
    public void testFetchNormal() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        this.fetcher.sendFetches();
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(this.fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map partitionRecords = this.fetcher.fetchedRecords();
        Assert.assertTrue((boolean)partitionRecords.containsKey(this.tp));
        List records = (List)partitionRecords.get(this.tp);
        Assert.assertEquals((long)3L, (long)records.size());
        Assert.assertEquals((long)4L, (long)this.subscriptions.position(this.tp));
        long offset = 1L;
        for (ConsumerRecord record : records) {
            Assert.assertEquals((long)offset, (long)record.offset());
            ++offset;
        }
    }

    @Test
    public void testFetchError() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        this.fetcher.sendFetches();
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(this.fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map partitionRecords = this.fetcher.fetchedRecords();
        Assert.assertFalse((boolean)partitionRecords.containsKey(this.tp));
    }

    private MockClient.RequestMatcher matchesOffset(final TopicPartition tp, final long offset) {
        return new MockClient.RequestMatcher(){

            @Override
            public boolean matches(ClientRequest request) {
                FetchRequest fetch = new FetchRequest(request.request().body());
                return fetch.fetchData().containsKey(tp) && ((FetchRequest.PartitionData)fetch.fetchData().get((Object)tp)).offset == offset;
            }
        };
    }

    @Test
    public void testFetchedRecordsRaisesOnSerializationErrors() {
        ByteArrayDeserializer deserializer = new ByteArrayDeserializer(){
            int i = 0;

            public byte[] deserialize(String topic, byte[] data) {
                if (this.i++ == 1) {
                    throw new SerializationException();
                }
                return data;
            }
        };
        Fetcher fetcher = this.createFetcher(this.subscriptions, new Metrics((Time)this.time), (Deserializer)deserializer, (Deserializer)deserializer);
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 1L);
        this.client.prepareResponse(this.matchesOffset(this.tp, 1L), this.fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
        fetcher.sendFetches();
        this.consumerClient.poll(0L);
        try {
            fetcher.fetchedRecords();
            Assert.fail((String)"fetchedRecords should have raised");
        }
        catch (SerializationException e) {
            Assert.assertEquals((long)1L, (long)this.subscriptions.position(this.tp));
        }
    }

    @Test
    public void testParseInvalidRecord() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        Compressor compressor = new Compressor(buffer, CompressionType.NONE);
        byte[] key = "foo".getBytes();
        byte[] value = "baz".getBytes();
        long offset = 0L;
        long timestamp = 500L;
        int size = Record.recordSize((byte[])key, (byte[])value);
        long crc = Record.computeChecksum((long)timestamp, (byte[])key, (byte[])value, (CompressionType)CompressionType.NONE, (int)0, (int)-1);
        compressor.putLong(offset);
        compressor.putInt(size);
        Record.write((Compressor)compressor, (long)crc, (byte)Record.computeAttributes((CompressionType)CompressionType.NONE), (long)timestamp, (byte[])key, (byte[])value, (int)0, (int)-1);
        compressor.putLong(offset);
        compressor.putInt(size);
        Record.write((Compressor)compressor, (long)(crc + 1L), (byte)Record.computeAttributes((CompressionType)CompressionType.NONE), (long)timestamp, (byte[])key, (byte[])value, (int)0, (int)-1);
        compressor.close();
        buffer.flip();
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        this.fetcher.sendFetches();
        this.client.prepareResponse(this.fetchResponse(buffer, Errors.NONE.code(), 100L, 0));
        this.consumerClient.poll(0L);
        try {
            this.fetcher.fetchedRecords();
            Assert.fail((String)"fetchedRecords should have raised");
        }
        catch (KafkaException e) {
            Assert.assertEquals((long)0L, (long)this.subscriptions.position(this.tp));
        }
    }

    @Test
    public void testFetchMaxPollRecords() {
        Fetcher<byte[], byte[]> fetcher = this.createFetcher(this.subscriptions, new Metrics((Time)this.time), 2);
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 1L);
        this.client.prepareResponse(this.matchesOffset(this.tp, 1L), this.fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
        this.client.prepareResponse(this.matchesOffset(this.tp, 4L), this.fetchResponse(this.nextRecords.buffer(), Errors.NONE.code(), 100L, 0));
        fetcher.sendFetches();
        this.consumerClient.poll(0L);
        List records = (List)fetcher.fetchedRecords().get(this.tp);
        Assert.assertEquals((long)2L, (long)records.size());
        Assert.assertEquals((long)3L, (long)this.subscriptions.position(this.tp));
        Assert.assertEquals((long)1L, (long)((ConsumerRecord)records.get(0)).offset());
        Assert.assertEquals((long)2L, (long)((ConsumerRecord)records.get(1)).offset());
        fetcher.sendFetches();
        this.consumerClient.poll(0L);
        records = (List)fetcher.fetchedRecords().get(this.tp);
        Assert.assertEquals((long)1L, (long)records.size());
        Assert.assertEquals((long)4L, (long)this.subscriptions.position(this.tp));
        Assert.assertEquals((long)3L, (long)((ConsumerRecord)records.get(0)).offset());
        fetcher.sendFetches();
        this.consumerClient.poll(0L);
        records = (List)fetcher.fetchedRecords().get(this.tp);
        Assert.assertEquals((long)2L, (long)records.size());
        Assert.assertEquals((long)6L, (long)this.subscriptions.position(this.tp));
        Assert.assertEquals((long)4L, (long)((ConsumerRecord)records.get(0)).offset());
        Assert.assertEquals((long)5L, (long)((ConsumerRecord)records.get(1)).offset());
    }

    @Test
    public void testFetchNonContinuousRecords() {
        MemoryRecords records = MemoryRecords.emptyRecords((ByteBuffer)ByteBuffer.allocate(1024), (CompressionType)CompressionType.NONE);
        records.append(15L, 0L, "key".getBytes(), "value-1".getBytes());
        records.append(20L, 0L, "key".getBytes(), "value-2".getBytes());
        records.append(30L, 0L, "key".getBytes(), "value-3".getBytes());
        records.close();
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        this.fetcher.sendFetches();
        this.client.prepareResponse(this.fetchResponse(records.buffer(), Errors.NONE.code(), 100L, 0));
        this.consumerClient.poll(0L);
        List consumerRecords = (List)this.fetcher.fetchedRecords().get(this.tp);
        Assert.assertEquals((long)3L, (long)consumerRecords.size());
        Assert.assertEquals((long)31L, (long)this.subscriptions.position(this.tp));
        Assert.assertEquals((long)15L, (long)((ConsumerRecord)consumerRecords.get(0)).offset());
        Assert.assertEquals((long)20L, (long)((ConsumerRecord)consumerRecords.get(1)).offset());
        Assert.assertEquals((long)30L, (long)((ConsumerRecord)consumerRecords.get(2)).offset());
    }

    @Test
    public void testUnauthorizedTopic() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        this.fetcher.sendFetches();
        this.client.prepareResponse(this.fetchResponse(this.records.buffer(), Errors.TOPIC_AUTHORIZATION_FAILED.code(), 100L, 0));
        this.consumerClient.poll(0L);
        try {
            this.fetcher.fetchedRecords();
            Assert.fail((String)"fetchedRecords should have thrown");
        }
        catch (TopicAuthorizationException e) {
            Assert.assertEquals(Collections.singleton(this.topicName), (Object)e.unauthorizedTopics());
        }
    }

    @Test
    public void testFetchDuringRebalance() {
        this.subscriptions.subscribe(Collections.singleton(this.topicName), this.listener);
        this.subscriptions.assignFromSubscribed(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        this.fetcher.sendFetches();
        this.subscriptions.assignFromSubscribed(Collections.singleton(this.tp));
        this.client.prepareResponse(this.fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue((boolean)this.fetcher.fetchedRecords().isEmpty());
    }

    @Test
    public void testInFlightFetchOnPausedPartition() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        this.fetcher.sendFetches();
        this.subscriptions.pause(this.tp);
        this.client.prepareResponse(this.fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertNull(this.fetcher.fetchedRecords().get(this.tp));
    }

    @Test
    public void testFetchOnPausedPartition() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        this.subscriptions.pause(this.tp);
        this.fetcher.sendFetches();
        Assert.assertTrue((boolean)this.client.requests().isEmpty());
    }

    @Test
    public void testFetchNotLeaderForPartition() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        this.fetcher.sendFetches();
        this.client.prepareResponse(this.fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertEquals((long)0L, (long)this.fetcher.fetchedRecords().size());
        Assert.assertEquals((long)0L, (long)this.metadata.timeToNextUpdate(this.time.milliseconds()));
    }

    @Test
    public void testFetchUnknownTopicOrPartition() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        this.fetcher.sendFetches();
        this.client.prepareResponse(this.fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertEquals((long)0L, (long)this.fetcher.fetchedRecords().size());
        Assert.assertEquals((long)0L, (long)this.metadata.timeToNextUpdate(this.time.milliseconds()));
    }

    @Test
    public void testFetchOffsetOutOfRange() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        this.fetcher.sendFetches();
        this.client.prepareResponse(this.fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertEquals((long)0L, (long)this.fetcher.fetchedRecords().size());
        Assert.assertTrue((boolean)this.subscriptions.isOffsetResetNeeded(this.tp));
        Assert.assertEquals(null, (Object)this.subscriptions.position(this.tp));
    }

    @Test
    public void testStaleOutOfRangeError() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        this.fetcher.sendFetches();
        this.client.prepareResponse(this.fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
        this.subscriptions.seek(this.tp, 1L);
        this.consumerClient.poll(0L);
        Assert.assertEquals((long)0L, (long)this.fetcher.fetchedRecords().size());
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp));
        Assert.assertEquals((long)1L, (long)this.subscriptions.position(this.tp));
    }

    @Test
    public void testFetchedRecordsAfterSeek() {
        this.subscriptionsNoAutoReset.assignFromUser(Collections.singleton(this.tp));
        this.subscriptionsNoAutoReset.seek(this.tp, 0L);
        this.fetcherNoAutoReset.sendFetches();
        this.client.prepareResponse(this.fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertFalse((boolean)this.subscriptionsNoAutoReset.isOffsetResetNeeded(this.tp));
        this.subscriptionsNoAutoReset.seek(this.tp, 2L);
        Assert.assertEquals((long)0L, (long)this.fetcherNoAutoReset.fetchedRecords().size());
    }

    @Test
    public void testFetchOffsetOutOfRangeException() {
        this.subscriptionsNoAutoReset.assignFromUser(Collections.singleton(this.tp));
        this.subscriptionsNoAutoReset.seek(this.tp, 0L);
        this.fetcherNoAutoReset.sendFetches();
        this.client.prepareResponse(this.fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertFalse((boolean)this.subscriptionsNoAutoReset.isOffsetResetNeeded(this.tp));
        try {
            this.fetcherNoAutoReset.fetchedRecords();
            Assert.fail((String)"Should have thrown OffsetOutOfRangeException");
        }
        catch (OffsetOutOfRangeException e) {
            Assert.assertTrue((boolean)e.offsetOutOfRangePartitions().containsKey(this.tp));
            Assert.assertEquals((long)e.offsetOutOfRangePartitions().size(), (long)1L);
        }
        Assert.assertEquals((long)0L, (long)this.fetcherNoAutoReset.fetchedRecords().size());
    }

    @Test
    public void testFetchDisconnected() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        this.fetcher.sendFetches();
        this.client.prepareResponse(this.fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0), true);
        this.consumerClient.poll(0L);
        Assert.assertEquals((long)0L, (long)this.fetcher.fetchedRecords().size());
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp));
        Assert.assertTrue((boolean)this.subscriptions.isFetchable(this.tp));
        Assert.assertEquals((long)0L, (long)this.subscriptions.position(this.tp));
    }

    @Test
    public void testUpdateFetchPositionToCommitted() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.committed(this.tp, new OffsetAndMetadata(5L));
        this.fetcher.updateFetchPositions(Collections.singleton(this.tp));
        Assert.assertTrue((boolean)this.subscriptions.isFetchable(this.tp));
        Assert.assertEquals((long)5L, (long)this.subscriptions.position(this.tp));
    }

    @Test
    public void testUpdateFetchPositionResetToDefaultOffset() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.client.prepareResponse(this.listOffsetRequestMatcher(-2L), this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.updateFetchPositions(Collections.singleton(this.tp));
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp));
        Assert.assertTrue((boolean)this.subscriptions.isFetchable(this.tp));
        Assert.assertEquals((long)5L, (long)this.subscriptions.position(this.tp));
    }

    @Test
    public void testUpdateFetchPositionResetToLatestOffset() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.needOffsetReset(this.tp, OffsetResetStrategy.LATEST);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.updateFetchPositions(Collections.singleton(this.tp));
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp));
        Assert.assertTrue((boolean)this.subscriptions.isFetchable(this.tp));
        Assert.assertEquals((long)5L, (long)this.subscriptions.position(this.tp));
    }

    @Test
    public void testUpdateFetchPositionResetToEarliestOffset() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.needOffsetReset(this.tp, OffsetResetStrategy.EARLIEST);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-2L), this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.updateFetchPositions(Collections.singleton(this.tp));
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp));
        Assert.assertTrue((boolean)this.subscriptions.isFetchable(this.tp));
        Assert.assertEquals((long)5L, (long)this.subscriptions.position(this.tp));
    }

    @Test
    public void testUpdateFetchPositionDisconnect() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.needOffsetReset(this.tp, OffsetResetStrategy.LATEST);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), this.listOffsetResponse(Errors.NONE, 1L, 5L), true);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.updateFetchPositions(Collections.singleton(this.tp));
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp));
        Assert.assertTrue((boolean)this.subscriptions.isFetchable(this.tp));
        Assert.assertEquals((long)5L, (long)this.subscriptions.position(this.tp));
    }

    @Test
    public void testGetAllTopics() {
        this.client.prepareResponse(this.newMetadataResponse(this.topicName, Errors.NONE).toStruct());
        Map allTopics = this.fetcher.getAllTopicMetadata(5000L);
        Assert.assertEquals((long)this.cluster.topics().size(), (long)allTopics.size());
    }

    @Test
    public void testGetAllTopicsDisconnect() {
        this.client.prepareResponse(null, true);
        this.client.prepareResponse(this.newMetadataResponse(this.topicName, Errors.NONE).toStruct());
        Map allTopics = this.fetcher.getAllTopicMetadata(5000L);
        Assert.assertEquals((long)this.cluster.topics().size(), (long)allTopics.size());
    }

    @Test(expected=TimeoutException.class)
    public void testGetAllTopicsTimeout() {
        this.fetcher.getAllTopicMetadata(50L);
    }

    @Test
    public void testGetAllTopicsUnauthorized() {
        this.client.prepareResponse(this.newMetadataResponse(this.topicName, Errors.TOPIC_AUTHORIZATION_FAILED).toStruct());
        try {
            this.fetcher.getAllTopicMetadata(10L);
            Assert.fail();
        }
        catch (TopicAuthorizationException e) {
            Assert.assertEquals(Collections.singleton(this.topicName), (Object)e.unauthorizedTopics());
        }
    }

    @Test(expected=InvalidTopicException.class)
    public void testGetTopicMetadataInvalidTopic() {
        this.client.prepareResponse(this.newMetadataResponse(this.topicName, Errors.INVALID_TOPIC_EXCEPTION).toStruct());
        this.fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(this.topicName)), 5000L);
    }

    @Test
    public void testGetTopicMetadataUnknownTopic() {
        this.client.prepareResponse(this.newMetadataResponse(this.topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION).toStruct());
        Map topicMetadata = this.fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(this.topicName)), 5000L);
        Assert.assertNull(topicMetadata.get(this.topicName));
    }

    @Test
    public void testGetTopicMetadataLeaderNotAvailable() {
        this.client.prepareResponse(this.newMetadataResponse(this.topicName, Errors.LEADER_NOT_AVAILABLE).toStruct());
        this.client.prepareResponse(this.newMetadataResponse(this.topicName, Errors.NONE).toStruct());
        Map topicMetadata = this.fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(this.topicName)), 5000L);
        Assert.assertTrue((boolean)topicMetadata.containsKey(this.topicName));
    }

    @Test
    public void testQuotaMetrics() throws Exception {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        for (int i = 1; i < 4; ++i) {
            if (i > 1) {
                this.records = MemoryRecords.emptyRecords((ByteBuffer)ByteBuffer.allocate(1024), (CompressionType)CompressionType.NONE);
                for (int v = 0; v < 3; ++v) {
                    this.records.append((long)i * 3L + (long)v, -1L, "key".getBytes(), String.format("value-%d", v).getBytes());
                }
                this.records.close();
            }
            this.fetcher.sendFetches();
            this.client.prepareResponse(this.fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 100 * i));
            this.consumerClient.poll(0L);
            List records = (List)this.fetcher.fetchedRecords().get(this.tp);
            Assert.assertEquals((long)3L, (long)records.size());
        }
        Map allMetrics = this.metrics.metrics();
        KafkaMetric avgMetric = (KafkaMetric)allMetrics.get(this.metrics.metricName("fetch-throttle-time-avg", this.metricGroup, ""));
        KafkaMetric maxMetric = (KafkaMetric)allMetrics.get(this.metrics.metricName("fetch-throttle-time-max", this.metricGroup, ""));
        Assert.assertEquals((double)200.0, (double)avgMetric.value(), (double)1.0E-4);
        Assert.assertEquals((double)300.0, (double)maxMetric.value(), (double)1.0E-4);
    }

    @Test
    public void testGetOffsetsForTimesTimeout() {
        try {
            this.fetcher.getOffsetsByTimes(Collections.singletonMap(new TopicPartition(this.topicName, 2), 1000L), 100L);
            Assert.fail((String)"Should throw timeout exception.");
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
    }

    @Test
    public void testGetOffsetsForTimes() {
        Assert.assertTrue((boolean)this.fetcher.getOffsetsByTimes(new HashMap(), 100L).isEmpty());
        this.testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, -1L, 100L, null, 100L);
        this.testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, 10L, 100L, 10L, 100L);
        this.testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.INVALID_REQUEST, 10L, 100L, 10L, 100L);
        this.testGetOffsetsForTimesWithError(Errors.NONE, Errors.NOT_LEADER_FOR_PARTITION, 10L, 100L, 10L, 100L);
        this.testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L);
        this.testGetOffsetsForTimesWithError(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L);
        this.testGetOffsetsForTimesWithError(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, Errors.NONE, 10L, 100L, null, 100L);
        this.testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, 10L, 100L, 10L, 100L);
    }

    private void testGetOffsetsForTimesWithError(Errors errorForTp0, Errors errorForTp1, long offsetForTp0, long offsetForTp1, Long expectedOffsetForTp0, Long expectedOffsetForTp1) {
        this.client.reset();
        TopicPartition tp0 = this.tp;
        TopicPartition tp1 = new TopicPartition(this.topicName, 1);
        Cluster cluster = TestUtils.clusterWith(2, this.topicName, 2);
        this.metadata.update(cluster, this.time.milliseconds());
        this.client.prepareResponseFrom(this.listOffsetResponse(tp0, errorForTp0, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0));
        this.client.prepareResponseFrom(this.listOffsetResponse(tp1, errorForTp1, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1));
        this.client.prepareResponseFrom(this.listOffsetResponse(tp0, Errors.NONE, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0));
        this.client.prepareResponseFrom(this.listOffsetResponse(tp1, Errors.NONE, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1));
        HashMap<TopicPartition, Long> timestampToSearch = new HashMap<TopicPartition, Long>();
        timestampToSearch.put(tp0, 0L);
        timestampToSearch.put(tp1, 0L);
        Map offsetAndTimestampMap = this.fetcher.getOffsetsByTimes(timestampToSearch, Long.MAX_VALUE);
        if (expectedOffsetForTp0 == null) {
            Assert.assertNull(offsetAndTimestampMap.get(tp0));
        } else {
            Assert.assertEquals((long)expectedOffsetForTp0, (long)((OffsetAndTimestamp)offsetAndTimestampMap.get(tp0)).timestamp());
            Assert.assertEquals((long)expectedOffsetForTp0, (long)((OffsetAndTimestamp)offsetAndTimestampMap.get(tp0)).offset());
        }
        if (expectedOffsetForTp1 == null) {
            Assert.assertNull(offsetAndTimestampMap.get(tp1));
        } else {
            Assert.assertEquals((long)expectedOffsetForTp1, (long)((OffsetAndTimestamp)offsetAndTimestampMap.get(tp1)).timestamp());
            Assert.assertEquals((long)expectedOffsetForTp1, (long)((OffsetAndTimestamp)offsetAndTimestampMap.get(tp1)).offset());
        }
    }

    private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
        return new MockClient.RequestMatcher(){

            @Override
            public boolean matches(ClientRequest request) {
                ListOffsetRequest req = new ListOffsetRequest(request.request().body());
                return timestamp == (Long)req.partitionTimestamps().get(FetcherTest.this.tp);
            }
        };
    }

    private Struct listOffsetResponse(Errors error, long timestamp, long offset) {
        return this.listOffsetResponse(this.tp, error, timestamp, offset);
    }

    private Struct listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) {
        ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error.code(), timestamp, offset);
        HashMap<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
        allPartitionData.put(tp, partitionData);
        ListOffsetResponse response = new ListOffsetResponse(allPartitionData, 1);
        return response.toStruct();
    }

    private Struct fetchResponse(ByteBuffer buffer, short error, long hw, int throttleTime) {
        FetchResponse response = new FetchResponse(Collections.singletonMap(this.tp, new FetchResponse.PartitionData(error, hw, buffer)), throttleTime);
        return response.toStruct();
    }

    private MetadataResponse newMetadataResponse(String topic, Errors error) {
        ArrayList<MetadataResponse.PartitionMetadata> partitionsMetadata = new ArrayList<MetadataResponse.PartitionMetadata>();
        if (error == Errors.NONE) {
            for (PartitionInfo partitionInfo : this.cluster.partitionsForTopic(topic)) {
                partitionsMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, partitionInfo.partition(), partitionInfo.leader(), Arrays.asList(partitionInfo.replicas()), Arrays.asList(partitionInfo.inSyncReplicas())));
            }
        }
        MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, false, partitionsMetadata);
        return new MetadataResponse(this.cluster.nodes(), null, -1, Arrays.asList(topicMetadata));
    }

    private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics, int maxPollRecords) {
        return this.createFetcher(subscriptions, metrics, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), maxPollRecords);
    }

    private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics) {
        return this.createFetcher(subscriptions, metrics, Integer.MAX_VALUE);
    }

    private <K, V> Fetcher<K, V> createFetcher(SubscriptionState subscriptions, Metrics metrics, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        return this.createFetcher(subscriptions, metrics, keyDeserializer, valueDeserializer, Integer.MAX_VALUE);
    }

    private <K, V> Fetcher<K, V> createFetcher(SubscriptionState subscriptions, Metrics metrics, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, int maxPollRecords) {
        return new Fetcher(this.consumerClient, this.minBytes, this.maxBytes, this.maxWaitMs, this.fetchSize, maxPollRecords, true, keyDeserializer, valueDeserializer, this.metadata, subscriptions, metrics, "consumer" + this.groupId, (Time)this.time, this.retryBackoffMs);
    }
}

