/*
 * Decompiled with CFR 0.152.
 */
package io.druid.server.coordinator;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.http.client.HttpClient;
import io.druid.java.util.http.client.Request;
import io.druid.java.util.http.client.io.AppendableByteArrayInputStream;
import io.druid.java.util.http.client.response.ClientResponse;
import io.druid.java.util.http.client.response.HttpResponseHandler;
import io.druid.java.util.http.client.response.InputStreamResponseHandler;
import io.druid.server.coordination.DataSegmentChangeCallback;
import io.druid.server.coordination.DataSegmentChangeHandler;
import io.druid.server.coordination.DataSegmentChangeRequest;
import io.druid.server.coordination.SegmentChangeRequestDrop;
import io.druid.server.coordination.SegmentChangeRequestLoad;
import io.druid.server.coordination.SegmentLoadDropHandler;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.DruidCoordinatorConfig;
import io.druid.server.coordinator.LoadPeonCallback;
import io.druid.server.coordinator.LoadQueuePeon;
import io.druid.timeline.DataSegment;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.joda.time.Duration;

/**
 * {@link LoadQueuePeon} that delivers segment load/drop requests to a single server over HTTP.
 *
 * <p>Requests are queued in {@link #segmentsToLoad} / {@link #segmentsToDrop}, sent in batches of
 * at most {@code config.getHttpLoadQueuePeonBatchSize()} as a JSON POST to
 * {@code druid-internal/v1/segments/changeRequests}, and resolved when the server reports
 * {@code SUCCESS} or {@code FAILED} for them, or when they exceed
 * {@code config.getLoadTimeoutDelay()}.
 *
 * <p>All structural changes to the two queues made by this class happen while holding
 * {@link #lock}. At most one batch is in flight at a time; this is enforced with
 * {@link #mainLoopInProgress}.
 */
