Skip to content

Commit

Permalink
Sanketika-Obsrv/issue-tracker#234: transaction management removed
Browse files Browse the repository at this point in the history
  • Loading branch information
JeraldJF committed Jun 11, 2024
1 parent e6c250e commit 387c1ec
Show file tree
Hide file tree
Showing 16 changed files with 34 additions and 396 deletions.
11 changes: 4 additions & 7 deletions api-service/src/v2/controllers/DatasetCreate/DatasetCreate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { ResponseHandler } from "../../helpers/ResponseHandler";
import httpStatus from "http-status";
import { defaultDatasetConfig, defaultMasterConfig } from "../../configs/DatasetConfigDefault";
import { DatasetType } from "../../types/DatasetModels";
import { query, sequelize } from "../../connections/databaseConnection";
import { query } from "../../connections/databaseConnection";
import { ErrorObject } from "../../types/ResponseModel";
import { DatasourceDraft } from "../../models/DatasourceDraft";
import { DatasetTransformationsDraft } from "../../models/TransformationDraft";
Expand All @@ -20,7 +20,6 @@ const datasetCreate = async (req: Request, res: Response) => {
const requestBody = req.body
const msgid = _.get(req, ["body", "params", "msgid"]);
const resmsgid = _.get(res, "resmsgid");
const transact = await sequelize.transaction()
try {
const datasetId = _.get(req, ["body", "request", "dataset_id"])
setReqDatasetId(req, datasetId)
Expand Down Expand Up @@ -66,25 +65,23 @@ const datasetCreate = async (req: Request, res: Response) => {
const datasetPayload: any = await getDefaultValue(datasetBody);
const data = { ...datasetPayload, version_key: Date.now().toString() }

const response = await DatasetDraft.create(data, { transaction: transact })
const response = await DatasetDraft.create(data)

const { dataset_config, denorm_config, transformation_config, data_schema, id, dataset_id } = data
const datasourcePayload = await generateDataSource({ indexCol: _.get(dataset_config, ["timestamp_key"]), data_schema, id, dataset_id, denorm_config, transformation_config, action:"create" })
await DatasourceDraft.create(datasourcePayload, { transaction: transact })
await DatasourceDraft.create(datasourcePayload)
logger.info({ apiId, message: `Datasource created successsfully for the dataset:${id}` })

const transformationConfig: any = getTransformationConfig({ transformationPayload: _.get(datasetBody, "transformations_config"), datasetId: _.get(datasetPayload, "id") })
if (!_.isEmpty(transformationConfig)) {
await DatasetTransformationsDraft.bulkCreate(transformationConfig), { transaction: transact };
await DatasetTransformationsDraft.bulkCreate(transformationConfig);
logger.info({ apiId, message: `Dataset transformations records created successsfully for dataset:${id}` })
}

await transact.commit()
const responseData = { id: _.get(response, ["dataValues", "id"]) || "", version_key: data.version_key }
logger.info({ apiId, msgid, requestBody, resmsgid, message: `Dataset Created Successfully with id:${_.get(response, ["dataValues", "id"])}`, response: responseData })
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: responseData });
} catch (error: any) {
await transact.rollback()
const code = _.get(error, "code") || "DATASET_CREATION_FAILURE"
logger.error({ ...error, apiId, code, msgid, requestBody, resmsgid })
let errorMessage = error;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import { schemaValidation } from "../../services/ValidationService";
import StatusTransitionSchema from "./RequestValidationSchema.json";
import ReadyToPublishSchema from "./ReadyToPublishSchema.json"
import httpStatus from "http-status";
import { sequelize } from "../../connections/databaseConnection";
import { DatasetTransformationsDraft } from "../../models/TransformationDraft";
import { DatasourceDraft } from "../../models/DatasourceDraft";
import { DatasetSourceConfigDraft } from "../../models/DatasetSourceConfigDraft";
Expand Down Expand Up @@ -42,8 +41,6 @@ const datasetStatusTransition = async (req: Request, res: Response) => {
const requestBody = req.body
const msgid = _.get(req, ["body", "params", "msgid"]);
const resmsgid = _.get(res, "resmsgid");
try {
const transact = await sequelize.transaction()
try {
const { dataset_id, status } = _.get(requestBody, "request");
setReqDatasetId(req, dataset_id)
Expand Down Expand Up @@ -88,13 +85,11 @@ const datasetStatusTransition = async (req: Request, res: Response) => {
}

const transitionCommands = _.get(statusTransitionCommands, status)
await executeTransition({ transitionCommands, dataset: datasetRecord, transact })
await executeTransition({ transitionCommands, dataset: datasetRecord })

await transact.commit();
logger.info({ apiId, msgid, requestBody, resmsgid, message: `Dataset status transition to ${status} successful with id:${dataset_id}` })
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: `Dataset status transition to ${status} successful`, dataset_id } });
} catch (error: any) {
await transact.rollback();
const code = _.get(error, "code") || errorCode
logger.error(error, apiId, msgid, code, requestBody, resmsgid)
let errorMessage = error;
Expand All @@ -104,11 +99,6 @@ const datasetStatusTransition = async (req: Request, res: Response) => {
}
ResponseHandler.errorResponse(errorMessage, req, res);
}
} catch (error: any) {
const errMessage = error.message
logger.error({ message: errMessage, msgid, resmsgid, apiId })
ResponseHandler.errorResponse(errMessage, req, res);
}
}

