/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tajo.plan.rewrite.rules;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.tajo.OverridableConf;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.expr.AlgebraicUtil;
import org.apache.tajo.plan.expr.BinaryEval;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.expr.EvalTreeUtil;
import org.apache.tajo.plan.expr.EvalType;
import org.apache.tajo.plan.expr.FieldEval;
import org.apache.tajo.plan.expr.IsNullEval;
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.logical.PartitionedTableScanNode;
import org.apache.tajo.plan.logical.RelationNode;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.util.StringUtils;

/**
 * Logical plan rewrite rule for scans over partitioned tables.
 *
 * <p>For every {@link ScanNode} whose table reports {@code hasPartition()}, the rule
 * lists the table directory level by level, keeps only the paths accepted by the
 * per-level {@link PathFilter}s derived from the scan's predicates, and replaces
 * the scan with a {@link PartitionedTableScanNode} initialised with those paths.
 */
public class PartitionedTableRewriter implements LogicalPlanRewriteRule {
    private static final Log LOG = LogFactory.getLog(PartitionedTableRewriter.class);
    private static final String NAME = "Partitioned Table Rewriter";
    private final Rewriter rewriter = new Rewriter();

    /** Returns the display name of this rule. */
    @Override
    public String getName() {
        return NAME;
    }

    /**
     * Tells whether the plan contains at least one relation of type
     * {@link NodeType#SCAN} whose table has a partition.
     *
     * @param queryContext not used by this check
     * @param plan the plan whose query blocks are inspected
     * @return {@code true} as soon as one partitioned scan is found
     */
    @Override
    public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
        for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
            for (RelationNode relation : block.getRelations()) {
                if (relation.getType() != NodeType.SCAN) {
                    continue;
                }
                if (((ScanNode) relation).getTableDesc().hasPartition()) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Runs the rewriting visitor starting at the root node of the plan's root block.
     *
     * @return the same {@code plan} instance, modified in place
     * @throws PlanningException if the visitor fails, e.g. on a file system error
     */
    @Override
    public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws PlanningException {
        LogicalPlan.QueryBlock rootBlock = plan.getRootBlock();
        this.rewriter.visit(queryContext, plan, rootBlock, (LogicalNode) rootBlock.getRoot(), new Stack<LogicalNode>());
        return plan;
    }

    /**
     * Lists {@code tablePath} one partition level at a time, applying the filter of
     * each level to the result of the previous one.
     *
     * @param partitionColumns schema of the partition columns; one listing pass is
     *        made per column
     * @param conjunctiveForms predicates used for pruning, or {@code null} to use the
     *        all-accepting filters
     * @param tablePath root directory of the table
     * @return the paths that passed the filter of the last level
     * @throws IOException if the file system cannot be obtained or listed
     */
    private Path[] findFilteredPaths(OverridableConf queryContext, Schema partitionColumns, EvalNode[] conjunctiveForms, Path tablePath) throws IOException {
        FileSystem fs = tablePath.getFileSystem((Configuration) queryContext.getConf());
        PathFilter[] filters = conjunctiveForms == null
                ? buildAllAcceptingPathFilters(partitionColumns)
                : buildPathFiltersForAllLevels(partitionColumns, conjunctiveForms);

        // Level 0 lists the table directory itself; deeper levels list the survivors.
        Path[] filteredPaths = toPathArray(fs.listStatus(tablePath, filters[0]));
        for (int level = 1; level < partitionColumns.size(); ++level) {
            filteredPaths = toPathArray(fs.listStatus(filteredPaths, filters[level]));
        }
        LOG.info("Filtered directory or files: " + filteredPaths.length);
        return filteredPaths;
    }

    /**
     * Builds one {@link PathFilter} per partition column.
     *
     * <p>Predicates are accumulated across levels: the filter for level {@code i} is
     * the conjunction of everything gathered for levels {@code 0..i}. If, after
     * collecting the predicates that mention column {@code i}, the accumulated list
     * holds fewer than {@code i + 1} entries, an {@code IsNullEval(true, field)} on
     * that column is appended (presumably an IS NOT NULL test, so that the level
     * still has something to evaluate).
     *
     * @param partitionColumns schema of the partition columns
     * @param conjunctiveForms predicates to distribute over the levels; may be empty
     * @return an array with {@code partitionColumns.size()} filters
     */
    private static PathFilter[] buildPathFiltersForAllLevels(Schema partitionColumns, EvalNode[] conjunctiveForms) {
        PathFilter[] filters = new PathFilter[partitionColumns.size()];
        ArrayList<EvalNode> accumulatedFilters = Lists.newArrayList();
        for (int i = 0; i < partitionColumns.size(); ++i) {
            Column target = partitionColumns.getColumn(i);
            for (EvalNode expr : conjunctiveForms) {
                if (EvalTreeUtil.findUniqueColumns(expr).contains(target)) {
                    accumulatedFilters.add(expr);
                }
            }
            if (accumulatedFilters.size() < i + 1) {
                accumulatedFilters.add(new IsNullEval(true, new FieldEval(target)));
            }
            EvalNode filterPerLevel = AlgebraicUtil.createSingletonExprFromCNF(
                    accumulatedFilters.toArray(new EvalNode[accumulatedFilters.size()]));
            filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel);
        }
        return filters;
    }

