Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions lib/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ declare namespace MsNodeSqlV8 {
* @returns promise of bound proc to call.
*/
getProc: (name: string) => Promise<ProcedureDefinition>

beginTransaction(): Promise<PoolDescription>
commitTransaction(description: PoolDescription): Promise<void>
rollbackTransaction(description: PoolDescription): Promise<void>
transaction(cb: (description: PoolDescription) => any): Promise<void>
}

export class Pool implements GetSetUTC, SubmitQuery {
Expand Down Expand Up @@ -350,6 +355,44 @@ declare namespace MsNodeSqlV8 {
* @param cb callback related to event subscribed
*/
on (event: string, cb?: sqlPoolEventType): void
beginTransaction(cb: TransactionCb): Query
commitTransaction(description: PoolDescription, cb?: QueryRawCb): void
rollbackTransaction(description: PoolDescription, cb?: QueryRawCb): void
}

export interface PoolChunky {
params: sqlProcParamType[] | sqlQueryParamType[]
callback: QueryCb | QueryRawCb | CallProcedureCb | TransactionCb
}

export class PoolEventCaster {
isPaused (): boolean
getQueryObj (): Query
getQueryId (): Query | number
isPendingCancel (): boolean
cancelQuery (cb?: StatusCb): void
pauseQuery (): void
resumeQuery (): void
setQueryObj (q: Query, chunky: PoolChunky ): void
isPrepared (): false
/**
* event subscription
* e.g. pool.on('debug', msg => { console.log(msg) })
* @param event one of
*
* 'debug' - a debug record showing internal state of the pool
*
* 'open' - event on the pool being opened and ready to work.
*
* 'error' - propagated from connection errors
*
* 'status' - information relating to latet operation
*
* 'submitted' - raised when query is submitted where previously was on a queue
*
* @param cb callback related to event subscribed
*/
on (event: string, cb?: sqlPoolEventType): void
}

export interface TableColumn {
Expand Down Expand Up @@ -927,6 +970,8 @@ declare namespace MsNodeSqlV8 {

export type GetProcCb = (err: Error, procedure?: ProcedureDefinition) => void

export type TransactionCb = (err?: Error, description?: PoolDescription) => void

export interface BulkMgrSummary {
insertSignature: string
whereColumns: TableColumn[]
Expand Down Expand Up @@ -1665,6 +1710,39 @@ declare namespace MsNodeSqlV8 {
callProcedure (qid: number, procedure: string, params: NativeParam[], cb: NativeQueryCb): void
}

export enum workTypeEnum {
QUERY = 10,
RAW = 11,
PROC = 12,
TRANSACTION = 13,
COMMITTING = 14,
}

export interface PoolWorkItem {
id: number
sql: string
paramsOrCallback: sqlQueryParamType[] | QueryCb | QueryRawCb | CallProcedureCb | TransactionCb,
callback: QueryCb | QueryRawCb | CallProcedureCb | TransactionCb
poolNotifier: PoolEventCaster
workType: workTypeEnum
chunky: PoolChunky
}

export interface PoolDescription {
id: number
pool: Pool
connection: Connection
heartbeatSqlResponse: any
lastActive: Date
work: PoolWorkItem
keepAliveCount: number
recreateCount: number
parkedCount: number
queriesSent: number
beganAt: null | Date
totalElapsedQueryMs: number
}

export interface SqlClient extends UserConversion {
/**
* helper promises allowing async style await to open connection or
Expand Down
121 changes: 108 additions & 13 deletions lib/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const poolModule = (() => {
}

heartbeatResponse (v) {
this.heatbeatSqlResponse = v
this.heartbeatSqlResponse = v
}

heartbeat () {
Expand Down Expand Up @@ -211,13 +211,51 @@ const poolModule = (() => {

class PoolPromises {
constructor (pool) {
this.pool = pool
this.open = util.promisify(pool.open)
this.close = util.promisify(pool.close)
this.query = pool.queryAggregator
this.callProc = pool.callprocAggregator
this.getUserTypeTable = pool.getUserTypeTable
this.getTable = pool.getTable
this.getProc = pool.getProc
this.beginTransaction = util.promisify(pool.beginTransaction)
this.commitTransaction = util.promisify(pool.commitTransaction)
this.rollbackTransaction = util.promisify(pool.rollbackTransaction)
}

transaction(cb) {
let connectionDescription
return this.beginTransaction()
.then((description) => cb(connectionDescription = description))
.then(
() => this.commitTransaction(connectionDescription),
err => {
// If no connectionDescription, do nothing, the beginTransaction errored
// and we can report it directly.
if (!connectionDescription) { return Promise.reject(err) }

// Error in cb() we should notify about it
if (this.pool.listenerCount('error') > 0) {
this.pool.emit('error', err)
}

return this.rollbackTransaction(connectionDescription)
.catch((rollbackError) => {
// We encountered error during rollback, emit an error on the pool for it
if (this.pool.listenerCount('error') > 0) {
this.pool.emit('error', rollbackError)
}
})
.then(
() => {
// Return the original error regardless if rollback was
// successful or not.
return Promise.reject(err)
},
)
}
)
}
}

Expand Down Expand Up @@ -285,6 +323,8 @@ const poolModule = (() => {
}

function runTheQuery (q, description, work) {
let errored = false

work.poolNotifier.setQueryObj(q, work.chunky)
q.on('submitted', () => {
_this.emit('debug', `[${description.id}] submitted work id ${work.id}`)
Expand All @@ -297,6 +337,13 @@ const poolModule = (() => {

q.on('free', () => {
description.free()

// Transactions can not be freed yet if no errors occured. They need to be freed later
if (!errored && description.work && description.work.workType === workTypeEnum.TRANSACTION) {
_this.emit('debug', `[${description.id}] inside transaction from work id ${work.id}`)
return
}

checkin('work', description)
_this.emit('debug', `[${description.id}] free work id ${work.id}`)
work.poolNotifier.emit('free')
Expand All @@ -306,6 +353,7 @@ const poolModule = (() => {
})

q.on('error', (e, more) => {
errored = true
sendError(e, more)
setImmediate(() => {
crank()
Expand All @@ -322,12 +370,19 @@ const poolModule = (() => {
break

case workTypeEnum.RAW:
case workTypeEnum.COMMITTING:
q = connection.queryRaw(work.sql, work.paramsOrCallback, work.callback)
break

case workTypeEnum.PROC:
q = connection.callproc(work.sql, work.paramsOrCallback, work.callback)
break

case workTypeEnum.TRANSACTION:
q = connection.queryRaw(work.sql, work.paramsOrCallback, function (err) {
work.callback(err, err ? null : description)
})
break
}
return q
}
Expand All @@ -346,20 +401,17 @@ const poolModule = (() => {
poolNotifier.emit('free')
}

/** Move unpaused items to queue */
function promotePause () {
const add = []
const start = pause.length
while (pause.length > 0) {
const item = pause.pop()
if (item.isPaused) {
add.unshift(item)
} else {
workQueue.push(item)

for (let i = 0; i < pause.length; i++) {
if (!pause[i].isPaused) {
workQueue.push(pause.splice(i, 1)[0])
i--
}
}
while (add.length > 0) {
pause.unshift(add.pop())
}

if (start !== pause.length) {
setImmediate(() => { crank() })
}
Expand Down Expand Up @@ -395,16 +447,20 @@ const poolModule = (() => {
const workTypeEnum = {
QUERY: 10,
RAW: 11,
PROC: 12
PROC: 12,
TRANSACTION: 13,
COMMITTING: 14,
}

function chunk (paramsOrCallback, callback, workType) {
switch (workType) {
case workTypeEnum.QUERY:
case workTypeEnum.RAW:
case workTypeEnum.COMMITTING:
return notifierFactory.getChunkyArgs(paramsOrCallback, callback)

case workTypeEnum.PROC:
case workTypeEnum.TRANSACTION:
return { params: paramsOrCallback, callback }
}
}
Expand Down Expand Up @@ -456,6 +512,42 @@ const poolModule = (() => {
return submit(sql, paramsOrCallback, callback, workTypeEnum.PROC)
}

function beginTransaction (callback) {
if (!callback || typeof callback !== 'function') {
throw new Error('[msnodesql] Pool beginTransaction called with empty callback.')
}
return submit('BEGIN TRANSACTION', [], callback, workTypeEnum.TRANSACTION)
}

function finishTransaction(sql, description, callback) {
if (!description instanceof PoolDscription) {
throw new Error('[msnodesql] Pool end transaction called with non-description.')
}
const work = description.work
if (!work) {
throw new Error('[msnodesql] Pool end transaction called with unknown or finished transaction.')
}

if (work.workType !== workTypeEnum.TRANSACTION && work.workType !== workTypeEnum.COMMITTING) {
throw new Error('[msnodesql] Pool end transaction called with unknown or finished transaction.')
}

_this.emit('debug', `[${description.id}] closing transaction from ${work.id} with ${sql}`)
work.callback = callback
work.sql = sql
work.workType = workTypeEnum.COMMITTING
item(description, work)
return work.poolNotifier
}

function commitTransaction (description, callback) {
return finishTransaction('IF (@@TRANCOUNT > 0) COMMIT TRANSACTION', description, callback)
}

function rollbackTransaction (description, callback) {
return finishTransaction('IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION', description, callback)
}

async function getUserTypeTable (name) {
// the table mgr will submit query into pool as if it's a connection
return checkClosedPromise().then(async () => tableMgr.promises.getUserTypeTable(name))
Expand Down Expand Up @@ -651,7 +743,7 @@ const poolModule = (() => {
description.heartbeat() // reset by user query
checkin('heartbeat', description)
const inactivePeriod = description.keepAliveCount * options.heartbeatSecs
_this.emit('debug', `[${description.id}] heartbeat response = '${description.heatbeatSqlResponse}', ${description.lastActive.toLocaleTimeString()}` +
_this.emit('debug', `[${description.id}] heartbeat response = '${description.heartbeatSqlResponse}', ${description.lastActive.toLocaleTimeString()}` +
`, keepAliveCount = ${description.keepAliveCount} inactivePeriod = ${inactivePeriod}, inactivityTimeoutSecs = ${options.inactivityTimeoutSecs}`)
})
q.on('error', (e) => {
Expand Down Expand Up @@ -735,6 +827,9 @@ const poolModule = (() => {
)
}

this.beginTransaction = beginTransaction
this.commitTransaction = commitTransaction
this.rollbackTransaction = rollbackTransaction
this.open = open
this.close = close
this.query = query
Expand Down