/*
 * Decompiled with CFR 0.152.
 */
package org.talend.sdk.component.runtime.di.beam;

import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.runtime.beam.coder.NoCheckpointCoder;
import org.talend.sdk.component.runtime.beam.coder.registry.SchemaRegistryCoder;
import org.talend.sdk.component.runtime.di.beam.LoopState;

public final class InMemoryQueueIO {
    /**
     * Builds the read transform backed by an {@link UnboundedQueuedInput} for the given loop state.
     *
     * <p>Only {@code state.id} is handed to the created source; the reader resolves the
     * {@link LoopState} again through {@code LoopState.lookup} when it is instantiated.
     *
     * @param state loop state whose {@code id} identifies what to read from.
     * @return an unbounded read transform producing {@link Record} elements.
     */
    public static PTransform<PBegin, PCollection<Record>> from(LoopState state) {
        // No raw cast here: keeping the source typed lets Read.from infer Read.Unbounded<Record>
        // instead of falling back to an unchecked raw PTransform.
        final UnboundedQueuedInput source = new UnboundedQueuedInput(state.id);
        return Read.from(source);
    }

    /**
     * Builds the write transform bound to the given loop state.
     *
     * @param state loop state whose {@code id} is captured by the returned transform.
     * @return a {@link QueuedOutputTransform} created for {@code state.id}.
     */
    public static PTransform<PCollection<Record>, PCollection<Void>> to(LoopState state) {
        final String targetStateId = state.id;
        return new QueuedOutputTransform(targetStateId);
    }

    /**
     * Private constructor: this class only exposes static factory methods and nested types,
     * so it is not meant to be instantiated.
     */
    private InMemoryQueueIO() {
    }
    public static class UnboundedQueuedInput
    extends UnboundedSource<Record, UnboundedSource.CheckpointMark> {
        private SchemaRegistryCoder coder;
        private String stateId;

        protected UnboundedQueuedInput(String stateId) {
            this.stateId = stateId;
            this.coder = SchemaRegistryCoder.of();
        }

        public List<? extends UnboundedSource<Record, UnboundedSource.CheckpointMark>> split(int desiredNumSplits, PipelineOptions options) {
            return Collections.singletonList(this);
        }

        public UnboundedSource.UnboundedReader<Record> createReader(PipelineOptions options, UnboundedSource.CheckpointMark checkpointMark) {
            return new UnboundedQueuedReader(this);
        }

        public Coder<Record> getOutputCoder() {
            return this.coder;
        }

        public Coder<UnboundedSource.CheckpointMark> getCheckpointMarkCoder() {
            return new NoCheckpointCoder();
        }

        private static class UnboundedQueuedReader
        extends UnboundedSource.UnboundedReader<Record> {
            private final UnboundedQueuedInput source;
            private final LoopState state;
            private volatile Supplier<Instant> waterMarkProvider;
            private Record current;

            private UnboundedQueuedReader(UnboundedQueuedInput source) {
                this.source = source;
                this.state = LoopState.lookup(source.stateId);
                if (this.state != null) {
                    this.state.referenceCounting.incrementAndGet();
                } else {
                    this.waterMarkProvider = () -> BoundedWindow.TIMESTAMP_MAX_VALUE;
                }
            }

            public boolean start() {
                return this.advance();
            }

            public boolean advance() {
                if (this.state == null) {
                    return false;
                }
                this.current = this.state.next();
                if (this.current != null) {
                    return true;
                }
                this.waterMarkProvider = () -> BoundedWindow.TIMESTAMP_MAX_VALUE;
                return false;
            }

            public Record getCurrent() throws NoSuchElementException {
                return this.current;
            }

            public Instant getCurrentTimestamp() throws NoSuchElementException {
                return Instant.now();
            }

            public void close() {
            }

            public Instant getWatermark() {
                if (this.waterMarkProvider == null) {
                    this.waterMarkProvider = Instant::now;
                }
                return this.waterMarkProvider.get();
            }

            public UnboundedSource.CheckpointMark getCheckpointMark() {
                return UnboundedSource.CheckpointMark.NOOP_CHECKPOINT_MARK;
            }

            public UnboundedSource<Record, ?> getCurrentSource() {
                return this.source;
            }
        }
    }

    public static class QueuedOutputTransform
    extends PTransform<PCollection<Record>, PCollection<Void>> {
        private String stateId;

        protected QueuedOutputTransform(String stateId) {
            this.stateId = stateId;
        }

        public PCollection<Void> expand(PCollection<Record> input) {
            return (PCollection)input.apply((PTransform)ParDo.of((DoFn)new QueuedOutput(this.stateId)));
        }
    }

    /**
     * {@link DoFn} pushing every incoming record into the {@link LoopState} resolved by
     * {@code LoopState.lookup(stateId)}.
     *
     * <p>The state is looked up lazily and cached in a transient field, so it is resolved
     * again after deserialization.
     */
    public static class QueuedOutput
    extends DoFn<Record, Void> {
        /** Identifier passed to {@code LoopState.lookup}. */
        private final String stateId;

        /** Lazily resolved state; transient so it is not serialized with the function. */
        private transient LoopState state;

        /**
         * @param stateId identifier of the state to push records to.
         */
        protected QueuedOutput(String stateId) {
            this.stateId = stateId;
        }

        /**
         * Takes a reference on the state.
         *
         * @throws IllegalStateException if no state is found for the identifier.
         */
        @DoFn.Setup
        public void onInit() {
            this.requireState().referenceCounting.incrementAndGet();
        }

        /**
         * Pushes the current element to the state, decrements its record counter and calls
         * {@code end()} on the state once the counter reaches zero and {@code isDone()} is true.
         *
         * @param context Beam process context providing the element.
         * @throws IllegalStateException if no state is found for the identifier.
         */
        @DoFn.ProcessElement
        public void onElement(ProcessContext context) {
            final LoopState target = this.requireState();
            target.push(context.element());
            if (target.getRecordCount().decrementAndGet() == 0L && target.isDone()) {
                target.end();
            }
        }

        /**
         * Releases the reference taken in {@link #onInit()} and closes the state when the
         * reference counter drops to zero. Does nothing if the state cannot be resolved.
         */
        @DoFn.Teardown
        public void onTeardown() {
            final LoopState resolved = this.getState();
            if (resolved != null && resolved.referenceCounting.decrementAndGet() == 0) {
                resolved.close();
            }
        }

        /** Returns the cached state, looking it up on first use; may return null. */
        private LoopState getState() {
            if (this.state == null) {
                this.state = LoopState.lookup(this.stateId);
            }
            return this.state;
        }

        /** Same as {@link #getState()} but fails with a descriptive error instead of returning null. */
        private LoopState requireState() {
            final LoopState resolved = this.getState();
            if (resolved == null) {
                throw new IllegalStateException("No LoopState found for id '" + this.stateId + "'");
            }
            return resolved;
        }
    }
}

