/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.broker.acceptor;

import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.broker.RSocketIndex;
import io.rsocket.broker.RoutingTable;
import io.rsocket.broker.common.Id;
import io.rsocket.broker.common.WellKnownKey;
import io.rsocket.broker.frames.BrokerFrame;
import io.rsocket.broker.frames.BrokerInfo;
import io.rsocket.broker.frames.RouteJoin;
import io.rsocket.broker.frames.RouteSetup;
import io.rsocket.broker.rsocket.ErrorOnDisconnectRSocket;
import io.rsocket.broker.rsocket.RoutingRSocketFactory;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BrokerSocketAcceptor
implements SocketAcceptor {
    protected static final Logger logger = LoggerFactory.getLogger(BrokerSocketAcceptor.class);
    protected final Id brokerId;
    protected final RoutingTable routingTable;
    protected final RSocketIndex rSocketIndex;
    protected final Function<ConnectionSetupPayload, BrokerFrame> payloadExtractor;
    protected final BiConsumer<BrokerInfo, RSocket> brokerInfoConsumer;
    protected final Consumer<BrokerInfo> brokerInfoCleaner;
    protected final RoutingRSocketFactory routingRSocketFactory;

    public BrokerSocketAcceptor(Id brokerId, RoutingTable routingTable, RSocketIndex rSocketIndex, RoutingRSocketFactory routingRSocketFactory, Function<ConnectionSetupPayload, BrokerFrame> payloadExtractor, BiConsumer<BrokerInfo, RSocket> brokerInfoConsumer, Consumer<BrokerInfo> brokerInfoCleaner) {
        this.brokerId = brokerId;
        this.routingTable = routingTable;
        this.rSocketIndex = rSocketIndex;
        this.routingRSocketFactory = routingRSocketFactory;
        this.payloadExtractor = payloadExtractor;
        this.brokerInfoConsumer = brokerInfoConsumer;
        this.brokerInfoCleaner = brokerInfoCleaner;
        logger.info("Starting Broker {}", (Object)brokerId);
    }

    public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
        try {
            BrokerFrame BrokerFrame2 = this.payloadExtractor.apply(setup);
            Runnable doCleanup = () -> this.cleanup(BrokerFrame2);
            logger.debug("accept {}", (Object)BrokerFrame2);
            RSocket wrapSendingSocket = this.wrapSendingSocket(sendingSocket, BrokerFrame2);
            if (BrokerFrame2 instanceof BrokerInfo) {
                this.brokerInfoConsumer.accept((BrokerInfo)BrokerFrame2, wrapSendingSocket);
                return this.finalize(sendingSocket, doCleanup);
            }
            if (BrokerFrame2 instanceof RouteSetup) {
                RouteSetup routeSetup = (RouteSetup)BrokerFrame2;
                return Mono.defer(() -> {
                    RouteJoin routeJoin = this.toRouteJoin(routeSetup);
                    this.rSocketIndex.put(routeJoin.getRouteId(), wrapSendingSocket, routeJoin.getTags());
                    this.routingTable.add(routeJoin);
                    return this.finalize(sendingSocket, doCleanup);
                });
            }
            throw new IllegalStateException("RouteSetup not found in metadata");
        }
        catch (Exception e) {
            logger.error("Error accepting setup", (Throwable)e);
            return Mono.error((Throwable)e);
        }
    }

    private Mono<RSocket> finalize(RSocket sendingSocket, Runnable doCleanup) {
        RSocket receivingSocket = this.routingRSocketFactory.create();
        Flux.first((Publisher[])new Publisher[]{receivingSocket.onClose(), sendingSocket.onClose()}).doFinally(s -> doCleanup.run()).subscribe();
        return Mono.just((Object)receivingSocket);
    }

    private void cleanup(BrokerFrame BrokerFrame2) {
        if (BrokerFrame2 instanceof BrokerInfo) {
            BrokerInfo brokerInfo = (BrokerInfo)BrokerFrame2;
            this.brokerInfoCleaner.accept(brokerInfo);
        } else if (BrokerFrame2 instanceof RouteSetup) {
            RouteSetup routeSetup = (RouteSetup)BrokerFrame2;
            Id routeId = routeSetup.getRouteId();
            this.routingTable.remove(routeId);
            this.rSocketIndex.remove(routeId);
        }
    }

    private RSocket wrapSendingSocket(RSocket sendingSocket, BrokerFrame BrokerFrame2) {
        ErrorOnDisconnectRSocket rSocket = new ErrorOnDisconnectRSocket(sendingSocket);
        rSocket.onClose().doFinally(s -> logger.info("Closing socket for {}", (Object)BrokerFrame2));
        return rSocket;
    }

    private RouteJoin toRouteJoin(RouteSetup routeSetup) {
        return ((RouteJoin.Builder)((RouteJoin.Builder)((RouteJoin.Builder)RouteJoin.builder().brokerId(this.brokerId).routeId(routeSetup.getRouteId()).serviceName(routeSetup.getServiceName()).with(routeSetup.getTags())).with(WellKnownKey.ROUTE_ID, routeSetup.getRouteId().toString())).with(WellKnownKey.SERVICE_NAME, routeSetup.getServiceName())).build();
    }
}

