/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.samples.archive;

import io.aeron.Aeron;
import io.aeron.ChannelUri;
import io.aeron.Image;
import io.aeron.ImageFragmentAssembler;
import io.aeron.Subscription;
import io.aeron.archive.Archive;
import io.aeron.archive.ArchivingMediaDriver;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.RecordingDescriptorConsumer;
import io.aeron.driver.MediaDriver;
import io.aeron.logbuffer.Header;
import io.aeron.samples.SampleConfiguration;
import io.aeron.samples.archive.EmbeddedReplayThroughputRhsPadding;
import io.aeron.samples.archive.Samples;
import java.io.File;
import java.util.concurrent.TimeUnit;
import org.agrona.BufferUtil;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.SystemUtil;
import org.agrona.collections.MutableLong;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.CountersReader;
import org.agrona.console.ContinueBarrier;

public class EmbeddedReplayThroughput
extends EmbeddedReplayThroughputRhsPadding
implements AutoCloseable {
    private static final int REPLAY_STREAM_ID = 101;
    private static final String REPLAY_URI = "aeron:ipc";
    private static final long NUMBER_OF_MESSAGES = SampleConfiguration.NUMBER_OF_MESSAGES;
    private static final int MESSAGE_LENGTH = SampleConfiguration.MESSAGE_LENGTH;
    private static final int FRAGMENT_COUNT_LIMIT = SampleConfiguration.FRAGMENT_COUNT_LIMIT;
    private static final int STREAM_ID = SampleConfiguration.STREAM_ID;
    private static final String CHANNEL = SampleConfiguration.CHANNEL;
    private final ArchivingMediaDriver archivingMediaDriver;
    private final Aeron aeron;
    private final AeronArchive aeronArchive;
    private final UnsafeBuffer buffer = new UnsafeBuffer(BufferUtil.allocateDirectAligned(MESSAGE_LENGTH, 64));
    private int publicationSessionId;

    public static void main(String[] args) {
        SystemUtil.loadPropertiesFiles(args);
        try (EmbeddedReplayThroughput test = new EmbeddedReplayThroughput();){
            System.out.println("Making a recording for playback...");
            long recordingLength = test.makeRecording();
            System.out.println("Finding the recording...");
            long recordingId = test.findRecordingId(ChannelUri.addSessionId(CHANNEL, test.publicationSessionId));
            ContinueBarrier barrier = new ContinueBarrier("Execute again?");
            do {
                System.out.printf("Replaying %,d messages%n", NUMBER_OF_MESSAGES);
                long startNs = System.nanoTime();
                test.replayRecording(recordingLength, recordingId);
                long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
                double dataRate = (double)recordingLength * 1000.0 / (double)durationMs / 1048576.0;
                double recordingMb = (double)recordingLength / 1048576.0;
                long msgRate = NUMBER_OF_MESSAGES / durationMs * 1000L;
                System.out.println("Performance inclusive of replay request and connection setup:");
                System.out.printf("Replayed %.02f MB @ %.02f MB/s - %,d msg/sec - %d byte payload + 32 byte header%n", recordingMb, dataRate, msgRate, MESSAGE_LENGTH);
            } while (barrier.await());
        }
    }

    EmbeddedReplayThroughput() {
        String archiveDirName = Archive.Configuration.archiveDirName();
        File archiveDir = "aeron-archive".equals(archiveDirName) ? Samples.createTempDir() : new File(archiveDirName);
        this.archivingMediaDriver = ArchivingMediaDriver.launch(new MediaDriver.Context().dirDeleteOnStart(true), new Archive.Context().archiveDir(archiveDir).recordingEventsEnabled(false));
        this.aeron = Aeron.connect();
        this.aeronArchive = AeronArchive.connect(new AeronArchive.Context().aeron(this.aeron));
    }

    @Override
    public void close() {
        CloseHelper.closeAll(this.aeronArchive, this.aeron, this.archivingMediaDriver, () -> this.archivingMediaDriver.archive().context().deleteDirectory(), () -> this.archivingMediaDriver.mediaDriver().context().deleteDirectory());
    }

    void onMessage(DirectBuffer buffer, int offset, int length, Header header) {
        long count = buffer.getLong(offset);
        if (count != this.messageCount) {
            throw new IllegalStateException("invalid message count=" + count + " @ " + this.messageCount);
        }
        ++this.messageCount;
    }

    /*
     * Exception decompiling
     */
    private long makeRecording() {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 3 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private void awaitRecordingComplete(int counterId, long position, IdleStrategy idleStrategy) {
        CountersReader counters = this.aeron.countersReader();
        idleStrategy.reset();
        while (counters.getCounterValue(counterId) < position) {
            idleStrategy.idle();
        }
    }

    private void replayRecording(long recordingLength, long recordingId) {
        try (Subscription subscription = this.aeronArchive.replay(recordingId, 0L, recordingLength, REPLAY_URI, 101);){
            IdleStrategy idleStrategy = SampleConfiguration.newIdleStrategy();
            while (!subscription.isConnected()) {
                idleStrategy.idle();
            }
            this.messageCount = 0L;
            Image image = subscription.imageAtIndex(0);
            ImageFragmentAssembler fragmentAssembler = new ImageFragmentAssembler(this::onMessage);
            while (this.messageCount < NUMBER_OF_MESSAGES) {
                int fragments = image.poll(fragmentAssembler, FRAGMENT_COUNT_LIMIT);
                if (0 == fragments && image.isClosed()) {
                    System.out.println("\n*** unexpected end of stream at message count: " + this.messageCount);
                    break;
                }
                idleStrategy.idle(fragments);
            }
        }
    }

    private long findRecordingId(String expectedChannel) {
        MutableLong foundRecordingId = new MutableLong();
        RecordingDescriptorConsumer consumer = (controlSessionId, correlationId, recordingId, startTimestamp, stopTimestamp, startPosition, stopPosition, initialTermId, segmentFileLength, termBufferLength, mtuLength, sessionId, streamId, strippedChannel, originalChannel, sourceIdentity) -> foundRecordingId.set(recordingId);
        int recordingsFound = this.aeronArchive.listRecordingsForUri(0L, 10, expectedChannel, STREAM_ID, consumer);
        if (1 != recordingsFound) {
            throw new IllegalStateException("should have been only one recording");
        }
        return foundRecordingId.get();
    }
}

