/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.rsocket.Closeable;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocket;
import io.rsocket.RSocketErrorException;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.ClientServerInputMultiplexer;
import io.rsocket.core.DefaultConnectionSetupPayload;
import io.rsocket.core.FragmentationUtils;
import io.rsocket.core.LeaseSpec;
import io.rsocket.core.LoggingDuplexConnection;
import io.rsocket.core.PayloadValidationUtils;
import io.rsocket.core.RSocketRequester;
import io.rsocket.core.RSocketResponder;
import io.rsocket.core.ReassemblyUtils;
import io.rsocket.core.RequesterLeaseTracker;
import io.rsocket.core.ResponderLeaseTracker;
import io.rsocket.core.Resume;
import io.rsocket.core.ServerSetup;
import io.rsocket.core.StreamIdSupplier;
import io.rsocket.exceptions.InvalidSetupException;
import io.rsocket.exceptions.RejectedSetupException;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.SetupFrameCodec;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.keepalive.KeepAliveHandler;
import io.rsocket.lease.TrackingLeaseSender;
import io.rsocket.plugins.DuplexConnectionInterceptor;
import io.rsocket.plugins.InitializingInterceptorRegistry;
import io.rsocket.plugins.InterceptorRegistry;
import io.rsocket.plugins.RequestInterceptor;
import io.rsocket.resume.SessionManager;
import io.rsocket.transport.ServerTransport;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;

/**
 * Fluent builder used to configure and start the server side of RSocket.
 *
 * <p>An instance is obtained through {@link #create()} or {@link #create(SocketAcceptor)},
 * configured through the chained setters, and finally attached to a transport with
 * {@link #bind(ServerTransport)} / {@link #bindNow(ServerTransport)}, or exposed as a raw
 * connection callback with {@link #asConnectionAcceptor()}.
 *
 * <p>Defaults, as established by the field initializers below: a no-op {@link RSocket} acceptor,
 * no resume support, no lease support, an MTU of {@code 0}, a maximum inbound payload size of
 * {@link Integer#MAX_VALUE}, and {@link PayloadDecoder#DEFAULT}.
 *
 * <p>The configuration fields are plain mutable fields with no synchronization; configure the
 * builder from a single thread before binding.
 */
public final class RSocketServer {
    private static final String SERVER_TAG = "server";

    /**
     * Max frame length used by {@link #asConnectionAcceptor()} when the caller supplies none.
     * {@code 0xFFFFFF} is the largest value that fits in 24 bits.
     * NOTE(review): presumably mirrors the frame-length mask of the RSocket framing codec - confirm.
     */
    private static final int DEFAULT_MAX_FRAME_LENGTH = 0xFFFFFF;

    private SocketAcceptor acceptor = SocketAcceptor.with(new RSocket(){});
    private final InitializingInterceptorRegistry interceptors = new InitializingInterceptorRegistry();
    private Resume resume;
    private Consumer<LeaseSpec> leaseConfigurer = null;
    private int mtu = 0;
    private int maxInboundPayloadSize = Integer.MAX_VALUE;
    private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;

    private RSocketServer() {
    }

    /**
     * Creates a server builder with default settings.
     *
     * @return a new builder
     */
    public static RSocketServer create() {
        return new RSocketServer();
    }

    /**
     * Creates a server builder with default settings and the given acceptor.
     *
     * @param acceptor the acceptor that handles incoming connection setups; must not be null
     * @return a new builder
     * @throws NullPointerException if {@code acceptor} is null
     */
    public static RSocketServer create(SocketAcceptor acceptor) {
        return RSocketServer.create().acceptor(acceptor);
    }

    /**
     * Sets the acceptor that is invoked for every accepted SETUP frame.
     *
     * @param acceptor the acceptor to use; must not be null
     * @return this builder
     * @throws NullPointerException if {@code acceptor} is null
     */
    public RSocketServer acceptor(SocketAcceptor acceptor) {
        this.acceptor = Objects.requireNonNull(acceptor, "acceptor");
        return this;
    }

    /**
     * Hands the builder's interceptor registry to the given callback so interceptors can be
     * registered on it.
     *
     * @param configurer callback that receives the registry; must not be null
     * @return this builder
     * @throws NullPointerException if {@code configurer} is null
     */
    public RSocketServer interceptors(Consumer<InterceptorRegistry> configurer) {
        Objects.requireNonNull(configurer, "configurer");
        configurer.accept(this.interceptors);
        return this;
    }

