From b18679dbd03004a7494dfb6360bfc92b8b69d5cd Mon Sep 17 00:00:00 2001 From: Jonatan Nilsson Date: Thu, 6 Feb 2025 06:48:42 +0000 Subject: [PATCH] pool: Add transaction helpers and support --- lib/index.d.ts | 78 +++++++++++++++++++++++++++++++ lib/pool.js | 121 +++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 186 insertions(+), 13 deletions(-) diff --git a/lib/index.d.ts b/lib/index.d.ts index 679fec0d..70c5358a 100644 --- a/lib/index.d.ts +++ b/lib/index.d.ts @@ -315,6 +315,11 @@ declare namespace MsNodeSqlV8 { * @returns promise of bound proc to call. */ getProc: (name: string) => Promise + + beginTransaction(): Promise + commitTransaction(description: PoolDescription): Promise + rollbackTransaction(description: PoolDescription): Promise + transaction(cb: (description: PoolDescription) => any): Promise } export class Pool implements GetSetUTC, SubmitQuery { @@ -350,6 +355,44 @@ declare namespace MsNodeSqlV8 { * @param cb callback related to event subscribed */ on (event: string, cb?: sqlPoolEventType): void + beginTransaction(cb: TransactionCb): Query + commitTransaction(description: PoolDescription, cb?: QueryRawCb): void + rollbackTransaction(description: PoolDescription, cb?: QueryRawCb): void + } + + export interface PoolChunky { + params: sqlProcParamType[] | sqlQueryParamType[] + callback: QueryCb | QueryRawCb | CallProcedureCb | TransactionCb + } + + export class PoolEventCaster { + isPaused (): boolean + getQueryObj (): Query + getQueryId (): Query | number + isPendingCancel (): boolean + cancelQuery (cb?: StatusCb): void + pauseQuery (): void + resumeQuery (): void + setQueryObj (q: Query, chunky: PoolChunky ): void + isPrepared (): false + /** + * event subscription + * e.g. pool.on('debug', msg => { console.log(msg) }) + * @param event one of + * + * 'debug' - a debug record showing internal state of the pool + * + * 'open' - event on the pool being opened and ready to work. + * + * 'error' - propagated from connection errors + * + * 'status' - information relating to latet operation + * + * 'submitted' - raised when query is submitted where previously was on a queue + * + * @param cb callback related to event subscribed + */ + on (event: string, cb?: sqlPoolEventType): void } export interface TableColumn { @@ -927,6 +970,8 @@ declare namespace MsNodeSqlV8 { export type GetProcCb = (err: Error, procedure?: ProcedureDefinition) => void + export type TransactionCb = (err?: Error, description?: PoolDescription) => void + export interface BulkMgrSummary { insertSignature: string whereColumns: TableColumn[] @@ -1665,6 +1710,39 @@ declare namespace MsNodeSqlV8 { callProcedure (qid: number, procedure: string, params: NativeParam[], cb: NativeQueryCb): void } + export enum workTypeEnum { + QUERY = 10, + RAW = 11, + PROC = 12, + TRANSACTION = 13, + COMMITTING = 14, + } + + export interface PoolWorkItem { + id: number + sql: string + paramsOrCallback: sqlQueryParamType[] | QueryCb | QueryRawCb | CallProcedureCb | TransactionCb, + callback: QueryCb | QueryRawCb | CallProcedureCb | TransactionCb + poolNotifier: PoolEventCaster + workType: workTypeEnum + chunky: PoolChunky + } + + export interface PoolDescription { + id: number + pool: Pool + connection: Connection + heartbeatSqlResponse: any + lastActive: Date + work: PoolWorkItem + keepAliveCount: number + recreateCount: number + parkedCount: number + queriesSent: number + beganAt: null | Date + totalElapsedQueryMs: number + } + export interface SqlClient extends UserConversion { /** * helper promises allowing async style await to open connection or diff --git a/lib/pool.js b/lib/pool.js index f179ea1a..00016449 100644 --- a/lib/pool.js +++ b/lib/pool.js @@ -54,7 +54,7 @@ const poolModule = (() => { } heartbeatResponse (v) { - this.heatbeatSqlResponse = v + this.heartbeatSqlResponse = v } heartbeat () { @@ -211,6 +211,7 @@ const poolModule = (() => { class PoolPromises { constructor (pool) { + this.pool = pool this.open = util.promisify(pool.open) this.close = util.promisify(pool.close) this.query = pool.queryAggregator @@ -218,6 +219,43 @@ const poolModule = (() => { this.getUserTypeTable = pool.getUserTypeTable this.getTable = pool.getTable this.getProc = pool.getProc + this.beginTransaction = util.promisify(pool.beginTransaction) + this.commitTransaction = util.promisify(pool.commitTransaction) + this.rollbackTransaction = util.promisify(pool.rollbackTransaction) + } + + transaction(cb) { + let connectionDescription + return this.beginTransaction() + .then((description) => cb(connectionDescription = description)) + .then( + () => this.commitTransaction(connectionDescription), + err => { + // If no connectionDescription, do nothing, the beginTransaction errored + // and we can report it directly. + if (!connectionDescription) { return Promise.reject(err) } + + // Error in cb() we should notify about it + if (this.pool.listenerCount('error') > 0) { + this.pool.emit('error', err) + } + + return this.rollbackTransaction(connectionDescription) + .catch((rollbackError) => { + // We encountered error during rollback, emit an error on the pool for it + if (this.pool.listenerCount('error') > 0) { + this.pool.emit('error', rollbackError) + } + }) + .then( + () => { + // Return the original error regardless if rollback was + // successful or not. + return Promise.reject(err) + }, + ) + } + ) } } @@ -285,6 +323,8 @@ const poolModule = (() => { } function runTheQuery (q, description, work) { + let errored = false + work.poolNotifier.setQueryObj(q, work.chunky) q.on('submitted', () => { _this.emit('debug', `[${description.id}] submitted work id ${work.id}`) @@ -297,6 +337,13 @@ const poolModule = (() => { q.on('free', () => { description.free() + + // Transactions can not be freed yet if no errors occured. They need to be freed later + if (!errored && description.work && description.work.workType === workTypeEnum.TRANSACTION) { + _this.emit('debug', `[${description.id}] inside transaction from work id ${work.id}`) + return + } + checkin('work', description) _this.emit('debug', `[${description.id}] free work id ${work.id}`) work.poolNotifier.emit('free') @@ -306,6 +353,7 @@ const poolModule = (() => { }) q.on('error', (e, more) => { + errored = true sendError(e, more) setImmediate(() => { crank() @@ -322,12 +370,19 @@ const poolModule = (() => { break case workTypeEnum.RAW: + case workTypeEnum.COMMITTING: q = connection.queryRaw(work.sql, work.paramsOrCallback, work.callback) break case workTypeEnum.PROC: q = connection.callproc(work.sql, work.paramsOrCallback, work.callback) break + + case workTypeEnum.TRANSACTION: + q = connection.queryRaw(work.sql, work.paramsOrCallback, function (err) { + work.callback(err, err ? null : description) + }) + break } return q } @@ -346,20 +401,17 @@ const poolModule = (() => { poolNotifier.emit('free') } + /** Move unpaused items to queue */ function promotePause () { - const add = [] const start = pause.length - while (pause.length > 0) { - const item = pause.pop() - if (item.isPaused) { - add.unshift(item) - } else { - workQueue.push(item) + + for (let i = 0; i < pause.length; i++) { + if (!pause[i].isPaused) { + workQueue.push(pause.splice(i, 1)[0]) + i-- } } - while (add.length > 0) { - pause.unshift(add.pop()) - } + if (start !== pause.length) { setImmediate(() => { crank() }) } @@ -395,16 +447,20 @@ const poolModule = (() => { const workTypeEnum = { QUERY: 10, RAW: 11, - PROC: 12 + PROC: 12, + TRANSACTION: 13, + COMMITTING: 14, } function chunk (paramsOrCallback, callback, workType) { switch (workType) { case workTypeEnum.QUERY: case workTypeEnum.RAW: + case workTypeEnum.COMMITTING: return notifierFactory.getChunkyArgs(paramsOrCallback, callback) case workTypeEnum.PROC: + case workTypeEnum.TRANSACTION: return { params: paramsOrCallback, callback } } } @@ -456,6 +512,42 @@ const poolModule = (() => { return submit(sql, paramsOrCallback, callback, workTypeEnum.PROC) } + function beginTransaction (callback) { + if (!callback || typeof callback !== 'function') { + throw new Error('[msnodesql] Pool beginTransaction called with empty callback.') + } + return submit('BEGIN TRANSACTION', [], callback, workTypeEnum.TRANSACTION) + } + + function finishTransaction(sql, description, callback) { + if (!description instanceof PoolDscription) { + throw new Error('[msnodesql] Pool end transaction called with non-description.') + } + const work = description.work + if (!work) { + throw new Error('[msnodesql] Pool end transaction called with unknown or finished transaction.') + } + + if (work.workType !== workTypeEnum.TRANSACTION && work.workType !== workTypeEnum.COMMITTING) { + throw new Error('[msnodesql] Pool end transaction called with unknown or finished transaction.') + } + + _this.emit('debug', `[${description.id}] closing transaction from ${work.id} with ${sql}`) + work.callback = callback + work.sql = sql + work.workType = workTypeEnum.COMMITTING + item(description, work) + return work.poolNotifier + } + + function commitTransaction (description, callback) { + return finishTransaction('IF (@@TRANCOUNT > 0) COMMIT TRANSACTION', description, callback) + } + + function rollbackTransaction (description, callback) { + return finishTransaction('IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION', description, callback) + } + async function getUserTypeTable (name) { // the table mgr will submit query into pool as if it's a connection return checkClosedPromise().then(async () => tableMgr.promises.getUserTypeTable(name)) @@ -651,7 +743,7 @@ const poolModule = (() => { description.heartbeat() // reset by user query checkin('heartbeat', description) const inactivePeriod = description.keepAliveCount * options.heartbeatSecs - _this.emit('debug', `[${description.id}] heartbeat response = '${description.heatbeatSqlResponse}', ${description.lastActive.toLocaleTimeString()}` + + _this.emit('debug', `[${description.id}] heartbeat response = '${description.heartbeatSqlResponse}', ${description.lastActive.toLocaleTimeString()}` + `, keepAliveCount = ${description.keepAliveCount} inactivePeriod = ${inactivePeriod}, inactivityTimeoutSecs = ${options.inactivityTimeoutSecs}`) }) q.on('error', (e) => { @@ -735,6 +827,9 @@ const poolModule = (() => { ) } + this.beginTransaction = beginTransaction + this.commitTransaction = commitTransaction + this.rollbackTransaction = rollbackTransaction this.open = open this.close = close this.query = query