/*
 * Decompiled with CFR 0.152.
 */
package io.activej.http;

import io.activej.async.function.AsyncRunnable;
import io.activej.async.function.AsyncSupplier;
import io.activej.async.process.AbstractAsyncCloseable;
import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufs;
import io.activej.common.Checks;
import io.activej.common.Utils;
import io.activej.common.recycle.Recyclable;
import io.activej.common.ref.Ref;
import io.activej.csp.consumer.AbstractChannelConsumer;
import io.activej.csp.consumer.ChannelConsumer;
import io.activej.csp.supplier.AbstractChannelSupplier;
import io.activej.csp.supplier.ChannelSupplier;
import io.activej.csp.supplier.ChannelSuppliers;
import io.activej.http.HttpRequest;
import io.activej.http.HttpResponse;
import io.activej.http.HttpUtils;
import io.activej.http.IWebSocket;
import io.activej.http.WebSocketConstants;
import io.activej.http.WebSocketException;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.SettableCallback;
import io.activej.promise.SettablePromise;
import io.activej.reactor.Reactive;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import org.jetbrains.annotations.Nullable;

/**
 * {@link IWebSocket} implementation layered over a channel of incoming frames and a
 * channel of outgoing frames.
 *
 * <p>At most one read and one write may be in flight at any time; a second concurrent
 * read or write fails the {@code checkState} guard in {@link #doRead} / {@link #doWrite}.
 * Closing this object closes both frame channels and fails whichever read/write promise
 * is still pending with the closing exception.
 */
public final class WebSocket extends AbstractAsyncCloseable implements IWebSocket {
    /** Whether reactor-thread checks are enabled for this class. */
    private static final boolean CHECKS = Checks.isEnabled(WebSocket.class);

    /** Request returned by {@link #getRequest()}; recycled in {@link #onCleanup()}. */
    private final HttpRequest request;
    /** Response returned by {@link #getResponse()}; recycled in {@link #onCleanup()}. */
    private final HttpResponse response;
    /** Notified of a protocol violation before this web socket is closed because of it. */
    private final Consumer<WebSocketException> onProtocolError;
    /** Incoming frames: the supplied channel, sanitized and wrapped by {@code ChannelSuppliers.prefetch}. */
    private final ChannelSupplier<IWebSocket.Frame> frameInput;
    /** Outgoing frames: the supplied channel, sanitized. */
    private final ChannelConsumer<IWebSocket.Frame> frameOutput;
    /** Upper bound, in bytes, on the accumulated payload of one message assembled by {@link #readMessage()}. */
    private final int maxMessageSize;

    /** The read currently in flight, or {@code null} when no read is pending. */
    @Nullable
    private SettablePromise<?> readPromise;
    /** The write currently in flight, or {@code null} when no write is pending. */
    @Nullable
    private SettablePromise<Void> writePromise;

    /**
     * Creates a web socket over the given frame channels.
     *
     * @param request         request exposed through {@link #getRequest()}
     * @param response        response exposed through {@link #getResponse()}
     * @param frameInput      source of incoming frames
     * @param frameOutput     sink for outgoing frames
     * @param onProtocolError callback invoked with the exception describing a protocol violation
     * @param maxMessageSize  maximum total payload size, in bytes, of a message read via {@link #readMessage()}
     */
    WebSocket(HttpRequest request, HttpResponse response, ChannelSupplier<IWebSocket.Frame> frameInput, ChannelConsumer<IWebSocket.Frame> frameOutput, Consumer<WebSocketException> onProtocolError, int maxMessageSize) {
        this.request = request;
        this.response = response;
        this.frameInput = ChannelSuppliers.prefetch(this.sanitize(frameInput));
        this.frameOutput = this.sanitize(frameOutput);
        this.onProtocolError = onProtocolError;
        this.maxMessageSize = maxMessageSize;
    }

