/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.ttl;

import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompositeSerializer;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.ttl.AbstractTtlState;
import org.apache.flink.runtime.state.ttl.TtlAggregateFunction;
import org.apache.flink.runtime.state.ttl.TtlAggregatingState;
import org.apache.flink.runtime.state.ttl.TtlIncrementalCleanup;
import org.apache.flink.runtime.state.ttl.TtlListState;
import org.apache.flink.runtime.state.ttl.TtlMapState;
import org.apache.flink.runtime.state.ttl.TtlReduceFunction;
import org.apache.flink.runtime.state.ttl.TtlReducingState;
import org.apache.flink.runtime.state.ttl.TtlStateContext;
import org.apache.flink.runtime.state.ttl.TtlStateSnapshotTransformer;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.ttl.TtlValue;
import org.apache.flink.runtime.state.ttl.TtlValueState;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

public class TtlStateFactory<K, N, SV, TTLSV, S extends State, IS extends S> {
    private final Map<StateDescriptor.Type, SupplierWithException<IS, Exception>> stateFactories;
    @Nonnull
    private final TypeSerializer<N> namespaceSerializer;
    @Nonnull
    private final StateDescriptor<S, SV> stateDesc;
    @Nonnull
    private final KeyedStateBackend<K> stateBackend;
    @Nonnull
    private final StateTtlConfig ttlConfig;
    @Nonnull
    private final TtlTimeProvider timeProvider;
    private final long ttl;
    @Nullable
    private final TtlIncrementalCleanup<K, N, TTLSV> incrementalCleanup;

    public static <K, N, SV, TTLSV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(TypeSerializer<N> namespaceSerializer, StateDescriptor<S, SV> stateDesc, KeyedStateBackend<K> stateBackend, TtlTimeProvider timeProvider) throws Exception {
        Preconditions.checkNotNull(namespaceSerializer);
        Preconditions.checkNotNull(stateDesc);
        Preconditions.checkNotNull(stateBackend);
        Preconditions.checkNotNull((Object)timeProvider);
        return stateDesc.getTtlConfig().isEnabled() ? super.createState() : stateBackend.createOrUpdateInternalState(namespaceSerializer, stateDesc);
    }

    private TtlStateFactory(@Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<S, SV> stateDesc, @Nonnull KeyedStateBackend<K> stateBackend, @Nonnull TtlTimeProvider timeProvider) {
        this.namespaceSerializer = namespaceSerializer;
        this.stateDesc = stateDesc;
        this.stateBackend = stateBackend;
        this.ttlConfig = stateDesc.getTtlConfig();
        this.timeProvider = timeProvider;
        this.ttl = this.ttlConfig.getTtl().toMilliseconds();
        this.stateFactories = this.createStateFactories();
        this.incrementalCleanup = this.getTtlIncrementalCleanup();
    }

    private Map<StateDescriptor.Type, SupplierWithException<IS, Exception>> createStateFactories() {
        return Stream.of(Tuple2.of((Object)StateDescriptor.Type.VALUE, this::createValueState), Tuple2.of((Object)StateDescriptor.Type.LIST, this::createListState), Tuple2.of((Object)StateDescriptor.Type.MAP, this::createMapState), Tuple2.of((Object)StateDescriptor.Type.REDUCING, this::createReducingState), Tuple2.of((Object)StateDescriptor.Type.AGGREGATING, this::createAggregatingState)).collect(Collectors.toMap(t -> (StateDescriptor.Type)t.f0, t -> (SupplierWithException)t.f1));
    }

    private IS createState() throws Exception {
        SupplierWithException<IS, Exception> stateFactory = this.stateFactories.get(this.stateDesc.getType());
        if (stateFactory == null) {
            String message = String.format("State type: %s is not supported by %s", this.stateDesc.getType(), TtlStateFactory.class);
            throw new FlinkRuntimeException(message);
        }
        State state = (State)stateFactory.get();
        if (this.incrementalCleanup != null) {
            this.incrementalCleanup.setTtlState((AbstractTtlState)state);
        }
        return (IS)state;
    }

    private IS createValueState() throws Exception {
        ValueStateDescriptor ttlDescriptor = this.stateDesc.getSerializer() instanceof TtlSerializer ? (ValueStateDescriptor)this.stateDesc : new ValueStateDescriptor(this.stateDesc.getName(), new TtlSerializer((TypeSerializer<Long>)LongSerializer.INSTANCE, this.stateDesc.getSerializer()));
        return (IS)new TtlValueState(this.createTtlStateContext((StateDescriptor)ttlDescriptor));
    }

    private <T> IS createListState() throws Exception {
        ListStateDescriptor listStateDesc = (ListStateDescriptor)this.stateDesc;
        ListStateDescriptor ttlDescriptor = listStateDesc.getElementSerializer() instanceof TtlSerializer ? (ListStateDescriptor)this.stateDesc : new ListStateDescriptor(this.stateDesc.getName(), new TtlSerializer((TypeSerializer<Long>)LongSerializer.INSTANCE, listStateDesc.getElementSerializer()));
        return (IS)new TtlListState(this.createTtlStateContext((StateDescriptor)ttlDescriptor));
    }

