/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.rsocket.inbound;

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.integration.gateway.MessagingGatewaySupport;
import org.springframework.integration.rsocket.AbstractRSocketConnector;
import org.springframework.integration.rsocket.ClientRSocketConnector;
import org.springframework.integration.rsocket.IntegrationRSocketEndpoint;
import org.springframework.integration.rsocket.RSocketInteractionModel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;

/**
 * An inbound messaging gateway that serves RSocket requests for the configured path(s).
 *
 * <p>The payload of an incoming request message is decoded through the configured
 * {@link RSocketStrategies}. When the request message carries the
 * {@value #RESPONSE_HEADER} header (an {@link AtomicReference}), the decoded message is
 * sent downstream in request/reply fashion and the encoded reply is stored into that
 * reference; otherwise the decoded message is only sent downstream.
 *
 * <p>If an {@link AbstractRSocketConnector} is provided, this endpoint registers itself with
 * the connector during initialization and adopts the connector's {@link RSocketStrategies}.
 */
public class RSocketInboundGateway
        extends MessagingGatewaySupport
        implements IntegrationRSocketEndpoint {

    /** Header under which the Reactor context is propagated to the downstream message. */
    private static final String REACTOR_CONTEXT_HEADER = "reactorContext";

    /** Request header that may carry the {@link DataBufferFactory} to encode replies with. */
    private static final String DATA_BUFFER_FACTORY_HEADER = "dataBufferFactory";

    /** Request header that carries the {@link AtomicReference} the reply has to be stored into. */
    private static final String RESPONSE_HEADER = "rsocketResponse";

    private final String[] path;

    private RSocketInteractionModel[] interactionModels = RSocketInteractionModel.values();

    private RSocketStrategies rsocketStrategies = RSocketStrategies.create();

    @Nullable
    private AbstractRSocketConnector rsocketConnector;

    @Nullable
    private ResolvableType requestElementType;

    private boolean decodeFluxAsUnit;

    /**
     * Create a gateway for the given path(s).
     * @param pathArg the path(s) this endpoint serves; must not be {@code null}.
     * The array is copied, so later changes to it are not reflected here.
     */
    public RSocketInboundGateway(String... pathArg) {
        Assert.notNull(pathArg, "'pathArg' must not be null");
        this.path = Arrays.copyOf(pathArg, pathArg.length);
    }

    /**
     * Set the {@link RSocketStrategies} used to decode requests and encode replies.
     * Note that {@link #onInit()} replaces this value with the connector's strategies
     * when a connector has been provided.
     * @param rsocketStrategies the strategies to use; must not be {@code null}.
     */
    public void setRSocketStrategies(RSocketStrategies rsocketStrategies) {
        Assert.notNull(rsocketStrategies, "'rsocketStrategies' must not be null");
        this.rsocketStrategies = rsocketStrategies;
    }

    /**
     * Provide the connector this endpoint registers itself with during initialization.
     * @param rsocketConnector the connector; must not be {@code null}.
     */
    public void setRSocketConnector(AbstractRSocketConnector rsocketConnector) {
        Assert.notNull(rsocketConnector, "'rsocketConnector' must not be null");
        this.rsocketConnector = rsocketConnector;
    }

    /**
     * Restrict the interaction models this endpoint reports via {@link #getInteractionModels()}.
     * By default all the {@link RSocketInteractionModel} values are reported.
     * @param interactionModelsArg the interaction models; must not be {@code null}.
     * The array is copied.
     */
    public void setInteractionModels(RSocketInteractionModel... interactionModelsArg) {
        Assert.notNull(interactionModelsArg, "'interactionModelsArg' must not be null");
        this.interactionModels = Arrays.copyOf(interactionModelsArg, interactionModelsArg.length);
    }

    /**
     * Return a copy of the configured interaction models.
     * @return the interaction models.
     */
    @Override
    public RSocketInteractionModel[] getInteractionModels() {
        return Arrays.copyOf(this.interactionModels, this.interactionModels.length);
    }

    /**
     * Return a copy of the path(s) provided at construction time.
     * @return the path(s).
     */
    @Override
    public String[] getPath() {
        return Arrays.copyOf(this.path, this.path.length);
    }

    /**
     * Convenience variant of {@link #setRequestElementType(ResolvableType)} for a plain class.
     * @param requestElementClass the class to decode request payload elements into.
     */
    public void setRequestElementClass(Class<?> requestElementClass) {
        setRequestElementType(ResolvableType.forClass(requestElementClass));
    }

    /**
     * Set the type to decode request payload elements into.
     * When not set, {@code String} is used for a {@code text/*} content type
     * and {@code byte[]} otherwise.
     * @param requestElementType the element type for decoding.
     */
    public void setRequestElementType(ResolvableType requestElementType) {
        this.requestElementType = requestElementType;
    }

    /**
     * Configure how a {@link Publisher} request payload is decoded:
     * with {@code true} the whole publisher is handed to the decoder at once;
     * with {@code false} (default) every emitted buffer is decoded individually
     * and {@code null} decoding results are dropped.
     * @param decodeFluxAsUnit whether to decode a publisher payload as a single unit.
     */
    public void setDecodeFluxAsUnit(boolean decodeFluxAsUnit) {
        this.decodeFluxAsUnit = decodeFluxAsUnit;
    }

    /**
     * Register this endpoint with the connector, if any, and adopt its {@link RSocketStrategies}.
     */
    @Override
    protected void onInit() {
        super.onInit();
        AbstractRSocketConnector rsocketConnectorToUse = this.rsocketConnector;
        if (rsocketConnectorToUse != null) {
            rsocketConnectorToUse.addEndpoint(this);
            // The connector's strategies take precedence over anything set on this gateway.
            this.rsocketStrategies = rsocketConnectorToUse.getRSocketStrategies();
        }
    }

    /**
     * Start the gateway and, for a client-side connector, initiate the connection.
     */
    @Override
    protected void doStart() {
        super.doStart();
        if (this.rsocketConnector instanceof ClientRSocketConnector) {
            ((ClientRSocketConnector) this.rsocketConnector).connect();
        }
    }

    /**
     * Handle an incoming request message.
     * @param requestMessage the request to process.
     * @return a {@link Mono} that completes when the request has been handed downstream
     * (and, for request/reply, the reply has been stored into the reply holder), or fails with
     * a {@link MessageDeliveryException} when this gateway is not running.
     */
    @Override
    public Mono<Void> handleMessage(Message<?> requestMessage) {
        if (!isRunning()) {
            return Mono.error(new MessageDeliveryException(requestMessage,
                    "The RSocket Inbound Gateway '" + getComponentName()
                            + "' is stopped; service for path(s) " + Arrays.toString(this.path)
                            + " is not available at the moment."));
        }

        Mono<Message<?>> requestMono = decodeRequestMessage(requestMessage);
        AtomicReference<Object> replyTo = getReplyToHeader(requestMessage);
        if (replyTo != null) {
            // Request/reply: the encoded reply is handed back through the reply holder header.
            return requestMono
                    .flatMap(this::sendAndReceiveMessageReactive)
                    .doOnNext(replyMessage ->
                            replyTo.set(createReply(replyMessage.getPayload(), requestMessage)))
                    .then();
        }

        // One-way: just send the message, enriched with the subscriber's Reactor context if present.
        return requestMono
                .flatMap(message ->
                        Mono.deferContextual(context ->
                                Mono.just(message)
                                        .handle((messageToSend, sink) ->
                                                send(messageWithReactorContextIfAny(messageToSend, context)))));
    }

    /**
     * Copy the message adding the Reactor context as a header, unless the context is empty.
     */
    private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
        if (context.isEmpty()) {
            return message;
        }
        return getMessageBuilderFactory()
                .fromMessage(message)
                .setHeader(REACTOR_CONTEXT_HEADER, context)
                .build();
    }

    /**
     * Build the downstream message: decoded payload plus all the request headers.
     * Falls back to the request message itself when decoding yields {@code null}.
     */
    private Mono<Message<?>> decodeRequestMessage(Message<?> requestMessage) {
        Object data = decodePayload(requestMessage);
        if (data == null) {
            return Mono.just(requestMessage);
        }
        return Mono.just(data)
                .map(payload ->
                        MessageBuilder.withPayload(payload)
                                .copyHeaders(requestMessage.getHeaders())
                                .build());
    }

    /**
     * Decode the request payload according to the request content type and the target element type.
     * @return the decoded value for a single {@link DataBuffer} payload (may be {@code null}),
     * or a {@link Flux} of decoded values for a {@link Publisher} payload.
     */
    @Nullable
    private Object decodePayload(Message<?> requestMessage) {
        MimeType mimeType = requestMessage.getHeaders().get(MessageHeaders.CONTENT_TYPE, MimeType.class);
        ResolvableType elementType = resolveElementType(mimeType);
        Object payload = requestMessage.getPayload();
        Decoder<Object> decoder = this.rsocketStrategies.decoder(elementType, mimeType);

        if (payload instanceof DataBuffer) {
            return decoder.decode((DataBuffer) payload, elementType, mimeType, null);
        }

        // NOTE(review): any non-DataBuffer payload is assumed to be a Publisher of DataBuffer;
        // nothing in this class verifies it - confirm against the code producing request messages.
        @SuppressWarnings("unchecked")
        Publisher<DataBuffer> buffers = (Publisher<DataBuffer>) payload;
        if (this.decodeFluxAsUnit) {
            return decoder.decode(buffers, elementType, mimeType, null);
        }
        return Flux.from(buffers)
                .handle((buffer, synchronousSink) -> {
                    Object value = decoder.decode(buffer, elementType, mimeType, null);
                    // A null decoding result is skipped rather than emitted.
                    if (value != null) {
                        synchronousSink.next(value);
                    }
                });
    }

    /**
     * Pick the explicitly configured element type or derive a default from the content type:
     * {@code String} for {@code text/*}, {@code byte[]} for everything else.
     */
    private ResolvableType resolveElementType(@Nullable MimeType mimeType) {
        if (this.requestElementType != null) {
            return this.requestElementType;
        }
        boolean textual = mimeType != null && "text".equals(mimeType.getType());
        return ResolvableType.forClass(textual ? String.class : byte[].class);
    }

    /**
     * Encode the reply payload using the buffer factory from the request headers
     * (or the one from the strategies when the header is absent) and the request content type.
     */
    private Flux<DataBuffer> createReply(Object reply, Message<?> requestMessage) {
        MessageHeaders requestMessageHeaders = requestMessage.getHeaders();
        DataBufferFactory bufferFactory =
                requestMessageHeaders.get(DATA_BUFFER_FACTORY_HEADER, DataBufferFactory.class);
        if (bufferFactory == null) {
            bufferFactory = this.rsocketStrategies.dataBufferFactory();
        }
        MimeType mimeType = requestMessageHeaders.get(MessageHeaders.CONTENT_TYPE, MimeType.class);
        return encodeContent(reply, ResolvableType.forInstance(reply), bufferFactory, mimeType);
    }

    /**
     * Adapt the content to a {@link Publisher} (a single-element one when no reactive adapter
     * applies) and encode every emitted value into a {@link DataBuffer}.
     */
    private Flux<DataBuffer> encodeContent(Object content, ResolvableType returnValueType,
            DataBufferFactory bufferFactory, @Nullable MimeType mimeType) {

        ReactiveAdapter adapter =
                this.rsocketStrategies.reactiveAdapterRegistry()
                        .getAdapter(returnValueType.resolve(), content);

        Publisher<?> publisher;
        if (adapter != null) {
            publisher = adapter.toPublisher(content);
        }
        else {
            publisher = Flux.just(content);
        }

        return Flux.from(publisher)
                .map(value -> encodeValue(value, bufferFactory, mimeType));
    }

    /**
     * Encode a single value with the encoder matching its runtime type and the given content type.
     */
    private DataBuffer encodeValue(Object element, DataBufferFactory bufferFactory, @Nullable MimeType mimeType) {
        ResolvableType elementType = ResolvableType.forInstance(element);
        Encoder<Object> encoder = this.rsocketStrategies.encoder(elementType, mimeType);
        return encoder.encodeValue(element, bufferFactory, elementType, mimeType, null);
    }

    /**
     * Extract the reply holder from the request headers.
     * @return the holder, or {@code null} when the header is absent.
     * @throws IllegalStateException when the header is present but is not an {@link AtomicReference}.
     */
    @Nullable
    @SuppressWarnings("unchecked")
    private static AtomicReference<Object> getReplyToHeader(Message<?> message) {
        Object headerValue = message.getHeaders().get(RESPONSE_HEADER);
        Assert.state(headerValue == null || headerValue instanceof AtomicReference, "Expected AtomicReference");
        return (AtomicReference<Object>) headerValue;
    }
}

