/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.metadata;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.RetryTransactionException;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.ShardSpecFactory;
import org.joda.time.Interval;
import org.joda.time.ReadableInterval;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.Update;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import org.skife.jdbi.v2.util.ByteArrayMapper;
import org.skife.jdbi.v2.util.StringMapper;

public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStorageCoordinator {
    private static final Logger log = new Logger(IndexerSQLMetadataStorageCoordinator.class);
    private final ObjectMapper jsonMapper;
    private final MetadataStorageTablesConfig dbTables;
    private final SQLMetadataConnector connector;

    @Inject
    public IndexerSQLMetadataStorageCoordinator(ObjectMapper jsonMapper, MetadataStorageTablesConfig dbTables, SQLMetadataConnector connector) {
        this.jsonMapper = jsonMapper;
        this.dbTables = dbTables;
        this.connector = connector;
    }

    @LifecycleStart
    public void start() {
        this.connector.createDataSourceTable();
        this.connector.createPendingSegmentsTable();
        this.connector.createSegmentTable();
    }

    @Override
    public Collection<DataSegment> getUsedSegmentsForIntervals(String dataSource, List<Interval> intervals, Segments visibility) {
        return (Collection)this.connector.retryWithHandle(handle -> {
            if (visibility == Segments.ONLY_VISIBLE) {
                VersionedIntervalTimeline<String, DataSegment> timeline = this.getTimelineForIntervalsWithHandle(handle, dataSource, intervals);
                return timeline.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
            }
            return this.getAllUsedSegmentsForIntervalsWithHandle(handle, dataSource, intervals);
        });
    }

    private List<SegmentIdWithShardSpec> getPendingSegmentsForIntervalWithHandle(Handle handle, String dataSource, Interval interval) throws IOException {
        ArrayList<SegmentIdWithShardSpec> identifiers = new ArrayList<SegmentIdWithShardSpec>();
        ResultIterator dbSegments = ((Query)((Query)((Query)handle.createQuery(StringUtils.format((String)"SELECT payload FROM %1$s WHERE dataSource = :dataSource AND start <= :end and %2$send%2$s >= :start", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable(), this.connector.getQuoteString()})).bind("dataSource", dataSource)).bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString())).map((ResultSetMapper)ByteArrayMapper.FIRST).iterator();
        while (dbSegments.hasNext()) {
            byte[] payload = (byte[])dbSegments.next();
            SegmentIdWithShardSpec identifier = (SegmentIdWithShardSpec)this.jsonMapper.readValue(payload, SegmentIdWithShardSpec.class);
            if (!interval.overlaps((ReadableInterval)identifier.getInterval())) continue;
            identifiers.add(identifier);
        }
        dbSegments.close();
        return identifiers;
    }

    private VersionedIntervalTimeline<String, DataSegment> getTimelineForIntervalsWithHandle(Handle handle, String dataSource, List<Interval> intervals) {
        Query<Map<String, Object>> sql = this.createUsedSegmentsSqlQueryForIntervals(handle, dataSource, intervals);
        try (ResultIterator dbSegments = sql.map((ResultSetMapper)ByteArrayMapper.FIRST).iterator();){
            VersionedIntervalTimeline versionedIntervalTimeline = VersionedIntervalTimeline.forSegments((Iterator)Iterators.transform((Iterator)dbSegments, payload -> (DataSegment)JacksonUtils.readValue((ObjectMapper)this.jsonMapper, (byte[])payload, DataSegment.class)));
            return versionedIntervalTimeline;
        }
    }

    private Collection<DataSegment> getAllUsedSegmentsForIntervalsWithHandle(Handle handle, String dataSource, List<Interval> intervals) {
        return this.createUsedSegmentsSqlQueryForIntervals(handle, dataSource, intervals).map((index, r, ctx) -> (DataSegment)JacksonUtils.readValue((ObjectMapper)this.jsonMapper, (byte[])r.getBytes("payload"), DataSegment.class)).list();
    }

    private Query<Map<String, Object>> createUsedSegmentsSqlQueryForIntervals(Handle handle, String dataSource, List<Interval> intervals) {
        if (intervals == null || intervals.isEmpty()) {
            throw new IAE("null/empty intervals", new Object[0]);
        }
        StringBuilder sb = new StringBuilder();
        sb.append("SELECT payload FROM %s WHERE used = true AND dataSource = ? AND (");
        for (int i = 0; i < intervals.size(); ++i) {
            sb.append(StringUtils.format((String)"(start < ? AND %1$send%1$s > ?)", (Object[])new Object[]{this.connector.getQuoteString()}));
            if (i == intervals.size() - 1) {
                sb.append(")");
                continue;
            }
            sb.append(" OR ");
        }
        Query sql = (Query)handle.createQuery(StringUtils.format((String)sb.toString(), (Object[])new Object[]{this.dbTables.getSegmentsTable()})).bind(0, dataSource);
        for (int i = 0; i < intervals.size(); ++i) {
            Interval interval = intervals.get(i);
            sql = (Query)((Query)sql.bind(2 * i + 1, interval.getEnd().toString())).bind(2 * i + 2, interval.getStart().toString());
        }
        return sql;
    }

    @Override
    public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments) throws IOException {
        SegmentPublishResult result = this.announceHistoricalSegments(segments, null, null);
        if (!result.isSuccess()) {
            throw new ISE("WTF?! announceHistoricalSegments failed with null metadata, should not happen.", new Object[0]);
        }
        return result.getSegments();
    }

    @Override
    public SegmentPublishResult announceHistoricalSegments(final Set<DataSegment> segments, final @Nullable DataSourceMetadata startMetadata, final @Nullable DataSourceMetadata endMetadata) throws IOException {
        if (segments.isEmpty()) {
            throw new IllegalArgumentException("segment set must not be empty");
        }
        final String dataSource = segments.iterator().next().getDataSource();
        for (DataSegment segment : segments) {
            if (dataSource.equals(segment.getDataSource())) continue;
            throw new IllegalArgumentException("segments must all be from the same dataSource");
        }
        if (startMetadata == null && endMetadata != null || startMetadata != null && endMetadata == null) {
            throw new IllegalArgumentException("start/end metadata pair must be either null or non-null");
        }
        final HashSet<Object> usedSegments = new HashSet<Object>();
        List segmentHolders = VersionedIntervalTimeline.forSegments(segments).lookupWithIncompletePartitions(Intervals.ETERNITY);
        for (TimelineObjectHolder holder : segmentHolders) {
            for (PartitionChunk chunk : holder.getObject()) {
                usedSegments.add(chunk.getObject());
            }
        }
        final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
        try {
            return this.connector.retryTransaction(new TransactionCallback<SegmentPublishResult>(){

                public SegmentPublishResult inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception {
                    DataSourceMetadataUpdateResult result;
                    definitelyNotUpdated.set(false);
                    HashSet<DataSegment> inserted = new HashSet<DataSegment>();
                    if (startMetadata != null && (result = IndexerSQLMetadataStorageCoordinator.this.updateDataSourceMetadataWithHandle(handle, dataSource, startMetadata, endMetadata)) != DataSourceMetadataUpdateResult.SUCCESS) {
                        transactionStatus.setRollbackOnly();
                        definitelyNotUpdated.set(true);
                        if (result == DataSourceMetadataUpdateResult.FAILURE) {
                            throw new RuntimeException("Aborting transaction!");
                        }
                        if (result == DataSourceMetadataUpdateResult.TRY_AGAIN) {
                            throw new RetryTransactionException("Aborting transaction!");
                        }
                    }
                    for (DataSegment segment : segments) {
                        if (!IndexerSQLMetadataStorageCoordinator.this.announceHistoricalSegment(handle, segment, usedSegments.contains(segment))) continue;
                        inserted.add(segment);
                    }
                    return SegmentPublishResult.ok((Set<DataSegment>)ImmutableSet.copyOf(inserted));
                }
            }, 3, 10);
        }
        catch (CallbackFailedException e) {
            if (definitelyNotUpdated.get()) {
                return SegmentPublishResult.fail(e.getMessage());
            }
            throw e;
        }
    }

    @Override
    public SegmentPublishResult commitMetadataOnly(final String dataSource, final DataSourceMetadata startMetadata, final DataSourceMetadata endMetadata) {
        if (dataSource == null) {
            throw new IllegalArgumentException("datasource name cannot be null");
        }
        if (startMetadata == null) {
            throw new IllegalArgumentException("start metadata cannot be null");
        }
        if (endMetadata == null) {
            throw new IllegalArgumentException("end metadata cannot be null");
        }
        final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
        try {
            return this.connector.retryTransaction(new TransactionCallback<SegmentPublishResult>(){

                public SegmentPublishResult inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception {
                    definitelyNotUpdated.set(false);
                    DataSourceMetadataUpdateResult result = IndexerSQLMetadataStorageCoordinator.this.updateDataSourceMetadataWithHandle(handle, dataSource, startMetadata, endMetadata);
                    if (result != DataSourceMetadataUpdateResult.SUCCESS) {
                        transactionStatus.setRollbackOnly();
                        definitelyNotUpdated.set(true);
                        if (result == DataSourceMetadataUpdateResult.FAILURE) {
                            throw new RuntimeException("Aborting transaction!");
                        }
                        if (result == DataSourceMetadataUpdateResult.TRY_AGAIN) {
                            throw new RetryTransactionException("Aborting transaction!");
                        }
                    }
                    return SegmentPublishResult.ok((Set<DataSegment>)ImmutableSet.of());
                }
            }, 3, 10);
        }
        catch (CallbackFailedException e) {
            if (definitelyNotUpdated.get()) {
                return SegmentPublishResult.fail(e.getMessage());
            }
            throw e;
        }
    }

    @Override
    public SegmentIdWithShardSpec allocatePendingSegment(String dataSource, String sequenceName, @Nullable String previousSegmentId, Interval interval, ShardSpecFactory shardSpecFactory, String maxVersion, boolean skipSegmentLineageCheck) {
        Preconditions.checkNotNull((Object)dataSource, (Object)"dataSource");
        Preconditions.checkNotNull((Object)sequenceName, (Object)"sequenceName");
        Preconditions.checkNotNull((Object)interval, (Object)"interval");
        Preconditions.checkNotNull((Object)maxVersion, (Object)"version");
        return (SegmentIdWithShardSpec)this.connector.retryWithHandle(handle -> {
            if (skipSegmentLineageCheck) {
                return this.allocatePendingSegment(handle, dataSource, sequenceName, interval, shardSpecFactory, maxVersion);
            }
            return this.allocatePendingSegmentWithSegmentLineageCheck(handle, dataSource, sequenceName, previousSegmentId, interval, shardSpecFactory, maxVersion);
        });
    }

    @Nullable
    private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck(Handle handle, String dataSource, String sequenceName, @Nullable String previousSegmentId, Interval interval, ShardSpecFactory shardSpecFactory, String maxVersion) throws IOException {
        String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;
        CheckExistingSegmentIdResult result = this.checkAndGetExistingSegmentId((Query<Map<String, Object>>)handle.createQuery(StringUtils.format((String)"SELECT payload FROM %s WHERE dataSource = :dataSource AND sequence_name = :sequence_name AND sequence_prev_id = :sequence_prev_id", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable()})), interval, sequenceName, previousSegmentIdNotNull, Pair.of((Object)"dataSource", (Object)dataSource), Pair.of((Object)"sequence_name", (Object)sequenceName), Pair.of((Object)"sequence_prev_id", (Object)previousSegmentIdNotNull));
        if (result.found) {
            return result.segmentIdentifier;
        }
        SegmentIdWithShardSpec newIdentifier = this.createNewSegment(handle, dataSource, interval, shardSpecFactory, maxVersion);
        if (newIdentifier == null) {
            return null;
        }
        String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode(Hashing.sha1().newHasher().putBytes(StringUtils.toUtf8((String)sequenceName)).putByte((byte)-1).putBytes(StringUtils.toUtf8((String)previousSegmentIdNotNull)).hash().asBytes());
        this.insertToMetastore(handle, newIdentifier, dataSource, interval, previousSegmentIdNotNull, sequenceName, sequenceNamePrevIdSha1);
        return newIdentifier;
    }

    @Nullable
    private SegmentIdWithShardSpec allocatePendingSegment(Handle handle, String dataSource, String sequenceName, Interval interval, ShardSpecFactory shardSpecFactory, String maxVersion) throws IOException {
        CheckExistingSegmentIdResult result = this.checkAndGetExistingSegmentId((Query<Map<String, Object>>)handle.createQuery(StringUtils.format((String)"SELECT payload FROM %s WHERE dataSource = :dataSource AND sequence_name = :sequence_name AND start = :start AND %2$send%2$s = :end", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable(), this.connector.getQuoteString()})), interval, sequenceName, null, Pair.of((Object)"dataSource", (Object)dataSource), Pair.of((Object)"sequence_name", (Object)sequenceName), Pair.of((Object)"start", (Object)interval.getStart().toString()), Pair.of((Object)"end", (Object)interval.getEnd().toString()));
        if (result.found) {
            return result.segmentIdentifier;
        }
        SegmentIdWithShardSpec newIdentifier = this.createNewSegment(handle, dataSource, interval, shardSpecFactory, maxVersion);
        if (newIdentifier == null) {
            return null;
        }
        String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode(Hashing.sha1().newHasher().putBytes(StringUtils.toUtf8((String)sequenceName)).putByte((byte)-1).putLong(interval.getStartMillis()).putLong(interval.getEndMillis()).hash().asBytes());
        this.insertToMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1);
        log.info("Allocated pending segment [%s] for sequence[%s] in DB", new Object[]{newIdentifier, sequenceName});
        return newIdentifier;
    }

    private CheckExistingSegmentIdResult checkAndGetExistingSegmentId(Query<Map<String, Object>> query, Interval interval, String sequenceName, @Nullable String previousSegmentId, Pair<String, String> ... queryVars) throws IOException {
        Query boundQuery = query;
        for (Pair<String, String> var : queryVars) {
            boundQuery = (Query)boundQuery.bind((String)var.lhs, (String)var.rhs);
        }
        List existingBytes = boundQuery.map((ResultSetMapper)ByteArrayMapper.FIRST).list();
        if (!existingBytes.isEmpty()) {
            SegmentIdWithShardSpec existingIdentifier = (SegmentIdWithShardSpec)this.jsonMapper.readValue((byte[])Iterables.getOnlyElement((Iterable)existingBytes), SegmentIdWithShardSpec.class);
            if (existingIdentifier.getInterval().getStartMillis() == interval.getStartMillis() && existingIdentifier.getInterval().getEndMillis() == interval.getEndMillis()) {
                if (previousSegmentId == null) {
                    log.info("Found existing pending segment [%s] for sequence[%s] in DB", new Object[]{existingIdentifier, sequenceName});
                } else {
                    log.info("Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB", new Object[]{existingIdentifier, sequenceName, previousSegmentId});
                }
                return new CheckExistingSegmentIdResult(true, existingIdentifier);
            }
            if (previousSegmentId == null) {
                log.warn("Cannot use existing pending segment [%s] for sequence[%s] in DB, does not match requested interval[%s]", new Object[]{existingIdentifier, sequenceName, interval});
            } else {
                log.warn("Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, does not match requested interval[%s]", new Object[]{existingIdentifier, sequenceName, previousSegmentId, interval});
            }
            return new CheckExistingSegmentIdResult(true, null);
        }
        return new CheckExistingSegmentIdResult(false, null);
    }

    private void insertToMetastore(Handle handle, SegmentIdWithShardSpec newIdentifier, String dataSource, Interval interval, String previousSegmentId, String sequenceName, String sequenceNamePrevIdSha1) throws JsonProcessingException {
        ((Update)((Update)((Update)((Update)((Update)((Update)((Update)((Update)((Update)handle.createStatement(StringUtils.format((String)"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable(), this.connector.getQuoteString()})).bind("id", newIdentifier.toString())).bind("dataSource", dataSource)).bind("created_date", DateTimes.nowUtc().toString())).bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString())).bind("sequence_name", sequenceName)).bind("sequence_prev_id", previousSegmentId)).bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1)).bind("payload", this.jsonMapper.writeValueAsBytes((Object)newIdentifier))).execute();
    }

    @Nullable
    private SegmentIdWithShardSpec createNewSegment(Handle handle, String dataSource, Interval interval, ShardSpecFactory shardSpecFactory, String maxVersion) throws IOException {
        List existingChunks = this.getTimelineForIntervalsWithHandle(handle, dataSource, (List<Interval>)ImmutableList.of((Object)interval)).lookup(interval);
        if (existingChunks.size() > 1) {
            log.warn("Cannot allocate new segment for dataSource[%s], interval[%s]: already have [%,d] chunks.", new Object[]{dataSource, interval, existingChunks.size()});
            return null;
        }
        if (existingChunks.stream().flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false)).anyMatch(chunk -> !((DataSegment)chunk.getObject()).getShardSpec().isCompatible(shardSpecFactory.getShardSpecClass()))) {
            return null;
        }
        SegmentIdWithShardSpec maxId = null;
        if (!existingChunks.isEmpty()) {
            TimelineObjectHolder existingHolder = (TimelineObjectHolder)Iterables.getOnlyElement((Iterable)existingChunks);
            maxId = StreamSupport.stream(existingHolder.getObject().spliterator(), false).filter(chunk -> ((DataSegment)chunk.getObject()).getShardSpec().getClass() == shardSpecFactory.getShardSpecClass()).max(Comparator.comparing(chunk -> ((DataSegment)chunk.getObject()).getShardSpec().getPartitionNum())).map(chunk -> SegmentIdWithShardSpec.fromDataSegment((DataSegment)chunk.getObject())).orElse(null);
        }
        List<SegmentIdWithShardSpec> pendings = this.getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval);
        if (maxId != null) {
            pendings.add(maxId);
        }
        maxId = pendings.stream().filter(id -> id.getShardSpec().getClass() == shardSpecFactory.getShardSpecClass()).max((id1, id2) -> {
            int versionCompare = id1.getVersion().compareTo(id2.getVersion());
            if (versionCompare != 0) {
                return versionCompare;
            }
            return Integer.compare(id1.getShardSpec().getPartitionNum(), id2.getShardSpec().getPartitionNum());
        }).orElse(null);
        String versionOfExistingChunks = !existingChunks.isEmpty() ? (String)((TimelineObjectHolder)existingChunks.get(0)).getVersion() : (!pendings.isEmpty() ? pendings.get(0).getVersion() : null);
        if (maxId == null) {
            ShardSpec shardSpec = shardSpecFactory.create(this.jsonMapper, null);
            return new SegmentIdWithShardSpec(dataSource, interval, versionOfExistingChunks == null ? maxVersion : versionOfExistingChunks, shardSpec);
        }
        if (!maxId.getInterval().equals((Object)interval) || maxId.getVersion().compareTo(maxVersion) > 0) {
            log.warn("Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: conflicting segment[%s].", new Object[]{dataSource, interval, maxVersion, maxId});
            return null;
        }
        ShardSpec newShardSpec = shardSpecFactory.create(this.jsonMapper, maxId.getShardSpec());
        return new SegmentIdWithShardSpec(dataSource, maxId.getInterval(), (String)Preconditions.checkNotNull((Object)versionOfExistingChunks, (Object)"versionOfExistingChunks"), newShardSpec);
    }

    @Override
    public int deletePendingSegments(String dataSource, Interval deleteInterval) {
        return (Integer)this.connector.getDBI().inTransaction((handle, status) -> ((Update)((Update)((Update)handle.createStatement(StringUtils.format((String)"delete from %s where datasource = :dataSource and created_date >= :start and created_date < :end", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable()})).bind("dataSource", dataSource)).bind("start", deleteInterval.getStart().toString())).bind("end", deleteInterval.getEnd().toString())).execute());
    }

    private boolean announceHistoricalSegment(Handle handle, DataSegment segment, boolean used) throws IOException {
        try {
            if (this.segmentExists(handle, segment)) {
                log.info("Found [%s] in DB, not updating DB", new Object[]{segment.getId()});
                return false;
            }
            int numRowsInserted = ((Update)((Update)((Update)((Update)((Update)((Update)((Update)((Update)((Update)handle.createStatement(StringUtils.format((String)"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload) VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", (Object[])new Object[]{this.dbTables.getSegmentsTable(), this.connector.getQuoteString()})).bind("id", segment.getId().toString())).bind("dataSource", segment.getDataSource())).bind("created_date", DateTimes.nowUtc().toString())).bind("start", segment.getInterval().getStart().toString())).bind("end", segment.getInterval().getEnd().toString())).bind("partitioned", !(segment.getShardSpec() instanceof NoneShardSpec))).bind("version", segment.getVersion())).bind("used", used)).bind("payload", this.jsonMapper.writeValueAsBytes((Object)segment))).execute();
            if (numRowsInserted != 1) {
                if (numRowsInserted == 0) {
                    throw new ISE("Failed to publish segment[%s] to DB with used flag[%s], json[%s]", new Object[]{segment.getId(), used, this.jsonMapper.writeValueAsString((Object)segment)});
                }
                throw new ISE("WTH? numRowsInserted[%s] is larger than 1 after inserting segment[%s] with used flag[%s], json[%s]", new Object[]{numRowsInserted, segment.getId(), used, this.jsonMapper.writeValueAsString((Object)segment)});
            }
            log.info("Published segment [%s] to DB with used flag [%s], json[%s]", new Object[]{segment.getId(), used, this.jsonMapper.writeValueAsString((Object)segment)});
        }
        catch (Exception e) {
            log.error((Throwable)e, "Exception inserting segment [%s] with used flag [%s] into DB", new Object[]{segment.getId(), used});
            throw e;
        }
        return true;
    }

    private boolean segmentExists(Handle handle, DataSegment segment) {
        return !((Query)handle.createQuery(StringUtils.format((String)"SELECT id FROM %s WHERE id = :identifier", (Object[])new Object[]{this.dbTables.getSegmentsTable()})).bind("identifier", segment.getId().toString())).map((ResultSetMapper)StringMapper.FIRST).list().isEmpty();
    }

    @Override
    public DataSourceMetadata getDataSourceMetadata(String dataSource) {
        byte[] bytes = this.connector.lookup(this.dbTables.getDataSourceTable(), "dataSource", "commit_metadata_payload", dataSource);
        if (bytes == null) {
            return null;
        }
        return (DataSourceMetadata)JacksonUtils.readValue((ObjectMapper)this.jsonMapper, (byte[])bytes, DataSourceMetadata.class);
    }

    private byte[] getDataSourceMetadataWithHandleAsBytes(Handle handle, String dataSource) {
        return this.connector.lookupWithHandle(handle, this.dbTables.getDataSourceTable(), "dataSource", "commit_metadata_payload", dataSource);
    }

    protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(Handle handle, String dataSource, DataSourceMetadata startMetadata, DataSourceMetadata endMetadata) throws IOException {
        DataSourceMetadataUpdateResult retVal;
        DataSourceMetadata oldCommitMetadataFromDb;
        String oldCommitMetadataSha1FromDb;
        Preconditions.checkNotNull((Object)dataSource, (Object)"dataSource");
        Preconditions.checkNotNull((Object)startMetadata, (Object)"startMetadata");
        Preconditions.checkNotNull((Object)endMetadata, (Object)"endMetadata");
        byte[] oldCommitMetadataBytesFromDb = this.getDataSourceMetadataWithHandleAsBytes(handle, dataSource);
        if (oldCommitMetadataBytesFromDb == null) {
            oldCommitMetadataSha1FromDb = null;
            oldCommitMetadataFromDb = null;
        } else {
            oldCommitMetadataSha1FromDb = BaseEncoding.base16().encode(Hashing.sha1().hashBytes(oldCommitMetadataBytesFromDb).asBytes());
            oldCommitMetadataFromDb = (DataSourceMetadata)this.jsonMapper.readValue(oldCommitMetadataBytesFromDb, DataSourceMetadata.class);
        }
        boolean startMetadataMatchesExisting = oldCommitMetadataFromDb == null ? startMetadata.isValidStart() : startMetadata.asStartMetadata().matches(oldCommitMetadataFromDb.asStartMetadata());
        if (!startMetadataMatchesExisting) {
            log.error("Not updating metadata, existing state[%s] in metadata store doesn't match to the new start state[%s].", new Object[]{oldCommitMetadataFromDb, startMetadata});
            return DataSourceMetadataUpdateResult.FAILURE;
        }
        DataSourceMetadata newCommitMetadata = oldCommitMetadataFromDb == null ? endMetadata : oldCommitMetadataFromDb.plus(endMetadata);
        byte[] newCommitMetadataBytes = this.jsonMapper.writeValueAsBytes((Object)newCommitMetadata);
        String newCommitMetadataSha1 = BaseEncoding.base16().encode(Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes());
        if (oldCommitMetadataBytesFromDb == null) {
            int numRows = ((Update)((Update)((Update)((Update)handle.createStatement(StringUtils.format((String)"INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) VALUES (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)", (Object[])new Object[]{this.dbTables.getDataSourceTable()})).bind("dataSource", dataSource)).bind("created_date", DateTimes.nowUtc().toString())).bind("commit_metadata_payload", newCommitMetadataBytes)).bind("commit_metadata_sha1", newCommitMetadataSha1)).execute();
            retVal = numRows == 1 ? DataSourceMetadataUpdateResult.SUCCESS : DataSourceMetadataUpdateResult.TRY_AGAIN;
        } else {
            int numRows = ((Update)((Update)((Update)((Update)handle.createStatement(StringUtils.format((String)"UPDATE %s SET commit_metadata_payload = :new_commit_metadata_payload, commit_metadata_sha1 = :new_commit_metadata_sha1 WHERE dataSource = :dataSource AND commit_metadata_sha1 = :old_commit_metadata_sha1", (Object[])new Object[]{this.dbTables.getDataSourceTable()})).bind("dataSource", dataSource)).bind("old_commit_metadata_sha1", oldCommitMetadataSha1FromDb)).bind("new_commit_metadata_payload", newCommitMetadataBytes)).bind("new_commit_metadata_sha1", newCommitMetadataSha1)).execute();
            DataSourceMetadataUpdateResult dataSourceMetadataUpdateResult = retVal = numRows == 1 ? DataSourceMetadataUpdateResult.SUCCESS : DataSourceMetadataUpdateResult.TRY_AGAIN;
        }
        if (retVal == DataSourceMetadataUpdateResult.SUCCESS) {
            log.info("Updated metadata from[%s] to[%s].", new Object[]{oldCommitMetadataFromDb, newCommitMetadata});
        } else {
            log.info("Not updating metadata, compare-and-swap failure.", new Object[0]);
        }
        return retVal;
    }

    @Override
    public boolean deleteDataSourceMetadata(final String dataSource) {
        return this.connector.retryWithHandle(new HandleCallback<Boolean>(){

            public Boolean withHandle(Handle handle) {
                int rows = ((Update)handle.createStatement(StringUtils.format((String)"DELETE from %s WHERE dataSource = :dataSource", (Object[])new Object[]{IndexerSQLMetadataStorageCoordinator.this.dbTables.getDataSourceTable()})).bind("dataSource", dataSource)).execute();
                return rows > 0;
            }
        });
    }

    @Override
    public boolean resetDataSourceMetadata(final String dataSource, DataSourceMetadata dataSourceMetadata) throws IOException {
        final byte[] newCommitMetadataBytes = this.jsonMapper.writeValueAsBytes((Object)dataSourceMetadata);
        final String newCommitMetadataSha1 = BaseEncoding.base16().encode(Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes());
        return this.connector.retryWithHandle(new HandleCallback<Boolean>(){

            public Boolean withHandle(Handle handle) {
                int numRows = ((Update)((Update)((Update)handle.createStatement(StringUtils.format((String)"UPDATE %s SET commit_metadata_payload = :new_commit_metadata_payload, commit_metadata_sha1 = :new_commit_metadata_sha1 WHERE dataSource = :dataSource", (Object[])new Object[]{IndexerSQLMetadataStorageCoordinator.this.dbTables.getDataSourceTable()})).bind("dataSource", dataSource)).bind("new_commit_metadata_payload", newCommitMetadataBytes)).bind("new_commit_metadata_sha1", newCommitMetadataSha1)).execute();
                return numRows == 1;
            }
        });
    }

    @Override
    public void updateSegmentMetadata(final Set<DataSegment> segments) {
        this.connector.getDBI().inTransaction((TransactionCallback)new TransactionCallback<Void>(){

            public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception {
                for (DataSegment segment : segments) {
                    IndexerSQLMetadataStorageCoordinator.this.updatePayload(handle, segment);
                }
                return null;
            }
        });
    }

    @Override
    public void deleteSegments(final Set<DataSegment> segments) {
        this.connector.getDBI().inTransaction((TransactionCallback)new TransactionCallback<Void>(){

            public Void inTransaction(Handle handle, TransactionStatus transactionStatus) {
                for (DataSegment segment : segments) {
                    IndexerSQLMetadataStorageCoordinator.this.deleteSegment(handle, segment);
                }
                return null;
            }
        });
    }

    private void deleteSegment(Handle handle, DataSegment segment) {
        ((Update)handle.createStatement(StringUtils.format((String)"DELETE from %s WHERE id = :id", (Object[])new Object[]{this.dbTables.getSegmentsTable()})).bind("id", segment.getId().toString())).execute();
    }

    private void updatePayload(Handle handle, DataSegment segment) throws IOException {
        try {
            ((Update)((Update)handle.createStatement(StringUtils.format((String)"UPDATE %s SET payload = :payload WHERE id = :id", (Object[])new Object[]{this.dbTables.getSegmentsTable()})).bind("id", segment.getId().toString())).bind("payload", this.jsonMapper.writeValueAsBytes((Object)segment))).execute();
        }
        catch (IOException e) {
            log.error((Throwable)e, "Exception inserting into DB", new Object[0]);
            throw e;
        }
    }

    @Override
    public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval) {
        List<DataSegment> matchingSegments = this.connector.inReadOnlyTransaction(new TransactionCallback<List<DataSegment>>(){

            public List<DataSegment> inTransaction(Handle handle, TransactionStatus status) {
                return (List)((Query)((Query)((Query)handle.createQuery(StringUtils.format((String)"SELECT payload FROM %1$s WHERE dataSource = :dataSource and start >= :start and start <= :end and %2$send%2$s <= :end and used = false", (Object[])new Object[]{IndexerSQLMetadataStorageCoordinator.this.dbTables.getSegmentsTable(), IndexerSQLMetadataStorageCoordinator.this.connector.getQuoteString()})).setFetchSize(IndexerSQLMetadataStorageCoordinator.this.connector.getStreamingFetchSize()).bind("dataSource", dataSource)).bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString())).map((ResultSetMapper)ByteArrayMapper.FIRST).fold(new ArrayList(), (Folder3)new Folder3<List<DataSegment>, byte[]>(){

                    public List<DataSegment> fold(List<DataSegment> accumulator, byte[] payload, FoldController foldController, StatementContext statementContext) {
                        accumulator.add((DataSegment)JacksonUtils.readValue((ObjectMapper)IndexerSQLMetadataStorageCoordinator.this.jsonMapper, (byte[])payload, DataSegment.class));
                        return accumulator;
                    }
                });
            }
        });
        log.info("Found %,d segments for %s for interval %s.", new Object[]{matchingSegments.size(), dataSource, interval});
        return matchingSegments;
    }

    @Override
    public Collection<Pair<DataSegment, String>> getUsedSegmentAndCreatedDateForInterval(String dataSource, Interval interval) {
        return (Collection)this.connector.retryWithHandle(handle -> ((Query)((Query)((Query)handle.createQuery(StringUtils.format((String)"SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource AND start >= :start AND %2$send%2$s <= :end AND used = true", (Object[])new Object[]{this.dbTables.getSegmentsTable(), this.connector.getQuoteString()})).bind("dataSource", dataSource)).bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString())).map((index, r, ctx) -> new Pair(JacksonUtils.readValue((ObjectMapper)this.jsonMapper, (byte[])r.getBytes("payload"), DataSegment.class), (Object)r.getString("created_date"))).list());
    }

    @Override
    public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata metadata) {
        return 1 == (Integer)this.connector.getDBI().inTransaction((handle, status) -> ((Update)((Update)((Update)((Update)handle.createStatement(StringUtils.format((String)"INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) VALUES (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)", (Object[])new Object[]{this.dbTables.getDataSourceTable()})).bind("dataSource", dataSource)).bind("created_date", DateTimes.nowUtc().toString())).bind("commit_metadata_payload", this.jsonMapper.writeValueAsBytes((Object)metadata))).bind("commit_metadata_sha1", BaseEncoding.base16().encode(Hashing.sha1().hashBytes(this.jsonMapper.writeValueAsBytes((Object)metadata)).asBytes()))).execute());
    }

    private static class CheckExistingSegmentIdResult {
        private final boolean found;
        @Nullable
        private final SegmentIdWithShardSpec segmentIdentifier;

        CheckExistingSegmentIdResult(boolean found, @Nullable SegmentIdWithShardSpec segmentIdentifier) {
            this.found = found;
            this.segmentIdentifier = segmentIdentifier;
        }
    }

    static enum DataSourceMetadataUpdateResult {
        SUCCESS,
        FAILURE,
        TRY_AGAIN;

    }
}

