feat: Support Readable Streams inside INSERT.entries #343

Merged · 15 commits · Jan 3, 2024
Changes from 14 commits
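For context, a minimal usage sketch of what this change enables, mirroring the updated sqlite stream test below; the entity name and file path are placeholders, and a connected database service is assumed:

```js
const fs = require('fs')
const cds = require('@sap/cds')
const { INSERT } = cds.ql

async function main() {
  // The file is expected to contain a JSON array, e.g. [{"ID":1},{"ID":2}]
  const stream = fs.createReadStream('samples/data.json')

  // The Readable is handed straight to .entries(); the database service now streams
  // the payload into the INSERT instead of materializing it with JSON.stringify first.
  const changes = await INSERT.into('test.Images').entries(stream)
  console.log(changes) // number of inserted rows, as asserted in the test
}

main()
```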
125 changes: 114 additions & 11 deletions db-service/lib/cqn2sql.js
@@ -426,10 +426,105 @@ class CQN2SQLRenderer {

// Include this.values for placeholders
/** @type {unknown[][]} */
this.entries = [[...this.values, JSON.stringify(INSERT.entries)]]
return (this.sql = `INSERT INTO ${this.quote(entity)}${alias ? ' as ' + this.quote(alias) : ''} (${
this.columns
}) SELECT ${extraction} FROM json_each(?)`)
this.entries = []
if (INSERT.entries[0] instanceof Readable) {
INSERT.entries[0].type = 'json'
this.entries = [[...this.values, INSERT.entries[0]]]
} else {
const stream = Readable.from(this.INSERT_entries_stream(INSERT.entries))
stream.type = 'json'
this.entries = [[...this.values, stream]]
}

return (this.sql = `INSERT INTO ${this.quote(entity)}${alias ? ' as ' + this.quote(alias) : ''} (${this.columns
}) SELECT ${extraction} FROM json_each(?)`)
}

async *INSERT_entries_stream(entries) {
const bufferLimit = 1 << 16
let buffer = '['

let sep = ''
for (const row of entries) {
buffer += `${sep}{`
if (!sep) sep = ','

let sepsub = ''
for (const key in row) {
const keyJSON = `${sepsub}${JSON.stringify(key)}:`
if (!sepsub) sepsub = ','

const val = row[key]
if (val instanceof Readable) {
buffer += `${keyJSON}"`

// TODO: double check that it works
val.setEncoding('base64')
for await (const chunk of val) {
buffer += chunk
if (buffer.length > bufferLimit) {
yield buffer
buffer = ''
}
}

buffer += '"'
} else {
buffer += `${keyJSON}${val === undefined ? 'null' : JSON.stringify(val)}`
}
}
buffer += '}'
if (buffer.length > bufferLimit) {
yield buffer
buffer = ''
}
}

buffer += ']'
yield buffer
}

async *INSERT_rows_stream(entries) {
const bufferLimit = 1 << 16
let buffer = '['

let sep = ''
for (const row of entries) {
buffer += `${sep}[`
if (!sep) sep = ','

let sepsub = ''
for (let key = 0; key < row.length; key++) {
const val = row[key]
if (val instanceof Readable) {
buffer += `${sepsub}"`

// TODO: double check that it works
val.setEncoding('base64')
for await (const chunk of val) {
buffer += chunk
if (buffer.length > bufferLimit) {
yield buffer
buffer = ''
}
}

buffer += '"'
} else {
buffer += `${sepsub}${val === undefined ? 'null' : JSON.stringify(val)}`
}

if (!sepsub) sepsub = ','
}
buffer += ']'
if (buffer.length > bufferLimit) {
yield buffer
buffer = ''
}
}

buffer += ']'
yield buffer
}

/**
@@ -443,21 +538,29 @@ class CQN2SQLRenderer {
const alias = INSERT.into.as
const elements = q.elements || q.target?.elements
const columns = INSERT.columns
|| cds.error`Cannot insert rows without columns or elements`
|| cds.error`Cannot insert rows without columns or elements`

const inputConverter = this.class._convertInput
const extraction = columns.map((c,i) => {
const extraction = columns.map((c, i) => {
const extract = `value->>'$[${i}]'`
const element = elements?.[c]
const converter = element?.[inputConverter]
return converter?.(extract,element) || extract
return converter?.(extract, element) || extract
})

this.columns = columns.map(c => this.quote(c))
this.entries = [[JSON.stringify(INSERT.rows)]]
return (this.sql = `INSERT INTO ${this.quote(entity)}${alias ? ' as ' + this.quote(alias) : ''} (${
this.columns
}) SELECT ${extraction} FROM json_each(?)`)

if (INSERT.rows[0] instanceof Readable) {
INSERT.rows[0].type = 'json'
this.entries = [[...this.values, INSERT.rows[0]]]
} else {
const stream = Readable.from(this.INSERT_rows_stream(INSERT.rows))
stream.type = 'json'
this.entries = [[...this.values, stream]]
}

return (this.sql = `INSERT INTO ${this.quote(entity)}${alias ? ' as ' + this.quote(alias) : ''} (${this.columns
}) SELECT ${extraction} FROM json_each(?)`)
}

/**
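Because entries[0][0] is now a Readable rather than a JSON string, anything that previously inspected the rendered payload has to read the stream back, which is what the updated tests below do with stream/consumers. A rough sketch of that pattern; the CQN is illustrative, and the module path is the one used in the tests, which also load a test model first:

```js
const { text } = require('stream/consumers')
const cqn2sql = require('../../lib/cqn2sql') // path as used in db-service/test/cqn2sql

async function inspect() {
  const { sql, entries } = cqn2sql({
    INSERT: { into: 'Foo2', entries: [{ ID: 1, name: 'foo' }, { ID: 2, name: null }] },
  })

  // entries[0][0] is the Readable built by INSERT_entries_stream (or the caller's own stream);
  // text() concatenates its chunks back into the JSON payload fed to json_each(?).
  console.log(sql)
  console.log(await text(entries[0][0])) // e.g. '[{"ID":1,"name":"foo"},{"ID":2,"name":null}]'
}

inspect()
```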
22 changes: 12 additions & 10 deletions db-service/test/cqn2sql/insert.test.js
@@ -1,4 +1,6 @@
'use strict'
const { text } = require('stream/consumers')

const cds = require('@sap/cds/lib')
const cqn2sql = require('../../lib/cqn2sql')

@@ -9,7 +11,7 @@ beforeAll(async () => {
describe('insert', () => {
describe('insert only', () => {
// Values are missing
test('test with insert values into columns', () => {
test('test with insert values into columns', async () => {
const cqnInsert = {
INSERT: {
into: { ref: ['Foo'] },
@@ -19,10 +21,10 @@ }
}

const { sql, entries } = cqn2sql(cqnInsert)
expect({ sql, entries }).toMatchSnapshot()
expect({ sql, entries: [[await text(entries[0][0])]] }).toMatchSnapshot()
})

test('test with insert rows into columns', () => {
test('test with insert rows into columns', async () => {
const cqnInsert = {
INSERT: {
into: { ref: ['Foo'] },
@@ -34,11 +36,11 @@ },
},
}
const { sql, entries } = cqn2sql(cqnInsert)
expect({ sql, entries }).toMatchSnapshot()
expect({ sql, entries: [[await text(entries[0][0])]] }).toMatchSnapshot()
})

// no filtering in INSERT
xtest('test filter in insert rows into columns with not existing column', () => {
xtest('test filter in insert rows into columns with not existing column', async () => {
const cqnInsert = {
INSERT: {
into: { ref: ['Foo2'] },
@@ -50,10 +52,10 @@ },
},
}
const { sql, entries } = cqn2sql(cqnInsert)
expect({ sql, entries }).toMatchSnapshot()
expect({ sql, entries: [[await text(entries[0][0])]] }).toMatchSnapshot()
})

test('test with insert entries', () => {
test('test with insert entries', async () => {
const cqnInsert = {
INSERT: {
into: 'Foo2',
@@ -65,10 +67,10 @@ }
}

const { sql, entries } = cqn2sql(cqnInsert)
expect({ sql, entries }).toMatchSnapshot()
expect({ sql, entries: [[await text(entries[0][0])]] }).toMatchSnapshot()
})

test('test with insert with alias', () => {
test('test with insert with alias', async () => {
const cqnInsert = {
INSERT: {
into: { ref: ['Foo2'], as: 'Fooooo2' },
@@ -80,7 +82,7 @@ }
}

const { sql, entries } = cqn2sql(cqnInsert)
expect({ sql, entries }).toMatchSnapshot()
expect({ sql, entries: [[await text(entries[0][0])]] }).toMatchSnapshot()
})
})

14 changes: 8 additions & 6 deletions db-service/test/cqn2sql/upsert.test.js
@@ -1,4 +1,6 @@
'use strict'
const { text } = require('stream/consumers')

const cds = require('@sap/cds/lib')
const cqn2sql = require('../../lib/cqn2sql')

@@ -7,7 +9,7 @@ })
})

describe('upsert', () => {
test('test with keys only', () => {
test('test with keys only', async () => {
const cqnUpsert = {
UPSERT: {
into: 'Foo2',
@@ -17,11 +19,11 @@ }
}

const { sql, entries } = cqn2sql(cqnUpsert)
expect({ sql, entries }).toMatchSnapshot()
expect({ sql, entries: [[await text(entries[0][0])]] }).toMatchSnapshot()
})

test('test with entries', () => {
const cqnInsert = {
test('test with entries', async () => {
const cqnUpsert = {
UPSERT: {
into: 'Foo2',
entries: [
@@ -31,7 +33,7 @@ },
},
}

const { sql, entries } = cqn2sql(cqnInsert)
expect({ sql, entries }).toMatchSnapshot()
const { sql, entries } = cqn2sql(cqnUpsert)
expect({ sql, entries: [[await text(entries[0][0])]] }).toMatchSnapshot()
})
})
38 changes: 11 additions & 27 deletions hana/lib/HANAService.js
@@ -117,16 +117,15 @@
}

async onINSERT({ query, data }) {
// Using runBatch for HANA 2.0 and lower sometimes leads to integer underflow errors
// REVISIT: Address runBatch issues in node-hdb and hana-client
if (HANAVERSION <= 2) {
return super.onINSERT(...arguments)
}
const { sql, entries, cqn } = this.cqn2sql(query, data)
if (!sql) return // Do nothing when there is nothing to be done
const ps = await this.prepare(sql)
// HANA driver supports batch execution
const results = entries ? await ps.runBatch(entries) : await ps.run()
const results = await (entries
? HANAVERSION <= 2
? entries.reduce((l, c) => l.then(() => ps.run(c)), Promise.resolve(0))
: ps.run(entries)
: ps.run())
return new this.class.InsertResults(cqn, results)
}

@@ -269,7 +268,7 @@

const src = q

const { limit, one, orderBy, expand, columns, localized, count, from, parent } = q.SELECT

Check warning on line 271 in hana/lib/HANAService.js (GitHub Actions: Node.js 18, HANA Node.js 18): 'from' is assigned a value but never used

// When one of these is defined wrap the query in a sub query
if (expand || (parent && (limit || one || orderBy))) {
@@ -544,28 +543,13 @@
// HANA Express does not process large JSON documents
// The limit is somewhere between 64KB and 128KB
if (HANAVERSION <= 2) {
// Simple line splitting would be preferred, but batch execute does not work properly
// Which makes sending every line separately much slower
// this.entries = INSERT.entries.map(e => [JSON.stringify(e)])

this.entries = []
let cur = ['[']
this.entries.push(cur)
INSERT.entries
.map(r => JSON.stringify(r))
.forEach(r => {
if (cur[0].length > 65535) {
cur[0] += ']'
cur = ['[']
this.entries.push(cur)
} else if (cur[0].length > 1) {
cur[0] += ','
}
cur[0] += r
})
cur[0] += ']'
this.entries = INSERT.entries.map(e => (e instanceof Readable ? [e] : [Readable.from(this.INSERT_entries_stream([e]))]))
} else {
this.entries = [[JSON.stringify(INSERT.entries)]]
this.entries = [
INSERT.entries[0] instanceof Readable
? INSERT.entries[0]
: Readable.from(this.INSERT_entries_stream(INSERT.entries))
]
}

// WITH SRC is used to force HANA to interpret the ? as an NCLOB, allowing the data to be streamed
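One detail worth spelling out from the onINSERT change above: on HANA 2 and lower the entries are no longer sent through runBatch but chained with reduce, so each statement only starts after the previous one resolved. A standalone sketch of the same pattern, with placeholder names:

```js
// Equivalent of: entries.reduce((l, c) => l.then(() => ps.run(c)), Promise.resolve(0))
// Each ps.run starts only after the previous one has settled, so the per-entry
// payloads are sent sequentially instead of as one batch.
async function runSequentially(ps, entries) {
  let result = 0
  for (const entry of entries) {
    result = await ps.run(entry)
  }
  return result // result of the last ps.run, like the tail of the promise chain
}
```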
10 changes: 6 additions & 4 deletions postgres/lib/PostgresService.js
@@ -13,6 +13,7 @@ class PostgresService extends SQLService {
cds.options.dialect = 'postgres'
}
this.kind = 'postgres'
this._queryCache = {}
return super.init(...arguments)
}

@@ -135,12 +136,13 @@ GROUP BY k
}

prepare(sql) {
const query = {
// Track query names for referencing postgres prepared statements
// sha1 as it needs to be less than 63 characters
const sha = crypto.createHash('sha1').update(sql).digest('hex')
const query = this._queryCache[sha] = this._queryCache[sha] || {
_streams: 0,
text: sql,
// Track query names for referencing postgres prepared statements
// sha1 as it needs to be less than 63 characters
name: crypto.createHash('sha1').update(sql).digest('hex'),
name: sha,
}
return {
run: async values => {
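For reference on the prepare() change above: PostgreSQL truncates identifiers longer than 63 bytes, so the raw SQL text cannot serve as the prepared-statement name, while a sha1 hex digest is always 40 characters and doubles as a stable key for the new _queryCache. A small sketch; the SQL text and the cache object are illustrative:

```js
const crypto = require('crypto')

const sql = 'SELECT ID, name FROM some_rather_long_table_name WHERE ID = $1'

// 40-character hex digest: fits Postgres' 63-byte identifier limit and is deterministic,
// so the same SQL text always resolves to the same statement name.
const sha = crypto.createHash('sha1').update(sql).digest('hex')
console.log(sha.length) // 40

// Cache lookup mirroring the service's _queryCache: reuse the query object if the
// same statement was prepared before, otherwise create and remember it.
const queryCache = {}
const query = (queryCache[sha] = queryCache[sha] || { _streams: 0, text: sql, name: sha })
console.log(query.name === sha) // true
```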
12 changes: 6 additions & 6 deletions sqlite/test/general/stream.test.js
@@ -21,11 +21,11 @@ describe('STREAM', () => {

describe('cds.stream', () => {
beforeAll(async () => {
const data = fs.readFileSync(path.join(__dirname, 'samples/test.jpg'))
await cds.run('INSERT INTO test_Images values(?,?)', [
[1, data],
[2, null],
])
let data = fs.createReadStream(path.join(__dirname, 'samples/test.jpg'))
await INSERT([
{ data: data, ID: 1 },
{ data: null, ID: 2 },
]).into('test.Images')
})

afterAll(async () => {
@@ -236,7 +236,7 @@ describe('STREAM', () => {
const { Images } = cds.entities('test')
const stream = fs.createReadStream(path.join(__dirname, 'samples/data.json'))

const changes = await STREAM.into(Images).data(stream)
const changes = await INSERT.into(Images).entries(stream)
try {
expect(changes).toEqual(2)
} catch (e) {
11 changes: 11 additions & 0 deletions test/scenarios/bookshop/genres.test.js
@@ -35,6 +35,17 @@ describe('Bookshop - Genres', () => {

delete insertResponse.data['@odata.context']
const assert = require('assert')

// Read after write does not sort the results,
// therefore asynchronous databases might return the children in a different order
const sort = (a, b) => {
if (!a?.children || !b?.children) return
const order = b.children.reduce((l, c, i) => { l[c.ID] = i; return l }, {})
a.children.sort((a, b) => order[a.ID] - order[b.ID])
a.children.forEach((c, i) => sort(c, b.children[i]))
}

sort(insertResponse.data, body)
assert.deepEqual(insertResponse.data, body)

// REVISIT clean up so the deep update test does not fail