/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.state;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectStreamException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.StateFetchingIterators;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.CoderTranslator;
import org.apache.beam.runners.core.construction.CoderTranslatorRegistrar;
import org.apache.beam.sdk.coders.IterableLikeCoder;
import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
import org.apache.beam.sdk.fn.stream.PrefetchableIterators;
import org.apache.beam.sdk.util.BufferedElementCountingOutputStream;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;

/**
 * An {@link Iterable} made of two parts: a list of elements that is already held in memory (the
 * {@code prefix}) followed by a {@code suffix} obtained from
 * {@link StateFetchingIterators#readAllAndDecodeStartingFrom} for a {@link BeamFnApi.StateRequest}
 * built from an instruction id and a state key.
 *
 * <p>The suffix is {@code transient}; Java serialization of an instance is redirected by
 * {@link #writeReplace()} to a fully materialized list of all elements.
 *
 * @param <T> the element type
 */
public class StateBackedIterable<T> implements Iterable<T>, Serializable {
    /** URN under which {@link Coder} is registered by {@link Registrar}. */
    private static final String CODER_URN = "beam:coder:state_backed_iterable:v1";

    @VisibleForTesting
    final BeamFnApi.StateRequest request;
    @VisibleForTesting
    final List<T> prefix;
    private final transient PrefetchableIterable<T> suffix;

    /**
     * Creates an iterable over {@code prefix} followed by the elements read for {@code stateKey}.
     *
     * @param cache cache handed to {@link StateFetchingIterators#readAllAndDecodeStartingFrom}
     * @param beamFnStateClient client handed to the same call to serve the state request
     * @param instructionId instruction id placed on the state request
     * @param stateKey state key placed on the state request
     * @param elemCoder coder used to decode the elements of the suffix
     * @param prefix elements returned before any element of the suffix; stored as given (not copied)
     */
    public StateBackedIterable(Cache<?, ?> cache, BeamFnStateClient beamFnStateClient, String instructionId, BeamFnApi.StateKey stateKey, org.apache.beam.sdk.coders.Coder<T> elemCoder, List<T> prefix) {
        this.request = BeamFnApi.StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build();
        this.prefix = prefix;
        this.suffix = StateFetchingIterators.readAllAndDecodeStartingFrom(cache, beamFnStateClient, this.request, elemCoder);
    }

    /**
     * Returns an iterator over the prefix elements followed by the suffix elements.
     */
    @Override
    public Iterator<T> iterator() {
        return PrefetchableIterators.concat(this.prefix.iterator(), this.suffix.iterator());
    }

    /**
     * Java serialization hook: substitutes an {@link ImmutableList} holding every element of this
     * iterable (prefix and suffix), since the suffix itself is {@code transient}.
     *
     * @return an immutable copy of all elements
     * @throws ObjectStreamException declared by the serialization contract; not thrown directly here
     */
    protected Object writeReplace() throws ObjectStreamException {
        return ImmutableList.copyOf(this);
    }

    /**
     * Translates {@link Coder} to and from its single component, the element coder.
     */
    private static class Translator implements CoderTranslator<Coder<?>> {
        private Translator() {
        }

        /**
         * Returns the element coder of {@code from} as the only component.
         */
        @Override
        public List<? extends org.apache.beam.sdk.coders.Coder<?>> getComponents(Coder<?> from) {
            return Collections.<org.apache.beam.sdk.coders.Coder<?>>singletonList(from.getElemCoder());
        }

        /**
         * Builds a {@link Coder} from its single component using the cache, state client and
         * instruction id exposed by a {@link StateBackedIterableTranslationContext}.
         *
         * @param components must contain exactly one element coder
         * @param payload unused
         * @param context must be a {@link StateBackedIterableTranslationContext}
         * @return the reconstructed coder
         * @throws IllegalStateException if {@code context} is {@code null} or of any other type
         */
        @Override
        public Coder<?> fromComponents(List<org.apache.beam.sdk.coders.Coder<?>> components, byte[] payload, CoderTranslation.TranslationContext context) {
            if (context instanceof StateBackedIterableTranslationContext) {
                StateBackedIterableTranslationContext stateContext = (StateBackedIterableTranslationContext) context;
                return new Coder<>(stateContext.getCache(), stateContext.getStateClient(), stateContext.getCurrentInstructionId(), Iterables.getOnlyElement(components));
            }
            // Guard against a null context so the caller gets the descriptive error below
            // instead of a NullPointerException from context.getClass().
            String received = context == null ? "null" : context.getClass().getName();
            throw new IllegalStateException(String.format("Unable to construct coder %s. Expected translation context %s but received %s.", CODER_URN, StateBackedIterableTranslationContext.class.getName(), received));
        }
    }

    /**
     * Registers {@link Coder} under its URN together with the translator for it.
     */
    public static class Registrar implements CoderTranslatorRegistrar {
        /**
         * Returns a single-entry map from {@link Coder} to its URN.
         */
        @Override
        public Map<Class<? extends org.apache.beam.sdk.coders.Coder>, String> getCoderURNs() {
            return Collections.singletonMap(Coder.class, CODER_URN);
        }

        /**
         * Returns a single-entry map from {@link Coder} to a new translator instance.
         */
        @Override
        public Map<Class<? extends org.apache.beam.sdk.coders.Coder>, CoderTranslator<? extends org.apache.beam.sdk.coders.Coder>> getCoderTranslators() {
            return ImmutableMap.of(Coder.class, new Translator());
        }
    }

