/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.changelog;

import java.util.Collection;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.state.changelog.AbstractChangelogStateBackend;
import org.apache.flink.state.changelog.ChangelogStateFactory;
import org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation;
import org.apache.flink.state.changelog.restore.ChangelogMigrationRestoreTarget;

@Internal
public class DeactivatedChangelogStateBackend
extends AbstractChangelogStateBackend {
    private static final long serialVersionUID = 1000L;

    DeactivatedChangelogStateBackend(StateBackend stateBackend) {
        super(stateBackend);
    }

    @Override
    protected <K> CheckpointableKeyedStateBackend<K> restore(Environment env, String operatorIdentifier, KeyGroupRange keyGroupRange, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, Collection<ChangelogStateBackendHandle> stateBackendHandles, ChangelogBackendRestoreOperation.BaseBackendBuilder<K> baseBackendBuilder) throws Exception {
        stateBackendHandles = this.reboundCheckpoint(stateBackendHandles);
        ChangelogStateFactory changelogStateFactory = new ChangelogStateFactory();
        return ChangelogBackendRestoreOperation.restore(env.getTaskManagerInfo().getConfiguration(), env.getUserCodeClassLoader().asClassLoader(), env.getTaskStateManager(), stateBackendHandles, baseBackendBuilder, (baseBackend, baseState) -> new ChangelogMigrationRestoreTarget(baseBackend, changelogStateFactory));
    }

    private Collection<ChangelogStateBackendHandle> reboundCheckpoint(Collection<ChangelogStateBackendHandle> stateBackendHandles) {
        return stateBackendHandles.stream().map(changelogStateBackendHandle -> changelogStateBackendHandle.rebound(changelogStateBackendHandle.getCheckpointId())).collect(Collectors.toList());
    }
}

