/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.policy;

import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.Queue;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.policy.DeadLetterTest;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests expiry of messages that have been moved to a dead letter queue (DLQ).
 *
 * <p>The broker created by {@link #createBroker()} carries several destination
 * policies, each used by one of the tests in this class:
 * <ul>
 *   <li>the test destination (queue and topic) uses an
 *       {@link IndividualDeadLetterStrategy} with an expiration of 4000 ms;</li>
 *   <li>{@code loop} and {@code DLQ.loop} share one policy whose dead letter
 *       queue is {@code DLQ.loop} itself, with an expiration of 1000 ms;</li>
 *   <li>{@code Comp.One} and {@code Comp.Two} dead-letter into
 *       {@code DLQ.auditConfigured}, which has auditing disabled.</li>
 * </ul>
 *
 * <p>{@link #setUp()} switches the inherited test into transacted mode with
 * delivery mode {@code 2} and no time to live.
 */
public class DeadLetterExpiryTest extends DeadLetterTest {
    private static final Logger LOG = LoggerFactory.getLogger(DeadLetterExpiryTest.class);

    /**
     * Builds the broker from the parent configuration and registers the
     * destination policies required by the tests in this class.
     *
     * @return the configured (not yet started by this method) broker
     * @throws Exception if the parent broker creation fails
     */
    @Override
    protected BrokerService createBroker() throws Exception {
        BrokerService broker = super.createBroker();
        PolicyMap pMap = broker.getDestinationPolicy();

        // Test destination (queue and topic): individual DLQ with a 4s expiration.
        PolicyEntry policy = new PolicyEntry();
        IndividualDeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
        strategy.setExpiration(4000L);
        strategy.setProcessNonPersistent(true);
        policy.setDeadLetterStrategy(strategy);
        pMap.put(new ActiveMQQueue(getDestinationString()), policy);
        pMap.put(new ActiveMQTopic(getDestinationString()), policy);

        // "loop" and its own DLQ share a policy that dead-letters into DLQ.loop,
        // i.e. the DLQ points at itself; testNoDLQLoop checks it still drains.
        SharedDeadLetterStrategy sharedLoopStrategy = new SharedDeadLetterStrategy();
        // Must be set on sharedLoopStrategy: 'strategy' above already has this
        // flag, so setting it there again would leave the loop strategy unconfigured.
        sharedLoopStrategy.setProcessNonPersistent(true);
        sharedLoopStrategy.setExpiration(1000L);
        sharedLoopStrategy.setDeadLetterQueue(new ActiveMQQueue("DLQ.loop"));
        PolicyEntry buggyLoopingDLQPolicy = new PolicyEntry();
        buggyLoopingDLQPolicy.setDeadLetterStrategy(sharedLoopStrategy);
        pMap.put(new ActiveMQQueue("loop"), buggyLoopingDLQPolicy);
        pMap.put(new ActiveMQQueue("DLQ.loop"), buggyLoopingDLQPolicy);

        // Comp.One / Comp.Two: expired and non-persistent messages are dead-lettered
        // into DLQ.auditConfigured, with a small audit window on the strategy.
        SharedDeadLetterStrategy auditConfigured = new SharedDeadLetterStrategy();
        auditConfigured.setDeadLetterQueue(new ActiveMQQueue("DLQ.auditConfigured"));
        auditConfigured.setProcessNonPersistent(true);
        auditConfigured.setProcessExpired(true);
        auditConfigured.setMaxProducersToAudit(1);
        auditConfigured.setMaxAuditDepth(10);
        PolicyEntry auditConfiguredDlqPolicy = new PolicyEntry();
        auditConfiguredDlqPolicy.setDeadLetterStrategy(auditConfigured);
        auditConfiguredDlqPolicy.setExpireMessagesPeriod(1000L);
        pMap.put(new ActiveMQQueue("Comp.One"), auditConfiguredDlqPolicy);
        pMap.put(new ActiveMQQueue("Comp.Two"), auditConfiguredDlqPolicy);

        // The target DLQ itself runs with auditing switched off.
        PolicyEntry auditConfiguredPolicy = new PolicyEntry();
        auditConfiguredPolicy.setEnableAudit(false);
        pMap.put(new ActiveMQQueue("DLQ.auditConfigured"), auditConfiguredPolicy);

        // Every other destination checks for expired messages once per second.
        PolicyEntry policyWithExpiryProcessing = pMap.getDefaultEntry();
        policyWithExpiryProcessing.setExpireMessagesPeriod(1000L);
        pMap.setDefaultEntry(policyWithExpiryProcessing);

        broker.setDestinationPolicy(pMap);
        return broker;
    }

    /**
     * Returns the individual DLQ expected for the current test destination:
     * {@code ActiveMQ.DLQ.Topic.} or {@code ActiveMQ.DLQ.Queue.} followed by
     * this class name, a dot and the test name.
     *
     * @return the DLQ destination, always an {@link ActiveMQQueue}
     */
    @Override
    protected Destination createDlqDestination() {
        String prefix = topic ? "ActiveMQ.DLQ.Topic." : "ActiveMQ.DLQ.Queue.";
        return new ActiveMQQueue(prefix + getClass().getName() + "." + getName());
    }

    /**
     * Sends four messages, rolls each one back {@code maximumRedeliveries + 1}
     * times, then verifies that all of them expire from the individual DLQ and
     * end up, unexpired, on the shared {@code ActiveMQ.DLQ}.
     *
     * @throws Exception on any JMS or JMX failure
     */
    @Override
    protected void doTest() throws Exception {
        connection.start();
        messageCount = 4;

        ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
        // One rollback more than the redelivery limit; presumably this is what
        // makes the broker dead-letter the message.
        rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
        LOG.info("Will redeliver messages: {} times", rollbackCount);

        makeConsumer();
        sendMessages();
        for (int i = 0; i < messageCount; ++i) {
            consumeAndRollback(i);
        }

        Queue dlqQueue = (Queue) createDlqDestination();
        verifyIsDlq(dlqQueue);

        // Wait until the individual DLQ reports every message as expired.
        final QueueViewMBean queueViewMBean = getProxyToQueue(dlqQueue.getQueueName());
        assertTrue("all dlq messages expired", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                LOG.info("Queue size:{}", queueViewMBean.getQueueSize());
                return queueViewMBean.getExpiredCount() == (long) DeadLetterExpiryTest.this.messageCount;
            }
        }));

        // Nothing may be left to consume from the individual DLQ.
        makeDlqConsumer();
        assertNull("no message available", dlqConsumer.receive(1000L));

        // The expired messages are expected to accumulate on the shared DLQ.
        final QueueViewMBean sharedDlqViewMBean = getProxyToQueue("ActiveMQ.DLQ");
        assertTrue("messages stay on shared dlq which has default expiration=0", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                LOG.info("Q {} size:{}", sharedDlqViewMBean.getName(), sharedDlqViewMBean.getQueueSize());
                return sharedDlqViewMBean.getQueueSize() == (long) DeadLetterExpiryTest.this.messageCount;
            }
        }));
    }

    /**
     * Sends to the composite destination {@code Comp.One,Comp.Two} twice with a
     * 2 second time to live and waits for {@code DLQ.auditConfigured} to hold
     * four messages (2 sends x 2 component queues, assuming
     * {@code sendMessages()} sends {@code messageCount} messages per call).
     *
     * @throws Exception on any JMS failure
     */
    public void testAuditConfigured() throws Exception {
        destination = new ActiveMQQueue("Comp.One,Comp.Two");
        connection.start();
        messageCount = 1;
        timeToLive = 2000L;
        // 1 == javax.jms.DeliveryMode.NON_PERSISTENT
        deliveryMode = 1;
        sendMessages();
        sendMessages();

        assertTrue("all messages expired even duplicates!", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                try {
                    QueueViewMBean queueViewMBean = DeadLetterExpiryTest.this.getProxyToQueue("DLQ.auditConfigured");
                    LOG.info("Queue {}, size:{}", queueViewMBean.getName(), queueViewMBean.getQueueSize());
                    return queueViewMBean.getQueueSize() == 4L;
                } catch (Exception exception) {
                    // Best effort: treat a failed lookup as "not satisfied yet" and
                    // let Wait retry (presumably the DLQ view is not registered
                    // until the first message arrives - TODO confirm).
                    LOG.debug("DLQ.auditConfigured not available yet", exception);
                    return false;
                }
            }
        }));
    }

    /**
     * Dead-letters two messages from {@code loop} into {@code DLQ.loop}, whose
     * own dead letter queue is again {@code DLQ.loop}, and verifies that both
     * messages expire and the queue ends up empty.
     *
     * @throws Exception on any JMS or JMX failure
     */
    public void testNoDLQLoop() throws Exception {
        destination = new ActiveMQQueue("loop");
        messageCount = 2;
        connection.start();

        ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
        rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
        LOG.info("Will redeliver messages: {} times", rollbackCount);

        makeConsumer();
        sendMessages();
        for (int i = 0; i < messageCount; ++i) {
            consumeAndRollback(i);
        }

        final QueueViewMBean queueViewMBean = getProxyToQueue("DLQ.loop");
        assertTrue("all dlq messages expired", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                LOG.info("Queue size:{}", queueViewMBean.getQueueSize());
                return queueViewMBean.getExpiredCount() == (long) DeadLetterExpiryTest.this.messageCount;
            }
        }));
        assertEquals("it should be empty", 0L, queueViewMBean.getQueueSize());
    }

    /**
     * Receives the message with the given index {@code rollbackCount} times,
     * checking it and rolling the session back after every delivery.
     *
     * @param messageCounter index of the message expected on each delivery
     * @throws Exception if no message arrives within 5 seconds or a JMS call fails
     */
    @Override
    protected void consumeAndRollback(int messageCounter) throws Exception {
        for (int i = 0; i < rollbackCount; ++i) {
            Message message = consumer.receive(5000L);
            assertNotNull("No message received for message: " + messageCounter + " and rollback loop: " + i, message);
            assertMessage(message, messageCounter);
            session.rollback();
        }
        LOG.info("Rolled back: {} times", rollbackCount);
    }

    /**
     * Selects transacted sessions, delivery mode {@code 2} and no time to live
     * before running the parent set-up.
     *
     * @throws Exception if the parent set-up fails
     */
    @Override
    protected void setUp() throws Exception {
        transactedMode = true;
        // 2 == javax.jms.DeliveryMode.PERSISTENT
        deliveryMode = 2;
        timeToLive = 0L;
        super.setUp();
    }

    /**
     * Creates the parent connection factory and installs a redelivery policy
     * with at most 3 redeliveries, a fixed 10 ms initial delay and no
     * exponential back-off.
     *
     * @return the connection factory with the redelivery policy applied
     * @throws Exception if the parent factory creation fails
     */
    @Override
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory answer = super.createConnectionFactory();
        RedeliveryPolicy policy = new RedeliveryPolicy();
        policy.setMaximumRedeliveries(3);
        policy.setBackOffMultiplier(1.0);
        policy.setInitialRedeliveryDelay(10L);
        policy.setUseExponentialBackOff(false);
        answer.setRedeliveryPolicy(policy);
        return answer;
    }
}

