/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.IndirectMessageReference;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.SubscriptionStatistics;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueueDuplicatesFromStoreTest
extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(QueueDuplicatesFromStoreTest.class);
    ActiveMQQueue destination = new ActiveMQQueue("queue-" + QueueDuplicatesFromStoreTest.class.getSimpleName());
    BrokerService brokerService;
    static final String mesageIdRoot = "11111:22222:";
    final int messageBytesSize = 256;
    final String text = new String(new byte[256]);
    final int ackStartIndex = 100;
    final int ackWindow = 50;
    final int ackBatchSize = 50;
    final int fullWindow = 200;
    protected int count = 5000;

    public void setUp() throws Exception {
        this.brokerService = this.createBroker();
        this.brokerService.setUseJmx(false);
        this.brokerService.deleteAllMessages();
        this.brokerService.start();
    }

    protected BrokerService createBroker() throws Exception {
        return new BrokerService();
    }

    public void tearDown() throws Exception {
        this.brokerService.stop();
    }

    public void testNoDuplicateAfterCacheFullAndAckedWithLargeAuditDepth() throws Exception {
        this.doTestNoDuplicateAfterCacheFullAndAcked(10240);
    }

    public void testNoDuplicateAfterCacheFullAndAckedWithSmallAuditDepth() throws Exception {
        this.doTestNoDuplicateAfterCacheFullAndAcked(512);
    }

    public void doTestNoDuplicateAfterCacheFullAndAcked(int auditDepth) throws Exception {
        PersistenceAdapter persistenceAdapter = this.brokerService.getPersistenceAdapter();
        MessageStore queueMessageStore = persistenceAdapter.createQueueMessageStore(this.destination);
        ConnectionContext contextNotInTx = new ConnectionContext();
        final ConsumerInfo consumerInfo = new ConsumerInfo();
        DestinationStatistics destinationStatistics = new DestinationStatistics();
        consumerInfo.setExclusive(true);
        Queue queue = new Queue(this.brokerService, (ActiveMQDestination)this.destination, queueMessageStore, destinationStatistics, this.brokerService.getTaskRunnerFactory());
        queue.systemUsage.getMemoryUsage().setLimit(0xA00000L);
        queue.setMaxAuditDepth(auditDepth);
        queue.initialize();
        queue.start();
        ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
        ProducerInfo producerInfo = new ProducerInfo();
        ProducerState producerState = new ProducerState(producerInfo);
        producerExchange.setProducerState(producerState);
        producerExchange.setConnectionContext(contextNotInTx);
        final CountDownLatch receivedLatch = new CountDownLatch(this.count);
        final AtomicLong ackedCount = new AtomicLong(0L);
        final AtomicLong enqueueCounter = new AtomicLong(0L);
        final Vector errors = new Vector();
        for (int i = 0; i < this.count; ++i) {
            Message message = this.getMessage(i);
            queue.send(producerExchange, message);
        }
        QueueDuplicatesFromStoreTest.assertEquals((String)"store count is correct", (int)this.count, (int)queueMessageStore.getMessageCount());
        Subscription subscription = new Subscription(){
            private SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics();

            public void add(MessageReference node) throws Exception {
                if (enqueueCounter.get() != node.getMessageId().getProducerSequenceId()) {
                    errors.add("Not in sequence at: " + enqueueCounter.get() + ", received: " + node.getMessageId().getProducerSequenceId());
                }
                TestCase.assertEquals((String)"is in order", (long)enqueueCounter.get(), (long)node.getMessageId().getProducerSequenceId());
                receivedLatch.countDown();
                enqueueCounter.incrementAndGet();
                node.decrementReferenceCount();
            }

            public void add(ConnectionContext context, Destination destination) throws Exception {
            }

            public int countBeforeFull() {
                if (this.isFull()) {
                    return 0;
                }
                return 200 - (int)(enqueueCounter.get() - ackedCount.get());
            }

            public void destroy() {
            }

            public void gc() {
            }

            public ConsumerInfo getConsumerInfo() {
                return consumerInfo;
            }

            public ConnectionContext getContext() {
                return null;
            }

            public long getDequeueCounter() {
                return 0L;
            }

            public long getDispatchedCounter() {
                return 0L;
            }

            public int getDispatchedQueueSize() {
                return 0;
            }

            public long getEnqueueCounter() {
                return 0L;
            }

            public int getInFlightSize() {
                return 0;
            }

            public int getInFlightUsage() {
                return 0;
            }

            public ObjectName getObjectName() {
                return null;
            }

            public int getPendingQueueSize() {
                return 0;
            }

            public long getPendingMessageSize() {
                return 0L;
            }

            public int getPrefetchSize() {
                return 0;
            }

            public String getSelector() {
                return null;
            }

            public boolean isBrowser() {
                return false;
            }

            public boolean isFull() {
                return enqueueCounter.get() - ackedCount.get() >= 200L;
            }

            public boolean isHighWaterMark() {
                return false;
            }

            public boolean isLowWaterMark() {
                return false;
            }

            public boolean isRecoveryRequired() {
                return false;
            }

            public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
                return true;
            }

            public boolean matches(ActiveMQDestination destination) {
                return true;
            }

            public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
            }

            public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
                return null;
            }

            public boolean isWildcard() {
                return false;
            }

            public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
                return null;
            }

            public void setObjectName(ObjectName objectName) {
            }

            public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException {
            }

            public void updateConsumerPrefetch(int newPrefetch) {
            }

            public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
                return false;
            }

            public ActiveMQDestination getActiveMQDestination() {
                return QueueDuplicatesFromStoreTest.this.destination;
            }

            public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
            }

            public int getCursorMemoryHighWaterMark() {
                return 0;
            }

            public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
            }

            public boolean isSlowConsumer() {
                return false;
            }

            public void unmatched(MessageReference node) throws IOException {
            }

            public long getTimeOfLastMessageAck() {
                return 0L;
            }

            public long getConsumedCount() {
                return 0L;
            }

            public void incrementConsumedCount() {
            }

            public void resetConsumedCount() {
            }

            public SubscriptionStatistics getSubscriptionStatistics() {
                return this.subscriptionStatistics;
            }

            public long getInFlightMessageSize() {
                return this.subscriptionStatistics.getInflightMessageSize().getTotalSize();
            }
        };
        queue.addSubscription(contextNotInTx, subscription);
        int removeIndex = 0;
        do {
            long receivedCount;
            if ((receivedCount = enqueueCounter.get()) <= 100L || receivedCount < (long)(removeIndex + 50)) continue;
            int j = 0;
            while (j < 50) {
                ackedCount.incrementAndGet();
                MessageAck ack = new MessageAck();
                ack.setLastMessageId(new MessageId(mesageIdRoot + removeIndex));
                ack.setMessageCount(1);
                queue.removeMessage(contextNotInTx, subscription, (QueueMessageReference)new IndirectMessageReference(this.getMessage(removeIndex)), ack);
                queue.wakeup();
                ++j;
                ++removeIndex;
            }
            if (removeIndex % 1000 != 0) continue;
            LOG.info("acked: " + removeIndex);
            persistenceAdapter.checkpoint(true);
        } while (!receivedLatch.await(0L, TimeUnit.MILLISECONDS) && errors.isEmpty());
        QueueDuplicatesFromStoreTest.assertTrue((String)("There are no errors: " + errors), (boolean)errors.isEmpty());
        QueueDuplicatesFromStoreTest.assertEquals((long)this.count, (long)enqueueCounter.get());
        QueueDuplicatesFromStoreTest.assertEquals((String)"store count is correct", (int)(this.count - removeIndex), (int)queueMessageStore.getMessageCount());
    }

    private Message getMessage(int i) throws Exception {
        ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setMessageId(new MessageId(mesageIdRoot + i));
        message.setDestination((ActiveMQDestination)this.destination);
        message.setPersistent(true);
        message.setResponseRequired(true);
        message.setText("Msg:" + i + " " + this.text);
        QueueDuplicatesFromStoreTest.assertEquals((long)message.getMessageId().getProducerSequenceId(), (long)i);
        return message;
    }
}

