/*
 * Decompiled with CFR 0.152.
 */
package io.activej.csp.binary;

import io.activej.async.function.AsyncRunnable;
import io.activej.async.process.AbstractAsyncCloseable;
import io.activej.async.process.AsyncCloseable;
import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufs;
import io.activej.common.Checks;
import io.activej.common.exception.MalformedDataException;
import io.activej.common.exception.TruncatedDataException;
import io.activej.common.exception.UnexpectedDataException;
import io.activej.common.function.FunctionEx;
import io.activej.csp.binary.BinaryChannelInput;
import io.activej.csp.binary.decoder.ByteBufsDecoder;
import io.activej.csp.supplier.ChannelSupplier;
import io.activej.csp.supplier.ChannelSuppliers;
import io.activej.promise.Promise;
import io.activej.reactor.Reactive;
import java.util.Iterator;
import java.util.List;

/**
 * A closeable source of binary data that accumulates bytes in a {@link ByteBufs} queue and
 * decodes values out of that queue with a {@link ByteBufsDecoder}.
 *
 * <p>Subclasses define how more bytes are obtained ({@link #needMoreData()}) and how the end of
 * the input is confirmed ({@link #endOfStream()}). The decoding entry points
 * ({@link #decode}, {@link #decodeRemaining}, {@link #decodeStream}) are built on top of those
 * two operations. The buffered bytes are recycled in {@link #onCleanup()}.
 */
public abstract class BinaryChannelSupplier extends AbstractAsyncCloseable {
    /** Whether reactor-thread checks are enabled for this class. */
    private static final boolean CHECKS = Checks.isEnabled(BinaryChannelSupplier.class);

    /** Message used whenever bytes are found where the end of the input was expected. */
    private static final String UNEXPECTED_DATA_MESSAGE = "Unexpected data after end-of-stream";

    /** Queue of bytes that have been received but not yet consumed by a decoder. */
    protected final ByteBufs bufs;

    /**
     * Creates a supplier that works on an externally provided queue.
     *
     * @param bufs the queue decoders will read from
     */
    protected BinaryChannelSupplier(ByteBufs bufs) {
        this.bufs = bufs;
    }

    /** Creates a supplier with its own, initially empty, queue. */
    protected BinaryChannelSupplier() {
        this.bufs = new ByteBufs();
    }

    /**
     * Returns the queue of buffered bytes. This is the live queue used by the decoders,
     * not a copy.
     */
    public ByteBufs getBufs() {
        return bufs;
    }

    /**
     * Requests more input. Decoding is retried against {@link #bufs} once the returned promise
     * completes successfully, so implementations should have made additional bytes available
     * there by then.
     *
     * @return a promise that completes when more data is available, or fails when it cannot be
     *         obtained
     */
    public abstract Promise<Void> needMoreData();

    /**
     * Confirms that the input has ended.
     *
     * @return a promise that completes when the end of the input has been confirmed
     */
    public abstract Promise<Void> endOfStream();

    /**
     * Creates a supplier fed from the given list of buffers.
     *
     * @param iterable the buffers to supply
     */
    public static BinaryChannelSupplier ofList(List<ByteBuf> iterable) {
        return of(ChannelSuppliers.ofList(iterable));
    }

    /**
     * Creates a supplier fed from the given iterator of buffers.
     *
     * @param iterator the buffers to supply
     */
    public static BinaryChannelSupplier ofIterator(Iterator<ByteBuf> iterator) {
        return of(ChannelSuppliers.ofIterator(iterator));
    }

    /**
     * Creates a supplier fed from a {@link ChannelSupplier} of buffers.
     *
     * <p>A {@code null} item from {@code input} is treated as the end of the input. Closing the
     * returned supplier closes {@code input} with the same exception.
     *
     * @param input the upstream source of buffers
     */
    public static BinaryChannelSupplier of(final ChannelSupplier<ByteBuf> input) {
        return new BinaryChannelSupplier() {

            /**
             * Pulls one buffer from {@code input} and appends it to the queue; fails with
             * {@link TruncatedDataException} when {@code input} has no more items.
             */
            @Override
            public Promise<Void> needMoreData() {
                return input.get().map(buf -> {
                    if (buf == null) {
                        throw new TruncatedDataException("Unexpected end-of-stream");
                    }
                    bufs.add(buf);
                    return null;
                });
            }

            /**
             * Succeeds only when nothing is buffered and {@code input} yields no further item.
             * Otherwise the leftover bytes are recycled, {@code input} is closed and the promise
             * fails with {@link UnexpectedDataException}.
             */
            @Override
            public Promise<Void> endOfStream() {
                if (!bufs.isEmpty()) {
                    bufs.recycle();
                    UnexpectedDataException exception = new UnexpectedDataException(UNEXPECTED_DATA_MESSAGE);
                    input.closeEx(exception);
                    return Promise.ofException(exception);
                }
                return input.get().whenResult(buf -> {
                    if (buf == null) {
                        return;
                    }
                    buf.recycle();
                    UnexpectedDataException exception = new UnexpectedDataException(UNEXPECTED_DATA_MESSAGE);
                    input.closeEx(exception);
                    throw exception;
                }).toVoid();
            }

            /** Propagates the close to the upstream {@code input}. */
            @Override
            protected void onClosed(Exception e) {
                input.closeEx(e);
            }
        };
    }

