/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
import org.apache.flink.table.connector.source.DynamicFilteringData;
import org.apache.flink.table.connector.source.DynamicFilteringEvent;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.source.DynamicPartitionFilteringInfo;
import org.apache.paimon.flink.source.FileSplitEnumeratorTestBase;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.StaticFileStoreSource;
import org.apache.paimon.flink.source.StaticFileStoreSplitEnumerator;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public abstract class StaticFileStoreSplitEnumeratorTestBase
extends FileSplitEnumeratorTestBase {
    @Test
    public void testDynamicPartitionFilteringAfterStarted() {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = this.getSplitEnumeratorContext(1);
        ArrayList<FileStoreSourceSplit> initialSplits = new ArrayList<FileStoreSourceSplit>();
        for (int i = 0; i < 4; ++i) {
            initialSplits.add(this.createSnapshotSplit(i + 1, 0, Collections.emptyList(), i / 2, i % 2));
        }
        StaticFileStoreSplitEnumerator enumerator = this.getSplitEnumerator((SplitEnumeratorContext<FileStoreSourceSplit>)context, (List<FileStoreSourceSplit>)initialSplits, RowType.of((DataType[])new DataType[]{DataTypes.INT(), DataTypes.INT()}), Arrays.asList("f0", "f1"));
        Map assignments = context.getSplitAssignments();
        enumerator.handleSplitRequest(0, "test-host");
        Assertions.assertThat((List)((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(0)).getAssignedSplits()).containsExactly((Object[])new FileStoreSourceSplit[]{(FileStoreSourceSplit)initialSplits.get(0)});
        Assertions.assertThat((Collection)enumerator.getSplitAssigner().remainingSplits()).containsExactly((Object[])new FileStoreSourceSplit[]{(FileStoreSourceSplit)initialSplits.get(1), (FileStoreSourceSplit)initialSplits.get(2), (FileStoreSourceSplit)initialSplits.get(3)});
        assignments.clear();
        MockDynamicFilteringData dynamicFilteringData = new MockDynamicFilteringData(org.apache.flink.table.types.logical.RowType.of((LogicalType[])new LogicalType[]{new IntType(), new IntType()}), new RowData[]{GenericRowData.of((Object[])new Object[]{1, 1})});
        enumerator.handleSourceEvent(0, (SourceEvent)new DynamicFilteringEvent((DynamicFilteringData)dynamicFilteringData));
        enumerator.handleSplitRequest(0, "test-host");
        Assertions.assertThat((List)((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(0)).getAssignedSplits()).containsExactly((Object[])new FileStoreSourceSplit[]{(FileStoreSourceSplit)initialSplits.get(3)});
        Assertions.assertThat((Collection)enumerator.getSplitAssigner().remainingSplits()).isEmpty();
    }

    @Test
    public void testDynamicPartitionFilteringWithProjection() {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = this.getSplitEnumeratorContext(1);
        ArrayList<FileStoreSourceSplit> initialSplits = new ArrayList<FileStoreSourceSplit>();
        for (int i = 0; i < 4; ++i) {
            initialSplits.add(this.createSnapshotSplit(i + 1, 0, Collections.emptyList(), i / 2, i % 2));
        }
        StaticFileStoreSplitEnumerator enumerator = this.getSplitEnumerator((SplitEnumeratorContext<FileStoreSourceSplit>)context, (List<FileStoreSourceSplit>)initialSplits, RowType.of((DataType[])new DataType[]{DataTypes.INT(), DataTypes.INT()}), Collections.singletonList("f0"));
        Map assignments = context.getSplitAssignments();
        MockDynamicFilteringData dynamicFilteringData = new MockDynamicFilteringData(org.apache.flink.table.types.logical.RowType.of((LogicalType[])new LogicalType[]{new IntType()}), new RowData[]{GenericRowData.of((Object[])new Object[]{0})});
        enumerator.handleSourceEvent(0, (SourceEvent)new DynamicFilteringEvent((DynamicFilteringData)dynamicFilteringData));
        enumerator.handleSplitRequest(0, "test-host");
        enumerator.handleSplitRequest(0, "test-host");
        Assertions.assertThat((List)((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(0)).getAssignedSplits()).containsExactly((Object[])new FileStoreSourceSplit[]{(FileStoreSourceSplit)initialSplits.get(0), (FileStoreSourceSplit)initialSplits.get(1)});
        Assertions.assertThat((Collection)enumerator.getSplitAssigner().remainingSplits()).isEmpty();
    }

    protected StaticFileStoreSplitEnumerator getSplitEnumerator(SplitEnumeratorContext<FileStoreSourceSplit> context, List<FileStoreSourceSplit> splits) {
        return new StaticFileStoreSplitEnumerator(context, null, StaticFileStoreSource.createSplitAssigner(context, (int)10, (FlinkConnectorOptions.SplitAssignMode)this.splitAssignMode(), splits));
    }

    private StaticFileStoreSplitEnumerator getSplitEnumerator(SplitEnumeratorContext<FileStoreSourceSplit> context, List<FileStoreSourceSplit> splits, RowType partitionRowProjection, List<String> dynamicPartitionFilteringFields) {
        FlinkConnectorOptions.SplitAssignMode mode = this.splitAssignMode();
        int splitBatchSize = mode == FlinkConnectorOptions.SplitAssignMode.FAIR ? 1 : 10;
        return new StaticFileStoreSplitEnumerator(context, null, StaticFileStoreSource.createSplitAssigner(context, (int)splitBatchSize, (FlinkConnectorOptions.SplitAssignMode)this.splitAssignMode(), splits), new DynamicPartitionFilteringInfo(partitionRowProjection, dynamicPartitionFilteringFields));
    }

    protected abstract FlinkConnectorOptions.SplitAssignMode splitAssignMode();

    private static class MockDynamicFilteringData
    extends DynamicFilteringData {
        private final org.apache.flink.table.types.logical.RowType rowType;
        private final RowData[] neededPartitions;

        public MockDynamicFilteringData(org.apache.flink.table.types.logical.RowType rowType, RowData[] neededPartitions) {
            super((TypeInformation)new GenericTypeInfo(RowData.class), rowType, Collections.emptyList(), true);
            this.rowType = rowType;
            this.neededPartitions = neededPartitions;
        }

        public boolean contains(RowData row) {
            if (!this.isFiltering()) {
                return true;
            }
            Preconditions.checkArgument((this.rowType.getFieldCount() == row.getArity() ? 1 : 0) != 0);
            for (RowData mayMatch : this.neededPartitions) {
                if (!this.matchRow(row, mayMatch)) continue;
                return true;
            }
            return false;
        }

        private boolean matchRow(RowData row, RowData mayMatch) {
            for (int i = 0; i < this.rowType.getFieldCount(); ++i) {
                if (row.getInt(i) == mayMatch.getInt(i)) continue;
                return false;
            }
            return true;
        }
    }
}

