/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.servo.publish.atlas;

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.PrettyPrinter;
import com.netflix.archaius.api.Config;
import com.netflix.archaius.config.EmptyConfig;
import com.netflix.servo.Metric;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.monitor.BasicCounter;
import com.netflix.servo.monitor.BasicGauge;
import com.netflix.servo.monitor.Counter;
import com.netflix.servo.monitor.Gauge;
import com.netflix.servo.monitor.MonitorConfig;
import com.netflix.servo.monitor.Monitors;
import com.netflix.servo.monitor.Pollers;
import com.netflix.servo.monitor.Stopwatch;
import com.netflix.servo.monitor.Timer;
import com.netflix.servo.publish.MetricObserver;
import com.netflix.servo.publish.atlas.AtlasPrettyPrinter;
import com.netflix.servo.publish.atlas.HttpHelper;
import com.netflix.servo.publish.atlas.JsonPayload;
import com.netflix.servo.publish.atlas.ServoAtlasConfig;
import com.netflix.servo.publish.atlas.UpdateRequest;
import com.netflix.servo.publish.atlas.ValidCharacters;
import com.netflix.servo.tag.BasicTag;
import com.netflix.servo.tag.BasicTagList;
import com.netflix.servo.tag.Tag;
import com.netflix.servo.tag.TagList;
import iep.com.netflix.iep.http.BasicServerRegistry;
import iep.com.netflix.iep.http.RxHttp;
import iep.com.netflix.iep.http.ServerRegistry;
import iep.io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.netty.buffer.ByteBuf;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;

