Commit
Merge pull request #275 from Sanketika-Obsrv/hudi-spec-fix
#OBS-I335: hudi spec fix
HarishGangula authored Nov 12, 2024
2 parents fe69a8b + 02d51ba · commit 1d2fec8
Showing 5 changed files with 167 additions and 43 deletions.
29 changes: 29 additions & 0 deletions api-service/src/configs/IngestionConfig.ts
@@ -54,6 +54,13 @@ export const rawIngestionSpecDefaults = {
         "type": "text",
         "expr": "$.obsrv_meta.syncts"
     },
+    "hudiSynctsField": {
+        "name": "obsrv_meta_syncts",
+        "arrival_format": "text",
+        "data_type": "date",
+        "type": "text",
+        "expr": "$.obsrv_meta.syncts"
+    },
     "dimensions": [
         {
             "type": "string",
@@ -64,6 +71,16 @@ export const rawIngestionSpecDefaults = {
             "name": "obsrv.meta.source.id"
         }
     ],
+    "hudi_dimensions": [
+        {
+            "type": "string",
+            "name": "obsrv_meta_source_connector"
+        },
+        {
+            "type": "string",
+            "name": "obsrv_meta_source_id"
+        }
+    ],
     "flattenSpec": [
         {
             "type": "path",
@@ -75,5 +92,17 @@ export const rawIngestionSpecDefaults = {
             "expr": "$.obsrv_meta.source.connectorInstance",
             "name": "obsrv.meta.source.id"
         }
     ],
+    "hudi_flattenSpec": [
+        {
+            "type": "path",
+            "expr": "$.obsrv_meta.source.connector",
+            "name": "obsrv_meta_source_connector"
+        },
+        {
+            "type": "path",
+            "expr": "$.obsrv_meta.source.connectorInstance",
+            "name": "obsrv_meta_source_id"
+        }
+    ]
 }
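Note: the new hudi* blocks mirror the Druid defaults with one difference — every dotted path in a column name becomes underscore-separated (obsrv.meta.source.id → obsrv_meta_source_id), presumably because Hudi persists its table schema through Avro-based schemas in which dotted field names are not valid. A minimal sketch of that naming rule (illustrative helper, not part of the PR):

// Illustrative only: the convention the hudi_* defaults follow.
const toHudiColumnName = (dottedName: string): string => dottedName.replace(/\./g, "_");

toHudiColumnName("obsrv.meta.source.id"); // => "obsrv_meta_source_id"
toHudiColumnName("obsrv_meta.syncts");    // => "obsrv_meta_syncts"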
111 changes: 84 additions & 27 deletions api-service/src/controllers/DatasetUpdate/DatasetUpdate.ts
@@ -67,7 +67,9 @@ const datasetUpdate = async (req: Request, res: Response) => {
 
 const mergeDraftDataset = (datasetModel: Model<any, any> | null, datasetReq: any): Record<string, any> => {
 
-    const cache_config = _.get(datasetModel, ["dataset_config", "cache_config"])
+    const currentSchema = _.get(datasetModel, 'data_schema')
+    const fieldsRemoved = (datasetReq.data_schema) ? getMissingFieldsInNewSchema(datasetReq.data_schema, currentSchema) : []
+    const prev_dataset_config = _.get(datasetModel, ["dataset_config"])
     const dataset: Record<string, any> = {
         version_key: Date.now().toString(),
         name: datasetReq.name || _.get(datasetModel, ["name"]),
@@ -77,45 +79,100 @@ const mergeDraftDataset = (datasetModel: Model<any, any> | null, datasetReq: any
     if(datasetReq.extraction_config) dataset["extraction_config"] = datasetReq.extraction_config
     if(datasetReq.dedup_config) dataset["dedup_config"] = datasetReq.dedup_config
     if(datasetReq.data_schema) dataset["data_schema"] = datasetReq.data_schema
-    if(datasetReq.dataset_config) dataset["dataset_config"] = { ...datasetReq.dataset_config, cache_config }
-    if(datasetReq.transformations_config)
-        dataset["transformations_config"] = mergeTransformationsConfig(_.get(datasetModel, ["transformations_config"]), datasetReq.transformations_config)
-    if(datasetReq.denorm_config) dataset["denorm_config"] = mergeDenormConfig(_.get(datasetModel, ["denorm_config"]), datasetReq.denorm_config)
+    if(datasetReq.dataset_config) dataset["dataset_config"] = { ...prev_dataset_config, ...datasetReq.dataset_config }
+    if(datasetReq.transformations_config || fieldsRemoved.length > 0)
+        dataset["transformations_config"] = mergeTransformationsConfig(_.get(datasetModel, ["transformations_config"]), datasetReq.transformations_config, fieldsRemoved)
+    if(datasetReq.denorm_config || fieldsRemoved.length > 0) dataset["denorm_config"] = mergeDenormConfig(_.get(datasetModel, ["denorm_config"]), datasetReq.denorm_config, fieldsRemoved)
     if(datasetReq.connectors_config) dataset["connectors_config"] = mergeConnectorsConfig(_.get(datasetModel, ["connectors_config"]), datasetReq.connectors_config)
     if(datasetReq.tags) dataset["tags"] = mergeTags(_.get(datasetModel, ["tags"]), datasetReq.tags)
     if(datasetReq.sample_data) dataset["sample_data"] = datasetReq.sample_data
     if(datasetReq.type) dataset["type"] = datasetReq.type
 
+    if(fieldsRemoved.length > 0) {
+        const keys_config = _.get(dataset["dataset_config"] ? dataset["dataset_config"] : prev_dataset_config, ["keys_config"])
+        if(_.includes(fieldsRemoved, keys_config['data_key'])) {
+            keys_config['data_key'] = '';
+        }
+        if(_.includes(fieldsRemoved, keys_config['primary_key'])) {
+            keys_config['primary_key'] = '';
+        }
+        if(_.includes(fieldsRemoved, keys_config['timestamp_key'])) {
+            keys_config['timestamp_key'] = '';
+        }
+        _.set(dataset["dataset_config"], 'keys_config', keys_config)
+    }
     return dataset;
 }
 
-const mergeTransformationsConfig = (currentConfigs: any, newConfigs: any) => {
-    const removeConfigs = _.map(_.filter(newConfigs, {action: "remove"}), "value.field_key")
-    const addConfigs = _.map(_.filter(newConfigs, {action: "upsert"}), "value")
-
-    return _.unionWith(
-        addConfigs,
-        _.reject(currentConfigs, (config) => { return _.includes(removeConfigs, config.field_key)}),
-        (a, b) => {
-            return a.field_key === b.field_key
-        }
-    )
-}
+const getMissingFieldsInNewSchema = (newSchema: any, oldSchema: any) => {
+
+    const getRemovedPropertiesFieldsNested = (oldProperties: Record<string, any>, newProperties: Record<string, any>, path: string[] = []): string[] => {
+        let removedFields: string[] = [];
+        for (const key in oldProperties) {
+            const fullPath = [...path, key].join('.');
+            if (!(key in newProperties)) {
+                removedFields.push(fullPath);
+            } else if (typeof oldProperties[key] === 'object' && typeof newProperties[key] === 'object') {
+                removedFields = removedFields.concat(
+                    getRemovedPropertiesFieldsNested(oldProperties[key].properties || {}, newProperties[key].properties || {}, [...path, key])
+                );
+            }
+        }
+
+        return removedFields;
+    }
+
+    const getRemovedPropertiesFields = (oldSchema: any, newSchema: any): string[] => {
+        const oldProperties = oldSchema.properties || {};
+        const newProperties = newSchema.properties || {};
+        return getRemovedPropertiesFieldsNested(oldProperties, newProperties);
+    }
+
+    // Fields present in the old schema but missing from the new one
+    const removedFieldsNested = getRemovedPropertiesFields(oldSchema, newSchema);
+    return removedFieldsNested
+}
 
+const mergeTransformationsConfig = (currentConfigs: any, newConfigs: any, fieldsRemoved: string[]) => {
+
+    let updatedConfigs = currentConfigs;
+    if(newConfigs) {
+        const removeConfigs = _.map(_.filter(newConfigs, {action: "remove"}), "value.field_key")
+        const addConfigs = _.map(_.filter(newConfigs, {action: "upsert"}), "value")
+
+        updatedConfigs = _.unionWith(
+            addConfigs,
+            _.reject(currentConfigs, (config) => { return _.includes(removeConfigs, config.field_key)}),
+            (a, b) => {
+                return a.field_key === b.field_key
+            }
+        )
+    }
+    if(fieldsRemoved.length > 0) {
+        updatedConfigs = _.reject(updatedConfigs, (config) => { return _.includes(fieldsRemoved, config.field_key)})
+    }
+    return updatedConfigs
+}

-const mergeDenormConfig = (currentConfig: any, newConfig: any) => {
+const mergeDenormConfig = (currentConfig: any, newConfig: any, fieldsRemoved: string[]) => {
 
-    const removeConfigs = _.map(_.filter(newConfig.denorm_fields, {action: "remove"}), "value.denorm_out_field")
-    const addConfigs = _.map(_.filter(newConfig.denorm_fields, {action: "upsert"}), "value")
-
-    const denormFields = _.unionWith(
-        addConfigs,
-        _.reject(currentConfig.denorm_fields, (config) => { return _.includes(removeConfigs, config.denorm_out_field)}),
-        (a, b) => {
-            return a.denorm_out_field === b.denorm_out_field
-        }
-    )
+    let updatedConfigs = currentConfig.denorm_fields;
+    if(newConfig) {
+        const removeConfigs = _.map(_.filter(newConfig.denorm_fields, {action: "remove"}), "value.denorm_out_field")
+        const addConfigs = _.map(_.filter(newConfig.denorm_fields, {action: "upsert"}), "value")
+
+        updatedConfigs = _.unionWith(
+            addConfigs,
+            _.reject(currentConfig.denorm_fields, (config) => { return _.includes(removeConfigs, config.denorm_out_field)}),
+            (a, b) => {
+                return a.denorm_out_field === b.denorm_out_field
+            }
+        )
+    }
+    if(fieldsRemoved.length > 0) {
+        updatedConfigs = _.reject(updatedConfigs, (config) => { return _.includes(fieldsRemoved, config.denorm_key)})
+    }
     return {
-        denorm_fields: denormFields
+        denorm_fields: updatedConfigs
     }
 }

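Note: the net effect of the new helpers is schema-diff-driven cleanup. When an update request carries a replacement data_schema, getMissingFieldsInNewSchema walks the old and new properties trees and returns the dotted paths that disappeared; any transformation whose field_key, denorm field whose denorm_key, or keys_config entry that points at a removed path is then dropped or blanked during the merge. A usage sketch with invented schemas:

// Invented example schemas; only "properties" nesting is compared.
const oldSchema = {
    properties: {
        id: { type: "string" },
        meta: { type: "object", properties: { ts: { type: "string" }, src: { type: "string" } } }
    }
};
const newSchema = {
    properties: {
        id: { type: "string" },
        meta: { type: "object", properties: { src: { type: "string" } } }
    }
};

// getMissingFieldsInNewSchema(newSchema, oldSchema) => ["meta.ts"]
// A transformation with field_key "meta.ts", a denorm field with denorm_key "meta.ts",
// or a keys_config entry set to "meta.ts" would all be removed or reset by the merge above.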
2 changes: 1 addition & 1 deletion
@@ -224,7 +224,7 @@
             "additionalProperties": false
         }
     },
-    "required": ["indexing_config", "keys_config"],
+    "required": [],
     "additionalProperties": false
 },
 "transformations_config": {
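With required emptied, a partial dataset_config now validates, and the spread { ...prev_dataset_config, ...datasetReq.dataset_config } in mergeDraftDataset backfills indexing_config and keys_config from the stored draft. A hypothetical request fragment (ids and values invented):

// Hypothetical PATCH fragment: valid under the relaxed schema because
// indexing_config and keys_config may now be omitted and are inherited.
const datasetReq = {
    dataset_id: "telemetry-events",                    // invented id
    dataset_config: { cache_config: { redis_db: 3 } }  // invented values
};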
4 changes: 2 additions & 2 deletions api-service/src/services/DatasetService.ts
@@ -344,7 +344,7 @@ class DatasetService {
     private createHudiDataSource = async (draftDataset: Record<string, any>, transaction: Transaction) => {
 
         const {created_by, updated_by} = draftDataset;
-        const allFields = await tableGenerator.getAllFields(draftDataset, "datalake");
+        const allFields = await tableGenerator.getAllFieldsHudi(draftDataset, "datalake");
         const draftDatasource = this.createDraftDatasource(draftDataset, "datalake");
         const ingestionSpec = tableGenerator.getHudiIngestionSpecForCreate(draftDataset, allFields, draftDatasource.datasource_ref);
         _.set(draftDatasource, "ingestion_spec", ingestionSpec)
@@ -356,7 +356,7 @@ class DatasetService {
     private updateHudiDataSource = async (draftDataset: Record<string, any>, transaction: Transaction) => {
 
         const {created_by, updated_by} = draftDataset;
-        const allFields = await tableGenerator.getAllFields(draftDataset, "datalake");
+        const allFields = await tableGenerator.getAllFieldsHudi(draftDataset, "datalake");
         const draftDatasource = this.createDraftDatasource(draftDataset, "datalake");
         const dsId = _.join([draftDataset.dataset_id, "events", "datalake"], "_")
         const liveDatasource = await Datasource.findOne({ where: { id: dsId }, attributes: ["ingestion_spec"], raw: true }) as unknown as Record<string, any>
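Only the two Hudi datasource builders switch to getAllFieldsHudi; the Druid path presumably keeps calling getAllFields, so dotted Druid names and underscored Hudi names no longer flow through one shared routine. The resulting split, sketched under that assumption (the Druid branch is inferred, not shown in this diff):

// Assumed dispatch after this change; datasourceType is an invented variable.
const allFields = datasourceType === "druid"
    ? await tableGenerator.getAllFields(draftDataset, "druid")         // dotted names
    : await tableGenerator.getAllFieldsHudi(draftDataset, "datalake"); // underscored names, hudiSynctsField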
64 changes: 51 additions & 13 deletions api-service/src/services/TableGenerator.ts
@@ -15,7 +15,7 @@ class BaseTableGenerator {
         const properties: Record<string, any>[] = []
         const flatten = (schema: Record<string, any>, prev: string | undefined, prevExpr: string | undefined) => {
             _.mapKeys(schema, function (value, parentKey) {
-                const newKey = (prev) ? _.join([prev, parentKey], ".") : parentKey;
+                const newKey = (prev) ? type === "druid" ? _.join([prev, parentKey], ".") : _.join([prev, parentKey], "_") : parentKey;
                 const newExpr = (prevExpr) ? _.join([prevExpr, ".['", parentKey, "']"], "") : _.join(["$.['", parentKey, "']"], "");
                 switch (value["type"]) {
                     case "object":
@@ -24,7 +24,7 @@ class BaseTableGenerator {
                     case "array":
                         if (type === "druid" && _.get(value, "items.type") == "object" && _.get(value, "items.properties")) {
                             _.mapKeys(_.get(value, "items.properties"), function (value, childKey) {
-                                const objChildKey = _.join([newKey, childKey], ".")
+                                const objChildKey = type === "druid" ? _.join([newKey, childKey], ".") : _.join([newKey, childKey], "_")
                                 properties.push(_.merge(_.pick(value, ["type", "arrival_format", "is_deleted"]), { expr: _.join([newExpr, "[*].['", childKey, "']"], ""), name: objChildKey, data_type: "array" }))
                             })
                         } else {
@@ -79,6 +79,38 @@ class BaseTableGenerator {
         _.remove(dataFields, { is_deleted: true }) // Delete all the excluded fields
         return dataFields;
     }
+
+    getAllFieldsHudi = async (dataset: Record<string, any>, type: string): Promise<Record<string, any>[]> => {
+
+        const { data_schema, denorm_config, transformations_config } = dataset
+        let dataFields = this.flattenSchema(data_schema, type);
+        if (!_.isEmpty(denorm_config.denorm_fields)) {
+            for (const denormField of denorm_config.denorm_fields) {
+                const denormDataset: any = await datasetService.getDataset(denormField.dataset_id, ["data_schema"], true);
+                const properties = this.flattenSchema(denormDataset.data_schema, type);
+                const transformProps = _.map(properties, (prop) => {
+                    _.set(prop, "name", _.join([denormField.denorm_out_field, prop.name], "_"));
+                    _.set(prop, "expr", _.replace(prop.expr, "$", "$." + denormField.denorm_out_field));
+                    return prop;
+                });
+                dataFields.push(...transformProps);
+            }
+        }
+        if (!_.isEmpty(transformations_config)) {
+            const transformationFields = _.map(transformations_config, (tf) => ({
+                expr: "$." + tf.field_key,
+                name: tf.field_key,
+                data_type: tf.transformation_function.datatype,
+                arrival_format: tf.transformation_function.datatype,
+                type: tf.transformation_function.datatype
+            }))
+            const originalFields = _.differenceBy(dataFields, transformationFields, "name")
+            dataFields = _.concat(originalFields, transformationFields)
+        }
+        dataFields.push(rawIngestionSpecDefaults.hudiSynctsField)
+        _.remove(dataFields, { is_deleted: true }) // Delete all the excluded fields
+        return dataFields;
+    }
 }
 
 class TableGenerator extends BaseTableGenerator {
@@ -92,8 +124,8 @@ class TableGenerator extends BaseTableGenerator {
             "spec": {
                 "dataSchema": {
                     "dataSource": datasourceRef,
-                    "dimensionsSpec": { "dimensions": this.getDruidDimensions(allFields, this.getTimestampKey(dataset), dataset_config.keys_config.partition_key) },
-                    "timestampSpec": { "column": this.getTimestampKey(dataset), "format": "auto" },
+                    "dimensionsSpec": { "dimensions": this.getDruidDimensions(allFields, this.getTimestampKey(dataset, "druid"), dataset_config.keys_config.partition_key) },
+                    "timestampSpec": { "column": this.getTimestampKey(dataset, "druid"), "format": "auto" },
                     "metricsSpec": [],
                     "granularitySpec": ingestionSpecDefaults.granularitySpec
                 },
@@ -158,12 +190,12 @@ class TableGenerator extends BaseTableGenerator {
 
         const primaryKey = this.getPrimaryKey(dataset);
         const partitionKey = this.getHudiPartitionKey(dataset);
-        const timestampKey = this.getTimestampKey(dataset);
+        const timestampKey = this.getTimestampKey(dataset, "datalake");
         return {
             dataset: dataset.dataset_id,
             schema: {
-                table: _.includes(datasourceRef, '-')
-                    ? _.replace(datasourceRef, /-/g, '_')
+                table: _.includes(datasourceRef, "-")
+                    ? _.replace(datasourceRef, /-/g, "_")
                     : datasourceRef,
                 partitionColumn: partitionKey,
                 timestampColumn: timestampKey,
@@ -200,6 +232,7 @@ class TableGenerator extends BaseTableGenerator {
         return newHudiSpec;
     }
 
+    // eslint-disable-next-line
     private getHudiColumnSpec = (allFields: Record<string, any>[], primaryKey: string, partitionKey: string, timestampKey: string): Record<string, any>[] => {
 
         const dataFields = _.cloneDeep(allFields);
@@ -211,7 +244,7 @@ class TableGenerator extends BaseTableGenerator {
                 "index": index++
             }
         })
-        _.each(rawIngestionSpecDefaults.dimensions, (field) => {
+        _.each(rawIngestionSpecDefaults.hudi_dimensions, (field) => {
             transformFields.push({
                 "type": field.type,
                 "name": field.name,
@@ -278,20 +311,25 @@ class TableGenerator extends BaseTableGenerator {
                     name: field.name
                 }
             }),
-            rawIngestionSpecDefaults.flattenSpec
+            rawIngestionSpecDefaults.hudi_flattenSpec
         )
     }
 
     private getPrimaryKey = (dataset: Record<string, any>): string => {
-        return dataset.dataset_config.keys_config.data_key;
+        return _.replace(dataset.dataset_config.keys_config.data_key, /\./g, "_");
     }
 
     private getHudiPartitionKey = (dataset: Record<string, any>): string => {
-        return dataset.dataset_config.keys_config.partition_key || dataset.dataset_config.keys_config.timestamp_key;
+        const partitionKey = dataset.dataset_config.keys_config.partition_key || dataset.dataset_config.keys_config.timestamp_key;
+        return _.replace(partitionKey, /\./g, "_")
     }
 
-    private getTimestampKey = (dataset: Record<string, any>): string => {
-        return dataset.dataset_config.keys_config.timestamp_key;
+    private getTimestampKey = (dataset: Record<string, any>, type: string): string => {
+        const timestamp = dataset.dataset_config.keys_config.timestamp_key;
+        if (type === "druid") {
+            return timestamp;
+        }
+        return _.replace(timestamp, /\./g, "_");
     }
 }
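One caveat motivates the global regex in the key helpers above: lodash's _.replace is based on String.prototype.replace, so a string pattern substitutes only the first occurrence. Keys with more than one dot therefore need /\./g (illustrative values):

import _ from "lodash";

_.replace("obsrv.meta.source.id", ".", "_");    // "obsrv_meta.source.id" — first dot only
_.replace("obsrv.meta.source.id", /\./g, "_");  // "obsrv_meta_source_id"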
