/*
 * Decompiled with CFR 0.152.
 */
package com.uber.cadence.internal.worker;

import com.uber.cadence.BadRequestError;
import com.uber.cadence.DomainNotActiveError;
import com.uber.cadence.EntityNotExistsError;
import com.uber.cadence.PollForActivityTaskResponse;
import com.uber.cadence.RespondActivityTaskCanceledRequest;
import com.uber.cadence.RespondActivityTaskCompletedRequest;
import com.uber.cadence.RespondActivityTaskFailedRequest;
import com.uber.cadence.WorkflowExecution;
import com.uber.cadence.common.RetryOptions;
import com.uber.cadence.internal.common.Retryer;
import com.uber.cadence.internal.worker.ActivityPollTask;
import com.uber.cadence.internal.worker.ActivityTaskHandler;
import com.uber.cadence.internal.worker.NoopSuspendableWorker;
import com.uber.cadence.internal.worker.PollTaskExecutor;
import com.uber.cadence.internal.worker.Poller;
import com.uber.cadence.internal.worker.PollerOptions;
import com.uber.cadence.internal.worker.SingleWorkerOptions;
import com.uber.cadence.internal.worker.SuspendableWorker;
import com.uber.cadence.serviceclient.IWorkflowService;
import com.uber.m3.tally.Scope;
import com.uber.m3.tally.Stopwatch;
import com.uber.m3.util.Duration;
import com.uber.m3.util.ImmutableMap;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import org.apache.thrift.TException;
import org.slf4j.MDC;