    /**
     * Creates a supplier over an externally managed queue, delegating every operation to the
     * given callbacks.
     *
     * @param bufs      the queue decoders will read from
     * @param get       run by {@link #needMoreData()}
     * @param complete  run by {@link #endOfStream()}
     * @param closeable closed, with the same exception, when the returned supplier is closed
     */
    public static BinaryChannelSupplier ofProvidedBufs(ByteBufs bufs, final AsyncRunnable get, final AsyncRunnable complete, final AsyncCloseable closeable) {
        return new BinaryChannelSupplier(bufs) {

            @Override
            public Promise<Void> needMoreData() {
                return get.run();
            }

            @Override
            public Promise<Void> endOfStream() {
                return complete.run();
            }

            @Override
            protected void onClosed(Exception e) {
                closeable.closeEx(e);
            }
        };
    }

    /**
     * Decodes a single value, requesting more data as often as the decoder needs it.
     *
     * <p>This supplier is closed if the decoder reports malformed data or if requesting more
     * data fails; the returned promise fails with the same exception.
     *
     * @param decoder the decoder to apply to the buffered bytes
     * @param <T>     the decoded value type
     * @return a promise of the decoded value
     */
    public final <T> Promise<T> decode(ByteBufsDecoder<T> decoder) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return doDecode(decoder, this);
    }

    /**
     * Decoding loop shared by {@link #decode} and {@link #decodeStream}.
     *
     * @param decoder   the decoder to apply
     * @param closeable notified with the exception when {@link #needMoreData()} fails
     */
    private <T> Promise<T> doDecode(ByteBufsDecoder<T> decoder, AsyncCloseable closeable) {
        while (true) {
            if (!bufs.isEmpty()) {
                T result;
                try {
                    result = decoder.tryDecode(bufs);
                } catch (MalformedDataException e) {
                    closeEx(e);
                    return Promise.ofException(e);
                }
                // A null result means the decoder needs more bytes than are buffered.
                if (result != null) {
                    return Promise.of(result);
                }
            }
            Promise<Void> moreDataPromise = needMoreData();
            if (moreDataPromise.isResult()) {
                // Data arrived synchronously: keep looping instead of recursing.
                continue;
            }
            return moreDataPromise
                    .whenException(closeable::closeEx)
                    .then(() -> doDecode(decoder, closeable));
        }
    }

    /**
     * Decodes a single value and then requires the input to be finished.
     *
     * <p>If bytes remain buffered after decoding, this supplier is closed and the promise fails
     * with {@link UnexpectedDataException}; otherwise the result is delivered once
     * {@link #endOfStream()} completes.
     *
     * @param decoder the decoder to apply to the buffered bytes
     * @param <T>     the decoded value type
     * @return a promise of the decoded value
     */
    public final <T> Promise<T> decodeRemaining(ByteBufsDecoder<T> decoder) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return decode(decoder).then(result -> {
            if (!bufs.isEmpty()) {
                UnexpectedDataException exception = new UnexpectedDataException(UNEXPECTED_DATA_MESSAGE);
                closeEx(exception);
                throw exception;
            }
            return endOfStream().map(ignored -> result);
        });
    }

    /**
     * Exposes this supplier as a {@link ChannelSupplier} of decoded values; each item is produced
     * by one run of the decoding loop.
     *
     * <p>A {@link TruncatedDataException} raised while nothing is buffered does not close this
     * supplier and is delivered as a {@code null} item instead of a failure (presumably the
     * end-of-stream marker of the resulting supplier; confirm against {@code ChannelSupplier}).
     * Any other failure closes this supplier and is propagated.
     *
     * @param decoder the decoder to apply repeatedly
     * @param <T>     the decoded value type
     * @return a supplier of decoded values, created with this supplier as its closeable
     */
    public final <T> ChannelSupplier<T> decodeStream(ByteBufsDecoder<T> decoder) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return ChannelSuppliers.ofAsyncSupplier(
                () -> doDecode(decoder,
                        AsyncCloseable.of(e -> {
                            if (isCleanEndOfStream(e)) {
                                return;
                            }
                            closeEx(e);
                        }))
                        .map(FunctionEx.identity(), e -> {
                            if (isCleanEndOfStream(e)) {
                                return null;
                            }
                            throw e;
                        }),
                this);
    }

    /**
     * Tells whether {@code e} is a {@link TruncatedDataException} raised while no bytes are
     * buffered, i.e. the input ended exactly on a value boundary.
     */
    private boolean isCleanEndOfStream(Exception e) {
        return e instanceof TruncatedDataException && bufs.isEmpty();
    }

    /**
     * Hands this supplier to the given input.
     *
     * @param input the consumer of this supplier
     * @return the promise returned by {@code input.set(this)}
     */
    public Promise<Void> bindTo(BinaryChannelInput input) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return input.set(this);
    }

    /** Recycles whatever bytes are still buffered. */
    @Override
    protected void onCleanup() {
        bufs.recycle();
    }
}

