/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cep.operator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.EventComparator;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.NFAState;
import org.apache.flink.cep.nfa.NFAStateSerializer;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.operator.CepRuntimeContext;
import org.apache.flink.cep.time.TimerService;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

@Internal
public class CepOperator<IN, KEY, OUT>
extends AbstractUdfStreamOperator<OUT, PatternProcessFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT>,
Triggerable<KEY, VoidNamespace> {
    private static final long serialVersionUID = -4166778210774160757L;
    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
    private final boolean isProcessingTime;
    private final TypeSerializer<IN> inputSerializer;
    private static final String NFA_STATE_NAME = "nfaStateName";
    private static final String EVENT_QUEUE_STATE_NAME = "eventQueuesStateName";
    private final NFACompiler.NFAFactory<IN> nfaFactory;
    private transient ValueState<NFAState> computationStates;
    private transient MapState<Long, List<IN>> elementQueueState;
    private transient SharedBuffer<IN> partialMatches;
    private transient InternalTimerService<VoidNamespace> timerService;
    private transient NFA<IN> nfa;
    private final EventComparator<IN> comparator;
    private final OutputTag<IN> lateDataOutputTag;
    private final AfterMatchSkipStrategy afterMatchSkipStrategy;
    private transient ContextFunctionImpl context;
    private transient TimestampedCollector<OUT> collector;
    private transient CepRuntimeContext cepRuntimeContext;
    private transient TimerService cepTimerService;
    private transient Counter numLateRecordsDropped;

    public CepOperator(TypeSerializer<IN> inputSerializer, boolean isProcessingTime, NFACompiler.NFAFactory<IN> nfaFactory, @Nullable EventComparator<IN> comparator, @Nullable AfterMatchSkipStrategy afterMatchSkipStrategy, PatternProcessFunction<IN, OUT> function, @Nullable OutputTag<IN> lateDataOutputTag) {
        super(function);
        this.inputSerializer = (TypeSerializer)Preconditions.checkNotNull(inputSerializer);
        this.nfaFactory = (NFACompiler.NFAFactory)Preconditions.checkNotNull(nfaFactory);
        this.isProcessingTime = isProcessingTime;
        this.comparator = comparator;
        this.lateDataOutputTag = lateDataOutputTag;
        this.afterMatchSkipStrategy = afterMatchSkipStrategy == null ? AfterMatchSkipStrategy.noSkip() : afterMatchSkipStrategy;
    }

    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);
        this.cepRuntimeContext = new CepRuntimeContext((RuntimeContext)this.getRuntimeContext());
        FunctionUtils.setFunctionRuntimeContext((Function)this.getUserFunction(), (RuntimeContext)this.cepRuntimeContext);
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        this.computationStates = context.getKeyedStateStore().getState(new ValueStateDescriptor(NFA_STATE_NAME, (TypeSerializer)new NFAStateSerializer()));
        this.partialMatches = new SharedBuffer<IN>(context.getKeyedStateStore(), this.inputSerializer);
        this.elementQueueState = context.getKeyedStateStore().getMapState(new MapStateDescriptor(EVENT_QUEUE_STATE_NAME, (TypeSerializer)LongSerializer.INSTANCE, (TypeSerializer)new ListSerializer(this.inputSerializer)));
        if (context.isRestored()) {
            this.migrateOldState();
        }
    }

    private void migrateOldState() throws Exception {
        this.getKeyedStateBackend().applyToAllKeys((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor("nfaOperatorStateName", new NFA.NFASerializer<IN>(this.inputSerializer)), new KeyedStateFunction<Object, ValueState<NFA.MigratedNFA<IN>>>(){

            public void process(Object key, ValueState<NFA.MigratedNFA<IN>> state) throws Exception {
                NFA.MigratedNFA oldState = (NFA.MigratedNFA)state.value();
                CepOperator.this.computationStates.update((Object)new NFAState(oldState.getComputationStates()));
                org.apache.flink.cep.nfa.SharedBuffer sharedBuffer = oldState.getSharedBuffer();
                CepOperator.this.partialMatches.init(sharedBuffer.getEventsBuffer(), sharedBuffer.getPages());
                state.clear();
            }
        });
    }

    public void open() throws Exception {
        super.open();
        this.timerService = this.getInternalTimerService("watermark-callbacks", (TypeSerializer)VoidNamespaceSerializer.INSTANCE, this);
        this.nfa = this.nfaFactory.createNFA();
        this.nfa.open(this.cepRuntimeContext, new Configuration());
        this.context = new ContextFunctionImpl();
        this.collector = new TimestampedCollector(this.output);
        this.cepTimerService = new TimerServiceImpl();
        this.numLateRecordsDropped = this.metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
    }

    public void close() throws Exception {
        super.close();
        if (this.nfa != null) {
            this.nfa.close();
        }
    }

    public void processElement(StreamRecord<IN> element) throws Exception {
        if (this.isProcessingTime) {
            if (this.comparator == null) {
                NFAState nfaState = this.getNFAState();
                long timestamp = this.getProcessingTimeService().getCurrentProcessingTime();
                this.advanceTime(nfaState, timestamp);
                this.processEvent(nfaState, element.getValue(), timestamp);
                this.updateNFA(nfaState);
            } else {
                long currentTime = this.timerService.currentProcessingTime();
                this.bufferEvent(element.getValue(), currentTime);
                this.timerService.registerProcessingTimeTimer((Object)VoidNamespace.INSTANCE, currentTime + 1L);
            }
        } else {
            long timestamp = element.getTimestamp();
            Object value = element.getValue();
            if (timestamp > this.timerService.currentWatermark()) {
                this.saveRegisterWatermarkTimer();
                this.bufferEvent(value, timestamp);
            } else if (this.lateDataOutputTag != null) {
                this.output.collect(this.lateDataOutputTag, element);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }

    private void saveRegisterWatermarkTimer() {
        long currentWatermark = this.timerService.currentWatermark();
        if (currentWatermark + 1L > currentWatermark) {
            this.timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, currentWatermark + 1L);
        }
    }

    private void bufferEvent(IN event, long currentTime) throws Exception {
        ArrayList<IN> elementsForTimestamp = (ArrayList<IN>)this.elementQueueState.get((Object)currentTime);
        if (elementsForTimestamp == null) {
            elementsForTimestamp = new ArrayList<IN>();
        }
        elementsForTimestamp.add(event);
        this.elementQueueState.put((Object)currentTime, elementsForTimestamp);
    }

    public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
        PriorityQueue<Long> sortedTimestamps = this.getSortedTimestamps();
        NFAState nfaState = this.getNFAState();
        while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= this.timerService.currentWatermark()) {
            long timestamp = sortedTimestamps.poll();
            this.advanceTime(nfaState, timestamp);
            try (Stream<IN> elements = this.sort((Collection)this.elementQueueState.get((Object)timestamp));){
                elements.forEachOrdered(event -> {
                    try {
                        this.processEvent(nfaState, event, timestamp);
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
            }
            this.elementQueueState.remove((Object)timestamp);
        }
        this.advanceTime(nfaState, this.timerService.currentWatermark());
        this.updateNFA(nfaState);
        if (!sortedTimestamps.isEmpty() || !this.partialMatches.isEmpty()) {
            this.saveRegisterWatermarkTimer();
        }
    }

    public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
        PriorityQueue<Long> sortedTimestamps = this.getSortedTimestamps();
        NFAState nfa = this.getNFAState();
        while (!sortedTimestamps.isEmpty()) {
            long timestamp = sortedTimestamps.poll();
            this.advanceTime(nfa, timestamp);
            try (Stream<IN> elements = this.sort((Collection)this.elementQueueState.get((Object)timestamp));){
                elements.forEachOrdered(event -> {
                    try {
                        this.processEvent(nfa, event, timestamp);
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
            }
            this.elementQueueState.remove((Object)timestamp);
        }
        this.updateNFA(nfa);
    }

    private Stream<IN> sort(Collection<IN> elements) {
        Stream<IN> stream = elements.stream();
        return this.comparator == null ? stream : stream.sorted(this.comparator);
    }

    private NFAState getNFAState() throws IOException {
        NFAState nfaState = (NFAState)this.computationStates.value();
        return nfaState != null ? nfaState : this.nfa.createInitialNFAState();
    }

    private void updateNFA(NFAState nfaState) throws IOException {
        if (nfaState.isStateChanged()) {
            nfaState.resetStateChanged();
            this.computationStates.update((Object)nfaState);
        }
    }

    private PriorityQueue<Long> getSortedTimestamps() throws Exception {
        PriorityQueue<Long> sortedTimestamps = new PriorityQueue<Long>();
        for (Long timestamp : this.elementQueueState.keys()) {
            sortedTimestamps.offer(timestamp);
        }
        return sortedTimestamps;
    }

    private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {
        try (SharedBufferAccessor<IN> sharedBufferAccessor = this.partialMatches.getAccessor();){
            Collection<Map<String, List<IN>>> patterns = this.nfa.process(sharedBufferAccessor, nfaState, event, timestamp, this.afterMatchSkipStrategy, this.cepTimerService);
            this.processMatchedSequences(patterns, timestamp);
        }
    }

    private void advanceTime(NFAState nfaState, long timestamp) throws Exception {
        try (SharedBufferAccessor<IN> sharedBufferAccessor = this.partialMatches.getAccessor();){
            Collection<Tuple2<Map<String, List<IN>>, Long>> timedOut = this.nfa.advanceTime(sharedBufferAccessor, nfaState, timestamp);
            if (!timedOut.isEmpty()) {
                this.processTimedOutSequences(timedOut);
            }
        }
    }

    private void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {
        PatternProcessFunction function = (PatternProcessFunction)this.getUserFunction();
        this.setTimestamp(timestamp);
        for (Map<String, List<IN>> matchingSequence : matchingSequences) {
            function.processMatch(matchingSequence, this.context, this.collector);
        }
    }

    private void processTimedOutSequences(Collection<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences) throws Exception {
        PatternProcessFunction function = (PatternProcessFunction)this.getUserFunction();
        if (function instanceof TimedOutPartialMatchHandler) {
            TimedOutPartialMatchHandler timeoutHandler = (TimedOutPartialMatchHandler)((Object)function);
            for (Tuple2<Map<String, List<IN>>, Long> matchingSequence : timedOutSequences) {
                this.setTimestamp((Long)matchingSequence.f1);
                timeoutHandler.processTimedOutMatch((Map)matchingSequence.f0, this.context);
            }
        }
    }

    private void setTimestamp(long timestamp) {
        if (!this.isProcessingTime) {
            this.collector.setAbsoluteTimestamp(timestamp);
        }
        this.context.setTimestamp(timestamp);
    }

    @VisibleForTesting
    boolean hasNonEmptySharedBuffer(KEY key) throws Exception {
        this.setCurrentKey(key);
        return !this.partialMatches.isEmpty();
    }

    @VisibleForTesting
    boolean hasNonEmptyPQ(KEY key) throws Exception {
        this.setCurrentKey(key);
        return !this.elementQueueState.isEmpty();
    }

    @VisibleForTesting
    int getPQSize(KEY key) throws Exception {
        this.setCurrentKey(key);
        int counter = 0;
        for (List elements : this.elementQueueState.values()) {
            counter += elements.size();
        }
        return counter;
    }

    @VisibleForTesting
    long getLateRecordsNumber() {
        return this.numLateRecordsDropped.getCount();
    }

    private class ContextFunctionImpl
    implements PatternProcessFunction.Context {
        private Long timestamp;

        private ContextFunctionImpl() {
        }

        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            StreamRecord record = CepOperator.this.isProcessingTime ? new StreamRecord(value) : new StreamRecord(value, this.timestamp());
            CepOperator.this.output.collect(outputTag, record);
        }

        void setTimestamp(long timestamp) {
            this.timestamp = timestamp;
        }

        @Override
        public long timestamp() {
            return this.timestamp;
        }

        @Override
        public long currentProcessingTime() {
            return CepOperator.this.timerService.currentProcessingTime();
        }
    }

    private class TimerServiceImpl
    implements TimerService {
        private TimerServiceImpl() {
        }

        @Override
        public long currentProcessingTime() {
            return CepOperator.this.timerService.currentProcessingTime();
        }
    }
}

