/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn;

import com.google.cloud.Timestamp;
import java.io.Serializable;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecordMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DoFn} that passes every {@link DataChangeRecord} through unchanged while recording
 * metrics about it.
 *
 * <p>For each element it increments the data record counter on the supplied
 * {@link ChangeStreamMetrics}, logs (at debug level) records whose commit-to-emit latency or
 * stream duration exceeds a fixed threshold, and re-emits the record with its commit timestamp as
 * the output timestamp.
 */
public class PostProcessingMetricsDoFn
        extends DoFn<DataChangeRecord, DataChangeRecord>
        implements Serializable {
    private static final long serialVersionUID = -1515578871387565606L;
    private static final Logger LOG = LoggerFactory.getLogger(PostProcessingMetricsDoFn.class);

    /** Commit-to-emit latency, in milliseconds, above which a record is logged. */
    private static final long COMMITTED_TO_EMITTED_THRESHOLD_MS = 100000L;

    /** Stream duration, in milliseconds, above which a record is logged. */
    private static final long STREAM_THRESHOLD_MS = 5000L;

    private final ChangeStreamMetrics metrics;

    /**
     * Creates the function.
     *
     * @param metrics the metrics holder whose data record counter is incremented per element
     */
    public PostProcessingMetricsDoFn(ChangeStreamMetrics metrics) {
        this.metrics = metrics;
    }

    /**
     * Counts the record, logs it if it was slow to be emitted or streamed, and outputs it with its
     * commit timestamp as the element timestamp.
     *
     * @param dataChangeRecord the record being processed
     * @param receiver the receiver the unchanged record is emitted to
     */
    @DoFn.ProcessElement
    public void processElement(
            @DoFn.Element DataChangeRecord dataChangeRecord,
            DoFn.OutputReceiver<DataChangeRecord> receiver) {
        // Resolve the output timestamp before touching the metrics, as the original ordering did.
        Instant commitInstant = new Instant(toEpochMillis(dataChangeRecord.getCommitTimestamp()));
        this.metrics.incDataRecordCounter();
        this.measureCommitTimestampToEmittedMillis(dataChangeRecord);
        this.measureStreamMillis(dataChangeRecord);
        receiver.outputWithTimestamp(dataChangeRecord, commitInstant);
    }

    /**
     * Logs the record's metadata when the time between its commit timestamp and now exceeds
     * {@link #COMMITTED_TO_EMITTED_THRESHOLD_MS}.
     */
    private void measureCommitTimestampToEmittedMillis(DataChangeRecord dataChangeRecord) {
        Timestamp emittedTimestamp = Timestamp.now();
        Timestamp commitTimestamp = dataChangeRecord.getCommitTimestamp();
        Duration committedToEmitted =
                new Duration(toEpochMillis(commitTimestamp), toEpochMillis(emittedTimestamp));
        long committedToEmittedMillis = committedToEmitted.getMillis();
        if (committedToEmittedMillis > COMMITTED_TO_EMITTED_THRESHOLD_MS) {
            LOG.debug(
                    "Data record took {}ms to be emitted: {}",
                    committedToEmittedMillis,
                    dataChangeRecord.getMetadata());
        }
    }

    /**
     * Logs the record's metadata when the time between the recorded stream start and stream end
     * exceeds {@link #STREAM_THRESHOLD_MS}.
     */
    private void measureStreamMillis(DataChangeRecord dataChangeRecord) {
        ChangeStreamRecordMetadata metadata = dataChangeRecord.getMetadata();
        Timestamp streamStartedAt = metadata.getRecordStreamStartedAt();
        Timestamp streamEndedAt = metadata.getRecordStreamEndedAt();
        Duration streamDuration =
                new Duration(toEpochMillis(streamStartedAt), toEpochMillis(streamEndedAt));
        long streamMillis = streamDuration.getMillis();
        if (streamMillis > STREAM_THRESHOLD_MS) {
            LOG.debug("Data record took {}ms to be streamed: {}", streamMillis, metadata);
        }
    }

    /** Converts a Cloud {@link Timestamp} to epoch milliseconds via its SQL timestamp form. */
    private static long toEpochMillis(Timestamp timestamp) {
        return timestamp.toSqlTimestamp().getTime();
    }
}

