/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.rest.resources;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.ServletContext;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.Crypto;
import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path(value="/connectors")
@Produces(value={"application/json"})
@Consumes(value={"application/json"})
public class ConnectorsResource
implements ConnectResource {
    private static final Logger log = LoggerFactory.getLogger(ConnectorsResource.class);
    private static final TypeReference<List<Map<String, String>>> TASK_CONFIGS_TYPE = new TypeReference<List<Map<String, String>>>(){};
    private final Herder herder;
    private final RestClient restClient;
    private long requestTimeoutMs;
    @Context
    private ServletContext context;
    private final boolean isTopicTrackingDisabled;
    private final boolean isTopicTrackingResetDisabled;

    public ConnectorsResource(Herder herder, WorkerConfig config, RestClient restClient) {
        this.herder = herder;
        this.restClient = restClient;
        this.isTopicTrackingDisabled = config.getBoolean("topic.tracking.enable") == false;
        this.isTopicTrackingResetDisabled = config.getBoolean("topic.tracking.allow.reset") == false;
        this.requestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS;
    }

    @Override
    public void requestTimeout(long requestTimeoutMs) {
        if (requestTimeoutMs < 1L) {
            throw new IllegalArgumentException("REST request timeout must be positive");
        }
        this.requestTimeoutMs = requestTimeoutMs;
    }

    @GET
    @Path(value="/")
    @Operation(summary="List all active connectors")
    public Response listConnectors(@Context UriInfo uriInfo, @Context HttpHeaders headers) {
        if (uriInfo.getQueryParameters().containsKey((Object)"expand")) {
            HashMap out = new HashMap();
            for (String connector : this.herder.connectors()) {
                try {
                    HashMap<String, Object> connectorExpansions = new HashMap<String, Object>();
                    Iterator iterator = ((List)uriInfo.getQueryParameters().get((Object)"expand")).iterator();
                    block11: while (iterator.hasNext()) {
                        String expansion;
                        switch (expansion = (String)iterator.next()) {
                            case "status": {
                                connectorExpansions.put("status", this.herder.connectorStatus(connector));
                                continue block11;
                            }
                            case "info": {
                                connectorExpansions.put("info", this.herder.connectorInfo(connector));
                                continue block11;
                            }
                        }
                        log.info("Ignoring unknown expansion type {}", (Object)expansion);
                    }
                    out.put(connector, connectorExpansions);
                }
                catch (NotFoundException e) {
                    log.debug("Unable to get connector info for {} on this worker", (Object)connector);
                }
            }
            return Response.ok(out).build();
        }
        return Response.ok(this.herder.connectors()).build();
    }

    @POST
    @Path(value="/")
    @Operation(summary="Create a new connector")
    public Response createConnector(@Parameter(hidden=true) @QueryParam(value="forward") Boolean forward, @Context HttpHeaders headers, CreateConnectorRequest createRequest) throws Throwable {
        String name = createRequest.name() == null ? "" : createRequest.name().trim();
        Map<String, String> configs = createRequest.config();
        this.checkAndPutConnectorConfigName(name, configs);
        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<Herder.Created<ConnectorInfo>>();
        this.herder.putConnectorConfig(name, configs, false, cb);
        Herder.Created<ConnectorInfo> info = this.completeOrForwardRequest(cb, "/connectors", "POST", headers, createRequest, new TypeReference<ConnectorInfo>(){}, new CreatedConnectorInfoTranslator(), forward);
        URI location = UriBuilder.fromUri((String)"/connectors").path(name).build(new Object[0]);
        return Response.created((URI)location).entity((Object)info.result()).build();
    }

    @GET
    @Path(value="/{connector}")
    @Operation(summary="Get the details for the specified connector")
    public ConnectorInfo getConnector(@PathParam(value="connector") String connector, @Context HttpHeaders headers, @Parameter(hidden=true) @QueryParam(value="forward") Boolean forward) throws Throwable {
        FutureCallback<ConnectorInfo> cb = new FutureCallback<ConnectorInfo>();
        this.herder.connectorInfo(connector, cb);
        return this.completeOrForwardRequest(cb, "/connectors/" + connector, "GET", headers, null, forward);
    }

    @GET
    @Path(value="/{connector}/config")
    @Operation(summary="Get the configuration for the specified connector")
    public Map<String, String> getConnectorConfig(@PathParam(value="connector") String connector, @Context HttpHeaders headers, @Parameter(hidden=true) @QueryParam(value="forward") Boolean forward) throws Throwable {
        FutureCallback<Map<String, String>> cb = new FutureCallback<Map<String, String>>();
        this.herder.connectorConfig(connector, cb);
        return this.completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", headers, null, forward);
    }

    @GET
    @Path(value="/{connector}/tasks-config")
    @Operation(summary="Get the configuration of all tasks for the specified connector")
    public Map<ConnectorTaskId, Map<String, String>> getTasksConfig(@PathParam(value="connector") String connector, @Context HttpHeaders headers, @Parameter(hidden=true) @QueryParam(value="forward") Boolean forward) throws Throwable {
        FutureCallback<Map<ConnectorTaskId, Map<String, String>>> cb = new FutureCallback<Map<ConnectorTaskId, Map<String, String>>>();
        this.herder.tasksConfig(connector, cb);
        return this.completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks-config", "GET", headers, null, forward);
    }

    @GET
    @Path(value="/{connector}/status")
    @Operation(summary="Get the status for the specified connector")
    public ConnectorStateInfo getConnectorStatus(@PathParam(value="connector") String connector) {
        return this.herder.connectorStatus(connector);
    }

    @GET
    @Path(value="/{connector}/topics")
    @Operation(summary="Get the list of topics actively used by the specified connector")
    public Response getConnectorActiveTopics(@PathParam(value="connector") String connector) {
        if (this.isTopicTrackingDisabled) {
            throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(), "Topic tracking is disabled.");
        }
        ActiveTopicsInfo info = this.herder.connectorActiveTopics(connector);
        return Response.ok(Collections.singletonMap(info.connector(), info)).build();
    }

    @PUT
    @Path(value="/{connector}/topics/reset")
    @Operation(summary="Reset the list of topics actively used by the specified connector")
    public Response resetConnectorActiveTopics(@PathParam(value="connector") String connector, @Context HttpHeaders headers) {
        if (this.isTopicTrackingDisabled) {
            throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(), "Topic tracking is disabled.");
        }
        if (this.isTopicTrackingResetDisabled) {
            throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(), "Topic tracking reset is disabled.");
        }
        this.herder.resetConnectorActiveTopics(connector);
        return Response.accepted().build();
    }

    @PUT
    @Path(value="/{connector}/config")
    @Operation(summary="Create or reconfigure the specified connector")
    public Response putConnectorConfig(@PathParam(value="connector") String connector, @Context HttpHeaders headers, @Parameter(hidden=true) @QueryParam(value="forward") Boolean forward, Map<String, String> connectorConfig) throws Throwable {
        Response.ResponseBuilder response;
        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<Herder.Created<ConnectorInfo>>();
        this.checkAndPutConnectorConfigName(connector, connectorConfig);
        this.herder.putConnectorConfig(connector, connectorConfig, true, cb);
        Herder.Created<ConnectorInfo> createdInfo = this.completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "PUT", headers, connectorConfig, new TypeReference<ConnectorInfo>(){}, new CreatedConnectorInfoTranslator(), forward);
        if (createdInfo.created()) {
            URI location = UriBuilder.fromUri((String)"/connectors").path(connector).build(new Object[0]);
            response = Response.created((URI)location);
        } else {
            response = Response.ok();
        }
        return response.entity((Object)createdInfo.result()).build();
    }

    @POST
    @Path(value="/{connector}/restart")
    @Operation(summary="Restart the specified connector")
    public Response restartConnector(@PathParam(value="connector") String connector, @Context HttpHeaders headers, @DefaultValue(value="false") @QueryParam(value="includeTasks") @Parameter(description="Whether to also restart tasks") Boolean includeTasks, @DefaultValue(value="false") @QueryParam(value="onlyFailed") @Parameter(description="Whether to only restart failed tasks/connectors") Boolean onlyFailed, @Parameter(hidden=true) @QueryParam(value="forward") Boolean forward) throws Throwable {
        RestartRequest restartRequest = new RestartRequest(connector, onlyFailed, includeTasks);
        String forwardingPath = "/connectors/" + connector + "/restart";
        if (restartRequest.forceRestartConnectorOnly()) {
            FutureCallback<Void> cb = new FutureCallback<Void>();
            this.herder.restartConnector(connector, cb);
            this.completeOrForwardRequest(cb, forwardingPath, "POST", headers, null, forward);
            return Response.noContent().build();
        }
        FutureCallback<ConnectorStateInfo> cb = new FutureCallback<ConnectorStateInfo>();
        this.herder.restartConnectorAndTasks(restartRequest, cb);
        HashMap<String, String> queryParameters = new HashMap<String, String>();
        queryParameters.put("includeTasks", includeTasks.toString());
        queryParameters.put("onlyFailed", onlyFailed.toString());
        ConnectorStateInfo stateInfo = this.completeOrForwardRequest(cb, forwardingPath, "POST", headers, queryParameters, null, new TypeReference<ConnectorStateInfo>(){}, new IdentityTranslator(), forward);
        return Response.accepted().entity((Object)stateInfo).build();
    }

    @PUT
    @Path(value="/{connector}/pause")
    @Operation(summary="Pause the specified connector", description="This operation is idempotent and has no effects if the connector is already paused")
    public Response pauseConnector(@PathParam(value="connector") String connector, @Context HttpHeaders headers) {
        this.herder.pauseConnector(connector);
        return Response.accepted().build();
    }

    @PUT
    @Path(value="/{connector}/resume")
    @Operation(summary="Resume the specified connector", description="This operation is idempotent and has no effects if the connector is already running")
    public Response resumeConnector(@PathParam(value="connector") String connector) {
        this.herder.resumeConnector(connector);
        return Response.accepted().build();
    }

    @GET
    @Path(value="/{connector}/tasks")
    @Operation(summary="List all tasks for the specified connector")
    public List<TaskInfo> getTaskConfigs(@PathParam(value="connector") String connector, @Context HttpHeaders headers, @Parameter(hidden=true) @QueryParam(value="forward") Boolean forward) throws Throwable {
        FutureCallback<List<TaskInfo>> cb = new FutureCallback<List<TaskInfo>>();
        this.herder.taskConfigs(connector, cb);
        return this.completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", headers, null, new TypeReference<List<TaskInfo>>(){}, forward);
    }

    @POST
    @Path(value="/{connector}/tasks")
    @Operation(hidden=true, summary="This operation is only for inter-worker communications")
    public void putTaskConfigs(@PathParam(value="connector") String connector, @Context HttpHeaders headers, @QueryParam(value="forward") Boolean forward, byte[] requestBody) throws Throwable {
        List taskConfigs = (List)new ObjectMapper().readValue(requestBody, TASK_CONFIGS_TYPE);
        FutureCallback<Void> cb = new FutureCallback<Void>();
        this.herder.putTaskConfigs(connector, taskConfigs, cb, InternalRequestSignature.fromHeaders(Crypto.SYSTEM, requestBody, headers));
        this.completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", headers, taskConfigs, forward);
    }

    @PUT
    @Path(value="/{connector}/fence")
    @Operation(hidden=true, summary="This operation is only for inter-worker communications")
    public void fenceZombies(@PathParam(value="connector") String connector, @Context HttpHeaders headers, @QueryParam(value="forward") Boolean forward, byte[] requestBody) throws Throwable {
        FutureCallback<Void> cb = new FutureCallback<Void>();
        this.herder.fenceZombieSourceTasks(connector, cb, InternalRequestSignature.fromHeaders(Crypto.SYSTEM, requestBody, headers));
        this.completeOrForwardRequest(cb, "/connectors/" + connector + "/fence", "PUT", headers, requestBody, forward);
    }

    @GET
    @Path(value="/{connector}/tasks/{task}/status")
    @Operation(summary="Get the state of the specified task for the specified connector")
    public ConnectorStateInfo.TaskState getTaskStatus(@PathParam(value="connector") String connector, @Context HttpHeaders headers, @PathParam(value="task") Integer task) {
        return this.herder.taskStatus(new ConnectorTaskId(connector, task));
    }

    @POST
    @Path(value="/{connector}/tasks/{task}/restart")
    @Operation(summary="Restart the specified task for the specified connector")
    public void restartTask(@PathParam(value="connector") String connector, @PathParam(value="task") Integer task, @Context HttpHeaders headers, @Parameter(hidden=true) @QueryParam(value="forward") Boolean forward) throws Throwable {
        FutureCallback<Void> cb = new FutureCallback<Void>();
        ConnectorTaskId taskId = new ConnectorTaskId(connector, task);
        this.herder.restartTask(taskId, cb);
        this.completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + task + "/restart", "POST", headers, null, forward);
    }

    @DELETE
    @Path(value="/{connector}")
    @Operation(summary="Delete the specified connector")
    public void destroyConnector(@PathParam(value="connector") String connector, @Context HttpHeaders headers, @Parameter(hidden=true) @QueryParam(value="forward") Boolean forward) throws Throwable {
        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<Herder.Created<ConnectorInfo>>();
        this.herder.deleteConnectorConfig(connector, cb);
        this.completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", headers, null, forward);
    }

    private void checkAndPutConnectorConfigName(String connectorName, Map<String, String> connectorConfig) {
        String includedName = connectorConfig.get("name");
        if (includedName != null) {
            if (!includedName.equals(connectorName)) {
                throw new BadRequestException("Connector name configuration (" + includedName + ") doesn't match connector name in the URL (" + connectorName + ")");
            }
        } else {
            connectorConfig.put("name", connectorName);
        }
    }

    private <T, U> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Map<String, String> queryParameters, Object body, TypeReference<U> resultType, Translator<T, U> translator, Boolean forward) throws Throwable {
        try {
            return cb.get(this.requestTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RequestTargetException) {
                if (this.restClient == null) {
                    throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Cannot complete request as non-leader with request forwarding disabled");
                }
                if (forward == null || forward.booleanValue()) {
                    boolean recursiveForward = forward == null;
                    RequestTargetException targetException = (RequestTargetException)((Object)cause);
                    String forwardedUrl = targetException.forwardUrl();
                    if (forwardedUrl == null) {
                        throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), "Cannot complete request momentarily due to no known leader URL, likely because a rebalance was underway.");
                    }
                    UriBuilder uriBuilder = UriBuilder.fromUri((String)forwardedUrl).path(path).queryParam("forward", new Object[]{recursiveForward});
                    if (queryParameters != null) {
                        queryParameters.forEach((x$0, xva$1) -> uriBuilder.queryParam(x$0, new Object[]{xva$1}));
                    }
                    String forwardUrl = uriBuilder.build(new Object[0]).toString();
                    log.debug("Forwarding request {} {} {}", new Object[]{forwardUrl, method, body});
                    return translator.translate(this.restClient.httpRequest(forwardUrl, method, headers, body, resultType));
                }
                throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), "Cannot complete request because of a conflicting operation (e.g. worker rebalance)");
            }
            if (cause instanceof RebalanceNeededException) {
                throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), "Cannot complete request momentarily due to stale configuration (typically caused by a concurrent config change)");
            }
            throw cause;
        }
        catch (TimeoutException e) {
            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out");
        }
        catch (InterruptedException e) {
            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted");
        }
    }

    private <T, U> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Object body, TypeReference<U> resultType, Translator<T, U> translator, Boolean forward) throws Throwable {
        return this.completeOrForwardRequest(cb, path, method, headers, null, body, resultType, translator, forward);
    }

    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Object body, TypeReference<T> resultType, Boolean forward) throws Throwable {
        return this.completeOrForwardRequest(cb, path, method, headers, body, resultType, new IdentityTranslator(), forward);
    }

    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Object body, Boolean forward) throws Throwable {
        return this.completeOrForwardRequest(cb, path, method, headers, body, null, new IdentityTranslator(), forward);
    }

    private static class CreatedConnectorInfoTranslator
    implements Translator<Herder.Created<ConnectorInfo>, ConnectorInfo> {
        private CreatedConnectorInfoTranslator() {
        }

        @Override
        public Herder.Created<ConnectorInfo> translate(RestClient.HttpResponse<ConnectorInfo> response) {
            boolean created = response.status() == 201;
            return new Herder.Created<ConnectorInfo>(created, response.body());
        }
    }

    private static class IdentityTranslator<T>
    implements Translator<T, T> {
        private IdentityTranslator() {
        }

        @Override
        public T translate(RestClient.HttpResponse<T> response) {
            return response.body();
        }
    }

    private static interface Translator<T, U> {
        public T translate(RestClient.HttpResponse<U> var1);
    }
}

