/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.streaming.api.operators.StreamOperatorStateHandler;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.ExceptionUtils;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.OptionalAssert;
import org.junit.jupiter.api.Test;

class StreamOperatorStateHandlerTest {
    StreamOperatorStateHandlerTest() {
    }

    @Test
    void testFailingBackendSnapshotMethod() throws Exception {
        long checkpointId = 42L;
        long timestamp = 1L;
        try (CloseableRegistry closeableRegistry = new CloseableRegistry();){
            CancelableFuture keyedStateManagedFuture = new CancelableFuture();
            CancelableFuture keyedStateRawFuture = new CancelableFuture();
            CancelableFuture operatorStateManagedFuture = new CancelableFuture();
            CancelableFuture operatorStateRawFuture = new CancelableFuture();
            CancelableFuture inputChannelStateFuture = new CancelableFuture();
            CancelableFuture resultSubpartitionStateFuture = new CancelableFuture();
            OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures(keyedStateManagedFuture, keyedStateRawFuture, operatorStateManagedFuture, operatorStateRawFuture, inputChannelStateFuture, resultSubpartitionStateFuture);
            TestStateSnapshotContextSynchronousImpl context = new TestStateSnapshotContextSynchronousImpl(42L, 1L, closeableRegistry);
            context.getRawKeyedOperatorStateOutput();
            context.getRawOperatorStateOutput();
            StreamTaskStateInitializerImpl stateInitializer = new StreamTaskStateInitializerImpl((Environment)new MockEnvironmentBuilder().build(), (StateBackend)new MemoryStateBackend());
            StreamOperatorStateContext stateContext = stateInitializer.streamOperatorStateContext(new OperatorID(), "whatever", (ProcessingTimeService)new TestProcessingTimeService(), (KeyContext)new UnUsedKeyContext(), (TypeSerializer)IntSerializer.INSTANCE, closeableRegistry, (MetricGroup)new InterceptingOperatorMetricGroup(), 1.0, false);
            StreamOperatorStateHandler stateHandler = new StreamOperatorStateHandler(stateContext, new ExecutionConfig(), closeableRegistry);
            String keyedStateField = "keyedStateField";
            String operatorStateField = "operatorStateField";
            StreamOperatorStateHandler.CheckpointedStreamOperator checkpointedStreamOperator = new StreamOperatorStateHandler.CheckpointedStreamOperator(){

                public void initializeState(StateInitializationContext context) throws Exception {
                    context.getKeyedStateStore().getState(new ValueStateDescriptor("keyedStateField", (TypeSerializer)LongSerializer.INSTANCE)).update((Object)42L);
                    context.getOperatorStateStore().getListState(new ListStateDescriptor("operatorStateField", (TypeSerializer)LongSerializer.INSTANCE)).add((Object)42L);
                }

                public void snapshotState(StateSnapshotContext context) throws Exception {
                    throw new ExpectedTestException();
                }
            };
            stateHandler.setCurrentKey((Object)"44");
            stateHandler.initializeOperatorState(checkpointedStreamOperator);
            Assertions.assertThat((Collection)stateContext.operatorStateBackend().getRegisteredStateNames()).isNotEmpty();
            Assertions.assertThat((int)((AbstractKeyedStateBackend)stateContext.keyedStateBackend()).numKeyValueStatesByName()).isOne();
            Assertions.assertThatThrownBy(() -> stateHandler.snapshotState(checkpointedStreamOperator, Optional.of(stateContext.internalTimerServiceManager()), "42", 42L, 42L, CheckpointOptions.forCheckpointWithDefaultLocation(), (CheckpointStreamFactory)new MemCheckpointStreamFactory(1024), operatorSnapshotResult, context, false)).isInstanceOfSatisfying(CheckpointException.class, e -> {
                OptionalAssert cfr_ignored_0 = (OptionalAssert)Assertions.assertThat((Optional)ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)"Expected Test Exception")).isPresent();
            });
            Assertions.assertThat(keyedStateManagedFuture).isCancelled();
            Assertions.assertThat(keyedStateRawFuture).isCancelled();
            Assertions.assertThat((Future)context.getKeyedStateStreamFuture()).isCancelled();
            Assertions.assertThat(operatorStateManagedFuture).isCancelled();
            Assertions.assertThat(operatorStateRawFuture).isCancelled();
            Assertions.assertThat((Future)context.getOperatorStateStreamFuture()).isCancelled();
            Assertions.assertThat(inputChannelStateFuture).isCancelled();
            Assertions.assertThat(resultSubpartitionStateFuture).isCancelled();
            stateHandler.dispose();
            Assertions.assertThat((Collection)stateContext.operatorStateBackend().getRegisteredBroadcastStateNames()).isEmpty();
            Assertions.assertThat((Collection)stateContext.operatorStateBackend().getRegisteredStateNames()).isEmpty();
            Assertions.assertThat((int)((AbstractKeyedStateBackend)stateContext.keyedStateBackend()).numKeyValueStatesByName()).isZero();
        }
    }

    private static class UnUsedKeyContext
    implements KeyContext {
        private UnUsedKeyContext() {
        }

        public void setCurrentKey(Object key) {
            throw new UnsupportedOperationException();
        }

        public Object getCurrentKey() {
            throw new UnsupportedOperationException();
        }
    }

    private static class CancelableFuture<T>
    extends FutureTask<T> {
        public CancelableFuture() {
            super(() -> {
                throw new UnsupportedOperationException();
            });
        }
    }

    private static class TestStateSnapshotContextSynchronousImpl
    extends StateSnapshotContextSynchronousImpl {
        public TestStateSnapshotContextSynchronousImpl(long checkpointId, long timestamp, CloseableRegistry closeableRegistry) {
            super(checkpointId, timestamp, (CheckpointStreamFactory)new MemCheckpointStreamFactory(1024), new KeyGroupRange(0, 2), closeableRegistry);
            this.keyedStateCheckpointClosingFuture = new CancelableFuture();
            this.operatorStateCheckpointClosingFuture = new CancelableFuture();
        }
    }
}

