/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.druid;

import com.metamx.tranquility.tranquilizer.MessageDroppedException;
import com.metamx.tranquility.tranquilizer.Tranquilizer;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.api.druid.DruidTranquilityService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;

@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"druid", "timeseries", "olap", "ingest", "put", "record"})
@CapabilityDescription(value="Sends records to Druid for Indexing. Leverages Druid Tranquility Controller service.")
@WritesAttribute(attribute="record.count", description="The number of messages that were sent to Druid for this FlowFile. FlowFiles on the success relationship will have a value of this attribute that indicates the number of records successfully processed by Druid, and the FlowFile content will be only the successful records. This behavior applies to the failure and dropped relationships as well.")
public class PutDruidRecord
extends AbstractProcessor {
    static final String RECORD_COUNT = "record.count";
    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;
    static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder().name("putdruid-record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder().name("putdruid-record-writer").displayName("Record Writer").description("The Record Writer to use in order to serialize the data to outgoing relationships.").identifiesControllerService(RecordSetWriterFactory.class).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).build();
    static final PropertyDescriptor DRUID_TRANQUILITY_SERVICE = new PropertyDescriptor.Builder().name("putdruid-tranquility-service").displayName("Tranquility Service").description("Tranquility Service to use for sending events to Druid.").required(true).identifiesControllerService(DruidTranquilityService.class).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles are routed to this relationship when they are successfully processed by Druid").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles are routed to this relationship when they cannot be parsed or otherwise processed by Druid").build();
    static final Relationship REL_DROPPED = new Relationship.Builder().name("dropped").description("FlowFiles are routed to this relationship when they are outside of the configured time window, timestamp format is invalid, ect...").build();

    public void init(ProcessorInitializationContext context) {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(RECORD_READER_FACTORY);
        properties.add(RECORD_WRITER_FACTORY);
        properties.add(DRUID_TRANQUILITY_SERVICE);
        this.properties = Collections.unmodifiableList(properties);
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_DROPPED);
        relationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.properties;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) {
        RecordSetWriter successfulRecordWriter;
        RecordSetWriter failedRecordWriter;
        RecordSetWriter droppedRecordWriter;
        final ComponentLog log = this.getLogger();
        DruidTranquilityService tranquilityController = (DruidTranquilityService)context.getProperty(DRUID_TRANQUILITY_SERVICE).asControllerService(DruidTranquilityService.class);
        Tranquilizer tranquilizer = tranquilityController.getTranquilizer();
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        FlowFile droppedFlowFile = session.create(flowFile);
        final AtomicInteger droppedFlowFileCount = new AtomicInteger(0);
        FlowFile failedFlowFile = session.create(flowFile);
        final AtomicInteger failedFlowFileCount = new AtomicInteger(0);
        FlowFile successfulFlowFile = session.create(flowFile);
        final AtomicInteger successfulFlowFileCount = new AtomicInteger(0);
        final AtomicInteger recordWriteErrors = new AtomicInteger(0);
        int recordCount = 0;
        OutputStream droppedOutputStream = session.write(droppedFlowFile);
        OutputStream failedOutputStream = session.write(failedFlowFile);
        OutputStream successfulOutputStream = session.write(successfulFlowFile);
        try (InputStream in = session.read(flowFile);){
            Record r;
            RecordReaderFactory recordParserFactory = (RecordReaderFactory)context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
            RecordSetWriterFactory writerFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
            Map attributes = flowFile.getAttributes();
            RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, this.getLogger());
            RecordSchema outSchema = writerFactory.getSchema(attributes, reader.getSchema());
            droppedRecordWriter = writerFactory.createWriter(log, outSchema, droppedOutputStream, flowFile);
            droppedRecordWriter.beginRecordSet();
            failedRecordWriter = writerFactory.createWriter(log, outSchema, failedOutputStream, flowFile);
            failedRecordWriter.beginRecordSet();
            successfulRecordWriter = writerFactory.createWriter(log, outSchema, successfulOutputStream, flowFile);
            successfulRecordWriter.beginRecordSet();
            while ((r = reader.nextRecord()) != null) {
                final Record record = r;
                ++recordCount;
                Map contentMap = (Map)DataTypeUtils.convertRecordFieldtoObject((Object)r, (DataType)RecordFieldType.RECORD.getRecordDataType(r.getSchema()));
                log.debug("Tranquilizer Status: {}", new Object[]{tranquilizer.status().toString()});
                Future future = tranquilizer.send((Object)contentMap);
                log.debug("Sent Payload to Druid: {}", new Object[]{contentMap});
                future.addEventListener((FutureEventListener)new FutureEventListener<Object>(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    public void onFailure(Throwable cause) {
                        if (cause instanceof MessageDroppedException) {
                            log.debug("Record Dropped due to MessageDroppedException: {}, transferring record to dropped.", new Object[]{cause.getMessage()}, cause);
                            try {
                                RecordSetWriter recordSetWriter = droppedRecordWriter;
                                synchronized (recordSetWriter) {
                                    droppedRecordWriter.write(record);
                                    droppedRecordWriter.flush();
                                    droppedFlowFileCount.incrementAndGet();
                                }
                            }
                            catch (IOException ioe) {
                                log.error("Error transferring record to dropped, this may result in data loss.", new Object[]{ioe.getMessage()}, (Throwable)ioe);
                                recordWriteErrors.incrementAndGet();
                            }
                        } else {
                            log.error("FlowFile Processing Failed due to: {}", new Object[]{cause.getMessage()}, cause);
                            try {
                                RecordSetWriter ioe = failedRecordWriter;
                                synchronized (ioe) {
                                    failedRecordWriter.write(record);
                                    failedRecordWriter.flush();
                                    failedFlowFileCount.incrementAndGet();
                                }
                            }
                            catch (IOException ioe) {
                                log.error("Error transferring record to failure, this may result in data loss.", new Object[]{ioe.getMessage()}, (Throwable)ioe);
                                recordWriteErrors.incrementAndGet();
                            }
                        }
                    }

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    public void onSuccess(Object value) {
                        log.debug(" FlowFile Processing Success: {}", new Object[]{value.toString()});
                        try {
                            RecordSetWriter recordSetWriter = successfulRecordWriter;
                            synchronized (recordSetWriter) {
                                successfulRecordWriter.write(record);
                                successfulRecordWriter.flush();
                                successfulFlowFileCount.incrementAndGet();
                            }
                        }
                        catch (IOException ioe) {
                            log.error("Error transferring record to success, this may result in data loss. However the record was successfully processed by Druid", new Object[]{ioe.getMessage()}, (Throwable)ioe);
                            recordWriteErrors.incrementAndGet();
                        }
                    }
                });
            }
        }
        catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
            log.error("FlowFile Processing Failed due to: {}", new Object[]{e.getMessage()}, e);
            flowFile = session.putAttribute(flowFile, RECORD_COUNT, Integer.toString(recordCount));
            session.transfer(flowFile, REL_FAILURE);
            try {
                droppedOutputStream.close();
                session.remove(droppedFlowFile);
            }
            catch (IOException ioe) {
                log.error("Error closing output stream for FlowFile with dropped records.", (Throwable)ioe);
            }
            try {
                failedOutputStream.close();
                session.remove(failedFlowFile);
            }
            catch (IOException ioe) {
                log.error("Error closing output stream for FlowFile with failed records.", (Throwable)ioe);
            }
            try {
                successfulOutputStream.close();
                session.remove(successfulFlowFile);
            }
            catch (IOException ioe) {
                log.error("Error closing output stream for FlowFile with successful records.", (Throwable)ioe);
            }
            session.commitAsync();
            return;
        }
        if (recordCount == 0) {
            flowFile = session.putAttribute(flowFile, RECORD_COUNT, "0");
            session.transfer(flowFile, REL_SUCCESS);
            try {
                droppedOutputStream.close();
                session.remove(droppedFlowFile);
            }
            catch (IOException ioe) {
                log.error("Error closing output stream for FlowFile with dropped records.", (Throwable)ioe);
            }
            try {
                failedOutputStream.close();
                session.remove(failedFlowFile);
            }
            catch (IOException ioe) {
                log.error("Error closing output stream for FlowFile with failed records.", (Throwable)ioe);
            }
            try {
                successfulOutputStream.close();
                session.remove(successfulFlowFile);
            }
            catch (IOException ioe) {
                log.error("Error closing output stream for FlowFile with successful records.", (Throwable)ioe);
            }
        } else {
            while (recordCount != droppedFlowFileCount.get() + failedFlowFileCount.get() + successfulFlowFileCount.get() + recordWriteErrors.get()) {
                Thread.yield();
            }
            try {
                droppedRecordWriter.finishRecordSet();
                droppedRecordWriter.close();
            }
            catch (IOException ioe) {
                log.error("Error closing FlowFile with dropped records: {}", new Object[]{ioe.getMessage()}, (Throwable)ioe);
                session.rollback();
                throw new ProcessException((Throwable)ioe);
            }
            if (droppedFlowFileCount.get() > 0) {
                droppedFlowFile = session.putAttribute(droppedFlowFile, RECORD_COUNT, Integer.toString(droppedFlowFileCount.get()));
                session.transfer(droppedFlowFile, REL_DROPPED);
            } else {
                session.remove(droppedFlowFile);
            }
            try {
                failedRecordWriter.finishRecordSet();
                failedRecordWriter.close();
            }
            catch (IOException ioe) {
                log.error("Error closing FlowFile with failed records: {}", new Object[]{ioe.getMessage()}, (Throwable)ioe);
                session.rollback();
                throw new ProcessException((Throwable)ioe);
            }
            if (failedFlowFileCount.get() > 0) {
                failedFlowFile = session.putAttribute(failedFlowFile, RECORD_COUNT, Integer.toString(failedFlowFileCount.get()));
                session.transfer(failedFlowFile, REL_FAILURE);
            } else {
                session.remove(failedFlowFile);
            }
            try {
                successfulRecordWriter.finishRecordSet();
                successfulRecordWriter.close();
            }
            catch (IOException ioe) {
                log.error("Error closing FlowFile with successful records: {}", new Object[]{ioe.getMessage()}, (Throwable)ioe);
                session.rollback();
                throw new ProcessException((Throwable)ioe);
            }
            if (successfulFlowFileCount.get() > 0) {
                successfulFlowFile = session.putAttribute(successfulFlowFile, RECORD_COUNT, Integer.toString(successfulFlowFileCount.get()));
                session.transfer(successfulFlowFile, REL_SUCCESS);
                session.getProvenanceReporter().send(successfulFlowFile, tranquilityController.getTransitUri());
            } else {
                session.remove(successfulFlowFile);
            }
            session.remove(flowFile);
        }
    }
}

