
Commit

Merge branch 'develop' into update-dockerfile
ManojKrishnaChintaluri authored Jan 18, 2024
2 parents d6da054 + 89db459 commit b48e31a
Showing 12 changed files with 872 additions and 607 deletions.
160 changes: 109 additions & 51 deletions api-service/postman-collection/Obsrv API Service.postman_collection.json

Large diffs are not rendered by default.

96 changes: 51 additions & 45 deletions api-service/src/configs/RoutesConfig.ts
@@ -1,125 +1,131 @@
 export const routesConfig = {
     default: {
-        api_id: "obsrv.api",
+        api_id: "api",
         validation_schema: null,
     },
     query: {
         native_query: {
-            api_id: "obsrv.native.query",
+            api_id: "native.query",
             method: "post",
-            path: "/obsrv/v1/data/query",
+            path: "/data/v1/query",
             validation_schema: "QueryRequest.json",
         },
         native_query_with_params: {
-            api_id: "obsrv.native.query",
+            api_id: "native.query",
             method: "post",
-            path: "/obsrv/v1/data/query/:datasetId",
+            path: "/data/v1/query/:datasetId",
             validation_schema: "QueryRequest.json",
         },
         sql_query: {
-            api_id: "obsrv.sql.query",
+            api_id: "sql.query",
             method: "post",
-            path: "/obsrv/v1/data/sql-query",
+            path: "/data/v1/sql-query",
             validation_schema: "QueryRequest.json",
         },
         sql_query_with_params: {
-            api_id: "obsrv.sql.query",
+            api_id: "sql.query",
             method: "post",
-            path: "/obsrv/v1/data/sql-query/:datasetId",
+            path: "/data/v1/sql-query/:datasetId",
             validation_schema: "QueryRequest.json",
         },
     },
     config: {
         dataset: {
             save: {
-                api_id: "obsrv.config.dataset.create",
+                api_id: "config.dataset.create",
                 method: "post",
-                path: "/obsrv/v1/datasets/create",
+                path: "/datasets/v1/create",
                 validation_schema: "DatasetCreateReq.json",
             },
             read: {
-                api_id: "obsrv.config.dataset.read",
+                api_id: "config.dataset.read",
                 method: "get",
-                path: "/obsrv/v1/datasets/get/:datasetId",
+                path: "/datasets/v1/get/:datasetId",
                 validation_schema: null,
             },
             update: {
-                api_id: "obsrv.config.dataset.update",
+                api_id: "config.dataset.update",
                 method: "patch",
-                path: "/obsrv/v1/datasets/update",
+                path: "/datasets/v1/update",
                 validation_schema: "DatasetUpdateReq.json",
             },
             list: {
-                api_id: "obsrv.config.dataset.list",
+                api_id: "config.dataset.list",
                 method: "post",
-                path: "/obsrv/v1/datasets/list",
+                path: "/datasets/v1/list",
                 validation_schema: "DatasetListReq.json",
             },
         },
         datasource: {
             save: {
-                api_id: "obsrv.config.datasource.create",
+                api_id: "config.datasource.create",
                 method: "post",
-                path: "/obsrv/v1/datasources/create",
+                path: "/datasources/v1/create",
                 validation_schema: "DatasourceSaveReq.json",
             },
             read: {
-                api_id: "obsrv.config.datasource.read",
+                api_id: "config.datasource.read",
                 method: "get",
-                path: "/obsrv/v1/datasources/get/:datasourceId",
+                path: "/datasources/v1/get/:datasourceId",
                 validation_schema: null,
             },
             update: {
-                api_id: "obsrv.config.datasource.update",
+                api_id: "config.datasource.update",
                 method: "patch",
-                path: "/obsrv/v1/datasources/update",
+                path: "/datasources/v1/update",
                 validation_schema: "DatasourceUpdateReq.json",
             },
             list: {
-                api_id: "obsrv.config.datasource.list",
+                api_id: "config.datasource.list",
                 method: "post",
-                path: "/obsrv/v1/datasources/list",
+                path: "/datasources/v1/list",
                 validation_schema: "DatasetListReq.json",
             },
         },
         dataset_source_config: {
             save: {
-                api_id: "obsrv.config.dataset.source.config.create",
+                api_id: "config.dataset.source.config.create",
                 method: "post",
-                path: "/obsrv/v1/datasets/source/config/create",
+                path: "/datasets/v1/source/config/create",
                 validation_schema: "DatasetSourceConfigSaveReq.json",
             },
             read: {
-                api_id: "obsrv.config.dataset.source.config.read",
+                api_id: "config.dataset.source.config.read",
                 method: "get",
-                path: "/obsrv/v1/datasets/source/config/get/:datasetId",
+                path: "/datasets/v1/source/config/get/:datasetId",
                 validation_schema: null,
             },
             update: {
-                api_id: "obsrv.config.dataset.source.config.update",
+                api_id: "config.dataset.source.config.update",
                 method: "patch",
-                path: "/obsrv/v1/datasets/source/config/update",
+                path: "/datasets/v1/source/config/update",
                 validation_schema: "DatasetSourceConfigUpdateReq.json",
             },
             list: {
-                api_id: "obsrv.config.dataset.source.config.list",
+                api_id: "config.dataset.source.config.list",
                 method: "post",
-                path: "/obsrv/v1/datasets/source/config/list",
+                path: "/datasets/v1/source/config/list",
                 validation_schema: "DatasetListReq.json",
             },
         }

     },
     data_ingest: {
-        api_id: "obsrv.dataset.data.in",
+        api_id: "dataset.data.in",
         method: "post",
-        path: "/obsrv/v1/data/create/:datasetId",
+        path: "/data/v1/in/:datasetId",
         validation_schema: "DataIngestionReq.json",
     },
+    tenant_ingest: {
+        api_id: "dataset.data.in",
+        method: "post",
+        path: "/data/tenant/in/:datasetId",
+        validation_schema: "DataIngestionReq.json",
+    },
     exhaust: {
-        api_id: "obsrv.dataset.data.exhaust",
+        api_id: "dataset.data.exhaust",
         method: "get",
-        path: "/obsrv/v1/data/exhaust/:datasetId",
+        path: "/data/v1/exhaust/:datasetId",
         validation_schema: "DataExhaustReq.json"
     },
     prometheus: {
@@ -128,34 +134,34 @@ export const routesConfig = {
         validation_schema: null,
     },
     submit_ingestion: {
-        api_id: "obsrv.submit.ingestion",
+        api_id: "submit.ingestion",
         method: "post",
-        path: "/obsrv/v1/data/submit/ingestion",
+        path: "/data/v1/submit/ingestion",
         validation_schema: "SubmitIngestionReq.json"
     },
     query_wrapper: {
         sql_wrapper: {
-            api_id: "obsrv.query.wrapper.sql.query",
+            api_id: "query.wrapper.sql.query",
             method: "post",
-            path: "/obsrv/v1/sql",
+            path: "/v1/sql",
         },
         native_post: {
-            api_id: "obsrv.query.wrapper.native.post",
+            api_id: "query.wrapper.native.post",
             method: "post",
             path: /\/druid\/v2.*/,
         },
         native_get: {
-            api_id: "obsrv.query.wrapper.native.get",
+            api_id: "query.wrapper.native.get",
             method: "get",
             path: /\/druid\/v2.*/
         },
         native_delete: {
-            api_id: "obsrv.query.wrapper.native.delete",
+            api_id: "query.wrapper.native.delete",
             method: "delete",
             path: "/druid/v2/:queryId"
         },
         druid_status: {
-            api_id: "obsrv.query.wrapper.status",
+            api_id: "query.wrapper.status",
             method: "get",
             path: "/status"
         }
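Taken together, the RoutesConfig changes drop the obsrv. prefix from every api_id and move the public paths from /obsrv/v1/<resource>/... to resource-first /<resource>/v1/... (plus the new /data/tenant/in/:datasetId route). A minimal TypeScript sketch of a call against one renamed route; the host, port, and payload shape are illustrative assumptions, not taken from this commit:

// Hypothetical client call to the renamed native-query route (previously /obsrv/v1/data/query).
// Assumes a fetch-capable runtime; the query payload is illustrative only.
const res = await fetch("http://localhost:3000/data/v1/query", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
        context: { dataSource: "telemetry-events" }, // hypothetical dataset id
        query: { queryType: "timeseries" }           // hypothetical native query
    })
});
console.log(res.status, await res.json());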
21 changes: 21 additions & 0 deletions api-service/src/helpers/Datasets.ts
@@ -4,6 +4,8 @@ import { defaultConfig } from '../resources/schemas/DatasetConfigDefault'
 import { SchemaMerger } from '../generators/SchemaMerger'
 import { config } from '../configs/Config'
 import { DatasetStatus } from '../models/DatasetModels'
+import constants from "../resources/Constants.json";

 let schemaMerger = new SchemaMerger()
 export class Datasets {
     private id: string
@@ -47,10 +49,12 @@ export class Datasets {
     }

     public getValues() {
+        this.validateDenormConfig();
         return Object.assign(this.removeNullValues({ id: this.id, dataset_id: this.dataset_id, type: this.type, name: this.name, validation_config: this.validation_config, extraction_config: this.extraction_config, dedup_config: this.dedup_config, data_schema: this.data_schema, router_config: this.router_config, denorm_config: this.denorm_config, dataset_config: this.dataset_config, tags: this.tags, status: this.status, created_by: this.created_by, updated_by: this.updated_by, published_date: this.published_date }), { "updated_date": new Date })
     }

     public setValues() {
+        this.validateDenormConfig();
         return schemaMerger.mergeSchema(this.getDefaults(), this.getValues())
     }

@@ -69,4 +73,21 @@ export class Datasets {
             return {...defaultConfig.dataset}
         }
     }
+
+    private validateDenormConfig() {
+        if (this.denorm_config && _.has(this.denorm_config, 'denorm_fields')) {
+            let duplicatesExist = false;
+            let denormFields: any = _.get(this.denorm_config, 'denorm_fields', []);
+            denormFields = _.map(denormFields, (denormField: Record<string, string | number>) => _.get(denormField, 'denorm_out_field'));
+            denormFields.map(
+                (denormField: string | number) => {
+                    if(_.indexOf(denormFields, denormField) !== _.lastIndexOf(denormFields, denormField))
+                        duplicatesExist = true;
+                }
+            );
+            if(duplicatesExist) {
+                throw constants.DUPLICATE_DENORM_FIELD;
+            }
+        }
+    }
 }
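The new validateDenormConfig collects every denorm_out_field from denorm_config.denorm_fields and throws constants.DUPLICATE_DENORM_FIELD when the same output key appears more than once, comparing _.indexOf against _.lastIndexOf for each entry. The same check can be expressed more compactly; a standalone TypeScript sketch with an illustrative field list (not the code in this commit):

import * as _ from "lodash";

// Illustrative denorm_out_field values extracted from a denorm_config.
const denormFields = ["user_meta", "content_meta", "user_meta"];

// A list contains duplicates exactly when de-duplication shrinks it.
const duplicatesExist = _.uniq(denormFields).length !== denormFields.length;
console.log(duplicatesExist); // true: "user_meta" appears twice

The indexOf/lastIndexOf scan is quadratic where the uniq comparison is roughly linear, though for the handful of denorm fields a dataset typically carries the difference is immaterial.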
5 changes: 5 additions & 0 deletions api-service/src/resources/Constants.json
@@ -96,6 +96,11 @@
         "status": 404,
         "code": "NOT_FOUND"
     },
+    "DUPLICATE_DENORM_FIELD": {
+        "message": "Duplicate found for denorm output key",
+        "status": 400,
+        "code": "BAD_REQUEST"
+    },
     "INGESTION_SUBMITTED": "ingestion spec has been submitted successfully",
     "INGESTION_FAILED_ON_SAVE": "Failed to submit Ingestion Spec, record is not saved"
 }
1 change: 1 addition & 0 deletions api-service/src/routes/Router.ts
@@ -39,6 +39,7 @@ router.post([`${routesConfig.query.sql_query.path}`, `${routesConfig.query.sql_q

 /** Ingestor API */
 router.post(`${routesConfig.data_ingest.path}`, ResponseHandler.setApiId(routesConfig.data_ingest.api_id), telemetryAuditStart({ action: telemetryActions.ingestEvents, operationType: OperationType.CREATE }), onRequest({ entity: promEntities.data_in }), validationService.validateRequestBody, ingestorService.create);
+router.post(`${routesConfig.tenant_ingest.path}`, ResponseHandler.setApiId(routesConfig.tenant_ingest.api_id), telemetryAuditStart({ action: telemetryActions.ingestEvents, operationType: OperationType.CREATE }), onRequest({ entity: promEntities.data_in }), validationService.validateRequestBody, ingestorService.tenant);

 /** Dataset APIs */
 router.post(`${routesConfig.config.dataset.save.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.save.api_id), telemetryAuditStart({ action: telemetryActions.createDataset, operationType: OperationType.CREATE }), validationService.validateRequestBody, datasetService.save);
14 changes: 14 additions & 0 deletions api-service/src/services/IngestorService.ts
@@ -35,6 +35,20 @@ export class IngestorService {
             ResponseHandler.successResponse(req, res, { status: 200, data: { message: constants.DATASET.CREATED } });
         } catch (error: any) { this.errorHandler.handleError(req, res, next, error, false) }
     }
+
+    public tenant = async (req: Request, res: Response, next: NextFunction) => {
+        try {
+            let datasetId = this.getDatasetId(req);
+            const tenantId = _.get(req.headers, 'x-tenant-id', "default");
+            datasetId = `${tenantId}-${datasetId}`;
+            const validData = await this.validateData(req.body.data, datasetId);
+            req.body = { ...req.body.data, dataset: datasetId };
+            const topic = await this.getTopic(datasetId);
+            await this.kafkaConnector.execute(req, res, topic);
+            ResponseHandler.successResponse(req, res, { status: 200, data: { message: constants.DATASET.CREATED } });
+        } catch (error: any) { this.errorHandler.handleError(req, res, next, error, false) }
+    }
+
     public submitIngestion = async (req: Request, res: Response, next: NextFunction) => {
         try {
             await wrapperService.submitIngestion(req.body)
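The new tenant handler mirrors create but reads an x-tenant-id header (defaulting to "default") and prefixes it onto the dataset id before validation, topic lookup, and the Kafka push, so each tenant's events land in a tenant-scoped dataset. A hypothetical TypeScript call against the new route; only the path and header name come from this commit, while the host and event payload are assumptions:

// With this header, the handler resolves the dataset as "acme-telemetry-events".
await fetch("http://localhost:3000/data/tenant/in/telemetry-events", {
    method: "POST",
    headers: {
        "Content-Type": "application/json",
        "x-tenant-id": "acme" // tenant prefix applied server-side
    },
    body: JSON.stringify({ data: { event: { eid: "IMPRESSION" } } }) // illustrative payload
});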
42 changes: 40 additions & 2 deletions api-service/src/test/DatasetTestService.spec.ts
@@ -108,7 +108,7 @@ describe("Dataset create API", () => {
                 res.body.params.status.should.be.eq(constants.STATUS.FAILURE)
                 done();
             });
-    })
+    });
     it("should not insert record when given invalid schema", (done) => {
         chai
             .request(app)
@@ -123,7 +123,45 @@
                 res.body.params.status.should.be.eq(constants.STATUS.FAILURE)
                 done();
             });
-    })
+    });
+    it("should not insert the record when there's a duplicate denorm out field", (done) => {
+        chai.spy.on(dbConnector, "execute", () => {
+            return Promise.resolve([])
+        })
+        chai
+            .request(app)
+            .post(config.apiDatasetSaveEndPoint)
+            .send(TestDataset.DUPLICATE_DENORM_OUT_FIELD)
+            .end((err, res) => {
+                res.should.have.status(httpStatus.BAD_REQUEST);
+                res.body.should.be.a("object")
+                res.body.responseCode.should.be.eq(httpStatus["400_NAME"]);
+                res.body.should.have.property("result");
+                res.body.id.should.be.eq(routesConfig.config.dataset.save.api_id);
+                res.body.params.status.should.be.eq(constants.STATUS.FAILURE);
+                chai.spy.restore(dbConnector, "execute");
+                done();
+            });
+    });
+    it("should insert the record when there's no duplicate denorm out field", (done) => {
+        chai.spy.on(dbConnector, "execute", () => {
+            return Promise.resolve([])
+        })
+        chai
+            .request(app)
+            .post(config.apiDatasetSaveEndPoint)
+            .send(TestDataset.VALID_DENORM_OUT_FIELD)
+            .end((err, res) => {
+                res.should.have.status(httpStatus.OK);
+                res.body.should.be.a("object")
+                res.body.responseCode.should.be.eq(httpStatus["200_NAME"]);
+                res.body.should.have.property("result");
+                res.body.id.should.be.eq(routesConfig.config.dataset.save.api_id);
+                res.body.params.status.should.be.eq(constants.STATUS.SUCCESS);
+                chai.spy.restore(dbConnector, "execute");
+                done();
+            });
+    });
 })
 describe("Dataset update API", () => {
     beforeEach(() => {