/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store;

import jakarta.jms.Connection;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class MessagePriorityTest
extends CombinationTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(MessagePriorityTest.class);
    // Embedded broker started in setUp() and stopped in tearDown().
    BrokerService broker;
    // Persistence adapter obtained from createPersistenceAdapter(true) in setUp().
    PersistenceAdapter adapter;
    // Connection factory for "vm://priorityTest"; tests that need a different prefetch reconfigure it.
    protected ActiveMQConnectionFactory factory;
    // Connection with client id "priority"; some tests close and recreate it.
    protected Connection conn;
    // Session created from conn with createSession(false, 1).
    protected Session sess;
    // The fields below are named in addCombinationValues(...) calls by the initCombosFor* methods;
    // presumably CombinationTestSupport assigns them before setUp() runs -- confirm in that class.
    // Applied to the "TEST" policy entry and to the durable-subscriber pending policy.
    public boolean useCache = true;
    // Passed to MessageProducer.setDeliveryMode; in the JMS API 1 is NON_PERSISTENT and 2 is PERSISTENT.
    public int deliveryMode = 2;
    // Passed to ActiveMQConnectionFactory.setDispatchAsync.
    public boolean dispatchAsync = true;
    // Passed to PolicyEntry.setPrioritizedMessages for the "TEST" queue and topic.
    public boolean prioritizeMessages = true;
    // Passed to the durable-subscriber pending policy's setImmediatePriorityDispatch.
    public boolean immediatePriorityDispatch = true;
    // Prefetch applied to all consumer kinds via ActiveMQPrefetchPolicy.setAll.
    public int prefetchVal = 500;
    // Passed to PolicyEntry.setExpireMessagesPeriod for the "TEST" destinations.
    public int expireMessagePeriod = 30000;
    // Number of messages each ProducerThread sends in the bulk tests.
    public int MSG_NUM = 600;
    // JMS priorities used by the tests (high, low, medium).
    public int HIGH_PRI = 7;
    public int LOW_PRI = 3;
    public int MED_PRI = 4;

    /**
     * Supplies the persistence adapter that {@link #setUp()} installs on the embedded broker.
     *
     * @param var1 store-specific flag; {@code setUp()} always passes {@code true}.
     *             NOTE(review): the parameter name was lost in decompilation -- presumably
     *             "delete existing messages on startup"; confirm against the implementations.
     * @return the adapter to install on the broker
     * @throws Exception if the adapter cannot be created
     */
    protected abstract PersistenceAdapter createPersistenceAdapter(boolean var1) throws Exception;

    /**
     * Starts the embedded broker "priorityTest" with the destination policies the tests
     * rely on, then opens a started connection (client id "priority") and a
     * non-transacted, auto-acknowledge session.
     *
     * @throws Exception if the broker or the JMS connection cannot be started
     */
    protected void setUp() throws Exception {
        broker = new BrokerService();
        broker.setBrokerName("priorityTest");
        broker.setAdvisorySupport(false);
        adapter = createPersistenceAdapter(true);
        broker.setPersistenceAdapter(adapter);

        // Policy shared by the queue and the topic named "TEST".
        StorePendingDurableSubscriberMessageStoragePolicy pendingDurable =
                new StorePendingDurableSubscriberMessageStoragePolicy();
        pendingDurable.setImmediatePriorityDispatch(immediatePriorityDispatch);
        pendingDurable.setUseCache(useCache);

        PolicyEntry testPolicy = new PolicyEntry();
        testPolicy.setPrioritizedMessages(prioritizeMessages);
        testPolicy.setUseCache(useCache);
        testPolicy.setExpireMessagesPeriod(expireMessagePeriod);
        testPolicy.setPendingDurableSubscriberPolicy(pendingDurable);

        // Policy whose dead-letter strategy does not process expired messages.
        SharedDeadLetterStrategy skipExpired = new SharedDeadLetterStrategy();
        skipExpired.setProcessExpired(false);
        PolicyEntry cleanupPolicy = new PolicyEntry();
        cleanupPolicy.setDeadLetterStrategy(skipExpired);

        // Prioritized queue with the cache switched off.
        PolicyEntry uncachedPriorityPolicy = new PolicyEntry();
        uncachedPriorityPolicy.setUseCache(false);
        uncachedPriorityPolicy.setPrioritizedMessages(true);

        PolicyMap destinationPolicies = new PolicyMap();
        destinationPolicies.put(new ActiveMQQueue("TEST"), testPolicy);
        destinationPolicies.put(new ActiveMQTopic("TEST"), testPolicy);
        destinationPolicies.put(new ActiveMQTopic("TEST_CLEANUP_NO_PRIORITY"), cleanupPolicy);
        destinationPolicies.put(new ActiveMQQueue("TEST_LOW_THEN_HIGH_10"), uncachedPriorityPolicy);
        broker.setDestinationPolicy(destinationPolicies);

        broker.start();
        broker.waitUntilStarted();

        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
        prefetchPolicy.setAll(prefetchVal);

        factory = new ActiveMQConnectionFactory("vm://priorityTest");
        factory.setMessagePrioritySupported(true);
        factory.setPrefetchPolicy(prefetchPolicy);
        factory.setWatchTopicAdvisories(false);
        factory.setDispatchAsync(dispatchAsync);

        conn = factory.createConnection();
        conn.setClientID("priority");
        conn.start();
        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * Closes the session and connection (best effort) and always stops the broker.
     *
     * <p>Each resource is closed independently so a failure closing the session does not
     * skip closing the connection. Close failures are logged rather than silently dropped,
     * and null checks keep a partially failed {@link #setUp()} from masking the original
     * error with a {@link NullPointerException}.
     *
     * @throws Exception if stopping the broker fails
     */
    protected void tearDown() throws Exception {
        try {
            try {
                if (sess != null) {
                    sess.close();
                }
            } catch (Exception e) {
                LOG.warn("Ignoring failure closing session during tearDown", e);
            }
            try {
                if (conn != null) {
                    conn.close();
                }
            } catch (Exception e) {
                LOG.warn("Ignoring failure closing connection during tearDown", e);
            }
        } finally {
            if (broker != null) {
                broker.stop();
                broker.waitUntilStopped();
            }
        }
    }

    /**
     * Verifies that the message stores behind queue "TEST" and topic "TEST" report
     * prioritized messages once the destinations exist on the broker.
     *
     * <p>The wait results are asserted so that a destination which never appears fails
     * with a descriptive message instead of a {@link NullPointerException}, and the
     * producers are closed even when an assertion fails.
     *
     * @throws Exception on any JMS or broker failure
     */
    public void testStoreConfigured() throws Exception {
        final Queue queue = sess.createQueue("TEST");
        final Topic topic = sess.createTopic("TEST");
        // Creating producers is what makes the broker create the destinations we wait for.
        MessageProducer queueProducer = sess.createProducer(queue);
        MessageProducer topicProducer = sess.createProducer(topic);
        try {
            assertTrue("queue destination was not created in time", Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    return broker.getRegionBroker().getDestinationMap().get(queue) != null;
                }
            }));
            assertTrue("queue store should be prioritized",
                    ((Destination) broker.getRegionBroker().getDestinationMap().get(queue))
                            .getMessageStore().isPrioritizedMessages());

            assertTrue("topic destination was not created in time", Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    return broker.getRegionBroker().getDestinationMap().get(topic) != null;
                }
            }));
            assertTrue("topic store should be prioritized",
                    ((Destination) broker.getRegionBroker().getDestinationMap().get(topic))
                            .getMessageStore().isPrioritizedMessages());
        } finally {
            queueProducer.close();
            topicProducer.close();
        }
    }

    /**
     * Registers the {@code useCache} and {@code deliveryMode} values to combine for
     * {@code testQueues} (delivery modes 1 and 2).
     */
    public void initCombosForTestQueues() {
        Object[] cacheSettings = {true, false};
        Object[] deliveryModes = {1, 2};
        addCombinationValues("useCache", cacheSettings);
        addCombinationValues("deliveryMode", deliveryModes);
    }

    /**
     * Sends {@code MSG_NUM} low- and {@code MSG_NUM} high-priority messages to queue
     * "TEST" from two concurrent producers, then expects a consumer to receive every
     * high-priority message before any low-priority one.
     *
     * @throws Exception on any JMS failure
     */
    public void testQueues() throws Exception {
        ActiveMQQueue queue = (ActiveMQQueue) sess.createQueue("TEST");

        ProducerThread lowPri = new ProducerThread(queue, MSG_NUM, LOW_PRI);
        ProducerThread highPri = new ProducerThread(queue, MSG_NUM, HIGH_PRI);
        lowPri.start();
        highPri.start();
        lowPri.join();
        highPri.join();

        MessageConsumer queueConsumer = sess.createConsumer(queue);
        int i = 0;
        while (i < MSG_NUM * 2) {
            Message msg = queueConsumer.receive(5000L);
            String messageId = null;
            if (msg != null) {
                messageId = msg.getJMSMessageID();
            }
            LOG.debug("received i={}, {}", i, messageId);
            assertNotNull("Message " + i + " was null", msg);

            // The first MSG_NUM deliveries must be the high-priority batch.
            int expectedPriority;
            if (i < MSG_NUM) {
                expectedPriority = HIGH_PRI;
            } else {
                expectedPriority = LOW_PRI;
            }
            assertEquals("Message " + i + " has wrong priority", expectedPriority, msg.getJMSPriority());
            i++;
        }
    }

    /**
     * Creates a text message whose body is {@code "priority " + priority} and logs it.
     * The priority is only used for the body text; it is not set on the message.
     *
     * @param priority value embedded in the message text
     * @return the new text message
     * @throws Exception if the session cannot create the message
     */
    protected Message createMessage(int priority) throws Exception {
        final String body = "priority " + priority;
        final TextMessage textMessage = sess.createTextMessage(body);
        LOG.info("Sending  {}", body);
        return textMessage;
    }

    /**
     * Registers the {@code prefetchVal} values to combine for {@code testDurableSubs}:
     * 1000 and a quarter of {@code MSG_NUM}.
     */
    public void initCombosForTestDurableSubs() {
        Object[] prefetchSizes = {1000, MSG_NUM / 4};
        addCombinationValues("prefetchVal", prefetchSizes);
    }

    /**
     * Checks priority ordering for an offline durable subscriber on topic "TEST", then
     * checks that a durable subscriber on topic "HAS_NO_PRIORITY" (no policy entry in
     * {@code setUp}) still receives every message, without asserting order.
     *
     * @throws Exception on any JMS failure
     */
    public void testDurableSubs() throws Exception {
        // --- prioritized topic -------------------------------------------------
        ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
        // Register the durable subscription, then go offline while messages are produced.
        sess.createDurableSubscriber(topic, "priority").close();

        ProducerThread lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI);
        ProducerThread highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI);
        lowPri.start();
        highPri.start();
        lowPri.join();
        highPri.join();

        TopicSubscriber sub = sess.createDurableSubscriber(topic, "priority");
        for (int received = 0; received < MSG_NUM * 2; received++) {
            Message msg = sub.receive(5000L);
            assertNotNull("Message " + received + " was null", msg);
            int expectedPriority = received < MSG_NUM ? HIGH_PRI : LOW_PRI;
            assertEquals("Message " + received + " has wrong priority", expectedPriority, msg.getJMSPriority());
        }

        // --- topic without a priority policy -----------------------------------
        topic = (ActiveMQTopic) sess.createTopic("HAS_NO_PRIORITY");
        sess.createDurableSubscriber(topic, "no_priority").close();

        lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI);
        highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI);
        lowPri.start();
        highPri.start();
        lowPri.join();
        highPri.join();

        sub = sess.createDurableSubscriber(topic, "no_priority");
        for (int received = 0; received < MSG_NUM * 2; received++) {
            Message msg = sub.receive(5000L);
            assertNotNull("Message " + received + " was null", msg);
        }
    }

    /**
     * Registers the combinations for {@code testDurableSubsReconnect}: two prefetch
     * sizes, synchronous dispatch only, and both cache settings.
     */
    public void initCombosForTestDurableSubsReconnect() {
        Object[] prefetchSizes = {1000, MSG_NUM / 2};
        Object[] dispatchModes = {false};
        Object[] cacheSettings = {true, false};
        addCombinationValues("prefetchVal", prefetchSizes);
        addCombinationValues("dispatchAsync", dispatchModes);
        addCombinationValues("useCache", cacheSettings);
    }

    /**
     * Verifies priority ordering for a durable subscriber that is closed and re-created
     * every {@code MSG_NUM / 4} messages while draining its backlog.
     *
     * @throws Exception on any JMS failure
     */
    public void testDurableSubsReconnect() throws Exception {
        final String subName = "priorityDisconnect";
        ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
        // Register the subscription and go offline before producing.
        sess.createDurableSubscriber(topic, subName).close();

        ProducerThread lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI);
        ProducerThread highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI);
        lowPri.start();
        highPri.start();
        lowPri.join();
        highPri.join();

        final int closeFrequency = MSG_NUM / 4;
        TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
        for (int i = 0; i < MSG_NUM * 2; i++) {
            Message msg = sub.receive(15000L);
            String messageId = (msg == null) ? null : msg.getJMSMessageID();
            LOG.debug("received i={}, {}", i, messageId);
            assertNotNull("Message " + i + " was null", msg);
            int expectedPriority = (i < MSG_NUM) ? HIGH_PRI : LOW_PRI;
            assertEquals("Message " + i + " has wrong priority", expectedPriority, msg.getJMSPriority());

            boolean reconnectNow = i > 0 && i % closeFrequency == 0;
            if (reconnectNow) {
                LOG.info("Closing durable sub.. on: {}", i);
                sub.close();
                sub = sess.createDurableSubscriber(topic, subName);
            }
        }
    }

    /**
     * With prefetch 0, drains 2000 messages of priority {@code LOW_PRI + 1} from a durable
     * subscription while periodically injecting higher- and lower-priority messages, and
     * checks that higher priorities are delivered next and lower priorities only at the end.
     *
     * <p>Injection schedule (i is the index of the message just received):
     * <ul>
     *   <li>every {@code batchSize} messages: one {@code HIGH_PRI} message;</li>
     *   <li>every {@code 2 * batchSize}: additionally one {@code HIGH_PRI - 1} message;</li>
     *   <li>every {@code 4 * batchSize}: additionally one {@code LOW_PRI} message.</li>
     * </ul>
     * The original conditions were written as {@code i % batchSize * 2 == 0}, which parses as
     * {@code (i % batchSize) * 2 == 0} and was therefore always true inside the
     * {@code i % batchSize == 0} branch; the moduli are now parenthesized.
     *
     * @throws Exception on any JMS failure
     */
    public void testHighPriorityDelivery() throws Exception {
        // Reconnect with a prefetch of 0 for every consumer type.
        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
        prefetch.setAll(0);
        factory.setPrefetchPolicy(prefetch);
        conn.close();
        conn = factory.createConnection();
        conn.setClientID("priority");
        conn.start();
        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
        final String subName = "priorityDisconnect";
        TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
        sub.close();

        final int numToProduce = 2000;
        final int batchSize = 250;
        final int[] dups = new int[numToProduce * 2];

        // run() is invoked directly, so production happens synchronously on this thread.
        ProducerThread producerThread = new ProducerThread(topic, numToProduce, LOW_PRI + 1);
        producerThread.run();
        LOG.info("Low priority messages sent");

        sub = sess.createDurableSubscriber(topic, subName);
        int lowLowCount = 0;
        for (int i = 0; i < numToProduce; i++) {
            Message msg = sub.receive(15000L);
            LOG.info("received i=" + i + ", "
                    + (msg != null ? msg.getJMSMessageID() + ", priority:" + msg.getJMSPriority() : null));
            assertNotNull("Message " + i + " was null", msg);
            assertEquals("Message " + i + " has wrong priority", LOW_PRI + 1, msg.getJMSPriority());
            assertTrue("not duplicate ", dups[i] == 0);
            dups[i] = 1;

            if (i % batchSize != 0) {
                continue;
            }
            final boolean sendHighMinusOne = i % (batchSize * 2) == 0;
            final boolean sendLowLow = i % (batchSize * 4) == 0;

            producerThread.setMessagePriority(HIGH_PRI);
            producerThread.setMessageCount(1);
            producerThread.run();
            LOG.info("High priority message sent, should be able to receive immediately");

            if (sendHighMinusOne) {
                producerThread.setMessagePriority(HIGH_PRI - 1);
                producerThread.setMessageCount(1);
                producerThread.run();
                LOG.info("High -1 priority message sent, should be able to receive immediately");
            }
            if (sendLowLow) {
                producerThread.setMessagePriority(LOW_PRI);
                producerThread.setMessageCount(1);
                producerThread.run();
                ++lowLowCount;
                LOG.info("Low low priority message sent, should not be able to receive immediately");
            }

            // The injected high-priority message must be delivered next.
            msg = sub.receive(15000L);
            assertNotNull("Message was null", msg);
            LOG.info("received hi? : " + msg);
            assertEquals("high priority", HIGH_PRI, msg.getJMSPriority());

            if (sendHighMinusOne) {
                msg = sub.receive(15000L);
                assertNotNull("Message was null", msg);
                LOG.info("received hi -1 ? i=" + i + ", " + msg);
                assertEquals("high priority", HIGH_PRI - 1, msg.getJMSPriority());
            }
        }

        // The lowest-priority messages are only delivered once everything else is drained.
        for (int i = 0; i < lowLowCount; i++) {
            Message msg = sub.receive(15000L);
            LOG.debug("received i=" + i + ", " + (msg != null ? msg.getJMSMessageID() : null));
            assertNotNull("Message " + i + " was null", msg);
            assertEquals("Message " + i + " has wrong priority", LOW_PRI, msg.getJMSPriority());
        }
    }

    /**
     * Registers both {@code useCache} settings for
     * {@code testHighPriorityDeliveryInterleaved}.
     */
    public void initCombosForTestHighPriorityDeliveryInterleaved() {
        Object[] cacheSettings = {true, false};
        addCombinationValues("useCache", cacheSettings);
    }

    /**
     * With prefetch 0, sends one message each at {@code HIGH_PRI}, {@code HIGH_PRI - 1} and
     * {@code LOW_PRI} to an offline durable subscription, then interleaves a
     * {@code LOW_PRI + 1} message after the first receive. Expects delivery in strictly
     * descending priority order and nothing afterwards.
     *
     * <p>Assertion messages now name the priority actually expected; previously the
     * low-priority checks said "high priority" and the final {@code assertNull} said
     * "Message was null", which is the opposite of the failure condition.
     *
     * @throws Exception on any JMS failure
     */
    public void testHighPriorityDeliveryInterleaved() throws Exception {
        // Reconnect with a prefetch of 0 for every consumer type.
        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
        prefetch.setAll(0);
        factory.setPrefetchPolicy(prefetch);
        conn.close();
        conn = factory.createConnection();
        conn.setClientID("priority");
        conn.start();
        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
        final String subName = "priorityDisconnect";
        TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
        sub.close();

        // run() is invoked directly, so each send completes before the next statement.
        ProducerThread producerThread = new ProducerThread(topic, 1, HIGH_PRI);
        producerThread.run();
        producerThread.setMessagePriority(HIGH_PRI - 1);
        producerThread.setMessageCount(1);
        producerThread.run();
        producerThread.setMessagePriority(LOW_PRI);
        producerThread.setMessageCount(1);
        producerThread.run();
        LOG.info("Ordered priority messages sent");

        sub = sess.createDurableSubscriber(topic, subName);

        Message msg = sub.receive(15000L);
        assertNotNull("Message was null", msg);
        LOG.info("received " + msg.getJMSMessageID() + ", priority:" + msg.getJMSPriority());
        assertEquals("Message has wrong priority", HIGH_PRI, msg.getJMSPriority());

        // Interleave a message that ranks between HIGH_PRI - 1 and LOW_PRI.
        producerThread.setMessagePriority(LOW_PRI + 1);
        producerThread.setMessageCount(1);
        producerThread.run();

        msg = sub.receive(15000L);
        assertNotNull("Message was null", msg);
        LOG.info("received " + msg.getJMSMessageID() + ", priority:" + msg.getJMSPriority());
        assertEquals("high priority - 1", HIGH_PRI - 1, msg.getJMSPriority());

        msg = sub.receive(15000L);
        assertNotNull("Message was null", msg);
        LOG.info("received low + 1? : " + msg);
        assertEquals("low priority + 1", LOW_PRI + 1, msg.getJMSPriority());

        msg = sub.receive(15000L);
        assertNotNull("Message was null", msg);
        LOG.info("received low? : " + msg);
        assertEquals("low priority", LOW_PRI, msg.getJMSPriority());

        msg = sub.receive(4000L);
        assertNull("Expected no further messages", msg);
    }

    /**
     * Registers the single combination for {@code testHighPriorityDeliveryThroughBackLog}:
     * cache off, immediate priority dispatch on.
     */
    public void initCombosForTestHighPriorityDeliveryThroughBackLog() {
        Object[] cacheOff = {false};
        Object[] immediateDispatchOn = {true};
        addCombinationValues("useCache", cacheOff);
        addCombinationValues("immediatePriorityDispatch", immediateDispatchOn);
    }

    /**
     * With prefetch 0, consumes half of a 600-message low-priority backlog from a durable
     * subscription, sends one high-priority message, and expects it to be the very next
     * message delivered, followed by the remaining low-priority backlog.
     *
     * <p>The low-priority assertions previously carried the failure message
     * "high priority"; they now say "low priority".
     *
     * @throws Exception on any JMS failure
     */
    public void testHighPriorityDeliveryThroughBackLog() throws Exception {
        // Reconnect with a prefetch of 0 for every consumer type.
        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
        prefetch.setAll(0);
        factory.setPrefetchPolicy(prefetch);
        conn.close();
        conn = factory.createConnection();
        conn.setClientID("priority");
        conn.start();
        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
        final String subName = "priorityDisconnect";
        TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
        sub.close();

        final int backlog = 600;
        ProducerThread producerThread = new ProducerThread(topic, backlog, LOW_PRI);
        producerThread.run();

        sub = sess.createDurableSubscriber(topic, subName);
        int count = 0;
        for (; count < backlog / 2; count++) {
            Message msg = sub.receive(15000L);
            assertNotNull("Message was null", msg);
            assertEquals("low priority", LOW_PRI, msg.getJMSPriority());
        }

        producerThread.setMessagePriority(HIGH_PRI);
        producerThread.setMessageCount(1);
        producerThread.run();

        // The high-priority message must jump ahead of the remaining backlog.
        Message highMsg = sub.receive(15000L);
        assertNotNull("Message was null", highMsg);
        assertEquals("high priority", HIGH_PRI, highMsg.getJMSPriority());

        for (; count < backlog; count++) {
            Message msg = sub.receive(15000L);
            assertNotNull("Message was null", msg);
            assertEquals("low priority", LOW_PRI, msg.getJMSPriority());
        }
    }

    /**
     * Registers the single combination for
     * {@code testHighPriorityNonDeliveryThroughBackLog}: cache off, immediate priority
     * dispatch off.
     */
    public void initCombosForTestHighPriorityNonDeliveryThroughBackLog() {
        Object[] cacheOff = {false};
        Object[] immediateDispatchOff = {false};
        addCombinationValues("useCache", cacheOff);
        addCombinationValues("immediatePriorityDispatch", immediateDispatchOff);
    }

    /**
     * Counterpart of {@code testHighPriorityDeliveryThroughBackLog} with immediate priority
     * dispatch disabled: after 300 of 600 low-priority messages are consumed and one
     * high-priority message is sent, the test expects 100 more low-priority messages
     * first, then the high-priority one, then the rest of the backlog.
     *
     * <p>NOTE(review): the 400-message boundary is asserted here but its cause (presumably
     * a broker paging/batch size) is not visible in this file -- confirm before changing it.
     *
     * <p>The low-priority assertions previously carried the failure message
     * "high priority"; they now say "low priority".
     *
     * @throws Exception on any JMS failure
     */
    public void testHighPriorityNonDeliveryThroughBackLog() throws Exception {
        // Reconnect with a prefetch of 0 for every consumer type.
        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
        prefetch.setAll(0);
        factory.setPrefetchPolicy(prefetch);
        conn.close();
        conn = factory.createConnection();
        conn.setClientID("priority");
        conn.start();
        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
        final String subName = "priorityDisconnect";
        TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
        sub.close();

        final int backlog = 600;
        final int highExpectedAfter = 400;
        ProducerThread producerThread = new ProducerThread(topic, backlog, LOW_PRI);
        producerThread.run();

        sub = sess.createDurableSubscriber(topic, subName);
        int count = 0;
        for (; count < backlog / 2; count++) {
            Message msg = sub.receive(15000L);
            assertNotNull("Message was null", msg);
            assertEquals("low priority", LOW_PRI, msg.getJMSPriority());
        }

        producerThread.setMessagePriority(HIGH_PRI);
        producerThread.setMessageCount(1);
        producerThread.run();

        // Low-priority messages keep flowing until the expected boundary.
        for (; count < highExpectedAfter; count++) {
            Message msg = sub.receive(15000L);
            assertNotNull("Message was null", msg);
            assertEquals("low priority", LOW_PRI, msg.getJMSPriority());
        }

        Message highMsg = sub.receive(15000L);
        assertNotNull("Message was null", highMsg);
        assertEquals("high priority", HIGH_PRI, highMsg.getJMSPriority());

        for (; count < backlog; count++) {
            Message msg = sub.receive(15000L);
            assertNotNull("Message was null", msg);
            assertEquals("low priority", LOW_PRI, msg.getJMSPriority());
        }
    }

    /**
     * Registers the single combination for {@code testQueueBacklog}: cache off and an
     * expire-message period of 0.
     */
    public void initCombosForTestQueueBacklog() {
        Object[] cacheOff = {false};
        Object[] expirePeriods = {0};
        addCombinationValues("useCache", cacheOff);
        addCombinationValues("expireMessagePeriod", expirePeriods);
    }

    /**
     * Builds a backlog of 1800 low-priority messages on queue "TEST", adds 10
     * high-priority messages, and expects a new consumer to receive the 10
     * high-priority messages first. After 500 receives the broker-wide statistics must
     * show 1810 enqueues and 500 dequeues.
     *
     * @throws Exception on any JMS failure
     */
    public void testQueueBacklog() throws Exception {
        final int backlog = 1800;
        final int highCount = 10;
        final int toConsume = 500;

        ActiveMQQueue queue = (ActiveMQQueue) sess.createQueue("TEST");

        // Low-priority backlog first, then the high-priority messages (sequentially).
        ProducerThread lowPri = new ProducerThread(queue, backlog, LOW_PRI);
        ProducerThread highPri = new ProducerThread(queue, highCount, HIGH_PRI);
        lowPri.start();
        lowPri.join();
        highPri.start();
        highPri.join();

        LOG.info("Starting consumer...");
        MessageConsumer queueConsumer = sess.createConsumer(queue);
        int i = 0;
        while (i < toConsume) {
            Message msg = queueConsumer.receive(20000L);
            String messageId = (msg == null) ? null : msg.getJMSMessageID();
            LOG.debug("received i={}, {}", i, messageId);
            if (msg == null) {
                // Capture thread state before the assertion below fails the test.
                dumpAllThreads("backlog");
            }
            assertNotNull("Message " + i + " was null", msg);
            int expectedPriority = (i < highCount) ? HIGH_PRI : LOW_PRI;
            assertEquals("Message " + i + " has wrong priority", expectedPriority, msg.getJMSPriority());
            i++;
        }

        final DestinationStatistics destinationStatistics =
                ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics();
        boolean countsSettled = Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                LOG.info("Enqueues: {}, Dequeues: {}",
                        destinationStatistics.getEnqueues().getCount(),
                        destinationStatistics.getDequeues().getCount());
                if (destinationStatistics.getEnqueues().getCount() != backlog + highCount) {
                    return false;
                }
                return destinationStatistics.getDequeues().getCount() == toConsume;
            }
        }, 10000L);
        assertTrue(countsSettled);
    }

    /**
     * Registers the single combination for {@code testLowThenHighBatch}: cache off and an
     * expire-message period of 0.
     */
    public void initCombosForTestLowThenHighBatch() {
        Object[] cacheOff = {false};
        Object[] expirePeriods = {0};
        addCombinationValues("useCache", cacheOff);
        addCombinationValues("expireMessagePeriod", expirePeriods);
    }

    /**
     * Sends batches of 10 messages at varying priorities to queue
     * "TEST_LOW_THEN_HIGH_10" and checks that each fresh consumer receives them in
     * priority order: low; high; then low and medium together (medium first); then high.
     *
     * @throws Exception on any JMS failure
     */
    public void testLowThenHighBatch() throws Exception {
        final int batch = 10;
        ActiveMQQueue queue = (ActiveMQQueue) sess.createQueue("TEST_LOW_THEN_HIGH_10");

        // run() is invoked directly, so each batch is fully sent before consuming starts.
        ProducerThread producerThread = new ProducerThread(queue, batch, LOW_PRI);
        producerThread.run();
        MessageConsumer queueConsumer = sess.createConsumer(queue);
        receiveBatchAtPriority(queueConsumer, batch, LOW_PRI);
        queueConsumer.close();

        producerThread.setMessagePriority(HIGH_PRI);
        producerThread.run();
        queueConsumer = sess.createConsumer(queue);
        receiveBatchAtPriority(queueConsumer, batch, HIGH_PRI);
        queueConsumer.close();

        // Low batch sent before medium batch; medium must still be delivered first.
        producerThread.setMessagePriority(LOW_PRI);
        producerThread.run();
        producerThread.setMessagePriority(MED_PRI);
        producerThread.run();
        queueConsumer = sess.createConsumer(queue);
        receiveBatchAtPriority(queueConsumer, batch, MED_PRI);
        receiveBatchAtPriority(queueConsumer, batch, LOW_PRI);
        queueConsumer.close();

        producerThread.setMessagePriority(HIGH_PRI);
        producerThread.run();
        queueConsumer = sess.createConsumer(queue);
        receiveBatchAtPriority(queueConsumer, batch, HIGH_PRI);
        queueConsumer.close();
    }

    /** Receives {@code count} messages and asserts each is non-null with the given priority. */
    private void receiveBatchAtPriority(MessageConsumer consumer, int count, int expectedPriority) throws Exception {
        for (int n = 0; n < count; n++) {
            Message received = consumer.receive(10000L);
            assertNotNull("expect #" + n, received);
            assertEquals("correct priority", expectedPriority, received.getJMSPriority());
        }
    }

    /**
     * Runs the interleaved high-priority scenario against queue "TEST" using the
     * connection's configured prefetch.
     *
     * @throws Exception on any JMS failure
     */
    public void testInterleaveHiNewConsumerGetsHi() throws Exception {
        doTestInterleaveHiNewConsumerGetsHi((ActiveMQQueue) sess.createQueue("TEST"));
    }

    /**
     * Runs the interleaved high-priority scenario against queue "TEST" with the
     * destination option {@code consumer.prefetchSize=0}.
     *
     * @throws Exception on any JMS failure
     */
    public void testInterleaveHiNewConsumerGetsHiPull() throws Exception {
        doTestInterleaveHiNewConsumerGetsHi((ActiveMQQueue) sess.createQueue("TEST?consumer.prefetchSize=0"));
    }

    /**
     * Interleaves single high-priority messages between low-priority batches and checks
     * that each newly created consumer receives the high-priority message first; finally
     * drains the nine low-priority messages that remain.
     *
     * @param queue destination to produce to and consume from
     * @throws Exception on any JMS failure
     */
    public void doTestInterleaveHiNewConsumerGetsHi(ActiveMQQueue queue) throws Exception {
        // Round 1: low x3, high x1, low x3 -> a new consumer must see the high one first.
        new ProducerThread(queue, 3, LOW_PRI).run();
        new ProducerThread(queue, 1, HIGH_PRI).run();
        new ProducerThread(queue, 3, LOW_PRI).run();

        MessageConsumer queueConsumer = sess.createConsumer(queue);
        Message first = queueConsumer.receive(10000L);
        assertNotNull("expect #", first);
        assertEquals("correct priority", HIGH_PRI, first.getJMSPriority());
        queueConsumer.close();

        // Round 2: low x3, high x1 on top of the six low messages still queued.
        new ProducerThread(queue, 3, LOW_PRI).run();
        new ProducerThread(queue, 1, HIGH_PRI).run();

        queueConsumer = sess.createConsumer(queue);
        Message second = queueConsumer.receive(10000L);
        assertNotNull("expect #", second);
        assertEquals("correct priority", HIGH_PRI, second.getJMSPriority());
        queueConsumer.close();

        // Everything left is low priority: 3 + 3 + 3.
        queueConsumer = sess.createConsumer(queue);
        int remaining = 0;
        while (remaining < 9) {
            Message low = queueConsumer.receive(10000L);
            assertNotNull("expect #" + remaining, low);
            assertEquals("correct priority", LOW_PRI, low.getJMSPriority());
            remaining++;
        }
        queueConsumer.close();
    }

    /**
     * Registers the combinations for {@code testEveryXHi}: both cache settings (off first)
     * and an expire-message period of 0.
     */
    public void initCombosForTestEveryXHi() {
        Object[] cacheSettings = {false, true};
        Object[] expirePeriods = {0};
        addCombinationValues("useCache", cacheSettings);
        addCombinationValues("expireMessagePeriod", expirePeriods);
    }

    /**
     * Sends 500 messages to queue "TEST" where every fifth message has priority 9 and the
     * rest priority 4, with a listener that is deliberately slow for its first deliveries,
     * and verifies that all messages are consumed and that broker statistics show exactly
     * 500 enqueues and 500 dequeues. The round is then repeated after resetting the counters.
     *
     * <p>Compared with the previous version: the listener restores the interrupt flag
     * instead of only printing the stack trace, the duplicated send/verify phases are
     * shared helpers, and the consumer and producer are closed even when an assertion fails.
     *
     * @throws Exception on any JMS failure
     */
    public void testEveryXHi() throws Exception {
        ActiveMQQueue queue = (ActiveMQQueue) sess.createQueue("TEST");
        broker.getDestinationPolicy().getEntryFor(queue).setMemoryLimit(50 * 1024L);

        // 1 KiB string property so each message carries a non-trivial payload.
        final String payload = new String(new byte[1024]);
        final int numMessages = 500;
        final AtomicInteger received = new AtomicInteger(0);

        MessageConsumer queueConsumer = sess.createConsumer(queue);
        MessageProducer producer = null;
        try {
            queueConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    // Slow down the first deliveries so messages build up behind the consumer.
                    if (received.incrementAndGet() < 20) {
                        try {
                            TimeUnit.MILLISECONDS.sleep(100L);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            LOG.warn("Interrupted while delaying message consumption", e);
                        }
                    }
                }
            });
            producer = sess.createProducer(queue);
            final DestinationStatistics destinationStatistics =
                    ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics();

            sendEveryFifthAtHighPriority(producer, payload, numMessages);
            awaitReceivedAndBalancedStats(received, destinationStatistics, numMessages);

            // Second round from a clean slate.
            received.set(0);
            destinationStatistics.reset();

            sendEveryFifthAtHighPriority(producer, payload, numMessages);
            awaitReceivedAndBalancedStats(received, destinationStatistics, numMessages);
        } finally {
            if (producer != null) {
                producer.close();
            }
            queueConsumer.close();
        }
    }

    /** Sends {@code count} messages; every fifth (starting with the first) at priority 9, the rest at 4. */
    private void sendEveryFifthAtHighPriority(MessageProducer producer, String payload, int count) throws Exception {
        for (int i = 0; i < count; i++) {
            Message message = sess.createMessage();
            message.setStringProperty("payload", payload);
            message.setJMSPriority(i % 5 == 0 ? 9 : 4);
            // Delivery mode 2 is PERSISTENT in the JMS API; time-to-live 0 means no expiry.
            producer.send(message, 2, message.getJMSPriority(), 0L);
        }
    }

    /** Waits until {@code expected} messages were received and the statistics show the same enqueue/dequeue count. */
    private void awaitReceivedAndBalancedStats(final AtomicInteger received,
            final DestinationStatistics destinationStatistics, final int expected) throws Exception {
        assertTrue("Got all", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return expected == received.get();
            }
        }));
        assertTrue("Nothing else Like dlq involved", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                LOG.info("Enqueues: " + destinationStatistics.getEnqueues().getCount()
                        + ", Dequeues: " + destinationStatistics.getDequeues().getCount());
                return destinationStatistics.getEnqueues().getCount() == expected
                        && destinationStatistics.getDequeues().getCount() == expected;
            }
        }, 10000L));
    }

    protected class ProducerThread
    extends Thread {
        int priority;
        int messageCount;
        ActiveMQDestination dest;

        public ProducerThread(ActiveMQDestination dest, int messageCount, int priority) {
            this.messageCount = messageCount;
            this.priority = priority;
            this.dest = dest;
        }

        @Override
        public void run() {
            try {
                MessageProducer producer = MessagePriorityTest.this.sess.createProducer((jakarta.jms.Destination)this.dest);
                producer.setPriority(this.priority);
                producer.setDeliveryMode(MessagePriorityTest.this.deliveryMode);
                for (int i = 0; i < this.messageCount; ++i) {
                    producer.send((Message)MessagePriorityTest.this.sess.createTextMessage("message priority: " + this.priority));
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void setMessagePriority(int priority) {
            this.priority = priority;
        }

        public void setMessageCount(int messageCount) {
            this.messageCount = messageCount;
        }
    }
}

