/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals.metrics;

import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics;
import org.rocksdb.Cache;
import org.rocksdb.HistogramData;
import org.rocksdb.HistogramType;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Statistics;
import org.rocksdb.StatsLevel;
import org.rocksdb.TickerType;
import org.slf4j.Logger;

/**
 * Collects RocksDB statistics and properties for one state store and publishes them through
 * the sensors and gauges created by {@link RocksDBMetrics}.
 *
 * <p>A recorder is bound to a task and a {@link StreamsMetricsImpl} via {@link #init}. Each
 * segment of the store then registers its RocksDB instance, block cache and statistics object
 * with {@link #addValueProviders}. Ticker- and histogram-based metrics are pushed to the
 * sensors whenever {@link #record(long)} is called; property-based metrics are exposed as
 * gauges that query the registered RocksDB instances when they are evaluated.
 */
public class RocksDBMetricsRecorder {
    private static final String ROCKSDB_PROPERTIES_PREFIX = "rocksdb.";
    private final Logger logger;
    private Sensor bytesWrittenToDatabaseSensor;
    private Sensor bytesReadFromDatabaseSensor;
    private Sensor memtableBytesFlushedSensor;
    private Sensor memtableHitRatioSensor;
    private Sensor memtableAvgFlushTimeSensor;
    private Sensor memtableMinFlushTimeSensor;
    private Sensor memtableMaxFlushTimeSensor;
    private Sensor writeStallDurationSensor;
    private Sensor blockCacheDataHitRatioSensor;
    private Sensor blockCacheIndexHitRatioSensor;
    private Sensor blockCacheFilterHitRatioSensor;
    private Sensor bytesReadDuringCompactionSensor;
    private Sensor bytesWrittenDuringCompactionSensor;
    private Sensor compactionTimeAvgSensor;
    private Sensor compactionTimeMinSensor;
    private Sensor compactionTimeMaxSensor;
    private Sensor numberOfOpenFilesSensor;
    private Sensor numberOfFileErrorsSensor;
    // Keyed by segment name; one entry per segment that currently contributes values.
    private final Map<String, DbAndCacheAndStatistics> storeToValueProviders = new ConcurrentHashMap<String, DbAndCacheAndStatistics>();
    private final String metricsScope;
    private final String storeName;
    private TaskId taskId;
    private StreamsMetricsImpl streamsMetrics;
    // True while all registered segments share one cache instance; flipped to false when the
    // second segment is registered with a different cache.
    private boolean singleCache = true;

    /**
     * Creates a recorder for the given store. The recorder is not usable for recording until
     * {@link #init} has been called.
     *
     * @param metricsScope scope passed on to the metric context
     * @param storeName    name of the state store whose metrics are recorded
     */
    public RocksDBMetricsRecorder(String metricsScope, String storeName) {
        this.metricsScope = metricsScope;
        this.storeName = storeName;
        LogContext logContext = new LogContext(String.format("[RocksDB Metrics Recorder for %s] ", storeName));
        this.logger = logContext.logger(RocksDBMetricsRecorder.class);
    }

    /**
     * @return the name of the store this recorder was created for
     */
    public String storeName() {
        return this.storeName;
    }

    /**
     * @return the task this recorder was initialised with, or {@code null} before {@link #init}
     */
    public TaskId taskId() {
        return this.taskId;
    }

