/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.rules.physical.stream;

import java.io.Serializable;
import java.util.ArrayList;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.calcite.FlinkContext;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory$;
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
import org.apache.flink.table.planner.plan.nodes.FlinkConventions$;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalGlobalGroupAggregate;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalGlobalGroupAggregate$;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalGroupAggregate;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLocalGroupAggregate;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel;
import org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule$;
import org.apache.flink.table.planner.plan.rules.physical.stream.TwoStageOptimizedAggregateRule$;
import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution$;
import org.apache.flink.table.planner.plan.trait.FlinkRelDistributionTraitDef$;
import org.apache.flink.table.planner.plan.trait.ModifyKindSetTrait$;
import org.apache.flink.table.planner.plan.trait.RelModifiedMonotonicity;
import org.apache.flink.table.planner.plan.trait.UpdateKindTrait$;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.AggregateUtil$;
import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils$;
import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import scala.Function1;
import scala.Predef$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\r4A!\u0001\u0002\u0001+\tqBk^8Ti\u0006<Wm\u00149uS6L'0\u001a3BO\u001e\u0014XmZ1uKJ+H.\u001a\u0006\u0003\u0007\u0011\taa\u001d;sK\u0006l'BA\u0003\u0007\u0003!\u0001\b._:jG\u0006d'BA\u0004\t\u0003\u0015\u0011X\u000f\\3t\u0015\tI!\"\u0001\u0003qY\u0006t'BA\u0006\r\u0003\u001d\u0001H.\u00198oKJT!!\u0004\b\u0002\u000bQ\f'\r\\3\u000b\u0005=\u0001\u0012!\u00024mS:\\'BA\t\u0013\u0003\u0019\t\u0007/Y2iK*\t1#A\u0002pe\u001e\u001c\u0001a\u0005\u0002\u0001-A\u0011qcG\u0007\u00021)\u0011\u0011\"\u0007\u0006\u00035A\tqaY1mG&$X-\u0003\u0002\u001d1\tQ!+\u001a7PaR\u0014V\u000f\\3\t\u000by\u0001A\u0011A\u0010\u0002\rqJg.\u001b;?)\u0005\u0001\u0003CA\u0011\u0001\u001b\u0005\u0011\u0001\"B\u0012\u0001\t\u0003\"\u0013aB7bi\u000eDWm\u001d\u000b\u0003K-\u0002\"AJ\u0015\u000e\u0003\u001dR\u0011\u0001K\u0001\u0006g\u000e\fG.Y\u0005\u0003U\u001d\u0012qAQ8pY\u0016\fg\u000eC\u0003-E\u0001\u0007Q&\u0001\u0003dC2d\u0007CA\f/\u0013\ty\u0003D\u0001\bSK2|\u0005\u000f\u001e*vY\u0016\u001c\u0015\r\u001c7\t\u000bE\u0002A\u0011\u0002\u001a\u0002E%\u001c\u0018J\u001c9viN\u000bG/[:gsJ+\u0017/^5sK\u0012$\u0015n\u001d;sS\n,H/[8o)\r)3g\u000f\u0005\u0006iA\u0002\r!N\u0001\u0006S:\u0004X\u000f\u001e\t\u0003mej\u0011a\u000e\u0006\u0003qe\t1A]3m\u0013\tQtGA\u0004SK2tu\u000eZ3\t\u000bq\u0002\u0004\u0019A\u001f\u0002\t-,\u0017p\u001d\t\u0004My\u0002\u0015BA (\u0005\u0015\t%O]1z!\t1\u0013)\u0003\u0002CO\t\u0019\u0011J\u001c;\t\u000b\u0011\u0003A\u0011I#\u0002\u000f=tW*\u0019;dQR\u0011a)\u0013\t\u0003M\u001dK!\u0001S\u0014\u0003\tUs\u0017\u000e\u001e\u0005\u0006Y\r\u0003\r!\f\u0005\u0006\u0017\u0002!I\u0001T\u0001\u0013GJ,\u0017\r^3ESN$(/\u001b2vi&|g\u000e\u0006\u0002N'B\u0011a*U\u0007\u0002\u001f*\u0011\u0001\u000bC\u0001\u0006iJ\f\u0017\u000e^\u0005\u0003%>\u0013AC\u00127j].\u0014V\r\u001c#jgR\u0014\u0018NY;uS>t\u0007\"\u0002\u001fK\u0001\u0004it!B+\u0003\u0011\u00031\u0016A\b+x_N#\u0018mZ3PaRLW.\u001b>fI\u0006;wM]3hCR,'+\u001e7f!\t\tsKB\u0003\u0002\u0005!\u0005\u0001l\u0005\u0002X3B\u0011aEW\u0005\u00037\u001e\u0012a!\u00118z%\u00164\u0007\"\u0002\u0010X\t\u0003iF#\u0001,\t\u000f};&\u0019!C\u0001A\u0006A\u0011JT*U\u0003:\u001bU)F\u0001\u0017\u0011\u0019\u0011w\u000b)A\u0005-\u0005I\u0011JT*U\u0003:\u001bU\t\t")
public class TwoStageOptimizedAggregateRule
extends RelOptRule {
    public static RelOptRule INSTANCE() {
        return TwoStageOptimizedAggregateRule$.MODULE$.INSTANCE();
    }

    @Override
    public boolean matches(RelOptRuleCall call) {
        TableConfig tableConfig = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
        StreamPhysicalGroupAggregate agg = (StreamPhysicalGroupAggregate)call.rel(0);
        Object realInput = call.rel(2);
        boolean needRetraction = !ChangelogPlanUtils$.MODULE$.isInsertOnly((StreamPhysicalRel)realInput);
        FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(call.getMetadataQuery());
        RelModifiedMonotonicity monotonicity = fmq.getRelModifiedMonotonicity(agg);
        boolean[] needRetractionArray = AggregateUtil$.MODULE$.deriveAggCallNeedRetractions(agg.grouping().length, agg.aggCalls(), needRetraction, monotonicity);
        AggregateInfoList aggInfoList = AggregateUtil$.MODULE$.transformToStreamAggregateInfoList(FlinkTypeFactory$.MODULE$.toLogicalRowType(agg.getInput().getRowType()), agg.aggCalls(), needRetractionArray, needRetraction, true, AggregateUtil$.MODULE$.transformToStreamAggregateInfoList$default$6());
        boolean isMiniBatchEnabled = tableConfig.getConfiguration().getBoolean(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
        AggregatePhaseStrategy aggregatePhaseStrategy = TableConfigUtils.getAggPhaseStrategy(tableConfig);
        AggregatePhaseStrategy aggregatePhaseStrategy2 = AggregatePhaseStrategy.ONE_PHASE;
        boolean isTwoPhaseEnabled = aggregatePhaseStrategy == null ? aggregatePhaseStrategy2 != null : !((Object)((Object)aggregatePhaseStrategy)).equals((Object)aggregatePhaseStrategy2);
        return isMiniBatchEnabled && isTwoPhaseEnabled && AggregateUtil$.MODULE$.doAllSupportPartialMerge(aggInfoList.aggInfos()) && !this.isInputSatisfyRequiredDistribution((RelNode)realInput, agg.grouping());
    }

    private boolean isInputSatisfyRequiredDistribution(RelNode input, int[] keys) {
        FlinkRelDistribution requiredDistribution = this.createDistribution(keys);
        FlinkRelDistribution inputDistribution = input.getTraitSet().getTrait(FlinkRelDistributionTraitDef$.MODULE$.INSTANCE());
        return inputDistribution.satisfies(requiredDistribution);
    }

    @Override
    public void onMatch(RelOptRuleCall call) {
        StreamPhysicalGroupAggregate originalAgg = (StreamPhysicalGroupAggregate)call.rel(0);
        Object realInput = call.rel(2);
        boolean needRetraction = !ChangelogPlanUtils$.MODULE$.isInsertOnly((StreamPhysicalRel)realInput);
        FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(call.getMetadataQuery());
        RelModifiedMonotonicity monotonicity = fmq.getRelModifiedMonotonicity(originalAgg);
        boolean[] aggCallNeedRetractions = AggregateUtil$.MODULE$.deriveAggCallNeedRetractions(originalAgg.grouping().length, originalAgg.aggCalls(), needRetraction, monotonicity);
        RelTraitSet localAggTraitSet = realInput.getTraitSet().plus(ModifyKindSetTrait$.MODULE$.INSERT_ONLY()).plus(UpdateKindTrait$.MODULE$.NONE());
        StreamPhysicalLocalGroupAggregate localHashAgg = new StreamPhysicalLocalGroupAggregate(originalAgg.getCluster(), localAggTraitSet, (RelNode)realInput, originalAgg.grouping(), originalAgg.aggCalls(), aggCallNeedRetractions, needRetraction, originalAgg.partialFinalType());
        int[] globalGrouping = (int[])new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps(originalAgg.grouping())).indices().toArray(ClassTag$.MODULE$.Int());
        FlinkRelDistribution globalDistribution = this.createDistribution(globalGrouping);
        RelNode newInput = FlinkExpandConversionRule$.MODULE$.satisfyDistribution(FlinkConventions$.MODULE$.STREAM_PHYSICAL(), localHashAgg, globalDistribution);
        RelTraitSet globalAggProvidedTraitSet = originalAgg.getTraitSet();
        StreamPhysicalGlobalGroupAggregate globalAgg = new StreamPhysicalGlobalGroupAggregate(originalAgg.getCluster(), globalAggProvidedTraitSet, newInput, originalAgg.getRowType(), globalGrouping, originalAgg.aggCalls(), aggCallNeedRetractions, realInput.getRowType(), needRetraction, originalAgg.partialFinalType(), StreamPhysicalGlobalGroupAggregate$.MODULE$.$lessinit$greater$default$11());
        call.transformTo(globalAgg);
    }

    private FlinkRelDistribution createDistribution(int[] keys) {
        FlinkRelDistribution flinkRelDistribution;
        if (new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps(keys)).nonEmpty()) {
            ArrayList fields = new ArrayList();
            new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps(keys)).foreach((Function1)(JFunction1.mcZI.sp & Serializable & scala.Serializable)x$1 -> fields.add(Predef$.MODULE$.int2Integer(x$1)));
            flinkRelDistribution = FlinkRelDistribution$.MODULE$.hash(fields, FlinkRelDistribution$.MODULE$.hash$default$2());
        } else {
            flinkRelDistribution = FlinkRelDistribution$.MODULE$.SINGLETON();
        }
        return flinkRelDistribution;
    }

    public TwoStageOptimizedAggregateRule() {
        super(RelOptRule.operand(StreamPhysicalGroupAggregate.class, RelOptRule.operand(StreamPhysicalExchange.class, RelOptRule.operand(RelNode.class, RelOptRule.any()), new RelOptRuleOperand[0]), new RelOptRuleOperand[0]), "TwoStageOptimizedAggregateRule");
    }
}

