/*
 * Decompiled with CFR 0.152.
 */
package io.druid.server.coordinator.helper;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.common.config.JacksonConfigManager;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.guava.FunctionalIterable;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.emitter.service.ServiceMetricEvent;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DatasourceWhitelist;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.helper.DruidCoordinatorHelper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.PartitionChunk;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.ReadableInstant;

public class DruidCoordinatorSegmentMerger
implements DruidCoordinatorHelper {
    private static final Logger log = new Logger(DruidCoordinatorSegmentMerger.class);
    private final IndexingServiceClient indexingServiceClient;
    private final AtomicReference<DatasourceWhitelist> whiteListRef;

    @Inject
    public DruidCoordinatorSegmentMerger(IndexingServiceClient indexingServiceClient, JacksonConfigManager configManager) {
        this.indexingServiceClient = indexingServiceClient;
        this.whiteListRef = configManager.watch("coordinator.whitelist", DatasourceWhitelist.class);
    }

    @Override
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) {
        VersionedIntervalTimeline timeline;
        DatasourceWhitelist whitelist = this.whiteListRef.get();
        CoordinatorStats stats = new CoordinatorStats();
        HashMap dataSources = Maps.newHashMap();
        for (DataSegment dataSegment : params.getAvailableSegments()) {
            if (whitelist != null && !whitelist.contains(dataSegment.getDataSource())) continue;
            timeline = (VersionedIntervalTimeline)dataSources.get(dataSegment.getDataSource());
            if (timeline == null) {
                timeline = new VersionedIntervalTimeline((Comparator)Ordering.natural());
                dataSources.put(dataSegment.getDataSource(), timeline);
            }
            timeline.add(dataSegment.getInterval(), (Object)dataSegment.getVersion(), dataSegment.getShardSpec().createChunk((Object)dataSegment));
        }
        for (Map.Entry entry : dataSources.entrySet()) {
            timeline = (VersionedIntervalTimeline)entry.getValue();
            List timelineObjects = timeline.lookup(new Interval((ReadableInstant)DateTimes.EPOCH, (ReadableInstant)DateTimes.of((String)"3000-01-01")));
            SegmentsToMerge segmentsToMerge = new SegmentsToMerge();
            for (int i = 0; i < timelineObjects.size(); ++i) {
                if (segmentsToMerge.add((TimelineObjectHolder<String, DataSegment>)((TimelineObjectHolder)timelineObjects.get(i))) && segmentsToMerge.getByteCount() <= params.getCoordinatorDynamicConfig().getMergeBytesLimit() && segmentsToMerge.getSegmentCount() < params.getCoordinatorDynamicConfig().getMergeSegmentsLimit()) continue;
                i -= segmentsToMerge.backtrack(params.getCoordinatorDynamicConfig().getMergeBytesLimit());
                if (segmentsToMerge.getSegmentCount() > 1) {
                    stats.addToGlobalStat("mergedCount", this.mergeSegments(segmentsToMerge, (String)entry.getKey()));
                }
                if (segmentsToMerge.getSegmentCount() == 0) {
                    ++i;
                }
                segmentsToMerge = new SegmentsToMerge();
            }
            segmentsToMerge.backtrack(params.getCoordinatorDynamicConfig().getMergeBytesLimit());
            if (segmentsToMerge.getSegmentCount() <= 1) continue;
            stats.addToGlobalStat("mergedCount", this.mergeSegments(segmentsToMerge, (String)entry.getKey()));
        }
        log.info("Issued merge requests for %s segments", new Object[]{stats.getGlobalStat("mergedCount")});
        params.getEmitter().emit(new ServiceMetricEvent.Builder().build("coordinator/merge/count", (Number)stats.getGlobalStat("mergedCount")));
        return params.buildFromExisting().withCoordinatorStats(stats).build();
    }

    private int mergeSegments(SegmentsToMerge segmentsToMerge, String dataSource) {
        List<DataSegment> segments = segmentsToMerge.getSegments();
        List segmentNames = Lists.transform(segments, (Function)new Function<DataSegment, String>(){

            public String apply(DataSegment input) {
                return input.getIdentifier();
            }
        });
        log.info("[%s] Found %d segments to merge %s", new Object[]{dataSource, segments.size(), segmentNames});
        try {
            this.indexingServiceClient.mergeSegments(segments);
        }
        catch (Exception e) {
            log.error((Throwable)e, "[%s] Merging error for segments [%s]", new Object[]{dataSource, segmentNames});
        }
        return segments.size();
    }

    private static class SegmentsToMerge {
        private final Multiset<DataSegment> segments;
        private final List<Pair<TimelineObjectHolder<String, DataSegment>, Interval>> timelineObjects = Lists.newArrayList();
        private long byteCount = 0L;

        private SegmentsToMerge() {
            this.segments = HashMultiset.create();
        }

        public List<DataSegment> getSegments() {
            return ImmutableSet.copyOf((Iterable)FunctionalIterable.create(this.timelineObjects).transformCat((Function)new Function<Pair<TimelineObjectHolder<String, DataSegment>, Interval>, Iterable<DataSegment>>(){

                public Iterable<DataSegment> apply(Pair<TimelineObjectHolder<String, DataSegment>, Interval> input) {
                    return Iterables.transform((Iterable)((TimelineObjectHolder)input.lhs).getObject(), (Function)new Function<PartitionChunk<DataSegment>, DataSegment>(){

                        public DataSegment apply(PartitionChunk<DataSegment> input) {
                            return (DataSegment)input.getObject();
                        }
                    });
                }
            })).asList();
        }

        public boolean add(TimelineObjectHolder<String, DataSegment> timelineObject) {
            PartitionChunk firstChunk;
            Interval timelineObjectInterval = timelineObject.getInterval();
            if (this.timelineObjects.size() > 0) {
                Preconditions.checkArgument((timelineObjectInterval.getStart().getMillis() >= ((TimelineObjectHolder)this.timelineObjects.get((int)(this.timelineObjects.size() - 1)).lhs).getInterval().getEnd().getMillis() ? 1 : 0) != 0, (Object)"timeline objects must be provided in order");
            }
            if ((firstChunk = (PartitionChunk)Iterables.getFirst((Iterable)timelineObject.getObject(), null)) == null) {
                throw new ISE("Unable to find an underlying interval", new Object[0]);
            }
            Interval underlyingInterval = ((DataSegment)firstChunk.getObject()).getInterval();
            for (PartitionChunk segment : timelineObject.getObject()) {
                if (!(((DataSegment)segment.getObject()).getShardSpec() instanceof NoneShardSpec)) {
                    return false;
                }
                this.segments.add(segment.getObject());
                if (this.segments.count(segment.getObject()) != 1) continue;
                this.byteCount += ((DataSegment)segment.getObject()).getSize();
            }
            Interval mergedUnderlyingInterval = this.getMergedUnderlyingInterval();
            if (mergedUnderlyingInterval == null) {
                this.timelineObjects.add((Pair<TimelineObjectHolder<String, DataSegment>, Interval>)Pair.of(timelineObject, (Object)underlyingInterval));
            } else {
                DateTime start = underlyingInterval.getStart().isBefore((ReadableInstant)mergedUnderlyingInterval.getStart()) ? underlyingInterval.getStart() : mergedUnderlyingInterval.getStart();
                DateTime end = underlyingInterval.getEnd().isAfter((ReadableInstant)mergedUnderlyingInterval.getEnd()) ? underlyingInterval.getEnd() : mergedUnderlyingInterval.getEnd();
                this.timelineObjects.add((Pair<TimelineObjectHolder<String, DataSegment>, Interval>)Pair.of(timelineObject, (Object)new Interval((ReadableInstant)start, (ReadableInstant)end)));
            }
            return true;
        }

        public Interval getMergedTimelineInterval() {
            if (this.timelineObjects.isEmpty()) {
                return null;
            }
            return new Interval((ReadableInstant)((TimelineObjectHolder)this.timelineObjects.get((int)0).lhs).getInterval().getStart(), (ReadableInstant)((TimelineObjectHolder)this.timelineObjects.get((int)(this.timelineObjects.size() - 1)).lhs).getInterval().getEnd());
        }

        public Interval getMergedUnderlyingInterval() {
            if (this.timelineObjects.isEmpty()) {
                return null;
            }
            return (Interval)this.timelineObjects.get((int)(this.timelineObjects.size() - 1)).rhs;
        }

        public long getByteCount() {
            return this.byteCount;
        }

        public int getSegmentCount() {
            return this.timelineObjects.size();
        }

        public boolean isComplete() {
            return this.timelineObjects.size() == 0 || this.getMergedTimelineInterval().equals((Object)this.getMergedUnderlyingInterval());
        }

        public int backtrack(long maxSize) {
            Preconditions.checkArgument((maxSize >= 0L ? 1 : 0) != 0, (Object)"maxSize >= 0");
            int removed = 0;
            while (!this.isComplete() || this.byteCount > maxSize) {
                ++removed;
                TimelineObjectHolder removedHolder = (TimelineObjectHolder)this.timelineObjects.remove((int)(this.timelineObjects.size() - 1)).lhs;
                for (PartitionChunk segment : removedHolder.getObject()) {
                    this.segments.remove(segment.getObject());
                    if (this.segments.count(segment.getObject()) != 0) continue;
                    this.byteCount -= ((DataSegment)segment.getObject()).getSize();
                }
            }
            return removed;
        }
    }
}

