/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io;

import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.OffsetBasedSource;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Verify;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class FileBasedSource<T>
extends OffsetBasedSource<T> {
    private static final Logger LOG = LoggerFactory.getLogger(FileBasedSource.class);
    private final ValueProvider<String> fileOrPatternSpec;
    private final EmptyMatchTreatment emptyMatchTreatment;
    private  @Nullable MatchResult.Metadata singleFileMetadata;
    private final Mode mode;

    /**
     * Creates a source in {@link Mode#FILEPATTERN} mode for the given file or pattern
     * specification.
     *
     * <p>The offset range handed to the superclass is always {@code [0, Long.MAX_VALUE)}, which is
     * exactly what {@link #validate()} requires of a pattern-mode source.
     *
     * @param fileOrPatternSpec provider of the file name or pattern this source reads
     * @param emptyMatchTreatment passed to {@code FileSystems.match} whenever the pattern is
     *     expanded
     * @param minBundleSize forwarded unchanged to the {@link OffsetBasedSource} constructor
     */
    protected FileBasedSource(
            ValueProvider<String> fileOrPatternSpec,
            EmptyMatchTreatment emptyMatchTreatment,
            long minBundleSize) {
        super(0L, Long.MAX_VALUE, minBundleSize);
        this.fileOrPatternSpec = fileOrPatternSpec;
        this.emptyMatchTreatment = emptyMatchTreatment;
        this.mode = Mode.FILEPATTERN;
    }

    /**
     * Creates a source in {@link Mode#FILEPATTERN} mode that uses
     * {@link EmptyMatchTreatment#DISALLOW} when the pattern is expanded.
     *
     * @param fileOrPatternSpec provider of the file name or pattern this source reads
     * @param minBundleSize forwarded unchanged to the {@link OffsetBasedSource} constructor
     */
    protected FileBasedSource(
            ValueProvider<String> fileOrPatternSpec,
            long minBundleSize) {
        this(fileOrPatternSpec, EmptyMatchTreatment.DISALLOW, minBundleSize);
    }

    protected FileBasedSource(MatchResult.Metadata fileMetadata, long minBundleSize, long startOffset, long endOffset) {
        super(startOffset, endOffset, minBundleSize);
        this.mode = Mode.SINGLE_FILE_OR_SUBRANGE;
        this.singleFileMetadata = Preconditions.checkNotNull(fileMetadata, "fileMetadata");
        this.fileOrPatternSpec = ValueProvider.StaticValueProvider.of(fileMetadata.resourceId().toString());
        this.emptyMatchTreatment = EmptyMatchTreatment.DISALLOW;
    }

    /**
     * Returns the metadata of the single file this source reads.
     *
     * @return the non-null metadata supplied when this source was constructed
     * @throws IllegalArgumentException if this source is not in
     *     {@link Mode#SINGLE_FILE_OR_SUBRANGE} mode
     * @throws IllegalStateException if the stored metadata is unexpectedly {@code null}
     */
    public final MatchResult.Metadata getSingleFileMetadata() {
        Preconditions.checkArgument(
                mode == Mode.SINGLE_FILE_OR_SUBRANGE,
                "This function should only be called for a single file, not %s",
                this);
        final MatchResult.Metadata metadata = singleFileMetadata;
        Preconditions.checkState(
                metadata != null,
                "It should not be possible to construct a %s in mode %s with null metadata: %s",
                FileBasedSource.class,
                mode,
                this);
        return metadata;
    }

    /**
     * Resolves and returns the file or pattern specification of this source.
     *
     * @return the value obtained by calling {@code get()} on the underlying provider
     */
    public final String getFileOrPatternSpec() {
        final String resolvedSpec = fileOrPatternSpec.get();
        return resolvedSpec;
    }

    /**
     * Returns the provider of the file or pattern specification without resolving it.
     *
     * @return the provider this source was constructed with (or built around the file's resource id)
     */
    public final ValueProvider<String> getFileOrPatternSpecProvider() {
        return fileOrPatternSpec;
    }

    /**
     * Returns the treatment passed to {@code FileSystems.match} when this source expands its
     * pattern.
     *
     * @return the configured {@link EmptyMatchTreatment}
     */
    public final EmptyMatchTreatment getEmptyMatchTreatment() {
        return emptyMatchTreatment;
    }

    /**
     * Returns whether this source represents a file pattern or a single file (or subrange of one).
     *
     * @return the {@link Mode} fixed at construction time
     */
    public final Mode getMode() {
        return mode;
    }

    /**
     * Creates a source for the {@code [start, end)} subrange of the file this source reads.
     *
     * <p>The actual construction is delegated to
     * {@link #createForSubrangeOfFile(MatchResult.Metadata, long, long)}; this method only checks
     * the request and the result.
     *
     * @param start start offset of the subrange; must not be below this source's start offset
     * @param end end offset of the subrange; must not be above this source's end offset
     * @return the source produced for the subrange
     * @throws IllegalArgumentException if this is a file-pattern source, if the range falls
     *     outside this source's range, or if a proper subrange was requested and the produced
     *     source is not in {@link Mode#SINGLE_FILE_OR_SUBRANGE} mode
     * @throws IllegalStateException if this source has no file metadata
     */
    @Override
    public final FileBasedSource<T> createSourceForSubrange(long start, long end) {
        Preconditions.checkArgument(
                mode != Mode.FILEPATTERN,
                "Cannot split a file pattern based source based on positions");
        Preconditions.checkArgument(
                start >= getStartOffset(),
                "Start offset value %s of the subrange cannot be smaller than the start offset value %s of the parent source",
                start,
                getStartOffset());
        Preconditions.checkArgument(
                end <= getEndOffset(),
                "End offset value %s of the subrange cannot be larger than the end offset value %s",
                end,
                getEndOffset());
        Preconditions.checkState(
                singleFileMetadata != null,
                "A single file source should not have null metadata: %s",
                this);

        final FileBasedSource<T> subrangeSource =
                createForSubrangeOfFile(singleFileMetadata, start, end);

        // Only a request for the untouched default range [<=0, Long.MAX_VALUE) is exempt from the
        // mode check on the produced source.
        final boolean isDefaultFullRange = start <= 0L && end == Long.MAX_VALUE;
        if (!isDefaultFullRange) {
            Preconditions.checkArgument(
                    subrangeSource.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE,
                    "Source created for the range [%s,%s) must be a subrange source",
                    start,
                    end);
        }
        return subrangeSource;
    }

    /**
     * Creates a source that reads the {@code [start, end)} offset range of a single file.
     *
     * <p>This class calls it both to carve subranges out of an existing single-file source and to
     * turn each file matched by a pattern into its own source; in the latter case the result is
     * required to be in {@link Mode#SINGLE_FILE_OR_SUBRANGE} mode.
     *
     * @param fileMetadata metadata of the file to read
     * @param start start offset of the range
     * @param end end offset of the range
     * @return a source for the requested range of the file
     */
    protected abstract FileBasedSource<T> createForSubrangeOfFile(
            MatchResult.Metadata fileMetadata, long start, long end);

    /**
     * Creates the reader used for a source that refers to a single file or a subrange of one.
     *
     * @param options the pipeline options handed to {@code createReader}
     * @return a reader over this source's file
     */
    protected abstract FileBasedReader<T> createSingleFileReader(PipelineOptions options);

    /**
     * Estimates the number of bytes this source covers.
     *
     * <p>For a file-pattern source the pattern is expanded and the sizes of all matched files are
     * added up. For a single-file source the estimate is the distance from the start offset to
     * the smaller of the end offset and the file size.
     *
     * @param options pipeline options, forwarded to {@link #getMaxEndOffset(PipelineOptions)}
     * @return the estimated size in bytes
     * @throws IOException if matching the pattern or determining the file size fails
     */
    @Override
    public final long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
        // Resolved up front in both modes, exactly as before, so an inaccessible provider fails
        // here regardless of mode.
        final String spec = fileOrPatternSpec.get();

        if (mode != Mode.FILEPATTERN) {
            final long rangeStart = getStartOffset();
            final long rangeEnd = Math.min(getEndOffset(), getMaxEndOffset(options));
            return rangeEnd - rangeStart;
        }

        final List<MatchResult.Metadata> matches =
                FileSystems.match(spec, emptyMatchTreatment).metadata();
        final long combinedSize =
                matches.stream().mapToLong(MatchResult.Metadata::sizeBytes).sum();
        LOG.info(
                "Filepattern {} matched {} files with total size {}",
                spec,
                matches.size(),
                combinedSize);
        return combinedSize;
    }

    /**
     * Adds this source's display data on top of what the superclass registers.
     *
     * <p>Only file-pattern sources contribute an extra item: the pattern provider under the key
     * {@code filePattern}.
     *
     * @param builder the builder to register display data with
     */
    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        if (mode != Mode.FILEPATTERN) {
            return;
        }
        builder.add(
                DisplayData.item("filePattern", getFileOrPatternSpecProvider())
                        .withLabel("File Pattern"));
    }

    /**
     * Splits this source into bundles.
     *
     * <ul>
     *   <li>A file-pattern source is expanded; every matched file becomes a single-file source
     *       which is then split itself, and all resulting bundles are concatenated.
     *   <li>A splittable single-file source is split by the superclass.
     *   <li>A non-splittable single-file source is returned as its only bundle.
     * </ul>
     *
     * @param desiredBundleSizeBytes the bundle size passed on to every nested split
     * @param options pipeline options passed on to every nested split
     * @return the bundles this source was split into
     * @throws Exception if matching the pattern, checking splittability or a nested split fails
     */
    @Override
    public final List<? extends FileBasedSource<T>> split(long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
        String fileOrPattern = this.fileOrPatternSpec.get();
        if (this.mode == Mode.FILEPATTERN) {
            long startTime = System.currentTimeMillis();
            List<MatchResult.Metadata> expandedFiles = FileSystems.match(fileOrPattern, this.emptyMatchTreatment).metadata();
            List<FileBasedSource<T>> splitResults = new ArrayList<>(expandedFiles.size());
            for (MatchResult.Metadata metadata : expandedFiles) {
                FileBasedSource<T> split = this.createForSubrangeOfFile(metadata, 0L, metadata.sizeBytes());
                Verify.verify(split.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE, "%s.createForSubrangeOfFile must return a source in mode %s", split, Mode.SINGLE_FILE_OR_SUBRANGE);
                // The per-file source was just verified not to be in FILEPATTERN mode, so this
                // nested call takes one of the single-file branches below and cannot recurse
                // back into this branch.
                splitResults.addAll(split.split(desiredBundleSizeBytes, options));
            }
            LOG.info("Splitting filepattern {} into bundles of size {} took {} ms and produced {} files and {} bundles", fileOrPattern, desiredBundleSizeBytes, System.currentTimeMillis() - startTime, expandedFiles.size(), splitResults.size());
            return splitResults;
        }
        if (this.isSplittable()) {
            // NOTE(review): the superclass hands back OffsetBasedSource elements; the narrowing
            // cast assumes they come from createSourceForSubrange, which returns FileBasedSource
            // here. The suppression is scoped to this single declaration.
            @SuppressWarnings("unchecked")
            List<FileBasedSource<T>> splits = (List<FileBasedSource<T>>) super.split(desiredBundleSizeBytes, options);
            return splits;
        }
        LOG.debug("The source for file {} is not split into sub-range based sources since the file is not seekable", fileOrPattern);
        return ImmutableList.of(this);
    }

    /**
     * Tells whether this source may be divided into smaller sources.
     *
     * <p>A file-pattern source always reports {@code true}. A single-file source reports whatever
     * its file metadata says about efficient seeking.
     *
     * @return {@code true} if the source can be split
     * @throws Exception declared so that overriding implementations may fail
     */
    protected boolean isSplittable() throws Exception {
        return mode == Mode.FILEPATTERN || getSingleFileMetadata().isReadSeekEfficient();
    }

    /**
     * Creates a reader for this source after validating it.
     *
     * <p>For a file-pattern source the pattern is expanded and one single-file reader is created
     * per matched file, each spanning the whole file. If exactly one file matched, that file's
     * reader is returned directly; otherwise the readers are wrapped in a reader that goes
     * through them in order. For a single-file source the single-file reader is returned.
     *
     * @param options pipeline options handed to every single-file reader that is created
     * @return a reader over the data of this source
     * @throws IOException if matching the file pattern fails
     */
    @Override
    public final BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException {
        this.validate();
        String fileOrPattern = this.fileOrPatternSpec.get();
        if (this.mode == Mode.FILEPATTERN) {
            long startTime = System.currentTimeMillis();
            List<MatchResult.Metadata> fileMetadata = FileSystems.match(fileOrPattern, this.emptyMatchTreatment).metadata();
            LOG.info("Matched {} files for pattern {}", fileMetadata.size(), fileOrPattern);
            // Typed list instead of a raw one, so the element type is checked by the compiler.
            List<FileBasedReader<T>> fileReaders = new ArrayList<>();
            for (MatchResult.Metadata metadata : fileMetadata) {
                long endOffset = metadata.sizeBytes();
                fileReaders.add(this.createForSubrangeOfFile(metadata, 0L, endOffset).createSingleFileReader(options));
            }
            LOG.debug("Creating a reader for file pattern {} took {} ms", fileOrPattern, System.currentTimeMillis() - startTime);
            if (fileReaders.size() == 1) {
                // A single match needs no wrapper.
                return fileReaders.get(0);
            }
            return new FilePatternReader(this, fileReaders);
        }
        return this.createSingleFileReader(options);
    }

    /**
     * Describes this source: the pattern provider for a file-pattern source, or the provider
     * followed by {@code " range "} and the superclass description for a single-file source.
     *
     * @return a human-readable description of this source
     * @throws IllegalStateException if the mode is not one of the known values
     */
    @Override
    public String toString() {
        switch (mode) {
            case SINGLE_FILE_OR_SUBRANGE:
                return fileOrPatternSpec + " range " + super.toString();
            case FILEPATTERN:
                return fileOrPatternSpec.toString();
            default:
                throw new IllegalStateException("Unexpected mode: " + mode);
        }
    }

    /**
     * Validates this source after running the superclass validation.
     *
     * <p>A file-pattern source must span exactly {@code [0, Long.MAX_VALUE)}. A single-file
     * source has no additional constraints here.
     *
     * @throws IllegalArgumentException if a file-pattern source has a non-default offset range
     * @throws IllegalStateException if the mode is not one of the known values
     */
    @Override
    public void validate() {
        super.validate();
        switch (mode) {
            case SINGLE_FILE_OR_SUBRANGE:
                // Nothing beyond the superclass checks.
                return;
            case FILEPATTERN:
                Preconditions.checkArgument(
                        getStartOffset() == 0L,
                        "FileBasedSource is based on a file pattern or a full single file but the starting offset proposed %s is not zero",
                        getStartOffset());
                Preconditions.checkArgument(
                        getEndOffset() == Long.MAX_VALUE,
                        "FileBasedSource is based on a file pattern or a full single file but the ending offset proposed %s is not Long.MAX_VALUE",
                        getEndOffset());
                return;
            default:
                throw new IllegalStateException("Unknown mode: " + mode);
        }
    }

    /**
     * Returns the largest end offset this source can have, which is the size of its file.
     *
     * @param options pipeline options (not used by this implementation)
     * @return the size in bytes reported by the file metadata
     * @throws IllegalArgumentException if this is a file-pattern source
     * @throws IOException declared by the overridden method
     */
    @Override
    public final long getMaxEndOffset(PipelineOptions options) throws IOException {
        Preconditions.checkArgument(
                mode != Mode.FILEPATTERN,
                "Cannot determine the exact end offset of a file pattern");
        return getSingleFileMetadata().sizeBytes();
    }

    /**
     * Reader for a file-pattern source: goes through a list of single-file readers in order and
     * exposes their records as one sequence.
     *
     * <p>Readers that yield no record on {@code start()} are closed and skipped.
     */
    private class FilePatternReader extends BoundedSource.BoundedReader<T> {
        /** The file-pattern source this reader was created for. */
        private final FileBasedSource<T> source;
        /** All per-file readers, in reading order. */
        private final List<FileBasedReader<T>> fileReaders;
        /** Cursor over {@link #fileReaders}; everything before it has already been started. */
        final ListIterator<FileBasedReader<T>> fileReadersIterator;
        /** The reader records are currently taken from; {@code null} until {@link #start()} picks one. */
        @Nullable FileBasedReader<T> currentReader = null;

        /**
         * @param source the source returned by {@link #getCurrentSource()}
         * @param fileReaders the per-file readers to go through, in order
         */
        public FilePatternReader(FileBasedSource<T> source, List<FileBasedReader<T>> fileReaders) {
            this.source = source;
            this.fileReaders = fileReaders;
            this.fileReadersIterator = fileReaders.listIterator();
        }

        /** Positions this reader on the first record of the first non-empty file, if any. */
        @Override
        public boolean start() throws IOException {
            return this.startNextNonemptyReader();
        }

        /**
         * Moves to the next record, switching to the next non-empty file when the current one is
         * exhausted.
         *
         * @throws IllegalStateException if no current reader has been set yet
         */
        @Override
        public boolean advance() throws IOException {
            Preconditions.checkState(this.currentReader != null, "Call start() before advance()");
            if (this.currentReader.advance()) {
                return true;
            }
            return this.startNextNonemptyReader();
        }

        /**
         * Starts the remaining readers one by one until one of them has a record.
         *
         * @return {@code true} if a reader with at least one record was found
         */
        private boolean startNextNonemptyReader() throws IOException {
            while (this.fileReadersIterator.hasNext()) {
                this.currentReader = this.fileReadersIterator.next();
                if (this.currentReader.start()) {
                    return true;
                }
                // Empty file: release it right away and try the next one.
                this.currentReader.close();
            }
            return false;
        }

        /**
         * Returns the reader currently being consumed.
         *
         * @throws NoSuchElementException if no reader has been selected yet, i.e. there is no
         *     current element to report
         */
        private FileBasedReader<T> requireCurrentReader() {
            FileBasedReader<T> reader = this.currentReader;
            if (reader == null) {
                // Honour the declared contract instead of failing with a NullPointerException.
                throw new NoSuchElementException("No current reader: start() has not selected a file yet");
            }
            return reader;
        }

        /**
         * @throws NoSuchElementException if no reader has been selected yet, or if the current
         *     file reader itself reports that it has no current element
         */
        @Override
        public T getCurrent() throws NoSuchElementException {
            return this.requireCurrentReader().getCurrent();
        }

        /**
         * @throws NoSuchElementException if no reader has been selected yet, or if the current
         *     file reader itself reports that it has no current element
         */
        @Override
        public Instant getCurrentTimestamp() throws NoSuchElementException {
            return this.requireCurrentReader().getCurrentTimestamp();
        }

        /** Closes the current reader (if any) and every reader that was never started. */
        @Override
        public void close() throws IOException {
            if (this.currentReader != null) {
                this.currentReader.close();
            }
            while (this.fileReadersIterator.hasNext()) {
                this.fileReadersIterator.next().close();
            }
        }

        @Override
        public FileBasedSource<T> getCurrentSource() {
            return this.source;
        }

        /**
         * Dynamic splitting is not supported by this reader.
         *
         * @return always {@code null}
         */
        @Override
        public @Nullable FileBasedSource<T> splitAtFraction(double fraction) {
            LOG.debug("Dynamic splitting of FilePatternReader is unsupported.");
            return null;
        }

        /**
         * Estimates progress by giving every file an equal share of the total and adding the
         * current file's own fraction, scaled to its share.
         *
         * @return a value between the share consumed before the current file and the share
         *     consumed after it; {@code 0.0} before any reader was selected
         */
        @Override
        public Double getFractionConsumed() {
            if (this.currentReader == null) {
                return 0.0;
            }
            if (this.fileReaders.isEmpty()) {
                return 1.0;
            }
            // Index of the reader most recently handed out by the iterator, i.e. the current one.
            int index = this.fileReadersIterator.previousIndex();
            int numReaders = this.fileReaders.size();
            // Defensive only: previousIndex() is at most size - 1.
            if (index == numReaders) {
                return 1.0;
            }
            double before = 1.0 * (double)index / (double)numReaders;
            double after = 1.0 * (double)(index + 1) / (double)numReaders;
            Double fractionOfCurrentReader = this.currentReader.getFractionConsumed();
            if (fractionOfCurrentReader == null) {
                // The current file cannot report progress; count only the files before it.
                return before;
            }
            return before + fractionOfCurrentReader * (after - before);
        }
    }

    /**
     * Base class for readers over a single file, or a subrange of one, described by a
     * {@link FileBasedSource}.
     *
     * <p>This class opens the file and positions the channel; subclasses supply the record
     * parsing through {@link #startReading(ReadableByteChannel)} and {@link #readNextRecord()}.
     *
     * @param <T> type of the records read
     */
    public abstract static class FileBasedReader<T> extends OffsetBasedSource.OffsetBasedReader<T> {
        /** Channel opened by {@link #startImpl()}; {@code null} until then. */
        private @Nullable ReadableByteChannel channel = null;

        /**
         * @param source the single-file source to read
         * @throws IllegalArgumentException if {@code source} is a file-pattern source
         */
        public FileBasedReader(FileBasedSource<T> source) {
            super(source);
            Preconditions.checkArgument(source.getMode() != Mode.FILEPATTERN, "FileBasedReader does not support reading file patterns");
        }

        /** Returns the current source, narrowed to {@link FileBasedSource}. */
        @Override
        public synchronized FileBasedSource<T> getCurrentSource() {
            return (FileBasedSource<T>) super.getCurrentSource();
        }

        /**
         * Opens the file, seeks to the source's start offset when the channel is seekable, hands
         * the channel to {@link #startReading(ReadableByteChannel)} and reads the first record.
         *
         * @return whether a first record was read
         * @throws IllegalArgumentException if the channel is not seekable and the source is a
         *     subrange source or has a non-zero start offset
         */
        @Override
        protected final boolean startImpl() throws IOException {
            FileBasedSource<T> source = this.getCurrentSource();
            this.channel = FileSystems.open(source.getSingleFileMetadata().resourceId());
            if (this.channel instanceof SeekableByteChannel) {
                SeekableByteChannel seekChannel = (SeekableByteChannel)this.channel;
                seekChannel.position(source.getStartOffset());
            } else {
                // Without seeking the start offset cannot be honoured, so reject such sources.
                Preconditions.checkArgument(source.mode != Mode.SINGLE_FILE_OR_SUBRANGE, "Subrange-based sources must only be defined for file types that support seekable  read channels");
                Preconditions.checkArgument(source.getStartOffset() == 0L, "Start offset %s is not zero but channel for reading the file is not seekable.", source.getStartOffset());
            }
            this.startReading(this.channel);
            // Advance once so that the first record is loaded.
            return this.advanceImpl();
        }

        /** Reads the next record by delegating to {@link #readNextRecord()}. */
        @Override
        protected final boolean advanceImpl() throws IOException {
            return this.readNextRecord();
        }

        /** Closes the channel if one has been opened. */
        @Override
        public void close() throws IOException {
            if (this.channel != null) {
                this.channel.close();
            }
        }

        /**
         * Reports whether the current source is splittable.
         *
         * @throws RuntimeException wrapping any exception raised while asking the source
         */
        @Override
        public boolean allowsDynamicSplitting() {
            try {
                return this.getCurrentSource().isSplittable();
            }
            catch (Exception e) {
                throw new RuntimeException(String.format("Error determining if %s allows dynamic splitting", this), e);
            }
        }

        /**
         * Prepares the subclass for reading from the freshly opened channel.
         *
         * <p>Called once from {@link #startImpl()}, after the channel has been positioned at the
         * source's start offset when it is seekable.
         *
         * @param channel the channel to read the file from
         * @throws IOException if preparing the read fails
         */
        protected abstract void startReading(ReadableByteChannel channel) throws IOException;

        /**
         * Reads the next record from the channel.
         *
         * @return the value reported back from {@link #startImpl()} and {@link #advanceImpl()};
         *     presumably {@code true} when a record was read - confirm against the superclass
         *     contract
         * @throws IOException if reading fails
         */
        protected abstract boolean readNextRecord() throws IOException;
    }

    /** The two kinds of thing a {@code FileBasedSource} can stand for. */
    public enum Mode {
        /** Set by the constructors that take a file-or-pattern specification. */
        FILEPATTERN,
        /** Set by the constructor that takes the metadata of one file plus an offset range. */
        SINGLE_FILE_OR_SUBRANGE
    }
}

