Skip to content

Commit

Permalink
feat: forUpdate and forShareLock (#148)
Browse files Browse the repository at this point in the history
  • Loading branch information
BobdenOs authored Mar 14, 2024
1 parent 3790ec0 commit 99a1170
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 16 deletions.
7 changes: 6 additions & 1 deletion db-service/lib/SQLService.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,12 @@ class SQLService extends DatabaseService {
if (!query.target) {
try { this.infer(query) } catch (e) { /**/ }
}
if (query.target && !query.target._unresolved) {
if (
query.target
&& !query.target._unresolved
&& !query.SELECT.forUpdate
&& !query.SELECT.forShareLock
) {
// Will return multiple rows with objects inside
query.SELECT.expand = 'root'
}
Expand Down
2 changes: 1 addition & 1 deletion db-service/lib/common/DatabaseService.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ class DatabaseService extends cds.Service {
const tenants = tenant ? [tenant] : Object.keys(this.pools)
await Promise.all (tenants.map (async t => {
const pool = this.pools[t]; if (!pool) return
delete this.pools[t]
await pool.drain()
await pool.clear()
delete this.pools[t]
}))
}

Expand Down
31 changes: 30 additions & 1 deletion db-service/lib/cqn2sql.js
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ class CQN2SQLRenderer {
* @param {import('./infer/cqn').SELECT} q
*/
SELECT(q) {
let { from, expand, where, groupBy, having, orderBy, limit, one, distinct, localized } = q.SELECT
let { from, expand, where, groupBy, having, orderBy, limit, one, distinct, localized, forUpdate, forShareLock } =
q.SELECT
// REVISIT: When selecting from an entity that is not in the model the from.where are not normalized (as cqn4sql is skipped)
if (!where && from?.ref?.length === 1 && from.ref[0]?.where) where = from.ref[0]?.where
let columns = this.SELECT_columns(q)
Expand All @@ -219,6 +220,8 @@ class CQN2SQLRenderer {
if (!_empty(orderBy)) sql += ` ORDER BY ${this.orderBy(orderBy, localized)}`
if (one) limit = Object.assign({}, limit, { rows: { val: 1 } })
if (limit) sql += ` LIMIT ${this.limit(limit)}`
if (forUpdate) sql += ` ${this.forUpdate(forUpdate)}`
else if (forShareLock) sql += ` ${this.forShareLock(forShareLock)}`
// Expand cannot work without an inferred query
if (expand) {
if ('elements' in q) sql = this.SELECT_expand(q, sql)
Expand Down Expand Up @@ -382,6 +385,32 @@ class CQN2SQLRenderer {
return !offset ? rows.val : `${rows.val} OFFSET ${offset.val}`
}

/**
* Renders an forUpdate clause into generic SQL
* @param {import('./infer/cqn').SELECT["SELECT"]["forUpdate"]} update
* @returns {string} SQL
*/
forUpdate(update) {
const { wait, of } = update
let sql = 'FOR UPDATE'
if (!_empty(of)) sql += ` OF ${of.map(x => this.expr(x)).join(', ')}`
if (typeof wait === 'number') sql += ` WAIT ${wait}`
return sql
}

/**
* Renders an forShareLock clause into generic SQL
* @param {import('./infer/cqn').SELECT["SELECT"]["forShareLock"]} update
* @returns {string} SQL
*/
forShareLock(lock) {
const { wait, of } = lock
let sql = 'FOR SHARE LOCK'
if (!_empty(of)) sql += ` OF ${of.map(x => this.expr(x)).join(', ')}`
if (typeof wait === 'number') sql += ` WAIT ${wait}`
return sql
}

// INSERT Statements ------------------------------------------------

/**
Expand Down
7 changes: 6 additions & 1 deletion db-service/lib/cqn4sql.js
Original file line number Diff line number Diff line change
Expand Up @@ -2009,7 +2009,12 @@ function cqn4sql(originalQuery, model) {
* @returns true if the given definition shall be localized
*/
function isLocalized(definition) {
return inferred.SELECT?.localized && definition['@cds.localized'] !== false
return (
inferred.SELECT?.localized &&
definition['@cds.localized'] !== false &&
!inferred.SELECT.forUpdate &&
!inferred.SELECT.forShareLock
)
}

/** returns the CSN definition for the given name from the model */
Expand Down
8 changes: 7 additions & 1 deletion hana/lib/HANAService.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,16 @@ class HANAService extends SQLService {

async onSELECT(req) {
const { query, data } = req

if (!query.target) {
try { this.infer(query) } catch (e) { /**/ }
}
if (!query.target || query.target._unresolved) {
if (
!query.target
|| query.target._unresolved
|| query.SELECT.forUpdate
|| query.SELECT.forShareLock
) {
return super.onSELECT(req)
}

Expand Down
14 changes: 14 additions & 0 deletions postgres/lib/PostgresService.js
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,20 @@ GROUP BY k
else return super.operator(x, i, xpr)
}

// Postgres does not support locking columns only tables which makes of unapplicable
// Postgres does not support "wait n" it only supports "nowait"
forUpdate(update) {
const { wait } = update
if (wait === 0) return 'FOR UPDATE NOWAIT'
return 'FOR UPDATE'
}

forShareLock(lock) {
const { wait } = lock
if (wait === 0) return 'FOR SHARE NOWAIT'
return 'FOR SHARE'
}

defaultValue(defaultValue = this.context.timestamp.toISOString()) {
return this.string(`${defaultValue}`)
}
Expand Down
8 changes: 8 additions & 0 deletions sqlite/lib/SQLiteService.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,14 @@ class SQLiteService extends SQLService {
return super.val(v)
}

forUpdate() {
return ''
}

forShareLock() {
return ''
}

// Used for INSERT statements
static InputConverters = {
...super.InputConverters,
Expand Down
138 changes: 127 additions & 11 deletions test/compliance/SELECT.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const cds = require('../cds.js')
// Call default cds.test API

describe('SELECT', () => {
const { data } = cds.test(__dirname + '/resources')
const { data, expect } = cds.test(__dirname + '/resources')
data.autoIsolation(true)

describe('from', () => {
Expand Down Expand Up @@ -74,12 +74,12 @@ describe('SELECT', () => {

test('like regex uses native regex support', async () => {
let ret = await SELECT.from('basic.projection.string').where('string like', /ye./)
expect(ret.length).toBe(1)
expect(ret.length).to.eq(1)
})

test('= regex behaves like string', async () => {
await expect(SELECT.from('basic.projection.string').where('string =', /ye./)).resolves.toHaveProperty('length', 0)
await expect(SELECT.from('basic.projection.string').where('string =', /yes/)).resolves.toHaveProperty('length', 1)
expect(await SELECT.from('basic.projection.string').where('string =', /ye./)).to.have.property('length', 0)
expect(await SELECT.from('basic.projection.string').where('string =', /yes/)).to.have.property('length', 1)
})

test('from select', async () => {
Expand Down Expand Up @@ -121,7 +121,7 @@ describe('SELECT', () => {
`
cqn.SELECT.columns[0].val = function () { }

await assert.rejects(cds.run(cqn))
await expect(cds.run(cqn)).rejected
})

test.skip('select xpr', async () => {
Expand Down Expand Up @@ -205,7 +205,7 @@ describe('SELECT', () => {
})

test.skip('invalid cast (wrong)', async () => {
await assert.rejects(
await expect(
cds.run(CQL`
SELECT
'String' as ![string] : cds.DoEsNoTeXiSt
Expand All @@ -214,7 +214,7 @@ describe('SELECT', () => {
{
message: 'Not supported type: cds.DoEsNoTeXiSt',
},
)
).rejected
})
})

Expand Down Expand Up @@ -309,16 +309,132 @@ describe('SELECT', () => {
})
})

const generalLockTest = (lock, shared = false) => {
const lock4 = bool => {
return lock.clone().where([{ ref: ['bool'] }, '=', { val: bool }])
}

const isSQLite = () => cds.db.options.impl === '@cap-js/sqlite'

const setMax = max => {
let oldMax
beforeAll(async () => {
if (isSQLite()) return
await cds.db.disconnect()
oldMax = cds.db.pools._factory.options.max
cds.db.pools._factory.options.max = max
})

afterAll(async () => {
if (isSQLite()) return
cds.db.pools._factory.options.max = oldMax
})
}

let oldTimeout
beforeAll(async () => {
oldTimeout = cds.db.pools._factory.options.acquireTimeoutMillis
cds.db.pools.undefined._config.acquireTimeoutMillis =
cds.db.pools._factory.options.acquireTimeoutMillis = 1000
})

afterAll(() => {
cds.db.pools.undefined._config.acquireTimeoutMillis =
cds.db.pools._factory.options.acquireTimeoutMillis = oldTimeout
})

describe('pool max = 1', () => {
setMax(1)
test('two locks on a single table', async () => {
const tx1 = await cds.tx()
const tx2 = await cds.tx()

try {
// Lock true
await tx1.run(lock4(true))

// Lock false
await expect(tx2.run(lock4(false))).rejected
} finally {
await Promise.allSettled([tx1.commit(), tx2.commit()])
}
})

test('same lock twice on a single table', async () => {
const tx1 = await cds.tx()
const tx2 = await cds.tx()

try {
// Lock false
await tx1.run(lock4(false))

// Lock false
await expect(tx2.run(lock4(false))).rejected
} finally {
await Promise.allSettled([tx1.commit(), tx2.commit()])
}
})
})

describe('pool max > 1', () => {
setMax(2)
test('two locks on a single table', async () => {
if (isSQLite()) return

const tx1 = await cds.tx()
const tx2 = await cds.tx()

try {
// Lock true
await tx1.run(lock4(true))

// Lock false
await tx2.run(lock4(false))
} finally {
await Promise.allSettled([tx1.commit(), tx2.commit()])
}
})

test('same lock twice on a single table', async () => {
if (isSQLite()) return

const tx1 = await cds.tx()
const tx2 = await cds.tx()

try {
// Lock false
await tx1.run(lock4(false))

// Lock false
if (shared) {
const ret = await tx2.run(lock4(false))
expect(ret).is.not.undefined
} else {
await expect(tx2.run(lock4(false))).rejected
}
} finally {
await Promise.allSettled([tx1.commit(), tx2.commit()])
}
})
})
}

describe('forUpdate', () => {
test.skip('missing', () => {
throw new Error('not supported')
const lock = SELECT.from('basic.projection.globals').forUpdate({
of: ['bool'],
wait: 0,
})

generalLockTest(lock)
})

describe('forShareLock', () => {
test.skip('missing', () => {
throw new Error('not supported')
const lock = SELECT.from('basic.projection.globals').forShareLock({
of: ['bool'],
wait: 0,
})

generalLockTest(lock, true)
})

describe('search', () => {
Expand Down

0 comments on commit 99a1170

Please sign in to comment.