    private <UK, UV> IS createMapState() throws Exception {
        MapStateDescriptor mapStateDesc = (MapStateDescriptor)this.stateDesc;
        MapStateDescriptor ttlDescriptor = mapStateDesc.getValueSerializer() instanceof TtlSerializer ? (MapStateDescriptor)this.stateDesc : new MapStateDescriptor(this.stateDesc.getName(), mapStateDesc.getKeySerializer(), new TtlSerializer((TypeSerializer<Long>)LongSerializer.INSTANCE, mapStateDesc.getValueSerializer()));
        return (IS)new TtlMapState(this.createTtlStateContext((StateDescriptor)ttlDescriptor));
    }

    private IS createReducingState() throws Exception {
        ReducingStateDescriptor reducingStateDesc = (ReducingStateDescriptor)this.stateDesc;
        ReducingStateDescriptor ttlDescriptor = new ReducingStateDescriptor(this.stateDesc.getName(), new TtlReduceFunction(reducingStateDesc.getReduceFunction(), this.ttlConfig, this.timeProvider), (TypeSerializer)(this.stateDesc.getSerializer() instanceof TtlSerializer ? (TtlSerializer)this.stateDesc.getSerializer() : new TtlSerializer((TypeSerializer<Long>)LongSerializer.INSTANCE, this.stateDesc.getSerializer())));
        return (IS)new TtlReducingState(this.createTtlStateContext((StateDescriptor)ttlDescriptor));
    }

    private <IN, OUT> IS createAggregatingState() throws Exception {
        AggregatingStateDescriptor aggregatingStateDescriptor = (AggregatingStateDescriptor)this.stateDesc;
        TtlAggregateFunction ttlAggregateFunction = new TtlAggregateFunction(aggregatingStateDescriptor.getAggregateFunction(), this.ttlConfig, this.timeProvider);
        AggregatingStateDescriptor ttlDescriptor = new AggregatingStateDescriptor(this.stateDesc.getName(), ttlAggregateFunction, (TypeSerializer)(this.stateDesc.getSerializer() instanceof TtlSerializer ? (TtlSerializer)this.stateDesc.getSerializer() : new TtlSerializer((TypeSerializer<Long>)LongSerializer.INSTANCE, this.stateDesc.getSerializer())));
        return (IS)new TtlAggregatingState(this.createTtlStateContext((StateDescriptor)ttlDescriptor), ttlAggregateFunction);
    }

    private <OIS extends State, TTLS extends State, V, TTLV> TtlStateContext<OIS, V> createTtlStateContext(StateDescriptor<TTLS, TTLV> ttlDescriptor) throws Exception {
        ttlDescriptor.enableTimeToLive(this.stateDesc.getTtlConfig());
        Object originalState = this.stateBackend.createOrUpdateInternalState(this.namespaceSerializer, ttlDescriptor, this.getSnapshotTransformFactory());
        return new TtlStateContext(originalState, this.ttlConfig, this.timeProvider, this.stateDesc.getSerializer(), this.registerTtlIncrementalCleanupCallback((InternalKvState)originalState));
    }

    private TtlIncrementalCleanup<K, N, TTLSV> getTtlIncrementalCleanup() {
        StateTtlConfig.IncrementalCleanupStrategy config = this.ttlConfig.getCleanupStrategies().getIncrementalCleanupStrategy();
        return config != null ? new TtlIncrementalCleanup(config.getCleanupSize()) : null;
    }

    private Runnable registerTtlIncrementalCleanupCallback(InternalKvState<?, ?, ?> originalState) {
        Runnable callback;
        boolean isCleanupActive;
        StateTtlConfig.IncrementalCleanupStrategy config = this.ttlConfig.getCleanupStrategies().getIncrementalCleanupStrategy();
        boolean cleanupConfigured = config != null && this.incrementalCleanup != null;
        boolean bl = isCleanupActive = cleanupConfigured && this.isStateIteratorSupported(originalState, this.incrementalCleanup.getCleanupSize());
        Runnable runnable = isCleanupActive ? this.incrementalCleanup::stateAccessed : (callback = () -> {});
        if (isCleanupActive && config.runCleanupForEveryRecord()) {
            this.stateBackend.registerKeySelectionListener(stub -> callback.run());
        }
        return callback;
    }

    private boolean isStateIteratorSupported(InternalKvState<?, ?, ?> originalState, int size) {
        boolean stateIteratorSupported = false;
        try {
            stateIteratorSupported = originalState.getStateIncrementalVisitor(size) != null;
        }
        catch (Throwable throwable) {
            // empty catch block
        }
        return stateIteratorSupported;
    }

