/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.policy;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.QueueSubscriptionTest;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;

@RunWith(value=BlockJUnit4ClassRunner.class)
public class RoundRobinDispatchPolicyTest
extends QueueSubscriptionTest {
    @Override
    protected BrokerService createBroker() throws Exception {
        BrokerService broker = super.createBroker();
        PolicyEntry policy = new PolicyEntry();
        policy.setDispatchPolicy((DispatchPolicy)new RoundRobinDispatchPolicy());
        PolicyMap pMap = new PolicyMap();
        pMap.setDefaultEntry(policy);
        broker.setDestinationPolicy(pMap);
        return broker;
    }

    @Override
    @Test(timeout=60000L)
    public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
        super.testOneProducerTwoConsumersSmallMessagesOnePrefetch();
        this.assertEachConsumerReceivedAtLeastXMessages(1);
    }

    @Override
    @Test(timeout=60000L)
    public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
        super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
        this.assertMessagesDividedAmongConsumers();
    }

    @Override
    @Test(timeout=60000L)
    public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
        super.testOneProducerTwoConsumersLargeMessagesOnePrefetch();
        this.assertEachConsumerReceivedAtLeastXMessages(1);
    }

    @Override
    @Test(timeout=60000L)
    public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
        super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
        this.assertMessagesDividedAmongConsumers();
    }

    @Override
    @Test(timeout=60000L)
    public void testOneProducerManyConsumersFewMessages() throws Exception {
        super.testOneProducerManyConsumersFewMessages();
        this.assertMessagesDividedAmongConsumers();
    }

    @Override
    @Test(timeout=60000L)
    public void testOneProducerManyConsumersManyMessages() throws Exception {
        super.testOneProducerManyConsumersManyMessages();
        this.assertMessagesDividedAmongConsumers();
    }

    @Override
    @Test(timeout=60000L)
    public void testManyProducersManyConsumers() throws Exception {
        super.testManyProducersManyConsumers();
        this.assertMessagesDividedAmongConsumers();
    }

    @Test(timeout=60000L)
    public void testOneProducerTwoMatchingConsumersOneNotMatchingConsumer() throws Exception {
        this.createMessageConsumer(this.createConnectionFactory().createConnection(), (Destination)this.createDestination(), "JMSPriority<1");
        super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
        this.assertMessagesDividedAmongConsumers();
    }

    protected MessageConsumer createMessageConsumer(Connection conn, Destination dest, String selector) throws Exception {
        this.connections.add(conn);
        Session sess = conn.createSession(false, 1);
        MessageConsumer consumer = sess.createConsumer(dest, selector);
        conn.start();
        return consumer;
    }

    public void assertMessagesDividedAmongConsumers() {
        this.assertEachConsumerReceivedAtLeastXMessages(this.messageCount * this.producerCount / this.consumerCount);
        this.assertEachConsumerReceivedAtMostXMessages(this.messageCount * this.producerCount / this.consumerCount + 1);
    }
}

