/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.hadoop.io.bigquery;

import com.google.api.client.http.AbstractInputStreamContent;
import com.google.api.client.http.InputStreamContent;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.hadoop.io.bigquery.BigQueryFactory;
import com.google.cloud.hadoop.io.bigquery.BigQueryHelper;
import com.google.cloud.hadoop.io.bigquery.BigQueryStrings;
import com.google.cloud.hadoop.io.bigquery.BigQueryUtils;
import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
import com.google.cloud.hadoop.util.ClientRequestHelper;
import com.google.cloud.hadoop.util.HadoopToStringUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link RecordWriter} that serializes every {@link JsonObject} value as one line of JSON
 * (UTF-8, terminated by {@code '\n'}) and streams the bytes through an asynchronous write
 * channel into a single BigQuery load job.
 *
 * <p>The load job is configured in the constructor with {@code CREATE_IF_NEEDED},
 * {@code WRITE_TRUNCATE} and {@code NEWLINE_DELIMITED_JSON}. Keys passed to
 * {@link #write(Object, JsonObject)} are ignored.
 *
 * <p>Call statistics are accumulated in a static {@link Counters} instance that is shared by all
 * writer instances; see {@link Counter} and {@link #countersToString()}.
 *
 * @param <K> key type; keys are never read by this writer.
 * @param <V> value type; must be a Gson {@link JsonObject}.
 */
public class BigQueryRecordWriter<K, V extends JsonObject>
extends RecordWriter<K, V> {
    public static final Logger LOG = LoggerFactory.getLogger(BigQueryRecordWriter.class);

    /** Statistics shared by every writer instance (static), keyed by {@link Counter}. */
    private static final Counters counters = new Counters();

    private final ClientRequestHelper<Job> clientRequestHelper;
    private final Configuration configuration;
    private final Progressable progressable;
    private final Gson gson;
    private final BigQueryAsyncWriteChannel byteChannel;
    // Supplied by the caller; no default pool is created here so that no executor is
    // allocated only to be thrown away when the constructor assigns the real one.
    private final ExecutorService threadPool;
    // Not final: replaceable through setErrorExtractor(...) for tests.
    private ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
    private long bytesWritten = 0L;

    /**
     * Creates a writer, builds the load job description and opens the upload channel.
     *
     * @param factory used to obtain the {@link BigQueryHelper} for {@code configuration}.
     * @param threadPool executor handed to the asynchronous write channel; it is shut down by
     *     {@link #close(TaskAttemptContext)}.
     * @param clientRequestHelper helper installed on the asynchronous write channel.
     * @param configuration Hadoop configuration used to obtain the BigQuery helper.
     * @param progressable passed to the job-completion wait in the channel's response handler.
     * @param taskIdentifier passed together with {@code projectId} to
     *     {@code BigQueryHelper.createJobReference}.
     * @param outputRecordSchema fields of the destination table schema; must not be null.
     * @param projectId project the load job is inserted into; must not be null or empty.
     * @param tableRef destination table; it and its project, dataset and table ids must be set.
     * @param writeBufferSize upload buffer size of the write channel; must be positive.
     * @throws IllegalArgumentException if any of the argument checks above fails.
     * @throws IOException if the BigQuery helper cannot be created because of a
     *     {@link GeneralSecurityException}, or if creating the channel fails.
     */
    public BigQueryRecordWriter(BigQueryFactory factory, ExecutorService threadPool, ClientRequestHelper<Job> clientRequestHelper, Configuration configuration, Progressable progressable, String taskIdentifier, List<TableFieldSchema> outputRecordSchema, String projectId, TableReference tableRef, int writeBufferSize) throws IOException {
        // Validate first so that a bad argument is reported by its own message rather than
        // by whatever the debug-log formatting below would do with it.
        Preconditions.checkArgument(outputRecordSchema != null, "outputRecordSchema must not be null.");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(projectId), "projectId must not be null or empty.");
        Preconditions.checkArgument(tableRef != null, "tableRef must not be null.");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(tableRef.getProjectId()), "tableRef.getProjectId() must not be null or empty.");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(tableRef.getDatasetId()), "tableRef.getDatasetId() must not be null or empty.");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(tableRef.getTableId()), "tableRef.getTableId() must not be null or empty.");
        Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize should be positive.");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Initialize with projectId: '{}', tableRef: '{}', writeBufferSize: {}", new Object[]{projectId, BigQueryStrings.toString(tableRef), writeBufferSize});
        }

        this.gson = new Gson();
        this.configuration = configuration;
        this.progressable = progressable;
        this.threadPool = threadPool;
        this.clientRequestHelper = clientRequestHelper;

        BigQueryHelper bigQueryHelper;
        try {
            bigQueryHelper = factory.getBigQueryHelper(configuration);
        }
        catch (GeneralSecurityException e) {
            LOG.error("Could not connect to BigQuery:", e);
            throw new IOException(e);
        }

        // Describe the load: create the table if needed, replace its contents, read JSON lines.
        JobConfigurationLoad loadConfig = new JobConfigurationLoad();
        loadConfig.setCreateDisposition("CREATE_IF_NEEDED");
        loadConfig.setWriteDisposition("WRITE_TRUNCATE");
        loadConfig.setSourceFormat("NEWLINE_DELIMITED_JSON");
        loadConfig.setDestinationTable(tableRef);
        TableSchema schema = new TableSchema();
        schema.setFields(outputRecordSchema);
        loadConfig.setSchema(schema);

        JobConfiguration jobConfig = new JobConfiguration();
        jobConfig.setLoad(loadConfig);

        Job outputJob = new Job();
        outputJob.setConfiguration(jobConfig);
        outputJob.setJobReference(bigQueryHelper.createJobReference(projectId, taskIdentifier));

        // Must come last: the channel reads threadPool and clientRequestHelper from this instance.
        this.byteChannel = this.createByteChannel(bigQueryHelper, outputJob, projectId, writeBufferSize);
    }

    /**
     * Convenience constructor that supplies a new {@link BigQueryFactory}, a new cached thread
     * pool and a new {@link ClientRequestHelper}; see the full constructor for the remaining
     * parameters and the exceptions thrown.
     */
    public BigQueryRecordWriter(Configuration configuration, Progressable progressable, String taskIdentifier, List<TableFieldSchema> outputRecordSchema, String projectId, TableReference tableRef, int writeBufferSize) throws IOException {
        this(new BigQueryFactory(), Executors.newCachedThreadPool(), new ClientRequestHelper<Job>(), configuration, progressable, taskIdentifier, outputRecordSchema, projectId, tableRef, writeBufferSize);
    }

    /**
     * Builds and initializes the asynchronous channel that uploads the job's data.
     *
     * <p>The configuration key {@code mapred.bq.output.async.write.enabled} is still read
     * (default {@code true}) but no longer selects anything: a {@code false} value only
     * produces a warning and the asynchronous channel is used regardless.
     *
     * @param bigQueryHelper helper the channel uses to issue requests.
     * @param outputJob job description sent with the upload.
     * @param projectId project the job is inserted into.
     * @param writeBufferSize upload buffer size for the channel options.
     * @return the initialized channel.
     * @throws IOException if initializing the channel fails.
     */
    private BigQueryAsyncWriteChannel createByteChannel(BigQueryHelper bigQueryHelper, Job outputJob, String projectId, int writeBufferSize) throws IOException {
        String asyncWriteKey = "mapred.bq.output.async.write.enabled";
        if (this.configuration.getBoolean(asyncWriteKey, true)) {
            LOG.debug("Using asynchronous write channel.");
        } else {
            LOG.warn("Got 'false' for obsolete key '{}', using asynchronous write channel anyway.", asyncWriteKey);
        }
        AsyncWriteChannelOptions options = AsyncWriteChannelOptions.newBuilder().setUploadBufferSize(writeBufferSize).build();
        BigQueryAsyncWriteChannel channel = new BigQueryAsyncWriteChannel(bigQueryHelper, outputJob, projectId, options);
        channel.initialize();
        return channel;
    }

    /**
     * Serializes {@code value} to JSON, appends a newline and writes the UTF-8 bytes to the
     * upload channel. Updates the byte total and the {@code BYTES_WRITTEN}, {@code WRITE_CALLS}
     * and {@code WRITE_TOTAL_TIME} counters once the channel write has returned.
     *
     * @param key ignored.
     * @param value record to write.
     * @throws IOException if the channel write fails; in that case no totals are updated.
     */
    @Override
    public void write(K key, V value) throws IOException {
        long startTime = System.nanoTime();
        byte[] valueBytes = (this.gson.toJson(value) + "\n").getBytes(StandardCharsets.UTF_8);
        this.byteChannel.write(ByteBuffer.wrap(valueBytes));
        // Only count bytes after the channel accepted them, so a failed write is not reported.
        this.bytesWritten += (long)valueBytes.length;
        long duration = System.nanoTime() - startTime;
        BigQueryRecordWriter.increment(Counter.BYTES_WRITTEN, valueBytes.length);
        BigQueryRecordWriter.increment(Counter.WRITE_CALLS);
        BigQueryRecordWriter.increment(Counter.WRITE_TOTAL_TIME, duration);
    }

    /**
     * Shuts down the thread pool, closes the upload channel and records the
     * {@code CLOSE_CALLS} and {@code CLOSE_TOTAL_TIME} counters.
     *
     * @param context task context; only used for debug logging.
     * @throws IOException if closing the channel fails.
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException {
        long startTime = System.nanoTime();
        if (LOG.isDebugEnabled()) {
            LOG.debug("close({})", HadoopToStringUtil.toString(context));
        }
        // ExecutorService.shutdown() rejects new tasks but lets already-submitted ones finish.
        // NOTE(review): presumably the channel's upload task was submitted during initialize(),
        // which is why shutting down before close() is safe - confirm against the base class.
        this.threadPool.shutdown();
        this.byteChannel.close();
        long duration = System.nanoTime() - startTime;
        BigQueryRecordWriter.increment(Counter.CLOSE_CALLS);
        BigQueryRecordWriter.increment(Counter.CLOSE_TOTAL_TIME, duration);
    }

    /**
     * @return total number of serialized bytes handed to the channel by this instance.
     */
    public long getBytesWritten() {
        return this.bytesWritten;
    }

    /** Replaces the error extractor consulted by the channel's exception handling. */
    @VisibleForTesting
    void setErrorExtractor(ApiErrorExtractor errorExtractor) {
        this.errorExtractor = errorExtractor;
    }

    /** Adds 1 to the shared counter {@code key}. */
    static void increment(Counter key) {
        BigQueryRecordWriter.increment(key, 1L);
    }

    /** Adds {@code value} to the shared counter {@code key}. */
    static void increment(Counter key, long value) {
        counters.incrCounter(key, value);
    }

    /**
     * Renders the shared counters as text, one {@code name = value} line per entry, preceded
     * by an empty line.
     *
     * <p>For every counter named {@code X_CALLS} three lines are produced: the call count, the
     * matching {@code X_TOTAL_TIME} counter converted from nanoseconds to milliseconds, and a
     * derived {@code X_AVG_TIME} (total divided by calls, reported as 0 when there were no
     * calls). Counters ending in {@code _TIME} are only reported through that path; all other
     * counters are printed as plain integers.
     *
     * @return the formatted counter report.
     */
    public static String countersToString() {
        StringBuilder sb = new StringBuilder();
        sb.append("\n");
        double nanosPerMilli = TimeUnit.MILLISECONDS.toNanos(1L);
        String callsSuffix = "_CALLS";
        String timeSuffix = "_TIME";
        String totalTimeSuffix = "_TOTAL" + timeSuffix;
        String avgTimeSuffix = "_AVG" + timeSuffix;
        for (Counter c : Counter.values()) {
            String name = c.toString();
            if (name.endsWith(callsSuffix)) {
                String prefix = name.substring(0, name.length() - callsSuffix.length());
                long calls = counters.getCounter(c);
                sb.append(String.format("%20s = %d\n", name, calls));

                String totalName = prefix + totalTimeSuffix;
                double totalMillis = (double)counters.getCounter(Counter.valueOf(totalName)) / nanosPerMilli;
                sb.append(String.format("%20s = %.2f (ms)\n", totalName, totalMillis));

                // Guard the division: with zero calls the average would otherwise print as NaN.
                double avgMillis = calls == 0L ? 0.0 : totalMillis / (double)calls;
                sb.append(String.format("%20s = %.2f (ms)\n", prefix + avgTimeSuffix, avgMillis));
            } else if (!name.endsWith(timeSuffix)) {
                sb.append(String.format("%20s = %d\n", name, counters.getCounter(c)));
            }
        }
        return sb.toString();
    }

    /** Logs {@link #countersToString()} at debug level. */
    static void logCounters() {
        LOG.debug(BigQueryRecordWriter.countersToString());
    }

    /**
     * Write channel that uploads the written bytes as the media content of a BigQuery
     * {@code jobs.insert} request and then waits for the resulting job to complete.
     *
     * <p>It is an inner (non-static) class because it reads the enclosing writer's thread pool,
     * client request helper, progressable and error extractor.
     * NOTE(review): the three public methods below are presumably the hooks the base class
     * calls during upload - confirm against AbstractGoogleAsyncWriteChannel.
     */
    private class BigQueryAsyncWriteChannel
    extends AbstractGoogleAsyncWriteChannel<Bigquery.Jobs.Insert, Job> {
        private final BigQueryHelper bigQueryHelper;
        private final Job job;
        private final String projectId;

        /**
         * @param bigQueryHelper helper used to build requests and poll the job.
         * @param job job description (configuration and reference) to insert.
         * @param projectId project the job is inserted into.
         * @param options channel options, including the upload buffer size.
         */
        public BigQueryAsyncWriteChannel(BigQueryHelper bigQueryHelper, Job job, String projectId, AsyncWriteChannelOptions options) {
            super(BigQueryRecordWriter.this.threadPool, options);
            this.bigQueryHelper = bigQueryHelper;
            this.job = job;
            this.projectId = projectId;
            this.setClientRequestHelper(BigQueryRecordWriter.this.clientRequestHelper);
        }

        /**
         * Builds the {@code jobs.insert} request carrying {@code inputStream} as its content
         * and bumps the {@code JOBS_INSERTED} counter.
         */
        public Bigquery.Jobs.Insert createRequest(InputStreamContent inputStream) throws IOException {
            Bigquery.Jobs.Insert insert = this.bigQueryHelper.getRawBigquery().jobs().insert(this.projectId, this.job, (AbstractInputStreamContent)inputStream);
            insert.setProjectId(this.projectId);
            BigQueryRecordWriter.increment(Counter.JOBS_INSERTED);
            return insert;
        }

        /**
         * Checks that {@code response} refers to the job that was submitted, then waits for
         * that job to complete.
         *
         * @throws IOException if the wait fails or is interrupted; on interruption the
         *     thread's interrupt status is restored before the exception is thrown.
         */
        public void handleResponse(Job response) throws IOException {
            this.bigQueryHelper.checkJobIdEquality(this.job, response);
            JobReference jobReference = response.getJobReference();
            try {
                BigQueryUtils.waitForJobCompletion(this.bigQueryHelper.getRawBigquery(), this.projectId, jobReference, BigQueryRecordWriter.this.progressable);
            }
            catch (InterruptedException e) {
                // Re-assert the interrupt so callers further up the stack can still observe it.
                Thread.currentThread().interrupt();
                LOG.error("Interrupted while waiting for load job completion.", e);
                throw new IOException(e);
            }
        }

        /**
         * Maps an insert failure to a substitute response: when the error extractor reports
         * "item already exists", a {@link Job} carrying this channel's job reference is
         * returned; otherwise {@code null}.
         */
        public Job createResponseFromException(IOException ioe) {
            if (BigQueryRecordWriter.this.errorExtractor.itemAlreadyExists(ioe)) {
                return new Job().setJobReference(this.job.getJobReference());
            }
            return null;
        }
    }

    /**
     * Keys of the shared statistics. {@link #countersToString()} relies on the naming scheme:
     * every {@code X_CALLS} entry must have a matching {@code X_TOTAL_TIME} entry.
     */
    public enum Counter {
        BYTES_WRITTEN,
        CLOSE_CALLS,
        CLOSE_TOTAL_TIME,
        JOBS_INSERTED,
        WRITE_CALLS,
        WRITE_TOTAL_TIME;

    }
}

