/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DestinationView;
import org.apache.activemq.broker.jmx.QueueView;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PfcTimeoutTest {
    private static final Logger LOG = LoggerFactory.getLogger(PfcTimeoutTest.class);
    private static final String TRANSPORT_URL = "tcp://0.0.0.0:0";
    private static final String DESTINATION = "testQ1";

    protected BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setAdvisorySupport(false);
        PolicyMap policyMap = new PolicyMap();
        ArrayList<PolicyEntry> entries = new ArrayList<PolicyEntry>();
        PolicyEntry pe = new PolicyEntry();
        pe.setProducerFlowControl(true);
        pe.setMemoryLimit(10240L);
        pe.setCursorMemoryHighWaterMark(140);
        pe.setExpireMessagesPeriod(0L);
        pe.setQueue(">");
        entries.add(pe);
        policyMap.setPolicyEntries(entries);
        broker.setDestinationPolicy(policyMap);
        broker.addConnector(TRANSPORT_URL);
        broker.start();
        return broker;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTransactedSendWithTimeout() throws Exception {
        BrokerService broker = this.createBroker();
        broker.waitUntilStarted();
        CountDownLatch gotTimeoutException = new CountDownLatch(1);
        try {
            int sendTimeout = 5000;
            this.sendMessages(broker, gotTimeoutException, sendTimeout, 3);
            Assert.assertTrue((boolean)gotTimeoutException.await(sendTimeout * 2, TimeUnit.MILLISECONDS));
        }
        finally {
            broker.stop();
            broker.waitUntilStopped();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTransactedSendWithTimeoutRollbackUsage() throws Exception {
        BrokerService broker = this.createBroker();
        broker.waitUntilStarted();
        CountDownLatch gotTimeoutException = new CountDownLatch(1);
        try {
            int sendTimeout = 5000;
            int numberOfMessageSent = this.sendMessages(broker, gotTimeoutException, sendTimeout, 3);
            Assert.assertTrue((boolean)gotTimeoutException.await(sendTimeout * 2, TimeUnit.MILLISECONDS));
            this.consumeMessages(broker, numberOfMessageSent);
            QueueView queueView = this.getQueueView(broker, DESTINATION);
            long queueSize = queueView.getQueueSize();
            long memoryUsage = queueView.getCursorMemoryUsage();
            LOG.info("queueSize after test = " + queueSize);
            LOG.info("memoryUsage after test = " + memoryUsage);
            Assert.assertEquals((String)"queue size after test ", (long)0L, (long)queueSize);
            Assert.assertEquals((String)"memory size after test ", (long)0L, (long)memoryUsage);
        }
        finally {
            broker.stop();
            broker.waitUntilStopped();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int sendMessages(BrokerService broker, CountDownLatch gotTimeoutException, int sendTimeeOut, int messageCount) throws Exception {
        int numberOfMessageSent = 0;
        ActiveMQConnectionFactory connectionFactory = this.newConnectionFactory(broker);
        connectionFactory.setSendTimeout(sendTimeeOut);
        ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection();
        connection.start();
        Session producerSession = connection.createSession(true, 0);
        try {
            MessageProducer jmsProducer = producerSession.createProducer((Destination)producerSession.createQueue(DESTINATION));
            TextMessage sendMessage = producerSession.createTextMessage(this.createTextMessage(5000));
            for (int i = 0; i < messageCount; ++i) {
                jmsProducer.send((Message)sendMessage);
                producerSession.commit();
                ++numberOfMessageSent;
            }
            LOG.info(" Finished after producing : " + numberOfMessageSent);
            int n = numberOfMessageSent;
            return n;
        }
        catch (Exception ex) {
            LOG.info("Exception received producing ", (Throwable)ex);
            LOG.info("finishing after exception :" + numberOfMessageSent);
            LOG.info("rolling back current transaction ");
            gotTimeoutException.countDown();
            producerSession.rollback();
            int n = numberOfMessageSent;
            return n;
        }
        finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    private String createTextMessage(int size) {
        StringBuffer buffer = new StringBuffer();
        for (int i = 0; i < size; ++i) {
            buffer.append("9");
        }
        return buffer.toString();
    }

    private ActiveMQConnectionFactory newConnectionFactory(BrokerService broker) throws Exception {
        ActiveMQConnectionFactory result = new ActiveMQConnectionFactory("admin", "admin", broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
        result.setWatchTopicAdvisories(false);
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int consumeMessages(BrokerService broker, int messageCount) throws Exception {
        int numberOfMessageConsumed = 0;
        ActiveMQConnectionFactory connectionFactory = this.newConnectionFactory(broker);
        ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection();
        connection.start();
        Session consumerSession = connection.createSession(false, 1);
        try {
            MessageConsumer jmsConsumer = consumerSession.createConsumer((Destination)consumerSession.createQueue(DESTINATION));
            for (int i = 0; i < messageCount; ++i) {
                jmsConsumer.receive(1000L);
                ++numberOfMessageConsumed;
            }
            LOG.info(" Finished after consuming  : " + numberOfMessageConsumed);
            int n = numberOfMessageConsumed;
            return n;
        }
        catch (Exception ex) {
            LOG.info("Exception received producing ", (Throwable)ex);
            LOG.info("finishing after exception :" + numberOfMessageConsumed);
            int n = numberOfMessageConsumed;
            return n;
        }
        finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    private QueueView getQueueView(BrokerService broker, String queueName) throws Exception {
        Map queueViews = broker.getAdminView().getBroker().getQueueViews();
        for (ObjectName key : queueViews.keySet()) {
            QueueView queueView;
            DestinationView destinationView = (DestinationView)queueViews.get(key);
            if (!(destinationView instanceof QueueView) || !(queueView = (QueueView)destinationView).getName().equals(queueName)) continue;
            return queueView;
        }
        return null;
    }
}

