/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.LongSupplier;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.CompletedFetch;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.Deserializers;
import org.apache.kafka.clients.consumer.internals.Fetch;
import org.apache.kafka.clients.consumer.internals.FetchBuffer;
import org.apache.kafka.clients.consumer.internals.FetchCollector;
import org.apache.kafka.clients.consumer.internals.FetchConfig;
import org.apache.kafka.clients.consumer.internals.FetchMetricsAggregator;
import org.apache.kafka.clients.consumer.internals.FetchMetricsManager;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@MockitoSettings(strictness=Strictness.STRICT_STUBS)
public class FetchCollectorTest {
    private static final int DEFAULT_RECORD_COUNT = 10;
    private static final int DEFAULT_MAX_POLL_RECORDS = 500;
    private static final long PRODUCER_ID = 100L;
    private final Time time = new MockTime(0L, 0L, 0L);
    private final TopicPartition topicAPartition0 = new TopicPartition("topic-a", 0);
    private final TopicPartition topicAPartition1 = new TopicPartition("topic-a", 1);
    private final TopicPartition topicAPartition2 = new TopicPartition("topic-a", 2);
    private final Set<TopicPartition> allPartitions = FetchCollectorTest.partitions(this.topicAPartition0, this.topicAPartition1, this.topicAPartition2);
    private final LogContext logContext = new LogContext();
    private SubscriptionState subscriptions;
    private FetchConfig fetchConfig;
    private FetchMetricsManager metricsManager;
    private ConsumerMetadata metadata;
    private FetchBuffer fetchBuffer;
    private Deserializers<String, String> deserializers;
    private FetchCollector<String, String> fetchCollector;
    private CompletedFetchBuilder completedFetchBuilder;

    @Test
    public void testFetchNormal() {
        int recordCount = 500;
        this.buildDependencies();
        this.assignAndSeek(this.topicAPartition0);
        OffsetAndMetadata nextOffsetAndMetadata = new OffsetAndMetadata((long)recordCount, Optional.empty(), "");
        CompletedFetch completedFetch = this.completedFetchBuilder.recordCount(recordCount).build();
        Assertions.assertTrue((boolean)this.fetchBuffer.isEmpty());
        this.fetchBuffer.add(completedFetch);
        Assertions.assertFalse((boolean)this.fetchBuffer.isEmpty());
        Assertions.assertFalse((boolean)completedFetch.isInitialized());
        Fetch fetch = this.fetchCollector.collectFetch(this.fetchBuffer);
        Assertions.assertFalse((boolean)fetch.isEmpty());
        Assertions.assertEquals((int)recordCount, (int)fetch.numRecords());
        Assertions.assertEquals((int)1, (int)fetch.nextOffsets().size());
        Assertions.assertEquals((Object)nextOffsetAndMetadata, fetch.nextOffsets().get(this.topicAPartition0));
        Assertions.assertTrue((boolean)completedFetch.isInitialized());
        Assertions.assertFalse((boolean)completedFetch.isConsumed());
        Assertions.assertTrue((boolean)this.fetchBuffer.isEmpty());
        Assertions.assertNull((Object)this.fetchBuffer.peek());
        Assertions.assertNull((Object)this.fetchBuffer.poll());
        Assertions.assertNotNull((Object)this.fetchBuffer.nextInLineFetch());
        SubscriptionState.FetchPosition position = this.subscriptions.position(this.topicAPartition0);
        Assertions.assertEquals((long)recordCount, (long)position.offset);
        fetch = this.fetchCollector.collectFetch(this.fetchBuffer);
        Assertions.assertEquals((int)0, (int)fetch.numRecords());
        Assertions.assertTrue((boolean)fetch.isEmpty());
        Assertions.assertEquals((int)1, (int)fetch.nextOffsets().size());
        Assertions.assertEquals((Object)nextOffsetAndMetadata, fetch.nextOffsets().get(this.topicAPartition0));
        Assertions.assertTrue((boolean)completedFetch.isConsumed());
    }

    @Test
    public void testFetchWithReadReplica() {
        this.buildDependencies();
        this.assignAndSeek(this.topicAPartition0);
        int preferredReadReplicaId = 67;
        this.subscriptions.updatePreferredReadReplica(this.topicAPartition0, preferredReadReplicaId, () -> ((Time)this.time).milliseconds());
        Assertions.assertNotNull((Object)this.subscriptions.preferredReadReplica(this.topicAPartition0, this.time.milliseconds()));
        Assertions.assertEquals(Optional.of(preferredReadReplicaId), (Object)this.subscriptions.preferredReadReplica(this.topicAPartition0, this.time.milliseconds()));
        CompletedFetch completedFetch = this.completedFetchBuilder.build();
        this.fetchBuffer.add(completedFetch);
        Fetch fetch = this.fetchCollector.collectFetch(this.fetchBuffer);
        Assertions.assertEquals((int)10, (int)fetch.numRecords());
        Assertions.assertEquals(Optional.of(preferredReadReplicaId), (Object)this.subscriptions.preferredReadReplica(this.topicAPartition0, this.time.milliseconds()));
        Assertions.assertEquals((int)1, (int)fetch.nextOffsets().size());
        Assertions.assertEquals((Object)new OffsetAndMetadata(10L, Optional.empty(), ""), fetch.nextOffsets().get(this.topicAPartition0));
    }