    private StateSnapshotTransformer.StateSnapshotTransformFactory<?> getSnapshotTransformFactory() {
        if (!this.ttlConfig.getCleanupStrategies().inFullSnapshot()) {
            return StateSnapshotTransformer.StateSnapshotTransformFactory.noTransform();
        }
        return new TtlStateSnapshotTransformer.Factory(this.timeProvider, this.ttl);
    }

    public static final class TtlSerializerSnapshot<T>
    extends CompositeTypeSerializerSnapshot<TtlValue<T>, TtlSerializer<T>> {
        private static final int VERSION = 2;

        public TtlSerializerSnapshot() {
            super(TtlSerializer.class);
        }

        TtlSerializerSnapshot(TtlSerializer<T> serializerInstance) {
            super(serializerInstance);
        }

        protected int getCurrentOuterSnapshotVersion() {
            return 2;
        }

        protected TypeSerializer<?>[] getNestedSerializers(TtlSerializer<T> outerSerializer) {
            return new TypeSerializer[]{outerSerializer.getTimestampSerializer(), outerSerializer.getValueSerializer()};
        }

        protected TtlSerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
            TypeSerializer<?> timestampSerializer = nestedSerializers[0];
            TypeSerializer<?> valueSerializer = nestedSerializers[1];
            return new TtlSerializer(timestampSerializer, valueSerializer);
        }
    }

    public static class TtlSerializer<T>
    extends CompositeSerializer<TtlValue<T>>
    implements TypeSerializerConfigSnapshot.SelfResolvingTypeSerializer<TtlValue<T>> {
        private static final long serialVersionUID = 131020282727167064L;

        public TtlSerializer(TypeSerializer<Long> timestampSerializer, TypeSerializer<T> userValueSerializer) {
            super(true, new TypeSerializer[]{timestampSerializer, userValueSerializer});
            Preconditions.checkArgument((!(userValueSerializer instanceof TtlSerializer) ? 1 : 0) != 0);
        }

        public TtlSerializer(CompositeSerializer.PrecomputedParameters precomputed, TypeSerializer<?> ... fieldSerializers) {
            super(precomputed, fieldSerializers);
        }

        public TtlValue<T> createInstance(Object ... values) {
            Preconditions.checkArgument((values.length == 2 ? 1 : 0) != 0);
            return new TtlValue<Object>(values[1], (Long)values[0]);
        }

        protected void setField(@Nonnull TtlValue<T> v, int index, Object fieldValue) {
            throw new UnsupportedOperationException("TtlValue is immutable");
        }

        protected Object getField(@Nonnull TtlValue<T> v, int index) {
            return index == 0 ? Long.valueOf(v.getLastAccessTimestamp()) : v.getUserValue();
        }

        protected CompositeSerializer<TtlValue<T>> createSerializerInstance(CompositeSerializer.PrecomputedParameters precomputed, TypeSerializer<?> ... originalSerializers) {
            Preconditions.checkNotNull(originalSerializers);
            Preconditions.checkArgument((originalSerializers.length == 2 ? 1 : 0) != 0);
            return new TtlSerializer<T>(precomputed, originalSerializers);
        }

        TypeSerializer<Long> getTimestampSerializer() {
            return this.fieldSerializers[0];
        }

        TypeSerializer<T> getValueSerializer() {
            return this.fieldSerializers[1];
        }

        public TypeSerializerSnapshot<TtlValue<T>> snapshotConfiguration() {
            return new TtlSerializerSnapshot(this);
        }

        public TypeSerializerSchemaCompatibility<TtlValue<T>> resolveSchemaCompatibilityViaRedirectingToNewSnapshotClass(TypeSerializerConfigSnapshot<TtlValue<T>> deprecatedConfigSnapshot) {
            if (deprecatedConfigSnapshot instanceof CompositeSerializer.ConfigSnapshot) {
                CompositeSerializer.ConfigSnapshot castedLegacyConfigSnapshot = (CompositeSerializer.ConfigSnapshot)deprecatedConfigSnapshot;
                TtlSerializerSnapshot newSnapshot = new TtlSerializerSnapshot();
                return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot((TypeSerializer)this, newSnapshot, (TypeSerializerSnapshot[])castedLegacyConfigSnapshot.getNestedSerializerSnapshots());
            }
            return TypeSerializerSchemaCompatibility.incompatible();
        }

        public static boolean isTtlStateSerializer(TypeSerializer<?> typeSerializer) {
            boolean ttlSerializer = typeSerializer instanceof TtlSerializer;
            boolean ttlListSerializer = typeSerializer instanceof ListSerializer && ((ListSerializer)typeSerializer).getElementSerializer() instanceof TtlSerializer;
            boolean ttlMapSerializer = typeSerializer instanceof MapSerializer && ((MapSerializer)typeSerializer).getValueSerializer() instanceof TtlSerializer;
            return ttlSerializer || ttlListSerializer || ttlMapSerializer;
        }
    }
}

