/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.assertj.core.api.Assertions;

public class BlockingSourceContext<T>
implements SourceFunction.SourceContext<T> {
    private final String name;
    private final Object lock;
    private final OneShotLatch latchToTrigger;
    private final OneShotLatch latchToWait;
    private final ConcurrentHashMap<String, List<T>> collector;
    private final int threshold;
    private int counter = 0;
    private final List<T> localOutput;

    public BlockingSourceContext(String name, OneShotLatch latchToTrigger, OneShotLatch latchToWait, ConcurrentHashMap<String, List<T>> output, int elemToFire) {
        this.name = name;
        this.lock = new Object();
        this.latchToTrigger = latchToTrigger;
        this.latchToWait = latchToWait;
        this.collector = output;
        this.threshold = elemToFire;
        this.localOutput = new ArrayList<T>();
        List<T> prev = this.collector.put(name, this.localOutput);
        Assertions.assertThat(prev).isNull();
    }

    public void collectWithTimestamp(T element, long timestamp) {
        this.collect(element);
    }

    public void collect(T element) {
        this.localOutput.add(element);
        if (++this.counter == this.threshold) {
            this.latchToTrigger.trigger();
            try {
                if (!this.latchToWait.isTriggered()) {
                    this.latchToWait.await();
                }
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void emitWatermark(Watermark mark) {
        throw new UnsupportedOperationException();
    }

    public void markAsTemporarilyIdle() {
        throw new UnsupportedOperationException();
    }

    public Object getCheckpointLock() {
        return this.lock;
    }

    public void close() {
    }
}

