/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ5712Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ5712Test.class);
    @Rule
    public TestName name = new TestName();
    private BrokerService brokerService;
    private Connection connection;

    @Before
    public void setUp() throws Exception {
        this.brokerService = this.createBroker();
        this.brokerService.start();
        this.brokerService.waitUntilStarted();
    }

    @After
    public void tearDown() throws Exception {
        if (this.connection != null) {
            try {
                this.connection.close();
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        if (this.brokerService != null) {
            this.brokerService.stop();
            this.brokerService.waitUntilStopped();
            this.brokerService = null;
        }
    }

    private Connection createConnection() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false");
        factory.setAlwaysSyncSend(true);
        return factory.createConnection();
    }

    @Test(timeout=120000L)
    public void test() throws Exception {
        this.connection = this.createConnection();
        this.connection.start();
        int MSG_COUNT = 100;
        final Session session = this.connection.createSession(false, 2);
        final Queue queue = session.createQueue(this.name.getMethodName());
        MessageProducer producer = session.createProducer((Destination)queue);
        producer.setDeliveryMode(1);
        final QueueViewMBean queueView = this.getProxyToQueue(this.name.getMethodName());
        byte[] payload = new byte[65535];
        Arrays.fill(payload, (byte)-1);
        final CountDownLatch done = new CountDownLatch(1);
        final AtomicInteger counter = new AtomicInteger();
        Thread purge = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    while (!done.await(5L, TimeUnit.SECONDS)) {
                        if (queueView.getBlockedSends() <= 0L || queueView.getQueueSize() <= 0L) continue;
                        long queueSize = queueView.getQueueSize();
                        LOG.info("Queue send blocked at {} messages", (Object)queueSize);
                        MessageConsumer consumer = session.createConsumer((Destination)queue);
                        int i = 0;
                        while ((long)i < queueSize) {
                            Message message = consumer.receive(60000L);
                            if (message != null) {
                                counter.incrementAndGet();
                                message.acknowledge();
                            } else {
                                LOG.warn("Got null message when none as expected.");
                            }
                            ++i;
                        }
                        consumer.close();
                    }
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        });
        purge.start();
        for (int i = 0; i < 100; ++i) {
            BytesMessage message = session.createBytesMessage();
            message.writeBytes(payload);
            producer.send((Message)message);
            LOG.info("sent message: {}", (Object)i);
        }
        done.countDown();
        purge.join(60000L);
        if (purge.isAlive()) {
            Assert.fail((String)"Consumer thread should have read initial batch and completed.");
        }
        Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return queueView.getDequeueCount() == (long)counter.get();
            }
        }));
        long remainingQueued = queueView.getQueueSize();
        LOG.info("Remaining messages to consume: {}", (Object)remainingQueued);
        Assert.assertEquals((long)remainingQueued, (long)(100 - counter.get()));
        MessageConsumer consumer = session.createConsumer((Destination)queue);
        for (int i = counter.get(); i < 100; ++i) {
            Message message = consumer.receive(5000L);
            Assert.assertNotNull((String)"Should not get null message", (Object)consumer);
            counter.incrementAndGet();
            message.acknowledge();
            LOG.info("Read message: {}", (Object)i);
        }
        Assert.assertEquals((String)"Should consume all messages", (long)100L, (long)counter.get());
    }

    protected BrokerService createBroker() throws Exception {
        BrokerService answer = new BrokerService();
        KahaDBStore persistence = this.createStore(true);
        persistence.setJournalMaxFileLength(0x100000);
        answer.setPersistent(true);
        answer.setPersistenceAdapter((PersistenceAdapter)persistence);
        answer.setDeleteAllMessagesOnStartup(true);
        answer.getSystemUsage().getMemoryUsage().setLimit(0x600000L);
        answer.getSystemUsage().getTempUsage().setLimit(0x500000L);
        answer.getSystemUsage().getStoreUsage().setLimit(0x500000L);
        answer.setUseJmx(true);
        answer.getManagementContext().setCreateConnector(false);
        answer.setSchedulerSupport(false);
        answer.setAdvisorySupport(false);
        PListStoreImpl tempStore = (PListStoreImpl)answer.getSystemUsage().getTempUsage().getStore();
        tempStore.setCleanupInterval(10000L);
        tempStore.setJournalMaxFileLength(0x200000);
        PolicyEntry policy = new PolicyEntry();
        policy.setProducerFlowControl(false);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policy);
        answer.setDestinationPolicy(policyMap);
        return answer;
    }

    private KahaDBStore createStore(boolean delete) throws IOException {
        KahaDBStore kaha = new KahaDBStore();
        kaha.setDirectory(new File("target/activemq-data/kahadb"));
        if (delete) {
            kaha.deleteAllMessages();
        }
        return kaha;
    }

    protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + name);
        QueueViewMBean proxy = (QueueViewMBean)this.brokerService.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
        return proxy;
    }
}

