/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.metrics2.impl;

import com.google.common.base.Preconditions;
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.MetricsFilter;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.impl.MetricsBuffer;
import org.apache.hadoop.metrics2.impl.MetricsRecordFiltered;
import org.apache.hadoop.metrics2.impl.MetricsRecordImpl;
import org.apache.hadoop.metrics2.impl.SinkQueue;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableStat;
import org.apache.hadoop.metrics2.util.Contracts;
import org.apache.hadoop.util.Time;

class MetricsSinkAdapter
implements SinkQueue.Consumer<MetricsBuffer> {
    private final Log LOG = LogFactory.getLog(MetricsSinkAdapter.class);
    private final String name;
    private final String description;
    private final String context;
    private final MetricsSink sink;
    private final MetricsFilter sourceFilter;
    private final MetricsFilter recordFilter;
    private final MetricsFilter metricFilter;
    private final SinkQueue<MetricsBuffer> queue;
    private final Thread sinkThread;
    private volatile boolean stopping = false;
    private volatile boolean inError = false;
    private final int period;
    private final int firstRetryDelay;
    private final int retryCount;
    private final long oobPutTimeout;
    private final float retryBackoff;
    private final MetricsRegistry registry = new MetricsRegistry("sinkadapter");
    private final MutableStat latency;
    private final MutableCounterInt dropped;
    private final MutableGaugeInt qsize;

    MetricsSinkAdapter(String name, String description, MetricsSink sink, String context, MetricsFilter sourceFilter, MetricsFilter recordFilter, MetricsFilter metricFilter, int period, int queueCapacity, int retryDelay, float retryBackoff, int retryCount) {
        this.name = (String)Preconditions.checkNotNull((Object)name, (Object)"name");
        this.description = description;
        this.sink = (MetricsSink)Preconditions.checkNotNull((Object)sink, (Object)"sink object");
        this.context = context;
        this.sourceFilter = sourceFilter;
        this.recordFilter = recordFilter;
        this.metricFilter = metricFilter;
        this.period = Contracts.checkArg(period, period > 0, (Object)"period");
        this.firstRetryDelay = Contracts.checkArg(retryDelay, retryDelay > 0, (Object)"retry delay");
        this.retryBackoff = Contracts.checkArg(retryBackoff, retryBackoff > 1.0f, (Object)"retry backoff");
        this.oobPutTimeout = (long)((double)this.firstRetryDelay * Math.pow(retryBackoff, retryCount) * 1000.0);
        this.retryCount = retryCount;
        this.queue = new SinkQueue(Contracts.checkArg(queueCapacity, queueCapacity > 0, (Object)"queue capacity"));
        this.latency = this.registry.newRate("Sink_" + name, "Sink end to end latency", false);
        this.dropped = this.registry.newCounter("Sink_" + name + "Dropped", "Dropped updates per sink", 0);
        this.qsize = this.registry.newGauge("Sink_" + name + "Qsize", "Queue size", 0);
        this.sinkThread = new Thread(){

            @Override
            public void run() {
                MetricsSinkAdapter.this.publishMetricsFromQueue();
            }
        };
        this.sinkThread.setName(name);
        this.sinkThread.setDaemon(true);
    }

    boolean putMetrics(MetricsBuffer buffer, long logicalTime) {
        if (logicalTime % (long)this.period == 0L) {
            this.LOG.debug((Object)("enqueue, logicalTime=" + logicalTime));
            if (this.queue.enqueue(buffer)) {
                return true;
            }
            this.dropped.incr();
            return false;
        }
        return true;
    }

    public boolean putMetricsImmediate(MetricsBuffer buffer) {
        WaitableMetricsBuffer waitableBuffer = new WaitableMetricsBuffer(buffer);
        if (!this.queue.enqueue(waitableBuffer)) {
            this.LOG.warn((Object)(this.name + " has a full queue and can't consume the given metrics."));
            this.dropped.incr();
            return false;
        }
        if (!waitableBuffer.waitTillNotified(this.oobPutTimeout)) {
            this.LOG.warn((Object)(this.name + " couldn't fulfill an immediate putMetrics request in time." + " Abandoning."));
            return false;
        }
        return true;
    }

    void publishMetricsFromQueue() {
        int retryDelay = this.firstRetryDelay;
        int n = this.retryCount;
        int minDelay = Math.min(500, retryDelay * 1000);
        Random rng = new Random(System.nanoTime());
        while (!this.stopping) {
            try {
                this.queue.consumeAll(this);
                retryDelay = this.firstRetryDelay;
                n = this.retryCount;
                this.inError = false;
            }
            catch (InterruptedException e) {
                this.LOG.info((Object)(this.name + " thread interrupted."));
            }
            catch (Exception e) {
                if (n > 0) {
                    int retryWindow = Math.max(0, 500 * retryDelay - minDelay);
                    int awhile = rng.nextInt(retryWindow) + minDelay;
                    if (!this.inError) {
                        this.LOG.error((Object)("Got sink exception, retry in " + awhile + "ms"), (Throwable)e);
                    }
                    retryDelay = (int)((float)retryDelay * this.retryBackoff);
                    try {
                        Thread.sleep(awhile);
                    }
                    catch (InterruptedException e2) {
                        this.LOG.info((Object)(this.name + " thread interrupted while waiting for retry"), (Throwable)e2);
                    }
                    --n;
                    continue;
                }
                if (!this.inError) {
                    this.LOG.error((Object)"Got sink exception and over retry limit, suppressing further error messages", (Throwable)e);
                }
                this.queue.clear();
                this.inError = true;
            }
        }
    }

    @Override
    public void consume(MetricsBuffer buffer) {
        long ts = 0L;
        for (MetricsBuffer.Entry entry : buffer) {
            if (this.sourceFilter != null && !this.sourceFilter.accepts(entry.name())) continue;
            for (MetricsRecordImpl record : entry.records()) {
                if (this.context != null && !this.context.equals(record.context()) || this.recordFilter != null && !this.recordFilter.accepts(record)) continue;
                if (this.LOG.isDebugEnabled()) {
                    this.LOG.debug((Object)("Pushing record " + entry.name() + "." + record.context() + "." + record.name() + " to " + this.name));
                }
                this.sink.putMetrics(this.metricFilter == null ? record : new MetricsRecordFiltered(record, this.metricFilter));
                if (ts != 0L) continue;
                ts = record.timestamp();
            }
        }
        if (ts > 0L) {
            this.sink.flush();
            this.latency.add(Time.now() - ts);
        }
        if (buffer instanceof WaitableMetricsBuffer) {
            ((WaitableMetricsBuffer)buffer).notifyAnyWaiters();
        }
        this.LOG.debug((Object)"Done");
    }

    void start() {
        this.sinkThread.start();
        this.LOG.info((Object)("Sink " + this.name + " started"));
    }

    void stop() {
        this.stopping = true;
        this.sinkThread.interrupt();
        try {
            this.sinkThread.join();
        }
        catch (InterruptedException e) {
            this.LOG.warn((Object)"Stop interrupted", (Throwable)e);
        }
    }

    String name() {
        return this.name;
    }

    String description() {
        return this.description;
    }

    void snapshot(MetricsRecordBuilder rb, boolean all) {
        this.registry.snapshot(rb, all);
    }

    MetricsSink sink() {
        return this.sink;
    }

    static class WaitableMetricsBuffer
    extends MetricsBuffer {
        private final Semaphore notificationSemaphore = new Semaphore(0);

        public WaitableMetricsBuffer(MetricsBuffer metricsBuffer) {
            super(metricsBuffer);
        }

        public boolean waitTillNotified(long millisecondsToWait) {
            try {
                return this.notificationSemaphore.tryAcquire(millisecondsToWait, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                return false;
            }
        }

        public void notifyAnyWaiters() {
            this.notificationSemaphore.release();
        }
    }
}

