/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.samples.stress;

import io.aeron.Aeron;
import io.aeron.ControlledFragmentAssembler;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.logbuffer.Header;
import io.aeron.samples.stress.CRC64;
import io.aeron.samples.stress.SimpleReservedValueSupplier;
import io.aeron.samples.stress.StressUtil;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.Agent;

/**
 * Unicast echo server for the Aeron stress sample.
 *
 * <p>Polls a request subscription, validates every reassembled message with
 * {@link StressUtil#validateMessage}, and offers the same bytes back on a response
 * publication, carrying the request's reserved value through as the correlation id.
 *
 * <p>The instance keeps mutable, unsynchronised state (Aeron client, subscription,
 * publication, CRC); it is written to be driven by a single thread via the
 * {@link Agent} life-cycle methods.
 */
public class StressUnicastServer
implements Agent {
    /** Stream id used for both the request subscription and the response publication. */
    private static final int UNICAST_STREAM_ID = 10001;
    /** Maximum number of fragments handed to the handler per poll. */
    private static final int FRAGMENT_LIMIT = 1;

    private final String serverAddress;
    private final String clientAddress;
    private final ControlledFragmentAssembler unicastFragmentAssembler = new ControlledFragmentAssembler(this::unicastReqHandler);
    private final SimpleReservedValueSupplier valueSupplier = new SimpleReservedValueSupplier();
    private final CRC64 crc = new CRC64();
    private Aeron aeron;
    private Subscription unicastSubscription;
    private Publication unicastPublication;

    /**
     * Creates a server; no Aeron resources are acquired until {@link #onStart()}.
     *
     * @param serverAddress address used to build the request (subscription) channel.
     * @param clientAddress address used to build the response (publication) channel.
     */
    public StressUnicastServer(String serverAddress, String clientAddress) {
        this.serverAddress = serverAddress;
        this.clientAddress = clientAddress;
    }

    /**
     * Connects to Aeron and creates the request subscription and response publication.
     *
     * <p>If this method throws part-way through, resources created so far stay assigned to
     * their fields so that a subsequent {@link #onClose()} can release them.
     */
    @Override
    public void onStart() {
        StressUtil.info("server=" + this.serverAddress + ", client=" + this.clientAddress);
        this.aeron = Aeron.connect();
        StressUtil.info("Connected to Aeron dir=" + this.aeron.context().aeronDirectoryName());
        this.unicastSubscription = this.aeron.addSubscription(StressUtil.unicastReqChannel(this.serverAddress).build(), UNICAST_STREAM_ID, StressUtil::imageAvailable, StressUtil::imageUnavailable);
        this.unicastPublication = this.aeron.addPublication(StressUtil.unicastRspChannel(this.clientAddress).build(), UNICAST_STREAM_ID);
        StressUtil.info("publications and subscriptions created");
    }

    /**
     * Performs one duty cycle: a single poll of the request subscription.
     *
     * @return the value returned by the poll (the amount of work done in this cycle).
     */
    @Override
    public int doWork() {
        return this.pollUnicast();
    }

    /**
     * Polls the request subscription through the fragment assembler, with a limit of
     * {@link #FRAGMENT_LIMIT} fragment(s) per call.
     *
     * @return the value returned by {@code Subscription.controlledPoll}.
     */
    private int pollUnicast() {
        return this.unicastSubscription.controlledPoll(this.unicastFragmentAssembler, FRAGMENT_LIMIT);
    }

    /**
     * Handles one reassembled request: validates it and echoes it to the response publication.
     *
     * @param msg    buffer containing the request.
     * @param offset offset in {@code msg} at which the request begins.
     * @param length length of the request in bytes.
     * @param header header of the request; its reserved value is used as the correlation id.
     * @return {@code ABORT} when the offer returned a negative result, otherwise {@code COMMIT}.
     */
    private ControlledFragmentHandler.Action unicastReqHandler(DirectBuffer msg, int offset, int length, Header header) {
        long correlationId = header.reservedValue();
        StressUtil.validateMessage(this.crc, msg, offset, length, correlationId);
        long result = this.unicastPublication.offer(msg, offset, length, this.valueSupplier.set(correlationId));
        // Any negative offer result is treated the same way: abort so the request is not consumed.
        // NOTE(review): this includes terminal results (e.g. a closed publication), which would
        // abort on every poll - confirm that is acceptable for the stress test.
        return result < 0L ? ControlledFragmentHandler.Action.ABORT : ControlledFragmentHandler.Action.COMMIT;
    }

    /**
     * @return the fixed role name of this agent.
     */
    @Override
    public String roleName() {
        return "Stress Server";
    }

    /**
     * Quietly closes the subscription, the publication and then the Aeron client.
     *
     * <p>May be invoked after a failed or skipped {@link #onStart()}, in which case some of the
     * fields are still {@code null}; this relies on {@code CloseHelper.quietCloseAll} skipping
     * null entries.
     */
    @Override
    public void onClose() {
        CloseHelper.quietCloseAll(this.unicastSubscription, this.unicastPublication, this.aeron);
    }

    /**
     * Runs the server on the calling thread until that thread is interrupted.
     *
     * @param args ignored; addresses come from {@link StressUtil#serverAddress()} and
     *             {@link StressUtil#clientAddress()}.
     */
    public static void main(String[] args) {
        StressUnicastServer server = new StressUnicastServer(StressUtil.serverAddress(), StressUtil.clientAddress());
        try {
            // Start inside the try so that a failure half-way through onStart() (e.g. after
            // Aeron.connect() succeeded) still releases whatever was already created.
            server.onStart();
            while (!Thread.currentThread().isInterrupted()) {
                server.doWork();
            }
        }
        finally {
            server.onClose();
        }
    }
}

