/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.admin.internals;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.internals.AdminApiFuture;
import org.apache.kafka.clients.admin.internals.AdminApiLookupStrategy;
import org.apache.kafka.clients.admin.internals.ApiRequestScope;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

public class PartitionLeaderStrategy
implements AdminApiLookupStrategy<TopicPartition> {
    private static final ApiRequestScope SINGLE_REQUEST_SCOPE = new ApiRequestScope(){};
    private final Logger log;
    private final boolean tolerateUnknownTopics;

    public PartitionLeaderStrategy(LogContext logContext) {
        this(logContext, true);
    }

    public PartitionLeaderStrategy(LogContext logContext, boolean tolerateUnknownTopics) {
        this.log = logContext.logger(PartitionLeaderStrategy.class);
        this.tolerateUnknownTopics = tolerateUnknownTopics;
    }

    @Override
    public ApiRequestScope lookupScope(TopicPartition key) {
        return SINGLE_REQUEST_SCOPE;
    }

    public MetadataRequest.Builder buildRequest(Set<TopicPartition> partitions) {
        MetadataRequestData request = new MetadataRequestData();
        request.setAllowAutoTopicCreation(false);
        partitions.stream().map(TopicPartition::topic).distinct().forEach(topic -> request.topics().add(new MetadataRequestData.MetadataRequestTopic().setName((String)topic)));
        return new MetadataRequest.Builder(request);
    }

    private void handleTopicError(String topic, Errors topicError, Set<TopicPartition> requestPartitions, Map<TopicPartition, Throwable> failed) {
        switch (topicError) {
            case UNKNOWN_TOPIC_OR_PARTITION: {
                if (!this.tolerateUnknownTopics) {
                    this.log.error("Received unknown topic error for topic {}", (Object)topic, (Object)topicError.exception());
                    this.failAllPartitionsForTopic(topic, requestPartitions, failed, tp -> topicError.exception("Failed to fetch metadata for partition " + String.valueOf(tp) + " because metadata for topic `" + topic + "` could not be found"));
                    break;
                }
            }
            case LEADER_NOT_AVAILABLE: 
            case BROKER_NOT_AVAILABLE: {
                this.log.debug("Metadata request for topic {} returned topic-level error {}. Will retry", (Object)topic, (Object)topicError);
                break;
            }
            case TOPIC_AUTHORIZATION_FAILED: {
                this.log.error("Received authorization failure for topic {} in `Metadata` response", (Object)topic, (Object)topicError.exception());
                this.failAllPartitionsForTopic(topic, requestPartitions, failed, tp -> new TopicAuthorizationException("Failed to fetch metadata for partition " + String.valueOf(tp) + " due to topic authorization failure", Collections.singleton(topic)));
                break;
            }
            case INVALID_TOPIC_EXCEPTION: {
                this.log.error("Received invalid topic error for topic {} in `Metadata` response", (Object)topic, (Object)topicError.exception());
                this.failAllPartitionsForTopic(topic, requestPartitions, failed, tp -> new InvalidTopicException("Failed to fetch metadata for partition " + String.valueOf(tp) + " due to invalid topic `" + topic + "`", Collections.singleton(topic)));
                break;
            }
            default: {
                this.log.error("Received unexpected error for topic {} in `Metadata` response", (Object)topic, (Object)topicError.exception());
                this.failAllPartitionsForTopic(topic, requestPartitions, failed, tp -> topicError.exception("Failed to fetch metadata for partition " + String.valueOf(tp) + " due to unexpected error for topic `" + topic + "`"));
            }
        }
    }

    private void failAllPartitionsForTopic(String topic, Set<TopicPartition> partitions, Map<TopicPartition, Throwable> failed, Function<TopicPartition, Throwable> exceptionGenerator) {
        partitions.stream().filter(tp -> tp.topic().equals(topic)).forEach(tp -> failed.put((TopicPartition)tp, (Throwable)exceptionGenerator.apply((TopicPartition)tp)));
    }

    private void handlePartitionError(TopicPartition topicPartition, Errors partitionError, Map<TopicPartition, Throwable> failed) {
        switch (partitionError) {
            case UNKNOWN_TOPIC_OR_PARTITION: 
            case LEADER_NOT_AVAILABLE: 
            case BROKER_NOT_AVAILABLE: 
            case NOT_LEADER_OR_FOLLOWER: 
            case REPLICA_NOT_AVAILABLE: 
            case KAFKA_STORAGE_ERROR: {
                this.log.debug("Metadata request for partition {} returned partition-level error {}. Will retry", (Object)topicPartition, (Object)partitionError);
                break;
            }
            default: {
                this.log.error("Received unexpected error for partition {} in `Metadata` response", (Object)topicPartition, (Object)partitionError.exception());
                failed.put(topicPartition, partitionError.exception("Unexpected error during metadata lookup for " + String.valueOf(topicPartition)));
            }
        }
    }

    @Override
    public AdminApiLookupStrategy.LookupResult<TopicPartition> handleResponse(Set<TopicPartition> requestPartitions, AbstractResponse abstractResponse) {
        MetadataResponse response = (MetadataResponse)abstractResponse;
        HashMap<TopicPartition, Throwable> failed = new HashMap<TopicPartition, Throwable>();
        HashMap<TopicPartition, Integer> mapped = new HashMap<TopicPartition, Integer>();
        for (MetadataResponseData.MetadataResponseTopic topicMetadata : response.data().topics()) {
            String topic = topicMetadata.name();
            Errors topicError = Errors.forCode(topicMetadata.errorCode());
            if (topicError != Errors.NONE) {
                this.handleTopicError(topic, topicError, requestPartitions, failed);
                continue;
            }
            for (MetadataResponseData.MetadataResponsePartition partitionMetadata : topicMetadata.partitions()) {
                TopicPartition topicPartition = new TopicPartition(topic, partitionMetadata.partitionIndex());
                Errors partitionError = Errors.forCode(partitionMetadata.errorCode());
                if (!requestPartitions.contains(topicPartition)) continue;
                if (partitionError != Errors.NONE) {
                    this.handlePartitionError(topicPartition, partitionError, failed);
                    continue;
                }
                int leaderId = partitionMetadata.leaderId();
                if (leaderId >= 0) {
                    mapped.put(topicPartition, leaderId);
                    continue;
                }
                this.log.debug("Metadata request for {} returned no error, but the leader is unknown. Will retry", (Object)topicPartition);
            }
        }
        return new AdminApiLookupStrategy.LookupResult<TopicPartition>(failed, mapped);
    }

    public static class PartitionLeaderFuture<V>
    implements AdminApiFuture<TopicPartition, V> {
        private final Set<TopicPartition> requestKeys;
        private final Map<TopicPartition, Integer> partitionLeaderCache;
        private final Map<TopicPartition, KafkaFuture<V>> futures;

        public PartitionLeaderFuture(Set<TopicPartition> requestKeys, Map<TopicPartition, Integer> partitionLeaderCache) {
            this.requestKeys = requestKeys;
            this.partitionLeaderCache = partitionLeaderCache;
            this.futures = requestKeys.stream().collect(Collectors.toUnmodifiableMap(Function.identity(), k -> new KafkaFutureImpl()));
        }

        @Override
        public Set<TopicPartition> lookupKeys() {
            return this.futures.keySet();
        }

        @Override
        public Set<TopicPartition> uncachedLookupKeys() {
            HashSet<TopicPartition> keys = new HashSet<TopicPartition>();
            this.requestKeys.forEach(tp -> {
                if (!this.partitionLeaderCache.containsKey(tp)) {
                    keys.add((TopicPartition)tp);
                }
            });
            return keys;
        }

        @Override
        public Map<TopicPartition, Integer> cachedKeyBrokerIdMapping() {
            HashMap<TopicPartition, Integer> mapping = new HashMap<TopicPartition, Integer>();
            this.requestKeys.forEach(tp -> {
                Integer brokerId = this.partitionLeaderCache.get(tp);
                if (brokerId != null) {
                    mapping.put((TopicPartition)tp, brokerId);
                }
            });
            return mapping;
        }

        public Map<TopicPartition, KafkaFuture<V>> all() {
            return this.futures;
        }

        @Override
        public void complete(Map<TopicPartition, V> values) {
            values.forEach(this::complete);
        }

        private void complete(TopicPartition key, V value) {
            this.futureOrThrow(key).complete(value);
        }

        @Override
        public void completeLookup(Map<TopicPartition, Integer> brokerIdMapping) {
            this.partitionLeaderCache.putAll(brokerIdMapping);
        }

        @Override
        public void completeExceptionally(Map<TopicPartition, Throwable> errors) {
            errors.forEach(this::completeExceptionally);
        }

        private void completeExceptionally(TopicPartition key, Throwable t2) {
            this.partitionLeaderCache.remove(key);
            this.futureOrThrow(key).completeExceptionally(t2);
        }

        private KafkaFutureImpl<V> futureOrThrow(TopicPartition key) {
            KafkaFutureImpl future = (KafkaFutureImpl)this.futures.get(key);
            if (future == null) {
                throw new IllegalArgumentException("Attempt to complete future for " + String.valueOf(key) + ", which was not requested");
            }
            return future;
        }
    }
}

