/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.dynamic.metadata;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.kafka.clients.admin.AdminClient;

/**
 * A {@link KafkaMetadataService} that serves metadata for exactly one Kafka cluster.
 *
 * <p>Every topic is exposed as its own {@link KafkaStream}: the stream id is the topic name and
 * the stream maps the configured cluster id to a {@link ClusterMetadata} holding that single topic
 * together with the connection properties given at construction time.
 *
 * <p>The underlying {@link AdminClient} is created lazily on first use and is held in a
 * {@code transient} field, so it is never part of a serialized form of this object.
 */
@Experimental
public class SingleClusterTopicMetadataService implements KafkaMetadataService {

    /** Suffix appended to the configured client id prefix for the admin client. */
    private static final String CLIENT_ID_SUFFIX = "single-cluster-topic-metadata-service";

    private final String kafkaClusterId;
    private final Properties properties;
    private transient AdminClient adminClient;

    /**
     * Creates a metadata service for one cluster.
     *
     * @param kafkaClusterId identifier under which the cluster is reported in every stream
     * @param properties properties used both to build the admin client and as the cluster
     *     properties attached to each {@link ClusterMetadata}
     */
    public SingleClusterTopicMetadataService(String kafkaClusterId, Properties properties) {
        this.kafkaClusterId = kafkaClusterId;
        this.properties = properties;
    }

    /**
     * Lists every topic visible to the admin client and wraps each one as a stream.
     *
     * @return one {@link KafkaStream} per topic name returned by the cluster
     * @throws RuntimeException if the topic listing fails or the calling thread is interrupted;
     *     in the latter case the thread's interrupt flag is restored before throwing
     */
    @Override
    public Set<KafkaStream> getAllStreams() {
        try {
            Set<String> topicNames = getAdminClient().listTopics().names().get();
            return topicNames.stream().map(this::createKafkaStream).collect(Collectors.toSet());
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Fetching all streams failed", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Fetching all streams failed", e);
        }
    }

    /**
     * Describes the given topics on the cluster and wraps each described topic as a stream.
     *
     * @param streamIds stream ids, which this service treats as topic names
     * @return a map from topic name to its {@link KafkaStream}, keyed by the topics the cluster
     *     returned in the describe result
     * @throws RuntimeException if the describe call fails or the calling thread is interrupted;
     *     in the latter case the thread's interrupt flag is restored before throwing
     */
    @Override
    public Map<String, KafkaStream> describeStreams(Collection<String> streamIds) {
        try {
            Set<String> describedTopics =
                    getAdminClient()
                            .describeTopics(new ArrayList<String>(streamIds))
                            .all()
                            .get()
                            .keySet();
            return describedTopics.stream()
                    .collect(Collectors.toMap(topic -> topic, this::createKafkaStream));
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Describing streams failed: " + streamIds, e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Describing streams failed: " + streamIds, e);
        }
    }

    /** Builds the single-topic, single-cluster stream for {@code topic}. */
    private KafkaStream createKafkaStream(String topic) {
        ClusterMetadata clusterMetadata =
                new ClusterMetadata(Collections.singleton(topic), this.properties);
        return new KafkaStream(
                topic, Collections.singletonMap(this.kafkaClusterId, clusterMetadata));
    }

    /**
     * Returns the admin client, creating it on first use from a copy of the configured
     * properties with a dedicated {@code client.id}.
     */
    private AdminClient getAdminClient() {
        if (this.adminClient == null) {
            Properties adminClientProps = new Properties();
            KafkaPropertiesUtil.copyProperties(this.properties, adminClientProps);
            String clientIdPrefix =
                    adminClientProps.getProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key());
            // Without a configured prefix, use the bare suffix rather than a literal "null-" prefix.
            String clientId =
                    clientIdPrefix == null
                            ? CLIENT_ID_SUFFIX
                            : clientIdPrefix + "-" + CLIENT_ID_SUFFIX;
            adminClientProps.setProperty("client.id", clientId);
            this.adminClient = AdminClient.create(adminClientProps);
        }
        return this.adminClient;
    }

    /**
     * Reports whether the given cluster id is the one this service was configured with.
     *
     * @param kafkaClusterId cluster id to check
     * @return {@code true} only if it equals the configured cluster id
     */
    @Override
    public boolean isClusterActive(String kafkaClusterId) {
        return this.kafkaClusterId.equals(kafkaClusterId);
    }

    /** Closes the admin client if one was created; does nothing otherwise. */
    @Override
    public void close() {
        if (this.adminClient != null) {
            this.adminClient.close();
            // Drop the reference so a later lookup builds a fresh client instead of reusing a closed one.
            this.adminClient = null;
        }
    }
}

