/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.co;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.operators.co.CoProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Test;

public class CoProcessOperatorTest
extends TestLogger {
    @Test
    public void testTimestampAndWatermarkQuerying() throws Exception {
        CoProcessOperator operator = new CoProcessOperator((CoProcessFunction)new WatermarkQueryingProcessFunction());
        TwoInputStreamOperatorTestHarness testHarness = new TwoInputStreamOperatorTestHarness(operator);
        testHarness.setup();
        testHarness.open();
        testHarness.processWatermark1(new Watermark(17L));
        testHarness.processWatermark2(new Watermark(17L));
        testHarness.processElement1(new StreamRecord((Object)5, 12L));
        testHarness.processWatermark1(new Watermark(42L));
        testHarness.processWatermark2(new Watermark(42L));
        testHarness.processElement2(new StreamRecord((Object)"6", 13L));
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new Watermark(17L));
        expectedOutput.add(new StreamRecord((Object)"5WM:17 TS:12", 12L));
        expectedOutput.add(new Watermark(42L));
        expectedOutput.add(new StreamRecord((Object)"6WM:42 TS:13", 13L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    public void testTimestampAndProcessingTimeQuerying() throws Exception {
        CoProcessOperator operator = new CoProcessOperator((CoProcessFunction)new ProcessingTimeQueryingProcessFunction());
        TwoInputStreamOperatorTestHarness testHarness = new TwoInputStreamOperatorTestHarness(operator);
        testHarness.setup();
        testHarness.open();
        testHarness.setProcessingTime(17L);
        testHarness.processElement1(new StreamRecord((Object)5));
        testHarness.setProcessingTime(42L);
        testHarness.processElement2(new StreamRecord((Object)"6"));
        ConcurrentLinkedQueue<StreamRecord> expectedOutput = new ConcurrentLinkedQueue<StreamRecord>();
        expectedOutput.add(new StreamRecord((Object)"5PT:17 TS:null"));
        expectedOutput.add(new StreamRecord((Object)"6PT:42 TS:null"));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    private static class ProcessingTimeQueryingProcessFunction
    extends CoProcessFunction<Integer, String, String> {
        private static final long serialVersionUID = 1L;

        private ProcessingTimeQueryingProcessFunction() {
        }

        public void processElement1(Integer value, CoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()));
        }

        public void processElement2(String value, CoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()));
        }

        public void onTimer(long timestamp, CoProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
        }
    }

    private static class WatermarkQueryingProcessFunction
    extends CoProcessFunction<Integer, String, String> {
        private static final long serialVersionUID = 1L;

        private WatermarkQueryingProcessFunction() {
        }

        public void processElement1(Integer value, CoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()));
        }

        public void processElement2(String value, CoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()));
        }

        public void onTimer(long timestamp, CoProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
        }
    }
}

