/*
 * Decompiled with CFR 0.152.
 */
package org.apache.parquet.pig;

import java.io.IOException;
import java.lang.ref.Reference;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.WeakHashMap;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.LogicalInverseRewriter;
import org.apache.parquet.filter2.predicate.Operators;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.metadata.GlobalMetaData;
import org.apache.parquet.hadoop.util.ContextUtil;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.pig.PigSchemaConverter;
import org.apache.parquet.pig.TupleReadSupport;
import org.apache.pig.Expression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.LoadPredicatePushdown;
import org.apache.pig.LoadPushDown;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.parser.ParserException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParquetLoader
extends LoadFunc
implements LoadMetadata,
LoadPushDown,
LoadPredicatePushdown {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(ParquetLoader.class);
    /** Job configuration key read by {@link #getPredicateFields} to decide whether predicate push-down fields are offered. */
    public static final String ENABLE_PREDICATE_FILTER_PUSHDOWN = "parquet.pig.predicate.pushdown.enable";
    /** Presumably the default for {@link #ENABLE_PREDICATE_FILTER_PUSHDOWN} (push-down disabled). */
    private static final boolean DEFAULT_PREDICATE_PUSHDOWN_ENABLED = false;
    /**
     * Input formats cached by location string. Values are wrapped in references
     * (soft references, see {@code getParquetInputFormat}); an entry is removed when the
     * cached format creates a record reader (see {@code UnregisteringParquetInputFormat}).
     * NOTE(review): {@link WeakHashMap} is not synchronized - confirm all access is single-threaded.
     */
    static final Map<String, Reference<ParquetInputFormat<Tuple>>> inputFormatCache = new WeakHashMap<String, Reference<ParquetInputFormat<Tuple>>>();
    /** Schema supplied through a constructor; {@code initSchema} uses it when no schema is already set. May be null. */
    private Schema requestedSchema;
    /** Column-index-access flag; exchanged through the UDF context and job configuration in {@code setInput}. */
    private boolean columnIndexAccess;
    /** Location most recently passed to {@code setInput}. */
    private String location;
    /** Set to true by {@code setInput}; guards {@code getParquetInputFormat}. */
    private boolean setLocationHasBeenCalled = false;
    /** Record reader handed over in {@link #prepareToRead} and consumed by {@link #getNext}. */
    private RecordReader<Void, Tuple> reader;
    /** Lazily resolved input format for {@link #location}. */
    private ParquetInputFormat<Tuple> parquetInputFormat;
    /** Effective Pig schema: read from the UDF context, the requested schema, or the Parquet metadata. */
    private Schema schema;
    /** Projection received via {@link #pushProjection} or restored from the UDF context; null when none. */
    private LoadPushDown.RequiredFieldList requiredFieldList = null;
    /** UDF context signature set by {@link #setUDFContextSignature}; used when looking up this loader's UDF properties. */
    protected String signature;

    /**
     * Creates a loader without a requested schema; delegates to
     * {@link #ParquetLoader(String)} with a null schema string.
     */
    public ParquetLoader() {
        this((String) null);
    }

    /**
     * Creates a loader from a textual Pig schema, with column index access disabled.
     *
     * @param requestedSchemaStr Pig schema string handed to {@code PigSchemaConverter.parsePigSchema}
     */
    public ParquetLoader(String requestedSchemaStr) {
        this(PigSchemaConverter.parsePigSchema(requestedSchemaStr), false);
    }

    /**
     * Creates a loader from a textual Pig schema and a textual boolean flag.
     *
     * @param requestedSchemaStr Pig schema string handed to {@code PigSchemaConverter.parsePigSchema}
     * @param columnIndexAccess  parsed with {@link Boolean#parseBoolean(String)}
     */
    public ParquetLoader(String requestedSchemaStr, String columnIndexAccess) {
        this(
            PigSchemaConverter.parsePigSchema(requestedSchemaStr),
            Boolean.parseBoolean(columnIndexAccess));
    }

    /**
     * Creates a loader from an already-parsed schema.
     *
     * @param requestedSchema   schema to use when none is already set; may be null
     * @param columnIndexAccess initial value of the column-index-access flag
     */
    public ParquetLoader(Schema requestedSchema, boolean columnIndexAccess) {
        this.columnIndexAccess = columnIndexAccess;
        this.requestedSchema = requestedSchema;
    }

    /**
     * Records the input location for this loader and configures the job accordingly.
     *
     * @param location input path string
     * @param job      job being configured
     * @throws IOException if configuring the input fails
     */
    public void setLocation(String location, Job job) throws IOException {
        if (LOG.isDebugEnabled()) {
            // Build the job description only when it will actually be logged.
            LOG.debug(
                "LoadFunc.setLocation({}, {})",
                location,
                String.format("job[id=%s, name=%s]", job.getJobID(), job.getJobName()));
        }
        setInput(location, job);
    }

    /**
     * Shared setup used by {@code setLocation}, {@code getSchema}, {@code getPartitionKeys}
     * and {@code getStatistics}: registers the input path on the job, synchronizes the
     * schema, required fields and column-index-access flag with the UDF context, and
     * copies them into the job configuration.
     *
     * @param location input path string
     * @param job      job being configured
     * @throws IOException if setting the input path or resolving the schema fails
     */
    private void setInput(String location, Job job) throws IOException {
        this.setLocationHasBeenCalled = true;
        this.location = location;
        FileInputFormat.setInputPaths((Job)job, (String)location);
        // On the frontend, publish the constructor-supplied flag first so the read below sees it.
        if (UDFContext.getUDFContext().isFrontend()) {
            this.storeInUDFContext("parquet.private.pig.column.index.access", Boolean.toString(this.columnIndexAccess));
        }
        // Reload state from the UDF context; these overwrite the current field values.
        this.schema = PigSchemaConverter.parsePigSchema(this.getPropertyFromUDFContext("parquet.pig.schema"));
        this.requiredFieldList = PigSchemaConverter.deserializeRequiredFieldList(this.getPropertyFromUDFContext("parquet.private.pig.required.fields"));
        this.columnIndexAccess = Boolean.parseBoolean(this.getPropertyFromUDFContext("parquet.private.pig.column.index.access"));
        // Fill in the schema if the UDF context did not provide one.
        this.initSchema(job);
        // On the frontend, store the resolved schema and projection back into the UDF context.
        if (UDFContext.getUDFContext().isFrontend()) {
            this.storeInUDFContext("parquet.pig.schema", PigSchemaConverter.pigSchemaToString(this.schema));
            this.storeInUDFContext("parquet.private.pig.required.fields", PigSchemaConverter.serializeRequiredFieldList(this.requiredFieldList));
        }
        // Mirror the same three values into the job configuration.
        ContextUtil.getConfiguration((JobContext)job).set("parquet.pig.schema", PigSchemaConverter.pigSchemaToString(this.schema));
        ContextUtil.getConfiguration((JobContext)job).set("parquet.private.pig.required.fields", PigSchemaConverter.serializeRequiredFieldList(this.requiredFieldList));
        ContextUtil.getConfiguration((JobContext)job).set("parquet.private.pig.column.index.access", Boolean.toString(this.columnIndexAccess));
        // Apply a predicate previously stored by setPushdownPredicate, if there is one.
        FilterPredicate filterPredicate = (FilterPredicate)this.getFromUDFContext("parquet.private.read.filter.predicate");
        if (filterPredicate != null) {
            ParquetInputFormat.setFilterPredicate(ContextUtil.getConfiguration((JobContext)job), filterPredicate);
        }
    }

    /**
     * Returns the Parquet input format for the current location.
     *
     * @return the cached or newly created input format
     * @throws IOException declared by the LoadFunc contract
     * @throws IllegalStateException if no location has been set yet
     */
    public InputFormat<Void, Tuple> getInputFormat() throws IOException {
        LOG.debug("LoadFunc.getInputFormat()");
        final ParquetInputFormat<Tuple> format = getParquetInputFormat();
        return format;
    }

    /**
     * Guard that fails fast when the loader is used before a location was set.
     *
     * @throws IllegalStateException if {@code setInput} has not run yet
     */
    private void checkSetLocationHasBeenCalled() {
        if (setLocationHasBeenCalled) {
            return;
        }
        throw new IllegalStateException("setLocation() must be called first");
    }

    /**
     * Returns the input format for the current location, resolving it lazily.
     * Lookup order: this instance's field, then the static {@code inputFormatCache},
     * then a newly created {@code UnregisteringParquetInputFormat} that is also put
     * into the cache behind a {@link SoftReference}.
     *
     * @return the input format, never null
     * @throws ParserException declared for callers; not thrown directly here
     * @throws IllegalStateException if no location has been set yet
     */
    private ParquetInputFormat<Tuple> getParquetInputFormat() throws ParserException {
        checkSetLocationHasBeenCalled();
        if (parquetInputFormat != null) {
            return parquetInputFormat;
        }
        // The cached reference may be missing, or present but already cleared.
        Reference<ParquetInputFormat<Tuple>> cached = inputFormatCache.get(location);
        parquetInputFormat = (cached == null) ? null : cached.get();
        if (parquetInputFormat == null) {
            parquetInputFormat = new UnregisteringParquetInputFormat(location);
            inputFormatCache.put(location, new SoftReference<ParquetInputFormat<Tuple>>(parquetInputFormat));
        }
        return parquetInputFormat;
    }

    /**
     * Stores the record reader that {@link #getNext()} will pull tuples from.
     *
     * @param reader record reader for the split
     * @param split  the split about to be read (only logged here)
     * @throws IOException declared by the LoadFunc contract
     */
    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
        LOG.debug("LoadFunc.prepareToRead({}, {})", reader, split);
        this.reader = reader;
    }

    /**
     * Reads the next tuple from the record reader set in {@link #prepareToRead}.
     *
     * @return the next tuple, or null when the reader has no more records
     * @throws IOException if the underlying reader fails
     * @throws ParquetDecodingException if the read is interrupted; the thread's
     *         interrupt status is restored before throwing
     */
    public Tuple getNext() throws IOException {
        try {
            return reader.nextKeyValue() ? reader.getCurrentValue() : null;
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe
            // the interruption (Thread.interrupted() would clear it instead).
            Thread.currentThread().interrupt();
            throw new ParquetDecodingException("Interrupted", e);
        }
    }

    /**
     * Configures the job input for the given location and reports no partition keys.
     *
     * @param location input path string
     * @param job      job being configured
     * @return always null
     * @throws IOException if configuring the input fails
     */
    public String[] getPartitionKeys(String location, Job job) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                "LoadMetadata.getPartitionKeys({}, {})",
                location,
                String.format("job[id=%s, name=%s]", job.getJobID(), job.getJobName()));
        }
        setInput(location, job);
        return null;
    }

    /**
     * Resolves the schema for the given location and wraps it as a Pig resource schema.
     *
     * @param location input path string
     * @param job      job being configured
     * @return a {@link ResourceSchema} built from the resolved schema
     * @throws IOException if configuring the input or resolving the schema fails
     */
    public ResourceSchema getSchema(String location, Job job) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                "LoadMetadata.getSchema({}, {})",
                location,
                String.format("job[id=%s, name=%s]", job.getJobID(), job.getJobName()));
        }
        setInput(location, job);
        final Schema resolved = this.schema;
        return new ResourceSchema(resolved);
    }

    /**
     * Ensures {@code schema} is populated. An existing schema is left untouched;
     * otherwise the constructor-supplied schema is used, or failing that the schema
     * is derived from the Parquet global metadata. A newly chosen schema is then
     * converted in place when elephant-bird compatibility is enabled on the job.
     *
     * @param job job whose configuration and input files are consulted
     * @throws IOException if reading the Parquet metadata fails
     */
    private void initSchema(Job job) throws IOException {
        if (schema != null) {
            return;
        }
        if (requestedSchema != null) {
            schema = requestedSchema;
        } else {
            GlobalMetaData meta = getParquetInputFormat().getGlobalMetaData(job);
            schema = TupleReadSupport.getPigSchemaFromMultipleFiles(meta.getSchema(), meta.getKeyValueMetaData());
        }
        if (isElephantBirdCompatible(job)) {
            // Mutates the schema object itself (which may be the requested schema instance).
            convertToElephantBirdCompatibleSchema(schema);
        }
    }

    /**
     * Recursively rewrites, in place, every field whose type code is 5 to type code 10.
     * Elsewhere in this class code 5 is handled as a boolean column and code 10 as an
     * int column, so this turns boolean fields into int fields.
     *
     * @param schema schema to rewrite; null is ignored
     */
    private void convertToElephantBirdCompatibleSchema(Schema schema) {
        if (schema != null) {
            for (Schema.FieldSchema field : schema.getFields()) {
                if (field.type == 5) {
                    field.type = (byte) 10;
                }
                // Descend into nested schemas; a null nested schema ends the recursion.
                convertToElephantBirdCompatibleSchema(field.schema);
            }
        }
    }

    /**
     * Tells whether the job asks for an elephant-bird compatible schema.
     *
     * @param job job whose configuration is read
     * @return value of {@code parquet.pig.elephantbird.compatible}, false when unset
     */
    private boolean isElephantBirdCompatible(Job job) {
        final boolean whenUnset = false;
        return ContextUtil.getConfiguration(job).getBoolean("parquet.pig.elephantbird.compatible", whenUnset);
    }

    /**
     * Estimates the input size by summing the lengths of all splits for the location.
     *
     * @param location input path string
     * @param job      job being configured
     * @return statistics whose size is the total split length in whole mebibytes
     *         (integer division), or null if computing the splits was interrupted
     * @throws IOException if configuring the input or computing splits fails
     */
    public ResourceStatistics getStatistics(String location, Job job) throws IOException {
        if (LOG.isDebugEnabled()) {
            String jobToString = String.format("job[id=%s, name=%s]", job.getJobID(), job.getJobName());
            LOG.debug("LoadMetadata.getStatistics({}, {})", location, jobToString);
        }
        setInput(location, job);
        long totalBytes = 0L;
        try {
            for (InputSplit split : getParquetInputFormat().getSplits(job)) {
                totalBytes += split.getLength();
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so callers can still react to the interruption;
            // statistics are best-effort, so report "unknown" by returning null.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted: ", e);
            return null;
        }
        ResourceStatistics stats = new ResourceStatistics();
        stats.setmBytes(Long.valueOf(totalBytes / 1024L / 1024L));
        return stats;
    }

    /**
     * Accepts a partition filter but only logs it; nothing is stored or applied.
     *
     * @param expression partition filter expression
     * @throws IOException declared by the LoadMetadata contract
     */
    public void setPartitionFilter(Expression expression) throws IOException {
        final Object logged = expression;
        LOG.debug("LoadMetadata.setPartitionFilter({})", logged);
    }

    /**
     * Lists the push-down features of this loader.
     *
     * @return a fixed-size list containing only {@code PROJECTION}
     */
    public List<LoadPushDown.OperatorSet> getFeatures() {
        LoadPushDown.OperatorSet[] supported = {LoadPushDown.OperatorSet.PROJECTION};
        return Arrays.asList(supported);
    }

    /**
     * Reads a string property from the UDF properties looked up with this loader's
     * runtime class and signature.
     *
     * @param key property name
     * @return the value returned by {@link Properties#getProperty(String)} for the key
     */
    protected String getPropertyFromUDFContext(String key) {
        Properties props = UDFContext.getUDFContext()
            .getUDFProperties(getClass(), new String[] {signature});
        return props.getProperty(key);
    }

    /**
     * Reads an arbitrary object from the UDF properties looked up with this loader's
     * runtime class and signature.
     *
     * @param key entry name
     * @return the stored object, or null when no entry exists
     */
    protected Object getFromUDFContext(String key) {
        Properties props = UDFContext.getUDFContext()
            .getUDFProperties(getClass(), new String[] {signature});
        return props.get(key);
    }

    /**
     * Stores a value in the UDF properties looked up with this loader's runtime
     * class and signature.
     *
     * @param key   entry name
     * @param value value to store
     */
    protected void storeInUDFContext(String key, Object value) {
        UDFContext.getUDFContext()
            .getUDFProperties(getClass(), new String[] {signature})
            .put(key, value);
    }

    /**
     * Applies a column projection: narrows the current schema to the required fields
     * and stores both the narrowed schema and the field list in the UDF context.
     *
     * @param requiredFieldList fields requested by Pig; null means no projection
     * @return null when no field list is given, otherwise a response constructed with {@code true}
     * @throws FrontendException if the projected schema cannot be built
     */
    public LoadPushDown.RequiredFieldResponse pushProjection(LoadPushDown.RequiredFieldList requiredFieldList) throws FrontendException {
        this.requiredFieldList = requiredFieldList;
        if (requiredFieldList == null) {
            return null;
        }
        Schema projected = getSchemaFromRequiredFieldList(schema, requiredFieldList.getFields());
        schema = projected;
        String schemaText = PigSchemaConverter.pigSchemaToString(projected);
        storeInUDFContext("parquet.pig.schema", schemaText);
        String fieldsText = PigSchemaConverter.serializeRequiredFieldList(requiredFieldList);
        storeInUDFContext("parquet.private.pig.required.fields", fieldsText);
        return new LoadPushDown.RequiredFieldResponse(true);
    }

    /**
     * Records the signature that this loader passes when looking up its UDF properties.
     *
     * @param signature UDF context signature
     */
    public void setUDFContextSignature(String signature) {
        this.signature = signature;
    }

    /**
     * Builds a new schema holding clones of the fields named in {@code fieldList},
     * recursing into sub-fields so nested schemas are projected as well.
     *
     * @param schema    schema to project from
     * @param fieldList required fields, looked up in {@code schema} by alias
     * @return the projected schema, or null if a nested projection yields null
     * @throws FrontendException if a field lookup fails or a field cannot be cloned
     */
    private Schema getSchemaFromRequiredFieldList(Schema schema, List<LoadPushDown.RequiredField> fieldList) throws FrontendException {
        Schema projected = new Schema();
        for (LoadPushDown.RequiredField required : fieldList) {
            Schema.FieldSchema copy;
            try {
                copy = schema.getField(required.getAlias()).clone();
            }
            catch (CloneNotSupportedException e) {
                throw new FrontendException("Clone not supported for the fieldschema", (Throwable)e);
            }
            List<LoadPushDown.RequiredField> subFields = required.getSubFields();
            if (subFields != null) {
                // Replace the nested schema with its own projection.
                Schema nested = getSchemaFromRequiredFieldList(copy.schema, subFields);
                if (nested == null) {
                    return null;
                }
                copy.schema = nested;
            }
            projected.add(copy);
        }
        return projected;
    }

    /**
     * Lists the top-level fields on which predicates may be pushed down.
     * Only fields with type code 5, 10, 15, 20, 25 or 55 are offered - the same codes
     * that {@code buildFilter} maps to boolean, int, long, float, double and binary
     * filter columns.
     *
     * @param s   unused here
     * @param job job whose configuration enables or disables push-down
     * @return the eligible field aliases, or null when push-down is disabled
     * @throws IOException declared by the LoadPredicatePushdown contract
     */
    public List<String> getPredicateFields(String s, Job job) throws IOException {
        boolean enabled = job.getConfiguration()
            .getBoolean(ENABLE_PREDICATE_FILTER_PUSHDOWN, DEFAULT_PREDICATE_PUSHDOWN_ENABLED);
        if (!enabled) {
            return null;
        }
        ArrayList<String> eligible = new ArrayList<String>();
        for (Schema.FieldSchema field : schema.getFields()) {
            byte type = field.type;
            boolean supported = type == 5 || type == 10 || type == 15
                || type == 20 || type == 25 || type == 55;
            if (supported) {
                eligible.add(field.alias);
            }
        }
        return eligible;
    }

    /**
     * Lists the expression operators this loader accepts for predicate push-down:
     * the six comparisons plus AND, OR and NOT.
     *
     * @return fixed-size list of supported operator types
     */
    public List<Expression.OpType> getSupportedExpressionTypes() {
        return Arrays.asList(
            Expression.OpType.OP_EQ,
            Expression.OpType.OP_NE,
            Expression.OpType.OP_GT,
            Expression.OpType.OP_GE,
            Expression.OpType.OP_LT,
            Expression.OpType.OP_LE,
            Expression.OpType.OP_AND,
            Expression.OpType.OP_OR,
            Expression.OpType.OP_NOT);
    }

    /**
     * Translates a Pig predicate into a Parquet filter predicate and stores it in the
     * UDF context, where {@code setInput} later picks it up.
     *
     * @param e Pig expression to push down
     * @throws IOException declared by the LoadPredicatePushdown contract
     * @throws RuntimeException if the expression cannot be translated
     */
    public void setPushdownPredicate(Expression e) throws IOException {
        LOG.info("Pig pushdown expression: {}", e);
        final FilterPredicate translated = buildFilter(e);
        LOG.info("Parquet filter predicate expression: {}", translated);
        storeInUDFContext("parquet.private.read.filter.predicate", translated);
    }

    /**
     * Recursively translates a Pig expression tree into a Parquet filter predicate.
     * Binary AND/OR/BETWEEN/IN are handled structurally; any other binary operator must
     * compare a column with a constant (in either order). Unary NOT is translated and
     * then passed through {@code LogicalInverseRewriter}.
     *
     * @param e expression to translate
     * @return the equivalent filter predicate
     * @throws RuntimeException if the expression shape is not supported
     */
    private FilterPredicate buildFilter(Expression e) {
        Expression.OpType op = e.getOpType();
        if (e instanceof Expression.BinaryExpression) {
            Expression lhs = ((Expression.BinaryExpression)e).getLhs();
            Expression rhs = ((Expression.BinaryExpression)e).getRhs();
            // No default branch: operators not listed here fall through to the
            // column/constant comparison handling below the switch.
            switch (op) {
                case OP_AND: {
                    return FilterApi.and(this.buildFilter(lhs), this.buildFilter(rhs));
                }
                case OP_OR: {
                    return FilterApi.or(this.buildFilter(lhs), this.buildFilter(rhs));
                }
                case OP_BETWEEN: {
                    // BETWEEN becomes (col >= lower) AND (col <= upper).
                    Expression.BetweenExpression between = (Expression.BetweenExpression)rhs;
                    return FilterApi.and(this.buildFilter(Expression.OpType.OP_GE, (Expression.Column)lhs, (Expression.Const)between.getLower()), this.buildFilter(Expression.OpType.OP_LE, (Expression.Column)lhs, (Expression.Const)between.getUpper()));
                }
                case OP_IN: {
                    // IN becomes a left-nested OR chain of equality tests.
                    // NOTE(review): an empty value list leaves `current` null, so null is returned.
                    FilterPredicate current = null;
                    for (Object value : ((Expression.InExpression)rhs).getValues()) {
                        FilterPredicate next = this.buildFilter(Expression.OpType.OP_EQ, (Expression.Column)lhs, (Expression.Const)value);
                        if (current != null) {
                            current = FilterApi.or(current, next);
                            continue;
                        }
                        current = next;
                    }
                    return current;
                }
            }
            if (lhs instanceof Expression.Column && rhs instanceof Expression.Const) {
                return this.buildFilter(op, (Expression.Column)lhs, (Expression.Const)rhs);
            }
            // Constant on the left: operands are swapped but the operator is passed unchanged.
            // NOTE(review): for ordering operators (e.g. 5 < col) this looks like it flips the
            // meaning - confirm against the caller's expectations.
            if (lhs instanceof Expression.Const && rhs instanceof Expression.Column) {
                return this.buildFilter(op, (Expression.Column)rhs, (Expression.Const)lhs);
            }
        } else if (e instanceof Expression.UnaryExpression && op == Expression.OpType.OP_NOT) {
            return LogicalInverseRewriter.rewrite(FilterApi.not(this.buildFilter(((Expression.UnaryExpression)e).getExpression())));
        }
        throw new RuntimeException("Could not build filter for expression: " + e);
    }

    /**
     * Builds a single column-versus-constant predicate. The field's type code in the
     * current schema selects the filter column kind: 5 boolean, 10 int, 15 long,
     * 20 float, 25 double, 55 binary.
     *
     * @param op    comparison operator; boolean columns accept only OP_EQ and OP_NE
     * @param col   column being compared
     * @param value constant to compare against
     * @return the filter predicate for the comparison
     * @throws RuntimeException if the schema lookup fails, the field type is not one of
     *         the codes above, or the operator is not supported for a boolean column
     */
    private FilterPredicate buildFilter(Expression.OpType op, Expression.Column col, Expression.Const value) {
        final String columnName = col.getName();
        final Schema.FieldSchema field;
        try {
            field = schema.getField(columnName);
        }
        catch (FrontendException e) {
            throw new RuntimeException("Error processing pushdown for column:" + col, e);
        }
        switch (field.type) {
            case 5: {
                Operators.BooleanColumn booleanColumn = FilterApi.booleanColumn(columnName);
                switch (op) {
                    case OP_EQ: {
                        return FilterApi.eq(booleanColumn, getValue(value, booleanColumn.getColumnType()));
                    }
                    case OP_NE: {
                        return FilterApi.notEq(booleanColumn, getValue(value, booleanColumn.getColumnType()));
                    }
                }
                throw new RuntimeException("Operation " + op + " not supported for boolean column: " + columnName);
            }
            case 10: {
                return op(op, FilterApi.intColumn(columnName), value);
            }
            case 15: {
                return op(op, FilterApi.longColumn(columnName), value);
            }
            case 20: {
                return op(op, FilterApi.floatColumn(columnName), value);
            }
            case 25: {
                return op(op, FilterApi.doubleColumn(columnName), value);
            }
            case 55: {
                return op(op, FilterApi.binaryColumn(columnName), value);
            }
        }
        throw new RuntimeException("Unsupported type " + field.type + " for field: " + columnName);
    }

    /**
     * Builds a comparison predicate between an orderable column and a constant.
     *
     * @param op        one of OP_EQ, OP_NE, OP_GT, OP_GE, OP_LT, OP_LE
     * @param col       filter column; must support ordering comparisons
     * @param valueExpr constant operand, converted to the column's value type
     * @param <C>       value type of the column
     * @param <COL>     column type
     * @return the predicate for {@code col <op> value}
     * @throws RuntimeException if {@code op} is not one of the six comparison operators
     *         (previously null was returned, which only failed later with an
     *         uninformative NullPointerException)
     */
    private static <C extends Comparable<C>, COL extends Operators.Column<C> & Operators.SupportsLtGt> FilterPredicate op(Expression.OpType op, COL col, Expression.Const valueExpr) {
        C value = ParquetLoader.getValue(valueExpr, col.getColumnType());
        switch (op) {
            case OP_EQ: {
                return FilterApi.eq(col, value);
            }
            case OP_NE: {
                return FilterApi.notEq(col, value);
            }
            case OP_GT: {
                return FilterApi.gt(col, value);
            }
            case OP_GE: {
                return FilterApi.gtEq(col, value);
            }
            case OP_LT: {
                return FilterApi.lt(col, value);
            }
            case OP_LE: {
                return FilterApi.ltEq(col, value);
            }
            default: {
                // Fail with a clear message, matching the boolean-column branch of buildFilter.
                throw new RuntimeException("Operation " + op + " not supported for column: " + col);
            }
        }
    }

    /**
     * Extracts the constant's value as the column's value type. String constants are
     * first converted with {@code Binary.fromString}.
     *
     * @param valueExpr constant expression
     * @param type      value class of the target column
     * @param <C>       value type of the target column
     * @return the constant cast to {@code type}
     * @throws ClassCastException if the (converted) value is not an instance of {@code type}
     */
    private static <C extends Comparable<C>> C getValue(Expression.Const valueExpr, Class<C> type) {
        Object raw = valueExpr.getValue();
        Object normalized = (raw instanceof String) ? Binary.fromString((String) raw) : raw;
        return type.cast(normalized);
    }

    private static class UnregisteringParquetInputFormat
    extends ParquetInputFormat<Tuple> {
        private final String location;

        public UnregisteringParquetInputFormat(String location) {
            super(TupleReadSupport.class);
            this.location = location;
        }

        @Override
        public RecordReader<Void, Tuple> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            inputFormatCache.remove(this.location);
            return super.createRecordReader(inputSplit, taskAttemptContext);
        }
    }
}

