Merge pull request #251 from Sanketika-Obsrv/1.1-RC

1.1-RC Changes

ravismula authored Sep 24, 2024
2 parents a161d04 + e5f0e07 commit 85eeb07

Showing 28 changed files with 530 additions and 276 deletions.
2 changes: 1 addition & 1 deletion api-service/src/app.ts

@@ -22,7 +22,7 @@ app.use("/v2/", v2Router);
 app.use("/", druidProxyRouter);
 app.use("/alerts/v1", alertsRouter);
 app.use("/", metricRouter);
-app.use("*", ResponseHandler.routeNotFound);
+app.use(/(.*)/, ResponseHandler.routeNotFound);
 app.use(obsrvErrorHandler);

 app.listen(config.api_port, () => {
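The catch-all change is likely driven by newer Express/path-to-regexp versions, which reject a bare "*" route path at startup ("Missing parameter name"); a regex path matches every URL without going through string-pattern parsing. A minimal sketch of the new behaviour, using a stand-in 404 handler (the response payload below is an illustrative assumption, not ResponseHandler's actual shape):

import express from "express";

const app = express();

// Stand-in for ResponseHandler.routeNotFound; payload shape is assumed.
const routeNotFound = (req: express.Request, res: express.Response) => {
    res.status(404).json({ error: "ROUTE_NOT_FOUND", path: req.originalUrl });
};

// app.use("*", routeNotFound);  // throws on newer path-to-regexp versions
app.use(/(.*)/, routeNotFound);  // a regex path matches every URL

app.listen(3000);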
16 changes: 13 additions & 3 deletions api-service/src/controllers/Alerts/Alerts.ts

@@ -13,6 +13,9 @@ const telemetryObject = { type: "alert", ver: "1.0.0" };
 const createAlertHandler = async (req: Request, res: Response, next: NextFunction) => {
     try {
         const alertPayload = getAlertPayload(req.body);
+        const userID = (req as any)?.userID;
+        _.set(alertPayload, "created_by", userID);
+        _.set(alertPayload, "updated_by", userID);
         const response = await Alert.create(alertPayload);
         updateTelemetryAuditEvent({ request: req, object: { id: response?.dataValues?.id, ...telemetryObject } });
         ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { id: response.dataValues.id } });
@@ -28,8 +31,11 @@ const createAlertHandler = async (req: Request, res: Response, next: NextFunction) => {
 const publishAlertHandler = async (req: Request, res: Response, next: NextFunction) => {
     try {
         const { alertId } = req.params;
-        const rulePayload: Record<string, any> | null = await getAlertRule(alertId);
-        if (!rulePayload) return next({ message: httpStatus[httpStatus.NOT_FOUND], statusCode: httpStatus.NOT_FOUND });
+        const ruleModel: Record<string, any> | null = await getAlertRule(alertId);
+        if (!ruleModel) return next({ message: httpStatus[httpStatus.NOT_FOUND], statusCode: httpStatus.NOT_FOUND });
+        const rulePayload = ruleModel.toJSON();
+        const userID = (req as any)?.userID;
+        _.set(rulePayload, "updated_by", userID);
         if (rulePayload.status == "live") {
             await deleteAlertRule(rulePayload, false);
         }
@@ -87,6 +93,8 @@ const deleteAlertHandler = async (req: Request, res: Response, next: NextFunction) => {
             return next({ message: httpStatus[httpStatus.NOT_FOUND], statusCode: httpStatus.NOT_FOUND });
         }
         const rulePayload = ruleModel.toJSON();
+        const userID = (req as any)?.userID || "SYSTEM";
+        _.set(rulePayload, "updated_by", userID);
         await deleteAlertRule(rulePayload, hardDelete === "true");
         updateTelemetryAuditEvent({ request: req, currentRecord: rulePayload, object: { id: alertId, ...telemetryObject } });
         ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { id: alertId } });
@@ -103,12 +111,14 @@ const updateAlertHandler = async (req: Request, res: Response, next: NextFunction) => {
         const ruleModel = await getAlertRule(alertId);
         if (!ruleModel) { return next({ message: httpStatus[httpStatus.NOT_FOUND], statusCode: httpStatus.NOT_FOUND }) }
         const rulePayload = ruleModel.toJSON();
+        const userID = (req as any)?.userID;
         if (rulePayload.status == "live") {
+            _.set(rulePayload, "updated_by", userID);
             await deleteAlertRule(rulePayload, false);
             await retireAlertSilence(alertId);
         }
         const updatedPayload = getAlertPayload({ ...req.body, manager: rulePayload?.manager });
-        await Alert.update({ ...updatedPayload, status: "draft" }, { where: { id: alertId } });
+        await Alert.update({ ...updatedPayload, status: "draft", updated_by: userID }, { where: { id: alertId } });
         updateTelemetryAuditEvent({ request: req, currentRecord: rulePayload, object: { id: alertId, ...telemetryObject } });
         ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { id: alertId } });
     } catch (error: any) {
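A pattern repeated throughout this PR: handlers read the caller's identity from (req as any)?.userID and stamp it into created_by/updated_by before persisting, with the cast needed because Express's Request type has no userID field. Presumably an upstream auth middleware attaches it; a minimal sketch of such a middleware, where the middleware name, the x-user-id header, and the "SYSTEM" fallback are illustrative assumptions rather than code from this repository:

import { NextFunction, Request, Response } from "express";

// Hypothetical middleware: resolves the caller and attaches it to the request
// so downstream handlers can read (req as any)?.userID.
const userIdentity = (req: Request, _res: Response, next: NextFunction) => {
    (req as any).userID = req.header("x-user-id") || "SYSTEM"; // header name assumed
    next();
};

// app.use(userIdentity); // would need to run before the alert/dataset routers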
7 changes: 6 additions & 1 deletion api-service/src/controllers/Alerts/Silence.ts

@@ -20,12 +20,15 @@ const createHandler = async (request: Request, response: Response, next: NextFunction) => {

     const start_date = new Date(startDate);
     const end_date = new Date(endDate);
+    const userID = (request as any)?.userID;
     const silenceBody = {
         id: grafanaResponse.silenceId,
         manager: grafanaResponse.manager,
         alert_id: alertId,
         start_time: start_date,
         end_time: end_date,
+        created_by : userID,
+        updated_by : userID,
     }
     const sileneResponse = await Silence.create(silenceBody);
     updateTelemetryAuditEvent({ request, object: { id: sileneResponse?.dataValues?.id, ...telemetryObject } });
@@ -78,10 +81,12 @@ const updateHandler = async (request: Request, response: Response, next: NextFunction) => {
     await updateSilence(silenceObject, payload);
     const updatedStartTime = new Date(payload.startTime);
     const updatedEndTime = new Date(payload.endTime);
+    const userID = (request as any)?.userID;
     const updatedSilence = {
         ...silenceObject,
         start_time: updatedStartTime,
-        end_time: updatedEndTime
+        end_time: updatedEndTime,
+        updated_by: userID,
     }
     const silenceResponse = await Silence.update(updatedSilence, { where: { id } })
     ResponseHandler.successResponse(request, response, { status: httpStatus.OK, data: { silenceResponse } })
@@ -42,6 +42,9 @@ export const createQueryTemplate = async (req: Request, res: Response) => {
     }

     const data = transformRequest(requestBody, templateName);
+    const userID = (req as any)?.userID;
+    _.set(data, "created_by", userID);
+    _.set(data, "updated_by", userID);
     await QueryTemplate.create(data)
     logger.info({ apiId, msgid, resmsgid, requestBody: req?.body, message: `Query template created successfully` })
     return ResponseHandler.successResponse(req, res, { status: 200, data: { template_id: templateId, template_name: templateName, message: `The query template has been saved successfully` } });
35 changes: 27 additions & 8 deletions api-service/src/controllers/DataOut/QueryValidator.ts

@@ -4,18 +4,19 @@ import * as _ from "lodash";
 import moment from "moment";
 import { getDatasourceList } from "../../services/DatasourceService";
 import logger from "../../logger";
-import { getDatasourceListFromDruid } from "../../connections/druidConnection";
+import { druidHttpService, getDatasourceListFromDruid } from "../../connections/druidConnection";
 import { apiId } from "./DataOutController";
 import { ErrorObject } from "../../types/ResponseModel";
 import { Parser } from "node-sql-parser";
+import { obsrvError } from "../../types/ObsrvError";
 const parser = new Parser();

 const momentFormat = "YYYY-MM-DD HH:MM:SS";
 let dataset_id: string;
 let requestBody: any;
 let msgid: string;
 const errCode = {
-    notFound: "DATA_OUT_SOURCE_NOT_FOUND",
+    notFound: "DATASOURCE_NOT_FOUND",
     invalidDateRange: "DATA_OUT_INVALID_DATE_RANGE"
 }

@@ -155,23 +156,41 @@ const validateQueryRules = (queryPayload: any, limits: any) => {
         : { message: "Invalid date range! the date range cannot be a null value", statusCode: 400, errCode: "BAD_REQUEST", code: errCode.invalidDateRange };
 };

-const getDataSourceRef = async (datasetId: string, granularity?: string) => {
+const getDataSourceRef = async (datasetId: string, requestGranularity?: string) => {
     const dataSources = await getDatasourceList(datasetId)
     if (_.isEmpty(dataSources)) {
         logger.error({ apiId, requestBody, msgid, dataset_id, message: `Datasource ${datasetId} not available in datasource live table`, code: errCode.notFound })
         throw { message: `Datasource ${datasetId} not available for querying`, statusCode: 404, errCode: "NOT_FOUND", code: errCode.notFound } as ErrorObject;
     }
-    const record = dataSources.filter((record: any) => {
-        const aggregatedRecord = _.get(record, "dataValues.metadata.aggregated")
-        if (granularity)
-            return aggregatedRecord && _.get(record, "dataValues.metadata.granularity") === granularity;
+    const record = dataSources.find((record: any) => {
+        const metadata = _.get(record, "dataValues.metadata", {});
+        const { aggregated, granularity } = metadata;
+        if (!aggregated) {
+            return true;
+        }
+        return aggregated && requestGranularity ? granularity === requestGranularity : false;
     });
-    return record[0]?.dataValues?.datasource_ref
+    return _.get(record, ["dataValues", "datasource_ref"])
 }

+const checkSupervisorAvailability = async (datasourceRef: string) => {
+    const { data } = await druidHttpService.get("/druid/coordinator/v1/loadstatus");
+    const datasourceAvailability = _.get(data, datasourceRef)
+    if (_.isUndefined(datasourceAvailability)) {
+        throw obsrvError("", "DATASOURCE_NOT_AVAILABLE", "Datasource not available for querying", "NOT_FOUND", 404)
+    }
+    if (datasourceAvailability !== 100) {
+        throw obsrvError("", "DATASOURCE_NOT_FULLY_AVAILABLE", "Datasource not fully available for querying", "RANGE_NOT_SATISFIABLE", 416)
+    }
+}
+
 const setDatasourceRef = async (datasetId: string, payload: any): Promise<any> => {
     const granularity = _.get(payload, "context.aggregationLevel")
     const datasourceRef = await getDataSourceRef(datasetId, granularity);
+    if (!datasourceRef) {
+        throw obsrvError("", "DATASOURCE_NOT_FOUND", "Datasource not found to query", "NOT_FOUND", 404)
+    }
+    await checkSupervisorAvailability(datasourceRef)
     const existingDatasources = await getDatasourceListFromDruid();

     if (!_.includes(existingDatasources.data, datasourceRef)) {
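The new checkSupervisorAvailability guard leans on Druid's coordinator API: GET /druid/coordinator/v1/loadstatus reports, per datasource, the percentage of segments that are loaded and queryable. An undefined entry therefore means the datasource is unknown to the cluster (404), and anything below 100 means it is still loading (416). A sketch of the response shape the guard expects, with made-up datasource names:

// Shape of GET /druid/coordinator/v1/loadstatus; keys are datasource names,
// values are the percentage of segments loaded. Names below are illustrative.
const loadStatus: Record<string, number> = {
    "telemetry-events_DAY": 100,  // fully available: queries proceed
    "summary-events_DAY": 42.5    // partially loaded: rejected with HTTP 416
};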
3 changes: 3 additions & 0 deletions api-service/src/controllers/DatasetCopy/DatasetCopy.ts

@@ -40,6 +40,9 @@ const datasetCopy = async (req: Request, res: Response) => {
     validateRequest(req);
     const newDatasetId = _.get(req, "body.request.destination.datasetId");
     const dataset = await fetchDataset(req);
+    const userID = (req as any)?.userID;
+    _.set(dataset, "created_by", userID);
+    _.set(dataset, "updated_by", userID);
     updateRecords(dataset, newDatasetId)
     const response = await datasetService.createDraftDataset(dataset).catch(err => {
         if (err?.name === "SequelizeUniqueConstraintError") {
3 changes: 3 additions & 0 deletions api-service/src/controllers/DatasetCreate/DatasetCreate.ts

@@ -34,6 +34,9 @@ const datasetCreate = async (req: Request, res: Response) => {

     await validateRequest(req)
     const draftDataset = getDraftDataset(req.body.request)
+    const userID = (req as any)?.userID;
+    _.set(draftDataset, "created_by", userID);
+    _.set(draftDataset, "updated_by", userID);
     const dataset = await datasetService.createDraftDataset(draftDataset);
     ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: dataset });
 }
8 changes: 6 additions & 2 deletions api-service/src/controllers/DatasetImport/DatasetImport.ts

@@ -15,18 +15,22 @@ const datasetImport = async (req: Request, res: Response) => {
         const migratedConfigs = migrateExportedDatasetV1(requestBody)
         datasetPayload = migratedConfigs;
     }
+    const userID = (req as any)?.userID;
+    _.set(datasetPayload, "created_by", userID);
+    _.set(datasetPayload, "updated_by", userID);
     const { updatedDataset, ignoredFields } = await datasetImportValidation({ ...requestBody, "request": datasetPayload })
     const { successMsg, partialIgnored } = getResponseData(ignoredFields)

-    const dataset = await importDataset(updatedDataset, overwrite);
+    const dataset = await importDataset(updatedDataset, overwrite, userID);
     ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: successMsg, data: dataset, ...(!_.isEmpty(partialIgnored) && { ignoredFields: partialIgnored }) } });
 }

-const importDataset = async (dataset: Record<string, any>, overwrite: string | any) => {
+const importDataset = async (dataset: Record<string, any>, overwrite: string | any, userID : string) => {
     const dataset_id = _.get(dataset,"dataset_id")
     const response = await datasetService.createDraftDataset(dataset).catch(err => { return err })
     if (response?.name === "SequelizeUniqueConstraintError") {
         if (overwrite === "true") {
+            _.set(dataset, "updated_by", userID);
             const overwriteRes = await datasetService.updateDraftDataset(dataset).catch(()=>{
                 throw obsrvError(dataset_id, "DATASET_IMPORT_FAILURE", `Failed to import dataset: ${dataset_id} as overwrite failed`, "INTERNAL_SERVER_ERROR", 500);
             })
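In importDataset, the overwrite flag is compared against the string "true", which suggests it arrives as a query parameter (Express query values are strings, not booleans). The branch turns a failed create into an update: a unique-constraint error means a draft already exists, and only updated_by is stamped there since an overwrite is an update rather than a fresh create. A condensed sketch of that pattern, with the service functions declared as hypothetical stand-ins:

declare const createDraft: (dataset: Record<string, any>) => Promise<any>;
declare const updateDraft: (dataset: Record<string, any>) => Promise<any>;

// Create first; fall back to update when the draft already exists.
const upsertDraft = async (dataset: Record<string, any>, overwrite: string, userID: string) => {
    const result = await createDraft(dataset).catch(err => err); // capture, don't throw
    if (result?.name === "SequelizeUniqueConstraintError" && overwrite === "true") {
        dataset.updated_by = userID; // overwrite only updates, so created_by is untouched
        return updateDraft(dataset);
    }
    return result;
};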
9 changes: 5 additions & 4 deletions api-service/src/controllers/DatasetRead/DatasetRead.ts

@@ -30,8 +30,9 @@ const datasetRead = async (req: Request, res: Response) => {
     validateRequest(req);
     const { dataset_id } = req.params;
     const { fields, mode } = req.query;
+    const userID = (req as any)?.userID;
     const attributes = !fields ? defaultFields : _.split(<string>fields, ",");
-    const dataset = (mode == "edit") ? await readDraftDataset(dataset_id, attributes) : await readDataset(dataset_id, attributes)
+    const dataset = (mode == "edit") ? await readDraftDataset(dataset_id, attributes, userID) : await readDataset(dataset_id, attributes)
     if (!dataset) {
         throw obsrvError(dataset_id, "DATASET_NOT_FOUND", `Dataset with the given dataset_id:${dataset_id} not found`, "NOT_FOUND", 404);
     }
@@ -41,19 +42,19 @@ const datasetRead = async (req: Request, res: Response) => {
     ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: dataset });
 }

-const readDraftDataset = async (datasetId: string, attributes: string[]): Promise<any> => {
+const readDraftDataset = async (datasetId: string, attributes: string[], userID: string): Promise<any> => {

     const attrs = _.union(attributes, ["dataset_config", "api_version", "type", "id"])
     const draftDataset = await datasetService.getDraftDataset(datasetId, attrs);
     if (draftDataset) { // Contains a draft
         const apiVersion = _.get(draftDataset, ["api_version"]);
-        const dataset: any = (apiVersion === "v2") ? draftDataset : await datasetService.migrateDraftDataset(datasetId, draftDataset)
+        const dataset: any = (apiVersion === "v2") ? draftDataset : await datasetService.migrateDraftDataset(datasetId, draftDataset, userID)
         return _.pick(dataset, attributes);
     }

     const liveDataset = await datasetService.getDataset(datasetId, undefined, true);
     if (liveDataset) {
-        const dataset = await datasetService.createDraftDatasetFromLive(liveDataset)
+        const dataset = await datasetService.createDraftDatasetFromLive(liveDataset, userID)
         return _.pick(dataset, attributes);
     }
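The edit-mode read is effectively read-or-create: if no draft exists, one is migrated from a v1 draft or cloned from the live dataset, and the threaded userID ensures a lazily created draft is attributed to the requesting user rather than a system default. A hypothetical client call, where the route path, port, and header are assumptions for illustration:

const readDraftForEdit = async (datasetId: string) => {
    const res = await fetch(`http://localhost:3000/v2/datasets/read/${datasetId}?mode=edit`, {
        headers: { "x-user-id": "user-42" } // assumed header; see the middleware sketch above
    });
    return res.json(); // any draft created as a side effect is attributed to user-42
};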
@@ -18,11 +18,9 @@ const allowedTransitions: Record<string, any> = {
     Delete: [DatasetStatus.Draft, DatasetStatus.ReadyToPublish],
     ReadyToPublish: [DatasetStatus.Draft],
     Live: [DatasetStatus.ReadyToPublish],
-    Retire: [DatasetStatus.Live],
-    Archive: [DatasetStatus.Retired],
-    Purge: [DatasetStatus.Archived]
+    Retire: [DatasetStatus.Live]
 }
-const liveDatasetActions = ["Retire", "Archive", "Purge"]
+const liveDatasetActions = ["Retire"]

 const validateRequest = (req: Request, datasetId: any) => {
     const isRequestValid: Record<string, any> = schemaValidation(req.body, StatusTransitionSchema)
@@ -54,28 +52,25 @@ const datasetStatusTransition = async (req: Request, res: Response) => {
     const { dataset_id, status } = _.get(req.body, "request");
     validateRequest(req, dataset_id);

-    const dataset: Record<string, any> = (_.includes(liveDatasetActions, status)) ? await datasetService.getDataset(dataset_id, ["id", "status", "type", "api_version"], true) : await datasetService.getDraftDataset(dataset_id, ["id", "dataset_id", "status", "type", "api_version"])
+    const dataset: Record<string, any> = (_.includes(liveDatasetActions, status)) ? await datasetService.getDataset(dataset_id, ["id", "status", "type", "api_version", "name"], true) : await datasetService.getDraftDataset(dataset_id, ["id", "dataset_id", "status", "type", "api_version"])
+    const userID = (req as any)?.userID;
     validateDataset(dataset, dataset_id, status);

     switch (status) {
         case "Delete":
             await deleteDataset(dataset);
             break;
         case "ReadyToPublish":
-            await readyForPublish(dataset);
+            await readyForPublish(dataset, userID);
             break;
         case "Live":
-            await publishDataset(dataset);
+            await publishDataset(dataset, userID);
             break;
         case "Retire":
-            await retireDataset(dataset);
-            break;
-        case "Archive":
-            await archiveDataset(dataset);
-            break;
-        case "Purge":
-            await purgeDataset(dataset);
+            await retireDataset(dataset, userID);
             break;
         default:
             throw obsrvError(dataset.id, "UNKNOWN_STATUS_TRANSITION", "Unknown status transition requested", "BAD_REQUEST", 400)
     }

     ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: `Dataset status transition to ${status} successful`, dataset_id } });
@@ -90,7 +85,7 @@ const deleteDataset = async (dataset: Record<string, any>) => {
 }


-const readyForPublish = async (dataset: Record<string, any>) => {
+const readyForPublish = async (dataset: Record<string, any>, updated_by: any) => {

     const draftDataset: any = await datasetService.getDraftDataset(dataset.dataset_id)
     let defaultConfigs: any = _.cloneDeep(defaultDatasetConfig)
@@ -99,7 +94,14 @@ const readyForPublish = async (dataset: Record<string, any>) => {
     if (draftDataset?.type === "master") {
         defaultConfigs = _.omit(defaultConfigs, "dataset_config.keys_config.data_key");
     }
-    _.mergeWith(draftDataset, defaultConfigs, draftDataset, (objValue, srcValue) => {
+    _.set(draftDataset, "updated_by", updated_by);
+    _.mergeWith(draftDataset, defaultConfigs, draftDataset, (objValue, srcValue ,key) => {
+        if (key === "created_by"|| key === "updated_by") {
+            if (objValue !== "SYSTEM") {
+                return objValue;
+            }
+            return srcValue;
+        }
         if (_.isBoolean(objValue) && _.isBoolean(srcValue)) {
             return objValue;
         }
@@ -126,10 +128,11 @@ const readyForPublish = async (dataset: Record<string, any>) => {
 *
 * @param dataset
 */
-const publishDataset = async (dataset: Record<string, any>) => {
+const publishDataset = async (dataset: Record<string, any>, userID: any) => {

     const draftDataset: Record<string, any> = await datasetService.getDraftDataset(dataset.dataset_id) as unknown as Record<string, any>
-
+    _.set(draftDataset, ["created_by"], userID);
+    _.set(draftDataset, ["updated_by"], userID);
     await validateAndUpdateDenormConfig(draftDataset);
     await updateMasterDataConfig(draftDataset)
     await datasetService.publishDataset(draftDataset)
@@ -208,10 +211,10 @@ const updateMasterDataConfig = async (draftDataset: Record<string, any>) => {
     }
 }

-const retireDataset = async (dataset: Record<string, any>) => {
+const retireDataset = async (dataset: Record<string, any>, updated_by: any) => {

     await canRetireIfMasterDataset(dataset);
-    await datasetService.retireDataset(dataset);
+    await datasetService.retireDataset(dataset, updated_by);
     await restartPipeline(dataset);
 }

@@ -239,14 +242,4 @@ export const restartPipeline = async (dataset: Record<string, any>) => {
     return executeCommand(dataset.id, "RESTART_PIPELINE")
 }

-const archiveDataset = async (dataset: Record<string, any>) => {
-
-    throw obsrvError(dataset.id, "ARCHIVE_NOT_IMPLEMENTED", "Archive functionality is not implemented", "NOT_IMPLEMENTED", 501)
-}
-
-const purgeDataset = async (dataset: Record<string, any>) => {
-
-    throw obsrvError(dataset.id, "PURGE_NOT_IMPLEMENTED", "Purge functionality is not implemented", "NOT_IMPLEMENTED", 501)
-}
-
 export default datasetStatusTransition;
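A subtlety in the readyForPublish change: _.mergeWith receives draftDataset as both the destination and the final source, and the new customizer keeps an existing created_by/updated_by unless it is the "SYSTEM" placeholder, so defaults can never clobber real user attribution. A reduced illustration of just the audit-field rule, with made-up values:

import * as _ from "lodash";

// Mirrors only the audit-field branch of the customizer above.
const customizer = (objValue: any, srcValue: any, key: string) => {
    if (key === "created_by" || key === "updated_by") {
        return objValue !== "SYSTEM" ? objValue : srcValue; // keep real users, replace the placeholder
    }
    return undefined; // fall through to lodash's default merge behaviour
};

const draft = { created_by: "user-42", updated_by: "SYSTEM" };
const defaults = { created_by: "SYSTEM", updated_by: "admin-1" };
_.mergeWith(draft, defaults, customizer);
// draft => { created_by: "user-42", updated_by: "admin-1" }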
2 changes: 2 additions & 0 deletions api-service/src/controllers/DatasetUpdate/DatasetUpdate.ts

@@ -59,6 +59,8 @@ const datasetUpdate = async (req: Request, res: Response) => {
     validateDataset(datasetModel, req)

     const draftDataset = mergeDraftDataset(datasetModel, datasetReq);
+    const userID = (req as any)?.userID;
+    _.set(draftDataset, "updated_by", userID )
     const response = await datasetService.updateDraftDataset(draftDataset);
     ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: response });
 }