/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.wmassigners;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;

/**
 * Stream operator that forwards every record unchanged and thins out the watermark stream so
 * that, in general, at most one watermark is emitted per mini-batch of event time.
 *
 * <p>Event time is divided into consecutive windows of {@code minibatchInterval} units. The
 * operator remembers the largest watermark it has received and only emits it once it reaches the
 * pending boundary {@code nextWatermark}; after each emission the boundary moves to the end of
 * the following window.
 */
public class RowTimeMiniBatchAssginerOperator
        extends AbstractStreamOperator<RowData>
        implements OneInputStreamOperator<RowData, RowData> {
    private static final long serialVersionUID = 1L;

    /** Length of one mini-batch window in event-time units; always positive. */
    private final long minibatchInterval;

    /** Largest watermark received so far; it may not have been emitted yet. */
    private transient long currentWatermark;

    /** Threshold the current watermark has to reach before the next emission. */
    private transient long nextWatermark;

    /**
     * Creates the operator.
     *
     * @param minibatchInterval length of a mini-batch window in event-time units
     * @throws IllegalArgumentException if {@code minibatchInterval} is not positive
     */
    public RowTimeMiniBatchAssginerOperator(long minibatchInterval) {
        // Fail fast: zero would otherwise surface as an ArithmeticException (modulo by zero)
        // in open(), and a negative value would place the first boundary below zero.
        if (minibatchInterval <= 0L) {
            throw new IllegalArgumentException(
                    "minibatchInterval must be positive, but was " + minibatchInterval);
        }
        this.minibatchInterval = minibatchInterval;
    }

    /**
     * Resets the watermark state: the current watermark starts at 0 and the first emission
     * boundary is the last timestamp of the window containing 0.
     */
    public void open() throws Exception {
        super.open();
        currentWatermark = 0L;
        nextWatermark = getMiniBatchEnd(currentWatermark, minibatchInterval);
    }

    /** Forwards the record to the output without modification. */
    public void processElement(StreamRecord<RowData> element) throws Exception {
        output.collect(element);
    }

    /**
     * Records the incoming watermark and emits the largest watermark seen so far once it has
     * reached the pending boundary.
     *
     * <p>The first watermark carrying {@code Long.MAX_VALUE} bypasses the boundary check and is
     * forwarded as-is.
     * NOTE(review): presumably this is Flink's end-of-input marker — confirm against the
     * {@code Watermark} documentation.
     *
     * @param mark the watermark received from upstream
     */
    public void processWatermark(Watermark mark) throws Exception {
        long timestamp = mark.getTimestamp();
        if (timestamp == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
            currentWatermark = Long.MAX_VALUE;
            output.emitWatermark(mark);
            return;
        }

        // Watermarks never move backwards inside this operator.
        currentWatermark = Math.max(currentWatermark, timestamp);
        if (currentWatermark >= nextWatermark) {
            advanceWatermark();
        }
    }

    /**
     * Emits the current watermark and moves {@code nextWatermark} to the end of the window that
     * contains it, or to the end of the following window if the current watermark already sits
     * on that end.
     */
    private void advanceWatermark() {
        output.emitWatermark(new Watermark(currentWatermark));
        long end = getMiniBatchEnd(currentWatermark, minibatchInterval);
        nextWatermark = end > currentWatermark ? end : saturatedAdd(end, minibatchInterval);
    }

    /**
     * Closes the operator and then emits the current watermark. The emission is unconditional,
     * so a value that was already emitted is emitted again.
     */
    public void close() throws Exception {
        super.close();
        advanceWatermark();
    }

    /**
     * Returns the first timestamp of the mini-batch window containing {@code watermark}.
     *
     * <p>{@code Math.floorMod} yields the same result as {@code (watermark + interval) % interval}
     * for every non-negative watermark, but cannot overflow when the watermark is close to
     * {@code Long.MAX_VALUE}.
     */
    private static long getMiniBatchStart(long watermark, long interval) {
        return watermark - Math.floorMod(watermark, interval);
    }

    /** Returns the last timestamp of the mini-batch window containing {@code watermark}. */
    private static long getMiniBatchEnd(long watermark, long interval) {
        return saturatedAdd(getMiniBatchStart(watermark, interval), interval - 1L);
    }

    /**
     * Adds a non-negative {@code delta} to {@code value}, clamping to {@code Long.MAX_VALUE}
     * instead of wrapping around to a negative number.
     */
    private static long saturatedAdd(long value, long delta) {
        long sum = value + delta;
        // With delta >= 0 the sum can only be smaller than value if it wrapped around.
        return sum < value ? Long.MAX_VALUE : sum;
    }
}

