Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release 1.0.6-GA #163

Merged
merged 14 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions api-service/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions api-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"multiparty": "4.2.1",
"pg": "^8.8.0",
"prom-client": "^14.2.0",
"trino-client": "^0.2.2",
"uuid": "3.1.0",
"winston": "~2.4.3",
"winston-daily-rotate-file": "~3.2.1"
Expand Down

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions api-service/src/configs/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,21 @@ export const config = {
"version": "1.0",
"query_api": {
"druid": {
"queryType": "realtime",
"host": process.env.druid_host || "http://localhost",
"port": process.env.druid_port || 8888,
"sql_query_path": "/druid/v2/sql/",
"native_query_path": "/druid/v2",
"list_datasources_path": "/druid/v2/datasources",
"submit_ingestion": "druid/indexer/v1/supervisor"
},
"lakehouse": {
"queryType": "datalake",
"host": process.env.lakehouse_host || "http://localhost",
"port": process.env.lakehouse_port || 8080,
"catalog": process.env.lakehouse_catalog || "hudi_connector",
"schema": process.env.lakehouse_schema || "obsrv",
"default_user": process.env.lakehouse_default_user || "admin"
}
},
"db_connector_config": {
Expand Down Expand Up @@ -58,6 +67,10 @@ export const config = {
normalDataset: "dataset",
masterDataset: "master-dataset"
},
"datasource_storage_types": {
druid: "druid",
datalake: "datalake"
},
"redis_config": {
"redis_host": process.env.redis_host || 'localhost',
"redis_port": process.env.redis_port || 6379
Expand Down
6 changes: 6 additions & 0 deletions api-service/src/configs/RoutesConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ export const routesConfig = {
api_id: "api.health",
method: "get",
path: "/health"
},
schema_validator:{
api_id: "api.schema.validate",
method: "post",
path: "/data/v1/schema/validate",
validation_schema: "SchemaValidatorReq.json"
}
}

10 changes: 5 additions & 5 deletions api-service/src/connectors/DbConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export class DbConnector implements IConnector {

public async insertRecord(table: string, fields: any) {
await this.pool.transaction(async (dbTransaction) => {
await this.submit_ingestion(_.get(fields, 'ingestion_spec'), table)
await this.submit_ingestion(_.get(fields, 'ingestion_spec'), table, _.get(fields, "type"))
await dbTransaction(table).insert(fields).on('query-error', (error: any) => {
this.log_error(OP_TYPES.INSERT, error, table, fields);
throw {...constants.FAILED_RECORD_CREATE, "errCode": error.code}
Expand Down Expand Up @@ -89,7 +89,7 @@ export class DbConnector implements IConnector {
const existingRecord = await this.pool(table).select().where(filters).first()
if (!_.isUndefined(existingRecord)) {
await this.pool.transaction(async (dbTransaction) => {
await this.submit_ingestion(_.get(values, 'ingestion_spec'), table)
await this.submit_ingestion(_.get(values, 'ingestion_spec'), table, _.get(fields, "type"))
await dbTransaction(table).where(filters).update(schemaMerger.mergeSchema(existingRecord, values)).on('query-error', (error: any) => {
this.log_error(OP_TYPES.UPSERT, error, table, values);
throw {...constants.FAILED_RECORD_UPDATE, "errCode": error.code}
Expand All @@ -100,7 +100,7 @@ export class DbConnector implements IConnector {
})
} else {
await this.pool.transaction(async (dbTransaction) => {
await this.submit_ingestion(_.get(values, 'ingestion_spec'), table)
await this.submit_ingestion(_.get(values, 'ingestion_spec'), table, _.get(fields, "type"))
await dbTransaction(table).insert(values).on('query-error', (error: any) => {
this.log_error(OP_TYPES.UPSERT, error, table, values);
throw {...constants.FAILED_RECORD_CREATE, "errCode": error.code}
Expand Down Expand Up @@ -144,8 +144,8 @@ export class DbConnector implements IConnector {
})
}

private async submit_ingestion(ingestion_spec: Record<string, any>, table: string) {
if (appConfig.table_names.datasources === table) {
private async submit_ingestion(ingestion_spec: Record<string, any>, table: string, storage_type: string) {
if (appConfig.table_names.datasources === table && storage_type === appConfig.datasource_storage_types.druid) {
return await wrapperService.submitIngestion(ingestion_spec)
.catch((error: any) => {
console.error(constants.INGESTION_FAILED_ON_SAVE)
Expand Down
4 changes: 3 additions & 1 deletion api-service/src/helpers/Datasources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export class Datasources {
private id: string
private dataset_id: string
private ingestion_spec: object
private type: string
private datasource: string
private datasource_ref: string
private retention_period: object
Expand All @@ -30,6 +31,7 @@ export class Datasources {
}
this.dataset_id = payload.dataset_id
this.ingestion_spec = payload.ingestion_spec
this.type = payload.type
this.datasource = payload.datasource
this.datasource_ref = payload.datasource_ref
this.retention_period = payload.retentionPeriod
Expand All @@ -44,7 +46,7 @@ export class Datasources {
this.metadata = payload.metadata
}
public getValues() {
return Object.assign(this.removeNullValues({ id: this.id, dataset_id: this.dataset_id, ingestion_spec: this.ingestion_spec, datasource: this.datasource, datasource_ref: this.datasource_ref, retention_period: this.retention_period, archival_policy: this.archival_policy, purge_policy: this.purge_policy, backup_config: this.backup_config, status: this.status, version: this.version, created_by: this.created_by, updated_by: this.updated_by, published_date: this.published_date, metadata: this.metadata }), { "updated_date": new Date })
return Object.assign(this.removeNullValues({ id: this.id, dataset_id: this.dataset_id, ingestion_spec: this.ingestion_spec, type: this.type, datasource: this.datasource, datasource_ref: this.datasource_ref, retention_period: this.retention_period, archival_policy: this.archival_policy, purge_policy: this.purge_policy, backup_config: this.backup_config, status: this.status, version: this.version, created_by: this.created_by, updated_by: this.updated_by, published_date: this.published_date, metadata: this.metadata }), { "updated_date": new Date })
}

public setValues() {
Expand Down
51 changes: 51 additions & 0 deletions api-service/src/helpers/LakehouseUtil.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { Trino, BasicAuth } from 'trino-client';
import _ from 'lodash';
import { config } from '../configs/Config';

// Shared Trino client for lakehouse queries, configured from the
// query_api.lakehouse section of Config (host, port, catalog, schema).
// NOTE(review): BasicAuth is constructed with a username only — presumably
// the cluster accepts password-less basic auth; confirm for production.
const trino: Trino = Trino.create({
    server: `${config.query_api.lakehouse.host}:${config.query_api.lakehouse.port}`,
    catalog: config.query_api.lakehouse.catalog,
    schema: config.query_api.lakehouse.schema,
    auth: new BasicAuth(config.query_api.lakehouse.default_user),
});


/**
 * Converts row-arrays returned by Trino into JSON objects keyed by column
 * name, dropping Hudi bookkeeping columns (names prefixed "_hoodie_").
 *
 * @param data - rows, each an array of cell values positionally aligned with columnData.
 * @param columnData - column names, one per cell position.
 * @returns one object per input row, with _hoodie_* columns removed.
 */
const getFormattedData = (data: any[], columnData: any[]) => {
    // Native String.prototype.startsWith replaces the lodash helper the
    // original used — column names are always strings here, so behavior
    // is identical and the block no longer needs lodash.
    return data.map((row: any[]) =>
        Object.fromEntries(
            row
                .map((value: any, idx: number): [string, any] => [columnData[idx], value])
                .filter(([colName]) => !String(colName).startsWith("_hoodie_"))
        )
    );
}


/**
 * Runs a SQL query against the lakehouse via the shared Trino client and
 * returns the result rows as JSON objects keyed by column name
 * (Hudi metadata columns removed by getFormattedData).
 *
 * @param query - SQL string to execute.
 * @returns formatted row objects.
 * @throws a { status: 400, message, code: "BAD_REQUEST" } object when Trino
 *         reports a query error (message has the "line x:y:" prefix stripped).
 */
export const executeLakehouseQuery = async (query: string) => {
    const iter = await trino.query(query);
    let queryResult: any = []
    let columnNames: any[] = []
    for await (const data of iter) {
        if (!_.isEmpty(data.error)) {
            throw {
                status: 400,
                // strip Trino's leading "line x:y: " location prefix
                message: data.error.message.replace(/line.*: /, ''),
                code: "BAD_REQUEST"
            }
        }
        // BUGFIX: capture column metadata while iterating. The original read
        // columns via iter.map(...).next() AFTER this loop, but the iterator
        // is already exhausted at that point, so the result was empty/undefined.
        if (_.isEmpty(columnNames) && !_.isEmpty(data.columns)) {
            columnNames = data.columns.map((column: any) => column.name);
        }
        queryResult = [ ...queryResult, ...(data?.data?.length ? data.data : []) ]
    }
    const formattedData = getFormattedData(queryResult, columnNames);
    return formattedData
}
12 changes: 12 additions & 0 deletions api-service/src/helpers/ValidationService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import Ajv from "ajv";
const validator = new Ajv({ strict: false });

/**
 * Validates a request payload against a JSON schema using the shared Ajv
 * instance.
 *
 * @param payload - request body to validate.
 * @param schema - JSON schema to validate against.
 * @returns { isValid, message }: message is "success" when valid, otherwise a
 *          short description of the first validation error (or a generic
 *          fallback when Ajv provides no usable detail).
 */
export const schemaValidation = (payload: Record<string, any>, schema: Record<string, any>): Record<string, any> => {
    const isValid = validator.validate(schema, payload)
    if (!isValid) {
        const error: any = validator.errors;
        // BUGFIX: the original expression
        //   error[0]?.schemaPath?.replace("/", "") + " " + error[0]?.message || "Invalid Request Body"
        // never reached the fallback: `+` binds tighter than `||`, and the
        // concatenation is always a truthy string (e.g. "undefined must ...").
        const schemaPath = error?.[0]?.schemaPath?.replace("/", "");
        const detail = error?.[0]?.message;
        const errorMessage = (schemaPath && detail) ? `${schemaPath} ${detail}` : "Invalid Request Body";
        return { isValid, message: errorMessage }
    }
    return { isValid, message: "success" }
}
7 changes: 6 additions & 1 deletion api-service/src/resources/Constants.json
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,10 @@
"code": "BAD_REQUEST"
},
"INGESTION_SUBMITTED": "ingestion spec has been submitted successfully",
"INGESTION_FAILED_ON_SAVE": "Failed to submit Ingestion Spec, record is not saved"
"INGESTION_FAILED_ON_SAVE": "Failed to submit Ingestion Spec, record is not saved",
"FAILED_EXECUTING_QUERY": {
"status": 500,
"message": "Something went wrong while executing the query. Please try again later.",
"code": "INTERNAL_SERVER_ERROR"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@
"metadata": {
"aggregated": false,
"granularity": "day"
}
},
"type": "druid"
}
Loading