/*
 * Decompiled with CFR 0.152.
 */
package org.apache.htrace.impl;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.twitter.zipkin.gen.LogEntry;
import com.twitter.zipkin.gen.Scribe;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.htrace.HTraceConfiguration;
import org.apache.htrace.Span;
import org.apache.htrace.SpanReceiver;
import org.apache.htrace.zipkin.HTraceToZipkinConverter;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TIOStreamTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

/**
 * A {@link SpanReceiver} that buffers HTrace spans in a bounded in-memory queue and
 * ships them in batches to a Zipkin collector through a Scribe/Thrift client.
 *
 * <p>Configuration keys read from the supplied {@link HTraceConfiguration}:
 * <ul>
 *   <li>{@code zipkin.collector-hostname} (default {@code "localhost"})</li>
 *   <li>{@code zipkin.collector-port} (default {@code 9410})</li>
 *   <li>{@code zipkin.num-threads} (default {@code 1}) - number of writer threads</li>
 *   <li>{@code zipkin.traced-service-hostname} (default: the local host address)</li>
 *   <li>{@code zipkin.traced-service-port} (default {@code -1})</li>
 * </ul>
 *
 * <p>Writer threads are daemon threads. {@link #close()} stops accepting new spans and
 * waits up to {@code SHUTDOWN_TIMEOUT} seconds for the writers to flush the queue.
 */
