Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sanketika-Obsrv/issue-tracker#234: transaction management removed #185

Merged
merged 3 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions api-service/src/v2/controllers/DatasetCreate/DatasetCreate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { ResponseHandler } from "../../helpers/ResponseHandler";
import httpStatus from "http-status";
import { defaultDatasetConfig, defaultMasterConfig } from "../../configs/DatasetConfigDefault";
import { DatasetType } from "../../types/DatasetModels";
import { query, sequelize } from "../../connections/databaseConnection";
import { query } from "../../connections/databaseConnection";
import { ErrorObject } from "../../types/ResponseModel";
import { DatasourceDraft } from "../../models/DatasourceDraft";
import { DatasetTransformationsDraft } from "../../models/TransformationDraft";
Expand All @@ -20,7 +20,6 @@ const datasetCreate = async (req: Request, res: Response) => {
const requestBody = req.body
const msgid = _.get(req, ["body", "params", "msgid"]);
const resmsgid = _.get(res, "resmsgid");
const transact = await sequelize.transaction()
try {
const datasetId = _.get(req, ["body", "request", "dataset_id"])
setReqDatasetId(req, datasetId)
Expand Down Expand Up @@ -66,25 +65,23 @@ const datasetCreate = async (req: Request, res: Response) => {
const datasetPayload: any = await getDefaultValue(datasetBody);
const data = { ...datasetPayload, version_key: Date.now().toString() }

const response = await DatasetDraft.create(data, { transaction: transact })
const response = await DatasetDraft.create(data)

const { dataset_config, denorm_config, transformation_config, data_schema, id, dataset_id } = data
const datasourcePayload = await generateDataSource({ indexCol: _.get(dataset_config, ["timestamp_key"]), data_schema, id, dataset_id, denorm_config, transformation_config, action:"create" })
await DatasourceDraft.create(datasourcePayload, { transaction: transact })
await DatasourceDraft.create(datasourcePayload)
logger.info({ apiId, message: `Datasource created successsfully for the dataset:${id}` })

const transformationConfig: any = getTransformationConfig({ transformationPayload: _.get(datasetBody, "transformations_config"), datasetId: _.get(datasetPayload, "id") })
if (!_.isEmpty(transformationConfig)) {
await DatasetTransformationsDraft.bulkCreate(transformationConfig), { transaction: transact };
await DatasetTransformationsDraft.bulkCreate(transformationConfig);
logger.info({ apiId, message: `Dataset transformations records created successsfully for dataset:${id}` })
}

await transact.commit()
const responseData = { id: _.get(response, ["dataValues", "id"]) || "", version_key: data.version_key }
logger.info({ apiId, msgid, requestBody, resmsgid, message: `Dataset Created Successfully with id:${_.get(response, ["dataValues", "id"])}`, response: responseData })
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: responseData });
} catch (error: any) {
await transact.rollback()
const code = _.get(error, "code") || "DATASET_CREATION_FAILURE"
logger.error({ ...error, apiId, code, msgid, requestBody, resmsgid })
let errorMessage = error;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import { schemaValidation } from "../../services/ValidationService";
import StatusTransitionSchema from "./RequestValidationSchema.json";
import ReadyToPublishSchema from "./ReadyToPublishSchema.json"
import httpStatus from "http-status";
import { sequelize } from "../../connections/databaseConnection";
import { DatasetTransformationsDraft } from "../../models/TransformationDraft";
import { DatasourceDraft } from "../../models/DatasourceDraft";
import { DatasetSourceConfigDraft } from "../../models/DatasetSourceConfigDraft";
Expand Down Expand Up @@ -42,8 +41,6 @@ const datasetStatusTransition = async (req: Request, res: Response) => {
const requestBody = req.body
const msgid = _.get(req, ["body", "params", "msgid"]);
const resmsgid = _.get(res, "resmsgid");
try {
const transact = await sequelize.transaction()
try {
const { dataset_id, status } = _.get(requestBody, "request");
setReqDatasetId(req, dataset_id)
Expand Down Expand Up @@ -88,13 +85,11 @@ const datasetStatusTransition = async (req: Request, res: Response) => {
}

const transitionCommands = _.get(statusTransitionCommands, status)
await executeTransition({ transitionCommands, dataset: datasetRecord, transact })
await executeTransition({ transitionCommands, dataset: datasetRecord })

await transact.commit();
logger.info({ apiId, msgid, requestBody, resmsgid, message: `Dataset status transition to ${status} successful with id:${dataset_id}` })
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: `Dataset status transition to ${status} successful`, dataset_id } });
} catch (error: any) {
await transact.rollback();
const code = _.get(error, "code") || errorCode
logger.error(error, apiId, msgid, code, requestBody, resmsgid)
let errorMessage = error;
Expand All @@ -104,11 +99,6 @@ const datasetStatusTransition = async (req: Request, res: Response) => {
}
ResponseHandler.errorResponse(errorMessage, req, res);
}
} catch (error: any) {
const errMessage = error.message
logger.error({ message: errMessage, msgid, resmsgid, apiId })
ResponseHandler.errorResponse(errMessage, req, res);
}
}