    /**
     * Sets the resume configuration. When this is non-null at the time the server is bound, a
     * resumable server setup is used; when it is null, the default (non-resumable) setup is used.
     *
     * @param resume the resume configuration, or null for none
     * @return this builder
     */
    public RSocketServer resume(Resume resume) {
        this.resume = resume;
        return this;
    }

    /**
     * Sets the callback used to configure a fresh {@link LeaseSpec} for each accepted connection.
     * While no callback is set (null), a SETUP frame that asks to honor leases is rejected with
     * "lease is not supported".
     *
     * @param leaseConfigurer callback that customizes the per-connection lease spec, or null
     * @return this builder
     */
    public RSocketServer lease(Consumer<LeaseSpec> leaseConfigurer) {
        this.leaseConfigurer = leaseConfigurer;
        return this;
    }

    /**
     * Sets the maximum inbound payload size. The value is passed through
     * {@code ReassemblyUtils.assertInboundPayloadSize}, whose result is stored.
     *
     * @param maxInboundPayloadSize the maximum inbound payload size
     * @return this builder
     */
    public RSocketServer maxInboundPayloadSize(int maxInboundPayloadSize) {
        this.maxInboundPayloadSize = ReassemblyUtils.assertInboundPayloadSize(maxInboundPayloadSize);
        return this;
    }

    /**
     * Sets the MTU handed to the requester and responder. The value is passed through
     * {@code FragmentationUtils.assertMtu}, whose result is stored.
     *
     * @param mtu the MTU to use
     * @return this builder
     */
    public RSocketServer fragment(int mtu) {
        this.mtu = FragmentationUtils.assertMtu(mtu);
        return this;
    }

    /**
     * Sets the decoder handed to the requester and responder of every connection.
     *
     * @param decoder the payload decoder; must not be null
     * @return this builder
     * @throws NullPointerException if {@code decoder} is null
     */
    public RSocketServer payloadDecoder(PayloadDecoder decoder) {
        this.payloadDecoder = Objects.requireNonNull(decoder, "decoder");
        return this;
    }

    /**
     * Returns a {@link Mono} that starts the given transport when subscribed to.
     *
     * <p>On each subscription the transport's max frame length is read and validated together
     * with the configured inbound payload size and MTU, after which the transport is started with
     * this server's connection handling. When the started server closes, the server setup is
     * disposed.
     *
     * @param transport the transport to start
     * @param <T> the type of the started server
     * @return a deferred {@code Mono} emitting the started server
     */
    public <T extends Closeable> Mono<T> bind(final ServerTransport<T> transport) {
        // Created eagerly, once per bind() call (not per subscription), so every subscription
        // of the returned Mono shares the same ServerSetup.
        final ServerSetup serverSetup = this.serverSetup();
        return Mono.defer(() -> {
            final int maxFrameLength = transport.maxFrameLength();
            PayloadValidationUtils.assertValidateSetup(maxFrameLength, this.maxInboundPayloadSize, this.mtu);
            return transport
                .start(connection -> this.acceptor(serverSetup, connection, maxFrameLength))
                .doOnNext(server -> server.onClose().doFinally(signal -> serverSetup.dispose()).subscribe());
        });
    }

    /**
     * Blocking variant of {@link #bind(ServerTransport)}: subscribes and waits for the started
     * server.
     *
     * @param transport the transport to start
     * @param <T> the type of the started server
     * @return the started server
     */
    public <T extends Closeable> T bindNow(ServerTransport<T> transport) {
        return this.bind(transport).block();
    }

    /**
     * Same as {@link #asConnectionAcceptor(int)} with a max frame length of {@code 0xFFFFFF}.
     *
     * @return a connection acceptor backed by this server's configuration
     */
    public ServerTransport.ConnectionAcceptor asConnectionAcceptor() {
        return this.asConnectionAcceptor(DEFAULT_MAX_FRAME_LENGTH);
    }

