/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Drives messages through repeated dead-letter round trips.
 *
 * <p>The single test case sends non-persistent messages to a topic that has a durable
 * subscriber, rolls back every delivery until the messages show up in the
 * {@code ActiveMQ.DLQ} queue, then moves them from the DLQ back to the original destination
 * through the DLQ's {@link QueueViewMBean} and repeats the cycle.
 *
 * <p>NOTE(review): the class name suggests this guards issue AMQ-3405; the issue text is not
 * visible from this file, so confirm the exact scenario against the tracker.
 */
public class AMQ3405Test extends TestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ3405Test.class);

    /** Numeric value of {@code jakarta.jms.DeliveryMode.NON_PERSISTENT}. */
    private static final int NON_PERSISTENT = 1;
    /** Numeric value of {@code jakarta.jms.DeliveryMode.PERSISTENT}. */
    private static final int PERSISTENT = 2;
    /** Number of messages sent by {@link #doTest()}. */
    private static final int MESSAGE_COUNT = 200;
    /** How many times the messages are dead-lettered and moved back again. */
    private static final int DLQ_ROUND_TRIPS = 2;
    /** Name of the queue the test expects rolled-back messages to land in. */
    private static final String DLQ_NAME = "ActiveMQ.DLQ";

    private Connection connection;
    private Session session;
    private MessageConsumer consumer;
    private MessageProducer producer;
    private int deliveryMode = PERSISTENT;
    private Destination dlqDestination;
    private MessageConsumer dlqConsumer;
    private BrokerService broker;
    private int messageCount;
    private Destination destination;
    private int rollbackCount;
    private Session dlqSession;
    /**
     * First failure seen on the listener thread; volatile because it is written by the
     * listener thread and read by the test thread.
     */
    private volatile Error listenerFailure;
    private boolean topic = true;
    private boolean durableSubscriber = true;

    /**
     * Runs the dead-letter round trip with non-persistent messages on a topic that is
     * consumed through a durable subscription.
     */
    public void testTransientTopicMessage() throws Exception {
        this.topic = true;
        this.deliveryMode = NON_PERSISTENT;
        this.durableSubscriber = true;
        doTest();
    }

    /**
     * Builds a non-persistent broker whose default destination policy lets the dead letter
     * strategy process non-persistent messages.
     *
     * @return the configured broker; it is not started here
     */
    protected BrokerService createBroker() throws Exception {
        BrokerService answer = new BrokerService();
        answer.setPersistent(false);

        PolicyEntry policy = new PolicyEntry();
        DeadLetterStrategy deadLetterStrategy = policy.getDeadLetterStrategy();
        if (deadLetterStrategy != null) {
            deadLetterStrategy.setProcessNonPersistent(true);
        }

        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policy);
        answer.setDestinationPolicy(policyMap);
        return answer;
    }

    /**
     * Sends {@value #MESSAGE_COUNT} messages, rolls every delivery back until the DLQ holds all
     * of them, moves them back to the original destination and repeats.
     *
     * <p>A failure recorded by the rollback listener is rethrown here so that the test reports
     * the real cause instead of only a DLQ wait timeout.
     */
    protected void doTest() throws Exception {
        this.messageCount = MESSAGE_COUNT;
        this.connection.start();
        QueueViewMBean dlqView = getProxyToDLQ();

        ActiveMQConnection amqConnection = (ActiveMQConnection) this.connection;
        this.rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
        LOG.info("Will redeliver messages: {} times", this.rollbackCount);

        makeConsumer();
        // The DLQ consumer is opened and closed again right away; presumably this only forces
        // the broker to create the DLQ before messages are sent - TODO confirm.
        makeDlqConsumer();
        this.dlqConsumer.close();
        sendMessages();

        int maxRollbacks = this.messageCount * this.rollbackCount;
        this.consumer.setMessageListener(new RollbackMessageListener(maxRollbacks, this.rollbackCount));

        for (int round = 0; round < DLQ_ROUND_TRIPS; round++) {
            boolean filled = waitForDlqSize(dlqView, this.messageCount);
            // Prefer the listener's own failure over the generic timeout message below.
            rethrowListenerFailure();
            assertTrue("DLQ was not filled as expected", filled);

            // Stop delivery while the messages are moved back to the original destination.
            this.connection.stop();
            assertEquals("DLQ should be full now.", (long) this.messageCount, dlqView.getQueueSize());

            Destination target = getDestination();
            String moveTo = this.topic
                    ? "topic://" + ((Topic) target).getTopicName()
                    : "queue://" + ((Queue) target).getQueueName();
            LOG.debug("Moving {} messages from ActiveMQ.DLQ to {}", this.messageCount, moveTo);
            dlqView.moveMatchingMessagesTo("", moveTo);

            assertTrue("DLQ was not emptied as expected", waitForDlqSize(dlqView, 0L));
            this.connection.start();
        }
    }

    /**
     * Creates the consumer on the test destination: a durable subscriber named after the
     * destination when {@code durableSubscriber} is set, a plain consumer otherwise.
     */
    protected void makeConsumer() throws JMSException {
        Destination target = getDestination();
        LOG.info("Consuming from: {}", target);
        if (this.durableSubscriber) {
            this.consumer = this.session.createDurableSubscriber((Topic) target, target.toString());
        } else {
            this.consumer = this.session.createConsumer(target);
        }
    }

    /** Creates a consumer on the dead letter destination using the non-transacted DLQ session. */
    protected void makeDlqConsumer() throws JMSException {
        this.dlqDestination = createDlqDestination();
        LOG.info("Consuming from dead letter on: {}", this.dlqDestination);
        this.dlqConsumer = this.dlqSession.createConsumer(this.dlqDestination);
    }

    /**
     * Starts the broker and opens one connection with a transacted session (rolled back by the
     * listener) and a non-transacted session for the DLQ consumer.
     */
    @Override
    protected void setUp() throws Exception {
        this.broker = createBroker();
        this.broker.start();
        this.broker.waitUntilStarted();

        this.connection = createConnection();
        this.connection.setClientID(createClientId());
        this.session = this.connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        this.connection.start();
        this.dlqSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * Releases every JMS resource that was actually created and always stops the broker.
     *
     * <p>Each resource is null-checked because the test can abort before it is created, and the
     * broker shutdown sits in a {@code finally} block so a failing close cannot leave it running.
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            if (this.dlqConsumer != null) {
                this.dlqConsumer.close();
            }
            if (this.dlqSession != null) {
                this.dlqSession.close();
            }
            if (this.session != null) {
                this.session.close();
            }
            // Closing the sessions alone leaves the connection (and the producer session
            // opened by sendMessages) open, so close the connection as well.
            if (this.connection != null) {
                this.connection.close();
            }
        } finally {
            if (this.broker != null) {
                this.broker.stop();
                this.broker.waitUntilStopped();
            }
        }
    }

    /**
     * Extends the inherited factory with a redelivery policy of three immediate redeliveries
     * (no delay, no back-off).
     */
    @Override
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory answer = super.createConnectionFactory();

        RedeliveryPolicy policy = new RedeliveryPolicy();
        policy.setMaximumRedeliveries(3);
        policy.setBackOffMultiplier(1.0);
        policy.setRedeliveryDelay(0L);
        policy.setInitialRedeliveryDelay(0L);
        policy.setUseExponentialBackOff(false);

        answer.setRedeliveryPolicy(policy);
        return answer;
    }

    /**
     * Sends {@code messageCount} text messages to the test destination using the configured
     * delivery mode. The producer session stays open until the connection is closed.
     */
    protected void sendMessages() throws JMSException {
        Session producerSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.producer = producerSession.createProducer(getDestination());
        this.producer.setDeliveryMode(this.deliveryMode);

        LOG.info("Sending {} messages to: {}", this.messageCount, getDestination());
        for (int i = 0; i < this.messageCount; i++) {
            this.producer.send(createMessage(producerSession, i));
        }
    }

    /**
     * Creates the text message for position {@code i}.
     *
     * @param session session used to create the message
     * @param i index of the message within the batch
     * @return a text message whose body is {@link #getMessageText(int)}
     */
    protected TextMessage createMessage(Session session, int i) throws JMSException {
        return session.createTextMessage(getMessageText(i));
    }

    /**
     * @param i index of the message within the batch
     * @return the body text for message {@code i}
     */
    protected String getMessageText(int i) {
        return "message: " + i;
    }

    /** @return the queue the test treats as the dead letter queue */
    protected Destination createDlqDestination() {
        return new ActiveMQQueue(DLQ_NAME);
    }

    /**
     * Looks up the DLQ's management bean through the broker's management context.
     *
     * @return a proxy for the {@code ActiveMQ.DLQ} queue view of broker {@code localhost}
     */
    private QueueViewMBean getProxyToDLQ() throws MalformedObjectNameException, JMSException {
        ObjectName dlqName = new ObjectName(
                "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
                        + DLQ_NAME);
        return (QueueViewMBean) this.broker.getManagementContext()
                .newProxyInstance(dlqName, QueueViewMBean.class, true);
    }

    /** @return the test destination, created on first use and cached afterwards */
    protected Destination getDestination() {
        if (this.destination == null) {
            this.destination = createDestination();
        }
        return this.destination;
    }

    /** @return the client id for the connection; this test's {@code toString()} value */
    protected String createClientId() {
        return toString();
    }

    /**
     * Waits, using the default timeout of {@link Wait#waitFor}, until the DLQ reports the
     * given size.
     *
     * @param dlqView management view of the DLQ
     * @param expectedSize queue size to wait for
     * @return {@code true} if the size was reached, {@code false} if the wait gave up
     */
    private static boolean waitForDlqSize(final QueueViewMBean dlqView, final long expectedSize)
            throws Exception {
        return Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return dlqView.getQueueSize() == expectedSize;
            }
        });
    }

    /**
     * Remembers the first failure raised on the listener thread so the test thread can report it.
     *
     * @param cause what went wrong inside the listener; non-{@link Error} causes are wrapped
     */
    private void recordListenerFailure(Throwable cause) {
        if (this.listenerFailure != null) {
            return;
        }
        this.listenerFailure = cause instanceof Error
                ? (Error) cause
                : new AssertionError("unexpected exception: " + cause, cause);
    }

    /** Rethrows, on the calling thread, the failure recorded by the listener (if any). */
    private void rethrowListenerFailure() {
        Error failure = this.listenerFailure;
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Listener that rolls back the enclosing test's transacted session for every message it
     * receives, so that each message is redelivered until the broker gives up on it.
     */
    class RollbackMessageListener
    implements MessageListener {
        final int maxRollbacks;
        final int deliveryCount;
        final AtomicInteger rollbacks = new AtomicInteger();

        /**
         * @param c total number of rollbacks expected over all messages
         * @param delvery number of deliveries expected per message
         */
        RollbackMessageListener(int c, int delvery) {
            this.maxRollbacks = c;
            this.deliveryCount = delvery;
        }

        /**
         * Counts the delivery and rolls the session back. Any failure is logged, recorded on
         * the enclosing test for the test thread to rethrow, and reported via {@code fail}.
         */
        @Override
        public void onMessage(Message message) {
            try {
                int expectedMessageId = this.rollbacks.get() / this.deliveryCount;
                LOG.info("expecting messageId: {}", expectedMessageId);
                this.rollbacks.incrementAndGet();
                AMQ3405Test.this.session.rollback();
            } catch (Throwable e) {
                LOG.error("unexpected exception:" + e, e);
                // fail() below throws on the listener thread, where JUnit cannot see it;
                // recording the failure lets doTest() surface it on the test thread.
                AMQ3405Test.this.recordListenerFailure(e);
                TestCase.fail("unexpected exception: " + e);
            }
        }
    }
}

