/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Future;
import java.util.function.Function;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateHandleDummyUtil;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link OperatorSnapshotFinalizer}.
 */
public class OperatorSnapshotFinalizerTest extends TestLogger {

    /**
     * Checks that creating an {@link OperatorSnapshotFinalizer} completes every future held by the
     * given {@link OperatorSnapshotFutures}, and that the job-manager-owned and task-local states
     * exposed by the finalizer equal the snapshots the futures were created with.
     */
    @Test
    public void testRunAndExtract() throws Exception {
        Random random = new Random(66L);

        // Templates; every snapshot below gets its own dummy copies of these.
        KeyedStateHandle keyedTemplate =
                StateHandleDummyUtil.createNewKeyedStateHandle(new KeyGroupRange(0, 0));
        OperatorStateHandle operatorTemplate =
                StateHandleDummyUtil.createNewOperatorStateHandle(2, random);
        InputChannelStateHandle inputChannelTemplate =
                StateHandleDummyUtil.createNewInputChannelStateHandle(2, random);
        ResultSubpartitionStateHandle resultSubpartitionTemplate =
                StateHandleDummyUtil.createNewResultSubpartitionStateHandle(2, random);

        SnapshotResult<KeyedStateHandle> manKeyed =
                SnapshotResult.withLocalState(
                        StateHandleDummyUtil.deepDummyCopy(keyedTemplate),
                        StateHandleDummyUtil.deepDummyCopy(keyedTemplate));
        SnapshotResult<KeyedStateHandle> rawKeyed =
                SnapshotResult.withLocalState(
                        StateHandleDummyUtil.deepDummyCopy(keyedTemplate),
                        StateHandleDummyUtil.deepDummyCopy(keyedTemplate));
        SnapshotResult<OperatorStateHandle> manOper =
                SnapshotResult.withLocalState(
                        StateHandleDummyUtil.deepDummyCopy(operatorTemplate),
                        StateHandleDummyUtil.deepDummyCopy(operatorTemplate));
        SnapshotResult<OperatorStateHandle> rawOper =
                SnapshotResult.withLocalState(
                        StateHandleDummyUtil.deepDummyCopy(operatorTemplate),
                        StateHandleDummyUtil.deepDummyCopy(operatorTemplate));
        SnapshotResult<StateObjectCollection<InputChannelStateHandle>> inputChannel =
                SnapshotResult.withLocalState(
                        StateObjectCollection.singleton(
                                StateHandleDummyUtil.deepDummyCopy(inputChannelTemplate)),
                        StateObjectCollection.singleton(
                                StateHandleDummyUtil.deepDummyCopy(inputChannelTemplate)));
        SnapshotResult<StateObjectCollection<ResultSubpartitionStateHandle>> resultSubpartition =
                SnapshotResult.withLocalState(
                        StateObjectCollection.singleton(
                                StateHandleDummyUtil.deepDummyCopy(resultSubpartitionTemplate)),
                        StateObjectCollection.singleton(
                                StateHandleDummyUtil.deepDummyCopy(resultSubpartitionTemplate)));

        OperatorSnapshotFutures snapshotFutures =
                new OperatorSnapshotFutures(
                        new PseudoNotDoneFuture<>(manKeyed),
                        new PseudoNotDoneFuture<>(rawKeyed),
                        new PseudoNotDoneFuture<>(manOper),
                        new PseudoNotDoneFuture<>(rawOper),
                        new PseudoNotDoneFuture<>(inputChannel),
                        new PseudoNotDoneFuture<>(resultSubpartition));

        // Nothing has touched the futures yet, so all of them must report "not done".
        for (Future<?> f : snapshotFutures.getAllFutures()) {
            Assert.assertFalse(f.isDone());
        }

        OperatorSnapshotFinalizer finalizer = new OperatorSnapshotFinalizer(snapshotFutures);

        // PseudoNotDoneFuture only flips to "done" once run() or get() was invoked on it.
        for (Future<?> f : snapshotFutures.getAllFutures()) {
            Assert.assertTrue(f.isDone());
        }

        // Maps each input snapshot to the accessor that reads the matching part of the result.
        Map<SnapshotResult<?>, Function<OperatorSubtaskState, ? extends StateObject>> map =
                new HashMap<>();
        map.put(manKeyed, headExtractor(OperatorSubtaskState::getManagedKeyedState));
        map.put(rawKeyed, headExtractor(OperatorSubtaskState::getRawKeyedState));
        map.put(manOper, headExtractor(OperatorSubtaskState::getManagedOperatorState));
        map.put(rawOper, headExtractor(OperatorSubtaskState::getRawOperatorState));
        map.put(inputChannel, OperatorSubtaskState::getInputChannelState);
        map.put(resultSubpartition, OperatorSubtaskState::getResultSubpartitionState);

        for (Map.Entry<SnapshotResult<?>, Function<OperatorSubtaskState, ? extends StateObject>> e :
                map.entrySet()) {
            Assert.assertEquals(
                    e.getKey().getJobManagerOwnedSnapshot(),
                    e.getValue().apply(finalizer.getJobManagerOwnedState()));
        }
        for (Map.Entry<SnapshotResult<?>, Function<OperatorSubtaskState, ? extends StateObject>> e :
                map.entrySet()) {
            Assert.assertEquals(
                    e.getKey().getTaskLocalSnapshot(),
                    e.getValue().apply(finalizer.getTaskLocalState()));
        }
    }

    /**
     * Adapts an accessor returning a {@link StateObjectCollection} into one returning only the
     * collection's first element.
     *
     * @param collectionExtractor accessor that reads a collection from an {@link
     *     OperatorSubtaskState}
     * @param <T> element type of the collection
     * @return a function yielding the first element of the extracted collection, or {@code null}
     *     if the collection is {@code null} or empty
     */
    private static <T extends StateObject> Function<OperatorSubtaskState, T> headExtractor(
            Function<OperatorSubtaskState, StateObjectCollection<T>> collectionExtractor) {
        return collectionExtractor.andThen(
                col -> col == null || col.isEmpty() ? null : col.iterator().next());
    }

    /**
     * Asserts that {@code actual} is {@code null} or empty when {@code expected} is {@code null},
     * and otherwise that it holds exactly one element equal to {@code expected}.
     *
     * <p>Not called by any test in this class at the moment.
     */
    private void checkResult(Object expected, StateObjectCollection<?> actual) {
        if (expected == null) {
            Assert.assertTrue(actual == null || actual.isEmpty());
        } else {
            Assert.assertEquals(1L, actual.size());
            Assert.assertEquals(expected, actual.iterator().next());
        }
    }

    /**
     * A {@link DoneFuture} that reports {@code isDone() == false} until {@link #run()} or {@link
     * #get()} has been called on it, so a test can observe whether the future was used.
     */
    static class PseudoNotDoneFuture<T> extends DoneFuture<T> {
        private boolean done = false;

        PseudoNotDoneFuture(T payload) {
            super(payload);
        }

        @Override
        public void run() {
            super.run();
            this.done = true;
        }

        @Override
        public boolean isDone() {
            return this.done;
        }

        @Override
        public T get() {
            try {
                return super.get();
            } finally {
                // Mark as done even if the delegate call fails.
                this.done = true;
            }
        }
    }
}