    /**
     * Exposes this server's connection handling as a {@link ServerTransport.ConnectionAcceptor}.
     * The max frame length is validated against the configured inbound payload size and MTU
     * immediately, before the acceptor is created.
     *
     * @param maxFrameLength the max frame length applied to every accepted connection
     * @return a connection acceptor backed by this server's configuration
     */
    public ServerTransport.ConnectionAcceptor asConnectionAcceptor(final int maxFrameLength) {
        PayloadValidationUtils.assertValidateSetup(maxFrameLength, this.maxInboundPayloadSize, this.mtu);
        return new ServerTransport.ConnectionAcceptor(){
            // One ServerSetup per returned acceptor, shared by all connections it handles.
            private final ServerSetup serverSetup = RSocketServer.this.serverSetup();

            @Override
            public Mono<Void> apply(DuplexConnection connection) {
                return RSocketServer.this.acceptor(this.serverSetup, connection, maxFrameLength);
            }
        };
    }

    /**
     * Handles one incoming connection: applies the SOURCE connection interceptors, optionally
     * wraps the connection for logging, lets the server setup initialize it, and then dispatches
     * on the first frame.
     */
    private Mono<Void> acceptor(ServerSetup serverSetup, DuplexConnection sourceConnection, int maxFrameLength) {
        DuplexConnection interceptedConnection = this.interceptors.initConnection(DuplexConnectionInterceptor.Type.SOURCE, sourceConnection);
        return serverSetup.init(LoggingDuplexConnection.wrapIfEnabled(interceptedConnection)).flatMap(tuple2 -> {
            // Casts kept from the decompiled form: ServerSetup.init's generic signature is not
            // visible here, and they are no-ops if the tuple is already typed.
            ByteBuf startFrame = (ByteBuf)tuple2.getT1();
            DuplexConnection clientServerConnection = (DuplexConnection)tuple2.getT2();
            return this.accept(serverSetup, startFrame, clientServerConnection, maxFrameLength);
        });
    }

    /** Delegates a RESUME frame to the server setup. */
    private Mono<Void> acceptResume(ServerSetup serverSetup, ByteBuf resumeFrame, DuplexConnection clientServerConnection) {
        return serverSetup.acceptRSocketResume(resumeFrame, clientServerConnection);
    }

    /**
     * Dispatches on the type of the first frame: SETUP and RESUME are processed, anything else is
     * answered with an {@link InvalidSetupException}.
     */
    private Mono<Void> accept(ServerSetup serverSetup, ByteBuf startFrame, DuplexConnection clientServerConnection, int maxFrameLength) {
        switch (FrameHeaderCodec.frameType(startFrame)) {
            case SETUP:
                return this.acceptSetup(serverSetup, startFrame, clientServerConnection, maxFrameLength);
            case RESUME:
                return this.acceptResume(serverSetup, startFrame, clientServerConnection);
            default:
                return rejectSetup(serverSetup, clientServerConnection, new InvalidSetupException("SETUP or RESUME frame must be received before any others"));
        }
    }

