/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.driver;

import io.aeron.driver.CongestionControl;
import io.aeron.driver.CongestionControlUtil;
import io.aeron.driver.DriverConductor;
import io.aeron.driver.DriverManagedResource;
import io.aeron.driver.FeedbackDelayGenerator;
import io.aeron.driver.LossDetector;
import io.aeron.driver.LossHandler;
import io.aeron.driver.PublicationImagePadding3;
import io.aeron.driver.Subscribable;
import io.aeron.driver.buffer.RawLog;
import io.aeron.driver.media.ReceiveChannelEndpoint;
import io.aeron.driver.reports.LossReport;
import io.aeron.driver.status.SystemCounterDescriptor;
import io.aeron.driver.status.SystemCounters;
import io.aeron.logbuffer.LogBufferDescriptor;
import io.aeron.logbuffer.TermGapFiller;
import io.aeron.logbuffer.TermRebuilder;
import io.aeron.protocol.DataHeaderFlyweight;
import io.aeron.protocol.RttMeasurementFlyweight;
import java.net.InetSocketAddress;
import org.agrona.UnsafeAccess;
import org.agrona.collections.ArrayUtil;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.NanoClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.Position;
import org.agrona.concurrent.status.ReadablePosition;

public class PublicationImage
extends PublicationImagePadding3
implements LossHandler,
DriverManagedResource,
Subscribable {
    private long timeOfLastStateChangeNs;
    private long lastLossChangeNumber = -1L;
    private long lastSmChangeNumber = -1L;
    private volatile long beginLossChange = -1L;
    private volatile long endLossChange = -1L;
    private int lossTermId;
    private int lossTermOffset;
    private int lossLength;
    private volatile long beginSmChange = -1L;
    private volatile long endSmChange = -1L;
    private long nextSmPosition;
    private int nextSmReceiverWindowLength;
    private long timeOfLastStatusMessageNs;
    private final long correlationId;
    private final long imageLivenessTimeoutNs;
    private final int sessionId;
    private final int streamId;
    private final int positionBitsToShift;
    private final int termLengthMask;
    private final int initialTermId;
    private final boolean isReliable;
    private boolean noLongerActive;
    private volatile State state = State.INIT;
    private final NanoClock nanoClock;
    private final InetSocketAddress controlAddress;
    private final ReceiveChannelEndpoint channelEndpoint;
    private final UnsafeBuffer[] termBuffers;
    private final Position hwmPosition;
    private final LossDetector lossDetector;
    private final CongestionControl congestionControl;
    private final Position rebuildPosition;
    private final InetSocketAddress sourceAddress;
    private final AtomicCounter heartbeatsReceived;
    private final AtomicCounter statusMessagesSent;
    private final AtomicCounter nakMessagesSent;
    private final AtomicCounter flowControlUnderRuns;
    private final AtomicCounter flowControlOverRuns;
    private final AtomicCounter lossGapFills;
    private final EpochClock epochClock;
    private final RawLog rawLog;

    public PublicationImage(long correlationId, long imageLivenessTimeoutNs, ReceiveChannelEndpoint channelEndpoint, InetSocketAddress controlAddress, int sessionId, int streamId, int initialTermId, int activeTermId, int initialTermOffset, RawLog rawLog, FeedbackDelayGenerator lossFeedbackDelayGenerator, ReadablePosition[] subscriberPositions, Position hwmPosition, Position rebuildPosition, NanoClock nanoClock, EpochClock epochClock, SystemCounters systemCounters, InetSocketAddress sourceAddress, CongestionControl congestionControl, LossReport lossReport, boolean isReliable) {
        long initialPosition;
        long nowNs;
        this.correlationId = correlationId;
        this.imageLivenessTimeoutNs = imageLivenessTimeoutNs;
        this.channelEndpoint = channelEndpoint;
        this.controlAddress = controlAddress;
        this.sessionId = sessionId;
        this.streamId = streamId;
        this.rawLog = rawLog;
        this.subscriberPositions = subscriberPositions;
        this.hwmPosition = hwmPosition;
        this.rebuildPosition = rebuildPosition;
        this.sourceAddress = sourceAddress;
        this.initialTermId = initialTermId;
        this.congestionControl = congestionControl;
        this.lossReport = lossReport;
        this.isReliable = isReliable;
        this.heartbeatsReceived = systemCounters.get(SystemCounterDescriptor.HEARTBEATS_RECEIVED);
        this.statusMessagesSent = systemCounters.get(SystemCounterDescriptor.STATUS_MESSAGES_SENT);
        this.nakMessagesSent = systemCounters.get(SystemCounterDescriptor.NAK_MESSAGES_SENT);
        this.flowControlUnderRuns = systemCounters.get(SystemCounterDescriptor.FLOW_CONTROL_UNDER_RUNS);
        this.flowControlOverRuns = systemCounters.get(SystemCounterDescriptor.FLOW_CONTROL_OVER_RUNS);
        this.lossGapFills = systemCounters.get(SystemCounterDescriptor.LOSS_GAP_FILLS);
        this.nanoClock = nanoClock;
        this.epochClock = epochClock;
        this.timeOfLastStateChangeNs = nowNs = nanoClock.nanoTime();
        this.lastPacketTimestampNs = nowNs;
        this.termBuffers = rawLog.termBuffers();
        this.lossDetector = new LossDetector(lossFeedbackDelayGenerator, this);
        int termLength = rawLog.termLength();
        this.termLengthMask = termLength - 1;
        this.positionBitsToShift = Integer.numberOfTrailingZeros(termLength);
        this.nextSmPosition = initialPosition = LogBufferDescriptor.computePosition((int)activeTermId, (int)initialTermOffset, (int)this.positionBitsToShift, (int)initialTermId);
        this.nextSmReceiverWindowLength = congestionControl.initialWindowLength();
        this.cleanPosition = initialPosition;
        hwmPosition.setOrdered(initialPosition);
        rebuildPosition.setOrdered(initialPosition);
    }

    public void close() {
        this.hwmPosition.close();
        this.rebuildPosition.close();
        for (ReadablePosition position : this.subscriberPositions) {
            position.close();
        }
        this.congestionControl.close();
        this.rawLog.close();
    }

    public long correlationId() {
        return this.correlationId;
    }

    public int sessionId() {
        return this.sessionId;
    }

    public int streamId() {
        return this.streamId;
    }

    public String channel() {
        return this.channelEndpoint.originalUriString();
    }

    public boolean matches(ReceiveChannelEndpoint channelEndpoint, int streamId) {
        return this.streamId == streamId && this.channelEndpoint == channelEndpoint;
    }

    @Override
    public void removeSubscriber(ReadablePosition subscriberPosition) {
        this.subscriberPositions = (ReadablePosition[])ArrayUtil.remove((Object[])this.subscriberPositions, (Object)subscriberPosition);
        subscriberPosition.close();
    }

    @Override
    public void addSubscriber(ReadablePosition subscriberPosition) {
        this.subscriberPositions = (ReadablePosition[])ArrayUtil.add((Object[])this.subscriberPositions, (Object)subscriberPosition);
    }

    @Override
    public void onGapDetected(int termId, int termOffset, int length) {
        long changeNumber;
        this.beginLossChange = changeNumber = this.beginLossChange + 1L;
        this.lossTermId = termId;
        this.lossTermOffset = termOffset;
        this.lossLength = length;
        this.endLossChange = changeNumber;
        if (null != this.reportEntry) {
            this.reportEntry.recordObservation(length, this.epochClock.time());
        } else if (null != this.lossReport) {
            this.reportEntry = this.lossReport.createEntry(length, this.epochClock.time(), this.sessionId, this.streamId, this.channel(), this.sourceAddress.toString());
            if (null == this.reportEntry) {
                this.lossReport = null;
            }
        }
    }

    InetSocketAddress sourceAddress() {
        return this.sourceAddress;
    }

    ReceiveChannelEndpoint channelEndpoint() {
        return this.channelEndpoint;
    }

    void removeFromDispatcher() {
        this.channelEndpoint.removePublicationImage(this);
    }

    RawLog rawLog() {
        return this.rawLog;
    }

    void activate() {
        this.state(State.ACTIVE);
    }

    private void state(State state) {
        this.timeOfLastStateChangeNs = this.nanoClock.nanoTime();
        this.state = state;
    }

    private void scheduleStatusMessage(long nowNs, long smPosition, int receiverWindowLength) {
        long changeNumber;
        this.beginSmChange = changeNumber = this.beginSmChange + 1L;
        this.nextSmPosition = smPosition;
        this.nextSmReceiverWindowLength = receiverWindowLength;
        this.timeOfLastStatusMessageNs = nowNs;
        this.endSmChange = changeNumber;
    }

    void trackRebuild(long nowNs, long statusMessageTimeoutNs) {
        if (this.noLongerActive) {
            return;
        }
        long minSubscriberPosition = Long.MAX_VALUE;
        long maxSubscriberPosition = Long.MIN_VALUE;
        for (ReadablePosition subscriberPosition : this.subscriberPositions) {
            long position = subscriberPosition.getVolatile();
            minSubscriberPosition = Math.min(minSubscriberPosition, position);
            maxSubscriberPosition = Math.max(maxSubscriberPosition, position);
        }
        long rebuildPosition = Math.max(this.rebuildPosition.get(), maxSubscriberPosition);
        long hwmPosition = this.hwmPosition.getVolatile();
        long scanOutcome = this.lossDetector.scan(this.termBuffers[LogBufferDescriptor.indexByPosition((long)rebuildPosition, (int)this.positionBitsToShift)], rebuildPosition, hwmPosition, nowNs, this.termLengthMask, this.positionBitsToShift, this.initialTermId);
        int rebuildTermOffset = (int)rebuildPosition & this.termLengthMask;
        long newRebuildPosition = rebuildPosition - (long)rebuildTermOffset + (long)LossDetector.rebuildOffset(scanOutcome);
        this.rebuildPosition.proposeMaxOrdered(newRebuildPosition);
        long ccOutcome = this.congestionControl.onTrackRebuild(nowNs, minSubscriberPosition, this.nextSmPosition, hwmPosition, rebuildPosition, newRebuildPosition, LossDetector.lossFound(scanOutcome));
        int window = CongestionControlUtil.receiverWindowLength(ccOutcome);
        long threshold = CongestionControlUtil.positionThreshold(window);
        if (CongestionControlUtil.shouldForceStatusMessage(ccOutcome) || nowNs > this.timeOfLastStatusMessageNs + statusMessageTimeoutNs || minSubscriberPosition > this.nextSmPosition + threshold) {
            this.scheduleStatusMessage(nowNs, minSubscriberPosition, window);
            this.cleanBufferTo(minSubscriberPosition - (long)(this.termLengthMask + 1));
        }
    }

    void ifActiveGoInactive() {
        if (State.ACTIVE == this.state) {
            this.state(State.INACTIVE);
        }
    }

    int insertPacket(int termId, int termOffset, UnsafeBuffer buffer, int length) {
        boolean isHeartbeat = DataHeaderFlyweight.isHeartbeat((UnsafeBuffer)buffer, (int)length);
        long packetPosition = LogBufferDescriptor.computePosition((int)termId, (int)termOffset, (int)this.positionBitsToShift, (int)this.initialTermId);
        long proposedPosition = isHeartbeat ? packetPosition : packetPosition + (long)length;
        long windowPosition = this.nextSmPosition;
        if (!this.isFlowControlUnderRun(windowPosition, packetPosition) && !this.isFlowControlOverRun(windowPosition, proposedPosition)) {
            if (isHeartbeat) {
                if (!this.isEndOfStream && DataHeaderFlyweight.isEndOfStream((UnsafeBuffer)buffer)) {
                    this.isEndOfStream = true;
                    LogBufferDescriptor.endOfStreamPosition((UnsafeBuffer)this.rawLog.metaData(), (long)packetPosition);
                }
                this.heartbeatsReceived.incrementOrdered();
            } else {
                UnsafeBuffer termBuffer = this.termBuffers[LogBufferDescriptor.indexByPosition((long)packetPosition, (int)this.positionBitsToShift)];
                TermRebuilder.insert((UnsafeBuffer)termBuffer, (int)termOffset, (UnsafeBuffer)buffer, (int)length);
            }
            this.lastPacketTimestampNs = this.nanoClock.nanoTime();
            this.hwmPosition.proposeMaxOrdered(proposedPosition);
        }
        return length;
    }

    boolean hasActivityAndNotEndOfStream(long nowNs) {
        boolean isActive = true;
        if (nowNs > this.lastPacketTimestampNs + this.imageLivenessTimeoutNs || this.isEndOfStream && this.rebuildPosition.getVolatile() >= this.hwmPosition.get()) {
            isActive = false;
        }
        return isActive;
    }

    int sendPendingStatusMessage() {
        long changeNumber;
        int workCount = 0;
        if (State.ACTIVE == this.state && (changeNumber = this.endSmChange) != this.lastSmChangeNumber) {
            long smPosition = this.nextSmPosition;
            int receiverWindowLength = this.nextSmReceiverWindowLength;
            UnsafeAccess.UNSAFE.loadFence();
            if (changeNumber == this.beginSmChange) {
                int termId = LogBufferDescriptor.computeTermIdFromPosition((long)smPosition, (int)this.positionBitsToShift, (int)this.initialTermId);
                int termOffset = (int)smPosition & this.termLengthMask;
                this.channelEndpoint.sendStatusMessage(this.controlAddress, this.sessionId, this.streamId, termId, termOffset, receiverWindowLength, (short)0);
                this.statusMessagesSent.incrementOrdered();
                this.lastSmChangeNumber = changeNumber;
            }
            workCount = 1;
        }
        return workCount;
    }

    int processPendingLoss() {
        int workCount = 0;
        long changeNumber = this.endLossChange;
        if (changeNumber != this.lastLossChangeNumber) {
            int termId = this.lossTermId;
            int termOffset = this.lossTermOffset;
            int length = this.lossLength;
            UnsafeAccess.UNSAFE.loadFence();
            if (changeNumber == this.beginLossChange) {
                if (this.isReliable) {
                    this.channelEndpoint.sendNakMessage(this.controlAddress, this.sessionId, this.streamId, termId, termOffset, length);
                    this.nakMessagesSent.incrementOrdered();
                } else {
                    UnsafeBuffer termBuffer = this.termBuffers[LogBufferDescriptor.indexByTerm((int)this.initialTermId, (int)termId)];
                    if (TermGapFiller.tryFillGap((UnsafeBuffer)this.rawLog.metaData(), (UnsafeBuffer)termBuffer, (int)termId, (int)termOffset, (int)length)) {
                        this.lossGapFills.incrementOrdered();
                    }
                }
                this.lastLossChangeNumber = changeNumber;
            }
            workCount = 1;
        }
        return workCount;
    }

    int initiateAnyRttMeasurements(long nowNs) {
        int workCount = 0;
        if (this.congestionControl.shouldMeasureRtt(nowNs)) {
            this.channelEndpoint.sendRttMeasurement(this.controlAddress, this.sessionId, this.streamId, nowNs, 0L, true);
            workCount = 1;
        }
        return workCount;
    }

    void onRttMeasurement(RttMeasurementFlyweight header, InetSocketAddress srcAddress) {
        long nowNs = this.nanoClock.nanoTime();
        long rttInNs = nowNs - header.echoTimestampNs() - header.receptionDelta();
        this.congestionControl.onRttMeasurement(nowNs, rttInNs, srcAddress);
    }

    boolean isAcceptingSubscriptions() {
        return this.subscriberPositions.length > 0 && this.state == State.ACTIVE;
    }

    long rebuildPosition() {
        return this.rebuildPosition.get();
    }

    @Override
    public void onTimeEvent(long timeNs, long timesMs, DriverConductor conductor) {
        switch (this.state) {
            case INACTIVE: {
                if (this.isDrained() || timeNs > this.timeOfLastStateChangeNs + this.imageLivenessTimeoutNs) {
                    this.state = State.LINGER;
                    this.timeOfLastStateChangeNs = timeNs;
                    conductor.transitionToLinger(this);
                }
                this.noLongerActive = true;
                break;
            }
            case LINGER: {
                if (timeNs <= this.timeOfLastStateChangeNs + this.imageLivenessTimeoutNs) break;
                this.state = State.DONE;
                conductor.cleanupImage(this);
            }
        }
    }

    @Override
    public boolean hasReachedEndOfLife() {
        return State.DONE == this.state;
    }

    public void timeOfLastStateChange(long time) {
    }

    public long timeOfLastStateChange() {
        return this.timeOfLastStateChangeNs;
    }

    public void delete() {
        this.close();
    }

    private boolean isDrained() {
        long rebuildPosition = this.rebuildPosition.get();
        for (ReadablePosition subscriberPosition : this.subscriberPositions) {
            if (subscriberPosition.getVolatile() >= rebuildPosition) continue;
            return false;
        }
        return true;
    }

    private boolean isFlowControlUnderRun(long windowPosition, long packetPosition) {
        boolean isFlowControlUnderRun;
        boolean bl = isFlowControlUnderRun = packetPosition < windowPosition;
        if (isFlowControlUnderRun) {
            this.flowControlUnderRuns.incrementOrdered();
        }
        return isFlowControlUnderRun;
    }

    private boolean isFlowControlOverRun(long windowPosition, long proposedPosition) {
        boolean isFlowControlOverRun;
        boolean bl = isFlowControlOverRun = proposedPosition > windowPosition + (long)this.nextSmReceiverWindowLength;
        if (isFlowControlOverRun) {
            this.flowControlOverRuns.incrementOrdered();
        }
        return isFlowControlOverRun;
    }

    private void cleanBufferTo(long newCleanPosition) {
        long cleanPosition = this.cleanPosition;
        int bytesForCleaning = (int)(newCleanPosition - cleanPosition);
        UnsafeBuffer dirtyTerm = this.termBuffers[LogBufferDescriptor.indexByPosition((long)cleanPosition, (int)this.positionBitsToShift)];
        int termOffset = (int)cleanPosition & this.termLengthMask;
        int length = Math.min(bytesForCleaning, dirtyTerm.capacity() - termOffset);
        if (length > 0) {
            dirtyTerm.setMemory(termOffset, length, (byte)0);
            this.cleanPosition = cleanPosition + (long)length;
        }
    }

    static enum State {
        INIT,
        ACTIVE,
        INACTIVE,
        LINGER,
        DONE;

    }
}