    @Test
    public void testNoResultsIfInitializing() {
        this.buildDependencies();
        this.assign(this.topicAPartition0);
        Assertions.assertNull((Object)this.subscriptions.position(this.topicAPartition0));
        Assertions.assertFalse((boolean)this.subscriptions.isFetchable(this.topicAPartition0));
        Assertions.assertFalse((boolean)this.subscriptions.hasValidPosition(this.topicAPartition0));
        CompletedFetch completedFetch = this.completedFetchBuilder.build();
        this.fetchBuffer.add(completedFetch);
        Fetch fetch = this.fetchCollector.collectFetch(this.fetchBuffer);
        Assertions.assertEquals((int)0, (int)fetch.numRecords());
        Assertions.assertEquals((int)0, (int)fetch.nextOffsets().size());
    }

    @ParameterizedTest
    @MethodSource(value={"testErrorInInitializeSource"})
    public void testErrorInInitialize(int recordCount, final RuntimeException expectedException) {
        this.buildDependencies();
        this.assignAndSeek(this.topicAPartition0);
        this.fetchCollector = new FetchCollector<String, String>(this.logContext, this.metadata, this.subscriptions, this.fetchConfig, this.deserializers, this.metricsManager, this.time){

            protected CompletedFetch initialize(CompletedFetch completedFetch) {
                throw expectedException;
            }
        };
        CompletedFetch completedFetch = this.completedFetchBuilder.recordCount(recordCount).build();
        this.fetchBuffer.add(completedFetch);
        Assertions.assertFalse((boolean)this.fetchBuffer.isEmpty());
        Assertions.assertThrows(expectedException.getClass(), () -> this.fetchCollector.collectFetch(this.fetchBuffer));
        Assertions.assertEquals((Object)(recordCount == 0 ? 1 : 0), (Object)this.fetchBuffer.isEmpty());
    }

    @Test
    public void testFetchingPausedPartitionsYieldsNoRecords() {
        this.buildDependencies();
        this.assignAndSeek(this.topicAPartition0);
        Assertions.assertFalse((boolean)this.subscriptions.isPaused(this.topicAPartition0));
        this.subscriptions.pause(this.topicAPartition0);
        Assertions.assertTrue((boolean)this.subscriptions.isPaused(this.topicAPartition0));
        CompletedFetch completedFetch = this.completedFetchBuilder.build();
        this.fetchBuffer.setNextInLineFetch(completedFetch);
        Assertions.assertSame((Object)this.fetchBuffer.nextInLineFetch(), (Object)completedFetch);
        Assertions.assertTrue((boolean)this.fetchBuffer.isEmpty());
        Assertions.assertTrue((boolean)this.subscriptions.isPaused(completedFetch.partition));
        Fetch fetch = this.fetchCollector.collectFetch(this.fetchBuffer);
        Assertions.assertEquals((int)0, (int)fetch.numRecords());
        Assertions.assertEquals((int)0, (int)fetch.nextOffsets().size());
        Assertions.assertFalse((boolean)this.fetchBuffer.isEmpty());
        Assertions.assertNull((Object)this.fetchBuffer.nextInLineFetch());
    }

    @ParameterizedTest
    @MethodSource(value={"testFetchWithMetadataRefreshErrorsSource"})
    public void testFetchWithMetadataRefreshErrors(Errors error) {
        this.buildDependencies();
        this.assignAndSeek(this.topicAPartition0);
        CompletedFetch completedFetch = this.completedFetchBuilder.error(error).build();
        this.fetchBuffer.add(completedFetch);
        int preferredReadReplicaId = 5;
        this.subscriptions.updatePreferredReadReplica(this.topicAPartition0, preferredReadReplicaId, () -> ((Time)this.time).milliseconds());
        Assertions.assertNotNull((Object)this.subscriptions.preferredReadReplica(this.topicAPartition0, this.time.milliseconds()));
        Assertions.assertEquals(Optional.of(preferredReadReplicaId), (Object)this.subscriptions.preferredReadReplica(this.topicAPartition0, this.time.milliseconds()));
        Fetch fetch = this.fetchCollector.collectFetch(this.fetchBuffer);
        Assertions.assertTrue((boolean)fetch.isEmpty());
        Assertions.assertEquals((int)0, (int)fetch.nextOffsets().size());
        Assertions.assertTrue((boolean)this.metadata.updateRequested());
        Assertions.assertEquals(Optional.empty(), (Object)this.subscriptions.preferredReadReplica(this.topicAPartition0, this.time.milliseconds()));
    }

