feat: Support Readable Streams inside INSERT.entries (#343)
It should always be possible to `INSERT` the results of a `SELECT` statement. That was no longer possible with the new approach of streaming `cds.LargeBinary` values as separate `Readable` streams. This implementation therefore allows those streams to be merged back into a single `JSON` stream.

```js
await (INSERT || UPSERT)(await SELECT.from('Books.drafts')).into('Books')
```
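
For illustration, a minimal sketch of the new capability from the application side, mirroring the updated sqlite stream test further down: a plain `Readable` holding a JSON array can be handed to `INSERT.entries` directly. The file name `rows.json` and the entity `Books` are placeholders, and a connected `cds` database is assumed.

```js
const fs = require('fs')

// rows.json is assumed to contain a JSON array of entries, e.g. [{"ID":1},{"ID":2}]
const stream = fs.createReadStream('rows.json')
await INSERT.into('Books').entries(stream)
```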

---------

Co-authored-by: Vitaly Kozyura <58591662+vkozyura@users.noreply.github.com>
BobdenOs and vkozyura authored Jan 3, 2024
1 parent 9970e14 commit f6faf89
Showing 7 changed files with 161 additions and 55 deletions.
109 changes: 107 additions & 2 deletions db-service/lib/cqn2sql.js
@@ -426,11 +426,107 @@ class CQN2SQLRenderer {

// Include this.values for placeholders
/** @type {unknown[][]} */
this.entries = [[...this.values, JSON.stringify(INSERT.entries)]]
this.entries = []
if (INSERT.entries[0] instanceof Readable) {
INSERT.entries[0].type = 'json'
this.entries = [[...this.values, INSERT.entries[0]]]
} else {
const stream = Readable.from(this.INSERT_entries_stream(INSERT.entries))
stream.type = 'json'
this.entries = [[...this.values, stream]]
}

return (this.sql = `INSERT INTO ${this.quote(entity)}${alias ? ' as ' + this.quote(alias) : ''} (${this.columns
}) SELECT ${extraction} FROM json_each(?)`)
}

async *INSERT_entries_stream(entries) {
const bufferLimit = 1 << 16
let buffer = '['

let sep = ''
for (const row of entries) {
buffer += `${sep}{`
if (!sep) sep = ','

let sepsub = ''
for (const key in row) {
const keyJSON = `${sepsub}${JSON.stringify(key)}:`
if (!sepsub) sepsub = ','

const val = row[key]
if (val instanceof Readable) {
buffer += `${keyJSON}"`

// TODO: double check that it works
val.setEncoding('base64')
for await (const chunk of val) {
buffer += chunk
if (buffer.length > bufferLimit) {
yield buffer
buffer = ''
}
}

buffer += '"'
} else {
buffer += `${keyJSON}${val === undefined ? 'null' : JSON.stringify(val)}`
}
}
buffer += '}'
if (buffer.length > bufferLimit) {
yield buffer
buffer = ''
}
}

buffer += ']'
yield buffer
}

async *INSERT_rows_stream(entries) {
const bufferLimit = 1 << 16
let buffer = '['

let sep = ''
for (const row of entries) {
buffer += `${sep}[`
if (!sep) sep = ','

let sepsub = ''
for (let key = 0; key < row.length; key++) {
const val = row[key]
if (val instanceof Readable) {
buffer += `${sepsub}"`

// TODO: double check that it works
val.setEncoding('base64')
for await (const chunk of val) {
buffer += chunk
if (buffer.length > bufferLimit) {
yield buffer
buffer = ''
}
}

buffer += '"'
} else {
buffer += `${sepsub}${val === undefined ? 'null' : JSON.stringify(val)}`
}

if (!sepsub) sepsub = ','
}
buffer += ']'
if (buffer.length > bufferLimit) {
yield buffer
buffer = ''
}
}

buffer += ']'
yield buffer
}

/**
* Renders an INSERT query with rows property
* @param {import('./infer/cqn').INSERT} q
@@ -453,7 +549,16 @@ class CQN2SQLRenderer {
})

this.columns = columns.map(c => this.quote(c))
this.entries = [[JSON.stringify(INSERT.rows)]]

if (INSERT.rows[0] instanceof Readable) {
INSERT.rows[0].type = 'json'
this.entries = [[...this.values, INSERT.rows[0]]]
} else {
const stream = Readable.from(this.INSERT_rows_stream(INSERT.rows))
stream.type = 'json'
this.entries = [[...this.values, stream]]
}

return (this.sql = `INSERT INTO ${this.quote(entity)}${alias ? ' as ' + this.quote(alias) : ''} (${this.columns
}) SELECT ${extraction} FROM json_each(?)`)
}
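In short, `INSERT_entries_stream` (and its `INSERT_rows_stream` sibling) turn the entries into an async generator of JSON text chunks, base64-encoding any nested `Readable` values; the resulting stream is tagged with `type = 'json'` and placed into `this.entries` as the single statement parameter. A rough sketch of inspecting that parameter, mirroring the updated unit tests below (`cqnInsert` stands for any of the test CQN objects):

```js
const { text } = require('stream/consumers')

// entries[0][0] is now a Readable (with .type === 'json') instead of a string,
// so collecting it yields the merged JSON document that json_each(?) will parse.
const { sql, entries } = cqn2sql(cqnInsert)
const json = await text(entries[0][0]) // e.g. '[{"ID":1,"name":"sap"}]'
```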
22 changes: 12 additions & 10 deletions db-service/test/cqn2sql/insert.test.js
@@ -1,4 +1,6 @@
'use strict'
const { text } = require('stream/consumers')

const cds = require('@sap/cds/lib')
const cqn2sql = require('../../lib/cqn2sql')

@@ -9,7 +11,7 @@ beforeAll(async () => {
describe('insert', () => {
describe('insert only', () => {
// Values are missing
test('test with insert values into columns', () => {
test('test with insert values into columns', async () => {
const cqnInsert = {
INSERT: {
into: { ref: ['Foo'] },
@@ -19,10 +21,10 @@ }
}

const { sql, entries } = cqn2sql(cqnInsert)
expect({ sql, entries }).toMatchSnapshot()
expect({ sql, entries: [[await text(entries[0][0])]] }).toMatchSnapshot()
})

test('test with insert rows into columns', () => {
test('test with insert rows into columns', async () => {
const cqnInsert = {
INSERT: {
into: { ref: ['Foo'] },
@@ -34,11 +36,11 @@ },
},
}
const { sql, entries } = cqn2sql(cqnInsert)
expect({ sql, entries }).toMatchSnapshot()
expect({ sql, entries: [[await text(entries[0][0])]] }).toMatchSnapshot()
})

// no filtering in INSERT
xtest('test filter in insert rows into columns with not existing column', () => {
xtest('test filter in insert rows into columns with not existing column', async () => {
const cqnInsert = {
INSERT: {
into: { ref: ['Foo2'] },
@@ -50,10 +52,10 @@ },
},
}
const { sql, entries } = cqn2sql(cqnInsert)
expect({ sql, entries }).toMatchSnapshot()
expect({ sql, entries: [[await text(entries[0][0])]] }).toMatchSnapshot()
})

test('test with insert entries', () => {
test('test with insert entries', async () => {
const cqnInsert = {
INSERT: {
into: 'Foo2',
@@ -65,10 +67,10 @@ }
}

const { sql, entries } = cqn2sql(cqnInsert)
expect({ sql, entries }).toMatchSnapshot()
expect({ sql, entries: [[await text(entries[0][0])]] }).toMatchSnapshot()
})

test('test with insert with alias', () => {
test('test with insert with alias', async () => {
const cqnInsert = {
INSERT: {
into: { ref: ['Foo2'], as: 'Fooooo2' },
@@ -80,7 +82,7 @@ }
}

const { sql, entries } = cqn2sql(cqnInsert)
expect({ sql, entries }).toMatchSnapshot()
expect({ sql, entries: [[await text(entries[0][0])]] }).toMatchSnapshot()
})
})

14 changes: 8 additions & 6 deletions db-service/test/cqn2sql/upsert.test.js
@@ -1,4 +1,6 @@
'use strict'
const { text } = require('stream/consumers')

const cds = require('@sap/cds/lib')
const cqn2sql = require('../../lib/cqn2sql')

@@ -7,7 +9,7 @@ beforeAll(async () => {
})

describe('upsert', () => {
test('test with keys only', () => {
test('test with keys only', async () => {
const cqnUpsert = {
UPSERT: {
into: 'Foo2',
@@ -17,11 +19,11 @@ }
}

const { sql, entries } = cqn2sql(cqnUpsert)
expect({ sql, entries }).toMatchSnapshot()
expect({ sql, entries: [[await text(entries[0][0])]] }).toMatchSnapshot()
})

test('test with entries', () => {
const cqnInsert = {
test('test with entries', async () => {
const cqnUpsert = {
UPSERT: {
into: 'Foo2',
entries: [
@@ -31,7 +33,7 @@ },
},
}

const { sql, entries } = cqn2sql(cqnInsert)
expect({ sql, entries }).toMatchSnapshot()
const { sql, entries } = cqn2sql(cqnUpsert)
expect({ sql, entries: [[await text(entries[0][0])]] }).toMatchSnapshot()
})
})
38 changes: 11 additions & 27 deletions hana/lib/HANAService.js
@@ -117,16 +117,15 @@ class HANAService extends SQLService {
}

async onINSERT({ query, data }) {
// Using runBatch for HANA 2.0 and lower sometimes leads to integer underflow errors
// REVISIT: Address runBatch issues in node-hdb and hana-client
if (HANAVERSION <= 2) {
return super.onINSERT(...arguments)
}
const { sql, entries, cqn } = this.cqn2sql(query, data)
if (!sql) return // Do nothing when there is nothing to be done
const ps = await this.prepare(sql)
// HANA driver supports batch execution
const results = entries ? await ps.runBatch(entries) : await ps.run()
const results = await (entries
? HANAVERSION <= 2
? entries.reduce((l, c) => l.then(() => ps.run(c)), Promise.resolve(0))
: ps.run(entries)
: ps.run())
return new this.class.InsertResults(cqn, results)
}

@@ -545,28 +544,13 @@ }
// HANA Express does not process large JSON documents
// The limit is somewhere between 64KB and 128KB
if (HANAVERSION <= 2) {
// Simple line splitting would be preferred, but batch execute does not work properly
// Which makes sending every line separately much slower
// this.entries = INSERT.entries.map(e => [JSON.stringify(e)])

this.entries = []
let cur = ['[']
this.entries.push(cur)
INSERT.entries
.map(r => JSON.stringify(r))
.forEach(r => {
if (cur[0].length > 65535) {
cur[0] += ']'
cur = ['[']
this.entries.push(cur)
} else if (cur[0].length > 1) {
cur[0] += ','
}
cur[0] += r
})
cur[0] += ']'
this.entries = INSERT.entries.map(e => (e instanceof Readable ? [e] : [Readable.from(this.INSERT_entries_stream([e]))]))
} else {
this.entries = [[JSON.stringify(INSERT.entries)]]
this.entries = [
INSERT.entries[0] instanceof Readable
? INSERT.entries[0]
: Readable.from(this.INSERT_entries_stream(INSERT.entries))
]
}

// WITH SRC is used to force HANA to interpret the ? as a NCLOB allowing for streaming of the data
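The HANA-specific part keeps the streaming approach but avoids `runBatch` on HANA 2 and lower: each entry becomes its own single-row JSON stream, and `onINSERT` executes them sequentially by chaining `ps.run` calls. A minimal sketch of that reduce-based chaining, with a stand-in `run` instead of a real prepared statement:

```js
// Chain one execution per entry so each row is only sent after the previous
// one has completed — the reduce builds Promise.resolve(0).then(...).then(...)
const run = async entry => console.log('executing', entry) // stand-in for ps.run(entry)
const entries = [['{"ID":1}'], ['{"ID":2}']]
await entries.reduce((chain, entry) => chain.then(() => run(entry)), Promise.resolve(0))
```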
10 changes: 6 additions & 4 deletions postgres/lib/PostgresService.js
@@ -13,6 +13,7 @@ class PostgresService extends SQLService {
cds.options.dialect = 'postgres'
}
this.kind = 'postgres'
this._queryCache = {}
return super.init(...arguments)
}

@@ -135,12 +136,13 @@ GROUP BY k
}

prepare(sql) {
const query = {
// Track queries name for postgres referencing prepare statements
// sha1 as it needs to be less than 63 characters
const sha = crypto.createHash('sha1').update(sql).digest('hex')
const query = this._queryCache[sha] = this._queryCache[sha] || {
_streams: 0,
text: sql,
// Track queries name for postgres referencing prepare statements
// sha1 as it needs to be less than 63 characters
name: crypto.createHash('sha1').update(sql).digest('hex'),
name: sha,
}
return {
run: async values => {
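On Postgres, prepared statements are referenced by name (limited to 63 characters), so the statement name is the SHA-1 of the SQL text and the query object is now cached per hash; repeated calls to `prepare` with the same SQL reuse the same named statement. A small standalone sketch of the caching idea, using a local cache object rather than the service instance:

```js
const crypto = require('crypto')

const queryCache = {}
const prepared = sql => {
  // a 40-hex-character sha1 stays well below Postgres' 63-character name limit
  const sha = crypto.createHash('sha1').update(sql).digest('hex')
  return (queryCache[sha] = queryCache[sha] || { _streams: 0, text: sql, name: sha })
}

// Same SQL → same cached query object and statement name
prepared('SELECT 1').name === prepared('SELECT 1').name // true
```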
12 changes: 6 additions & 6 deletions sqlite/test/general/stream.test.js
@@ -21,11 +21,11 @@ describe('STREAM', () => {

describe('cds.stream', () => {
beforeAll(async () => {
const data = fs.readFileSync(path.join(__dirname, 'samples/test.jpg'))
await cds.run('INSERT INTO test_Images values(?,?)', [
[1, data],
[2, null],
])
let data = fs.createReadStream(path.join(__dirname, 'samples/test.jpg'))
await INSERT([
{ data: data, ID: 1 },
{ data: null, ID: 2 },
]).into('test.Images')
})

afterAll(async () => {
@@ -236,7 +236,7 @@ describe('STREAM', () => {
const { Images } = cds.entities('test')
const stream = fs.createReadStream(path.join(__dirname, 'samples/data.json'))

const changes = await STREAM.into(Images).data(stream)
const changes = await INSERT.into(Images).entries(stream)
try {
expect(changes).toEqual(2)
} catch (e) {
11 changes: 11 additions & 0 deletions test/scenarios/bookshop/genres.test.js
@@ -35,6 +35,17 @@ describe('Bookshop - Genres', () => {

delete insertResponse.data['@odata.context']
const assert = require('assert')

// Read after write does not sort the results
// therefore asynchronous databases might return results in different orders
const sort = (a, b) => {
if (!a?.children || !b?.children) return
const order = b.children.reduce((l, c, i) => { l[c.ID] = i; return l }, {})
a.children.sort((a, b) => order[a.ID] - order[b.ID])
a.children.forEach((c, i) => sort(c, b.children[i]))
}

sort(insertResponse.data, body)
assert.deepEqual(insertResponse.data, body)

// REVISIT clean up so the deep update test does not fail
