/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.usecases.TestSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Load test that checks how the broker delivers messages from the queue
 * {@code "test"} to a consumer with and without a {@code JMSType} selector.
 *
 * <p>The producer alternates between messages typed {@link #JMSTYPE_EATME} and
 * {@link #JMSTYPE_IGNOREME}. One test consumes without a selector and expects
 * every message; the other selects only {@link #JMSTYPE_EATME} and expects
 * exactly half of them, after which a receive is expected to time out.
 */
public class DiscriminatingConsumerLoadTest extends TestSupport {
    private static final Log LOG = LogFactory.getLog(DiscriminatingConsumerLoadTest.class);

    public static final String JMSTYPE_EATME = "DiscriminatingLoadClient.EatMe";
    public static final String JMSTYPE_IGNOREME = "DiscriminatingLoadClient.IgnoreMe";

    /** Total number of messages the producer sends (half of each JMSType). */
    private static final int TEST_SIZE = 5000;
    /** Name of the queue shared by producer and consumer. */
    private static final String QUEUE_NAME = "test";
    /** How long the consumer waits for one message before declaring delivery halted. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 30000L;
    /** Pause after starting the consumer connection, before the consumer thread starts. */
    private static final long CONSUMER_SETTLE_MILLIS = 1000L;
    /** Pause after starting the producer connection, before the producer thread starts. */
    private static final long PRODUCER_SETTLE_MILLIS = 3000L;
    /** Delay inside the producer thread before it begins sending. */
    private static final long PRODUCER_START_DELAY_MILLIS = 10000L;
    /** Time-to-live applied to every message sent (30 minutes). */
    private static final long MESSAGE_TTL_MILLIS = 1800000L;
    /** Priority applied to every message sent. */
    private static final int MESSAGE_PRIORITY = 0;

    private Connection producerConnection;
    private Connection consumerConnection;
    BrokerService broker;

    /**
     * Starts a non-persistent broker whose default destination policy has a
     * max page size equal to the number of messages sent, then creates (but
     * does not start) the producer and consumer connections.
     */
    protected void setUp() throws Exception {
        this.broker = new BrokerService();
        this.broker.setPersistent(false);

        // NOTE(review): the page size is raised to the full test size, presumably so the
        // broker can page in enough messages to reach those matching the selector —
        // confirm against the PolicyEntry documentation.
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setMaxPageSize(TEST_SIZE);
        policyMap.setDefaultEntry(defaultPolicy);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.start();

        super.setUp();
        this.producerConnection = this.createConnection();
        this.consumerConnection = this.createConnection();
    }

    /**
     * Closes both connections, runs the superclass tear-down and stops the
     * broker. Each step runs even if an earlier one throws, so a failing
     * {@code close()} cannot leave the broker running.
     */
    protected void tearDown() throws Exception {
        try {
            if (this.producerConnection != null) {
                this.producerConnection.close();
            }
        } finally {
            this.producerConnection = null;
            try {
                if (this.consumerConnection != null) {
                    this.consumerConnection.close();
                }
            } finally {
                this.consumerConnection = null;
                try {
                    super.tearDown();
                } finally {
                    if (this.broker != null) {
                        this.broker.stop();
                    }
                }
            }
        }
    }

    /**
     * Consumes without a selector and expects all {@code TEST_SIZE} messages
     * to arrive without any receive timing out.
     */
    public void testNonDiscriminatingConsumer() throws Exception {
        Consumer consumer = this.runProducerAndConsumer(null);
        int received = consumer.getCount();

        if (received == TEST_SIZE) {
            LOG.info("test complete .... all messsages consumed!!");
        } else {
            LOG.info("test failed .... Sent " + TEST_SIZE + " messages intended to be consumed ( "
                    + TEST_SIZE + " total), but only consumed " + received);
        }
        assertTrue("Sent " + TEST_SIZE + " messages intended to be consumed, but only consumed " + received,
                received == TEST_SIZE);
        assertFalse("Delivery of messages to consumer was halted during this test",
                consumer.deliveryHalted());
    }

    /**
     * Consumes with a selector on {@link #JMSTYPE_EATME} and expects exactly
     * half of the messages, followed by a receive timeout (the consumer's
     * loop only ends early when a receive times out).
     */
    public void testDiscriminatingConsumer() throws Exception {
        int expected = TEST_SIZE / 2;
        Consumer consumer = this.runProducerAndConsumer(JMSTYPE_EATME);
        int received = consumer.getCount();

        String summary = "Sent " + TEST_SIZE + " original messages, only half of which (" + expected
                + ") were intended to be consumed: consumer paused at: " + received;
        if (received == expected) {
            LOG.info("test complete .... all messsages consumed!!");
        } else {
            LOG.info("test failed .... " + summary);
        }
        assertTrue(summary, received == expected);
        assertTrue("Delivery of messages to consumer was halted during this test as it only wants half",
                consumer.deliveryHalted());
    }

    /**
     * Runs one producer/consumer round on the connections created in
     * {@link #setUp()} and returns the consumer once its thread has finished.
     *
     * <p>The connections from {@code setUp()} are reused rather than replaced,
     * so {@link #tearDown()} closes every connection that was opened.
     *
     * @param jmsType JMSType value to select on, or {@code null} to consume everything
     * @return the finished consumer, whose counters are safe to read
     * @throws Exception if a connection cannot be started or the test thread is interrupted
     */
    private Consumer runProducerAndConsumer(String jmsType) throws Exception {
        this.consumerConnection.start();
        LOG.info("consumerConnection = " + this.consumerConnection);
        // An interrupt here propagates and fails the test instead of being silently dropped.
        Thread.sleep(CONSUMER_SETTLE_MILLIS);

        Consumer consumer = new Consumer(this.consumerConnection, jmsType);
        Thread consumerThread = new Thread(consumer);
        consumerThread.start();

        this.producerConnection.start();
        LOG.info("producerConnection = " + this.producerConnection);
        Thread.sleep(PRODUCER_SETTLE_MILLIS);

        Producer producer = new Producer(this.producerConnection);
        Thread producerThread = new Thread(producer);
        producerThread.start();

        consumerThread.join();
        producer.stop = true;
        // Give the producer a bounded chance to finish before tearDown closes its connection.
        producerThread.join(RECEIVE_TIMEOUT_MILLIS);
        return consumer;
    }

    /**
     * Receives from the test queue until {@code TEST_SIZE} messages have
     * arrived or a single receive times out.
     *
     * <p>The counters are written only by the thread running {@link #run()};
     * the test reads them after {@code Thread.join()}, which makes the writes
     * visible to the reader.
     */
    private static class Consumer implements Runnable {
        private final Connection connection;
        private final String jmsSelector;
        private int counterReceived = 0;
        private boolean deliveryHalted = false;

        /**
         * @param connection  started connection to consume on
         * @param jmsSelector JMSType value to select on, or {@code null} for no selector
         */
        Consumer(Connection connection, String jmsSelector) {
            this.connection = connection;
            this.jmsSelector = jmsSelector;
        }

        @Override
        public void run() {
            try (Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
                Queue queue = session.createQueue(QUEUE_NAME);
                MessageConsumer consumer = this.jmsSelector != null
                        ? session.createConsumer((Destination) queue, "JMSType='" + this.jmsSelector + "'")
                        : session.createConsumer((Destination) queue);

                while (!this.deliveryHalted && this.counterReceived < TEST_SIZE) {
                    TextMessage result = (TextMessage) consumer.receive(RECEIVE_TIMEOUT_MILLIS);
                    if (result == null) {
                        LOG.info("consuming .... timeout while waiting for a message ... broker must have stopped delivery ...  received = " + this.counterReceived);
                        this.deliveryHalted = true;
                    } else {
                        ++this.counterReceived;
                        LOG.info("consuming .... JMSType = " + result.getJMSType() + " received = " + this.counterReceived);
                    }
                }
            } catch (Exception e) {
                // The test's count assertion will fail; log the cause so it is diagnosable.
                LOG.error("consumer thread failed after receiving " + this.counterReceived + " messages", e);
            }
        }

        /** @return number of messages received so far */
        public int getCount() {
            return this.counterReceived;
        }

        /** @return {@code true} if a receive timed out before {@code TEST_SIZE} messages arrived */
        public boolean deliveryHalted() {
            return this.deliveryHalted;
        }
    }

    /**
     * Sends alternating {@link #JMSTYPE_EATME} / {@link #JMSTYPE_IGNOREME}
     * text messages to the test queue until {@code TEST_SIZE} messages have
     * been sent or {@link #stop} is set.
     */
    private static class Producer implements Runnable {
        private final Connection connection;
        private int counterSent = 0;
        /** Set by the test thread to end the send loop; volatile so the sender sees it. */
        public volatile boolean stop = false;

        /**
         * @param connection started connection to send on
         */
        Producer(Connection connection) {
            this.connection = connection;
        }

        @Override
        public void run() {
            try (Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
                Queue queue = session.createQueue(QUEUE_NAME);
                Thread.sleep(PRODUCER_START_DELAY_MILLIS);
                MessageProducer producer = session.createProducer((Destination) queue);
                while (!this.stop && this.counterSent < TEST_SIZE) {
                    this.sendTyped(session, producer, JMSTYPE_EATME);
                    this.sendTyped(session, producer, JMSTYPE_IGNOREME);
                }
            } catch (Exception e) {
                LOG.error("producer thread failed after sending " + this.counterSent + " messages", e);
            }
            LOG.info("producer thread complete ... " + this.counterSent + " messages sent to the queue");
        }

        /** Sends one non-persistent text message with the given JMSType and counts it. */
        private void sendTyped(Session session, MessageProducer producer, String jmsType) throws Exception {
            TextMessage message = session.createTextMessage("*** Ill ....... Ini ***");
            message.setJMSType(jmsType);
            producer.send((Message) message, jakarta.jms.DeliveryMode.NON_PERSISTENT,
                    MESSAGE_PRIORITY, MESSAGE_TTL_MILLIS);
            ++this.counterSent;
        }
    }
}