    @Test
    public void testFetchWithOffsetOutOfRange() {
        this.buildDependencies();
        this.assignAndSeek(this.topicAPartition0);
        CompletedFetch completedFetch = this.completedFetchBuilder.build();
        this.fetchBuffer.add(completedFetch);
        Fetch fetch = this.fetchCollector.collectFetch(this.fetchBuffer);
        Assertions.assertFalse((boolean)fetch.isEmpty());
        Assertions.assertEquals((int)10, (int)fetch.numRecords());
        Assertions.assertEquals((int)1, (int)fetch.nextOffsets().size());
        Assertions.assertEquals((Object)new OffsetAndMetadata(10L, Optional.empty(), ""), fetch.nextOffsets().get(this.topicAPartition0));
        completedFetch = this.completedFetchBuilder.fetchOffset(fetch.numRecords()).error(Errors.OFFSET_OUT_OF_RANGE).build();
        this.fetchBuffer.add(completedFetch);
        fetch = this.fetchCollector.collectFetch(this.fetchBuffer);
        Assertions.assertEquals((int)0, (int)fetch.nextOffsets().size());
        Assertions.assertTrue((boolean)fetch.isEmpty());
        completedFetch = this.completedFetchBuilder.fetchOffset(fetch.numRecords()).error(Errors.OFFSET_OUT_OF_RANGE).build();
        this.fetchBuffer.add(completedFetch);
        fetch = this.fetchCollector.collectFetch(this.fetchBuffer);
        Assertions.assertEquals((int)0, (int)fetch.nextOffsets().size());
        Assertions.assertTrue((boolean)fetch.isEmpty());
    }

    @Test
    public void testFetchWithOffsetOutOfRangeWithPreferredReadReplica() {
        int records = 10;
        this.buildDependencies(records);
        this.assignAndSeek(this.topicAPartition0);
        int preferredReadReplicaId = 67;
        this.subscriptions.updatePreferredReadReplica(this.topicAPartition0, preferredReadReplicaId, () -> ((Time)this.time).milliseconds());
        Assertions.assertNotNull((Object)this.subscriptions.preferredReadReplica(this.topicAPartition0, this.time.milliseconds()));
        Assertions.assertEquals(Optional.of(preferredReadReplicaId), (Object)this.subscriptions.preferredReadReplica(this.topicAPartition0, this.time.milliseconds()));
        CompletedFetch completedFetch = this.completedFetchBuilder.error(Errors.OFFSET_OUT_OF_RANGE).build();
        this.fetchBuffer.add(completedFetch);
        Fetch fetch = this.fetchCollector.collectFetch(this.fetchBuffer);
        Assertions.assertTrue((boolean)fetch.isEmpty());
        Assertions.assertEquals((int)0, (int)fetch.nextOffsets().size());
        Assertions.assertEquals(Optional.empty(), (Object)this.subscriptions.preferredReadReplica(this.topicAPartition0, this.time.milliseconds()));
    }

    @Test
    public void testFetchWithTopicAuthorizationFailed() {
        this.buildDependencies();
        this.assignAndSeek(this.topicAPartition0);
        CompletedFetch completedFetch = this.completedFetchBuilder.error(Errors.TOPIC_AUTHORIZATION_FAILED).build();
        this.fetchBuffer.add(completedFetch);
        Assertions.assertThrows(TopicAuthorizationException.class, () -> this.fetchCollector.collectFetch(this.fetchBuffer));
    }

    @Test
    public void testFetchWithUnknownLeaderEpoch() {
        this.buildDependencies();
        this.assignAndSeek(this.topicAPartition0);
        CompletedFetch completedFetch = this.completedFetchBuilder.error(Errors.UNKNOWN_LEADER_EPOCH).build();
        this.fetchBuffer.add(completedFetch);
        Fetch fetch = this.fetchCollector.collectFetch(this.fetchBuffer);
        Assertions.assertEquals((int)0, (int)fetch.nextOffsets().size());
        Assertions.assertTrue((boolean)fetch.isEmpty());
    }

    @Test
    public void testFetchWithUnknownServerError() {
        this.buildDependencies();
        this.assignAndSeek(this.topicAPartition0);
        CompletedFetch completedFetch = this.completedFetchBuilder.error(Errors.UNKNOWN_SERVER_ERROR).build();
        this.fetchBuffer.add(completedFetch);
        Fetch fetch = this.fetchCollector.collectFetch(this.fetchBuffer);
        Assertions.assertEquals((int)0, (int)fetch.nextOffsets().size());
        Assertions.assertTrue((boolean)fetch.isEmpty());
    }

    @Test
    public void testFetchWithCorruptMessage() {
        this.buildDependencies();
        this.assignAndSeek(this.topicAPartition0);
        CompletedFetch completedFetch = this.completedFetchBuilder.error(Errors.CORRUPT_MESSAGE).build();
        this.fetchBuffer.add(completedFetch);
        Assertions.assertThrows(KafkaException.class, () -> this.fetchCollector.collectFetch(this.fetchBuffer));
    }

    @ParameterizedTest
    @MethodSource(value={"testFetchWithOtherErrorsSource"})
    public void testFetchWithOtherErrors(Errors error) {
        this.buildDependencies();
        this.assignAndSeek(this.topicAPartition0);
        CompletedFetch completedFetch = this.completedFetchBuilder.error(error).build();
        this.fetchBuffer.add(completedFetch);
        Assertions.assertThrows(IllegalStateException.class, () -> this.fetchCollector.collectFetch(this.fetchBuffer));
    }