const fetchDataset = async (configs: Record<string, any>) => {
Expand All @@ -125,10 +115,10 @@ const fetchDataset = async (configs: Record<string, any>) => {
}

const executeTransition = async (configs: Record<string, any>) => {
const { transitionCommands, dataset, transact } = configs
const { transitionCommands, dataset } = configs
const transitionPromises = _.map(transitionCommands, async command => {
const commandWorkflow = _.get(commandExecutors, command)
return commandWorkflow({ dataset, transact })
return commandWorkflow({ dataset })
})
await Promise.all(transitionPromises)
}
Expand All @@ -150,17 +140,17 @@ const validateDataset = async (configs: Record<string, any>) => {

//DELETE_DRAFT_DATASETS
const deleteDataset = async (configs: Record<string, any>) => {
const { dataset, transact } = configs
const { dataset } = configs
const { id } = dataset
await deleteDraftRecords({ dataset_id: id, transact })
await deleteDraftRecords({ dataset_id: id })
}

const deleteDraftRecords = async (config: Record<string, any>) => {
const { dataset_id, transact } = config;
await DatasetTransformationsDraft.destroy({ where: { dataset_id }, transaction: transact })
await DatasetSourceConfigDraft.destroy({ where: { dataset_id }, transaction: transact })
await DatasourceDraft.destroy({ where: { dataset_id }, transaction: transact })
await DatasetDraft.destroy({ where: { id: dataset_id }, transaction: transact })
const { dataset_id } = config;
await DatasetTransformationsDraft.destroy({ where: { dataset_id } })
await DatasetSourceConfigDraft.destroy({ where: { dataset_id } })
await DatasourceDraft.destroy({ where: { dataset_id } })
await DatasetDraft.destroy({ where: { id: dataset_id } })
}

//PUBLISH_DATASET
Expand Down Expand Up @@ -195,12 +185,12 @@ const checkDatasetDenorm = async (payload: Record<string, any>) => {

//SET_DATASET_TO_RETIRE
const setDatasetRetired = async (config: Record<string, any>) => {
const { dataset, transact } = config;
const { dataset } = config;
const { dataset_id } = dataset
await Dataset.update({ status: DatasetStatus.Retired }, { where: { dataset_id }, transaction: transact })
await DatasetSourceConfig.update({ status: DatasetStatus.Retired }, { where: { dataset_id }, transaction: transact })
await Datasource.update({ status: DatasetStatus.Retired }, { where: { dataset_id }, transaction: transact })
await DatasetTransformations.update({ status: DatasetStatus.Retired }, { where: { dataset_id }, transaction: transact })
await Dataset.update({ status: DatasetStatus.Retired }, { where: { dataset_id } })
await DatasetSourceConfig.update({ status: DatasetStatus.Retired }, { where: { dataset_id } })
await Datasource.update({ status: DatasetStatus.Retired }, { where: { dataset_id } })
await DatasetTransformations.update({ status: DatasetStatus.Retired }, { where: { dataset_id } })
}

//DELETE_SUPERVISORS
Expand Down
18 changes: 7 additions & 11 deletions api-service/src/v2/controllers/DatasetUpdate/DatasetUpdate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import logger from "../../logger";
import { defaultDatasetConfig } from "../../configs/DatasetConfigDefault";
import { DatasetTransformationsDraft } from "../../models/TransformationDraft";
import { generateDataSource, getDraftTransformations, getDuplicateConfigs, setReqDatasetId } from "../../services/DatasetService";
import { sequelize } from "../../connections/databaseConnection";
import { DatasourceDraft } from "../../models/DatasourceDraft";
import { ingestionConfig } from "../../configs/IngestionConfig";
import { getDefaultValue } from "../DatasetCreate/DatasetCreate";
Expand All @@ -24,7 +23,6 @@ const datasetUpdate = async (req: Request, res: Response) => {
const requestBody = req.body
const msgid = _.get(req, ["body", "params", "msgid"]);
const resmsgid = _.get(res, "resmsgid");
const transact = await sequelize.transaction();
try {
const datasetId = _.get(req, ["body", "request", "dataset_id"])
setReqDatasetId(req, datasetId)
Expand Down Expand Up @@ -104,24 +102,22 @@ const datasetUpdate = async (req: Request, res: Response) => {
const updatedPayload = await getDefaultValue(datasetPayload)
logger.debug({ datasetPayload })
const transformationConfigs = _.get(datasetBody, "transformation_config")
await manageTransformations(transformationConfigs, dataset_id, transact);
await manageTransformations(transformationConfigs, dataset_id);

const { data_schema } = datasetBody
if (data_schema) {
const { transformation_config } = datasetBody
const { dataset_config, id, dataset_id, denorm_config } = updatedPayload
const datasourcePayload = await generateDataSource({ indexCol: _.get(dataset_config, ["timestamp_key"]), transformation_config, denorm_config, data_schema, id, dataset_id, action: "edit" })
await DatasourceDraft.update(datasourcePayload, { where: { id: _.get(datasourcePayload, "id") }, transaction: transact })
await DatasourceDraft.update(datasourcePayload, { where: { id: _.get(datasourcePayload, "id") }})
}

await DatasetDraft.update(updatedPayload, { where: { id: dataset_id }, transaction: transact })
await DatasetDraft.update(updatedPayload, { where: { id: dataset_id }})

await transact.commit();
const responsedata = { message: "Dataset is updated successfully", id: dataset_id, version_key: _.get(datasetPayload, "version_key") }
logger.info({ apiId, msgid, requestBody, resmsgid, message: `Dataset updated successfully with id:${dataset_id}`, response: responsedata })
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: responsedata });
} catch (error: any) {
await transact.rollback();
const code = _.get(error, "code") || errorCode
logger.error({ ...error, apiId, code, msgid, requestBody, resmsgid })
let errorMessage = error;
Expand All @@ -133,22 +129,22 @@ const datasetUpdate = async (req: Request, res: Response) => {
}
}

const manageTransformations = async (transformations: Record<string, any>, datasetId: string, transact: any) => {
const manageTransformations = async (transformations: Record<string, any>, datasetId: string) => {
if (transformations) {
const transformationConfigs = await getTransformationConfigs(transformations, datasetId);
if (transformationConfigs) {
const { addTransformation, updateTransformation, deleteTransformation } = transformationConfigs;

if (!_.isEmpty(addTransformation)) {
await DatasetTransformationsDraft.bulkCreate(addTransformation, { transaction: transact });
await DatasetTransformationsDraft.bulkCreate(addTransformation);
}

if (!_.isEmpty(updateTransformation)) {
await Promise.all(updateTransformation.map((record: any) => DatasetTransformationsDraft.update(record, { where: { id: record.id }, transaction: transact })));
await Promise.all(updateTransformation.map((record: any) => DatasetTransformationsDraft.update(record, { where: { id: record.id }})));
}

if (!_.isEmpty(deleteTransformation)) {
await DatasetTransformationsDraft.destroy({ where: { id: deleteTransformation }, transaction: transact });
await DatasetTransformationsDraft.destroy({ where: { id: deleteTransformation }});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,7 @@ describe("DATASET CREATE API", () => {
chai.spy.on(DatasetTransformations, "findAll", () => {
return Promise.resolve()
})
const t = chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve(sequelize.transaction)
})
chai.spy.on(t, "commit", () => {
return Promise.resolve({})
})


chai
.request(app)
Expand All @@ -84,9 +79,7 @@ describe("DATASET CREATE API", () => {
chai.spy.on(DatasetDraft, "findOne", () => {
return Promise.resolve(null)
})
chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve({})
})

chai
.request(app)
.post("/v2/datasets/create")
Expand All @@ -105,9 +98,6 @@ describe("DATASET CREATE API", () => {
}

it("Dataset creation failure: Invalid request payload provided", (done) => {
chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve({})
})
chai
.request(app)
.post("/v2/datasets/create")
Expand All @@ -128,9 +118,7 @@ describe("DATASET CREATE API", () => {
chai.spy.on(DatasetDraft, "findOne", () => {
return Promise.resolve({ datavalues: [] })
})
chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve({})
})

chai
.request(app)
.post("/v2/datasets/create")
Expand All @@ -157,12 +145,6 @@ describe("DATASET CREATE API", () => {
chai.spy.on(DatasetDraft, "create", () => {
return Promise.resolve({ dataValues: { id: "telemetry" } })
})
const t = chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve(sequelize.transaction)
})
chai.spy.on(t, "rollback", () => {
return Promise.resolve({})
})
chai
.request(app)
.post("/v2/datasets/create")
Expand All @@ -183,12 +165,6 @@ describe("DATASET CREATE API", () => {
chai.spy.on(DatasetDraft, "findOne", () => {
return Promise.reject({})
})
const t = chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve(sequelize.transaction)
})
chai.spy.on(t, "rollback", () => {
return Promise.resolve({})
})
chai
.request(app)
.post("/v2/datasets/create")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { DatasetDraft } from "../../../models/DatasetDraft";
import { DatasetTransformationsDraft } from "../../../models/TransformationDraft";
import { DatasetSourceConfigDraft } from "../../../models/DatasetSourceConfigDraft";
import { DatasourceDraft } from "../../../models/DatasourceDraft";
import { sequelize } from "../../../connections/databaseConnection";


chai.use(spies);
Expand Down Expand Up @@ -42,12 +41,6 @@ describe("DATASET STATUS TRANSITION DELETE", () => {
chai.spy.on(DatasetDraft, "destroy", () => {
return Promise.resolve({})
})
const t = chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve(sequelize.transaction)
})
chai.spy.on(t, "commit", () => {
return Promise.resolve({})
})
chai
.request(app)
.post("/v2/datasets/status-transition")
Expand All @@ -69,9 +62,6 @@ describe("DATASET STATUS TRANSITION DELETE", () => {
chai.spy.on(DatasetDraft, "findOne", () => {
return Promise.resolve()
})
const t = chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve(sequelize.transaction)
})
chai
.request(app)
.post("/v2/datasets/status-transition")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import _ from "lodash";
import { apiId, errorCode } from "../../../controllers/DatasetStatusTransition/DatasetStatusTransition";
import { TestInputsForDatasetStatusTransition } from "./Fixtures";
import { DatasetDraft } from "../../../models/DatasetDraft";
import { sequelize } from "../../../connections/databaseConnection";
import { commandHttpService } from "../../../connections/commandServiceConnection";

chai.use(spies);
Expand All @@ -30,12 +29,6 @@ describe("DATASET STATUS TRANSITION LIVE", () => {
chai.spy.on(commandHttpService, "post", () => {
return Promise.resolve({})
})
const t = chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve(sequelize.transaction)
})
chai.spy.on(t, "commit", () => {
return Promise.resolve({})
})
chai
.request(app)
.post("/v2/datasets/status-transition")
Expand All @@ -57,9 +50,6 @@ describe("DATASET STATUS TRANSITION LIVE", () => {
chai.spy.on(DatasetDraft, "findOne", () => {
return Promise.resolve()
})
const t = chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve(sequelize.transaction)
})
chai
.request(app)
.post("/v2/datasets/status-transition")
Expand All @@ -83,12 +73,6 @@ describe("DATASET STATUS TRANSITION LIVE", () => {
chai.spy.on(commandHttpService, "post", () => {
return Promise.reject()
})
const t = chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve(sequelize.transaction)
})
chai.spy.on(t, "rollback", () => {
return Promise.resolve({})
})
chai
.request(app)
.post("/v2/datasets/status-transition")
Expand All @@ -108,9 +92,6 @@ describe("DATASET STATUS TRANSITION LIVE", () => {
chai.spy.on(DatasetDraft, "findOne", () => {
return Promise.resolve({ dataset_id: "telemetry", status: "Draft" })
})
const t = chai.spy.on(sequelize, "transaction", () => {
return Promise.resolve(sequelize.transaction)
})
chai
.request(app)
.post("/v2/datasets/status-transition")
Expand Down
Loading