/*
 * Decompiled with CFR 0.152.
 */
package io.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.SegmentDescriptor;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant;
import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.AppenderatorConfig;
import io.druid.segment.realtime.appenderator.Committed;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.segment.realtime.appenderator.SegmentNotWritableException;
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import io.druid.segment.realtime.appenderator.SinkQuerySegmentWalker;
import io.druid.segment.realtime.plumber.Sink;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import io.druid.timeline.VersionedIntervalTimeline;
import java.io.Closeable;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.joda.time.Interval;
import org.joda.time.ReadablePeriod;

public class AppenderatorImpl
implements Appenderator {
    private static final EmittingLogger log = new EmittingLogger(AppenderatorImpl.class);
    /** Delay threshold in milliseconds (presumably for the persist back-pressure warning; confirm against persist()). */
    private static final int WARN_DELAY = 1000;
    /** Name of the per-sink file holding the serialized {@link SegmentIdentifier}. */
    private static final String IDENTIFIER_FILE_NAME = "identifier.json";
    private final DataSchema schema;
    private final AppenderatorConfig tuningConfig;
    private final FireDepartmentMetrics metrics;
    private final DataSegmentPusher dataSegmentPusher;
    private final ObjectMapper objectMapper;
    private final DataSegmentAnnouncer segmentAnnouncer;
    private final IndexIO indexIO;
    private final IndexMerger indexMerger;
    /** Not null-checked by the constructor unless a query conglomerate is supplied. */
    private final Cache cache;
    /** Sinks currently managed by this appenderator, keyed by segment identifier. */
    private final Map<SegmentIdentifier, Sink> sinks = new ConcurrentHashMap<>();
    /** Identifiers that abandonSegment() has started dropping; push() skips these. */
    private final Set<SegmentIdentifier> droppingSinks = Sets.newConcurrentHashSet();
    /** Timeline of the managed sinks; handed to the query walker when one is created. */
    private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<>(String.CASE_INSENSITIVE_ORDER);
    /** Query walker over the sinks; null when no conglomerate was supplied to the constructor. */
    private final QuerySegmentWalker texasRanger;
    private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
    private final AtomicInteger totalRows = new AtomicInteger();
    /** Held around reads and writes of the commit file. */
    private final Lock commitLock = new ReentrantLock();
    // Executors are created by initializeExecutors() and reset to null by close()/closeNow().
    private volatile ListeningExecutorService persistExecutor = null;
    private volatile ListeningExecutorService pushExecutor = null;
    private volatile ListeningExecutorService intermediateTempExecutor = null;
    /** Epoch millis; add() requests a persist once the wall clock passes this value. */
    private volatile long nextFlush;
    private volatile FileLock basePersistDirLock = null;
    private volatile FileChannel basePersistDirLockChannel = null;
    /** Flipped once by close()/closeNow() so that later calls return immediately. */
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Creates an appenderator for the data source described by {@code schema}.
     *
     * <p>{@code schema}, {@code tuningConfig}, {@code metrics}, {@code dataSegmentPusher},
     * {@code objectMapper}, {@code segmentAnnouncer}, {@code indexIO} and {@code indexMerger} must be
     * non-null. {@code cache} is only required to be non-null when {@code conglomerate} is non-null.
     *
     * @param conglomerate when null, no query walker is created and the query-runner methods throw
     * @throws NullPointerException if a required argument is null
     */
    public AppenderatorImpl(DataSchema schema, AppenderatorConfig tuningConfig, FireDepartmentMetrics metrics, DataSegmentPusher dataSegmentPusher, ObjectMapper objectMapper, QueryRunnerFactoryConglomerate conglomerate, DataSegmentAnnouncer segmentAnnouncer, ServiceEmitter emitter, ExecutorService queryExecutorService, IndexIO indexIO, IndexMerger indexMerger, Cache cache, CacheConfig cacheConfig) {
        this.schema = Preconditions.checkNotNull(schema, "schema");
        this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
        this.metrics = Preconditions.checkNotNull(metrics, "metrics");
        this.dataSegmentPusher = Preconditions.checkNotNull(dataSegmentPusher, "dataSegmentPusher");
        this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
        this.segmentAnnouncer = Preconditions.checkNotNull(segmentAnnouncer, "segmentAnnouncer");
        this.indexIO = Preconditions.checkNotNull(indexIO, "indexIO");
        this.indexMerger = Preconditions.checkNotNull(indexMerger, "indexMerger");
        this.cache = cache;
        if (conglomerate == null) {
            // Without a conglomerate this instance cannot serve queries.
            this.texasRanger = null;
        } else {
            this.texasRanger = new SinkQuerySegmentWalker(schema.getDataSource(), this.sinkTimeline, objectMapper, emitter, conglomerate, queryExecutorService, Preconditions.checkNotNull(cache, "cache"), cacheConfig);
        }
        log.info("Created Appenderator for dataSource[%s].", schema.getDataSource());
    }

    /** Returns the data source name taken from this appenderator's schema. */
    @Override
    public String getDataSource() {
        final String dataSource = schema.getDataSource();
        return dataSource;
    }

    /**
     * Prepares this appenderator for use: creates and locks the base persist directory, reloads
     * sinks from disk, starts the executors and schedules the next flush.
     *
     * @return the value returned by {@code bootstrapSinksFromDisk()}
     */
    @Override
    public Object startJob() {
        final File basePersistDir = tuningConfig.getBasePersistDirectory();
        basePersistDir.mkdirs();
        lockBasePersistDirectory();
        final Object restoredMetadata = bootstrapSinksFromDisk();
        initializeExecutors();
        resetNextFlush();
        return restoredMetadata;
    }

    /**
     * Adds {@code row} to the sink for {@code identifier}, creating the sink if necessary.
     *
     * <p>After the row is added, a persist is due when the sink cannot take more rows, the flush
     * deadline has passed, or the in-memory row count has reached the configured maximum. If
     * {@code allowIncrementalPersists} is true the persist is started here; otherwise the result
     * reports that one is required.
     *
     * @param committerSupplier source of the committer for an incremental persist; may be null
     * @throws IAE if the identifier belongs to a different data source
     * @throws IndexSizeExceededException if the sink rejects the row as over capacity
     * @throws SegmentNotWritableException if the sink reports a negative row count after the add
     */
    @Override
    public Appenderator.AppenderatorAddResult add(SegmentIdentifier identifier, InputRow row, @Nullable Supplier<Committer> committerSupplier, boolean allowIncrementalPersists) throws IndexSizeExceededException, SegmentNotWritableException {
        if (!identifier.getDataSource().equals(schema.getDataSource())) {
            throw new IAE("Expected dataSource[%s] but was asked to insert row for dataSource[%s]?!", schema.getDataSource(), identifier.getDataSource());
        }

        final Sink sink = getOrCreateSink(identifier);
        metrics.reportMessageMaxTimestamp(row.getTimestampFromEpoch());
        final int rowsBefore = sink.getNumRowsInMemory();

        int rowsAfter;
        try {
            rowsAfter = sink.add(row, !allowIncrementalPersists);
        }
        catch (IndexSizeExceededException e) {
            log.error(e, "Sink for segment[%s] was unexpectedly full!", identifier);
            throw e;
        }
        if (rowsAfter < 0) {
            throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", identifier);
        }

        final int addedRows = rowsAfter - rowsBefore;
        rowsCurrentlyInMemory.addAndGet(addedRows);
        totalRows.addAndGet(addedRows);

        boolean isPersistRequired = false;
        final boolean persistDue = !sink.canAppendRow()
            || System.currentTimeMillis() > nextFlush
            || rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory();
        if (persistDue) {
            if (!allowIncrementalPersists) {
                isPersistRequired = true;
            } else {
                Committer committer = null;
                if (committerSupplier != null) {
                    committer = committerSupplier.get();
                }
                persistAll(committer);
            }
        }
        return new Appenderator.AppenderatorAddResult(identifier, sink.getNumRows(), isPersistRequired);
    }

    /** Returns an immutable snapshot of the identifiers of all sinks currently held. */
    @Override
    public List<SegmentIdentifier> getSegments() {
        final Set<SegmentIdentifier> identifiers = sinks.keySet();
        return ImmutableList.copyOf(identifiers);
    }

    /**
     * Returns the row count reported by the sink for {@code identifier}.
     *
     * @throws ISE if no sink exists for the identifier
     */
    @Override
    public int getRowCount(SegmentIdentifier identifier) {
        final Sink found = sinks.get(identifier);
        if (found != null) {
            return found.getNumRows();
        }
        throw new ISE("No such sink: %s", identifier);
    }

    /** Returns the current value of the running total-row counter. */
    @Override
    public int getTotalRowCount() {
        final int total = totalRows.get();
        return total;
    }

    /** Returns the current value of the in-memory row counter; exposed for tests. */
    @VisibleForTesting
    int getRowsInMemory() {
        final int inMemory = rowsCurrentlyInMemory.get();
        return inMemory;
    }

    /**
     * Returns the sink registered for {@code identifier}, creating, announcing and registering a
     * new one when none exists. A failed announcement raises an alert but does not stop the sink
     * from being registered.
     */
    private Sink getOrCreateSink(SegmentIdentifier identifier) {
        final Sink existing = sinks.get(identifier);
        if (existing != null) {
            return existing;
        }

        final Sink created = new Sink(identifier.getInterval(), schema, identifier.getShardSpec(), identifier.getVersion(), tuningConfig.getMaxRowsInMemory(), tuningConfig.isReportParseExceptions());
        try {
            segmentAnnouncer.announceSegment(created.getSegment());
        }
        catch (IOException e) {
            log.makeAlert(e, "Failed to announce new segment[%s]", schema.getDataSource())
               .addData("interval", created.getInterval())
               .emit();
        }
        sinks.put(identifier, created);
        metrics.setSinkCount(sinks.size());
        sinkTimeline.add(created.getInterval(), created.getVersion(), identifier.getShardSpec().createChunk(created));
        return created;
    }

    /**
     * Delegates to the query walker to build a runner for {@code query} over {@code intervals}.
     *
     * @throws IllegalStateException if this appenderator was created without a query walker
     */
    public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals) {
        final QuerySegmentWalker walker = texasRanger;
        if (walker != null) {
            return walker.getQueryRunnerForIntervals(query, intervals);
        }
        throw new IllegalStateException("Don't query me, bro.");
    }

    /**
     * Delegates to the query walker to build a runner for {@code query} over the given segment specs.
     *
     * @throws IllegalStateException if this appenderator was created without a query walker
     */
    public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs) {
        final QuerySegmentWalker walker = texasRanger;
        if (walker != null) {
            return walker.getQueryRunnerForSegments(query, specs);
        }
        throw new IllegalStateException("Don't query me, bro.");
    }

    /**
     * Resets the commit file to {@code Committed.nil()} on the persist executor, then abandons
     * every sink (removing its on-disk data) and waits for all of that to finish.
     *
     * <p>Does nothing when the persist executor has not been created.
     *
     * @throws InterruptedException if interrupted while waiting
     * @throws RuntimeException wrapping the {@link ExecutionException} if any step fails
     */
    @Override
    public void clear() throws InterruptedException {
        if (this.persistExecutor == null) {
            return;
        }
        try {
            final ListenableFuture<?> uncommitFuture = this.persistExecutor.submit(new Callable<Object>(){

                @Override
                public Object call() throws Exception {
                    // Acquire the lock before entering try so unlock() only runs when it is held.
                    AppenderatorImpl.this.commitLock.lock();
                    try {
                        AppenderatorImpl.this.objectMapper.writeValue(AppenderatorImpl.this.computeCommitFile(), Committed.nil());
                    }
                    finally {
                        AppenderatorImpl.this.commitLock.unlock();
                    }
                    return null;
                }
            });
            // Wait for the uncommit before dropping any data.
            uncommitFuture.get();

            final List<ListenableFuture<?>> futures = Lists.newArrayList();
            for (Map.Entry<SegmentIdentifier, Sink> entry : this.sinks.entrySet()) {
                futures.add(this.abandonSegment(entry.getKey(), entry.getValue(), true));
            }
            Futures.allAsList(futures).get();
        }
        catch (ExecutionException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Abandons the sink for {@code identifier}, removing its on-disk data.
     *
     * @return the abandon future, or an already-completed future when no such sink exists
     */
    @Override
    public ListenableFuture<?> drop(SegmentIdentifier identifier) {
        final Sink target = sinks.get(identifier);
        if (target == null) {
            return Futures.immediateFuture(null);
        }
        return abandonSegment(identifier, target, true);
    }

    /**
     * Schedules a persist of the un-persisted hydrants of the given sinks on the persist executor
     * and, when a committer is supplied, records the resulting hydrant counts in the commit file.
     *
     * <p>Hydrant collection and sink swapping happen on the calling thread; writing the hydrants,
     * running the committer and updating the commit file happen in the submitted task.
     *
     * @param identifiers sinks to persist; each must currently exist
     * @param committer optional committer; when null no commit metadata is written
     * @return future resolving to the committer's metadata, or null when no committer was given
     * @throws ISE if any identifier has no sink
     */
    @Override
    public ListenableFuture<Object> persist(Collection<SegmentIdentifier> identifiers, final @Nullable Committer committer) {
        final Map<String, Integer> currentHydrants = Maps.newHashMap();
        final List<Pair<FireHydrant, SegmentIdentifier>> indexesToPersist = Lists.newArrayList();
        int numPersistedRows = 0;
        for (SegmentIdentifier identifier : identifiers) {
            final Sink sink = this.sinks.get(identifier);
            if (sink == null) {
                throw new ISE("No sink for identifier: %s", identifier);
            }
            final List<FireHydrant> hydrants = Lists.newArrayList(sink);
            currentHydrants.put(identifier.getIdentifierAsString(), hydrants.size());
            numPersistedRows += sink.getNumRowsInMemory();

            // For a writable sink the last hydrant is left out of this scan; it is only picked up
            // through swap() below.
            final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size();
            for (FireHydrant hydrant : hydrants.subList(0, limit)) {
                if (!hydrant.hasSwapped()) {
                    log.info("Hydrant[%s] hasn't persisted yet, persisting. Segment[%s]", hydrant, identifier);
                    indexesToPersist.add(Pair.of(hydrant, identifier));
                }
            }
            if (sink.swappable()) {
                indexesToPersist.add(Pair.of(sink.swap(), identifier));
            }
        }

        log.info("Submitting persist runnable for dataSource[%s]", this.schema.getDataSource());
        final String threadName = StringUtils.format("%s-incremental-persist", this.schema.getDataSource());
        final Object commitMetadata = committer == null ? null : committer.getMetadata();
        final Stopwatch runExecStopwatch = Stopwatch.createStarted();
        final Stopwatch persistStopwatch = Stopwatch.createStarted();
        final ListenableFuture<Object> future = this.persistExecutor.submit(new ThreadRenamingCallable<Object>(threadName){

            @Override
            public Object doCall() {
                try {
                    for (Pair<FireHydrant, SegmentIdentifier> pair : indexesToPersist) {
                        AppenderatorImpl.this.metrics.incrementRowOutputCount(AppenderatorImpl.this.persistHydrant(pair.lhs, pair.rhs));
                    }
                    if (committer != null) {
                        log.info(
                            "Committing metadata[%s] for sinks[%s].",
                            commitMetadata,
                            Joiner.on(", ").join(
                                currentHydrants.entrySet()
                                               .stream()
                                               .map(entry -> StringUtils.format("%s:%d", entry.getKey(), entry.getValue()))
                                               .collect(Collectors.toList())
                            )
                        );
                        committer.run();

                        // Acquire the lock before entering try so unlock() only runs when it is held.
                        AppenderatorImpl.this.commitLock.lock();
                        try {
                            // Merge the new hydrant counts over whatever was committed before.
                            final Map<String, Integer> commitHydrants = Maps.newHashMap();
                            final Committed oldCommit = AppenderatorImpl.this.readCommit();
                            if (oldCommit != null) {
                                commitHydrants.putAll(oldCommit.getHydrants());
                            }
                            commitHydrants.putAll(currentHydrants);
                            AppenderatorImpl.this.writeCommit(new Committed(commitHydrants, commitMetadata));
                        }
                        finally {
                            AppenderatorImpl.this.commitLock.unlock();
                        }
                    }
                    return commitMetadata;
                }
                catch (Exception e) {
                    AppenderatorImpl.this.metrics.incrementFailedPersists();
                    throw Throwables.propagate(e);
                }
                finally {
                    AppenderatorImpl.this.metrics.incrementNumPersists();
                    AppenderatorImpl.this.metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
                    persistStopwatch.stop();
                }
            }
        });

        // Time spent inside submit() is reported as persist back-pressure.
        final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
        this.metrics.incrementPersistBackPressureMillis(startDelay);
        if (startDelay > WARN_DELAY) {
            log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", startDelay);
        }
        runExecStopwatch.stop();
        this.resetNextFlush();
        this.rowsCurrentlyInMemory.addAndGet(-numPersistedRows);
        return future;
    }

    /** Persists every sink currently held; equivalent to {@code persist} over all identifiers. */
    @Override
    public ListenableFuture<Object> persistAll(@Nullable Committer committer) {
        final Collection<SegmentIdentifier> everySink = sinks.keySet();
        return persist(everySink, committer);
    }

    /**
     * Marks the given sinks as finished, persists them, then merges and pushes each one on the
     * push executor.
     *
     * <p>Sinks that are being dropped when the push runs are skipped, as are sinks for which
     * {@code mergeAndPush} returns null.
     *
     * @param identifiers sinks to push; each must currently exist
     * @param committer optional committer forwarded to {@code persist}
     * @return future of the pushed segments together with the commit metadata from the persist
     * @throws ISE if any identifier has no sink
     */
    @Override
    public ListenableFuture<SegmentsAndMetadata> push(Collection<SegmentIdentifier> identifiers, @Nullable Committer committer) {
        final Map<SegmentIdentifier, Sink> theSinks = Maps.newHashMap();
        for (SegmentIdentifier identifier : identifiers) {
            final Sink sink = this.sinks.get(identifier);
            if (sink == null) {
                throw new ISE("No sink for identifier: %s", identifier);
            }
            theSinks.put(identifier, sink);
            sink.finishWriting();
        }

        // The explicit Function cast keeps the lambda from matching any other transform overload.
        return Futures.transform(this.persist(identifiers, committer), (Function<Object, SegmentsAndMetadata>) commitMetadata -> {
            final List<DataSegment> dataSegments = Lists.newArrayList();
            for (Map.Entry<SegmentIdentifier, Sink> entry : theSinks.entrySet()) {
                if (this.droppingSinks.contains(entry.getKey())) {
                    log.info("Skipping push of currently-dropping sink[%s]", entry.getKey());
                    continue;
                }
                final DataSegment dataSegment = this.mergeAndPush(entry.getKey(), entry.getValue());
                if (dataSegment != null) {
                    dataSegments.add(dataSegment);
                } else {
                    log.warn("mergeAndPush[%s] returned null, skipping.", entry.getKey());
                }
            }
            return new SegmentsAndMetadata(dataSegments, commitMetadata);
        }, this.pushExecutor);
    }

    /**
     * Submits a task to {@code intermediateTempExecutor} whose only work is to submit an empty
     * task to {@code pushExecutor}, and returns the future of the outer submission.
     *
     * <p>NOTE(review): presumably used as a barrier that completes once the push executor has
     * accepted the no-op task; confirm against the callers, which are outside this view.
     */
    private ListenableFuture<?> pushBarrier() {
        return this.intermediateTempExecutor.submit(() -> this.pushExecutor.submit(() -> {}));
    }

    /**
     * Merges all hydrants of {@code sink} into a single index and pushes it with the segment pusher.
     *
     * <p>If a descriptor file already exists for the segment, the segment is treated as already
     * pushed and the stored descriptor is returned. Otherwise the merged index is written under
     * the sink's persist directory, pushed with retries, and the resulting descriptor is saved.
     *
     * @return the pushed segment, or null when {@code sink} is no longer the registered sink for
     *         {@code identifier}
     * @throws ISE if the sink is still writable or has a hydrant that has not been swapped
     * @throws RuntimeException wrapping any failure during merge or push
     */
    private DataSegment mergeAndPush(SegmentIdentifier identifier, Sink sink) {
        // Bail out if the registered sink changed since the caller looked it up.
        if (this.sinks.get(identifier) != sink) {
            log.warn("Sink for segment[%s] no longer valid, bailing out of mergeAndPush.", identifier);
            return null;
        }
        final File persistDir = this.computePersistDir(identifier);
        final File mergedTarget = new File(persistDir, "merged");
        final File descriptorFile = this.computeDescriptorFile(identifier);

        // Sanity checks: the sink must be closed for writes and every hydrant persisted.
        for (FireHydrant hydrant : sink) {
            if (sink.isWritable()) {
                throw new ISE("WTF?! Expected sink to be no longer writable before mergeAndPush. Segment[%s].", identifier);
            }
            synchronized (hydrant) {
                if (!hydrant.hasSwapped()) {
                    throw new ISE("WTF?! Expected sink to be fully persisted before mergeAndPush. Segment[%s].", identifier);
                }
            }
        }

        try {
            if (descriptorFile.exists()) {
                log.info("Segment[%s] already pushed.", identifier);
                return this.objectMapper.readValue(descriptorFile, DataSegment.class);
            }
            log.info("Pushing merged index for segment[%s].", identifier);
            this.removeDirectory(mergedTarget);
            if (mergedTarget.exists()) {
                throw new ISE("Merged target[%s] exists after removing?!", mergedTarget);
            }

            final File mergedFile;
            final List<QueryableIndex> indexes = Lists.newArrayList();
            try (Closer closer = Closer.create()) {
                for (FireHydrant fireHydrant : sink) {
                    final Pair<Segment, Closeable> segmentAndCloseable = fireHydrant.getAndIncrementSegment();
                    // Register the closeable first so it is closed even if a later call throws.
                    closer.register(segmentAndCloseable.rhs);
                    final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex();
                    log.info("Adding hydrant[%s]", fireHydrant);
                    indexes.add(queryableIndex);
                }
                mergedFile = this.indexMerger.mergeQueryableIndex(indexes, this.schema.getGranularitySpec().isRollup(), this.schema.getAggregators(), mergedTarget, this.tuningConfig.getIndexSpec(), this.tuningConfig.getSegmentWriteOutMediumFactory());
            }

            final DataSegment segment = RetryUtils.retry(
                () -> this.dataSegmentPusher.push(mergedFile, sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)), true),
                exception -> exception instanceof Exception,
                5
            );
            this.objectMapper.writeValue(descriptorFile, segment);
            log.info("Pushed merged index for segment[%s], descriptor is: %s", identifier, segment);
            return segment;
        }
        catch (Exception e) {
            this.metrics.incrementFailedHandoffs();
            log.warn(e, "Failed to push merged index for segment[%s].", identifier);
            throw Throwables.propagate(e);
        }
    }

    /**
     * Shuts this appenderator down: abandons every sink without removing on-disk data, stops the
     * executors and waits for them to terminate, then releases the base persist directory lock.
     *
     * <p>Only the first call does any work; later calls log and return.
     *
     * @throws ISE if interrupted while waiting for the executors to terminate
     */
    @Override
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            log.info("Appenderator already closed");
            return;
        }
        log.info("Shutting down...");

        final List<ListenableFuture<?>> futures = Lists.newArrayList();
        for (Map.Entry<SegmentIdentifier, Sink> entry : this.sinks.entrySet()) {
            futures.add(this.abandonSegment(entry.getKey(), entry.getValue(), false));
        }
        try {
            Futures.allAsList(futures).get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn(e, "Interrupted during close()");
        }
        catch (ExecutionException e) {
            log.warn(e, "Unable to abandon existing segments during close()");
        }

        try {
            this.shutdownExecutors();
            Preconditions.checkState(this.persistExecutor == null || this.persistExecutor.awaitTermination(365L, TimeUnit.DAYS), "persistExecutor not terminated");
            Preconditions.checkState(this.pushExecutor == null || this.pushExecutor.awaitTermination(365L, TimeUnit.DAYS), "pushExecutor not terminated");
            Preconditions.checkState(this.intermediateTempExecutor == null || this.intermediateTempExecutor.awaitTermination(365L, TimeUnit.DAYS), "intermediateTempExecutor not terminated");
            this.persistExecutor = null;
            this.pushExecutor = null;
            this.intermediateTempExecutor = null;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Keep the interrupt as the cause so the failure can be diagnosed.
            throw new ISE(e, "Failed to shutdown executors during close()");
        }

        // Reached only when the executors terminated without an exception.
        this.unlockBasePersistDirectory();
    }

    /**
     * Shuts this appenderator down without abandoning sinks: unannounces every segment, stops the
     * executors, and waits for the persist and intermediate executors to terminate.
     *
     * <p>Only the first call does any work; later calls log and return. A failed unannouncement
     * raises an alert and does not stop the shutdown.
     *
     * @throws ISE if interrupted while waiting for the executors to terminate
     */
    @Override
    public void closeNow() {
        if (!this.closed.compareAndSet(false, true)) {
            log.info("Appenderator already closed");
            return;
        }
        log.info("Shutting down immediately...");

        for (Map.Entry<SegmentIdentifier, Sink> entry : this.sinks.entrySet()) {
            try {
                this.segmentAnnouncer.unannounceSegment(entry.getValue().getSegment());
            }
            catch (Exception e) {
                log.makeAlert(e, "Failed to unannounce segment[%s]", this.schema.getDataSource())
                   .addData("identifier", entry.getKey().getIdentifierAsString())
                   .emit();
            }
        }

        try {
            this.shutdownExecutors();
            // pushExecutor is told to stop by shutdownExecutors() but its termination is not awaited here.
            Preconditions.checkState(this.persistExecutor == null || this.persistExecutor.awaitTermination(365L, TimeUnit.DAYS), "persistExecutor not terminated");
            Preconditions.checkState(this.intermediateTempExecutor == null || this.intermediateTempExecutor.awaitTermination(365L, TimeUnit.DAYS), "intermediateTempExecutor not terminated");
            this.persistExecutor = null;
            this.intermediateTempExecutor = null;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Keep the interrupt as the cause so the failure can be diagnosed.
            throw new ISE(e, "Failed to shutdown executors during close()");
        }
    }

    /**
     * Takes an exclusive file lock on the lock file of the base persist directory, unless this
     * instance already holds one.
     *
     * <p>If the lock cannot be obtained, the channel opened for the attempt is closed again and
     * the lock fields are left untouched.
     *
     * @throws ISE if another holder already has the lock
     * @throws RuntimeException wrapping an {@link IOException} from opening or locking the file
     */
    private void lockBasePersistDirectory() {
        if (this.basePersistDirLock != null) {
            return;
        }
        final File lockFile = this.computeLockFile();
        FileChannel channel = null;
        boolean acquired = false;
        try {
            channel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            final FileLock lock = channel.tryLock();
            if (lock == null) {
                throw new ISE("Cannot acquire lock on basePersistDir: %s", lockFile);
            }
            this.basePersistDirLockChannel = channel;
            this.basePersistDirLock = lock;
            acquired = true;
        }
        catch (IOException e) {
            throw Throwables.propagate(e);
        }
        finally {
            if (!acquired && channel != null) {
                try {
                    channel.close();
                }
                catch (IOException ignored) {
                    // Best-effort cleanup; the original failure is the one worth reporting.
                }
            }
        }
    }

    /**
     * Releases the base persist directory lock and closes its channel, if a lock is held.
     *
     * <p>The lock and channel fields are cleared before the release is attempted, and the channel
     * is closed even when releasing the lock fails.
     *
     * @throws RuntimeException wrapping an {@link IOException} from the release or the close
     */
    private void unlockBasePersistDirectory() {
        final FileLock lock = this.basePersistDirLock;
        if (lock == null) {
            return;
        }
        final FileChannel channel = this.basePersistDirLockChannel;
        this.basePersistDirLock = null;
        this.basePersistDirLockChannel = null;
        // try-with-resources skips a null resource and closes the channel on every path.
        try (FileChannel channelToClose = channel) {
            lock.release();
        }
        catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Creates whichever of the persist, push and intermediate executors does not exist yet.
     * Each is a blocking single-threaded executor wrapped as a listening executor.
     */
    private void initializeExecutors() {
        final int persistQueueCapacity = tuningConfig.getMaxPendingPersists();

        if (persistExecutor == null) {
            final ExecutorService persistDelegate = Execs.newBlockingSingleThreaded("appenderator_persist_%d", persistQueueCapacity);
            persistExecutor = MoreExecutors.listeningDecorator(persistDelegate);
        }

        if (pushExecutor == null) {
            final ExecutorService pushDelegate = Execs.newBlockingSingleThreaded("appenderator_merge_%d", 1);
            pushExecutor = MoreExecutors.listeningDecorator(pushDelegate);
        }

        if (intermediateTempExecutor == null) {
            final ExecutorService abandonDelegate = Execs.newBlockingSingleThreaded("appenderator_abandon_%d", 0);
            intermediateTempExecutor = MoreExecutors.listeningDecorator(abandonDelegate);
        }
    }

    /** Calls {@code shutdownNow()} on each executor that has been created. */
    private void shutdownExecutors() {
        final List<ListeningExecutorService> executors = Arrays.asList(persistExecutor, pushExecutor, intermediateTempExecutor);
        for (ListeningExecutorService executor : executors) {
            if (executor != null) {
                executor.shutdownNow();
            }
        }
    }

    /** Sets the next flush deadline to the current UTC time plus the intermediate persist period. */
    private void resetNextFlush() {
        final long flushDeadlineMillis = DateTimes.nowUtc()
                                                  .plus((ReadablePeriod) tuningConfig.getIntermediatePersistPeriod())
                                                  .getMillis();
        nextFlush = flushDeadlineMillis;
    }

    /**
     * Rebuilds the in-memory sinks from the base persist directory.
     *
     * <p>Reads the commit file (or uses {@code Committed.nil()} when it is absent), then visits
     * each directory that contains an identifier file. Sinks with no committed hydrants, and
     * hydrant directories numbered at or beyond the committed count, are deleted. The remaining
     * hydrants are loaded in numeric order and the sink is registered and announced.
     *
     * <p>An {@link IOException} while loading one sink raises an alert and moves on to the next
     * directory; a committed sink that ends up unloaded then fails the final check.
     *
     * @return the metadata stored in the commit file, or null when the base directory does not
     *         exist or cannot be listed
     * @throws ISE if the commit file cannot be read, a committed hydrant is missing, or a
     *         committed sink was not loaded
     */
    private Object bootstrapSinksFromDisk() {
        Preconditions.checkState(this.sinks.isEmpty(), "Already bootstrapped?!");
        final File baseDir = this.tuningConfig.getBasePersistDirectory();
        if (!baseDir.exists()) {
            return null;
        }
        final File[] files = baseDir.listFiles();
        if (files == null) {
            return null;
        }

        Committed committed;
        File commitFile = null;
        // Acquire the lock before entering try so unlock() only runs when it is held.
        this.commitLock.lock();
        try {
            commitFile = this.computeCommitFile();
            committed = commitFile.exists() ? this.objectMapper.readValue(commitFile, Committed.class) : Committed.nil();
        }
        catch (Exception e) {
            throw new ISE(e, "Failed to read commitFile: %s", commitFile);
        }
        finally {
            this.commitLock.unlock();
        }

        log.info("Loading sinks from[%s]: %s", baseDir, committed.getHydrants().keySet());
        for (File sinkDir : files) {
            final File identifierFile = new File(sinkDir, IDENTIFIER_FILE_NAME);
            if (!identifierFile.isFile()) {
                // Not a sink directory.
                continue;
            }
            try {
                final SegmentIdentifier identifier = this.objectMapper.readValue(identifierFile, SegmentIdentifier.class);
                final int committedHydrants = committed.getCommittedHydrants(identifier.getIdentifierAsString());
                if (committedHydrants <= 0) {
                    log.info("Removing uncommitted sink at [%s]", sinkDir);
                    FileUtils.deleteDirectory(sinkDir);
                    continue;
                }

                // Hydrant directories are the entries whose names parse as integers.
                final File[] sinkFiles = sinkDir.listFiles((dir, fileName) -> Ints.tryParse(fileName) != null);
                if (sinkFiles == null) {
                    // listFiles returns null on an I/O error; report it through the alert path
                    // below rather than failing with a NullPointerException in the sort.
                    throw new IOException(StringUtils.format("Could not list hydrant directories in sinkDir [%s]", sinkDir));
                }
                Arrays.sort(sinkFiles, (o1, o2) -> Ints.compare(Integer.parseInt(o1.getName()), Integer.parseInt(o2.getName())));

                final List<FireHydrant> hydrants = Lists.newArrayList();
                for (File hydrantDir : sinkFiles) {
                    final int hydrantNumber = Integer.parseInt(hydrantDir.getName());
                    if (hydrantNumber >= committedHydrants) {
                        log.info("Removing uncommitted segment at [%s]", hydrantDir);
                        FileUtils.deleteDirectory(hydrantDir);
                        continue;
                    }
                    log.info("Loading previously persisted segment at [%s]", hydrantDir);
                    // Hydrants must be numbered contiguously from zero.
                    if (hydrantNumber != hydrants.size()) {
                        throw new ISE("Missing hydrant [%,d] in sinkDir [%s].", hydrants.size(), sinkDir);
                    }
                    hydrants.add(new FireHydrant(new QueryableIndexSegment(identifier.getIdentifierAsString(), this.indexIO.loadIndex(hydrantDir)), hydrantNumber));
                }
                if (committedHydrants != hydrants.size()) {
                    throw new ISE("Missing hydrant [%,d] in sinkDir [%s].", hydrants.size(), sinkDir);
                }

                final Sink currSink = new Sink(identifier.getInterval(), this.schema, identifier.getShardSpec(), identifier.getVersion(), this.tuningConfig.getMaxRowsInMemory(), this.tuningConfig.isReportParseExceptions(), hydrants);
                this.sinks.put(identifier, currSink);
                this.sinkTimeline.add(currSink.getInterval(), currSink.getVersion(), identifier.getShardSpec().createChunk(currSink));
                this.segmentAnnouncer.announceSegment(currSink.getSegment());
            }
            catch (IOException e) {
                log.makeAlert(e, "Problem loading sink[%s] from disk.", this.schema.getDataSource())
                   .addData("sinkDir", sinkDir)
                   .emit();
            }
        }

        // Every sink named in the commit file must have been loaded.
        final Set<String> loadedSinks = new HashSet<>();
        for (SegmentIdentifier loaded : this.sinks.keySet()) {
            loadedSinks.add(loaded.getIdentifierAsString());
        }
        final Set<String> missingSinks = Sets.difference(committed.getHydrants().keySet(), loadedSinks);
        if (!missingSinks.isEmpty()) {
            throw new ISE("Missing committed sinks [%s]", Joiner.on(", ").join(missingSinks));
        }
        return committed.getMetadata();
    }

    /**
     * Stops writing to a sink and schedules its removal from this appenderator.
     *
     * <p>Synchronously, the sink is told to finish writing, its identifier is added to
     * {@code droppingSinks}, and the in-memory and total row counters are decremented by the
     * sink's row counts. The rest of the teardown is chained onto the future returned by
     * {@code pushBarrier()} and executed on {@code persistExecutor}: optionally removing the
     * segment from the commit file, unannouncing the segment, dropping it from {@code sinks} and
     * {@code sinkTimeline}, closing per-hydrant cache entries, nulling out each hydrant's segment,
     * and optionally deleting the persist directory.
     *
     * @param identifier       identifier of the segment being abandoned
     * @param sink             sink expected to be registered under {@code identifier}; if a
     *                         different sink is registered when the callback runs, nothing is removed
     * @param removeOnDiskData whether to also remove the segment from the commit metadata and delete
     *                         its persist directory
     * @return future that completes (with {@code null}) once the deferred teardown has run
     */
    private ListenableFuture<?> abandonSegment(final SegmentIdentifier identifier, final Sink sink, final boolean removeOnDiskData) {
        sink.finishWriting();
        this.droppingSinks.add(identifier);
        // Give back this sink's contribution to the row counters right away.
        this.rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory());
        this.totalRows.addAndGet(-sink.getNumRows());
        return Futures.transform(this.pushBarrier(), (Function)new Function<Object, Object>(){

            @Nullable
            public Object apply(@Nullable Object input) {
                // Identity check: only tear down if this exact sink is still the registered one.
                if (AppenderatorImpl.this.sinks.get(identifier) != sink) {
                    log.warn("Sink for segment[%s] no longer valid, not abandoning.", new Object[]{identifier});
                    return null;
                }
                if (removeOnDiskData) {
                    log.info("Removing commit metadata for segment[%s].", new Object[]{identifier});
                    // Acquire the lock BEFORE entering the try block: if lock() were to fail inside
                    // the try, the finally clause would call unlock() on a lock this thread does
                    // not hold and mask the original failure.
                    AppenderatorImpl.this.commitLock.lock();
                    try {
                        Committed oldCommit = AppenderatorImpl.this.readCommit();
                        if (oldCommit != null) {
                            AppenderatorImpl.this.writeCommit(oldCommit.without(identifier.getIdentifierAsString()));
                        }
                    }
                    catch (Exception e) {
                        log.makeAlert((Throwable)e, "Failed to update committed segments[%s]", new Object[]{AppenderatorImpl.this.schema.getDataSource()}).addData("identifier", (Object)identifier.getIdentifierAsString()).emit();
                        throw Throwables.propagate((Throwable)e);
                    }
                    finally {
                        AppenderatorImpl.this.commitLock.unlock();
                    }
                }
                // Unannouncing is best-effort: a failure is alerted on but does not stop the teardown.
                try {
                    AppenderatorImpl.this.segmentAnnouncer.unannounceSegment(sink.getSegment());
                }
                catch (Exception e) {
                    log.makeAlert((Throwable)e, "Failed to unannounce segment[%s]", new Object[]{AppenderatorImpl.this.schema.getDataSource()}).addData("identifier", (Object)identifier.getIdentifierAsString()).emit();
                }
                log.info("Removing sink for segment[%s].", new Object[]{identifier});
                AppenderatorImpl.this.sinks.remove(identifier);
                AppenderatorImpl.this.metrics.setSinkCount(AppenderatorImpl.this.sinks.size());
                AppenderatorImpl.this.droppingSinks.remove(identifier);
                AppenderatorImpl.this.sinkTimeline.remove(sink.getInterval(), (Object)sink.getVersion(), identifier.getShardSpec().createChunk((Object)sink));
                for (FireHydrant hydrant : sink) {
                    if (AppenderatorImpl.this.cache != null) {
                        AppenderatorImpl.this.cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
                    }
                    hydrant.swapSegment(null);
                }
                if (removeOnDiskData) {
                    AppenderatorImpl.this.removeDirectory(AppenderatorImpl.this.computePersistDir(identifier));
                }
                return null;
            }
        }, (Executor)this.persistExecutor);
    }

    /**
     * Loads the commit metadata from the commit file.
     *
     * @return the deserialized {@code Committed}, or {@code null} when the commit file does not exist
     * @throws IOException if the file exists but cannot be read or parsed
     */
    private Committed readCommit() throws IOException {
        final File source = computeCommitFile();
        if (!source.exists()) {
            return null;
        }
        return objectMapper.readValue(source, Committed.class);
    }

    /**
     * Serializes the given commit metadata into the commit file.
     *
     * @param newCommit commit metadata to write
     * @throws IOException if writing the file fails
     */
    private void writeCommit(Committed newCommit) throws IOException {
        objectMapper.writeValue(computeCommitFile(), newCommit);
    }

    /**
     * @return the {@code commit.json} file located in the base persist directory
     */
    private File computeCommitFile() {
        final File baseDir = tuningConfig.getBasePersistDirectory();
        return new File(baseDir, "commit.json");
    }

    /**
     * @return the {@code .lock} file located in the base persist directory
     */
    private File computeLockFile() {
        final File baseDir = tuningConfig.getBasePersistDirectory();
        return new File(baseDir, ".lock");
    }

    /**
     * Resolves the directory for a segment: a child of the base persist directory named after
     * the segment's identifier string.
     *
     * @param identifier segment whose directory is requested
     * @return the per-segment directory (not created by this method)
     */
    private File computePersistDir(SegmentIdentifier identifier) {
        final File baseDir = tuningConfig.getBasePersistDirectory();
        final String dirName = identifier.getIdentifierAsString();
        return new File(baseDir, dirName);
    }

    /**
     * @param identifier segment whose identifier file is requested
     * @return the file named {@code IDENTIFIER_FILE_NAME} inside the segment's persist directory
     */
    private File computeIdentifierFile(SegmentIdentifier identifier) {
        final File segmentDir = computePersistDir(identifier);
        return new File(segmentDir, IDENTIFIER_FILE_NAME);
    }

    /**
     * @param identifier segment whose descriptor file is requested
     * @return the {@code descriptor.json} file inside the segment's persist directory
     */
    private File computeDescriptorFile(SegmentIdentifier identifier) {
        final File segmentDir = computePersistDir(identifier);
        return new File(segmentDir, "descriptor.json");
    }

    /**
     * Makes sure the segment's persist directory exists and (re)writes the serialized identifier
     * into its identifier file.
     *
     * @param identifier segment whose persist directory should be prepared
     * @return the segment's persist directory
     * @throws IOException if the directory cannot be created or the identifier file cannot be written
     */
    private File createPersistDirIfNeeded(SegmentIdentifier identifier) throws IOException {
        final File segmentDir = computePersistDir(identifier);
        FileUtils.forceMkdir(segmentDir);
        // The identifier file is written on every call, after the directory is in place.
        final File identifierFile = computeIdentifierFile(identifier);
        objectMapper.writeValue(identifierFile, identifier);
        return segmentDir;
    }

    /**
     * Persists a hydrant's index to disk and swaps the hydrant over to the persisted segment.
     *
     * <p>The whole operation runs while holding the monitor of {@code indexToPersist}. A hydrant
     * that reports {@code hasSwapped()} is left untouched. Otherwise the index is written to a
     * sub-directory of the segment's persist directory named after the hydrant's count, loaded back
     * through {@code indexIO}, and installed on the hydrant via {@code swapSegment}.
     *
     * @param indexToPersist hydrant whose index should be written out
     * @param identifier     segment the hydrant belongs to; determines the persist directory
     * @return the index size read before persisting, or {@code 0} if the hydrant had already swapped
     * @throws RuntimeException wrapping (via {@code Throwables.propagate}) any {@link IOException}
     *                          raised while persisting or loading; an alert is emitted first
     */
    private int persistHydrant(FireHydrant indexToPersist, SegmentIdentifier identifier) {
        synchronized (indexToPersist) {
            if (indexToPersist.hasSwapped()) {
                log.info("Segment[%s], Hydrant[%s] already swapped. Ignoring request to persist.", new Object[]{identifier, indexToPersist});
                return 0;
            }
            log.info("Segment[%s], persisting Hydrant[%s]", new Object[]{identifier, indexToPersist});
            try {
                // Capture the row count up front; the hydrant's segment is replaced by swapSegment below.
                int numRows = indexToPersist.getIndex().size();
                File persistDir = this.createPersistDirIfNeeded(identifier);
                IndexSpec indexSpec = this.tuningConfig.getIndexSpec();
                File persistedFile = this.indexMerger.persist(indexToPersist.getIndex(), identifier.getInterval(), new File(persistDir, String.valueOf(indexToPersist.getCount())), indexSpec, this.tuningConfig.getSegmentWriteOutMediumFactory());
                indexToPersist.swapSegment((Segment)new QueryableIndexSegment(indexToPersist.getSegmentIdentifier(), this.indexIO.loadIndex(persistedFile)));
                return numRows;
            }
            catch (IOException e) {
                // Attach the exception to the alert so the emitted event carries the failure cause,
                // consistent with the other alerts raised by this class.
                log.makeAlert((Throwable)e, "dataSource[%s] -- incremental persist failed", new Object[]{this.schema.getDataSource()}).addData("segment", (Object)identifier.getIdentifierAsString()).addData("count", (Object)indexToPersist.getCount()).emit();
                throw Throwables.propagate((Throwable)e);
            }
        }
    }

    /**
     * Deletes {@code target} recursively if it exists. This is best-effort: any exception thrown
     * during deletion is reported through an alert and not rethrown.
     *
     * @param target directory to delete
     */
    private void removeDirectory(File target) {
        if (!target.exists()) {
            return;
        }
        try {
            log.info("Deleting Index File[%s]", new Object[]{target});
            FileUtils.deleteDirectory(target);
        }
        catch (Exception e) {
            log.makeAlert((Throwable)e, "Failed to remove directory[%s]", new Object[]{this.schema.getDataSource()})
               .addData("file", (Object)target)
               .emit();
        }
    }
}

