Skip to content

Commit

Permalink
feat: SELECT returns LargeBinaries as streams unless feature flag "st…
Browse files Browse the repository at this point in the history
…ream_compat" is set (#251)

Co-authored-by: D070615 <olena.timrova@sap.com>
Co-authored-by: Bob den Os <bob.den.os@sap.com>
Co-authored-by: Olena <etimryaka@gmail.com>
Co-authored-by: Patrice Bender <patrice.bender@sap.com>
  • Loading branch information
5 people authored Jan 12, 2024
1 parent 251575b commit 8165a4a
Show file tree
Hide file tree
Showing 16 changed files with 587 additions and 391 deletions.
1 change: 0 additions & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
"DELETE": true,
"CREATE": true,
"DROP": true,
"STREAM": true,
"CDL": true,
"CQL": true,
"CXL": true
Expand Down
169 changes: 87 additions & 82 deletions db-service/lib/SQLService.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const cds = require('@sap/cds/lib'),
DEBUG = cds.debug('sql|db')
const { Readable } = require('stream')
const { resolveView } = require('@sap/cds/libx/_runtime/common/utils/resolveView')
const DatabaseService = require('./common/DatabaseService')
const cqn4sql = require('./cqn4sql')
Expand All @@ -14,11 +15,8 @@ const cqn4sql = require('./cqn4sql')
*/