public final class ActivityWorker
implements SuspendableWorker {
    private static final String POLL_THREAD_NAME_PREFIX = "Activity Poller taskList=";
    private SuspendableWorker poller = new NoopSuspendableWorker();
    private final ActivityTaskHandler handler;
    private final IWorkflowService service;
    private final String domain;
    private final String taskList;
    private final SingleWorkerOptions options;

    public ActivityWorker(IWorkflowService service, String domain, String taskList, SingleWorkerOptions options, ActivityTaskHandler handler) {
        this.service = Objects.requireNonNull(service);
        this.domain = Objects.requireNonNull(domain);
        this.taskList = Objects.requireNonNull(taskList);
        this.handler = handler;
        PollerOptions pollerOptions = options.getPollerOptions();
        if (pollerOptions.getPollThreadNamePrefix() == null) {
            pollerOptions = new PollerOptions.Builder(pollerOptions).setPollThreadNamePrefix("Activity Poller taskList=\"" + taskList + "\", domain=\"" + domain + "\"").build();
        }
        this.options = new SingleWorkerOptions.Builder(options).setPollerOptions(pollerOptions).build();
    }

    @Override
    public void start() {
        if (this.handler.isAnyTypeSupported()) {
            this.poller = new Poller<MeasurableActivityTask>(this.options.getIdentity(), new ActivityPollTask(this.service, this.domain, this.taskList, this.options), new PollTaskExecutor<MeasurableActivityTask>(this.domain, this.taskList, this.options, new TaskHandlerImpl(this.handler)), this.options.getPollerOptions(), this.options.getMetricsScope());
            this.poller.start();
            this.options.getMetricsScope().counter("cadence-worker-start").inc(1L);
        }
    }

    @Override
    public boolean isStarted() {
        return this.poller.isStarted();
    }

    @Override
    public boolean isShutdown() {
        return this.poller.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return this.poller.isTerminated();
    }

    @Override
    public void shutdown() {
        this.poller.shutdown();
    }

    @Override
    public void shutdownNow() {
        this.poller.shutdownNow();
    }

    @Override
    public void awaitTermination(long timeout, TimeUnit unit) {
        this.poller.awaitTermination(timeout, unit);
    }

    @Override
    public void suspendPolling() {
        this.poller.suspendPolling();
    }

    @Override
    public void resumePolling() {
        this.poller.resumePolling();
    }

    @Override
    public boolean isSuspended() {
        return this.poller.isSuspended();
    }

    private class TaskHandlerImpl
    implements PollTaskExecutor.TaskHandler<MeasurableActivityTask> {
        final ActivityTaskHandler handler;

        private TaskHandlerImpl(ActivityTaskHandler handler) {
            this.handler = handler;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void handle(MeasurableActivityTask task) throws Exception {
            Scope metricsScope = ActivityWorker.this.options.getMetricsScope().tagged((Map)ImmutableMap.of((Object)"ActivityType", (Object)task.task.getActivityType().getName()));
            metricsScope.timer("cadence-tasklist-queue-latency").record(Duration.ofNanos((long)(task.task.getStartedTimestamp() - task.task.getScheduledTimestamp())));
            MDC.put((String)"ActivityID", (String)task.task.getActivityId());
            MDC.put((String)"ActivityType", (String)task.task.getActivityType().getName());
            MDC.put((String)"WorkflowID", (String)task.task.getWorkflowExecution().getWorkflowId());
            MDC.put((String)"RunID", (String)task.task.getWorkflowExecution().getRunId());
            try {
                Stopwatch sw = metricsScope.timer("cadence-activity-execution-latency").start();
                ActivityTaskHandler.Result response = this.handler.handle(task.task, metricsScope, false);
                sw.stop();
                sw = metricsScope.timer("cadence-activity-response-latency").start();
                this.sendReply(task.task, response, metricsScope);
                sw.stop();
                task.markDone();
            }
            catch (CancellationException e) {
                RespondActivityTaskCanceledRequest cancelledRequest = new RespondActivityTaskCanceledRequest();
                cancelledRequest.setDetails(String.valueOf(e.getMessage()).getBytes(StandardCharsets.UTF_8));
                Stopwatch sw = metricsScope.timer("cadence-activity-response-latency").start();
                this.sendReply(task.task, new ActivityTaskHandler.Result(null, null, cancelledRequest, null), metricsScope);
                sw.stop();
            }
            finally {
                MDC.remove((String)"ActivityID");
                MDC.remove((String)"ActivityType");
                MDC.remove((String)"WorkflowID");
                MDC.remove((String)"RunID");
            }
        }

        @Override
        public Throwable wrapFailure(MeasurableActivityTask task, Throwable failure) {
            WorkflowExecution execution = task.task.getWorkflowExecution();
            return new RuntimeException("Failure processing activity task. WorkflowID=" + execution.getWorkflowId() + ", RunID=" + execution.getRunId() + ", ActivityType=" + task.task.getActivityType().getName() + ", ActivityID=" + task.task.getActivityId(), failure);
        }

        private void sendReply(PollForActivityTaskResponse task, ActivityTaskHandler.Result response, Scope metricsScope) throws TException {
            RetryOptions ro = response.getRequestRetryOptions();
            RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted();
            if (taskCompleted != null) {
                ro = ActivityWorker.this.options.getReportCompletionRetryOptions().merge(ro).addDoNotRetry(BadRequestError.class, EntityNotExistsError.class, DomainNotActiveError.class);
                taskCompleted.setTaskToken(task.getTaskToken());
                taskCompleted.setIdentity(ActivityWorker.this.options.getIdentity());
                Retryer.retry(ro, () -> ActivityWorker.this.service.RespondActivityTaskCompleted(taskCompleted));
                metricsScope.counter("cadence-activity-task-completed").inc(1L);
            } else if (response.getTaskFailedResult() != null) {
                RespondActivityTaskFailedRequest taskFailed = response.getTaskFailedResult().getTaskFailedRequest();
                ro = ActivityWorker.this.options.getReportFailureRetryOptions().merge(ro).addDoNotRetry(BadRequestError.class, EntityNotExistsError.class, DomainNotActiveError.class);
                taskFailed.setTaskToken(task.getTaskToken());
                taskFailed.setIdentity(ActivityWorker.this.options.getIdentity());
                Retryer.retry(ro, () -> ActivityWorker.this.service.RespondActivityTaskFailed(taskFailed));
                metricsScope.counter("cadence-activity-task-failed").inc(1L);
            } else {
                RespondActivityTaskCanceledRequest taskCancelled = response.getTaskCancelled();
                if (taskCancelled != null) {
                    taskCancelled.setTaskToken(task.getTaskToken());
                    taskCancelled.setIdentity(ActivityWorker.this.options.getIdentity());
                    ro = ActivityWorker.this.options.getReportFailureRetryOptions().merge(ro).addDoNotRetry(BadRequestError.class, EntityNotExistsError.class, DomainNotActiveError.class);
                    Retryer.retry(ro, () -> ActivityWorker.this.service.RespondActivityTaskCanceled(taskCancelled));
                    metricsScope.counter("cadence-activity-task-canceled").inc(1L);
                }
            }
        }
    }

    static class MeasurableActivityTask {
        PollForActivityTaskResponse task;
        Stopwatch sw;

        MeasurableActivityTask(PollForActivityTaskResponse task, Stopwatch sw) {
            this.task = Objects.requireNonNull(task);
            this.sw = Objects.requireNonNull(sw);
        }

        void markDone() {
            this.sw.stop();
        }
    }
}

