/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class SideOutputITCase
extends AbstractTestBase
implements Serializable {
    // JUnit rule that lets a test declare the exception type it expects to be thrown.
    // Declared transient; presumably so it is skipped when this Serializable test class is
    // captured and serialized together with its anonymous/local functions — TODO confirm.
    @Rule
    public transient ExpectedException expectedException = ExpectedException.none();
    // Shared input collection that the tests pass to fromCollection(...). It starts out empty
    // here; presumably it is populated elsewhere in the class (not visible in this view) —
    // the assertions in the tests expect the values 1 through 5.
    static List<Integer> elements = new ArrayList<Integer>();

    /**
     * Verifies that watermarks emitted by the source are forwarded on the main output and on
     * the side outputs of a {@link ProcessFunction}.
     *
     * <p>Every branch ends in a {@code WatermarkReifier} operator that turns each record into
     * an {@code "E:<value>"} string and each watermark into a {@code "WM:<timestamp>"} string,
     * so that the sinks can assert on both. The job runs with parallelism 3, and the expected
     * lists contain every watermark three times.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testWatermarkForwarding() throws Exception {
        final OutputTag<String> sideOutputTag1 = new OutputTag<String>("side") {};
        final OutputTag<String> sideOutputTag2 = new OutputTag<String>("other-side") {};

        TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
        TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
        TestListResultSink<String> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        DataStream<Integer> dataStream = env.addSource(new SourceFunction<Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void run(SourceContext<Integer> ctx) throws Exception {
                ctx.collectWithTimestamp(1, 0L);
                ctx.emitWatermark(new Watermark(0L));
                ctx.collectWithTimestamp(2, 1L);
                ctx.collectWithTimestamp(5, 2L);
                ctx.emitWatermark(new Watermark(2L));
                ctx.collectWithTimestamp(3, 3L);
                ctx.collectWithTimestamp(4, 4L);
            }

            @Override
            public void cancel() {
                // Nothing to cancel: run() emits a fixed set of records and returns.
            }
        });

        // Pass-through: every element goes to the main output and, prefixed, to sideOutputTag1.
        // Nothing is ever written to sideOutputTag2.
        SingleOutputStreamOperator<Integer> passThroughStream =
                dataStream.process(new ProcessFunction<Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out)
                            throws Exception {
                        out.collect(value);
                        ctx.output(sideOutputTag1, "sideout-" + String.valueOf(value));
                    }
                });

        /** Operator that makes records and watermarks visible as tagged strings. */
        class WatermarkReifier extends AbstractStreamOperator<String>
                implements OneInputStreamOperator<String, String> {
            private static final long serialVersionUID = 1L;

            @Override
            public void processElement(StreamRecord<String> element) throws Exception {
                output.collect(new StreamRecord<>("E:" + element.getValue()));
            }

            @Override
            public void processWatermark(Watermark mark) throws Exception {
                super.processWatermark(mark);
                output.collect(new StreamRecord<>("WM:" + mark.getTimestamp()));
            }
        }

        passThroughStream
                .getSideOutput(sideOutputTag1)
                .transform("ReifyWatermarks", BasicTypeInfo.STRING_TYPE_INFO, new WatermarkReifier())
                .addSink(sideOutputResultSink1);

        passThroughStream
                .getSideOutput(sideOutputTag2)
                .transform("ReifyWatermarks", BasicTypeInfo.STRING_TYPE_INFO, new WatermarkReifier())
                .addSink(sideOutputResultSink2);

        passThroughStream
                .map(new MapFunction<Integer, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public String map(Integer value) throws Exception {
                        return value.toString();
                    }
                })
                .transform("ReifyWatermarks", BasicTypeInfo.STRING_TYPE_INFO, new WatermarkReifier())
                .addSink(resultSink);

        env.execute();

        // Same text as the literal "WM:9223372036854775807".
        final String maxWatermark = "WM:" + Long.MAX_VALUE;

        Assert.assertEquals(
                Arrays.asList(
                        "E:sideout-1", "E:sideout-2", "E:sideout-3", "E:sideout-4", "E:sideout-5",
                        "WM:0", "WM:0", "WM:0",
                        "WM:2", "WM:2", "WM:2",
                        maxWatermark, maxWatermark, maxWatermark),
                sideOutputResultSink1.getSortedResult());

        // Previously this assertion re-checked sideOutputResultSink1, leaving the second side
        // output unverified. The process function never emits to sideOutputTag2, so this branch
        // is expected to observe the forwarded watermarks only.
        // NOTE(review): expected value derived from the pipeline definition above; confirm by
        // running the test.
        Assert.assertEquals(
                Arrays.asList(
                        "WM:0", "WM:0", "WM:0",
                        "WM:2", "WM:2", "WM:2",
                        maxWatermark, maxWatermark, maxWatermark),
                sideOutputResultSink2.getSortedResult());

        Assert.assertEquals(
                Arrays.asList(
                        "E:1", "E:2", "E:3", "E:4", "E:5",
                        "WM:0", "WM:0", "WM:0",
                        "WM:2", "WM:2", "WM:2",
                        maxWatermark, maxWatermark, maxWatermark),
                resultSink.getSortedResult());
    }

    /**
     * Verifies that one side output can be consumed by two independent sinks, each of which
     * receives every side-output record, while the main output is unaffected.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testSideOutputWithMultipleConsumers() throws Exception {
        final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {};

        TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
        TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        DataStream<Integer> dataStream = env.fromCollection(elements);

        // Pass-through: forward each element and mirror it, prefixed, to the side output.
        SingleOutputStreamOperator<Integer> passThroughStream =
                dataStream.process(new ProcessFunction<Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out)
                            throws Exception {
                        out.collect(value);
                        ctx.output(sideOutputTag, "sideout-" + String.valueOf(value));
                    }
                });

        // The same tag is requested twice on purpose: two consumers of one side output.
        passThroughStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink1);
        passThroughStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink2);
        passThroughStream.addSink(resultSink);

        env.execute();

        Assert.assertEquals(
                Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"),
                sideOutputResultSink1.getSortedResult());
        Assert.assertEquals(
                Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"),
                sideOutputResultSink2.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Same scenario as two sinks consuming one side output, but with object reuse enabled on
     * the execution config; both consumers must still see every side-output record.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testSideOutputWithMultipleConsumersWithObjectReuse() throws Exception {
        final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {};

        TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
        TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        env.setParallelism(3);

        DataStream<Integer> dataStream = env.fromCollection(elements);

        // Pass-through: forward each element and mirror it, prefixed, to the side output.
        SingleOutputStreamOperator<Integer> passThroughStream =
                dataStream.process(new ProcessFunction<Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out)
                            throws Exception {
                        out.collect(value);
                        ctx.output(sideOutputTag, "sideout-" + String.valueOf(value));
                    }
                });

        passThroughStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink1);
        passThroughStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink2);
        passThroughStream.addSink(resultSink);

        env.execute();

        Assert.assertEquals(
                Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"),
                sideOutputResultSink1.getSortedResult());
        Assert.assertEquals(
                Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"),
                sideOutputResultSink2.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Verifies that one operator can emit to side outputs of different element types (a
     * {@code String} tag and an {@code Integer} tag) with object reuse enabled.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testDifferentSideOutputTypes() throws Exception {
        final OutputTag<String> sideOutputTag1 = new OutputTag<String>("string") {};
        final OutputTag<Integer> sideOutputTag2 = new OutputTag<Integer>("int") {};

        TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
        TestListResultSink<Integer> sideOutputResultSink2 = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        env.setParallelism(3);

        DataStream<Integer> dataStream = env.fromCollection(elements);

        // For each input: forward it, emit a prefixed string to tag 1 and the constant 13 to tag 2.
        SingleOutputStreamOperator<Integer> passThroughStream =
                dataStream.process(new ProcessFunction<Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out)
                            throws Exception {
                        out.collect(value);
                        ctx.output(sideOutputTag1, "sideout-" + String.valueOf(value));
                        ctx.output(sideOutputTag2, 13);
                    }
                });

        passThroughStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);
        passThroughStream.getSideOutput(sideOutputTag2).addSink(sideOutputResultSink2);
        passThroughStream.addSink(resultSink);

        env.execute();

        Assert.assertEquals(
                Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"),
                sideOutputResultSink1.getSortedResult());
        Assert.assertEquals(
                Arrays.asList(13, 13, 13, 13, 13), sideOutputResultSink2.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Verifies that requesting two side outputs that share the id {@code "side"} but differ in
     * element type is rejected with an {@link UnsupportedOperationException}.
     *
     * <p>The job is never executed; the exception is expected while the second side output is
     * being wired up.
     *
     * @throws Exception if building the job fails in an unexpected way
     */
    @Test
    public void testSideOutputNameClash() throws Exception {
        final OutputTag<String> sideOutputTag1 = new OutputTag<String>("side") {};
        final OutputTag<Integer> sideOutputTag2 = new OutputTag<Integer>("side") {};

        TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
        TestListResultSink<Integer> sideOutputResultSink2 = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        DataStream<Integer> dataStream = env.fromCollection(elements);

        SingleOutputStreamOperator<Integer> passThroughStream =
                dataStream.process(new ProcessFunction<Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out)
                            throws Exception {
                        out.collect(value);
                        ctx.output(sideOutputTag1, "sideout-" + String.valueOf(value));
                        ctx.output(sideOutputTag2, 13);
                    }
                });

        passThroughStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);

        // Arm the rule only now, so that the failure must come from the clashing second tag.
        expectedException.expect(UnsupportedOperationException.class);
        passThroughStream.getSideOutput(sideOutputTag2).addSink(sideOutputResultSink2);
    }

    /**
     * Verifies the basic side-output path of a non-keyed {@link ProcessFunction}: every element
     * reaches the main output and a prefixed copy reaches the side output.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testProcessFunctionSideOutput() throws Exception {
        final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {};

        TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        DataStream<Integer> dataStream = env.fromCollection(elements);

        SingleOutputStreamOperator<Integer> passThroughStream =
                dataStream.process(new ProcessFunction<Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out)
                            throws Exception {
                        out.collect(value);
                        ctx.output(sideOutputTag, "sideout-" + String.valueOf(value));
                    }
                });

        passThroughStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
        passThroughStream.addSink(resultSink);

        env.execute();

        Assert.assertEquals(
                Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"),
                sideOutputResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Verifies side output from a {@link CoProcessFunction} on two connected, non-keyed
     * streams that write to the same tag.
     *
     * <p>Input 1 only handles values below 3 and input 2 only values of 3 and above, so the
     * prefix of each side-output record shows which input produced it.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testCoProcessFunctionSideOutput() throws Exception {
        final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {};

        TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        DataStream<Integer> ds1 = env.fromCollection(elements);
        DataStream<Integer> ds2 = env.fromCollection(elements);

        SingleOutputStreamOperator<Integer> passThroughStream =
                ds1.connect(ds2).process(new CoProcessFunction<Integer, Integer, Integer>() {
                    @Override
                    public void processElement1(Integer value, Context ctx, Collector<Integer> out)
                            throws Exception {
                        if (value < 3) {
                            out.collect(value);
                            ctx.output(sideOutputTag, "sideout1-" + String.valueOf(value));
                        }
                    }

                    @Override
                    public void processElement2(Integer value, Context ctx, Collector<Integer> out)
                            throws Exception {
                        if (value >= 3) {
                            out.collect(value);
                            ctx.output(sideOutputTag, "sideout2-" + String.valueOf(value));
                        }
                    }
                });

        passThroughStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
        passThroughStream.addSink(resultSink);

        env.execute();

        Assert.assertEquals(
                Arrays.asList("sideout1-1", "sideout1-2", "sideout2-3", "sideout2-4", "sideout2-5"),
                sideOutputResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Verifies that a {@link CoProcessFunction} on non-keyed connected streams can route each
     * input to its own side output, with each tag consumed by a separate sink.
     *
     * <p>Input 1 emits values below 4 to {@code "side1"}; input 2 emits values of 4 and above
     * to {@code "side2"}.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testCoProcessFunctionSideOutputWithMultipleConsumers() throws Exception {
        final OutputTag<String> sideOutputTag1 = new OutputTag<String>("side1") {};
        final OutputTag<String> sideOutputTag2 = new OutputTag<String>("side2") {};

        TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
        TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        DataStream<Integer> ds1 = env.fromCollection(elements);
        DataStream<Integer> ds2 = env.fromCollection(elements);

        SingleOutputStreamOperator<Integer> passThroughStream =
                ds1.connect(ds2).process(new CoProcessFunction<Integer, Integer, Integer>() {
                    @Override
                    public void processElement1(Integer value, Context ctx, Collector<Integer> out)
                            throws Exception {
                        if (value < 4) {
                            out.collect(value);
                            ctx.output(sideOutputTag1, "sideout1-" + String.valueOf(value));
                        }
                    }

                    @Override
                    public void processElement2(Integer value, Context ctx, Collector<Integer> out)
                            throws Exception {
                        if (value >= 4) {
                            out.collect(value);
                            ctx.output(sideOutputTag2, "sideout2-" + String.valueOf(value));
                        }
                    }
                });

        passThroughStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);
        passThroughStream.getSideOutput(sideOutputTag2).addSink(sideOutputResultSink2);
        passThroughStream.addSink(resultSink);

        env.execute();

        Assert.assertEquals(
                Arrays.asList("sideout1-1", "sideout1-2", "sideout1-3"),
                sideOutputResultSink1.getSortedResult());
        Assert.assertEquals(
                Arrays.asList("sideout2-4", "sideout2-5"),
                sideOutputResultSink2.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Verifies side output from a {@link ProcessFunction} applied to a keyed stream, where each
     * element is its own key.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testKeyedProcessFunctionSideOutput() throws Exception {
        final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {};

        TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        DataStream<Integer> dataStream = env.fromCollection(elements);

        SingleOutputStreamOperator<Integer> passThroughStream =
                dataStream
                        .keyBy(new KeySelector<Integer, Integer>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Integer getKey(Integer value) throws Exception {
                                return value;
                            }
                        })
                        .process(new ProcessFunction<Integer, Integer>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public void processElement(
                                    Integer value, Context ctx, Collector<Integer> out)
                                    throws Exception {
                                out.collect(value);
                                ctx.output(sideOutputTag, "sideout-" + String.valueOf(value));
                            }
                        });

        passThroughStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
        passThroughStream.addSink(resultSink);

        env.execute();

        Assert.assertEquals(
                Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"),
                sideOutputResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Verifies side output from a plain {@link CoProcessFunction} applied to two connected
     * keyed streams (both keyed by the element itself).
     *
     * <p>Input 1 only handles values below 3 and input 2 only values of 3 and above; both write
     * to the same tag with an input-specific prefix.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testLegacyKeyedCoProcessFunctionSideOutput() throws Exception {
        final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {};

        TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        DataStream<Integer> ds1 = env.fromCollection(elements);
        DataStream<Integer> ds2 = env.fromCollection(elements);

        SingleOutputStreamOperator<Integer> passThroughStream =
                ds1.keyBy(i -> i)
                        .connect(ds2.keyBy(i -> i))
                        .process(new CoProcessFunction<Integer, Integer, Integer>() {
                            @Override
                            public void processElement1(
                                    Integer value, Context ctx, Collector<Integer> out)
                                    throws Exception {
                                if (value < 3) {
                                    out.collect(value);
                                    ctx.output(sideOutputTag, "sideout1-" + String.valueOf(value));
                                }
                            }

                            @Override
                            public void processElement2(
                                    Integer value, Context ctx, Collector<Integer> out)
                                    throws Exception {
                                if (value >= 3) {
                                    out.collect(value);
                                    ctx.output(sideOutputTag, "sideout2-" + String.valueOf(value));
                                }
                            }
                        });

        passThroughStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
        passThroughStream.addSink(resultSink);

        env.execute();

        Assert.assertEquals(
                Arrays.asList("sideout1-1", "sideout1-2", "sideout2-3", "sideout2-4", "sideout2-5"),
                sideOutputResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Verifies side output from a {@link KeyedCoProcessFunction} on two connected keyed streams
     * (both keyed by the element itself).
     *
     * <p>The side-output records embed {@code ctx.getCurrentKey()}, so the assertions also
     * check that the current key equals the element being processed.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testKeyedCoProcessFunctionSideOutput() throws Exception {
        final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {};

        TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        DataStream<Integer> ds1 = env.fromCollection(elements);
        DataStream<Integer> ds2 = env.fromCollection(elements);

        SingleOutputStreamOperator<Integer> passThroughStream =
                ds1.keyBy(i -> i)
                        .connect(ds2.keyBy(i -> i))
                        .process(new KeyedCoProcessFunction<Integer, Integer, Integer, Integer>() {
                            @Override
                            public void processElement1(
                                    Integer value, Context ctx, Collector<Integer> out)
                                    throws Exception {
                                if (value < 3) {
                                    out.collect(value);
                                    ctx.output(
                                            sideOutputTag,
                                            "sideout1-" + ctx.getCurrentKey() + "-"
                                                    + String.valueOf(value));
                                }
                            }

                            @Override
                            public void processElement2(
                                    Integer value, Context ctx, Collector<Integer> out)
                                    throws Exception {
                                if (value >= 3) {
                                    out.collect(value);
                                    ctx.output(
                                            sideOutputTag,
                                            "sideout2-" + ctx.getCurrentKey() + "-"
                                                    + String.valueOf(value));
                                }
                            }
                        });

        passThroughStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
        passThroughStream.addSink(resultSink);

        env.execute();

        Assert.assertEquals(
                Arrays.asList(
                        "sideout1-1-1", "sideout1-2-2", "sideout2-3-3", "sideout2-4-4",
                        "sideout2-5-5"),
                sideOutputResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Verifies that a plain {@link CoProcessFunction} on two connected keyed streams can route
     * each input to its own side output, with each tag consumed by a separate sink.
     *
     * <p>Input 1 emits values below 4 to {@code "side1"}; input 2 emits values of 4 and above
     * to {@code "side2"}.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testLegacyKeyedCoProcessFunctionSideOutputWithMultipleConsumers() throws Exception {
        final OutputTag<String> sideOutputTag1 = new OutputTag<String>("side1") {};
        final OutputTag<String> sideOutputTag2 = new OutputTag<String>("side2") {};

        TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
        TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        DataStream<Integer> ds1 = env.fromCollection(elements);
        DataStream<Integer> ds2 = env.fromCollection(elements);

        SingleOutputStreamOperator<Integer> passThroughStream =
                ds1.keyBy(i -> i)
                        .connect(ds2.keyBy(i -> i))
                        .process(new CoProcessFunction<Integer, Integer, Integer>() {
                            @Override
                            public void processElement1(
                                    Integer value, Context ctx, Collector<Integer> out)
                                    throws Exception {
                                if (value < 4) {
                                    out.collect(value);
                                    ctx.output(sideOutputTag1, "sideout1-" + String.valueOf(value));
                                }
                            }

                            @Override
                            public void processElement2(
                                    Integer value, Context ctx, Collector<Integer> out)
                                    throws Exception {
                                if (value >= 4) {
                                    out.collect(value);
                                    ctx.output(sideOutputTag2, "sideout2-" + String.valueOf(value));
                                }
                            }
                        });

        passThroughStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);
        passThroughStream.getSideOutput(sideOutputTag2).addSink(sideOutputResultSink2);
        passThroughStream.addSink(resultSink);

        env.execute();

        Assert.assertEquals(
                Arrays.asList("sideout1-1", "sideout1-2", "sideout1-3"),
                sideOutputResultSink1.getSortedResult());
        Assert.assertEquals(
                Arrays.asList("sideout2-4", "sideout2-5"),
                sideOutputResultSink2.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Verifies that a {@link KeyedCoProcessFunction} on two connected keyed streams can route
     * each input to its own side output, with each tag consumed by a separate sink.
     *
     * <p>Input 1 emits values below 4 to {@code "side1"}; input 2 emits values of 4 and above
     * to {@code "side2"}. Each record embeds {@code ctx.getCurrentKey()}, which the assertions
     * expect to equal the element itself.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testKeyedCoProcessFunctionSideOutputWithMultipleConsumers() throws Exception {
        final OutputTag<String> sideOutputTag1 = new OutputTag<String>("side1") {};
        final OutputTag<String> sideOutputTag2 = new OutputTag<String>("side2") {};

        TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
        TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        DataStream<Integer> ds1 = env.fromCollection(elements);
        DataStream<Integer> ds2 = env.fromCollection(elements);

        SingleOutputStreamOperator<Integer> passThroughStream =
                ds1.keyBy(i -> i)
                        .connect(ds2.keyBy(i -> i))
                        .process(new KeyedCoProcessFunction<Integer, Integer, Integer, Integer>() {
                            @Override
                            public void processElement1(
                                    Integer value, Context ctx, Collector<Integer> out)
                                    throws Exception {
                                if (value < 4) {
                                    out.collect(value);
                                    ctx.output(
                                            sideOutputTag1,
                                            "sideout1-" + ctx.getCurrentKey() + "-"
                                                    + String.valueOf(value));
                                }
                            }

                            @Override
                            public void processElement2(
                                    Integer value, Context ctx, Collector<Integer> out)
                                    throws Exception {
                                if (value >= 4) {
                                    out.collect(value);
                                    ctx.output(
                                            sideOutputTag2,
                                            "sideout2-" + ctx.getCurrentKey() + "-"
                                                    + String.valueOf(value));
                                }
                            }
                        });

        passThroughStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);
        passThroughStream.getSideOutput(sideOutputTag2).addSink(sideOutputResultSink2);
        passThroughStream.addSink(resultSink);

        env.execute();

        Assert.assertEquals(
                Arrays.asList("sideout1-1-1", "sideout1-2-2", "sideout1-3-3"),
                sideOutputResultSink1.getSortedResult());
        Assert.assertEquals(
                Arrays.asList("sideout2-4-4", "sideout2-5-5"),
                sideOutputResultSink2.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
    }

    /**
     * Checks that asking for a side output under a tag the function never writes to yields no
     * records.
     *
     * <p>The process function emits every element to the main output and to the
     * {@code "other-side"} tag, while the sink is attached to the {@code "side"} tag, so the sink
     * is expected to stay empty.
     */
    @Test
    public void testProcessFunctionSideOutputWithWrongTag() throws Exception {
        final OutputTag<String> sideOutputTag1 = new OutputTag<String>("side") {};
        final OutputTag<String> sideOutputTag2 = new OutputTag<String>("other-side") {};

        TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();

        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.setParallelism(3);

        DataStreamSource<Integer> dataStream = see.fromCollection(elements);

        dataStream
                .process(
                        new ProcessFunction<Integer, Integer>() {
                            private static final long serialVersionUID = 1L;

                            public void processElement(
                                    Integer value,
                                    ProcessFunction<Integer, Integer>.Context ctx,
                                    Collector<Integer> out)
                                    throws Exception {
                                out.collect(value);
                                // Written under tag 2 on purpose; the consumer below reads tag 1.
                                ctx.output(sideOutputTag2, "sideout-" + value);
                            }
                        })
                .getSideOutput(sideOutputTag1)
                .addSink(sideOutputResultSink);

        see.execute();

        Assert.assertEquals(Collections.emptyList(), sideOutputResultSink.getSortedResult());
    }

    /**
     * Checks that elements arriving late for a non-keyed (all-)window are routed to the
     * late-data side output.
     *
     * <p>The input is the shared {@code elements} list, timestamped and watermarked by
     * {@code TestWatermarkAssigner}, windowed with a 1 ms size and 1 ms slide and no allowed
     * lateness. The late-data side output is mapped to {@code "late-<value>"} strings and the
     * test expects exactly {@code late-3} and {@code late-4} there.
     */
    @Test
    public void testAllWindowLateArrivingEvents() throws Exception {
        TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();

        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.setParallelism(1);

        DataStreamSource<Integer> dataStream = see.fromCollection(elements);

        final OutputTag<Integer> lateDataTag = new OutputTag<Integer>("late") {};

        SingleOutputStreamOperator<Integer> windowOperator =
                dataStream
                        .assignTimestampsAndWatermarks(new TestWatermarkAssigner())
                        .windowAll(
                                SlidingEventTimeWindows.of(
                                        Time.milliseconds(1), Time.milliseconds(1)))
                        .sideOutputLateData(lateDataTag)
                        .apply(
                                new AllWindowFunction<Integer, Integer, TimeWindow>() {
                                    private static final long serialVersionUID = 1L;

                                    public void apply(
                                            TimeWindow window,
                                            Iterable<Integer> values,
                                            Collector<Integer> out)
                                            throws Exception {
                                        for (Integer val : values) {
                                            out.collect(val);
                                        }
                                    }
                                });

        windowOperator
                .getSideOutput(lateDataTag)
                .flatMap(
                        new FlatMapFunction<Integer, String>() {
                            private static final long serialVersionUID = 1L;

                            public void flatMap(Integer value, Collector<String> out)
                                    throws Exception {
                                out.collect("late-" + value);
                            }
                        })
                .addSink(sideOutputResultSink);

        see.execute();

        // Expected value first, actual second, so a failure message reads correctly.
        Assert.assertEquals(
                Arrays.asList("late-3", "late-4"), sideOutputResultSink.getSortedResult());
    }

    /**
     * Checks late-data side output on a keyed window with an allowed lateness of 2 ms.
     *
     * <p>The stream is keyed by the element itself and windowed with a 1 ms size and 1 ms slide.
     * The window function emits {@code "<key>-<value>"} for each element it sees. The test
     * expects 1, 2, 4 and 5 on the main output and only 3 on the late-data side output.
     */
    @Test
    public void testKeyedWindowLateArrivingEvents() throws Exception {
        TestListResultSink<String> resultSink = new TestListResultSink<>();
        TestListResultSink<Integer> lateResultSink = new TestListResultSink<>();

        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.setParallelism(3);

        DataStreamSource<Integer> dataStream = see.fromCollection(elements);

        final OutputTag<Integer> lateDataTag = new OutputTag<Integer>("late") {};

        SingleOutputStreamOperator<String> windowOperator =
                dataStream
                        .assignTimestampsAndWatermarks(new TestWatermarkAssigner())
                        .keyBy(new TestKeySelector())
                        .window(
                                SlidingEventTimeWindows.of(
                                        Time.milliseconds(1), Time.milliseconds(1)))
                        .allowedLateness(Time.milliseconds(2))
                        .sideOutputLateData(lateDataTag)
                        .apply(
                                new WindowFunction<Integer, String, Integer, TimeWindow>() {
                                    private static final long serialVersionUID = 1L;

                                    public void apply(
                                            Integer key,
                                            TimeWindow window,
                                            Iterable<Integer> input,
                                            Collector<String> out)
                                            throws Exception {
                                        for (Integer val : input) {
                                            out.collect(key + "-" + val);
                                        }
                                    }
                                });

        windowOperator.addSink(resultSink);
        windowOperator.getSideOutput(lateDataTag).addSink(lateResultSink);

        see.execute();

        Assert.assertEquals(
                Arrays.asList("1-1", "2-2", "4-4", "5-5"), resultSink.getSortedResult());
        Assert.assertEquals(Collections.singletonList(3), lateResultSink.getSortedResult());
    }

    /**
     * Checks that a {@link ProcessWindowFunction} on a keyed window can emit to a side output
     * through its context.
     *
     * <p>For every window invocation the function emits the key to the main output and
     * {@code "sideout-<key>"} to the side output. The test expects keys 1, 2 and 5 on both
     * outputs.
     */
    @Test
    public void testProcessdWindowFunctionSideOutput() throws Exception {
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();
        TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();

        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.setParallelism(3);

        DataStreamSource<Integer> dataStream = see.fromCollection(elements);

        final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {};

        SingleOutputStreamOperator<Integer> windowOperator =
                dataStream
                        .assignTimestampsAndWatermarks(new TestWatermarkAssigner())
                        .keyBy(new TestKeySelector())
                        .window(
                                SlidingEventTimeWindows.of(
                                        Time.milliseconds(1), Time.milliseconds(1)))
                        .process(
                                new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
                                    private static final long serialVersionUID = 1L;

                                    // The iterable is named windowElements so it does not shadow
                                    // the static 'elements' input list of the test class.
                                    public void process(
                                            Integer key,
                                            ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>.Context context,
                                            Iterable<Integer> windowElements,
                                            Collector<Integer> out)
                                            throws Exception {
                                        out.collect(key);
                                        context.output(sideOutputTag, "sideout-" + key);
                                    }
                                });

        windowOperator.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
        windowOperator.addSink(resultSink);

        see.execute();

        Assert.assertEquals(
                Arrays.asList("sideout-1", "sideout-2", "sideout-5"),
                sideOutputResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 5), resultSink.getSortedResult());
    }

    /**
     * Checks that a {@link ProcessAllWindowFunction} on a non-keyed window can emit to a side
     * output through its context.
     *
     * <p>Every element of a window is forwarded to the main output and mirrored as
     * {@code "sideout-<value>"} on the side output. The test expects 1, 2 and 5 on both outputs.
     */
    @Test
    public void testProcessAllWindowFunctionSideOutput() throws Exception {
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();
        TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();

        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.setParallelism(1);

        DataStreamSource<Integer> dataStream = see.fromCollection(elements);

        final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {};

        SingleOutputStreamOperator<Integer> windowOperator =
                dataStream
                        .assignTimestampsAndWatermarks(new TestWatermarkAssigner())
                        .windowAll(
                                SlidingEventTimeWindows.of(
                                        Time.milliseconds(1), Time.milliseconds(1)))
                        .process(
                                new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() {
                                    private static final long serialVersionUID = 1L;

                                    // The iterable is named windowElements so it does not shadow
                                    // the static 'elements' input list of the test class.
                                    public void process(
                                            ProcessAllWindowFunction<Integer, Integer, TimeWindow>.Context context,
                                            Iterable<Integer> windowElements,
                                            Collector<Integer> out)
                                            throws Exception {
                                        for (Integer e : windowElements) {
                                            out.collect(e);
                                            context.output(sideOutputTag, "sideout-" + e);
                                        }
                                    }
                                });

        windowOperator.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
        windowOperator.addSink(resultSink);

        see.execute();

        Assert.assertEquals(
                Arrays.asList("sideout-1", "sideout-2", "sideout-5"),
                sideOutputResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 5), resultSink.getSortedResult());
    }

    /**
     * Checks that side-output streams can be unioned with each other, with themselves, and with
     * an unrelated stream.
     *
     * <p>The process function forwards every input (1..4) to the main output and additionally
     * routes odd values to the {@code odds} tag and even values to the {@code even} tag. The
     * test then sinks each side output on its own and in five union combinations, expecting a
     * self-union to deliver every record twice.
     */
    @Test
    public void testUnionOfTwoSideOutputs() throws Exception {
        TestListResultSink<Integer> evensResultSink = new TestListResultSink<>();
        TestListResultSink<Integer> oddsResultSink = new TestListResultSink<>();
        TestListResultSink<Integer> oddsUEvensResultSink = new TestListResultSink<>();
        TestListResultSink<Integer> evensUOddsResultSink = new TestListResultSink<>();
        TestListResultSink<Integer> oddsUOddsResultSink = new TestListResultSink<>();
        TestListResultSink<Integer> evensUEvensResultSink = new TestListResultSink<>();
        TestListResultSink<Integer> oddsUEvensExternalResultSink = new TestListResultSink<>();
        TestListResultSink<Integer> resultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        DataStreamSource<Integer> input = env.fromElements(1, 2, 3, 4);

        final OutputTag<Integer> oddTag = new OutputTag<Integer>("odds") {};
        final OutputTag<Integer> evenTag = new OutputTag<Integer>("even") {};

        SingleOutputStreamOperator<Integer> passThroughStream =
                input.process(
                        new ProcessFunction<Integer, Integer>() {
                            public void processElement(
                                    Integer value,
                                    ProcessFunction<Integer, Integer>.Context ctx,
                                    Collector<Integer> out)
                                    throws Exception {
                                if (value % 2 != 0) {
                                    ctx.output(oddTag, value);
                                } else {
                                    ctx.output(evenTag, value);
                                }
                                out.collect(value);
                            }
                        });

        SideOutputDataStream<Integer> evens = passThroughStream.getSideOutput(evenTag);
        SideOutputDataStream<Integer> odds = passThroughStream.getSideOutput(oddTag);

        evens.addSink(evensResultSink);
        odds.addSink(oddsResultSink);
        passThroughStream.addSink(resultSink);

        odds.union(evens).addSink(oddsUEvensResultSink);
        evens.union(odds).addSink(evensUOddsResultSink);

        // Self-unions: each record is expected to show up twice.
        odds.union(odds).addSink(oddsUOddsResultSink);
        evens.union(evens).addSink(evensUEvensResultSink);

        // Union of a side output with a stream that did not come from the process function.
        odds.union(env.fromElements(2, 4)).addSink(oddsUEvensExternalResultSink);

        env.execute();

        Assert.assertEquals(Arrays.asList(1, 3), oddsResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(2, 4), evensResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4), resultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4), oddsUEvensResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 2, 3, 4), evensUOddsResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1, 1, 3, 3), oddsUOddsResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(2, 2, 4, 4), evensUEvensResultSink.getSortedResult());
        Assert.assertEquals(
                Arrays.asList(1, 2, 3, 4), oddsUEvensExternalResultSink.getSortedResult());
    }

    // Fills the shared input list in the order 1, 2, 5, 3, 4 (3 and 4 come after 5).
    static {
        elements.addAll(Arrays.asList(1, 2, 5, 3, 4));
    }

    /** Identity key selector: every {@code Integer} element is used as its own key. */
    private static class TestKeySelector implements KeySelector<Integer, Integer> {
        private static final long serialVersionUID = 1L;

        /**
         * Returns the element unchanged as the key.
         *
         * @param value the stream element
         * @return {@code value} itself
         */
        public Integer getKey(Integer value) throws Exception {
            return value;
        }
    }

    /**
     * Punctuated watermark assigner for the window tests: an element's integer value is its
     * timestamp, and the watermark returned after each element equals that timestamp.
     */
    private static class TestWatermarkAssigner implements AssignerWithPunctuatedWatermarks<Integer> {
        private static final long serialVersionUID = 1L;

        /**
         * Always returns a watermark carrying the supplied timestamp.
         *
         * @param lastElement the element just processed (not inspected)
         * @param extractedTimestamp the timestamp previously extracted for that element
         * @return a new watermark at {@code extractedTimestamp}
         */
        @Nullable
        public Watermark checkAndGetNextWatermark(Integer lastElement, long extractedTimestamp) {
            final Watermark watermark = new Watermark(extractedTimestamp);
            return watermark;
        }

        /**
         * Uses the element's own numeric value as its timestamp.
         *
         * @param element the stream element
         * @param previousElementTimestamp ignored
         * @return the element's value widened to {@code long}
         */
        public long extractTimestamp(Integer element, long previousElementTimestamp) {
            return element.longValue();
        }
    }
}