class SQLService extends DatabaseService {

init() {
this.on(['SELECT'], this.transformStreamFromCQN)
this.on(['UPDATE'], this.transformStreamIntoCQN)
this.on(['INSERT', 'UPSERT', 'UPDATE'], require('./fill-in-keys')) // REVISIT: should be replaced by correct input processing eventually
this.on(['INSERT', 'UPSERT', 'UPDATE'], require('./fill-in-keys')) // REVISIT should be replaced by correct input processing eventually
this.on(['INSERT', 'UPSERT', 'UPDATE'], require('./deep-queries').onDeep)
this.on(['SELECT'], this.onSELECT)
this.on(['INSERT'], this.onINSERT)
Expand All @@ -27,62 +25,79 @@ class SQLService extends DatabaseService {
this.on(['DELETE'], this.onDELETE)
this.on(['CREATE ENTITY', 'DROP ENTITY'], this.onSIMPLE)
this.on(['BEGIN', 'COMMIT', 'ROLLBACK'], this.onEVENT)
this.on(['STREAM'], this.onSTREAM)
this.on(['*'], this.onPlainSQL)
return super.init()
}

/** @type {Handler} */
async transformStreamFromCQN({ query }, next) {
if (!query._streaming) return next()
const cqn = STREAM.from(query.SELECT.from).column(query.SELECT.columns[0].ref[0])
if (query.SELECT.where) cqn.STREAM.where = query.SELECT.where
const stream = await this.run(cqn)
return stream && { value: stream }
}
_changeToStreams(columns, rows, one, compat) {
if (!rows || !columns) return
if (!Array.isArray(rows)) rows = [rows]
if (!rows.length || !Object.keys(rows[0]).length) return

/** @type {Handler} */
async transformStreamIntoCQN({ query, data, target }, next) {
let col, type, etag
const elements = query._target?.elements || target?.elements
if (!elements) next()
for (const key in elements) {
const element = elements[key]
if (element['@Core.MediaType'] && data[key]?.pipe) col = key
if (element['@Core.IsMediaType'] && data[key]) type = key
if (element['@odata.etag'] && data[key]) etag = key
// REVISIT: remove after removing stream_compat feature flag
if (compat) {
rows[0][Object.keys(rows[0])[0]] = this._stream(Object.values(rows[0])[0])
return
}

if (!col) return next()

const cqn = STREAM.into(query.UPDATE.entity).column(col).data(data[col])
if (query.UPDATE.where) cqn.STREAM.where = query.UPDATE.where
const result = await this.run(cqn)
if (type || etag) {
const d = { ...data }
delete d[col]
const cqn = UPDATE.entity(query.UPDATE.entity).with(d)
if (query.UPDATE.where) cqn.UPDATE.where = query.UPDATE.where
await this.run(cqn)
for (let col of columns) {
const name = col.as || col.ref?.[col.ref.length - 1] || (typeof col === 'string' && col)
if (col.element?.isAssociation) {
if (one) this._changeToStreams(col.SELECT.columns, rows[0][name], false, compat)
else
rows.forEach(row => {
this._changeToStreams(col.SELECT.columns, row[name], false, compat)
})
} else if (col.element?.type === 'cds.LargeBinary') {
if (one) rows[0][name] = this._stream(rows[0][name])
else
rows.forEach(row => {
row[name] = this._stream(row[name])
})
}
}
}

return result
_stream(val) {
if (val === null) return null
// Buffer.from only applies encoding when the input is a string
let raw = typeof val === 'string' ? Buffer.from(val.toString(), 'base64') : val
return new Readable({
read(size) {
if (raw.length === 0) return this.push(null)
const chunk = raw.slice(0, size)
raw = raw.slice(size)
this.push(chunk)
},
})
}

/**
* Handler for SELECT
* @type {Handler}
*/
async onSELECT({ query, data }) {
query.SELECT.expand = 'root'
const { sql, values, cqn } = this.cqn2sql(query, data)
let ps = await this.prepare(sql)
let rows = await ps.all(values)
if (rows.length)
if (cqn.SELECT.expand) rows = rows.map(r => (typeof r._json_ === 'string' ? JSON.parse(r._json_) : r._json_ || r))

if (cds.env.features.stream_compat) {
if (query._streaming) {
this._changeToStreams(cqn.SELECT.columns, rows, true, true)
return rows.length ? { value: Object.values(rows[0])[0] } : undefined
}
} else {
this._changeToStreams(cqn.SELECT.columns, rows, query.SELECT.one, false)
}

if (cqn.SELECT.count) {
// REVISIT: the runtime always expects that the count is preserved with .map, required for renaming in mocks
return SQLService._arrayWithCount(rows, await this.count(query, rows))
}

return cqn.SELECT.one || query.SELECT.from?.ref?.[0].cardinality?.max === 1 ? rows[0] : rows
}

Expand Down Expand Up @@ -126,22 +141,6 @@ class SQLService extends DatabaseService {
return this.onSIMPLE(req)
}

/**
* Handler for Stream
* @type {Handler}
*/
async onSTREAM(req) {
const { one, sql, values } = this.cqn2sql(req.query)
// writing stream
if (req.query.STREAM.into) {
const ps = await this.prepare(sql)
return (await ps.run(values)).changes
}
// reading stream
const ps = await this.prepare(sql)
return ps.stream(values, one)
}

/**
* Handler for CREATE, DROP, UPDATE, DELETE, with simple CQN
* @type {Handler}
Expand All @@ -154,7 +153,7 @@ class SQLService extends DatabaseService {

get onDELETE() {
// REVISIT: It's not yet 100 % clear under which circumstances we can rely on db constraints
return super.onDELETE = /* cds.env.features.assert_integrity === 'db' ? this.onSIMPLE : */ deep_delete
return (super.onDELETE = /* cds.env.features.assert_integrity === 'db' ? this.onSIMPLE : */ deep_delete)
async function deep_delete(/** @type {Request} */ req) {
let { compositions } = req.target
if (compositions) {
Expand All @@ -163,24 +162,29 @@ class SQLService extends DatabaseService {
if (typeof from === 'string') from = { ref: [from] }
if (where) {
let last = from.ref.at(-1)
if (last.where) [ last, where ] = [ last.id, [ { xpr: last.where }, 'and', { xpr: where } ] ]
from = {ref:[ ...from.ref.slice(0,-1), { id: last, where }]}
if (last.where) [last, where] = [last.id, [{ xpr: last.where }, 'and', { xpr: where }]]
from = { ref: [...from.ref.slice(0, -1), { id: last, where }] }
}
// Process child compositions depth-first
let { depth=0, visited=[] } = req
visited.push (req.target.name)
await Promise.all (Object.values(compositions).map(c => {
if (c._target['@cds.persistence.skip'] === true) return
if (c._target === req.target) { // the Genre.children case
if (++depth > (c['@depth'] || 3)) return
} else if (visited.includes(c._target.name)) throw new Error(
`Transitive circular composition detected: \n\n`+
` ${visited.join(' > ')} > ${c._target.name} \n\n`+
`These are not supported by deep delete.`)
// Prepare and run deep query, à la CQL`DELETE from Foo[pred]:comp1.comp2...`
const query = DELETE.from({ref:[ ...from.ref, c.name ]})
return this.onDELETE({ query, depth, visited: [...visited], target: c._target })
}))
let { depth = 0, visited = [] } = req
visited.push(req.target.name)
await Promise.all(
Object.values(compositions).map(c => {
if (c._target['@cds.persistence.skip'] === true) return
if (c._target === req.target) {
// the Genre.children case
if (++depth > (c['@depth'] || 3)) return
} else if (visited.includes(c._target.name))
throw new Error(
`Transitive circular composition detected: \n\n` +
` ${visited.join(' > ')} > ${c._target.name} \n\n` +
`These are not supported by deep delete.`,
)
// Prepare and run deep query, à la CQL`DELETE from Foo[pred]:comp1.comp2...`
const query = DELETE.from({ ref: [...from.ref, c.name] })
return this.onDELETE({ query, depth, visited: [...visited], target: c._target })
}),
)
}
return this.onSIMPLE(req)
}
Expand Down Expand Up @@ -233,8 +237,8 @@ class SQLService extends DatabaseService {
// REVISIT: made uppercase count because of HANA reserved word quoting
const cq = SELECT.one([{ func: 'count', as: 'COUNT' }]).from(
cds.ql.clone(query, {
localized: false,
expand: false,
localized: false,
expand: false,
limit: undefined,
orderBy: undefined,
}),
Expand All @@ -259,10 +263,12 @@ class SQLService extends DatabaseService {
// preserves $count for .map calls on array
static _arrayWithCount = function (a, count) {
const _map = a.map
const map = function (..._) { return SQLService._arrayWithCount(_map.call(a, ..._), count) }
const map = function (..._) {
return SQLService._arrayWithCount(_map.call(a, ..._), count)
}
return Object.defineProperties(a, {
$count: { value: count, enumerable: false, configurable: true, writable: true },
map: { value: map, enumerable: false, configurable: true, writable: true }
map: { value: map, enumerable: false, configurable: true, writable: true },
})
}

Expand All @@ -280,10 +286,8 @@ class SQLService extends DatabaseService {
*/
cqn2sql(query, values) {
let q = this.cqn4sql(query)
if (q.SELECT && 'elements' in q) q.SELECT.expand ??= 'root'

let kind = q.kind || Object.keys(q)[0]
if (kind in { INSERT: 1, DELETE: 1, UPSERT: 1, UPDATE: 1 } || q.STREAM?.into) {
if (kind in { INSERT: 1, DELETE: 1, UPSERT: 1, UPDATE: 1 }) {
q = resolveView(q, this.model, this) // REVISIT: before resolveView was called on flat cqn obtained from cqn4sql -> is it correct to call on original q instead?
let target = q[kind]._transitions?.[0].target
if (target) q.target = target // REVISIT: Why isn't that done in resolveView?
Expand All @@ -297,7 +301,8 @@ class SQLService extends DatabaseService {
* @returns {import('./infer/cqn').Query}
*/
cqn4sql(q) {
if (!q.SELECT?.from?.join && !q.SELECT?.from?.SELECT && !this.model?.definitions[_target_name4(q)]) return _unquirked(q)
if (!q.SELECT?.from?.join && !q.SELECT?.from?.SELECT && !this.model?.definitions[_target_name4(q)])
return _unquirked(q)
return cqn4sql(q, this.model)
}

Expand Down Expand Up @@ -330,7 +335,6 @@ class SQLService extends DatabaseService {
* @interface
*/
class PreparedStatement {

/**
* Executes a prepared DML query, i.e., INSERT, UPDATE, DELETE, CREATE, DROP
* @abstract
Expand Down Expand Up @@ -380,9 +384,7 @@ const _target_name4 = q => {
q.UPDATE?.entity ||
q.DELETE?.from ||
q.CREATE?.entity ||
q.DROP?.entity ||
q.STREAM?.from ||
q.STREAM?.into
q.DROP?.entity
if (target?.SET?.op === 'union') throw new cds.error('UNION-based queries are not supported')
if (!target?.ref) return target
const [first] = target.ref
Expand All @@ -401,8 +403,11 @@ const _unquirked = q => {
return q
}


const sqls = new class extends SQLService { get factory() { return null } }
const sqls = new (class extends SQLService {
get factory() {
return null
}
})()
cds.extend(cds.ql.Query).with(
class {
forSQL() {
Expand Down
73 changes: 3 additions & 70 deletions db-service/lib/cqn2sql.js
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ class CQN2SQLRenderer {
* @returns {string} SQL
*/
UPDATE(q) {
const { entity, with: _with, data, where } = q.UPDATE
const { entity, with: _with, data, where } = q.UPDATE
const elements = q.target?.elements
let sql = `UPDATE ${this.name(entity.ref?.[0] || entity)}`
if (entity.as) sql += ` AS ${entity.as}`
Expand Down Expand Up @@ -700,73 +700,6 @@ class CQN2SQLRenderer {
return (this.sql = sql)
}

// STREAM Statement -------------------------------------------------

/**
* Renders a STREAM query into generic SQL
* @param {import('./infer/cqn').STREAM} q
* @returns {string} SQL
*/
STREAM(q) {
const { STREAM } = q
return STREAM.from
? this.STREAM_from(q)
: STREAM.into
? this.STREAM_into(q)
: cds.error`Missing .form or .into in ${q}`
}

/**
* Renders a STREAM.into query into generic SQL
* @param {import('./infer/cqn').STREAM} q
* @returns {string} SQL
*/
STREAM_into(q) {
const { into, column, where, data } = q.STREAM

let sql
if (!_empty(column)) {
data.type = 'binary'
const update = UPDATE(into)
.with({ [column]: data })
.where(where)
Object.defineProperty(update, 'target', { value: q.target })
sql = this.UPDATE(update)
} else {
data.type = 'json'
// REVISIT: decide whether dataset streams should behave like INSERT or UPSERT
sql = this.UPSERT(UPSERT([{}]).into(into).forSQL())
this.values = [data]
}

return (this.sql = sql)
}

/**
* Renders a STREAM.from query into generic SQL
* @param {import('./infer/cqn').STREAM} q
* @returns {string} SQL
*/
STREAM_from(q) {
const { column, from, where, columns } = q.STREAM

const select = cds.ql
.SELECT(column ? [column] : columns)
.where(where)
.limit(column ? 1 : undefined)

// SELECT.from() does not accept joins
select.SELECT.from = from

if (column) {
this.one = true
} else {
select.SELECT.expand = 'root'
this.one = !!from.SELECT?.one
}
return this.SELECT(select.forSQL())
}

// Expression Clauses ---------------------------------------------

/**
Expand Down Expand Up @@ -880,8 +813,8 @@ class CQN2SQLRenderer {
case 'object':
if (val === null) return 'NULL'
if (val instanceof Date) return `'${val.toISOString()}'`
if (val instanceof Readable); // go on with default below
else if (Buffer.isBuffer(val)) val = val.toString('base64')
if (val instanceof Readable) ; // go on with default below
else if (Buffer.isBuffer(val)) ; // go on with default below
else if (is_regexp(val)) val = val.source
else val = JSON.stringify(val)
case 'string': // eslint-disable-line no-fallthrough
Expand Down
Loading

0 comments on commit 8165a4a

Please sign in to comment.