/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DurableSubsOfflineSelectorIndexUseTest
extends TestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(DurableSubsOfflineSelectorIndexUseTest.class);
    public int messageCount = 400;
    private BrokerService broker;
    private ActiveMQTopic topic;
    private List<Throwable> exceptions = new ArrayList<Throwable>();

    @Override
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + this.getName(true));
        connectionFactory.setWatchTopicAdvisories(false);
        return connectionFactory;
    }

    @Override
    protected Connection createConnection() throws Exception {
        return this.createConnection("id");
    }

    protected Connection createConnection(String name) throws Exception {
        Connection con = super.createConnection();
        con.setClientID(name);
        con.start();
        return con;
    }

    public static Test suite() {
        return DurableSubsOfflineSelectorIndexUseTest.suite(DurableSubsOfflineSelectorIndexUseTest.class);
    }

    protected void setUp() throws Exception {
        this.exceptions.clear();
        this.topic = (ActiveMQTopic)this.createDestination();
        this.createBroker();
        super.setUp();
    }

    protected void tearDown() throws Exception {
        super.tearDown();
        this.destroyBroker();
    }

    private void createBroker() throws Exception {
        this.createBroker(true);
    }

    private void createBroker(boolean deleteAllMessages) throws Exception {
        this.broker = BrokerFactory.createBroker((String)("broker:(vm://" + this.getName(true) + ")"));
        this.broker.setBrokerName(this.getName(true));
        this.broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        this.broker.getManagementContext().setCreateConnector(false);
        this.broker.setAdvisorySupport(false);
        this.broker.addConnector("tcp://0.0.0.0:0");
        this.setDefaultPersistenceAdapter(this.broker);
        this.broker.start();
    }

    private void destroyBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    public void initCombosForTestIndexPageUsage() {
        this.addCombinationValues("messageCount", new Integer[]{890, 900, 400});
    }

    public void testIndexPageUsage() throws Exception {
        Connection con = this.createConnection();
        Session session = con.createSession(false, 1);
        session.createDurableSubscriber((Topic)this.topic, "true", "filter = 'true'", true);
        session.close();
        session = con.createSession(false, 1);
        session.createDurableSubscriber((Topic)this.topic, "false", "filter = 'false'", true);
        session.close();
        con.close();
        final Connection sendCon = this.createConnection("send");
        final Session sendSession = sendCon.createSession(false, 1);
        final MessageProducer producer = sendSession.createProducer(null);
        Thread sendThread = new Thread(){

            @Override
            public void run() {
                try {
                    for (int i = 0; i < DurableSubsOfflineSelectorIndexUseTest.this.messageCount; ++i) {
                        boolean filter = i % 2 == 1;
                        Message message = sendSession.createMessage();
                        message.setStringProperty("filter", filter ? "true" : "false");
                        producer.send((Destination)DurableSubsOfflineSelectorIndexUseTest.this.topic, message);
                        if (i <= 0 || i % 1000 != 0) continue;
                        LOG.info("Sent:" + i);
                    }
                    sendSession.close();
                    sendCon.close();
                }
                catch (Exception e) {
                    DurableSubsOfflineSelectorIndexUseTest.this.exceptions.add(e);
                }
            }
        };
        sendThread.start();
        sendThread.join();
        TimeUnit.SECONDS.sleep(4L);
        con = this.createConnection();
        session = con.createSession(false, 1);
        TopicSubscriber consumerTrue = session.createDurableSubscriber((Topic)this.topic, "true", "filter = 'true'", true);
        Listener listenerT = new Listener();
        consumerTrue.setMessageListener((MessageListener)listenerT);
        this.waitFor(listenerT, this.messageCount / 2);
        TopicSubscriber consumerFalse = session.createDurableSubscriber((Topic)this.topic, "false", "filter = 'false'", true);
        Listener listenerF = new Listener();
        consumerFalse.setMessageListener((MessageListener)listenerF);
        this.waitFor(listenerF, this.messageCount / 2);
        DurableSubsOfflineSelectorIndexUseTest.assertEquals((int)(this.messageCount / 2), (int)listenerT.count);
        DurableSubsOfflineSelectorIndexUseTest.assertEquals((int)(this.messageCount / 2), (int)listenerF.count);
        consumerTrue.close();
        session.unsubscribe("true");
        consumerFalse.close();
        session.unsubscribe("false");
        session.close();
        con.close();
        PersistenceAdapter persistenceAdapter = this.broker.getPersistenceAdapter();
        if (persistenceAdapter instanceof KahaDBStore) {
            final KahaDBStore store = ((KahaDBPersistenceAdapter)persistenceAdapter).getStore();
            LOG.info("Store page count: " + store.getPageFile().getPageCount());
            LOG.info("Store free page count: " + store.getPageFile().getFreePageCount());
            LOG.info("Store page in-use: " + (store.getPageFile().getPageCount() - store.getPageFile().getFreePageCount()));
            DurableSubsOfflineSelectorIndexUseTest.assertTrue((String)"no leak of pages, always use just 10", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    return 10L == store.getPageFile().getPageCount() - store.getPageFile().getFreePageCount();
                }
            }, (long)TimeUnit.SECONDS.toMillis(10L)));
        }
    }

    private void waitFor(final Listener listener, final int count) throws Exception {
        DurableSubsOfflineSelectorIndexUseTest.assertTrue((String)"got all messages on time", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return listener.count == count;
            }
        }, (long)TimeUnit.MINUTES.toMillis(10L)));
    }

    public static class Listener
    implements MessageListener {
        int count = 0;
        String id = null;

        Listener() {
        }

        public void onMessage(Message message) {
            ++this.count;
            if (this.id != null) {
                try {
                    LOG.info(this.id + ", " + message.getJMSMessageID());
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        }
    }
}

