/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.DeliveryMode;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.Topic;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.usage.MemoryUsage;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ6387Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ6387Test.class);
    private final String QUEUE_NAME = "testQueue";
    private final String TOPIC_NAME = "testTopic";
    private final String SUBSCRIPTION_NAME = "subscriberId";
    private final String CLIENT_ID = "client1";
    private final int MSG_COUNT = 150;
    private ActiveMQConnectionFactory connectionFactory;
    private BrokerService brokerService;
    @Rule
    public TestName testName = new TestName();

    @Before
    public void setUp() throws Exception {
        LOG.info("=============== Starting test: {} ====================", (Object)this.testName.getMethodName());
        this.brokerService = new BrokerService();
        this.brokerService.setAdvisorySupport(false);
        this.brokerService.setPersistent(false);
        this.brokerService.setUseJmx(true);
        this.brokerService.setKeepDurableSubsActive(false);
        this.brokerService.start();
        this.connectionFactory = new ActiveMQConnectionFactory(this.brokerService.getVmConnectorURI());
    }

    @After
    public void tearDown() throws Exception {
        this.brokerService.stop();
        LOG.info("=============== Finished test: {} ====================", (Object)this.testName.getMethodName());
    }

    @Test
    public void testQueueMessagesKeptAfterDelivery() throws Exception {
        this.createDurableSubscription();
        Assert.assertEquals((long)0L, (long)this.brokerService.getAdminView().getDurableTopicSubscribers().length);
        Assert.assertEquals((long)1L, (long)this.brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
        this.sendBytesMessage(Queue.class);
        this.logBrokerMemoryUsage(Queue.class);
        Assert.assertEquals((long)0L, (long)this.brokerService.getAdminView().getQueueSubscribers().length);
        this.receiveMessages(Queue.class);
        Assert.assertEquals((long)0L, (long)this.brokerService.getAdminView().getQueueSubscribers().length);
        this.logBrokerMemoryUsage(Queue.class);
        Assert.assertEquals((long)0L, (long)this.getCurrentMemoryUsage(Queue.class));
    }

    @Test
    public void testQueueMessagesKeptAfterPurge() throws Exception {
        this.createDurableSubscription();
        Assert.assertEquals((long)0L, (long)this.brokerService.getAdminView().getDurableTopicSubscribers().length);
        Assert.assertEquals((long)1L, (long)this.brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
        this.sendBytesMessage(Queue.class);
        this.logBrokerMemoryUsage(Queue.class);
        Assert.assertEquals((long)0L, (long)this.brokerService.getAdminView().getQueueSubscribers().length);
        this.getProxyToQueue("testQueue").purge();
        Assert.assertEquals((long)0L, (long)this.brokerService.getAdminView().getQueueSubscribers().length);
        this.logBrokerMemoryUsage(Queue.class);
        Assert.assertEquals((long)0L, (long)this.getCurrentMemoryUsage(Queue.class));
    }

    @Test
    public void testDurableTopicSubscriptionMessagesKeptAfterDelivery() throws Exception {
        this.createDurableSubscription();
        Assert.assertEquals((long)0L, (long)this.brokerService.getAdminView().getDurableTopicSubscribers().length);
        Assert.assertEquals((long)1L, (long)this.brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
        this.sendBytesMessage(Topic.class);
        this.logBrokerMemoryUsage(Topic.class);
        Assert.assertEquals((long)0L, (long)this.brokerService.getAdminView().getDurableTopicSubscribers().length);
        Assert.assertEquals((long)1L, (long)this.brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
        this.receiveMessages(Topic.class);
        Assert.assertEquals((long)0L, (long)this.brokerService.getAdminView().getDurableTopicSubscribers().length);
        Assert.assertEquals((long)1L, (long)this.brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
        this.logBrokerMemoryUsage(Topic.class);
        Assert.assertEquals((long)0L, (long)this.getCurrentMemoryUsage(Topic.class));
    }

    @Test
    public void testDurableTopicSubscriptionMessagesKeptAfterUnsubscribe() throws Exception {
        this.createDurableSubscription();
        Assert.assertEquals((long)0L, (long)this.brokerService.getAdminView().getDurableTopicSubscribers().length);
        Assert.assertEquals((long)1L, (long)this.brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
        this.sendBytesMessage(Topic.class);
        this.logBrokerMemoryUsage(Topic.class);
        Assert.assertEquals((long)0L, (long)this.brokerService.getAdminView().getDurableTopicSubscribers().length);
        Assert.assertEquals((long)1L, (long)this.brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
        this.unsubscribeDurableSubscription();
        Assert.assertEquals((long)0L, (long)this.brokerService.getAdminView().getDurableTopicSubscribers().length);
        Assert.assertEquals((long)0L, (long)this.brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
        this.logBrokerMemoryUsage(Topic.class);
        Assert.assertEquals((long)0L, (long)this.getCurrentMemoryUsage(Topic.class));
    }

    private void createDurableSubscription() throws JMSException {
        Connection connection = this.connectionFactory.createConnection();
        connection.setClientID("client1");
        Session session = connection.createSession(false, 1);
        Topic topic = session.createTopic("testTopic");
        connection.start();
        session.createDurableSubscriber(topic, "subscriberId", null, false);
        LOG.info("Created durable subscription.");
        connection.stop();
        connection.close();
    }

    private void receiveMessages(Class<? extends Destination> destType) throws JMSException {
        Connection connection = this.connectionFactory.createConnection();
        connection.setClientID("client1");
        Session session = connection.createSession(false, 1);
        Object destination = destType.equals(Queue.class) ? session.createQueue("testQueue") : session.createTopic("testTopic");
        Object consumer = destType.equals(Queue.class) ? session.createConsumer((Destination)destination) : session.createDurableSubscriber((Topic)destination, "subscriberId", null, false);
        connection.start();
        for (int i = 0; i < 150; ++i) {
            Assert.assertNotNull((Object)consumer.receive(5000L));
        }
        connection.close();
    }

    private void sendBytesMessage(Class<? extends Destination> destType) throws JMSException {
        Connection connection = this.connectionFactory.createConnection();
        Session session = connection.createSession(false, 1);
        Object destination = destType.equals(Queue.class) ? session.createQueue("testQueue") : session.createTopic("testTopic");
        MessageProducer producer = session.createProducer((Destination)destination);
        BytesMessage bytesMessage = session.createBytesMessage();
        bytesMessage.writeBytes(new byte[0x100000]);
        producer.setDeliveryMode(2);
        for (int i = 0; i < 150; ++i) {
            producer.send((Message)bytesMessage);
        }
        connection.close();
    }

    private void unsubscribeDurableSubscription() throws JMSException {
        Connection connection = this.connectionFactory.createConnection();
        connection.setClientID("client1");
        Session session = connection.createSession(false, 1);
        session.unsubscribe("subscriberId");
        LOG.info("Unsubscribed durable subscription.");
        connection.stop();
        connection.close();
    }

    private long getCurrentMemoryUsage(Class<? extends Destination> destType) throws Exception {
        MemoryUsage usage = destType.equals(Queue.class) ? this.brokerService.getDestination(ActiveMQDestination.createDestination((String)"testQueue", (byte)1)).getMemoryUsage() : this.brokerService.getDestination(ActiveMQDestination.createDestination((String)"testTopic", (byte)2)).getMemoryUsage();
        return usage.getUsage();
    }

    private void logBrokerMemoryUsage(Class<? extends Destination> destType) throws Exception {
        LOG.info("Memory usage: broker={}% destination={}", (Object)this.brokerService.getAdminView().getMemoryPercentUsage(), (Object)this.getCurrentMemoryUsage(destType));
    }

    protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + name);
        QueueViewMBean proxy = (QueueViewMBean)this.brokerService.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
        return proxy;
    }
}