    /**
     * Builds the per-level filters used when no pruning predicate is available.
     *
     * <p>With no predicates, every level of
     * {@link #buildPathFiltersForAllLevels(Schema, EvalNode[])} falls into its
     * placeholder branch, which yields exactly one {@code IsNullEval(true, field)}
     * per column, accumulated level by level.
     */
    private static PathFilter[] buildAllAcceptingPathFilters(Schema partitionColumns) {
        return buildPathFiltersForAllLevels(partitionColumns, new EvalNode[0]);
    }

    /** Extracts the {@link Path} of every status, preserving order. */
    private static Path[] toPathArray(FileStatus[] fileStatuses) {
        Path[] paths = new Path[fileStatuses.length];
        for (int j = 0; j < fileStatuses.length; ++j) {
            paths[j] = fileStatuses[j].getPath();
        }
        return paths;
    }

    /**
     * Splits the scan's predicate into partition-pruning predicates and the rest,
     * then lists the partition paths that satisfy the pruning predicates.
     *
     * <p>Side effect: when the scan has a qual, it is replaced by the conjunction of
     * the predicates that were not used for pruning, or by {@code null} when none
     * remain.
     *
     * @return the filtered partition paths
     * @throws IOException if listing the table directory fails
     */
    private Path[] findFilteredPartitionPaths(OverridableConf queryContext, ScanNode scanNode) throws IOException {
        TableDesc table = scanNode.getTableDesc();
        PartitionMethodDesc partitionDesc = table.getPartitionMethod();

        // Copy of the partition expression schema; it gets qualified below.
        Schema partitionValuesSchema = new Schema();
        for (Column column : partitionDesc.getExpressionSchema().getColumns()) {
            partitionValuesSchema.addColumn(column);
        }

        HashSet<EvalNode> indexablePredicateSet = Sets.newHashSet();
        if (scanNode.hasQual()) {
            EvalNode[] conjunctiveForms = AlgebraicUtil.toConjunctiveNormalFormArray(scanNode.getQual());
            HashSet<EvalNode> remainExprs = Sets.newHashSet(conjunctiveForms);

            // Qualify the partition columns with the scan's canonical name before
            // they are compared with the columns referenced by the predicates.
            partitionValuesSchema.setQualifier(scanNode.getCanonicalName());
            for (Column column : partitionValuesSchema.getColumns()) {
                for (EvalNode simpleExpr : conjunctiveForms) {
                    if (checkIfIndexablePredicateOnTargetColumn(simpleExpr, column)) {
                        indexablePredicateSet.add(simpleExpr);
                    }
                }
            }

            remainExprs.removeAll(indexablePredicateSet);
            if (remainExprs.isEmpty()) {
                scanNode.setQual(null);
            } else {
                scanNode.setQual(AlgebraicUtil.createSingletonExprFromCNF(
                        remainExprs.toArray(new EvalNode[remainExprs.size()])));
            }
        }

        // null selects the all-accepting filters in findFilteredPaths.
        EvalNode[] pruningPredicates = indexablePredicateSet.isEmpty()
                ? null
                : indexablePredicateSet.toArray(new EvalNode[indexablePredicateSet.size()]);
        return findFilteredPaths(queryContext, partitionValuesSchema, pruningPredicates, new Path(table.getPath()));
    }