    @Test
    public void testCollectFetchInitializationWithNullPosition() {
        TopicPartition topicPartition0 = new TopicPartition("topic", 0);
        SubscriptionState subscriptions = (SubscriptionState)Mockito.mock(SubscriptionState.class);
        Mockito.when((Object)subscriptions.hasValidPosition(topicPartition0)).thenReturn((Object)true);
        Mockito.when((Object)subscriptions.positionOrNull(topicPartition0)).thenReturn(null);
        FetchCollector<String, String> fetchCollector = this.createFetchCollector(subscriptions);
        Records records = this.createRecords();
        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData().setPartitionIndex(topicPartition0.partition()).setHighWatermark(1000L).setRecords((BaseRecords)records);
        CompletedFetch completedFetch = new CompletedFetchBuilder().partitionData(partitionData).partition(topicPartition0).build();
        FetchBuffer fetchBuffer = (FetchBuffer)Mockito.mock(FetchBuffer.class);
        Mockito.when((Object)fetchBuffer.nextInLineFetch()).thenReturn(null);
        Mockito.when((Object)fetchBuffer.peek()).thenReturn((Object)completedFetch).thenReturn(null);
        Fetch fetch = fetchCollector.collectFetch(fetchBuffer);
        Assertions.assertTrue((boolean)fetch.isEmpty());
        Assertions.assertEquals((int)0, (int)fetch.nextOffsets().size());
        ((FetchBuffer)Mockito.verify((Object)fetchBuffer)).setNextInLineFetch(null);
    }

    @Test
    public void testCollectFetchInitializationWithUpdateHighWatermarkOnNotAssignedPartition() {
        TopicPartition topicPartition0 = new TopicPartition("topic", 0);
        long fetchOffset = 42L;
        long highWatermark = 1000L;
        SubscriptionState subscriptions = (SubscriptionState)Mockito.mock(SubscriptionState.class);
        Mockito.when((Object)subscriptions.hasValidPosition(topicPartition0)).thenReturn((Object)true);
        Mockito.when((Object)subscriptions.positionOrNull(topicPartition0)).thenReturn((Object)new SubscriptionState.FetchPosition(42L));
        Mockito.when((Object)subscriptions.tryUpdatingHighWatermark(topicPartition0, 1000L)).thenReturn((Object)false);
        FetchCollector<String, String> fetchCollector = this.createFetchCollector(subscriptions);
        Records records = this.createRecords();
        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData().setPartitionIndex(topicPartition0.partition()).setHighWatermark(1000L).setRecords((BaseRecords)records);
        CompletedFetch completedFetch = new CompletedFetchBuilder().partitionData(partitionData).partition(topicPartition0).fetchOffset(42L).build();
        FetchBuffer fetchBuffer = (FetchBuffer)Mockito.mock(FetchBuffer.class);
        Mockito.when((Object)fetchBuffer.nextInLineFetch()).thenReturn(null);
        Mockito.when((Object)fetchBuffer.peek()).thenReturn((Object)completedFetch).thenReturn(null);
        Fetch fetch = fetchCollector.collectFetch(fetchBuffer);
        Assertions.assertTrue((boolean)fetch.isEmpty());
        Assertions.assertEquals((int)0, (int)fetch.nextOffsets().size());
        ((FetchBuffer)Mockito.verify((Object)fetchBuffer)).setNextInLineFetch(null);
    }

    @Test
    public void testCollectFetchInitializationWithUpdateLogStartOffsetOnNotAssignedPartition() {
        TopicPartition topicPartition0 = new TopicPartition("topic", 0);
        long fetchOffset = 42L;
        long highWatermark = 1000L;
        long logStartOffset = 10L;
        SubscriptionState subscriptions = (SubscriptionState)Mockito.mock(SubscriptionState.class);
        Mockito.when((Object)subscriptions.hasValidPosition(topicPartition0)).thenReturn((Object)true);
        Mockito.when((Object)subscriptions.positionOrNull(topicPartition0)).thenReturn((Object)new SubscriptionState.FetchPosition(42L));
        Mockito.when((Object)subscriptions.tryUpdatingHighWatermark(topicPartition0, 1000L)).thenReturn((Object)true);
        Mockito.when((Object)subscriptions.tryUpdatingLogStartOffset(topicPartition0, 10L)).thenReturn((Object)false);
        FetchCollector<String, String> fetchCollector = this.createFetchCollector(subscriptions);
        Records records = this.createRecords();
        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData().setPartitionIndex(topicPartition0.partition()).setHighWatermark(1000L).setRecords((BaseRecords)records).setLogStartOffset(10L);
        CompletedFetch completedFetch = new CompletedFetchBuilder().partitionData(partitionData).partition(topicPartition0).fetchOffset(42L).build();
        FetchBuffer fetchBuffer = (FetchBuffer)Mockito.mock(FetchBuffer.class);
        Mockito.when((Object)fetchBuffer.nextInLineFetch()).thenReturn(null);
        Mockito.when((Object)fetchBuffer.peek()).thenReturn((Object)completedFetch).thenReturn(null);
        Fetch fetch = fetchCollector.collectFetch(fetchBuffer);
        Assertions.assertTrue((boolean)fetch.isEmpty());
        Assertions.assertEquals((int)0, (int)fetch.nextOffsets().size());
        ((FetchBuffer)Mockito.verify((Object)fetchBuffer)).setNextInLineFetch(null);
    }

