/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.network;

import java.util.Arrays;
import java.util.concurrent.ConcurrentMap;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicRequestor;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.BaseNetworkTest;
import org.apache.activemq.network.DemandForwardingBridgeSupport;
import org.apache.activemq.network.DemandSubscription;
import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.context.support.AbstractApplicationContext;

public class SimpleNetworkTest
extends BaseNetworkTest {
    protected static final int MESSAGE_COUNT = 10;
    protected AbstractApplicationContext context;
    protected ActiveMQTopic included;
    protected ActiveMQTopic excluded;
    protected String consumerName = "durableSubs";

    @Override
    protected void doSetUp(boolean deleteAllMessages) throws Exception {
        super.doSetUp(deleteAllMessages);
        this.included = new ActiveMQTopic("include.test.bar");
        this.excluded = new ActiveMQTopic("exclude.test.bar");
    }

    @Test(timeout=60000L)
    public void testMessageCompression() throws Exception {
        ActiveMQConnection localAmqConnection = (ActiveMQConnection)this.localConnection;
        localAmqConnection.setUseCompression(true);
        MessageConsumer consumer1 = this.remoteSession.createConsumer((Destination)this.included);
        MessageProducer producer = this.localSession.createProducer((Destination)this.included);
        producer.setDeliveryMode(1);
        this.waitForConsumerRegistration(this.localBroker, 1, (ActiveMQDestination)this.included);
        for (int i = 0; i < 10; ++i) {
            TextMessage test = this.localSession.createTextMessage("test-" + i);
            producer.send((Message)test);
            Message msg = consumer1.receive(3000L);
            Assert.assertNotNull((String)("not null? message: " + i), (Object)msg);
            ActiveMQMessage amqMessage = (ActiveMQMessage)msg;
            Assert.assertTrue((boolean)amqMessage.isCompressed());
        }
        Assert.assertNull((Object)consumer1.receive(1000L));
        this.assertNetworkBridgeStatistics(10L, 0L);
    }

    @Test(timeout=60000L)
    public void testRequestReply() throws Exception {
        final MessageProducer remoteProducer = this.remoteSession.createProducer(null);
        MessageConsumer remoteConsumer = this.remoteSession.createConsumer((Destination)this.included);
        remoteConsumer.setMessageListener(new MessageListener(){

            public void onMessage(Message msg) {
                try {
                    TextMessage textMsg = (TextMessage)msg;
                    String payload = "REPLY: " + textMsg.getText();
                    Destination replyTo = msg.getJMSReplyTo();
                    textMsg.clearBody();
                    textMsg.setText(payload);
                    remoteProducer.send(replyTo, (Message)textMsg);
                }
                catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        TopicRequestor requestor = new TopicRequestor((TopicSession)this.localSession, (Topic)this.included);
        Thread.sleep(5000L);
        for (int i = 0; i < 10; ++i) {
            TextMessage msg = this.localSession.createTextMessage("test msg: " + i);
            TextMessage result = (TextMessage)requestor.request((Message)msg);
            Assert.assertNotNull((Object)result);
            this.LOG.info(result.getText());
        }
        this.assertNetworkBridgeStatistics(10L, 10L);
    }

    @Test(timeout=60000L)
    public void testFiltering() throws Exception {
        MessageConsumer includedConsumer = this.remoteSession.createConsumer((Destination)this.included);
        MessageConsumer excludedConsumer = this.remoteSession.createConsumer((Destination)this.excluded);
        MessageProducer includedProducer = this.localSession.createProducer((Destination)this.included);
        MessageProducer excludedProducer = this.localSession.createProducer((Destination)this.excluded);
        Thread.sleep(2000L);
        TextMessage test = this.localSession.createTextMessage("test");
        includedProducer.send((Message)test);
        excludedProducer.send((Message)test);
        Assert.assertNull((Object)excludedConsumer.receive(1000L));
        Assert.assertNotNull((Object)includedConsumer.receive(1000L));
        this.assertNetworkBridgeStatistics(1L, 0L);
    }

    @Test(timeout=60000L)
    public void testConduitBridge() throws Exception {
        MessageConsumer consumer1 = this.remoteSession.createConsumer((Destination)this.included);
        MessageConsumer consumer2 = this.remoteSession.createConsumer((Destination)this.included);
        MessageProducer producer = this.localSession.createProducer((Destination)this.included);
        producer.setDeliveryMode(1);
        this.waitForConsumerRegistration(this.localBroker, 2, (ActiveMQDestination)this.included);
        for (int i = 0; i < 10; ++i) {
            TextMessage test = this.localSession.createTextMessage("test-" + i);
            producer.send((Message)test);
            Assert.assertNotNull((Object)consumer1.receive(1000L));
            Assert.assertNotNull((Object)consumer2.receive(1000L));
        }
        Assert.assertNull((Object)consumer1.receive(1000L));
        Assert.assertNull((Object)consumer2.receive(1000L));
        this.assertNetworkBridgeStatistics(10L, 0L);
        Assert.assertNotNull((Object)this.localBroker.getManagementContext().getObjectInstance(this.localBroker.createNetworkConnectorObjectName((NetworkConnector)this.localBroker.getNetworkConnectors().get(0))));
    }

    private void waitForConsumerRegistration(final BrokerService brokerService, final int min, final ActiveMQDestination destination) throws Exception {
        Assert.assertTrue((String)"Internal bridge consumers registered in time", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                Object[] bridges = ((NetworkConnector)brokerService.getNetworkConnectors().get((int)0)).bridges.values().toArray();
                if (bridges.length > 0) {
                    SimpleNetworkTest.this.LOG.info(brokerService + " bridges " + Arrays.toString(bridges));
                    DemandForwardingBridgeSupport demandForwardingBridgeSupport = (DemandForwardingBridgeSupport)bridges[0];
                    ConcurrentMap forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap();
                    SimpleNetworkTest.this.LOG.info(brokerService + " bridge " + demandForwardingBridgeSupport + ", localSubs: " + forwardingBridges);
                    if (!forwardingBridges.isEmpty()) {
                        for (DemandSubscription demandSubscription : forwardingBridges.values()) {
                            if (!demandSubscription.getLocalInfo().getDestination().equals((Object)destination)) continue;
                            SimpleNetworkTest.this.LOG.info(brokerService + " DemandSubscription " + demandSubscription + ", size: " + demandSubscription.size());
                            return demandSubscription.size() >= min;
                        }
                    }
                }
                return false;
            }
        }));
    }

    @Test(timeout=60000L)
    public void testDurableTopicSubForwardMemoryUsage() throws Exception {
        TopicSubscriber remoteConsumer = this.remoteSession.createDurableSubscriber((Topic)this.included, this.consumerName);
        Thread.sleep(1000L);
        MessageProducer producer = this.localSession.createProducer((Destination)this.included);
        for (int i = 0; i < 10; ++i) {
            TextMessage test = this.localSession.createTextMessage("test-" + i);
            producer.send((Message)test);
        }
        Thread.sleep(1000L);
        Assert.assertEquals((long)10L, (long)this.localBroker.getDestination((ActiveMQDestination)this.included).getDestinationStatistics().getForwards().getCount());
        Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return SimpleNetworkTest.this.localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0L;
            }
        }, (long)10000L, (long)500L));
        remoteConsumer.close();
    }

    @Test(timeout=60000L)
    public void testTopicSubForwardMemoryUsage() throws Exception {
        int i;
        MessageConsumer remoteConsumer = this.remoteSession.createConsumer((Destination)this.included);
        Thread.sleep(1000L);
        MessageProducer producer = this.localSession.createProducer((Destination)this.included);
        for (i = 0; i < 10; ++i) {
            TextMessage test = this.localSession.createTextMessage("test-" + i);
            producer.send((Message)test);
        }
        Thread.sleep(1000L);
        Assert.assertEquals((long)10L, (long)this.localBroker.getDestination((ActiveMQDestination)this.included).getDestinationStatistics().getForwards().getCount());
        Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return SimpleNetworkTest.this.localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0L;
            }
        }, (long)10000L, (long)500L));
        for (i = 0; i < 10; ++i) {
            Assert.assertNotNull((String)("message count: " + i), (Object)remoteConsumer.receive(2500L));
        }
        remoteConsumer.close();
    }

    @Test(timeout=60000L)
    public void testQueueSubForwardMemoryUsage() throws Exception {
        int i;
        ActiveMQQueue queue = new ActiveMQQueue("include.test.foo");
        MessageConsumer remoteConsumer = this.remoteSession.createConsumer((Destination)queue);
        Thread.sleep(1000L);
        MessageProducer producer = this.localSession.createProducer((Destination)queue);
        for (i = 0; i < 10; ++i) {
            TextMessage test = this.localSession.createTextMessage("test-" + i);
            producer.send((Message)test);
        }
        Thread.sleep(1000L);
        Assert.assertEquals((long)10L, (long)this.localBroker.getDestination((ActiveMQDestination)queue).getDestinationStatistics().getForwards().getCount());
        Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return SimpleNetworkTest.this.localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0L;
            }
        }, (long)10000L, (long)500L));
        for (i = 0; i < 10; ++i) {
            Assert.assertNotNull((String)("message count: " + i), (Object)remoteConsumer.receive(2500L));
        }
        remoteConsumer.close();
    }

    @Test(timeout=60000L)
    public void testDurableStoreAndForward() throws Exception {
        int i;
        TopicSubscriber remoteConsumer = this.remoteSession.createDurableSubscriber((Topic)this.included, this.consumerName);
        Thread.sleep(1000L);
        this.doTearDown();
        this.doSetUp(false);
        MessageProducer producer = this.localSession.createProducer((Destination)this.included);
        for (i = 0; i < 10; ++i) {
            TextMessage test = this.localSession.createTextMessage("test-" + i);
            producer.send((Message)test);
        }
        Thread.sleep(1000L);
        Assert.assertEquals((long)10L, (long)this.localBroker.getDestination((ActiveMQDestination)this.included).getDestinationStatistics().getForwards().getCount());
        this.doTearDown();
        this.doSetUp(false);
        remoteConsumer = this.remoteSession.createDurableSubscriber((Topic)this.included, this.consumerName);
        for (i = 0; i < 10; ++i) {
            Assert.assertNotNull((String)("message count: " + i), (Object)remoteConsumer.receive(2500L));
        }
    }

    @Ignore(value="This seems like a simple use case, but it is problematic to consume an existing topic store, it requires a connection per durable to match that connectionId")
    public void testDurableStoreAndForwardReconnect() throws Exception {
        int i;
        TopicSubscriber localConsumer = this.localSession.createDurableSubscriber((Topic)this.included, this.consumerName);
        Thread.sleep(5000L);
        this.doTearDown();
        this.doSetUp(false);
        MessageProducer producer = this.localSession.createProducer((Destination)this.included);
        for (i = 0; i < 10; ++i) {
            TextMessage test = this.localSession.createTextMessage("test-" + i);
            producer.send((Message)test);
        }
        Thread.sleep(5000L);
        localConsumer = this.localSession.createDurableSubscriber((Topic)this.included, this.consumerName);
        this.LOG.info("Consume from local consumer: " + (MessageConsumer)localConsumer);
        for (i = 0; i < 5; ++i) {
            Assert.assertNotNull((String)("message count: " + i), (Object)localConsumer.receive(2500L));
        }
        Thread.sleep(5000L);
        this.doTearDown();
        this.doSetUp(false);
        Thread.sleep(5000L);
        this.LOG.info("Consume from remote");
        TopicSubscriber remoteConsumer = this.remoteSession.createDurableSubscriber((Topic)this.included, this.consumerName);
        this.LOG.info("Remote consumer: " + (MessageConsumer)remoteConsumer);
        Thread.sleep(5000L);
        for (int i2 = 0; i2 < 5; ++i2) {
            Assert.assertNotNull((String)("message count: " + i2), (Object)remoteConsumer.receive(10000L));
        }
    }

    protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final long expectedRemoteSent) throws Exception {
        final NetworkBridge localBridge = (NetworkBridge)((NetworkConnector)this.localBroker.getNetworkConnectors().get(0)).activeBridges().iterator().next();
        final NetworkBridge remoteBridge = (NetworkBridge)((NetworkConnector)this.remoteBroker.getNetworkConnectors().get(0)).activeBridges().iterator().next();
        Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return expectedLocalSent == localBridge.getNetworkBridgeStatistics().getDequeues().getCount() && 0L == localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount() && expectedRemoteSent == remoteBridge.getNetworkBridgeStatistics().getDequeues().getCount() && 0L == remoteBridge.getNetworkBridgeStatistics().getReceivedCount().getCount();
            }
        }));
    }
}

