/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.state;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import kafka.common.FollowerRestorePoint;
import kafka.log.Log;
import kafka.log.Log$;
import kafka.log.TierLogSegment;
import kafka.server.LogDirFailureChannel;
import kafka.tier.TopicIdPartition;
import kafka.tier.domain.AbstractTierMetadata;
import kafka.tier.domain.AbstractTierSegmentMetadata;
import kafka.tier.domain.TierObjectMetadata;
import kafka.tier.domain.TierPartitionFence;
import kafka.tier.domain.TierPartitionForceRestore;
import kafka.tier.domain.TierPartitionUnfreezeLogStartOffset;
import kafka.tier.domain.TierRecordType;
import kafka.tier.domain.TierSegmentDeleteComplete;
import kafka.tier.domain.TierSegmentDeleteInitiate;
import kafka.tier.domain.TierSegmentUploadComplete;
import kafka.tier.domain.TierSegmentUploadInitiate;
import kafka.tier.domain.TierTopicInitLeader;
import kafka.tier.exceptions.TierPartitionStateIllegalListenerException;
import kafka.tier.serdes.TierPartitionStateHeader;
import kafka.tier.state.ChecksumMigration;
import kafka.tier.state.FileTierPartitionIterator;
import kafka.tier.state.FileTierPartitionStateUtils;
import kafka.tier.state.Header;
import kafka.tier.state.OffsetAndEpoch;
import kafka.tier.state.SegmentState;
import kafka.tier.state.TierPartitionState;
import kafka.tier.state.TierPartitionStatus;
import kafka.utils.Scheduler;
import kafka.utils.checksum.Algorithm;
import kafka.utils.checksum.CheckedFileIO;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.runtime.BoxedUnit;