public class HttpLoadQueuePeon
extends LoadQueuePeon {
    /** JSON type of the request body: a list of change requests. */
    public static final TypeReference<List<DataSegmentChangeRequest>> REQUEST_ENTITY_TYPE_REF = new TypeReference<List<DataSegmentChangeRequest>>(){};
    /** JSON type of the response body: a list of change requests paired with their status. */
    public static final TypeReference<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>> RESPONSE_ENTITY_TYPE_REF = new TypeReference<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>>(){};
    private static final EmittingLogger log = new EmittingLogger(HttpLoadQueuePeon.class);

    /** Sum of the sizes of the segments currently waiting in {@link #segmentsToLoad}. */
    private final AtomicLong queuedSize = new AtomicLong(0L);
    /** Number of failed (or timed out) requests since the last {@link #getAndResetFailedAssignCount()}. */
    private final AtomicInteger failedAssignCount = new AtomicInteger(0);
    private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(DruidCoordinator.SEGMENT_COMPARATOR);
    private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(DruidCoordinator.SEGMENT_COMPARATOR);
    private final ConcurrentSkipListSet<DataSegment> segmentsMarkedToDrop = new ConcurrentSkipListSet<>(DruidCoordinator.SEGMENT_COMPARATOR);
    private final ScheduledExecutorService processingExecutor;
    private volatile boolean stopped = false;
    /** Guards queue mutations, start/stop and response processing. */
    private final Object lock = new Object();
    private final DruidCoordinatorConfig config;
    private final ObjectMapper jsonMapper;
    private final HttpClient httpClient;
    private final URL changeRequestURL;
    private final String serverId;
    /** True while a batch is being assembled or is awaiting its HTTP response. */
    private final AtomicBoolean mainLoopInProgress = new AtomicBoolean(false);
    private final ExecutorService callBackExecutor;
    private final ObjectWriter requestBodyWriter;

    /**
     * Creates a peon for the server at {@code baseUrl}.
     *
     * @param baseUrl            base URL of the target server; also used as the server id in log messages
     * @param jsonMapper         mapper used to serialize requests and deserialize responses
     * @param httpClient         client used to send the change requests
     * @param config             source of batch size, host timeout, repeat delay and load timeout
     * @param processingExecutor executor on which batches are built and responses are processed
     * @param callBackExecutor   executor on which {@link LoadPeonCallback}s are run
     * @throws RuntimeException if {@code baseUrl} cannot be turned into a valid change-request URL
     */
    public HttpLoadQueuePeon(String baseUrl, ObjectMapper jsonMapper, HttpClient httpClient, DruidCoordinatorConfig config, ScheduledExecutorService processingExecutor, ExecutorService callBackExecutor) {
        this.jsonMapper = jsonMapper;
        this.requestBodyWriter = jsonMapper.writerWithType(REQUEST_ENTITY_TYPE_REF);
        this.httpClient = httpClient;
        this.config = config;
        this.processingExecutor = processingExecutor;
        this.callBackExecutor = callBackExecutor;
        this.serverId = baseUrl;
        try {
            this.changeRequestURL = new URL(
                new URL(baseUrl),
                StringUtils.nonStrictFormat(
                    "druid-internal/v1/segments/changeRequests?timeout=%d",
                    config.getHttpLoadQueuePeonHostTimeout().getMillis()
                )
            );
        }
        catch (MalformedURLException ex) {
            throw Throwables.propagate(ex);
        }
    }

    /**
     * Runs one iteration of the main loop: builds the next batch of change requests and POSTs it
     * to the server. The response is handled asynchronously on {@link #processingExecutor}.
     *
     * <p>Does nothing if the peon is stopped or if a previous iteration is still in progress.
     */
    private void doSegmentManagement() {
        if (stopped || !mainLoopInProgress.compareAndSet(false, true)) {
            log.debug("[%s]Ignoring tick. Either in-progress already or stopped.", serverId);
            return;
        }

        // Everything after the flag has been claimed runs inside this try so that an unexpected
        // exception can never leave mainLoopInProgress stuck at true (which would silently
        // disable all future iterations).
        try {
            final List<DataSegmentChangeRequest> newRequests = collectNextBatch();

            if (newRequests.isEmpty()) {
                log.debug(
                    "[%s]Found no load/drop requests. SegmentsToLoad[%d], SegmentsToDrop[%d], batchSize[%d].",
                    serverId,
                    segmentsToLoad.size(),
                    segmentsToDrop.size(),
                    config.getHttpLoadQueuePeonBatchSize()
                );
                mainLoopInProgress.set(false);
                return;
            }

            log.debug("Sending [%d] load/drop requests to Server[%s].", newRequests.size(), serverId);
            final BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
            ListenableFuture<InputStream> future = httpClient.go(
                new Request(HttpMethod.POST, changeRequestURL)
                    .addHeader("Accept", "application/json")
                    .addHeader("Content-Type", "application/json")
                    .setContent(requestBodyWriter.writeValueAsBytes(newRequests)),
                responseHandler,
                // Client-side timeout is the server-side timeout plus 5 seconds of slack.
                new Duration(config.getHttpLoadQueuePeonHostTimeout().getMillis() + 5000L)
            );

            Futures.addCallback(future, new FutureCallback<InputStream>(){

                @Override
                public void onSuccess(InputStream result) {
                    boolean scheduleNextRunImmediately = true;
                    try {
                        if (responseHandler.status == 204) {
                            // HTTP 204: nothing to process; go straight to the next iteration.
                            log.debug("Received NO CONTENT reseponse from [%s]", serverId);
                        } else if (responseHandler.status == 200) {
                            try {
                                List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> statuses = jsonMapper.readValue(result, RESPONSE_ENTITY_TYPE_REF);
                                log.debug("Server[%s] returned status response [%s].", serverId, statuses);
                                synchronized (lock) {
                                    if (stopped) {
                                        log.debug("Ignoring response from Server[%s]. We are already stopped.", serverId);
                                        scheduleNextRunImmediately = false;
                                        return;
                                    }
                                    for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus e : statuses) {
                                        switch (e.getStatus().getState()) {
                                            case SUCCESS:
                                            case FAILED:
                                                handleResponseStatus(e.getRequest(), e.getStatus());
                                                break;
                                            case PENDING:
                                                // Stays queued; it is re-sent with a later batch.
                                                log.info("Request[%s] is still pending on server[%s].", e.getRequest(), serverId);
                                                break;
                                            default:
                                                scheduleNextRunImmediately = false;
                                                log.error("WTF! Server[%s] returned unknown state in status[%s].", serverId, e.getStatus());
                                        }
                                    }
                                }
                            }
                            catch (Exception ex) {
                                scheduleNextRunImmediately = false;
                                logRequestFailure(ex);
                            }
                        } else {
                            scheduleNextRunImmediately = false;
                            logRequestFailure(new RE("Unexpected Response Status."));
                        }
                    }
                    finally {
                        // Release the in-progress flag before (optionally) kicking off the next run.
                        mainLoopInProgress.set(false);
                        if (scheduleNextRunImmediately) {
                            processingExecutor.execute(HttpLoadQueuePeon.this::doSegmentManagement);
                        }
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    try {
                        logRequestFailure(t);
                    }
                    finally {
                        mainLoopInProgress.set(false);
                    }
                }

                private void logRequestFailure(Throwable t) {
                    log.error(t, "Request[%s] Failed with status[%s]. Reason[%s].", changeRequestURL, responseHandler.status, responseHandler.description);
                }
            }, processingExecutor);
        }
        catch (Throwable th) {
            log.error(th, "Error sending load/drop request to [%s].", serverId);
            mainLoopInProgress.set(false);
        }
    }

    /**
     * Picks up to {@code config.getHttpLoadQueuePeonBatchSize()} queued entries, drops first and
     * then loads. Entries that have timed out are failed and removed from their queue instead of
     * being added to the batch; they still count against the batch size.
     *
     * @return the change requests to send; possibly empty
     */
    private List<DataSegmentChangeRequest> collectNextBatch() {
        int remaining = config.getHttpLoadQueuePeonBatchSize();
        // Guard the capacity: ArrayList rejects negative values.
        final List<DataSegmentChangeRequest> batch = new ArrayList<>(Math.max(remaining, 0));

        synchronized (lock) {
            Iterator<Map.Entry<DataSegment, SegmentHolder>> iter = Iterators.concat(
                segmentsToDrop.entrySet().iterator(),
                segmentsToLoad.entrySet().iterator()
            );
            while (remaining > 0 && iter.hasNext()) {
                remaining--;
                SegmentHolder holder = iter.next().getValue();
                if (holder.hasTimedOut()) {
                    holder.requestFailed("timed out");
                    iter.remove();
                } else {
                    batch.add(holder.getChangeRequest());
                }
            }
        }
        return batch;
    }

    /**
     * Resolves the queued holder matching {@code changeRequest}: removes it from the load or drop
     * queue and notifies it of success or failure according to {@code status}. A request with no
     * queued holder is ignored.
     */
    private void handleResponseStatus(DataSegmentChangeRequest changeRequest, final SegmentLoadDropHandler.Status status) {
        changeRequest.go(new DataSegmentChangeHandler(){

            @Override
            public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) {
                updateSuccessOrFailureInHolder(segmentsToLoad.remove(segment), status);
            }

            @Override
            public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback) {
                updateSuccessOrFailureInHolder(segmentsToDrop.remove(segment), status);
            }

            private void updateSuccessOrFailureInHolder(SegmentHolder holder, SegmentLoadDropHandler.Status segmentStatus) {
                if (holder == null) {
                    return;
                }
                if (segmentStatus.getState() == SegmentLoadDropHandler.Status.STATE.FAILED) {
                    holder.requestFailed(segmentStatus.getFailureCause());
                } else {
                    holder.requestSucceeded();
                }
            }
        }, null);
    }

    /**
     * Schedules the main loop on {@link #processingExecutor} at the configured repeat delay.
     *
     * @throws ISE if this peon has already been stopped
     */
    @Override
    public void start() {
        synchronized (lock) {
            if (stopped) {
                throw new ISE("Can't start.");
            }
            ScheduledExecutors.scheduleAtFixedRate(
                processingExecutor,
                new Duration(config.getHttpLoadQueuePeonRepeatDelay()),
                new Callable<ScheduledExecutors.Signal>(){

                    @Override
                    public ScheduledExecutors.Signal call() {
                        if (!stopped) {
                            doSegmentManagement();
                        }
                        // Re-read the flag: stop() may have been called while the iteration ran.
                        return stopped ? ScheduledExecutors.Signal.STOP : ScheduledExecutors.Signal.REPEAT;
                    }
                }
            );
        }
    }

    /**
     * Stops the peon. Every still-queued holder is notified through {@code requestSucceeded()}
     * (so its callbacks run), both queues are cleared and the counters are reset. Calling this
     * more than once has no further effect.
     */
    @Override
    public void stop() {
        synchronized (lock) {
            if (stopped) {
                return;
            }
            stopped = true;
            for (SegmentHolder holder : segmentsToDrop.values()) {
                holder.requestSucceeded();
            }
            for (SegmentHolder holder : segmentsToLoad.values()) {
                holder.requestSucceeded();
            }
            segmentsToDrop.clear();
            segmentsToLoad.clear();
            queuedSize.set(0L);
            failedAssignCount.set(0);
        }
    }

    /**
     * Queues {@code segment} for loading and triggers a main-loop iteration. If the segment is
     * already queued, only {@code callback} is added to the existing entry.
     */
    @Override
    public void loadSegment(DataSegment segment, LoadPeonCallback callback) {
        synchronized (lock) {
            SegmentHolder holder = segmentsToLoad.get(segment);
            if (holder != null) {
                holder.addCallback(callback);
                return;
            }
            log.info("Server[%s] to load segment[%s] queued.", serverId, segment.getIdentifier());
            segmentsToLoad.put(segment, new LoadSegmentHolder(segment, callback));
            processingExecutor.execute(this::doSegmentManagement);
        }
    }

    /**
     * Queues {@code segment} for dropping and triggers a main-loop iteration. If the segment is
     * already queued, only {@code callback} is added to the existing entry.
     */
    @Override
    public void dropSegment(DataSegment segment, LoadPeonCallback callback) {
        synchronized (lock) {
            SegmentHolder holder = segmentsToDrop.get(segment);
            if (holder != null) {
                holder.addCallback(callback);
                return;
            }
            log.info("Server[%s] to drop segment[%s] queued.", serverId, segment.getIdentifier());
            segmentsToDrop.put(segment, new DropSegmentHolder(segment, callback));
            processingExecutor.execute(this::doSegmentManagement);
        }
    }

    /** @return a read-only view of the segments currently queued for loading */
    @Override
    public Set<DataSegment> getSegmentsToLoad() {
        return Collections.unmodifiableSet(segmentsToLoad.keySet());
    }

    /** @return a read-only view of the segments currently queued for dropping */
    @Override
    public Set<DataSegment> getSegmentsToDrop() {
        return Collections.unmodifiableSet(segmentsToDrop.keySet());
    }

    /** @return total size of the segments currently queued for loading */
    @Override
    public long getLoadQueueSize() {
        return queuedSize.get();
    }

    /** @return the number of failed requests since the previous call, resetting the counter to zero */
    @Override
    public int getAndResetFailedAssignCount() {
        return failedAssignCount.getAndSet(0);
    }

    @Override
    public void markSegmentToDrop(DataSegment dataSegment) {
        segmentsMarkedToDrop.add(dataSegment);
    }

    @Override
    public void unmarkSegmentToDrop(DataSegment dataSegment) {
        segmentsMarkedToDrop.remove(dataSegment);
    }

    /** @return the number of segments queued for loading (drops are not counted) */
    @Override
    public int getNumberOfSegmentsInQueue() {
        return segmentsToLoad.size();
    }

    /** @return a read-only view of the segments marked to be dropped */
    @Override
    public Set<DataSegment> getSegmentsMarkedToDrop() {
        return Collections.unmodifiableSet(segmentsMarkedToDrop);
    }

    /**
     * Response handler that remembers the HTTP status code and reason phrase of the response in
     * addition to exposing its body as a stream.
     */
    private static class BytesAccumulatingResponseHandler
    extends InputStreamResponseHandler {
        private int status;
        private String description;

        private BytesAccumulatingResponseHandler() {
        }

        @Override
        public ClientResponse<AppendableByteArrayInputStream> handleResponse(HttpResponse response) {
            this.status = response.getStatus().getCode();
            this.description = response.getStatus().getReasonPhrase();
            return ClientResponse.unfinished(super.handleResponse(response).getObj());
        }
    }

    /** Queue entry for a segment drop request. */
    private class DropSegmentHolder
    extends SegmentHolder {
        public DropSegmentHolder(DataSegment segment, LoadPeonCallback callback) {
            super(segment, new SegmentChangeRequestDrop(segment), callback);
        }
    }

    /**
     * Queue entry for a segment load request. Adds the segment size to {@link #queuedSize} on
     * creation and subtracts it again once the request is resolved, whether it succeeded or failed.
     */
    private class LoadSegmentHolder
    extends SegmentHolder {
        public LoadSegmentHolder(DataSegment segment, LoadPeonCallback callback) {
            super(segment, new SegmentChangeRequestLoad(segment), callback);
            queuedSize.addAndGet(segment.getSize());
        }

        @Override
        public void requestSucceeded() {
            queuedSize.addAndGet(-getSegment().getSize());
            super.requestSucceeded();
        }

        @Override
        public void requestFailed(String failureCause) {
            // A failed or timed-out load leaves the queue too, so it must stop counting towards
            // the queued size; otherwise getLoadQueueSize() only ever grows on failures.
            queuedSize.addAndGet(-getSegment().getSize());
            super.requestFailed(failureCause);
        }
    }

    /**
     * A queued change request for one segment together with the callbacks to run once the
     * request has been resolved.
     */
    private abstract class SegmentHolder {
        private final DataSegment segment;
        private final DataSegmentChangeRequest changeRequest;
        private final List<LoadPeonCallback> callbacks = Lists.newArrayList();
        /** Time of the first {@link #hasTimedOut()} check, or a negative value if not checked yet. */
        private volatile long scheduleTime = -1L;

        private SegmentHolder(DataSegment segment, DataSegmentChangeRequest changeRequest, LoadPeonCallback callback) {
            this.segment = segment;
            this.changeRequest = changeRequest;
            if (callback != null) {
                this.callbacks.add(callback);
            }
        }

        /** Registers an additional callback; {@code null} is ignored. */
        public void addCallback(LoadPeonCallback newCallback) {
            synchronized (callbacks) {
                if (newCallback != null) {
                    callbacks.add(newCallback);
                }
            }
        }

        public DataSegment getSegment() {
            return segment;
        }

        public DataSegmentChangeRequest getChangeRequest() {
            return changeRequest;
        }

        /**
         * The first call records the current time as the schedule time and returns {@code false}.
         * Later calls return whether more than {@code config.getLoadTimeoutDelay()} has elapsed
         * since that first call.
         */
        public boolean hasTimedOut() {
            if (scheduleTime < 0L) {
                scheduleTime = System.currentTimeMillis();
                return false;
            }
            return System.currentTimeMillis() - scheduleTime > config.getLoadTimeoutDelay().getMillis();
        }

        /** Logs the success and runs the registered callbacks on the callback executor. */
        public void requestSucceeded() {
            log.info("Server[%s] Successfully processed segment[%s] request[%s].", serverId, segment.getIdentifier(), changeRequest.getClass().getSimpleName());
            executeCallbacks();
        }

        /**
         * Logs the failure, increments the failed-assign counter and runs the registered
         * callbacks on the callback executor.
         */
        public void requestFailed(String failureCause) {
            log.error("Server[%s] Failed segment[%s] request[%s] with cause [%s].", serverId, segment.getIdentifier(), changeRequest.getClass().getSimpleName(), failureCause);
            failedAssignCount.getAndIncrement();
            executeCallbacks();
        }

        /** Submits a task to the callback executor that runs every non-null callback in order. */
        private void executeCallbacks() {
            callBackExecutor.execute(() -> {
                for (LoadPeonCallback callback : callbacks) {
                    if (callback != null) {
                        callback.execute();
                    }
                }
            });
        }

        @Override
        public String toString() {
            return changeRequest.toString();
        }
    }
}

