/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

public class KStreamKStreamJoinTest {
    /** Names of the two input topics; the test bodies visible in this part of the file spell them out as literals. */
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    /** Consumed settings with an Integer key serde and a String value serde, used for the sources of testJoin/testOuterJoin. */
    private final Consumed<Integer, String> consumed = Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String());
    /** Streams configuration handed to the TopologyTestDriver instances; built from two String serdes (presumably the default key/value serdes - confirm against StreamsTestUtils). */
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
    /** Join windows created from a 50 ms time difference with a 50 ms grace period; shared by the store-supplier validation tests. */
    private final JoinWindows joinWindows = JoinWindows.of((Duration)Duration.ofMillis(50L)).grace(Duration.ofMillis(50L));
    /** StreamJoined with a String key serde and Integer serdes for both join sides; tests derive variants from it through the with* methods. */
    private final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with((Serde)Serdes.String(), (Serde)Serdes.Integer(), (Serde)Serdes.Integer());
    /** Message text the window-settings-mismatch tests pass to buildStreamsJoinThatShouldThrow (the name suggests it is matched as a prefix - helper not visible here, confirm). */
    private final String errorMessagePrefix = "Window settings mismatch. WindowBytesStoreSupplier settings";
    /**
     * Topology description expected by shouldCreateRepartitionTopicsWithUserProvidedName: the joins
     * named "first-join" and "second-join" each have their own repartition topic and sub-topology.
     */
    private final String expectedTopologyWithUserNamedRepartitionTopics = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-MAP-0000000003\n    Processor: KSTREAM-MAP-0000000003 (stores: [])\n      --> second-join-left-repartition-filter, first-join-left-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: first-join-left-repartition-filter (stores: [])\n      --> first-join-left-repartition-sink\n      <-- KSTREAM-MAP-0000000003\n    Processor: second-join-left-repartition-filter (stores: [])\n      --> second-join-left-repartition-sink\n      <-- KSTREAM-MAP-0000000003\n    Sink: first-join-left-repartition-sink (topic: first-join-left-repartition)\n      <-- first-join-left-repartition-filter\n    Sink: second-join-left-repartition-sink (topic: second-join-left-repartition)\n      <-- second-join-left-repartition-filter\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000001 (topics: [topic2])\n      --> first-join-other-windowed\n    Source: first-join-left-repartition-source (topics: [first-join-left-repartition])\n      --> first-join-this-windowed\n    Processor: first-join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000010-store])\n      --> first-join-other-join\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: first-join-this-windowed (stores: [KSTREAM-JOINTHIS-0000000009-store])\n      --> first-join-this-join\n      <-- first-join-left-repartition-source\n    Processor: first-join-other-join (stores: [KSTREAM-JOINTHIS-0000000009-store])\n      --> first-join-merge\n      <-- first-join-other-windowed\n    Processor: first-join-this-join (stores: [KSTREAM-JOINOTHER-0000000010-store])\n      --> first-join-merge\n      <-- first-join-this-windowed\n    Processor: first-join-merge (stores: [])\n      --> KSTREAM-SINK-0000000012\n      <-- first-join-this-join, first-join-other-join\n    Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n      <-- first-join-merge\n\n  
