/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.store.parquet2;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.store.CommonParquetRecordReader;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
import org.apache.drill.exec.store.parquet.compression.DrillCompressionCodecFactory;
import org.apache.drill.exec.store.parquet2.DrillParquetRecordMaterializer;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FileSystem;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.ColumnChunkIncReadStore;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Record reader for a single Parquet row group that assembles records through
 * Parquet's {@link ColumnIOFactory} / {@link RecordReader} machinery and writes
 * them into Drill vectors via a {@link DrillParquetRecordMaterializer}.
 *
 * <p>Projected columns that cannot be matched against the file schema are
 * exposed as {@link NullableIntVector}s whose value count is set per batch.
 */
public class DrillParquetReader
extends CommonParquetRecordReader {
    private static final Logger logger = LoggerFactory.getLogger(DrillParquetReader.class);
    // Full schema of the Parquet file, taken from the footer in setup().
    private MessageType schema;
    // File system used to open the Parquet file; cleared in close().
    private DrillFileSystem drillFileSystem;
    // Identifies the file path and row group index this reader covers; cleared in close().
    private RowGroupReadEntry entry;
    // Page source for the row group; created in setup() and closed in close().
    private ColumnChunkIncReadStore pageReadStore;
    // Parquet record reader; each read() call materializes one record.
    private RecordReader<Void> recordReader;
    // Receives the values of each record read; not created when no projected column was found.
    private DrillParquetRecordMaterializer recordMaterializer;
    // Upper bound on the number of records produced by one next() call.
    private final int recordsPerBatch;
    // Vectors registered for projected columns missing from the file schema; null when every column was found.
    private List<NullableIntVector> nullFilledVectors;
    // Number of records returned so far across all next() calls.
    private long totalRead = 0L;
    // True when none of the projected columns could be matched to the file schema.
    private boolean noColumnsFound;
    // Date-corruption status handed to the record materializer.
    private final ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates;
    // Total number of records this reader will return, as computed by initNumRecordsToRead.
    private final long numRecordsToRead;

    public DrillParquetReader(FragmentContext fragmentContext, ParquetMetadata footer, RowGroupReadEntry entry, List<SchemaPath> columns, DrillFileSystem fileSystem, ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates, long recordsToRead) {
        super(footer, fragmentContext);
        this.containsCorruptedDates = containsCorruptedDates;
        this.drillFileSystem = fileSystem;
        this.entry = entry;
        this.setColumns(columns);
        this.recordsPerBatch = (int)fragmentContext.getOptions().getLong("store.parquet.complex.batch.num_records");
        this.numRecordsToRead = this.initNumRecordsToRead(recordsToRead, entry.getRowGroupIndex(), footer);
    }

    /**
     * Builds the Parquet projection schema for the requested columns.
     *
     * @param schema            full file schema
     * @param projectionColumns columns requested by the query
     * @param columnsNotFound   output list; receives every (schema-adapted) projected column that
     *                          matched no column of {@code schema}
     * @return the projected schema, or {@code null} when no projected column matched
     */
    private static MessageType getProjection(MessageType schema, Collection<SchemaPath> projectionColumns, List<SchemaPath> columnsNotFound) {
        // Rewrite the requested paths so they line up with the physical Parquet layout.
        List<SchemaPath> adaptedProjection = adaptColumnsToParquetSchema(projectionColumns, schema);
        List<SchemaPath> fileColumns = getAllColumnsFrom(schema);
        Set<SchemaPath> matchedColumns = matchProjectionWithSchemaColumns(adaptedProjection, fileColumns, columnsNotFound);
        return convertSelectedColumnsToMessageType(schema, matchedColumns);
    }

    /**
     * Rewrites projected column paths so that they follow the physical nesting of the Parquet schema.
     *
     * <p>For every path, named segments are kept as-is and array segments are dropped. While walking
     * the path the matching schema type is tracked; when a segment resolves to a logical LIST group,
     * the names of that group's first child and of that child's first child are appended so the
     * resulting path includes the list's wrapper levels. Walking stops at a logical MAP group.
     *
     * @param columns projected columns as requested
     * @param schema  full file schema used to resolve segment types
     * @return one adapted path per input column, in iteration order
     */
    private static List<SchemaPath> adaptColumnsToParquetSchema(Collection<SchemaPath> columns, MessageType schema) {
        List<SchemaPath> modifiedColumns = new ArrayList<>(columns.size());
        for (SchemaPath path : columns) {
            List<String> segments = new ArrayList<>();
            // Must be the general Type: after the first step this is a field type, not the message root.
            Type segmentType = schema;
            for (PathSegment seg = path.getRootSegment(); seg != null; seg = seg.getChild()) {
                if (seg.isNamed()) {
                    segments.add(seg.getNameSegment().getPath());
                }

                segmentType = getSegmentType(segmentType, seg);
                if (segmentType == null || segmentType.isPrimitive()) {
                    continue;
                }

                GroupType segGroupType = segmentType.asGroupType();
                if (ParquetReaderUtility.isLogicalMapType(segGroupType)) {
                    // NOTE(review): presumably segments below a map do not correspond to schema
                    // paths (key lookups), so the path is cut at the map column - confirm.
                    break;
                }
                if (ParquetReaderUtility.isLogicalListType(segGroupType)) {
                    // Append the list's two wrapper levels so the path reaches the element type.
                    Type listWrapper = segGroupType.getType(0);
                    segments.add(listWrapper.getName());
                    segments.add(listWrapper.asGroupType().getType(0).getName());
                }
            }
            modifiedColumns.add(SchemaPath.getCompoundPath(segments.toArray(new String[0])));
        }
        return modifiedColumns;
    }

    /**
     * Merges the selected column paths into a single projection schema.
     *
     * <p>Each path is turned into a one-branch message type (named like {@code schema}) and the
     * branches are combined with {@code MessageType.union}.
     *
     * @param schema              full file schema
     * @param selectedSchemaPaths schema columns to keep; every segment is expected to be named
     * @return the merged projection, or {@code null} when {@code selectedSchemaPaths} is empty
     */
    private static MessageType convertSelectedColumnsToMessageType(MessageType schema, Set<SchemaPath> selectedSchemaPaths) {
        String rootName = schema.getName();
        MessageType merged = null;
        for (SchemaPath selected : selectedSchemaPaths) {
            // Flatten the segment chain into its names.
            List<String> names = new ArrayList<>();
            PathSegment current = selected.getRootSegment();
            do {
                names.add(current.getNameSegment().getPath());
                current = current.getChild();
            } while (current != null);

            Type branch = getSegmentType(names.toArray(new String[0]), 0, schema);
            MessageType single = new MessageType(rootName, new Type[]{branch});
            merged = (merged == null) ? single : merged.union(single);
        }
        return merged;
    }

    /**
     * Selects the schema columns covered by the projection.
     *
     * @param projectionColumns projected columns (already adapted to the Parquet layout)
     * @param schemaColumns     all leaf columns of the file schema
     * @param columnsNotFound   output list; receives each projected column for which no schema
     *                          column reports {@code contains(projectedColumn)}
     * @return matched schema columns in first-match order, without duplicates
     */
    private static Set<SchemaPath> matchProjectionWithSchemaColumns(Collection<SchemaPath> projectionColumns, List<SchemaPath> schemaColumns, List<SchemaPath> columnsNotFound) {
        Set<SchemaPath> matches = new LinkedHashSet<>();
        for (SchemaPath wanted : projectionColumns) {
            List<SchemaPath> hits = schemaColumns.stream()
                .filter(candidate -> candidate.contains(wanted))
                .collect(Collectors.toList());
            if (hits.isEmpty()) {
                columnsNotFound.add(wanted);
            } else {
                matches.addAll(hits);
            }
        }
        return matches;
    }

    /**
     * Lists every column of the schema as a compound {@link SchemaPath}.
     *
     * @param schema file schema whose column descriptors are enumerated
     * @return one path per column descriptor, in descriptor order
     */
    private static List<SchemaPath> getAllColumnsFrom(MessageType schema) {
        return schema.getColumns().stream()
            .map(descriptor -> {
                // Copy the descriptor's path array before handing it to SchemaPath.
                String[] path = descriptor.getPath();
                return SchemaPath.getCompoundPath(Arrays.copyOf(path, path.length));
            })
            .collect(Collectors.toCollection(LinkedList::new));
    }

    /**
     * Resolves the schema type that a path segment refers to inside its parent type.
     *
     * <p>A named segment is matched against the parent's fields ignoring case. An array segment is
     * resolved only when the parent is a logical LIST group, in which case the first child of the
     * group's first child is returned.
     *
     * @param parentSegmentType type of the enclosing segment; may be {@code null}
     * @param segment           segment to resolve
     * @return the resolved type, or {@code null} when the parent is {@code null} or primitive, or
     *         the segment cannot be resolved
     */
    private static Type getSegmentType(Type parentSegmentType, PathSegment segment) {
        if (parentSegmentType == null || parentSegmentType.isPrimitive()) {
            return null;
        }

        GroupType parentGroup = parentSegmentType.asGroupType();
        if (segment.isNamed()) {
            String wantedName = segment.getNameSegment().getPath();
            for (Type child : parentGroup.getFields()) {
                if (child.getName().equalsIgnoreCase(wantedName)) {
                    // Look the field up again under its exact (case-sensitive) name.
                    return parentGroup.getType(child.getName());
                }
            }
            return null;
        }

        if (ParquetReaderUtility.isLogicalListType(parentSegmentType.asGroupType())) {
            return parentGroup.getType(0).asGroupType().getType(0);
        }
        return null;
    }

    /**
     * Allocates memory for every vector in the map through {@link AllocationHelper}.
     *
     * @param vectorMap vectors to allocate, keyed by name; only the values are used
     * @throws OutOfMemoryException when allocation fails with a {@link NullPointerException};
     *                              the original exception is attached as the cause
     */
    @Override
    public void allocate(Map<String, ValueVector> vectorMap) throws OutOfMemoryException {
        try {
            for (ValueVector vector : vectorMap.values()) {
                // Character.MAX_VALUE == 65535, the value count requested per vector.
                // NOTE(review): 50 and 10 are passed straight through to AllocationHelper;
                // presumably per-value size and child-count estimates - confirm.
                AllocationHelper.allocate(vector, Character.MAX_VALUE, 50, 10);
            }
        } catch (NullPointerException e) {
            // Keep the original failure as the cause so it is not lost when it is
            // reported as an out-of-memory condition.
            throw new OutOfMemoryException(e);
        }
    }

    /**
     * Prepares the reader: resolves the projection, registers null-filled vectors for missing
     * columns, creates the record materializer and opens the page store for the row group.
     *
     * <p>When the query is not a star query, projected columns that match nothing in the file schema
     * are added to {@code output} as {@link NullableIntVector}s. If no projected column matches,
     * or there are no records to read, no page store or record reader is created.
     *
     * @param context operator context; supplies the buffer allocator
     * @param output  mutator used to register output vectors
     * @throws ExecutionSetupException presumably produced by {@code handleAndRaise} for any failure
     *                                 during setup (its declared return type is not visible here)
     */
    @Override
    public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
        try {
            this.operatorContext = context;
            this.schema = this.footer.getFileMetaData().getSchema();

            List<SchemaPath> columnsNotFound = new ArrayList<>(getColumns().size());
            MessageType projection;
            if (isStarQuery()) {
                projection = this.schema;
            } else {
                projection = getProjection(this.schema, getColumns(), columnsNotFound);
                if (projection == null) {
                    // Nothing matched: fall back to the full schema.
                    projection = this.schema;
                }
                if (!columnsNotFound.isEmpty()) {
                    this.nullFilledVectors = new ArrayList<>(columnsNotFound.size());
                    for (SchemaPath col : columnsNotFound) {
                        MaterializedField field = MaterializedField.create(col.toExpr(), org.apache.drill.common.types.Types.OPTIONAL_INT);
                        this.nullFilledVectors.add(output.addField(field, NullableIntVector.class));
                    }
                    this.noColumnsFound = columnsNotFound.size() == getColumns().size();
                }
            }

            logger.debug("Requesting schema {}", projection);

            if (!this.noColumnsFound) {
                // CollectionUtils.subtract returns a raw Collection; its elements come from getColumns().
                @SuppressWarnings("unchecked")
                Collection<SchemaPath> columns = columnsNotFound.isEmpty()
                    ? getColumns()
                    : CollectionUtils.subtract(getColumns(), columnsNotFound);
                this.recordMaterializer = new DrillParquetRecordMaterializer(output, projection, columns, this.fragmentContext.getOptions(), this.containsCorruptedDates);
            }

            if (this.numRecordsToRead == 0L || this.noColumnsFound) {
                return;
            }

            ColumnIOFactory factory = new ColumnIOFactory(false);
            MessageColumnIO columnIO = factory.getColumnIO(projection, this.schema);
            BlockMetaData blockMetaData = this.footer.getBlocks().get(this.entry.getRowGroupIndex());

            // Index the row group's column chunks by path; on duplicate paths the later chunk wins.
            Map<ColumnPath, ColumnChunkMetaData> paths = blockMetaData.getColumns().stream()
                .collect(Collectors.toMap(ColumnChunkMetaData::getPath, Function.identity(), (o, n) -> n));

            BufferAllocator allocator = this.operatorContext.getAllocator();
            CompressionCodecFactory ccf = DrillCompressionCodecFactory.createDirectCodecFactory(this.drillFileSystem.getConf(), new ParquetDirectByteBufferAllocator(allocator), 0);
            this.pageReadStore = new ColumnChunkIncReadStore(this.numRecordsToRead, ccf, allocator, this.drillFileSystem, this.entry.getPath());

            // Every primitive column of the full file schema is registered, not only the projected ones.
            for (String[] path : this.schema.getPaths()) {
                Type type = this.schema.getType(path);
                if (type.isPrimitive()) {
                    ColumnChunkMetaData md = paths.get(ColumnPath.get(path));
                    this.pageReadStore.addColumn(this.schema.getColumnDescription(path), md);
                }
            }
            this.recordReader = columnIO.getRecordReader(this.pageReadStore, this.recordMaterializer);
        } catch (Exception e) {
            throw this.handleAndRaise("Failure in setting up reader", e);
        }
    }

    /**
     * Recursively builds the single-branch type for a column path, starting at {@code depth}.
     *
     * <p>The last segment yields the schema's own type. Every intermediate segment must be a group
     * and yields a new group with the same repetition, original type and name that contains only
     * the child branch.
     *
     * @param pathSegments full column path
     * @param depth        zero-based index of the segment to build
     * @param schema       full file schema used to look up types
     * @return the type for the segment at {@code depth} with only the requested branch beneath it
     * @throws IllegalStateException if an intermediate segment resolves to a primitive type
     */
    private static Type getSegmentType(String[] pathSegments, int depth, MessageType schema) {
        int childDepth = depth + 1;
        String[] prefix = Arrays.copyOfRange(pathSegments, 0, childDepth);
        Type current = schema.getType(prefix);

        boolean isLastSegment = childDepth == pathSegments.length;
        if (isLastSegment) {
            return current;
        }

        Preconditions.checkState(!current.isPrimitive());
        Types.GroupBuilder<GroupType> builder = Types.buildGroup(current.getRepetition())
            .as(current.getOriginalType());
        return builder
            .addField(getSegmentType(pathSegments, childDepth, schema))
            .named(current.getName());
    }

    /**
     * Produces the next batch of records.
     *
     * <p>When no projected column exists in the file, a single batch of
     * {@code numRecordsToRead} rows is reported by setting that value count on the null-filled
     * vectors; subsequent calls return 0. Otherwise up to {@code recordsPerBatch} records are read
     * and the value count is set on the materializer and on any null-filled vectors.
     *
     * @return number of records in this batch; 0 when nothing is left to read
     */
    @Override
    public int next() {
        if (noColumnsFound) {
            if (totalRead == numRecordsToRead) {
                return 0;
            }
            int nullRowCount = (int) numRecordsToRead;
            for (ValueVector nullVector : nullFilledVectors) {
                nullVector.getMutator().setValueCount(nullRowCount);
            }
            totalRead = numRecordsToRead;
            return nullRowCount;
        }

        int produced = 0;
        for (; produced < recordsPerBatch && totalRead < numRecordsToRead; produced++, totalRead++) {
            // Tell the materializer which row the upcoming record lands in, then read it.
            recordMaterializer.setPosition(produced);
            recordReader.read();
        }
        recordMaterializer.setValueCount(produced);

        if (nullFilledVectors == null) {
            return produced;
        }
        for (ValueVector nullVector : nullFilledVectors) {
            nullVector.getMutator().setValueCount(produced);
        }
        return produced;
    }

    /**
     * Releases the reader's resources and drops its references.
     *
     * <p>Safe to call more than once: statistics are only reported while the row group entry is
     * still set, and the page store is only closed while it is still referenced. A failure to close
     * the page store is logged, not propagated; in that case the reference is kept so a later call
     * attempts the close again.
     */
    @Override
    public void close() {
        // entry is cleared below, so a repeated close() must not dereference it.
        if (this.entry != null) {
            this.closeStats(logger, this.entry.getPath());
        }
        this.footer = null;
        this.drillFileSystem = null;
        this.entry = null;
        this.recordReader = null;
        this.recordMaterializer = null;
        this.nullFilledVectors = null;
        try {
            if (this.pageReadStore != null) {
                this.pageReadStore.close();
                this.pageReadStore = null;
            }
        } catch (IOException e) {
            logger.warn("Failure while closing PageReadStore", e);
        }
    }

    /**
     * Describes the reader by its schema and record count, plus the page store once it exists.
     *
     * @return text of the form {@code DrillParquetReader[schema=..., numRecordsToRead=...]}
     */
    @Override
    public String toString() {
        String prefix = DrillParquetReader.class.getSimpleName() + "[";
        StringJoiner description = new StringJoiner(", ", prefix, "]");
        description.add("schema=" + this.schema);
        description.add("numRecordsToRead=" + this.numRecordsToRead);
        if (this.pageReadStore != null) {
            description.add("pageReadStore=" + this.pageReadStore);
        }
        return description.toString();
    }
}

