/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.flume.source;

import java.util.HashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.BatchSizeSupported;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.pulsar.io.flume.source.AbstractSinkOfFlume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SinkOfFlume
extends AbstractSinkOfFlume
implements Configurable,
BatchSizeSupported {
    private static final Logger LOG = LoggerFactory.getLogger(SinkOfFlume.class);
    private long batchSize;
    private SinkCounter counter = null;

    public void configure(Context context) {
        this.batchSize = context.getInteger("batchSize", Integer.valueOf(1000)).intValue();
    }

    public long getBatchSize() {
        return this.batchSize;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Sink.Status process() throws EventDeliveryException {
        Sink.Status result;
        block16: {
            result = Sink.Status.READY;
            Channel channel = this.getChannel();
            Event event = null;
            try (Transaction transaction = null;){
                transaction = channel.getTransaction();
                transaction.begin();
                for (long processedEvents = 0L; processedEvents < this.batchSize && (event = channel.take()) != null; ++processedEvents) {
                    if (processedEvents == 0L) {
                        result = Sink.Status.BACKOFF;
                        this.counter.incrementBatchEmptyCount();
                    } else if (processedEvents < this.batchSize) {
                        this.counter.incrementBatchUnderflowCount();
                    } else {
                        this.counter.incrementBatchCompleteCount();
                    }
                    event.getHeaders();
                    event.getBody();
                    HashMap<String, Object> m = new HashMap<String, Object>();
                    m.put("headers", event.getHeaders());
                    m.put("body", event.getBody());
                    records.put(m);
                }
                transaction.commit();
            }
        }
        return result;
    }

    public synchronized void start() {
        records = new LinkedBlockingQueue();
        this.counter = new SinkCounter("flume-sink");
    }

    public synchronized void stop() {
    }
}