public class FileTierPartitionState
implements TierPartitionState,
AutoCloseable {
    static final byte CURRENT_VERSION = 6;
    private static final int ENTRY_LENGTH_SIZE = 2;
    private static final long FILE_OFFSET = 0L;
    static final long OLD_STATE_CLOSE_DELAY_MS = 3600000L;
    private static final Logger log = LoggerFactory.getLogger(FileTierPartitionState.class);
    private static final Set<TierObjectMetadata.State> FENCED_STATES = Collections.singleton(TierObjectMetadata.State.SEGMENT_FENCED);
    private final TopicPartition topicPartition;
    private final byte version;
    private final Scheduler scheduler;
    private final Object stateLock = new Object();
    private final Consumer<IOException> ioExceptionHandler;
    private volatile State state;
    private volatile TopicIdPartition topicIdPartition;
    private volatile boolean tieringEnabled;
    private volatile String basePath;
    private volatile File dir;
    private final Algorithm checksumAlgorithm;
    private final short checksumSuperBlockLength;
    public static final short SUPER_BLOCK_LENGTH_ADLER = 512;
    public static final short SUPER_BLOCK_LENGTH_NO_CHECKSUM = 0;

    public FileTierPartitionState(File dir, LogDirFailureChannel logDirFailureChannel, TopicPartition topicPartition, boolean tieringEnabled, Scheduler scheduler, boolean checksumEnabled) throws IOException {
        this(dir, logDirFailureChannel, topicPartition, tieringEnabled, 6, scheduler, checksumEnabled);
    }

    FileTierPartitionState(File dir, LogDirFailureChannel logDirFailureChannel, TopicPartition topicPartition, boolean tieringEnabled, byte version, Scheduler scheduler, boolean checksumEnabled) throws IOException {
        this.topicPartition = topicPartition;
        this.dir = dir;
        this.ioExceptionHandler = e -> logDirFailureChannel.maybeAddOfflineLogDir(this.dir().getParent(), (Function0<String>)((Function0)() -> "IOException encountered in TierPartitionState at " + this.dir().getParent()), (IOException)e);
        this.basePath = Log.tierStateFile(dir, 0L, "").getAbsolutePath();
        this.tieringEnabled = tieringEnabled;
        this.state = State.EMPTY;
        this.version = version;
        this.scheduler = scheduler;
        if (checksumEnabled) {
            this.checksumAlgorithm = Algorithm.ADLER;
            this.checksumSuperBlockLength = (short)512;
        } else {
            this.checksumAlgorithm = Algorithm.NO_CHECKSUM;
            this.checksumSuperBlockLength = 0;
        }
        this.maybeOpenChannel(false);
    }

    @Override
    public TopicPartition topicPartition() {
        return this.topicPartition;
    }

    @Override
    public Optional<TopicIdPartition> topicIdPartition() {
        return Optional.ofNullable(this.topicIdPartition);
    }

    @Override
    public boolean setTopicId(UUID topicId) throws IOException {
        if (this.topicIdPartition != null) {
            if (!this.topicIdPartition.topicId().equals(topicId)) {
                throw new IllegalStateException("Illegal reassignment of topic id. Current: " + this.topicIdPartition + " Assigned: " + topicId);
            }
            return false;
        }
        this.topicIdPartition = new TopicIdPartition(this.topicPartition.topic(), topicId, this.topicPartition.partition());
        log.info("Setting topicIdPartition {}", (Object)this.topicIdPartition);
        this.maybeOpenChannel(false);
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean isTieringEnabled() {
        Object object = this.stateLock;
        synchronized (object) {
            return this.tieringEnabled && this.topicIdPartition != null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean setTieringEnabled() throws IOException {
        Object object = this.stateLock;
        synchronized (object) {
            log.info("Setting tieringEnabled to true (earlier value: " + this.tieringEnabled + ")");
            this.tieringEnabled = true;
            if (!this.status().isOpen()) {
                this.maybeOpenChannel(false);
                log.info((this.status().isOpen() ? "Successfully opened " : "Not able to open ") + "the channel");
                return this.status().isOpen();
            }
            return false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void setTieringDisabled() {
        Object object = this.stateLock;
        synchronized (object) {
            log.info("Setting tieringEnabled to false (earlier value: " + this.tieringEnabled + ")");
            this.tieringEnabled = false;
        }
    }

    @Override
    public Optional<Long> startOffset() {
        return this.state.startOffset();
    }

    @Override
    public long endOffset() {
        return this.state.endOffset();
    }

    @Override
    public long committedEndOffset() {
        return this.state.committedEndOffset();
    }

    @Override
    public long totalSize() {
        return this.state.totalSize();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean flush() throws IOException {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state.flush();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean validateChecksum() throws IOException, InstantiationException, IllegalAccessException {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state.validate();
        }
    }

    private static void backupState(TopicIdPartition topicIdPartition, String basePath, Path dstPath, Algorithm checksumAlgorithm) throws IOException {
        Path srcPath = FileTierPartitionState.mutableFilePath(basePath, checksumAlgorithm);
        if (!Files.exists(srcPath, new LinkOption[0])) {
            return;
        }
        Files.copy(srcPath, FileTierPartitionState.tmpFilePath(basePath, checksumAlgorithm), StandardCopyOption.REPLACE_EXISTING);
        Utils.atomicMoveWithFallback((Path)FileTierPartitionState.tmpFilePath(basePath, checksumAlgorithm), (Path)dstPath);
        log.info("Backed up mutable file from: {} to: {}, topicIdPartition={}", new Object[]{srcPath, dstPath, topicIdPartition});
    }

    @Override
    public int tierEpoch() {
        return this.state.currentEpoch();
    }

    @Override
    public File dir() {
        return this.dir;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void delete() throws IOException {
        Object object = this.stateLock;
        synchronized (object) {
            this.closeHandlersImpl();
            for (StateFileType type : StateFileType.values()) {
                Files.deleteIfExists(type.filePath(this.basePath, this.checksumAlgorithm));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void updateDir(File dir) {
        Object object = this.stateLock;
        synchronized (object) {
            this.basePath = Log.tierStateFile(dir, 0L, "").getAbsolutePath();
            this.dir = dir;
            this.state.updateBasePath(this.basePath);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public TierPartitionState.RestoreResult forceRestoreState(TierPartitionForceRestore metadata, ByteBuffer targetState, TierPartitionStatus targetStatus, OffsetAndEpoch offsetAndEpoch) {
        Object object = this.stateLock;
        synchronized (object) {
            if (!this.state.status().hasError()) {
                log.warn(String.format("TierPartitionState %s was expected to be in an error status when restoring state via metadata %s with target status %s at offsetEpoch %s", new Object[]{this.state, metadata, targetStatus, offsetAndEpoch}));
                return TierPartitionState.RestoreResult.FAILED;
            }
            if (this.state.status() == TierPartitionStatus.ERROR && metadata.restoreLogStartOffset()) {
                log.warn(String.format("Cannot process metadata %s at offsetEpoch %s%n. FileTierPartitionState must be in FROZEN_LOG_START_OFFSET status if we need to process TierPartitionForceRestore event and recompute the log start offset", metadata, offsetAndEpoch));
                return TierPartitionState.RestoreResult.FAILED;
            }
            if (FileTierPartitionState.allowedSourceOffset(offsetAndEpoch, this.state.localMaterializedOffsetAndEpoch) && FileTierPartitionState.allowedStateOffset(metadata.stateOffsetAndEpoch(), this.state.restoreOffsetAndEpoch)) {
                try {
                    if (this.state.recoveryWorkflowCb == null && metadata.restoreLogStartOffset()) {
                        throw new IllegalStateException(String.format("Cannot process metadata %s at offsetEpoch %s%n. TierPartitionState needs a recovery workflow callback to process a restore state event.", metadata, offsetAndEpoch));
                    }
                    log.info("Restoring TierPartitionState for {} from object storage due to event {}", (Object)this.topicIdPartition, (Object)metadata);
                    Path restorePath = FileTierPartitionState.recoverPath(this.basePath, this.checksumAlgorithm);
                    CheckedFileIO channel = CheckedFileIO.openOrCreate(restorePath, this.checksumAlgorithm, this.checksumSuperBlockLength, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.READ, StandardOpenOption.WRITE);
                    channel.write(targetState);
                    channel.flush();
                    Optional<Header> initialHeaderOpt = FileTierPartitionState.readHeader(channel);
                    if (!initialHeaderOpt.isPresent()) {
                        throw new IllegalStateException(String.format("TierPartitionState being restored does not contain a valid header, aborting restore. Metadata %s with target status %s at offsetEpoch %s", metadata, targetState, offsetAndEpoch));
                    }
                    channel = FileTierPartitionState.maybeMigrateHeader(this.topicPartition, this.basePath, this.version, restorePath, channel, initialHeaderOpt.get(), this.checksumAlgorithm, this.checksumSuperBlockLength);
                    State newState = State.createRestoredState(this.topicPartition, this.basePath, this.version, channel, this.ioExceptionHandler, offsetAndEpoch, targetStatus, this.state.recoveryWorkflowCb, this.checksumAlgorithm, this.checksumSuperBlockLength);
                    State oldState = this.state;
                    this.safeSwapForRestoredState(offsetAndEpoch, restorePath, newState);
                    log.info("Restored TierPartitionState for {} from object storage due to event {}, old state: {} new state: {}", new Object[]{this.topicIdPartition, metadata, oldState, newState});
                    if (metadata.restoreLogStartOffset()) {
                        this.state.recoveryWorkflowCb.accept(TierPartitionState.RecoveryOperation.RECOMPUTE_MERGED_LOG_START_OFFSET);
                    }
                    oldState.setStatus(TierPartitionStatus.READ_ONLY);
                    oldState.closeListeners();
                    this.scheduleDelayedClose(oldState);
                    return TierPartitionState.RestoreResult.SUCCEEDED;
                }
                catch (IOException ioe) {
                    TierPartitionStatus previousStatus = this.state.status();
                    boolean freezeMergedLogStartOffset = previousStatus == TierPartitionStatus.FROZEN_LOG_START_OFFSET;
                    this.state.setErrorStatus(offsetAndEpoch, false, freezeMergedLogStartOffset);
                    this.ioExceptionHandler.accept(ioe);
                    throw new KafkaStorageException("Failed to restore state " + metadata + ", currentEpoch=" + this.state.currentEpoch + ", tierTopicPartitionOffsetAndEpoch=" + offsetAndEpoch + ", previousTierPartitionStatus=" + (Object)((Object)previousStatus) + ", newTierPartitionStatus=" + (Object)((Object)this.state.getStatus()), (Throwable)ioe);
                }
                catch (Exception e) {
                    TierPartitionStatus previousStatus = this.state.status();
                    boolean freezeMergedLogStartOffset = previousStatus == TierPartitionStatus.FROZEN_LOG_START_OFFSET;
                    this.state.setErrorStatus(offsetAndEpoch, false, freezeMergedLogStartOffset);
                    String logMsg = String.format("Failed to restore state %s, currentEpoch=%d, tierTopicPartitionOffsetAndEpoch=%s, previousTierPartitionStatus=%s, newTierPartitionStatus=%s", new Object[]{metadata, this.state.currentEpoch, offsetAndEpoch, previousStatus, this.state.getStatus()});
                    log.error(logMsg, (Throwable)e);
                    return TierPartitionState.RestoreResult.FAILED;
                }
            }
            log.info("Ignoring state recovery {} at offset {} as last materialized offset is {} for {}", new Object[]{metadata, offsetAndEpoch, this.state.localMaterializedOffsetAndEpoch, this.topicIdPartition});
            return TierPartitionState.RestoreResult.FAILED;
        }
    }

    @Override
    public TierPartitionState.RestoreResult processRestoreEvents(AbstractTierMetadata event, Optional<ByteBuffer> targetStateOpt, TierPartitionStatus targetStatus, OffsetAndEpoch offsetAndEpoch) {
        TierPartitionState.RestoreResult result = TierPartitionState.RestoreResult.SUCCEEDED;
        if (event instanceof TierPartitionForceRestore) {
            result = this.forceRestoreState((TierPartitionForceRestore)event, targetStateOpt.get(), targetStatus, offsetAndEpoch);
        } else if (event instanceof TierPartitionUnfreezeLogStartOffset) {
            result = this.processUnfreezeLogStartOffset((TierPartitionUnfreezeLogStartOffset)event, targetStatus, offsetAndEpoch);
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public TierPartitionState.RestoreResult processUnfreezeLogStartOffset(TierPartitionUnfreezeLogStartOffset metadata, TierPartitionStatus restoreStatus, OffsetAndEpoch sourceOffsetAndEpoch) throws KafkaStorageException {
        Object object = this.stateLock;
        synchronized (object) {
            if (this.state.status() != TierPartitionStatus.FROZEN_LOG_START_OFFSET) {
                log.warn(String.format("Cannot process metadata %s (at offsetEpoch %s). TierPartitionState was expected to be in FROZEN_LOG_START_OFFSET state to be able to process this event.", metadata, sourceOffsetAndEpoch));
                return TierPartitionState.RestoreResult.FAILED;
            }
            try {
                if (FileTierPartitionState.allowedSourceOffset(sourceOffsetAndEpoch, this.state.localMaterializedOffsetAndEpoch) && FileTierPartitionState.allowedStateOffset(metadata.stateOffsetAndEpoch(), this.state.restoreOffsetAndEpoch)) {
                    if (this.state.recoveryWorkflowCb == null) {
                        throw new IllegalStateException(String.format("Cannot process metadata %s at offsetEpoch %s%n. TierPartitionState needs a recovery workflow callback to process an unfreeze log start offset event.", metadata, sourceOffsetAndEpoch));
                    }
                } else {
                    log.warn(String.format("Cannot process recovery completion metadata event %s at offset %s. State materialized till %s. restoreOffsetAndEpoch %s", metadata, sourceOffsetAndEpoch, this.state.localMaterializedOffsetAndEpoch, this.state.restoreOffsetAndEpoch));
                    return TierPartitionState.RestoreResult.FAILED;
                }
                log.info("Marking end of data recovery by unfreezing log start offset for {} with materialization of event {} (at offsetEpoch {})", new Object[]{this.topicIdPartition, metadata, sourceOffsetAndEpoch});
                this.state.setStatus(restoreStatus);
                this.state.errorStatusReachedViaFenceEvent = false;
                this.state.localMaterializedOffsetAndEpoch = sourceOffsetAndEpoch;
                this.state.dirty = true;
                this.state.flush();
                this.state.recoveryWorkflowCb.accept(TierPartitionState.RecoveryOperation.UNFREEZE_MERGED_LOG_START_OFFSET);
            }
            catch (IOException ioe) {
                this.ioExceptionHandler.accept(ioe);
                throw new KafkaStorageException("Failed to apply " + metadata + " from tierTopicPartitionOffsetAndEpoch:" + sourceOffsetAndEpoch + " due to IO error. TierPartitionStatus=" + (Object)((Object)this.state.status), (Throwable)ioe);
            }
            catch (Exception e) {
                log.error(String.format("Cannot process metadata event %s at %s%n%s", metadata, sourceOffsetAndEpoch, e.toString()));
                return TierPartitionState.RestoreResult.FAILED;
            }
            return TierPartitionState.RestoreResult.SUCCEEDED;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void closeHandlers() throws IOException {
        Object object = this.stateLock;
        synchronized (object) {
            this.closeHandlersImpl();
        }
    }

    @Override
    public TierPartitionStatus status() {
        return this.state.status();
    }

    @Override
    public long materializationLag() {
        MaterializationListener.ReplicationTargetObjectId targetObjectIdListener;
        MaterializationListener.ReplicationTargetOffset targetOffsetListener = (MaterializationListener.ReplicationTargetOffset)this.state.listeners.get(MaterializationListener.ReplicationTargetOffset.class);
        long targetOffsetProgress = 0L;
        long targetObjectIdProgress = 0L;
        if (targetOffsetListener != null) {
            targetOffsetProgress = targetOffsetListener.materializationProgress(Math.max(0L, this.state.endOffset));
        }
        if ((targetObjectIdListener = (MaterializationListener.ReplicationTargetObjectId)this.state.listeners.get(MaterializationListener.ReplicationTargetObjectId.class)) != null) {
            targetObjectIdProgress = targetObjectIdListener.materializationProgress(Math.max(0L, this.state.endOffset));
        }
        return Math.max(targetOffsetProgress, targetObjectIdProgress);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void beginCatchup() {
        Object object = this.stateLock;
        synchronized (object) {
            this.state.beginCatchup();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onCatchUpComplete() {
        Object object = this.stateLock;
        synchronized (object) {
            this.state.onCatchUpComplete();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int numSegments(long from, long to) {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state.segmentOffsets(from, to).size();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int numSegments() {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state.segmentOffsets().size();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<TierLogSegment> materializeUptoOffset(long targetOffset) throws IOException {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state.materializationListener(targetOffset);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<TierLogSegment> materializeUptoObjectIdAndRestoreEpoch(long upperBoundEndOffset, UUID targetObjectId, int targetRestoreEpoch) throws IOException {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state.materializationListener(upperBoundEndOffset, targetObjectId, targetRestoreEpoch);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<Optional<TierLogSegment>> materializeUptoLeaderEpoch(int targetEpoch) throws IOException {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state.materializeUptoLeaderEpoch(targetEpoch);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<Boolean> trackMetadataInitialization(int targetLeaderEpoch) throws IOException {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state.trackMetadataInitialization(targetLeaderEpoch);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() throws IOException {
        Object object = this.stateLock;
        synchronized (object) {
            try {
                this.state.flush();
            }
            finally {
                this.closeHandlersImpl();
                log.info("Tier partition state for {} closed.", (Object)this.topicIdPartition().map(TopicIdPartition::toString).orElse(this.topicPartition.toString()));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public TierPartitionState.AppendResult append(AbstractTierMetadata metadata, OffsetAndEpoch offsetAndEpoch) {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state.appendMetadata(metadata, offsetAndEpoch);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    TierPartitionState.AppendResult appendUnhandled(AbstractTierMetadata metadata, OffsetAndEpoch offsetAndEpoch) throws IOException {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state.appendMetadataUnhandled(metadata, offsetAndEpoch);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Iterator<TierLogSegment> segments() {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state.segments().stream().map(segment -> new TierLogSegment(this.topicIdPartition, (SegmentState)segment)).iterator();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Iterator<TierLogSegment> segments(long from, long to) {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state.segments(from, to).stream().map(segment -> new TierLogSegment(this.topicIdPartition, (SegmentState)segment)).iterator();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<SegmentState> segmentInMemoryMetadataRange(long from, long to) {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state.fetchInMemoryMetadataRange(from, to);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Optional<SegmentState> previousMetadataBeforeOffset(long targetStartOffset) {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state.previousMetadataBeforeOffset(targetStartOffset);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public FollowerRestorePoint followerRestorePoint(long localLogStartOffset) {
        int restoreEpoch;
        Optional<Supplier> optIter;
        Object object = this.stateLock;
        synchronized (object) {
            optIter = this.state.previousMetadataForFollowerRestorePoint(localLogStartOffset).map(x$0 -> this.state.metadataForInMemorySegmentMetadata(x$0));
            restoreEpoch = this.state.restoreOffsetAndEpoch.epoch().orElse(-1);
        }
        return FollowerRestorePoint.apply(localLogStartOffset, optIter.flatMap(Supplier::get).map(TierObjectMetadata::objectId), restoreEpoch);
    }

    @Override
    public Optional<TierLogSegment> metadata(long targetOffset) throws IOException {
        return this.state.metadata(targetOffset).map(state -> new TierLogSegment(this.topicIdPartition, (SegmentState)state));
    }

    @Override
    public OffsetAndEpoch lastLocalMaterializedSrcOffsetAndEpoch() {
        return this.state.localMaterializedOffsetAndEpoch;
    }

    OffsetAndEpoch lastFlushedSrcOffsetAndEpoch() {
        return this.state.globalMaterializedOffsetAndEpoch;
    }

    OffsetAndEpoch restoreOffsetAndEpoch() {
        return this.state.restoreOffsetAndEpoch;
    }

    OffsetAndEpoch lastFlushedErrorOffsetAndEpoch() {
        return this.state.errorOffsetAndEpoch;
    }

    public String flushedPath() {
        return FileTierPartitionState.flushedFilePath(this.basePath, this.checksumAlgorithm).toFile().getAbsolutePath();
    }

    @Override
    public Collection<TierLogSegment> fencedSegments() {
        return this.state.metadataForStates(FileTierPartitionState.FENCED_STATES).stream().map(seg -> new TierLogSegment(this.topicIdPartition, (SegmentState)seg)).collect(Collectors.toList());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public String toString() {
        Object object = this.stateLock;
        synchronized (object) {
            if (this.tieringEnabled) {
                return "FileTierPartitionState(topicIdPartition=" + this.topicIdPartition + ", state=" + this.state + ", checksumAlgorithm=" + (Object)((Object)this.checksumAlgorithm) + ")";
            }
            return "FileTierPartitionState(topicIdPartition=" + this.topicIdPartition + ", tieringEnabled=" + this.tieringEnabled + ")";
        }
    }

    public static Optional<Header> readHeader(CheckedFileIO channel) throws IOException {
        Optional<Short> headerSizeOpt = FileTierPartitionState.readHeaderSize(channel);
        if (!headerSizeOpt.isPresent()) {
            return Optional.empty();
        }
        short headerSize = headerSizeOpt.get();
        ByteBuffer headerBuf = ByteBuffer.allocate(headerSize);
        channel.read(headerBuf, 2L);
        headerBuf.flip();
        if (headerBuf.limit() != headerSize) {
            return Optional.empty();
        }
        return Optional.of(new Header(TierPartitionStateHeader.getRootAsTierPartitionStateHeader(headerBuf)));
    }

    public static Optional<FileTierPartitionIterator> iterator(TopicPartition topicPartition, CheckedFileIO channel) throws IOException {
        Optional<Header> headerOpt = FileTierPartitionState.readHeader(channel);
        if (!headerOpt.isPresent()) {
            return Optional.empty();
        }
        return Optional.of(FileTierPartitionState.iterator(new TopicIdPartition(topicPartition.topic(), headerOpt.get().topicId(), topicPartition.partition()), channel, headerOpt.get().size()));
    }

    byte version() {
        return this.version;
    }

    String basePath() {
        return this.basePath;
    }

    private static FileTierPartitionIterator iterator(TopicIdPartition topicIdPartition, CheckedFileIO channel, long position) throws IOException {
        return new FileTierPartitionIterator(topicIdPartition, channel, position);
    }

    private void scheduleDelayedClose(State state) {
        this.scheduler.schedule("FileTierPartitionState_oldState_close", (Function0<BoxedUnit>)((Function0)() -> {
            log.info("Closing an earlier tier partition state that was already replaced in a restore operation, for partition: " + this.topicIdPartition);
            try {
                state.close();
            }
            catch (IOException ioe) {
                this.ioExceptionHandler.accept(ioe);
            }
            return null;
        }), 3600000L, -1L, TimeUnit.MILLISECONDS);
    }

    private void safeSwapForRestoredState(OffsetAndEpoch offsetAndEpoch, Path restorePath, State newState) throws IOException {
        boolean movedMutableFile = false;
        boolean movedRestoreFile = false;
        try {
            Utils.atomicMoveWithFallback((Path)FileTierPartitionState.mutableFilePath(this.basePath, this.checksumAlgorithm), (Path)FileTierPartitionState.discardedFilePath(this.basePath, offsetAndEpoch, this.checksumAlgorithm), (boolean)false);
            movedMutableFile = true;
            Utils.atomicMoveWithFallback((Path)restorePath, (Path)FileTierPartitionState.mutableFilePath(this.basePath, this.checksumAlgorithm), (boolean)false);
            movedRestoreFile = true;
            newState.flush();
        }
        catch (Exception e) {
            if (movedRestoreFile) {
                Utils.atomicMoveWithFallback((Path)FileTierPartitionState.mutableFilePath(this.basePath, this.checksumAlgorithm), (Path)restorePath, (boolean)false);
            }
            if (movedMutableFile) {
                Utils.atomicMoveWithFallback((Path)FileTierPartitionState.discardedFilePath(this.basePath, offsetAndEpoch, this.checksumAlgorithm), (Path)FileTierPartitionState.mutableFilePath(this.basePath, this.checksumAlgorithm), (boolean)false);
            }
            newState.close();
            throw e;
        }
        finally {
            Utils.flushDir((Path)FileTierPartitionState.mutableFilePath(this.basePath, this.checksumAlgorithm).toAbsolutePath().normalize().getParent());
        }
        this.state = newState;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean mayContainTieredData() {
        Object object = this.stateLock;
        synchronized (object) {
            return this.topicIdPartition != null && this.state.status.isOpen();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean maybeOpenChannelOnOffsetTieredException() throws IOException {
        Object object = this.stateLock;
        synchronized (object) {
            if (!this.state.status.isOpen()) {
                this.maybeOpenChannel(true);
                if (!this.state.status.isOpen()) {
                    throw new IllegalStateException("Could not open TierPartitionState channel. Current state is " + (Object)((Object)this.status()));
                }
                return true;
            }
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void maybeOpenChannel(boolean onOffsetTiered) throws IOException {
        Object object = this.stateLock;
        synchronized (object) {
            Path flushedFilePath = FileTierPartitionState.flushedFilePath(this.basePath, this.checksumAlgorithm);
            ChecksumMigration.maybeMigrateChecksumFormat(this.checksumAlgorithm, this.checksumSuperBlockLength, flushedFilePath);
            if ((this.tieringEnabled || Files.exists(flushedFilePath, new LinkOption[0]) || onOffsetTiered) && !this.state.status.isOpen()) {
                if (!Files.exists(flushedFilePath, new LinkOption[0])) {
                    CheckedFileIO.create(flushedFilePath, this.checksumAlgorithm, this.checksumSuperBlockLength);
                }
                Path mutableFilePath = FileTierPartitionState.mutableFilePath(this.basePath, this.checksumAlgorithm);
                ChecksumMigration.maybeRemovePreviousFormatPath(mutableFilePath);
                Files.copy(flushedFilePath, mutableFilePath, StandardCopyOption.REPLACE_EXISTING);
                CheckedFileIO channel = FileTierPartitionState.getChannelMaybeReinitialize(this.topicPartition, this.topicIdPartition, this.basePath, this.version, this.checksumAlgorithm, this.checksumSuperBlockLength);
                if (channel == null) {
                    this.state = State.EMPTY;
                    return;
                }
                try {
                    this.state = new State(this.topicPartition, this.basePath, this.version, channel, this.ioExceptionHandler, this.checksumAlgorithm, this.checksumSuperBlockLength);
                    this.topicIdPartition = this.state.topicIdPartition;
                    log.info("Opened tier partition state {}", (Object)this);
                }
                catch (Exception e) {
                    try {
                        FileTierPartitionState.backupState(this.topicIdPartition, this.basePath, FileTierPartitionState.errorFilePath(this.basePath, this.checksumAlgorithm), this.checksumAlgorithm);
                        this.closeHandlersImpl();
                    }
                    catch (Exception exceptionToIgnore) {
                        log.warn("Failed to backup / close tier partition state for {}", (Object)this.topicIdPartition, (Object)exceptionToIgnore);
                    }
                    IOException ioexp = new IOException("Exception in initializing TierMetadataState for " + this.topicIdPartition, e);
                    this.ioExceptionHandler.accept(ioexp);
                    throw new KafkaStorageException((Throwable)ioexp);
                }
            }
        }
    }

    private void closeHandlersImpl() throws IOException {
        if (this.state.status != TierPartitionStatus.UNINITIALIZED) {
            try {
                this.state.close();
            }
            finally {
                this.state = State.EMPTY;
            }
        }
    }

    public static void writeHeader(CheckedFileIO channel, Header header) throws IOException {
        int remaining = header.payloadBuffer().remaining();
        short sizePrefix = (short)remaining;
        if (sizePrefix != remaining) {
            throw new IllegalStateException(String.format("Unexpected header size: %d", remaining));
        }
        ByteBuffer sizeBuf = ByteBuffer.allocate(2).order(ByteOrder.LITTLE_ENDIAN);
        sizeBuf.putShort(sizePrefix);
        sizeBuf.flip();
        channel.write(sizeBuf, 0L);
        channel.write(header.payloadBuffer(), 2L);
    }

    private static Optional<Short> readHeaderSize(CheckedFileIO channel) throws IOException {
        ByteBuffer headerPrefixBuf = ByteBuffer.allocate(2).order(ByteOrder.LITTLE_ENDIAN);
        channel.read(headerPrefixBuf, 0L);
        headerPrefixBuf.flip();
        if (headerPrefixBuf.limit() == 2) {
            return Optional.of(headerPrefixBuf.getShort());
        }
        return Optional.empty();
    }

    private static void copy(CheckedFileIO src, CheckedFileIO dest) throws IOException {
        long srcSize = src.size();
        long position = src.position();
        src.transferTo(position, srcSize - position, dest);
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private static CheckedFileIO getChannelMaybeReinitialize(TopicPartition topicPartition, TopicIdPartition topicIdPartition, String basePath, byte version, Algorithm checksumAlgorithm, short checksumSuperBlockLength) throws IOException {
        Path mutableFilePath = FileTierPartitionState.mutableFilePath(basePath, checksumAlgorithm);
        CheckedFileIO channel = CheckedFileIO.openOrCreate(mutableFilePath, checksumAlgorithm, checksumSuperBlockLength, StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
        try {
            Optional<Header> initialHeaderOpt = FileTierPartitionState.readHeader(channel);
            if (initialHeaderOpt.isPresent()) return FileTierPartitionState.maybeMigrateHeader(topicPartition, basePath, version, mutableFilePath, channel, initialHeaderOpt.get(), checksumAlgorithm, checksumSuperBlockLength);
            if (topicIdPartition != null) {
                log.info("Writing new header to tier partition state for {}", (Object)topicIdPartition);
                channel.truncate(0L);
                FileTierPartitionState.writeHeader(channel, new Header(topicIdPartition.topicId(), version, -1, TierPartitionStatus.INIT, -1L, -1L, OffsetAndEpoch.EMPTY, OffsetAndEpoch.EMPTY, OffsetAndEpoch.EMPTY, OffsetAndEpoch.EMPTY));
                return channel;
            }
            channel.close();
            return null;
        }
        catch (IOException e) {
            channel.close();
            throw e;
        }
    }

    private static CheckedFileIO maybeMigrateHeader(TopicPartition topicPartition, String basePath, byte requiredVersion, Path destination, CheckedFileIO channel, Header existingHeader, Algorithm checksumAlgorithm, short checksumSuperBlockLength) throws IOException {
        if (existingHeader.version() != requiredVersion) {
            Path tmpFilePath = FileTierPartitionState.tmpFilePath(basePath, checksumAlgorithm);
            ChecksumMigration.maybeRemovePreviousFormatPath(tmpFilePath);
            TopicIdPartition topicIdPartition = new TopicIdPartition(topicPartition.topic(), existingHeader.topicId(), topicPartition.partition());
            try (CheckedFileIO tmpChannel = CheckedFileIO.openOrCreate(tmpFilePath, checksumAlgorithm, checksumSuperBlockLength, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.READ, StandardOpenOption.WRITE);){
                log.info("Rewriting tier partition state with version {} to {} for {}", new Object[]{existingHeader.version(), requiredVersion, topicIdPartition});
                Header newHeader = new Header(topicIdPartition.topicId(), requiredVersion, existingHeader.tierEpoch(), existingHeader.status(), existingHeader.startOffset(), existingHeader.endOffset(), existingHeader.globalMaterializedOffsetAndEpoch(), existingHeader.localMaterializedOffsetAndEpoch(), existingHeader.errorOffsetAndEpoch(), existingHeader.restoreOffsetAndEpoch());
                FileTierPartitionState.writeHeader(tmpChannel, newHeader);
                tmpChannel.position(newHeader.size());
                channel.position(existingHeader.size());
                FileTierPartitionState.copy(channel, tmpChannel);
            }
            Utils.atomicMoveWithFallback((Path)tmpFilePath, (Path)destination);
            channel.close();
            return CheckedFileIO.openOrCreate(destination, checksumAlgorithm, checksumSuperBlockLength, StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
        }
        return channel;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean dirty() {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state.dirty;
        }
    }

    private static void validateEpoch(Optional<Integer> current, Optional<Integer> checked) {
        if (current.isPresent() && checked.isPresent() && checked.get() < current.get()) {
            throw new IllegalStateException("New epoch " + checked + " must dominate old epoch " + current);
        }
    }

    private static boolean allowedStateOffset(OffsetAndEpoch stateOffsetAndEpoch, OffsetAndEpoch current) {
        if (stateOffsetAndEpoch.equals(OffsetAndEpoch.EMPTY)) {
            return true;
        }
        if (stateOffsetAndEpoch.offset() >= current.offset()) {
            FileTierPartitionState.validateEpoch(current.epoch(), stateOffsetAndEpoch.epoch());
            return true;
        }
        return false;
    }

    private static boolean allowedSourceOffset(OffsetAndEpoch toProcess, OffsetAndEpoch current) {
        if (toProcess.offset() > current.offset()) {
            FileTierPartitionState.validateEpoch(current.epoch(), toProcess.epoch());
            return true;
        }
        if (toProcess.epoch().isPresent() && current.epoch().isPresent() && toProcess.epoch().get() > current.epoch().get()) {
            throw new IllegalStateException("Incorrect epoch in " + toProcess + " with current epoch " + current);
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void setTieredPartitionRecoveryWorkflowCb(Consumer<TierPartitionState.RecoveryOperation> cb) {
        Object object = this.stateLock;
        synchronized (object) {
            this.state.recoveryWorkflowCb = cb;
            if (this.state.status() == TierPartitionStatus.FROZEN_LOG_START_OFFSET) {
                this.state.errorStatusReachedViaFenceEvent = true;
                this.state.recoveryWorkflowCb.accept(TierPartitionState.RecoveryOperation.FREEZE_MERGED_LOG_START_OFFSET);
            }
        }
    }

    static Path flushedFilePath(String basePath, Algorithm checksumAlgorithm) {
        return StateFileType.FLUSHED.filePath(basePath, checksumAlgorithm);
    }

    static Path mutableFilePath(String basePath, Algorithm checksumAlgorithm) {
        return StateFileType.MUTABLE.filePath(basePath, checksumAlgorithm);
    }

    static Path recoverPath(String basePath, Algorithm checksumAlgorithm) {
        return StateFileType.RECOVER.filePath(basePath, checksumAlgorithm);
    }

    static Path discardedFilePath(String basePath, OffsetAndEpoch restoreAt, Algorithm checksumAlgorithm) {
        return Paths.get(String.format("%s.%s.%s_epoch_%s_offset_%s", new Object[]{basePath, StateFileType.DISCARDED, checksumAlgorithm.suffix, restoreAt.epoch().orElse(-1), restoreAt.offset()}), new String[0]);
    }

    static Path tmpFilePath(String basePath, Algorithm checksumAlgorithm) {
        return StateFileType.TEMPORARY.filePath(basePath, checksumAlgorithm);
    }

    static Path errorFilePath(String basePath, Algorithm checksumAlgorithm) {
        return StateFileType.ERROR.filePath(basePath, checksumAlgorithm);
    }

    public boolean isErrorStatusReachedViaFenceEvent() {
        return this.state.errorStatusReachedViaFenceEvent;
    }

    public static interface MaterializationListener {
        public boolean mayComplete(State var1, Optional<AbstractTierMetadata> var2) throws IOException;

        public boolean complete(State var1) throws IOException;

        public void cancel(Exception var1);

        public static class Initialization
        implements MaterializationListener {
            private final Logger log;
            private final TopicIdPartition topicIdPartition;
            private final CompletableFuture<Boolean> promise;
            private final int leaderEpochToMaterialize;

            public Initialization(Logger log, TopicIdPartition topicIdPartition, CompletableFuture<Boolean> promise, int leaderEpochToMaterialize) {
                this.log = log;
                this.topicIdPartition = topicIdPartition;
                this.promise = promise;
                this.leaderEpochToMaterialize = leaderEpochToMaterialize;
            }

            @Override
            public boolean mayComplete(State unflushedState, Optional<AbstractTierMetadata> unflushedMessage) {
                if (unflushedState.status.isOpen()) {
                    return unflushedState.currentEpoch >= this.leaderEpochToMaterialize;
                }
                return true;
            }

            @Override
            public boolean complete(State flushedState) {
                if (flushedState.status.isOpen()) {
                    if (flushedState.currentEpoch >= this.leaderEpochToMaterialize) {
                        this.promise.complete(true);
                        this.log.info("Successfully completing tracking for metadata initialization of {} for epoch {} ", (Object)this.topicIdPartition, (Object)this.leaderEpochToMaterialize);
                        return true;
                    }
                    return false;
                }
                this.log.error("Tier partition state for " + this.topicIdPartition + " not open");
                this.promise.complete(false);
                return true;
            }

            @Override
            public void cancel(Exception e) {
                if (!this.promise.isDone()) {
                    this.log.info("Completing {} exceptionally", (Object)this, (Object)e);
                    this.promise.completeExceptionally(e);
                }
            }

            public String toString() {
                return "MaterializationListener.Initialization{topicIdPartition=" + this.topicIdPartition + ", leaderEpochToMaterialize=" + this.leaderEpochToMaterialize + '}';
            }
        }

        public static class ReplicationTargetObjectId
        implements MaterializationListener {
            private final Logger log;
            private final TopicIdPartition topicIdPartition;
            private final CompletableFuture<TierLogSegment> promise;
            private final UUID targetObjectId;
            private final long targetRestoreEpoch;
            private final long upperBoundEndOffset;

            public ReplicationTargetObjectId(Logger log, TopicIdPartition topicIdPartition, CompletableFuture<TierLogSegment> promise, UUID targetObjectId, long targetRestoreEpoch, long upperBoundEndOffset) {
                this.log = log;
                this.topicIdPartition = topicIdPartition;
                this.promise = promise;
                this.targetObjectId = targetObjectId;
                this.targetRestoreEpoch = targetRestoreEpoch;
                this.upperBoundEndOffset = upperBoundEndOffset;
            }

            @Override
            public boolean mayComplete(State unflushedState, Optional<AbstractTierMetadata> unflushedMessage) throws IOException {
                AbstractTierMetadata message;
                if (!unflushedState.status.isOpen()) {
                    return true;
                }
                if ((long)unflushedState.restoreOffsetAndEpoch.epoch().orElse(-1).intValue() < this.targetRestoreEpoch) {
                    return false;
                }
                boolean foundObjectId = unflushedMessage.isPresent() ? ((message = unflushedMessage.get()).type() == TierRecordType.SegmentUploadComplete ? ((TierSegmentUploadComplete)message).objectId().equals(this.targetObjectId) : false) : this.findTierObjectMetadata(unflushedState).isPresent();
                if (unflushedState.endOffset > this.upperBoundEndOffset && !foundObjectId) {
                    return true;
                }
                return foundObjectId;
            }

            private Optional<SegmentState> findTierObjectMetadata(State state) {
                SegmentState candidate = state.getState(this.targetObjectId);
                if (candidate == null) {
                    return Optional.empty();
                }
                SegmentState validSegment = (SegmentState)state.logSegments.get(candidate.baseOffset());
                if (validSegment == null) {
                    return Optional.empty();
                }
                if (!validSegment.objectId().equals(this.targetObjectId) || !candidate.state().equals((Object)TierObjectMetadata.State.SEGMENT_UPLOAD_COMPLETE)) {
                    return Optional.empty();
                }
                return Optional.of(candidate);
            }

            @Override
            public boolean complete(State flushedState) throws IOException {
                if (!flushedState.status.isOpen()) {
                    this.promise.completeExceptionally(new TierPartitionStateIllegalListenerException(String.format("Tier partition state for %s is not open.", this.topicIdPartition)));
                    return true;
                }
                if ((long)flushedState.restoreOffsetAndEpoch.epoch().orElse(-1).intValue() < this.targetRestoreEpoch) {
                    return false;
                }
                Optional<SegmentState> tierSegment = this.findTierObjectMetadata(flushedState);
                if (tierSegment.isPresent()) {
                    this.log.info("Completing {} successfully.", (Object)this);
                    this.promise.complete(new TierLogSegment(this.topicIdPartition, tierSegment.get()));
                    return true;
                }
                if (flushedState.endOffset > this.upperBoundEndOffset) {
                    this.promise.completeExceptionally(new TierPartitionStateIllegalListenerException(String.format("Tier partition state for %s the upperBoundEndOffset %d at endOffset %d. This suggests that the materialization target objectId %s at restoreEpoch %d was deleted", this.topicIdPartition, this.upperBoundEndOffset, flushedState.endOffset, this.targetObjectId, this.targetRestoreEpoch)));
                    return true;
                }
                return false;
            }

            @Override
            public void cancel(Exception e) {
                if (!this.promise.isDone()) {
                    this.log.info("Completing {} exceptionally", (Object)this, (Object)e);
                    this.promise.completeExceptionally(e);
                }
            }

            public long materializationProgress(long endOffset) {
                return Math.max(this.upperBoundEndOffset - endOffset, 0L);
            }

            public String toString() {
                return "MaterializationListener.ReplicationTargetObjectId{targetObjectId=" + this.targetObjectId + ", targetRestoreEpoch=" + this.targetRestoreEpoch + ", upperBoundEndOffset=" + this.upperBoundEndOffset + ", topicIdPartition=" + this.topicIdPartition + '}';
            }
        }

        public static class ReplicationTargetOffset
        implements MaterializationListener {
            private final Logger log;
            private final TopicIdPartition topicIdPartition;
            private final CompletableFuture<TierLogSegment> promise;
            private final long targetEndOffset;

            public ReplicationTargetOffset(Logger log, TopicIdPartition topicIdPartition, CompletableFuture<TierLogSegment> promise, long targetEndOffset) {
                this.log = log;
                this.topicIdPartition = topicIdPartition;
                this.promise = promise;
                this.targetEndOffset = targetEndOffset;
            }

            @Override
            public boolean mayComplete(State unflushedState, Optional<AbstractTierMetadata> unflushedMessage) throws IOException {
                if (unflushedState.status.isOpen()) {
                    long uncommitedEndOffset = unflushedState.endOffset;
                    return uncommitedEndOffset >= this.targetEndOffset;
                }
                return true;
            }

            @Override
            public boolean complete(State flushedState) throws IOException {
                if (flushedState.status.isOpen()) {
                    Optional<Object> metadata = Optional.empty();
                    long endOffset = flushedState.endOffset;
                    if (endOffset != -1L && this.targetEndOffset <= endOffset) {
                        metadata = flushedState.metadata(this.targetEndOffset);
                    }
                    if (metadata.isPresent()) {
                        if (((SegmentState)metadata.get()).endOffset() < this.targetEndOffset) {
                            throw new IllegalStateException("Metadata lookup for offset " + this.targetEndOffset + " returned unexpected segment " + metadata + " for " + this.topicIdPartition);
                        }
                        this.promise.complete(new TierLogSegment(this.topicIdPartition, (SegmentState)metadata.get()));
                        return true;
                    }
                    return false;
                }
                this.promise.completeExceptionally(new TierPartitionStateIllegalListenerException("Tier partition state for " + this.topicIdPartition + " is not open."));
                return true;
            }

            @Override
            public void cancel(Exception e) {
                if (!this.promise.isDone()) {
                    this.log.info("Completing {} exceptionally", (Object)this, (Object)e);
                    this.promise.completeExceptionally(e);
                }
            }

            public long materializationProgress(long endOffset) {
                return Math.max(this.targetEndOffset - endOffset, 0L);
            }

            public String toString() {
                return "MaterializationListener.ReplicationTargetOffset{targetOffset=" + this.targetEndOffset + ", topicIdPartition=" + this.topicIdPartition + '}';
            }
        }

        public static class LeaderEpoch
        implements MaterializationListener {
            private final Logger log;
            private final TopicIdPartition topicIdPartition;
            private final CompletableFuture<Optional<TierLogSegment>> promise;
            private final int leaderEpochToMaterialize;

            public LeaderEpoch(Logger log, TopicIdPartition topicIdPartition, CompletableFuture<Optional<TierLogSegment>> promise, int leaderEpochToMaterialize) {
                this.log = log;
                this.topicIdPartition = topicIdPartition;
                this.promise = promise;
                this.leaderEpochToMaterialize = leaderEpochToMaterialize;
            }

            @Override
            public boolean mayComplete(State unflushedState, Optional<AbstractTierMetadata> ignored) {
                if (unflushedState.status.isOpen()) {
                    return unflushedState.currentEpoch >= this.leaderEpochToMaterialize;
                }
                return true;
            }

            @Override
            public boolean complete(State flushedState) throws IOException {
                if (this.promise.isDone()) {
                    throw new IllegalStateException("promise can only be completed once");
                }
                if (flushedState.status.isOpen()) {
                    if (flushedState.currentEpoch >= this.leaderEpochToMaterialize) {
                        Optional<TierLogSegment> lastSegmentOpt = Optional.ofNullable(flushedState.logSegments.lastEntry()).map(kv -> new TierLogSegment(this.topicIdPartition, (SegmentState)kv.getValue()));
                        this.log.info("Completing {} successfully.", (Object)this);
                        this.promise.complete(lastSegmentOpt);
                        return true;
                    }
                    return false;
                }
                this.promise.completeExceptionally(new TierPartitionStateIllegalListenerException("Tier partition state for " + this.topicIdPartition + " is not open."));
                return true;
            }

            @Override
            public void cancel(Exception e) {
                if (!this.promise.isDone()) {
                    this.log.info("Completing {} exceptionally", (Object)this, (Object)e);
                    this.promise.completeExceptionally(e);
                }
            }

            public String toString() {
                return "MaterializationListener.LeaderEpoch(topicIdPartition: " + this.topicIdPartition + ", leaderEpochToMaterialize: " + this.leaderEpochToMaterialize + ")";
            }
        }
    }

    private static class StateCorruptedException
    extends RetriableException {
        StateCorruptedException(String message) {
            super(message);
        }
    }

    static class State {
        private static final State EMPTY = new State();
        private final CheckedFileIO channel;
        private final ConcurrentNavigableMap<Long, SegmentState> logSegments = new ConcurrentSkipListMap<Long, SegmentState>();
        private final ConcurrentNavigableMap<UUID, SegmentState> allSegmentStates = new ConcurrentSkipListMap<UUID, SegmentState>();
        private final Set<UUID> deletedSegments = ConcurrentHashMap.newKeySet();
        private final byte version;
        private final Consumer<IOException> ioExceptionHandler;
        private final ConcurrentHashMap<Class<? extends MaterializationListener>, MaterializationListener> listeners = new ConcurrentHashMap();
        private String basePath;
        private TopicIdPartition topicIdPartition = null;
        private volatile boolean dirty = false;
        private volatile SegmentState uploadInProgress;
        private volatile SegmentState immediatelyPreceedingDeletedSegment;
        private volatile long endOffset = -1L;
        private volatile long committedEndOffset = -1L;
        private volatile int currentEpoch = -1;
        private volatile long validSegmentsSize = 0L;
        private volatile TierPartitionStatus status = TierPartitionStatus.UNINITIALIZED;
        private volatile OffsetAndEpoch globalMaterializedOffsetAndEpoch = OffsetAndEpoch.EMPTY;
        private volatile OffsetAndEpoch localMaterializedOffsetAndEpoch = OffsetAndEpoch.EMPTY;
        private volatile OffsetAndEpoch errorOffsetAndEpoch = OffsetAndEpoch.EMPTY;
        private volatile OffsetAndEpoch restoreOffsetAndEpoch = OffsetAndEpoch.EMPTY;
        private volatile boolean errorStatusReachedViaFenceEvent = false;
        public Consumer<TierPartitionState.RecoveryOperation> recoveryWorkflowCb;
        private final Algorithm checksumAlgorithm;
        private final short checksumSuperBlockLength;

        private State() {
            this.channel = null;
            this.version = (byte)-1;
            this.basePath = null;
            this.ioExceptionHandler = e -> {
                throw new IllegalStateException("Illegal use of setLogDirOffline");
            };
            this.recoveryWorkflowCb = null;
            this.checksumAlgorithm = null;
            this.checksumSuperBlockLength = (short)-1;
        }

        State(TopicPartition topicPartition, String basePath, byte version, CheckedFileIO channel, Consumer<IOException> ioExceptionHandler, Algorithm checksumAlgorithm, short checksumSuperBlockLength) throws IOException, StateCorruptedException {
            this.basePath = basePath;
            this.version = version;
            this.channel = channel;
            this.ioExceptionHandler = ioExceptionHandler;
            this.recoveryWorkflowCb = null;
            this.checksumAlgorithm = checksumAlgorithm;
            this.checksumSuperBlockLength = checksumSuperBlockLength;
            this.scanAndInitialize(topicPartition);
            if (this.status == TierPartitionStatus.UNINITIALIZED) {
                throw new IllegalStateException("Illegal TierPartitionStatus: " + (Object)((Object)this.status));
            }
        }

        private static State createRestoredState(TopicPartition topicPartition, String basePath, byte version, CheckedFileIO channel, Consumer<IOException> ioExceptionHandler, OffsetAndEpoch localMaterializedOffsetAndEpoch, TierPartitionStatus status, Consumer<TierPartitionState.RecoveryOperation> recoveryWorkflowCb, Algorithm checksumAlgorithm, short checksumSuperBlockLength) throws IOException {
            State imported = new State(topicPartition, basePath, version, channel, ioExceptionHandler, checksumAlgorithm, checksumSuperBlockLength);
            imported.localMaterializedOffsetAndEpoch = localMaterializedOffsetAndEpoch;
            imported.globalMaterializedOffsetAndEpoch = OffsetAndEpoch.EMPTY;
            imported.restoreOffsetAndEpoch = localMaterializedOffsetAndEpoch;
            imported.setStatus(status);
            imported.recoveryWorkflowCb = recoveryWorkflowCb;
            imported.dirty = true;
            return imported;
        }

        private void scanAndInitialize(TopicPartition topicPartition) throws IOException, StateCorruptedException {
            log.debug("scan and truncate TierPartitionState {}", (Object)topicPartition);
            Header header = FileTierPartitionState.readHeader(this.channel).get();
            this.topicIdPartition = new TopicIdPartition(topicPartition.topic(), header.topicId(), topicPartition.partition());
            long currentPosition = header.size();
            FileTierPartitionIterator iterator = FileTierPartitionState.iterator(this.topicIdPartition, this.channel, currentPosition);
            while (iterator.hasNext()) {
                TierObjectMetadata metadata = (TierObjectMetadata)iterator.next();
                log.debug("{}: scan reloaded metadata {}", (Object)topicPartition, (Object)metadata);
                this.addSegmentMetadata(metadata, currentPosition);
                currentPosition = iterator.position();
            }
            if (currentPosition < this.channel.size()) {
                throw new StateCorruptedException("Could not read all bytes in file. position: " + currentPosition + " size: " + this.channel.size() + " for partition " + this.topicIdPartition);
            }
            if (header.endOffset() != -1L && this.endOffset != header.endOffset()) {
                if (this.numSegments() > 0) {
                    log.info("File header endOffset does not match the materialized endOffset. Setting state endOffset to be equal to header endOffset. Header endOffset: " + header.endOffset() + " materialized state endOffset: " + this.endOffset + " for partition " + this.topicIdPartition);
                }
                this.endOffset = header.endOffset();
            }
            this.channel.position(this.channel.size());
            this.committedEndOffset = this.endOffset;
            this.currentEpoch = header.tierEpoch();
            this.globalMaterializedOffsetAndEpoch = header.globalMaterializedOffsetAndEpoch();
            this.localMaterializedOffsetAndEpoch = header.localMaterializedOffsetAndEpoch();
            this.errorOffsetAndEpoch = header.errorOffsetAndEpoch();
            this.status = header.status();
            if (this.status() == TierPartitionStatus.FROZEN_LOG_START_OFFSET) {
                this.errorStatusReachedViaFenceEvent = true;
                if (this.recoveryWorkflowCb != null) {
                    this.recoveryWorkflowCb.accept(TierPartitionState.RecoveryOperation.FREEZE_MERGED_LOG_START_OFFSET);
                }
            }
            log.info("Opened tier partition state for {} in status {}. topicIdPartition: {} tierEpoch: {} endOffset: {}", new Object[]{topicPartition, this.status, this.topicIdPartition, this.currentEpoch, this.endOffset});
        }

        public void updateBasePath(String path) {
            this.basePath = path;
        }

        SegmentState updateAndGetState(long filePosition, TierObjectMetadata metadata) {
            SegmentState existing = (SegmentState)this.allSegmentStates.get(metadata.objectId());
            SegmentState nextState = existing != null ? existing.updateState(metadata.state()) : new SegmentState(metadata, this.startOffsetOfSegment(metadata), filePosition);
            this.allSegmentStates.put(metadata.objectId(), nextState);
            return nextState;
        }

        SegmentState getState(UUID objectId) {
            return (SegmentState)this.allSegmentStates.get(objectId);
        }

        private TierPartitionStatus getStatus() {
            return this.status;
        }

        private void setStatus(TierPartitionStatus status) {
            if (this.status == TierPartitionStatus.UNINITIALIZED || status == TierPartitionStatus.UNINITIALIZED) {
                throw new IllegalStateException("Illegal transition " + (Object)((Object)this.status) + " to " + (Object)((Object)status));
            }
            if (this.status != status) {
                this.status = status;
                this.dirty = true;
                log.info("Status updated to {} for {}", (Object)status, (Object)this.topicIdPartition);
            }
        }

        private void setErrorStatus(OffsetAndEpoch offsetAndEpoch, boolean errorStatusReachedViaFenceEvent, boolean freezeMergedLogStartOffset) {
            this.errorOffsetAndEpoch = offsetAndEpoch;
            this.errorStatusReachedViaFenceEvent = errorStatusReachedViaFenceEvent;
            if (freezeMergedLogStartOffset) {
                this.setStatus(TierPartitionStatus.FROZEN_LOG_START_OFFSET);
                if (this.recoveryWorkflowCb != null) {
                    this.recoveryWorkflowCb.accept(TierPartitionState.RecoveryOperation.FREEZE_MERGED_LOG_START_OFFSET);
                }
            } else {
                this.setStatus(TierPartitionStatus.ERROR);
            }
        }

        public void beginCatchup() {
            if (!this.status.isOpenForWrite()) {
                throw new IllegalStateException("Illegal state " + (Object)((Object)this.status) + " for tier partition basePath: " + this.basePath);
            }
            this.setStatus(TierPartitionStatus.CATCHUP);
        }

        public void onCatchUpComplete() {
            if (!this.status.isOpenForWrite()) {
                throw new IllegalStateException("Illegal state " + (Object)((Object)this.status) + " for tier partition basePath: " + this.basePath);
            }
            this.setStatus(TierPartitionStatus.ONLINE);
        }

        public Optional<Long> startOffset() {
            Map.Entry firstEntry = this.logSegments.firstEntry();
            if (firstEntry != null) {
                return Optional.of(firstEntry.getKey());
            }
            return Optional.empty();
        }

        public Long endOffset() {
            return this.endOffset;
        }

        TierPartitionStatus status() {
            return this.status;
        }

        int currentEpoch() {
            return this.currentEpoch;
        }

        public int numSegments() {
            return this.logSegments.size();
        }

        long committedEndOffset() {
            return this.committedEndOffset;
        }

        long totalSize() {
            return this.validSegmentsSize;
        }

        private NavigableSet<Long> segmentOffsets() {
            return this.logSegments.keySet();
        }

        public NavigableSet<Long> segmentOffsets(long from, long to) {
            return Log$.MODULE$.logSegments(this.logSegments, from, to).keySet();
        }

        public Collection<SegmentState> segments() {
            return this.logSegments.values();
        }

        public Collection<SegmentState> segments(long from, long to) {
            return Optional.ofNullable(this.logSegments.floorKey(from)).map(floor -> this.logSegments.subMap(floor, (Object)to)).orElseGet(() -> this.logSegments.headMap((Object)to)).values();
        }

        public List<SegmentState> fetchInMemoryMetadataRange(long from, long to) {
            ArrayList<SegmentState> allMetadata = new ArrayList<SegmentState>();
            if (this.immediatelyPreceedingDeletedSegment != null && this.immediatelyPreceedingDeletedSegment.endOffset() >= from && this.immediatelyPreceedingDeletedSegment.baseOffset() <= to) {
                allMetadata.add(this.immediatelyPreceedingDeletedSegment);
            }
            allMetadata.addAll(this.segments(from, to));
            return allMetadata;
        }

        public Optional<SegmentState> previousMetadataBeforeOffset(long targetStartOffset) {
            if (targetStartOffset < 0L) {
                return Optional.empty();
            }
            ListIterator<SegmentState> li = this.segmentStateListIterator(0L, targetStartOffset);
            while (li.hasPrevious()) {
                SegmentState entry = li.previous();
                if (entry.endOffset() >= targetStartOffset) continue;
                return Optional.of(entry);
            }
            return Optional.empty();
        }

        private Optional<SegmentState> previousMetadataForFollowerRestorePoint(long targetStartOffset) {
            if (targetStartOffset < 0L) {
                return Optional.empty();
            }
            ListIterator<SegmentState> li = this.segmentStateListIterator(0L, targetStartOffset);
            while (li.hasPrevious()) {
                SegmentState entry = li.previous();
                if (entry.baseOffset() >= targetStartOffset) continue;
                return Optional.of(entry);
            }
            return Optional.empty();
        }

        private ListIterator<SegmentState> segmentStateListIterator(long startOffset, long endOffset) {
            List<SegmentState> allMetadata = this.fetchInMemoryMetadataRange(startOffset, endOffset);
            return allMetadata.listIterator(allMetadata.size());
        }

        void putValid(SegmentState state) {
            this.logSegments.put(state.baseOffset(), state);
            this.validSegmentsSize += (long)state.size();
            this.endOffset = Math.max(this.endOffset, state.endOffset());
        }

        void removeValid(SegmentState segmentState) {
            SegmentState toRemove = (SegmentState)this.logSegments.get(segmentState.baseOffset());
            if (toRemove != null && toRemove.objectId().equals(segmentState.objectId())) {
                this.logSegments.remove(segmentState.baseOffset());
                this.validSegmentsSize -= (long)segmentState.size();
            }
        }

        private long startOffsetOfSegment(TierObjectMetadata metadata) {
            return Math.max(metadata.baseOffset(), this.endOffset + 1L);
        }

        private TierPartitionState.AppendResult appendMetadataUnhandled(AbstractTierMetadata entry, OffsetAndEpoch offsetAndEpoch) throws IOException {
            if (this.status.hasError()) {
                log.debug("Skipping processing for {} from offset {} as the current status is failed", (Object)entry, (Object)offsetAndEpoch);
                return TierPartitionState.AppendResult.FAILED;
            }
            if (!this.status.isOpenForWrite()) {
                log.debug("Skipping processing for {} from offset {} as file is not open for write", (Object)entry, (Object)offsetAndEpoch);
                return TierPartitionState.AppendResult.NOT_TIERABLE;
            }
            if (!FileTierPartitionState.allowedSourceOffset(offsetAndEpoch, this.localMaterializedOffsetAndEpoch)) {
                log.debug("Ignoring message at offset {} as last materialized offset is {} for {}", new Object[]{offsetAndEpoch, this.localMaterializedOffsetAndEpoch, this.topicIdPartition});
                return TierPartitionState.AppendResult.FENCED;
            }
            if (!FileTierPartitionState.allowedStateOffset(entry.stateOffsetAndEpoch(), this.restoreOffsetAndEpoch)) {
                log.info("Ignoring message {} at offset {} as the provided restore offset is {} and current restore offset is {} for {}", new Object[]{entry, offsetAndEpoch, entry.stateOffsetAndEpoch(), this.restoreOffsetAndEpoch, this.topicIdPartition});
                return TierPartitionState.AppendResult.RESTORE_FENCED;
            }
            TierPartitionState.AppendResult result = this.appendMetadataImpl(entry, offsetAndEpoch);
            this.localMaterializedOffsetAndEpoch = offsetAndEpoch;
            log.debug("Processed append for {} with result {} consumed from offset {}", new Object[]{entry, result, offsetAndEpoch});
            return result;
        }

        private TierPartitionState.AppendResult appendMetadata(AbstractTierMetadata entry, OffsetAndEpoch offsetAndEpoch) throws KafkaStorageException {
            try {
                return this.appendMetadataUnhandled(entry, offsetAndEpoch);
            }
            catch (IOException ioe) {
                TierPartitionStatus previousStatus = this.getStatus();
                this.setErrorStatus(offsetAndEpoch, false, false);
                this.ioExceptionHandler.accept(ioe);
                throw new KafkaStorageException("Failed to apply " + entry + ", currentEpoch=" + this.currentEpoch + ", tierTopicPartitionOffsetAndEpoch=" + offsetAndEpoch + ", previousTierPartitionStatus=" + (Object)((Object)previousStatus) + ", newTierPartitionStatus=" + (Object)((Object)TierPartitionStatus.ERROR), (Throwable)ioe);
            }
            catch (Exception e) {
                TierPartitionStatus previousStatus = this.getStatus();
                this.setErrorStatus(offsetAndEpoch, false, false);
                String logMsg = String.format("Failed to apply %s, currentEpoch=%d, tierTopicPartitionOffsetAndEpoch=%s, previousTierPartitionStatus=%s, newTierPartitionStatus=%s", new Object[]{entry, this.currentEpoch, offsetAndEpoch, previousStatus, TierPartitionStatus.ERROR});
                if (previousStatus == TierPartitionStatus.ONLINE) {
                    log.error(logMsg, (Throwable)e);
                } else {
                    log.info(logMsg, (Throwable)e);
                }
                return TierPartitionState.AppendResult.FAILED;
            }
        }

        private TierPartitionState.AppendResult appendMetadataImpl(AbstractTierMetadata entry, OffsetAndEpoch offsetAndEpoch) throws IOException {
            TierPartitionState.AppendResult appendResult;
            switch (entry.type()) {
                case InitLeader: {
                    appendResult = this.handleInitLeader((TierTopicInitLeader)entry);
                    break;
                }
                case PartitionFence: {
                    appendResult = this.handlePartitionFence((TierPartitionFence)entry, offsetAndEpoch);
                    break;
                }
                case SegmentUploadInitiate: 
                case SegmentUploadComplete: 
                case SegmentDeleteInitiate: 
                case SegmentDeleteComplete: {
                    appendResult = this.maybeTransitionSegment((AbstractTierSegmentMetadata)entry);
                    break;
                }
                case PartitionDeleteInitiate: 
                case PartitionDeleteComplete: {
                    appendResult = TierPartitionState.AppendResult.ACCEPTED;
                    break;
                }
                default: {
                    throw new IllegalStateException("Attempt to append unknown type " + (Object)((Object)entry.type()) + " to " + this.topicIdPartition);
                }
            }
            Iterator<Map.Entry<Class<? extends MaterializationListener>, MaterializationListener>> iter = this.listeners.entrySet().iterator();
            while (iter.hasNext()) {
                if (!this.tryCompleteListener(iter.next().getValue(), Optional.of(entry))) continue;
                iter.remove();
            }
            return appendResult;
        }

        private boolean tryCompleteListener(MaterializationListener listener, Optional<AbstractTierMetadata> entry) throws IOException {
            if (listener.mayComplete(this, entry)) {
                this.flush();
                return listener.complete(this);
            }
            return false;
        }

        private Supplier<Optional<TierObjectMetadata>> metadataForInMemorySegmentMetadata(SegmentState metadata) {
            try {
                FileTierPartitionIterator iter = FileTierPartitionState.iterator(this.topicIdPartition, this.channel, metadata.filePosition());
                return () -> {
                    if (iter.hasNext()) {
                        return Optional.of(iter.next());
                    }
                    return Optional.empty();
                };
            }
            catch (IOException e) {
                throw new KafkaStorageException((Throwable)e);
            }
        }

        private List<SegmentState> metadataForStates(Set<TierObjectMetadata.State> states) {
            return this.allSegmentStates.values().stream().filter(segmentState -> states.contains((Object)segmentState.state())).collect(Collectors.toList());
        }

        public String toString() {
            return "State(status=" + (Object)((Object)this.status) + ", startOffset=" + this.startOffset() + ", endOffset=" + this.endOffset() + ", committedEndOffset=" + this.committedEndOffset() + ", numSegments=" + this.numSegments() + ", tierEpoch=" + this.currentEpoch + ", lastMaterializedOffset=" + this.localMaterializedOffsetAndEpoch + ", globalMaterializedOffset=" + this.globalMaterializedOffsetAndEpoch + ", errorOffsetAndEpoch=" + this.errorOffsetAndEpoch + ", restoreOffsetAndEpoch=" + this.restoreOffsetAndEpoch + ", checksumAlgorithm=" + (Object)((Object)this.checksumAlgorithm) + ", checksumSuperBlockLength" + this.checksumSuperBlockLength + ")";
        }

        public Optional<SegmentState> metadata(long targetOffset) throws IOException {
            Collection<SegmentState> view = this.segments(targetOffset, Long.MAX_VALUE);
            for (SegmentState segment : view) {
                if (segment.endOffset() < targetOffset) continue;
                return Optional.of(segment);
            }
            return Optional.empty();
        }

        private TierPartitionState.AppendResult handlePartitionFence(TierPartitionFence partitionFenceEvent, OffsetAndEpoch offsetAndEpoch) {
            if (partitionFenceEvent.freezeLogStartOffset() && this.recoveryWorkflowCb == null) {
                throw new IllegalStateException(String.format("Cannot process metadata %s at offsetEpoch %s%n.FileTierPartitionState needs a recovery workflow callback to process a partition fence event when log start offset needs to be frozen as well.", partitionFenceEvent, offsetAndEpoch));
            }
            this.setErrorStatus(offsetAndEpoch, true, partitionFenceEvent.freezeLogStartOffset());
            log.info("topicIdPartition={} fenced by PartitionFence event={} at offset={}", new Object[]{this.topicIdPartition, partitionFenceEvent, offsetAndEpoch});
            return TierPartitionState.AppendResult.FAILED;
        }

        private TierPartitionState.AppendResult handleInitLeader(TierTopicInitLeader initLeader) throws IOException {
            if (initLeader.tierEpoch() == this.currentEpoch) {
                return TierPartitionState.AppendResult.ACCEPTED;
            }
            if (initLeader.tierEpoch() > this.currentEpoch) {
                HashSet<TierObjectMetadata.State> statesToFence = new HashSet<TierObjectMetadata.State>(Arrays.asList(TierObjectMetadata.State.SEGMENT_UPLOAD_INITIATE, TierObjectMetadata.State.SEGMENT_DELETE_INITIATE));
                List<SegmentState> toFence = this.metadataForStates(statesToFence);
                for (SegmentState metadata : toFence) {
                    this.fenceSegment(metadata);
                }
                this.currentEpoch = initLeader.tierEpoch();
                this.dirty = true;
                return TierPartitionState.AppendResult.ACCEPTED;
            }
            return TierPartitionState.AppendResult.FENCED;
        }

        private TierPartitionState.AppendResult maybeTransitionSegment(AbstractTierSegmentMetadata metadata) throws IOException {
            if (metadata.tierEpoch() > this.currentEpoch) {
                throw new IllegalStateException(String.format("Unexpected transition attempted for topicIdPartition=%s via metadata=%s at epoch=%s while currentEpoch=%s is lower", this.topicIdPartition, metadata, metadata.tierEpoch(), this.currentEpoch));
            }
            if (metadata.tierEpoch() < this.currentEpoch) {
                log.info("Fenced {} as currentEpoch={} ({})", new Object[]{metadata, this.currentEpoch, this.topicIdPartition});
                return TierPartitionState.AppendResult.FENCED;
            }
            SegmentState currentState = this.getState(metadata.objectId());
            if (currentState != null) {
                if (currentState.state().equals((Object)metadata.state())) {
                    log.debug("Accepting duplicate transition for {} ({})", (Object)metadata, (Object)this.topicIdPartition);
                    return TierPartitionState.AppendResult.ACCEPTED;
                }
                if (!currentState.state().canTransitionTo(metadata.state())) {
                    log.info("Fencing already processed transition for {} with currentState={} ({})", new Object[]{metadata, currentState, this.topicIdPartition});
                    return TierPartitionState.AppendResult.FENCED;
                }
            } else {
                if (this.deletedSegments.contains(metadata.objectId())) {
                    log.info("tiered log segment for {} has already been deleted, ignoring and marking this segment as {}.", (Object)metadata, (Object)TierPartitionState.AppendResult.ACCEPTED);
                    return TierPartitionState.AppendResult.ACCEPTED;
                }
                if (metadata.state() != TierObjectMetadata.State.SEGMENT_UPLOAD_INITIATE) {
                    throw new IllegalStateException("Cannot complete transition for non-existent segment " + metadata + " for " + this.topicIdPartition);
                }
            }
            switch (metadata.type()) {
                case SegmentUploadInitiate: {
                    return this.handleUploadInitiate((TierSegmentUploadInitiate)metadata);
                }
                case SegmentUploadComplete: {
                    return this.handleUploadComplete((TierSegmentUploadComplete)metadata);
                }
                case SegmentDeleteInitiate: {
                    return this.handleDeleteInitiate((TierSegmentDeleteInitiate)metadata);
                }
                case SegmentDeleteComplete: {
                    return this.handleDeleteComplete((TierSegmentDeleteComplete)metadata);
                }
            }
            throw new IllegalStateException("Unexpected state " + (Object)((Object)metadata.state()) + " for " + this.topicIdPartition);
        }

        private void trackPreceedingMetadata(SegmentState segmentState) {
            if (segmentState.baseOffset() < this.startOffset().orElse(Long.MAX_VALUE)) {
                this.immediatelyPreceedingDeletedSegment = segmentState;
            }
        }

        private void addSegmentMetadata(TierObjectMetadata metadata, long filePosition) {
            if (this.deletedSegments.contains(metadata.objectId())) {
                log.info("ignoring metadata for {} as it has already been deleted", (Object)metadata);
            } else {
                SegmentState segmentState = this.updateAndGetState(filePosition, metadata);
                switch (metadata.state()) {
                    case SEGMENT_UPLOAD_INITIATE: {
                        if (this.uploadInProgress != null) {
                            throw new IllegalStateException("Unexpected upload in progress " + this.uploadInProgress + " when appending " + metadata + " to " + this.topicIdPartition);
                        }
                        this.uploadInProgress = segmentState;
                        break;
                    }
                    case SEGMENT_UPLOAD_COMPLETE: {
                        this.putValid(segmentState);
                        this.uploadInProgress = null;
                        break;
                    }
                    case SEGMENT_DELETE_INITIATE: {
                        this.removeValid(segmentState);
                        this.trackPreceedingMetadata(segmentState);
                        break;
                    }
                    case SEGMENT_DELETE_COMPLETE: {
                        this.trackPreceedingMetadata(segmentState);
                        this.allSegmentStates.remove(segmentState.objectId());
                        this.deletedSegments.add(segmentState.objectId());
                        break;
                    }
                    case SEGMENT_FENCED: {
                        break;
                    }
                    default: {
                        throw new IllegalArgumentException("Unknown state " + metadata + " for " + this.topicIdPartition);
                    }
                }
            }
        }

        private void updateState(UUID objectId, TierObjectMetadata.State newState) throws IOException {
            SegmentState currentState = this.getState(objectId);
            if (currentState == null) {
                throw new IllegalStateException("No metadata found for " + objectId + " in " + this.topicIdPartition);
            }
            if (!objectId.equals(currentState.objectId())) {
                throw new IllegalStateException("id mismatch. Expected: " + objectId + " Got: " + currentState.objectId() + " Partition: " + this.topicIdPartition);
            }
            TierObjectMetadata metadata = FileTierPartitionStateUtils.read(this.topicIdPartition, this.channel, currentState.filePosition());
            if (!objectId.equals(metadata.objectId())) {
                throw new IllegalStateException("id mismatch. Expected: " + objectId + " Got: " + currentState.objectId() + " Partition: " + this.topicIdPartition);
            }
            int oldSize = metadata.payloadSize();
            metadata.mutateState(newState);
            int newSize = metadata.payloadSize();
            if (oldSize != newSize) {
                throw new IllegalStateException(String.format("Size mismatch for objectId %s, expected: %d, got: %d, topicIdPartition: %s.", metadata.objectId(), oldSize, newSize, this.topicIdPartition));
            }
            this.channel.write(metadata.payloadBuffer(), currentState.filePosition() + 2);
            this.addSegmentMetadata(metadata, currentState.filePosition());
            this.dirty = true;
        }

        private void fenceSegment(SegmentState segmentState) throws IOException {
            this.updateState(segmentState.objectId(), TierObjectMetadata.State.SEGMENT_FENCED);
            if (this.uploadInProgress != null && this.uploadInProgress.objectId().equals(segmentState.objectId())) {
                this.uploadInProgress = null;
            }
        }

        private TierPartitionState.AppendResult handleUploadInitiate(TierSegmentUploadInitiate uploadInitiate) throws IOException {
            TierObjectMetadata metadata = new TierObjectMetadata(uploadInitiate);
            if (metadata.endOffset() > this.endOffset) {
                if (this.uploadInProgress != null) {
                    this.fenceSegment(this.uploadInProgress);
                }
                ByteBuffer metadataBuffer = metadata.payloadBuffer();
                long byteOffset = this.appendWithSizePrefix(metadataBuffer);
                this.addSegmentMetadata(metadata, byteOffset);
                this.dirty = true;
                return TierPartitionState.AppendResult.ACCEPTED;
            }
            log.info("Fencing uploadInitiate for {}. currentEndOffset={} currentEpoch={}. ({})", new Object[]{metadata, this.endOffset, this.currentEpoch, this.topicIdPartition});
            return TierPartitionState.AppendResult.FENCED;
        }

        private TierPartitionState.AppendResult handleUploadComplete(TierSegmentUploadComplete uploadComplete) throws IOException {
            if (!this.uploadInProgress.objectId().equals(uploadComplete.objectId())) {
                throw new IllegalStateException("Expected " + this.uploadInProgress.objectId() + " to be in-progress but got " + uploadComplete.objectId() + " for partition " + this.topicIdPartition);
            }
            this.updateState(uploadComplete.objectId(), TierObjectMetadata.State.SEGMENT_UPLOAD_COMPLETE);
            return TierPartitionState.AppendResult.ACCEPTED;
        }

        private TierPartitionState.AppendResult handleDeleteInitiate(TierSegmentDeleteInitiate deleteInitiate) throws IOException {
            this.updateState(deleteInitiate.objectId(), TierObjectMetadata.State.SEGMENT_DELETE_INITIATE);
            return TierPartitionState.AppendResult.ACCEPTED;
        }

        private TierPartitionState.AppendResult handleDeleteComplete(TierSegmentDeleteComplete deleteComplete) throws IOException {
            this.updateState(deleteComplete.objectId(), TierObjectMetadata.State.SEGMENT_DELETE_COMPLETE);
            return TierPartitionState.AppendResult.ACCEPTED;
        }

        private void registerListener(MaterializationListener listener) throws IOException {
            Class<?> ty = listener.getClass();
            MaterializationListener old = this.listeners.remove(ty);
            if (old != null) {
                old.cancel(new IllegalStateException(String.format("%s listener already registered: ", ty.getName())));
            }
            if (!this.tryCompleteListener(listener, Optional.empty())) {
                this.listeners.put(ty, listener);
                log.info("Registered materialization listener {}", (Object)listener);
            }
        }

        public CompletableFuture<Optional<TierLogSegment>> materializeUptoLeaderEpoch(int targetEpoch) throws IOException {
            CompletableFuture<Optional<TierLogSegment>> promise = new CompletableFuture<Optional<TierLogSegment>>();
            MaterializationListener.LeaderEpoch listener = new MaterializationListener.LeaderEpoch(log, this.topicIdPartition, promise, targetEpoch);
            this.registerListener(listener);
            return promise;
        }

        public CompletableFuture<TierLogSegment> materializationListener(long targetOffset) throws IOException {
            CompletableFuture<TierLogSegment> promise = new CompletableFuture<TierLogSegment>();
            MaterializationListener.ReplicationTargetOffset listener = new MaterializationListener.ReplicationTargetOffset(log, this.topicIdPartition, promise, targetOffset);
            this.registerListener(listener);
            return promise;
        }

        public CompletableFuture<TierLogSegment> materializationListener(long upperBoundEndOffset, UUID targetObjectId, long targetRestoreEpoch) throws IOException {
            CompletableFuture<TierLogSegment> promise = new CompletableFuture<TierLogSegment>();
            MaterializationListener.ReplicationTargetObjectId listener = new MaterializationListener.ReplicationTargetObjectId(log, this.topicIdPartition, promise, targetObjectId, targetRestoreEpoch, upperBoundEndOffset);
            this.registerListener(listener);
            return promise;
        }

        public CompletableFuture<Boolean> trackMetadataInitialization(int targetLeaderEpoch) throws IllegalStateException, IOException {
            if (this.listeners.containsKey(MaterializationListener.Initialization.class)) {
                throw new IllegalStateException(String.format("%s listener already registered: ", MaterializationListener.Initialization.class.getName()));
            }
            CompletableFuture<Boolean> promise = new CompletableFuture<Boolean>();
            MaterializationListener.Initialization listener = new MaterializationListener.Initialization(log, this.topicIdPartition, promise, targetLeaderEpoch);
            this.registerListener(listener);
            return promise;
        }

        void closeListeners() {
            TierPartitionStateIllegalListenerException exception = new TierPartitionStateIllegalListenerException("Tier partition state for " + this.topicIdPartition + " has been closed.");
            for (MaterializationListener listener : this.listeners.values()) {
                listener.cancel(exception);
            }
            this.listeners.clear();
        }

        private void close() throws IOException {
            this.closeListeners();
            if (this.channel != null) {
                this.channel.close();
            }
        }

        private boolean validate() throws IOException, InstantiationException, IllegalAccessException {
            if (this.status.isOpenForWrite() && this.channel != null) {
                return this.channel.validate();
            }
            log.warn("Checksum validation not performed on: " + this.topicIdPartition + " as it is not open for write(" + (Object)((Object)this.status) + ")");
            return true;
        }

        private boolean flush() throws IOException {
            if (!this.dirty) {
                return false;
            }
            if (this.status.hasError()) {
                this.flushErrorState();
                this.dirty = false;
                return true;
            }
            if (this.status.isOpenForWrite()) {
                this.flushWritableState();
                this.dirty = false;
                return true;
            }
            log.info("Ignored state flush due to status: " + (Object)((Object)this.status) + " for " + this.topicIdPartition);
            return false;
        }

        private void flushWritableState() throws IOException {
            FileTierPartitionState.writeHeader(this.channel, new Header(this.topicIdPartition.topicId(), this.version, this.currentEpoch, this.status, this.startOffset().orElse(-1L), this.endOffset, this.globalMaterializedOffsetAndEpoch, this.localMaterializedOffsetAndEpoch, this.errorOffsetAndEpoch, this.restoreOffsetAndEpoch));
            this.channel.flush();
            Files.copy(FileTierPartitionState.mutableFilePath(this.basePath, this.checksumAlgorithm), FileTierPartitionState.tmpFilePath(this.basePath, this.checksumAlgorithm), StandardCopyOption.REPLACE_EXISTING);
            Utils.atomicMoveWithFallback((Path)FileTierPartitionState.tmpFilePath(this.basePath, this.checksumAlgorithm), (Path)FileTierPartitionState.flushedFilePath(this.basePath, this.checksumAlgorithm));
            this.committedEndOffset = this.endOffset;
        }

        private void flushErrorState() throws IOException {
            if (this.errorStatusReachedViaFenceEvent) {
                this.flushWritableState();
            } else {
                this.flushHeaderWithErrorStatus();
                FileTierPartitionState.backupState(this.topicIdPartition, this.basePath, FileTierPartitionState.errorFilePath(this.basePath, this.checksumAlgorithm), this.checksumAlgorithm);
            }
        }

        private void flushHeaderWithErrorStatus() throws IOException {
            Path flushedFilePathHandle = FileTierPartitionState.flushedFilePath(this.basePath, this.checksumAlgorithm);
            Path tmpFilePathHandle = FileTierPartitionState.tmpFilePath(this.basePath, this.checksumAlgorithm);
            if (!Files.exists(flushedFilePathHandle, new LinkOption[0])) {
                log.warn("Flushed file absent, creating empty file for {}: {}", (Object)this.topicIdPartition, (Object)flushedFilePathHandle);
                CheckedFileIO.create(flushedFilePathHandle, this.checksumAlgorithm, this.checksumSuperBlockLength);
            }
            Files.copy(flushedFilePathHandle, tmpFilePathHandle, StandardCopyOption.REPLACE_EXISTING);
            try (CheckedFileIO channel = CheckedFileIO.open(tmpFilePathHandle, StandardOpenOption.READ, StandardOpenOption.WRITE);){
                Header newHeader;
                Optional<Header> existingHeaderOpt = FileTierPartitionState.readHeader(channel);
                if (existingHeaderOpt.isPresent()) {
                    Header existingHeader = existingHeaderOpt.get();
                    newHeader = new Header(existingHeader.topicId(), (byte)existingHeader.version(), existingHeader.tierEpoch(), TierPartitionStatus.ERROR, existingHeader.startOffset(), existingHeader.endOffset(), existingHeader.globalMaterializedOffsetAndEpoch(), existingHeader.localMaterializedOffsetAndEpoch(), this.errorOffsetAndEpoch, existingHeader.restoreOffsetAndEpoch());
                    log.warn("Writing new header to tier partition state for {}: {}", (Object)this.topicIdPartition, (Object)newHeader);
                } else {
                    newHeader = new Header(this.topicIdPartition.topicId(), this.version, -1, TierPartitionStatus.ERROR, -1L, -1L, OffsetAndEpoch.EMPTY, OffsetAndEpoch.EMPTY, OffsetAndEpoch.EMPTY, this.errorOffsetAndEpoch);
                    log.warn("Header not found! Writing new header to tier partition state for {}: {}", (Object)this.topicIdPartition, (Object)newHeader);
                    channel.truncate(0L);
                }
                FileTierPartitionState.writeHeader(channel, newHeader);
                channel.flush();
                Utils.atomicMoveWithFallback((Path)tmpFilePathHandle, (Path)flushedFilePathHandle);
            }
        }

        private long appendWithSizePrefix(ByteBuffer metadataBuffer) throws IOException {
            long byteOffset = this.channel.position();
            int remaining = metadataBuffer.remaining();
            short sizePrefix = (short)remaining;
            if (sizePrefix != remaining) {
                throw new IllegalStateException(String.format("Unexpected metadataBuffer size: %d", remaining));
            }
            ByteBuffer sizeBuf = ByteBuffer.allocate(2).order(ByteOrder.LITTLE_ENDIAN);
            sizeBuf.putShort(0, sizePrefix);
            this.channel.write(sizeBuf);
            this.channel.write(metadataBuffer);
            return byteOffset;
        }
    }

    private static enum StateFileType {
        FLUSHED(""),
        MUTABLE(".mutable"),
        TEMPORARY(".tmp"),
        ERROR(".error"),
        DISCARDED(".discarded"),
        RECOVER(".recover");

        private final String suffix;

        private StateFileType(String suffix) {
            this.suffix = suffix;
        }

        public Path filePath(String basePath, Algorithm checksumAlgorithm) {
            return CheckedFileIO.validPath(checksumAlgorithm, Paths.get(basePath + this.suffix, new String[0]));
        }
    }
}