    /**
     * Processes a SETUP frame.
     *
     * <p>The frame is rejected when its version is unsupported, or when it asks to honor leases
     * while no lease configurer is set. Otherwise a requester is created for the connection and
     * the configured {@link SocketAcceptor} is invoked; on success a responder is created around
     * the returned handler, on failure a rejected-setup error is sent to the peer.
     */
    private Mono<Void> acceptSetup(ServerSetup serverSetup, ByteBuf setupFrame, DuplexConnection clientServerConnection, int maxFrameLength) {
        if (!SetupFrameCodec.isSupportedVersion(setupFrame)) {
            return rejectSetup(serverSetup, clientServerConnection, new InvalidSetupException("Unsupported version: " + SetupFrameCodec.humanReadableVersion(setupFrame)));
        }

        // Snapshot the mutable field so the null check and the later invocation inside the
        // callback are guaranteed to see the same value.
        final Consumer<LeaseSpec> configurer = this.leaseConfigurer;
        final boolean leaseEnabled = configurer != null;
        if (SetupFrameCodec.honorLease(setupFrame) && !leaseEnabled) {
            return rejectSetup(serverSetup, clientServerConnection, new InvalidSetupException("lease is not supported"));
        }

        return serverSetup.acceptRSocketSetup(setupFrame, clientServerConnection, (keepAliveHandler, wrappedDuplexConnection) -> {
            // Casts kept from the decompiled form: the callback's generic signature is declared
            // in ServerSetup, which is not visible here; they are no-ops if already typed.
            final KeepAliveHandler keepAlive = (KeepAliveHandler)keepAliveHandler;
            final DuplexConnection connection = (DuplexConnection)wrappedDuplexConnection;

            // The frame is retained for the payload's lifetime; the matching release() runs in
            // doFinally at the end of the chain below.
            final ConnectionSetupPayload setupPayload = new DefaultConnectionSetupPayload(setupFrame.retain());
            final InitializingInterceptorRegistry interceptors = this.interceptors;
            final ClientServerInputMultiplexer multiplexer = new ClientServerInputMultiplexer(connection, interceptors, false);

            final LeaseSpec leases;
            final RequesterLeaseTracker requesterLeaseTracker;
            if (leaseEnabled) {
                leases = new LeaseSpec();
                configurer.accept(leases);
                requesterLeaseTracker = new RequesterLeaseTracker(SERVER_TAG, leases.maxPendingRequests);
            } else {
                leases = null;
                requesterLeaseTracker = null;
            }

            RSocketRequester rSocketRequester = new RSocketRequester(
                multiplexer.asServerConnection(),
                this.payloadDecoder,
                StreamIdSupplier.serverSupplier(),
                this.mtu,
                maxFrameLength,
                this.maxInboundPayloadSize,
                setupPayload.keepAliveInterval(),
                setupPayload.keepAliveMaxLifetime(),
                keepAlive,
                interceptors::initRequesterRequestInterceptor,
                requesterLeaseTracker);
            RSocket wrappedRSocketRequester = interceptors.initRequester(rSocketRequester);

            return interceptors.initSocketAcceptor(this.acceptor)
                .accept(setupPayload, wrappedRSocketRequester)
                .doOnError(err -> serverSetup.sendError(connection, this.rejectedSetupError(err)))
                .doOnNext(rSocketHandler -> {
                    RSocket wrappedRSocketHandler = interceptors.initResponder(rSocketHandler);
                    DuplexConnection clientConnection = multiplexer.asClientConnection();
                    ResponderLeaseTracker responderLeaseTracker = leaseEnabled
                        ? new ResponderLeaseTracker(SERVER_TAG, clientConnection, leases.sender)
                        : null;
                    // The responder is not referenced afterwards; presumably its constructor
                    // attaches it to the connection - confirm in RSocketResponder.
                    new RSocketResponder(
                        clientConnection,
                        wrappedRSocketHandler,
                        this.payloadDecoder,
                        responderLeaseTracker,
                        this.mtu,
                        maxFrameLength,
                        this.maxInboundPayloadSize,
                        // A tracking lease sender is additionally passed on as a per-connection
                        // request interceptor.
                        leaseEnabled && leases.sender instanceof TrackingLeaseSender
                            ? rSocket -> interceptors.initResponderRequestInterceptor((RSocket)rSocket, (RequestInterceptor)leases.sender)
                            : rSocket -> interceptors.initResponderRequestInterceptor((RSocket)rSocket));
                })
                .doFinally(signalType -> setupPayload.release())
                .then();
        });
    }

    /**
     * Sends the given setup error over the connection and returns the connection's
     * {@code onClose()} signal, so the caller completes once the connection has closed.
     */
    private static Mono<Void> rejectSetup(ServerSetup serverSetup, DuplexConnection connection, InvalidSetupException error) {
        serverSetup.sendError(connection, error);
        return connection.onClose();
    }

    /** Picks the resumable setup when a resume configuration is present, the default otherwise. */
    private ServerSetup serverSetup() {
        return this.resume != null ? this.createSetup() : new ServerSetup.DefaultServerSetup();
    }

    /**
     * Builds a resumable server setup from the current resume configuration. Must only be called
     * while {@code resume} is non-null.
     */
    ServerSetup createSetup() {
        return new ServerSetup.ResumableServerSetup(
            new SessionManager(),
            this.resume.getSessionDuration(),
            this.resume.getStreamTimeout(),
            this.resume.getStoreFactory(SERVER_TAG),
            this.resume.isCleanupStoreOnKeepAlive());
    }

    /**
     * Converts an acceptor failure into a {@link RejectedSetupException} carrying the failure's
     * message, or a generic message when the failure has none.
     * NOTE(review): the original throwable is not attached as the cause; consider passing it on
     * if RejectedSetupException offers a (message, cause) constructor.
     */
    private RSocketErrorException rejectedSetupError(Throwable err) {
        String msg = err.getMessage();
        return new RejectedSetupException(msg == null ? "rejected by server acceptor" : msg);
    }
}

