/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.v2;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequest;
import org.apache.flink.runtime.asyncprocessing.StateRequestContainer;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.v2.StateDescriptor;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

public class InternalKeyedStateTestBase {
    AsyncExecutionController aec;
    TestStateExecutor testStateExecutor;
    AtomicReference<Throwable> exception;

    @BeforeEach
    void setup() {
        this.testStateExecutor = (TestStateExecutor)this.createStateExecutor();
        this.aec = new AsyncExecutionController((MailboxExecutor)new SyncMailboxExecutor(), (a, b) -> this.exception.set(b), (StateExecutor)this.testStateExecutor, 1, 1, 1000L, 1);
        this.exception = new AtomicReference<Object>(null);
    }

    @AfterEach
    void after() {
        AssertionsForClassTypes.assertThat((Throwable)this.exception.get()).isNull();
    }

    private StateExecutor createStateExecutor() {
        TestAsyncStateBackend testAsyncStateBackend = new TestAsyncStateBackend();
        AssertionsForClassTypes.assertThat((boolean)testAsyncStateBackend.supportsAsyncKeyedStateBackend()).isTrue();
        return testAsyncStateBackend.createAsyncKeyedStateBackend(null).createStateExecutor();
    }

    <IN> void validateRequestRun(@Nullable State state, StateRequestType type, @Nullable IN payload) {
        this.aec.triggerIfNeeded(true);
        this.testStateExecutor.validate(state, type, payload);
        AssertionsForClassTypes.assertThat((boolean)this.testStateExecutor.receivedRequest.isEmpty()).isTrue();
    }

    static class TestStateExecutor
    implements StateExecutor {
        private Deque<StateRequest<?, ?, ?>> receivedRequest = new ConcurrentLinkedDeque();

        TestStateExecutor() {
        }

        <IN> void validate(@Nullable State state, StateRequestType type, @Nullable IN payload) {
            AssertionsForClassTypes.assertThat((boolean)this.receivedRequest.isEmpty()).isFalse();
            StateRequest<?, ?, ?> request = this.receivedRequest.pop();
            AssertionsForClassTypes.assertThat((Object)request.getState()).isEqualTo((Object)state);
            AssertionsForClassTypes.assertThat((Object)request.getRequestType()).isEqualTo((Object)type);
            AssertionsForClassTypes.assertThat((Object)request.getPayload()).isEqualTo(payload);
        }

        public CompletableFuture<Void> executeBatchRequests(StateRequestContainer stateRequestContainer) {
            this.receivedRequest.addAll(((TestStateRequestContainer)stateRequestContainer).requests);
            CompletableFuture<Void> future = new CompletableFuture<Void>();
            future.complete(null);
            return future;
        }

        public StateRequestContainer createStateRequestContainer() {
            return new TestStateRequestContainer();
        }

        public void shutdown() {
        }

        static class TestStateRequestContainer
        implements StateRequestContainer {
            ArrayList<StateRequest<?, ?, ?>> requests = new ArrayList();

            TestStateRequestContainer() {
            }

            public void offer(StateRequest<?, ?, ?> stateRequest) {
                this.requests.add(stateRequest);
            }

            public boolean isEmpty() {
                return this.requests.isEmpty();
            }
        }
    }

    static class TestAsyncStateBackend
    implements StateBackend {
        TestAsyncStateBackend() {
        }

        public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> parameters) throws Exception {
            throw new UnsupportedOperationException("Don't support createKeyedStateBackend yet");
        }

        public OperatorStateBackend createOperatorStateBackend(StateBackend.OperatorStateBackendParameters parameters) throws Exception {
            throw new UnsupportedOperationException("Don't support createOperatorStateBackend yet");
        }

        public boolean supportsAsyncKeyedStateBackend() {
            return true;
        }

        public <K> AsyncKeyedStateBackend createAsyncKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> parameters) {
            return new AsyncKeyedStateBackend(){

                public void close() throws IOException {
                }

                public void setup(@Nonnull StateRequestHandler stateRequestHandler) {
                }

                @Nonnull
                public <SV, S extends State> S createState(@Nonnull StateDescriptor<SV> stateDesc) throws Exception {
                    return null;
                }

                public StateExecutor createStateExecutor() {
                    return new TestStateExecutor();
                }

                public void dispose() {
                }
            };
        }
    }
}

