/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest;

import com.fasterxml.jackson.databind.JsonNode;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.ProduceTask;
import io.confluent.kafkarest.RestProducer;
import io.confluent.kafkarest.converters.AvroConverter;
import io.confluent.kafkarest.converters.ConversionException;
import io.confluent.kafkarest.entities.ProduceRecord;
import io.confluent.kafkarest.entities.SchemaHolder;
import io.confluent.rest.exceptions.RestException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroRestProducer
implements RestProducer<JsonNode, JsonNode> {
    protected final KafkaProducer<Object, Object> producer;
    protected final KafkaAvroSerializer keySerializer;
    protected final KafkaAvroSerializer valueSerializer;
    protected final Map<Schema, Integer> schemaIdCache;

    public AvroRestProducer(KafkaProducer<Object, Object> producer, KafkaAvroSerializer keySerializer, KafkaAvroSerializer valueSerializer) {
        this.producer = producer;
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
        this.schemaIdCache = new ConcurrentHashMap<Schema, Integer>(100);
    }

    @Override
    public void produce(ProduceTask task, String topic, Integer partition, Collection<? extends ProduceRecord<JsonNode, JsonNode>> records) {
        SchemaHolder schemaHolder = task.getSchemaHolder();
        Schema keySchema = null;
        Schema valueSchema = null;
        Integer keySchemaId = schemaHolder.getKeySchemaId();
        Integer valueSchemaId = schemaHolder.getValueSchemaId();
        try {
            if (keySchemaId != null) {
                keySchema = this.keySerializer.getById(keySchemaId.intValue());
            } else if (schemaHolder.getKeySchema() != null) {
                keySchema = new Schema.Parser().parse(schemaHolder.getKeySchema());
                if (this.schemaIdCache.containsKey(keySchema)) {
                    keySchemaId = this.schemaIdCache.get(keySchema);
                    keySchema = this.keySerializer.getById(keySchemaId.intValue());
                } else {
                    keySchemaId = this.keySerializer.register(topic + "-key", keySchema);
                    this.schemaIdCache.put(keySchema, keySchemaId);
                }
            }
            if (valueSchemaId != null) {
                valueSchema = this.valueSerializer.getById(valueSchemaId.intValue());
            } else if (schemaHolder.getValueSchema() != null) {
                valueSchema = new Schema.Parser().parse(schemaHolder.getValueSchema());
                if (this.schemaIdCache.containsKey(valueSchema)) {
                    valueSchemaId = this.schemaIdCache.get(valueSchema);
                    valueSchema = this.valueSerializer.getById(valueSchemaId.intValue());
                } else {
                    valueSchemaId = this.valueSerializer.register(topic + "-value", valueSchema);
                    this.schemaIdCache.put(valueSchema, valueSchemaId);
                }
            }
        }
        catch (RestClientException e) {
            throw new RestException("Schema registration or lookup failed", 408, 40801, (Throwable)e);
        }
        catch (SchemaParseException e) {
            throw Errors.invalidSchemaException((SchemaParseException)e);
        }
        catch (IOException e) {
            throw new RestException("Schema registration or lookup failed", 408, 40801, (Throwable)e);
        }
        task.setSchemaIds(keySchemaId, valueSchemaId);
        ArrayList<ProducerRecord> kafkaRecords = new ArrayList<ProducerRecord>();
        try {
            for (ProduceRecord<JsonNode, JsonNode> produceRecord : records) {
                Object key = keySchema != null ? AvroConverter.toAvro((JsonNode)produceRecord.getKey(), (Schema)keySchema) : null;
                Object value = valueSchema != null ? AvroConverter.toAvro((JsonNode)produceRecord.getValue(), (Schema)valueSchema) : null;
                Integer recordPartition = partition;
                if (recordPartition == null) {
                    recordPartition = produceRecord.partition();
                }
                kafkaRecords.add(new ProducerRecord(topic, recordPartition, key, value));
            }
        }
        catch (ConversionException e) {
            throw Errors.jsonAvroConversionException((Throwable)e);
        }
        for (ProducerRecord producerRecord : kafkaRecords) {
            this.producer.send(producerRecord, task.createCallback());
        }
    }

    @Override
    public void close() {
        this.producer.close();
    }
}

