Skip to content

Commit

Permalink
#OBS-I119: reading entire table name from the context of request body (
Browse files Browse the repository at this point in the history
  • Loading branch information
yashashkumar authored Jul 15, 2024
1 parent f8acc9b commit 639a3a3
Showing 1 changed file with 17 additions and 33 deletions.
50 changes: 17 additions & 33 deletions api-service/src/validators/QueryValidator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export class QueryValidator implements IValidator {
switch (id) {
case routesConfig.query.native_query.api_id:
validationStatus = await this.validateNativeQuery(data)
dataSource = this.getDataSource(data)
dataSource = data?.context?.dataSource || this.getDataSource(data)
shouldSkip = _.includes(config.exclude_datasource_validation, dataSource);
return validationStatus.isValid ? (shouldSkip ? validationStatus : this.setDatasourceRef(dataSource, data)) : validationStatus
case routesConfig.query.sql_query.api_id:
Expand Down Expand Up @@ -63,8 +63,8 @@ export class QueryValidator implements IValidator {
if (!isFromClausePresent) {
return { isValid: false, message: "Invalid SQL Query", code: httpStatus["400_NAME"] };
}
const fromIndex = query.search(fromClause);
const dataset = query.substring(fromIndex + 4).trim().split(/\s+/)[0].replace(/\\/g, "");
const fromIndex = query.search(fromClause);
const dataset = query.substring(fromIndex + 4).trim().split(/\s+/)[0].replace(/\\/g, "");
if (_.isEmpty(dataset)) {
return { isValid: false, message: "Dataset name must be present in the SQL Query", code: httpStatus["400_NAME"] };
}
Expand All @@ -73,7 +73,7 @@ export class QueryValidator implements IValidator {
let dataSourceLimits = this.getDataSourceLimits(datasource);
return (!_.isEmpty(dataSourceLimits)) ? this.validateQueryRules(data, dataSourceLimits.queryRules.scan) : { isValid: true };
} catch (error: any) {
return { isValid: false, message: error.message || "error ocuured while validating SQL query", code: error.code || httpStatus[ "500_NAME" ] };
return { isValid: false, message: error.message || "error ocuured while validating SQL query", code: error.code || httpStatus["500_NAME"] };
}
}

Expand All @@ -86,7 +86,7 @@ export class QueryValidator implements IValidator {
fromDate = moment(extractedDateRange[0], this.momentFormat);
toDate = moment(extractedDateRange[1], this.momentFormat);
} else {
let query = queryPayload.querySql.query;
let query = queryPayload.querySql.query;
query = query.toUpperCase().replace(/\s+/g, " ").trim();
let vocabulary = query.split(/\s+/);
let fromDateIndex = vocabulary.indexOf("TIMESTAMP");
Expand Down Expand Up @@ -149,7 +149,7 @@ export class QueryValidator implements IValidator {
const vocabulary = queryPayload.querySql.query.split(/\s+/); // Splitting the query by whitespace
const queryLimitIndex = vocabulary.findIndex(word => limitClause.test(word));
const queryLimit = Number(vocabulary[queryLimitIndex + 1]);

if (isNaN(queryLimit)) {
// If "LIMIT" clause doesn't exist or its value is not a number, update the query
const updatedVocabulary = [...vocabulary, "LIMIT", limits.maxResultRowLimit];
Expand All @@ -173,55 +173,39 @@ export class QueryValidator implements IValidator {
}
public async setDatasourceRef(dataSource: string, payload: any): Promise<ValidationStatus> {
try {
const granularity = _.get(payload, 'context.granularity')
const dataSourceType = _.get(payload, 'context.dataSourceType', config.query_api.druid.queryType)
let dataSourceRef = await this.getDataSourceRef(dataSource, granularity, dataSourceType);
if(dataSourceType === config.query_api.druid.queryType) await this.validateDatasource(dataSourceRef)
let dataSourceRef = await this.getDataSourceRef(dataSource, dataSourceType);
if (dataSourceType === config.query_api.druid.queryType) await this.validateDatasource(dataSourceRef)
if (payload?.querySql && dataSourceType === config.query_api.druid.queryType) {
payload.querySql.query = payload.querySql.query.replace(dataSource, dataSourceRef)
}
else if(payload?.querySql && dataSourceType === config.query_api.lakehouse.queryType) {
else if (payload?.querySql && dataSourceType === config.query_api.lakehouse.queryType) {
// hudi tables doesn't support table names contain '-' so we need to replace it with '_'
payload.querySql.query = payload.querySql.query.replace(dataSource, dataSourceRef).replace(/"/g, "").replace(/-/g, "_")
let modifiedDataSource = dataSourceRef.replace(/-/g, "_")
payload.querySql.query = payload.querySql.query.replace(dataSource, modifiedDataSource).replace(/"/g, "")
}
else {
payload.query.dataSource = dataSourceRef
}
return { isValid: true };
} catch (error: any) {
return { isValid: false, message: error.message || "error ocuured while fetching datasource record", code: error.code || httpStatus[ "400_NAME" ] };
return { isValid: false, message: error.message || "error ocuured while fetching datasource record", code: error.code || httpStatus["400_NAME"] };
}
}

public async getDataSourceRef(datasource: string, granularity: string | undefined, dataSourceType: string): Promise<string> {
public async getDataSourceRef(datasource: string, dataSourceType: string): Promise<string> {
let storageType = dataSourceType === config.query_api.lakehouse.queryType ? config.datasource_storage_types.datalake : config.datasource_storage_types.druid
const records: any = await dbConnector.readRecords("datasources", { "filters": { "dataset_id": datasource, "type": storageType } })
const records: any = await dbConnector.readRecords("datasources", { "filters": { "datasource": datasource, "type": storageType } })
if (records.length == 0) {
const error = { ...constants.INVALID_DATASOURCE }
error.message = error.message.replace('${datasource}', datasource)
throw error
}
if(storageType === config.datasource_storage_types.datalake) return `${config.query_api.lakehouse.catalog}.${config.query_api.lakehouse.schema}.${records[0].datasource_ref}_ro`
const record = records.filter((record: any) => {
const aggregatedRecord = _.get(record, "metadata.aggregated")
if(granularity)
return aggregatedRecord && _.get(record, "metadata.granularity") === granularity;
else
return !aggregatedRecord
});

if (record.length == 0) {
const error = { ...constants.INVALID_DATASOURCE }
error.message = error.message.replace('${datasource}', datasource)
if (granularity !== undefined) {
error.message = `Aggregate ${error.message}`
}
throw error
}
return record[0].datasource_ref
if (storageType === config.datasource_storage_types.datalake) return `${config.query_api.lakehouse.catalog}.${config.query_api.lakehouse.schema}.${records[0].datasource_ref}_ro`
return records[0].datasource_ref
}

private getIntervals(payload: any) {
return (typeof payload.intervals == 'object' && !Array.isArray(payload.intervals)) ? payload.intervals.intervals : payload.intervals
}
}
}

0 comments on commit 639a3a3

Please sign in to comment.