/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
import org.apache.activemq.broker.jmx.VirtualDestinationSelectorCacheViewMBean;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.plugin.SubQueueSelectorCacheBrokerPlugin;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.ProducerThread;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TwoBrokerVirtualTopicSelectorAwareForwardingTest
extends JmsMultipleBrokersTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(TwoBrokerVirtualTopicSelectorAwareForwardingTest.class);
    private static final String PERSIST_SELECTOR_CACHE_FILE_BASEPATH = "./target/selectorCache-";

    public void testJMX() throws Exception {
        this.clearSelectorCacheFiles();
        this.bridgeAndConfigureBrokers("BrokerA", "BrokerB");
        this.startAllBrokers();
        this.waitForBridgeFormation();
        this.createConsumer("BrokerB", (jakarta.jms.Destination)this.createDestination("Consumer.B.VirtualTopic.tempTopic", false), "foo = 'bar'");
        final BrokerService brokerA = ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerA")).broker;
        String testQueue = "queue://Consumer.B.VirtualTopic.tempTopic";
        VirtualDestinationSelectorCacheViewMBean cache = this.getVirtualDestinationSelectorCacheMBean(brokerA);
        Set selectors = cache.selectorsForDestination(testQueue);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((int)1, (int)selectors.size());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertTrue((boolean)selectors.contains("foo = 'bar'"));
        boolean removed = cache.deleteSelectorForDestination(testQueue, "foo = 'bar'");
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertTrue((boolean)removed);
        selectors = cache.selectorsForDestination(testQueue);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((int)0, (int)selectors.size());
        this.createConsumer("BrokerB", (jakarta.jms.Destination)this.createDestination("Consumer.B.VirtualTopic.tempTopic", false), "ceposta = 'redhat'");
        Wait.waitFor((Wait.Condition)new Wait.Condition(){
            Destination dest;
            {
                this.dest = brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
            }

            public boolean isSatisified() throws Exception {
                return this.dest.getConsumers().size() == 2;
            }
        }, (long)500L);
        selectors = cache.selectorsForDestination(testQueue);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((int)1, (int)selectors.size());
        cache.deleteAllSelectorsForDestination(testQueue);
        selectors = cache.selectorsForDestination(testQueue);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((int)0, (int)selectors.size());
    }

    public void testMessageLeaks() throws Exception {
        this.clearSelectorCacheFiles();
        this.startAllBrokers();
        final BrokerService brokerA = ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerA")).broker;
        ActiveMQDestination consumerQueue = this.createDestination("Consumer.B.VirtualTopic.tempTopic", false);
        MessageConsumer consumer1 = this.createConsumer("BrokerA", (jakarta.jms.Destination)consumerQueue, "SYMBOL = 'AAPL'");
        MessageConsumer consumer2 = this.createConsumer("BrokerA", (jakarta.jms.Destination)consumerQueue, "SYMBOL = 'AAPL'");
        ActiveMQTopic virtualTopic = new ActiveMQTopic("VirtualTopic.tempTopic");
        ProducerThreadTester producerTester = this.createProducerTester("BrokerA", (jakarta.jms.Destination)virtualTopic);
        producerTester.setRunIndefinitely(true);
        producerTester.setSleep(5);
        producerTester.addMessageProperty("AAPL");
        producerTester.addMessageProperty("VIX");
        producerTester.start();
        int currentCount = producerTester.getSentCount();
        LOG.info(">>>> currently sent: total=" + currentCount + ", AAPL=" + producerTester.getCountForProperty("AAPL") + ", VIX=" + producerTester.getCountForProperty("VIX"));
        Thread.sleep(2000L);
        MessageIdList consumer1Messages = this.getConsumerMessages("BrokerA", consumer1);
        consumer1Messages.waitForMessagesToArrive(50, 1000L);
        consumer1.close();
        consumer1 = this.createConsumer("BrokerA", (jakarta.jms.Destination)consumerQueue, "SYMBOL = 'VIX'");
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getConsumers().size() == 2;
            }
        });
        currentCount = producerTester.getSentCount();
        LOG.info(">>>> currently sent: total=" + currentCount + ", AAPL=" + producerTester.getCountForProperty("AAPL") + ", VIX=" + producerTester.getCountForProperty("VIX"));
        Thread.sleep(2000L);
        consumer2.close();
        consumer2 = this.createConsumer("BrokerA", (jakarta.jms.Destination)consumerQueue, "SYMBOL = 'VIX'");
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getConsumers().size() == 2;
            }
        });
        currentCount = producerTester.getSentCount();
        LOG.info(">>>> currently sent: total=" + currentCount + ", AAPL=" + producerTester.getCountForProperty("AAPL") + ", VIX=" + producerTester.getCountForProperty("VIX"));
        Thread.sleep(2000L);
        currentCount = producerTester.getSentCount();
        LOG.info(">>>> currently sent: total=" + currentCount + ", AAPL=" + producerTester.getCountForProperty("AAPL") + ", VIX=" + producerTester.getCountForProperty("VIX"));
        final long currentDepth = brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getMessages().getCount();
        LOG.info(">>>>> Orphaned messages? " + currentDepth);
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getMessages().getCount() > currentDepth;
            }
        }, (long)5000L);
        producerTester.setRunning(false);
        producerTester.join();
        Thread.sleep(1000L);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertTrue((brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getMessages().getCount() <= currentDepth ? 1 : 0) != 0);
    }

    private ProducerThreadTester createProducerTester(String brokerName, jakarta.jms.Destination destination) throws Exception {
        JmsMultipleBrokersTestSupport.BrokerItem brokerItem = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get(brokerName);
        Connection conn = brokerItem.createConnection();
        conn.start();
        Session sess = conn.createSession(false, 1);
        ProducerThreadTester rc = new ProducerThreadTester(sess, destination);
        rc.setPersistent(this.persistentDelivery);
        return rc;
    }

    public void testSelectorConsumptionWithNoMatchAtHeadOfQueue() throws Exception {
        this.clearSelectorCacheFiles();
        this.startAllBrokers();
        BrokerService brokerA = ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerA")).broker;
        ActiveMQDestination consumerQueue = this.createDestination("Consumer.B.VirtualTopic.tempTopic", false);
        MessageConsumer selectingConsumer = this.establishConsumer("BrokerA", consumerQueue);
        ActiveMQTopic virtualTopic = new ActiveMQTopic("VirtualTopic.tempTopic");
        this.sendMessages("BrokerA", (jakarta.jms.Destination)virtualTopic, 1);
        selectingConsumer.close();
        selectingConsumer = this.createConsumer("BrokerA", (jakarta.jms.Destination)consumerQueue, "foo = 'bar'");
        this.sendMessages("BrokerA", (jakarta.jms.Destination)virtualTopic, 1, this.asMap("foo", "bar"));
        MessageIdList selectingConsumerMessages = this.getConsumerMessages("BrokerA", selectingConsumer);
        selectingConsumerMessages.waitForMessagesToArrive(1, 1000L);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((int)1, (int)selectingConsumerMessages.getMessageCount());
        selectingConsumerMessages.waitForMessagesToArrive(10, 1000L);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((int)1, (int)selectingConsumerMessages.getMessageCount());
        this.waitForMessagesToBeConsumed(brokerA, "Consumer.B.VirtualTopic.tempTopic", false, 2, 1, 5000);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((int)1, (int)brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getConsumers().size());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)2L, (long)brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getEnqueues().getCount());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)1L, (long)brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getDequeues().getCount());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)1L, (long)brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getMessages().getCount());
    }

    private MessageConsumer establishConsumer(String broker, ActiveMQDestination consumerQueue) throws Exception {
        JmsMultipleBrokersTestSupport.BrokerItem item = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get(broker);
        Connection c = item.createConnection();
        c.start();
        Session s = c.createSession(false, 1);
        return s.createConsumer((jakarta.jms.Destination)consumerQueue);
    }

    public void testSelectorsAndNonSelectors() throws Exception {
        this.clearSelectorCacheFiles();
        this.bridgeAndConfigureBrokers("BrokerA", "BrokerB");
        this.startAllBrokers();
        this.waitForBridgeFormation();
        final BrokerService brokerA = ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerA")).broker;
        BrokerService brokerB = ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerB")).broker;
        ActiveMQDestination consumerBQueue = this.createDestination("Consumer.B.VirtualTopic.tempTopic", false);
        MessageConsumer selectingConsumer = this.createConsumer("BrokerB", (jakarta.jms.Destination)consumerBQueue, "foo = 'bar'");
        MessageConsumer nonSelectingConsumer = this.createConsumer("BrokerB", (jakarta.jms.Destination)consumerBQueue);
        Wait.waitFor((Wait.Condition)new Wait.Condition(){
            Destination dest;
            {
                this.dest = brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
            }

            public boolean isSatisified() throws Exception {
                return this.dest.getConsumers().size() == 2;
            }
        }, (long)500L);
        Destination destination = TestSupport.getDestination(brokerB, consumerBQueue);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((int)2, (int)destination.getConsumers().size());
        ActiveMQTopic virtualTopic = new ActiveMQTopic("VirtualTopic.tempTopic");
        this.sendMessages("BrokerA", (jakarta.jms.Destination)virtualTopic, 10, this.asMap("foo", "bar"));
        this.sendMessages("BrokerA", (jakarta.jms.Destination)virtualTopic, 10);
        MessageIdList selectingConsumerMessages = this.getConsumerMessages("BrokerB", selectingConsumer);
        MessageIdList nonSelectingConsumerMessages = this.getConsumerMessages("BrokerB", nonSelectingConsumer);
        selectingConsumerMessages.waitForMessagesToArrive(5, 1000L);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((int)5, (int)selectingConsumerMessages.getMessageCount());
        nonSelectingConsumerMessages.waitForMessagesToArrive(15, 1000L);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((int)15, (int)nonSelectingConsumerMessages.getMessageCount());
        this.waitForMessagesToBeConsumed(brokerA, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)20L, (long)brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getEnqueues().getCount());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)20L, (long)brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getDequeues().getCount());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)0L, (long)brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getMessages().getCount());
        this.waitForMessagesToBeConsumed(brokerB, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)20L, (long)brokerB.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getEnqueues().getCount());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)20L, (long)brokerB.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getDequeues().getCount());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)0L, (long)brokerB.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getMessages().getCount());
        nonSelectingConsumer.close();
        Wait.waitFor((Wait.Condition)new Wait.Condition(){
            Destination dest;
            {
                this.dest = brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
            }

            public boolean isSatisified() throws Exception {
                return this.dest.getConsumers().size() == 1;
            }
        }, (long)500L);
        selectingConsumerMessages.flushMessages();
        this.sendMessages("BrokerA", (jakarta.jms.Destination)virtualTopic, 10, this.asMap("ceposta", "redhat"));
        selectingConsumerMessages = this.getConsumerMessages("BrokerB", selectingConsumer);
        selectingConsumerMessages.waitForMessagesToArrive(1, 1000L);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((int)0, (int)selectingConsumerMessages.getMessageCount());
        this.waitForMessagesToBeConsumed(brokerA, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)20L, (long)brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getEnqueues().getCount());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)20L, (long)brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getDequeues().getCount());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)0L, (long)brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getMessages().getCount());
        this.waitForMessagesToBeConsumed(brokerB, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)20L, (long)brokerB.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getEnqueues().getCount());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)20L, (long)brokerB.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getDequeues().getCount());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)0L, (long)brokerB.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getMessages().getCount());
        selectingConsumer.close();
        Wait.waitFor((Wait.Condition)new Wait.Condition(){
            Destination dest;
            {
                this.dest = brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
            }

            public boolean isSatisified() throws Exception {
                return this.dest.getConsumers().size() == 0;
            }
        }, (long)500L);
        selectingConsumerMessages.flushMessages();
        this.sendMessages("BrokerA", (jakarta.jms.Destination)virtualTopic, 10, this.asMap("foo", "bar"));
        this.waitForMessagesToBeConsumed(brokerA, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)30L, (long)brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getEnqueues().getCount());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)20L, (long)brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getDequeues().getCount());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)10L, (long)brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getMessages().getCount());
        this.waitForMessagesToBeConsumed(brokerB, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)20L, (long)brokerB.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getEnqueues().getCount());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)20L, (long)brokerB.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getDequeues().getCount());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)0L, (long)brokerB.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getMessages().getCount());
        selectingConsumer = this.createConsumer("BrokerB", (jakarta.jms.Destination)consumerBQueue, "foo = 'bar'");
        selectingConsumerMessages = this.getConsumerMessages("BrokerB", selectingConsumer);
        selectingConsumerMessages.waitForMessagesToArrive(10);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((int)10, (int)selectingConsumerMessages.getMessageCount());
        Wait.waitFor((Wait.Condition)new Wait.Condition(){
            Destination dest;
            {
                this.dest = brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
            }

            public boolean isSatisified() throws Exception {
                return this.dest.getConsumers().size() == 1;
            }
        }, (long)500L);
        this.waitForMessagesToBeConsumed(brokerA, "Consumer.B.VirtualTopic.tempTopic", false, 30, 30, 5000);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)30L, (long)brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getEnqueues().getCount());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)30L, (long)brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getDequeues().getCount());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)0L, (long)brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getMessages().getCount());
        this.waitForMessagesToBeConsumed(brokerB, "Consumer.B.VirtualTopic.tempTopic", false, 30, 30, 5000);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)30L, (long)brokerB.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getEnqueues().getCount());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)30L, (long)brokerB.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getDequeues().getCount());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)0L, (long)brokerB.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getMessages().getCount());
    }

    public VirtualDestinationSelectorCacheViewMBean getVirtualDestinationSelectorCacheMBean(BrokerService broker) throws MalformedObjectNameException {
        ObjectName objectName = BrokerMBeanSupport.createVirtualDestinationSelectorCacheName((ObjectName)broker.getBrokerObjectName(), (String)"plugin", (String)"virtualDestinationCache");
        return (VirtualDestinationSelectorCacheViewMBean)broker.getManagementContext().newProxyInstance(objectName, VirtualDestinationSelectorCacheViewMBean.class, true);
    }

    public void testSelectorAwareForwarding() throws Exception {
        this.clearSelectorCacheFiles();
        this.bridgeAndConfigureBrokers("BrokerA", "BrokerB");
        this.startAllBrokers();
        this.waitForBridgeFormation();
        BrokerService brokerB = ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerB")).broker;
        final BrokerService brokerA = ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerA")).broker;
        MessageConsumer remoteConsumer = this.createConsumer("BrokerB", (jakarta.jms.Destination)this.createDestination("Consumer.B.VirtualTopic.tempTopic", false), "foo = 'bar'");
        Wait.waitFor((Wait.Condition)new Wait.Condition(){
            Destination dest;
            {
                this.dest = brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
            }

            public boolean isSatisified() throws Exception {
                return this.dest.getConsumers().size() == 1;
            }
        }, (long)500L);
        ActiveMQQueue queueB = new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic");
        Destination destination = TestSupport.getDestination(((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerB")).broker, (ActiveMQDestination)queueB);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((int)1, (int)destination.getConsumers().size());
        ActiveMQTopic virtualTopic = new ActiveMQTopic("VirtualTopic.tempTopic");
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertNull((Object)TestSupport.getDestination(((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerA")).broker, (ActiveMQDestination)virtualTopic));
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertNull((Object)TestSupport.getDestination(((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerB")).broker, (ActiveMQDestination)virtualTopic));
        this.sendMessages("BrokerA", (jakarta.jms.Destination)virtualTopic, 1, this.asMap("foo", "bar"));
        this.sendMessages("BrokerA", (jakarta.jms.Destination)virtualTopic, 1, this.asMap("ceposta", "redhat"));
        MessageIdList msgsB = this.getConsumerMessages("BrokerB", remoteConsumer);
        msgsB.waitForMessagesToArrive(1);
        msgsB.waitForMessagesToArrive(1, 1000L);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((int)1, (int)msgsB.getMessageCount());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)1L, (long)brokerB.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getEnqueues().getCount());
        remoteConsumer.close();
        brokerA.stop();
        brokerA.waitUntilStopped();
        this.deleteSelectorCacheFile("BrokerA");
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((int)0, (int)destination.getConsumers().size());
        remoteConsumer = this.createConsumer("BrokerB", (jakarta.jms.Destination)this.createDestination("Consumer.B.VirtualTopic.tempTopic", false), "ceposta = 'redhat'");
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((int)1, (int)destination.getConsumers().size());
        brokerA.start(true);
        brokerA.waitUntilStarted();
        System.out.println(brokerA.getNetworkConnectors());
        Wait.waitFor((Wait.Condition)new Wait.Condition(){
            Destination dest;
            {
                this.dest = brokerA.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
            }

            public boolean isSatisified() throws Exception {
                return this.dest.getConsumers().size() == 1;
            }
        }, (long)500L);
        this.sendMessages("BrokerA", (jakarta.jms.Destination)virtualTopic, 1, this.asMap("foo", "bar"));
        this.sendMessages("BrokerB", (jakarta.jms.Destination)virtualTopic, 1, this.asMap("foo", "bar"));
        this.sendMessages("BrokerA", (jakarta.jms.Destination)virtualTopic, 1, this.asMap("ceposta", "redhat"));
        this.sendMessages("BrokerB", (jakarta.jms.Destination)virtualTopic, 1, this.asMap("ceposta", "redhat"));
        msgsB = this.getConsumerMessages("BrokerB", remoteConsumer);
        msgsB.waitForMessagesToArrive(2);
        msgsB.waitForMessagesToArrive(1, 1000L);
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((int)2, (int)msgsB.getMessageCount());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)0L, (long)brokerB.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getMessages().getCount());
        TwoBrokerVirtualTopicSelectorAwareForwardingTest.assertEquals((long)3L, (long)brokerB.getDestination((ActiveMQDestination)new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")).getDestinationStatistics().getEnqueues().getCount());
    }

    public void testSelectorNoMatchInCache() throws Exception {
        this.clearSelectorCacheFiles();
        BrokerService brokerA = ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerA")).broker;
        ((SubQueueSelectorCacheBrokerPlugin)brokerA.getPlugins()[0]).setIgnoreWildcardSelectors(true);
        this.startAllBrokers();
        ActiveMQDestination consumerBQueue = this.createDestination("Consumer.B.VirtualTopic.tempTopic", false);
        MessageConsumer nonMatchingConsumer = this.createConsumer("BrokerA", (jakarta.jms.Destination)consumerBQueue, "foo = 'bar%'");
        ActiveMQTopic virtualTopic = new ActiveMQTopic("VirtualTopic.tempTopic");
        this.sendMessages("BrokerA", (jakarta.jms.Destination)virtualTopic, 1, this.asMap("foo", "notBar"));
    }

    private HashMap<String, Object> asMap(String key, Object value) {
        HashMap<String, Object> rc = new HashMap<String, Object>(1);
        rc.put(key, value);
        return rc;
    }

    private void bridgeAndConfigureBrokers(String local, String remote) throws Exception {
        NetworkConnector bridge = this.bridgeBrokers(local, remote, false, 1, false);
        bridge.setDecreaseNetworkConsumerPriority(true);
        bridge.setDuplex(true);
    }

    @Override
    public void setUp() throws Exception {
        super.setAutoFail(true);
        super.setUp();
        String options = new String("?useJmx=false&deleteAllMessagesOnStartup=true");
        this.createAndConfigureBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options));
        this.createAndConfigureBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options));
    }

    private void clearSelectorCacheFiles() {
        String[] brokerNames;
        for (String brokerName : brokerNames = new String[]{"BrokerA", "BrokerB"}) {
            this.deleteSelectorCacheFile(brokerName);
        }
    }

    private void deleteSelectorCacheFile(String brokerName) {
        File brokerPersisteFile = new File(PERSIST_SELECTOR_CACHE_FILE_BASEPATH + brokerName);
        if (brokerPersisteFile.exists()) {
            brokerPersisteFile.delete();
        }
    }

    private BrokerService createAndConfigureBroker(URI uri) throws Exception {
        BrokerService broker = this.createBroker(uri);
        broker.setUseJmx(true);
        VirtualTopic virtualTopic = new VirtualTopic();
        virtualTopic.setSelectorAware(true);
        VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
        interceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic});
        broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});
        this.configurePersistenceAdapter(broker);
        SubQueueSelectorCacheBrokerPlugin selectorCacheBrokerPlugin = new SubQueueSelectorCacheBrokerPlugin();
        selectorCacheBrokerPlugin.setSingleSelectorPerDestination(true);
        File persisteFile = new File(PERSIST_SELECTOR_CACHE_FILE_BASEPATH + broker.getBrokerName());
        selectorCacheBrokerPlugin.setPersistFile(persisteFile);
        broker.setPlugins(new BrokerPlugin[]{selectorCacheBrokerPlugin});
        return broker;
    }

    protected void configurePersistenceAdapter(BrokerService broker) throws IOException {
        File dataFileDir = new File("target/test-amq-data/kahadb/" + broker.getBrokerName());
        KahaDBStore kaha = new KahaDBStore();
        kaha.setDirectory(dataFileDir);
        broker.setPersistenceAdapter((PersistenceAdapter)kaha);
    }

    private void waitForMessagesToBeConsumed(final BrokerService broker, String destinationName, boolean topic, int numEnqueueMsgs, int numDequeueMsgs, int waitTime) throws Exception {
        Object destination = topic ? new ActiveMQTopic(destinationName) : new ActiveMQQueue(destinationName);
        Wait.waitFor((Wait.Condition)new Wait.Condition(){
            final /* synthetic */ ActiveMQDestination val$destination;
            final /* synthetic */ int val$numEnqueueMsgs;
            {
                this.val$destination = activeMQDestination;
                this.val$numEnqueueMsgs = n;
            }

            public boolean isSatisified() throws Exception {
                return broker.getDestination(this.val$destination).getDestinationStatistics().getEnqueues().getCount() == (long)this.val$numEnqueueMsgs;
            }
        }, (long)waitTime);
        Wait.waitFor((Wait.Condition)new Wait.Condition(){
            final /* synthetic */ ActiveMQDestination val$destination;
            final /* synthetic */ int val$numDequeueMsgs;
            {
                this.val$destination = activeMQDestination;
                this.val$numDequeueMsgs = n;
            }

            public boolean isSatisified() throws Exception {
                return broker.getDestination(this.val$destination).getDestinationStatistics().getDequeues().getCount() == (long)this.val$numDequeueMsgs;
            }
        }, (long)waitTime);
    }

    class ProducerThreadTester
    extends ProducerThread {
        private Set<String> selectors;
        private Map<String, AtomicInteger> selectorCounts;
        private Random rand;

        public ProducerThreadTester(Session session, jakarta.jms.Destination destination) {
            super(session, destination);
            this.selectors = new LinkedHashSet<String>();
            this.selectorCounts = new HashMap<String, AtomicInteger>();
            this.rand = new Random(System.currentTimeMillis());
        }

        protected Message createMessage(int i) throws Exception {
            TextMessage msg = TwoBrokerVirtualTopicSelectorAwareForwardingTest.this.createTextMessage(this.session, "Message-" + i);
            if (this.selectors.size() > 0) {
                String value = this.getRandomKey();
                msg.setStringProperty("SYMBOL", value);
                AtomicInteger currentCount = this.selectorCounts.get(value);
                currentCount.incrementAndGet();
            }
            return msg;
        }

        public void resetCounters() {
            super.resetCounters();
            for (String key : this.selectorCounts.keySet()) {
                this.selectorCounts.put(key, new AtomicInteger(0));
            }
        }

        private String getRandomKey() {
            ArrayList<String> keys = new ArrayList<String>(this.selectors);
            return keys.get(this.rand.nextInt(keys.size()));
        }

        public void addMessageProperty(String value) {
            if (!this.selectors.contains(value)) {
                this.selectors.add(value);
                this.selectorCounts.put(value, new AtomicInteger(0));
            }
        }

        public int getCountForProperty(String key) {
            return this.selectorCounts.get(key).get();
        }
    }
}

