/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.telemetry.events.exporter.http;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import com.google.protobuf.MessageLite;
import io.confluent.telemetry.client.BufferingAsyncTelemetryHttpClient;
import io.confluent.telemetry.client.BufferingAsyncTelemetryHttpClientBatchResult;
import io.confluent.telemetry.client.Credentials;
import io.confluent.telemetry.client.ProxyConfig;
import io.confluent.telemetry.events.exporter.Exporter;
import io.confluent.telemetry.events.exporter.ExporterConfig;
import io.confluent.telemetry.events.exporter.http.HttpExporterConfig;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpExporter<Data, Req extends MessageLite, Resp extends MessageLite>
implements Exporter<Data> {
    private static final Logger log = LoggerFactory.getLogger(HttpExporter.class);
    protected Function<Collection<Data>, Req> requestConverter;
    protected Function<ByteBuffer, Resp> responseDeserializer;
    protected BufferingAsyncTelemetryHttpClient<Data, Req, Resp> bufferingClient;
    protected String endpoint;
    private HttpExporterConfig config;
    private volatile Credentials credentials;
    private volatile ProxyConfig proxyConfig;

    public void configure(Map<String, ?> configs) {
        this.config = new HttpExporterConfig(configs);
        this.setClientAndSubscribe(this.config.getBufferingAsyncClientBuilder().setClient(this.config.getClientBuilder().setResponseDeserializer(this.responseDeserializer).setEndpoint(this.endpoint).setCredentialsSupplier(() -> this.credentials).setProxyConfigSupplier(() -> this.proxyConfig).build()).setCreateRequestFn(this.requestConverter).build());
        this.bufferingClient.getBatchResults().doOnNext(this::trackResponses);
        this.setDynamicFields(this.config);
    }

    private void setDynamicFields(HttpExporterConfig config) {
        this.credentials = config.getCredentials();
        this.proxyConfig = config.getProxyConfig();
    }

    @VisibleForTesting
    void setClientAndSubscribe(BufferingAsyncTelemetryHttpClient<Data, Req, Resp> bufferingClient) {
        this.bufferingClient = bufferingClient;
        this.bufferingClient.getBatchResults().doOnNext(this::trackResponses);
    }

    private void trackResponses(BufferingAsyncTelemetryHttpClientBatchResult<Data, Resp> batchResult) {
        if (!batchResult.isSuccess()) {
            log.error("Confluent Telemetry Failure", batchResult.getThrowable());
        }
    }

    @Override
    public void emit(Data data) throws RuntimeException {
        if (this.credentials == null) {
            return;
        }
        this.bufferingClient.submit(Collections.singleton(data));
    }

    @Override
    public void close() throws IOException {
        this.bufferingClient.close();
    }

    public void reconfigure(Map<String, ?> configs) {
        this.setDynamicFields(new HttpExporterConfig(configs));
    }

    public Set<String> reconfigurableConfigs() {
        return Sets.union(ExporterConfig.RECONFIGURABLES, HttpExporterConfig.RECONFIGURABLE_CONFIGS);
    }

    public void validateReconfiguration(Map<String, ?> configs) throws ConfigException {
    }
}

