/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.QueueConnection;
import jakarta.jms.QueueReceiver;
import jakarta.jms.QueueSession;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.IOException;
import java.lang.invoke.CallSite;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.TimeUtils;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ4485LowLimitTest
extends JmsMultipleBrokersTestSupport {
    static final String payload = new String(new byte[10240]);
    private static final Logger LOG = LoggerFactory.getLogger(AMQ4485LowLimitTest.class);
    final int portBase = 61600;
    int numBrokers = 8;
    final int numProducers = 30;
    final int numMessages = 1000;
    final int consumerSleepTime = 40;
    StringBuilder brokersUrl = new StringBuilder();
    HashMap<ActiveMQQueue, AtomicInteger> accumulators = new HashMap();
    private ArrayList<Throwable> exceptions = new ArrayList();

    protected void buildUrlList() throws Exception {
        for (int i = 0; i < this.numBrokers; ++i) {
            this.brokersUrl.append("tcp://localhost:" + (61600 + i));
            if (i == this.numBrokers - 1) continue;
            this.brokersUrl.append(',');
        }
    }

    protected BrokerService createBroker(int brokerid) throws Exception {
        return this.createBroker(brokerid, true);
    }

    protected BrokerService createBroker(int brokerid, boolean addToNetwork) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setPersistent(true);
        broker.setDeleteAllMessagesOnStartup(true);
        broker.getManagementContext().setCreateConnector(false);
        broker.setUseJmx(true);
        broker.setBrokerName("B" + brokerid);
        broker.addConnector(new URI("tcp://localhost:" + (61600 + brokerid)));
        if (addToNetwork) {
            this.addNetworkConnector(broker);
        }
        broker.setSchedulePeriodForDestinationPurge(0);
        broker.getSystemUsage().getMemoryUsage().setLimit(0x10000000L);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setExpireMessagesPeriod(0L);
        policyEntry.setQueuePrefetch(1000);
        policyEntry.setMemoryLimit(0x200000L);
        policyEntry.setProducerFlowControl(false);
        policyEntry.setEnableAudit(true);
        policyEntry.setUseCache(true);
        policyMap.put((ActiveMQDestination)new ActiveMQQueue("GW.>"), (Object)policyEntry);
        PolicyEntry inPolicyEntry = new PolicyEntry();
        inPolicyEntry.setExpireMessagesPeriod(0L);
        inPolicyEntry.setQueuePrefetch(1000);
        inPolicyEntry.setMemoryLimit(0x500000L);
        inPolicyEntry.setProducerFlowControl(true);
        inPolicyEntry.setEnableAudit(true);
        inPolicyEntry.setUseCache(true);
        policyMap.put((ActiveMQDestination)new ActiveMQQueue("IN"), (Object)inPolicyEntry);
        broker.setDestinationPolicy(policyMap);
        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
        kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true);
        this.brokers.put(broker.getBrokerName(), new JmsMultipleBrokersTestSupport.BrokerItem(broker));
        return broker;
    }

    private void addNetworkConnector(BrokerService broker) throws Exception {
        StringBuilder networkConnectorUrl = new StringBuilder("static:(").append(this.brokersUrl.toString());
        networkConnectorUrl.append(')');
        for (int i = 0; i < 2; ++i) {
            DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector(new URI(networkConnectorUrl.toString()));
            nc.setName("Bridge-" + i);
            nc.setNetworkTTL(1);
            nc.setDecreaseNetworkConsumerPriority(true);
            nc.setDynamicOnly(true);
            nc.setPrefetchSize(100);
            nc.setDynamicallyIncludedDestinations(Arrays.asList(new ActiveMQQueue("GW.*")));
            broker.addNetworkConnector((NetworkConnector)nc);
        }
    }

    public void x_testInterleavedSend() throws Exception {
        BrokerService b = this.createBroker(0, false);
        b.start();
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61600");
        connectionFactory.setWatchTopicAdvisories(false);
        QueueConnection c1 = connectionFactory.createQueueConnection();
        QueueConnection c2 = connectionFactory.createQueueConnection();
        QueueConnection c3 = connectionFactory.createQueueConnection();
        c1.start();
        c2.start();
        c3.start();
        ActiveMQQueue dest = new ActiveMQQueue("IN");
        QueueSession s1 = c1.createQueueSession(true, 0);
        TextMessage txMessage = s1.createTextMessage("TX");
        final TextMessage noTxMessage = s1.createTextMessage("NO_TX");
        MessageProducer txProducer = s1.createProducer((Destination)dest);
        final MessageProducer nonTxProducer = c2.createQueueSession(false, 1).createProducer((Destination)dest);
        txProducer.send((Message)txMessage);
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(new Runnable(){
            final /* synthetic */ Session val$s1;
            {
                this.val$s1 = session;
            }

            @Override
            public void run() {
                try {
                    this.val$s1.commit();
                }
                catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        executorService.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    nonTxProducer.send((Message)noTxMessage);
                }
                catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        executorService.shutdown();
        executorService.awaitTermination(10L, TimeUnit.MINUTES);
    }

    public void testBrokers() throws Exception {
        this.buildUrlList();
        for (int i = 0; i < this.numBrokers; ++i) {
            this.createBroker(i);
        }
        this.startAllBrokers();
        this.waitForBridgeFormation(this.numBrokers - 1);
        this.verifyPeerBrokerInfos(this.numBrokers - 1);
        final List<ConsumerState> consumerStates = this.startAllGWConsumers(this.numBrokers);
        this.startAllGWFanoutConsumers(this.numBrokers);
        LOG.info("Waiting for percolation of consumers..");
        TimeUnit.SECONDS.sleep(5L);
        LOG.info("Produce mesages..");
        long startTime = System.currentTimeMillis();
        this.produce(1000);
        AMQ4485LowLimitTest.assertTrue((String)"Got all sent", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                for (ConsumerState tally : consumerStates) {
                    int expected = 1000 * (tally.destination.isComposite() ? tally.destination.getCompositeDestinations().length : 1);
                    LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get());
                    if (tally.accumulator.get() != expected) {
                        LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get() + " != " + expected + ", " + tally.expected);
                        if (tally.accumulator.get() > expected - 50) {
                            AMQ4485LowLimitTest.this.dumpQueueStat(null);
                        }
                        if (tally.expected.size() == 1) {
                            AMQ4485LowLimitTest.this.startConsumer(tally.brokerName, tally.destination);
                        }
                        return false;
                    }
                    LOG.info("got tally on " + tally.brokerName);
                }
                return true;
            }
        }, (long)60000000L, (long)20000L));
        AMQ4485LowLimitTest.assertTrue((String)("No exceptions:" + this.exceptions), (boolean)this.exceptions.isEmpty());
        LOG.info("done");
        long duration = System.currentTimeMillis() - startTime;
        LOG.info("Duration:" + TimeUtils.printDuration((double)duration));
        AMQ4485LowLimitTest.assertEquals((String)"nothing in the dlq's", (long)0L, (long)this.dumpQueueStat((ActiveMQDestination)new ActiveMQQueue("ActiveMQ.DLQ")));
    }

    private void startConsumer(String brokerName, ActiveMQDestination destination) throws Exception {
        int id = Integer.parseInt(brokerName.substring(1));
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:" + (61600 + id));
        connectionFactory.setWatchTopicAdvisories(false);
        QueueConnection queueConnection = connectionFactory.createQueueConnection();
        queueConnection.start();
        queueConnection.createQueueSession(false, 1).createConsumer((Destination)destination);
        queueConnection.close();
    }

    private long dumpQueueStat(ActiveMQDestination destination) throws Exception {
        long sumTotal = 0L;
        Collection brokerList = this.brokers.values();
        Iterator i = brokerList.iterator();
        while (i.hasNext()) {
            BrokerService brokerService = ((JmsMultipleBrokersTestSupport.BrokerItem)i.next()).broker;
            for (ObjectName objectName : brokerService.getAdminView().getQueues()) {
                if (destination == null || !objectName.toString().contains(destination.getPhysicalName())) continue;
                QueueViewMBean qViewMBean = (QueueViewMBean)brokerService.getManagementContext().newProxyInstance(objectName, QueueViewMBean.class, false);
                LOG.info(brokerService.getBrokerName() + ", " + qViewMBean.getName() + ", Enqueue:" + qViewMBean.getEnqueueCount() + ", Size: " + qViewMBean.getQueueSize());
                sumTotal += qViewMBean.getQueueSize();
            }
        }
        return sumTotal;
    }

    private void startAllGWFanoutConsumers(int nBrokers) throws Exception {
        StringBuffer compositeDest = new StringBuffer();
        for (int k = 0; k < nBrokers; ++k) {
            compositeDest.append("GW." + k);
            if (k + 1 == nBrokers) continue;
            compositeDest.append(',');
        }
        ActiveMQQueue compositeQ = new ActiveMQQueue(compositeDest.toString());
        for (int id = 0; id < nBrokers; ++id) {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (61600 + id) + ")");
            connectionFactory.setWatchTopicAdvisories(false);
            QueueConnection queueConnection = connectionFactory.createQueueConnection();
            queueConnection.start();
            final QueueSession queueSession = queueConnection.createQueueSession(true, 0);
            final MessageProducer producer = queueSession.createProducer((Destination)compositeQ);
            queueSession.createReceiver((Queue)new ActiveMQQueue("IN")).setMessageListener(new MessageListener(){

                public void onMessage(Message message) {
                    try {
                        producer.send(message);
                        queueSession.commit();
                    }
                    catch (Exception e) {
                        LOG.error("Failed to fanout to GW: " + message, (Throwable)e);
                    }
                }
            });
        }
    }

    private List<ConsumerState> startAllGWConsumers(int nBrokers) throws Exception {
        LinkedList<ConsumerState> consumerStates = new LinkedList<ConsumerState>();
        for (int id = 0; id < nBrokers; ++id) {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (61600 + id) + ")");
            connectionFactory.setWatchTopicAdvisories(false);
            QueueConnection queueConnection = connectionFactory.createQueueConnection();
            queueConnection.start();
            QueueSession queueSession = queueConnection.createQueueSession(false, 1);
            ActiveMQQueue destination = new ActiveMQQueue("GW." + id);
            QueueReceiver queueReceiver = queueSession.createReceiver((Queue)destination);
            final ConsumerState consumerState = new ConsumerState();
            consumerState.brokerName = ((ActiveMQConnection)queueConnection).getBrokerName();
            consumerState.receiver = queueReceiver;
            consumerState.destination = destination;
            for (int j = 0; j < 1000 * (consumerState.destination.isComposite() ? consumerState.destination.getCompositeDestinations().length : 1); ++j) {
                consumerState.expected.add(j);
            }
            if (!this.accumulators.containsKey(destination)) {
                this.accumulators.put(destination, new AtomicInteger(0));
            }
            consumerState.accumulator = this.accumulators.get(destination);
            queueReceiver.setMessageListener(new MessageListener(){

                public void onMessage(Message message) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(40L);
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    try {
                        consumerState.accumulator.incrementAndGet();
                        try {
                            consumerState.expected.remove(((ActiveMQMessage)message).getProperty("NUM"));
                        }
                        catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                    catch (Exception e) {
                        LOG.error("Failed to commit slow receipt of " + message, (Throwable)e);
                    }
                }
            });
            consumerStates.add(consumerState);
        }
        return consumerStates;
    }

    private void produce(final int numMessages) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(30);
        final AtomicInteger toSend = new AtomicInteger(numMessages);
        for (int i = 1; i <= 30; ++i) {
            final int id = i % this.numBrokers;
            executorService.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (61600 + id) + ")");
                        connectionFactory.setWatchTopicAdvisories(false);
                        QueueConnection queueConnection = connectionFactory.createQueueConnection();
                        queueConnection.start();
                        QueueSession queueSession = queueConnection.createQueueSession(false, 1);
                        MessageProducer producer = queueSession.createProducer(null);
                        int val = 0;
                        while ((val = toSend.decrementAndGet()) >= 0) {
                            int id2 = numMessages - val - 1;
                            ActiveMQQueue compositeQ = new ActiveMQQueue("IN");
                            TextMessage textMessage = queueSession.createTextMessage(((ActiveMQConnection)queueConnection).getBrokerName() + "->" + id2 + " payload:" + payload);
                            textMessage.setIntProperty("NUM", id2);
                            producer.send((Destination)compositeQ, (Message)textMessage);
                        }
                        queueConnection.close();
                    }
                    catch (Throwable throwable) {
                        throwable.printStackTrace();
                        AMQ4485LowLimitTest.this.exceptions.add(throwable);
                    }
                }
            });
        }
    }

    private void verifyPeerBrokerInfo(JmsMultipleBrokersTestSupport.BrokerItem brokerItem, final int max) throws Exception {
        final BrokerService broker = brokerItem.broker;
        final RegionBroker regionBroker = (RegionBroker)broker.getRegionBroker();
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length);
                return max == regionBroker.getPeerBrokerInfos().length;
            }
        });
        LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length);
        ArrayList<CallSite> missing = new ArrayList<CallSite>();
        for (int i = 0; i < max; ++i) {
            missing.add((CallSite)((Object)("B" + i)));
        }
        if (max != regionBroker.getPeerBrokerInfos().length) {
            for (BrokerInfo info : regionBroker.getPeerBrokerInfos()) {
                LOG.info(info.getBrokerName());
                missing.remove(info.getBrokerName());
            }
            LOG.info("Broker infos off.." + missing);
        }
        AMQ4485LowLimitTest.assertEquals((String)broker.getBrokerName(), (int)max, (int)regionBroker.getPeerBrokerInfos().length);
    }

    private void verifyPeerBrokerInfos(int max) throws Exception {
        Collection brokerList = this.brokers.values();
        Iterator i = brokerList.iterator();
        while (i.hasNext()) {
            this.verifyPeerBrokerInfo((JmsMultipleBrokersTestSupport.BrokerItem)i.next(), max);
        }
    }

    @Override
    protected void tearDown() throws Exception {
        super.tearDown();
    }

    class ConsumerState {
        AtomicInteger accumulator;
        String brokerName;
        QueueReceiver receiver;
        ActiveMQDestination destination;
        ConcurrentLinkedQueue<Integer> expected = new ConcurrentLinkedQueue();

        ConsumerState() {
        }
    }
}

