/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.oss.dsbulk.connectors.commons;

import com.datastax.oss.dsbulk.config.ConfigUtils;
import com.datastax.oss.dsbulk.connectors.api.Connector;
import com.datastax.oss.dsbulk.connectors.api.Record;
import com.datastax.oss.dsbulk.io.CompressedIOUtils;
import com.datastax.oss.dsbulk.io.IOUtils;
import com.typesafe.config.Config;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.file.FileSystemNotFoundException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;

public abstract class AbstractFileBasedConnector
implements Connector {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractFileBasedConnector.class);
    protected static final String URL = "url";
    protected static final String URLFILE = "urlfile";
    protected static final String COMPRESSION = "compression";
    protected static final String ENCODING = "encoding";
    protected static final String FILE_NAME_PATTERN = "fileNamePattern";
    protected static final String SKIP_RECORDS = "skipRecords";
    protected static final String MAX_RECORDS = "maxRecords";
    protected static final String MAX_CONCURRENT_FILES = "maxConcurrentFiles";
    protected static final String RECURSIVE = "recursive";
    protected static final String FILE_NAME_FORMAT = "fileNameFormat";
    protected boolean read;
    protected boolean retainRecordSources;
    protected List<URL> urls;
    protected List<Path> roots = new ArrayList<Path>();
    protected List<URL> files = new ArrayList<URL>();
    protected Charset encoding;
    protected String compression;
    protected String fileNameFormat;
    protected boolean recursive;
    protected String pattern;
    protected long skipRecords;
    protected long maxRecords;
    protected int resourceCount;
    protected int maxConcurrentFiles;
    protected Deque<RecordWriter> writers;
    protected RecordWriter singleWriter;
    protected AtomicInteger fileCounter;
    protected AtomicInteger nextWriterIndex;

    public int readConcurrency() {
        assert (this.read);
        return Math.max(1, Math.min(this.resourceCount, this.maxConcurrentFiles));
    }

    public int writeConcurrency() {
        assert (!this.read);
        if (this.roots.isEmpty()) {
            return 1;
        }
        return this.maxConcurrentFiles;
    }

    public void configure(@NonNull Config settings, boolean read, boolean retainRecordSources) {
        this.read = read;
        this.retainRecordSources = retainRecordSources;
        this.urls = this.loadURLs(settings);
        this.encoding = ConfigUtils.getCharset((Config)settings, (String)ENCODING);
        this.compression = settings.getString(COMPRESSION);
        if (!CompressedIOUtils.isSupportedCompression((String)this.compression, (boolean)read).booleanValue()) {
            throw new IllegalArgumentException(String.format("Invalid value for connector.csv.%s, valid values: %s, got: '%s'", COMPRESSION, String.join((CharSequence)",", CompressedIOUtils.getSupportedCompressions((boolean)read)), this.compression));
        }
        this.pattern = settings.getString(FILE_NAME_PATTERN);
        if (!CompressedIOUtils.isNoneCompression((String)this.compression) && ConfigUtils.hasReferenceValue((Config)settings, (String)FILE_NAME_PATTERN)) {
            this.pattern = this.pattern + CompressedIOUtils.getCompressionSuffix((String)this.compression);
        }
        this.fileNameFormat = settings.getString(FILE_NAME_FORMAT);
        if (!CompressedIOUtils.isNoneCompression((String)this.compression) && ConfigUtils.hasReferenceValue((Config)settings, (String)FILE_NAME_FORMAT)) {
            this.fileNameFormat = this.fileNameFormat + CompressedIOUtils.getCompressionSuffix((String)this.compression);
        }
        this.recursive = settings.getBoolean(RECURSIVE);
        this.maxConcurrentFiles = "AUTO".equals(settings.getString(MAX_CONCURRENT_FILES)) ? ConfigUtils.resolveThreads((String)(read ? "1C" : "0.5C")) : ConfigUtils.getThreads((Config)settings, (String)MAX_CONCURRENT_FILES);
        this.skipRecords = settings.getLong(SKIP_RECORDS);
        this.maxRecords = settings.getLong(MAX_RECORDS);
    }

    public void init() throws URISyntaxException, IOException {
        if (this.read) {
            this.processURLsForRead();
        } else {
            this.processURLsForWrite();
            this.fileCounter = new AtomicInteger(0);
            this.nextWriterIndex = new AtomicInteger(0);
            if (!this.roots.isEmpty() && this.maxConcurrentFiles > 1) {
                this.writers = new ConcurrentLinkedDeque<RecordWriter>();
                for (int i = 0; i < this.maxConcurrentFiles; ++i) {
                    this.writers.add(this.newSingleFileWriter());
                }
            } else {
                this.singleWriter = this.newSingleFileWriter();
            }
        }
    }

    @NonNull
    public Publisher<Publisher<Record>> read() {
        assert (this.read);
        return Flux.concat((Publisher[])new Publisher[]{Flux.fromIterable(this.roots).flatMap(this::scanRootDirectory), Flux.fromIterable(this.files)}).map(url -> this.readSingleFile((URL)url).transform(this::applyPerFileLimits));
    }

    @NonNull
    public Function<Publisher<Record>, Publisher<Record>> write() {
        assert (!this.read);
        if (!this.roots.isEmpty() && this.maxConcurrentFiles > 1) {
            return records -> Flux.from((Publisher)records).concatMap(record -> Mono.deferContextual(ctx -> {
                try {
                    RecordWriter writer = (RecordWriter)ctx.get((Object)"WRITER");
                    writer.write((Record)record);
                    return Mono.just((Object)record);
                }
                catch (Exception e) {
                    return Mono.error((Throwable)e);
                }
            }), 500).concatWith((Publisher)Mono.deferContextual(ctx -> {
                try {
                    RecordWriter writer = (RecordWriter)ctx.get((Object)"WRITER");
                    writer.flush();
                    this.writers.offer(writer);
                    return Mono.empty();
                }
                catch (Exception e) {
                    return Mono.error((Throwable)e);
                }
            })).contextWrite(ctx -> ctx.put((Object)"WRITER", (Object)this.writers.remove()));
        }
        return records -> Flux.from((Publisher)records).concatMap(record -> {
            try {
                this.singleWriter.write((Record)record);
                return Mono.just((Object)record);
            }
            catch (Exception e) {
                return Mono.error((Throwable)e);
            }
        }, 500).concatWith((Publisher)Flux.create(sink -> {
            try {
                this.singleWriter.flush();
                sink.complete();
            }
            catch (Exception e) {
                sink.error((Throwable)e);
            }
        }));
    }

    public void close() {
        if (this.writers != null) {
            IOException e = null;
            for (RecordWriter writer : this.writers) {
                try {
                    writer.flush();
                    writer.close();
                }
                catch (IOException e1) {
                    if (e == null) {
                        e = e1;
                        continue;
                    }
                    e.addSuppressed(e1);
                }
            }
            if (e != null) {
                throw new UncheckedIOException(e);
            }
        }
        if (this.singleWriter != null) {
            try {
                this.singleWriter.flush();
                this.singleWriter.close();
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    @NonNull
    protected abstract String getConnectorName();

    @NonNull
    protected Flux<Record> readSingleFile(@NonNull URL url) {
        return Flux.generate(() -> this.newSingleFileReader(url), RecordReader::readNext, recordReader -> {
            try {
                recordReader.close();
            }
            catch (IOException e) {
                LOGGER.error("Error closing " + url, (Throwable)e);
            }
        });
    }

    @NonNull
    protected abstract RecordReader newSingleFileReader(@NonNull URL var1) throws IOException;

    @NonNull
    protected abstract RecordWriter newSingleFileWriter();

    @NonNull
    protected List<URL> loadURLs(@NonNull Config settings) {
        boolean hasUrl = ConfigUtils.isPathPresentAndNotEmpty((Config)settings, (String)URL);
        boolean hasUrlfile = ConfigUtils.isPathPresentAndNotEmpty((Config)settings, (String)URLFILE);
        if (this.read) {
            if (!hasUrl && !hasUrlfile) {
                throw new IllegalArgumentException(String.format("A URL or URL file is mandatory when using the %s connector for LOAD. Please set connector.%s.url or connector.%s.urlfile and try again. See settings.md or help for more information.", this.getConnectorName(), this.getConnectorName(), this.getConnectorName()));
            }
            if (hasUrl && hasUrlfile) {
                LOGGER.debug("You specified both URL and URL file. The URL file will take precedence.");
            }
        } else {
            if (hasUrlfile) {
                throw new IllegalArgumentException("The urlfile parameter is not supported for UNLOAD");
            }
            if (!hasUrl) {
                throw new IllegalArgumentException(String.format("A URL is mandatory when using the %s connector for UNLOAD. Please set connector.%s.url and try again. See settings.md or help for more information.", this.getConnectorName(), this.getConnectorName()));
            }
        }
        if (hasUrlfile) {
            try {
                return ConfigUtils.getURLsFromFile((Path)ConfigUtils.getPath((Config)settings, (String)URLFILE));
            }
            catch (IOException e) {
                throw new IllegalArgumentException("Problem when retrieving urls from file specified by the URL file parameter", e);
            }
        }
        return Collections.singletonList(ConfigUtils.getURL((Config)settings, (String)URL));
    }

    protected void processURLsForRead() throws URISyntaxException, IOException {
        this.resourceCount = 0;
        for (URL u : this.urls) {
            try {
                Path root = Paths.get(u.toURI());
                if (Files.isDirectory(root, new LinkOption[0])) {
                    if (!Files.isReadable(root)) {
                        throw new IllegalArgumentException(String.format("Directory is not readable: %s.", root));
                    }
                    this.roots.add(root);
                    int inDirectoryResourceCount = Objects.requireNonNull((Long)this.scanRootDirectory(root).take(1000L).count().block()).intValue();
                    if (inDirectoryResourceCount == 0) {
                        if (IOUtils.countReadableFiles((Path)root, (boolean)this.recursive) == 0L) {
                            LOGGER.warn("Directory {} has no readable files.", (Object)root);
                        } else {
                            LOGGER.warn("No files in directory {} matched the connector.{}.fileNamePattern of \"{}\".", new Object[]{root, this.getConnectorName(), this.pattern});
                        }
                    }
                    this.resourceCount += inDirectoryResourceCount;
                    continue;
                }
                ++this.resourceCount;
                this.files.add(u);
            }
            catch (FileSystemNotFoundException ignored) {
                this.files.add(u);
                ++this.resourceCount;
            }
        }
    }

    protected void processURLsForWrite() throws URISyntaxException, IOException {
        try {
            this.resourceCount = -1;
            Path root = Paths.get(this.urls.get(0).toURI());
            if (!Files.exists(root, new LinkOption[0])) {
                root = Files.createDirectories(root, new FileAttribute[0]);
            }
            if (Files.isDirectory(root, new LinkOption[0])) {
                if (!Files.isWritable(root)) {
                    throw new IllegalArgumentException(String.format("Directory is not writable: %s.", root));
                }
                if (IOUtils.isDirectoryNonEmpty((Path)root)) {
                    throw new IllegalArgumentException(String.format("Invalid value for connector.%s.url: target directory " + root + " must be empty.", this.getConnectorName()));
                }
                this.roots.add(root);
            }
        }
        catch (FileSystemNotFoundException fileSystemNotFoundException) {
            // empty catch block
        }
    }

    @NonNull
    protected Flux<URL> scanRootDirectory(@NonNull Path root) {
        try {
            Stream<Path> files = Files.walk(root, this.recursive ? Integer.MAX_VALUE : 1, new FileVisitOption[0]);
            PathMatcher matcher = root.getFileSystem().getPathMatcher("glob:" + this.pattern);
            return Flux.fromStream(files).filter(Files::isReadable).filter(x$0 -> Files.isRegularFile(x$0, new LinkOption[0])).filter(matcher::matches).map(file -> {
                try {
                    return file.toUri().toURL();
                }
                catch (MalformedURLException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
        catch (IOException e) {
            throw new UncheckedIOException("Error scanning directory " + root, e);
        }
    }

    @NonNull
    protected Flux<Record> applyPerFileLimits(@NonNull Flux<Record> records) {
        if (this.skipRecords > 0L) {
            records = records.skip(this.skipRecords);
        }
        if (this.maxRecords != -1L) {
            records = records.take(this.maxRecords);
        }
        return records;
    }

    @NonNull
    protected URL getOrCreateDestinationURL() {
        if (!this.roots.isEmpty()) {
            try {
                String next = String.format(this.fileNameFormat, this.fileCounter.incrementAndGet());
                return this.roots.get(0).resolve(next).toUri().toURL();
            }
            catch (MalformedURLException e) {
                throw new UncheckedIOException(String.format("Could not create file URL with format %s", this.fileNameFormat), e);
            }
        }
        return this.urls.get(0);
    }

    protected boolean isDataSizeSamplingAvailable() {
        return this.read && this.urls.stream().noneMatch(IOUtils::isStandardStream);
    }

    protected static interface RecordWriter
    extends AutoCloseable {
        public void write(@NonNull Record var1) throws IOException;

        public void flush() throws IOException;

        @Override
        public void close() throws IOException;
    }

    protected static interface RecordReader
    extends AutoCloseable {
        @NonNull
        public RecordReader readNext(@NonNull SynchronousSink<Record> var1);

        @Override
        public void close() throws IOException;
    }
}

