/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.util.HashSet;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.filter.AbstractFilter;
import org.apache.logging.log4j.core.layout.MessageLayout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ4221Test
extends TestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ4221Test.class);
    public int PAYLOAD_SIZE_BYTES = 4096;
    public int NUM_TO_SEND = 60000;
    public int NUM_CONCURRENT_PRODUCERS = 20;
    public int QUEUE_COUNT = 1;
    public int TMP_JOURNAL_MAX_FILE_SIZE = 0xA00000;
    public int DLQ_PURGE_INTERVAL = 30000;
    public int MESSAGE_TIME_TO_LIVE = 20000;
    public int EXPIRE_SWEEP_PERIOD = 200;
    public int TMP_JOURNAL_GC_PERIOD = 50;
    public int RECEIVE_POLL_PERIOD = 4000;
    private int RECEIVE_BATCH = 5000;
    final byte[] payload = new byte[this.PAYLOAD_SIZE_BYTES];
    final AtomicInteger counter = new AtomicInteger(0);
    final HashSet<Throwable> exceptions = new HashSet();
    BrokerService brokerService;
    private String brokerUrlString;
    ExecutorService executorService = Executors.newCachedThreadPool();
    final AtomicBoolean done = new AtomicBoolean(false);
    final LinkedList<String> errorsInLog = new LinkedList();

    public static Test suite() {
        return AMQ4221Test.suite(AMQ4221Test.class);
    }

    public void setUp() throws Exception {
        org.apache.logging.log4j.core.Logger logger = (org.apache.logging.log4j.core.Logger)org.apache.logging.log4j.core.Logger.class.cast(LogManager.getRootLogger());
        AbstractAppender appender = new AbstractAppender("testAppender", (Filter)new AbstractFilter(){}, (Layout)new MessageLayout(), false, new Property[0]){

            public void append(LogEvent event) {
                if (event.getLevel().isMoreSpecificThan(Level.WARN)) {
                    System.err.println("Fail on error in log: " + event.getMessage());
                    AMQ4221Test.this.done.set(true);
                    AMQ4221Test.this.errorsInLog.add(event.getMessage().getFormattedMessage());
                }
            }
        };
        appender.start();
        logger.get().addAppender((Appender)appender, Level.DEBUG, (Filter)new AbstractFilter(){});
        logger.addAppender((Appender)appender);
        this.done.set(false);
        this.errorsInLog.clear();
        this.brokerService = new BrokerService();
        this.brokerService.setDeleteAllMessagesOnStartup(true);
        this.brokerService.setDestinations(new ActiveMQDestination[]{new ActiveMQQueue("ActiveMQ.DLQ")});
        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setPendingQueuePolicy((PendingQueueMessageStoragePolicy)new FilePendingQueueMessageStoragePolicy());
        defaultPolicy.setExpireMessagesPeriod((long)this.EXPIRE_SWEEP_PERIOD);
        defaultPolicy.setProducerFlowControl(false);
        defaultPolicy.setMemoryLimit(0x3200000L);
        this.brokerService.getSystemUsage().getMemoryUsage().setLimit(0x3200000L);
        PolicyMap destinationPolicyMap = new PolicyMap();
        destinationPolicyMap.setDefaultEntry(defaultPolicy);
        this.brokerService.setDestinationPolicy(destinationPolicyMap);
        PListStoreImpl tempDataStore = new PListStoreImpl();
        tempDataStore.setDirectory(this.brokerService.getTmpDataDirectory());
        tempDataStore.setJournalMaxFileLength(this.TMP_JOURNAL_MAX_FILE_SIZE);
        tempDataStore.setCleanupInterval((long)this.TMP_JOURNAL_GC_PERIOD);
        tempDataStore.setIndexPageSize(200);
        tempDataStore.setIndexEnablePageCaching(false);
        this.brokerService.setTempDataStore((PListStore)tempDataStore);
        this.brokerService.setAdvisorySupport(false);
        TransportConnector tcp = this.brokerService.addConnector("tcp://localhost:0");
        this.brokerService.start();
        this.brokerUrlString = tcp.getPublishableConnectString();
    }

    public void tearDown() throws Exception {
        this.brokerService.stop();
        this.brokerService.waitUntilStopped();
        this.executorService.shutdownNow();
    }

    public void testProduceConsumeExpireHalf() throws Exception {
        final Queue dlq = (Queue)AMQ4221Test.getDestination(this.brokerService, (ActiveMQDestination)new ActiveMQQueue("ActiveMQ.DLQ"));
        if (this.DLQ_PURGE_INTERVAL > 0) {
            this.executorService.execute(new Runnable(){

                @Override
                public void run() {
                    while (!AMQ4221Test.this.done.get()) {
                        try {
                            Thread.sleep(AMQ4221Test.this.DLQ_PURGE_INTERVAL);
                            LOG.info("Purge DLQ, current size: " + dlq.getDestinationStatistics().getMessages().getCount());
                            dlq.purge();
                        }
                        catch (InterruptedException interruptedException) {
                        }
                        catch (Throwable e) {
                            e.printStackTrace();
                            AMQ4221Test.this.exceptions.add(e);
                        }
                    }
                }
            });
        }
        final CountDownLatch latch = new CountDownLatch(this.QUEUE_COUNT);
        int i = 0;
        while (i < this.QUEUE_COUNT) {
            final int id = i++;
            this.executorService.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        AMQ4221Test.this.doProduceConsumeExpireHalf(id, latch);
                    }
                    catch (Throwable e) {
                        e.printStackTrace();
                        AMQ4221Test.this.exceptions.add(e);
                    }
                }
            });
        }
        while (!this.done.get()) {
            this.done.set(latch.await(5L, TimeUnit.SECONDS));
        }
        this.executorService.shutdown();
        this.executorService.awaitTermination(5L, TimeUnit.MINUTES);
        AMQ4221Test.assertTrue((String)("no exceptions:" + this.exceptions), (boolean)this.exceptions.isEmpty());
        AMQ4221Test.assertTrue((String)("No ERROR in log:" + this.errorsInLog), (boolean)this.errorsInLog.isEmpty());
    }

    public void doProduceConsumeExpireHalf(int id, CountDownLatch latch) throws Exception {
        final ActiveMQQueue queue = new ActiveMQQueue("Q" + id);
        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.brokerUrlString);
        ActiveMQPrefetchPolicy prefecthPolicy = new ActiveMQPrefetchPolicy();
        prefecthPolicy.setAll(0);
        factory.setPrefetchPolicy(prefecthPolicy);
        Connection connection = factory.createConnection();
        connection.start();
        final MessageConsumer consumer = connection.createSession(false, 1).createConsumer((Destination)queue, "on = 'true'");
        this.executorService.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    while (!AMQ4221Test.this.done.get()) {
                        Thread.sleep(AMQ4221Test.this.RECEIVE_POLL_PERIOD);
                        for (int i = 0; i < AMQ4221Test.this.RECEIVE_BATCH && !AMQ4221Test.this.done.get(); ++i) {
                            Message message = consumer.receive(1000L);
                            if (message == null) continue;
                            AMQ4221Test.this.counter.incrementAndGet();
                            if (AMQ4221Test.this.counter.get() <= 0 || AMQ4221Test.this.counter.get() % 500 != 0) continue;
                            LOG.info("received: " + AMQ4221Test.this.counter.get() + ", " + message.getJMSDestination().toString());
                        }
                    }
                }
                catch (JMSException i) {
                }
                catch (Exception e) {
                    e.printStackTrace();
                    AMQ4221Test.this.exceptions.add(e);
                }
            }
        });
        final AtomicInteger accumulator = new AtomicInteger(0);
        final CountDownLatch producersDone = new CountDownLatch(this.NUM_CONCURRENT_PRODUCERS);
        for (int i = 0; i < this.NUM_CONCURRENT_PRODUCERS; ++i) {
            this.executorService.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        Connection sendConnection = factory.createConnection();
                        sendConnection.start();
                        Session sendSession = sendConnection.createSession(false, 1);
                        MessageProducer producer = sendSession.createProducer((Destination)queue);
                        producer.setTimeToLive((long)AMQ4221Test.this.MESSAGE_TIME_TO_LIVE);
                        producer.setDeliveryMode(1);
                        while (accumulator.incrementAndGet() < AMQ4221Test.this.NUM_TO_SEND && !AMQ4221Test.this.done.get()) {
                            BytesMessage message = sendSession.createBytesMessage();
                            message.writeBytes(AMQ4221Test.this.payload);
                            message.setStringProperty("on", String.valueOf(accumulator.get() % 2 == 0));
                            producer.send((Message)message);
                        }
                        producersDone.countDown();
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                        AMQ4221Test.this.exceptions.add(e);
                    }
                }
            });
        }
        producersDone.await(10L, TimeUnit.MINUTES);
        DestinationStatistics view = AMQ4221Test.getDestinationStatistics(this.brokerService, (ActiveMQDestination)queue);
        LOG.info("total expired so far " + view.getExpired().getCount() + ", " + queue.getQueueName());
        latch.countDown();
    }
}

