Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up bind functionality #2286

Merged
merged 8 commits into from
Nov 4, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/pg-protocol/src/buffer-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export class Writer {
private offset: number = 5
private headerPosition: number = 0
constructor(private size = 256) {
this.buffer = Buffer.alloc(size)
this.buffer = Buffer.allocUnsafe(size)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious -- is changing to allocUnsafe here also contributes to performance improvements? The original PR comment doesn't mention anything about this change.
Also how certain we are that this doesn't cause any nasty side-effects of being, well, unsafe?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's a link to the NodeJS documentation on Buffer.allocUnsafe().

I'm not an expert on Buffers, though the documentation's last sentenance says this can increase a performance (subtle) over Buffer.alloc().

Allocates a new Buffer of size bytes. If size is larger than buffer.constants.MAX_LENGTH or smaller than 0, ERR_INVALID_OPT_VALUE is thrown.

The underlying memory for Buffer instances created in this way is not initialized. The contents of the newly created Buffer are unknown and may contain sensitive data. Use Buffer.alloc() instead to initialize Buffer instances with zeroes.

[...]

Use of this pre-allocated internal memory pool is a key difference between calling Buffer.alloc(size, fill) vs. Buffer.allocUnsafe(size).fill(fill). Specifically, Buffer.alloc(size, fill) will never use the internal Buffer pool, while Buffer.allocUnsafe(size).fill(fill) will use the internal Buffer pool if size is less than or equal to half Buffer.poolSize. The difference is subtle but can be important when an application requires the additional performance that Buffer.allocUnsafe() provides.

}

private ensure(size: number): void {
Expand All @@ -15,7 +15,7 @@ export class Writer {
// exponential growth factor of around ~ 1.5
// https://stackoverflow.com/questions/2269063/buffer-growth-strategy
var newSize = oldBuffer.length + (oldBuffer.length >> 1) + size
this.buffer = Buffer.alloc(newSize)
this.buffer = Buffer.allocUnsafe(newSize)
oldBuffer.copy(this.buffer)
}
}
Expand Down
67 changes: 35 additions & 32 deletions packages/pg-protocol/src/serializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,50 +106,53 @@ type BindOpts = {
binary?: boolean
statement?: string
values?: any[]
// optional map from JS value to postgres value per parameter
valueMap?: (param: any) => any
brianc marked this conversation as resolved.
Show resolved Hide resolved
}

const bind = (config: BindOpts = {}): Buffer => {
// normalize config
const portal = config.portal || ''
const statement = config.statement || ''
const binary = config.binary || false
var values = config.values || emptyArray
var len = values.length
const paramWriter = new Writer()

var useBinary = false
// TODO(bmc): all the loops in here aren't nice, we can do better
for (var j = 0; j < len; j++) {
useBinary = useBinary || values[j] instanceof Buffer
}

var buffer = writer.addCString(portal).addCString(statement)
if (!useBinary) {
buffer.addInt16(0)
} else {
buffer.addInt16(len)
for (j = 0; j < len; j++) {
buffer.addInt16(values[j] instanceof Buffer ? 1 : 0)
}
}
buffer.addInt16(len)
for (var i = 0; i < len; i++) {
const writeValues = function (values: any[], valueMap?: (val: any) => any): void {
for (let i = 0; i < values.length; i++) {
var val = values[i]
if (val === null || typeof val === 'undefined') {
buffer.addInt32(-1)
writer.addInt16(0)
paramWriter.addInt32(-1)
} else if (val instanceof Buffer) {
buffer.addInt32(val.length)
buffer.add(val)
writer.addInt16(1)
const mappedVal = valueMap ? valueMap(val) : val
paramWriter.addInt32(mappedVal.length)
paramWriter.add(mappedVal)
} else {
buffer.addInt32(Buffer.byteLength(val))
buffer.addString(val)
writer.addInt16(0)
const mappedVal = valueMap ? valueMap(val) : val
paramWriter.addInt32(Buffer.byteLength(mappedVal))
paramWriter.addString(mappedVal)
brianc marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

const bind = (config: BindOpts = {}): Buffer => {
// normalize config
const portal = config.portal || ''
const statement = config.statement || ''
const binary = config.binary || false
const values = config.values || emptyArray
const len = values.length

writer.addCString(portal).addCString(statement)
writer.addInt16(len)

writeValues(values, config.valueMap)

writer.addInt16(len)
writer.add(paramWriter.flush())

if (binary) {
buffer.addInt16(1) // format codes to use binary
buffer.addInt16(1)
writer.addInt16(1) // format codes to use binary
writer.addInt16(1)
} else {
buffer.addInt16(0) // format codes to use text
writer.addInt16(0) // format codes to use text
}
return writer.flush(code.bind)
}
Expand Down
59 changes: 31 additions & 28 deletions packages/pg/bench.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,37 +45,40 @@ const run = async () => {
console.log('warmup done')
const seconds = 5

let queries = await bench(client, params, seconds * 1000)
console.log('')
console.log('little queries:', queries)
console.log('qps', queries / seconds)
console.log('on my laptop best so far seen 733 qps')
for (let i = 0; i < 4; i++) {
let queries = await bench(client, params, seconds * 1000)
console.log('')
console.log('little queries:', queries)
console.log('qps', queries / seconds)
console.log('on my laptop best so far seen 733 qps')

console.log('')
queries = await bench(client, seq, seconds * 1000)
console.log('sequence queries:', queries)
console.log('qps', queries / seconds)
console.log('on my laptop best so far seen 1309 qps')
console.log('')
queries = await bench(client, seq, seconds * 1000)
console.log('sequence queries:', queries)
console.log('qps', queries / seconds)
console.log('on my laptop best so far seen 1309 qps')

console.log('')
queries = await bench(client, insert, seconds * 1000)
console.log('insert queries:', queries)
console.log('qps', queries / seconds)
console.log('on my laptop best so far seen 6303 qps')
console.log('')
queries = await bench(client, insert, seconds * 1000)
console.log('insert queries:', queries)
console.log('qps', queries / seconds)
console.log('on my laptop best so far seen 6445 qps')

console.log('')
console.log('Warming up bytea test')
await client.query({
text: 'INSERT INTO buf(name, data) VALUES ($1, $2)',
values: ['test', Buffer.allocUnsafe(104857600)],
})
console.log('bytea warmup done')
const start = Date.now()
const results = await client.query('SELECT * FROM buf')
const time = Date.now() - start
console.log('bytea time:', time, 'ms')
console.log('bytea length:', results.rows[0].data.byteLength, 'bytes')
console.log('on my laptop best so far seen 1107ms and 104857600 bytes')
console.log('')
console.log('Warming up bytea test')
await client.query({
text: 'INSERT INTO buf(name, data) VALUES ($1, $2)',
values: ['test', Buffer.allocUnsafe(104857600)],
})
console.log('bytea warmup done')
const start = Date.now()
const results = await client.query('SELECT * FROM buf')
const time = Date.now() - start
console.log('bytea time:', time, 'ms')
console.log('bytea length:', results.rows[0].data.byteLength, 'bytes')
console.log('on my laptop best so far seen 1107ms and 104857600 bytes')
await new Promise((resolve) => setTimeout(resolve, 250))
}

await client.end()
await client.end()
Expand Down
28 changes: 14 additions & 14 deletions packages/pg/lib/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -191,22 +191,22 @@ class Query extends EventEmitter {
})
}

if (this.values) {
try {
this.values = this.values.map(utils.prepareValue)
} catch (err) {
this.handleError(err, connection)
return
}
// because we're mapping user supplied values to
// postgres wire protocol compatible values it could
// throw an exception, so try/catch this section
try {
connection.bind({
portal: this.portal,
statement: this.name,
values: this.values,
binary: this.binary,
valueMap: utils.prepareValue,
})
} catch (err) {
this.handleError(err, connection)
return
}

connection.bind({
portal: this.portal,
statement: this.name,
values: this.values,
binary: this.binary,
})

connection.describe({
type: 'P',
name: this.portal || '',
Expand Down
73 changes: 0 additions & 73 deletions packages/pg/lib/result.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,79 +95,6 @@ class Result {
}
}
}

// adds a command complete message
addCommandComplete(msg) {
var match
if (msg.text) {
// pure javascript
match = matchRegexp.exec(msg.text)
} else {
// native bindings
match = matchRegexp.exec(msg.command)
}
if (match) {
this.command = match[1]
if (match[3]) {
// COMMMAND OID ROWS
this.oid = parseInt(match[2], 10)
this.rowCount = parseInt(match[3], 10)
} else if (match[2]) {
// COMMAND ROWS
this.rowCount = parseInt(match[2], 10)
}
}
}

_parseRowAsArray(rowData) {
var row = new Array(rowData.length)
for (var i = 0, len = rowData.length; i < len; i++) {
var rawValue = rowData[i]
if (rawValue !== null) {
row[i] = this._parsers[i](rawValue)
} else {
row[i] = null
}
}
return row
}

parseRow(rowData) {
var row = {}
for (var i = 0, len = rowData.length; i < len; i++) {
var rawValue = rowData[i]
var field = this.fields[i].name
if (rawValue !== null) {
row[field] = this._parsers[i](rawValue)
} else {
row[field] = null
}
}
return row
}

addRow(row) {
this.rows.push(row)
}

addFields(fieldDescriptions) {
// clears field definitions
// multiple query statements in 1 action can result in multiple sets
// of rowDescriptions...eg: 'select NOW(); select 1::int;'
// you need to reset the fields
this.fields = fieldDescriptions
if (this.fields.length) {
this._parsers = new Array(fieldDescriptions.length)
}
for (var i = 0; i < fieldDescriptions.length; i++) {
var desc = fieldDescriptions[i]
if (this._types) {
this._parsers[i] = this._types.getTypeParser(desc.dataTypeID, desc.format || 'text')
} else {
this._parsers[i] = types.getTypeParser(desc.dataTypeID, desc.format || 'text')
}
}
}
}

module.exports = Result