/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.network;

import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.DynamicNetworkTestSupport;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class ForceDurableNetworkBridgeTest
extends DynamicNetworkTestSupport {
    protected static final Logger LOG = LoggerFactory.getLogger(ForceDurableNetworkBridgeTest.class);
    protected String testTopicName2 = "include.nonforced.bar";
    protected String staticTopic = "include.static.bar";
    protected String staticTopic2 = "include.static.nonforced.bar";
    private BrokerService broker1;
    private BrokerService broker2;
    private Session session1;
    private final DynamicNetworkTestSupport.FLOW flow;

    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        return Arrays.asList({DynamicNetworkTestSupport.FLOW.FORWARD}, {DynamicNetworkTestSupport.FLOW.REVERSE});
    }

    public ForceDurableNetworkBridgeTest(DynamicNetworkTestSupport.FLOW flow) {
        this.flow = flow;
    }

    @Before
    public void setUp() throws Exception {
        this.doSetUp(true, this.tempFolder.newFolder(), this.tempFolder.newFolder());
    }

    @After
    public void tearDown() throws Exception {
        this.doTearDown();
    }

    @Test
    public void testForceDurableSubscriptionStatic() throws Exception {
        ActiveMQTopic topic = new ActiveMQTopic(this.staticTopic);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.assertConsumersCount(this.broker2, (ActiveMQDestination)topic, 1);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.assertConsumersCount(this.broker2, (ActiveMQDestination)topic, 1);
    }

    @Test
    public void testConsumerNotForceDurableSubscriptionStatic() throws Exception {
        ActiveMQTopic topic = new ActiveMQTopic(this.staticTopic2);
        this.assertConsumersCount(this.broker2, (ActiveMQDestination)topic, 1);
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
    }

    @Test
    public void testConsumerNotForceDurableSubscription() throws Exception {
        ActiveMQTopic topic = new ActiveMQTopic(this.testTopicName2);
        MessageConsumer sub1 = this.session1.createConsumer((Destination)topic);
        this.assertConsumersCount(this.broker2, (ActiveMQDestination)topic, 1);
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
        sub1.close();
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
        this.assertConsumersCount(this.broker2, (ActiveMQDestination)topic, 0);
    }

    @Test
    public void testConsumerNotForceDurableWithAnotherDurable() throws Exception {
        ActiveMQTopic topic = new ActiveMQTopic(this.testTopicName2);
        TopicSubscriber durSub = this.session1.createDurableSubscriber((Topic)topic, this.subName);
        this.session1.createConsumer((Destination)topic);
        this.assertConsumersCount(this.broker2, (ActiveMQDestination)topic, 1);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        durSub.close();
        Thread.sleep(1000L);
        this.removeSubscription(this.broker1, this.subName);
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
        this.assertConsumersCount(this.broker2, (ActiveMQDestination)topic, 0);
    }

    @Test
    public void testForceDurableSubscription() throws Exception {
        ActiveMQTopic topic = new ActiveMQTopic(this.testTopicName);
        MessageConsumer sub1 = this.session1.createConsumer((Destination)topic);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.assertConsumersCount(this.broker2, (ActiveMQDestination)topic, 1);
        sub1.close();
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
        this.assertConsumersCount(this.broker2, (ActiveMQDestination)topic, 0);
    }

    @Test
    public void testForceDurableMultiSubscriptions() throws Exception {
        ActiveMQTopic topic = new ActiveMQTopic(this.testTopicName);
        MessageConsumer sub1 = this.session1.createConsumer((Destination)topic);
        MessageConsumer sub2 = this.session1.createConsumer((Destination)topic);
        MessageConsumer sub3 = this.session1.createConsumer((Destination)topic);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.assertConsumersCount(this.broker2, (ActiveMQDestination)topic, 1);
        sub1.close();
        sub2.close();
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.assertConsumersCount(this.broker2, (ActiveMQDestination)topic, 1);
        sub3.close();
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
        this.assertConsumersCount(this.broker2, (ActiveMQDestination)topic, 0);
    }

    @Test
    public void testForceDurableSubWithDurableCreatedFirst() throws Exception {
        ActiveMQTopic topic = new ActiveMQTopic(this.testTopicName);
        TopicSubscriber durSub = this.session1.createDurableSubscriber((Topic)topic, this.subName);
        durSub.close();
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        MessageConsumer sub1 = this.session1.createConsumer((Destination)topic);
        Thread.sleep(1000L);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        sub1.close();
        Thread.sleep(1000L);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.removeSubscription(this.broker1, this.subName);
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
    }

    @Test
    public void testForceDurableSubWithNonDurableCreatedFirst() throws Exception {
        ActiveMQTopic topic = new ActiveMQTopic(this.testTopicName);
        MessageConsumer sub1 = this.session1.createConsumer((Destination)topic);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        TopicSubscriber durSub = this.session1.createDurableSubscriber((Topic)topic, this.subName);
        durSub.close();
        Thread.sleep(1000L);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.removeSubscription(this.broker1, this.subName);
        Thread.sleep(1000L);
        this.assertConsumersCount(this.broker2, (ActiveMQDestination)topic, 1);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        sub1.close();
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
    }

    @Test
    public void testDurableSticksAroundOnConsumerClose() throws Exception {
        ActiveMQTopic topic = new ActiveMQTopic(this.testTopicName);
        MessageConsumer sub1 = this.session1.createConsumer((Destination)topic);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        TopicSubscriber durSub = this.session1.createDurableSubscriber((Topic)topic, this.subName);
        durSub.close();
        sub1.close();
        Thread.sleep(1000L);
        this.assertConsumersCount(this.broker2, (ActiveMQDestination)topic, 1);
        this.assertNCDurableSubsCount(this.broker2, topic, 1);
        this.removeSubscription(this.broker1, this.subName);
        this.assertConsumersCount(this.broker2, (ActiveMQDestination)topic, 0);
        this.assertNCDurableSubsCount(this.broker2, topic, 0);
    }

    protected void restartBrokers() throws Exception {
        this.doTearDown();
        this.doSetUp(false, this.localBroker.getDataDirectoryFile(), this.remoteBroker.getDataDirectoryFile());
    }

    protected void doSetUp(boolean deleteAllMessages, File localDataDir, File remoteDataDir) throws Exception {
        this.included = new ActiveMQTopic(this.testTopicName);
        this.doSetUpRemoteBroker(deleteAllMessages, remoteDataDir);
        this.doSetUpLocalBroker(deleteAllMessages, localDataDir);
        Thread.sleep(1000L);
    }

    protected void doSetUpLocalBroker(boolean deleteAllMessages, File dataDir) throws Exception {
        this.localBroker = this.createLocalBroker(dataDir);
        this.localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        this.localBroker.start();
        this.localBroker.waitUntilStarted();
        URI localURI = this.localBroker.getVmConnectorURI();
        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
        fac.setAlwaysSyncSend(true);
        fac.setDispatchAsync(false);
        this.localConnection = fac.createConnection();
        this.localConnection.setClientID("clientId");
        this.localConnection.start();
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return ((NetworkConnector)ForceDurableNetworkBridgeTest.this.localBroker.getNetworkConnectors().get(0)).activeBridges().size() == 1;
            }
        }, (long)10000L, (long)500L);
        this.localSession = this.localConnection.createSession(false, 1);
        if (this.flow.equals((Object)DynamicNetworkTestSupport.FLOW.FORWARD)) {
            this.broker1 = this.localBroker;
            this.session1 = this.localSession;
        } else {
            this.broker2 = this.localBroker;
        }
    }

    protected void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir) throws Exception {
        this.remoteBroker = this.createRemoteBroker(dataDir);
        this.remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        this.remoteBroker.start();
        this.remoteBroker.waitUntilStarted();
        URI remoteURI = this.remoteBroker.getVmConnectorURI();
        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(remoteURI);
        this.remoteConnection = fac.createConnection();
        this.remoteConnection.setClientID("clientId");
        this.remoteConnection.start();
        this.remoteSession = this.remoteConnection.createSession(false, 1);
        if (this.flow.equals((Object)DynamicNetworkTestSupport.FLOW.FORWARD)) {
            this.broker2 = this.remoteBroker;
        } else {
            this.broker1 = this.remoteBroker;
            this.session1 = this.remoteSession;
        }
    }

    protected BrokerService createLocalBroker(File dataDir) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setMonitorConnectionSplits(true);
        brokerService.setDataDirectoryFile(dataDir);
        brokerService.setBrokerName("localBroker");
        brokerService.addNetworkConnector(this.configureLocalNetworkConnector());
        brokerService.addConnector("tcp://localhost:0");
        brokerService.setDestinations(new ActiveMQDestination[]{new ActiveMQTopic(this.testTopicName), new ActiveMQTopic(this.testTopicName2), new ActiveMQTopic(this.excludeTopicName)});
        return brokerService;
    }

    protected NetworkConnector configureLocalNetworkConnector() throws Exception {
        List transportConnectors = this.remoteBroker.getTransportConnectors();
        URI remoteURI = ((TransportConnector)transportConnectors.get(0)).getConnectUri();
        String uri = "static:(" + remoteURI + ")";
        DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
        connector.setName("networkConnector");
        connector.setDynamicOnly(false);
        connector.setDecreaseNetworkConsumerPriority(false);
        connector.setConduitSubscriptions(true);
        connector.setDuplex(true);
        connector.setStaticBridge(false);
        ArrayList<ActiveMQTopic> staticIncludedDestinations = new ArrayList<ActiveMQTopic>();
        staticIncludedDestinations.add(new ActiveMQTopic(this.staticTopic + "?forceDurable=true"));
        staticIncludedDestinations.add(new ActiveMQTopic(this.staticTopic2));
        connector.setStaticallyIncludedDestinations(staticIncludedDestinations);
        ArrayList<ActiveMQTopic> dynamicIncludedDestinations = new ArrayList<ActiveMQTopic>();
        dynamicIncludedDestinations.add(new ActiveMQTopic("include.test.>?forceDurable=true"));
        dynamicIncludedDestinations.add(new ActiveMQTopic(this.testTopicName2));
        connector.setDynamicallyIncludedDestinations(dynamicIncludedDestinations);
        ArrayList<ActiveMQTopic> excludedDestinations = new ArrayList<ActiveMQTopic>();
        excludedDestinations.add(new ActiveMQTopic(this.excludeTopicName));
        connector.setExcludedDestinations(excludedDestinations);
        return connector;
    }

    protected BrokerService createRemoteBroker(File dataDir) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setBrokerName("remoteBroker");
        brokerService.setUseJmx(false);
        brokerService.setDataDirectoryFile(dataDir);
        brokerService.addConnector("tcp://localhost:0");
        brokerService.setDestinations(new ActiveMQDestination[]{new ActiveMQTopic(this.testTopicName), new ActiveMQTopic(this.testTopicName2), new ActiveMQTopic(this.excludeTopicName)});
        return brokerService;
    }
}

