/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Serializable;
import java.util.ArrayDeque;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.BatchGroupedReduceOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/**
 * Tests for {@link BatchGroupedReduceOperator}.
 *
 * <p>Each test drives the operator through a {@link KeyedOneInputStreamOperatorTestHarness}
 * keyed by the record value itself and reduces values with {@link Concatenator}. The tests
 * assert that nothing is emitted while elements are being processed and that exactly one
 * record per key, followed by the watermark, is emitted once a {@code Long.MAX_VALUE}
 * watermark is processed.
 */
public class BatchGroupedReduceOperatorTest
extends TestLogger {
    /** JUnit rule kept for API compatibility; no test in this class configures it. */
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /** Processing elements without a final watermark must not produce any output. */
    @Test
    public void noIncrementalResults() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness = this.createTestHarness();
        BatchGroupedReduceOperatorTest.processElements(testHarness, "hello", "hello", "ciao", "ciao");

        Assert.assertThat(testHarness.getOutput(), Matchers.empty());
    }

    /**
     * After the {@code Long.MAX_VALUE} watermark, one concatenated record per key is expected,
     * in the order the keys were first seen, followed by the watermark itself.
     */
    @Test
    public void resultsOnMaxWatermark() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness = this.createTestHarness();
        BatchGroupedReduceOperatorTest.processElements(testHarness, "hello", "hello", "ciao", "ciao", "ciao");
        testHarness.processWatermark(Long.MAX_VALUE);

        ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
        expectedOutput.add(new StreamRecord<>("hellohello", Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord<>("ciaociaociao", Long.MAX_VALUE));
        expectedOutput.add(new Watermark(Long.MAX_VALUE));
        // contains(...) verifies both the elements and their order.
        Assert.assertThat(testHarness.getOutput(), Matchers.contains(expectedOutput.toArray()));
    }

    /** A key that received only a single element is expected to be emitted unchanged. */
    @Test
    public void resultForSingleInput() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness = this.createTestHarness();
        BatchGroupedReduceOperatorTest.processElements(testHarness, "hello", "ciao");
        testHarness.processWatermark(Long.MAX_VALUE);

        ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
        expectedOutput.add(new StreamRecord<>("hello", Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord<>("ciao", Long.MAX_VALUE));
        expectedOutput.add(new Watermark(Long.MAX_VALUE));
        // contains(...) verifies both the elements and their order.
        Assert.assertThat(testHarness.getOutput(), Matchers.contains(expectedOutput.toArray()));
    }

    /**
     * Feeds the given values, in order, into the harness as stream records created without an
     * explicit timestamp.
     *
     * @param testHarness harness wrapping the operator under test
     * @param values record values to process, in order
     * @throws Exception if the harness fails while processing an element
     */
    private static void processElements(
            KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness,
            String... values) throws Exception {
        for (String value : values) {
            testHarness.processElement(new StreamRecord<>(value));
        }
    }

    /**
     * Builds a harness around a {@link BatchGroupedReduceOperator} that uses {@link Concatenator}
     * as reduce function and the identity function as key selector, then sets it up and opens it.
     *
     * @return a harness that has been set up and opened
     * @throws Exception if setting up or opening the harness fails
     */
    private KeyedOneInputStreamOperatorTestHarness<String, String, String> createTestHarness() throws Exception {
        BatchGroupedReduceOperator<String, Object> operator =
                new BatchGroupedReduceOperator<>(new Concatenator(), StringSerializer.INSTANCE);
        // Each record is its own key, so equal strings are reduced together.
        KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(operator, in -> in, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setup();
        testHarness.open();
        return testHarness;
    }

    /** Reduce function that concatenates the two input strings, first argument first. */
    static class Concatenator
    implements ReduceFunction<String> {
        @Override
        public String reduce(String value1, String value2) throws Exception {
            return value1 + value2;
        }
    }
}

