/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.redis.inbound;

import java.time.Duration;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.core.convert.ConversionFailedException;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.ReactiveStreamOperations;
import org.springframework.data.redis.hash.HashMapper;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.stream.StreamReceiver;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactiveRedisStreamMessageProducer
extends MessageProducerSupport {
    private final ReactiveRedisConnectionFactory reactiveConnectionFactory;
    private final String streamKey;
    private final StreamReceiver.StreamReceiverOptionsBuilder<String, ?> streamReceiverOptionsBuilder = StreamReceiver.StreamReceiverOptions.builder().pollTimeout(Duration.ZERO).onErrorResume(this::handleReceiverError);
    private ReactiveStreamOperations<String, ?, ?> reactiveStreamOperations;
    private StreamReceiver.StreamReceiverOptions<String, ?> streamReceiverOptions;
    private StreamReceiver<String, ?> streamReceiver;
    private ReadOffset readOffset = ReadOffset.latest();
    private boolean extractPayload = true;
    private boolean autoAck = true;
    @Nullable
    private String consumerGroup;
    @Nullable
    private String consumerName;
    private boolean createConsumerGroup;
    private boolean receiverBuilderOptionSet;

    public ReactiveRedisStreamMessageProducer(ReactiveRedisConnectionFactory reactiveConnectionFactory, String streamKey) {
        Assert.notNull((Object)reactiveConnectionFactory, (String)"'connectionFactory' must not be null");
        Assert.hasText((String)streamKey, (String)"'streamKey' must be set");
        this.reactiveConnectionFactory = reactiveConnectionFactory;
        this.streamKey = streamKey;
    }

    public void setReadOffset(ReadOffset readOffset) {
        this.readOffset = readOffset;
    }

    public void setExtractPayload(boolean extractPayload) {
        this.extractPayload = extractPayload;
    }

    public void setAutoAck(boolean autoAck) {
        this.autoAck = autoAck;
    }

    public void setConsumerGroup(@Nullable String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }

    public void setConsumerName(@Nullable String consumerName) {
        this.consumerName = consumerName;
    }

    public void setCreateConsumerGroup(boolean createConsumerGroup) {
        this.createConsumerGroup = createConsumerGroup;
    }

    public void setStreamReceiverOptions(@Nullable StreamReceiver.StreamReceiverOptions<String, ?> streamReceiverOptions) {
        Assert.isTrue((!this.receiverBuilderOptionSet ? 1 : 0) != 0, (String)"The 'streamReceiverOptions' is mutually exclusive with 'pollTimeout', 'batchSize', 'onErrorResume', 'serializer', 'targetType', 'objectMapper'");
        this.streamReceiverOptions = streamReceiverOptions;
    }

    private void assertStreamReceiverOptions(String property) {
        Assert.isNull(this.streamReceiverOptions, () -> "'" + property + "' cannot be set when 'StreamReceiver.StreamReceiverOptions' is provided.");
    }

    public void setPollTimeout(Duration pollTimeout) {
        this.assertStreamReceiverOptions("pollTimeout");
        this.streamReceiverOptionsBuilder.pollTimeout(pollTimeout);
        this.receiverBuilderOptionSet = true;
    }

    public void setBatchSize(int recordsPerPoll) {
        this.assertStreamReceiverOptions("batchSize");
        this.streamReceiverOptionsBuilder.batchSize(recordsPerPoll);
        this.receiverBuilderOptionSet = true;
    }

    public void setOnErrorResume(Function<? super Throwable, ? extends Publisher<Void>> resumeFunction) {
        this.assertStreamReceiverOptions("onErrorResume");
        this.streamReceiverOptionsBuilder.onErrorResume(resumeFunction);
        this.receiverBuilderOptionSet = true;
    }

    public void setSerializer(RedisSerializationContext.SerializationPair<?> pair) {
        this.assertStreamReceiverOptions("serializer");
        this.streamReceiverOptionsBuilder.serializer(pair);
        this.receiverBuilderOptionSet = true;
    }

    public void setTargetType(Class<?> targetType) {
        this.assertStreamReceiverOptions("targetType");
        this.streamReceiverOptionsBuilder.targetType(targetType);
        this.receiverBuilderOptionSet = true;
    }

    public void setObjectMapper(HashMapper<?, ?, ?> hashMapper) {
        this.assertStreamReceiverOptions("objectMapper");
        this.streamReceiverOptionsBuilder.objectMapper(hashMapper);
        this.receiverBuilderOptionSet = true;
    }

    public String getComponentType() {
        return "redis:stream-inbound-channel-adapter";
    }

    protected void onInit() {
        super.onInit();
        if (this.streamReceiverOptions == null) {
            this.streamReceiverOptions = this.streamReceiverOptionsBuilder.build();
        }
        this.streamReceiver = StreamReceiver.create((ReactiveRedisConnectionFactory)this.reactiveConnectionFactory, this.streamReceiverOptions);
        if (StringUtils.hasText((String)this.consumerName) && !StringUtils.hasText((String)this.consumerGroup)) {
            this.consumerGroup = this.getBeanName();
        }
        ReactiveRedisTemplate reactiveRedisTemplate = new ReactiveRedisTemplate(this.reactiveConnectionFactory, RedisSerializationContext.string());
        this.reactiveStreamOperations = reactiveRedisTemplate.opsForStream();
    }

    protected void doStart() {
        Flux events;
        StreamOffset offset = StreamOffset.create((Object)this.streamKey, (ReadOffset)this.readOffset);
        if (!StringUtils.hasText((String)this.consumerName)) {
            events = this.streamReceiver.receive(offset);
        } else {
            Mono consumerGroupMono = Mono.empty();
            if (this.createConsumerGroup) {
                consumerGroupMono = this.reactiveStreamOperations.createGroup((Object)this.streamKey, this.consumerGroup).onErrorReturn((Object)this.consumerGroup);
            }
            Consumer consumer = Consumer.from((String)this.consumerGroup, (String)this.consumerName);
            if (offset.getOffset().equals((Object)ReadOffset.latest())) {
                offset = StreamOffset.create((Object)this.streamKey, (ReadOffset)ReadOffset.lastConsumed());
            }
            events = this.autoAck ? this.streamReceiver.receiveAutoAck(consumer, offset) : this.streamReceiver.receive(consumer, offset);
            events = consumerGroupMono.thenMany((Publisher)events);
        }
        Flux messageFlux = events.map(record -> this.buildMessageFromRecord((Record<String, ?>)record, this.extractPayload));
        this.subscribeToPublisher((Publisher)messageFlux);
    }

    private Message<?> buildMessageFromRecord(Record<String, ?> record, boolean extractPayload) {
        AbstractIntegrationMessageBuilder builder = this.getMessageBuilderFactory().withPayload(extractPayload ? record.getValue() : record).setHeader("redis_streamKey", record.getStream()).setHeader("redis_streamMessageId", (Object)record.getId()).setHeader("redis_consumerGroup", (Object)this.consumerGroup).setHeader("redis_consumer", (Object)this.consumerName);
        if (!this.autoAck && this.consumerGroup != null) {
            builder.setHeader("acknowledgmentCallback", () -> this.reactiveStreamOperations.acknowledge(this.consumerGroup, record).subscribe());
        }
        return builder.build();
    }

    private <T> Publisher<T> handleReceiverError(Throwable error) {
        MessageConversionException conversionException;
        Record record;
        Message<?> failedMessage = null;
        if (error instanceof ConversionFailedException && (record = (Record)((ConversionFailedException)error).getValue()) != null) {
            failedMessage = this.buildMessageFromRecord(record, false);
        }
        if (!this.sendErrorMessageIfNecessary(null, (Exception)(conversionException = new MessageConversionException(failedMessage, "Cannot deserialize Redis Stream Record", error)))) {
            this.logger.getLog().error((Object)conversionException);
        }
        return Mono.empty();
    }
}