const fetchDataset = async (configs: Record<string, any>) => {
Expand All @@ -125,10 +115,10 @@ const fetchDataset = async (configs: Record<string, any>) => {
}

const executeTransition = async (configs: Record<string, any>) => {
const { transitionCommands, dataset, transact } = configs
const { transitionCommands, dataset } = configs
const transitionPromises = _.map(transitionCommands, async command => {
const commandWorkflow = _.get(commandExecutors, command)
return commandWorkflow({ dataset, transact })
return commandWorkflow({ dataset })
})
await Promise.all(transitionPromises)
}
Expand All @@ -150,17 +140,17 @@ const validateDataset = async (configs: Record<string, any>) => {

//DELETE_DRAFT_DATASETS
const deleteDataset = async (configs: Record<string, any>) => {
const { dataset, transact } = configs
const { dataset } = configs
const { id } = dataset
await deleteDraftRecords({ dataset_id: id, transact })
await deleteDraftRecords({ dataset_id: id })
}

const deleteDraftRecords = async (config: Record<string, any>) => {
const { dataset_id, transact } = config;

Check failure on line 149 in api-service/src/v2/controllers/DatasetStatusTransition/DatasetStatusTransition.ts

View workflow job for this annotation

GitHub Actions / test-cases

'transact' is assigned a value but never used
await DatasetTransformationsDraft.destroy({ where: { dataset_id }, transaction: transact })
await DatasetSourceConfigDraft.destroy({ where: { dataset_id }, transaction: transact })
await DatasourceDraft.destroy({ where: { dataset_id }, transaction: transact })
await DatasetDraft.destroy({ where: { id: dataset_id }, transaction: transact })
await DatasetTransformationsDraft.destroy({ where: { dataset_id } })
await DatasetSourceConfigDraft.destroy({ where: { dataset_id } })
await DatasourceDraft.destroy({ where: { dataset_id } })
await DatasetDraft.destroy({ where: { id: dataset_id } })
}

//PUBLISH_DATASET
Expand Down Expand Up @@ -195,12 +185,12 @@ const checkDatasetDenorm = async (payload: Record<string, any>) => {

//SET_DATASET_TO_RETIRE
const setDatasetRetired = async (config: Record<string, any>) => {
const { dataset, transact } = config;
const { dataset } = config;
const { dataset_id } = dataset
await Dataset.update({ status: DatasetStatus.Retired }, { where: { dataset_id }, transaction: transact })
await DatasetSourceConfig.update({ status: DatasetStatus.Retired }, { where: { dataset_id }, transaction: transact })
await Datasource.update({ status: DatasetStatus.Retired }, { where: { dataset_id }, transaction: transact })
await DatasetTransformations.update({ status: DatasetStatus.Retired }, { where: { dataset_id }, transaction: transact })
await Dataset.update({ status: DatasetStatus.Retired }, { where: { dataset_id } })
await DatasetSourceConfig.update({ status: DatasetStatus.Retired }, { where: { dataset_id } })
await Datasource.update({ status: DatasetStatus.Retired }, { where: { dataset_id } })
await DatasetTransformations.update({ status: DatasetStatus.Retired }, { where: { dataset_id } })
}

//DELETE_SUPERVISORS
Expand Down
18 changes: 7 additions & 11 deletions api-service/src/v2/controllers/DatasetUpdate/DatasetUpdate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import logger from "../../logger";
import { defaultDatasetConfig } from "../../configs/DatasetConfigDefault";
import { DatasetTransformationsDraft } from "../../models/TransformationDraft";
import { generateDataSource, getDraftTransformations, getDuplicateConfigs, setReqDatasetId } from "../../services/DatasetService";
import { sequelize } from "../../connections/databaseConnection";
import { DatasourceDraft } from "../../models/DatasourceDraft";
import { ingestionConfig } from "../../configs/IngestionConfig";
import { getDefaultValue } from "../DatasetCreate/DatasetCreate";
Expand All @@ -24,7 +23,6 @@ const datasetUpdate = async (req: Request, res: Response) => {
const requestBody = req.body
const msgid = _.get(req, ["body", "params", "msgid"]);
const resmsgid = _.get(res, "resmsgid");
const transact = await sequelize.transaction();
try {
const datasetId = _.get(req, ["body", "request", "dataset_id"])
setReqDatasetId(req, datasetId)
Expand Down Expand Up @@ -104,24 +102,22 @@ const datasetUpdate = async (req: Request, res: Response) => {
const updatedPayload = await getDefaultValue(datasetPayload)
logger.debug({ datasetPayload })
const transformationConfigs = _.get(datasetBody, "transformation_config")
await manageTransformations(transformationConfigs, dataset_id, transact);
await manageTransformations(transformationConfigs, dataset_id);

const { data_schema } = datasetBody
if (data_schema) {
const { transformation_config } = datasetBody
const { dataset_config, id, dataset_id, denorm_config } = updatedPayload
const datasourcePayload = await generateDataSource({ indexCol: _.get(dataset_config, ["timestamp_key"]), transformation_config, denorm_config, data_schema, id, dataset_id, action: "edit" })
await DatasourceDraft.update(datasourcePayload, { where: { id: _.get(datasourcePayload, "id") }, transaction: transact })
await DatasourceDraft.update(datasourcePayload, { where: { id: _.get(datasourcePayload, "id") }})
}

await DatasetDraft.update(updatedPayload, { where: { id: dataset_id }, transaction: transact })
await DatasetDraft.update(updatedPayload, { where: { id: dataset_id }})

await transact.commit();
const responsedata = { message: "Dataset is updated successfully", id: dataset_id, version_key: _.get(datasetPayload, "version_key") }
logger.info({ apiId, msgid, requestBody, resmsgid, message: `Dataset updated successfully with id:${dataset_id}`, response: responsedata })
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: responsedata });
} catch (error: any) {
await transact.rollback();
const code = _.get(error, "code") || errorCode
logger.error({ ...error, apiId, code, msgid, requestBody, resmsgid })
let errorMessage = error;
Expand All @@ -133,22 +129,22 @@ const datasetUpdate = async (req: Request, res: Response) => {
}
}

const manageTransformations = async (transformations: Record<string, any>, datasetId: string, transact: any) => {
const manageTransformations = async (transformations: Record<string, any>, datasetId: string) => {
if (transformations) {
const transformationConfigs = await getTransformationConfigs(transformations, datasetId);
if (transformationConfigs) {
const { addTransformation, updateTransformation, deleteTransformation } = transformationConfigs;

if (!_.isEmpty(addTransformation)) {
await DatasetTransformationsDraft.bulkCreate(addTransformation, { transaction: transact });
await DatasetTransformationsDraft.bulkCreate(addTransformation);
}

if (!_.isEmpty(updateTransformation)) {
await Promise.all(updateTransformation.map((record: any) => DatasetTransformationsDraft.update(record, { where: { id: record.id }, transaction: transact })));
await Promise.all(updateTransformation.map((record: any) => DatasetTransformationsDraft.update(record, { where: { id: record.id }})));
}

if (!_.isEmpty(deleteTransformation)) {
await DatasetTransformationsDraft.destroy({ where: { id: deleteTransformation }, transaction: transact });
await DatasetTransformationsDraft.destroy({ where: { id: deleteTransformation }});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,7 @@ describe("DATASET CREATE API", () => {
chai.spy.on(DatasetTransformations, "findAll", () => {
return Promise.resolve()
})
const t = chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve(sequelize.transaction)
})
chai.spy.on(t, "commit", () => {
return Promise.resolve({})
})


chai
.request(app)
Expand All @@ -84,9 +79,7 @@ describe("DATASET CREATE API", () => {
chai.spy.on(DatasetDraft, "findOne", () => {
return Promise.resolve(null)
})
chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve({})
})

chai
.request(app)
.post("/v2/datasets/create")
Expand All @@ -105,9 +98,6 @@ describe("DATASET CREATE API", () => {
}

it("Dataset creation failure: Invalid request payload provided", (done) => {
chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve({})
})
chai
.request(app)
.post("/v2/datasets/create")
Expand All @@ -128,9 +118,7 @@ describe("DATASET CREATE API", () => {
chai.spy.on(DatasetDraft, "findOne", () => {
return Promise.resolve({ datavalues: [] })
})
chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve({})
})

chai
.request(app)
.post("/v2/datasets/create")
Expand All @@ -157,12 +145,6 @@ describe("DATASET CREATE API", () => {
chai.spy.on(DatasetDraft, "create", () => {
return Promise.resolve({ dataValues: { id: "telemetry" } })
})
const t = chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve(sequelize.transaction)
})
chai.spy.on(t, "rollback", () => {
return Promise.resolve({})
})
chai
.request(app)
.post("/v2/datasets/create")
Expand All @@ -183,12 +165,6 @@ describe("DATASET CREATE API", () => {
chai.spy.on(DatasetDraft, "findOne", () => {
return Promise.reject({})
})
const t = chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve(sequelize.transaction)
})
chai.spy.on(t, "rollback", () => {
return Promise.resolve({})
})
chai
.request(app)
.post("/v2/datasets/create")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { DatasetDraft } from "../../../models/DatasetDraft";
import { DatasetTransformationsDraft } from "../../../models/TransformationDraft";
import { DatasetSourceConfigDraft } from "../../../models/DatasetSourceConfigDraft";
import { DatasourceDraft } from "../../../models/DatasourceDraft";
import { sequelize } from "../../../connections/databaseConnection";


chai.use(spies);
Expand Down Expand Up @@ -42,12 +41,6 @@ describe("DATASET STATUS TRANSITION DELETE", () => {
chai.spy.on(DatasetDraft, "destroy", () => {
return Promise.resolve({})
})
const t = chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve(sequelize.transaction)
})
chai.spy.on(t, "commit", () => {
return Promise.resolve({})
})
chai
.request(app)
.post("/v2/datasets/status-transition")
Expand All @@ -69,9 +62,6 @@ describe("DATASET STATUS TRANSITION DELETE", () => {
chai.spy.on(DatasetDraft, "findOne", () => {
return Promise.resolve()
})
const t = chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve(sequelize.transaction)
})
chai
.request(app)
.post("/v2/datasets/status-transition")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import _ from "lodash";
import { apiId, errorCode } from "../../../controllers/DatasetStatusTransition/DatasetStatusTransition";
import { TestInputsForDatasetStatusTransition } from "./Fixtures";
import { DatasetDraft } from "../../../models/DatasetDraft";
import { sequelize } from "../../../connections/databaseConnection";
import { commandHttpService } from "../../../connections/commandServiceConnection";

chai.use(spies);
Expand All @@ -30,12 +29,6 @@ describe("DATASET STATUS TRANSITION LIVE", () => {
chai.spy.on(commandHttpService, "post", () => {
return Promise.resolve({})
})
const t = chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve(sequelize.transaction)
})
chai.spy.on(t, "commit", () => {
return Promise.resolve({})
})
chai
.request(app)
.post("/v2/datasets/status-transition")
Expand All @@ -57,9 +50,6 @@ describe("DATASET STATUS TRANSITION LIVE", () => {
chai.spy.on(DatasetDraft, "findOne", () => {
return Promise.resolve()
})
const t = chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve(sequelize.transaction)
})
chai
.request(app)
.post("/v2/datasets/status-transition")
Expand All @@ -83,12 +73,6 @@ describe("DATASET STATUS TRANSITION LIVE", () => {
chai.spy.on(commandHttpService, "post", () => {
return Promise.reject()
})
const t = chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve(sequelize.transaction)
})
chai.spy.on(t, "rollback", () => {
return Promise.resolve({})
})
chai
.request(app)
.post("/v2/datasets/status-transition")
Expand All @@ -108,9 +92,6 @@ describe("DATASET STATUS TRANSITION LIVE", () => {
chai.spy.on(DatasetDraft, "findOne", () => {
return Promise.resolve({ dataset_id: "telemetry", status: "Draft" })
})
const t = chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve(sequelize.transaction)
})
chai
.request(app)
.post("/v2/datasets/status-transition")
Expand Down
Loading

0 comments on commit 387c1ec

Please sign in to comment.