/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.io.Closeable;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.CopyOnWriteStateTable;
import org.apache.flink.runtime.state.heap.HeapAggregatingState;
import org.apache.flink.runtime.state.heap.HeapFoldingState;
import org.apache.flink.runtime.state.heap.HeapListState;
import org.apache.flink.runtime.state.heap.HeapMapState;
import org.apache.flink.runtime.state.heap.HeapReducingState;
import org.apache.flink.runtime.state.heap.HeapValueState;
import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.heap.StateTableByKeyGroupReader;
import org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders;
import org.apache.flink.runtime.state.heap.StateTableSnapshot;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HeapKeyedStateBackend<K>
extends AbstractKeyedStateBackend<K> {
    private static final Logger LOG = LoggerFactory.getLogger(HeapKeyedStateBackend.class);
    private final Map<String, StateTable<K, ?, ?>> stateTables = new HashMap();
    private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
    private final LocalRecoveryConfig localRecoveryConfig;
    private final HeapSnapshotStrategy snapshotStrategy;

    public HeapKeyedStateBackend(TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, ClassLoader userCodeClassLoader, int numberOfKeyGroups, KeyGroupRange keyGroupRange, boolean asynchronousSnapshots, ExecutionConfig executionConfig, LocalRecoveryConfig localRecoveryConfig) {
        super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
        this.localRecoveryConfig = (LocalRecoveryConfig)Preconditions.checkNotNull((Object)localRecoveryConfig);
        SnapshotStrategySynchronicityBehavior synchronicityTrait = asynchronousSnapshots ? new AsyncSnapshotStrategySynchronicityBehavior() : new SyncSnapshotStrategySynchronicityBehavior();
        this.snapshotStrategy = new HeapSnapshotStrategy(synchronicityTrait);
        LOG.info("Initializing heap keyed state backend with stream factory.");
        this.restoredKvStateMetaInfos = new HashMap();
    }

    private <N, V> StateTable<K, N, V> tryRegisterStateTable(TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) throws StateMigrationException {
        StateTable<K, Object, Object> stateTable = this.stateTables.get(stateDesc.getName());
        if (stateTable != null) {
            RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfoSnapshot = this.restoredKvStateMetaInfos.get(stateDesc.getName());
            Preconditions.checkState((restoredMetaInfoSnapshot != null ? 1 : 0) != 0, (Object)"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo, but its corresponding restored snapshot cannot be found.");
            RegisteredKeyedBackendStateMetaInfo<?, ?> newMetaInfo = RegisteredKeyedBackendStateMetaInfo.resolveKvStateCompatibility(restoredMetaInfoSnapshot, namespaceSerializer, stateDesc);
            stateTable.setMetaInfo(newMetaInfo);
        } else {
            RegisteredKeyedBackendStateMetaInfo newMetaInfo = new RegisteredKeyedBackendStateMetaInfo(stateDesc.getType(), stateDesc.getName(), namespaceSerializer, stateDesc.getSerializer());
            stateTable = this.snapshotStrategy.newStateTable(newMetaInfo);
            this.stateTables.put(stateDesc.getName(), stateTable);
        }
        return stateTable;
    }

    @Override
    public <N> Stream<K> getKeys(String state, N namespace) {
        if (!this.stateTables.containsKey(state)) {
            return Stream.empty();
        }
        StateTable<K, ?, ?> table = this.stateTables.get(state);
        return table.getKeys(namespace);
    }

    private boolean hasRegisteredState() {
        return !this.stateTables.isEmpty();
    }

    @Override
    public <N, V> InternalValueState<K, N, V> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<V> stateDesc) throws Exception {
        StateTable<K, N, V> stateTable = this.tryRegisterStateTable(namespaceSerializer, (StateDescriptor<?, V>)stateDesc);
        return new HeapValueState<K, N, Object>(stateTable, this.keySerializer, (TypeSerializer<Object>)stateTable.getStateSerializer(), stateTable.getNamespaceSerializer(), stateDesc.getDefaultValue());
    }

    @Override
    public <N, T> InternalListState<K, N, T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
        StateTable stateTable = this.tryRegisterStateTable(namespaceSerializer, (StateDescriptor)stateDesc);
        return new HeapListState(stateTable, this.keySerializer, stateTable.getStateSerializer(), stateTable.getNamespaceSerializer(), (List)stateDesc.getDefaultValue());
    }

    @Override
    public <N, T> InternalReducingState<K, N, T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
        StateTable stateTable = this.tryRegisterStateTable(namespaceSerializer, (StateDescriptor)stateDesc);
        return new HeapReducingState<K, N, Object>(stateTable, this.keySerializer, stateTable.getStateSerializer(), stateTable.getNamespaceSerializer(), stateDesc.getDefaultValue(), stateDesc.getReduceFunction());
    }

    @Override
    public <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(TypeSerializer<N> namespaceSerializer, AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
        StateTable stateTable = this.tryRegisterStateTable(namespaceSerializer, (StateDescriptor)stateDesc);
        return new HeapAggregatingState(stateTable, this.keySerializer, stateTable.getStateSerializer(), stateTable.getNamespaceSerializer(), stateDesc.getDefaultValue(), stateDesc.getAggregateFunction());
    }

    @Override
    public <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
        StateTable<K, N, ACC> stateTable = this.tryRegisterStateTable(namespaceSerializer, (StateDescriptor)stateDesc);
        return new HeapFoldingState(stateTable, this.keySerializer, stateTable.getStateSerializer(), stateTable.getNamespaceSerializer(), stateDesc.getDefaultValue(), stateDesc.getFoldFunction());
    }

    @Override
    protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(TypeSerializer<N> namespaceSerializer, MapStateDescriptor<UK, UV> stateDesc) throws Exception {
        StateTable<K, N, UV> stateTable = this.tryRegisterStateTable(namespaceSerializer, (StateDescriptor)stateDesc);
        return new HeapMapState(stateTable, this.keySerializer, stateTable.getStateSerializer(), stateTable.getNamespaceSerializer(), (Map)stateDesc.getDefaultValue());
    }

    @Override
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) {
        return this.snapshotStrategy.performSnapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
    }

    @Override
    public void restore(Collection<KeyedStateHandle> restoredState) throws Exception {
        if (restoredState == null || restoredState.isEmpty()) {
            return;
        }
        LOG.info("Initializing heap keyed state backend from snapshot.");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Restoring snapshot from state handles: {}.", restoredState);
        }
        this.restorePartitionedState(restoredState);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void restorePartitionedState(Collection<KeyedStateHandle> state) throws Exception {
        HashMap<Integer, String> kvStatesById = new HashMap<Integer, String>();
        int numRegisteredKvStates = 0;
        this.stateTables.clear();
        boolean keySerializerRestored = false;
        for (KeyedStateHandle keyedStateHandle : state) {
            if (keyedStateHandle == null) continue;
            if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
                throw new IllegalStateException("Unexpected state handle type, expected: " + KeyGroupsStateHandle.class + ", but found: " + keyedStateHandle.getClass());
            }
            KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle)keyedStateHandle;
            FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream();
            this.cancelStreamRegistry.registerCloseable((Closeable)fsDataInputStream);
            try {
                DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper((InputStream)fsDataInputStream);
                KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(this.userCodeClassLoader, true);
                serializationProxy.read((DataInputView)inView);
                if (!keySerializerRestored) {
                    if (CompatibilityUtil.resolveCompatibilityResult(serializationProxy.getKeySerializer(), UnloadableDummyTypeSerializer.class, (TypeSerializerConfigSnapshot)serializationProxy.getKeySerializerConfigSnapshot(), (TypeSerializer)this.keySerializer).isRequiresMigration()) {
                        throw new StateMigrationException("The new key serializer is not compatible to read previous keys. Aborting now since state migration is currently not available");
                    }
                    keySerializerRestored = true;
                }
                List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos = serializationProxy.getStateMetaInfoSnapshots();
                for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) {
                    this.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
                    StateTable<K, ?, ?> stateTable = this.stateTables.get(restoredMetaInfo.getName());
                    if (null != stateTable) continue;
                    RegisteredKeyedBackendStateMetaInfo registeredKeyedBackendStateMetaInfo = new RegisteredKeyedBackendStateMetaInfo(restoredMetaInfo.getStateType(), restoredMetaInfo.getName(), restoredMetaInfo.getNamespaceSerializer(), restoredMetaInfo.getStateSerializer());
                    stateTable = this.snapshotStrategy.newStateTable(registeredKeyedBackendStateMetaInfo);
                    this.stateTables.put(restoredMetaInfo.getName(), stateTable);
                    kvStatesById.put(numRegisteredKvStates, restoredMetaInfo.getName());
                    ++numRegisteredKvStates;
                }
                StreamCompressionDecorator streamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ? SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
                for (Tuple2<Integer, Long> groupOffset : keyGroupsStateHandle.getGroupRangeOffsets()) {
                    int keyGroupIndex = (Integer)groupOffset.f0;
                    long offset = (Long)groupOffset.f1;
                    Preconditions.checkState((boolean)this.keyGroupRange.contains(keyGroupIndex), (Object)"The key group must belong to the backend.");
                    fsDataInputStream.seek(offset);
                    int writtenKeyGroupIndex = inView.readInt();
                    InputStream kgCompressionInStream = streamCompressionDecorator.decorateWithCompression((InputStream)fsDataInputStream);
                    Throwable throwable = null;
                    try {
                        DataInputViewStreamWrapper kgCompressionInView = new DataInputViewStreamWrapper(kgCompressionInStream);
                        Preconditions.checkState((writtenKeyGroupIndex == keyGroupIndex ? 1 : 0) != 0, (Object)"Unexpected key-group in restore.");
                        for (int i = 0; i < restoredMetaInfos.size(); ++i) {
                            short kvStateId = kgCompressionInView.readShort();
                            StateTable<K, ?, ?> stateTable = this.stateTables.get(kvStatesById.get(kvStateId));
                            StateTableByKeyGroupReader keyGroupReader = StateTableByKeyGroupReaders.readerForVersion(stateTable, serializationProxy.getReadVersion());
                            keyGroupReader.readMappingsInKeyGroup((DataInputView)kgCompressionInView, keyGroupIndex);
                        }
                    }
                    catch (Throwable throwable2) {
                        throwable = throwable2;
                        throw throwable2;
                    }
                    finally {
                        if (kgCompressionInStream == null) continue;
                        if (throwable != null) {
                            try {
                                kgCompressionInStream.close();
                            }
                            catch (Throwable throwable3) {
                                throwable.addSuppressed(throwable3);
                            }
                            continue;
                        }
                        kgCompressionInStream.close();
                    }
                }
            }
            finally {
                if (!this.cancelStreamRegistry.unregisterCloseable((Closeable)fsDataInputStream)) continue;
                IOUtils.closeQuietly((InputStream)fsDataInputStream);
            }
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
    }

    @Override
    public <N, S extends State, T> void applyToAllKeys(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor, KeyedStateFunction<K, S> function) throws Exception {
        try (Stream<K> keyStream = this.getKeys(stateDescriptor.getName(), namespace);){
            List keys = keyStream.collect(Collectors.toList());
            S state = this.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
            for (Object key : keys) {
                this.setCurrentKey(key);
                function.process(key, state);
            }
        }
    }

    public String toString() {
        return "HeapKeyedStateBackend";
    }

    @Override
    @VisibleForTesting
    public int numStateEntries() {
        int sum = 0;
        for (StateTable<K, ?, ?> stateTable : this.stateTables.values()) {
            sum += stateTable.size();
        }
        return sum;
    }

    @VisibleForTesting
    public int numStateEntries(Object namespace) {
        int sum = 0;
        for (StateTable<K, ?, ?> stateTable : this.stateTables.values()) {
            sum += stateTable.sizeOfNamespace(namespace);
        }
        return sum;
    }

    @Override
    public boolean supportsAsynchronousSnapshots() {
        return this.snapshotStrategy.isAsynchronous();
    }

    @VisibleForTesting
    public LocalRecoveryConfig getLocalRecoveryConfig() {
        return this.localRecoveryConfig;
    }

    private class HeapSnapshotStrategy
    implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>>,
    SnapshotStrategySynchronicityBehavior<K> {
        private final SnapshotStrategySynchronicityBehavior<K> snapshotStrategySynchronicityTrait;

        public HeapSnapshotStrategy(SnapshotStrategySynchronicityBehavior<K> snapshotStrategySynchronicityTrait) {
            this.snapshotStrategySynchronicityTrait = snapshotStrategySynchronicityTrait;
        }

        @Override
        public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(long checkpointId, long timestamp, final CheckpointStreamFactory primaryStreamFactory, CheckpointOptions checkpointOptions) {
            if (!HeapKeyedStateBackend.this.hasRegisteredState()) {
                return DoneFuture.of(SnapshotResult.empty());
            }
            long syncStartTime = System.currentTimeMillis();
            Preconditions.checkState((HeapKeyedStateBackend.this.stateTables.size() <= Short.MAX_VALUE ? 1 : 0) != 0, (Object)("Too many KV-States: " + HeapKeyedStateBackend.this.stateTables.size() + ". Currently at most " + Short.MAX_VALUE + " states are supported"));
            ArrayList metaInfoSnapshots = new ArrayList(HeapKeyedStateBackend.this.stateTables.size());
            final HashMap<String, Integer> kVStateToId = new HashMap<String, Integer>(HeapKeyedStateBackend.this.stateTables.size());
            final HashMap<String, StateTableSnapshot> cowStateStableSnapshots = new HashMap<String, StateTableSnapshot>(HeapKeyedStateBackend.this.stateTables.size());
            for (Map.Entry kvState : HeapKeyedStateBackend.this.stateTables.entrySet()) {
                String stateName = (String)kvState.getKey();
                kVStateToId.put(stateName, kVStateToId.size());
                StateTable stateTable = (StateTable)kvState.getValue();
                if (null == stateTable) continue;
                metaInfoSnapshots.add(stateTable.getMetaInfo().snapshot());
                cowStateStableSnapshots.put(stateName, stateTable.createSnapshot());
            }
            final KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(HeapKeyedStateBackend.this.keySerializer, metaInfoSnapshots, !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, HeapKeyedStateBackend.this.keyGroupCompressionDecorator));
            final SupplierWithException checkpointStreamSupplier = HeapKeyedStateBackend.this.localRecoveryConfig.isLocalRecoveryEnabled() ? () -> CheckpointStreamWithResultProvider.createDuplicatingStream(checkpointId, CheckpointedStateScope.EXCLUSIVE, primaryStreamFactory, HeapKeyedStateBackend.this.localRecoveryConfig.getLocalStateDirectoryProvider()) : () -> CheckpointStreamWithResultProvider.createSimpleStream(CheckpointedStateScope.EXCLUSIVE, primaryStreamFactory);
            AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable = new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>(){
                CheckpointStreamWithResultProvider streamAndResultExtractor = null;

                @Override
                protected void acquireResources() throws Exception {
                    this.streamAndResultExtractor = (CheckpointStreamWithResultProvider)checkpointStreamSupplier.get();
                    HeapKeyedStateBackend.this.cancelStreamRegistry.registerCloseable((Closeable)this.streamAndResultExtractor);
                }

                @Override
                protected void releaseResources() {
                    this.unregisterAndCloseStreamAndResultExtractor();
                    for (StateTableSnapshot tableSnapshot : cowStateStableSnapshots.values()) {
                        tableSnapshot.release();
                    }
                }

                @Override
                protected void stopOperation() {
                    this.unregisterAndCloseStreamAndResultExtractor();
                }

                private void unregisterAndCloseStreamAndResultExtractor() {
                    if (HeapKeyedStateBackend.this.cancelStreamRegistry.unregisterCloseable((Closeable)this.streamAndResultExtractor)) {
                        IOUtils.closeQuietly((Closeable)this.streamAndResultExtractor);
                        this.streamAndResultExtractor = null;
                    }
                }

                @Override
                @Nonnull
                protected SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
                    long startTime = System.currentTimeMillis();
                    CheckpointStreamFactory.CheckpointStateOutputStream localStream = this.streamAndResultExtractor.getCheckpointOutputStream();
                    DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper((OutputStream)((Object)localStream));
                    serializationProxy.write((DataOutputView)outView);
                    long[] keyGroupRangeOffsets = new long[HeapKeyedStateBackend.this.keyGroupRange.getNumberOfKeyGroups()];
                    for (int keyGroupPos = 0; keyGroupPos < HeapKeyedStateBackend.this.keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
                        int keyGroupId = HeapKeyedStateBackend.this.keyGroupRange.getKeyGroupId(keyGroupPos);
                        keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
                        outView.writeInt(keyGroupId);
                        for (Map.Entry kvState : cowStateStableSnapshots.entrySet()) {
                            OutputStream kgCompressionOut = HeapKeyedStateBackend.this.keyGroupCompressionDecorator.decorateWithCompression((OutputStream)((Object)localStream));
                            Throwable throwable = null;
                            try {
                                String stateName = (String)kvState.getKey();
                                DataOutputViewStreamWrapper kgCompressionView = new DataOutputViewStreamWrapper(kgCompressionOut);
                                kgCompressionView.writeShort(((Integer)kVStateToId.get(stateName)).intValue());
                                ((StateTableSnapshot)kvState.getValue()).writeMappingsInKeyGroup((DataOutputView)kgCompressionView, keyGroupId);
                            }
                            catch (Throwable throwable2) {
                                throwable = throwable2;
                                throw throwable2;
                            }
                            finally {
                                if (kgCompressionOut == null) continue;
                                if (throwable != null) {
                                    try {
                                        kgCompressionOut.close();
                                    }
                                    catch (Throwable throwable3) {
                                        throwable.addSuppressed(throwable3);
                                    }
                                    continue;
                                }
                                kgCompressionOut.close();
                            }
                        }
                    }
                    if (HeapKeyedStateBackend.this.cancelStreamRegistry.unregisterCloseable((Closeable)this.streamAndResultExtractor)) {
                        KeyGroupRangeOffsets kgOffs = new KeyGroupRangeOffsets(HeapKeyedStateBackend.this.keyGroupRange, keyGroupRangeOffsets);
                        SnapshotResult<StreamStateHandle> result = this.streamAndResultExtractor.closeAndFinalizeCheckpointStreamResult();
                        this.streamAndResultExtractor = null;
                        HeapSnapshotStrategy.this.logOperationCompleted(primaryStreamFactory, startTime);
                        return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(result, kgOffs);
                    }
                    return SnapshotResult.empty();
                }
            };
            AsyncStoppableTaskWithCallback<SnapshotResult<KeyedStateHandle>> task = AsyncStoppableTaskWithCallback.from(ioCallable);
            this.finalizeSnapshotBeforeReturnHook(task);
            LOG.info("Heap backend snapshot (" + primaryStreamFactory + ", synchronous part) in thread " + Thread.currentThread() + " took " + (System.currentTimeMillis() - syncStartTime) + " ms.");
            return task;
        }

        @Override
        public void finalizeSnapshotBeforeReturnHook(Runnable runnable) {
            this.snapshotStrategySynchronicityTrait.finalizeSnapshotBeforeReturnHook(runnable);
        }

        @Override
        public void logOperationCompleted(CheckpointStreamFactory streamFactory, long startTime) {
            this.snapshotStrategySynchronicityTrait.logOperationCompleted(streamFactory, startTime);
        }

        @Override
        public boolean isAsynchronous() {
            return this.snapshotStrategySynchronicityTrait.isAsynchronous();
        }

        @Override
        public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) {
            return this.snapshotStrategySynchronicityTrait.newStateTable(newMetaInfo);
        }
    }

    private class SyncSnapshotStrategySynchronicityBehavior
    implements SnapshotStrategySynchronicityBehavior<K> {
        private SyncSnapshotStrategySynchronicityBehavior() {
        }

        @Override
        public void finalizeSnapshotBeforeReturnHook(Runnable runnable) {
            runnable.run();
        }

        @Override
        public boolean isAsynchronous() {
            return false;
        }

        @Override
        public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) {
            return new NestedMapsStateTable(HeapKeyedStateBackend.this, newMetaInfo);
        }
    }

    private class AsyncSnapshotStrategySynchronicityBehavior
    implements SnapshotStrategySynchronicityBehavior<K> {
        private AsyncSnapshotStrategySynchronicityBehavior() {
        }

        @Override
        public void logOperationCompleted(CheckpointStreamFactory streamFactory, long startTime) {
            LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.", new Object[]{streamFactory, Thread.currentThread(), System.currentTimeMillis() - startTime});
        }

        @Override
        public boolean isAsynchronous() {
            return true;
        }

        @Override
        public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) {
            return new CopyOnWriteStateTable(HeapKeyedStateBackend.this, newMetaInfo);
        }
    }

    private static interface SnapshotStrategySynchronicityBehavior<K> {
        default public void finalizeSnapshotBeforeReturnHook(Runnable runnable) {
        }

        default public void logOperationCompleted(CheckpointStreamFactory streamFactory, long startTime) {
        }

        public boolean isAsynchronous();

        public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> var1);
    }
}

