/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.util.ArrayList;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ6094Test {
    private static Logger LOG = LoggerFactory.getLogger(AMQ6094Test.class);
    private BrokerService brokerService;
    private String connectionUri;

    @Before
    public void before() throws Exception {
        this.brokerService = new BrokerService();
        TransportConnector connector = this.brokerService.addConnector("tcp://localhost:0");
        this.connectionUri = connector.getPublishableConnectString();
        this.brokerService.setDeleteAllMessagesOnStartup(true);
        this.brokerService.start();
        this.brokerService.waitUntilStarted();
    }

    @After
    public void after() throws Exception {
        if (this.brokerService != null) {
            this.brokerService.stop();
            this.brokerService.waitUntilStopped();
        }
    }

    @Test
    public void testQueueMemoryUsage() throws Exception {
        int i;
        ArrayList<ThreadSlot> producerThreads = new ArrayList<ThreadSlot>();
        ArrayList<ThreadSlot> consumerThreads = new ArrayList<ThreadSlot>();
        for (i = 0; i < 4; ++i) {
            producerThreads.add(AMQ6094Test.runInThread(new UnsafeRunnable(){

                @Override
                public void run() throws Exception {
                    AMQ6094Test.producer(AMQ6094Test.this.connectionUri, "queueA");
                }
            }));
        }
        for (i = 0; i < 4; ++i) {
            consumerThreads.add(AMQ6094Test.runInThread(new UnsafeRunnable(){

                @Override
                public void run() throws Exception {
                    AMQ6094Test.consumer(AMQ6094Test.this.connectionUri, "queueA", 2500);
                }
            }));
        }
        for (int count = 0; count < 10; ++count) {
            Thread.sleep(5000L);
            int i2 = (int)(Math.random() * (double)consumerThreads.size());
            ThreadSlot slot = (ThreadSlot)consumerThreads.get(i2);
            slot.thread.interrupt();
            consumerThreads.remove(i2);
            consumerThreads.add(AMQ6094Test.runInThread(slot.runnable));
            Queue queue = (Queue)this.brokerService.getDestination((ActiveMQDestination)new ActiveMQQueue("queueA"));
            LOG.info("cursorMemoryUsage: " + queue.getMessages().getSystemUsage().getMemoryUsage().getUsage());
            LOG.info("messagesStat: " + queue.getDestinationStatistics().getMessages().getCount());
        }
        Queue queue = (Queue)this.brokerService.getDestination((ActiveMQDestination)new ActiveMQQueue("queueA"));
        LOG.info("cursorMemoryUsage: " + queue.getMessages().getSystemUsage().getMemoryUsage().getUsage());
        LOG.info("messagesStat: " + queue.getDestinationStatistics().getMessages().getCount());
        for (ThreadSlot threadSlot : producerThreads) {
            threadSlot.thread.interrupt();
            threadSlot.thread.join(4000L);
        }
        for (ThreadSlot threadSlot : consumerThreads) {
            threadSlot.thread.interrupt();
            threadSlot.thread.join(4000L);
        }
        AMQ6094Test.consumer(this.connectionUri, "queueA", 2500, true);
        LOG.info("After drain, cursorMemoryUsage: " + queue.getMessages().getSystemUsage().getMemoryUsage().getUsage());
        LOG.info("messagesStat: " + queue.getDestinationStatistics().getMessages().getCount());
        Assert.assertEquals((String)"Queue memory usage to 0", (long)0L, (long)queue.getMessages().getSystemUsage().getMemoryUsage().getUsage());
    }

    public static void producer(String uri, String topic) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri + "?jms.useCompression=true&jms.useAsyncSend=true&daemon=true");
        Connection connection = factory.createConnection();
        Session session = connection.createSession(false, 1);
        MessageProducer producer = session.createProducer((Destination)new ActiveMQQueue(topic));
        producer.setTimeToLive(6000L);
        producer.setDeliveryMode(1);
        while (true) {
            producer.send((Message)session.createTextMessage(AMQ6094Test.msg()));
            if (!(Math.random() > 0.5)) continue;
            Thread.sleep(1L);
        }
    }

    public static void consumer(String uri, String queue, int prefetchSize) throws Exception {
        AMQ6094Test.consumer(uri, queue, prefetchSize, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void consumer(String uri, String queue, int prefetchSize, boolean drain) throws Exception {
        block8: {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri + "?jms.prefetchPolicy.queuePrefetch=" + prefetchSize + "&jms.useAsyncSend=true");
            Connection connection = null;
            try {
                connection = factory.createConnection();
                connection.start();
                Session session = connection.createSession(false, 1);
                MessageConsumer consumer = session.createConsumer((Destination)new ActiveMQQueue(queue));
                if (drain) {
                    Message message = null;
                    while ((message = consumer.receive(4000L)) != null) {
                    }
                    break block8;
                }
                while (true) {
                    consumer.receive();
                }
            }
            finally {
                Thread.interrupted();
                if (!drain) {
                    Thread.sleep(5000L);
                }
                LOG.info("Now closing");
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }

    private static String msg() {
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < 100; ++i) {
            builder.append("123457890");
        }
        return builder.toString();
    }

    public static ThreadSlot runInThread(final UnsafeRunnable runnable) {
        Thread thread = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    runnable.run();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        });
        thread.start();
        ThreadSlot result = new ThreadSlot();
        result.thread = thread;
        result.runnable = runnable;
        return result;
    }

    public static class ThreadSlot {
        private UnsafeRunnable runnable;
        private Thread thread;
    }

    private static interface UnsafeRunnable {
        public void run() throws Exception;
    }
}

