diff --git a/api-service/src/configs/IngestionConfig.ts b/api-service/src/configs/IngestionConfig.ts index 8826b264..cfc4d1cf 100644 --- a/api-service/src/configs/IngestionConfig.ts +++ b/api-service/src/configs/IngestionConfig.ts @@ -54,6 +54,13 @@ export const rawIngestionSpecDefaults = { "type": "text", "expr": "$.obsrv_meta.syncts" }, + "hudiSynctsField": { + "name": "obsrv_meta_syncts", + "arrival_format": "text", + "data_type": "date", + "type": "text", + "expr": "$.obsrv_meta.syncts" + }, "dimensions": [ { "type": "string", @@ -64,6 +71,16 @@ export const rawIngestionSpecDefaults = { "name": "obsrv.meta.source.id" } ], + "hudi_dimensions": [ + { + "type": "string", + "name": "obsrv_meta_source_connector" + }, + { + "type": "string", + "name": "obsrv_meta_source_id" + } + ], "flattenSpec": [ { "type": "path", @@ -75,5 +92,17 @@ export const rawIngestionSpecDefaults = { "expr": "$.obsrv_meta.source.connectorInstance", "name": "obsrv.meta.source.id" } + ], + "hudi_flattenSpec": [ + { + "type": "path", + "expr": "$.obsrv_meta.source.connector", + "name": "obsrv_meta_source_connector" + }, + { + "type": "path", + "expr": "$.obsrv_meta.source.connectorInstance", + "name": "obsrv_meta_source_id" + } ] } \ No newline at end of file diff --git a/api-service/src/controllers/DatasetUpdate/DatasetUpdate.ts b/api-service/src/controllers/DatasetUpdate/DatasetUpdate.ts index ba426c43..fa10ef36 100644 --- a/api-service/src/controllers/DatasetUpdate/DatasetUpdate.ts +++ b/api-service/src/controllers/DatasetUpdate/DatasetUpdate.ts @@ -67,7 +67,9 @@ const datasetUpdate = async (req: Request, res: Response) => { const mergeDraftDataset = (datasetModel: Model | null, datasetReq: any): Record => { - const cache_config = _.get(datasetModel, ["dataset_config", "cache_config"]) + const currentSchema = _.get(datasetModel, 'data_schema') + const fieldsRemoved = (datasetReq.data_schema) ? 
getMissingFieldsInNewSchema(datasetReq.data_schema, currentSchema) : [] + const prev_dataset_config = _.get(datasetModel, ["dataset_config"]) const dataset: Record = { version_key: Date.now().toString(), name: datasetReq.name || _.get(datasetModel, ["name"]), @@ -77,45 +79,100 @@ const mergeDraftDataset = (datasetModel: Model | null, datasetReq: any if(datasetReq.extraction_config) dataset["extraction_config"] = datasetReq.extraction_config if(datasetReq.dedup_config) dataset["dedup_config"] = datasetReq.dedup_config if(datasetReq.data_schema) dataset["data_schema"] = datasetReq.data_schema - if(datasetReq.dataset_config) dataset["dataset_config"] = { ...datasetReq.dataset_config, cache_config } - if(datasetReq.transformations_config) - dataset["transformations_config"] = mergeTransformationsConfig(_.get(datasetModel, ["transformations_config"]), datasetReq.transformations_config) - if(datasetReq.denorm_config) dataset["denorm_config"] = mergeDenormConfig(_.get(datasetModel, ["denorm_config"]), datasetReq.denorm_config) + if(datasetReq.dataset_config) dataset["dataset_config"] = { ...prev_dataset_config, ...datasetReq.dataset_config } + if(datasetReq.transformations_config || fieldsRemoved.length > 0) + dataset["transformations_config"] = mergeTransformationsConfig(_.get(datasetModel, ["transformations_config"]), datasetReq.transformations_config, fieldsRemoved) + if(datasetReq.denorm_config || fieldsRemoved.length > 0) dataset["denorm_config"] = mergeDenormConfig(_.get(datasetModel, ["denorm_config"]), datasetReq.denorm_config, fieldsRemoved) if(datasetReq.connectors_config) dataset["connectors_config"] = mergeConnectorsConfig(_.get(datasetModel, ["connectors_config"]), datasetReq.connectors_config) if(datasetReq.tags) dataset["tags"] = mergeTags(_.get(datasetModel, ["tags"]), datasetReq.tags) if(datasetReq.sample_data) dataset["sample_data"] = datasetReq.sample_data if(datasetReq.type) dataset["type"] = datasetReq.type - + if(fieldsRemoved.length > 0) { + const 
keys_config = _.get(dataset["dataset_config"] ? dataset["dataset_config"] : prev_dataset_config, ["keys_config"]) + if(_.includes(fieldsRemoved, keys_config['data_key'])) { + keys_config['data_key'] = ''; + } + if(_.includes(fieldsRemoved, keys_config['primary_key'])) { + keys_config['primary_key'] = ''; + } + if(_.includes(fieldsRemoved, keys_config['timestamp_key'])) { + keys_config['timestamp_key'] = ''; + } + dataset["dataset_config"] = { ...(dataset["dataset_config"] ? dataset["dataset_config"] : prev_dataset_config), keys_config } + } return dataset; } -const mergeTransformationsConfig = (currentConfigs: any, newConfigs: any) => { - const removeConfigs = _.map(_.filter(newConfigs, {action: "remove"}), "value.field_key") - const addConfigs = _.map(_.filter(newConfigs, {action: "upsert"}), "value") +const getMissingFieldsInNewSchema = (newSchema: any, oldSchema: any) => { + + const getRemovedPropertiesFieldsNested = (oldProperties: Record, newProperties: Record, path: string[] = []): string[] => { + let removedFields: string[] = []; + for (const key in oldProperties) { + const fullPath = [...path, key].join('.'); + if (!(key in newProperties)) { + removedFields.push(fullPath); + } else if (typeof oldProperties[key] === 'object' && typeof newProperties[key] === 'object') { + removedFields = removedFields.concat( + getRemovedPropertiesFieldsNested(oldProperties[key].properties || {}, newProperties[key].properties || {}, [...path, key]) + ); + } + } + + return removedFields; + } + + const getRemovedPropertiesFields = (oldSchema: any, newSchema: any): string[] => { + const oldProperties = oldSchema.properties || {}; + const newProperties = newSchema.properties || {}; + return getRemovedPropertiesFieldsNested(oldProperties, newProperties); + } + + // Example usage + const removedFieldsNested = getRemovedPropertiesFields(oldSchema, newSchema); + return removedFieldsNested +} - return _.unionWith( - addConfigs, - _.reject(currentConfigs, (config) => { return _.includes(removeConfigs, config.field_key)}), - (a, b) => { - return a.field_key === b.field_key - } - 
) +const mergeTransformationsConfig = (currentConfigs: any, newConfigs: any, fieldsRemoved: string[]) => { + + let updatedConfigs = currentConfigs; + if(newConfigs) { + const removeConfigs = _.map(_.filter(newConfigs, {action: "remove"}), "value.field_key") + const addConfigs = _.map(_.filter(newConfigs, {action: "upsert"}), "value") + + updatedConfigs = _.unionWith( + addConfigs, + _.reject(currentConfigs, (config) => { return _.includes(removeConfigs, config.field_key)}), + (a, b) => { + return a.field_key === b.field_key + } + ) + } + if(fieldsRemoved.length > 0) { + updatedConfigs = _.reject(updatedConfigs, (config) => { return _.includes(fieldsRemoved, config.field_key)}) + } + return updatedConfigs } -const mergeDenormConfig = (currentConfig: any, newConfig: any) => { +const mergeDenormConfig = (currentConfig: any, newConfig: any, fieldsRemoved: string[]) => { - const removeConfigs = _.map(_.filter(newConfig.denorm_fields, {action: "remove"}), "value.denorm_out_field") - const addConfigs = _.map(_.filter(newConfig.denorm_fields, {action: "upsert"}), "value") + let updatedConfigs = currentConfig.denorm_fields; + if(newConfig) { + const removeConfigs = _.map(_.filter(newConfig.denorm_fields, {action: "remove"}), "value.denorm_out_field") + const addConfigs = _.map(_.filter(newConfig.denorm_fields, {action: "upsert"}), "value") - const denormFields = _.unionWith( - addConfigs, - _.reject(currentConfig.denorm_fields, (config) => { return _.includes(removeConfigs, config.denorm_out_field)}), - (a, b) => { - return a.denorm_out_field === b.denorm_out_field - } - ) + updatedConfigs = _.unionWith( + addConfigs, + _.reject(currentConfig.denorm_fields, (config) => { return _.includes(removeConfigs, config.denorm_out_field)}), + (a, b) => { + return a.denorm_out_field === b.denorm_out_field + } + ) + } + if(fieldsRemoved.length > 0) { + updatedConfigs = _.reject(updatedConfigs, (config) => { return _.includes(fieldsRemoved, config.denorm_key)}) + } 
return { - denorm_fields: denormFields + denorm_fields: updatedConfigs } } diff --git a/api-service/src/controllers/DatasetUpdate/DatasetUpdateValidationSchema.json b/api-service/src/controllers/DatasetUpdate/DatasetUpdateValidationSchema.json index b31fe674..5d655436 100644 --- a/api-service/src/controllers/DatasetUpdate/DatasetUpdateValidationSchema.json +++ b/api-service/src/controllers/DatasetUpdate/DatasetUpdateValidationSchema.json @@ -224,7 +224,7 @@ "additionalProperties": false } }, - "required": ["indexing_config", "keys_config"], + "required": [], "additionalProperties": false }, "transformations_config": { diff --git a/api-service/src/services/DatasetService.ts b/api-service/src/services/DatasetService.ts index 206fbb5e..e8ce9a35 100644 --- a/api-service/src/services/DatasetService.ts +++ b/api-service/src/services/DatasetService.ts @@ -344,7 +344,7 @@ class DatasetService { private createHudiDataSource = async (draftDataset: Record, transaction: Transaction) => { const {created_by, updated_by} = draftDataset; - const allFields = await tableGenerator.getAllFields(draftDataset, "datalake"); + const allFields = await tableGenerator.getAllFieldsHudi(draftDataset, "datalake"); const draftDatasource = this.createDraftDatasource(draftDataset, "datalake"); const ingestionSpec = tableGenerator.getHudiIngestionSpecForCreate(draftDataset, allFields, draftDatasource.datasource_ref); _.set(draftDatasource, "ingestion_spec", ingestionSpec) @@ -356,7 +356,7 @@ class DatasetService { private updateHudiDataSource = async (draftDataset: Record, transaction: Transaction) => { const {created_by, updated_by} = draftDataset; - const allFields = await tableGenerator.getAllFields(draftDataset, "datalake"); + const allFields = await tableGenerator.getAllFieldsHudi(draftDataset, "datalake"); const draftDatasource = this.createDraftDatasource(draftDataset, "datalake"); const dsId = _.join([draftDataset.dataset_id, "events", "datalake"], "_") const liveDatasource = await 
Datasource.findOne({ where: { id: dsId }, attributes: ["ingestion_spec"], raw: true }) as unknown as Record diff --git a/api-service/src/services/TableGenerator.ts b/api-service/src/services/TableGenerator.ts index fd20afc2..e749df50 100644 --- a/api-service/src/services/TableGenerator.ts +++ b/api-service/src/services/TableGenerator.ts @@ -15,7 +15,7 @@ class BaseTableGenerator { const properties: Record[] = [] const flatten = (schema: Record, prev: string | undefined, prevExpr: string | undefined) => { _.mapKeys(schema, function (value, parentKey) { - const newKey = (prev) ? _.join([prev, parentKey], ".") : parentKey; + const newKey = (prev) ? type === "druid" ? _.join([prev, parentKey], ".") : _.join([prev, parentKey], "_") : parentKey; const newExpr = (prevExpr) ? _.join([prevExpr, ".['", parentKey, "']"], "") : _.join(["$.['", parentKey, "']"], ""); switch (value["type"]) { case "object": @@ -24,7 +24,7 @@ class BaseTableGenerator { case "array": if (type === "druid" && _.get(value, "items.type") == "object" && _.get(value, "items.properties")) { _.mapKeys(_.get(value, "items.properties"), function (value, childKey) { - const objChildKey = _.join([newKey, childKey], ".") + const objChildKey = type === "druid" ? 
_.join([newKey, childKey], ".") : _.join([newKey, childKey], "_") properties.push(_.merge(_.pick(value, ["type", "arrival_format", "is_deleted"]), { expr: _.join([newExpr, "[*].['", childKey, "']"], ""), name: objChildKey, data_type: "array" })) }) } else { @@ -79,6 +79,38 @@ class BaseTableGenerator { _.remove(dataFields, { is_deleted: true }) // Delete all the excluded fields return dataFields; } + + getAllFieldsHudi = async (dataset: Record, type: string): Promise[]> => { + + const { data_schema, denorm_config, transformations_config } = dataset + let dataFields = this.flattenSchema(data_schema, type); + if (!_.isEmpty(denorm_config.denorm_fields)) { + for (const denormField of denorm_config.denorm_fields) { + const denormDataset: any = await datasetService.getDataset(denormField.dataset_id, ["data_schema"], true); + const properties = this.flattenSchema(denormDataset.data_schema, type); + const transformProps = _.map(properties, (prop) => { + _.set(prop, "name", _.join([denormField.denorm_out_field, prop.name], "_")); + _.set(prop, "expr", _.replace(prop.expr, "$", "$." + denormField.denorm_out_field)); + return prop; + }); + dataFields.push(...transformProps); + } + } + if (!_.isEmpty(transformations_config)) { + const transformationFields = _.map(transformations_config, (tf) => ({ + expr: "$." 
+ tf.field_key, + name: tf.field_key, + data_type: tf.transformation_function.datatype, + arrival_format: tf.transformation_function.datatype, + type: tf.transformation_function.datatype + })) + const originalFields = _.differenceBy(dataFields, transformationFields, "name") + dataFields = _.concat(originalFields, transformationFields) + } + dataFields.push(rawIngestionSpecDefaults.hudiSynctsField) + _.remove(dataFields, { is_deleted: true }) // Delete all the excluded fields + return dataFields; + } } class TableGenerator extends BaseTableGenerator { @@ -92,8 +124,8 @@ class TableGenerator extends BaseTableGenerator { "spec": { "dataSchema": { "dataSource": datasourceRef, - "dimensionsSpec": { "dimensions": this.getDruidDimensions(allFields, this.getTimestampKey(dataset), dataset_config.keys_config.partition_key) }, - "timestampSpec": { "column": this.getTimestampKey(dataset), "format": "auto" }, + "dimensionsSpec": { "dimensions": this.getDruidDimensions(allFields, this.getTimestampKey(dataset, "druid"), dataset_config.keys_config.partition_key) }, + "timestampSpec": { "column": this.getTimestampKey(dataset, "druid"), "format": "auto" }, "metricsSpec": [], "granularitySpec": ingestionSpecDefaults.granularitySpec }, @@ -158,12 +190,12 @@ class TableGenerator extends BaseTableGenerator { const primaryKey = this.getPrimaryKey(dataset); const partitionKey = this.getHudiPartitionKey(dataset); - const timestampKey = this.getTimestampKey(dataset); + const timestampKey = this.getTimestampKey(dataset, "datalake"); return { dataset: dataset.dataset_id, schema: { - table: _.includes(datasourceRef, '-') - ? _.replace(datasourceRef, /-/g, '_') + table: _.includes(datasourceRef, "-") + ? 
_.replace(datasourceRef, /-/g, "_") : datasourceRef, partitionColumn: partitionKey, timestampColumn: timestampKey, @@ -200,6 +232,7 @@ class TableGenerator extends BaseTableGenerator { return newHudiSpec; } + // eslint-disable-next-line private getHudiColumnSpec = (allFields: Record[], primaryKey: string, partitionKey: string, timestampKey: string): Record[] => { const dataFields = _.cloneDeep(allFields); @@ -211,7 +244,7 @@ class TableGenerator extends BaseTableGenerator { "index": index++ } }) - _.each(rawIngestionSpecDefaults.dimensions, (field) => { + _.each(rawIngestionSpecDefaults.hudi_dimensions, (field) => { transformFields.push({ "type": field.type, "name": field.name, @@ -278,20 +311,25 @@ class TableGenerator extends BaseTableGenerator { name: field.name } }), - rawIngestionSpecDefaults.flattenSpec + rawIngestionSpecDefaults.hudi_flattenSpec ) } private getPrimaryKey = (dataset: Record): string => { - return dataset.dataset_config.keys_config.data_key; + return _.replace(dataset.dataset_config.keys_config.data_key, /\./g, "_"); } private getHudiPartitionKey = (dataset: Record): string => { - return dataset.dataset_config.keys_config.partition_key || dataset.dataset_config.keys_config.timestamp_key; + const partitionKey = dataset.dataset_config.keys_config.partition_key || dataset.dataset_config.keys_config.timestamp_key; + return _.replace(partitionKey, /\./g, "_") } - private getTimestampKey = (dataset: Record): string => { - return dataset.dataset_config.keys_config.timestamp_key; + private getTimestampKey = (dataset: Record, type: string): string => { + const timestamp = dataset.dataset_config.keys_config.timestamp_key; + if (type === "druid") { + return timestamp; + } + return _.replace(timestamp, /\./g, "_"); } }