Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(hdb): Allow streaming blob in and out over the same connection #561

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions hana/lib/drivers/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ class HANADriver {
run: async params => {
const { values, streams } = this._extractStreams(params)
const stmt = await prep
let changes = await prom(stmt, 'exec')(values)
await this._sendStreams(stmt, streams)
const execProm = prom(stmt, 'exec')(values)
await this._sendStreams(execProm, stmt, streams)
let changes = await execProm
// REVISIT: hana-client does not return any changes when doing an update with streams
// This causes the best assumption to be that the changes are one
// To get the correct information it is required to send a count with the update where clause
Expand Down Expand Up @@ -75,10 +76,12 @@ class HANADriver {
/**
* Sends streams to the prepared statement
* @abstract
* @param {Promise} execProm
* @param {import('@cap-js/db-service/lib/SQLService').PreparedStatement} stmt
* @param {Array<ReadableStream>} streams
*/
async _sendStreams(stmt, streams) {
async _sendStreams(execProm, stmt, streams) {
execProm
stmt
streams
return
Expand Down
31 changes: 11 additions & 20 deletions hana/lib/drivers/hana-client.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const { Readable, Stream } = require('stream')

const hdb = require('@sap/hana-client')
const { StringDecoder } = require('string_decoder')

Check warning on line 4 in hana/lib/drivers/hana-client.js

View workflow job for this annotation

GitHub Actions / HANA Node.js 18

'StringDecoder' is assigned a value but never used

Check warning on line 4 in hana/lib/drivers/hana-client.js

View workflow job for this annotation

GitHub Actions / Node.js 18

'StringDecoder' is assigned a value but never used
const { driver, prom, handleLevel } = require('./base')

const streamUnsafe = false
Expand Down Expand Up @@ -74,7 +74,7 @@
rsStreamsProm.resolve = resolve
rsStreamsProm.reject = reject
})
rsStreams.catch(() => {})
rsStreams.catch(() => { })

rs._rowPosition = -1
const _next = prom(rs, 'next')
Expand Down Expand Up @@ -161,7 +161,7 @@
}

const resultSet = Array.isArray(rows) ? rows[0] : rows

// merge table output params into scalar params
const params = Array.isArray(outParameters) && outParameters.filter(md => !(md.PARAMETER_NAME in result))
if (params && params.length) {
Expand All @@ -174,7 +174,7 @@
resultSet.nextResult()
}
}

return result
}

Expand All @@ -195,15 +195,19 @@
}
}

async _sendStreams(stmt, streams) {
async _sendStreams(execProm, stmt, streams) {
await execProm
// Sends all streams to the database
const sendParameterData = prom(stmt, 'sendParameterData')
for (let i = 0; i < streams.length; i++) {
const curStream = streams[i]
if (!curStream) continue
for await (const chunk of curStream) {
curStream.pause()
await sendParameterData(i, Buffer.from(chunk))
const buffer = Buffer.from(chunk)
if (buffer.length > 0) {
await sendParameterData(i, buffer)
}
curStream.resume()
}
await sendParameterData(i, null)
Expand Down Expand Up @@ -343,29 +347,16 @@

const getData = prom(rs, 'getData')

let decoder = new StringDecoder(encoding)

let blobPosition = 0

while (true) {
// REVISIT: Ensure that the data read is divisible by 3 as that allows for base64 encoding
let start = 0
const read = await getData(columnIndex, blobPosition, binaryBuffer, 0, binaryBuffer.byteLength)
if (blobPosition === 0 && binaryBuffer.slice(0, 7).toString() === 'base64,') {
decoder = {
write: encoding === 'base64' ? c => c : chunk => Buffer.from(chunk.toString(), 'base64'),
end: () => Buffer.allocUnsafe(0),
}
start = 7
}
blobPosition += read
if (read < binaryBuffer.byteLength) {
yield decoder.write(binaryBuffer.slice(start, read))
yield binaryBuffer.slice(0, read)
break
}
yield decoder.write(binaryBuffer.slice(start).toString('base64'))
yield binaryBuffer
}
yield decoder.end()
} catch (e) {
promChain.reject(e)
} finally {
Expand Down
38 changes: 27 additions & 11 deletions hana/lib/drivers/hdb.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
this._native = hdb.createClient(creds)
this._native.setAutoCommit(false)
this._native.on('close', () => this.destroy?.())
this._activeBlobStreams = 0

this.connected = false
}
Expand Down Expand Up @@ -91,14 +92,22 @@
obj[col] = await text(row[col].createReadStream())
continue
}
obj[col] = i > 3
? row[col] === null
? null
: (
row[col].createReadStream?.()
|| Readable.from(echoStream(row[col]), { objectMode: false })
)
: row[col]
if (i > 3) {
if (row[col] == null) {
obj[col] = null
continue
}
let blob = row[col].createReadStream?.()
if (blob) {
this._activeBlobStreams++
blob.on('end', () => { this._activeBlobStreams-- })
} else {
blob = Readable.from(echoStream(row[col]), { objectMode: false })
}
obj[col] = blob
} else {
obj[col] = row[col]
}
}
result.push(obj)
}
Expand Down Expand Up @@ -135,8 +144,8 @@

_getResultForProcedure(rows, outParameters) {
// on hdb, rows already contains results for scalar params
const isArray = Array.isArray(rows)
const result = isArray ? {...rows[0]} : {...rows}
const isArray = Array.isArray(rows)
const result = isArray ? { ...rows[0] } : { ...rows }

// merge table output params into scalar params
const args = isArray ? rows.slice(1) : []
Expand All @@ -146,7 +155,7 @@
result[params[i].PARAMETER_NAME] = args[i]
}
}

return result
}

Expand All @@ -167,6 +176,12 @@
streams,
}
}

async _sendStreams(/*execProm, stmt, streams */) {
if (this._activeBlobStreams) {
this._native._connection._queue.busy = false
}
}
}

function* echoStream(ret) {
Expand Down Expand Up @@ -422,6 +437,7 @@
}

const { readInt64LE } = require('hdb/lib/util/bignum.js')
const { stream } = require('@sap/cds/lib')

Check warning on line 440 in hana/lib/drivers/hdb.js

View workflow job for this annotation

GitHub Actions / HANA Node.js 18

'stream' is assigned a value but never used

Check warning on line 440 in hana/lib/drivers/hdb.js

View workflow job for this annotation

GitHub Actions / Node.js 18

'stream' is assigned a value but never used
const readBlob = function (state, encoding) {
// Check if the blob is null
let ens = state.ensure(2)
Expand Down
Loading