/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.region.cursors;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.StorePendingQueueMessageStoragePolicy;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.StoreUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.TempUsage;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NegativeQueueTest
extends AutoFailTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(NegativeQueueTest.class);
    public static SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd,hh:mm:ss:SSS");
    private static final String QUEUE_1_NAME = "conn.test.queue.1";
    private static final String QUEUE_2_NAME = "conn.test.queue.2";
    private static final long QUEUE_MEMORY_LIMIT = 0x200000L;
    private static final long MEMORY_USAGE = 400000000L;
    private static final long TEMP_USAGE = 200000000L;
    private static final long STORE_USAGE = 1000000000L;
    private static final int MESSAGE_COUNT = 2100;
    protected static final boolean TRANSACTED = true;
    protected static final boolean DEBUG = true;
    protected static int NUM_CONSUMERS = 20;
    protected static int PREFETCH_SIZE = 1000;
    protected BrokerService broker;
    protected String bindAddress = "tcp://localhost:0";

    public void testWithDefaultPrefetch() throws Exception {
        PREFETCH_SIZE = 1000;
        NUM_CONSUMERS = 20;
        this.blastAndConsume();
    }

    public void x_testWithDefaultPrefetchFiveConsumers() throws Exception {
        PREFETCH_SIZE = 1000;
        NUM_CONSUMERS = 5;
        this.blastAndConsume();
    }

    public void x_testWithDefaultPrefetchTwoConsumers() throws Exception {
        PREFETCH_SIZE = 1000;
        NUM_CONSUMERS = 2;
        this.blastAndConsume();
    }

    public void testWithDefaultPrefetchOneConsumer() throws Exception {
        PREFETCH_SIZE = 1000;
        NUM_CONSUMERS = 1;
        this.blastAndConsume();
    }

    public void testWithMediumPrefetch() throws Exception {
        PREFETCH_SIZE = 50;
        NUM_CONSUMERS = 20;
        this.blastAndConsume();
    }

    public void x_testWithSmallPrefetch() throws Exception {
        PREFETCH_SIZE = 10;
        NUM_CONSUMERS = 20;
        this.blastAndConsume();
    }

    public void testWithNoPrefetch() throws Exception {
        PREFETCH_SIZE = 1;
        NUM_CONSUMERS = 20;
        this.blastAndConsume();
    }

    public void blastAndConsume() throws Exception {
        MessageConsumer consumer;
        Session consumerSession;
        int ix;
        LOG.info(this.getName());
        ActiveMQConnectionFactory factory = this.createConnectionFactory();
        Connection proxyConnection = factory.createConnection();
        proxyConnection.start();
        Session proxySession = proxyConnection.createSession(false, 1);
        final QueueViewMBean proxyQueue1 = this.getProxyToQueueViewMBean(proxySession.createQueue(QUEUE_1_NAME));
        final QueueViewMBean proxyQueue2 = this.getProxyToQueueViewMBean(proxySession.createQueue(QUEUE_2_NAME));
        Connection producerConnection = factory.createConnection();
        producerConnection.start();
        Session session = producerConnection.createSession(true, 1);
        Queue queue = session.createQueue(QUEUE_1_NAME);
        MessageProducer producer = session.createProducer((Destination)queue);
        ArrayList<TextMessage> senderList = new ArrayList<TextMessage>();
        for (int i = 0; i < 2100; ++i) {
            TextMessage msg = session.createTextMessage(i + " " + formatter.format(new Date()));
            senderList.add(msg);
            producer.send((Message)msg);
            session.commit();
            if (i % 100 != 0) continue;
            int index = i / 100 + 1;
            System.out.print(index - index / 10 * 10);
        }
        System.out.println("");
        System.out.println("Queue1 Size = " + proxyQueue1.getQueueSize());
        System.out.println("Queue1 Memory % Used = " + proxyQueue1.getMemoryPercentUsage());
        System.out.println("Queue1 Memory Available = " + proxyQueue1.getMemoryLimit());
        CountDownLatch latch1 = new CountDownLatch(1);
        final CountDownLatch latch2 = new CountDownLatch(1);
        Connection[] consumerConnections1 = new Connection[NUM_CONSUMERS];
        ArrayList<Message> consumerList1 = new ArrayList<Message>();
        Connection[] consumerConnections2 = new Connection[NUM_CONSUMERS];
        Connection[] producerConnections2 = new Connection[NUM_CONSUMERS];
        ArrayList<Message> consumerList2 = new ArrayList<Message>();
        for (ix = 0; ix < NUM_CONSUMERS; ++ix) {
            producerConnections2[ix] = factory.createConnection();
            producerConnections2[ix].start();
            consumerConnections1[ix] = this.getConsumerConnection((ConnectionFactory)factory);
            consumerSession = consumerConnections1[ix].createSession(true, 1);
            consumer = consumerSession.createConsumer((Destination)session.createQueue(QUEUE_1_NAME));
            consumer.setMessageListener((MessageListener)new SessionAwareMessageListener(producerConnections2[ix], consumerSession, QUEUE_2_NAME, latch1, consumerList1));
        }
        latch1.await(200000L, TimeUnit.MILLISECONDS);
        System.out.println("");
        System.out.println("Queue2 Size = " + proxyQueue2.getQueueSize());
        System.out.println("Queue2 Memory % Used = " + proxyQueue2.getMemoryPercentUsage());
        System.out.println("Queue2 Memory Available = " + proxyQueue2.getMemoryLimit());
        for (ix = 0; ix < NUM_CONSUMERS; ++ix) {
            consumerConnections2[ix] = this.getConsumerConnection((ConnectionFactory)factory);
            consumerSession = consumerConnections2[ix].createSession(true, 1);
            consumer = consumerSession.createConsumer((Destination)session.createQueue(QUEUE_2_NAME));
            consumer.setMessageListener((MessageListener)new SessionAwareMessageListener(consumerSession, latch2, consumerList2));
        }
        boolean success = Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                boolean done = latch2.await(10L, TimeUnit.SECONDS);
                System.out.println("");
                System.out.println("Queue1 Size = " + proxyQueue1.getQueueSize());
                System.out.println("Queue1 Memory % Used = " + proxyQueue1.getMemoryPercentUsage());
                System.out.println("Queue2 Size = " + proxyQueue2.getQueueSize());
                System.out.println("Queue2 Memory % Used = " + proxyQueue2.getMemoryPercentUsage());
                System.out.println("Queue2 Memory Available = " + proxyQueue2.getMemoryLimit());
                return done;
            }
        }, (long)300000L);
        if (!success) {
            NegativeQueueTest.dumpAllThreads((String)"blocked waiting on 2");
        }
        NegativeQueueTest.assertTrue((String)"got all expected messages on 2", (boolean)success);
        producerConnection.close();
        for (int ix2 = 0; ix2 < NUM_CONSUMERS; ++ix2) {
            consumerConnections1[ix2].close();
            consumerConnections2[ix2].close();
            producerConnections2[ix2].close();
        }
        Thread.sleep(500L);
        System.out.println("");
        System.out.println("Queue1 Size = " + proxyQueue1.getQueueSize());
        System.out.println("Queue1 Memory % Used = " + proxyQueue1.getMemoryPercentUsage());
        System.out.println("Queue2 Size = " + proxyQueue2.getQueueSize());
        System.out.println("Queue2 Memory % Used = " + proxyQueue2.getMemoryPercentUsage());
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return 0L == proxyQueue1.getQueueSize();
            }
        });
        NegativeQueueTest.assertEquals((String)"Queue1 has gone negative,", (long)0L, (long)proxyQueue1.getQueueSize());
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return 0L == proxyQueue2.getQueueSize();
            }
        });
        NegativeQueueTest.assertEquals((String)"Queue2 has gone negative,", (long)0L, (long)proxyQueue2.getQueueSize());
        proxyConnection.close();
    }

    private QueueViewMBean getProxyToQueueViewMBean(Queue queue) throws MalformedObjectNameException, JMSException {
        String prefix = "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=";
        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queue.getQueueName());
        QueueViewMBean proxy = (QueueViewMBean)this.broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
        return proxy;
    }

    protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException {
        Connection connection = fac.createConnection();
        connection.start();
        return connection;
    }

    protected void setUp() throws Exception {
        if (this.broker == null) {
            this.broker = this.createBroker();
        }
        super.setUp();
    }

    protected void tearDown() throws Exception {
        super.tearDown();
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
    }

    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(this.bindAddress);
        Properties props = new Properties();
        props.setProperty("prefetchPolicy.durableTopicPrefetch", "" + PREFETCH_SIZE);
        props.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch", "" + PREFETCH_SIZE);
        props.setProperty("prefetchPolicy.queuePrefetch", "" + PREFETCH_SIZE);
        cf.setProperties(props);
        return cf;
    }

    protected BrokerService createBroker() throws Exception {
        BrokerService answer = new BrokerService();
        this.configureBroker(answer);
        answer.start();
        answer.waitUntilStarted();
        this.bindAddress = ((TransportConnector)answer.getTransportConnectors().get(0)).getConnectUri().toString();
        return answer;
    }

    protected void configureBroker(BrokerService answer) throws Exception {
        PolicyEntry policy = new PolicyEntry();
        policy.setMemoryLimit(0x200000L);
        policy.setPendingQueuePolicy((PendingQueueMessageStoragePolicy)new StorePendingQueueMessageStoragePolicy());
        PolicyMap pMap = new PolicyMap();
        pMap.setDefaultEntry(policy);
        answer.setDestinationPolicy(pMap);
        answer.setDeleteAllMessagesOnStartup(true);
        answer.addConnector("tcp://localhost:0");
        MemoryUsage memoryUsage = new MemoryUsage();
        memoryUsage.setLimit(400000000L);
        memoryUsage.setPercentUsageMinDelta(20);
        TempUsage tempUsage = new TempUsage();
        tempUsage.setLimit(200000000L);
        StoreUsage storeUsage = new StoreUsage();
        storeUsage.setLimit(1000000000L);
        SystemUsage systemUsage = new SystemUsage();
        systemUsage.setMemoryUsage(memoryUsage);
        systemUsage.setTempUsage(tempUsage);
        systemUsage.setStoreUsage(storeUsage);
        answer.setSystemUsage(systemUsage);
    }

    class SessionAwareMessageListener
    implements MessageListener {
        private final List<Message> consumerList;
        private final CountDownLatch latch;
        private final Session consumerSession;
        private Session producerSession;
        private MessageProducer producer;

        public SessionAwareMessageListener(Session consumerSession, CountDownLatch latch, List<Message> consumerList) {
            this(null, consumerSession, null, latch, consumerList);
        }

        public SessionAwareMessageListener(Connection producerConnection, Session consumerSession, String outQueueName, CountDownLatch latch, List<Message> consumerList) {
            this.consumerList = consumerList;
            this.latch = latch;
            this.consumerSession = consumerSession;
            if (producerConnection != null) {
                try {
                    this.producerSession = producerConnection.createSession(true, 1);
                    Queue queue = this.producerSession.createQueue(outQueueName);
                    this.producer = this.producerSession.createProducer((Destination)queue);
                }
                catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onMessage(Message msg) {
            try {
                if (this.producer == null) {
                    Thread.sleep(50L);
                } else {
                    this.producer.send(msg);
                    this.producerSession.commit();
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            List<Message> e = this.consumerList;
            synchronized (e) {
                this.consumerList.add(msg);
                if (this.consumerList.size() % 100 == 0) {
                    int index = this.consumerList.size() / 100;
                    System.out.print(index - index / 10 * 10);
                }
                if (this.consumerList.size() == 2100) {
                    this.latch.countDown();
                }
            }
            try {
                this.consumerSession.commit();
            }
            catch (JMSException e2) {
                e2.printStackTrace();
            }
        }
    }
}

