Skip to content

Commit 7dc66bd

Browse files
committed
pool: Add transaction helpers and support
1 parent c9444c7 commit 7dc66bd

File tree

2 files changed

+185
-13
lines changed

2 files changed

+185
-13
lines changed

lib/index.d.ts

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,11 @@ declare namespace MsNodeSqlV8 {
315315
* @returns promise of bound proc to call.
316316
*/
317317
getProc: (name: string) => Promise<ProcedureDefinition>
318+
319+
beginTransaction(): Promise<PoolDescription>
320+
commitTransaction(description: PoolDescription): Promise<void>
321+
rollbackTransaction(description: PoolDescription): Promise<void>
322+
transaction(cb: (description: PoolDescription) => any): Promise<void>
318323
}
319324

320325
export class Pool implements GetSetUTC, SubmitQuery {
@@ -350,6 +355,44 @@ declare namespace MsNodeSqlV8 {
350355
* @param cb callback related to event subscribed
351356
*/
352357
on (event: string, cb?: sqlPoolEventType): void
358+
beginTransaction(cb: TransactionCb): Query
359+
commitTransaction(description: PoolDescription, cb?: QueryRawCb): void
360+
rollbackTransaction(description: PoolDescription, cb?: QueryRawCb): void
361+
}
362+
363+
export interface PoolChunky {
364+
params: sqlProcParamType[] | sqlQueryParamType[]
365+
callback: QueryCb | QueryRawCb | CallProcedureCb | TransactionCb
366+
}
367+
368+
export class PoolEventCaster {
369+
isPaused (): boolean
370+
getQueryObj (): Query
371+
getQueryId (): Query | number
372+
isPendingCancel (): boolean
373+
cancelQuery (cb?: StatusCb): void
374+
pauseQuery (): void
375+
resumeQuery (): void
376+
setQueryObj (q: Query, chunky: PoolChunky ): void
377+
isPrepared (): false
378+
/**
379+
* event subscription
380+
* e.g. pool.on('debug', msg => { console.log(msg) })
381+
* @param event one of
382+
*
383+
* 'debug' - a debug record showing internal state of the pool
384+
*
385+
* 'open' - event on the pool being opened and ready to work.
386+
*
387+
* 'error' - propagated from connection errors
388+
*
389+
* 'status' - information relating to latet operation
390+
*
391+
* 'submitted' - raised when query is submitted where previously was on a queue
392+
*
393+
* @param cb callback related to event subscribed
394+
*/
395+
on (event: string, cb?: sqlPoolEventType): void
353396
}
354397

355398
export interface TableColumn {
@@ -927,6 +970,8 @@ declare namespace MsNodeSqlV8 {
927970

928971
export type GetProcCb = (err: Error, procedure?: ProcedureDefinition) => void
929972

973+
export type TransactionCb = (err?: Error, description?: PoolDescription) => void
974+
930975
export interface BulkMgrSummary {
931976
insertSignature: string
932977
whereColumns: TableColumn[]
@@ -1665,6 +1710,38 @@ declare namespace MsNodeSqlV8 {
16651710
callProcedure (qid: number, procedure: string, params: NativeParam[], cb: NativeQueryCb): void
16661711
}
16671712

1713+
export enum workTypeEnum {
1714+
QUERY = 10,
1715+
RAW = 11,
1716+
PROC = 12,
1717+
TRANSACTION = 13,
1718+
}
1719+
1720+
export interface PoolWorkItem {
1721+
id: number
1722+
sql: string
1723+
paramsOrCallback: sqlQueryParamType[] | QueryCb | QueryRawCb | CallProcedureCb | TransactionCb,
1724+
callback: QueryCb | QueryRawCb | CallProcedureCb | TransactionCb
1725+
poolNotifier: PoolEventCaster
1726+
workType: workTypeEnum
1727+
chunky: PoolChunky
1728+
}
1729+
1730+
export interface PoolDescription {
1731+
id: number
1732+
pool: Pool
1733+
connection: Connection
1734+
heartbeatSqlResponse: any
1735+
lastActive: Date
1736+
work: PoolWorkItem
1737+
keepAliveCount: number
1738+
recreateCount: number
1739+
parkedCount: number
1740+
queriesSent: number
1741+
beganAt: null | Date
1742+
totalElapsedQueryMs: number
1743+
}
1744+
16681745
export interface SqlClient extends UserConversion {
16691746
/**
16701747
* helper promises allowing async style await to open connection or

lib/pool.js

Lines changed: 108 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ const poolModule = (() => {
5454
}
5555

5656
heartbeatResponse (v) {
57-
this.heatbeatSqlResponse = v
57+
this.heartbeatSqlResponse = v
5858
}
5959

6060
heartbeat () {
@@ -211,13 +211,51 @@ const poolModule = (() => {
211211

212212
class PoolPromises {
213213
constructor (pool) {
214+
this.pool = pool
214215
this.open = util.promisify(pool.open)
215216
this.close = util.promisify(pool.close)
216217
this.query = pool.queryAggregator
217218
this.callProc = pool.callprocAggregator
218219
this.getUserTypeTable = pool.getUserTypeTable
219220
this.getTable = pool.getTable
220221
this.getProc = pool.getProc
222+
this.beginTransaction = util.promisify(pool.beginTransaction)
223+
this.commitTransaction = util.promisify(pool.commitTransaction)
224+
this.rollbackTransaction = util.promisify(pool.rollbackTransaction)
225+
}
226+
227+
transaction(cb) {
228+
let connectionDescription
229+
return this.beginTransaction()
230+
.then((description) => cb(connectionDescription = description))
231+
.then(
232+
() => this.commitTransaction(connectionDescription),
233+
err => {
234+
// If no connectionDescription, do nothing, the beginTransaction errored
235+
// and we can report it directly.
236+
if (!connectionDescription) { return Promise.reject(err) }
237+
238+
// Error in cb() we should notify about it
239+
if (this.pool.listenerCount('error') > 0) {
240+
this.pool.emit('error', err)
241+
}
242+
243+
return this.rollbackTransaction(connectionDescription)
244+
.catch((rollbackError) => {
245+
// We encountered error during rollback, emit an error on the pool for it
246+
if (this.pool.listenerCount('error') > 0) {
247+
this.pool.emit('error', rollbackError)
248+
}
249+
})
250+
.then(
251+
() => {
252+
// Return the original error regardless if rollback was
253+
// successful or not.
254+
return Promise.reject(err)
255+
},
256+
)
257+
}
258+
)
221259
}
222260
}
223261

@@ -285,6 +323,8 @@ const poolModule = (() => {
285323
}
286324

287325
function runTheQuery (q, description, work) {
326+
let errored = false
327+
288328
work.poolNotifier.setQueryObj(q, work.chunky)
289329
q.on('submitted', () => {
290330
_this.emit('debug', `[${description.id}] submitted work id ${work.id}`)
@@ -297,6 +337,13 @@ const poolModule = (() => {
297337

298338
q.on('free', () => {
299339
description.free()
340+
341+
// Transactions can not be freed yet if no errors occured. They need to be freed later
342+
if (!errored && description.work && description.work.workType === workTypeEnum.TRANSACTION) {
343+
_this.emit('debug', `[${description.id}] inside transaction from work id ${work.id}`)
344+
return
345+
}
346+
300347
checkin('work', description)
301348
_this.emit('debug', `[${description.id}] free work id ${work.id}`)
302349
work.poolNotifier.emit('free')
@@ -306,6 +353,7 @@ const poolModule = (() => {
306353
})
307354

308355
q.on('error', (e, more) => {
356+
errored = true
309357
sendError(e, more)
310358
setImmediate(() => {
311359
crank()
@@ -322,12 +370,19 @@ const poolModule = (() => {
322370
break
323371

324372
case workTypeEnum.RAW:
373+
case workTypeEnum.COMMITTING:
325374
q = connection.queryRaw(work.sql, work.paramsOrCallback, work.callback)
326375
break
327376

328377
case workTypeEnum.PROC:
329378
q = connection.callproc(work.sql, work.paramsOrCallback, work.callback)
330379
break
380+
381+
case workTypeEnum.TRANSACTION:
382+
q = connection.queryRaw(work.sql, work.paramsOrCallback, function (err) {
383+
work.callback(err, err ? null : description)
384+
})
385+
break
331386
}
332387
return q
333388
}
@@ -346,20 +401,17 @@ const poolModule = (() => {
346401
poolNotifier.emit('free')
347402
}
348403

404+
/** Move unpaused items to queue */
349405
function promotePause () {
350-
const add = []
351406
const start = pause.length
352-
while (pause.length > 0) {
353-
const item = pause.pop()
354-
if (item.isPaused) {
355-
add.unshift(item)
356-
} else {
357-
workQueue.push(item)
407+
408+
for (let i = 0; i < pause.length; i++) {
409+
if (!pause[i].isPaused) {
410+
workQueue.push(pause.splice(i, 1)[0])
411+
i--
358412
}
359413
}
360-
while (add.length > 0) {
361-
pause.unshift(add.pop())
362-
}
414+
363415
if (start !== pause.length) {
364416
setImmediate(() => { crank() })
365417
}
@@ -395,16 +447,20 @@ const poolModule = (() => {
395447
const workTypeEnum = {
396448
QUERY: 10,
397449
RAW: 11,
398-
PROC: 12
450+
PROC: 12,
451+
TRANSACTION: 13,
452+
COMMITTING: 14,
399453
}
400454

401455
function chunk (paramsOrCallback, callback, workType) {
402456
switch (workType) {
403457
case workTypeEnum.QUERY:
404458
case workTypeEnum.RAW:
459+
case workTypeEnum.COMMITTING:
405460
return notifierFactory.getChunkyArgs(paramsOrCallback, callback)
406461

407462
case workTypeEnum.PROC:
463+
case workTypeEnum.TRANSACTION:
408464
return { params: paramsOrCallback, callback }
409465
}
410466
}
@@ -456,6 +512,42 @@ const poolModule = (() => {
456512
return submit(sql, paramsOrCallback, callback, workTypeEnum.PROC)
457513
}
458514

515+
function beginTransaction (callback) {
516+
if (!callback || typeof callback !== 'function') {
517+
throw new Error('[msnodesql] Pool beginTransaction called with empty callback.')
518+
}
519+
return submit('BEGIN TRANSACTION', [], callback, workTypeEnum.TRANSACTION)
520+
}
521+
522+
function finishTransaction(sql, description, callback) {
523+
if (!description instanceof PoolDscription) {
524+
throw new Error('[msnodesql] Pool end transaction called with non-description.')
525+
}
526+
const work = description.work
527+
if (!work) {
528+
throw new Error('[msnodesql] Pool end transaction called with unknown or finished transaction.')
529+
}
530+
531+
if (work.workType !== workTypeEnum.TRANSACTION && work.workType !== workTypeEnum.COMMITTING) {
532+
throw new Error('[msnodesql] Pool end transaction called with unknown or finished transaction.')
533+
}
534+
535+
_this.emit('debug', `[${description.id}] closing transaction from ${work.id} with ${sql}`)
536+
work.callback = callback
537+
work.sql = sql
538+
work.workType = workTypeEnum.COMMITTING
539+
item(description, work)
540+
return work.poolNotifier
541+
}
542+
543+
function commitTransaction (description, callback) {
544+
return finishTransaction('IF (@@TRANCOUNT > 0) COMMIT TRANSACTION', description, callback)
545+
}
546+
547+
function rollbackTransaction (description, callback) {
548+
return finishTransaction('IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION', description, callback)
549+
}
550+
459551
async function getUserTypeTable (name) {
460552
// the table mgr will submit query into pool as if it's a connection
461553
return checkClosedPromise().then(async () => tableMgr.promises.getUserTypeTable(name))
@@ -651,7 +743,7 @@ const poolModule = (() => {
651743
description.heartbeat() // reset by user query
652744
checkin('heartbeat', description)
653745
const inactivePeriod = description.keepAliveCount * options.heartbeatSecs
654-
_this.emit('debug', `[${description.id}] heartbeat response = '${description.heatbeatSqlResponse}', ${description.lastActive.toLocaleTimeString()}` +
746+
_this.emit('debug', `[${description.id}] heartbeat response = '${description.heartbeatSqlResponse}', ${description.lastActive.toLocaleTimeString()}` +
655747
`, keepAliveCount = ${description.keepAliveCount} inactivePeriod = ${inactivePeriod}, inactivityTimeoutSecs = ${options.inactivityTimeoutSecs}`)
656748
})
657749
q.on('error', (e) => {
@@ -735,6 +827,9 @@ const poolModule = (() => {
735827
)
736828
}
737829

830+
this.beginTransaction = beginTransaction
831+
this.commitTransaction = commitTransaction
832+
this.rollbackTransaction = rollbackTransaction
738833
this.open = open
739834
this.close = close
740835
this.query = query

0 commit comments

Comments
 (0)