/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.QueueConnection;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class AMQ5266SingleDestTest {
    // Shared SLF4J logger, also used by the nested publisher and consumer helpers.
    static Logger LOG = LoggerFactory.getLogger(AMQ5266SingleDestTest.class);
    // Client connect URL; assigned in startBroker() from the transport connector after the broker starts.
    String activemqURL;
    // Embedded broker created in startBroker() and stopped in stopBroker().
    BrokerService brokerService;
    // Number of queues exercised; test() derives the names "activemq", "activemq1", ... from it.
    public int numDests = 1;
    // Length, in characters, of the text body built by getMessageText().
    public int messageSize = 10000;
    // The @Parameterized.Parameter fields below are filled by the JUnit Parameterized runner from
    // the element at the given index of each row returned by parameters(); the initialisers are
    // only defaults.
    // Number of messages each publisher thread sends.
    @Parameterized.Parameter(value=0)
    public int publisherMessagesPerThread = 1000;
    // Number of publisher threads; startBroker() also passes it to setMaxProducersToAudit.
    @Parameterized.Parameter(value=1)
    public int publisherThreadCount = 20;
    // Number of consumer threads created for each queue.
    @Parameterized.Parameter(value=2)
    public int consumerThreadsPerQueue = 5;
    // Per-destination memory limit handed to PolicyEntry.setMemoryLimit in startBroker().
    @Parameterized.Parameter(value=3)
    public int destMemoryLimit = 51200;
    // Value handed to PolicyEntry.setUseCache in startBroker().
    @Parameterized.Parameter(value=4)
    public boolean useCache = true;
    // Persistence adapter selected through TestSupport.setPersistenceAdapter in startBroker().
    @Parameterized.Parameter(value=5)
    public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB;
    // Value handed to PolicyEntry.setOptimizedDispatch in startBroker().
    @Parameterized.Parameter(value=6)
    public boolean optimizeDispatch = false;
    // Passed to the consumer as its batch size, which it uses as the consumer.prefetchSize destination option.
    public int consumerBatchSize = 25;
    // Cached message body; built on first use by getMessageText().
    String messageText;

    @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
    public static Iterable<Object[]> parameters() {
        return Arrays.asList({1000, 40, 40, 0x100000, true, TestSupport.PersistenceAdapterChoice.KahaDB, false}, {1000, 40, 40, 0x100000, true, TestSupport.PersistenceAdapterChoice.JDBC, false});
    }

    /**
     * Sets the {@code derby.system.durability} system property to {@code test} once, before
     * any test in this class runs.
     *
     * <p>NOTE(review): presumably this relaxes Derby's disk syncing for the JDBC-backed run;
     * confirm against the Derby documentation.
     *
     * @throws Exception declared for the JUnit lifecycle signature; nothing here throws a checked exception
     */
    @BeforeClass
    public static void derbyTestMode() throws Exception {
        final String durabilityProperty = "derby.system.durability";
        final String durabilityMode = "test";
        System.setProperty(durabilityProperty, durabilityMode);
    }

    /**
     * Creates, configures and starts the embedded broker for the current parameter row, then
     * stores the client connect URL in {@code activemqURL}.
     *
     * <p>The broker deletes all messages on startup, has JMX and advisory support switched
     * off, and uses a single default destination policy built from the test parameters.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @Before
    public void startBroker() throws Exception {
        brokerService = new BrokerService();
        final BrokerService broker = brokerService;

        TestSupport.setPersistenceAdapter(broker, persistenceAdapterChoice);
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setUseJmx(false);
        broker.setAdvisorySupport(false);

        // Default policy applied to every destination on this broker.
        final PolicyEntry destinationDefaults = new PolicyEntry();
        destinationDefaults.setUseConsumerPriority(false);
        destinationDefaults.setMaxProducersToAudit(publisherThreadCount);
        destinationDefaults.setEnableAudit(true);
        destinationDefaults.setUseCache(useCache);
        destinationDefaults.setMaxPageSize(1000);
        destinationDefaults.setOptimizedDispatch(optimizeDispatch);
        destinationDefaults.setMemoryLimit(destMemoryLimit);
        destinationDefaults.setExpireMessagesPeriod(0L);

        final PolicyMap policies = new PolicyMap();
        policies.setDefaultEntry(destinationDefaults);
        broker.setDestinationPolicy(policies);

        // 64 MiB broker-wide memory limit.
        broker.getSystemUsage().getMemoryUsage().setLimit(64L * 1024 * 1024);

        // Port 0 in the bind URI; the URL clients should use is read back after start().
        final TransportConnector connector = broker.addConnector("tcp://0.0.0.0:0");
        broker.start();

        activemqURL = connector.getPublishableConnectString() + "?jms.watchTopicAdvisories=false";
    }

    /**
     * Stops the embedded broker after each test; does nothing when no broker was created.
     *
     * @throws Exception if stopping the broker fails
     */
    @After
    public void stopBroker() throws Exception {
        final BrokerService broker = brokerService;
        if (broker == null) {
            return;
        }
        broker.stop();
    }

    /**
     * Publishes to and consumes from the configured queue(s) concurrently, then checks that
     * every distinct published id was consumed on each queue and that the broker's destination
     * statistics report zero remaining messages.
     *
     * <p>Sequence: build publisher and consumer, start both, wait for all publisher threads to
     * finish, give the consumer up to five minutes to finish, shut the consumer down, then
     * compare the distinct id counts.
     *
     * @throws Exception on any JMS or broker failure, or if the test thread is interrupted
     */
    @Test
    public void test() throws Exception {
        final String activemqQueues = buildQueueNames();
        final long consumerWaitForConsumption = TimeUnit.MINUTES.toMillis(5);
        final int expectedPerQueue = this.publisherMessagesPerThread * this.publisherThreadCount;

        LOG.info("Publisher will publish {} messages to each queue specified.", expectedPerQueue);
        LOG.info("\nBuilding Publisher...");
        final ExportQueuePublisher publisher = new ExportQueuePublisher(this.activemqURL, activemqQueues, this.publisherMessagesPerThread, this.publisherThreadCount);
        LOG.info("Building Consumer...");
        final ExportQueueConsumer consumer = new ExportQueueConsumer(this.activemqURL, activemqQueues, this.consumerThreadsPerQueue, this.consumerBatchSize, expectedPerQueue);

        final long totalStart = System.currentTimeMillis();
        LOG.info("Starting Publisher...");
        publisher.start();
        LOG.info("Starting Consumer...");
        consumer.start();

        LOG.info("Waiting For Publisher Completion...");
        publisher.waitForCompletion();
        final List<String> publishedIds = publisher.getIDs();
        // All publisher threads have been joined, so the id list is no longer being modified.
        final int distinctPublishedCount = new TreeSet<String>(publishedIds).size();
        LOG.info("Publisher Complete. Published: {}, Distinct IDs Published: {}", publishedIds.size(), distinctPublishedCount);
        LOG.info("Publisher duration: {}", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - totalStart));

        awaitConsumerCompletion(consumer, consumerWaitForConsumption);

        LOG.info("\nConsumer Complete: {}, Shutting Down.", consumer.completed());
        LOG.info("Total duration: {}", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - totalStart));
        consumer.shutdown();
        // NOTE(review): fixed pause before reading statistics; presumably lets the broker settle - confirm.
        TimeUnit.SECONDS.sleep(2L);

        LOG.info("Consumer Stats:");
        for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) {
            final List<String> idList = entry.getValue();
            final int distinctConsumed = new TreeSet<String>(idList).size();
            final int diff = distinctPublishedCount - distinctConsumed;
            final String stuck = diff > 0 ? Integer.toString(diff) : "NO";
            LOG.info("   Queue: {} -> Total Messages Consumed: {}, Distinct IDs Consumed: {} ( {} STUCK MESSAGES  ) ", entry.getKey(), idList.size(), distinctConsumed, stuck);
            Assert.assertEquals("expect to get all messages!", 0L, (long) diff);
        }
        final RegionBroker regionBroker = (RegionBroker) this.brokerService.getRegionBroker();
        Assert.assertEquals("No pending messages", 0L, regionBroker.getDestinationStatistics().getMessages().getCount());
    }

    /**
     * Builds the comma-separated queue list: {@code "activemq"} followed by
     * {@code ",activemq<i>"} for each {@code i} from 1 to {@code numDests - 1}.
     */
    private String buildQueueNames() {
        final StringBuilder names = new StringBuilder("activemq");
        for (int i = 1; i < this.numDests; ++i) {
            names.append(",activemq").append(i);
        }
        return names.toString();
    }

    /**
     * Polls once per second until the consumer reports completion or the timeout elapses.
     *
     * <p>If the calling thread is interrupted, the interrupt flag is restored and the wait ends
     * early, so that later blocking calls in the caller fail fast instead of the interruption
     * being lost.
     */
    private void awaitConsumerCompletion(ExportQueueConsumer consumer, long timeoutMillis) {
        final long endWait = System.currentTimeMillis() + timeoutMillis;
        while (!consumer.completed() && System.currentTimeMillis() < endWait) {
            final long secs = TimeUnit.MILLISECONDS.toSeconds(endWait - System.currentTimeMillis());
            LOG.info("Waiting For Consumer Completion. Time left: {} secs", secs);
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.warn("Interrupted while waiting for consumer completion; abandoning the wait.");
                return;
            }
        }
    }

    /**
     * Returns the shared message body: a string of {@code messageSize} {@code 'X'} characters,
     * built on first use and cached in {@code messageText}.
     *
     * <p>Publisher threads call this concurrently. Creation is guarded by the monitor of this
     * test instance, and the field is read into a local exactly once on the lock-free path, so
     * the value that was null-checked is the value that is returned even though the field is
     * not volatile.
     *
     * @return the cached message body; empty when {@code messageSize} is not positive
     */
    private String getMessageText() {
        String text = this.messageText;
        if (text == null) {
            synchronized (this) {
                text = this.messageText;
                if (text == null) {
                    final char[] body = new char[Math.max(0, this.messageSize)];
                    Arrays.fill(body, 'X');
                    text = new String(body);
                    this.messageText = text;
                }
            }
        }
        return text;
    }

    public class ExportQueuePublisher {
        private final String amqUser = ActiveMQConnection.DEFAULT_USER;
        private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
        private ActiveMQConnectionFactory connectionFactory = null;
        private String activemqURL = null;
        private String activemqQueues = null;
        private List<String> ids = Collections.synchronizedList(new ArrayList());
        private List<PublisherThread> threads;

        public ExportQueuePublisher(String activemqURL, String activemqQueues, int messagesPerThread, int threadCount) throws Exception {
            this.activemqURL = activemqURL;
            this.activemqQueues = activemqQueues;
            this.threads = new ArrayList<PublisherThread>();
            for (int i = 0; i < threadCount; ++i) {
                PublisherThread pt = new PublisherThread(messagesPerThread);
                this.threads.add(pt);
            }
        }

        public List<String> getIDs() {
            return this.ids;
        }

        public void start() throws Exception {
            for (PublisherThread pt : this.threads) {
                pt.start();
            }
        }

        public void waitForCompletion() throws Exception {
            for (PublisherThread pt : this.threads) {
                pt.join();
                pt.close();
            }
        }

        private Session newSession(QueueConnection queueConnection) throws Exception {
            return queueConnection.createSession(false, 1);
        }

        private synchronized QueueConnection newQueueConnection() throws Exception {
            if (this.connectionFactory == null) {
                this.connectionFactory = new ActiveMQConnectionFactory(this.amqUser, this.amqPassword, this.activemqURL);
            }
            RedeliveryPolicy policy = this.connectionFactory.getRedeliveryPolicy();
            policy.setMaximumRedeliveries(-1);
            QueueConnection amqConnection = this.connectionFactory.createQueueConnection();
            amqConnection.start();
            return amqConnection;
        }

        private class PublisherThread
        extends Thread {
            private int count;
            private QueueConnection qc;
            private Session session;
            private MessageProducer mp;

            private PublisherThread(int count) throws Exception {
                this.count = count;
                this.qc = ExportQueuePublisher.this.newQueueConnection();
                this.session = ExportQueuePublisher.this.newSession(this.qc);
                ActiveMQQueue q = new ActiveMQQueue(ExportQueuePublisher.this.activemqQueues);
                this.mp = this.session.createProducer((Destination)q);
            }

            @Override
            public void run() {
                try {
                    while (this.count-- > 0) {
                        TextMessage tm = this.session.createTextMessage(AMQ5266SingleDestTest.this.getMessageText());
                        String id = UUID.randomUUID().toString();
                        tm.setStringProperty("KEY", id);
                        ExportQueuePublisher.this.ids.add(id);
                        this.mp.send((Message)tm);
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }

            public void close() {
                try {
                    this.mp.close();
                }
                catch (Exception exception) {
                    // empty catch block
                }
                try {
                    this.session.close();
                }
                catch (Exception exception) {
                    // empty catch block
                }
                try {
                    this.qc.close();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        }
    }

    public class ExportQueueConsumer {
        private final String amqUser = ActiveMQConnection.DEFAULT_USER;
        private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
        private final int totalToExpect;
        private ActiveMQConnectionFactory connectionFactory = null;
        private String activemqURL = null;
        private String activemqQueues = null;
        private String[] queues = null;
        private Map<String, List<String>> idsByQueue = new HashMap<String, List<String>>();
        private Map<String, List<ConsumerThread>> threads;

        public ExportQueueConsumer(String activemqURL, String activemqQueues, int threadsPerQueue, int batchSize, int totalToExpect) throws Exception {
            this.activemqURL = activemqURL;
            this.activemqQueues = activemqQueues;
            this.totalToExpect = totalToExpect;
            this.queues = this.activemqQueues.split(",");
            for (int i = 0; i < this.queues.length; ++i) {
                this.queues[i] = this.queues[i].trim();
            }
            this.threads = new HashMap<String, List<ConsumerThread>>();
            for (String q : this.queues) {
                ArrayList<ConsumerThread> list = new ArrayList<ConsumerThread>();
                this.idsByQueue.put(q, Collections.synchronizedList(new ArrayList()));
                for (int i = 0; i < threadsPerQueue; ++i) {
                    list.add(new ConsumerThread(q, batchSize));
                }
                this.threads.put(q, list);
            }
        }

        public Map<String, List<String>> getIDs() {
            return this.idsByQueue;
        }

        public void start() throws Exception {
            for (List<ConsumerThread> list : this.threads.values()) {
                for (ConsumerThread ct : list) {
                    ct.start();
                }
            }
        }

        public void shutdown() throws Exception {
            for (List<ConsumerThread> list : this.threads.values()) {
                for (ConsumerThread ct : list) {
                    ct.shutdown();
                }
            }
            for (List<ConsumerThread> list : this.threads.values()) {
                for (ConsumerThread ct : list) {
                    ct.join();
                }
            }
        }

        private Session newSession(QueueConnection queueConnection) throws Exception {
            return queueConnection.createSession(false, 1);
        }

        private synchronized QueueConnection newQueueConnection() throws Exception {
            if (this.connectionFactory == null) {
                this.connectionFactory = new ActiveMQConnectionFactory(this.amqUser, this.amqPassword, this.activemqURL);
            }
            RedeliveryPolicy policy = this.connectionFactory.getRedeliveryPolicy();
            policy.setMaximumRedeliveries(-1);
            QueueConnection amqConnection = this.connectionFactory.createQueueConnection();
            amqConnection.start();
            return amqConnection;
        }

        public boolean completed() {
            for (List<ConsumerThread> list : this.threads.values()) {
                for (ConsumerThread ct : list) {
                    if (!ct.isAlive()) continue;
                    LOG.info("thread for {} is still alive.", (Object)ct.qName);
                    return false;
                }
            }
            return true;
        }

        private class ConsumerThread
        extends Thread {
            private int batchSize;
            private QueueConnection qc;
            private Session session;
            private MessageConsumer mc;
            private List<String> idList;
            private boolean shutdown = false;
            private String qName;

            private ConsumerThread(String queueName, int batchSize) throws Exception {
                this.batchSize = batchSize;
                this.qName = queueName;
                this.qc = ExportQueueConsumer.this.newQueueConnection();
                this.session = ExportQueueConsumer.this.newSession(this.qc);
                Queue q = this.session.createQueue(queueName + "?consumer.prefetchSize=" + batchSize);
                this.mc = this.session.createConsumer((Destination)q);
                this.idList = ExportQueueConsumer.this.idsByQueue.get(queueName);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                try {
                    int count = 0;
                    while (!this.shutdown) {
                        if (this.idList.size() >= ExportQueueConsumer.this.totalToExpect) {
                            LOG.info("Got {} for q: {}", (Object)this.idList.size(), (Object)this.qName);
                            break;
                        }
                        Message m = this.mc.receive(4000L);
                        if (m != null) {
                            this.idList.add(m.getStringProperty("KEY"));
                            if (++count != this.batchSize) continue;
                            count = 0;
                            continue;
                        }
                        count = 0;
                        try {
                            if (this.idList.size() >= ExportQueueConsumer.this.totalToExpect) continue;
                            LOG.info("did not receive on {}, current count: {}", (Object)this.qName, (Object)this.idList.size());
                        }
                        catch (Exception exception) {}
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                finally {
                    this.close();
                }
            }

            public void shutdown() {
                this.shutdown = true;
            }

            public void close() {
                try {
                    this.mc.close();
                }
                catch (Exception exception) {
                    // empty catch block
                }
                try {
                    this.session.close();
                }
                catch (Exception exception) {
                    // empty catch block
                }
                try {
                    this.qc.close();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        }
    }
}