Sub-topology: 2\n    Source: KSTREAM-SOURCE-0000000002 (topics: [topic3])\n      --> second-join-other-windowed\n    Source: second-join-left-repartition-source (topics: [second-join-left-repartition])\n      --> second-join-this-windowed\n    Processor: second-join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000019-store])\n      --> second-join-other-join\n      <-- KSTREAM-SOURCE-0000000002\n    Processor: second-join-this-windowed (stores: [KSTREAM-JOINTHIS-0000000018-store])\n      --> second-join-this-join\n      <-- second-join-left-repartition-source\n    Processor: second-join-other-join (stores: [KSTREAM-JOINTHIS-0000000018-store])\n      --> second-join-merge\n      <-- second-join-other-windowed\n    Processor: second-join-this-join (stores: [KSTREAM-JOINOTHER-0000000019-store])\n      --> second-join-merge\n      <-- second-join-this-windowed\n    Processor: second-join-merge (stores: [])\n      --> KSTREAM-SINK-0000000021\n      <-- second-join-this-join, second-join-other-join\n    Sink: KSTREAM-SINK-0000000021 (topic: out-two)\n      <-- second-join-merge\n\n";
    /**
     * Topology description expected by shouldReuseRepartitionTopicWithGeneratedName: both unnamed
     * joins read from the single generated topic KSTREAM-MAP-0000000003-repartition in one sub-topology.
     */
    private final String expectedTopologyWithGeneratedRepartitionTopic = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-MAP-0000000003\n    Processor: KSTREAM-MAP-0000000003 (stores: [])\n      --> KSTREAM-FILTER-0000000005\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-FILTER-0000000005 (stores: [])\n      --> KSTREAM-SINK-0000000004\n      <-- KSTREAM-MAP-0000000003\n    Sink: KSTREAM-SINK-0000000004 (topic: KSTREAM-MAP-0000000003-repartition)\n      <-- KSTREAM-FILTER-0000000005\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000006 (topics: [KSTREAM-MAP-0000000003-repartition])\n      --> KSTREAM-WINDOWED-0000000007, KSTREAM-WINDOWED-0000000016\n    Source: KSTREAM-SOURCE-0000000001 (topics: [topic2])\n      --> KSTREAM-WINDOWED-0000000008\n    Source: KSTREAM-SOURCE-0000000002 (topics: [topic3])\n      --> KSTREAM-WINDOWED-0000000017\n    Processor: KSTREAM-WINDOWED-0000000007 (stores: [KSTREAM-JOINTHIS-0000000009-store])\n      --> KSTREAM-JOINTHIS-0000000009\n      <-- KSTREAM-SOURCE-0000000006\n    Processor: KSTREAM-WINDOWED-0000000008 (stores: [KSTREAM-JOINOTHER-0000000010-store])\n      --> KSTREAM-JOINOTHER-0000000010\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-WINDOWED-0000000016 (stores: [KSTREAM-JOINTHIS-0000000018-store])\n      --> KSTREAM-JOINTHIS-0000000018\n      <-- KSTREAM-SOURCE-0000000006\n    Processor: KSTREAM-WINDOWED-0000000017 (stores: [KSTREAM-JOINOTHER-0000000019-store])\n      --> KSTREAM-JOINOTHER-0000000019\n      <-- KSTREAM-SOURCE-0000000002\n    Processor: KSTREAM-JOINOTHER-0000000010 (stores: [KSTREAM-JOINTHIS-0000000009-store])\n      --> KSTREAM-MERGE-0000000011\n      <-- KSTREAM-WINDOWED-0000000008\n    Processor: KSTREAM-JOINOTHER-0000000019 (stores: [KSTREAM-JOINTHIS-0000000018-store])\n      --> KSTREAM-MERGE-0000000020\n      <-- KSTREAM-WINDOWED-0000000017\n    Processor: KSTREAM-JOINTHIS-0000000009 (stores: 
[KSTREAM-JOINOTHER-0000000010-store])\n      --> KSTREAM-MERGE-0000000011\n      <-- KSTREAM-WINDOWED-0000000007\n    Processor: KSTREAM-JOINTHIS-0000000018 (stores: [KSTREAM-JOINOTHER-0000000019-store])\n      --> KSTREAM-MERGE-0000000020\n      <-- KSTREAM-WINDOWED-0000000016\n    Processor: KSTREAM-MERGE-0000000011 (stores: [])\n      --> KSTREAM-SINK-0000000012\n      <-- KSTREAM-JOINTHIS-0000000009, KSTREAM-JOINOTHER-0000000010\n    Processor: KSTREAM-MERGE-0000000020 (stores: [])\n      --> KSTREAM-SINK-0000000021\n      <-- KSTREAM-JOINTHIS-0000000018, KSTREAM-JOINOTHER-0000000019\n    Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n      <-- KSTREAM-MERGE-0000000011\n    Sink: KSTREAM-SINK-0000000021 (topic: out-to)\n      <-- KSTREAM-MERGE-0000000020\n\n";

    /**
     * Pipes a record with key "A" and a null value into the left input of a stream-stream join and
     * checks that the log captured for {@link KStreamKStreamJoin} contains the "Skipping record"
     * message for that record.
     *
     * <p>The streams and the input topic carry explicit type parameters on purpose: on a raw
     * {@code TestInputTopic}, {@code pipeInput("A", null)} resolves to the
     * {@code pipeInput(value, Instant)} overload instead of {@code pipeInput(key, value)}.
     */
    @Test
    public void shouldLogAndMeterOnSkippedRecordsWithNullValueWithBuiltInMetricsVersionLatest() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
        left.join(
            right,
            Integer::sum,
            JoinWindows.of(Duration.ofMillis(100L)),
            StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()));

        this.props.setProperty("built.in.metrics.version", "latest");

        // The appender is registered before the driver is created and closed after it.
        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamKStreamJoin.class);
             final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<String, Integer> inputTopic =
                driver.createInputTopic("left", new StringSerializer(), new IntegerSerializer());
            // Key "A", value null: the record the join is expected to skip.
            inputTopic.pipeInput("A", null);

            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItem("Skipping record due to null key or value. key=[A] value=[null] topic=[left] partition=[0] offset=[0]"));
        }
    }

    @Test
    public void shouldReuseRepartitionTopicWithGeneratedName() {
        StreamsBuilder builder = new StreamsBuilder();
        Properties props = new Properties();
        props.put("topology.optimization", "none");
        KStream stream1 = builder.stream("topic", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        KStream stream2 = builder.stream("topic2", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        KStream stream3 = builder.stream("topic3", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        KStream newStream = stream1.map((k, v) -> new KeyValue(v, k));
        newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of((Duration)Duration.ofMillis(100L))).to("out-one");
        newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of((Duration)Duration.ofMillis(100L))).to("out-to");
        Assert.assertEquals((Object)"Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-MAP-0000000003\n    Processor: KSTREAM-MAP-0000000003 (stores: [])\n      --> KSTREAM-FILTER-0000000005\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-FILTER-0000000005 (stores: [])\n      --> KSTREAM-SINK-0000000004\n      <-- KSTREAM-MAP-0000000003\n    Sink: KSTREAM-SINK-0000000004 (topic: KSTREAM-MAP-0000000003-repartition)\n      <-- KSTREAM-FILTER-0000000005\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000006 (topics: [KSTREAM-MAP-0000000003-repartition])\n      --> KSTREAM-WINDOWED-0000000007, KSTREAM-WINDOWED-0000000016\n    Source: KSTREAM-SOURCE-0000000001 (topics: [topic2])\n      --> KSTREAM-WINDOWED-0000000008\n    Source: KSTREAM-SOURCE-0000000002 (topics: [topic3])\n      --> KSTREAM-WINDOWED-0000000017\n    Processor: KSTREAM-WINDOWED-0000000007 (stores: [KSTREAM-JOINTHIS-0000000009-store])\n      --> KSTREAM-JOINTHIS-0000000009\n      <-- KSTREAM-SOURCE-0000000006\n    Processor: KSTREAM-WINDOWED-0000000008 (stores: [KSTREAM-JOINOTHER-0000000010-store])\n      --> KSTREAM-JOINOTHER-0000000010\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-WINDOWED-0000000016 (stores: [KSTREAM-JOINTHIS-0000000018-store])\n      --> KSTREAM-JOINTHIS-0000000018\n      <-- KSTREAM-SOURCE-0000000006\n    Processor: KSTREAM-WINDOWED-0000000017 (stores: [KSTREAM-JOINOTHER-0000000019-store])\n      --> KSTREAM-JOINOTHER-0000000019\n      <-- KSTREAM-SOURCE-0000000002\n    Processor: KSTREAM-JOINOTHER-0000000010 (stores: [KSTREAM-JOINTHIS-0000000009-store])\n      --> KSTREAM-MERGE-0000000011\n      <-- KSTREAM-WINDOWED-0000000008\n    Processor: KSTREAM-JOINOTHER-0000000019 (stores: [KSTREAM-JOINTHIS-0000000018-store])\n      --> KSTREAM-MERGE-0000000020\n      <-- KSTREAM-WINDOWED-0000000017\n    Processor: KSTREAM-JOINTHIS-0000000009 (stores: [KSTREAM-JOINOTHER-0000000010-store])\n      --> 
KSTREAM-MERGE-0000000011\n      <-- KSTREAM-WINDOWED-0000000007\n    Processor: KSTREAM-JOINTHIS-0000000018 (stores: [KSTREAM-JOINOTHER-0000000019-store])\n      --> KSTREAM-MERGE-0000000020\n      <-- KSTREAM-WINDOWED-0000000016\n    Processor: KSTREAM-MERGE-0000000011 (stores: [])\n      --> KSTREAM-SINK-0000000012\n      <-- KSTREAM-JOINTHIS-0000000009, KSTREAM-JOINOTHER-0000000010\n    Processor: KSTREAM-MERGE-0000000020 (stores: [])\n      --> KSTREAM-SINK-0000000021\n      <-- KSTREAM-JOINTHIS-0000000018, KSTREAM-JOINOTHER-0000000019\n    Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n      <-- KSTREAM-MERGE-0000000011\n    Sink: KSTREAM-SINK-0000000021 (topic: out-to)\n      <-- KSTREAM-MERGE-0000000020\n\n", (Object)builder.build(props).describe().toString());
    }

    @Test
    public void shouldCreateRepartitionTopicsWithUserProvidedName() {
        StreamsBuilder builder = new StreamsBuilder();
        Properties props = new Properties();
        props.put("topology.optimization", "none");
        KStream stream1 = builder.stream("topic", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        KStream stream2 = builder.stream("topic2", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        KStream stream3 = builder.stream("topic3", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        KStream newStream = stream1.map((k, v) -> new KeyValue(v, k));
        StreamJoined streamJoined = StreamJoined.with((Serde)Serdes.String(), (Serde)Serdes.String(), (Serde)Serdes.String());
        newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of((Duration)Duration.ofMillis(100L)), streamJoined.withName("first-join")).to("out-one");
        newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of((Duration)Duration.ofMillis(100L)), streamJoined.withName("second-join")).to("out-two");
        Topology topology = builder.build(props);
        System.out.println(topology.describe().toString());
        Assert.assertEquals((Object)"Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-MAP-0000000003\n    Processor: KSTREAM-MAP-0000000003 (stores: [])\n      --> second-join-left-repartition-filter, first-join-left-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: first-join-left-repartition-filter (stores: [])\n      --> first-join-left-repartition-sink\n      <-- KSTREAM-MAP-0000000003\n    Processor: second-join-left-repartition-filter (stores: [])\n      --> second-join-left-repartition-sink\n      <-- KSTREAM-MAP-0000000003\n    Sink: first-join-left-repartition-sink (topic: first-join-left-repartition)\n      <-- first-join-left-repartition-filter\n    Sink: second-join-left-repartition-sink (topic: second-join-left-repartition)\n      <-- second-join-left-repartition-filter\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000001 (topics: [topic2])\n      --> first-join-other-windowed\n    Source: first-join-left-repartition-source (topics: [first-join-left-repartition])\n      --> first-join-this-windowed\n    Processor: first-join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000010-store])\n      --> first-join-other-join\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: first-join-this-windowed (stores: [KSTREAM-JOINTHIS-0000000009-store])\n      --> first-join-this-join\n      <-- first-join-left-repartition-source\n    Processor: first-join-other-join (stores: [KSTREAM-JOINTHIS-0000000009-store])\n      --> first-join-merge\n      <-- first-join-other-windowed\n    Processor: first-join-this-join (stores: [KSTREAM-JOINOTHER-0000000010-store])\n      --> first-join-merge\n      <-- first-join-this-windowed\n    Processor: first-join-merge (stores: [])\n      --> KSTREAM-SINK-0000000012\n      <-- first-join-this-join, first-join-other-join\n    Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n      <-- first-join-merge\n\n  Sub-topology: 2\n    Source: 
KSTREAM-SOURCE-0000000002 (topics: [topic3])\n      --> second-join-other-windowed\n    Source: second-join-left-repartition-source (topics: [second-join-left-repartition])\n      --> second-join-this-windowed\n    Processor: second-join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000019-store])\n      --> second-join-other-join\n      <-- KSTREAM-SOURCE-0000000002\n    Processor: second-join-this-windowed (stores: [KSTREAM-JOINTHIS-0000000018-store])\n      --> second-join-this-join\n      <-- second-join-left-repartition-source\n    Processor: second-join-other-join (stores: [KSTREAM-JOINTHIS-0000000018-store])\n      --> second-join-merge\n      <-- second-join-other-windowed\n    Processor: second-join-this-join (stores: [KSTREAM-JOINOTHER-0000000019-store])\n      --> second-join-merge\n      <-- second-join-this-windowed\n    Processor: second-join-merge (stores: [])\n      --> KSTREAM-SINK-0000000021\n      <-- second-join-this-join, second-join-other-join\n    Sink: KSTREAM-SINK-0000000021 (topic: out-two)\n      <-- second-join-merge\n\n", (Object)topology.describe().toString());
    }

    /**
     * Builds a join whose StreamJoined uses the store name "store" with logging disabled, and
     * checks that both registered join stores ("store-this-join-store" and
     * "store-other-join-store") report {@code loggingEnabled() == false}.
     */
    @Test
    public void shouldDisableLoggingOnStreamJoined() {
        final JoinWindows windows = JoinWindows.of(Duration.ofMillis(100L)).grace(Duration.ofMillis(50L));
        final StreamJoined<String, Integer, Integer> loggingDisabledJoined = StreamJoined
            .with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
            .withStoreName("store")
            .withLoggingDisabled();

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
        left.join(right, Integer::sum, windows, loggingDisabledJoined);

        final Topology topology = builder.build();
        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);

        MatcherAssert.assertThat(
            internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(),
            CoreMatchers.equalTo(false));
        MatcherAssert.assertThat(
            internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(),
            CoreMatchers.equalTo(false));
    }

    /**
     * Builds a join whose StreamJoined uses the store name "store" with logging enabled and the
     * extra changelog property test=property. Checks that both join stores report logging as
     * enabled, that sub-topology 0 has exactly two state changelog topics, and that each of those
     * topic configs carries the custom property.
     */
    @Test
    public void shouldEnableLoggingWithCustomConfigOnStreamJoined() {
        final JoinWindows windows = JoinWindows.of(Duration.ofMillis(100L)).grace(Duration.ofMillis(50L));
        final StreamJoined<String, Integer, Integer> loggingEnabledJoined = StreamJoined
            .with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
            .withStoreName("store")
            .withLoggingEnabled(Collections.singletonMap("test", "property"));

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
        left.join(right, Integer::sum, windows, loggingEnabledJoined);

        final Topology topology = builder.build();
        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
        // Called before the assertions, as in the original test; the returned value is not needed.
        internalTopologyBuilder.buildSubtopology(0);

        MatcherAssert.assertThat(
            internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(),
            CoreMatchers.equalTo(true));
        MatcherAssert.assertThat(
            internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(),
            CoreMatchers.equalTo(true));

        final InternalTopologyBuilder.TopicsInfo topicsInfo =
            internalTopologyBuilder.topicGroups().get(AssignmentTestUtils.SUBTOPOLOGY_0);
        MatcherAssert.assertThat(topicsInfo.stateChangelogTopics.size(), CoreMatchers.equalTo(2));
        for (final InternalTopicConfig config : topicsInfo.stateChangelogTopics.values()) {
            MatcherAssert.assertThat(
                config.getProperties(Collections.emptyMap(), 0L).get("test"),
                CoreMatchers.equalTo("property"));
        }
    }

    /**
     * Expects the join build to fail with the window-settings-mismatch message when the "this"
     * store supplier is created with 500 where the other supplier uses 150.
     */
    @Test
    public void shouldThrowExceptionThisStoreSupplierRetentionDoNotMatchWindowsSizeAndGrace() {
        // NOTE(review): buildWindowBytesStoreSupplier is defined outside this view; its arguments are
        // presumably (name, retention ms, window size ms, retainDuplicates) - confirm.
        final WindowBytesStoreSupplier thisStoreSupplier =
            this.buildWindowBytesStoreSupplier("in-memory-join-store", 500L, 100L, true);
        final WindowBytesStoreSupplier otherStoreSupplier =
            this.buildWindowBytesStoreSupplier("in-memory-join-store-other", 150L, 100L, true);

        this.buildStreamsJoinThatShouldThrow(
            this.streamJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier),
            this.joinWindows,
            this.errorMessagePrefix);
    }

    /**
     * Expects the join build to fail with the window-settings-mismatch message when the "this"
     * store supplier is created with 150 as its third argument where the other supplier uses 100.
     */
    @Test
    public void shouldThrowExceptionThisStoreSupplierWindowSizeDoesNotMatchJoinWindowsWindowSize() {
        // NOTE(review): buildWindowBytesStoreSupplier is defined outside this view; its arguments are
        // presumably (name, retention ms, window size ms, retainDuplicates) - confirm.
        final WindowBytesStoreSupplier thisStoreSupplier =
            this.buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 150L, true);
        final WindowBytesStoreSupplier otherStoreSupplier =
            this.buildWindowBytesStoreSupplier("in-memory-join-store-other", 150L, 100L, true);

        this.buildStreamsJoinThatShouldThrow(
            this.streamJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier),
            this.joinWindows,
            this.errorMessagePrefix);
    }

    /**
     * Expects the join build to fail with the retainDuplicates message when the "this" store
     * supplier is created with {@code false} as its last argument.
     */
    @Test
    public void shouldThrowExceptionWhenThisJoinStoreSetsRetainDuplicatesFalse() {
        final WindowBytesStoreSupplier withoutDuplicates =
            this.buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, false);
        final WindowBytesStoreSupplier withDuplicates =
            this.buildWindowBytesStoreSupplier("in-memory-join-store-other", 150L, 100L, true);

        final StreamJoined<String, Integer, Integer> joinedWithSuppliers = this.streamJoined
            .withThisStoreSupplier(withoutDuplicates)
            .withOtherStoreSupplier(withDuplicates);
        final String expectedMessage = "The StoreSupplier must set retainDuplicates=true, found retainDuplicates=false";

        this.buildStreamsJoinThatShouldThrow(joinedWithSuppliers, this.joinWindows, expectedMessage);
    }

    /**
     * Expects the join build to fail with the window-settings-mismatch message when the "other"
     * store supplier is created with 500 where the "this" supplier uses 150.
     */
    @Test
    public void shouldThrowExceptionOtherStoreSupplierRetentionDoNotMatchWindowsSizeAndGrace() {
        // NOTE(review): buildWindowBytesStoreSupplier is defined outside this view; its arguments are
        // presumably (name, retention ms, window size ms, retainDuplicates) - confirm.
        final WindowBytesStoreSupplier thisStoreSupplier =
            this.buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, true);
        final WindowBytesStoreSupplier otherStoreSupplier =
            this.buildWindowBytesStoreSupplier("in-memory-join-store-other", 500L, 100L, true);

        this.buildStreamsJoinThatShouldThrow(
            this.streamJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier),
            this.joinWindows,
            this.errorMessagePrefix);
    }

    /**
     * Expects the join build to fail with the window-settings-mismatch message when the "other"
     * store supplier is created with 150 as its third argument where the "this" supplier uses 100.
     */
    @Test
    public void shouldThrowExceptionOtherStoreSupplierWindowSizeDoesNotMatchJoinWindowsWindowSize() {
        // NOTE(review): buildWindowBytesStoreSupplier is defined outside this view; its arguments are
        // presumably (name, retention ms, window size ms, retainDuplicates) - confirm.
        final WindowBytesStoreSupplier thisStoreSupplier =
            this.buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, true);
        final WindowBytesStoreSupplier otherStoreSupplier =
            this.buildWindowBytesStoreSupplier("in-memory-join-store-other", 150L, 150L, true);

        this.buildStreamsJoinThatShouldThrow(
            this.streamJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier),
            this.joinWindows,
            this.errorMessagePrefix);
    }

    /**
     * Expects the join build to fail with the retainDuplicates message when the "other" store
     * supplier is created with {@code false} as its last argument.
     */
    @Test
    public void shouldThrowExceptionWhenOtherJoinStoreSetsRetainDuplicatesFalse() {
        final WindowBytesStoreSupplier withDuplicates =
            this.buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, true);
        final WindowBytesStoreSupplier withoutDuplicates =
            this.buildWindowBytesStoreSupplier("in-memory-join-store-other", 150L, 100L, false);

        final StreamJoined<String, Integer, Integer> joinedWithSuppliers = this.streamJoined
            .withThisStoreSupplier(withDuplicates)
            .withOtherStoreSupplier(withoutDuplicates);
        final String expectedMessage = "The StoreSupplier must set retainDuplicates=true, found retainDuplicates=false";

        this.buildStreamsJoinThatShouldThrow(joinedWithSuppliers, this.joinWindows, expectedMessage);
    }

    /**
     * Builds a join from the class-level {@code joinWindows} and {@code streamJoined} fixtures and
     * passes as long as building the topology does not throw.
     */
    @Test
    public void shouldBuildJoinWithCustomStoresAndCorrectWindowSettings() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));

        left.join(right, Integer::sum, this.joinWindows, this.streamJoined);

        // No assertion: the test fails only if build() throws.
        builder.build();
    }

    /**
     * Expects the join build to fail with the unique-names message when both store suppliers are
     * created with the same name, "in-memory-join-store".
     */
    @Test
    public void shouldExceptionWhenJoinStoresDontHaveUniqueNames() {
        final JoinWindows windows = JoinWindows.of(Duration.ofMillis(100L)).grace(Duration.ofMillis(50L));
        final StreamJoined<String, Integer, Integer> baseJoined =
            StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());

        // Both suppliers deliberately share one name.
        final WindowBytesStoreSupplier thisStoreSupplier =
            this.buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, true);
        final WindowBytesStoreSupplier otherStoreSupplier =
            this.buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, true);

        this.buildStreamsJoinThatShouldThrow(
            baseJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier),
            windows,
            "Both StoreSuppliers have the same name.  StoreSuppliers must provide unique names");
    }

    /**
     * Runs the join scenario of {@code runJoin} three times with in-memory window stores: with
     * both custom suppliers, with only the "this" supplier, and with only the "other" supplier.
     * Each supplier is created with retention = window size + grace period and window size =
     * {@code JoinWindows.size()}.
     */
    @Test
    public void shouldJoinWithCustomStoreSuppliers() {
        final JoinWindows windows = JoinWindows.of(Duration.ofMillis(100L));
        final Duration retention = Duration.ofMillis(windows.size() + windows.gracePeriodMs());
        final Duration windowSize = Duration.ofMillis(windows.size());

        final WindowBytesStoreSupplier thisStoreSupplier =
            Stores.inMemoryWindowStore("in-memory-join-store", retention, windowSize, true);
        final WindowBytesStoreSupplier otherStoreSupplier =
            Stores.inMemoryWindowStore("in-memory-join-store-other", retention, windowSize, true);

        final StreamJoined<String, Integer, Integer> baseJoined =
            StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());

        // Each call derives a new StreamJoined from baseJoined.
        this.runJoin(baseJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier), windows);
        this.runJoin(baseJoined.withThisStoreSupplier(thisStoreSupplier), windows);
        this.runJoin(baseJoined.withOtherStoreSupplier(otherStoreSupplier), windows);
    }

    /**
     * Builds a "left" join "right" topology that sums the two Integer values, pipes two records
     * into each side, and checks the joined output.
     *
     * @param streamJoined serdes and store settings for the join under test
     * @param joinWindows  window settings for the join under test
     */
    private void runJoin(StreamJoined<String, Integer, Integer> streamJoined, JoinWindows joinWindows) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
        final KStream<String, Integer> joinedStream = left.join(right, Integer::sum, joinWindows, streamJoined);
        joinedStream.process(supplier);

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<String, Integer> inputTopicLeft = driver.createInputTopic(
                "left", new StringSerializer(), new IntegerSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestInputTopic<String, Integer> inputTopicRight = driver.createInputTopic(
                "right", new StringSerializer(), new IntegerSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockProcessor<String, Integer> processor = supplier.theCapturedProcessor();

            // Left side first: nothing can join until the right side delivers matching keys.
            inputTopicLeft.pipeInput("A", 1, 1L);
            inputTopicLeft.pipeInput("B", 1, 2L);

            inputTopicRight.pipeInput("A", 1, 1L);
            inputTopicRight.pipeInput("B", 2, 2L);

            // A: 1 + 1 = 2 at timestamp 1; B: 1 + 2 = 3 at timestamp 2.
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<String, Integer>("A", 2, 1L),
                new KeyValueTimestamp<String, Integer>("B", 3, 2L));
        }
    }

    /**
     * Inner-joins "topic1" with "topic2" (Integer keys, String values) and checks the joined
     * output after each batch of input.
     *
     * <p>Both input topics are created with a start time of epoch 0 and a zero auto-advance, and
     * every expected result carries timestamp 0. The batches alternate between the two sides:
     * A (keys 0-1, left), a (keys 0-1, right), B (all keys, left), b (all keys, right),
     * C (all keys, left), c (keys 0-1, right).
     */
    @Test
    public void testJoin() {
        final StreamsBuilder builder = new StreamsBuilder();
        final int[] expectedKeys = new int[]{0, 1, 2, 3};
        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();

        final KStream<Integer, String> stream1 = builder.stream("topic1", this.consumed);
        final KStream<Integer, String> stream2 = builder.stream("topic2", this.consumed);
        final KStream<Integer, String> joined = stream1.join(
            stream2,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.of(Duration.ofMillis(100L)),
            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
        joined.process(supplier);

        // The two sources must end up in exactly one co-partition group.
        final Collection<?> copartitionGroups =
            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals(1, copartitionGroups.size());
        Assert.assertEquals(new HashSet<String>(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<Integer, String> inputTopic1 = driver.createInputTopic(
                "topic1", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestInputTopic<Integer, String> inputTopic2 = driver.createInputTopic(
                "topic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();

            // Left side, keys 0 and 1: the right side is still empty, so nothing is emitted.
            for (int i = 0; i < 2; i++) {
                inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i]);
            }
            processor.checkAndClearProcessResult();

            // Right side, keys 0 and 1: each joins with the stored A record.
            for (int i = 0; i < 2; i++) {
                inputTopic2.pipeInput(expectedKeys[i], "a" + expectedKeys[i]);
            }
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(0, "A0+a0", 0L),
                new KeyValueTimestamp<Integer, String>(1, "A1+a1", 0L));

            // Left side, all keys: only keys 0 and 1 have right-side records so far.
            for (final int expectedKey : expectedKeys) {
                inputTopic1.pipeInput(expectedKey, "B" + expectedKey);
            }
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(0, "B0+a0", 0L),
                new KeyValueTimestamp<Integer, String>(1, "B1+a1", 0L));

            // Right side, all keys: joins with every stored left record of the same key.
            for (final int expectedKey : expectedKeys) {
                inputTopic2.pipeInput(expectedKey, "b" + expectedKey);
            }
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(0, "A0+b0", 0L),
                new KeyValueTimestamp<Integer, String>(0, "B0+b0", 0L),
                new KeyValueTimestamp<Integer, String>(1, "A1+b1", 0L),
                new KeyValueTimestamp<Integer, String>(1, "B1+b1", 0L),
                new KeyValueTimestamp<Integer, String>(2, "B2+b2", 0L),
                new KeyValueTimestamp<Integer, String>(3, "B3+b3", 0L));

            // Left side, all keys: joins with every stored right record of the same key.
            for (final int expectedKey : expectedKeys) {
                inputTopic1.pipeInput(expectedKey, "C" + expectedKey);
            }
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(0, "C0+a0", 0L),
                new KeyValueTimestamp<Integer, String>(0, "C0+b0", 0L),
                new KeyValueTimestamp<Integer, String>(1, "C1+a1", 0L),
                new KeyValueTimestamp<Integer, String>(1, "C1+b1", 0L),
                new KeyValueTimestamp<Integer, String>(2, "C2+b2", 0L),
                new KeyValueTimestamp<Integer, String>(3, "C3+b3", 0L));

            // Right side, keys 0 and 1: joins with the A, B and C records of each key.
            for (int i = 0; i < 2; i++) {
                inputTopic2.pipeInput(expectedKeys[i], "c" + expectedKeys[i]);
            }
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(0, "A0+c0", 0L),
                new KeyValueTimestamp<Integer, String>(0, "B0+c0", 0L),
                new KeyValueTimestamp<Integer, String>(0, "C0+c0", 0L),
                new KeyValueTimestamp<Integer, String>(1, "A1+c1", 0L),
                new KeyValueTimestamp<Integer, String>(1, "B1+c1", 0L),
                new KeyValueTimestamp<Integer, String>(1, "C1+c1", 0L));
        }
    }

    /**
     * Runs an outer join of {@code topic1} (left) and {@code topic2} (right) with a 100 ms window and
     * a 24 h grace period, and checks the joined records captured after every batch of input.
     *
     * <p>Both input topics are created with start timestamp 0 and a zero auto-advance, and no explicit
     * record timestamps are supplied, so every expected result carries timestamp 0.
     */
    @Test
    public void testOuterJoin() {
        int[] keys = new int[]{0, 1, 2, 3};
        StreamsBuilder builder = new StreamsBuilder();
        MockProcessorSupplier capture = new MockProcessorSupplier();

        KStream left = builder.stream("topic1", consumed);
        KStream right = builder.stream("topic2", consumed);
        KStream joined = left.outerJoin(
            right,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.of(Duration.ofMillis(100L)).grace(Duration.ofHours(24L)),
            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
        joined.process(capture, new String[0]);

        // Both source topics must end up in one single co-partition group.
        Collection groups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals(1, groups.size());
        Assert.assertEquals(new HashSet<String>(Arrays.asList("topic1", "topic2")), groups.iterator().next());

        try (TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic leftInput = testDriver.createInputTopic(
                "topic1", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic rightInput = testDriver.createInputTopic(
                "topic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockProcessor captured = capture.theCapturedProcessor();

            // Left: A0, A1. Nothing has been piped to the right yet, so each is paired with null.
            for (int idx = 0; idx < 2; idx++) {
                leftInput.pipeInput(keys[idx], "A" + keys[idx]);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "A0+null", 0L),
                new KeyValueTimestamp<>(1, "A1+null", 0L));

            // Right: a0, a1. Each is paired with the matching A record.
            for (int idx = 0; idx < 2; idx++) {
                rightInput.pipeInput(keys[idx], "a" + keys[idx]);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "A0+a0", 0L),
                new KeyValueTimestamp<>(1, "A1+a1", 0L));

            // Left: B0..B3. Keys 0 and 1 find a right-side record, keys 2 and 3 are paired with null.
            for (int key : keys) {
                leftInput.pipeInput(key, "B" + key);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "B0+a0", 0L),
                new KeyValueTimestamp<>(1, "B1+a1", 0L),
                new KeyValueTimestamp<>(2, "B2+null", 0L),
                new KeyValueTimestamp<>(3, "B3+null", 0L));

            // Right: b0..b3. Each is paired with every left-side record of the same key.
            for (int key : keys) {
                rightInput.pipeInput(key, "b" + key);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "A0+b0", 0L),
                new KeyValueTimestamp<>(0, "B0+b0", 0L),
                new KeyValueTimestamp<>(1, "A1+b1", 0L),
                new KeyValueTimestamp<>(1, "B1+b1", 0L),
                new KeyValueTimestamp<>(2, "B2+b2", 0L),
                new KeyValueTimestamp<>(3, "B3+b3", 0L));

            // Left: C0..C3. Each is paired with every right-side record of the same key.
            for (int key : keys) {
                leftInput.pipeInput(key, "C" + key);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "C0+a0", 0L),
                new KeyValueTimestamp<>(0, "C0+b0", 0L),
                new KeyValueTimestamp<>(1, "C1+a1", 0L),
                new KeyValueTimestamp<>(1, "C1+b1", 0L),
                new KeyValueTimestamp<>(2, "C2+b2", 0L),
                new KeyValueTimestamp<>(3, "C3+b3", 0L));

            // Right: c0, c1. Each is paired with the three left-side records (A, B, C) of its key.
            for (int idx = 0; idx < 2; idx++) {
                rightInput.pipeInput(keys[idx], "c" + keys[idx]);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "A0+c0", 0L),
                new KeyValueTimestamp<>(0, "B0+c0", 0L),
                new KeyValueTimestamp<>(0, "C0+c0", 0L),
                new KeyValueTimestamp<>(1, "A1+c1", 0L),
                new KeyValueTimestamp<>(1, "B1+c1", 0L),
                new KeyValueTimestamp<>(1, "C1+c1", 0L));
        }
    }

    /**
     * Checks an inner join of {@code topic1} (left) and {@code topic2} (right) with a symmetric
     * 100 ms join window by piping records at explicit timestamps around both window edges.
     *
     * <p>The test first anchors left-side records at 1000..1003 and slides right-side input past the
     * upper and lower ends of the window; it then mirrors the scenario with right-side records
     * anchored at 2000..2003 and left-side input sliding. Each joined record is expected to carry the
     * larger of the two input timestamps.
     */
    @Test
    public void testWindowing() {
        int[] keys = new int[]{0, 1, 2, 3};
        StreamsBuilder builder = new StreamsBuilder();
        MockProcessorSupplier capture = new MockProcessorSupplier();

        KStream left = builder.stream("topic1", consumed);
        KStream right = builder.stream("topic2", consumed);
        KStream joined = left.join(
            right,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.of(Duration.ofMillis(100L)),
            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
        joined.process(capture, new String[0]);

        // Both source topics must end up in one single co-partition group.
        Collection groups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals(1, groups.size());
        Assert.assertEquals(new HashSet<String>(Arrays.asList("topic1", "topic2")), groups.iterator().next());

        try (TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic leftInput = testDriver.createInputTopic(
                "topic1", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic rightInput = testDriver.createInputTopic(
                "topic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockProcessor captured = capture.theCapturedProcessor();

            // --- Warm-up at timestamp 0: A0, A1 on the left, then a0, a1 on the right. ---
            long ts = 0L;
            for (int idx = 0; idx < 2; idx++) {
                leftInput.pipeInput(keys[idx], "A" + keys[idx], ts);
            }
            captured.checkAndClearProcessResult(new KeyValueTimestamp[0]);

            for (int idx = 0; idx < 2; idx++) {
                rightInput.pipeInput(keys[idx], "a" + keys[idx], ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "A0+a0", 0L),
                new KeyValueTimestamp<>(1, "A1+a1", 0L));

            // --- Left anchors: B0..B3 at 1000, 1001, 1002, 1003; no output expected. ---
            ts = 1000L;
            for (int idx = 0; idx < keys.length; idx++) {
                leftInput.pipeInput(keys[idx], "B" + keys[idx], ts + idx);
            }
            captured.checkAndClearProcessResult(new KeyValueTimestamp[0]);

            // Right side at 1100: all four anchors are expected to match.
            ts = 1100L;
            for (int key : keys) {
                rightInput.pipeInput(key, "b" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "B0+b0", 1100L),
                new KeyValueTimestamp<>(1, "B1+b1", 1100L),
                new KeyValueTimestamp<>(2, "B2+b2", 1100L),
                new KeyValueTimestamp<>(3, "B3+b3", 1100L));

            // Right side at 1101: the anchor at 1000 (key 0) no longer matches.
            ts = 1101L;
            for (int key : keys) {
                rightInput.pipeInput(key, "c" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(1, "B1+c1", 1101L),
                new KeyValueTimestamp<>(2, "B2+c2", 1101L),
                new KeyValueTimestamp<>(3, "B3+c3", 1101L));

            // Right side at 1102: only keys 2 and 3 match.
            ts = 1102L;
            for (int key : keys) {
                rightInput.pipeInput(key, "d" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(2, "B2+d2", 1102L),
                new KeyValueTimestamp<>(3, "B3+d3", 1102L));

            // Right side at 1103: only key 3 matches.
            ts = 1103L;
            for (int key : keys) {
                rightInput.pipeInput(key, "e" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(3, "B3+e3", 1103L));

            // Right side at 1104: nothing matches any more.
            ts = 1104L;
            for (int key : keys) {
                rightInput.pipeInput(key, "f" + key, ts);
            }
            captured.checkAndClearProcessResult(new KeyValueTimestamp[0]);

            // Right side at 899: just before the lower edge, nothing matches.
            ts = 899L;
            for (int key : keys) {
                rightInput.pipeInput(key, "g" + key, ts);
            }
            captured.checkAndClearProcessResult(new KeyValueTimestamp[0]);

            // Right side at 900: the anchor at 1000 (key 0) matches.
            ts = 900L;
            for (int key : keys) {
                rightInput.pipeInput(key, "h" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "B0+h0", 1000L));

            // Right side at 901: keys 0 and 1 match.
            ts = 901L;
            for (int key : keys) {
                rightInput.pipeInput(key, "i" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "B0+i0", 1000L),
                new KeyValueTimestamp<>(1, "B1+i1", 1001L));

            // Right side at 902: keys 0, 1 and 2 match.
            ts = 902L;
            for (int key : keys) {
                rightInput.pipeInput(key, "j" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "B0+j0", 1000L),
                new KeyValueTimestamp<>(1, "B1+j1", 1001L),
                new KeyValueTimestamp<>(2, "B2+j2", 1002L));

            // Right side at 903: all four anchors match.
            ts = 903L;
            for (int key : keys) {
                rightInput.pipeInput(key, "k" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "B0+k0", 1000L),
                new KeyValueTimestamp<>(1, "B1+k1", 1001L),
                new KeyValueTimestamp<>(2, "B2+k2", 1002L),
                new KeyValueTimestamp<>(3, "B3+k3", 1003L));

            // --- Right anchors: l0..l3 at 2000, 2001, 2002, 2003; no output expected. ---
            ts = 2000L;
            for (int idx = 0; idx < keys.length; idx++) {
                rightInput.pipeInput(keys[idx], "l" + keys[idx], ts + idx);
            }
            captured.checkAndClearProcessResult(new KeyValueTimestamp[0]);

            // Left side at 2100: all four anchors are expected to match.
            ts = 2100L;
            for (int key : keys) {
                leftInput.pipeInput(key, "C" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "C0+l0", 2100L),
                new KeyValueTimestamp<>(1, "C1+l1", 2100L),
                new KeyValueTimestamp<>(2, "C2+l2", 2100L),
                new KeyValueTimestamp<>(3, "C3+l3", 2100L));

            // Left side at 2101: the anchor at 2000 (key 0) no longer matches.
            ts = 2101L;
            for (int key : keys) {
                leftInput.pipeInput(key, "D" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(1, "D1+l1", 2101L),
                new KeyValueTimestamp<>(2, "D2+l2", 2101L),
                new KeyValueTimestamp<>(3, "D3+l3", 2101L));

            // Left side at 2102: only keys 2 and 3 match.
            ts = 2102L;
            for (int key : keys) {
                leftInput.pipeInput(key, "E" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(2, "E2+l2", 2102L),
                new KeyValueTimestamp<>(3, "E3+l3", 2102L));

            // Left side at 2103: only key 3 matches.
            ts = 2103L;
            for (int key : keys) {
                leftInput.pipeInput(key, "F" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(3, "F3+l3", 2103L));

            // Left side at 2104: nothing matches any more.
            ts = 2104L;
            for (int key : keys) {
                leftInput.pipeInput(key, "G" + key, ts);
            }
            captured.checkAndClearProcessResult(new KeyValueTimestamp[0]);

            // Left side at 1899: just before the lower edge, nothing matches.
            ts = 1899L;
            for (int key : keys) {
                leftInput.pipeInput(key, "H" + key, ts);
            }
            captured.checkAndClearProcessResult(new KeyValueTimestamp[0]);

            // Left side at 1900: the anchor at 2000 (key 0) matches.
            ts = 1900L;
            for (int key : keys) {
                leftInput.pipeInput(key, "I" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "I0+l0", 2000L));

            // Left side at 1901: keys 0 and 1 match.
            ts = 1901L;
            for (int key : keys) {
                leftInput.pipeInput(key, "J" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "J0+l0", 2000L),
                new KeyValueTimestamp<>(1, "J1+l1", 2001L));

            // Left side at 1902: keys 0, 1 and 2 match.
            ts = 1902L;
            for (int key : keys) {
                leftInput.pipeInput(key, "K" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "K0+l0", 2000L),
                new KeyValueTimestamp<>(1, "K1+l1", 2001L),
                new KeyValueTimestamp<>(2, "K2+l2", 2002L));

            // Left side at 1903: all four anchors match.
            ts = 1903L;
            for (int key : keys) {
                leftInput.pipeInput(key, "L" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "L0+l0", 2000L),
                new KeyValueTimestamp<>(1, "L1+l1", 2001L),
                new KeyValueTimestamp<>(2, "L2+l2", 2002L),
                new KeyValueTimestamp<>(3, "L3+l3", 2003L));
        }
    }

    /**
     * Checks an inner join whose window is built as {@code JoinWindows.of(0 ms).after(100 ms)} with a
     * zero grace period.
     *
     * <p>Left-side records are anchored at 1000..1003; right-side input is then piped at timestamps
     * from 999 up to 1104. The expectations encode that a right-side record only joins a left-side
     * record whose timestamp is at most 100 ms earlier and never later than its own, and that each
     * result carries the right-side timestamp.
     */
    @Test
    public void testAsymmetricWindowingAfter() {
        int[] keys = new int[]{0, 1, 2, 3};
        StreamsBuilder builder = new StreamsBuilder();
        MockProcessorSupplier capture = new MockProcessorSupplier();

        KStream left = builder.stream("topic1", consumed);
        KStream right = builder.stream("topic2", consumed);
        KStream joined = left.join(
            right,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.of(Duration.ofMillis(0L)).after(Duration.ofMillis(100L)).grace(Duration.ofMillis(0L)),
            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
        joined.process(capture, new String[0]);

        // Both source topics must end up in one single co-partition group.
        Collection groups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals(1, groups.size());
        Assert.assertEquals(new HashSet<String>(Arrays.asList("topic1", "topic2")), groups.iterator().next());

        try (TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic leftInput = testDriver.createInputTopic(
                "topic1", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic rightInput = testDriver.createInputTopic(
                "topic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockProcessor captured = capture.theCapturedProcessor();

            // Left anchors: A0..A3 at 1000, 1001, 1002, 1003; no output expected.
            long ts = 1000L;
            for (int idx = 0; idx < keys.length; idx++) {
                leftInput.pipeInput(keys[idx], "A" + keys[idx], ts + idx);
            }
            captured.checkAndClearProcessResult(new KeyValueTimestamp[0]);

            // Right side at 999: earlier than every anchor, nothing matches.
            ts = 999L;
            for (int key : keys) {
                rightInput.pipeInput(key, "a" + key, ts);
            }
            captured.checkAndClearProcessResult(new KeyValueTimestamp[0]);

            // Right side at 1000: only the anchor at 1000 (key 0) matches.
            ts = 1000L;
            for (int key : keys) {
                rightInput.pipeInput(key, "b" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "A0+b0", 1000L));

            // Right side at 1001: keys 0 and 1 match.
            ts = 1001L;
            for (int key : keys) {
                rightInput.pipeInput(key, "c" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "A0+c0", 1001L),
                new KeyValueTimestamp<>(1, "A1+c1", 1001L));

            // Right side at 1002: keys 0, 1 and 2 match.
            ts = 1002L;
            for (int key : keys) {
                rightInput.pipeInput(key, "d" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "A0+d0", 1002L),
                new KeyValueTimestamp<>(1, "A1+d1", 1002L),
                new KeyValueTimestamp<>(2, "A2+d2", 1002L));

            // Right side at 1003: all four anchors match.
            ts = 1003L;
            for (int key : keys) {
                rightInput.pipeInput(key, "e" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "A0+e0", 1003L),
                new KeyValueTimestamp<>(1, "A1+e1", 1003L),
                new KeyValueTimestamp<>(2, "A2+e2", 1003L),
                new KeyValueTimestamp<>(3, "A3+e3", 1003L));

            // Right side at 1100: still within 100 ms of every anchor, all four match.
            ts = 1100L;
            for (int key : keys) {
                rightInput.pipeInput(key, "f" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "A0+f0", 1100L),
                new KeyValueTimestamp<>(1, "A1+f1", 1100L),
                new KeyValueTimestamp<>(2, "A2+f2", 1100L),
                new KeyValueTimestamp<>(3, "A3+f3", 1100L));

            // Right side at 1101: the anchor at 1000 (key 0) no longer matches.
            ts = 1101L;
            for (int key : keys) {
                rightInput.pipeInput(key, "g" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(1, "A1+g1", 1101L),
                new KeyValueTimestamp<>(2, "A2+g2", 1101L),
                new KeyValueTimestamp<>(3, "A3+g3", 1101L));

            // Right side at 1102: only keys 2 and 3 match.
            ts = 1102L;
            for (int key : keys) {
                rightInput.pipeInput(key, "h" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(2, "A2+h2", 1102L),
                new KeyValueTimestamp<>(3, "A3+h3", 1102L));

            // Right side at 1103: only key 3 matches.
            ts = 1103L;
            for (int key : keys) {
                rightInput.pipeInput(key, "i" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(3, "A3+i3", 1103L));

            // Right side at 1104: nothing matches any more.
            ts = 1104L;
            for (int key : keys) {
                rightInput.pipeInput(key, "j" + key, ts);
            }
            captured.checkAndClearProcessResult(new KeyValueTimestamp[0]);
        }
    }

    /**
     * Checks an inner join whose window is built as {@code JoinWindows.of(0 ms).before(100 ms)}.
     *
     * <p>Left-side records are anchored at 1000..1003; right-side input is then piped at timestamps
     * from 899 up to 1004. The expectations encode that a right-side record only joins a left-side
     * record whose timestamp is at most 100 ms later and never earlier than its own, and that each
     * result carries the left-side (anchor) timestamp.
     */
    @Test
    public void testAsymmetricWindowingBefore() {
        int[] keys = new int[]{0, 1, 2, 3};
        StreamsBuilder builder = new StreamsBuilder();
        MockProcessorSupplier capture = new MockProcessorSupplier();

        KStream left = builder.stream("topic1", consumed);
        KStream right = builder.stream("topic2", consumed);
        KStream joined = left.join(
            right,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.of(Duration.ofMillis(0L)).before(Duration.ofMillis(100L)),
            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
        joined.process(capture, new String[0]);

        // Both source topics must end up in one single co-partition group.
        Collection groups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals(1, groups.size());
        Assert.assertEquals(new HashSet<String>(Arrays.asList("topic1", "topic2")), groups.iterator().next());

        try (TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic leftInput = testDriver.createInputTopic(
                "topic1", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic rightInput = testDriver.createInputTopic(
                "topic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockProcessor captured = capture.theCapturedProcessor();

            // Left anchors: A0..A3 at 1000, 1001, 1002, 1003; no output expected.
            long ts = 1000L;
            for (int idx = 0; idx < keys.length; idx++) {
                leftInput.pipeInput(keys[idx], "A" + keys[idx], ts + idx);
            }
            captured.checkAndClearProcessResult(new KeyValueTimestamp[0]);

            // Right side at 899: more than 100 ms before every anchor, nothing matches.
            ts = 899L;
            for (int key : keys) {
                rightInput.pipeInput(key, "a" + key, ts);
            }
            captured.checkAndClearProcessResult(new KeyValueTimestamp[0]);

            // Right side at 900: only the anchor at 1000 (key 0) matches.
            ts = 900L;
            for (int key : keys) {
                rightInput.pipeInput(key, "b" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "A0+b0", 1000L));

            // Right side at 901: keys 0 and 1 match.
            ts = 901L;
            for (int key : keys) {
                rightInput.pipeInput(key, "c" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "A0+c0", 1000L),
                new KeyValueTimestamp<>(1, "A1+c1", 1001L));

            // Right side at 902: keys 0, 1 and 2 match.
            ts = 902L;
            for (int key : keys) {
                rightInput.pipeInput(key, "d" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "A0+d0", 1000L),
                new KeyValueTimestamp<>(1, "A1+d1", 1001L),
                new KeyValueTimestamp<>(2, "A2+d2", 1002L));

            // Right side at 903: all four anchors match.
            ts = 903L;
            for (int key : keys) {
                rightInput.pipeInput(key, "e" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "A0+e0", 1000L),
                new KeyValueTimestamp<>(1, "A1+e1", 1001L),
                new KeyValueTimestamp<>(2, "A2+e2", 1002L),
                new KeyValueTimestamp<>(3, "A3+e3", 1003L));

            // Right side at 1000: not later than any anchor, all four still match.
            ts = 1000L;
            for (int key : keys) {
                rightInput.pipeInput(key, "f" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "A0+f0", 1000L),
                new KeyValueTimestamp<>(1, "A1+f1", 1001L),
                new KeyValueTimestamp<>(2, "A2+f2", 1002L),
                new KeyValueTimestamp<>(3, "A3+f3", 1003L));

            // Right side at 1001: the anchor at 1000 (key 0) no longer matches.
            ts = 1001L;
            for (int key : keys) {
                rightInput.pipeInput(key, "g" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(1, "A1+g1", 1001L),
                new KeyValueTimestamp<>(2, "A2+g2", 1002L),
                new KeyValueTimestamp<>(3, "A3+g3", 1003L));

            // Right side at 1002: only keys 2 and 3 match.
            ts = 1002L;
            for (int key : keys) {
                rightInput.pipeInput(key, "h" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(2, "A2+h2", 1002L),
                new KeyValueTimestamp<>(3, "A3+h3", 1003L));

            // Right side at 1003: only key 3 matches.
            ts = 1003L;
            for (int key : keys) {
                rightInput.pipeInput(key, "i" + key, ts);
            }
            captured.checkAndClearProcessResult(
                new KeyValueTimestamp<>(3, "A3+i3", 1003L));

            // Right side at 1004: later than every anchor, nothing matches.
            ts = 1004L;
            for (int key : keys) {
                rightInput.pipeInput(key, "j" + key, ts);
            }
            captured.checkAndClearProcessResult(new KeyValueTimestamp[0]);
        }
    }

    /**
     * Builds a stream-stream join of the {@code "left"} and {@code "right"} topics and asserts that
     * constructing the join fails with a {@link StreamsException} whose message starts with the
     * given prefix.
     *
     * @param streamJoined                   join configuration (serdes / store suppliers) under test
     * @param joinWindows                    join window specification under test
     * @param expectedExceptionMessagePrefix prefix the exception message is required to start with
     */
    private void buildStreamsJoinThatShouldThrow(StreamJoined<String, Integer, Integer> streamJoined, JoinWindows joinWindows, String expectedExceptionMessagePrefix) {
        StreamsBuilder builder = new StreamsBuilder();
        // The streams must be fully parameterized: with raw KStream the Integer::sum method reference
        // would have to target a raw ValueJoiner taking (Object, Object), which does not type-check.
        KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
        KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));

        StreamsException streamsException = Assert.assertThrows(
            StreamsException.class,
            () -> left.join(right, Integer::sum, joinWindows, streamJoined));

        // Report both the expected prefix and the actual message so a failure is diagnosable,
        // and fail the assertion (rather than throw NullPointerException) if there is no message.
        String actualMessage = streamsException.getMessage();
        Assert.assertTrue(
            "Expected exception message to start with <" + expectedExceptionMessagePrefix + "> but was <" + actualMessage + ">",
            actualMessage != null && actualMessage.startsWith(expectedExceptionMessagePrefix));
    }

    /**
     * Creates an in-memory window store supplier via {@link Stores#inMemoryWindowStore}.
     *
     * @param name             store name
     * @param retentionPeriod  retention period in milliseconds
     * @param windowSize       window size in milliseconds
     * @param retainDuplicates value passed through as the store's retain-duplicates flag
     * @return the supplier returned by {@code Stores.inMemoryWindowStore}
     */
    private WindowBytesStoreSupplier buildWindowBytesStoreSupplier(String name, long retentionPeriod, long windowSize, boolean retainDuplicates) {
        Duration retention = Duration.ofMillis(retentionPeriod);
        Duration window = Duration.ofMillis(windowSize);
        return Stores.inMemoryWindowStore(name, retention, window, retainDuplicates);
    }
}