    /**
     * Binds this recorder to a task and a Streams metrics instance and creates all sensors and
     * gauges. Calling it again with the same task and the same metrics instance is permitted.
     *
     * @param streamsMetrics metrics registry to create sensors and gauges in; must not be null
     * @param taskId         task that owns the store; must not be null
     * @throws NullPointerException  if {@code streamsMetrics} or {@code taskId} is null
     * @throws IllegalStateException if the recorder was previously initialised with a different
     *                               task or a different metrics instance
     */
    public void init(StreamsMetricsImpl streamsMetrics, TaskId taskId) {
        Objects.requireNonNull(streamsMetrics, "Streams metrics must not be null");
        // Validate the task ID itself; previously streamsMetrics was checked twice and a null
        // task ID slipped through to fail later with a misleading or message-less exception.
        Objects.requireNonNull(taskId, "task ID must not be null");
        if (this.taskId != null && !this.taskId.equals(taskId)) {
            throw new IllegalStateException("Metrics recorder is re-initialised with different task: previous task is " + this.taskId + " whereas current task is " + taskId + ". This is a bug in Kafka Streams. Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
        }
        if (this.streamsMetrics != null && this.streamsMetrics != streamsMetrics) {
            throw new IllegalStateException("Metrics recorder is re-initialised with different Streams metrics. This is a bug in Kafka Streams. Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
        }
        RocksDBMetrics.RocksDBMetricContext metricContext = new RocksDBMetrics.RocksDBMetricContext(taskId.toString(), this.metricsScope, this.storeName);
        this.initSensors(streamsMetrics, metricContext);
        this.initGauges(streamsMetrics, metricContext);
        this.taskId = taskId;
        this.streamsMetrics = streamsMetrics;
    }

    /**
     * Registers the RocksDB instance, cache and statistics of one segment as value providers.
     * When the first segment is registered, this recorder is also added to the metrics
     * recording trigger of the Streams metrics instance.
     *
     * @param segmentName name of the segment the providers belong to
     * @param db          RocksDB instance of the segment; must not be null
     * @param cache       block cache of the segment; may be null if it is null for all segments
     * @param statistics  statistics of the segment; may be null if it is null for all segments
     * @throws IllegalStateException if providers for {@code segmentName} are already registered
     *                               or the new providers are inconsistent with registered ones
     */
    public void addValueProviders(String segmentName, RocksDB db, Cache cache, Statistics statistics) {
        if (this.storeToValueProviders.isEmpty()) {
            this.logger.debug("Adding metrics recorder of task {} to metrics recording trigger", this.taskId);
            this.streamsMetrics.rocksDBMetricsRecordingTrigger().addMetricsRecorder(this);
        } else if (this.storeToValueProviders.containsKey(segmentName)) {
            throw new IllegalStateException("Value providers for store " + segmentName + " of task " + this.taskId + " has been already added. This is a bug in Kafka Streams. Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
        }
        this.verifyDbAndCacheAndStatistics(segmentName, db, cache, statistics);
        this.logger.debug("Adding value providers for store {} of task {}", segmentName, this.taskId);
        this.storeToValueProviders.put(segmentName, new DbAndCacheAndStatistics(db, cache, statistics));
    }

    /**
     * Checks the providers of a new segment against every segment already registered:
     * statistics and cache must be null for all segments or for none, a DB instance may be
     * registered only once, and caches must either all be the same instance or all be distinct.
     */
    private void verifyDbAndCacheAndStatistics(String segmentName, RocksDB db, Cache cache, Statistics statistics) {
        for (DbAndCacheAndStatistics valueProviders : this.storeToValueProviders.values()) {
            this.verifyConsistencyOfValueProvidersAcrossSegments(segmentName, statistics, valueProviders.statistics, "statistics");
            this.verifyConsistencyOfValueProvidersAcrossSegments(segmentName, cache, valueProviders.cache, "cache");
            if (db == valueProviders.db) {
                throw new IllegalStateException("DB instance for store " + segmentName + " of task " + this.taskId + " was already added for another segment as a value provider. This is a bug in Kafka Streams. Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
            }
            boolean sameCache = cache == valueProviders.cache;
            if (this.storeToValueProviders.size() == 1 && !sameCache) {
                // The second segment decides the mode: a different cache means one cache per segment.
                this.singleCache = false;
            } else if (this.singleCache != sameCache) {
                // Shared-cache mode requires identical caches; per-segment mode requires distinct ones.
                throw new IllegalStateException("Caches for store " + this.storeName + " of task " + this.taskId + " are either not all distinct or do not all refer to the same cache. This is a bug in Kafka Streams. Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
            }
        }
    }

    /**
     * Throws if exactly one of the two providers is null, i.e. the new segment disagrees with
     * an already registered segment on whether this kind of provider is present.
     *
     * @param valueProviderName lower-case provider kind used in the error message
     */
    private void verifyConsistencyOfValueProvidersAcrossSegments(String segmentName, Object newValueProvider, Object oldValueProvider, String valueProviderName) {
        boolean newIsNull = newValueProvider == null;
        boolean oldIsNull = oldValueProvider == null;
        if (newIsNull != oldIsNull) {
            // Capitalise the first letter so the provider name can start the sentence.
            char capitalizedFirstChar = valueProviderName.toUpperCase(Locale.US).charAt(0);
            StringBuilder capitalizedValueProviderName = new StringBuilder(valueProviderName);
            capitalizedValueProviderName.setCharAt(0, capitalizedFirstChar);
            throw new IllegalStateException(capitalizedValueProviderName + " for segment " + segmentName + " of task " + this.taskId + " is" + (newIsNull ? " " : " not ") + "null although the " + valueProviderName + " of another segment in this metrics recorder is" + (!newIsNull ? " " : " not ") + "null. This is a bug in Kafka Streams. Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
        }
    }

    /**
     * Creates every sensor that {@link #record(long)} writes to.
     */
    private void initSensors(StreamsMetricsImpl streamsMetrics, RocksDBMetrics.RocksDBMetricContext metricContext) {
        this.bytesWrittenToDatabaseSensor = RocksDBMetrics.bytesWrittenToDatabaseSensor(streamsMetrics, metricContext);
        this.bytesReadFromDatabaseSensor = RocksDBMetrics.bytesReadFromDatabaseSensor(streamsMetrics, metricContext);
        this.memtableBytesFlushedSensor = RocksDBMetrics.memtableBytesFlushedSensor(streamsMetrics, metricContext);
        this.memtableHitRatioSensor = RocksDBMetrics.memtableHitRatioSensor(streamsMetrics, metricContext);
        this.memtableAvgFlushTimeSensor = RocksDBMetrics.memtableAvgFlushTimeSensor(streamsMetrics, metricContext);
        this.memtableMinFlushTimeSensor = RocksDBMetrics.memtableMinFlushTimeSensor(streamsMetrics, metricContext);
        this.memtableMaxFlushTimeSensor = RocksDBMetrics.memtableMaxFlushTimeSensor(streamsMetrics, metricContext);
        this.writeStallDurationSensor = RocksDBMetrics.writeStallDurationSensor(streamsMetrics, metricContext);
        this.blockCacheDataHitRatioSensor = RocksDBMetrics.blockCacheDataHitRatioSensor(streamsMetrics, metricContext);
        this.blockCacheIndexHitRatioSensor = RocksDBMetrics.blockCacheIndexHitRatioSensor(streamsMetrics, metricContext);
        this.blockCacheFilterHitRatioSensor = RocksDBMetrics.blockCacheFilterHitRatioSensor(streamsMetrics, metricContext);
        this.bytesWrittenDuringCompactionSensor = RocksDBMetrics.bytesWrittenDuringCompactionSensor(streamsMetrics, metricContext);
        this.bytesReadDuringCompactionSensor = RocksDBMetrics.bytesReadDuringCompactionSensor(streamsMetrics, metricContext);
        this.compactionTimeAvgSensor = RocksDBMetrics.compactionTimeAvgSensor(streamsMetrics, metricContext);
        this.compactionTimeMinSensor = RocksDBMetrics.compactionTimeMinSensor(streamsMetrics, metricContext);
        this.compactionTimeMaxSensor = RocksDBMetrics.compactionTimeMaxSensor(streamsMetrics, metricContext);
        this.numberOfOpenFilesSensor = RocksDBMetrics.numberOfOpenFilesSensor(streamsMetrics, metricContext);
        this.numberOfFileErrorsSensor = RocksDBMetrics.numberOfFileErrorsSensor(streamsMetrics, metricContext);
    }

    /**
     * Registers the property-based gauges. Most gauges sum a RocksDB property over all
     * registered segments; the three block-cache gauges additionally take the shared-cache
     * mode into account.
     */
    private void initGauges(StreamsMetricsImpl streamsMetrics, RocksDBMetrics.RocksDBMetricContext metricContext) {
        RocksDBMetrics.addNumImmutableMemTableMetric(streamsMetrics, metricContext, this.gaugeToComputeSumOfProperties("num-immutable-mem-table"));
        RocksDBMetrics.addCurSizeActiveMemTable(streamsMetrics, metricContext, this.gaugeToComputeSumOfProperties("cur-size-active-mem-table"));
        RocksDBMetrics.addCurSizeAllMemTables(streamsMetrics, metricContext, this.gaugeToComputeSumOfProperties("cur-size-all-mem-tables"));
        RocksDBMetrics.addSizeAllMemTables(streamsMetrics, metricContext, this.gaugeToComputeSumOfProperties("size-all-mem-tables"));
        RocksDBMetrics.addNumEntriesActiveMemTableMetric(streamsMetrics, metricContext, this.gaugeToComputeSumOfProperties("num-entries-active-mem-table"));
        RocksDBMetrics.addNumDeletesActiveMemTableMetric(streamsMetrics, metricContext, this.gaugeToComputeSumOfProperties("num-deletes-active-mem-table"));
        RocksDBMetrics.addNumEntriesImmMemTablesMetric(streamsMetrics, metricContext, this.gaugeToComputeSumOfProperties("num-entries-imm-mem-tables"));
        RocksDBMetrics.addNumDeletesImmMemTablesMetric(streamsMetrics, metricContext, this.gaugeToComputeSumOfProperties("num-deletes-imm-mem-tables"));
        RocksDBMetrics.addMemTableFlushPending(streamsMetrics, metricContext, this.gaugeToComputeSumOfProperties("mem-table-flush-pending"));
        RocksDBMetrics.addNumRunningFlushesMetric(streamsMetrics, metricContext, this.gaugeToComputeSumOfProperties("num-running-flushes"));
        RocksDBMetrics.addCompactionPendingMetric(streamsMetrics, metricContext, this.gaugeToComputeSumOfProperties("compaction-pending"));
        RocksDBMetrics.addNumRunningCompactionsMetric(streamsMetrics, metricContext, this.gaugeToComputeSumOfProperties("num-running-compactions"));
        RocksDBMetrics.addEstimatePendingCompactionBytesMetric(streamsMetrics, metricContext, this.gaugeToComputeSumOfProperties("estimate-pending-compaction-bytes"));
        RocksDBMetrics.addTotalSstFilesSizeMetric(streamsMetrics, metricContext, this.gaugeToComputeSumOfProperties("total-sst-files-size"));
        RocksDBMetrics.addLiveSstFilesSizeMetric(streamsMetrics, metricContext, this.gaugeToComputeSumOfProperties("live-sst-files-size"));
        RocksDBMetrics.addNumLiveVersionMetric(streamsMetrics, metricContext, this.gaugeToComputeSumOfProperties("num-live-versions"));
        RocksDBMetrics.addEstimateNumKeysMetric(streamsMetrics, metricContext, this.gaugeToComputeSumOfProperties("estimate-num-keys"));
        RocksDBMetrics.addEstimateTableReadersMemMetric(streamsMetrics, metricContext, this.gaugeToComputeSumOfProperties("estimate-table-readers-mem"));
        RocksDBMetrics.addBackgroundErrorsMetric(streamsMetrics, metricContext, this.gaugeToComputeSumOfProperties("background-errors"));
        RocksDBMetrics.addBlockCacheCapacityMetric(streamsMetrics, metricContext, this.gaugeToComputeBlockCacheMetrics("block-cache-capacity"));
        RocksDBMetrics.addBlockCacheUsageMetric(streamsMetrics, metricContext, this.gaugeToComputeBlockCacheMetrics("block-cache-usage"));
        RocksDBMetrics.addBlockCachePinnedUsageMetric(streamsMetrics, metricContext, this.gaugeToComputeBlockCacheMetrics("block-cache-pinned-usage"));
    }

    /**
     * Builds a gauge that returns the sum of the given RocksDB property over all registered
     * segments, or zero when no segment is registered.
     *
     * @param propertyName property name without the {@code "rocksdb."} prefix
     * @throws ProcessorStateException (from the gauge) if RocksDB fails to return the property
     */
    private Gauge<BigInteger> gaugeToComputeSumOfProperties(String propertyName) {
        return (metricsConfig, now) -> {
            BigInteger result = BigInteger.ZERO;
            for (DbAndCacheAndStatistics valueProvider : this.storeToValueProviders.values()) {
                try {
                    result = result.add(RocksDBMetricsRecorder.unsignedLongProperty(valueProvider.db, propertyName));
                }
                catch (RocksDBException e) {
                    throw new ProcessorStateException("Error recording RocksDB metric " + propertyName, e);
                }
            }
            return result;
        };
    }

    /**
     * Builds a gauge for a block-cache property. When all segments share one cache, the value
     * of the first segment encountered is returned as-is; otherwise the values of all segments
     * are summed. Returns zero when no segment is registered.
     *
     * @param propertyName property name without the {@code "rocksdb."} prefix
     * @throws ProcessorStateException (from the gauge) if RocksDB fails to return the property
     */
    private Gauge<BigInteger> gaugeToComputeBlockCacheMetrics(String propertyName) {
        return (metricsConfig, now) -> {
            BigInteger result = BigInteger.ZERO;
            for (DbAndCacheAndStatistics valueProvider : this.storeToValueProviders.values()) {
                try {
                    BigInteger value = RocksDBMetricsRecorder.unsignedLongProperty(valueProvider.db, propertyName);
                    if (this.singleCache) {
                        // Every segment reports the same shared cache, so one reading suffices.
                        result = value;
                        break;
                    }
                    result = result.add(value);
                }
                catch (RocksDBException e) {
                    throw new ProcessorStateException("Error recording RocksDB metric " + propertyName, e);
                }
            }
            return result;
        };
    }

    /**
     * Reads an aggregated long property from the given DB and reinterprets its 64 bits as an
     * unsigned value, so that readings with the top bit set do not show up as negative numbers.
     */
    private static BigInteger unsignedLongProperty(RocksDB db, String propertyName) throws RocksDBException {
        long rawValue = db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName);
        // Signum 1 plus the raw big-endian bytes yields the unsigned interpretation.
        return new BigInteger(1, RocksDBMetricsRecorder.longToBytes(rawValue));
    }

    /**
     * Returns the eight bytes of {@code data} in the buffer's default (big-endian) order.
     */
    private static byte[] longToBytes(long data) {
        ByteBuffer conversionBuffer = ByteBuffer.allocate(8);
        conversionBuffer.putLong(0, data);
        return conversionBuffer.array();
    }

    /**
     * Unregisters the value providers of a segment. When the last segment is removed, this
     * recorder is also removed from the metrics recording trigger.
     *
     * @param segmentName name of the segment whose providers are removed
     * @throws IllegalStateException if no providers are registered for {@code segmentName}
     */
    public void removeValueProviders(String segmentName) {
        this.logger.debug("Removing value providers for store {} of task {}", segmentName, this.taskId);
        DbAndCacheAndStatistics removedValueProviders = this.storeToValueProviders.remove(segmentName);
        if (removedValueProviders == null) {
            throw new IllegalStateException("No value providers for store \"" + segmentName + "\" of task " + this.taskId + " could be found. This is a bug in Kafka Streams. Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
        }
        if (this.storeToValueProviders.isEmpty()) {
            this.logger.debug("Removing metrics recorder for store {} of task {} from metrics recording trigger", this.storeName, this.taskId);
            this.streamsMetrics.rocksDBMetricsRecordingTrigger().removeMetricsRecorder(this);
        }
    }

    /**
     * Aggregates the statistics of all registered segments and records them to the sensors.
     * Ticker counts are read with get-and-reset, so each call reports the amount accumulated
     * since the previous read. Nothing is recorded if a segment without statistics is found.
     *
     * @param now timestamp passed to every sensor together with the recorded value
     */
    public void record(long now) {
        this.logger.debug("Recording metrics for store {}", this.storeName);
        long bytesWrittenToDatabase = 0L;
        long bytesReadFromDatabase = 0L;
        long memtableBytesFlushed = 0L;
        long memtableHits = 0L;
        long memtableMisses = 0L;
        long blockCacheDataHits = 0L;
        long blockCacheDataMisses = 0L;
        long blockCacheIndexHits = 0L;
        long blockCacheIndexMisses = 0L;
        long blockCacheFilterHits = 0L;
        long blockCacheFilterMisses = 0L;
        long writeStallDuration = 0L;
        long bytesWrittenDuringCompaction = 0L;
        long bytesReadDuringCompaction = 0L;
        long numberOfOpenFiles = 0L;
        long numberOfFileErrors = 0L;
        long memtableFlushTimeSum = 0L;
        long memtableFlushTimeCount = 0L;
        double memtableFlushTimeMin = Double.MAX_VALUE;
        double memtableFlushTimeMax = 0.0;
        long compactionTimeSum = 0L;
        long compactionTimeCount = 0L;
        double compactionTimeMin = Double.MAX_VALUE;
        double compactionTimeMax = 0.0;
        boolean shouldRecord = true;
        for (DbAndCacheAndStatistics valueProviders : this.storeToValueProviders.values()) {
            Statistics statistics = valueProviders.statistics;
            if (statistics == null) {
                // Statistics are null for all segments or for none, so there is nothing to record.
                shouldRecord = false;
                break;
            }
            bytesWrittenToDatabase += statistics.getAndResetTickerCount(TickerType.BYTES_WRITTEN);
            bytesReadFromDatabase += statistics.getAndResetTickerCount(TickerType.BYTES_READ);
            memtableBytesFlushed += statistics.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES);
            memtableHits += statistics.getAndResetTickerCount(TickerType.MEMTABLE_HIT);
            memtableMisses += statistics.getAndResetTickerCount(TickerType.MEMTABLE_MISS);
            blockCacheDataHits += statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT);
            blockCacheDataMisses += statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS);
            blockCacheIndexHits += statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT);
            blockCacheIndexMisses += statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS);
            blockCacheFilterHits += statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT);
            blockCacheFilterMisses += statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS);
            writeStallDuration += statistics.getAndResetTickerCount(TickerType.STALL_MICROS);
            bytesWrittenDuringCompaction += statistics.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES);
            bytesReadDuringCompaction += statistics.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES);
            // Net change since the previous read: files opened minus files closed.
            numberOfOpenFiles += statistics.getAndResetTickerCount(TickerType.NO_FILE_OPENS) - statistics.getAndResetTickerCount(TickerType.NO_FILE_CLOSES);
            numberOfFileErrors += statistics.getAndResetTickerCount(TickerType.NO_FILE_ERRORS);
            HistogramData memtableFlushTimeData = statistics.getHistogramData(HistogramType.FLUSH_TIME);
            memtableFlushTimeSum += memtableFlushTimeData.getSum();
            memtableFlushTimeCount += memtableFlushTimeData.getCount();
            memtableFlushTimeMin = Double.min(memtableFlushTimeMin, memtableFlushTimeData.getMin());
            memtableFlushTimeMax = Double.max(memtableFlushTimeMax, memtableFlushTimeData.getMax());
            HistogramData compactionTimeData = statistics.getHistogramData(HistogramType.COMPACTION_TIME);
            compactionTimeSum += compactionTimeData.getSum();
            compactionTimeCount += compactionTimeData.getCount();
            compactionTimeMin = Double.min(compactionTimeMin, compactionTimeData.getMin());
            compactionTimeMax = Double.max(compactionTimeMax, compactionTimeData.getMax());
        }
        if (shouldRecord) {
            this.bytesWrittenToDatabaseSensor.record(bytesWrittenToDatabase, now);
            this.bytesReadFromDatabaseSensor.record(bytesReadFromDatabase, now);
            this.memtableBytesFlushedSensor.record(memtableBytesFlushed, now);
            this.memtableHitRatioSensor.record(this.computeHitRatio(memtableHits, memtableMisses), now);
            this.memtableAvgFlushTimeSensor.record(this.computeAvg(memtableFlushTimeSum, memtableFlushTimeCount), now);
            this.memtableMinFlushTimeSensor.record(memtableFlushTimeMin, now);
            this.memtableMaxFlushTimeSensor.record(memtableFlushTimeMax, now);
            this.blockCacheDataHitRatioSensor.record(this.computeHitRatio(blockCacheDataHits, blockCacheDataMisses), now);
            this.blockCacheIndexHitRatioSensor.record(this.computeHitRatio(blockCacheIndexHits, blockCacheIndexMisses), now);
            this.blockCacheFilterHitRatioSensor.record(this.computeHitRatio(blockCacheFilterHits, blockCacheFilterMisses), now);
            this.writeStallDurationSensor.record(writeStallDuration, now);
            this.bytesWrittenDuringCompactionSensor.record(bytesWrittenDuringCompaction, now);
            this.bytesReadDuringCompactionSensor.record(bytesReadDuringCompaction, now);
            this.compactionTimeAvgSensor.record(this.computeAvg(compactionTimeSum, compactionTimeCount), now);
            this.compactionTimeMinSensor.record(compactionTimeMin, now);
            this.compactionTimeMaxSensor.record(compactionTimeMax, now);
            this.numberOfOpenFilesSensor.record(numberOfOpenFiles, now);
            this.numberOfFileErrorsSensor.record(numberOfFileErrors, now);
        }
    }

    /**
     * @return {@code hits / (hits + misses)}, or 0 when there were no hits (which also avoids
     *         dividing by zero when both counters are zero)
     */
    private double computeHitRatio(long hits, long misses) {
        if (hits == 0L) {
            return 0.0;
        }
        return (double)hits / (double)(hits + misses);
    }

    /**
     * @return {@code sum / count}, or 0 when {@code count} is zero
     */
    private double computeAvg(long sum, long count) {
        if (count == 0L) {
            return 0.0;
        }
        return (double)sum / (double)count;
    }

    /**
     * Immutable holder for the value providers of one segment. The cache and the statistics
     * may be null; the DB instance may not.
     */
    private static class DbAndCacheAndStatistics {
        public final RocksDB db;
        public final Cache cache;
        public final Statistics statistics;

        /**
         * @throws NullPointerException if {@code db} is null
         */
        public DbAndCacheAndStatistics(RocksDB db, Cache cache, Statistics statistics) {
            Objects.requireNonNull(db, "database instance must not be null");
            this.db = db;
            this.cache = cache;
            if (statistics != null) {
                // Side effect on the caller's object: the stats level is set on registration.
                statistics.setStatsLevel(StatsLevel.EXCEPT_DETAILED_TIMERS);
            }
            this.statistics = statistics;
        }
    }
}

