/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.network;

import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.DemandForwardingBridge;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.DurableConduitBridge;
import org.apache.activemq.network.DynamicNetworkTestSupport;
import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class CompositeConsumerNetworkBridgeTest
extends DynamicNetworkTestSupport {
    protected static final Logger LOG = LoggerFactory.getLogger(CompositeConsumerNetworkBridgeTest.class);
    private static final String testTopic1 = "test.composite.topic.1";
    private static final String testTopic2 = "test.composite.topic.2";
    private static final String testQueue1 = "test.composite.queue.1";
    private static final String testQueue2 = "test.composite.queue.2";
    private BrokerService broker1;
    private BrokerService broker2;
    private Session session1;
    private Session session2;
    private final DynamicNetworkTestSupport.FLOW flow;
    private static final List<ActiveMQTopic> topics = List.of(new ActiveMQTopic("test.composite.topic.1"), new ActiveMQTopic("test.composite.topic.2"));
    private static final List<ActiveMQQueue> queues = List.of(new ActiveMQQueue("test.composite.queue.1"), new ActiveMQQueue("test.composite.queue.2"));

    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        return Arrays.asList({DynamicNetworkTestSupport.FLOW.FORWARD}, {DynamicNetworkTestSupport.FLOW.REVERSE});
    }

    public CompositeConsumerNetworkBridgeTest(DynamicNetworkTestSupport.FLOW flow) {
        this.flow = flow;
    }

    @After
    public void tearDown() throws Exception {
        this.doTearDown();
    }

    @Test
    public void testCompositeDurableSubscriber() throws Exception {
        this.setUp();
        ActiveMQTopic compositeTopic = new ActiveMQTopic("test.composite.topic.1,test.composite.topic.2");
        TopicSubscriber durSub = this.session1.createDurableSubscriber((Topic)compositeTopic, this.subName);
        this.assertConsumersCount(this.broker1, (ActiveMQDestination)compositeTopic, 1);
        this.assertConsumersCount(this.broker2, (ActiveMQDestination)compositeTopic, 0);
        this.assertNCDurableSubsCount(this.broker2, compositeTopic, 0);
        for (ActiveMQTopic topic : topics) {
            this.assertConsumersCount(this.broker2, (ActiveMQDestination)topic, 1);
            this.assertNCDurableSubsCount(this.broker2, topic, 1);
        }
        this.assertCompositeMapCounts(1, 1);
        durSub.close();
        Thread.sleep(1000L);
        this.removeSubscription(this.broker1, this.subName);
        for (ActiveMQTopic topic : topics) {
            this.assertConsumersCount(this.broker2, (ActiveMQDestination)topic, 0);
            this.assertNCDurableSubsCount(this.broker2, topic, 0);
        }
        this.assertCompositeMapCounts(0, 0);
    }

    @Test
    public void testCompositeAndNormalDurableSub() throws Exception {
        this.setUp();
        ActiveMQTopic compositeTopic = new ActiveMQTopic("test.composite.topic.1,test.composite.topic.2");
        TopicSubscriber durSub1 = this.session1.createDurableSubscriber((Topic)compositeTopic, this.subName + "1");
        TopicSubscriber durSub2 = this.session1.createDurableSubscriber((Topic)topics.get(0), this.subName + "2");
        for (ActiveMQTopic topic : topics) {
            this.assertNCDurableSubsCount(this.broker2, topic, 1);
        }
        this.assertNCDurableSubsCount(this.broker2, compositeTopic, 0);
        this.assertCompositeMapCounts(1, 1);
        MessageProducer producer = this.session2.createProducer((Destination)topics.get(0));
        producer.send((Message)this.session2.createTextMessage("test"));
        Assert.assertNotNull((Object)durSub1.receive(1000L));
        Assert.assertNotNull((Object)durSub2.receive(1000L));
        durSub1.close();
        durSub2.close();
        Thread.sleep(1000L);
        this.removeSubscription(this.broker1, this.subName + "1");
        this.removeSubscription(this.broker1, this.subName + "2");
        this.assertCompositeMapCounts(0, 0);
    }

    @Test
    public void testTopicCompositeSubs() throws Exception {
        this.setUp();
        ActiveMQTopic compositeTopic = new ActiveMQTopic("test.composite.topic.1,test.composite.topic.2");
        MessageConsumer sub1 = this.session1.createConsumer((Destination)compositeTopic);
        MessageConsumer sub2 = this.session1.createConsumer((Destination)compositeTopic);
        for (ActiveMQTopic topic : topics) {
            this.assertConsumersCount(this.broker1, (ActiveMQDestination)topic, 2);
            this.assertConsumersCount(this.broker2, (ActiveMQDestination)topic, 1);
        }
        this.assertCompositeMapCounts(2, 0);
        MessageProducer producer = this.session2.createProducer((Destination)topics.get(0));
        producer.send((Message)this.session2.createTextMessage("test"));
        Assert.assertNotNull((Object)sub1.receive(1000L));
        Assert.assertNotNull((Object)sub2.receive(1000L));
        sub1.close();
        sub2.close();
        this.assertCompositeMapCounts(0, 0);
    }

    @Test
    public void testCompositeQueueSubs() throws Exception {
        this.setUp();
        ActiveMQQueue compositeQueue = new ActiveMQQueue("test.composite.queue.1,test.composite.queue.2");
        MessageConsumer sub1 = this.session1.createConsumer((Destination)compositeQueue);
        MessageConsumer sub2 = this.session1.createConsumer((Destination)compositeQueue);
        for (ActiveMQDestination activeMQDestination : queues) {
            this.assertConsumersCount(this.broker1, activeMQDestination, 2);
            this.assertConsumersCount(this.broker2, activeMQDestination, 1);
        }
        this.assertCompositeMapCounts(2, 0);
        MessageProducer producer = this.session2.createProducer((Destination)queues.get(0));
        producer.send((Message)this.session2.createTextMessage("test"));
        Assert.assertTrue((sub1.receive(1000L) != null || sub2.receive(1000L) != null ? 1 : 0) != 0);
        sub1.close();
        sub2.close();
        this.assertCompositeMapCounts(0, 0);
    }

    @Test
    public void testCompositeAndNormalQueueSubs() throws Exception {
        this.setUp();
        ActiveMQQueue compositeQueue = new ActiveMQQueue("test.composite.queue.1,test.composite.queue.2");
        MessageConsumer sub1 = this.session1.createConsumer((Destination)compositeQueue);
        MessageConsumer sub2 = this.session1.createConsumer((Destination)new ActiveMQQueue(testQueue2));
        this.assertConsumersCount(this.broker1, (ActiveMQDestination)queues.get(0), 1);
        this.assertConsumersCount(this.broker1, (ActiveMQDestination)queues.get(1), 2);
        for (ActiveMQDestination activeMQDestination : queues) {
            this.assertConsumersCount(this.broker2, activeMQDestination, 1);
        }
        this.assertCompositeMapCounts(1, 0);
        MessageProducer producer = this.session2.createProducer((Destination)queues.get(0));
        producer.send((Message)this.session2.createTextMessage("test"));
        Assert.assertNotNull((Object)sub1.receive(1000L));
        sub1.close();
        sub2.close();
        this.assertCompositeMapCounts(0, 0);
    }

    @Test
    public void testCompositeTwoDurableSubscribers() throws Exception {
        this.setUp();
        ActiveMQTopic compositeTopic = new ActiveMQTopic("test.composite.topic.1,test.composite.topic.2");
        TopicSubscriber durSub1 = this.session1.createDurableSubscriber((Topic)compositeTopic, this.subName + "1");
        TopicSubscriber durSub2 = this.session1.createDurableSubscriber((Topic)compositeTopic, this.subName + "2");
        this.assertConsumersCount(this.broker1, (ActiveMQDestination)compositeTopic, 2);
        this.assertConsumersCount(this.broker2, (ActiveMQDestination)compositeTopic, 0);
        this.assertNCDurableSubsCount(this.broker2, compositeTopic, 0);
        for (ActiveMQTopic topic : topics) {
            this.assertConsumersCount(this.broker2, (ActiveMQDestination)topic, 1);
            this.assertNCDurableSubsCount(this.broker2, topic, 1);
        }
        this.assertCompositeMapCounts(2, 2);
        durSub1.close();
        Thread.sleep(1000L);
        this.removeSubscription(this.broker1, this.subName + "1");
        for (ActiveMQTopic topic : topics) {
            this.assertConsumersCount(this.broker2, (ActiveMQDestination)topic, 1);
            this.assertNCDurableSubsCount(this.broker2, topic, 1);
        }
        durSub2.close();
        Thread.sleep(1000L);
        this.removeSubscription(this.broker1, this.subName + "2");
        for (ActiveMQTopic topic : topics) {
            this.assertConsumersCount(this.broker2, (ActiveMQDestination)topic, 0);
            this.assertNCDurableSubsCount(this.broker2, topic, 0);
        }
        this.assertCompositeMapCounts(0, 0);
    }

    private void setUp() throws Exception {
        this.doSetUp(this.tempFolder.newFolder(), this.tempFolder.newFolder());
    }

    protected void doSetUp(File localDataDir, File remoteDataDir) throws Exception {
        this.doSetUpRemoteBroker(remoteDataDir);
        this.doSetUpLocalBroker(localDataDir);
        Thread.sleep(1000L);
    }

    protected void doSetUpLocalBroker(File dataDir) throws Exception {
        this.localBroker = this.createLocalBroker(dataDir);
        this.localBroker.setDeleteAllMessagesOnStartup(true);
        this.localBroker.start();
        this.localBroker.waitUntilStarted();
        URI localURI = this.localBroker.getVmConnectorURI();
        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
        fac.setAlwaysSyncSend(true);
        fac.setDispatchAsync(false);
        this.localConnection = fac.createConnection();
        this.localConnection.setClientID("clientId");
        this.localConnection.start();
        Wait.waitFor(() -> ((NetworkConnector)this.localBroker.getNetworkConnectors().get(0)).activeBridges().size() == 1, (long)10000L, (long)500L);
        this.localSession = this.localConnection.createSession(false, 1);
        if (this.flow.equals((Object)DynamicNetworkTestSupport.FLOW.FORWARD)) {
            this.broker1 = this.localBroker;
            this.session1 = this.localSession;
        } else {
            this.broker2 = this.localBroker;
            this.session2 = this.localSession;
        }
    }

    protected void doSetUpRemoteBroker(File dataDir) throws Exception {
        this.remoteBroker = this.createRemoteBroker(dataDir);
        this.remoteBroker.setDeleteAllMessagesOnStartup(true);
        this.remoteBroker.start();
        this.remoteBroker.waitUntilStarted();
        URI remoteURI = this.remoteBroker.getVmConnectorURI();
        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(remoteURI);
        this.remoteConnection = fac.createConnection();
        this.remoteConnection.setClientID("clientId");
        this.remoteConnection.start();
        this.remoteSession = this.remoteConnection.createSession(false, 1);
        if (this.flow.equals((Object)DynamicNetworkTestSupport.FLOW.FORWARD)) {
            this.broker2 = this.remoteBroker;
            this.session2 = this.remoteSession;
        } else {
            this.broker1 = this.remoteBroker;
            this.session1 = this.remoteSession;
        }
    }

    protected BrokerService createLocalBroker(File dataDir) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setMonitorConnectionSplits(true);
        brokerService.setDataDirectoryFile(dataDir);
        brokerService.setBrokerName("localBroker");
        brokerService.addNetworkConnector(this.configureLocalNetworkConnector());
        brokerService.addConnector("tcp://localhost:0");
        brokerService.setDestinations(new ActiveMQDestination[]{new ActiveMQTopic(testTopic1), new ActiveMQTopic(testTopic2), new ActiveMQQueue(testQueue1), new ActiveMQQueue(testQueue2)});
        return brokerService;
    }

    protected NetworkConnector configureLocalNetworkConnector() throws Exception {
        List transportConnectors = this.remoteBroker.getTransportConnectors();
        URI remoteURI = ((TransportConnector)transportConnectors.get(0)).getConnectUri();
        String uri = "static:(" + remoteURI + ")";
        DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
        connector.setName("networkConnector");
        connector.setDynamicOnly(false);
        connector.setDecreaseNetworkConsumerPriority(false);
        connector.setConduitSubscriptions(true);
        connector.setDuplex(true);
        connector.setStaticBridge(false);
        ArrayList<ActiveMQQueue> dynamicIncludedDestinations = new ArrayList<ActiveMQQueue>();
        dynamicIncludedDestinations.addAll(List.of(new ActiveMQTopic("test.composite.topic.>"), new ActiveMQQueue("test.composite.queue.>")));
        connector.setDynamicallyIncludedDestinations(dynamicIncludedDestinations);
        return connector;
    }

    protected BrokerService createRemoteBroker(File dataDir) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setBrokerName("remoteBroker");
        brokerService.setUseJmx(false);
        brokerService.setDataDirectoryFile(dataDir);
        brokerService.addConnector("tcp://localhost:0");
        brokerService.setDestinations(new ActiveMQDestination[]{new ActiveMQTopic(testTopic1), new ActiveMQTopic(testTopic2), new ActiveMQQueue(testQueue1), new ActiveMQQueue(testQueue2)});
        return brokerService;
    }

    protected void assertCompositeMapCounts(int compositeConsumerIdsSize, int compositeSubSize) throws Exception {
        DurableConduitBridge bridge = this.findBridge();
        Assert.assertTrue((boolean)Wait.waitFor(() -> compositeConsumerIdsSize == bridge.compositeConsumerIds.size(), (long)5000L, (long)500L));
        Assert.assertTrue((boolean)Wait.waitFor(() -> compositeSubSize == bridge.compositeSubscriptions.size(), (long)5000L, (long)500L));
    }

    protected DurableConduitBridge findBridge() throws Exception {
        if (this.flow.equals((Object)DynamicNetworkTestSupport.FLOW.FORWARD)) {
            return this.findBridge(this.remoteBroker);
        }
        return this.findBridge(this.localBroker);
    }

    protected DurableConduitBridge findBridge(BrokerService broker) throws Exception {
        DemandForwardingBridge bridge;
        if (broker.getNetworkConnectors().size() > 0) {
            Assert.assertTrue((boolean)Wait.waitFor(() -> ((NetworkConnector)broker.getNetworkConnectors().get(0)).activeBridges().size() == 1, (long)5000L, (long)500L));
            bridge = (NetworkBridge)((NetworkConnector)broker.getNetworkConnectors().get(0)).activeBridges().iterator().next();
        } else {
            bridge = this.findDuplexBridge(broker.getTransportConnectorByScheme("tcp"));
        }
        return (DurableConduitBridge)bridge;
    }
}

