/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class DurableSubscriptionWithNoLocalTest {
    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionWithNoLocalTest.class);
    private final int MSG_COUNT = 10;
    private final String KAHADB_DIRECTORY = "target/activemq-data/";
    @Rule
    public TestName name = new TestName();
    private BrokerService brokerService;
    private String connectionUri;
    private ActiveMQConnectionFactory factory;
    private final boolean keepDurableSubsActive;

    @Parameterized.Parameters(name="keepDurableSubsActive={0}")
    public static Collection<Object[]> data() {
        return Arrays.asList({true}, {false});
    }

    public DurableSubscriptionWithNoLocalTest(boolean keepDurableSubsActive) {
        this.keepDurableSubsActive = keepDurableSubsActive;
    }

    @Before
    public void setUp() throws Exception {
        this.createBroker(true);
    }

    @After
    public void tearDown() throws Exception {
        this.brokerService.stop();
        this.brokerService.waitUntilStopped();
    }

    @Test(timeout=60000L)
    public void testNoLocalStillWorkWithConnectionRestart() throws Exception {
        try (ActiveMQConnection connection = null;){
            connection = (ActiveMQConnection)this.factory.createConnection();
            connection.setClientID("test-client");
            connection.start();
            this.test(connection, "test message 1");
            connection.stop();
            connection.start();
            this.test(connection, "test message 2");
        }
    }

    @Test(timeout=60000L)
    public void testNoLocalStillWorksNewConnection() throws Exception {
        try (ActiveMQConnection connection = null;){
            connection = (ActiveMQConnection)this.factory.createConnection();
            connection.setClientID("test-client");
            connection.start();
            this.test(connection, "test message 1");
        }
        try {
            connection = (ActiveMQConnection)this.factory.createConnection();
            connection.setClientID("test-client");
            connection.start();
            this.test(connection, "test message 2");
        }
        finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    @Test(timeout=60000L)
    public void testNoLocalStillWorksRestartBroker() throws Exception {
        try (ActiveMQConnection connection = null;){
            connection = (ActiveMQConnection)this.factory.createConnection();
            connection.setClientID("test-client");
            connection.start();
            this.test(connection, "test message 1");
        }
        this.tearDown();
        this.createBroker(false);
        try {
            connection = (ActiveMQConnection)this.factory.createConnection();
            connection.setClientID("test-client");
            connection.start();
            this.test(connection, "test message 2");
        }
        finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    void test(ActiveMQConnection connection, String body) throws Exception {
        Session incomingMessagesSession = connection.createSession(false, 1);
        Topic topic = incomingMessagesSession.createTopic("test.topic");
        TopicSubscriber consumer = incomingMessagesSession.createDurableSubscriber(topic, "test-subscription", null, true);
        Session outgoingMessagesSession = connection.createSession(false, 1);
        Topic destination = outgoingMessagesSession.createTopic("test.topic");
        MessageProducer producer = outgoingMessagesSession.createProducer((Destination)destination);
        TextMessage textMessage = outgoingMessagesSession.createTextMessage(body);
        producer.send((Message)textMessage);
        producer.close();
        System.out.println("message sent: " + textMessage.getJMSMessageID() + "; body: " + textMessage.getText());
        outgoingMessagesSession.close();
        Assert.assertNull((Object)consumer.receive(2000L));
        consumer.close();
        incomingMessagesSession.close();
    }

    @Test(timeout=60000L)
    public void testDurableSubWithNoLocalChange() throws Exception {
        int i;
        TopicConnection connection = this.factory.createTopicConnection();
        connection.setClientID(this.getClientId());
        connection.start();
        TopicSession session = connection.createTopicSession(false, 1);
        Topic topic = session.createTopic(this.getDestinationName());
        TopicPublisher publisher = session.createPublisher(topic);
        LOG.debug("Create DurableSubscriber with noLocal = true");
        TopicSubscriber subscriber = session.createSubscriber(topic);
        TopicSubscriber durableSub = session.createDurableSubscriber(topic, this.getSubscriptionName(), null, true);
        LOG.debug("Sending 10 messages to topic");
        for (i = 0; i < 10; ++i) {
            publisher.publish(session.createMessage());
        }
        LOG.info("Attempting to receive messages from non-durable subscriber");
        for (i = 0; i < 10; ++i) {
            Assert.assertNotNull((Object)subscriber.receive(500L));
        }
        LOG.info("Attempting to receive messages from (noLocal=true) subscriber");
        Assert.assertNull((Object)durableSub.receive(500L));
        LOG.debug("Sending 10 messages to topic");
        for (i = 0; i < 10; ++i) {
            publisher.publish(session.createMessage());
        }
        LOG.debug("Close DurableSubscriber with noLocal=true");
        durableSub.close();
        LOG.debug("Create DurableSubscriber with noLocal=false");
        durableSub = session.createDurableSubscriber(topic, this.getSubscriptionName(), null, false);
        LOG.info("Attempting to receive messages from reconnected (noLocal=false) subscription");
        Assert.assertNull((Object)durableSub.receive(500L));
        LOG.debug("Sending 10 messages to topic");
        for (i = 0; i < 10; ++i) {
            publisher.publish(session.createMessage());
        }
        LOG.info("Attempting to receive messages from (noLocal=false) durable subscriber");
        for (i = 0; i < 10; ++i) {
            Assert.assertNotNull((Object)durableSub.receive(500L));
        }
        Assert.assertNull((Object)durableSub.receive(100L));
    }

    @Test(timeout=60000L)
    public void testInvertedDurableSubWithNoLocalChange() throws Exception {
        int i;
        TopicConnection connection = this.factory.createTopicConnection();
        connection.setClientID(this.getClientId());
        connection.start();
        TopicSession session = connection.createTopicSession(false, 1);
        Topic topic = session.createTopic(this.getDestinationName());
        TopicPublisher publisher = session.createPublisher(topic);
        LOG.debug("Create DurableSubscriber with noLocal = true");
        TopicSubscriber subscriber = session.createSubscriber(topic);
        TopicSubscriber durableSub = session.createDurableSubscriber(topic, this.getSubscriptionName(), null, false);
        LOG.debug("Sending 10 messages to topic");
        for (i = 0; i < 10; ++i) {
            publisher.publish(session.createMessage());
        }
        LOG.info("Attempting to receive messages from non-durable subscriber");
        for (i = 0; i < 10; ++i) {
            Assert.assertNotNull((Object)subscriber.receive(500L));
        }
        LOG.info("Attempting to receive messages from (noLocal=false) durable subscriber");
        for (i = 0; i < 10; ++i) {
            Assert.assertNotNull((Object)durableSub.receive(500L));
        }
        LOG.debug("Sending 10 messages to topic");
        for (i = 0; i < 10; ++i) {
            publisher.publish(session.createMessage());
        }
        LOG.debug("Close DurableSubscriber with noLocal=true");
        durableSub.close();
        LOG.debug("Create DurableSubscriber with noLocal=false");
        durableSub = session.createDurableSubscriber(topic, this.getSubscriptionName(), null, true);
        LOG.info("Attempting to receive messages from reconnected (noLocal=true) subscription");
        Assert.assertNull((Object)durableSub.receive(500L));
        LOG.debug("Sending 10 messages to topic");
        for (i = 0; i < 10; ++i) {
            publisher.publish(session.createMessage());
        }
        LOG.info("Attempting to receive messages from reconnected (noLocal=true) subscription");
        Assert.assertNull((Object)durableSub.receive(500L));
        Assert.assertNull((Object)durableSub.receive(100L));
    }

    @Test(timeout=60000L)
    public void testDurableSubWithNoLocalChangeAfterRestart() throws Exception {
        int i;
        TopicConnection connection = this.factory.createTopicConnection();
        connection.setClientID(this.getClientId());
        connection.start();
        TopicSession session = connection.createTopicSession(false, 1);
        Topic topic = session.createTopic(this.getDestinationName());
        TopicPublisher publisher = session.createPublisher(topic);
        LOG.debug("Create DurableSubscriber with noLocal = true");
        TopicSubscriber subscriber = session.createSubscriber(topic);
        TopicSubscriber durableSub = session.createDurableSubscriber(topic, this.getSubscriptionName(), null, true);
        LOG.debug("Sending 10 messages to topic");
        for (i = 0; i < 10; ++i) {
            publisher.publish(session.createMessage());
        }
        LOG.info("Attempting to receive messages from non-durable subscriber");
        for (i = 0; i < 10; ++i) {
            Assert.assertNotNull((Object)subscriber.receive(500L));
        }
        LOG.info("Attempting to receive messages from (noLocal=true) subscriber");
        Assert.assertNull((Object)durableSub.receive(500L));
        LOG.debug("Sending 10 messages to topic");
        for (i = 0; i < 10; ++i) {
            publisher.publish(session.createMessage());
        }
        this.tearDown();
        this.createBroker(false);
        connection = this.factory.createTopicConnection();
        connection.setClientID(this.getClientId());
        connection.start();
        session = connection.createTopicSession(false, 1);
        topic = session.createTopic(this.getDestinationName());
        publisher = session.createPublisher(topic);
        LOG.debug("Create DurableSubscriber with noLocal=false");
        durableSub = session.createDurableSubscriber(topic, this.getSubscriptionName(), null, false);
        LOG.info("Attempting to receive messages from reconnected (noLocal=false) subscription");
        Assert.assertNull((Object)durableSub.receive(500L));
        LOG.debug("Sending 10 messages to topic");
        for (i = 0; i < 10; ++i) {
            publisher.publish(session.createMessage());
        }
        LOG.info("Attempting to receive messages from (noLocal=false) durable subscriber");
        for (i = 0; i < 10; ++i) {
            Assert.assertNotNull((Object)durableSub.receive(500L));
        }
        Assert.assertNull((Object)durableSub.receive(100L));
    }

    @Test(timeout=60000L)
    public void testInvertedDurableSubWithNoLocalChangeAfterRestart() throws Exception {
        int i;
        TopicConnection connection = this.factory.createTopicConnection();
        connection.setClientID(this.getClientId());
        connection.start();
        TopicSession session = connection.createTopicSession(false, 1);
        Topic topic = session.createTopic(this.getDestinationName());
        TopicPublisher publisher = session.createPublisher(topic);
        LOG.debug("Create DurableSubscriber with noLocal = true");
        TopicSubscriber subscriber = session.createSubscriber(topic);
        TopicSubscriber durableSub = session.createDurableSubscriber(topic, this.getSubscriptionName(), null, false);
        LOG.debug("Sending 10 messages to topic");
        for (i = 0; i < 10; ++i) {
            publisher.publish(session.createMessage());
        }
        LOG.info("Attempting to receive messages from non-durable subscriber");
        for (i = 0; i < 10; ++i) {
            Assert.assertNotNull((Object)subscriber.receive(500L));
        }
        LOG.info("Attempting to receive messages from (noLocal=false) durable subscriber");
        for (i = 0; i < 10; ++i) {
            Assert.assertNotNull((Object)durableSub.receive(500L));
        }
        LOG.debug("Sending 10 messages to topic");
        for (i = 0; i < 10; ++i) {
            publisher.publish(session.createMessage());
        }
        this.tearDown();
        this.createBroker(false);
        connection = this.factory.createTopicConnection();
        connection.setClientID(this.getClientId());
        connection.start();
        session = connection.createTopicSession(false, 1);
        topic = session.createTopic(this.getDestinationName());
        publisher = session.createPublisher(topic);
        LOG.debug("Create DurableSubscriber with noLocal=true");
        durableSub = session.createDurableSubscriber(topic, this.getSubscriptionName(), null, true);
        LOG.info("Attempting to receive messages from (noLocal=true) subscriber");
        Assert.assertNull((Object)durableSub.receive(500L));
        LOG.debug("Sending 10 messages to topic");
        for (i = 0; i < 10; ++i) {
            publisher.publish(session.createMessage());
        }
        LOG.info("Attempting to receive messages from (noLocal=true) subscriber");
        Assert.assertNull((Object)durableSub.receive(500L));
        Assert.assertNull((Object)durableSub.receive(100L));
    }

    private void createBroker(boolean deleteAllMessages) throws Exception {
        KahaDBStore kaha = new KahaDBStore();
        kaha.setDirectory(new File("target/activemq-data/-" + this.name.getMethodName()));
        this.brokerService = new BrokerService();
        this.brokerService.setPersistent(true);
        this.brokerService.setPersistenceAdapter((PersistenceAdapter)kaha);
        this.brokerService.setStoreOpenWireVersion(12);
        this.brokerService.setUseJmx(false);
        this.brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
        this.brokerService.setKeepDurableSubsActive(this.keepDurableSubsActive);
        TransportConnector connector = this.brokerService.addConnector("tcp://0.0.0.0:0");
        this.brokerService.start();
        this.brokerService.waitUntilStarted();
        this.connectionUri = connector.getPublishableConnectString();
        this.factory = new ActiveMQConnectionFactory(this.connectionUri);
    }

    private String getDestinationName() {
        return this.name.getMethodName();
    }

    private String getClientId() {
        return this.name.getMethodName() + "-Client";
    }

    private String getSubscriptionName() {
        return this.name.getMethodName() + "-Subscription";
    }
}

