/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.core;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.core.convert.ConversionService;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.ReactiveStreamCommands;
import org.springframework.data.redis.connection.convert.Converters;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.ReactiveStreamOperations;
import org.springframework.data.redis.core.StreamObjectMapper;
import org.springframework.data.redis.hash.HashMapper;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class DefaultReactiveStreamOperations<K, HK, HV>
implements ReactiveStreamOperations<K, HK, HV> {
    private final ReactiveRedisTemplate<?, ?> template;
    private final RedisSerializationContext<K, ?> serializationContext;
    private final StreamObjectMapper objectMapper;

    DefaultReactiveStreamOperations(ReactiveRedisTemplate<?, ?> template, final RedisSerializationContext<K, ?> serializationContext, @Nullable HashMapper<? super K, ? super HK, ? super HV> hashMapper) {
        this.template = template;
        this.serializationContext = serializationContext;
        this.objectMapper = new StreamObjectMapper(hashMapper){

            @Override
            protected HashMapper<?, ?, ?> doGetHashMapper(final ConversionService conversionService, final Class<?> targetType) {
                if (DefaultReactiveStreamOperations.this.objectMapper.isSimpleType(targetType) || ClassUtils.isAssignable(ByteBuffer.class, targetType)) {
                    return new HashMapper<Object, Object, Object>(){

                        @Override
                        public Map<Object, Object> toHash(Object object) {
                            Object key = "payload";
                            Object value = object;
                            if (serializationContext.getHashKeySerializationPair() == null) {
                                key = key.toString().getBytes(StandardCharsets.UTF_8);
                            }
                            if (serializationContext.getHashValueSerializationPair() == null) {
                                value = conversionService.convert(value, byte[].class);
                            }
                            return Collections.singletonMap(key, value);
                        }

                        @Override
                        public Object fromHash(Map<Object, Object> hash) {
                            Object value = hash.values().iterator().next();
                            if (ClassUtils.isAssignableValue((Class)targetType, (Object)value)) {
                                return value;
                            }
                            Object deserialized = DefaultReactiveStreamOperations.this.deserializeHashValue((ByteBuffer)value);
                            if (ClassUtils.isAssignableValue((Class)targetType, deserialized)) {
                                return value;
                            }
                            return conversionService.convert(deserialized, targetType);
                        }
                    };
                }
                return super.doGetHashMapper(conversionService, targetType);
            }
        };
    }

    @Override
    public Mono<Long> acknowledge(K key, String group, RecordId ... recordIds) {
        Assert.notNull(key, (String)"Key must not be null");
        Assert.hasText((String)group, (String)"Group must not be null or empty");
        Assert.notNull((Object)recordIds, (String)"MessageIds must not be null");
        Assert.notEmpty((Object[])recordIds, (String)"MessageIds must not be empty");
        return this.createMono(connection -> connection.xAck(this.rawKey(key), group, recordIds));
    }

    @Override
    public Mono<RecordId> add(Record<K, ?> record) {
        Assert.notNull(record.getStream(), (String)"Key must not be null");
        Assert.notNull(record.getValue(), (String)"Body must not be null");
        MapRecord input = StreamObjectMapper.toMapRecord(this, record);
        return this.createMono(connection -> connection.xAdd(this.serializeRecord(input)));
    }

    @Override
    public Mono<Long> delete(K key, RecordId ... recordIds) {
        Assert.notNull(key, (String)"Key must not be null");
        Assert.notNull((Object)recordIds, (String)"MessageIds must not be null");
        return this.createMono(connection -> connection.xDel(this.rawKey(key), recordIds));
    }

    @Override
    public Mono<String> createGroup(K key, ReadOffset readOffset, String group) {
        Assert.notNull(key, (String)"Key must not be null");
        Assert.notNull((Object)readOffset, (String)"ReadOffset must not be null");
        Assert.notNull((Object)group, (String)"Group must not be null");
        return this.createMono(connection -> connection.xGroupCreate(this.rawKey(key), group, readOffset, true));
    }

    @Override
    public Mono<String> deleteConsumer(K key, Consumer consumer) {
        Assert.notNull(key, (String)"Key must not be null");
        Assert.notNull((Object)consumer, (String)"Consumer must not be null");
        return this.createMono(connection -> connection.xGroupDelConsumer(this.rawKey(key), consumer));
    }

    @Override
    public Mono<String> destroyGroup(K key, String group) {
        Assert.notNull(key, (String)"Key must not be null");
        Assert.notNull((Object)group, (String)"Group must not be null");
        return this.createMono(connection -> connection.xGroupDestroy(this.rawKey(key), group));
    }

    @Override
    public Flux<StreamInfo.XInfoConsumer> consumers(K key, String group) {
        Assert.notNull(key, (String)"Key must not be null");
        Assert.notNull((Object)group, (String)"Group must not be null");
        return this.createFlux(connection -> connection.xInfoConsumers(this.rawKey(key), group));
    }

    @Override
    public Mono<StreamInfo.XInfoStream> info(K key) {
        Assert.notNull(key, (String)"Key must not be null");
        return this.createMono(connection -> connection.xInfo(this.rawKey(key)));
    }

    @Override
    public Flux<StreamInfo.XInfoGroup> groups(K key) {
        Assert.notNull(key, (String)"Key must not be null");
        return this.createFlux(connection -> connection.xInfoGroups(this.rawKey(key)));
    }

    @Override
    public Mono<PendingMessages> pending(K key, String group, Range<?> range, long count) {
        ByteBuffer rawKey = this.rawKey(key);
        return this.createMono(connection -> connection.xPending(rawKey, group, range, (Long)count));
    }

    @Override
    public Mono<PendingMessages> pending(K key, Consumer consumer, Range<?> range, long count) {
        ByteBuffer rawKey = this.rawKey(key);
        return this.createMono(connection -> connection.xPending(rawKey, consumer, range, (Long)count));
    }

    @Override
    public Mono<PendingMessagesSummary> pending(K key, String group) {
        ByteBuffer rawKey = this.rawKey(key);
        return this.createMono(connection -> connection.xPending(rawKey, group));
    }

    @Override
    public Mono<Long> size(K key) {
        Assert.notNull(key, (String)"Key must not be null");
        return this.createMono(connection -> connection.xLen(this.rawKey(key)));
    }

    @Override
    public Flux<MapRecord<K, HK, HV>> range(K key, Range<String> range, Limit limit) {
        Assert.notNull(key, (String)"Key must not be null");
        Assert.notNull(range, (String)"Range must not be null");
        Assert.notNull((Object)limit, (String)"Limit must not be null");
        return this.createFlux(connection -> connection.xRange(this.rawKey(key), range, limit).map(this::deserializeRecord));
    }

    @Override
    public Flux<MapRecord<K, HK, HV>> read(StreamReadOptions readOptions, StreamOffset<K> ... streams) {
        Assert.notNull((Object)readOptions, (String)"StreamReadOptions must not be null");
        Assert.notNull(streams, (String)"Streams must not be null");
        return this.createFlux(connection -> {
            StreamOffset<ByteBuffer>[] streamOffsets = this.rawStreamOffsets(streams);
            return connection.xRead(readOptions, streamOffsets).map(this::deserializeRecord);
        });
    }

    @Override
    public Flux<MapRecord<K, HK, HV>> read(Consumer consumer, StreamReadOptions readOptions, StreamOffset<K> ... streams) {
        Assert.notNull((Object)consumer, (String)"Consumer must not be null");
        Assert.notNull((Object)readOptions, (String)"StreamReadOptions must not be null");
        Assert.notNull(streams, (String)"Streams must not be null");
        return this.createFlux(connection -> {
            StreamOffset<ByteBuffer>[] streamOffsets = this.rawStreamOffsets(streams);
            return connection.xReadGroup(consumer, readOptions, streamOffsets).map(this::deserializeRecord);
        });
    }

    @Override
    public Flux<MapRecord<K, HK, HV>> reverseRange(K key, Range<String> range, Limit limit) {
        Assert.notNull(key, (String)"Key must not be null");
        Assert.notNull(range, (String)"Range must not be null");
        Assert.notNull((Object)limit, (String)"Limit must not be null");
        return this.createFlux(connection -> connection.xRevRange(this.rawKey(key), range, limit).map(this::deserializeRecord));
    }

    @Override
    public Mono<Long> trim(K key, long count) {
        return this.trim(key, count, false);
    }

    @Override
    public Mono<Long> trim(K key, long count, boolean approximateTrimming) {
        Assert.notNull(key, (String)"Key must not be null");
        return this.createMono(connection -> connection.xTrim(this.rawKey(key), count, approximateTrimming));
    }

    @Override
    public <V> HashMapper<V, HK, HV> getHashMapper(Class<V> targetType) {
        return this.objectMapper.getHashMapper(targetType);
    }

    private StreamOffset<ByteBuffer>[] rawStreamOffsets(StreamOffset<K>[] streams) {
        return (StreamOffset[])Arrays.stream(streams).map(it -> StreamOffset.create(this.rawKey(it.getKey()), it.getOffset())).toArray(StreamOffset[]::new);
    }

    private <T> Mono<T> createMono(Function<ReactiveStreamCommands, Publisher<T>> function) {
        Assert.notNull(function, (String)"Function must not be null");
        return this.template.doCreateMono(connection -> (Publisher)function.apply(connection.streamCommands()));
    }

    private <T> Flux<T> createFlux(Function<ReactiveStreamCommands, Publisher<T>> function) {
        Assert.notNull(function, (String)"Function must not be null");
        return this.template.doCreateFlux(connection -> (Publisher)function.apply(connection.streamCommands()));
    }

    private ByteBuffer rawKey(K key) {
        return this.serializationContext.getKeySerializationPair().write(key);
    }

    private ByteBuffer rawHashKey(HK key) {
        try {
            return this.serializationContext.getHashKeySerializationPair().write(key);
        }
        catch (IllegalStateException illegalStateException) {
            return ByteBuffer.wrap((byte[])this.objectMapper.getConversionService().convert(key, byte[].class));
        }
    }

    private ByteBuffer rawValue(HV value) {
        try {
            return this.serializationContext.getHashValueSerializationPair().write(value);
        }
        catch (IllegalStateException illegalStateException) {
            return ByteBuffer.wrap((byte[])this.objectMapper.getConversionService().convert(value, byte[].class));
        }
    }

    private HK readHashKey(ByteBuffer buffer) {
        return this.serializationContext.getHashKeySerializationPair().getReader().read(buffer);
    }

    private K readKey(ByteBuffer buffer) {
        return this.serializationContext.getKeySerializationPair().read(buffer);
    }

    private HV deserializeHashValue(ByteBuffer buffer) {
        return this.serializationContext.getHashValueSerializationPair().read(buffer);
    }

    @Override
    public MapRecord<K, HK, HV> deserializeRecord(ByteBufferRecord record) {
        return record.map(it -> it.mapEntries(this::deserializeRecordFields).withStreamKey(this.readKey((ByteBuffer)record.getStream())));
    }

    private Map.Entry<HK, HV> deserializeRecordFields(Map.Entry<ByteBuffer, ByteBuffer> it) {
        return Converters.entryOf(this.readHashKey(it.getKey()), this.deserializeHashValue(it.getValue()));
    }

    private ByteBufferRecord serializeRecord(MapRecord<K, ? extends HK, ? extends HV> record) {
        return ByteBufferRecord.of(record.map(it -> it.mapEntries(this::serializeRecordFields).withStreamKey(this.rawKey(record.getStream()))));
    }

    private Map.Entry<ByteBuffer, ByteBuffer> serializeRecordFields(Map.Entry<? extends HK, ? extends HV> it) {
        return Converters.entryOf(this.rawHashKey(it.getKey()), this.rawValue(it.getValue()));
    }
}

