Commit

fix(hana): Remove encoding from hana-client streams (#623)
BobdenOs authored May 3, 2024
1 parent 80383f0 commit fed8f6f
Showing 2 changed files with 21 additions and 19 deletions.
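The gist of the change: `streamBlob` no longer takes an `encoding` argument and no longer runs its chunks through a `StringDecoder`; it always yields raw `Buffer`s from a byte-mode stream. Consumers that need text, like `rsIterator` when inlining blobs as base64 into the JSON result, now opt in with `setEncoding()`. A minimal sketch of that pattern, independent of the driver (the `chunks` generator is made up for illustration):

```js
const { Readable } = require('stream')

// Stand-in for streamBlob: an async generator yielding raw Buffers
async function* chunks() {
  yield Buffer.from('hello ')
  yield Buffer.from('world')
}

// objectMode: false turns the generator into a byte stream;
// setEncoding('base64') re-encodes on read, and Node's internal
// StringDecoder keeps the output valid across chunk boundaries.
const stream = Readable.from(chunks(), { objectMode: false })
stream.setEncoding('base64')

;(async () => {
  let base64 = ''
  for await (const chunk of stream) base64 += chunk
  console.log(base64) // aGVsbG8gd29ybGQ=
})()
```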
29 changes: 10 additions & 19 deletions hana/lib/drivers/hana-client.js
@@ -1,7 +1,6 @@
 const { Readable, Stream } = require('stream')
 
 const hdb = require('@sap/hana-client')
-const { StringDecoder } = require('string_decoder')
 const { driver, prom, handleLevel } = require('./base')
 
 const streamUnsafe = false
@@ -97,7 +96,7 @@ class HANAClientDriver extends driver {
         row[col] = i > 3 ?
           rs.isNull(i)
             ? null
-            : Readable.from(streamBlob(rsStreams, rs._rowPosition, i, 'binary'))
+            : Readable.from(streamBlob(rsStreams, rs._rowPosition, i), { objectMode: false })
           : values[i]
       }
 
@@ -153,7 +152,7 @@ class HANAClientDriver extends driver {
       if (rs.getRowCount() === 0) return null
       await prom(rs, 'next')()
       if (rs.isNull(0)) return null
-      return Readable.from(streamBlob(rs, undefined, 0, 'binary'), { objectMode: false })
+      return Readable.from(streamBlob(rs, undefined, 0), { objectMode: false })
     }
     return Readable.from(rsIterator(rs, one), { objectMode: false })
   }
@@ -217,7 +216,8 @@ class HANAClientDriver extends driver {
       if (!curStream) continue
       for await (const chunk of curStream) {
         curStream.pause()
-        await sendParameterData(i, Buffer.from(chunk))
+        const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)
+        if (buffer.length) await sendParameterData(i, buffer)
         curStream.resume()
       }
       await sendParameterData(i, null)
@@ -293,7 +293,9 @@ async function* rsIterator(rs, one) {
       yield buffer
       buffer = ''
 
-      for await (const chunk of streamBlob(rs, undefined, columnIndex, 'base64', binaryBuffer)) {
+      const stream = Readable.from(streamBlob(rs, undefined, columnIndex, binaryBuffer), { objectMode: false })
+      stream.setEncoding('base64')
+      for await (const chunk of stream) {
         yield chunk
       }
       buffer += '"'
@@ -316,7 +318,7 @@
   yield buffer
 }
 
-async function* streamBlob(rs, rowIndex = -1, columnIndex, encoding, binaryBuffer = Buffer.allocUnsafe(1 << 16)) {
+async function* streamBlob(rs, rowIndex = -1, columnIndex, binaryBuffer = Buffer.allocUnsafe(1 << 16)) {
   const promChain = {
     resolve: () => { },
     reject: () => { }
@@ -357,29 +359,18 @@ async function* streamBlob(rs, rowIndex = -1, columnIndex, encoding, binaryBuffer = Buffer.allocUnsafe(1 << 16)) {
 
     const getData = prom(rs, 'getData')
 
-    let decoder = new StringDecoder(encoding)
 
     let blobPosition = 0
 
     while (true) {
-      // REVISIT: Ensure that the data read is divisible by 3 as that allows for base64 encoding
-      let start = 0
       const read = await getData(columnIndex, blobPosition, binaryBuffer, 0, binaryBuffer.byteLength)
-      if (blobPosition === 0 && binaryBuffer.slice(0, 7).toString() === 'base64,') {
-        decoder = {
-          write: encoding === 'base64' ? c => c : chunk => Buffer.from(chunk.toString(), 'base64'),
-          end: () => Buffer.allocUnsafe(0),
-        }
-        start = 7
-      }
       blobPosition += read
       if (read < binaryBuffer.byteLength) {
-        yield decoder.write(binaryBuffer.slice(start, read))
+        yield binaryBuffer.slice(0, read)
         break
       }
-      yield decoder.write(binaryBuffer.slice(start).toString('base64'))
+      yield binaryBuffer
     }
-    yield decoder.end()
   } catch (e) {
     promChain.reject(e)
   } finally {
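On the write path, the same commit hardens LOB uploads: chunks coming off a user-provided stream may be strings (for instance when an encoding was set upstream) rather than Buffers, and zero-length chunks are now skipped before forwarding. A simplified sketch of that loop (the real code also pauses and resumes the source stream around each send), with `send` as a hypothetical stand-in for hana-client's `sendParameterData`:

```js
// `send(columnIndex, buffer)` stands in for hana-client's
// sendParameterData callback; `null` terminates the parameter data.
async function writeLob(send, columnIndex, stream) {
  for await (const chunk of stream) {
    // Normalize string chunks to Buffers and drop empty chunks
    const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)
    if (buffer.length) await send(columnIndex, buffer)
  }
  await send(columnIndex, null)
}
```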
11 changes: 11 additions & 0 deletions sqlite/test/general/stream.test.js
@@ -261,6 +261,17 @@ describe('streaming', () => {
     await checkSize(stream2_)
   }))
 
+  test('WRITE stream property from READ stream', async () => cds.tx(async () => {
+    const { Images } = cds.entities('test')
+    const { data: stream } = await SELECT.one.from(Images).columns('data').where({ ID: 1 })
+
+    const changes = await UPDATE(Images).with({ data2: stream }).where({ ID: 3 })
+    expect(changes).toEqual(1)
+
+    const [{ data2: stream_ }] = await SELECT.from(Images).columns('data2').where({ ID: 3 })
+    await checkSize(stream_)
+  }))
+
   test('WRITE multiple blob properties', async () => cds.tx(async () => {
     const { Images } = cds.entities('test')
     const blob1 = fs.readFileSync(path.join(__dirname, 'samples/test.jpg'))