    /**
     * Reads frames until one reports {@code isLastFrame()} and combines their payloads
     * into a single message.
     *
     * <p>The message type is derived from the first frame of the message. The returned
     * promise completes with {@code null} if the frame stream ends before any frame of a
     * new message was received, and fails with {@code WebSocketConstants.REGULAR_CLOSE}
     * if it ends in the middle of a message. Exceeding {@code maxMessageSize} or receiving
     * a text message that is not valid UTF-8 is reported as a protocol error, which also
     * closes this web socket.
     *
     * @return promise of the next message, or of {@code null} at end of stream
     */
    @Override
    public Promise<IWebSocket.Message> readMessage() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return this.doRead(() -> {
            ByteBufs messageBufs = new ByteBufs();
            Ref<IWebSocket.Message.MessageType> typeRef = new Ref<>();
            // Each step completes with a boolean: whether another frame must be read.
            return Promises.repeat(() -> this.frameInput.get().thenCallback((frame, cb) -> {
                if (frame == null) {
                    if (typeRef.get() == null) {
                        // End of stream on a message boundary: stop reading, nothing was accumulated.
                        cb.set(false);
                        return;
                    }
                    // End of stream in the middle of a message.
                    cb.setException(WebSocketConstants.REGULAR_CLOSE);
                    return;
                }
                if (typeRef.get() == null) {
                    typeRef.set(HttpUtils.frameToMessageType(frame.getType()));
                }
                ByteBuf payload = frame.getPayload();
                if (messageBufs.remainingBytes() + payload.readRemaining() > this.maxMessageSize) {
                    // This payload was never handed to messageBufs, so the exception handler
                    // below will not release it; recycle it here to avoid leaking the buffer.
                    payload.recycle();
                    this.protocolError(WebSocketConstants.MESSAGE_TOO_BIG, cb);
                    return;
                }
                messageBufs.add(payload);
                // Continue only while the message is incomplete (must be a boolean, not an int flag).
                cb.set(!frame.isLastFrame());
            })).whenException(e -> messageBufs.recycle()).thenCallback((ignored, cb) -> {
                ByteBuf payload = messageBufs.takeRemaining();
                IWebSocket.Message.MessageType type = typeRef.get();
                if (type == IWebSocket.Message.MessageType.TEXT) {
                    try {
                        cb.set(IWebSocket.Message.text(HttpUtils.getUTF8(payload)));
                    } catch (CharacterCodingException e) {
                        this.protocolError(WebSocketConstants.NOT_A_VALID_UTF_8, cb);
                    } finally {
                        // The text has been decoded (or rejected); the raw bytes are no longer needed.
                        payload.recycle();
                    }
                } else if (type == IWebSocket.Message.MessageType.BINARY) {
                    // The payload buffer is handed over to the binary message.
                    cb.set(IWebSocket.Message.binary(payload));
                } else {
                    // No frame was received at all: signal end of stream.
                    cb.set(null);
                }
            });
        });
    }

    /**
     * Reads the next frame from the incoming frame channel.
     *
     * @return promise of whatever the frame channel yields next
     */
    @Override
    public Promise<IWebSocket.Frame> readFrame() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return this.doRead(this.frameInput::get);
    }

    /**
     * Writes a whole message as a single frame.
     *
     * <p>A {@code null} message is forwarded to the frame channel as {@code null}. A text
     * message is encoded as UTF-8 into a text frame; any other message is sent as a binary
     * frame wrapping {@code msg.getBuf()}.
     *
     * @param msg message to send, or {@code null}
     * @return promise that completes when the frame channel has accepted the frame
     */
    @Override
    public Promise<Void> writeMessage(@Nullable IWebSocket.Message msg) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return this.doWrite(() -> {
            if (msg == null) {
                return this.frameOutput.accept(null);
            }
            if (msg.getType() == IWebSocket.Message.MessageType.TEXT) {
                byte[] utf8 = msg.getText().getBytes(StandardCharsets.UTF_8);
                return this.frameOutput.accept(IWebSocket.Frame.text(ByteBuf.wrapForReading(utf8)));
            }
            return this.frameOutput.accept(IWebSocket.Frame.binary(msg.getBuf()));
        }, msg);
    }

    /**
     * Writes a single frame to the outgoing frame channel.
     *
     * @param frame frame to send, or {@code null}
     * @return promise that completes when the frame channel has accepted the frame
     */
    @Override
    public Promise<Void> writeFrame(@Nullable IWebSocket.Frame frame) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return this.doWrite(() -> this.frameOutput.accept(frame), frame);
    }

    /** Returns the request this web socket was created with. */
    @Override
    public HttpRequest getRequest() {
        return this.request;
    }

    /** Returns the response this web socket was created with. */
    @Override
    public HttpResponse getResponse() {
        return this.response;
    }

    /**
     * Closes both frame channels with {@code e} and fails any pending read and write
     * promise with the same exception, clearing the corresponding fields.
     */
    @Override
    protected void onClosed(Exception e) {
        this.frameOutput.closeEx(e);
        this.frameInput.closeEx(e);
        this.readPromise = Utils.nullify(this.readPromise, SettablePromise::setException, e);
        this.writePromise = Utils.nullify(this.writePromise, SettablePromise::setException, e);
    }

    /** Recycles the request and the response. */
    @Override
    protected void onCleanup() {
        this.request.recycle();
        this.response.recycle();
    }

    /**
     * Reports a protocol violation: notifies {@code onProtocolError}, closes this web
     * socket with the exception and then fails the given callback with it.
     *
     * @param exception the violation being reported
     * @param cb        callback of the operation that detected the violation
     */
    private void protocolError(WebSocketException exception, SettableCallback<?> cb) {
        this.onProtocolError.accept(exception);
        this.closeEx(exception);
        cb.setException(exception);
    }

    /**
     * Runs a read operation while tracking it in {@link #readPromise}.
     *
     * <p>If this web socket is already closed, the supplier is not invoked and a promise
     * failed with the closing exception is returned.
     *
     * @param supplier the actual read
     * @param <T>      type of the value being read
     * @return promise completed with the outcome of {@code supplier}
     * @throws IllegalStateException presumably, via {@code Checks.checkState}, when another read is still pending
     */
    private <T> Promise<T> doRead(AsyncSupplier<T> supplier) {
        Checks.checkState(this.readPromise == null, "Concurrent reads");
        if (this.isClosed()) {
            return Promise.ofException(this.getException());
        }
        SettablePromise<T> promise = new SettablePromise<>();
        this.readPromise = promise;
        supplier.get().subscribe((result, e) -> {
            this.readPromise = null;
            // trySet: the promise may already have been failed by onClosed.
            promise.trySet(result, e);
        });
        return promise;
    }

    /**
     * Runs a write operation while tracking it in {@link #writePromise}.
     *
     * <p>If this web socket is already closed, the operation is not started, the given
     * recyclable (if any) is recycled and a promise failed with the closing exception is
     * returned.
     *
     * @param runnable   the actual write
     * @param recyclable value being written, recycled when the write is rejected; may be {@code null}
     * @return promise completed with the outcome of {@code runnable}
     * @throws IllegalStateException presumably, via {@code Checks.checkState}, when another write is still pending
     */
    private Promise<Void> doWrite(AsyncRunnable runnable, @Nullable Recyclable recyclable) {
        assert (this.reactor.inReactorThread());
        Checks.checkState(this.writePromise == null, "Concurrent writes");
        if (this.isClosed()) {
            if (recyclable != null) {
                recyclable.recycle();
            }
            return Promise.ofException(this.getException());
        }
        SettablePromise<Void> promise = new SettablePromise<>();
        this.writePromise = promise;
        runnable.run().subscribe((result, e) -> {
            this.writePromise = null;
            // trySet: the promise may already have been failed by onClosed.
            promise.trySet(result, e);
        });
        return promise;
    }

    /**
     * Wraps the incoming frame channel so that every {@code get()} result is passed
     * through the wrapper's own {@code sanitize} and so that closing the wrapper closes
     * this web socket with the same exception.
     */
    private ChannelSupplier<IWebSocket.Frame> sanitize(final ChannelSupplier<IWebSocket.Frame> supplier) {
        return new AbstractChannelSupplier<IWebSocket.Frame>(supplier) {

            @Override
            protected Promise<IWebSocket.Frame> doGet() {
                return this.sanitize(supplier.get());
            }

            @Override
            protected void onClosed(Exception e) {
                WebSocket.this.closeEx(e);
            }
        };
    }

    /**
     * Wraps the outgoing frame channel so that every {@code accept()} result is passed
     * through the wrapper's own {@code sanitize} and so that closing the wrapper closes
     * this web socket with the same exception.
     */
    private ChannelConsumer<IWebSocket.Frame> sanitize(final ChannelConsumer<IWebSocket.Frame> consumer) {
        return new AbstractChannelConsumer<IWebSocket.Frame>(consumer) {

            @Override
            protected Promise<Void> doAccept(@Nullable IWebSocket.Frame value) {
                return this.sanitize(consumer.accept(value));
            }

            @Override
            protected void onClosed(Exception e) {
                WebSocket.this.closeEx(e);
            }
        };
    }
}

