/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.util.Enumeration;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ6059Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ6059Test.class);
    private BrokerService broker;

    @Before
    public void setUp() throws Exception {
        this.broker = this.createBroker();
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
    }

    @Test
    public void testDLQRecovery() throws Exception {
        this.sendMessage((Destination)new ActiveMQQueue("QName"));
        TimeUnit.SECONDS.sleep(3L);
        LOG.info("### Check for expired message moving to DLQ.");
        Queue dlqQueue = (Queue)this.createDlqDestination();
        this.verifyIsDlq(dlqQueue);
        final QueueViewMBean queueViewMBean = this.getProxyToQueue(dlqQueue.getQueueName());
        Assert.assertTrue((String)"The message expired", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                LOG.info("DLQ stats: Enqueues {}, Dispatches {}, Expired {}, Inflight {}", new Object[]{queueViewMBean.getEnqueueCount(), queueViewMBean.getDispatchCount(), queueViewMBean.getExpiredCount(), queueViewMBean.getInFlightCount()});
                return queueViewMBean.getEnqueueCount() == 1L;
            }
        }));
        this.verifyMessageIsRecovered(dlqQueue);
        this.restartBroker();
        this.verifyIsDlq(dlqQueue);
        this.verifyMessageIsRecovered(dlqQueue);
    }

    @Test
    public void testSetDlqFlag() throws Exception {
        ActiveMQQueue toFlp = new ActiveMQQueue("QNameToFlip");
        this.sendMessage((Destination)toFlp);
        QueueViewMBean queueViewMBean = this.getProxyToQueue(toFlp.getQueueName());
        Assert.assertFalse((boolean)queueViewMBean.isDLQ());
        queueViewMBean.setDLQ(true);
        Assert.assertTrue((boolean)queueViewMBean.isDLQ());
    }

    protected BrokerService createBroker() throws Exception {
        return this.createBrokerWithDLQ(true);
    }

    private BrokerService createBrokerWithDLQ(boolean purge) throws Exception {
        BrokerService broker = new BrokerService();
        ActiveMQQueue dlq = new ActiveMQQueue("ActiveMQ.DLQ?isDLQ=true");
        broker.setDestinations(new ActiveMQDestination[]{dlq});
        PolicyMap pMap = new PolicyMap();
        SharedDeadLetterStrategy sharedDLQStrategy = new SharedDeadLetterStrategy();
        sharedDLQStrategy.setProcessNonPersistent(true);
        sharedDLQStrategy.setProcessExpired(true);
        sharedDLQStrategy.setDeadLetterQueue((ActiveMQDestination)dlq);
        sharedDLQStrategy.setExpiration(10000L);
        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setDeadLetterStrategy((DeadLetterStrategy)sharedDLQStrategy);
        defaultPolicy.setExpireMessagesPeriod(2000L);
        defaultPolicy.setUseCache(false);
        pMap.put((ActiveMQDestination)new ActiveMQQueue(">"), (Object)defaultPolicy);
        broker.setDestinationPolicy(pMap);
        if (purge) {
            broker.setDeleteAllMessagesOnStartup(true);
        }
        return broker;
    }

    private void restartBroker() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
        this.broker = this.createBrokerWithDLQ(false);
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    private void verifyMessageIsRecovered(Queue dlqQueue) throws Exception, JMSException {
        Connection connection = this.createConnection();
        connection.start();
        Session session = connection.createSession(false, 1);
        QueueBrowser browser = session.createBrowser(dlqQueue);
        Enumeration elements = browser.getEnumeration();
        Assert.assertTrue((boolean)elements.hasMoreElements());
        Message browsed = (Message)elements.nextElement();
        Assert.assertNotNull((String)"Recover message after broker restarts", (Object)browsed);
    }

    private void sendMessage(Destination destination) throws Exception {
        Connection connection = this.createConnection();
        connection.start();
        Session session = connection.createSession(false, 1);
        MessageProducer producer = session.createProducer(destination);
        producer.send(destination, (Message)session.createTextMessage("DLQ message"), 2, 4, 1000L);
        connection.stop();
        LOG.info("### Send message that will expire.");
    }

    private Connection createConnection() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.broker.getVmConnectorURI());
        return factory.createConnection();
    }

    private Destination createDlqDestination() {
        return new ActiveMQQueue("ActiveMQ.DLQ");
    }

    private void verifyIsDlq(Queue dlqQ) throws Exception {
        QueueViewMBean queueViewMBean = this.getProxyToQueue(dlqQ.getQueueName());
        Assert.assertTrue((String)"is dlq", (boolean)queueViewMBean.isDLQ());
    }

    private QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + name);
        QueueViewMBean proxy = (QueueViewMBean)this.broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
        return proxy;
    }
}

