/*
 * Decompiled with CFR 0.152.
 */
package io.activej.http;

import io.activej.bytebuf.ByteBuf;
import io.activej.common.Checks;
import io.activej.csp.consumer.ChannelConsumer;
import io.activej.csp.consumer.ChannelConsumers;
import io.activej.csp.process.transformer.ChannelConsumerTransformer;
import io.activej.csp.process.transformer.ChannelSupplierTransformer;
import io.activej.csp.queue.ChannelZeroBuffer;
import io.activej.csp.supplier.ChannelSupplier;
import io.activej.http.AbstractHttpConnection;
import io.activej.http.AsyncServlet;
import io.activej.http.HttpClient;
import io.activej.http.HttpHeaderValue;
import io.activej.http.HttpHeaders;
import io.activej.http.HttpMessage;
import io.activej.http.HttpRequest;
import io.activej.http.HttpResponse;
import io.activej.http.HttpUtils;
import io.activej.http.IWebSocket;
import io.activej.http.WebSocket;
import io.activej.http.WebSocketBufsToFrames;
import io.activej.http.WebSocketConstants;
import io.activej.http.WebSocketFramesToBufs;
import io.activej.promise.Promise;
import io.activej.promise.SettableCallback;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import java.util.Arrays;

/**
 * Base servlet that turns an HTTP request into a web socket session.
 *
 * <p>{@link #serve(HttpRequest)} validates the handshake headers, asks
 * {@link #onRequest(HttpRequest)} for the response and, when that response
 * has code 101, wires a frame encoder/decoder pair around the request body
 * stream and passes the resulting {@link IWebSocket} to
 * {@link #onWebSocket(IWebSocket)}.
 */
public abstract class WebSocketServlet extends AbstractReactive implements AsyncServlet {
    private static final boolean CHECKS = Checks.isEnabled(WebSocketServlet.class);

    /**
     * Creates the servlet bound to the given reactor.
     *
     * <p>The state check fails when {@code IWebSocket.ENABLED} is {@code false}.
     *
     * @param reactor reactor passed to the {@link AbstractReactive} constructor
     */
    protected WebSocketServlet(Reactor reactor) {
        super(reactor);
        Checks.checkState(IWebSocket.ENABLED, "Web sockets are disabled by application settings");
    }

    /**
     * Produces the response for a request whose handshake headers were accepted.
     *
     * <p>The default implementation answers with code 101. A response with any
     * other code is returned by {@link #serve(HttpRequest)} as is, the request
     * body stream is drained, and no web socket is created.
     *
     * @param request the incoming request
     * @return a promise of the response to send
     */
    protected Promise<HttpResponse> onRequest(HttpRequest request) {
        return HttpResponse.ofCode(101).toPromise();
    }

    /**
     * Called with the web socket once a 101 response has been prepared.
     *
     * @param webSocket the web socket built around the request body stream and the response body stream
     */
    protected abstract void onWebSocket(IWebSocket webSocket);

