/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.consumer.internals.CompletedFetch;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.FetchBuffer;
import org.apache.kafka.clients.consumer.internals.FetchConfig;
import org.apache.kafka.clients.consumer.internals.FetchMetricsAggregator;
import org.apache.kafka.clients.consumer.internals.FetchMetricsManager;
import org.apache.kafka.clients.consumer.internals.FetchUtils;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.consumer.internals.RequestFutureListener;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.helpers.MessageFormatter;

/**
 * Base class holding the shared machinery for building fetch requests, tracking per-node fetch
 * sessions and turning fetch responses into {@link CompletedFetch} entries in a {@link FetchBuffer}.
 *
 * <p>Subclasses are expected to drive the request/response cycle by calling
 * {@link #prepareFetchRequests()}, {@link #createFetchRequest(Node, FetchSessionHandler.FetchRequestData)}
 * and the {@code handleFetchResponse} overloads.
 *
 * @param <K> key type carried by the {@link FetchConfig}
 * @param <V> value type carried by the {@link FetchConfig}
 */
public abstract class AbstractFetch<K, V>
implements Closeable {
    private final Logger log;
    protected final LogContext logContext;
    protected final ConsumerNetworkClient client;
    protected final ConsumerMetadata metadata;
    protected final SubscriptionState subscriptions;
    protected final FetchConfig<K, V> fetchConfig;
    protected final Time time;
    protected final FetchMetricsManager metricsManager;
    protected final FetchBuffer fetchBuffer;
    protected final BufferSupplier decompressionBufferSupplier;
    /** Ids of nodes for which a fetch request was created and whose response has not yet been handled. */
    protected final Set<Integer> nodesWithPendingFetchRequests;
    protected final IdempotentCloser idempotentCloser = new IdempotentCloser();
    /** Fetch session handlers keyed by node id; created lazily in {@link #prepareFetchRequests()}. */
    private final Map<Integer, FetchSessionHandler> sessionHandlers;

    /**
     * Creates the fetch helper with a fresh {@link FetchBuffer}, a fresh decompression
     * {@link BufferSupplier}, and empty session/pending-request bookkeeping.
     */
    public AbstractFetch(LogContext logContext, ConsumerNetworkClient client, ConsumerMetadata metadata, SubscriptionState subscriptions, FetchConfig<K, V> fetchConfig, FetchMetricsManager metricsManager, Time time) {
        this.log = logContext.logger(AbstractFetch.class);
        this.logContext = logContext;
        this.client = client;
        this.metadata = metadata;
        this.subscriptions = subscriptions;
        this.fetchConfig = fetchConfig;
        this.decompressionBufferSupplier = BufferSupplier.create();
        this.fetchBuffer = new FetchBuffer(logContext);
        this.sessionHandlers = new HashMap<>();
        this.nodesWithPendingFetchRequests = new HashSet<>();
        this.metricsManager = metricsManager;
        this.time = time;
    }

    /**
     * @return {@code true} if the fetch buffer is not empty
     */
    boolean hasCompletedFetches() {
        return !fetchBuffer.isEmpty();
    }

    /**
     * @return {@code true} if the fetch buffer holds at least one completed fetch whose partition
     *         is currently fetchable according to the subscription state
     */
    public boolean hasAvailableFetches() {
        return fetchBuffer.hasCompletedFetches(fetch -> subscriptions.isFetchable(fetch.partition));
    }

    /**
     * Handles a successful fetch response: validates it against the node's fetch session and adds
     * one {@link CompletedFetch} per returned partition to the fetch buffer.
     *
     * <p>The node is always removed from {@link #nodesWithPendingFetchRequests} when this method
     * exits, whether it returns normally, returns early, or throws.
     *
     * @param fetchTarget node the request was sent to
     * @param data        request data that was sent; used to look up the requested fetch offsets
     * @param resp        response received from the node
     * @throws IllegalStateException if the response contains a partition that is not part of the
     *                               request's session partitions
     */
    protected void handleFetchResponse(Node fetchTarget, FetchSessionHandler.FetchRequestData data, ClientResponse resp) {
        try {
            FetchResponse response = (FetchResponse) resp.responseBody();
            FetchSessionHandler handler = sessionHandler(fetchTarget.id());
            if (handler == null) {
                log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.", fetchTarget.id());
                return;
            }
            short requestVersion = resp.requestHeader().apiVersion();
            if (!handler.handleResponse(response, requestVersion)) {
                // The session handler rejected the response; a topic id error additionally triggers a metadata refresh.
                if (response.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR) {
                    metadata.requestUpdate();
                }
                return;
            }
            Map<TopicPartition, FetchResponseData.PartitionData> responseData = response.responseData(handler.sessionTopicNames(), requestVersion);
            Set<TopicPartition> partitions = new HashSet<>(responseData.keySet());
            FetchMetricsAggregator metricAggregator = new FetchMetricsAggregator(metricsManager, partitions);
            for (Map.Entry<TopicPartition, FetchResponseData.PartitionData> entry : responseData.entrySet()) {
                TopicPartition partition = entry.getKey();
                FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition);
                if (requestData == null) {
                    throw new IllegalStateException(missingPartitionMessage(partition, data));
                }
                long fetchOffset = requestData.fetchOffset;
                FetchResponseData.PartitionData partitionData = entry.getValue();
                log.debug("Fetch {} at offset {} for partition {} returned fetch data {}", fetchConfig.isolationLevel, fetchOffset, partition, partitionData);
                CompletedFetch completedFetch = new CompletedFetch(logContext, subscriptions, decompressionBufferSupplier, partition, partitionData, metricAggregator, fetchOffset, requestVersion);
                fetchBuffer.add(completedFetch);
            }
            metricsManager.recordLatency(resp.requestLatencyMs());
        } finally {
            log.debug("Removing pending request for node {}", fetchTarget);
            nodesWithPendingFetchRequests.remove(fetchTarget.id());
        }
    }

    /**
     * Builds the exception message used when a response names a partition that was not requested.
     * The amount of detail depends on whether the request metadata describes a full fetch.
     */
    private static String missingPartitionMessage(TopicPartition partition, FetchSessionHandler.FetchRequestData data) {
        if (data.metadata().isFull()) {
            return MessageFormatter.arrayFormat(
                    "Response for missing full request partition: partition={}; metadata={}",
                    new Object[]{partition, data.metadata()}).getMessage();
        }
        return MessageFormatter.arrayFormat(
                "Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}; toReplace={}",
                new Object[]{partition, data.metadata(), data.toSend(), data.toForget(), data.toReplace()}).getMessage();
    }

    /**
     * Handles a failed fetch request: forwards the error to the node's session handler (if any)
     * and clears the preferred read replica of every partition in that session.
     *
     * <p>The node is always removed from {@link #nodesWithPendingFetchRequests} when this method exits.
     *
     * @param fetchTarget node the request was sent to
     * @param t           the failure
     */
    protected void handleFetchResponse(Node fetchTarget, Throwable t) {
        try {
            FetchSessionHandler handler = sessionHandler(fetchTarget.id());
            if (handler != null) {
                handler.handleError(t);
                handler.sessionTopicPartitions().forEach(subscriptions::clearPreferredReadReplica);
            }
        } finally {
            log.debug("Removing pending request for node {}", fetchTarget);
            nodesWithPendingFetchRequests.remove(fetchTarget.id());
        }
    }

    /**
     * Logs that the close request for a fetch session was sent successfully.
     *
     * @param fetchTarget node the close request was sent to
     * @param data        request data whose metadata carries the session id
     */
    protected void handleCloseFetchSessionResponse(Node fetchTarget, FetchSessionHandler.FetchRequestData data) {
        int sessionId = data.metadata().sessionId();
        log.debug("Successfully sent a close message for fetch session: {} to node: {}", sessionId, fetchTarget);
    }

    /**
     * Logs that the close request for a fetch session failed.
     *
     * @param fetchTarget node the close request was sent to
     * @param data        request data whose metadata carries the session id
     * @param t           the failure; passed as the trailing logger argument
     */
    public void handleCloseFetchSessionResponse(Node fetchTarget, FetchSessionHandler.FetchRequestData data, Throwable t) {
        int sessionId = data.metadata().sessionId();
        log.debug("Unable to send a close message for fetch session: {} to node: {}. This may result in unnecessary fetch sessions at the broker.", sessionId, fetchTarget, t);
    }

    /**
     * Creates the fetch request builder for the given node and records the node as having a
     * pending fetch request.
     *
     * @param fetchTarget node the request is intended for
     * @param requestData session request data describing what to send, forget and replace
     * @return the configured request builder
     */
    protected FetchRequest.Builder createFetchRequest(Node fetchTarget, FetchSessionHandler.FetchRequestData requestData) {
        // When topic ids cannot be used the request version is capped at 12.
        short maxVersion = requestData.canUseTopicIds() ? ApiKeys.FETCH.latestVersion() : (short) 12;
        FetchRequest.Builder request = FetchRequest.Builder
                .forConsumer(maxVersion, fetchConfig.maxWaitMs, fetchConfig.minBytes, requestData.toSend())
                .isolationLevel(fetchConfig.isolationLevel)
                .setMaxBytes(fetchConfig.maxBytes)
                .metadata(requestData.metadata())
                .removed(requestData.toForget())
                .replaced(requestData.toReplace())
                .rackId(fetchConfig.clientRackId);
        log.debug("Sending {} {} to broker {}", fetchConfig.isolationLevel, requestData, fetchTarget);
        log.debug("Adding pending request for node {}", fetchTarget);
        nodesWithPendingFetchRequests.add(fetchTarget.id());
        return request;
    }

    /**
     * @return the fetchable partitions (per the subscription state) that do not already have data
     *         sitting in the fetch buffer
     */
    private Set<TopicPartition> fetchablePartitions() {
        Set<TopicPartition> buffered = fetchBuffer.bufferedPartitions();
        Predicate<TopicPartition> isNotBuffered = tp -> !buffered.contains(tp);
        return new HashSet<>(subscriptions.fetchablePartitions(isNotBuffered));
    }

    /**
     * Picks the node to fetch from for a partition: the preferred read replica if one is set and
     * online, otherwise the leader.
     *
     * <p>If a preferred replica is set but cannot be resolved to an online node, a metadata update
     * is requested and the leader is returned.
     *
     * @param partition     partition to fetch
     * @param leaderReplica leader node used as the fallback
     * @param currentTimeMs current time passed to the preferred-replica lookup
     * @return the node to send the fetch to
     */
    Node selectReadReplica(TopicPartition partition, Node leaderReplica, long currentTimeMs) {
        Optional<Integer> nodeId = subscriptions.preferredReadReplica(partition, currentTimeMs);
        if (!nodeId.isPresent()) {
            return leaderReplica;
        }
        Optional<Node> node = nodeId.flatMap(id -> metadata.fetch().nodeIfOnline(partition, id));
        if (node.isPresent()) {
            return node.get();
        }
        // Log the unwrapped id so the message shows the node id rather than "Optional[...]".
        log.trace("Not fetching from {} for partition {} since it is marked offline or is missing from our metadata, using the leader instead.", nodeId.get(), partition);
        FetchUtils.requestMetadataUpdate(metadata, subscriptions, partition);
        return leaderReplica;
    }

    /**
     * Notifies every known session handler of the close and builds one request per node that is
     * present in the cluster metadata and not unavailable. All session handlers are removed
     * afterwards, even if building the requests fails part-way.
     *
     * @return close requests keyed by target node, in session-handler iteration order
     */
    private Map<Node, FetchSessionHandler.FetchRequestData> prepareCloseFetchSessionRequests() {
        Cluster cluster = metadata.fetch();
        Map<Node, FetchSessionHandler.Builder> fetchable = new LinkedHashMap<>();
        try {
            sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> {
                sessionHandler.notifyClose();
                Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
                if (fetchTarget == null || client.isUnavailable(fetchTarget)) {
                    log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);
                    return;
                }
                fetchable.put(fetchTarget, sessionHandler.newBuilder());
            });
        } finally {
            sessionHandlers.clear();
        }
        return buildRequests(fetchable);
    }

    /**
     * Builds the per-node fetch requests for every fetchable, non-buffered partition.
     *
     * <p>A partition is skipped when its position has no current leader (a metadata update is
     * requested), when the selected node is unavailable, or when the selected node already has a
     * pending fetch request.
     *
     * @return fetch request data keyed by target node, in the order nodes were first encountered
     * @throws IllegalStateException if a fetchable partition has no position
     */
    protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
        metricsManager.maybeUpdateAssignment(subscriptions);
        Map<Node, FetchSessionHandler.Builder> fetchable = new LinkedHashMap<>();
        long currentTimeMs = time.milliseconds();
        Map<String, Uuid> topicIds = metadata.topicIds();
        for (TopicPartition partition : fetchablePartitions()) {
            SubscriptionState.FetchPosition position = subscriptions.position(partition);
            if (position == null) {
                throw new IllegalStateException("Missing position for fetchable partition " + partition);
            }
            Optional<Node> leaderOpt = position.currentLeader.leader;
            if (!leaderOpt.isPresent()) {
                log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);
                metadata.requestUpdate();
                continue;
            }
            Node node = selectReadReplica(partition, leaderOpt.get(), currentTimeMs);
            if (client.isUnavailable(node)) {
                client.maybeThrowAuthFailure(node);
                log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);
                continue;
            }
            if (nodesWithPendingFetchRequests.contains(node.id())) {
                log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node);
                continue;
            }
            // One builder per node; the node's session handler is created on first use.
            FetchSessionHandler.Builder builder = fetchable.computeIfAbsent(node, k -> {
                FetchSessionHandler fetchSessionHandler = sessionHandlers.computeIfAbsent(node.id(), n -> new FetchSessionHandler(logContext, n));
                return fetchSessionHandler.newBuilder();
            });
            Uuid topicId = topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID);
            FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId, position.offset, -1L, fetchConfig.fetchSize, Optional.empty(), position.currentLeader.epoch, Optional.empty());
            builder.add(partition, partitionData);
            log.debug("Added {} fetch request for partition {} at position {} to node {}", fetchConfig.isolationLevel, partition, position, node);
        }
        return buildRequests(fetchable);
    }

    /**
     * Calls {@code build()} on each builder, preserving the iteration order of the given map.
     */
    private static Map<Node, FetchSessionHandler.FetchRequestData> buildRequests(Map<Node, FetchSessionHandler.Builder> builders) {
        Map<Node, FetchSessionHandler.FetchRequestData> reqs = new LinkedHashMap<>();
        for (Map.Entry<Node, FetchSessionHandler.Builder> entry : builders.entrySet()) {
            reqs.put(entry.getKey(), entry.getValue().build());
        }
        return reqs;
    }

    /**
     * Sends a close request for every fetch session that can be reached and polls the network
     * client until all of them are done or the timer expires.
     *
     * @param timer bounds how long to wait for the close requests to complete
     */
    protected void maybeCloseFetchSessions(Timer timer) {
        ArrayList<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>();
        Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareCloseFetchSessionRequests();
        for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
            final Node fetchTarget = entry.getKey();
            final FetchSessionHandler.FetchRequestData data = entry.getValue();
            FetchRequest.Builder request = createFetchRequest(fetchTarget, data);
            RequestFuture<ClientResponse> responseFuture = client.send(fetchTarget, request);
            responseFuture.addListener(new RequestFutureListener<ClientResponse>() {

                @Override
                public void onSuccess(ClientResponse value) {
                    AbstractFetch.this.handleCloseFetchSessionResponse(fetchTarget, data);
                }

                @Override
                public void onFailure(RuntimeException e) {
                    AbstractFetch.this.handleCloseFetchSessionResponse(fetchTarget, data, e);
                }
            });
            requestFutures.add(responseFuture);
        }
        while (timer.notExpired() && !requestFutures.stream().allMatch(RequestFuture::isDone)) {
            client.poll(timer, null, true);
        }
        if (!requestFutures.stream().allMatch(RequestFuture::isDone)) {
            log.debug("All requests couldn't be sent in the specific timeout period {}ms. This may result in unnecessary fetch sessions at the broker. Consider increasing the timeout passed for KafkaConsumer.close(Duration timeout)", timer.timeoutMs());
        }
    }

    /**
     * @param node node id
     * @return the session handler registered for the node, or {@code null} if there is none
     */
    protected FetchSessionHandler sessionHandler(int node) {
        return sessionHandlers.get(node);
    }

    /**
     * Performs the actual close: disables wakeups on the network client, attempts to close the
     * fetch sessions within the given timer, then quietly closes the fetch buffer and the
     * decompression buffer supplier.
     *
     * @param timer bounds how long to wait for fetch sessions to close
     */
    protected void closeInternal(Timer timer) {
        client.disableWakeups();
        maybeCloseFetchSessions(timer);
        Utils.closeQuietly(fetchBuffer, "fetchBuffer");
        Utils.closeQuietly(decompressionBufferSupplier, "decompressionBufferSupplier");
    }

    /**
     * Closes this instance through the {@link IdempotentCloser}, delegating the work to
     * {@link #closeInternal(Timer)}.
     *
     * @param timer bounds how long to wait for fetch sessions to close
     */
    public void close(Timer timer) {
        idempotentCloser.close(() -> closeInternal(timer));
    }

    /**
     * Closes this instance using a timer created with a zero timeout.
     */
    @Override
    public void close() {
        close(time.timer(0L));
    }
}

