/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline.signal.actions;

import io.debezium.document.Array;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.signal.SignalPayload;
import io.debezium.pipeline.signal.actions.SignalAction;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.TableId;
import io.debezium.relational.history.JsonTableChangeSerializer;
import io.debezium.relational.history.TableChanges;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.spi.schema.DataCollectionId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Signal action registered under the name {@value #NAME}.
 *
 * <p>When a signal arrives, the action reads the {@value #FIELD_CHANGES} array together with the
 * {@value #FIELD_DATABASE} and {@value #FIELD_SCHEMA} strings from the signal data, deserializes
 * the array into table changes and applies each of them through the event dispatcher.
 *
 * @param <P> the partition type handled by the dispatcher and carried by the signal payload
 */
public class SchemaChanges<P extends Partition>
implements SignalAction<P> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SchemaChanges.class);

    /** Name identifying this signal action. */
    public static final String NAME = "schema-changes";
    /** Key of the signal data field holding the array of table changes. */
    public static final String FIELD_CHANGES = "changes";
    /** Key of the signal data field holding the database name. */
    public static final String FIELD_DATABASE = "database";
    /** Key of the signal data field holding the schema name; its absence is not rejected. */
    public static final String FIELD_SCHEMA = "schema";

    private final JsonTableChangeSerializer serializer = new JsonTableChangeSerializer();
    private final boolean useCatalogBeforeSchema;
    private final EventDispatcher<P, TableId> dispatcher;

    /**
     * Creates the action.
     *
     * @param dispatcher dispatcher used to emit schema change events and to reach the schema
     * @param useCatalogBeforeSchema flag handed unchanged to the table change deserializer
     */
    public SchemaChanges(EventDispatcher<P, ? extends DataCollectionId> dispatcher, boolean useCatalogBeforeSchema) {
        this.useCatalogBeforeSchema = useCatalogBeforeSchema;
        // Generic types are invariant, so the wildcard-typed dispatcher cannot be assigned to a
        // TableId-typed field without an explicit (unchecked) cast.
        // NOTE(review): assumes callers only pass dispatchers whose data collection id is TableId - confirm.
        @SuppressWarnings("unchecked")
        EventDispatcher<P, TableId> tableDispatcher = (EventDispatcher<P, TableId>) dispatcher;
        this.dispatcher = tableDispatcher;
    }

    /**
     * Applies the table changes carried by the given signal.
     *
     * <p>For every deserialized table change: if the dispatcher exposes a historized schema, a
     * schema change event is dispatched for the table; otherwise, if the dispatcher's schema is a
     * {@link RelationalDatabaseSchema}, the table is refreshed in that schema for {@code CREATE}
     * and {@code ALTER} changes only.
     *
     * @param signalPayload the received signal; its data must contain a non-empty
     *        {@value #FIELD_CHANGES} array and a non-empty {@value #FIELD_DATABASE} string
     * @return {@code false} if a required field is missing or empty, {@code true} otherwise
     * @throws InterruptedException declared by the {@link SignalAction} contract
     */
    @Override
    public boolean arrived(SignalPayload<P> signalPayload) throws InterruptedException {
        final Array changes = signalPayload.data.getArray(FIELD_CHANGES);
        final String database = signalPayload.data.getString(FIELD_DATABASE);
        // The schema value is forwarded as-is and may be null.
        final String schema = signalPayload.data.getString(FIELD_SCHEMA);

        if (changes == null || changes.isEmpty()) {
            LOGGER.warn("Table changes signal '{}' has arrived but the requested field '{}' is missing from data", signalPayload, FIELD_CHANGES);
            return false;
        }
        if (database == null || database.isEmpty()) {
            LOGGER.warn("Table changes signal '{}' has arrived but the requested field '{}' is missing from data", signalPayload, FIELD_DATABASE);
            return false;
        }

        for (TableChanges.TableChange tableChange : this.serializer.deserialize(changes, this.useCatalogBeforeSchema)) {
            if (this.dispatcher.getHistorizedSchema() != null) {
                // A historized schema is available: publish the change as a schema change event.
                LOGGER.info("Executing schema change for table '{}' requested by signal '{}'", tableChange.getId(), signalPayload.id);
                this.dispatcher.dispatchSchemaChangeEvent(
                        signalPayload.partition,
                        signalPayload.offsetContext,
                        tableChange.getId(),
                        emitter -> emitter.schemaChangeEvent(SchemaChangeEvent.ofTableChange(
                                tableChange,
                                signalPayload.partition.getSourcePartition(),
                                signalPayload.offsetContext.getOffset(),
                                signalPayload.offsetContext.getSourceInfo(),
                                database,
                                schema)));
            }
            else if (this.dispatcher.getSchema() instanceof RelationalDatabaseSchema) {
                // No historized schema: update the relational schema directly.
                LOGGER.info("Executing schema change for table '{}' requested by signal '{}'", tableChange.getId(), signalPayload.id);
                final RelationalDatabaseSchema databaseSchema = (RelationalDatabaseSchema) this.dispatcher.getSchema();
                final TableChanges.TableChangeType changeType = tableChange.getType();
                // Only CREATE and ALTER changes refresh the table; other change types are ignored.
                if (changeType == TableChanges.TableChangeType.CREATE || changeType == TableChanges.TableChangeType.ALTER) {
                    databaseSchema.refresh(tableChange.getTable());
                }
            }
        }
        return true;
    }
}