    /**
     * Tells whether {@code evalNode} can be used to prune on {@code targetColumn}:
     * it must be an indexable predicate (or an OR of two of them on the same
     * columns) and {@code targetColumn} must be the only column it references.
     */
    private boolean checkIfIndexablePredicateOnTargetColumn(EvalNode evalNode, Column targetColumn) {
        if (!checkIfIndexablePredicate(evalNode) && !checkIfDisjunctiveButOneVariable(evalNode)) {
            return false;
        }
        LinkedHashSet<Column> variables = EvalTreeUtil.findUniqueColumns(evalNode);
        return variables.size() == 1 && variables.contains(targetColumn);
    }

    /**
     * Returns {@code true} when {@code AlgebraicUtil} reports the expression as
     * containing a single variable and using an indexable operator.
     */
    private boolean checkIfIndexablePredicate(EvalNode evalNode) {
        return AlgebraicUtil.containSingleVar(evalNode) && AlgebraicUtil.isIndexableOperator(evalNode);
    }

    /**
     * Returns {@code true} for an OR whose two operands are both indexable
     * predicates and reference the same set of columns. Only the direct operands
     * are inspected.
     */
    private boolean checkIfDisjunctiveButOneVariable(EvalNode evalNode) {
        if (evalNode.getType() != EvalType.OR) {
            return false;
        }
        BinaryEval orEval = (BinaryEval) evalNode;
        EvalNode left = orEval.getLeftExpr();
        EvalNode right = orEval.getRightExpr();
        boolean indexable = checkIfIndexablePredicate(left) && checkIfIndexablePredicate(right);
        boolean sameVariable = EvalTreeUtil.findUniqueColumns(left).equals(EvalTreeUtil.findUniqueColumns(right));
        return indexable && sameVariable;
    }

    /**
     * Recomputes the table's byte statistic from the content summaries of the
     * scan's input paths. Does nothing when there are no input paths.
     *
     * @throws PlanningException wrapping any {@link IOException} from the file system
     */
    private void updateTableStat(OverridableConf queryContext, PartitionedTableScanNode scanNode) throws PlanningException {
        Path[] inputPaths = scanNode.getInputPaths();
        if (inputPaths.length == 0) {
            return;
        }
        try {
            // The file system of the first path is used for all paths.
            FileSystem fs = inputPaths[0].getFileSystem((Configuration) queryContext.getConf());
            long totalVolume = 0L;
            for (Path input : inputPaths) {
                ContentSummary summary = fs.getContentSummary(input);
                totalVolume += summary.getLength();
                // NOTE(review): the file count is added to a value stored as "num bytes".
                // This mixes units and looks unintended, but it is kept as-is because
                // downstream size estimates may depend on it -- confirm before changing.
                totalVolume += summary.getFileCount();
            }
            scanNode.getTableDesc().getStats().setNumBytes(totalVolume);
        } catch (IOException e) {
            throw new PlanningException(e);
        }
    }