    /**
     * Validates the request, computes the {@code Sec-WebSocket-Accept} value and
     * either upgrades the exchange to a web socket or returns the response
     * produced by {@link #onRequest(HttpRequest)} unchanged.
     *
     * @param request the incoming request
     * @return a promise of the response; it completes exceptionally with
     *         {@code WebSocketConstants.NOT_A_WEB_SOCKET_REQUEST} when the
     *         handshake headers are missing or do not match
     */
    @Override
    public final Promise<HttpResponse> serve(HttpRequest request) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return validateHeaders(request)
                // Explicit type witness: without it the accept value would be inferred as Object.
                .<String>thenCallback(cb -> processAnswer(request, cb))
                .then(answer -> {
                    ChannelSupplier<ByteBuf> rawStream = request.takeBodyStream();
                    assert rawStream != null;

                    return onRequest(request)
                            // The stream has already been taken from the request, so drain it on failure.
                            .whenException(e -> recycleStream(rawStream))
                            .map(response -> {
                                if (response.getCode() != 101) {
                                    recycleStream(rawStream);
                                    return response;
                                }
                                return upgrade(request, response, rawStream, answer);
                            });
                });
    }

    /**
     * Completes the upgrade for a 101 response: attaches the outgoing body
     * stream and handshake headers to the response, creates the frame
     * encoder/decoder and hands a new {@link WebSocket} to
     * {@link #onWebSocket(IWebSocket)}.
     *
     * @param request   the upgrade request
     * @param response  the 101 response; it must not yet carry a body or body stream
     * @param rawStream the body stream taken from the request
     * @param answer    value for the {@code Sec-WebSocket-Accept} header
     * @return the same {@code response} instance, now carrying the upgrade headers and body stream
     */
    private HttpResponse upgrade(HttpRequest request, HttpResponse response, ChannelSupplier<ByteBuf> rawStream, String answer) {
        Checks.checkState(response.body == null && response.bodyStream == null, "Illegal body or stream");

        // Encoded outgoing frames are written into this buffer and read back as the response body stream.
        ChannelZeroBuffer<ByteBuf> buffer = new ChannelZeroBuffer<>();
        response.bodyStream = buffer.getSupplier();
        response.headers.add(HttpHeaders.UPGRADE, HttpClient.WEBSOCKET_HEADER);
        response.headers.add(HttpHeaders.CONNECTION, HttpClient.UPGRADE_HEADER);
        response.headers.add(HttpHeaders.SEC_WEBSOCKET_ACCEPT, HttpHeaderValue.of(answer));

        // NOTE(review): the boolean arguments are presumably the masking flags
        // (no masking of outgoing frames, masking required on incoming ones) - confirm.
        WebSocketFramesToBufs encoder = WebSocketFramesToBufs.create(false);
        WebSocketBufsToFrames decoder = WebSocketBufsToFrames.create(request.maxBodySize, encoder::sendPong, ByteBuf::recycle, true);

        bindWebSocketTransformers(rawStream, encoder, decoder);

        ChannelSupplier<IWebSocket.Frame> frameInput = rawStream.transformWith(decoder);
        ChannelConsumer<IWebSocket.Frame> frameOutput = buffer.getConsumer().transformWith(encoder);
        onWebSocket(new WebSocket(request, response, frameInput, frameOutput, decoder::onProtocolError, request.maxBodySize));
        return response;
    }

    /**
     * Links the lifecycles of the encoder, the decoder and the raw request stream.
     *
     * <p>After the encoder's close-sent promise completes, the decoder's
     * close-received promise is awaited and {@code rawStream} is closed with
     * its result, or with the exception if either promise fails. When the
     * decoder's process completes normally a {@code REGULAR_CLOSE} frame is
     * sent through the encoder; when it fails the encoder is closed with that
     * exception.
     */
    private static void bindWebSocketTransformers(ChannelSupplier<ByteBuf> rawStream, WebSocketFramesToBufs encoder, WebSocketBufsToFrames decoder) {
        encoder.getCloseSentPromise()
                .then(decoder::getCloseReceivedPromise)
                .whenResult(reason -> rawStream.closeEx(reason))
                .whenException(e -> rawStream.closeEx(e));

        decoder.getProcessCompletion()
                .whenResult(() -> encoder.sendCloseFrame(WebSocketConstants.REGULAR_CLOSE))
                .whenException(e -> encoder.closeEx(e));
    }

    /**
     * Tells whether the {@code Connection} header lacks an {@code upgrade} token.
     *
     * @param message message whose {@code Connection} header is inspected
     * @return {@code false} if any comma-separated token equals {@code "upgrade"}
     *         ignoring case and surrounding whitespace; {@code true} otherwise,
     *         including when the header is absent
     */
    private static boolean isUpgradeHeaderMissing(HttpMessage message) {
        String connection = message.getHeader(HttpHeaders.CONNECTION);
        if (connection == null) {
            return true;
        }
        for (String token : connection.split(",")) {
            if ("upgrade".equalsIgnoreCase(token.trim())) {
                return false;
            }
        }
        return true;
    }

    /**
     * Checks that the request asks for a connection upgrade and carries the
     * expected {@code Sec-WebSocket-Version} value.
     *
     * @param request the incoming request
     * @return a completed promise when both checks pass, otherwise a promise
     *         failed with {@code WebSocketConstants.NOT_A_WEB_SOCKET_REQUEST}
     */
    private static Promise<Void> validateHeaders(HttpRequest request) {
        // Arrays.equals tolerates null, so an absent version header simply fails the comparison.
        byte[] version = request.getHeader(HttpHeaders.SEC_WEBSOCKET_VERSION, ByteBuf::getArray);
        if (isUpgradeHeaderMissing(request) || !Arrays.equals(AbstractHttpConnection.WEB_SOCKET_VERSION, version)) {
            return Promise.ofException(WebSocketConstants.NOT_A_WEB_SOCKET_REQUEST);
        }
        return Promise.complete();
    }

    /**
     * Computes the handshake answer from the {@code Sec-WebSocket-Key} header.
     *
     * @param request the incoming request
     * @param cb      receives the answer computed from the trimmed key, or
     *                {@code WebSocketConstants.NOT_A_WEB_SOCKET_REQUEST} when the header is absent
     */
    private static void processAnswer(HttpRequest request, SettableCallback<String> cb) {
        String key = request.getHeader(HttpHeaders.SEC_WEBSOCKET_KEY);
        if (key == null) {
            cb.setException(WebSocketConstants.NOT_A_WEB_SOCKET_REQUEST);
            return;
        }
        cb.set(HttpUtils.getWebSocketAnswer(key.trim()));
    }

    /**
     * Drains the given stream into a recycling consumer so that its buffers
     * are released when no web socket is going to consume them.
     */
    private static void recycleStream(ChannelSupplier<ByteBuf> rawStream) {
        rawStream.streamTo(ChannelConsumers.recycling());
    }
}

