/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.filesystem;

import java.io.IOException;
import java.io.OutputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.FileSystemFormatFactory;
import org.apache.flink.table.filesystem.AbstractFileSystemTable;
import org.apache.flink.table.filesystem.DeserializationSchemaAdapter;
import org.apache.flink.table.filesystem.EmptyMetaStoreFactory;
import org.apache.flink.table.filesystem.FileSystemFactory;
import org.apache.flink.table.filesystem.FileSystemOptions;
import org.apache.flink.table.filesystem.FileSystemOutputFormat;
import org.apache.flink.table.filesystem.OutputFormatFactory;
import org.apache.flink.table.filesystem.PartitionComputer;
import org.apache.flink.table.filesystem.RowDataPartitionComputer;
import org.apache.flink.table.filesystem.SerializationSchemaAdapter;
import org.apache.flink.table.filesystem.stream.PartitionCommitInfo;
import org.apache.flink.table.filesystem.stream.StreamingSink;
import org.apache.flink.table.filesystem.stream.compact.CompactBulkReader;
import org.apache.flink.table.filesystem.stream.compact.CompactOperator;
import org.apache.flink.table.filesystem.stream.compact.CompactReader;
import org.apache.flink.table.filesystem.stream.compact.FileInputFormatCompactReader;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;

public class FileSystemTableSink
extends AbstractFileSystemTable
implements DynamicTableSink,
SupportsPartitioning,
SupportsOverwrite {
    /** Bulk decoding format; first choice when building the reader used for compaction. */
    @Nullable
    private final DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat;
    /** Row-wise decoding format; used for the compaction reader when no format factory is set. */
    @Nullable
    private final DecodingFormat<DeserializationSchema<RowData>> deserializationFormat;
    /** Format factory used to create a reader for compaction via a {@code ReaderContext}. */
    @Nullable
    private final FileSystemFormatFactory formatFactory;
    /** Bulk encoding format; preferred over {@link #serializationFormat} when creating the writer. */
    @Nullable
    private final EncodingFormat<BulkWriter.Factory<RowData>> bulkWriterFormat;
    /** Row-wise encoding format; used when no bulk writer format is available. */
    @Nullable
    private final EncodingFormat<SerializationSchema<RowData>> serializationFormat;
    /** Set through {@link #applyOverwrite(boolean)}; only honoured by the batch sink. */
    private boolean overwrite = false;
    /** Set through {@link #requiresPartitionGrouping(boolean)} and forwarded to the batch output format. */
    private boolean dynamicGrouping = false;
    /** Static partition spec in partition-key order; replaced by {@link #applyStaticPartition(Map)}. */
    private LinkedHashMap<String, String> staticPartitions = new LinkedHashMap<>();
    /** Explicit sink parallelism from the table options, or {@code null} to inherit the input parallelism. */
    @Nullable
    private Integer configuredParallelism;

    /**
     * Creates the sink from the table context and whichever formats were discovered for it.
     *
     * <p>The three reader-side arguments are only stored. At least one of {@code bulkWriterFormat},
     * {@code serializationFormat} or {@code formatFactory} must be non-null.
     *
     * @throws ValidationException if all of {@code bulkWriterFormat}, {@code serializationFormat}
     *     and {@code formatFactory} are {@code null}
     */
    FileSystemTableSink(DynamicTableFactory.Context context, @Nullable DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat, @Nullable DecodingFormat<DeserializationSchema<RowData>> deserializationFormat, @Nullable FileSystemFormatFactory formatFactory, @Nullable EncodingFormat<BulkWriter.Factory<RowData>> bulkWriterFormat, @Nullable EncodingFormat<SerializationSchema<RowData>> serializationFormat) {
        super(context);

        final boolean nothingToWriteWith = bulkWriterFormat == null && serializationFormat == null && formatFactory == null;
        if (nothingToWriteWith) {
            // Report the format identifier the user configured so the message is actionable.
            final Configuration catalogOptions = Configuration.fromMap(context.getCatalogTable().getOptions());
            final String formatName = catalogOptions.get(FactoryUtil.FORMAT);
            throw new ValidationException(String.format("Could not find any format factory for identifier '%s' in the classpath.", formatName));
        }

        this.bulkReaderFormat = bulkReaderFormat;
        this.deserializationFormat = deserializationFormat;
        this.formatFactory = formatFactory;
        this.bulkWriterFormat = bulkWriterFormat;
        this.serializationFormat = serializationFormat;
        this.configuredParallelism = (Integer)this.tableOptions.get(FileSystemOptions.SINK_PARALLELISM);
    }

    /**
     * Returns a runtime provider that builds the sink by delegating to
     * {@link #consume(DataStream, DynamicTableSink.Context)}.
     *
     * @param sinkContext planner-provided context, captured by the returned provider
     * @return a {@code DataStreamSinkProvider} wrapping {@code consume}
     */
    @Override
    public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context sinkContext) {
        // SinkRuntimeProvider itself declares no abstract method, so a lambda cannot target it
        // directly; the lambda has to be typed as DataStreamSinkProvider. The name is fully
        // qualified because this file carries no import for it.
        return (org.apache.flink.table.connector.sink.DataStreamSinkProvider) dataStream -> this.consume(dataStream, sinkContext);
    }

    /**
     * Attaches the appropriate sink to {@code dataStream}: the batch sink for bounded input,
     * the streaming sink otherwise.
     *
     * @throws IllegalStateException if the input is unbounded and overwrite was requested
     */
    private DataStreamSink<?> consume(DataStream<RowData> dataStream, DynamicTableSink.Context sinkContext) {
        final int upstreamParallelism = dataStream.getParallelism();
        // An explicitly configured parallelism wins over the one inherited from the input.
        final int sinkParallelism = this.configuredParallelism == null ? upstreamParallelism : this.configuredParallelism;

        if (!sinkContext.isBounded()) {
            if (this.overwrite) {
                throw new IllegalStateException("Streaming mode not support overwrite.");
            }
            return this.createStreamingSink(dataStream, sinkContext, sinkParallelism);
        }
        return this.createBatchSink(dataStream, sinkContext, sinkParallelism);
    }

    /** Builds a new partition computer from the table schema and the partition keys. */
    private RowDataPartitionComputer partitionComputer() {
        final String[] fieldNames = this.schema.getFieldNames();
        final DataType[] fieldTypes = this.schema.getFieldDataTypes();
        final String[] partitionColumns = this.partitionKeys.toArray(new String[0]);
        return new RowDataPartitionComputer(this.defaultPartName, fieldNames, fieldTypes, partitionColumns);
    }

    /**
     * Builds the bounded-input sink: a {@link FileSystemOutputFormat} writing through a staging
     * directory, attached to {@code inputStream} with the given parallelism.
     */
    private DataStreamSink<RowData> createBatchSink(DataStream<RowData> inputStream, DynamicTableSink.Context sinkContext, int parallelism) {
        final FileSystemOutputFormat.Builder<RowData> formatBuilder = new FileSystemOutputFormat.Builder<RowData>();

        // The setters are kept in their original order: creating the writer and the staging
        // directory can both fail, and the order decides which failure surfaces first.
        formatBuilder.setPartitionComputer(this.partitionComputer());
        formatBuilder.setDynamicGrouped(this.dynamicGrouping);
        final String[] partitionColumns = this.partitionKeys.toArray(new String[0]);
        formatBuilder.setPartitionColumns(partitionColumns);
        formatBuilder.setFormatFactory(this.createOutputFormatFactory(sinkContext));
        formatBuilder.setMetaStoreFactory(new EmptyMetaStoreFactory(this.path));
        formatBuilder.setOverwrite(this.overwrite);
        formatBuilder.setStaticPartitions(this.staticPartitions);
        formatBuilder.setTempPath(this.toStagingPath());

        // A random prefix is generated for every sink so part files get distinct names.
        final String partPrefix = "part-" + UUID.randomUUID();
        formatBuilder.setOutputFileConfig(OutputFileConfig.builder().withPartPrefix(partPrefix).build());

        final DataStreamSink<RowData> sink = inputStream.writeUsingOutputFormat(formatBuilder.build());
        return sink.setParallelism(parallelism).name("Filesystem");
    }

    /**
     * Builds the unbounded-input sink: a {@link StreamingFileSink} buckets builder (row or bulk
     * format, depending on the writer), wrapped in either the plain or the compacting writer
     * operator, followed by the partition-commit sink.
     *
     * @param dataStream input rows
     * @param sinkContext planner-provided context used to create the writer and compaction reader
     * @param parallelism parallelism passed to the writer operator
     * @throws TableException if auto-compaction is enabled but no compaction reader can be created
     */
    private DataStreamSink<?> createStreamingSink(DataStream<RowData> dataStream, DynamicTableSink.Context sinkContext, int parallelism) {
        DataStream<PartitionCommitInfo> writerStream;
        FileSystemFactory fsFactory = FileSystem::get;
        RowDataPartitionComputer computer = this.partitionComputer();
        boolean autoCompaction = this.tableOptions.getBoolean(FileSystemOptions.AUTO_COMPACTION);
        Object writer = this.createWriter(sinkContext);
        // createWriter returns either an Encoder (row format) or a BulkWriter.Factory (bulk format).
        boolean isEncoder = writer instanceof Encoder;
        TableBucketAssigner assigner = new TableBucketAssigner(computer);
        // Roll on every checkpoint for bulk writers and whenever auto-compaction is on;
        // otherwise rolling is driven by file size and rollover interval only.
        TableRollingPolicy rollingPolicy = new TableRollingPolicy(!isEncoder || autoCompaction, ((MemorySize)this.tableOptions.get(FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE)).getBytes(), ((Duration)this.tableOptions.get(FileSystemOptions.SINK_ROLLING_POLICY_ROLLOVER_INTERVAL)).toMillis());
        String randomPrefix = "part-" + UUID.randomUUID().toString();
        OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder = OutputFileConfig.builder();
        // With auto-compaction the prefix is passed through CompactOperator.convertToUncompacted first.
        fileNamingBuilder = autoCompaction ? fileNamingBuilder.withPartPrefix(CompactOperator.convertToUncompacted(randomPrefix)) : fileNamingBuilder.withPartPrefix(randomPrefix);
        OutputFileConfig fileNamingConfig = fileNamingBuilder.build();
        // Both branches configure the same assigner, file naming and rolling policy; they differ
        // only in wrapping the writer with ProjectionEncoder vs. ProjectionBulkFactory.
        Object bucketsBuilder = isEncoder ? ((StreamingFileSink.DefaultRowFormatBuilder)((StreamingFileSink.DefaultRowFormatBuilder)StreamingFileSink.forRowFormat((Path)this.path, (Encoder)new ProjectionEncoder((Encoder)writer, computer)).withBucketAssigner((BucketAssigner)assigner)).withOutputFileConfig(fileNamingConfig)).withRollingPolicy((RollingPolicy)rollingPolicy) : ((StreamingFileSink.DefaultBulkFormatBuilder)((StreamingFileSink.DefaultBulkFormatBuilder)StreamingFileSink.forBulkFormat((Path)this.path, (BulkWriter.Factory)new ProjectionBulkFactory((BulkWriter.Factory<RowData>)((BulkWriter.Factory)writer), computer)).withBucketAssigner((BucketAssigner)assigner)).withOutputFileConfig(fileNamingConfig)).withRollingPolicy((CheckpointRollingPolicy)rollingPolicy);
        long bucketCheckInterval = ((Duration)this.tableOptions.get(FileSystemOptions.SINK_ROLLING_POLICY_CHECK_INTERVAL)).toMillis();
        if (autoCompaction) {
            // Target size for compacted files; falls back to the rolling-policy file size when unset.
            long compactionSize = ((MemorySize)this.tableOptions.getOptional(FileSystemOptions.COMPACTION_FILE_SIZE).orElse(this.tableOptions.get(FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE))).getBytes();
            CompactReader.Factory<RowData> reader = this.createCompactReaderFactory(sinkContext).orElseThrow(() -> new TableException("Please implement available reader for compaction: BulkFormat, FileInputFormat."));
            writerStream = StreamingSink.compactionWriter(dataStream, bucketCheckInterval, bucketsBuilder, fsFactory, this.path, reader, compactionSize, parallelism);
        } else {
            writerStream = StreamingSink.writer(dataStream, bucketCheckInterval, bucketsBuilder, parallelism);
        }
        // Attach the partition-commit sink to the stream of commit infos emitted by the writer.
        return StreamingSink.sink(writerStream, this.path, this.tableIdentifier, this.partitionKeys, new EmptyMetaStoreFactory(this.path), fsFactory, this.tableOptions);
    }

    /**
     * Picks a reader factory for compaction. The bulk reader format is tried first, then the
     * format factory (only if it yields a {@link FileInputFormat}), and the deserialization
     * format is considered only when no format factory is set.
     *
     * @return the reader factory, or empty if none of the configured formats can provide one
     */
    private Optional<CompactReader.Factory<RowData>> createCompactReaderFactory(DynamicTableSink.Context context) {
        final DataType rowType = this.schema.toRowDataType();

        if (this.bulkReaderFormat != null) {
            BulkFormat bulkFormat = (BulkFormat)this.bulkReaderFormat.createRuntimeDecoder(this.createSourceContext(context), rowType);
            return Optional.of(CompactBulkReader.factory(bulkFormat));
        }

        if (this.formatFactory != null) {
            InputFormat legacyReader = this.formatFactory.createReader(this.createReaderContext());
            if (legacyReader instanceof FileInputFormat) {
                return Optional.of(FileInputFormatCompactReader.factory((FileInputFormat)legacyReader));
            }
            // A format factory that yields some other InputFormat does not fall back to the
            // deserialization format.
            return Optional.empty();
        }

        if (this.deserializationFormat == null) {
            return Optional.empty();
        }

        DeserializationSchema rowDecoder = (DeserializationSchema)this.deserializationFormat.createRuntimeDecoder(this.createSourceContext(context), this.getFormatDataType());
        // Compaction reads every column, so the projection is the identity over all fields.
        int[] allFields = IntStream.range(0, this.schema.getFieldCount()).toArray();
        DeserializationSchemaAdapter adapter = new DeserializationSchemaAdapter((DeserializationSchema<RowData>)rowDecoder, this.schema, allFields, this.partitionKeys, this.defaultPartName);
        return Optional.of(CompactBulkReader.factory(adapter));
    }

    /**
     * Adapts a sink context to the source-context interface needed when creating decoders for
     * the compaction reader. Only type-information creation is supported.
     */
    private DynamicTableSource.Context createSourceContext(final DynamicTableSink.Context context) {
        final DynamicTableSource.Context adapter = new DynamicTableSource.Context(){

            @Override
            public <T> TypeInformation<T> createTypeInformation(DataType dataType) {
                // Delegates straight to the wrapped sink context.
                return context.createTypeInformation(dataType);
            }

            @Override
            public DynamicTableSource.DataStructureConverter createDataStructureConverter(DataType dataType) {
                throw new TableException("Compaction reader not support DataStructure converter.");
            }
        };
        return adapter;
    }

    /**
     * Builds the reader context handed to {@code formatFactory.createReader}: the full schema,
     * all fields projected, no limit and no filters, reading from the table path.
     */
    private FileSystemFormatFactory.ReaderContext createReaderContext() {
        final FileSystemFormatFactory.ReaderContext readerContext = new FileSystemFormatFactory.ReaderContext(){

            @Override
            public TableSchema getSchema() {
                return FileSystemTableSink.this.schema;
            }

            @Override
            public ReadableConfig getFormatOptions() {
                final String formatIdentifier = FileSystemTableSink.this.formatFactory.factoryIdentifier();
                return FileSystemTableSink.this.formatOptions(formatIdentifier);
            }

            @Override
            public List<String> getPartitionKeys() {
                return FileSystemTableSink.this.partitionKeys;
            }

            @Override
            public String getDefaultPartName() {
                return FileSystemTableSink.this.defaultPartName;
            }

            @Override
            public Path[] getPaths() {
                final Path[] tablePaths = {FileSystemTableSink.this.path};
                return tablePaths;
            }

            @Override
            public int[] getProjectFields() {
                // Identity projection: indices 0..fieldCount-1.
                final int fieldCount = FileSystemTableSink.this.schema.getFieldCount();
                return IntStream.range(0, fieldCount).toArray();
            }

            @Override
            public long getPushedDownLimit() {
                // No limit is pushed down.
                return Long.MAX_VALUE;
            }

            @Override
            public List<ResolvedExpression> getPushedDownFilters() {
                return Collections.emptyList();
            }
        };
        return readerContext;
    }

    /**
     * Resolves a timestamp-suffixed {@code .staging_*} directory under the table path, creating
     * it if it does not exist yet.
     *
     * @return the staging directory
     * @throws IllegalStateException presumably, via {@code Preconditions.checkState}, when the
     *     directory neither exists nor can be created (confirm against Flink's Preconditions)
     * @throws RuntimeException wrapping any {@link IOException} raised by the file system
     */
    private Path toStagingPath() {
        final String dirName = ".staging_" + System.currentTimeMillis();
        final Path stagingDir = new Path(this.path, dirName);
        try {
            final FileSystem fileSystem = stagingDir.getFileSystem();
            // mkdirs is only attempted when the directory is not already there.
            final boolean available = fileSystem.exists(stagingDir) || fileSystem.mkdirs(stagingDir);
            Preconditions.checkState(available, (Object)("Failed to create staging dir " + stagingDir));
            return stagingDir;
        }
        catch (IOException ioFailure) {
            throw new RuntimeException(ioFailure);
        }
    }

    /**
     * Creates the per-file output format factory for the batch sink, choosing the encoder-based
     * or the bulk-writer-based output format according to the kind of writer available.
     */
    private OutputFormatFactory<RowData> createOutputFormatFactory(DynamicTableSink.Context sinkContext) {
        final Object writer = this.createWriter(sinkContext);
        if (writer instanceof Encoder) {
            return targetPath -> FileSystemTableSink.createEncoderOutputFormat((Encoder<RowData>)((Encoder)writer), targetPath);
        }
        // createWriter only ever returns an Encoder or a BulkWriter.Factory.
        return targetPath -> FileSystemTableSink.createBulkWriterOutputFormat((BulkWriter.Factory<RowData>)((BulkWriter.Factory)writer), targetPath);
    }

    /**
     * Creates the runtime writer: the bulk writer factory if a bulk format is configured,
     * otherwise the serialization schema wrapped in a {@link SerializationSchemaAdapter}.
     *
     * @return the runtime encoder produced by the bulk format, or a {@link SerializationSchemaAdapter}
     * @throws TableException if neither encoding format is available
     */
    private Object createWriter(DynamicTableSink.Context sinkContext) {
        if (this.bulkWriterFormat == null) {
            if (this.serializationFormat == null) {
                throw new TableException("Can not find format factory.");
            }
            final SerializationSchema<RowData> serializer = (SerializationSchema<RowData>)((SerializationSchema)this.serializationFormat.createRuntimeEncoder(sinkContext, this.getFormatDataType()));
            return new SerializationSchemaAdapter(serializer);
        }
        return this.bulkWriterFormat.createRuntimeEncoder(sinkContext, this.getFormatDataType());
    }

    /**
     * Rejects an explicitly configured sink parallelism when the requested changelog contains
     * anything other than {@code INSERT} rows. Does nothing when no parallelism is configured.
     *
     * @throws ValidationException if a parallelism is configured and the changelog is not insert-only
     */
    private void checkConfiguredParallelismAllowed(ChangelogMode requestChangelogMode) {
        final Integer requested = this.configuredParallelism;
        if (requested == null || requestChangelogMode.containsOnly(RowKind.INSERT)) {
            return;
        }
        final String optionKey = FileSystemOptions.SINK_PARALLELISM.key();
        final String containedKinds = requestChangelogMode.getContainedKinds().stream()
                .map(RowKind::shortString)
                .collect(Collectors.joining(","));
        throw new ValidationException(String.format("Currently, filesystem sink doesn't support setting parallelism (%d) by '%s' when the input stream is not INSERT only. The row kinds of input stream are [%s]", requested, optionKey, containedKinds));
    }

    /**
     * Wraps a {@link BulkWriter.Factory} in an {@link OutputFormat} that writes a single file.
     *
     * @param factory creates the bulk writer on top of the opened stream
     * @param path file to create; an existing file is overwritten
     * @return an output format whose {@code close()} always releases the underlying stream
     */
    private static OutputFormat<RowData> createBulkWriterOutputFormat(final BulkWriter.Factory<RowData> factory, final Path path) {
        return new OutputFormat<RowData>(){
            private static final long serialVersionUID = 1L;
            private transient BulkWriter<RowData> writer;
            private transient FSDataOutputStream stream;

            @Override
            public void configure(Configuration parameters) {
                // Nothing to configure.
            }

            @Override
            public void open(int taskNumber, int numTasks) throws IOException {
                this.stream = path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE);
                this.writer = factory.create(this.stream);
            }

            @Override
            public void writeRecord(RowData record) throws IOException {
                this.writer.addElement(record);
            }

            @Override
            public void close() throws IOException {
                // try-with-resources closes the stream even when flush()/finish() throws, and
                // skips the close when open() never got as far as creating the stream.
                try (FSDataOutputStream ignored = this.stream) {
                    if (this.writer != null) {
                        this.writer.flush();
                        this.writer.finish();
                    }
                }
            }
        };
    }

    /**
     * Wraps an {@link Encoder} in an {@link OutputFormat} that writes a single file.
     *
     * @param encoder serializes each record onto the opened stream
     * @param path file to create; an existing file is overwritten
     * @return an output format whose {@code close()} always releases the underlying stream
     */
    private static OutputFormat<RowData> createEncoderOutputFormat(final Encoder<RowData> encoder, final Path path) {
        return new OutputFormat<RowData>(){
            private static final long serialVersionUID = 1L;
            private transient FSDataOutputStream output;

            @Override
            public void configure(Configuration parameters) {
                // Nothing to configure.
            }

            @Override
            public void open(int taskNumber, int numTasks) throws IOException {
                this.output = path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE);
            }

            @Override
            public void writeRecord(RowData record) throws IOException {
                encoder.encode(record, this.output);
            }

            @Override
            public void close() throws IOException {
                // try-with-resources closes the stream even when flush() throws, and is a
                // no-op when open() never created the stream.
                try (FSDataOutputStream out = this.output) {
                    if (out != null) {
                        out.flush();
                    }
                }
            }
        };
    }

    /**
     * Re-orders a (possibly partial) partition spec to follow the table's partition-key order.
     * Keys in {@code part} that are not partition keys are dropped.
     *
     * @param part partition column name to value
     * @return a new map containing the matching entries in partition-key order
     */
    private LinkedHashMap<String, String> toPartialLinkedPartSpec(Map<String, String> part) {
        final LinkedHashMap<String, String> ordered = new LinkedHashMap<String, String>();
        this.partitionKeys.stream()
                .filter(key -> part.containsKey(key))
                .forEach(key -> ordered.put(key, part.get(key)));
        return ordered;
    }

    /**
     * Records whether the planner supports grouping by partition and asks for grouping exactly
     * when it is supported.
     *
     * @param supportsGrouping whether grouping is supported
     * @return the same value that was passed in
     */
    @Override
    public boolean requiresPartitionGrouping(boolean supportsGrouping) {
        this.dynamicGrouping = supportsGrouping;
        return supportsGrouping;
    }

    /**
     * Validates the configured parallelism against the requested mode, then reports the
     * changelog mode of the writer format (bulk format first, serialization format second).
     *
     * @throws ValidationException if a parallelism is configured and the request is not insert-only
     * @throws TableException if neither encoding format is available
     */
    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        this.checkConfiguredParallelismAllowed(requestedMode);

        final EncodingFormat<?> writerFormat = this.bulkWriterFormat != null ? this.bulkWriterFormat : this.serializationFormat;
        if (writerFormat == null) {
            throw new TableException("Can not find format factory.");
        }
        return writerFormat.getChangelogMode();
    }

    /**
     * Creates a new sink over the same context and formats and carries over the mutable
     * planner-applied state. The static partition map is shared by reference, not copied.
     */
    @Override
    public DynamicTableSink copy() {
        final FileSystemTableSink duplicate = new FileSystemTableSink(this.context, this.bulkReaderFormat, this.deserializationFormat, this.formatFactory, this.bulkWriterFormat, this.serializationFormat);
        duplicate.staticPartitions = this.staticPartitions;
        duplicate.dynamicGrouping = this.dynamicGrouping;
        duplicate.overwrite = this.overwrite;
        return duplicate;
    }

    /** Returns the fixed summary label {@code "Filesystem"}. */
    @Override
    public String asSummaryString() {
        final String summary = "Filesystem";
        return summary;
    }

    /**
     * Stores the overwrite flag. It is forwarded to the batch output format and causes the
     * streaming path to fail when {@code true}.
     */
    @Override
    public void applyOverwrite(boolean overwrite) {
        this.overwrite = overwrite;
    }

    /**
     * Replaces the static partition spec with {@code partition}, restricted to known partition
     * keys and re-ordered to partition-key order.
     */
    @Override
    public void applyStaticPartition(Map<String, String> partition) {
        final LinkedHashMap<String, String> orderedSpec = this.toPartialLinkedPartSpec(partition);
        this.staticPartitions = orderedSpec;
    }

    public static class ProjectionBulkFactory
    implements BulkWriter.Factory<RowData> {
        private final BulkWriter.Factory<RowData> factory;
        private final RowDataPartitionComputer computer;

        public ProjectionBulkFactory(BulkWriter.Factory<RowData> factory, RowDataPartitionComputer computer) {
            this.factory = factory;
            this.computer = computer;
        }

        public BulkWriter<RowData> create(FSDataOutputStream out) throws IOException {
            final BulkWriter writer = this.factory.create(out);
            return new BulkWriter<RowData>(){

                public void addElement(RowData element) throws IOException {
                    writer.addElement((Object)computer.projectColumnsToWrite(element));
                }

                public void flush() throws IOException {
                    writer.flush();
                }

                public void finish() throws IOException {
                    writer.finish();
                }
            };
        }
    }

    private static class ProjectionEncoder
    implements Encoder<RowData> {
        private final Encoder<RowData> encoder;
        private final RowDataPartitionComputer computer;

        private ProjectionEncoder(Encoder<RowData> encoder, RowDataPartitionComputer computer) {
            this.encoder = encoder;
            this.computer = computer;
        }

        public void encode(RowData element, OutputStream stream) throws IOException {
            this.encoder.encode((Object)this.computer.projectColumnsToWrite(element), stream);
        }
    }

    public static class TableRollingPolicy
    extends CheckpointRollingPolicy<RowData, String> {
        private final boolean rollOnCheckpoint;
        private final long rollingFileSize;
        private final long rollingTimeInterval;

        public TableRollingPolicy(boolean rollOnCheckpoint, long rollingFileSize, long rollingTimeInterval) {
            this.rollOnCheckpoint = rollOnCheckpoint;
            Preconditions.checkArgument((rollingFileSize > 0L ? 1 : 0) != 0);
            Preconditions.checkArgument((rollingTimeInterval > 0L ? 1 : 0) != 0);
            this.rollingFileSize = rollingFileSize;
            this.rollingTimeInterval = rollingTimeInterval;
        }

        public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
            try {
                return this.rollOnCheckpoint || partFileState.getSize() > this.rollingFileSize;
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, RowData element) throws IOException {
            return partFileState.getSize() > this.rollingFileSize;
        }

        public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) {
            return currentTime - partFileState.getCreationTime() >= this.rollingTimeInterval;
        }
    }

    public static class TableBucketAssigner
    implements BucketAssigner<RowData, String> {
        private final PartitionComputer<RowData> computer;

        public TableBucketAssigner(PartitionComputer<RowData> computer) {
            this.computer = computer;
        }

        public String getBucketId(RowData element, BucketAssigner.Context context) {
            try {
                return PartitionPathUtils.generatePartitionPath(this.computer.generatePartValues(element));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }
}

