/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaJsonSerializer;
import io.confluent.kafkarest.AvroRestProducer;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.NoSchemaRestProducer;
import io.confluent.kafkarest.ProduceTask;
import io.confluent.kafkarest.RecordMetadataOrException;
import io.confluent.kafkarest.RestConfigUtils;
import io.confluent.kafkarest.RestProducer;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.ProduceRecord;
import io.confluent.kafkarest.entities.SchemaHolder;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerPool {
    private static final Logger log = LoggerFactory.getLogger(ProducerPool.class);
    private Map<EmbeddedFormat, RestProducer> producers = new HashMap<EmbeddedFormat, RestProducer>();

    public ProducerPool(KafkaRestConfig appConfig) {
        this(appConfig, null);
    }

    public ProducerPool(KafkaRestConfig appConfig, Properties producerConfigOverrides) {
        this(appConfig, RestConfigUtils.bootstrapBrokers(appConfig), producerConfigOverrides);
    }

    public ProducerPool(KafkaRestConfig appConfig, String bootstrapBrokers, Properties producerConfigOverrides) {
        Map<String, Object> binaryProps = this.buildStandardConfig(appConfig, bootstrapBrokers, producerConfigOverrides);
        this.producers.put(EmbeddedFormat.BINARY, this.buildBinaryProducer(binaryProps));
        Map<String, Object> jsonProps = this.buildStandardConfig(appConfig, bootstrapBrokers, producerConfigOverrides);
        this.producers.put(EmbeddedFormat.JSON, this.buildJsonProducer(jsonProps));
        Map<String, Object> avroProps = this.buildAvroConfig(appConfig, bootstrapBrokers, producerConfigOverrides);
        this.producers.put(EmbeddedFormat.AVRO, this.buildAvroProducer(avroProps));
    }

    private Map<String, Object> buildStandardConfig(KafkaRestConfig appConfig, String bootstrapBrokers, Properties producerConfigOverrides) {
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", bootstrapBrokers);
        Properties producerProps = appConfig.getProducerProperties();
        return this.buildConfig(props, producerProps, producerConfigOverrides);
    }

    private NoSchemaRestProducer<byte[], byte[]> buildBinaryProducer(Map<String, Object> binaryProps) {
        return this.buildNoSchemaProducer(binaryProps, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
    }

    private NoSchemaRestProducer<Object, Object> buildJsonProducer(Map<String, Object> jsonProps) {
        return this.buildNoSchemaProducer(jsonProps, (Serializer)new KafkaJsonSerializer(), (Serializer)new KafkaJsonSerializer());
    }

    private <K, V> NoSchemaRestProducer<K, V> buildNoSchemaProducer(Map<String, Object> props, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        keySerializer.configure(props, true);
        valueSerializer.configure(props, false);
        KafkaProducer producer = new KafkaProducer(props, keySerializer, valueSerializer);
        return new NoSchemaRestProducer(producer);
    }

    private Map<String, Object> buildAvroConfig(KafkaRestConfig appConfig, String bootstrapBrokers, Properties producerConfigOverrides) {
        HashMap<String, Object> avroDefaults = new HashMap<String, Object>();
        avroDefaults.put("bootstrap.servers", bootstrapBrokers);
        avroDefaults.put("schema.registry.url", appConfig.getString("schema.registry.url"));
        Properties producerProps = appConfig.getProducerProperties();
        return this.buildConfig(avroDefaults, producerProps, producerConfigOverrides);
    }

    private AvroRestProducer buildAvroProducer(Map<String, Object> avroProps) {
        KafkaAvroSerializer avroKeySerializer = new KafkaAvroSerializer();
        avroKeySerializer.configure(avroProps, true);
        KafkaAvroSerializer avroValueSerializer = new KafkaAvroSerializer();
        avroValueSerializer.configure(avroProps, false);
        KafkaProducer avroProducer = new KafkaProducer(avroProps, (Serializer)avroKeySerializer, (Serializer)avroValueSerializer);
        return new AvroRestProducer((KafkaProducer<Object, Object>)avroProducer, avroKeySerializer, avroValueSerializer);
    }

    private Map<String, Object> buildConfig(Map<String, Object> defaults, Properties userProps, Properties overrides) {
        HashMap<String, Object> config = new HashMap<String, Object>(defaults);
        for (String propName : userProps.stringPropertyNames()) {
            config.put(propName, userProps.getProperty(propName));
        }
        if (overrides != null) {
            for (String propName : overrides.stringPropertyNames()) {
                config.put(propName, overrides.getProperty(propName));
            }
        }
        return config;
    }

    public <K, V> void produce(String topic, Integer partition, EmbeddedFormat recordFormat, SchemaHolder schemaHolder, Collection<? extends ProduceRecord<K, V>> records, ProduceRequestCallback callback) {
        ProduceTask task = new ProduceTask(schemaHolder, records.size(), callback);
        log.trace("Starting produce task " + task.toString());
        RestProducer restProducer = this.producers.get(recordFormat);
        restProducer.produce(task, topic, partition, records);
    }

    public void shutdown() {
        for (RestProducer restProducer : this.producers.values()) {
            restProducer.close();
        }
    }

    public static interface ProduceRequestCallback {
        public void onCompletion(Integer var1, Integer var2, List<RecordMetadataOrException> var3);
    }
}

