/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.File;
import java.io.IOException;
import java.util.Set;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.usage.SystemUsage;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedeliveryRestartWithExceptionTest
extends TestSupport {
    // Class-wide SLF4J logger; the tests use it to trace every message they receive.
    private static final transient Logger LOG = LoggerFactory.getLogger(RedeliveryRestartWithExceptionTest.class);
    // Client connection of the currently running test; tearDown() closes it when it is non-null.
    ActiveMQConnection connection;
    // Broker under test; created in setUp() and replaced by restartBroker().
    BrokerService broker = null;
    // Name of the queue that every test produces to and consumes from.
    String queueName = "redeliveryRestartQ";

    /**
     * Starts a fresh broker for each test. The broker is configured with the
     * update-failure injection switched on and is told to delete all stored
     * messages on start-up.
     *
     * @throws Exception if the parent set-up or the broker start-up fails
     */
    @Before
    public void setUp() throws Exception {
        super.setUp();
        // The field is assigned before anything else is done with the broker,
        // exactly as before, so tearDown() sees it even if a later step throws.
        broker = new BrokerService();
        configureBroker(broker, true);
        broker.setDeleteAllMessagesOnStartup(true);
        broker.start();
    }

    /**
     * Releases the client connection and the broker after each test.
     *
     * <p>Each clean-up step runs in its own {@code finally} so that a failure while
     * closing the connection cannot leave the broker running, and a failure while
     * stopping the broker cannot skip the parent tear-down. The broker is null-checked
     * because {@link #setUp()} may have failed before assigning it.
     *
     * @throws Exception the first failure raised by any clean-up step
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (connection != null) {
                connection.close();
            }
        } finally {
            try {
                if (broker != null) {
                    broker.stop();
                    // Same stop-then-wait sequence restartBroker() uses before replacing the broker.
                    broker.waitUntilStopped();
                }
            } finally {
                super.tearDown();
            }
        }
    }

    /**
     * Applies the configuration shared by the initial and the restarted broker:
     * a default destination policy with {@code persistJMSRedelivered} enabled, the
     * KahaDB-backed persistence adapter that can inject update failures, and a TCP
     * connector on an ephemeral port.
     *
     * @param broker                 the broker to configure (not yet started)
     * @param throwExceptionOnUpdate whether the message stores created by the adapter
     *                               should simulate a failure in {@code updateMessage}
     * @throws Exception if the persistence adapter or the connector cannot be installed
     */
    protected void configureBroker(BrokerService broker, boolean throwExceptionOnUpdate) throws Exception {
        PolicyMap destinationPolicies = new PolicyMap();
        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setPersistJMSRedelivered(true);
        destinationPolicies.setDefaultEntry(defaultPolicy);
        broker.setDestinationPolicy(destinationPolicies);

        PersistenceAdapter store = new KahaDBWithUpdateExceptionPersistenceAdapter(throwExceptionOnUpdate);
        broker.setPersistenceAdapter(store);

        // Port 0 lets the OS pick a free port; tests read the real address back from the connector.
        broker.addConnector("tcp://0.0.0.0:0");
    }

    /**
     * Consumes persistent messages without acknowledging them until the store's
     * simulated update failure surfaces on the client, restarts the broker with the
     * failure injection disabled, and then checks that the first four messages come
     * back flagged as redelivered while the remaining six are first deliveries.
     */
    @Test
    public void testValidateRedeliveryFlagAfterRestart() throws Exception {
        TransportConnector connector = broker.getTransportConnectors().get(0);
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(connector.getPublishableConnectString() + "?jms.prefetchPolicy.all=0");
        connection = (ActiveMQConnection) connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Queue destination = session.createQueue(queueName);
        populateDestination(10, destination, connection, true);

        MessageConsumer consumer = session.createConsumer(destination);
        Exception expectedException = null;
        try {
            // The queue store proxy lets four updates through and fails the next one,
            // so the fifth receive is expected to throw instead of returning.
            for (int attempt = 0; attempt < 5; attempt++) {
                TextMessage received = (TextMessage) consumer.receive(5000L);
                LOG.info("not redelivered? got: " + received);
                assertNotNull("got the message", received);
                assertTrue("Should not receive the 5th message", attempt < 4);
            }
        } catch (Exception e) {
            LOG.info("Got expected:", e);
            expectedException = e;
        }
        assertNotNull("Expecting an exception when updateMessage fails", expectedException);

        consumer.close();
        safeCloseConnection(connection);

        restartBroker();

        // The restarted broker listens on a new ephemeral port, so the URL is rebuilt.
        connector = broker.getTransportConnectors().get(0);
        connectionFactory =
                new ActiveMQConnectionFactory(connector.getPublishableConnectString() + "?jms.prefetchPolicy.all=0");
        connection = (ActiveMQConnection) connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        destination = session.createQueue(queueName);
        consumer = session.createConsumer(destination);

        // Messages whose redelivery state was written before the failure.
        for (int n = 0; n < 4; n++) {
            TextMessage again = (TextMessage) consumer.receive(4000L);
            LOG.info("redelivered? got: " + again);
            assertNotNull("got the message again", again);
            assertEquals("re delivery flag", true, again.getJMSRedelivered());
            assertTrue("redelivery count survives restart", again.getLongProperty("JMSXDeliveryCount") > 1L);
            again.acknowledge();
        }

        // The rest of the ten messages must look like first deliveries.
        for (int n = 0; n < 6; n++) {
            TextMessage fresh = (TextMessage) consumer.receive(4000L);
            LOG.info("not redelivered? got: " + fresh);
            assertNotNull("got the message", fresh);
            assertEquals("not a redelivery", false, fresh.getJMSRedelivered());
            assertEquals("first delivery", 1L, fresh.getLongProperty("JMSXDeliveryCount"));
            fresh.acknowledge();
        }
        connection.close();
    }

    /**
     * Same scenario as the restart test but without restarting the broker: after the
     * simulated update failure reaches the client, the connection is closed and a new
     * one is opened against the same broker. The first four messages must then be
     * flagged as redelivered and the remaining six must be first deliveries.
     */
    @Test
    public void testValidateRedeliveryFlagAfterTransientFailureConnectionDrop() throws Exception {
        TransportConnector connector = broker.getTransportConnectors().get(0);
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(connector.getPublishableConnectString() + "?jms.prefetchPolicy.all=0");
        connection = (ActiveMQConnection) connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Queue destination = session.createQueue(queueName);
        populateDestination(10, destination, connection, true);

        MessageConsumer consumer = session.createConsumer(destination);
        Exception expectedException = null;
        try {
            // Four updates succeed; the fifth hits the simulated store failure.
            for (int attempt = 0; attempt < 5; attempt++) {
                TextMessage received = (TextMessage) consumer.receive(5000L);
                LOG.info("not redelivered? got: " + received);
                assertNotNull("got the message", received);
                assertTrue("Should not receive the 5th message", attempt < 4);
            }
        } catch (Exception e) {
            LOG.info("Got expected:", e);
            expectedException = e;
        }
        assertNotNull("Expecting an exception when updateMessage fails", expectedException);

        consumer.close();
        safeCloseConnection(connection);

        // Reconnect to the same broker; the queue store proxy only fails once,
        // so updates go through from here on.
        connection = (ActiveMQConnection) connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        destination = session.createQueue(queueName);
        consumer = session.createConsumer(destination);

        for (int n = 0; n < 4; n++) {
            TextMessage again = (TextMessage) consumer.receive(4000L);
            LOG.info("redelivered? got: " + again);
            assertNotNull("got the message again", again);
            assertEquals("re delivery flag on:" + n, true, again.getJMSRedelivered());
            assertTrue("redelivery count survives reconnect for:" + n, again.getLongProperty("JMSXDeliveryCount") > 1L);
            again.acknowledge();
        }

        for (int n = 0; n < 6; n++) {
            TextMessage fresh = (TextMessage) consumer.receive(4000L);
            LOG.info("not redelivered? got: " + fresh);
            assertNotNull("got the message", fresh);
            assertEquals("not a redelivery", false, fresh.getJMSRedelivered());
            assertEquals("first delivery", 1L, fresh.getLongProperty("JMSXDeliveryCount"));
            fresh.acknowledge();
        }
        connection.close();
    }

    /**
     * Non-persistent variant: receives five of ten non-persistent messages without
     * acknowledging them, simulates a transport failure on the client connection,
     * reconnects, and expects those five to be flagged as redelivered while the
     * other five are first deliveries.
     */
    @Test
    public void testValidateRedeliveryFlagOnNonPersistentAfterTransientFailureConnectionDrop() throws Exception {
        TransportConnector connector = broker.getTransportConnectors().get(0);
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(connector.getPublishableConnectString() + "?jms.prefetchPolicy.all=0");
        connection = (ActiveMQConnection) connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Queue destination = session.createQueue(queueName);
        populateDestination(10, destination, connection, false);

        MessageConsumer consumer = session.createConsumer(destination);
        // No exception is expected in this phase, unlike the persistent tests.
        // NOTE(review): presumably the failing store is not consulted for
        // non-persistent messages; confirm against the broker implementation.
        for (int n = 0; n < 5; n++) {
            TextMessage received = (TextMessage) consumer.receive(5000L);
            assertNotNull("got the message", received);
            assertFalse("not redelivered", received.getJMSRedelivered());
        }

        // Drop the connection abruptly by reporting an I/O error to the transport listener.
        TcpTransport tcpTransport = (TcpTransport) connection.getTransport().narrow(TcpTransport.class);
        tcpTransport.getTransportListener().onException(new IOException("Die"));

        connection = (ActiveMQConnection) connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        destination = session.createQueue(queueName);
        consumer = session.createConsumer(destination);

        for (int n = 0; n < 5; n++) {
            TextMessage again = (TextMessage) consumer.receive(4000L);
            LOG.info("redelivered? got: " + again);
            assertNotNull("got the message again", again);
            assertEquals("redelivery flag set on:" + n, true, again.getJMSRedelivered());
            assertTrue("redelivery count survives reconnect for:" + n, again.getLongProperty("JMSXDeliveryCount") > 1L);
            again.acknowledge();
        }

        for (int n = 0; n < 5; n++) {
            TextMessage fresh = (TextMessage) consumer.receive(4000L);
            LOG.info("not redelivered? got: " + fresh);
            assertNotNull("got the message", fresh);
            assertEquals("not a redelivery", false, fresh.getJMSRedelivered());
            assertEquals("first delivery", 1L, fresh.getLongProperty("JMSXDeliveryCount"));
            fresh.acknowledge();
        }
        connection.close();
    }

    /**
     * Stops the current broker, waits until it has fully stopped, and then starts
     * the replacement broker returned by {@link #createRestartedBroker()}.
     *
     * @throws Exception if stopping the old broker or starting the new one fails
     */
    private void restartBroker() throws Exception {
        BrokerService running = broker;
        running.stop();
        running.waitUntilStopped();

        broker = createRestartedBroker();
        broker.start();
    }

    /**
     * Builds the broker used after a restart: same configuration as the initial
     * broker but with the simulated update failure disabled. Unlike the broker
     * created in {@code setUp()}, it is not told to delete messages on start-up.
     *
     * <p>The method no longer assigns the {@code broker} field itself. The caller
     * stores the returned instance, so the field is only replaced once
     * configuration has succeeded.
     *
     * @return a configured, not yet started broker
     * @throws Exception if the broker cannot be configured
     */
    private BrokerService createRestartedBroker() throws Exception {
        BrokerService restarted = new BrokerService();
        configureBroker(restarted, false);
        return restarted;
    }

    /**
     * Sends {@code nbMessages} text messages of the form {@code <hello id='N'/>}
     * (N counting from 1) to the given destination.
     *
     * <p>The session and producer are opened in a try-with-resources statement so
     * that both are closed, producer first, even when a send fails.
     *
     * @param nbMessages  number of messages to send; nothing is sent when it is zero or negative
     * @param destination destination to send to
     * @param connection  open connection used to create the temporary session
     * @param persistent  {@code true} for persistent delivery, {@code false} for non-persistent
     * @throws JMSException if creating the session or producer, sending, or closing fails
     */
    private void populateDestination(int nbMessages, Destination destination, Connection connection, boolean persistent) throws JMSException {
        int deliveryMode = persistent
                ? jakarta.jms.DeliveryMode.PERSISTENT
                : jakarta.jms.DeliveryMode.NON_PERSISTENT;
        try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             MessageProducer producer = session.createProducer(destination)) {
            producer.setDeliveryMode(deliveryMode);
            for (int i = 1; i <= nbMessages; i++) {
                producer.send(session.createTextMessage("<hello id='" + i + "'/>"));
            }
        }
    }

    private class KahaDBWithUpdateExceptionPersistenceAdapter
    implements PersistenceAdapter {
        private KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter();
        private boolean throwExceptionOnUpdate;

        public KahaDBWithUpdateExceptionPersistenceAdapter(boolean throwExceptionOnUpdate) {
            this.throwExceptionOnUpdate = throwExceptionOnUpdate;
        }

        public void start() throws Exception {
            this.kahaDB.start();
        }

        public void stop() throws Exception {
            this.kahaDB.stop();
        }

        public Set<ActiveMQDestination> getDestinations() {
            return this.kahaDB.getDestinations();
        }

        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
            ProxyMessageStoreWithUpdateException proxyMessageStoreWithException = new ProxyMessageStoreWithUpdateException(this.kahaDB.createQueueMessageStore(destination), this.throwExceptionOnUpdate);
            return proxyMessageStoreWithException;
        }

        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
            ProxyTopicMessageStoreWithUpdateException proxyMessageStoreWithException = new ProxyTopicMessageStoreWithUpdateException(this.kahaDB.createTopicMessageStore(destination), this.throwExceptionOnUpdate);
            return proxyMessageStoreWithException;
        }

        public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
            return this.kahaDB.createJobSchedulerStore();
        }

        public void removeQueueMessageStore(ActiveMQQueue destination) {
            this.kahaDB.removeQueueMessageStore(destination);
        }

        public void removeTopicMessageStore(ActiveMQTopic destination) {
            this.kahaDB.removeTopicMessageStore(destination);
        }

        public TransactionStore createTransactionStore() throws IOException {
            return this.kahaDB.createTransactionStore();
        }

        public void beginTransaction(ConnectionContext context) throws IOException {
            this.kahaDB.beginTransaction(context);
        }

        public void commitTransaction(ConnectionContext context) throws IOException {
            this.kahaDB.commitTransaction(context);
        }

        public void rollbackTransaction(ConnectionContext context) throws IOException {
            this.kahaDB.rollbackTransaction(context);
        }

        public long getLastMessageBrokerSequenceId() throws IOException {
            return this.kahaDB.getLastMessageBrokerSequenceId();
        }

        public void deleteAllMessages() throws IOException {
            this.kahaDB.deleteAllMessages();
        }

        public void setUsageManager(SystemUsage usageManager) {
            this.kahaDB.setUsageManager(usageManager);
        }

        public void setBrokerName(String brokerName) {
            this.kahaDB.setBrokerName(brokerName);
        }

        public void setDirectory(File dir) {
            this.kahaDB.setDirectory(dir);
        }

        public File getDirectory() {
            return this.kahaDB.getDirectory();
        }

        public void checkpoint(boolean sync) throws IOException {
            this.kahaDB.checkpoint(sync);
        }

        public long size() {
            return this.kahaDB.size();
        }

        public long getLastProducerSequenceId(ProducerId id) throws IOException {
            return this.kahaDB.getLastProducerSequenceId(id);
        }

        public void allowIOResumption() {
            this.kahaDB.allowIOResumption();
        }
    }

    private class ProxyTopicMessageStoreWithUpdateException
    extends ProxyTopicMessageStore {
        private boolean throwExceptionOnUpdate;
        private int numBeforeException;

        public ProxyTopicMessageStoreWithUpdateException(TopicMessageStore delegate, boolean throwExceptionOnUpdate) {
            super(delegate);
            this.numBeforeException = 4;
            this.throwExceptionOnUpdate = throwExceptionOnUpdate;
        }

        /*
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        public void updateMessage(org.apache.activemq.command.Message message) throws IOException {
            if (this.throwExceptionOnUpdate) {
                if (this.numBeforeException <= 0) throw new IOException("Hit our simulated exception writing the update to disk");
                --this.numBeforeException;
                super.updateMessage(message);
                return;
            } else {
                super.updateMessage(message);
            }
        }
    }

    private class ProxyMessageStoreWithUpdateException
    extends ProxyMessageStore {
        private boolean throwExceptionOnUpdate;
        private int numBeforeException;

        public ProxyMessageStoreWithUpdateException(MessageStore delegate, boolean throwExceptionOnUpdate) {
            super(delegate);
            this.numBeforeException = 4;
            this.throwExceptionOnUpdate = throwExceptionOnUpdate;
        }

        /*
         * Enabled aggressive block sorting
         */
        public void updateMessage(org.apache.activemq.command.Message message) throws IOException {
            if (!this.throwExceptionOnUpdate) {
                super.updateMessage(message);
                return;
            }
            if (this.numBeforeException > 0) {
                --this.numBeforeException;
                super.updateMessage(message);
                return;
            }
            this.throwExceptionOnUpdate = false;
            throw new IOException("Hit our simulated exception writing the update to disk");
        }
    }
}