    /**
     * Parses the {@code name=value} segments of a partition path into a tuple laid
     * out according to {@code partitionColumnSchema}.
     *
     * <p>Parsing starts at the first occurrence of {@code <first column name>=} in
     * the path string and splits the remainder on {@code '/'}. Each parsed value is
     * unescaped and stored at the index of the column named in its segment; slots
     * past the last parsed segment are filled with {@link NullDatum}.
     *
     * @param partitionColumnSchema schema of the partition columns
     * @param partitionPath path to parse
     * @param beNullIfFile when {@code true}, a path with more segments than partition
     *        columns yields {@code null}
     * @return the tuple, or {@code null} if the prefix is not found, a segment is not
     *         of the form {@code name=value}, or {@code beNullIfFile} applies
     */
    public static Tuple buildTupleFromPartitionPath(Schema partitionColumnSchema, Path partitionPath, boolean beNullIfFile) {
        String pathText = partitionPath.toString();
        int startIdx = pathText.indexOf(getColumnPartitionPathPrefix(partitionColumnSchema));
        if (startIdx == -1) {
            return null;
        }
        String[] columnValues = pathText.substring(startIdx).split("/");
        if (beNullIfFile && partitionColumnSchema.size() < columnValues.length) {
            return null;
        }

        VTuple tuple = new VTuple(partitionColumnSchema.size());
        int i;
        for (i = 0; i < columnValues.length && i < partitionColumnSchema.size(); ++i) {
            String[] parts = columnValues[i].split("=");
            if (parts.length != 2) {
                return null;
            }
            int columnId = partitionColumnSchema.getColumnIdByName(parts[0]);
            Column keyColumn = partitionColumnSchema.getColumn(columnId);
            tuple.put(columnId, DatumFactory.createFromString(
                    (TajoDataTypes.DataType) keyColumn.getDataType(), StringUtils.unescapePathName(parts[1])));
        }
        // Levels deeper than the given path have no value yet.
        while (i < partitionColumnSchema.size()) {
            tuple.put(i, (Datum) NullDatum.get());
            ++i;
        }
        return tuple;
    }

    /** Returns the simple name of the first partition column followed by {@code '='}. */
    private static String getColumnPartitionPathPrefix(Schema partitionColumn) {
        return partitionColumn.getColumn(0).getSimpleName() + "=";
    }

    /**
     * Visitor that swaps each partitioned {@link ScanNode} for a
     * {@link PartitionedTableScanNode} bound to the filtered partition paths.
     */
    private final class Rewriter extends BasicLogicalPlanVisitor<OverridableConf, Object> {

        /**
         * Rewrites {@code scanNode} if its table is partitioned; otherwise leaves
         * the plan untouched.
         *
         * @return always {@code null}
         * @throws PlanningException if the partition directories cannot be listed;
         *         the originating {@link IOException} is attached as the cause
         */
        @Override
        public Object visitScan(OverridableConf queryContext, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode scanNode, Stack<LogicalNode> stack) throws PlanningException {
            TableDesc table = scanNode.getTableDesc();
            if (!table.hasPartition()) {
                return null;
            }
            try {
                Path[] filteredPaths = PartitionedTableRewriter.this.findFilteredPartitionPaths(queryContext, scanNode);
                plan.addHistory("PartitionTableRewriter chooses " + filteredPaths.length + " of partitions");
                PartitionedTableScanNode rewrittenScanNode = plan.createNode(PartitionedTableScanNode.class);
                rewrittenScanNode.init(scanNode, filteredPaths);
                PartitionedTableRewriter.this.updateTableStat(queryContext, rewrittenScanNode);

                // A scan that is itself the block root has no parent to patch.
                if (stack.empty() || ((LogicalNode) block.getRoot()).equals(scanNode)) {
                    block.setRoot(rewrittenScanNode);
                } else {
                    PlannerUtil.replaceNode(plan, stack.peek(), scanNode, rewrittenScanNode);
                }
            } catch (IOException e) {
                // Keep the original message but attach the cause so the stack trace
                // of the underlying I/O failure is not lost.
                PlanningException failure = new PlanningException("Partitioned Table Rewrite Failed: \n" + e.getMessage());
                failure.initCause(e);
                throw failure;
            }
            return null;
        }
    }

    /**
     * {@link PathFilter} that accepts a path when the tuple parsed from it
     * satisfies the given partition predicate.
     */
    private static class PartitionPathFilter implements PathFilter {
        private final Schema schema;
        private final EvalNode partitionFilter;

        public PartitionPathFilter(Schema schema, EvalNode partitionFilter) {
            this.schema = schema;
            this.partitionFilter = partitionFilter;
        }

        /**
         * Rejects paths that cannot be parsed into a partition tuple; otherwise
         * returns the boolean result of evaluating the predicate on that tuple.
         */
        @Override
        public boolean accept(Path path) {
            Tuple tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(this.schema, path, true);
            if (tuple == null) {
                return false;
            }
            return this.partitionFilter.eval(this.schema, tuple).asBool();
        }

        /** Returns the string form of the wrapped predicate. */
        @Override
        public String toString() {
            return this.partitionFilter.toString();
        }
    }
}

