/*
 * Decompiled with CFR 0.152.
 */
package io.activej.csp.process;

import io.activej.common.Checks;
import io.activej.common.builder.AbstractBuilder;
import io.activej.common.recycle.Recyclers;
import io.activej.csp.ChannelInput;
import io.activej.csp.ChannelOutput;
import io.activej.csp.consumer.ChannelConsumer;
import io.activej.csp.dsl.WithChannelInput;
import io.activej.csp.dsl.WithChannelOutputs;
import io.activej.csp.process.AbstractCommunicatingProcess;
import io.activej.csp.supplier.ChannelSupplier;
import io.activej.promise.Promises;
import io.activej.reactor.Reactive;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.UnaryOperator;

public final class ChannelSplitter<T>
extends AbstractCommunicatingProcess
implements WithChannelInput<ChannelSplitter<T>, T>,
WithChannelOutputs<T> {
    private final List<ChannelConsumer<T>> outputs = new ArrayList<ChannelConsumer<T>>();
    private ChannelSupplier<T> input;
    private Function<T, T> splitFn = Function.identity();

    private ChannelSplitter() {
    }

    public static <T> ChannelSplitter<T> create() {
        return (ChannelSplitter)ChannelSplitter.builder().build();
    }

    public static <T> ChannelSplitter<T> create(ChannelSupplier<T> input) {
        return (ChannelSplitter)ChannelSplitter.create().withInput(input);
    }

    public static <T> Builder builder() {
        return new ChannelSplitter<T>().new Builder();
    }

    public static <T> ChannelSplitter<T> builder(ChannelSupplier<T> input) {
        return (ChannelSplitter)new ChannelSplitter<T>().withInput(input);
    }

    public boolean hasOutputs() {
        return !this.outputs.isEmpty();
    }

    @Override
    public ChannelInput<T> getInput() {
        return input -> {
            Checks.checkState((!this.isProcessStarted() ? 1 : 0) != 0, (Object)"Can't configure splitter while it is running");
            this.input = this.sanitize(input);
            this.tryStart();
            return this.getProcessCompletion();
        };
    }

    @Override
    public ChannelOutput<T> addOutput() {
        int index = this.outputs.size();
        this.outputs.add(null);
        return output -> {
            Reactive.checkInReactorThread((Reactive)this);
            this.outputs.set(index, this.sanitize(output));
            this.tryStart();
        };
    }

    private void tryStart() {
        if (this.input != null && this.outputs.stream().allMatch(Objects::nonNull)) {
            this.reactor.post(this::startProcess);
        }
    }

    @Override
    protected void beforeProcess() {
        Checks.checkState((this.input != null ? 1 : 0) != 0, (Object)"No splitter input");
        Checks.checkState((!this.outputs.isEmpty() ? 1 : 0) != 0, (Object)"No splitter outputs");
    }

    @Override
    protected void doProcess() {
        if (this.isProcessComplete()) {
            return;
        }
        this.input.get().subscribe((item, e) -> {
            if (e == null) {
                if (item != null) {
                    Promises.all(this.outputs.stream().map(output -> output.accept(this.splitFn.apply(item)))).subscribe(($, e2) -> {
                        if (e2 == null) {
                            this.doProcess();
                        } else {
                            this.closeEx(e2);
                        }
                    });
                    Recyclers.recycle((Object)item);
                } else {
                    Promises.all(this.outputs.stream().map(ChannelConsumer::acceptEndOfStream)).subscribe(($, e1) -> this.completeProcessEx(e1));
                }
            } else {
                this.closeEx(e);
            }
        });
    }

    @Override
    protected void doClose(Exception e) {
        this.input.closeEx(e);
        this.outputs.forEach(output -> output.closeEx(e));
    }

    public final class Builder
    extends AbstractBuilder<Builder, ChannelSplitter<T>> {
        private Builder() {
        }

        public Builder withSplitFunction(UnaryOperator<T> splitFn) {
            Builder.checkNotBuilt((AbstractBuilder)this);
            ChannelSplitter.this.splitFn = splitFn;
            return this;
        }

        protected ChannelSplitter<T> doBuild() {
            return ChannelSplitter.this;
        }
    }
}

