/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.share;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import kafka.cluster.Partition;
import kafka.cluster.PartitionListener;
import kafka.server.LogReadResult;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.share.DelayedShareFetch;
import kafka.server.share.DelayedShareFetchTest;
import kafka.server.share.SharePartition;
import kafka.server.share.SharePartitionManager;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidShareSessionEpochException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.ShareSessionNotFoundException;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareFetchResponse;
import org.apache.kafka.common.requests.ShareRequestMetadata;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.ErroneousAndValidPartitionData;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.context.FinalContext;
import org.apache.kafka.server.share.context.ShareFetchContext;
import org.apache.kafka.server.share.context.ShareSessionContext;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.share.persister.NoOpShareStatePersister;
import org.apache.kafka.server.share.persister.Persister;
import org.apache.kafka.server.share.session.ShareSession;
import org.apache.kafka.server.share.session.ShareSessionCache;
import org.apache.kafka.server.share.session.ShareSessionKey;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import scala.Option;
import scala.Tuple2;
import scala.collection.Seq;
import scala.jdk.javaapi.CollectionConverters;

@Timeout(value=120L)
public class SharePartitionManagerTest {
    private static final int DEFAULT_RECORD_LOCK_DURATION_MS = 30000;
    private static final int MAX_DELIVERY_COUNT = 5;
    private static final short MAX_IN_FLIGHT_MESSAGES = 200;
    private static final short MAX_FETCH_RECORDS = 500;
    private static final int DELAYED_SHARE_FETCH_MAX_WAIT_MS = 2000;
    private static final int DELAYED_SHARE_FETCH_TIMEOUT_MS = 3000;
    private static final FetchParams FETCH_PARAMS = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), -1, -1L, 2000L, 1, 0x100000, FetchIsolation.HIGH_WATERMARK, Optional.empty(), true);
    static final int PARTITION_MAX_BYTES = 40000;
    static final int DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL = 1000;
    private Timer mockTimer;
    private ReplicaManager mockReplicaManager;
    private static final List<TopicIdPartition> EMPTY_PART_LIST = Collections.unmodifiableList(new ArrayList());

    @BeforeEach
    public void setUp() {
        this.mockTimer = new SystemTimerReaper("sharePartitionManagerTestReaper", (Timer)new SystemTimer("sharePartitionManagerTestTimer"));
        this.mockReplicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        Partition partition = this.mockPartition();
        Mockito.when((Object)this.mockReplicaManager.getPartitionOrException((TopicPartition)Mockito.any())).thenReturn((Object)partition);
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.mockTimer.close();
    }

    @Test
    public void testNewContextReturnsFinalContextWithoutRequestData() {
        MockTime time = new MockTime();
        ShareSessionCache cache = new ShareSessionCache(10, 1000L);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).withTime((Time)time).build();
        Uuid tpId0 = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData1 = new LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData>();
        reqData1.put(tp0, new ShareFetchRequest.SharePartitionData(tp0.topicId(), 40000));
        reqData1.put(tp1, new ShareFetchRequest.SharePartitionData(tp1.topicId(), 40000));
        ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(memberId, 0);
        ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, Boolean.valueOf(false));
        Assertions.assertEquals(ShareSessionContext.class, context1.getClass());
        Assertions.assertFalse((boolean)((ShareSessionContext)context1).isSubsequent());
        ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId, -1);
        ShareFetchContext context2 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), Collections.emptyList(), reqMetadata2, Boolean.valueOf(true));
        Assertions.assertEquals(FinalContext.class, context2.getClass());
    }

    @Test
    public void testNewContextReturnsFinalContextWithRequestData() {
        MockTime time = new MockTime();
        ShareSessionCache cache = new ShareSessionCache(10, 1000L);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).withTime((Time)time).build();
        Uuid tpId0 = Uuid.randomUuid();
        Uuid tpId1 = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData1 = new LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData>();
        reqData1.put(tp0, new ShareFetchRequest.SharePartitionData(tp0.topicId(), 40000));
        reqData1.put(tp1, new ShareFetchRequest.SharePartitionData(tp1.topicId(), 40000));
        ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(memberId, 0);
        ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, Boolean.valueOf(false));
        Assertions.assertEquals(ShareSessionContext.class, context1.getClass());
        Assertions.assertFalse((boolean)((ShareSessionContext)context1).isSubsequent());
        ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId, -1);
        Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData3 = Collections.singletonMap(new TopicIdPartition(tpId1, new TopicPartition("foo", 0)), new ShareFetchRequest.SharePartitionData(tpId1, 0));
        ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData3, Collections.emptyList(), reqMetadata2, Boolean.valueOf(true));
        Assertions.assertEquals(FinalContext.class, context2.getClass());
    }

    @Test
    public void testNewContextReturnsFinalContextError() {
        MockTime time = new MockTime();
        ShareSessionCache cache = new ShareSessionCache(10, 1000L);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).withTime((Time)time).build();
        Uuid tpId0 = Uuid.randomUuid();
        Uuid tpId1 = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData1 = new LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData>();
        reqData1.put(tp0, new ShareFetchRequest.SharePartitionData(tp0.topicId(), 40000));
        reqData1.put(tp1, new ShareFetchRequest.SharePartitionData(tp1.topicId(), 40000));
        ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(memberId, 0);
        ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, Boolean.valueOf(false));
        Assertions.assertEquals(ShareSessionContext.class, context1.getClass());
        Assertions.assertFalse((boolean)((ShareSessionContext)context1).isSubsequent());
        ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId, -1);
        Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData3 = Collections.singletonMap(new TopicIdPartition(tpId1, new TopicPartition("foo", 0)), new ShareFetchRequest.SharePartitionData(tpId1, 40000));
        Assertions.assertThrows(InvalidRequestException.class, () -> sharePartitionManager.newContext(groupId, reqData3, Collections.emptyList(), reqMetadata2, Boolean.valueOf(true)));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testNewContext() {
        MockTime time = new MockTime();
        ShareSessionCache cache = new ShareSessionCache(10, 1000L);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).withTime((Time)time).build();
        HashMap<Uuid, String> topicNames = new HashMap<Uuid, String>();
        Uuid tpId0 = Uuid.randomUuid();
        Uuid tpId1 = Uuid.randomUuid();
        topicNames.put(tpId0, "foo");
        topicNames.put(tpId1, "bar");
        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(tpId1, new TopicPartition("bar", 0));
        TopicIdPartition tp3 = new TopicIdPartition(tpId1, new TopicPartition("bar", 1));
        String groupId = "grp";
        LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData2 = new LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData>();
        reqData2.put(tp0, new ShareFetchRequest.SharePartitionData(tp0.topicId(), 100));
        reqData2.put(tp1, new ShareFetchRequest.SharePartitionData(tp1.topicId(), 100));
        ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2, Boolean.valueOf(false));
        Assertions.assertEquals(ShareSessionContext.class, context2.getClass());
        Assertions.assertFalse((boolean)((ShareSessionContext)context2).isSubsequent());
        ((ShareSessionContext)context2).shareFetchData().forEach((topicIdPartition, sharePartitionData) -> {
            Assertions.assertTrue((boolean)reqData2.containsKey(topicIdPartition));
            Assertions.assertEquals(reqData2.get(topicIdPartition), (Object)sharePartitionData);
        });
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData2.put(tp0, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        respData2.put(tp1, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp2.error());
        Assertions.assertEquals(respData2, (Object)resp2.responseData(topicNames));
        ShareSessionKey shareSessionKey2 = new ShareSessionKey(groupId, reqMetadata2.memberId());
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 5), Boolean.valueOf(true)));
        Uuid memberId4 = Uuid.randomUuid();
        Assertions.assertThrows(ShareSessionNotFoundException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, new ShareRequestMetadata(memberId4, 1), Boolean.valueOf(true)));
        ShareFetchContext context5 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 1), Boolean.valueOf(true));
        Assertions.assertEquals(ShareSessionContext.class, context5.getClass());
        Assertions.assertTrue((boolean)((ShareSessionContext)context5).isSubsequent());
        ShareSessionContext shareSessionContext5 = (ShareSessionContext)context5;
        ShareSession shareSession = shareSessionContext5.session();
        synchronized (shareSession) {
            shareSessionContext5.session().partitionMap().forEach(cachedSharePartition -> {
                TopicIdPartition topicIdPartition = new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()));
                ShareFetchRequest.SharePartitionData data = cachedSharePartition.reqData();
                Assertions.assertTrue((boolean)reqData2.containsKey(topicIdPartition));
                Assertions.assertEquals(reqData2.get(topicIdPartition), (Object)data);
            });
        }
        ShareFetchResponse resp5 = context5.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp5.error());
        Assertions.assertEquals((int)0, (int)resp5.responseData(topicNames).size());
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 5), Boolean.valueOf(true)));
        ShareFetchContext context7 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 2), Boolean.valueOf(true));
        ShareFetchResponse resp7 = context7.throttleResponse(100);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp7.error());
        Assertions.assertEquals((int)100, (int)resp7.throttleTimeMs());
        ShareFetchContext context8 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST, new ShareRequestMetadata(reqMetadata2.memberId(), -1), Boolean.valueOf(true));
        Assertions.assertEquals(FinalContext.class, context8.getClass());
        Assertions.assertEquals((int)1, (int)cache.size());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData8 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData8.put(tp2, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        respData8.put(tp3, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        ShareFetchResponse resp8 = context8.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData8);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp8.error());
        CompletableFuture releaseResponse = sharePartitionManager.releaseSession(groupId, reqMetadata2.memberId().toString());
        Assertions.assertTrue((boolean)releaseResponse.isDone());
        Assertions.assertFalse((boolean)releaseResponse.isCompletedExceptionally());
        Assertions.assertEquals((int)0, (int)cache.size());
    }

    @Test
    public void testShareSessionExpiration() {
        MockTime time = new MockTime();
        ShareSessionCache cache = new ShareSessionCache(2, 1000L);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).withTime((Time)time).build();
        HashMap<Uuid, String> topicNames = new HashMap<Uuid, String>();
        Uuid fooId = Uuid.randomUuid();
        topicNames.put(fooId, "foo");
        TopicIdPartition foo0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        TopicIdPartition foo1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1));
        LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> session1req = new LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData>();
        session1req.put(foo0, new ShareFetchRequest.SharePartitionData(foo0.topicId(), 100));
        session1req.put(foo1, new ShareFetchRequest.SharePartitionData(foo1.topicId(), 100));
        String groupId = "grp";
        ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext session1context = sharePartitionManager.newContext(groupId, session1req, EMPTY_PART_LIST, reqMetadata1, Boolean.valueOf(false));
        Assertions.assertEquals(session1context.getClass(), ShareSessionContext.class);
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData1 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData1.put(foo0, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo0.partition()));
        respData1.put(foo1, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo1.partition()));
        ShareFetchResponse session1resp = session1context.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData1);
        Assertions.assertEquals((Object)Errors.NONE, (Object)session1resp.error());
        Assertions.assertEquals((int)2, (int)session1resp.responseData(topicNames).size());
        ShareSessionKey session1Key = new ShareSessionKey(groupId, reqMetadata1.memberId());
        Assertions.assertNotNull((Object)cache.get(session1Key));
        time.sleep(500L);
        LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> session2req = new LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData>();
        session2req.put(foo0, new ShareFetchRequest.SharePartitionData(foo0.topicId(), 100));
        session2req.put(foo1, new ShareFetchRequest.SharePartitionData(foo1.topicId(), 100));
        ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext session2context = sharePartitionManager.newContext(groupId, session2req, EMPTY_PART_LIST, reqMetadata2, Boolean.valueOf(false));
        Assertions.assertEquals(session2context.getClass(), ShareSessionContext.class);
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData2.put(foo0, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo0.partition()));
        respData2.put(foo1, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo1.partition()));
        ShareFetchResponse session2resp = session2context.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
        Assertions.assertEquals((Object)Errors.NONE, (Object)session2resp.error());
        Assertions.assertEquals((int)2, (int)session2resp.responseData(topicNames).size());
        ShareSessionKey session2Key = new ShareSessionKey(groupId, reqMetadata2.memberId());
        Assertions.assertNotNull((Object)cache.get(session1Key));
        Assertions.assertNotNull((Object)cache.get(session2Key));
        time.sleep(500L);
        ShareFetchContext session1context2 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST, new ShareRequestMetadata(reqMetadata1.memberId(), 1), Boolean.valueOf(true));
        Assertions.assertEquals(session1context2.getClass(), ShareSessionContext.class);
        time.sleep(501L);
        LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> session3req = new LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData>();
        session3req.put(foo0, new ShareFetchRequest.SharePartitionData(foo0.topicId(), 100));
        session3req.put(foo1, new ShareFetchRequest.SharePartitionData(foo1.topicId(), 100));
        ShareRequestMetadata reqMetadata3 = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext session3context = sharePartitionManager.newContext(groupId, session3req, EMPTY_PART_LIST, reqMetadata3, Boolean.valueOf(false));
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData3 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData3.put(foo0, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo0.partition()));
        respData3.put(foo1, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo1.partition()));
        ShareFetchResponse session3resp = session3context.updateAndGenerateResponseData(groupId, reqMetadata3.memberId(), respData3);
        Assertions.assertEquals((Object)Errors.NONE, (Object)session3resp.error());
        Assertions.assertEquals((int)2, (int)session3resp.responseData(topicNames).size());
        ShareSessionKey session3Key = new ShareSessionKey(groupId, reqMetadata3.memberId());
        Assertions.assertNotNull((Object)cache.get(session1Key));
        Assertions.assertNull((Object)cache.get(session2Key), (String)"share session 2 should have been evicted by latest share session, as share session 1 was used more recently");
        Assertions.assertNotNull((Object)cache.get(session3Key));
    }

    @Test
    public void testSubsequentShareSession() {
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().build();
        HashMap<Uuid, String> topicNames = new HashMap<Uuid, String>();
        Uuid fooId = Uuid.randomUuid();
        Uuid barId = Uuid.randomUuid();
        topicNames.put(fooId, "foo");
        topicNames.put(barId, "bar");
        TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(barId, new TopicPartition("bar", 0));
        LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData1 = new LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData>();
        reqData1.put(tp0, new ShareFetchRequest.SharePartitionData(tp0.topicId(), 100));
        reqData1.put(tp1, new ShareFetchRequest.SharePartitionData(tp1.topicId(), 100));
        String groupId = "grp";
        ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, Boolean.valueOf(false));
        Assertions.assertEquals(ShareSessionContext.class, context1.getClass());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData1 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData1.put(tp0, new ShareFetchResponseData.PartitionData().setPartitionIndex(tp0.partition()));
        respData1.put(tp1, new ShareFetchResponseData.PartitionData().setPartitionIndex(tp1.partition()));
        ShareFetchResponse resp1 = context1.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData1);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp1.error());
        Assertions.assertEquals((int)2, (int)resp1.responseData(topicNames).size());
        Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData2 = Collections.singletonMap(tp2, new ShareFetchRequest.SharePartitionData(tp2.topicId(), 100));
        ArrayList<TopicIdPartition> removed2 = new ArrayList<TopicIdPartition>();
        removed2.add(tp0);
        ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData2, removed2, new ShareRequestMetadata(reqMetadata1.memberId(), 1), Boolean.valueOf(true));
        Assertions.assertEquals(ShareSessionContext.class, context2.getClass());
        HashSet<TopicIdPartition> expectedTopicIdPartitions2 = new HashSet<TopicIdPartition>();
        expectedTopicIdPartitions2.add(tp1);
        expectedTopicIdPartitions2.add(tp2);
        HashSet actualTopicIdPartitions2 = new HashSet();
        ShareSessionContext shareSessionContext = (ShareSessionContext)context2;
        shareSessionContext.session().partitionMap().forEach(cachedSharePartition -> {
            TopicIdPartition topicIdPartition = new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()));
            actualTopicIdPartitions2.add(topicIdPartition);
        });
        Assertions.assertEquals(expectedTopicIdPartitions2, actualTopicIdPartitions2);
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData2.put(tp1, new ShareFetchResponseData.PartitionData().setPartitionIndex(tp1.partition()));
        respData2.put(tp2, new ShareFetchResponseData.PartitionData().setPartitionIndex(tp2.partition()));
        ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData2);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp2.error());
        Assertions.assertEquals((int)1, (int)resp2.data().responses().size());
        Assertions.assertEquals((Object)barId, (Object)((ShareFetchResponseData.ShareFetchableTopicResponse)resp2.data().responses().get(0)).topicId());
        Assertions.assertEquals((int)1, (int)((ShareFetchResponseData.ShareFetchableTopicResponse)resp2.data().responses().get(0)).partitions().size());
        Assertions.assertEquals((int)0, (int)((ShareFetchResponseData.PartitionData)((ShareFetchResponseData.ShareFetchableTopicResponse)resp2.data().responses().get(0)).partitions().get(0)).partitionIndex());
        Assertions.assertEquals((int)1, (int)resp2.responseData(topicNames).size());
    }

    @Test
    public void testZeroSizeShareSession() {
        ShareSessionCache cache = new ShareSessionCache(10, 1000L);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        HashMap<Uuid, String> topicNames = new HashMap<Uuid, String>();
        Uuid fooId = Uuid.randomUuid();
        topicNames.put(fooId, "foo");
        TopicIdPartition foo0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        TopicIdPartition foo1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1));
        LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData1 = new LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData>();
        reqData1.put(foo0, new ShareFetchRequest.SharePartitionData(foo0.topicId(), 100));
        reqData1.put(foo1, new ShareFetchRequest.SharePartitionData(foo1.topicId(), 100));
        String groupId = "grp";
        ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, Boolean.valueOf(false));
        Assertions.assertEquals(ShareSessionContext.class, context1.getClass());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData1 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData1.put(foo0, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo0.partition()));
        respData1.put(foo1, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo1.partition()));
        ShareFetchResponse resp1 = context1.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData1);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp1.error());
        Assertions.assertEquals((int)2, (int)resp1.responseData(topicNames).size());
        ArrayList<TopicIdPartition> removed2 = new ArrayList<TopicIdPartition>();
        removed2.add(foo0);
        removed2.add(foo1);
        ShareFetchContext context2 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), removed2, new ShareRequestMetadata(reqMetadata1.memberId(), 1), Boolean.valueOf(true));
        Assertions.assertEquals(ShareSessionContext.class, context2.getClass());
        LinkedHashMap respData2 = new LinkedHashMap();
        ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData2);
        Assertions.assertTrue((boolean)resp2.responseData(topicNames).isEmpty());
        Assertions.assertEquals((int)1, (int)cache.size());
    }

    @Test
    public void testToForgetPartitions() {
        String groupId = "grp";
        ShareSessionCache cache = new ShareSessionCache(10, 1000L);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        Uuid fooId = Uuid.randomUuid();
        Uuid barId = Uuid.randomUuid();
        TopicIdPartition foo = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        TopicIdPartition bar = new TopicIdPartition(barId, new TopicPartition("bar", 0));
        ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData1 = new LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData>();
        reqData1.put(foo, new ShareFetchRequest.SharePartitionData(foo.topicId(), 100));
        reqData1.put(bar, new ShareFetchRequest.SharePartitionData(bar.topicId(), 100));
        ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, Boolean.valueOf(false));
        Assertions.assertEquals(ShareSessionContext.class, context1.getClass());
        this.assertPartitionsPresent((ShareSessionContext)context1, Arrays.asList(foo, bar));
        this.mockUpdateAndGenerateResponseData(context1, groupId, reqMetadata1.memberId());
        ShareFetchContext context2 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), Collections.singletonList(foo), new ShareRequestMetadata(reqMetadata1.memberId(), 1), Boolean.valueOf(true));
        this.assertPartitionsPresent((ShareSessionContext)context2, Collections.singletonList(bar));
        this.mockUpdateAndGenerateResponseData(context2, groupId, reqMetadata1.memberId());
        ShareFetchContext context3 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), Collections.singletonList(bar), new ShareRequestMetadata(reqMetadata1.memberId(), 2), Boolean.valueOf(true));
        this.assertPartitionsPresent((ShareSessionContext)context3, Collections.emptyList());
    }

    @Test
    public void testShareSessionUpdateTopicIdsBrokerSide() {
        String groupId = "grp";
        ShareSessionCache cache = new ShareSessionCache(10, 1000L);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        Uuid fooId = Uuid.randomUuid();
        Uuid barId = Uuid.randomUuid();
        TopicIdPartition foo = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        TopicIdPartition bar = new TopicIdPartition(barId, new TopicPartition("bar", 1));
        HashMap<Uuid, String> topicNames = new HashMap<Uuid, String>();
        topicNames.put(fooId, "foo");
        topicNames.put(barId, "bar");
        LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData1 = new LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData>();
        reqData1.put(foo, new ShareFetchRequest.SharePartitionData(foo.topicId(), 100));
        reqData1.put(bar, new ShareFetchRequest.SharePartitionData(bar.topicId(), 100));
        ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, Boolean.valueOf(false));
        Assertions.assertEquals(ShareSessionContext.class, context1.getClass());
        Assertions.assertFalse((boolean)((ShareSessionContext)context1).isSubsequent());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData1 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData1.put(bar, new ShareFetchResponseData.PartitionData().setPartitionIndex(bar.partition()));
        respData1.put(foo, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo.partition()).setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()));
        ShareFetchResponse resp1 = context1.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData1);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp1.error());
        Assertions.assertEquals((int)2, (int)resp1.responseData(topicNames).size());
        ShareFetchContext context2 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST, new ShareRequestMetadata(reqMetadata1.memberId(), 1), Boolean.valueOf(true));
        Assertions.assertEquals(ShareSessionContext.class, context2.getClass());
        Assertions.assertTrue((boolean)((ShareSessionContext)context2).isSubsequent());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData2.put(foo, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo.partition()).setErrorCode(Errors.INCONSISTENT_TOPIC_ID.code()));
        ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData2);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp2.error());
        Assertions.assertEquals((short)Errors.INCONSISTENT_TOPIC_ID.code(), (short)((ShareFetchResponseData.PartitionData)resp2.responseData(topicNames).get(foo)).errorCode());
    }

    @Test
    public void testGetErroneousAndValidTopicIdPartitions() {
        MockTime time = new MockTime();
        ShareSessionCache cache = new ShareSessionCache(10, 1000L);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).withTime((Time)time).build();
        Uuid tpId0 = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
        TopicIdPartition tpNull1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(null, 0));
        TopicIdPartition tpNull2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(null, 1));
        String groupId = "grp";
        LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData2 = new LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData>();
        reqData2.put(tp0, new ShareFetchRequest.SharePartitionData(tp0.topicId(), 100));
        reqData2.put(tp1, new ShareFetchRequest.SharePartitionData(tp1.topicId(), 100));
        reqData2.put(tpNull1, new ShareFetchRequest.SharePartitionData(tpNull1.topicId(), 100));
        ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2, Boolean.valueOf(false));
        Assertions.assertEquals(ShareSessionContext.class, context2.getClass());
        Assertions.assertFalse((boolean)((ShareSessionContext)context2).isSubsequent());
        this.assertErroneousAndValidTopicIdPartitions(context2.getErroneousAndValidTopicIdPartitions(), Collections.singletonList(tpNull1), Arrays.asList(tp0, tp1));
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData2.put(tp0, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        respData2.put(tp1, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        respData2.put(tpNull1, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp2.error());
        ShareSessionKey shareSessionKey2 = new ShareSessionKey(groupId, reqMetadata2.memberId());
        ShareFetchResponse resp2Throttle = context2.throttleResponse(100);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp2Throttle.error());
        Assertions.assertEquals((int)100, (int)resp2Throttle.throttleTimeMs());
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 5), Boolean.valueOf(true)));
        Assertions.assertThrows(ShareSessionNotFoundException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, new ShareRequestMetadata(Uuid.randomUuid(), 1), Boolean.valueOf(true)));
        ShareFetchContext context5 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 1), Boolean.valueOf(true));
        Assertions.assertEquals(ShareSessionContext.class, context5.getClass());
        Assertions.assertTrue((boolean)((ShareSessionContext)context5).isSubsequent());
        this.assertErroneousAndValidTopicIdPartitions(context5.getErroneousAndValidTopicIdPartitions(), Collections.singletonList(tpNull1), Arrays.asList(tp0, tp1));
        ShareFetchResponse resp5 = context5.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp5.error());
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 5), Boolean.valueOf(true)));
        Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData7 = Collections.singletonMap(tpNull2, new ShareFetchRequest.SharePartitionData(tpNull2.topicId(), 100));
        ShareFetchContext context7 = sharePartitionManager.newContext(groupId, reqData7, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 2), Boolean.valueOf(true));
        ShareFetchResponse resp7 = context7.throttleResponse(100);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp7.error());
        Assertions.assertEquals((int)100, (int)resp7.throttleTimeMs());
        this.assertErroneousAndValidTopicIdPartitions(context7.getErroneousAndValidTopicIdPartitions(), Arrays.asList(tpNull1, tpNull2), Arrays.asList(tp0, tp1));
        ShareFetchContext context8 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST, new ShareRequestMetadata(reqMetadata2.memberId(), -1), Boolean.valueOf(true));
        Assertions.assertEquals(FinalContext.class, context8.getClass());
        Assertions.assertEquals((int)1, (int)cache.size());
        this.assertErroneousAndValidTopicIdPartitions(context8.getErroneousAndValidTopicIdPartitions(), Collections.emptyList(), Collections.emptyList());
        ShareFetchResponse resp8 = context8.throttleResponse(100);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp8.error());
        Assertions.assertEquals((int)100, (int)resp8.throttleTimeMs());
        CompletableFuture releaseResponse = sharePartitionManager.releaseSession(groupId, reqMetadata2.memberId().toString());
        Assertions.assertTrue((boolean)releaseResponse.isDone());
        Assertions.assertFalse((boolean)releaseResponse.isCompletedExceptionally());
        Assertions.assertEquals((int)0, (int)cache.size());
    }

    @Test
    public void testShareFetchContextResponseSize() {
        MockTime time = new MockTime();
        ShareSessionCache cache = new ShareSessionCache(10, 1000L);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).withTime((Time)time).build();
        HashMap<Uuid, String> topicNames = new HashMap<Uuid, String>();
        Uuid tpId0 = Uuid.randomUuid();
        Uuid tpId1 = Uuid.randomUuid();
        topicNames.put(tpId0, "foo");
        topicNames.put(tpId1, "bar");
        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(tpId1, new TopicPartition("bar", 0));
        TopicIdPartition tp3 = new TopicIdPartition(tpId1, new TopicPartition("bar", 1));
        String groupId = "grp";
        LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData2 = new LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData>();
        reqData2.put(tp0, new ShareFetchRequest.SharePartitionData(tp0.topicId(), 100));
        reqData2.put(tp1, new ShareFetchRequest.SharePartitionData(tp1.topicId(), 100));
        ObjectSerializationCache objectSerializationCache = new ObjectSerializationCache();
        short version = ApiKeys.SHARE_FETCH.latestVersion();
        ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2, Boolean.valueOf(false));
        Assertions.assertEquals(ShareSessionContext.class, context2.getClass());
        Assertions.assertFalse((boolean)((ShareSessionContext)context2).isSubsequent());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData2.put(tp0, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        respData2.put(tp1, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        int respSize2 = context2.responseSize(respData2, version);
        ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp2.error());
        Assertions.assertEquals(respData2, (Object)resp2.responseData(topicNames));
        Assertions.assertEquals((int)(4 + resp2.data().size(objectSerializationCache, version)), (int)respSize2);
        ShareSessionKey shareSessionKey2 = new ShareSessionKey(groupId, reqMetadata2.memberId());
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 5), Boolean.valueOf(true)));
        Uuid memberId4 = Uuid.randomUuid();
        Assertions.assertThrows(ShareSessionNotFoundException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, new ShareRequestMetadata(memberId4, 1), Boolean.valueOf(true)));
        Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData5 = Collections.singletonMap(tp2, new ShareFetchRequest.SharePartitionData(tp2.topicId(), 100));
        ShareFetchContext context5 = sharePartitionManager.newContext(groupId, reqData5, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 1), Boolean.valueOf(true));
        Assertions.assertEquals(ShareSessionContext.class, context5.getClass());
        Assertions.assertTrue((boolean)((ShareSessionContext)context5).isSubsequent());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData5 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData5.put(tp2, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        int respSize5 = context5.responseSize(respData5, version);
        ShareFetchResponse resp5 = context5.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData5);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp5.error());
        Assertions.assertEquals((int)(4 + resp5.data().size(objectSerializationCache, version)), (int)respSize5);
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 5), Boolean.valueOf(true)));
        ShareFetchContext context7 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 2), Boolean.valueOf(true));
        int respSize7 = context7.responseSize(respData2, version);
        ShareFetchResponse resp7 = context7.throttleResponse(100);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp7.error());
        Assertions.assertEquals((int)100, (int)resp7.throttleTimeMs());
        Assertions.assertEquals((int)(4 + new ShareFetchResponseData().size(objectSerializationCache, version)), (int)respSize7);
        ShareFetchContext context8 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST, new ShareRequestMetadata(reqMetadata2.memberId(), -1), Boolean.valueOf(true));
        Assertions.assertEquals(FinalContext.class, context8.getClass());
        Assertions.assertEquals((int)1, (int)cache.size());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData8 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData8.put(tp3, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        int respSize8 = context8.responseSize(respData8, version);
        ShareFetchResponse resp8 = context8.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData8);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp8.error());
        Assertions.assertEquals((int)(4 + resp8.data().size(objectSerializationCache, version)), (int)respSize8);
    }

    @Test
    public void testCachedTopicPartitionsWithNoTopicPartitions() {
        ShareSessionCache cache = new ShareSessionCache(10, 1000L);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        List result = sharePartitionManager.cachedTopicIdPartitionsInShareSession("grp", Uuid.randomUuid());
        Assertions.assertTrue((boolean)result.isEmpty());
    }

    @Test
    public void testCachedTopicPartitionsForValidShareSessions() {
        ShareSessionCache cache = new ShareSessionCache(10, 1000L);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        Uuid tpId0 = Uuid.randomUuid();
        Uuid tpId1 = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(tpId1, new TopicPartition("bar", 0));
        TopicIdPartition tp3 = new TopicIdPartition(tpId1, new TopicPartition("bar", 1));
        String groupId = "grp";
        Uuid memberId1 = Uuid.randomUuid();
        Uuid memberId2 = Uuid.randomUuid();
        LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData1 = new LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData>();
        reqData1.put(tp0, new ShareFetchRequest.SharePartitionData(tp0.topicId(), 100));
        reqData1.put(tp1, new ShareFetchRequest.SharePartitionData(tp1.topicId(), 100));
        ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(memberId1, 0);
        ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, Boolean.valueOf(false));
        Assertions.assertEquals(ShareSessionContext.class, context1.getClass());
        Assertions.assertFalse((boolean)((ShareSessionContext)context1).isSubsequent());
        ShareSessionKey shareSessionKey1 = new ShareSessionKey(groupId, reqMetadata1.memberId());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData1 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData1.put(tp0, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        respData1.put(tp1, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        ShareFetchResponse resp1 = context1.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData1);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp1.error());
        Assertions.assertEquals(new HashSet<TopicIdPartition>(Arrays.asList(tp0, tp1)), new HashSet(sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId1)));
        Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData2 = Collections.singletonMap(tp2, new ShareFetchRequest.SharePartitionData(tp2.topicId(), 100));
        ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId2, 0);
        ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2, Boolean.valueOf(false));
        Assertions.assertEquals(ShareSessionContext.class, context2.getClass());
        Assertions.assertFalse((boolean)((ShareSessionContext)context2).isSubsequent());
        ShareSessionKey shareSessionKey2 = new ShareSessionKey(groupId, reqMetadata2.memberId());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData2.put(tp2, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp2.error());
        Assertions.assertEquals(Collections.singletonList(tp2), (Object)sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId2));
        Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData3 = Collections.singletonMap(tp2, new ShareFetchRequest.SharePartitionData(tp2.topicId(), 100));
        ShareFetchContext context3 = sharePartitionManager.newContext(groupId, reqData3, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey1.memberId(), 1), Boolean.valueOf(true));
        Assertions.assertEquals(ShareSessionContext.class, context3.getClass());
        Assertions.assertTrue((boolean)((ShareSessionContext)context3).isSubsequent());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData3 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData3.put(tp2, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        ShareFetchResponse resp3 = context3.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData3);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp3.error());
        Assertions.assertEquals(new HashSet<TopicIdPartition>(Arrays.asList(tp0, tp1, tp2)), new HashSet(sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId1)));
        Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData4 = Collections.singletonMap(tp3, new ShareFetchRequest.SharePartitionData(tp3.topicId(), 100));
        ShareFetchContext context4 = sharePartitionManager.newContext(groupId, reqData4, Collections.singletonList(tp2), new ShareRequestMetadata(shareSessionKey2.memberId(), 1), Boolean.valueOf(true));
        Assertions.assertEquals(ShareSessionContext.class, context4.getClass());
        Assertions.assertTrue((boolean)((ShareSessionContext)context4).isSubsequent());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData4 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData4.put(tp3, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        ShareFetchResponse resp4 = context4.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData4);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp4.error());
        Assertions.assertEquals(Collections.singletonList(tp3), (Object)sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId2));
        ShareFetchContext context5 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST, new ShareRequestMetadata(reqMetadata1.memberId(), -1), Boolean.valueOf(true));
        Assertions.assertEquals(FinalContext.class, context5.getClass());
        LinkedHashMap respData5 = new LinkedHashMap();
        ShareFetchResponse resp5 = context5.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData5);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp5.error());
        Assertions.assertFalse((boolean)sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId1).isEmpty());
        sharePartitionManager.releaseSession(groupId, reqMetadata1.memberId().toString());
        Assertions.assertTrue((boolean)sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId1).isEmpty());
        ShareFetchContext context6 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), Collections.singletonList(tp3), new ShareRequestMetadata(shareSessionKey2.memberId(), 2), Boolean.valueOf(true));
        Assertions.assertEquals(ShareSessionContext.class, context6.getClass());
        Assertions.assertTrue((boolean)((ShareSessionContext)context6).isSubsequent());
        LinkedHashMap respData6 = new LinkedHashMap();
        ShareFetchResponse resp6 = context6.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData6);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp6.error());
        Assertions.assertEquals(Collections.emptyList(), (Object)sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId2));
    }

    @Test
    public void testSharePartitionKey() {
        SharePartitionKey sharePartitionKey1 = new SharePartitionKey("mock-group-1", new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition("test", 0)));
        SharePartitionKey sharePartitionKey2 = new SharePartitionKey("mock-group-2", new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition("test", 0)));
        SharePartitionKey sharePartitionKey3 = new SharePartitionKey("mock-group-1", new TopicIdPartition(new Uuid(1L, 1L), new TopicPartition("test-1", 0)));
        SharePartitionKey sharePartitionKey4 = new SharePartitionKey("mock-group-1", new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition("test", 1)));
        SharePartitionKey sharePartitionKey5 = new SharePartitionKey("mock-group-1", new TopicIdPartition(new Uuid(0L, 0L), new TopicPartition("test-2", 0)));
        SharePartitionKey sharePartitionKey1Copy = new SharePartitionKey("mock-group-1", new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition("test", 0)));
        Assertions.assertEquals((Object)sharePartitionKey1, (Object)sharePartitionKey1Copy);
        Assertions.assertNotEquals((Object)sharePartitionKey1, (Object)sharePartitionKey2);
        Assertions.assertNotEquals((Object)sharePartitionKey1, (Object)sharePartitionKey3);
        Assertions.assertNotEquals((Object)sharePartitionKey1, (Object)sharePartitionKey4);
        Assertions.assertNotEquals((Object)sharePartitionKey1, (Object)sharePartitionKey5);
        Assertions.assertNotEquals((Object)sharePartitionKey1, null);
    }

    @Test
    public void testMultipleSequentialShareFetches() {
        String groupId = "grp";
        Uuid memberId1 = Uuid.randomUuid();
        Uuid fooId = Uuid.randomUuid();
        Uuid barId = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(barId, new TopicPartition("bar", 0));
        TopicIdPartition tp3 = new TopicIdPartition(barId, new TopicPartition("bar", 1));
        TopicIdPartition tp4 = new TopicIdPartition(fooId, new TopicPartition("foo", 2));
        TopicIdPartition tp5 = new TopicIdPartition(barId, new TopicPartition("bar", 2));
        TopicIdPartition tp6 = new TopicIdPartition(fooId, new TopicPartition("foo", 3));
        HashMap<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<TopicIdPartition, Integer>();
        partitionMaxBytes.put(tp0, 40000);
        partitionMaxBytes.put(tp1, 40000);
        partitionMaxBytes.put(tp2, 40000);
        partitionMaxBytes.put(tp3, 40000);
        partitionMaxBytes.put(tp4, 40000);
        partitionMaxBytes.put(tp5, 40000);
        partitionMaxBytes.put(tp6, 40000);
        this.mockFetchOffsetForTimestamp(this.mockReplicaManager);
        Time time = (Time)Mockito.mock(Time.class);
        Mockito.when((Object)time.hiResClockMs()).thenReturn((Object)0L).thenReturn((Object)100L);
        Metrics metrics = new Metrics();
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), 1000, true, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp0, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp1, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp2, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp3, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp4, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp5, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp6, 1);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withReplicaManager(this.mockReplicaManager).withTime(time).withMetrics(metrics).withTimer(this.mockTimer).build();
        ((ReplicaManager)Mockito.doAnswer(invocation -> SharePartitionManagerTest.buildLogReadResult(partitionMaxBytes.keySet())).when((Object)this.mockReplicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, partitionMaxBytes);
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)1))).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, partitionMaxBytes);
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)2))).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, partitionMaxBytes);
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)3))).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        HashMap<MetricName, Consumer> expectedMetrics = new HashMap<MetricName, Consumer>();
        expectedMetrics.put(metrics.metricName("partition-load-time-avg", "share-group-metrics"), val -> Assertions.assertEquals((int)val.intValue(), (int)14, (String)"partition-load-time-avg"));
        expectedMetrics.put(metrics.metricName("partition-load-time-max", "share-group-metrics"), val -> Assertions.assertEquals((Double)val, (double)100.0, (String)"partition-load-time-max"));
        expectedMetrics.forEach((metric, test) -> {
            Assertions.assertTrue((boolean)metrics.metrics().containsKey(metric));
            test.accept((Double)((KafkaMetric)metrics.metrics().get(metric)).metricValue());
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMultipleConcurrentShareFetches() throws InterruptedException {
        String groupId = "grp";
        Uuid memberId1 = Uuid.randomUuid();
        Uuid fooId = Uuid.randomUuid();
        Uuid barId = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(barId, new TopicPartition("bar", 0));
        TopicIdPartition tp3 = new TopicIdPartition(barId, new TopicPartition("bar", 1));
        HashMap<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<TopicIdPartition, Integer>();
        partitionMaxBytes.put(tp0, 40000);
        partitionMaxBytes.put(tp1, 40000);
        partitionMaxBytes.put(tp2, 40000);
        partitionMaxBytes.put(tp3, 40000);
        MockTime time = new MockTime(0L, System.currentTimeMillis(), 0L);
        this.mockFetchOffsetForTimestamp(this.mockReplicaManager);
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), 1000, true, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp0, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp1, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp2, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp3, 1);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withTime((Time)time).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build();
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp3 = (SharePartition)Mockito.mock(SharePartition.class);
        Mockito.when((Object)sp0.nextFetchOffset()).thenReturn((Object)1L, (Object[])new Long[]{15L, 6L, 30L, 25L});
        Mockito.when((Object)sp1.nextFetchOffset()).thenReturn((Object)4L, (Object[])new Long[]{1L, 18L, 5L});
        Mockito.when((Object)sp2.nextFetchOffset()).thenReturn((Object)10L, (Object[])new Long[]{25L, 26L});
        Mockito.when((Object)sp3.nextFetchOffset()).thenReturn((Object)20L, (Object[])new Long[]{15L, 23L, 16L});
        ((ReplicaManager)Mockito.doAnswer(invocation -> {
            Assertions.assertEquals((long)1L, (long)sp0.nextFetchOffset());
            Assertions.assertEquals((long)4L, (long)sp1.nextFetchOffset());
            Assertions.assertEquals((long)10L, (long)sp2.nextFetchOffset());
            Assertions.assertEquals((long)20L, (long)sp3.nextFetchOffset());
            return SharePartitionManagerTest.buildLogReadResult(partitionMaxBytes.keySet());
        }).doAnswer(invocation -> {
            Assertions.assertEquals((long)15L, (long)sp0.nextFetchOffset());
            Assertions.assertEquals((long)1L, (long)sp1.nextFetchOffset());
            Assertions.assertEquals((long)25L, (long)sp2.nextFetchOffset());
            Assertions.assertEquals((long)15L, (long)sp3.nextFetchOffset());
            return SharePartitionManagerTest.buildLogReadResult(partitionMaxBytes.keySet());
        }).doAnswer(invocation -> {
            Assertions.assertEquals((long)6L, (long)sp0.nextFetchOffset());
            Assertions.assertEquals((long)18L, (long)sp1.nextFetchOffset());
            Assertions.assertEquals((long)26L, (long)sp2.nextFetchOffset());
            Assertions.assertEquals((long)23L, (long)sp3.nextFetchOffset());
            return SharePartitionManagerTest.buildLogReadResult(partitionMaxBytes.keySet());
        }).doAnswer(invocation -> {
            Assertions.assertEquals((long)30L, (long)sp0.nextFetchOffset());
            Assertions.assertEquals((long)5L, (long)sp1.nextFetchOffset());
            Assertions.assertEquals((long)26L, (long)sp2.nextFetchOffset());
            Assertions.assertEquals((long)16L, (long)sp3.nextFetchOffset());
            return SharePartitionManagerTest.buildLogReadResult(partitionMaxBytes.keySet());
        }).doAnswer(invocation -> {
            Assertions.assertEquals((long)25L, (long)sp0.nextFetchOffset());
            Assertions.assertEquals((long)5L, (long)sp1.nextFetchOffset());
            Assertions.assertEquals((long)26L, (long)sp2.nextFetchOffset());
            Assertions.assertEquals((long)16L, (long)sp3.nextFetchOffset());
            return SharePartitionManagerTest.buildLogReadResult(partitionMaxBytes.keySet());
        }).when((Object)this.mockReplicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        int threadCount = 100;
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        try {
            for (int i = 0; i != threadCount; ++i) {
                executorService.submit(() -> sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, partitionMaxBytes));
                if (i % 10 != 0) continue;
                executorService.awaitTermination(50L, TimeUnit.MILLISECONDS);
            }
        }
        finally {
            if (!executorService.awaitTermination(50L, TimeUnit.MILLISECONDS)) {
                executorService.shutdown();
            }
        }
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.atMost((int)100))).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.atLeast((int)10))).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
    }

    @Test
    public void testReplicaManagerFetchShouldNotProceed() {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        Uuid fooId = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        HashMap<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<TopicIdPartition, Integer>();
        partitionMaxBytes.put(tp0, 40000);
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        Mockito.when((Object)sp0.maybeAcquireFetchLock()).thenReturn((Object)true);
        Mockito.when((Object)sp0.canAcquireRecords()).thenReturn((Object)false);
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
        HashMap<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<SharePartitionKey, SharePartition>();
        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), 1000, true, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCacheMap(partitionCacheMap).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build();
        CompletableFuture future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)0))).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        Map result = (Map)future.join();
        Assertions.assertEquals((int)0, (int)result.size());
    }

    @Test
    public void testReplicaManagerFetchShouldProceed() {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        Uuid fooId = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, 40000);
        this.mockFetchOffsetForTimestamp(this.mockReplicaManager);
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), 1000, true, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp0, 1);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build();
        ((ReplicaManager)Mockito.doAnswer(invocation -> SharePartitionManagerTest.buildLogReadResult(partitionMaxBytes.keySet())).when((Object)this.mockReplicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)1))).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
    }

    @Test
    public void testCloseSharePartitionManager() throws Exception {
        Timer timer = (Timer)Mockito.mock(SystemTimerReaper.class);
        Persister persister = (Persister)Mockito.mock(Persister.class);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withTimer(timer).withShareGroupPersister(persister).build();
        ((Timer)Mockito.verify((Object)timer, (VerificationMode)Mockito.times((int)0))).close();
        ((Persister)Mockito.verify((Object)persister, (VerificationMode)Mockito.times((int)0))).stop();
        sharePartitionManager.close();
        ((Timer)Mockito.verify((Object)timer, (VerificationMode)Mockito.times((int)1))).close();
    }

    @Test
    public void testReleaseSessionSuccess() {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 2));
        TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("baz", 4));
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        Mockito.when((Object)sp1.releaseAcquiredRecords((String)ArgumentMatchers.eq((Object)memberId.toString()))).thenReturn(CompletableFuture.completedFuture(null));
        Mockito.when((Object)sp2.releaseAcquiredRecords((String)ArgumentMatchers.eq((Object)memberId.toString()))).thenReturn((Object)FutureUtils.failedFuture((Throwable)new InvalidRecordStateException("Unable to release acquired records for the batch")));
        ShareSessionCache cache = (ShareSessionCache)Mockito.mock(ShareSessionCache.class);
        ShareSession shareSession = (ShareSession)Mockito.mock(ShareSession.class);
        Mockito.when((Object)cache.get(new ShareSessionKey(groupId, memberId))).thenReturn((Object)shareSession);
        Mockito.when((Object)cache.remove(new ShareSessionKey(groupId, memberId))).thenReturn((Object)shareSession);
        ImplicitLinkedHashCollection partitionMap = new ImplicitLinkedHashCollection(3);
        partitionMap.add((ImplicitLinkedHashCollection.Element)new CachedSharePartition(tp1));
        partitionMap.add((ImplicitLinkedHashCollection.Element)new CachedSharePartition(tp2));
        partitionMap.add((ImplicitLinkedHashCollection.Element)new CachedSharePartition(tp3));
        Mockito.when((Object)shareSession.partitionMap()).thenReturn((Object)partitionMap);
        HashMap<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<SharePartitionKey, SharePartition>();
        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
        partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).withPartitionCacheMap(partitionCacheMap).build();
        CompletableFuture resultFuture = sharePartitionManager.releaseSession(groupId, memberId.toString());
        Map result = (Map)resultFuture.join();
        Assertions.assertEquals((int)3, (int)result.size());
        Assertions.assertTrue((boolean)result.containsKey(tp1));
        Assertions.assertTrue((boolean)result.containsKey(tp2));
        Assertions.assertTrue((boolean)result.containsKey(tp3));
        Assertions.assertEquals((int)0, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp1)).partitionIndex());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp1)).errorCode());
        Assertions.assertEquals((int)2, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp2)).partitionIndex());
        Assertions.assertEquals((short)Errors.INVALID_RECORD_STATE.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp2)).errorCode());
        Assertions.assertEquals((Object)"Unable to release acquired records for the batch", (Object)((ShareAcknowledgeResponseData.PartitionData)result.get(tp2)).errorMessage());
        Assertions.assertEquals((int)4, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp3)).partitionIndex());
        Assertions.assertEquals((short)Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp3)).errorCode());
        Assertions.assertEquals((Object)Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), (Object)((ShareAcknowledgeResponseData.PartitionData)result.get(tp3)).errorMessage());
    }

    @Test
    public void testReleaseSessionWithIncorrectGroupId() {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        ShareSessionCache cache = (ShareSessionCache)Mockito.mock(ShareSessionCache.class);
        ShareSession shareSession = (ShareSession)Mockito.mock(ShareSession.class);
        Mockito.when((Object)cache.get(new ShareSessionKey(groupId, memberId))).thenReturn((Object)shareSession);
        ImplicitLinkedHashCollection partitionMap = new ImplicitLinkedHashCollection(3);
        partitionMap.add((ImplicitLinkedHashCollection.Element)new CachedSharePartition(tp1));
        Mockito.when((Object)shareSession.partitionMap()).thenReturn((Object)partitionMap);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        CompletableFuture resultFuture = sharePartitionManager.releaseSession("grp-2", memberId.toString());
        Assertions.assertTrue((boolean)resultFuture.isDone());
        Assertions.assertTrue((boolean)resultFuture.isCompletedExceptionally());
        Throwable exception = Assertions.assertThrows(ExecutionException.class, resultFuture::get);
        Assertions.assertInstanceOf(ShareSessionNotFoundException.class, (Object)exception.getCause());
    }

    @Test
    public void testReleaseSessionWithIncorrectMemberId() {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        TopicIdPartition tp1 = new TopicIdPartition(memberId, new TopicPartition("foo", 0));
        ShareSessionCache cache = (ShareSessionCache)Mockito.mock(ShareSessionCache.class);
        ShareSession shareSession = (ShareSession)Mockito.mock(ShareSession.class);
        Mockito.when((Object)cache.get(new ShareSessionKey(groupId, Uuid.randomUuid()))).thenReturn((Object)shareSession);
        ImplicitLinkedHashCollection partitionMap = new ImplicitLinkedHashCollection(3);
        partitionMap.add((ImplicitLinkedHashCollection.Element)new CachedSharePartition(tp1));
        Mockito.when((Object)shareSession.partitionMap()).thenReturn((Object)partitionMap);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        CompletableFuture resultFuture = sharePartitionManager.releaseSession(groupId, memberId.toString());
        Assertions.assertTrue((boolean)resultFuture.isDone());
        Assertions.assertTrue((boolean)resultFuture.isCompletedExceptionally());
        Throwable exception = Assertions.assertThrows(ExecutionException.class, resultFuture::get);
        Assertions.assertInstanceOf(ShareSessionNotFoundException.class, (Object)exception.getCause());
    }

    @Test
    public void testReleaseSessionWithEmptyTopicPartitions() {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        ShareSessionCache cache = (ShareSessionCache)Mockito.mock(ShareSessionCache.class);
        ShareSession shareSession = (ShareSession)Mockito.mock(ShareSession.class);
        Mockito.when((Object)cache.get(new ShareSessionKey(groupId, memberId))).thenReturn((Object)shareSession);
        Mockito.when((Object)cache.remove(new ShareSessionKey(groupId, memberId))).thenReturn((Object)shareSession);
        Mockito.when((Object)shareSession.partitionMap()).thenReturn((Object)new ImplicitLinkedHashCollection());
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        CompletableFuture resultFuture = sharePartitionManager.releaseSession(groupId, memberId.toString());
        Map result = (Map)resultFuture.join();
        Assertions.assertEquals((int)0, (int)result.size());
    }

    @Test
    public void testReleaseSessionWithNullShareSession() {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        ShareSessionCache cache = (ShareSessionCache)Mockito.mock(ShareSessionCache.class);
        Mockito.when((Object)cache.get(new ShareSessionKey(groupId, memberId))).thenReturn(null);
        Mockito.when((Object)cache.remove(new ShareSessionKey(groupId, memberId))).thenReturn((Object)((ShareSession)Mockito.mock(ShareSession.class)));
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        CompletableFuture resultFuture = sharePartitionManager.releaseSession(groupId, memberId.toString());
        Map result = (Map)resultFuture.join();
        Assertions.assertEquals((int)0, (int)result.size());
    }

    @Test
    public void testAcknowledgeSinglePartition() {
        String groupId = "grp";
        String memberId = Uuid.randomUuid().toString();
        TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        Mockito.when((Object)sp2.acknowledge((String)ArgumentMatchers.eq((Object)memberId), (List)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(null));
        HashMap<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<SharePartitionKey, SharePartition>();
        partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp2);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCacheMap(partitionCacheMap).build();
        HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>>();
        acknowledgeTopics.put(tp, Arrays.asList(new ShareAcknowledgementBatch(12L, 20L, Collections.singletonList((byte)1)), new ShareAcknowledgementBatch(24L, 56L, Collections.singletonList((byte)1))));
        CompletableFuture resultFuture = sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics);
        Map result = (Map)resultFuture.join();
        Assertions.assertEquals((int)1, (int)result.size());
        Assertions.assertTrue((boolean)result.containsKey(tp));
        Assertions.assertEquals((int)0, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).partitionIndex());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).errorCode());
    }

    @Test
    public void testAcknowledgeMultiplePartition() {
        String groupId = "grp";
        String memberId = Uuid.randomUuid().toString();
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0));
        TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0));
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp3 = (SharePartition)Mockito.mock(SharePartition.class);
        Mockito.when((Object)sp1.acknowledge((String)ArgumentMatchers.eq((Object)memberId), (List)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(null));
        Mockito.when((Object)sp2.acknowledge((String)ArgumentMatchers.eq((Object)memberId), (List)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(null));
        Mockito.when((Object)sp3.acknowledge((String)ArgumentMatchers.eq((Object)memberId), (List)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(null));
        HashMap<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<SharePartitionKey, SharePartition>();
        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
        partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
        partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
        Metrics metrics = new Metrics();
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCacheMap(partitionCacheMap).withMetrics(metrics).build();
        HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>>();
        acknowledgeTopics.put(tp1, Arrays.asList(new ShareAcknowledgementBatch(12L, 20L, Collections.singletonList((byte)1)), new ShareAcknowledgementBatch(24L, 56L, Collections.singletonList((byte)1))));
        acknowledgeTopics.put(tp2, Arrays.asList(new ShareAcknowledgementBatch(15L, 26L, Collections.singletonList((byte)2)), new ShareAcknowledgementBatch(34L, 56L, Collections.singletonList((byte)2))));
        acknowledgeTopics.put(tp3, Arrays.asList(new ShareAcknowledgementBatch(4L, 15L, Collections.singletonList((byte)3)), new ShareAcknowledgementBatch(16L, 21L, Collections.singletonList((byte)3))));
        CompletableFuture resultFuture = sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics);
        Map result = (Map)resultFuture.join();
        Assertions.assertEquals((int)3, (int)result.size());
        Assertions.assertTrue((boolean)result.containsKey(tp1));
        Assertions.assertTrue((boolean)result.containsKey(tp2));
        Assertions.assertTrue((boolean)result.containsKey(tp3));
        Assertions.assertEquals((int)0, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp1)).partitionIndex());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp1)).errorCode());
        Assertions.assertEquals((int)0, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp2)).partitionIndex());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp2)).errorCode());
        Assertions.assertEquals((int)0, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp3)).partitionIndex());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp3)).errorCode());
        HashMap<MetricName, Consumer> expectedMetrics = new HashMap<MetricName, Consumer>();
        expectedMetrics.put(metrics.metricName("share-acknowledgement-count", "share-group-metrics"), val -> Assertions.assertEquals((Double)val, (double)1.0));
        expectedMetrics.put(metrics.metricName("share-acknowledgement-rate", "share-group-metrics"), val -> Assertions.assertTrue((val > 0.0 ? 1 : 0) != 0));
        expectedMetrics.put(metrics.metricName("record-acknowledgement-count", "share-group-metrics", Collections.singletonMap("ack-type", AcknowledgeType.ACCEPT.toString())), val -> Assertions.assertEquals((double)2.0, (Double)val));
        expectedMetrics.put(metrics.metricName("record-acknowledgement-count", "share-group-metrics", Collections.singletonMap("ack-type", AcknowledgeType.RELEASE.toString())), val -> Assertions.assertEquals((double)2.0, (Double)val));
        expectedMetrics.put(metrics.metricName("record-acknowledgement-count", "share-group-metrics", Collections.singletonMap("ack-type", AcknowledgeType.REJECT.toString())), val -> Assertions.assertEquals((double)2.0, (Double)val));
        expectedMetrics.put(metrics.metricName("record-acknowledgement-rate", "share-group-metrics", Collections.singletonMap("ack-type", AcknowledgeType.ACCEPT.toString())), val -> Assertions.assertTrue((val > 0.0 ? 1 : 0) != 0));
        expectedMetrics.put(metrics.metricName("record-acknowledgement-rate", "share-group-metrics", Collections.singletonMap("ack-type", AcknowledgeType.RELEASE.toString())), val -> Assertions.assertTrue((val > 0.0 ? 1 : 0) != 0));
        expectedMetrics.put(metrics.metricName("record-acknowledgement-rate", "share-group-metrics", Collections.singletonMap("ack-type", AcknowledgeType.REJECT.toString())), val -> Assertions.assertTrue((val > 0.0 ? 1 : 0) != 0));
        expectedMetrics.forEach((metric, test) -> {
            Assertions.assertTrue((boolean)metrics.metrics().containsKey(metric));
            test.accept((Double)((KafkaMetric)metrics.metrics().get(metric)).metricValue());
        });
    }

    @Test
    public void testAcknowledgeIncorrectGroupId() {
        String groupId = "grp";
        String groupId2 = "grp2";
        String memberId = Uuid.randomUuid().toString();
        TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        HashMap<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<SharePartitionKey, SharePartition>();
        partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp2);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCacheMap(partitionCacheMap).build();
        HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>>();
        acknowledgeTopics.put(tp, Arrays.asList(new ShareAcknowledgementBatch(12L, 20L, Collections.singletonList((byte)1)), new ShareAcknowledgementBatch(24L, 56L, Collections.singletonList((byte)1))));
        CompletableFuture resultFuture = sharePartitionManager.acknowledge(memberId, groupId2, acknowledgeTopics);
        Map result = (Map)resultFuture.join();
        Assertions.assertEquals((int)1, (int)result.size());
        Assertions.assertTrue((boolean)result.containsKey(tp));
        Assertions.assertEquals((int)0, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).partitionIndex());
        Assertions.assertEquals((short)Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).errorCode());
        Assertions.assertEquals((Object)Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), (Object)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).errorMessage());
    }

    @Test
    public void testAcknowledgeIncorrectMemberId() {
        String groupId = "grp";
        String memberId = Uuid.randomUuid().toString();
        TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        Mockito.when((Object)sp2.acknowledge((String)ArgumentMatchers.eq((Object)memberId), (List)ArgumentMatchers.any())).thenReturn((Object)FutureUtils.failedFuture((Throwable)new InvalidRequestException("Member is not the owner of batch record")));
        HashMap<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<SharePartitionKey, SharePartition>();
        partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp2);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCacheMap(partitionCacheMap).build();
        HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>>();
        acknowledgeTopics.put(tp, Arrays.asList(new ShareAcknowledgementBatch(12L, 20L, Collections.singletonList((byte)1)), new ShareAcknowledgementBatch(24L, 56L, Collections.singletonList((byte)1))));
        CompletableFuture resultFuture = sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics);
        Map result = (Map)resultFuture.join();
        Assertions.assertEquals((int)1, (int)result.size());
        Assertions.assertTrue((boolean)result.containsKey(tp));
        Assertions.assertEquals((int)0, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).partitionIndex());
        Assertions.assertEquals((short)Errors.INVALID_REQUEST.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).errorCode());
        Assertions.assertEquals((Object)"Member is not the owner of batch record", (Object)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).errorMessage());
    }

    @Test
    public void testAcknowledgeEmptyPartitionCacheMap() {
        String groupId = "grp";
        String memberId = Uuid.randomUuid().toString();
        TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo4", 3));
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().build();
        HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>>();
        acknowledgeTopics.put(tp, Arrays.asList(new ShareAcknowledgementBatch(78L, 90L, Collections.singletonList((byte)2)), new ShareAcknowledgementBatch(94L, 99L, Collections.singletonList((byte)2))));
        CompletableFuture resultFuture = sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics);
        Map result = (Map)resultFuture.join();
        Assertions.assertEquals((int)1, (int)result.size());
        Assertions.assertTrue((boolean)result.containsKey(tp));
        Assertions.assertEquals((int)3, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).partitionIndex());
        Assertions.assertEquals((short)Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).errorCode());
        Assertions.assertEquals((Object)Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), (Object)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).errorMessage());
    }

    @Test
    public void testAcknowledgeCompletesDelayedShareFetchRequest() {
        String groupId = "grp";
        String memberId = Uuid.randomUuid().toString();
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0));
        HashMap<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<TopicIdPartition, Integer>();
        partitionMaxBytes.put(tp1, 40000);
        partitionMaxBytes.put(tp2, 40000);
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        ((SharePartition)Mockito.doAnswer(invocation -> {
            Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when((Object)sp1)).acknowledge((String)ArgumentMatchers.eq((Object)memberId), (List)ArgumentMatchers.any());
        ((SharePartition)Mockito.doAnswer(invocation -> {
            Mockito.when((Object)sp2.canAcquireRecords()).thenReturn((Object)true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when((Object)sp2)).acknowledge((String)ArgumentMatchers.eq((Object)memberId), (List)ArgumentMatchers.any());
        HashMap<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<SharePartitionKey, SharePartition>();
        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
        partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture(), partitionMaxBytes, 100);
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), 1000, true, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        Mockito.when((Object)sp1.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0L, 1L, 0)));
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp1, 2);
        Mockito.when((Object)sp1.maybeAcquireFetchLock()).thenReturn((Object)true);
        Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)false);
        Mockito.when((Object)sp2.maybeAcquireFetchLock()).thenReturn((Object)true);
        Mockito.when((Object)sp2.canAcquireRecords()).thenReturn((Object)false);
        ArrayList delayedShareFetchWatchKeys = new ArrayList();
        partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCacheMap(partitionCacheMap).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build();
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<TopicIdPartition, SharePartition>();
        sharePartitions.put(tp1, sp1);
        sharePartitions.put(tp2, sp2);
        DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder().withShareFetchData(shareFetch).withReplicaManager(this.mockReplicaManager).withSharePartitions(sharePartitions).build();
        delayedShareFetchPurgatory.tryCompleteElseWatch((DelayedOperation)delayedShareFetch, delayedShareFetchWatchKeys);
        Assertions.assertEquals((int)2, (int)delayedShareFetchPurgatory.watched());
        ((ReplicaManager)Mockito.doAnswer(invocation -> SharePartitionManagerTest.buildLogReadResult(partitionMaxBytes.keySet())).when((Object)this.mockReplicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>>();
        acknowledgeTopics.put(tp1, Arrays.asList(new ShareAcknowledgementBatch(12L, 20L, Collections.singletonList((byte)1)), new ShareAcknowledgementBatch(24L, 56L, Collections.singletonList((byte)1))));
        Assertions.assertEquals((int)2, (int)delayedShareFetchPurgatory.watched());
        sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics);
        Assertions.assertEquals((int)1, (int)delayedShareFetchPurgatory.watched());
        ((SharePartition)Mockito.verify((Object)sp1, (VerificationMode)Mockito.times((int)1))).nextFetchOffset();
        ((SharePartition)Mockito.verify((Object)sp2, (VerificationMode)Mockito.times((int)0))).nextFetchOffset();
        Assertions.assertTrue((boolean)delayedShareFetch.lock().tryLock());
        delayedShareFetch.lock().unlock();
    }

    @Test
    public void testAcknowledgeDoesNotCompleteDelayedShareFetchRequest() {
        String groupId = "grp";
        String memberId = Uuid.randomUuid().toString();
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0));
        TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0));
        HashMap<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<TopicIdPartition, Integer>();
        partitionMaxBytes.put(tp1, 40000);
        partitionMaxBytes.put(tp2, 40000);
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp3 = (SharePartition)Mockito.mock(SharePartition.class);
        ((SharePartition)Mockito.doAnswer(invocation -> {
            Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when((Object)sp1)).acknowledge((String)ArgumentMatchers.eq((Object)memberId), (List)ArgumentMatchers.any());
        ((SharePartition)Mockito.doAnswer(invocation -> {
            Mockito.when((Object)sp2.canAcquireRecords()).thenReturn((Object)true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when((Object)sp2)).acknowledge((String)ArgumentMatchers.eq((Object)memberId), (List)ArgumentMatchers.any());
        ((SharePartition)Mockito.doAnswer(invocation -> {
            Mockito.when((Object)sp3.canAcquireRecords()).thenReturn((Object)true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when((Object)sp3)).acknowledge((String)ArgumentMatchers.eq((Object)memberId), (List)ArgumentMatchers.any());
        HashMap<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<SharePartitionKey, SharePartition>();
        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
        partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
        partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture(), partitionMaxBytes, 100);
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), 1000, true, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        Mockito.when((Object)sp1.maybeAcquireFetchLock()).thenReturn((Object)true);
        Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)false);
        Mockito.when((Object)sp2.maybeAcquireFetchLock()).thenReturn((Object)true);
        Mockito.when((Object)sp2.canAcquireRecords()).thenReturn((Object)false);
        Mockito.when((Object)sp3.maybeAcquireFetchLock()).thenReturn((Object)true);
        Mockito.when((Object)sp3.canAcquireRecords()).thenReturn((Object)false);
        ArrayList delayedShareFetchWatchKeys = new ArrayList();
        partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCacheMap(partitionCacheMap).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build();
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<TopicIdPartition, SharePartition>();
        sharePartitions.put(tp1, sp1);
        sharePartitions.put(tp2, sp2);
        sharePartitions.put(tp3, sp3);
        DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder().withShareFetchData(shareFetch).withReplicaManager(this.mockReplicaManager).withSharePartitions(sharePartitions).build();
        delayedShareFetchPurgatory.tryCompleteElseWatch((DelayedOperation)delayedShareFetch, delayedShareFetchWatchKeys);
        Assertions.assertEquals((int)2, (int)delayedShareFetchPurgatory.watched());
        HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>>();
        acknowledgeTopics.put(tp3, Arrays.asList(new ShareAcknowledgementBatch(12L, 20L, Collections.singletonList((byte)1)), new ShareAcknowledgementBatch(24L, 56L, Collections.singletonList((byte)1))));
        sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics);
        Assertions.assertEquals((int)2, (int)delayedShareFetchPurgatory.watched());
        ((SharePartition)Mockito.verify((Object)sp1, (VerificationMode)Mockito.times((int)0))).nextFetchOffset();
        ((SharePartition)Mockito.verify((Object)sp2, (VerificationMode)Mockito.times((int)0))).nextFetchOffset();
        Assertions.assertTrue((boolean)delayedShareFetch.lock().tryLock());
        delayedShareFetch.lock().unlock();
    }

    @Test
    public void testReleaseSessionCompletesDelayedShareFetchRequest() {
        String groupId = "grp";
        String memberId = Uuid.randomUuid().toString();
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0));
        TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0));
        HashMap<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<TopicIdPartition, Integer>();
        partitionMaxBytes.put(tp1, 40000);
        partitionMaxBytes.put(tp2, 40000);
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        ShareSessionCache cache = (ShareSessionCache)Mockito.mock(ShareSessionCache.class);
        ShareSession shareSession = (ShareSession)Mockito.mock(ShareSession.class);
        Mockito.when((Object)cache.remove(new ShareSessionKey(groupId, Uuid.fromString((String)memberId)))).thenReturn((Object)shareSession);
        ((SharePartition)Mockito.doAnswer(invocation -> {
            Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when((Object)sp1)).releaseAcquiredRecords((String)ArgumentMatchers.eq((Object)memberId));
        ((SharePartition)Mockito.doAnswer(invocation -> {
            Mockito.when((Object)sp2.canAcquireRecords()).thenReturn((Object)true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when((Object)sp2)).releaseAcquiredRecords((String)ArgumentMatchers.eq((Object)memberId));
        HashMap<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<SharePartitionKey, SharePartition>();
        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
        partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture(), partitionMaxBytes, 100);
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), 1000, true, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        Mockito.when((Object)sp1.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0L, 1L, 0)));
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp1, 1);
        Mockito.when((Object)sp1.maybeAcquireFetchLock()).thenReturn((Object)true);
        Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)false);
        Mockito.when((Object)sp2.maybeAcquireFetchLock()).thenReturn((Object)true);
        Mockito.when((Object)sp2.canAcquireRecords()).thenReturn((Object)false);
        ArrayList delayedShareFetchWatchKeys = new ArrayList();
        partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
        SharePartitionManager sharePartitionManager = (SharePartitionManager)Mockito.spy((Object)SharePartitionManagerBuilder.builder().withPartitionCacheMap(partitionCacheMap).withCache(cache).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build());
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<TopicIdPartition, SharePartition>();
        sharePartitions.put(tp1, sp1);
        sharePartitions.put(tp2, sp2);
        DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder().withShareFetchData(shareFetch).withReplicaManager(this.mockReplicaManager).withSharePartitions(sharePartitions).build();
        delayedShareFetchPurgatory.tryCompleteElseWatch((DelayedOperation)delayedShareFetch, delayedShareFetchWatchKeys);
        Assertions.assertEquals((int)2, (int)delayedShareFetchPurgatory.watched());
        ((ReplicaManager)Mockito.doAnswer(invocation -> SharePartitionManagerTest.buildLogReadResult(partitionMaxBytes.keySet())).when((Object)this.mockReplicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        Assertions.assertEquals((int)2, (int)delayedShareFetchPurgatory.watched());
        Mockito.when((Object)sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, Uuid.fromString((String)memberId))).thenReturn(Arrays.asList(tp1, tp3));
        sharePartitionManager.releaseSession(groupId, memberId);
        Assertions.assertEquals((int)1, (int)delayedShareFetchPurgatory.watched());
        ((SharePartition)Mockito.verify((Object)sp1, (VerificationMode)Mockito.times((int)1))).nextFetchOffset();
        ((SharePartition)Mockito.verify((Object)sp2, (VerificationMode)Mockito.times((int)0))).nextFetchOffset();
        Assertions.assertTrue((boolean)delayedShareFetch.lock().tryLock());
        delayedShareFetch.lock().unlock();
    }

    @Test
    public void testReleaseSessionDoesNotCompleteDelayedShareFetchRequest() {
        String groupId = "grp";
        String memberId = Uuid.randomUuid().toString();
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0));
        TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0));
        HashMap<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<TopicIdPartition, Integer>();
        partitionMaxBytes.put(tp1, 40000);
        partitionMaxBytes.put(tp2, 40000);
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp3 = (SharePartition)Mockito.mock(SharePartition.class);
        ShareSessionCache cache = (ShareSessionCache)Mockito.mock(ShareSessionCache.class);
        ShareSession shareSession = (ShareSession)Mockito.mock(ShareSession.class);
        Mockito.when((Object)cache.remove(new ShareSessionKey(groupId, Uuid.fromString((String)memberId)))).thenReturn((Object)shareSession);
        ((SharePartition)Mockito.doAnswer(invocation -> {
            Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when((Object)sp1)).releaseAcquiredRecords((String)ArgumentMatchers.eq((Object)memberId));
        ((SharePartition)Mockito.doAnswer(invocation -> {
            Mockito.when((Object)sp2.canAcquireRecords()).thenReturn((Object)true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when((Object)sp2)).releaseAcquiredRecords((String)ArgumentMatchers.eq((Object)memberId));
        ((SharePartition)Mockito.doAnswer(invocation -> {
            Mockito.when((Object)sp3.canAcquireRecords()).thenReturn((Object)true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when((Object)sp3)).releaseAcquiredRecords((String)ArgumentMatchers.eq((Object)memberId));
        HashMap<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<SharePartitionKey, SharePartition>();
        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
        partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
        partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture(), partitionMaxBytes, 100);
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), 1000, true, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        Mockito.when((Object)sp1.maybeAcquireFetchLock()).thenReturn((Object)true);
        Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)false);
        Mockito.when((Object)sp2.maybeAcquireFetchLock()).thenReturn((Object)true);
        Mockito.when((Object)sp2.canAcquireRecords()).thenReturn((Object)false);
        Mockito.when((Object)sp3.maybeAcquireFetchLock()).thenReturn((Object)true);
        Mockito.when((Object)sp3.canAcquireRecords()).thenReturn((Object)false);
        ArrayList delayedShareFetchWatchKeys = new ArrayList();
        partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
        SharePartitionManager sharePartitionManager = (SharePartitionManager)Mockito.spy((Object)SharePartitionManagerBuilder.builder().withPartitionCacheMap(partitionCacheMap).withCache(cache).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build());
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<TopicIdPartition, SharePartition>();
        sharePartitions.put(tp1, sp1);
        sharePartitions.put(tp2, sp2);
        sharePartitions.put(tp3, sp3);
        DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder().withShareFetchData(shareFetch).withReplicaManager(this.mockReplicaManager).withSharePartitions(sharePartitions).build();
        delayedShareFetchPurgatory.tryCompleteElseWatch((DelayedOperation)delayedShareFetch, delayedShareFetchWatchKeys);
        Assertions.assertEquals((int)2, (int)delayedShareFetchPurgatory.watched());
        Mockito.when((Object)sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, Uuid.fromString((String)memberId))).thenReturn(Collections.singletonList(tp3));
        sharePartitionManager.releaseSession(groupId, memberId);
        Assertions.assertEquals((int)2, (int)delayedShareFetchPurgatory.watched());
        ((SharePartition)Mockito.verify((Object)sp1, (VerificationMode)Mockito.times((int)0))).nextFetchOffset();
        ((SharePartition)Mockito.verify((Object)sp2, (VerificationMode)Mockito.times((int)0))).nextFetchOffset();
        Assertions.assertTrue((boolean)delayedShareFetch.lock().tryLock());
        delayedShareFetch.lock().unlock();
    }

    @Test
    public void testPendingInitializationShouldCompleteFetchRequest() throws Exception {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        Uuid fooId = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, 40000);
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        HashMap<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<SharePartitionKey, SharePartition>();
        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
        CompletableFuture pendingInitializationFuture = new CompletableFuture();
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn(pendingInitializationFuture);
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), 1000, true, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCacheMap(partitionCacheMap).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build();
        CompletableFuture future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing in delayed share fetch queue never ended.");
        Assertions.assertTrue((boolean)((Map)future.join()).isEmpty());
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)0))).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        Assertions.assertFalse((boolean)pendingInitializationFuture.isDone());
        pendingInitializationFuture.complete(null);
    }

    @Test
    public void testDelayedInitializationShouldCompleteFetchRequest() throws Exception {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        Uuid fooId = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, 40000);
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        HashMap<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<SharePartitionKey, SharePartition>();
        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
        CompletableFuture<Object> pendingInitializationFuture1 = new CompletableFuture<Object>();
        CompletableFuture<Object> pendingInitializationFuture2 = new CompletableFuture<Object>();
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn(pendingInitializationFuture1).thenReturn(pendingInitializationFuture2).thenReturn(CompletableFuture.failedFuture((Throwable)new LeaderNotAvailableException("Leader not available")));
        DelayedOperationPurgatory shareFetchPurgatorySpy = (DelayedOperationPurgatory)Mockito.spy((Object)new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), 1000, true, true));
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)shareFetchPurgatorySpy);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCacheMap(partitionCacheMap).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build();
        CompletableFuture future1 = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
        CompletableFuture future2 = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
        CompletableFuture future3 = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)3))).maybeInitialize();
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)3))).addDelayedShareFetchRequest((DelayedShareFetch)ArgumentMatchers.any(), (List)ArgumentMatchers.any());
        ((DelayedOperationPurgatory)Mockito.verify((Object)shareFetchPurgatorySpy, (VerificationMode)Mockito.times((int)3))).tryCompleteElseWatch((DelayedOperation)((DelayedShareFetch)ArgumentMatchers.any()), (List)ArgumentMatchers.any());
        ((DelayedOperationPurgatory)Mockito.verify((Object)shareFetchPurgatorySpy, (VerificationMode)Mockito.times((int)0))).checkAndComplete((DelayedOperationKey)ArgumentMatchers.any());
        Assertions.assertFalse((boolean)future1.isDone());
        Assertions.assertFalse((boolean)future2.isDone());
        Assertions.assertFalse((boolean)future3.isDone());
        pendingInitializationFuture1.complete(null);
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)1))).completeDelayedShareFetchRequest((DelayedShareFetchKey)ArgumentMatchers.any());
        ((DelayedOperationPurgatory)Mockito.verify((Object)shareFetchPurgatorySpy, (VerificationMode)Mockito.times((int)1))).checkAndComplete((DelayedOperationKey)ArgumentMatchers.any());
        pendingInitializationFuture2.complete(null);
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)2))).completeDelayedShareFetchRequest((DelayedShareFetchKey)ArgumentMatchers.any());
        ((DelayedOperationPurgatory)Mockito.verify((Object)shareFetchPurgatorySpy, (VerificationMode)Mockito.times((int)2))).checkAndComplete((DelayedOperationKey)ArgumentMatchers.any());
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)0))).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
    }

    @Test
    public void testSharePartitionInitializationExceptions() throws Exception {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        Uuid fooId = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, 40000);
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        HashMap<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<SharePartitionKey, SharePartition>();
        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), 1000, true, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCacheMap(partitionCacheMap).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build();
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn((Object)FutureUtils.failedFuture((Throwable)new LeaderNotAvailableException("Leader not available")));
        CompletableFuture future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing in delayed share fetch queue never ended.");
        Assertions.assertFalse((boolean)future.isCompletedExceptionally());
        Assertions.assertTrue((boolean)((Map)future.join()).isEmpty());
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)0))).markFenced();
        Assertions.assertEquals((int)1, (int)partitionCacheMap.size());
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn((Object)FutureUtils.failedFuture((Throwable)new IllegalStateException("Illegal state")));
        future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing in delayed share fetch queue never ended.");
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Illegal state");
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)1))).markFenced();
        Assertions.assertTrue((boolean)partitionCacheMap.isEmpty());
        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn((Object)FutureUtils.failedFuture((Throwable)new CoordinatorNotAvailableException("Coordinator not available")));
        future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing in delayed share fetch queue never ended.");
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.COORDINATOR_NOT_AVAILABLE, "Coordinator not available");
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)2))).markFenced();
        Assertions.assertTrue((boolean)partitionCacheMap.isEmpty());
        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn((Object)FutureUtils.failedFuture((Throwable)new InvalidRequestException("Invalid request")));
        future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing in delayed share fetch queue never ended.");
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.INVALID_REQUEST, "Invalid request");
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)3))).markFenced();
        Assertions.assertTrue((boolean)partitionCacheMap.isEmpty());
        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn((Object)FutureUtils.failedFuture((Throwable)new FencedStateEpochException("Fenced state epoch")));
        future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing in delayed share fetch queue never ended.");
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.FENCED_STATE_EPOCH, "Fenced state epoch");
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)4))).markFenced();
        Assertions.assertTrue((boolean)partitionCacheMap.isEmpty());
        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn((Object)FutureUtils.failedFuture((Throwable)new NotLeaderOrFollowerException("Not leader or follower")));
        future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing in delayed share fetch queue never ended.");
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.NOT_LEADER_OR_FOLLOWER, "Not leader or follower");
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)5))).markFenced();
        Assertions.assertTrue((boolean)partitionCacheMap.isEmpty());
        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn((Object)FutureUtils.failedFuture((Throwable)new RuntimeException("Runtime exception")));
        future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing in delayed share fetch queue never ended.");
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Runtime exception");
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)6))).markFenced();
        Assertions.assertTrue((boolean)partitionCacheMap.isEmpty());
    }

    @Test
    public void testShareFetchProcessingExceptions() throws Exception {
        String groupId = "grp";
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, 40000);
        Map partitionCacheMap = (Map)Mockito.mock(Map.class);
        Mockito.when((Object)((SharePartition)partitionCacheMap.computeIfAbsent((SharePartitionKey)ArgumentMatchers.any(), (Function)ArgumentMatchers.any()))).thenThrow(new Throwable[]{new RuntimeException("Error creating instance")});
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCacheMap(partitionCacheMap).build();
        CompletableFuture future = sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing for delayed share fetch request not finished.");
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Error creating instance");
    }

    @Test
    public void testSharePartitionInitializationFailure() throws Exception {
        String groupId = "grp";
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, 40000);
        HashMap<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<SharePartitionKey, SharePartition>();
        Partition partition = (Partition)Mockito.mock(Partition.class);
        Mockito.when((Object)partition.isLeader()).thenReturn((Object)false);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        Mockito.when((Object)replicaManager.getPartitionOrException((TopicPartition)ArgumentMatchers.any())).thenThrow(new Throwable[]{new KafkaStorageException("Exception")}).thenReturn((Object)partition);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withReplicaManager(replicaManager).withPartitionCacheMap(partitionCacheMap).build();
        CompletableFuture future = sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing for delayed share fetch request not finished.");
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.KAFKA_STORAGE_ERROR, "Exception");
        Assertions.assertTrue((boolean)partitionCacheMap.isEmpty());
        future = sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing for delayed share fetch request not finished.");
        this.validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER);
        Assertions.assertTrue((boolean)partitionCacheMap.isEmpty());
    }

    @Test
    public void testSharePartitionPartialInitializationFailure() throws Exception {
        String groupId = "grp";
        Uuid memberId1 = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(memberId1, new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(memberId1, new TopicPartition("foo", 2));
        Map<TopicIdPartition, Integer> partitionMaxBytes = Map.of(tp0, 40000, tp1, 40000, tp2, 40000);
        Partition partition0 = (Partition)Mockito.mock(Partition.class);
        Mockito.when((Object)partition0.isLeader()).thenReturn((Object)false);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        Mockito.when((Object)replicaManager.getPartitionOrException((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)partition0);
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        HashMap<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<SharePartitionKey, SharePartition>();
        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
        Mockito.when((Object)sp1.maybeAcquireFetchLock()).thenReturn((Object)true);
        Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)true);
        Mockito.when((Object)sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
        Mockito.when((Object)sp1.acquire(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(), (FetchPartitionData)ArgumentMatchers.any())).thenReturn((Object)new ShareAcquiredRecords(Collections.emptyList(), 0));
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
        Mockito.when((Object)sp2.maybeInitialize()).thenReturn(CompletableFuture.failedFuture((Throwable)new FencedStateEpochException("Fenced state epoch")));
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, replicaManager.localBrokerId(), 1000, true, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(replicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        Mockito.when((Object)sp1.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0L, 1L, 0)));
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp1, 1);
        ((ReplicaManager)Mockito.doAnswer(invocation -> SharePartitionManagerTest.buildLogReadResult(Collections.singleton(tp1))).when((Object)replicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withReplicaManager(replicaManager).withPartitionCacheMap(partitionCacheMap).build();
        CompletableFuture future = sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
        Assertions.assertTrue((boolean)future.isDone());
        Assertions.assertFalse((boolean)future.isCompletedExceptionally());
        Map partitionDataMap = (Map)future.get();
        Assertions.assertEquals((int)3, (int)partitionDataMap.size());
        Assertions.assertTrue((boolean)partitionDataMap.containsKey(tp0));
        Assertions.assertEquals((short)Errors.NOT_LEADER_OR_FOLLOWER.code(), (short)((ShareFetchResponseData.PartitionData)partitionDataMap.get(tp0)).errorCode());
        Assertions.assertTrue((boolean)partitionDataMap.containsKey(tp1));
        Assertions.assertEquals((short)Errors.NONE.code(), (short)((ShareFetchResponseData.PartitionData)partitionDataMap.get(tp1)).errorCode());
        Assertions.assertTrue((boolean)partitionDataMap.containsKey(tp2));
        Assertions.assertEquals((short)Errors.FENCED_STATE_EPOCH.code(), (short)((ShareFetchResponseData.PartitionData)partitionDataMap.get(tp2)).errorCode());
        Assertions.assertEquals((Object)"Fenced state epoch", (Object)((ShareFetchResponseData.PartitionData)partitionDataMap.get(tp2)).errorMessage());
        ((ReplicaManager)Mockito.verify((Object)replicaManager, (VerificationMode)Mockito.times((int)0))).completeDelayedShareFetchRequest((DelayedShareFetchKey)ArgumentMatchers.any());
        ((ReplicaManager)Mockito.verify((Object)replicaManager, (VerificationMode)Mockito.times((int)1))).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
    }

    @Test
    public void testReplicaManagerFetchException() {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, 40000);
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        Mockito.when((Object)sp0.maybeAcquireFetchLock()).thenReturn((Object)true);
        Mockito.when((Object)sp0.canAcquireRecords()).thenReturn((Object)true);
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
        HashMap<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<SharePartitionKey, SharePartition>();
        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), 1000, true, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        ((ReplicaManager)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException("Exception")}).when((Object)this.mockReplicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCacheMap(partitionCacheMap).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build();
        CompletableFuture future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Exception");
        Assertions.assertEquals((int)1, (int)partitionCacheMap.size());
        ((ReplicaManager)Mockito.doThrow((Throwable[])new Throwable[]{new NotLeaderOrFollowerException("Leader exception")}).when((Object)this.mockReplicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.NOT_LEADER_OR_FOLLOWER, "Leader exception");
        Assertions.assertTrue((boolean)partitionCacheMap.isEmpty());
    }

    @Test
    public void testReplicaManagerFetchMultipleSharePartitionsException() {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0));
        HashMap<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<TopicIdPartition, Integer>();
        partitionMaxBytes.put(tp0, 40000);
        partitionMaxBytes.put(tp1, 40000);
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        Mockito.when((Object)sp0.maybeAcquireFetchLock()).thenReturn((Object)true);
        Mockito.when((Object)sp0.canAcquireRecords()).thenReturn((Object)true);
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        Mockito.when((Object)sp1.maybeAcquireFetchLock()).thenReturn((Object)false);
        Mockito.when((Object)sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
        HashMap<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<SharePartitionKey, SharePartition>();
        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), 1000, true, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        ((ReplicaManager)Mockito.doThrow((Throwable[])new Throwable[]{new FencedStateEpochException("Fenced exception")}).when((Object)this.mockReplicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCacheMap(partitionCacheMap).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build();
        CompletableFuture future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.FENCED_STATE_EPOCH, "Fenced exception");
        Assertions.assertEquals((int)1, (int)partitionCacheMap.size());
        Assertions.assertEquals((Object)sp1, partitionCacheMap.get(new SharePartitionKey(groupId, tp1)));
        Mockito.when((Object)sp1.maybeAcquireFetchLock()).thenReturn((Object)true);
        Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)true);
        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
        ((ReplicaManager)Mockito.doThrow((Throwable[])new Throwable[]{new FencedStateEpochException("Fenced exception again")}).when((Object)this.mockReplicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, List.of(tp0, tp1), Errors.FENCED_STATE_EPOCH, "Fenced exception again");
        Assertions.assertTrue((boolean)partitionCacheMap.isEmpty());
    }

    @Test
    public void testListenerRegistration() {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0));
        HashMap<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<TopicIdPartition, Integer>();
        partitionMaxBytes.put(tp0, 40000);
        partitionMaxBytes.put(tp1, 40000);
        ReplicaManager mockReplicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        Partition partition = this.mockPartition();
        Mockito.when((Object)mockReplicaManager.getPartitionOrException((TopicPartition)Mockito.any())).thenReturn((Object)partition);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withReplicaManager(mockReplicaManager).withTimer(this.mockTimer).build();
        sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
        ((ReplicaManager)Mockito.verify((Object)mockReplicaManager, (VerificationMode)Mockito.times((int)2))).maybeAddListener((TopicPartition)ArgumentMatchers.any(), (PartitionListener)ArgumentMatchers.any());
    }

    @Test
    public void testSharePartitionListenerOnFailed() {
        SharePartitionKey sharePartitionKey = new SharePartitionKey("grp", new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)));
        HashMap<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<SharePartitionKey, SharePartition>();
        ReplicaManager mockReplicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        SharePartitionManager.SharePartitionListener partitionListener = new SharePartitionManager.SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCacheMap);
        this.testSharePartitionListener(sharePartitionKey, partitionCacheMap, mockReplicaManager, arg_0 -> ((SharePartitionManager.SharePartitionListener)partitionListener).onFailed(arg_0));
    }

    @Test
    public void testSharePartitionListenerOnDeleted() {
        SharePartitionKey sharePartitionKey = new SharePartitionKey("grp", new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)));
        HashMap<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<SharePartitionKey, SharePartition>();
        ReplicaManager mockReplicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        SharePartitionManager.SharePartitionListener partitionListener = new SharePartitionManager.SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCacheMap);
        this.testSharePartitionListener(sharePartitionKey, partitionCacheMap, mockReplicaManager, arg_0 -> ((SharePartitionManager.SharePartitionListener)partitionListener).onDeleted(arg_0));
    }

    @Test
    public void testSharePartitionListenerOnBecomingFollower() {
        SharePartitionKey sharePartitionKey = new SharePartitionKey("grp", new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)));
        HashMap<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<SharePartitionKey, SharePartition>();
        ReplicaManager mockReplicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        SharePartitionManager.SharePartitionListener partitionListener = new SharePartitionManager.SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCacheMap);
        this.testSharePartitionListener(sharePartitionKey, partitionCacheMap, mockReplicaManager, arg_0 -> ((SharePartitionManager.SharePartitionListener)partitionListener).onBecomingFollower(arg_0));
    }

    private void testSharePartitionListener(SharePartitionKey sharePartitionKey, Map<SharePartitionKey, SharePartition> partitionCacheMap, ReplicaManager mockReplicaManager, Consumer<TopicPartition> listenerConsumer) {
        TopicPartition tp = new TopicPartition("foo", 1);
        TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp);
        SharePartitionKey spk = new SharePartitionKey("grp", tpId);
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        partitionCacheMap.put(sharePartitionKey, sp0);
        partitionCacheMap.put(spk, sp1);
        listenerConsumer.accept(sharePartitionKey.topicIdPartition().topicPartition());
        Assertions.assertEquals((int)1, (int)partitionCacheMap.size());
        Assertions.assertFalse((boolean)partitionCacheMap.containsKey(sharePartitionKey));
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)1))).markFenced();
        ((ReplicaManager)Mockito.verify((Object)mockReplicaManager, (VerificationMode)Mockito.times((int)1))).removeListener((TopicPartition)ArgumentMatchers.any(), (PartitionListener)ArgumentMatchers.any());
        listenerConsumer.accept(tp);
        Assertions.assertEquals((int)1, (int)partitionCacheMap.size());
        ((SharePartition)Mockito.verify((Object)sp1, (VerificationMode)Mockito.times((int)0))).markFenced();
        ((ReplicaManager)Mockito.verify((Object)mockReplicaManager, (VerificationMode)Mockito.times((int)1))).removeListener((TopicPartition)ArgumentMatchers.any(), (PartitionListener)ArgumentMatchers.any());
    }

    private ShareFetchResponseData.PartitionData noErrorShareFetchResponse() {
        return new ShareFetchResponseData.PartitionData().setPartitionIndex(0);
    }

    private ShareFetchResponseData.PartitionData errorShareFetchResponse(Short errorCode) {
        return new ShareFetchResponseData.PartitionData().setPartitionIndex(0).setErrorCode(errorCode.shortValue());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void mockUpdateAndGenerateResponseData(ShareFetchContext context, String groupId, Uuid memberId) {
        LinkedHashMap data = new LinkedHashMap();
        if (context.getClass() == ShareSessionContext.class) {
            ShareSessionContext shareSessionContext = (ShareSessionContext)context;
            if (!shareSessionContext.isSubsequent()) {
                shareSessionContext.shareFetchData().forEach((topicIdPartition, sharePartitionData) -> data.put(topicIdPartition, topicIdPartition.topic() == null ? this.errorShareFetchResponse(Errors.UNKNOWN_TOPIC_ID.code()) : this.noErrorShareFetchResponse()));
            } else {
                ShareSession shareSession = shareSessionContext.session();
                synchronized (shareSession) {
                    shareSessionContext.session().partitionMap().forEach(cachedSharePartition -> {
                        TopicIdPartition topicIdPartition;
                        data.put(topicIdPartition, (topicIdPartition = new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()))).topic() == null ? this.errorShareFetchResponse(Errors.UNKNOWN_TOPIC_ID.code()) : this.noErrorShareFetchResponse());
                    });
                }
            }
        }
        context.updateAndGenerateResponseData(groupId, memberId, data);
    }

    private void assertPartitionsPresent(ShareSessionContext context, List<TopicIdPartition> partitions) {
        HashSet partitionsInContext = new HashSet();
        if (!context.isSubsequent()) {
            context.shareFetchData().forEach((topicIdPartition, sharePartitionData) -> partitionsInContext.add(topicIdPartition));
        } else {
            context.session().partitionMap().forEach(cachedSharePartition -> {
                TopicIdPartition topicIdPartition = new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()));
                partitionsInContext.add(topicIdPartition);
            });
        }
        HashSet<TopicIdPartition> partitionsSet = new HashSet<TopicIdPartition>(partitions);
        Assertions.assertEquals(partitionsSet, partitionsInContext);
    }

    private void assertErroneousAndValidTopicIdPartitions(ErroneousAndValidPartitionData erroneousAndValidPartitionData, List<TopicIdPartition> expectedErroneous, List<TopicIdPartition> expectedValid) {
        HashSet<TopicIdPartition> expectedErroneousSet = new HashSet<TopicIdPartition>(expectedErroneous);
        HashSet<TopicIdPartition> expectedValidSet = new HashSet<TopicIdPartition>(expectedValid);
        HashSet actualErroneousPartitions = new HashSet();
        HashSet actualValidPartitions = new HashSet();
        erroneousAndValidPartitionData.erroneous().forEach((topicIdPartition, partitionData) -> actualErroneousPartitions.add(topicIdPartition));
        erroneousAndValidPartitionData.validTopicIdPartitions().forEach((topicIdPartition, partitionData) -> actualValidPartitions.add(topicIdPartition));
        Assertions.assertEquals(expectedErroneousSet, actualErroneousPartitions);
        Assertions.assertEquals(expectedValidSet, actualValidPartitions);
    }

    private Partition mockPartition() {
        Partition partition = (Partition)Mockito.mock(Partition.class);
        Mockito.when((Object)partition.isLeader()).thenReturn((Object)true);
        Mockito.when((Object)partition.getLeaderEpoch()).thenReturn((Object)1);
        return partition;
    }

    private void validateShareFetchFutureException(CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future, TopicIdPartition topicIdPartition, Errors error) {
        this.validateShareFetchFutureException(future, Collections.singletonList(topicIdPartition), error, null);
    }

    private void validateShareFetchFutureException(CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future, TopicIdPartition topicIdPartition, Errors error, String message) {
        this.validateShareFetchFutureException(future, Collections.singletonList(topicIdPartition), error, message);
    }

    private void validateShareFetchFutureException(CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future, List<TopicIdPartition> topicIdPartitions, Errors error, String message) {
        Assertions.assertFalse((boolean)future.isCompletedExceptionally());
        Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result = future.join();
        Assertions.assertEquals((int)topicIdPartitions.size(), (int)result.size());
        topicIdPartitions.forEach(topicIdPartition -> {
            Assertions.assertTrue((boolean)result.containsKey(topicIdPartition));
            Assertions.assertEquals((int)topicIdPartition.partition(), (int)((ShareFetchResponseData.PartitionData)result.get(topicIdPartition)).partitionIndex());
            Assertions.assertEquals((short)error.code(), (short)((ShareFetchResponseData.PartitionData)result.get(topicIdPartition)).errorCode());
            Assertions.assertEquals((Object)message, (Object)((ShareFetchResponseData.PartitionData)result.get(topicIdPartition)).errorMessage());
        });
    }

    private void mockFetchOffsetForTimestamp(ReplicaManager replicaManager) {
        FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
        ((ReplicaManager)Mockito.doReturn((Object)new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())).when((Object)replicaManager)).fetchOffsetForTimestamp((TopicPartition)Mockito.any(TopicPartition.class), Mockito.anyLong(), (Option)Mockito.any(), (Optional)Mockito.any(), Mockito.anyBoolean());
    }

    static Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLogReadResult(Set<TopicIdPartition> topicIdPartitions) {
        ArrayList logReadResults = new ArrayList();
        topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2(topicIdPartition, (Object)new LogReadResult(new FetchDataInfo(new LogOffsetMetadata(0L, 0L, 0), (Records)MemoryRecords.EMPTY), Option.empty(), -1L, -1L, -1L, -1L, -1L, Option.empty(), Option.empty(), Option.empty()))));
        return CollectionConverters.asScala(logReadResults).toSeq();
    }

    static void mockReplicaManagerDelayedShareFetch(ReplicaManager replicaManager, DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory) {
        ((ReplicaManager)Mockito.doAnswer(invocationOnMock -> {
            Object[] args = invocationOnMock.getArguments();
            DelayedShareFetchKey key = (DelayedShareFetchKey)args[0];
            delayedShareFetchPurgatory.checkAndComplete((DelayedOperationKey)key);
            return null;
        }).when((Object)replicaManager)).completeDelayedShareFetchRequest((DelayedShareFetchKey)ArgumentMatchers.any(DelayedShareFetchKey.class));
        ((ReplicaManager)Mockito.doAnswer(invocationOnMock -> {
            Object[] args = invocationOnMock.getArguments();
            DelayedShareFetch operation = (DelayedShareFetch)args[0];
            List keys = (List)args[1];
            delayedShareFetchPurgatory.tryCompleteElseWatch((DelayedOperation)operation, keys);
            return null;
        }).when((Object)replicaManager)).addDelayedShareFetchRequest((DelayedShareFetch)ArgumentMatchers.any(), (List)ArgumentMatchers.any());
    }

    static class SharePartitionManagerBuilder {
        private ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        private Time time = new MockTime();
        private ShareSessionCache cache = new ShareSessionCache(10, 1000L);
        private Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<SharePartitionKey, SharePartition>();
        private Persister persister = new NoOpShareStatePersister();
        private Timer timer = new MockTimer();
        private Metrics metrics = new Metrics();

        SharePartitionManagerBuilder() {
        }

        private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) {
            this.replicaManager = replicaManager;
            return this;
        }

        private SharePartitionManagerBuilder withTime(Time time) {
            this.time = time;
            return this;
        }

        private SharePartitionManagerBuilder withCache(ShareSessionCache cache) {
            this.cache = cache;
            return this;
        }

        SharePartitionManagerBuilder withPartitionCacheMap(Map<SharePartitionKey, SharePartition> partitionCacheMap) {
            this.partitionCacheMap = partitionCacheMap;
            return this;
        }

        private SharePartitionManagerBuilder withShareGroupPersister(Persister persister) {
            this.persister = persister;
            return this;
        }

        private SharePartitionManagerBuilder withTimer(Timer timer) {
            this.timer = timer;
            return this;
        }

        private SharePartitionManagerBuilder withMetrics(Metrics metrics) {
            this.metrics = metrics;
            return this;
        }

        public static SharePartitionManagerBuilder builder() {
            return new SharePartitionManagerBuilder();
        }

        public SharePartitionManager build() {
            return new SharePartitionManager(this.replicaManager, this.time, this.cache, this.partitionCacheMap, 30000, this.timer, 5, 200, 500, this.persister, (GroupConfigManager)Mockito.mock(GroupConfigManager.class), this.metrics);
        }
    }
}