    /**
     * Translation context that supplies what is needed to construct a {@link Coder}.
     */
    public static interface StateBackedIterableTranslationContext extends CoderTranslation.TranslationContext {
        /** Supplier of the cache passed to every decoded {@link StateBackedIterable}. */
        public Supplier<Cache<?, ?>> getCache();

        /** State client passed to every decoded {@link StateBackedIterable}. */
        public BeamFnStateClient getStateClient();

        /** Supplier of the instruction id; queried each time an iterable is decoded. */
        public Supplier<String> getCurrentInstructionId();
    }

    /**
     * Coder for iterables that may be {@link StateBackedIterable}s.
     *
     * <p>A {@link StateBackedIterable} is encoded as: the int {@code -1}, its prefix elements
     * written through a {@link BufferedElementCountingOutputStream} created with terminator value
     * {@code -1}, then the var-int length of the runner state key token followed by the token
     * bytes. Any other iterable is encoded by {@link IterableLikeCoder}.
     *
     * @param <T> the element type
     */
    public static class Coder<T> extends IterableLikeCoder<T, Iterable<T>> {
        private final Supplier<Cache<?, ?>> cache;
        private final BeamFnStateClient beamFnStateClient;
        private final Supplier<String> instructionId;

        /**
         * @param cache supplier queried for a cache each time a state-backed iterable is decoded
         * @param beamFnStateClient state client given to decoded state-backed iterables
         * @param instructionId supplier queried for the instruction id at decode time
         * @param elemCoder coder for the individual elements
         */
        public Coder(Supplier<Cache<?, ?>> cache, BeamFnStateClient beamFnStateClient, Supplier<String> instructionId, org.apache.beam.sdk.coders.Coder<T> elemCoder) {
            super(elemCoder, "StateBackedIterable");
            this.cache = cache;
            this.beamFnStateClient = beamFnStateClient;
            this.instructionId = instructionId;
        }

        /**
         * Returns the decoded elements unchanged.
         */
        @Override
        protected Iterable<T> decodeToIterable(List<T> decodedElements) {
            return decodedElements;
        }

        /**
         * Builds a {@link StateBackedIterable} when the terminator is {@code -1}: reads a var-int
         * token length and that many token bytes from {@code in}, and uses the token as the runner
         * state key with {@code decodedElements} as the prefix.
         *
         * @param decodedElements the elements decoded before the terminator
         * @param terminatorValue the terminator read from the stream; only {@code -1} is accepted here
         * @param in the stream positioned immediately after the terminator
         * @return a state-backed iterable over {@code decodedElements} plus the state-backed suffix
         * @throws IOException if reading fails or the stream ends before the whole token was read
         * @throws IllegalStateException if {@code terminatorValue} is not {@code -1}
         */
        @Override
        protected Iterable<T> decodeToIterable(List<T> decodedElements, long terminatorValue, InputStream in) throws IOException {
            if (terminatorValue != -1L) {
                throw new IllegalStateException(String.format("StateBackedIterable expected terminator of 0 or -1 but received %s.", terminatorValue));
            }
            long tokenLength = VarInt.decodeLong(in);
            ByteString token = ByteString.readFrom(ByteStreams.limit(in, tokenLength));
            // The limited stream simply ends early when the input is truncated, which would
            // otherwise yield a silently shortened (and therefore wrong) state key.
            if (token.size() != tokenLength) {
                throw new IOException(String.format("StateBackedIterable expected a token of %s bytes but the stream ended after %s bytes.", tokenLength, token.size()));
            }
            BeamFnApi.StateKey stateKey = BeamFnApi.StateKey.newBuilder().setRunner(BeamFnApi.StateKey.Runner.newBuilder().setKey(token)).build();
            return new StateBackedIterable<>(this.cache.get(), this.beamFnStateClient, this.instructionId.get(), stateKey, this.getElemCoder(), decodedElements);
        }

        /**
         * Encodes {@code iterable}. A {@link StateBackedIterable} is written as its prefix plus the
         * runner state key token (the suffix is not read); any other iterable is delegated to the
         * superclass.
         *
         * @param iterable the value to encode
         * @param outStream the destination; it is flushed through but not closed
         * @throws IOException if writing fails
         */
        @Override
        public void encode(Iterable<T> iterable, OutputStream outStream) throws IOException {
            if (!(iterable instanceof StateBackedIterable)) {
                super.encode(iterable, outStream);
                return;
            }
            StateBackedIterable<T> stateBackedIterable = (StateBackedIterable<T>) iterable;

            // Not closed on purpose: closing the wrapper would close the caller's stream.
            DataOutputStream dataOutStream = new DataOutputStream(outStream);
            dataOutStream.writeInt(-1);
            BufferedElementCountingOutputStream countingOutputStream = new BufferedElementCountingOutputStream(dataOutStream, -1L);
            // Only the in-memory prefix is written; the suffix is represented by the token below.
            for (T elem : stateBackedIterable.prefix) {
                countingOutputStream.markElementStart();
                this.getElemCoder().encode(elem, countingOutputStream);
            }
            countingOutputStream.finish();
            // Push everything written via the wrapper to outStream before writing to it directly.
            dataOutStream.flush();

            // Append "length, token" after the element section.
            ByteString token = stateBackedIterable.request.getStateKey().getRunner().getKey();
            VarInt.encode(token.size(), outStream);
            token.writeTo(outStream);
        }
    }
}

