/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.state;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.state.BagUserState;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.IterableSideInput;
import org.apache.beam.fn.harness.state.MultimapSideInput;
import org.apache.beam.fn.harness.state.MultimapUserState;
import org.apache.beam.fn.harness.state.SideInputSpec;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.OrderedListState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.ReadableStates;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.StateBinder;
import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.checkerframework.checker.nullness.qual.Nullable;

public class FnApiStateAccessor<K>
implements SideInputReader,
StateBinder {
    private final PipelineOptions pipelineOptions;
    private final Map<BeamFnApi.StateKey, Object> stateKeyObjectCache;
    private final Map<TupleTag<?>, SideInputSpec> sideInputSpecMap;
    private final BeamFnStateClient beamFnStateClient;
    private final String ptransformId;
    private final Supplier<String> processBundleInstructionId;
    private final Supplier<List<BeamFnApi.ProcessBundleRequest.CacheToken>> cacheTokens;
    private final Supplier<Cache<?, ?>> bundleCache;
    private final Cache<?, ?> processWideCache;
    private final Collection<ThrowingRunnable> stateFinalizers;
    private final Supplier<BoundedWindow> currentWindowSupplier;
    private final Supplier<ByteString> encodedCurrentKeySupplier;
    private final Supplier<ByteString> encodedCurrentWindowSupplier;

    public FnApiStateAccessor(PipelineOptions pipelineOptions, String ptransformId, Supplier<String> processBundleInstructionId, Supplier<List<BeamFnApi.ProcessBundleRequest.CacheToken>> cacheTokens, Supplier<Cache<?, ?>> bundleCache, Cache<?, ?> processWideCache, Map<TupleTag<?>, SideInputSpec> sideInputSpecMap, BeamFnStateClient beamFnStateClient, Coder<K> keyCoder, Coder<BoundedWindow> windowCoder, Supplier<K> currentKeySupplier, Supplier<BoundedWindow> currentWindowSupplier) {
        this.pipelineOptions = pipelineOptions;
        this.stateKeyObjectCache = Maps.newHashMap();
        this.sideInputSpecMap = sideInputSpecMap;
        this.beamFnStateClient = beamFnStateClient;
        this.ptransformId = ptransformId;
        this.processBundleInstructionId = processBundleInstructionId;
        this.cacheTokens = cacheTokens;
        this.bundleCache = bundleCache;
        this.processWideCache = processWideCache;
        this.stateFinalizers = new ArrayList<ThrowingRunnable>();
        this.currentWindowSupplier = currentWindowSupplier;
        this.encodedCurrentKeySupplier = FnApiStateAccessor.memoizeFunction(currentKeySupplier, key -> {
            Preconditions.checkState(keyCoder != null, "Accessing state in unkeyed context, no key coder available");
            ByteString.Output encodedKeyOut = ByteString.newOutput();
            try {
                keyCoder.encode(key, encodedKeyOut, Coder.Context.NESTED);
            }
            catch (IOException e) {
                throw new IllegalStateException(e);
            }
            return encodedKeyOut.toByteString();
        });
        this.encodedCurrentWindowSupplier = FnApiStateAccessor.memoizeFunction(currentWindowSupplier, window -> {
            ByteString.Output encodedWindowOut = ByteString.newOutput();
            try {
                windowCoder.encode((BoundedWindow)window, encodedWindowOut);
            }
            catch (IOException e) {
                throw new IllegalStateException(e);
            }
            return encodedWindowOut.toByteString();
        });
    }

    private static <ArgT, ResultT> Supplier<ResultT> memoizeFunction(final Supplier<ArgT> arg, final Function<ArgT, ResultT> f) {
        return new Supplier<ResultT>(){
            private ArgT memoizedArg;
            private ResultT memoizedResult;
            private boolean initialized;

            @Override
            public ResultT get() {
                Object currentArg = arg.get();
                if (currentArg != this.memoizedArg || !this.initialized) {
                    this.memoizedArg = currentArg;
                    this.memoizedResult = f.apply(this.memoizedArg);
                    this.initialized = true;
                }
                return this.memoizedResult;
            }
        };
    }

    @Override
    public <T> @Nullable T get(PCollectionView<T> view, BoundedWindow window) {
        TupleTag<?> tag = view.getTagInternal();
        SideInputSpec sideInputSpec = this.sideInputSpecMap.get(tag);
        Preconditions.checkArgument(sideInputSpec != null, "Attempting to access unknown side input %s.", view);
        ByteString.Output encodedWindowOut = ByteString.newOutput();
        try {
            sideInputSpec.getWindowCoder().encode(sideInputSpec.getWindowMappingFn().getSideInputWindow(window), encodedWindowOut);
        }
        catch (IOException e) {
            throw new IllegalStateException(e);
        }
        ByteString encodedWindow = encodedWindowOut.toByteString();
        BeamFnApi.StateKey.Builder cacheKeyBuilder = BeamFnApi.StateKey.newBuilder();
        switch (sideInputSpec.getAccessPattern()) {
            case "beam:side_input:iterable:v1": {
                cacheKeyBuilder.getIterableSideInputBuilder().setTransformId(this.ptransformId).setSideInputId(tag.getId()).setWindow(encodedWindow);
                break;
            }
            case "beam:side_input:multimap:v1": {
                Preconditions.checkState(sideInputSpec.getCoder() instanceof KvCoder, "Expected %s but received %s.", KvCoder.class, sideInputSpec.getCoder().getClass());
                cacheKeyBuilder.getMultimapKeysSideInputBuilder().setTransformId(this.ptransformId).setSideInputId(tag.getId()).setWindow(encodedWindow);
                break;
            }
            default: {
                throw new IllegalStateException(String.format("This SDK is only capable of dealing with %s materializations but was asked to handle %s for PCollectionView with tag %s.", ImmutableList.of("beam:side_input:iterable:v1", "beam:side_input:multimap:v1"), sideInputSpec.getAccessPattern(), tag));
            }
        }
        return (T)this.stateKeyObjectCache.computeIfAbsent(cacheKeyBuilder.build(), key -> {
            switch (sideInputSpec.getAccessPattern()) {
                case "beam:side_input:iterable:v1": {
                    return sideInputSpec.getViewFn().apply(new IterableSideInput(this.getCacheFor((BeamFnApi.StateKey)key), this.beamFnStateClient, this.processBundleInstructionId.get(), (BeamFnApi.StateKey)key, sideInputSpec.getCoder()));
                }
                case "beam:side_input:multimap:v1": {
                    return sideInputSpec.getViewFn().apply(new MultimapSideInput(this.getCacheFor((BeamFnApi.StateKey)key), this.beamFnStateClient, this.processBundleInstructionId.get(), (BeamFnApi.StateKey)key, ((KvCoder)sideInputSpec.getCoder()).getKeyCoder(), ((KvCoder)sideInputSpec.getCoder()).getValueCoder()));
                }
            }
            throw new IllegalStateException(String.format("This SDK is only capable of dealing with %s materializations but was asked to handle %s for PCollectionView with tag %s.", ImmutableList.of("beam:side_input:iterable:v1", "beam:side_input:multimap:v1"), sideInputSpec.getAccessPattern(), tag));
        });
    }

    @Override
    public <T> boolean contains(PCollectionView<T> view) {
        return this.sideInputSpecMap.containsKey(view.getTagInternal());
    }

    @Override
    public boolean isEmpty() {
        return this.sideInputSpecMap.isEmpty();
    }

    @Override
    public <T> ValueState<T> bindValue(String id, StateSpec<ValueState<T>> spec, final Coder<T> coder) {
        return (ValueState)this.stateKeyObjectCache.computeIfAbsent(this.createBagUserStateKey(id), new Function<BeamFnApi.StateKey, Object>(){

            @Override
            public Object apply(final BeamFnApi.StateKey key) {
                return new ValueState<T>(){
                    private final BagUserState<T> impl;
                    {
                        this.impl = FnApiStateAccessor.this.createBagUserState(key, coder);
                    }

                    @Override
                    public void clear() {
                        this.impl.clear();
                    }

                    @Override
                    public void write(T input) {
                        this.impl.clear();
                        this.impl.append(input);
                    }

                    @Override
                    public T read() {
                        Iterator value = this.impl.get().iterator();
                        if (value.hasNext()) {
                            return value.next();
                        }
                        return null;
                    }

                    @Override
                    public ValueState<T> readLater() {
                        this.impl.get().prefetch();
                        return this;
                    }
                };
            }
        });
    }

    @Override
    public <T> BagState<T> bindBag(String id, StateSpec<BagState<T>> spec, final Coder<T> elemCoder) {
        return (BagState)this.stateKeyObjectCache.computeIfAbsent(this.createBagUserStateKey(id), new Function<BeamFnApi.StateKey, Object>(){

            @Override
            public Object apply(final BeamFnApi.StateKey key) {
                return new BagState<T>(){
                    private final BagUserState<T> impl;
                    {
                        this.impl = FnApiStateAccessor.this.createBagUserState(key, elemCoder);
                    }

                    @Override
                    public void add(T value) {
                        this.impl.append(value);
                    }

                    @Override
                    public ReadableState<Boolean> isEmpty() {
                        return new ReadableState<Boolean>(){

                            @Override
                            public @Nullable Boolean read() {
                                return !impl.get().iterator().hasNext();
                            }

                            @Override
                            public ReadableState<Boolean> readLater() {
                                return this;
                            }
                        };
                    }

                    @Override
                    public Iterable<T> read() {
                        return this.impl.get();
                    }

                    @Override
                    public BagState<T> readLater() {
                        this.impl.get().prefetch();
                        return this;
                    }

                    @Override
                    public void clear() {
                        this.impl.clear();
                    }
                };
            }
        });
    }

    @Override
    public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, final Coder<T> elemCoder) {
        return (SetState)this.stateKeyObjectCache.computeIfAbsent(this.createMultimapKeysUserStateKey(id), new Function<BeamFnApi.StateKey, Object>(){

            @Override
            public Object apply(final BeamFnApi.StateKey key) {
                return new SetState<T>(){
                    private final MultimapUserState<T, Void> impl;
                    {
                        this.impl = FnApiStateAccessor.this.createMultimapUserState(key, elemCoder, VoidCoder.of());
                    }

                    @Override
                    public void clear() {
                        this.impl.clear();
                    }

                    @Override
                    public ReadableState<Boolean> contains(final T t) {
                        return new ReadableState<Boolean>(){

                            @Override
                            public Boolean read() {
                                return !Iterables.isEmpty(impl.get(t));
                            }

                            @Override
                            public ReadableState<Boolean> readLater() {
                                impl.get(t).prefetch();
                                return this;
                            }
                        };
                    }

                    @Override
                    public ReadableState<Boolean> addIfAbsent(T t) {
                        boolean isEmpty = Iterables.isEmpty(this.impl.get(t));
                        if (isEmpty) {
                            this.impl.put(t, null);
                        }
                        return ReadableStates.immediate(isEmpty);
                    }

                    @Override
                    public void remove(T t) {
                        this.impl.remove(t);
                    }

                    @Override
                    public void add(T value) {
                        this.impl.remove(value);
                        this.impl.put(value, null);
                    }

                    @Override
                    public ReadableState<Boolean> isEmpty() {
                        return new ReadableState<Boolean>(){

                            @Override
                            public Boolean read() {
                                return Iterables.isEmpty(impl.keys());
                            }

                            @Override
                            public ReadableState<Boolean> readLater() {
                                impl.keys().prefetch();
                                return this;
                            }
                        };
                    }

                    @Override
                    public Iterable<T> read() {
                        return this.impl.keys();
                    }

                    @Override
                    public SetState<T> readLater() {
                        this.impl.keys().prefetch();
                        return this;
                    }
                };
            }
        });
    }

    @Override
    public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(String id, StateSpec<MapState<KeyT, ValueT>> spec, final Coder<KeyT> mapKeyCoder, final Coder<ValueT> mapValueCoder) {
        return (MapState)this.stateKeyObjectCache.computeIfAbsent(this.createMultimapKeysUserStateKey(id), new Function<BeamFnApi.StateKey, Object>(){

            @Override
            public Object apply(final BeamFnApi.StateKey key) {
                return new MapState<KeyT, ValueT>(){
                    private final MultimapUserState<KeyT, ValueT> impl;
                    {
                        this.impl = FnApiStateAccessor.this.createMultimapUserState(key, mapKeyCoder, mapValueCoder);
                    }

                    @Override
                    public void clear() {
                        this.impl.clear();
                    }

                    @Override
                    public void put(KeyT key2, ValueT value) {
                        this.impl.remove(key2);
                        this.impl.put(key2, value);
                    }

                    @Override
                    public ReadableState<ValueT> computeIfAbsent(KeyT key2, Function<? super KeyT, ? extends ValueT> mappingFunction) {
                        PrefetchableIterable values = this.impl.get(key2);
                        if (Iterables.isEmpty(values)) {
                            this.impl.put(key2, mappingFunction.apply(key2));
                        }
                        return ReadableStates.immediate(Iterables.getOnlyElement(values, null));
                    }

                    @Override
                    public void remove(KeyT key2) {
                        this.impl.remove(key2);
                    }

                    @Override
                    public ReadableState<ValueT> get(KeyT key2) {
                        return this.getOrDefault((KeyT)key2, (ValueT)null);
                    }

                    @Override
                    public ReadableState<ValueT> getOrDefault(final KeyT key2, final @Nullable ValueT defaultValue) {
                        return new ReadableState<ValueT>(){

                            @Override
                            public @Nullable ValueT read() {
                                PrefetchableIterable values = impl.get(key2);
                                return Iterables.getOnlyElement(values, defaultValue);
                            }

                            @Override
                            public ReadableState<ValueT> readLater() {
                                impl.get(key2).prefetch();
                                return this;
                            }
                        };
                    }

                    @Override
                    public ReadableState<Iterable<KeyT>> keys() {
                        return new ReadableState<Iterable<KeyT>>(){

                            @Override
                            public Iterable<KeyT> read() {
                                return impl.keys();
                            }

                            @Override
                            public ReadableState<Iterable<KeyT>> readLater() {
                                impl.keys().prefetch();
                                return this;
                            }
                        };
                    }

                    @Override
                    public ReadableState<Iterable<ValueT>> values() {
                        return new ReadableState<Iterable<ValueT>>(){

                            @Override
                            public Iterable<ValueT> read() {
                                return Iterables.transform(this.entries().read(), e -> e.getValue());
                            }

                            @Override
                            public ReadableState<Iterable<ValueT>> readLater() {
                                this.entries().readLater();
                                return this;
                            }
                        };
                    }

                    @Override
                    public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> entries() {
                        return new ReadableState<Iterable<Map.Entry<KeyT, ValueT>>>(){

                            @Override
                            public Iterable<Map.Entry<KeyT, ValueT>> read() {
                                Iterable keys = this.keys().read();
                                return Iterables.transform(keys, key -> Maps.immutableEntry(key, this.get(key).read()));
                            }

                            @Override
                            public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> readLater() {
                                this.keys().readLater();
                                return this;
                            }
                        };
                    }

                    @Override
                    public ReadableState<Boolean> isEmpty() {
                        return new ReadableState<Boolean>(){

                            @Override
                            public Boolean read() {
                                return Iterables.isEmpty(this.keys().read());
                            }

                            @Override
                            public ReadableState<Boolean> readLater() {
                                this.keys().readLater();
                                return this;
                            }
                        };
                    }
                };
            }
        });
    }

    @Override
    public <T> OrderedListState<T> bindOrderedList(String id, StateSpec<OrderedListState<T>> spec, Coder<T> elemCoder) {
        throw new UnsupportedOperationException("TODO: Add support for a sorted-list state to the Fn API.");
    }

    public <ElementT, AccumT, ResultT> CombiningState<ElementT, AccumT, ResultT> bindCombining(String id, StateSpec<CombiningState<ElementT, AccumT, ResultT>> spec, final Coder<AccumT> accumCoder, final Combine.CombineFn<ElementT, AccumT, ResultT> combineFn) {
        return (CombiningState)this.stateKeyObjectCache.computeIfAbsent(this.createBagUserStateKey(id), new Function<BeamFnApi.StateKey, Object>(){

            @Override
            public Object apply(final BeamFnApi.StateKey key) {
                return new CombiningState<ElementT, AccumT, ResultT>(){
                    private final BagUserState<AccumT> impl;
                    {
                        this.impl = FnApiStateAccessor.this.createBagUserState(key, accumCoder);
                    }

                    @Override
                    public AccumT getAccum() {
                        Iterator iterator = this.impl.get().iterator();
                        if (iterator.hasNext()) {
                            return iterator.next();
                        }
                        return combineFn.createAccumulator();
                    }

                    @Override
                    public void addAccum(AccumT accum) {
                        Iterator iterator = this.impl.get().iterator();
                        if (iterator.hasNext()) {
                            accum = combineFn.mergeAccumulators(ImmutableList.of(iterator.next(), accum));
                            this.impl.clear();
                        }
                        this.impl.append(accum);
                    }

                    @Override
                    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
                        return combineFn.mergeAccumulators(accumulators);
                    }

                    @Override
                    public CombiningState<ElementT, AccumT, ResultT> readLater() {
                        this.impl.get().prefetch();
                        return this;
                    }

                    @Override
                    public ResultT read() {
                        Iterator iterator = this.impl.get().iterator();
                        if (iterator.hasNext()) {
                            return combineFn.extractOutput(iterator.next());
                        }
                        return combineFn.defaultValue();
                    }

                    @Override
                    public void add(ElementT value) {
                        Object newAccumulator = combineFn.addInput(this.getAccum(), value);
                        this.impl.clear();
                        this.impl.append(newAccumulator);
                    }

                    @Override
                    public ReadableState<Boolean> isEmpty() {
                        return new ReadableState<Boolean>(){

                            @Override
                            public @Nullable Boolean read() {
                                return !impl.get().iterator().hasNext();
                            }

                            @Override
                            public ReadableState<Boolean> readLater() {
                                impl.get().prefetch();
                                return this;
                            }
                        };
                    }

                    @Override
                    public void clear() {
                        this.impl.clear();
                    }
                };
            }
        });
    }

    public <ElementT, AccumT, ResultT> CombiningState<ElementT, AccumT, ResultT> bindCombiningWithContext(String id, StateSpec<CombiningState<ElementT, AccumT, ResultT>> spec, Coder<AccumT> accumCoder, CombineWithContext.CombineFnWithContext<ElementT, AccumT, ResultT> combineFn) {
        return (CombiningState)this.stateKeyObjectCache.computeIfAbsent(this.createBagUserStateKey(id), key -> this.bindCombining(id, spec, (Coder<AccumT>)accumCoder, CombineFnUtil.bindContext(combineFn, new StateContext<BoundedWindow>(){

            @Override
            public PipelineOptions getPipelineOptions() {
                return FnApiStateAccessor.this.pipelineOptions;
            }

            @Override
            public <T> T sideInput(PCollectionView<T> view) {
                return FnApiStateAccessor.this.get(view, (BoundedWindow)FnApiStateAccessor.this.currentWindowSupplier.get());
            }

            @Override
            public BoundedWindow window() {
                return (BoundedWindow)FnApiStateAccessor.this.currentWindowSupplier.get();
            }
        })));
    }

    @Override
    @Deprecated
    public WatermarkHoldState bindWatermark(String id, StateSpec<WatermarkHoldState> spec, TimestampCombiner timestampCombiner) {
        throw new UnsupportedOperationException("WatermarkHoldState is unsupported by the Fn API.");
    }

    private Cache<?, ?> getCacheFor(BeamFnApi.StateKey stateKey) {
        switch (stateKey.getTypeCase()) {
            case BAG_USER_STATE: {
                for (BeamFnApi.ProcessBundleRequest.CacheToken token : this.cacheTokens.get()) {
                    if (!token.hasUserState()) continue;
                    return Caches.subCache(this.processWideCache, token, stateKey);
                }
                break;
            }
            case MULTIMAP_KEYS_USER_STATE: {
                for (BeamFnApi.ProcessBundleRequest.CacheToken token : this.cacheTokens.get()) {
                    if (!token.hasUserState()) continue;
                    return Caches.subCache(this.processWideCache, token, stateKey);
                }
                break;
            }
            case ITERABLE_SIDE_INPUT: {
                for (BeamFnApi.ProcessBundleRequest.CacheToken token : this.cacheTokens.get()) {
                    if (!token.hasSideInput() || !stateKey.getIterableSideInput().getTransformId().equals(token.getSideInput().getTransformId()) || !stateKey.getIterableSideInput().getSideInputId().equals(token.getSideInput().getSideInputId())) continue;
                    return Caches.subCache(this.processWideCache, token, stateKey);
                }
                break;
            }
            case MULTIMAP_KEYS_SIDE_INPUT: {
                for (BeamFnApi.ProcessBundleRequest.CacheToken token : this.cacheTokens.get()) {
                    if (!token.hasSideInput() || !stateKey.getMultimapKeysSideInput().getTransformId().equals(token.getSideInput().getTransformId()) || !stateKey.getMultimapKeysSideInput().getSideInputId().equals(token.getSideInput().getSideInputId())) continue;
                    return Caches.subCache(this.processWideCache, token, stateKey);
                }
                break;
            }
            default: {
                throw new IllegalStateException(String.format("Unknown state key type requested %s.", stateKey));
            }
        }
        return Caches.subCache(this.bundleCache.get(), stateKey, new Object[0]);
    }

    private <T> BagUserState<T> createBagUserState(BeamFnApi.StateKey stateKey, Coder<T> valueCoder) {
        BagUserState<T> rval = new BagUserState<T>(this.getCacheFor(stateKey), this.beamFnStateClient, this.processBundleInstructionId.get(), stateKey, valueCoder);
        this.stateFinalizers.add(rval::asyncClose);
        return rval;
    }

    private BeamFnApi.StateKey createBagUserStateKey(String stateId) {
        BeamFnApi.StateKey.Builder builder = BeamFnApi.StateKey.newBuilder();
        builder.getBagUserStateBuilder().setWindow(this.encodedCurrentWindowSupplier.get()).setKey(this.encodedCurrentKeySupplier.get()).setTransformId(this.ptransformId).setUserStateId(stateId);
        return builder.build();
    }

    private <KeyT, ValueT> MultimapUserState<KeyT, ValueT> createMultimapUserState(BeamFnApi.StateKey stateKey, Coder<KeyT> keyCoder, Coder<ValueT> valueCoder) {
        MultimapUserState<KeyT, ValueT> rval = new MultimapUserState<KeyT, ValueT>(this.getCacheFor(stateKey), this.beamFnStateClient, this.processBundleInstructionId.get(), stateKey, keyCoder, valueCoder);
        this.stateFinalizers.add(rval::asyncClose);
        return rval;
    }

    private BeamFnApi.StateKey createMultimapKeysUserStateKey(String stateId) {
        BeamFnApi.StateKey.Builder builder = BeamFnApi.StateKey.newBuilder();
        builder.getMultimapKeysUserStateBuilder().setWindow(this.encodedCurrentWindowSupplier.get()).setTransformId(this.ptransformId).setUserStateId(stateId);
        return builder.build();
    }

    public void finalizeState() {
        try {
            for (ThrowingRunnable runnable : this.stateFinalizers) {
                runnable.run();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
        this.stateFinalizers.clear();
        this.stateKeyObjectCache.clear();
    }
}