public class ZipkinSpanReceiver
implements SpanReceiver {
    private static final Log LOG = LogFactory.getLog(ZipkinSpanReceiver.class);
    private static final String DEFAULT_COLLECTOR_HOSTNAME = "localhost";
    private static final int DEFAULT_COLLECTOR_PORT = 9410;
    /** Scribe category under which every span is logged. */
    private static final String CATEGORY = "zipkin";
    private static final boolean DEFAULT_IN_CLIENT_MODE = false;
    /** Seconds {@link #close()} waits for the writer threads to finish. */
    private static final int SHUTDOWN_TIMEOUT = 30;
    /** Upper bound on the number of spans sent in one Scribe call. */
    private static final int MAX_SPAN_BATCH_SIZE = 100;
    /** Consecutive send failures after which failed batches are dropped instead of re-queued. */
    private static final int MAX_ERRORS = 10;
    /** Capacity of the span buffer; spans received while it is full are discarded. */
    private static final int QUEUE_CAPACITY = 1000;
    /** Pause after a failed send, giving the collector a moment to recover. */
    private static final long ERROR_BACKOFF_MILLIS = 500L;
    /** 127.0.0.1 packed big-endian, used when the traced service address cannot be resolved. */
    private static final int LOOPBACK_IPV4 = 0x7F000001;

    private final BlockingQueue<Span> queue;
    private final TProtocolFactory protocolFactory;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final ThreadFactory tf;
    private HTraceToZipkinConverter converter;
    private ExecutorService service;
    private HTraceConfiguration conf;
    private String collectorHostname;
    private int collectorPort;

    /**
     * Creates the receiver and immediately starts its writer threads.
     *
     * @param conf configuration providing the {@code zipkin.*} keys listed on the class
     */
    public ZipkinSpanReceiver(HTraceConfiguration conf) {
        this.queue = new ArrayBlockingQueue<Span>(QUEUE_CAPACITY);
        this.protocolFactory = new TBinaryProtocol.Factory();
        this.tf = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("zipkinSpanReceiver-%d").build();
        this.configure(conf);
    }

    /**
     * Reads the collector settings, builds the span converter and (re)creates the
     * writer thread pool with one {@link WriteSpanRunnable} per thread.
     */
    private void configure(HTraceConfiguration conf) {
        this.conf = conf;
        this.collectorHostname = conf.get("zipkin.collector-hostname", DEFAULT_COLLECTOR_HOSTNAME);
        this.collectorPort = conf.getInt("zipkin.collector-port", DEFAULT_COLLECTOR_PORT);
        this.initConverter();
        int numThreads = conf.getInt("zipkin.num-threads", 1);
        if (this.service != null) {
            this.service.shutdownNow();
            this.service = null;
        }
        this.service = Executors.newFixedThreadPool(numThreads, this.tf);
        for (int i = 0; i < numThreads; ++i) {
            this.service.submit(new WriteSpanRunnable());
        }
    }

    /**
     * Builds the HTrace-to-Zipkin converter from the traced service's address and port.
     *
     * <p>If the host cannot be resolved the loopback address 127.0.0.1 is used. The port
     * is narrowed to {@code short}, so configured values above 32767 wrap to negative numbers.
     */
    private void initConverter() {
        InetAddress tracedServiceHostname = null;
        try {
            String host = this.conf.get("zipkin.traced-service-hostname", InetAddress.getLocalHost().getHostAddress());
            tracedServiceHostname = InetAddress.getByName(host);
        }
        catch (UnknownHostException e) {
            LOG.error("Couldn't get the localHost address", e);
        }
        short tracedServicePort = (short)this.conf.getInt("zipkin.traced-service-port", -1);
        // Fall back to a real loopback address; packing the characters of the string
        // "localhost" into an int would yield a meaningless IP.
        // NOTE(review): for an IPv6 address only the first four of its sixteen bytes are used.
        int ipv4 = tracedServiceHostname != null
                ? ByteBuffer.wrap(tracedServiceHostname.getAddress()).getInt()
                : LOOPBACK_IPV4;
        this.converter = new HTraceToZipkinConverter(ipv4, tracedServicePort);
    }

    /**
     * Stops accepting spans and waits up to {@code SHUTDOWN_TIMEOUT} seconds for the
     * writer threads to drain the queue. If the calling thread is interrupted while
     * waiting, its interrupt status is restored before returning.
     */
    public void close() throws IOException {
        this.running.set(false);
        this.service.shutdown();
        try {
            if (!this.service.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
                LOG.error("Was not able to process all remaining spans to write upon closing in: " + SHUTDOWN_TIMEOUT + " " + TimeUnit.SECONDS + ". There could be un-sent spans still left." + "  They have been dropped.");
            }
        }
        catch (InterruptedException e) {
            LOG.warn("Thread interrupted when terminating executor.", e);
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Enqueues a span for asynchronous delivery. Spans are ignored after {@link #close()}
     * and discarded (with an error log) when the buffer is full.
     *
     * @param span the span to forward to the collector
     */
    public void receiveSpan(Span span) {
        if (!this.running.get()) {
            return;
        }
        if (!this.queue.offer(span)) {
            LOG.error("Error trying to append span (" + span.getDescription() + ") to the queue." + "  Blocking Queue was full.");
        }
    }

    /**
     * Worker that drains the span queue in batches and writes each batch to the collector.
     * Each worker owns its own Scribe client and serialization buffer.
     */
    private class WriteSpanRunnable
    implements Runnable {
        private Scribe.Client scribeClient = null;
        private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        private final TProtocol streamProtocol;

        public WriteSpanRunnable() {
            this.streamProtocol = protocolFactory.getProtocol(new TIOStreamTransport(this.baos));
        }

        /**
         * Loops until the receiver is closed and the queue is empty. Each iteration takes
         * up to {@code MAX_SPAN_BATCH_SIZE} spans, serializes them and sends them in one
         * Scribe call. A failed batch is re-queued until {@code MAX_ERRORS} consecutive
         * failures have occurred; from then on failed batches are dropped.
         */
        @Override
        public void run() {
            ArrayList<Span> batch = new ArrayList<Span>(MAX_SPAN_BATCH_SIZE);
            long consecutiveErrors = 0L;
            while (running.get() || !queue.isEmpty()) {
                try {
                    // Block only briefly so a change of the running flag is noticed.
                    Span first = queue.poll(1L, TimeUnit.SECONDS);
                    if (first != null) {
                        batch.add(first);
                        queue.drainTo(batch, MAX_SPAN_BATCH_SIZE - 1);
                    }
                }
                catch (InterruptedException ignored) {
                    // Deliberately ignored: the loop condition decides when to stop.
                }
                if (batch.isEmpty()) {
                    continue;
                }
                try {
                    // Connects on first use and after every error; a failure to connect
                    // is handled by the catch block below like any other send failure.
                    this.startClient();
                    ArrayList<LogEntry> entries = new ArrayList<LogEntry>(batch.size());
                    for (Span htraceSpan : batch) {
                        com.twitter.zipkin.gen.Span zipkinSpan = converter.convert(htraceSpan);
                        this.baos.reset();
                        zipkinSpan.write(this.streamProtocol);
                        entries.add(new LogEntry(CATEGORY, Base64.encodeBase64String(this.baos.toByteArray())));
                    }
                    this.scribeClient.Log(entries);
                    consecutiveErrors = 0L;
                }
                catch (Exception e) {
                    LOG.error("Error when writing to the zipkin collector: " + collectorHostname + ":" + collectorPort, e);
                    if (++consecutiveErrors < MAX_ERRORS) {
                        this.requeue(batch);
                    } else {
                        LOG.error("Drop " + batch.size() + " span(s) after " + consecutiveErrors + " consecutive errors");
                    }
                    this.closeClient();
                    try {
                        Thread.sleep(ERROR_BACKOFF_MILLIS);
                    }
                    catch (InterruptedException ignored) {
                        // Deliberately ignored: the back-off is best effort.
                    }
                }
                finally {
                    // The batch has been sent, re-queued or dropped. It must never be
                    // carried into the next iteration, or spans would be sent twice.
                    batch.clear();
                }
            }
            this.closeClient();
        }

        /** Puts spans back on the queue one by one, logging how many did not fit. */
        private void requeue(ArrayList<Span> spans) {
            int dropped = 0;
            for (Span span : spans) {
                if (!queue.offer(span)) {
                    ++dropped;
                }
            }
            if (dropped > 0) {
                LOG.error("Drop " + dropped + " span(s) because queue is full");
            }
        }

        /** Closes the client's transport, if a client exists, and forgets the client. */
        private void closeClient() {
            if (this.scribeClient != null) {
                this.scribeClient.getInputProtocol().getTransport().close();
                this.scribeClient = null;
            }
        }

        /**
         * Opens a framed socket transport to the collector and creates the Scribe client,
         * unless a client already exists.
         *
         * @throws TTransportException if the connection cannot be opened; no client is
         *         created in that case
         */
        private void startClient() throws TTransportException {
            if (this.scribeClient != null) {
                return;
            }
            TFramedTransport transport = new TFramedTransport(new TSocket(collectorHostname, collectorPort));
            transport.open();
            this.scribeClient = new Scribe.Client(protocolFactory.getProtocol(transport));
        }
    }
}

