/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.io.Serializable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.DynamicBucketSink;
import org.apache.paimon.index.BucketAssigner;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.sink.KeyAndBucketExtractor;
import org.apache.paimon.table.sink.PartitionKeyExtractor;
import org.apache.paimon.utils.MathUtils;
import org.apache.paimon.utils.SerializableFunction;

public abstract class CdcDynamicBucketSinkBase<T>
extends DynamicBucketSink<T> {
    public CdcDynamicBucketSinkBase(FileStoreTable table) {
        super(table, null);
    }

    protected ChannelComputer<T> assignerChannelComputer(Integer numAssigners) {
        return new AssignerChannelComputer(numAssigners);
    }

    protected ChannelComputer<Tuple2<T, Integer>> channelComputer2() {
        return new RecordWithBucketChannelComputer();
    }

    protected SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction() {
        return (SerializableFunction & Serializable)schema -> {
            final KeyAndBucketExtractor extractor = this.createExtractor((TableSchema)schema);
            return new PartitionKeyExtractor<T>(){

                public BinaryRow partition(T record) {
                    extractor.setRecord(record);
                    return extractor.partition();
                }

                public BinaryRow trimmedPrimaryKey(T record) {
                    extractor.setRecord(record);
                    return extractor.trimmedPrimaryKey();
                }
            };
        };
    }

    protected abstract KeyAndBucketExtractor<T> createExtractor(TableSchema var1);

    private class RecordWithBucketChannelComputer
    implements ChannelComputer<Tuple2<T, Integer>> {
        private transient int numChannels;
        private transient KeyAndBucketExtractor<T> extractor;

        private RecordWithBucketChannelComputer() {
        }

        public void setup(int numChannels) {
            this.numChannels = numChannels;
            this.extractor = CdcDynamicBucketSinkBase.this.createExtractor(CdcDynamicBucketSinkBase.this.table.schema());
        }

        public int channel(Tuple2<T, Integer> record) {
            this.extractor.setRecord(record.f0);
            return ChannelComputer.select((BinaryRow)this.extractor.partition(), (int)((Integer)record.f1), (int)this.numChannels);
        }

        public String toString() {
            return "shuffle by partition & bucket";
        }
    }

    private class AssignerChannelComputer
    implements ChannelComputer<T> {
        private Integer numAssigners;
        private transient int numChannels;
        private transient KeyAndBucketExtractor<T> extractor;

        public AssignerChannelComputer(Integer numAssigners) {
            this.numAssigners = numAssigners;
        }

        public void setup(int numChannels) {
            this.numChannels = numChannels;
            this.numAssigners = MathUtils.min((Integer)this.numAssigners, (Integer)numChannels);
            this.extractor = CdcDynamicBucketSinkBase.this.createExtractor(CdcDynamicBucketSinkBase.this.table.schema());
        }

        public int channel(T record) {
            this.extractor.setRecord(record);
            int partitionHash = this.extractor.partition().hashCode();
            int keyHash = this.extractor.trimmedPrimaryKey().hashCode();
            return BucketAssigner.computeAssigner((int)partitionHash, (int)keyHash, (int)this.numChannels, (int)this.numAssigners);
        }

        public String toString() {
            return "shuffle by key hash";
        }
    }
}

