/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.segment.realtime.plumber;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierConfig;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.joda.time.ReadableInterval;

/**
 * {@link SegmentHandoffNotifier} that periodically polls the coordinator to find out whether
 * registered segments have been handed off, and runs the registered callback for each segment
 * once its handoff is reported complete.
 *
 * <p>Callbacks are kept in a concurrent map keyed by {@link SegmentDescriptor}; polling happens on
 * a single-threaded scheduled executor created by {@link #start()} and shut down by
 * {@link #close()}.
 */
public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNotifier {
    private static final Logger log = new Logger(CoordinatorBasedSegmentHandoffNotifier.class);

    /** Pending callbacks: descriptor -> (executor to run on, runnable to run). */
    private final ConcurrentMap<SegmentDescriptor, Pair<Executor, Runnable>> handOffCallbacks = new ConcurrentHashMap<>();
    private final CoordinatorClient coordinatorClient;
    /** Created in {@link #start()}; remains {@code null} until then. */
    private volatile ScheduledExecutorService scheduledExecutor;
    private final long pollDurationMillis;
    private final String dataSource;

    /**
     * @param dataSource        name of the data source whose segments are tracked
     * @param coordinatorClient client used to query the coordinator for handoff state
     * @param config            supplies the poll duration used as the polling period
     */
    public CoordinatorBasedSegmentHandoffNotifier(String dataSource, CoordinatorClient coordinatorClient, CoordinatorBasedSegmentHandoffNotifierConfig config) {
        this.dataSource = dataSource;
        this.coordinatorClient = coordinatorClient;
        this.pollDurationMillis = config.getPollDuration().getMillis();
    }

    /**
     * Registers a callback to run once the given segment has been handed off.
     *
     * <p>If a callback is already registered for {@code descriptor}, the existing one is kept and
     * the new one is ignored.
     *
     * @param descriptor      segment to watch
     * @param exec            executor on which {@code handOffRunnable} is executed
     * @param handOffRunnable action to run when handoff is complete
     * @return {@code true} if the callback was registered, {@code false} if one was already
     *         present for this descriptor
     */
    @Override
    public boolean registerSegmentHandoffCallback(SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable) {
        log.info("Adding SegmentHandoffCallback for dataSource[%s] Segment[%s]", this.dataSource, descriptor);
        Pair<Executor, Runnable> previous = this.handOffCallbacks.putIfAbsent(descriptor, new Pair<>(exec, handOffRunnable));
        return previous == null;
    }

    /**
     * Creates the polling executor and schedules {@link #checkForSegmentHandoffs()} at a fixed
     * rate, starting immediately and repeating every poll duration.
     */
    @Override
    public void start() {
        this.scheduledExecutor = Execs.scheduledSingleThreaded("coordinator_handoff_scheduled_%d");
        this.scheduledExecutor.scheduleAtFixedRate(this::checkForSegmentHandoffs, 0L, this.pollDurationMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Performs one polling pass over all pending callbacks.
     *
     * <p>For each registered descriptor the coordinator is asked whether handoff is complete. A
     * {@code null} answer triggers a fallback that fetches the server view and evaluates it with
     * {@link #isHandOffComplete(List, SegmentDescriptor)}. When handoff is complete the callback is
     * submitted to its executor and removed from the pending map. A failure for one descriptor is
     * logged and leaves that callback registered so it is retried on the next pass.
     */
    void checkForSegmentHandoffs() {
        try {
            Iterator<Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>>> itr = this.handOffCallbacks.entrySet().iterator();
            while (itr.hasNext()) {
                Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry = itr.next();
                SegmentDescriptor descriptor = entry.getKey();
                try {
                    Boolean handOffComplete = this.coordinatorClient.isHandOffComplete(this.dataSource, descriptor);
                    if (handOffComplete == null) {
                        log.warn("Failed to call the new coordinator API for checking segment handoff. Falling back to the old API");
                        List<ImmutableSegmentLoadInfo> loadedSegments = this.coordinatorClient.fetchServerView(this.dataSource, descriptor.getInterval(), true);
                        handOffComplete = isHandOffComplete(loadedSegments, descriptor);
                    }
                    if (handOffComplete) {
                        log.info("Segment Handoff complete for dataSource[%s] Segment[%s]", this.dataSource, descriptor);
                        Pair<Executor, Runnable> callback = entry.getValue();
                        callback.lhs.execute(callback.rhs);
                        // Remove only after the callback was accepted by its executor, so a
                        // failed submission is retried on the next pass.
                        itr.remove();
                    }
                }
                catch (Exception e) {
                    log.error(e, "Exception while checking handoff for dataSource[%s] Segment[%s], Will try again after [%d]ms", this.dataSource, descriptor, this.pollDurationMillis);
                }
            }
            if (!this.handOffCallbacks.isEmpty()) {
                log.info("Still waiting for Handoff for Segments : [%s]", this.handOffCallbacks.keySet());
            }
        }
        catch (Throwable t) {
            // Nothing may escape: an exception thrown from a task scheduled with
            // scheduleAtFixedRate suppresses all subsequent executions.
            log.error(t, "Exception while checking handoff for dataSource[%s], Will try again after [%d]ms", this.dataSource, this.pollDurationMillis);
        }
    }

    /**
     * Decides from a server view whether the segment described by {@code descriptor} has been
     * handed off.
     *
     * <p>Handoff is considered complete when some entry of {@code serverView} satisfies all of:
     * its segment interval contains the descriptor's interval, its partition number equals the
     * descriptor's, its version string compares greater than or equal to the descriptor's, and at
     * least one of its servers returns {@code true} from
     * {@link DruidServerMetadata#segmentReplicatable()}.
     *
     * @param serverView load information for segments, as returned by the coordinator
     * @param descriptor segment to look for
     * @return {@code true} if a matching entry exists, {@code false} otherwise
     */
    static boolean isHandOffComplete(List<ImmutableSegmentLoadInfo> serverView, SegmentDescriptor descriptor) {
        for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) {
            if (segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval())
                && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum() == descriptor.getPartitionNumber()
                && segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0
                && segmentLoadInfo.getServers().stream().anyMatch(DruidServerMetadata::segmentReplicatable)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Shuts down the polling executor. Does nothing if {@link #start()} was never called.
     */
    @Override
    public void close() {
        // Read the volatile field once so the null check and the call see the same reference.
        ScheduledExecutorService executor = this.scheduledExecutor;
        if (executor != null) {
            executor.shutdown();
        }
    }

    /**
     * Returns the live map of pending callbacks (not a copy).
     */
    Map<SegmentDescriptor, Pair<Executor, Runnable>> getHandOffCallbacks() {
        return this.handOffCallbacks;
    }
}

