-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: automated test isolation #952
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,15 +3,192 @@ | |
const infer = require('../infer') | ||
const cds = require('@sap/cds') | ||
|
||
const databaseModel = cds.linked({ | ||
definitions: { | ||
schemas: new cds.entity({ | ||
kind: 'entity', | ||
elements: { | ||
tenant: { type: 'String', key: true }, | ||
source: { type: 'String' }, | ||
available: { type: 'Boolean' }, | ||
started: { type: 'Timestamp' }, | ||
}, | ||
}), | ||
} | ||
}) | ||
databaseModel.meta = {} | ||
|
||
/** @typedef {unknown} DatabaseDriver */ | ||
|
||
class DatabaseService extends cds.Service { | ||
|
||
init() { | ||
async init() { | ||
cds.on('shutdown', () => this.disconnect()) | ||
if (Object.getOwnPropertyDescriptor(cds, 'test') || this.options.isolate) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't cds.test always available? So we would also run thus code for productive server start,, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should such test DB setup not be a module outside of the productive code? |
||
await this._isolate() | ||
} | ||
return super.init() | ||
} | ||
|
||
async _isolate() { | ||
const { options = {} } = cds | ||
const fts = cds.requires.toggles && cds.resolve(cds.env.features.folders) | ||
const src = [options.from || '*', ...(fts || [])] | ||
const fullSrc = [cds.root, ...src.flat()] | ||
|
||
const isolate = { src } | ||
if (typeof this.database === 'function' && typeof this.tenant === 'function') { | ||
const hash = str => { | ||
const { createHash } = require('crypto') | ||
const hash = createHash('sha1') | ||
hash.update(str) | ||
return hash.digest('hex') | ||
} | ||
|
||
const isolation = process.env.TRAVIS_JOB_ID || process.env.GITHUB_RUN_ID || require('os').userInfo().username || 'test_db' | ||
const srchash = hash(fullSrc.join('/')) | ||
// Create one database for each overall test execution | ||
isolate.database = 'D' + hash(isolation) | ||
// Create one tenant for each model source definition | ||
isolate.tenant = 'T' + srchash | ||
// Track source definition hash | ||
isolate.source = srchash | ||
|
||
// Create new database isolation | ||
await this.database(isolate) | ||
|
||
let isnew = false | ||
try { | ||
await this.tx(tx => tx.run(CREATE(databaseModel.definitions.schemas))).catch(() => { }) | ||
await this.tx(async tx => { | ||
tx.model = databaseModel | ||
// await tx.run(DELETE.from('schemas').where`true=true`) | ||
// If insert works the schema does not yet exist and this client has won the race and can deploy the contents | ||
await tx.run(INSERT({ tenant: isolate.tenant, source: isolate.source, available: false, started: new Date() }).into('schemas')) | ||
isnew = true | ||
}) | ||
} catch (err) { | ||
// If the schema already exists wait for the row to be updated with available=true | ||
let available = [] | ||
while (available.length === 0) { | ||
await this.tx(async tx => { | ||
tx.model = databaseModel | ||
available = await tx.run(SELECT.from('schemas').where`tenant=${isolate.tenant} and available=${true}`) | ||
}) | ||
} | ||
} | ||
|
||
// Create/Activate tenant isolation in database | ||
await this.tenant(isolate) | ||
|
||
if (isnew) { | ||
await this._isolate_deploy(isolate) | ||
await this.database(isolate) | ||
await this.tx(async tx => { | ||
tx.model = databaseModel | ||
await tx.run(UPDATE('schemas').where`tenant=${isolate.tenant}`.with({ available: true, started: new Date() })) | ||
}) | ||
await this.tenant(isolate) | ||
} | ||
} else { | ||
await this._isolate_deploy(isolate) | ||
} | ||
|
||
if (typeof this.database === 'function' && typeof this.tenant === 'function') { | ||
this._modified = {} | ||
this.before(['*'], async (req) => { | ||
if ( | ||
!req.query || | ||
req.query?.SELECT || | ||
(typeof req.query === 'string' && /^(BEGIN|COMMIT|ROLLBACK|SELECT)/i.test(req.query)) | ||
) return // Ignore reading requests | ||
if (req.target) this._modified[req.target.name] = true | ||
if (req.tx._isolating) return req.tx._isolating | ||
if (this._isolating) return | ||
|
||
// Add modification tracking for deep-queries internal calls | ||
for (const fn of ['onSIMPLE', 'onUPDATE', 'onINSERT']) { | ||
const org = this[fn] | ||
this[fn] = function (req) { | ||
if (req.query?.target) this._modified[req.query.target.name] = true | ||
return org.apply(this, arguments) | ||
} | ||
} | ||
|
||
this._isolating = true | ||
return (req.tx._isolating = req.tx.commit() | ||
.then(() => this._isolate_write(isolate)) | ||
.then(() => { | ||
return req.tx.begin() | ||
})) | ||
}) | ||
} | ||
} | ||
|
||
async _isolate_write(isolate) { | ||
await this.database(isolate) | ||
|
||
let isnew = false | ||
await this.tx(async tx => { | ||
tx.model = databaseModel | ||
const schemas = await tx.run(SELECT.from('schemas').where`tenant!=${isolate.tenant} and source=${isolate.source} and available=${true}`.forUpdate().limit(1)) | ||
if (schemas.length) { | ||
const tenant = isolate.tenant = schemas[0].tenant | ||
await tx.run(UPDATE('schemas').where`tenant=${tenant}`.with({ available: false, started: new Date() })) | ||
} else { | ||
isolate.tenant = 'T' + cds.utils.uuid() | ||
await tx.run(INSERT({ tenant: isolate.tenant, source: isolate.source, available: false, started: new Date() }).into('schemas')) | ||
isnew = true | ||
} | ||
delete this._modified.schemas // REVISIT: make sure to not track schemas modifications | ||
}) | ||
|
||
await this.tenant(isolate) | ||
|
||
if (isnew) await this._isolate_deploy(isolate) | ||
|
||
// Release schema for follow up test runs | ||
cds.on('shutdown', async () => { | ||
try { | ||
// Clean tenant entities | ||
await this.tx(async tx => { | ||
await tx.begin() | ||
for (const entity in this._modified) { | ||
const query = DELETE(entity).where`true=true` | ||
if (!query.target._unresolved) await tx.onSIMPLE({ query }) // Skip deep delete | ||
} | ||
// UPSERT all data sources again | ||
await cds.deploy.data(tx, tx.model, { schema_evolution: 'auto' }) | ||
}) | ||
|
||
await this.database(isolate) // switch back to database level | ||
await this.tx(tx => { | ||
tx.model = databaseModel | ||
return UPDATE('schemas').where`tenant=${isolate.tenant}`.with({ available: true }) | ||
}) | ||
await this.disconnect() | ||
} catch (err) { | ||
// if an shutdown handler throws an error it goes into an infinite loop | ||
debugger | ||
} | ||
}) | ||
} | ||
|
||
async _isolate_deploy(isolate) { | ||
await this.tx(async () => { | ||
try { | ||
const src = isolate.src | ||
const { options = {} } = cds | ||
const m = await cds.load(src, options).then(cds.minify) | ||
// options.schema_evolution = 'auto' | ||
await cds.deploy(m).to(this, options) | ||
} catch (err) { | ||
if (err.code === 'MODEL_NOT_FOUND' || err.code === 288) return | ||
throw err | ||
} | ||
}) | ||
} | ||
|
||
/** | ||
* Dictionary of connection pools per tenant | ||
*/ | ||
|
@@ -47,7 +224,7 @@ | |
* transaction with `BEGIN` | ||
* @returns this | ||
*/ | ||
async begin (min) { | ||
async begin(min) { | ||
// We expect tx.begin() being called for an txed db service | ||
const ctx = this.context | ||
|
||
|
@@ -129,9 +306,9 @@ | |
} | ||
|
||
// REVISIT: should happen automatically after a configurable time | ||
async disconnect (tenant) { | ||
async disconnect(tenant) { | ||
const tenants = tenant ? [tenant] : Object.keys(this.pools) | ||
await Promise.all (tenants.map (async t => { | ||
await Promise.all(tenants.map(async t => { | ||
const pool = this.pools[t]; if (!pool) return | ||
delete this.pools[t] | ||
await pool.drain() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe a more distinct name, like
cds_test_schemas
to avoid clashes?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The model is inside the
database
isolation. It actually clashes withSYS.SCHEMAS
inHANA
, but as this is isolated it doesn't matter.