    @Test
    public void testCollectFetchInitializationWithUpdateLastStableOffsetOnNotAssignedPartition() {
        TopicPartition topicPartition0 = new TopicPartition("topic", 0);
        long fetchOffset = 42L;
        long highWatermark = 1000L;
        long logStartOffset = 10L;
        long lastStableOffset = 900L;
        SubscriptionState subscriptions = (SubscriptionState)Mockito.mock(SubscriptionState.class);
        Mockito.when((Object)subscriptions.hasValidPosition(topicPartition0)).thenReturn((Object)true);
        Mockito.when((Object)subscriptions.positionOrNull(topicPartition0)).thenReturn((Object)new SubscriptionState.FetchPosition(42L));
        Mockito.when((Object)subscriptions.tryUpdatingHighWatermark(topicPartition0, 1000L)).thenReturn((Object)true);
        Mockito.when((Object)subscriptions.tryUpdatingLogStartOffset(topicPartition0, 10L)).thenReturn((Object)true);
        Mockito.when((Object)subscriptions.tryUpdatingLastStableOffset(topicPartition0, 900L)).thenReturn((Object)false);
        FetchCollector<String, String> fetchCollector = this.createFetchCollector(subscriptions);
        Records records = this.createRecords();
        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData().setPartitionIndex(topicPartition0.partition()).setHighWatermark(1000L).setRecords((BaseRecords)records).setLogStartOffset(10L).setLastStableOffset(900L);
        CompletedFetch completedFetch = new CompletedFetchBuilder().partitionData(partitionData).partition(topicPartition0).fetchOffset(42L).build();
        FetchBuffer fetchBuffer = (FetchBuffer)Mockito.mock(FetchBuffer.class);
        Mockito.when((Object)fetchBuffer.nextInLineFetch()).thenReturn(null);
        Mockito.when((Object)fetchBuffer.peek()).thenReturn((Object)completedFetch).thenReturn(null);
        Fetch fetch = fetchCollector.collectFetch(fetchBuffer);
        Assertions.assertTrue((boolean)fetch.isEmpty());
        Assertions.assertEquals((int)0, (int)fetch.nextOffsets().size());
        ((FetchBuffer)Mockito.verify((Object)fetchBuffer)).setNextInLineFetch(null);
    }

    @Test
    public void testCollectFetchInitializationWithUpdatePreferredReplicaOnNotAssignedPartition() {
        TopicPartition topicPartition0 = new TopicPartition("topic", 0);
        long fetchOffset = 42L;
        long highWatermark = 1000L;
        long logStartOffset = 10L;
        long lastStableOffset = 900L;
        int preferredReadReplicaId = 21;
        SubscriptionState subscriptions = (SubscriptionState)Mockito.mock(SubscriptionState.class);
        Mockito.when((Object)subscriptions.hasValidPosition(topicPartition0)).thenReturn((Object)true);
        Mockito.when((Object)subscriptions.positionOrNull(topicPartition0)).thenReturn((Object)new SubscriptionState.FetchPosition(42L));
        Mockito.when((Object)subscriptions.tryUpdatingHighWatermark(topicPartition0, 1000L)).thenReturn((Object)true);
        Mockito.when((Object)subscriptions.tryUpdatingLogStartOffset(topicPartition0, 10L)).thenReturn((Object)true);
        Mockito.when((Object)subscriptions.tryUpdatingLastStableOffset(topicPartition0, 900L)).thenReturn((Object)true);
        Mockito.when((Object)subscriptions.tryUpdatingPreferredReadReplica((TopicPartition)ArgumentMatchers.eq((Object)topicPartition0), ArgumentMatchers.eq((int)21), (LongSupplier)ArgumentMatchers.any())).thenReturn((Object)false);
        FetchCollector<String, String> fetchCollector = this.createFetchCollector(subscriptions);
        Records records = this.createRecords();
        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData().setPartitionIndex(topicPartition0.partition()).setHighWatermark(1000L).setRecords((BaseRecords)records).setLogStartOffset(10L).setLastStableOffset(900L).setPreferredReadReplica(21);
        CompletedFetch completedFetch = new CompletedFetchBuilder().partitionData(partitionData).partition(topicPartition0).fetchOffset(42L).build();
        FetchBuffer fetchBuffer = (FetchBuffer)Mockito.mock(FetchBuffer.class);
        Mockito.when((Object)fetchBuffer.nextInLineFetch()).thenReturn(null);
        Mockito.when((Object)fetchBuffer.peek()).thenReturn((Object)completedFetch).thenReturn(null);
        Fetch fetch = fetchCollector.collectFetch(fetchBuffer);
        Assertions.assertTrue((boolean)fetch.isEmpty());
        Assertions.assertEquals((int)0, (int)fetch.nextOffsets().size());
        ((FetchBuffer)Mockito.verify((Object)fetchBuffer)).setNextInLineFetch(null);
    }