public class AtlasMetricObserver
implements MetricObserver {
    private static final Logger LOGGER = LoggerFactory.getLogger(AtlasMetricObserver.class);
    private static final Tag ATLAS_COUNTER_TAG = new BasicTag("atlas.dstype", "counter");
    private static final Tag ATLAS_GAUGE_TAG = new BasicTag("atlas.dstype", "gauge");
    private static final UpdateTasks NO_TASKS = new UpdateTasks(0, null, -1L);
    private static final String FILE_DATE_FORMAT = "yyyy_MM_dd_HH_mm_ss_SSS";
    private final JsonFactory jsonFactory = new JsonFactory();
    protected final HttpHelper httpHelper;
    protected final ServoAtlasConfig config;
    protected final long sendTimeoutMs;
    protected final long stepMs;
    private final Counter numMetricsTotal = Monitors.newCounter((String)"numMetricsTotal");
    private final Timer updateTimer = Monitors.newTimer((String)"update");
    private final Counter numMetricsDroppedSendTimeout = AtlasMetricObserver.newErrCounter("numMetricsDropped", "sendTimeout");
    private final Counter numMetricsDroppedQueueFull = AtlasMetricObserver.newErrCounter("numMetricsDropped", "sendQueueFull");
    private final Counter numMetricsDroppedHttpErr = AtlasMetricObserver.newErrCounter("numMetricsDropped", "httpError");
    private final Counter numMetricsSent = Monitors.newCounter((String)"numMetricsSent");
    private final TagList commonTags;
    private final BlockingQueue<UpdateTasks> pushQueue;
    private final Gauge<Integer> pushQueueSize = new BasicGauge(MonitorConfig.builder((String)"pushQueue").build(), (Callable)new Callable<Integer>(){

        @Override
        public Integer call() throws Exception {
            return AtlasMetricObserver.this.pushQueue.size();
        }
    });
    private final Thread pushThread;
    private final AtomicBoolean shouldPushMetrics = new AtomicBoolean(true);

    protected boolean shouldDumpPayload() {
        return false;
    }

    protected String getPayloadDirectory() {
        String tmp = System.getProperty("java.io.tmpdir");
        return tmp != null ? tmp : "/tmp";
    }

    public AtlasMetricObserver(ServoAtlasConfig config, TagList commonTags) {
        this(config, commonTags, 0);
    }

    public AtlasMetricObserver(ServoAtlasConfig config, TagList commonTags, int pollerIdx) {
        this(config, commonTags, pollerIdx, new HttpHelper(new RxHttp((Config)EmptyConfig.INSTANCE, (ServerRegistry)new BasicServerRegistry())));
    }

    public AtlasMetricObserver(ServoAtlasConfig config, TagList commonTags, int pollerIdx, HttpHelper httpHelper) {
        this.httpHelper = httpHelper;
        this.config = config;
        this.stepMs = (Long)Pollers.getPollingIntervals().get(pollerIdx);
        this.sendTimeoutMs = this.stepMs * 9L / 10L;
        this.commonTags = commonTags;
        this.pushQueue = new LinkedBlockingQueue<UpdateTasks>(config.getPushQueueSize());
        this.pushThread = new Thread((Runnable)new PushProcessor(), "BaseAtlasMetricObserver-Push");
        this.pushThread.setDaemon(true);
        this.pushThread.start();
    }

    public void stop() {
        this.shouldPushMetrics.set(false);
        this.pushThread.interrupt();
    }

    protected static Counter newErrCounter(String name, String err) {
        return new BasicCounter(MonitorConfig.builder((String)name).withTag("error", err).build());
    }

    protected static Metric asGauge(Metric m) {
        return new Metric(m.getConfig().withAdditionalTag(ATLAS_GAUGE_TAG), m.getTimestamp(), m.getValue());
    }

    protected static Metric asCounter(Metric m) {
        return new Metric(m.getConfig().withAdditionalTag(ATLAS_COUNTER_TAG), m.getTimestamp(), m.getValue());
    }

    protected static boolean isCounter(Metric m) {
        TagList tags = m.getConfig().getTags();
        String value = tags.getValue(DataSourceType.KEY);
        return value != null && value.equals(DataSourceType.COUNTER.name());
    }

    protected static boolean isGauge(Metric m) {
        TagList tags = m.getConfig().getTags();
        String value = tags.getValue(DataSourceType.KEY);
        return value != null && value.equals(DataSourceType.GAUGE.name());
    }

    protected static boolean isRate(Metric m) {
        TagList tags = m.getConfig().getTags();
        String value = tags.getValue(DataSourceType.KEY);
        return DataSourceType.RATE.name().equals(value) || DataSourceType.NORMALIZED.name().equals(value);
    }

    protected static List<Metric> identifyDsTypes(List<Metric> metrics) {
        return metrics.stream().map(m -> AtlasMetricObserver.isRate(m) ? m : AtlasMetricObserver.asGauge(m)).collect(Collectors.toList());
    }

    public String getName() {
        return "atlas";
    }

    private List<Metric> identifyCountersForPush(List<Metric> metrics) {
        ArrayList<Metric> transformed = new ArrayList<Metric>(metrics.size());
        Iterator<Metric> iterator = metrics.iterator();
        while (iterator.hasNext()) {
            Metric m;
            Metric toAdd = m = iterator.next();
            if (AtlasMetricObserver.isCounter(m)) {
                toAdd = AtlasMetricObserver.asCounter(m);
            } else if (AtlasMetricObserver.isGauge(m)) {
                toAdd = AtlasMetricObserver.asGauge(m);
            }
            transformed.add(toAdd);
        }
        return transformed;
    }

    public void push(List<Metric> rawMetrics) {
        int attempts;
        List<Metric> validMetrics = ValidCharacters.toValidValues(this.filter(rawMetrics));
        List<Metric> metrics = this.transformMetrics(validMetrics);
        LOGGER.debug("Scheduling push of {} metrics", (Object)metrics.size());
        UpdateTasks tasks = this.getUpdateTasks(BasicTagList.EMPTY, this.identifyCountersForPush(metrics));
        int maxAttempts = 5;
        for (attempts = 1; !this.pushQueue.offer(tasks) && attempts <= 5; ++attempts) {
            UpdateTasks droppedTasks = (UpdateTasks)this.pushQueue.remove();
            LOGGER.warn("Removing old push task due to queue full. Dropping {} metrics.", (Object)droppedTasks.numMetrics);
            this.numMetricsDroppedQueueFull.increment((long)droppedTasks.numMetrics);
        }
        if (attempts >= 5) {
            LOGGER.error("Unable to push update of {}", (Object)tasks);
            this.numMetricsDroppedQueueFull.increment((long)tasks.numMetrics);
        } else {
            LOGGER.debug("Queued push of {}", (Object)tasks);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void sendNow(UpdateTasks updateTasks) {
        if (updateTasks.numMetrics == 0) {
            return;
        }
        Stopwatch s = this.updateTimer.start();
        int totalSent = 0;
        try {
            totalSent = this.httpHelper.sendAll(updateTasks.tasks, updateTasks.numMetrics, this.sendTimeoutMs);
            LOGGER.debug("Sent {}/{} metrics to atlas", (Object)totalSent, (Object)updateTasks.numMetrics);
        }
        finally {
            s.stop();
            int dropped = updateTasks.numMetrics - totalSent;
            this.numMetricsDroppedSendTimeout.increment((long)dropped);
        }
    }

    protected boolean shouldIncludeMetric(Metric metric) {
        return true;
    }

    protected List<Metric> filter(List<Metric> metrics) {
        List<Metric> filtered = metrics.stream().filter(this::shouldIncludeMetric).collect(Collectors.toList());
        LOGGER.debug("Filter: input {} metrics, output {} metrics", (Object)metrics.size(), (Object)filtered.size());
        return filtered;
    }

    protected List<Metric> transformMetrics(List<Metric> metrics) {
        return metrics;
    }

    public void update(List<Metric> rawMetrics) {
        List<Metric> valid = ValidCharacters.toValidValues(rawMetrics);
        List<Metric> metrics = AtlasMetricObserver.identifyDsTypes(this.filter(valid));
        List<Metric> transformed = this.transformMetrics(metrics);
        this.sendNow(this.getUpdateTasks(this.getCommonTags(), transformed));
    }

    private UpdateTasks getUpdateTasks(TagList tags, List<Metric> metrics) {
        int i;
        int batchSize;
        if (!this.config.shouldSendMetrics()) {
            LOGGER.debug("Plugin disabled or running on a dev environment. Not sending metrics.");
            return NO_TASKS;
        }
        if (metrics.isEmpty()) {
            LOGGER.debug("metrics list is empty, no data being sent to server");
            return NO_TASKS;
        }
        int numMetrics = metrics.size();
        Metric[] atlasMetrics = new Metric[metrics.size()];
        metrics.toArray(atlasMetrics);
        this.numMetricsTotal.increment((long)numMetrics);
        ArrayList<Observable<Integer>> tasks = new ArrayList<Observable<Integer>>();
        String uri = this.config.getAtlasUri();
        LOGGER.debug("writing {} metrics to atlas ({})", (Object)numMetrics, (Object)uri);
        for (i = 0; i < numMetrics; i += batchSize) {
            int remaining = numMetrics - i;
            batchSize = Math.min(remaining, this.config.batchSize());
            Metric[] batch = new Metric[batchSize];
            System.arraycopy(atlasMetrics, i, batch, 0, batchSize);
            Observable<Integer> sender = this.getSenderObservable(tags, batch);
            tasks.add(sender);
        }
        assert (i == numMetrics);
        LOGGER.debug("succeeded in creating {} observable(s) to send metrics with total size {}", (Object)tasks.size(), (Object)numMetrics);
        return new UpdateTasks(numMetrics * this.getNumberOfCopies(), tasks, System.currentTimeMillis());
    }

    protected int getNumberOfCopies() {
        return 1;
    }

    private String getPayloadPrefix() {
        SimpleDateFormat fmt = new SimpleDateFormat(FILE_DATE_FORMAT);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        return fmt.format(new Date());
    }

    protected void dumpPayload(File out, JsonPayload payload) throws IOException {
        try (JsonGenerator generator = this.jsonFactory.createGenerator(out, JsonEncoding.UTF8);){
            generator.setPrettyPrinter((PrettyPrinter)new AtlasPrettyPrinter());
            payload.toJson(generator);
        }
    }

    protected Observable<Integer> getSenderObservable(TagList tags, Metric[] batch) {
        UpdateRequest payload = new UpdateRequest(tags, batch, batch.length);
        if (this.shouldDumpPayload()) {
            String prefix = this.getPayloadPrefix();
            try {
                Path tempFile = Files.createTempFile(Paths.get(this.getPayloadDirectory(), new String[0]), prefix, ".json", new FileAttribute[0]);
                this.dumpPayload(tempFile.toFile(), payload);
            }
            catch (IOException ex) {
                LOGGER.debug("Ignoring error writing payload sent to atlas: {}", (Object)ex.getMessage());
            }
        }
        return this.httpHelper.postSmile(this.config.getAtlasUri(), payload).map(this.withBookkeeping(batch.length));
    }

    protected TagList getCommonTags() {
        return this.commonTags;
    }

    protected Func1<HttpClientResponse<ByteBuf>, Integer> withBookkeeping(int batchSize) {
        return response -> {
            boolean ok;
            boolean bl = ok = response.getStatus().code() == 200;
            if (ok) {
                this.numMetricsSent.increment((long)batchSize);
            } else {
                LOGGER.info("Status code: {} - Lost {} metrics", (Object)response.getStatus().code(), (Object)batchSize);
                this.numMetricsDroppedHttpErr.increment((long)batchSize);
            }
            return batchSize;
        };
    }

    private class PushProcessor
    implements Runnable {
        private PushProcessor() {
        }

        @Override
        public void run() {
            while (AtlasMetricObserver.this.shouldPushMetrics.get()) {
                try {
                    AtlasMetricObserver.this.sendNow((UpdateTasks)AtlasMetricObserver.this.pushQueue.take());
                }
                catch (InterruptedException e) {
                    LOGGER.debug("Interrupted trying to get next UpdateTask to push");
                    break;
                }
                catch (Exception t) {
                    LOGGER.info("Caught unexpected exception pushing metrics", (Throwable)t);
                }
            }
        }
    }

    private static class UpdateTasks {
        private final int numMetrics;
        private final List<Observable<Integer>> tasks;
        private final long timestamp;

        UpdateTasks(int numMetrics, List<Observable<Integer>> tasks, long timestamp) {
            this.numMetrics = numMetrics;
            this.tasks = tasks;
            this.timestamp = timestamp;
        }

        public String toString() {
            return "UpdateTasks{numMetrics=" + this.numMetrics + ", tasks=" + this.tasks + ", timestamp=" + this.timestamp + '}';
        }
    }
}

