/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.changelog;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.InternalCheckpointListener;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestableKeyedStateBackend;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendLocalHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateFactory;
import org.apache.flink.runtime.state.ttl.TtlStateFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.shaded.guava31.com.google.common.io.Closer;
import org.apache.flink.state.changelog.AbstractChangelogState;
import org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue;
import org.apache.flink.state.changelog.ChangelogState;
import org.apache.flink.state.changelog.ChangelogStateBackendMetricGroup;
import org.apache.flink.state.changelog.ChangelogStateFactory;
import org.apache.flink.state.changelog.ChangelogTruncateHelper;
import org.apache.flink.state.changelog.KvStateChangeLogger;
import org.apache.flink.state.changelog.KvStateChangeLoggerImpl;
import org.apache.flink.state.changelog.PriorityQueueStateChangeLoggerImpl;
import org.apache.flink.state.changelog.StateChangeLogger;
import org.apache.flink.state.changelog.restore.ChangelogRestoreTarget;
import org.apache.flink.state.changelog.restore.FunctionDelegationHelper;
import org.apache.flink.state.common.PeriodicMaterializationManager;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class ChangelogKeyedStateBackend<K>
implements CheckpointableKeyedStateBackend<K>,
CheckpointListener,
TestableKeyedStateBackend<K>,
InternalCheckpointListener,
PeriodicMaterializationManager.MaterializationTarget {
    /** Logger shared by all instances of this backend. */
    private static final Logger LOG = LoggerFactory.getLogger(ChangelogKeyedStateBackend.class);
    /** Checkpoint options handed to the wrapped backend when {@code initMaterialization()} triggers a snapshot. */
    private static final CheckpointOptions CHECKPOINT_OPTIONS = new CheckpointOptions((SnapshotType)CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());
    /** The wrapped keyed state backend; most public methods delegate to it. */
    private final AbstractKeyedStateBackend<K> keyedStateBackend;
    /** States created through {@code getOrCreateKeyedState}, keyed by state descriptor name. */
    private final Map<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
    /** Creates and looks up the changelog wrappers around delegated states. */
    private final ChangelogStateFactory changelogStateFactory;
    /** Used to initialize serializers of state descriptors that are not initialized yet. */
    private final ExecutionConfig executionConfig;
    /** Time provider passed to the TTL state factory. */
    private final TtlTimeProvider ttlTimeProvider;
    /** Writer to which state changes are logged and from which checkpoint deltas are persisted. */
    private final StateChangelogWriter<? extends ChangelogStateHandle> stateChangelogWriter;
    /** Collects everything that has to be closed by {@code close()}. */
    private final Closer closer = Closer.create();
    /** Stream factory used for materialization; it always opens task-owned state streams. */
    private final CheckpointStreamFactory streamFactory;
    /** Current materialized/non-materialized state; replaced in {@code handleMaterializationResult}. */
    private ChangelogSnapshotState changelogSnapshotState;
    /** ID of the last (non-savepoint) checkpoint passed to {@code snapshot}; -1 until the first one. */
    private long lastCheckpointId = -1L;
    /** Next materialization ID to hand out (post-incremented on use). */
    private long materializedId = 0L;
    /** Single-entry cache used by {@code getPartitionedState}: the most recently returned state ... */
    private InternalKvState lastState;
    /** ... and the descriptor name it was returned for. */
    private String lastName;
    /** Receives every state descriptor seen by this backend via {@code addOrUpdate}. */
    private final FunctionDelegationHelper functionDelegationHelper = new FunctionDelegationHelper();
    /** Metrics of this backend; snapshot results are reported to it. */
    private final ChangelogStateBackendMetricGroup metrics;
    /** Lower bound of the change range of the last checkpoint (set in {@code snapshot}). */
    @Nullable
    private SequenceNumber lastUploadedFrom;
    /** Upper bound of the change range of the last checkpoint; also used to truncate the writer on close. */
    @Nullable
    private SequenceNumber lastUploadedTo;
    /** Subtask name, used in log messages only. */
    private final String subtaskName;
    /** ID given to the most recently created state change logger; incremented before each new logger is built. */
    private short lastCreatedStateId = (short)-1;
    /** Checkpoint ID to the materialization ID that checkpoint was based on. */
    private final NavigableMap<Long, Long> materializationIdByCheckpointId = new TreeMap<Long, Long>();
    /** Highest materialization ID that was confirmed by a completed checkpoint (or restored). */
    private long lastConfirmedMaterializationId = -1L;
    /** Highest materialization ID reported as failed or cancelled. */
    private long lastFailedMaterializationId = -1L;
    /** Helper notified about checkpoints, subsumptions and materializations. */
    private final ChangelogTruncateHelper changelogTruncateHelper;
    /** Set to true once {@code handleMaterializationResult} has run at least once. */
    private AtomicBoolean hasCompletedMaterialization = new AtomicBoolean(false);
    /** True when more than one state handle was passed to {@code completeRestore}. */
    private boolean isRescaling = false;

    /**
     * Convenience constructor: wraps the given metric group in a {@link ChangelogStateBackendMetricGroup},
     * uses a fresh {@link ChangelogStateFactory}, and forwards everything else to the full constructor.
     */
    public ChangelogKeyedStateBackend(AbstractKeyedStateBackend<K> keyedStateBackend, String subtaskName, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, StateChangelogWriter<? extends ChangelogStateHandle> stateChangelogWriter, Collection<ChangelogStateBackendHandle> initialState, CheckpointStorageWorkerView checkpointStorageWorkerView) {
        this(keyedStateBackend, subtaskName, executionConfig, ttlTimeProvider, new ChangelogStateBackendMetricGroup(metricGroup), stateChangelogWriter, initialState, checkpointStorageWorkerView, new ChangelogStateFactory());
    }

    /**
     * Full constructor.
     *
     * @param keyedStateBackend the backend to wrap; it is registered with the closer
     * @param subtaskName name used in log messages
     * @param executionConfig used to initialize state descriptor serializers
     * @param ttlTimeProvider time provider for TTL-wrapped states
     * @param metricGroup metrics of this backend
     * @param stateChangelogWriter writer receiving the state changes; truncated and closed on {@code close()}
     * @param initialState handles this backend was restored from (see {@code completeRestore})
     * @param checkpointStorageWorkerView source of the task-owned streams used for materialization
     * @param changelogStateFactory factory for the changelog state wrappers
     */
    public ChangelogKeyedStateBackend(AbstractKeyedStateBackend<K> keyedStateBackend, String subtaskName, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, ChangelogStateBackendMetricGroup metricGroup, StateChangelogWriter<? extends ChangelogStateHandle> stateChangelogWriter, Collection<ChangelogStateBackendHandle> initialState, final CheckpointStorageWorkerView checkpointStorageWorkerView, ChangelogStateFactory changelogStateFactory) {
        this.keyedStateBackend = keyedStateBackend;
        this.subtaskName = subtaskName;
        this.executionConfig = executionConfig;
        this.ttlTimeProvider = ttlTimeProvider;
        this.keyValueStatesByName = new HashMap();
        this.metrics = metricGroup;
        this.changelogStateFactory = changelogStateFactory;
        this.stateChangelogWriter = stateChangelogWriter;
        this.lastUploadedTo = stateChangelogWriter.initialSequenceNumber();
        // The lambda reads lastUploadedTo when it runs, i.e. the value current at close time.
        this.closer.register(() -> stateChangelogWriter.truncateAndClose(this.lastUploadedTo));
        // Must run after stateChangelogWriter is assigned: completeRestore reads it and also
        // sets isRescaling, lastConfirmedMaterializationId and materializedId.
        this.changelogSnapshotState = this.completeRestore(initialState);
        // Stream factory for materialization: ignores the requested scope and always opens a
        // task-owned stream; fast duplication is reported as unsupported.
        this.streamFactory = new CheckpointStreamFactory(){

            public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException {
                return checkpointStorageWorkerView.createTaskOwnedStateStream();
            }

            public boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope) throws IOException {
                return false;
            }

            // Returns null; canFastDuplicate above always answers false.
            public List<StreamStateHandle> duplicate(List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException {
                return null;
            }
        };
        this.closer.register(keyedStateBackend);
        this.changelogTruncateHelper = new ChangelogTruncateHelper(stateChangelogWriter);
    }

    /** Returns the key-group range reported by the wrapped backend. */
    public KeyGroupRange getKeyGroupRange() {
        return keyedStateBackend.getKeyGroupRange();
    }

    /**
     * Closes everything that was registered with this backend's closer.
     *
     * @throws IOException if closing the registered resources fails
     */
    public void close() throws IOException {
        closer.close();
    }

    /** Forwards the new current key to the wrapped backend. */
    public void setCurrentKey(K newKey) {
        keyedStateBackend.setCurrentKey(newKey);
    }

    /** Returns the current key of the wrapped backend. */
    public K getCurrentKey() {
        return (K)keyedStateBackend.getCurrentKey();
    }

    /** Returns the key serializer of the wrapped backend. */
    public TypeSerializer<K> getKeySerializer() {
        return keyedStateBackend.getKeySerializer();
    }

    /**
     * Streams the keys of the given state and namespace, as reported by the wrapped backend.
     *
     * @param state name of the state
     * @param namespace namespace to look at
     */
    public <N> Stream<K> getKeys(String state, N namespace) {
        return keyedStateBackend.getKeys(state, namespace);
    }

    /**
     * Streams the (key, namespace) pairs of the given state, as reported by the wrapped backend.
     *
     * @param state name of the state
     */
    public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
        return keyedStateBackend.getKeysAndNamespaces(state);
    }

    /**
     * Disposes the wrapped backend, drops the cached states of this wrapper and finally
     * disposes the changelog state factory.
     */
    public void dispose() {
        keyedStateBackend.dispose();

        // Forget the single-entry cache and the by-name cache.
        lastState = null;
        lastName = null;
        keyValueStatesByName.clear();

        changelogStateFactory.dispose();
    }

    /** Registers the key selection listener with the wrapped backend. */
    public void registerKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener) {
        keyedStateBackend.registerKeySelectionListener(listener);
    }

    /**
     * Removes the key selection listener from the wrapped backend.
     *
     * @return whatever the wrapped backend reports for the removal
     */
    public boolean deregisterKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener) {
        return keyedStateBackend.deregisterKeySelectionListener(listener);
    }

    /**
     * Lets the wrapped backend apply {@code function} to all keys, but resolves the state through
     * this wrapper's {@code getPartitionedState}.
     */
    public <N, S extends State, T> void applyToAllKeys(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor, KeyedStateFunction<K, S> function) throws Exception {
        keyedStateBackend.applyToAllKeys(
                namespace,
                namespaceSerializer,
                stateDescriptor,
                function,
                this::getPartitionedState);
    }

    /**
     * Returns the state for the given descriptor with its current namespace set to
     * {@code namespace}.
     *
     * <p>Lookup order: the most recently returned state, then the by-name cache, and only then a
     * newly created state.
     *
     * @param namespace namespace to select on the returned state; must not be null
     * @param namespaceSerializer serializer for the namespace, used when the state is created
     * @param stateDescriptor descriptor identifying the state by name
     */
    public <N, S extends State> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        Preconditions.checkNotNull(namespace, (String)"Namespace");
        final String stateName = stateDescriptor.getName();

        // Same state as in the previous call: only the namespace has to be switched.
        if (lastName != null && lastName.equals(stateName)) {
            lastState.setCurrentNamespace(namespace);
            return (S)lastState;
        }

        final InternalKvState<K, ?, ?> cached = keyValueStatesByName.get(stateName);
        if (cached == null) {
            // Unknown state: create it first, and only then update the single-entry cache.
            final S created = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
            final InternalKvState createdKvState = (InternalKvState)created;
            lastName = stateName;
            lastState = createdKvState;
            createdKvState.setCurrentNamespace(namespace);
            return created;
        }

        lastState = cached;
        lastState.setCurrentNamespace(namespace);
        lastName = stateName;
        functionDelegationHelper.addOrUpdate(stateDescriptor);
        return (S)cached;
    }

    /**
     * Snapshots this backend for the given checkpoint.
     *
     * <p>Savepoints are handed to {@code nativeSavepoint}. For every other checkpoint type the
     * change range to upload is recorded, the changes starting at {@code lastUploadedFrom} are
     * persisted through the changelog writer, and the result is combined with the current
     * {@code changelogSnapshotState} by {@code buildSnapshotResult}.
     *
     * @param checkpointId ID of the checkpoint
     * @param timestamp checkpoint timestamp; only used on the savepoint path
     * @param streamFactory stream factory; only used on the savepoint path
     * @param checkpointOptions options deciding between the savepoint and the checkpoint path
     * @return a future yielding the snapshot result
     */
    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
        if (checkpointOptions.getCheckpointType().isSavepoint()) {
            return this.nativeSavepoint(checkpointId, timestamp, streamFactory, checkpointOptions);
        }
        // Remember the checkpoint and its change range; notifyCheckpointComplete/Aborted
        // compare against lastCheckpointId and pass the range back to the writer.
        this.lastCheckpointId = checkpointId;
        this.lastUploadedFrom = this.changelogSnapshotState.lastMaterializedTo();
        this.lastUploadedTo = this.stateChangelogWriter.nextSequenceNumber();
        this.changelogTruncateHelper.checkpoint(checkpointId, this.lastUploadedTo);
        LOG.info("snapshot of {} for checkpoint {}, change range: {}..{}, materialization ID {}", new Object[]{this.subtaskName, checkpointId, this.lastUploadedFrom, this.lastUploadedTo, this.changelogSnapshotState.getMaterializationID()});
        // Capture the current snapshot state in a local so the asynchronous stage below keeps
        // using this instance even if the field is replaced by a later materialization.
        ChangelogSnapshotState changelogStateBackendStateCopy = this.changelogSnapshotState;
        this.materializationIdByCheckpointId.put(checkpointId, changelogStateBackendStateCopy.materializationID);
        // persist -> build the combined handle -> report metrics -> cast to the declared type.
        return ChangelogKeyedStateBackend.toRunnableFuture(((CompletableFuture)((CompletableFuture)this.stateChangelogWriter.persist(this.lastUploadedFrom, checkpointId).thenApply(delta -> this.buildSnapshotResult(checkpointId, (SnapshotResult<ChangelogStateHandle>)delta, changelogStateBackendStateCopy))).whenComplete((snapshotResult, throwable) -> this.metrics.reportSnapshotResult((SnapshotResult<ChangelogStateBackendHandle>)snapshotResult))).thenApply(this::castSnapshotResult));
    }

    /**
     * Takes a native savepoint by snapshotting the wrapped backend under a fresh materialization
     * ID and wrapping its result into a changelog handle without any changelog delta.
     *
     * <p>The returned task also forwards {@code cancel}, {@code isCancelled} and {@code isDone}
     * to the wrapped backend's snapshot future.
     *
     * @throws UnsupportedOperationException if the savepoint uses a sharing strategy other than
     *     {@code NO_SHARING}
     */
    private RunnableFuture<SnapshotResult<KeyedStateHandle>> nativeSavepoint(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
        final SnapshotType.SharingFilesStrategy strategy = checkpointOptions.getCheckpointType().getSharingFilesStrategy();
        if (strategy != SnapshotType.SharingFilesStrategy.NO_SHARING) {
            throw new UnsupportedOperationException("ChangelogKeyedStateBackend doesn't support native savepoint with SharingFilesStrategy: " + strategy);
        }

        final long savepointMaterializationId = materializedId++;
        final RunnableFuture delegateFuture = keyedStateBackend.snapshot(savepointMaterializationId, timestamp, streamFactory, checkpointOptions);
        // Registered only after the wrapped backend accepted the snapshot request.
        materializationIdByCheckpointId.put(checkpointId, savepointMaterializationId);

        return new FutureTask<SnapshotResult<KeyedStateHandle>>(() -> {
            SnapshotResult delegateResult = (SnapshotResult)FutureUtils.runIfNotDoneAndGet((RunnableFuture)delegateFuture);
            ChangelogSnapshotState savepointState = new ChangelogSnapshotState(getMaterializedResult((SnapshotResult<KeyedStateHandle>)delegateResult), savepointMaterializationId);
            SnapshotResult<ChangelogStateBackendHandle> built = buildSnapshotResult(checkpointId, (SnapshotResult<ChangelogStateHandle>)SnapshotResult.empty(), savepointState);
            return castSnapshotResult(built);
        }){

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                // This task is only cancelled when the delegate reported a successful cancel.
                return delegateFuture.cancel(mayInterruptIfRunning) && super.cancel(mayInterruptIfRunning);
            }

            @Override
            public boolean isCancelled() {
                return delegateFuture.isCancelled() && super.isCancelled();
            }

            @Override
            public boolean isDone() {
                return delegateFuture.isDone() && super.isDone();
            }
        };
    }

    /**
     * Re-types a snapshot result of any handle type as a result of {@link KeyedStateHandle}s.
     *
     * <p>The explicit unchecked cast is required: a {@code SnapshotResult<?>} cannot be returned
     * as {@code SnapshotResult<KeyedStateHandle>} without it.
     *
     * @param snapshotResult the result to re-type
     * @return the very same instance, viewed as {@code SnapshotResult<KeyedStateHandle>}
     */
    @SuppressWarnings("unchecked")
    private SnapshotResult<KeyedStateHandle> castSnapshotResult(SnapshotResult<?> snapshotResult) {
        return (SnapshotResult<KeyedStateHandle>)snapshotResult;
    }

    /**
     * Combines the given snapshot state with the changelog delta of one checkpoint into the
     * handle(s) reported for that checkpoint.
     *
     * <p>A {@code null} delta is treated like a delta without a job-manager-owned and without a
     * task-local part.
     *
     * @param checkpointId ID of the checkpoint the result is built for
     * @param delta changes persisted for this checkpoint; may be {@code null}
     * @param changelogStateBackendStateCopy snapshot state captured when the checkpoint started
     * @return an empty result if there is neither materialized state nor any changelog handle; a
     *     result with local state if local state is available and may be used; otherwise a
     *     result with the job-manager-owned handle only
     */
    private SnapshotResult<ChangelogStateBackendHandle> buildSnapshotResult(long checkpointId, SnapshotResult<? extends ChangelogStateHandle> delta, ChangelogSnapshotState changelogStateBackendStateCopy) {
        // Resolve both parts of the delta once, so that every use below is null-safe.
        ChangelogStateHandle remoteDelta = delta == null ? null : delta.getJobManagerOwnedSnapshot();
        ChangelogStateHandle localDelta = delta == null ? null : delta.getTaskLocalSnapshot();

        // Restored, not yet materialized changes plus (if non-empty) the new delta.
        List<ChangelogStateHandle> prevDeltaCopy = new ArrayList<ChangelogStateHandle>(changelogStateBackendStateCopy.getRestoredNonMaterialized());
        long persistedSizeOfThisCheckpoint = 0L;
        if (remoteDelta != null && remoteDelta.getStateSize() > 0L) {
            prevDeltaCopy.add(remoteDelta);
            persistedSizeOfThisCheckpoint += remoteDelta.getCheckpointedSize();
        }

        if (prevDeltaCopy.isEmpty() && changelogStateBackendStateCopy.getMaterializedSnapshot().isEmpty()) {
            return SnapshotResult.empty();
        }

        ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl jmHandle = new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(changelogStateBackendStateCopy.getMaterializedSnapshot(), prevDeltaCopy, this.getKeyGroupRange(), checkpointId, changelogStateBackendStateCopy.materializationID, persistedSizeOfThisCheckpoint);

        // After a rescaling restore, local state is not reported until a materialization completed.
        boolean localStateAllowed = !this.isRescaling || this.hasCompletedMaterialization.get();
        boolean localStatePresent = !changelogStateBackendStateCopy.getLocalMaterializedSnapshot().isEmpty() || localDelta != null;
        if (!(localStateAllowed && localStatePresent)) {
            return SnapshotResult.of(jmHandle);
        }

        List<ChangelogStateHandle> localDeltaCopy = new ArrayList<ChangelogStateHandle>(changelogStateBackendStateCopy.getLocalRestoredNonMaterialized());
        if (localDelta != null && localDelta.getStateSize() > 0L) {
            localDeltaCopy.add(localDelta);
        }
        return SnapshotResult.withLocalState(jmHandle, new ChangelogStateBackendLocalHandle(changelogStateBackendStateCopy.getLocalMaterializedSnapshot(), localDeltaCopy, jmHandle));
    }

    /**
     * Creates (or re-wires) the changelog priority queue for {@code stateName}.
     *
     * <p>The queue of the wrapped backend is always (re)created. If the changelog factory
     * already knows a queue with that name, it is pointed at the new delegate; otherwise a new
     * changelog queue with its own change logger is created.
     */
    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        KeyGroupedInternalPriorityQueue delegate = keyedStateBackend.create(stateName, byteOrderedElementSerializer);
        ChangelogKeyGroupedPriorityQueue<T> existing = (ChangelogKeyGroupedPriorityQueue<T>)changelogStateFactory.getExistingState(stateName, StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
        if (existing != null) {
            updateChangelogState(existing, delegate, stateName, byteOrderedElementSerializer);
            return existing;
        }
        ChangelogKeyGroupedPriorityQueue<T> created = changelogStateFactory.create(stateName, delegate, getPqStateChangeLogger(stateName, byteOrderedElementSerializer), byteOrderedElementSerializer);
        return created;
    }

    /**
     * Refreshes an existing changelog priority queue: its change logger gets new meta info built
     * from the name and serializer, and the queue is pointed at the given delegate.
     */
    private <T> void updateChangelogState(ChangelogKeyGroupedPriorityQueue<T> queue, KeyGroupedInternalPriorityQueue<T> priorityQueue, String stateName, TypeSerializer<T> byteOrderedElementSerializer) {
        RegisteredPriorityQueueStateBackendMetaInfo refreshedMetaInfo = new RegisteredPriorityQueueStateBackendMetaInfo(stateName, byteOrderedElementSerializer);
        PriorityQueueStateChangeLoggerImpl logger = (PriorityQueueStateChangeLoggerImpl)queue.getStateChangeLogger();
        logger.setMetaInfo((RegisteredStateMetaInfoBase)refreshedMetaInfo);
        queue.setDelegatedState(priorityQueue);
    }

    /**
     * Builds the change logger for a new priority queue state. The logger receives the next
     * state ID and is registered with the closer.
     */
    private <T> StateChangeLogger<T, Void> getPqStateChangeLogger(String stateName, TypeSerializer<T> byteOrderedElementSerializer) {
        // Advance the ID first; the logger is created with the already incremented value.
        lastCreatedStateId++;
        PriorityQueueStateChangeLoggerImpl logger = new PriorityQueueStateChangeLoggerImpl(
                byteOrderedElementSerializer,
                keyedStateBackend.getKeyContext(),
                stateChangelogWriter,
                new RegisteredPriorityQueueStateBackendMetaInfo(stateName, byteOrderedElementSerializer),
                lastCreatedStateId);
        closer.register(logger);
        return logger;
    }

    /** Returns the number of key/value state entries reported by the wrapped backend. */
    @VisibleForTesting
    public int numKeyValueStateEntries() {
        return keyedStateBackend.numKeyValueStateEntries();
    }

    /** Returns the wrapped backend's answer to whether key/value state objects may be reused. */
    public boolean isSafeToReuseKVState() {
        return keyedStateBackend.isSafeToReuseKVState();
    }

    /** Returns the savepoint resources produced by the wrapped backend. */
    @Nonnull
    public SavepointResources<K> savepoint() throws Exception {
        return keyedStateBackend.savepoint();
    }

    /**
     * Handles the completion of a checkpoint.
     *
     * <p>If it is the last checkpoint taken, the uploaded change range is confirmed with the
     * changelog writer. If the checkpoint was based on a materialization newer than the last
     * confirmed one, the wrapped backend is notified with that materialization ID. Finally all
     * bookkeeping entries up to and including this checkpoint are dropped.
     *
     * @param checkpointId ID of the completed checkpoint
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (lastCheckpointId == checkpointId) {
            stateChangelogWriter.confirm(lastUploadedFrom, lastUploadedTo, checkpointId);
        }

        final Long confirmedMaterializationId = materializationIdByCheckpointId.remove(checkpointId);
        if (confirmedMaterializationId != null) {
            final long id = confirmedMaterializationId;
            if (id > lastConfirmedMaterializationId) {
                keyedStateBackend.notifyCheckpointComplete(id);
                lastConfirmedMaterializationId = id;
            }
        }

        materializationIdByCheckpointId.headMap(checkpointId, true).clear();
    }

    /**
     * Handles an aborted checkpoint: if it is the last checkpoint taken, the changelog writer is
     * reset for the uploaded change range. Other checkpoint IDs are ignored.
     *
     * @param checkpointId ID of the aborted checkpoint
     */
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        if (lastCheckpointId != checkpointId) {
            return;
        }
        stateChangelogWriter.reset(lastUploadedFrom, lastUploadedTo, checkpointId);
    }

    /**
     * Returns the state registered under the descriptor's name, creating it on first use.
     *
     * <p>A new state is created through the TTL factory (with this backend as the creating
     * backend), wrapped by the latency tracking factory, cached by name and offered to the
     * wrapped backend for queryable state publication. The descriptor is reported to the
     * function delegation helper on every call.
     *
     * @param namespaceSerializer serializer for the state's namespace; must not be null
     * @param stateDescriptor descriptor of the requested state
     */
    public <N, S extends State, T> S getOrCreateKeyedState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor) throws Exception {
        Preconditions.checkNotNull(namespaceSerializer, (String)"Namespace serializer");
        Preconditions.checkNotNull(this.getKeySerializer(), (String)"State key serializer has not been configured in the config. This operation cannot use partitioned state.");

        final String stateName = stateDescriptor.getName();
        InternalKvState kvState = keyValueStatesByName.get(stateName);
        if (kvState == null) {
            if (!stateDescriptor.isSerializerInitialized()) {
                stateDescriptor.initializeSerializerUnlessSet(executionConfig);
            }
            InternalKvState ttlWrapped = (InternalKvState)TtlStateFactory.createStateAndWrapWithTtlIfEnabled(namespaceSerializer, stateDescriptor, (KeyedStateBackend)this, (TtlTimeProvider)ttlTimeProvider);
            kvState = LatencyTrackingStateFactory.createStateAndWrapWithLatencyTrackingIfEnabled((InternalKvState)ttlWrapped, stateDescriptor, (LatencyTrackingStateConfig)keyedStateBackend.getLatencyTrackingStateConfig());
            keyValueStatesByName.put(stateName, kvState);
            keyedStateBackend.publishQueryableStateIfEnabled(stateDescriptor, kvState);
        }

        functionDelegationHelper.addOrUpdate(stateDescriptor);
        return (S)kvState;
    }

    /**
     * Creates or updates the internal state in the wrapped backend and returns the changelog
     * state wrapping it.
     *
     * <p>An already known changelog state is re-wired to the (possibly new) delegate; otherwise
     * a new changelog state with its own change logger is created.
     */
    @Nonnull
    public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(@Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<S, SV> stateDesc, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
        InternalKvState delegate = (InternalKvState)keyedStateBackend.createOrUpdateInternalState(namespaceSerializer, stateDesc, snapshotTransformFactory);
        ChangelogState existing = changelogStateFactory.getExistingState(stateDesc.getName(), StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);
        if (existing != null) {
            updateChangelogState(existing, delegate, stateDesc, snapshotTransformFactory);
            return (IS)((State)existing);
        }
        ChangelogState created = changelogStateFactory.create(stateDesc, delegate, getKvStateChangeLogger(delegate, stateDesc, snapshotTransformFactory), keyedStateBackend);
        return (IS)((State)created);
    }

    /**
     * Refreshes an existing key/value changelog state: it is pointed at the given delegate and
     * its change logger receives new meta info, TTL config and default value taken from the
     * descriptor.
     */
    private <SV, SEV, S extends State, IS extends InternalKvState<K, N, SV>, N> void updateChangelogState(ChangelogState changelogState, InternalKvState<K, N, SV> state, StateDescriptor<S, SV> stateDesc, StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) {
        RegisteredKeyValueStateBackendMetaInfo refreshedMetaInfo = new RegisteredKeyValueStateBackendMetaInfo(stateDesc.getType(), stateDesc.getName(), state.getNamespaceSerializer(), state.getValueSerializer(), snapshotTransformFactory);

        AbstractChangelogState target = (AbstractChangelogState)changelogState;
        target.setDelegatedState(state);

        KvStateChangeLoggerImpl logger = (KvStateChangeLoggerImpl)target.getStateChangeLogger();
        KvStateChangeLoggerImpl withMetaInfo = (KvStateChangeLoggerImpl)logger.setMetaInfo((RegisteredStateMetaInfoBase)refreshedMetaInfo);
        withMetaInfo.setStateTtlConfig(stateDesc.getTtlConfig()).setDefaultValue(stateDesc.getDefaultValue());
    }

    /**
     * Builds the change logger for a new key/value state. The logger receives the next state ID
     * and is registered with the closer.
     */
    private <SV, SEV, S extends State, N> KvStateChangeLogger<SV, N> getKvStateChangeLogger(InternalKvState<K, N, SV> state, StateDescriptor<S, SV> stateDesc, StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) {
        RegisteredKeyValueStateBackendMetaInfo metaInfo = new RegisteredKeyValueStateBackendMetaInfo(stateDesc.getType(), stateDesc.getName(), state.getNamespaceSerializer(), state.getValueSerializer(), snapshotTransformFactory);
        // Advance the ID first; the logger is created with the already incremented value.
        lastCreatedStateId++;
        KvStateChangeLoggerImpl logger = new KvStateChangeLoggerImpl(
                state.getKeySerializer(),
                state.getNamespaceSerializer(),
                state.getValueSerializer(),
                keyedStateBackend.getKeyContext(),
                stateChangelogWriter,
                (RegisteredStateMetaInfoBase)metaInfo,
                stateDesc.getTtlConfig(),
                stateDesc.getDefaultValue(),
                lastCreatedStateId);
        closer.register(logger);
        return logger;
    }

    /**
     * Registers a resource that is to be closed together with this backend.
     *
     * @param closeable the resource to register; declared nullable
     */
    public void registerCloseable(@Nullable Closeable closeable) {
        closer.register(closeable);
    }

    /**
     * Derives the initial snapshot state from the handles this backend was restored from.
     *
     * <p>Side effects: sets {@code isRescaling} (more than one handle given),
     * {@code lastConfirmedMaterializationId} (highest restored materialization ID, at least 0)
     * and {@code materializedId} (one above that).
     *
     * @param stateHandles restored handles; {@code null} elements are skipped
     * @return a state that includes the local handles only when not rescaling and at least one
     *     local handle was restored; otherwise a state with the remote handles only
     */
    private ChangelogSnapshotState completeRestore(Collection<ChangelogStateBackendHandle> stateHandles) {
        final List<KeyedStateHandle> remoteMaterialized = new ArrayList<>();
        final List<ChangelogStateHandle> remoteNonMaterialized = new ArrayList<>();
        final List<KeyedStateHandle> localMaterialized = new ArrayList<>();
        final List<ChangelogStateHandle> localNonMaterialized = new ArrayList<>();

        isRescaling = stateHandles.size() > 1;

        long maxMaterializationId = 0L;
        for (ChangelogStateBackendHandle handle : stateHandles) {
            if (handle == null) {
                continue;
            }
            if (!(handle instanceof ChangelogStateBackendLocalHandle)) {
                remoteMaterialized.addAll(handle.getMaterializedStateHandles());
                remoteNonMaterialized.addAll(handle.getNonMaterializedStateHandles());
            } else {
                // A local handle carries both the remote and the local view of the state.
                final ChangelogStateBackendLocalHandle local = (ChangelogStateBackendLocalHandle)handle;
                remoteMaterialized.addAll(local.getRemoteMaterializedStateHandles());
                remoteNonMaterialized.addAll(local.getRemoteNonMaterializedStateHandles());
                localMaterialized.addAll(local.getMaterializedStateHandles());
                localNonMaterialized.addAll(local.getNonMaterializedStateHandles());
            }
            maxMaterializationId = Math.max(maxMaterializationId, handle.getMaterializationID());
        }

        lastConfirmedMaterializationId = maxMaterializationId;
        materializedId = maxMaterializationId + 1L;

        final boolean anyLocalState = !localMaterialized.isEmpty() || !localNonMaterialized.isEmpty();
        if (!isRescaling && anyLocalState) {
            return new ChangelogSnapshotState(remoteMaterialized, localMaterialized, remoteNonMaterialized, localNonMaterialized, stateChangelogWriter.initialSequenceNumber(), maxMaterializationId);
        }
        return new ChangelogSnapshotState(remoteMaterialized, remoteNonMaterialized, stateChangelogWriter.initialSequenceNumber(), maxMaterializationId);
    }

    /**
     * Starts a new materialization if one is due.
     *
     * <p>Nothing is started while the previously handed-out materialization ID is neither
     * confirmed nor reported as failed, or when the changelog writer has not advanced past the
     * last materialized sequence number.
     *
     * @return the runnable for the new materialization, or empty if none was started
     */
    public Optional<PeriodicMaterializationManager.MaterializationRunnable> initMaterialization() throws Exception {
        final long previousMaterializationId = materializedId - 1L;
        if (lastConfirmedMaterializationId < previousMaterializationId && lastFailedMaterializationId < previousMaterializationId) {
            LOG.info("materialization:{} not confirmed or failed or cancelled, skip trigger new one.", (Object)previousMaterializationId);
            return Optional.empty();
        }

        final SequenceNumber upTo = stateChangelogWriter.nextSequenceNumber();
        final SequenceNumber lastMaterializedTo = changelogSnapshotState.lastMaterializedTo();
        LOG.info("Initialize Materialization. Current changelog writers last append to sequence number {}", (Object)upTo);

        if (upTo.compareTo(lastMaterializedTo) <= 0) {
            LOG.debug("Skip materialization, last materialized to {} : last log to {}", (Object)lastMaterializedTo, (Object)upTo);
            return Optional.empty();
        }

        LOG.info("Starting materialization from {} : {}", (Object)lastMaterializedTo, (Object)upTo);
        final long materializationID = materializedId++;
        PeriodicMaterializationManager.MaterializationRunnable runnable = new PeriodicMaterializationManager.MaterializationRunnable(keyedStateBackend.snapshot(materializationID, System.currentTimeMillis(), this.streamFactory, CHECKPOINT_OPTIONS), materializationID, upTo);
        // Reset only after the wrapped backend accepted the snapshot request.
        changelogStateFactory.resetAllWritingMetaFlags();
        return Optional.of(runnable);
    }

    /**
     * Installs the result of a finished materialization as the new snapshot state, marks that a
     * materialization has completed and informs the truncate helper.
     *
     * @param materializedSnapshot snapshot produced by the wrapped backend
     * @param materializationID ID of the finished materialization
     * @param upTo sequence number up to which the changelog is covered by the snapshot
     */
    public void handleMaterializationResult(SnapshotResult<KeyedStateHandle> materializedSnapshot, long materializationID, SequenceNumber upTo) {
        LOG.info("Task {} finishes materialization, updates the snapshotState upTo {} : {}", subtaskName, upTo, materializedSnapshot);

        if (materializedSnapshot.getTaskLocalSnapshot() == null) {
            // No local copy: keep only the job-manager-owned part.
            changelogSnapshotState = new ChangelogSnapshotState(getMaterializedResult(materializedSnapshot), Collections.emptyList(), upTo, materializationID);
        } else {
            changelogSnapshotState = new ChangelogSnapshotState(getMaterializedResult(materializedSnapshot), getLocalMaterializedResult(materializedSnapshot), Collections.emptyList(), Collections.emptyList(), upTo, materializationID);
        }

        hasCompletedMaterialization.set(true);
        changelogTruncateHelper.materialized(upTo);
    }

    /**
     * Records that a materialization failed or was cancelled, so that a later
     * {@code initMaterialization} call is not blocked by it. The cause is not inspected here.
     *
     * @param materializationID ID of the failed or cancelled materialization
     * @param upTo sequence number the materialization was started for (logged only)
     * @param cause reason of the failure or cancellation (unused)
     */
    public void handleMaterializationFailureOrCancellation(long materializationID, SequenceNumber upTo, Throwable cause) {
        LOG.info("Task {} failed or cancelled materialization:{} which is upTo:{}", subtaskName, materializationID, upTo);
        if (materializationID > lastFailedMaterializationId) {
            lastFailedMaterializationId = materializationID;
        }
    }

    /**
     * Returns the job-manager-owned handle of the snapshot as a list: a singleton list if it is
     * present, an empty list otherwise.
     */
    private List<KeyedStateHandle> getMaterializedResult(@Nonnull SnapshotResult<KeyedStateHandle> materializedSnapshot) {
        final KeyedStateHandle remoteHandle = materializedSnapshot.getJobManagerOwnedSnapshot();
        if (remoteHandle == null) {
            return Collections.emptyList();
        }
        return Collections.singletonList(remoteHandle);
    }

    /**
     * Returns the task-local handle of the snapshot as a list: a singleton list if it is
     * present, an empty list otherwise.
     */
    private List<KeyedStateHandle> getLocalMaterializedResult(@Nonnull SnapshotResult<KeyedStateHandle> materializedSnapshot) {
        final KeyedStateHandle localHandle = materializedSnapshot.getTaskLocalSnapshot();
        if (localHandle == null) {
            return Collections.emptyList();
        }
        return Collections.singletonList(localHandle);
    }

    /**
     * Asks the wrapped backend for its delegated keyed state backend.
     *
     * @param recursive passed through unchanged to the wrapped backend
     */
    public KeyedStateBackend<K> getDelegatedKeyedStateBackend(boolean recursive) {
        return keyedStateBackend.getDelegatedKeyedStateBackend(recursive);
    }

    /**
     * Relays a checkpoint-subsumed notification to the changelog truncate helper.
     *
     * @param checkpointId id of the subsumed checkpoint
     * @throws Exception declared by the signature; any exception from the helper propagates
     */
    public void notifyCheckpointSubsumed(long checkpointId) throws Exception {
        changelogTruncateHelper.checkpointSubsumed(checkpointId);
    }

    /**
     * Builds a {@link ChangelogRestoreTarget} backed by this backend.
     *
     * <p>The returned object creates (or refreshes) key-value and priority-queue states on the
     * wrapped backend and pairs each with a changelog state obtained from the changelog state
     * factory: an already registered changelog state is updated in place, otherwise a new one
     * is created.
     *
     * @return a new restore target bound to this backend instance
     */
    public ChangelogRestoreTarget<K> getChangelogRestoreTarget() {
        return new ChangelogRestoreTarget<K>(){

            @Override
            public CheckpointableKeyedStateBackend<K> getRestoredKeyedStateBackend() {
                return ChangelogKeyedStateBackend.this;
            }

            @Override
            public KeyGroupRange getKeyGroupRange() {
                // Must stay qualified: an unqualified call would resolve to this very method.
                return ChangelogKeyedStateBackend.this.getKeyGroupRange();
            }

            @Override
            public ChangelogState getExistingState(String name, StateMetaInfoSnapshot.BackendStateType type) {
                return ChangelogKeyedStateBackend.this.changelogStateFactory.getExistingState(name, type);
            }

            @Override
            public <N, S extends State, V> S createKeyedState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor) throws Exception {
                // Create or refresh the state on the wrapped backend first (no snapshot transform).
                // NOTE(review): the meaning of the trailing `true` flag is defined by the wrapped backend - confirm.
                InternalKvState delegateState = (InternalKvState)ChangelogKeyedStateBackend.this.keyedStateBackend.createOrUpdateInternalState(
                        namespaceSerializer,
                        stateDescriptor,
                        StateSnapshotTransformer.StateSnapshotTransformFactory.noTransform(),
                        true);
                ChangelogState wrapper = ChangelogKeyedStateBackend.this.changelogStateFactory.getExistingState(
                        stateDescriptor.getName(),
                        StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);
                if (wrapper != null) {
                    // Already registered: re-point the existing changelog state at the delegate state.
                    ChangelogKeyedStateBackend.this.updateChangelogState(
                            wrapper,
                            delegateState,
                            stateDescriptor,
                            StateSnapshotTransformer.StateSnapshotTransformFactory.noTransform());
                } else {
                    // First registration: build a new changelog state around the delegate state.
                    wrapper = ChangelogKeyedStateBackend.this.changelogStateFactory.create(
                            stateDescriptor,
                            delegateState,
                            ChangelogKeyedStateBackend.this.getKvStateChangeLogger(
                                    delegateState,
                                    stateDescriptor,
                                    StateSnapshotTransformer.StateSnapshotTransformFactory.noTransform()),
                            ChangelogKeyedStateBackend.this.keyedStateBackend);
                }
                ChangelogKeyedStateBackend.this.functionDelegationHelper.addOrUpdate(stateDescriptor);
                return (S)((State)wrapper);
            }

            @Override
            @Nonnull
            public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>> KeyGroupedInternalPriorityQueue<T> createPqState(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
                // NOTE(review): the meaning of the trailing `true` flag is defined by the wrapped backend - confirm.
                KeyGroupedInternalPriorityQueue delegateQueue = ChangelogKeyedStateBackend.this.keyedStateBackend.create(
                        stateName,
                        byteOrderedElementSerializer,
                        true);
                ChangelogKeyGroupedPriorityQueue<T> wrapper = (ChangelogKeyGroupedPriorityQueue<T>)ChangelogKeyedStateBackend.this.changelogStateFactory.getExistingState(
                        stateName,
                        StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
                if (wrapper != null) {
                    // Already registered: refresh the existing changelog queue with the delegate queue.
                    ChangelogKeyedStateBackend.this.updateChangelogState(
                            wrapper,
                            delegateQueue,
                            stateName,
                            byteOrderedElementSerializer);
                } else {
                    wrapper = ChangelogKeyedStateBackend.this.changelogStateFactory.create(
                            stateName,
                            delegateQueue,
                            ChangelogKeyedStateBackend.this.getPqStateChangeLogger(stateName, byteOrderedElementSerializer),
                            byteOrderedElementSerializer);
                }
                return wrapper;
            }
        };
    }

    /**
     * Adapts a {@link CompletableFuture} to the {@link RunnableFuture} interface.
     *
     * <p>The adapter performs no computation of its own: {@code run()} only waits for the
     * wrapped future via {@code join()}, and every other method forwards to the wrapped future.
     *
     * @param f future to expose as a {@link RunnableFuture}
     * @param <T> result type of the future
     * @return a view over {@code f}
     */
    private static <T> RunnableFuture<T> toRunnableFuture(final CompletableFuture<T> f) {
        return new RunnableFuture<T>() {

            @Override
            public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return f.get(timeout, unit);
            }

            @Override
            public T get() throws InterruptedException, ExecutionException {
                return f.get();
            }

            @Override
            public boolean isDone() {
                return f.isDone();
            }

            @Override
            public boolean isCancelled() {
                return f.isCancelled();
            }

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return f.cancel(mayInterruptIfRunning);
            }

            @Override
            public void run() {
                // Blocks until the wrapped future completes; the result itself is discarded here.
                // join() rethrows a failure or cancellation of the future as an unchecked exception.
                f.join();
            }
        };
    }

    /**
     * Exposes the state changelog writer held by this backend; intended for tests.
     *
     * @return the {@code stateChangelogWriter} field
     */
    @VisibleForTesting
    StateChangelogWriter<? extends ChangelogStateHandle> getChangelogWriter() {
        return stateChangelogWriter;
    }

    private class ChangelogSnapshotState {
        private final SnapshotResult<ChangelogStateBackendHandle> changelogSnapshot;
        private final SequenceNumber materializedTo;
        private final long materializationID;

        public ChangelogSnapshotState(List<KeyedStateHandle> materializedSnapshot, long materializationID) {
            this(materializedSnapshot, Collections.emptyList(), SequenceNumber.of((long)Long.MAX_VALUE), materializationID);
        }

        public ChangelogSnapshotState(List<KeyedStateHandle> materializedSnapshot, List<ChangelogStateHandle> restoredNonMaterialized, SequenceNumber materializedTo, long materializationID) {
            this.changelogSnapshot = SnapshotResult.of((StateObject)new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(materializedSnapshot, restoredNonMaterialized, ChangelogKeyedStateBackend.this.getKeyGroupRange(), ChangelogKeyedStateBackend.this.lastCheckpointId, materializationID, 0L));
            this.materializedTo = materializedTo;
            this.materializationID = materializationID;
        }

        public ChangelogSnapshotState(List<KeyedStateHandle> materializedSnapshot, List<KeyedStateHandle> localMaterializedSnapshot, List<ChangelogStateHandle> restoredNonMaterialized, List<ChangelogStateHandle> localRestoredNonMaterialized, SequenceNumber materializedTo, long materializationID) {
            ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl jmHandle = new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(materializedSnapshot, restoredNonMaterialized, ChangelogKeyedStateBackend.this.getKeyGroupRange(), ChangelogKeyedStateBackend.this.lastCheckpointId, materializationID, 0L);
            this.changelogSnapshot = SnapshotResult.withLocalState((StateObject)jmHandle, (StateObject)new ChangelogStateBackendLocalHandle(localMaterializedSnapshot, localRestoredNonMaterialized, jmHandle));
            this.materializedTo = materializedTo;
            this.materializationID = materializationID;
        }

        public List<KeyedStateHandle> getMaterializedSnapshot() {
            return this.changelogSnapshot.getJobManagerOwnedSnapshot() != null ? ((ChangelogStateBackendHandle)this.changelogSnapshot.getJobManagerOwnedSnapshot()).getMaterializedStateHandles() : Collections.emptyList();
        }

        public List<KeyedStateHandle> getLocalMaterializedSnapshot() {
            return this.changelogSnapshot.getTaskLocalSnapshot() != null ? ((ChangelogStateBackendHandle)this.changelogSnapshot.getTaskLocalSnapshot()).getMaterializedStateHandles() : Collections.emptyList();
        }

        public List<ChangelogStateHandle> getRestoredNonMaterialized() {
            return this.changelogSnapshot.getJobManagerOwnedSnapshot() != null ? ((ChangelogStateBackendHandle)this.changelogSnapshot.getJobManagerOwnedSnapshot()).getNonMaterializedStateHandles() : Collections.emptyList();
        }

        public List<ChangelogStateHandle> getLocalRestoredNonMaterialized() {
            return this.changelogSnapshot.getTaskLocalSnapshot() != null ? ((ChangelogStateBackendHandle)this.changelogSnapshot.getTaskLocalSnapshot()).getNonMaterializedStateHandles() : Collections.emptyList();
        }

        public SequenceNumber lastMaterializedTo() {
            return this.materializedTo;
        }

        public long getMaterializationID() {
            return this.materializationID;
        }
    }
}