    @Test
    public void testCollectFetchInitializationOffsetOutOfRangeErrorWithNullPosition() {
        TopicPartition topicPartition0 = new TopicPartition("topic", 0);
        SubscriptionState subscriptions = (SubscriptionState)Mockito.mock(SubscriptionState.class);
        Mockito.when((Object)subscriptions.hasValidPosition(topicPartition0)).thenReturn((Object)true);
        Mockito.when((Object)subscriptions.positionOrNull(topicPartition0)).thenReturn(null);
        FetchCollector<String, String> fetchCollector = this.createFetchCollector(subscriptions);
        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData().setPartitionIndex(topicPartition0.partition()).setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code());
        CompletedFetch completedFetch = new CompletedFetchBuilder().partitionData(partitionData).partition(topicPartition0).build();
        FetchBuffer fetchBuffer = (FetchBuffer)Mockito.mock(FetchBuffer.class);
        Mockito.when((Object)fetchBuffer.nextInLineFetch()).thenReturn(null);
        Mockito.when((Object)fetchBuffer.peek()).thenReturn((Object)completedFetch).thenReturn(null);
        Fetch fetch = fetchCollector.collectFetch(fetchBuffer);
        Assertions.assertTrue((boolean)fetch.isEmpty());
        Assertions.assertEquals((int)0, (int)fetch.nextOffsets().size());
        ((FetchBuffer)Mockito.verify((Object)fetchBuffer)).setNextInLineFetch(null);
    }

    @Test
    public void testCollectFetchInitializationOffsetOutOfRangeErrorWithOffsetReset() {
        TopicPartition topicPartition0 = new TopicPartition("topic", 0);
        long fetchOffset = 42L;
        SubscriptionState subscriptions = (SubscriptionState)Mockito.mock(SubscriptionState.class);
        Mockito.when((Object)subscriptions.hasValidPosition(topicPartition0)).thenReturn((Object)true);
        Mockito.when((Object)subscriptions.positionOrNull(topicPartition0)).thenReturn((Object)new SubscriptionState.FetchPosition(42L));
        Mockito.when((Object)subscriptions.hasDefaultOffsetResetPolicy()).thenReturn((Object)true);
        FetchCollector<String, String> fetchCollector = this.createFetchCollector(subscriptions);
        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData().setPartitionIndex(topicPartition0.partition()).setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code());
        CompletedFetch completedFetch = new CompletedFetchBuilder().partitionData(partitionData).partition(topicPartition0).fetchOffset(42L).build();
        FetchBuffer fetchBuffer = (FetchBuffer)Mockito.mock(FetchBuffer.class);
        Mockito.when((Object)fetchBuffer.nextInLineFetch()).thenReturn(null);
        Mockito.when((Object)fetchBuffer.peek()).thenReturn((Object)completedFetch).thenReturn(null);
        Fetch fetch = fetchCollector.collectFetch(fetchBuffer);
        Assertions.assertTrue((boolean)fetch.isEmpty());
        Assertions.assertEquals((int)0, (int)fetch.nextOffsets().size());
        ((SubscriptionState)Mockito.verify((Object)subscriptions)).requestOffsetResetIfPartitionAssigned(topicPartition0);
        ((FetchBuffer)Mockito.verify((Object)fetchBuffer)).setNextInLineFetch(null);
    }

    @Test
    public void testReadCommittedWithAbortedTransaction() {
        this.buildDependencies(IsolationLevel.READ_COMMITTED);
        int recordCount = 20;
        this.assignAndSeek(this.topicAPartition0);
        Records rawRecords = this.createTransactionalRecords(ControlRecordType.ABORT, true, 0, recordCount);
        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData().setRecords((BaseRecords)rawRecords).setAbortedTransactions(this.createAbortedTransaction(0L));
        CompletedFetch completedFetch1 = this.completedFetchBuilder.partitionData(partitionData).build();
        this.fetchBuffer.add(completedFetch1);
        Fetch fetch = this.fetchCollector.collectFetch(this.fetchBuffer);
        Assertions.assertFalse((boolean)fetch.isEmpty());
        Assertions.assertEquals((int)0, (int)fetch.numRecords());
        Assertions.assertEquals((int)1, (int)fetch.nextOffsets().size());
        Assertions.assertEquals((Object)new OffsetAndMetadata((long)(recordCount + 1), Optional.of(0), ""), fetch.nextOffsets().get(this.topicAPartition0));
        int startOffset = recordCount + 1;
        rawRecords = this.createTransactionalRecords(ControlRecordType.ABORT, false, startOffset, recordCount);
        partitionData = new FetchResponseData.PartitionData().setRecords((BaseRecords)rawRecords).setAbortedTransactions(this.createAbortedTransaction(startOffset));
        CompletedFetch completedFetch2 = this.completedFetchBuilder.partitionData(partitionData).fetchOffset(startOffset).build();
        this.fetchBuffer.add(completedFetch2);
        fetch = this.fetchCollector.collectFetch(this.fetchBuffer);
        Assertions.assertFalse((boolean)fetch.isEmpty());
        Assertions.assertEquals((int)recordCount, (int)fetch.numRecords());
        Assertions.assertEquals((int)1, (int)fetch.nextOffsets().size());
        Assertions.assertEquals((Object)new OffsetAndMetadata((long)(startOffset + recordCount + 1), Optional.of(0), ""), fetch.nextOffsets().get(this.topicAPartition0));
    }

    private List<FetchResponseData.AbortedTransaction> createAbortedTransaction(long firstOffset) {
        FetchResponseData.AbortedTransaction abortedTransaction = new FetchResponseData.AbortedTransaction();
        abortedTransaction.setFirstOffset(firstOffset);
        abortedTransaction.setProducerId(100L);
        return Collections.singletonList(abortedTransaction);
    }

    private FetchCollector<String, String> createFetchCollector(SubscriptionState subscriptions) {
        Properties consumerProps = this.consumerProps();
        return new FetchCollector(this.logContext, (ConsumerMetadata)Mockito.mock(ConsumerMetadata.class), subscriptions, new FetchConfig(new ConsumerConfig(consumerProps)), new Deserializers((Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer()), (FetchMetricsManager)Mockito.mock(FetchMetricsManager.class), (Time)new MockTime());
    }

    private static Set<TopicPartition> partitions(TopicPartition ... partitions) {
        return new HashSet<TopicPartition>(Arrays.asList(partitions));
    }

    private void buildDependencies() {
        this.buildDependencies(500, IsolationLevel.READ_UNCOMMITTED);
    }

    private void buildDependencies(IsolationLevel isolationLevel) {
        this.buildDependencies(500, isolationLevel);
    }

    private void buildDependencies(int maxPollRecords) {
        this.buildDependencies(maxPollRecords, IsolationLevel.READ_UNCOMMITTED);
    }

    private void buildDependencies(int maxPollRecords, IsolationLevel isolationLevel) {
        Properties p = this.consumerProperties(maxPollRecords);
        ConsumerConfig config = new ConsumerConfig(p);
        this.deserializers = new Deserializers((Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer());
        this.subscriptions = ConsumerUtils.createSubscriptionState((ConsumerConfig)config, (LogContext)this.logContext);
        this.fetchConfig = this.createFetchConfig(config, isolationLevel);
        Metrics metrics = ConsumerUtils.createMetrics((ConsumerConfig)config, (Time)this.time);
        this.metricsManager = ConsumerUtils.createFetchMetricsManager((Metrics)metrics);
        this.metadata = new ConsumerMetadata(0L, 1000L, 10000L, false, false, this.subscriptions, this.logContext, new ClusterResourceListeners());
        this.fetchCollector = new FetchCollector(this.logContext, this.metadata, this.subscriptions, this.fetchConfig, this.deserializers, this.metricsManager, this.time);
        this.fetchBuffer = new FetchBuffer(this.logContext);
        this.completedFetchBuilder = new CompletedFetchBuilder();
    }

    private FetchConfig createFetchConfig(ConsumerConfig config, IsolationLevel isolationLevel) {
        return new FetchConfig(config.getInt("fetch.min.bytes").intValue(), config.getInt("fetch.max.bytes").intValue(), config.getInt("fetch.max.wait.ms").intValue(), config.getInt("max.partition.fetch.bytes").intValue(), config.getInt("max.poll.records").intValue(), config.getBoolean("check.crcs").booleanValue(), config.getString("client.rack"), isolationLevel);
    }

    private Properties consumerProps() {
        return this.consumerProperties(500);
    }

    private Properties consumerProperties(int maxPollRecords) {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("key.deserializer", StringSerializer.class.getName());
        p.put("value.deserializer", StringSerializer.class.getName());
        p.put("max.poll.records", String.valueOf(maxPollRecords));
        return p;
    }

    private void assign(TopicPartition ... partitions) {
        this.subscriptions.assignFromUser(FetchCollectorTest.partitions(partitions));
    }

    private void assignAndSeek(TopicPartition tp) {
        this.assign(tp);
        this.subscriptions.seek(tp, 0L);
    }

    private static Stream<Arguments> testFetchWithMetadataRefreshErrorsSource() {
        List<Errors> errors = Arrays.asList(Errors.NOT_LEADER_OR_FOLLOWER, Errors.REPLICA_NOT_AVAILABLE, Errors.KAFKA_STORAGE_ERROR, Errors.FENCED_LEADER_EPOCH, Errors.OFFSET_NOT_AVAILABLE, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_ID, Errors.INCONSISTENT_TOPIC_ID);
        return errors.stream().map(xva$0 -> Arguments.of((Object[])new Object[]{xva$0}));
    }

    private static Stream<Arguments> testFetchWithOtherErrorsSource() {
        ArrayList<Errors> errors = new ArrayList<Errors>(Arrays.asList(Errors.values()));
        errors.removeAll(Arrays.asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER, Errors.REPLICA_NOT_AVAILABLE, Errors.KAFKA_STORAGE_ERROR, Errors.FENCED_LEADER_EPOCH, Errors.OFFSET_NOT_AVAILABLE, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_ID, Errors.INCONSISTENT_TOPIC_ID, Errors.OFFSET_OUT_OF_RANGE, Errors.TOPIC_AUTHORIZATION_FAILED, Errors.UNKNOWN_LEADER_EPOCH, Errors.UNKNOWN_SERVER_ERROR, Errors.CORRUPT_MESSAGE));
        return errors.stream().map(xva$0 -> Arguments.of((Object[])new Object[]{xva$0}));
    }

    private static Stream<Arguments> testErrorInInitializeSource() {
        return Stream.of(Arguments.of((Object[])new Object[]{10, new RuntimeException()}), Arguments.of((Object[])new Object[]{0, new RuntimeException()}), Arguments.of((Object[])new Object[]{10, new KafkaException()}), Arguments.of((Object[])new Object[]{0, new KafkaException()}));
    }

    private Records createRecords() {
        return this.createRecords(10);
    }

    private Records createRecords(int recordCount) {
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        try (MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)allocate, (Compression)Compression.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L);){
            for (int i = 0; i < recordCount; ++i) {
                builder.append(0L, "key".getBytes(), ("value-" + i).getBytes());
            }
            MemoryRecords memoryRecords = builder.build();
            return memoryRecords;
        }
    }

    private Records createTransactionalRecords(ControlRecordType controlRecordType, boolean isControlRecordLast, int startOffset, int recordCount) {
        MockTime time = new MockTime();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int baseOffset = startOffset;
        if (!isControlRecordLast) {
            this.writeTransactionMarker(buffer, controlRecordType, startOffset, time);
            ++baseOffset;
        }
        try (MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)buffer, (byte)2, (Compression)Compression.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)baseOffset, (long)time.milliseconds(), (long)100L, (short)0, (int)0, (boolean)true, (int)0);){
            for (int i = 0; i < recordCount; ++i) {
                builder.append(new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
            }
            builder.build();
        }
        if (isControlRecordLast) {
            this.writeTransactionMarker(buffer, controlRecordType, startOffset + recordCount, time);
        }
        buffer.flip();
        return MemoryRecords.readableRecords((ByteBuffer)buffer);
    }

    private void writeTransactionMarker(ByteBuffer buffer, ControlRecordType controlRecordType, int offset, Time time) {
        MemoryRecords.writeEndTransactionalMarker((ByteBuffer)buffer, (long)offset, (long)time.milliseconds(), (int)0, (long)100L, (short)0, (EndTransactionMarker)new EndTransactionMarker(controlRecordType, 0));
    }

    private class CompletedFetchBuilder {
        private long fetchOffset = 0L;
        private int recordCount = 10;
        private TopicPartition topicPartition;
        private FetchResponseData.PartitionData partitionData;
        private Errors error;

        private CompletedFetchBuilder() {
            this.topicPartition = FetchCollectorTest.this.topicAPartition0;
            this.partitionData = null;
            this.error = null;
        }

        private CompletedFetchBuilder fetchOffset(long fetchOffset) {
            this.fetchOffset = fetchOffset;
            return this;
        }

        private CompletedFetchBuilder recordCount(int recordCount) {
            this.recordCount = recordCount;
            return this;
        }

        private CompletedFetchBuilder error(Errors error) {
            this.error = error;
            return this;
        }

        private CompletedFetchBuilder partitionData(FetchResponseData.PartitionData partitionData) {
            this.partitionData = partitionData;
            return this;
        }

        private CompletedFetchBuilder partition(TopicPartition topicPartition) {
            this.topicPartition = topicPartition;
            return this;
        }

        private CompletedFetch build() {
            Records records = FetchCollectorTest.this.createRecords(this.recordCount);
            if (this.partitionData == null) {
                this.partitionData = new FetchResponseData.PartitionData().setPartitionIndex(FetchCollectorTest.this.topicAPartition0.partition()).setHighWatermark(1000L).setRecords((BaseRecords)records);
            }
            if (this.topicPartition != null) {
                this.partitionData.setPartitionIndex(this.topicPartition.partition());
            }
            if (this.error != null) {
                this.partitionData.setErrorCode(this.error.code());
            }
            FetchMetricsAggregator metricsAggregator = new FetchMetricsAggregator(FetchCollectorTest.this.metricsManager, FetchCollectorTest.this.allPartitions);
            return new CompletedFetch(FetchCollectorTest.this.logContext.logger(CompletedFetch.class), FetchCollectorTest.this.subscriptions, BufferSupplier.create(), this.topicPartition, this.partitionData, metricsAggregator, Long.valueOf(this.fetchOffset), ApiKeys.FETCH.latestVersion());
        }
    }
}

