feat: custom length encoding and decoding (#8)
This PR allows a custom length encoding/decoding function to be passed to `encode`/`decode`.

There's also an int32BE fixed-length encoder/decoder, e.g.

```js
const pipe = require('it-pipe')
const lp = require('it-length-prefixed')
const { int32BEEncode, int32BEDecode } = lp

await pipe(
  [Buffer.from('hello world')],
  lp.encode({ lengthEncoder: int32BEEncode }),
  lp.decode({ lengthDecoder: int32BEDecode }),
  async source => {
    for await (const chunk of source) {
      console.log(chunk.toString())
    }
  }
)
```

See updated README for more info.
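For reference, the contract a custom codec must satisfy can be sketched with a hypothetical single-byte length codec — `uint8Encode`/`uint8Decode` are illustrative names, not part of the package:

```javascript
// Hypothetical one-byte length codec, sketching the contract described in
// the README: the encoder writes `value` into `target` (allocating a buffer
// if none is given) and exposes the bytes written via `.bytes`; the decoder
// reads the length, exposes `.bytes`, and throws a RangeError when it needs
// more data.
const uint8Encode = (value, target, offset) => {
  target = target || Buffer.allocUnsafe(1)
  target.writeUInt8(value, offset || 0)
  return target
}
uint8Encode.bytes = 1 // fixed length, always one byte

const uint8Decode = data => {
  if (data.length < 1) throw new RangeError('Could not decode uint8')
  return data.readUInt8(0)
}
uint8Decode.bytes = 1
```

These would be passed as `lengthEncoder`/`lengthDecoder` in the same way as the int32BE pair, at the cost of limiting messages to 255 bytes.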

BREAKING CHANGE: additional validation now rejects messages whose length is too long, to prevent a possible DoS attack. The error code `ERR_MSG_TOO_LONG` has changed to `ERR_MSG_DATA_TOO_LONG`, and a new error code `ERR_MSG_LENGTH_TOO_LONG` has been added.
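Consumers catching the old code need a small update; a sketch of the mapping (the handler function is illustrative, not part of the package):

```javascript
// Sketch of migrating error handling for the renamed/split error codes.
// ERR_MSG_TOO_LONG no longer exists: oversized data now surfaces as
// ERR_MSG_DATA_TOO_LONG, and an oversized length prefix as
// ERR_MSG_LENGTH_TOO_LONG.
const describeDecodeError = err => {
  switch (err.code) {
    case 'ERR_MSG_LENGTH_TOO_LONG':
      return 'length prefix exceeded maxLengthLength'
    case 'ERR_MSG_DATA_TOO_LONG': // previously ERR_MSG_TOO_LONG
      return 'data exceeded maxDataLength'
    default:
      throw err // not a length-prefix error, rethrow
  }
}
```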

License: MIT
Signed-off-by: Alan Shaw <alan.shaw@protocol.ai>
Alan Shaw authored Nov 13, 2019
1 parent b651ba4 commit e419b63
Showing 15 changed files with 216 additions and 65 deletions.
1 change: 1 addition & 0 deletions .travis.yml
@@ -1,6 +1,7 @@
language: node_js
node_js:
- 10
- 12

script:
- npm run lint
22 changes: 16 additions & 6 deletions README.md
@@ -61,23 +61,31 @@ console.log(decoded)

- `opts: Object`, optional
- `poolSize: 10 * 1024`: Buffer pool size to allocate up front
- `minPoolSize: 8`: The minimum size the pool can be before it is re-allocated. Note: it is important this value is greater than the maximum value that can be encoded by the `lengthEncoder` (see the next option). Since encoded lengths are written into a buffer pool, there needs to be enough space to hold the encoded value.
- `lengthEncoder: Function`: A function that encodes the length that will prefix each message. By default this is a [`varint`](https://www.npmjs.com/package/varint) encoder. It is passed a `value` to encode, an (optional) `target` buffer to write to and an (optional) `offset` to start writing from. The function should encode the `value` into the `target` (or alloc a new Buffer if not specified), set the `lengthEncoder.bytes` value (the number of bytes written) and return the `target`.
- The following additional length encoders are available:
- **int32BE** - `const { int32BEEncode } = require('it-length-prefixed')`

All messages will be prefixed with a varint.
Returns a [transform](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#transform-it) that yields [`BufferList`](https://www.npmjs.com/package/bl) objects. All messages will be prefixed with a length, determined by the `lengthEncoder` function.

Returns a [transform](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#transform-it) that yields [`BufferList`](https://www.npmjs.com/package/bl) objects.

### `encode.single(chunk)`
### `encode.single(chunk, [opts])`

- `chunk: Buffer|BufferList` chunk to encode
- `opts: Object`, optional
- `lengthEncoder: Function`: See description above. Note that this encoder will _not_ be passed a `target` or `offset` and so will need to allocate a buffer to write to.

Returns a `BufferList` containing the encoded chunk.

### `decode([opts])`

- `opts: Object`, optional
- `maxDataLength`: If provided, will not decode messages longer than the size specified, if omitted will use the current default of 4MB.
- `maxLengthLength`: If provided, will not decode messages whose length section exceeds the size specified, if omitted will use the default of 147 bytes.
- `maxDataLength`: If provided, will not decode messages whose data section exceeds the size specified, if omitted will use the default of 4MB.
- `onLength(len: Number)`: Called for every length prefix that is decoded from the stream
- `onData(data: BufferList)`: Called for every chunk of data that is decoded from the stream
- `lengthDecoder: Function`: A function that decodes the length that prefixes each message. By default this is a [`varint`](https://www.npmjs.com/package/varint) decoder. It is passed some `data` to decode which is a [`BufferList`](https://www.npmjs.com/package/bl). The function should decode the length, set the `lengthDecoder.bytes` value (the number of bytes read) and return the length. If the length cannot be decoded, the function should throw a `RangeError`.
- The following additional length decoders are available:
- **int32BE** - `const { int32BEDecode } = require('it-length-prefixed')`

Returns a [transform](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#transform-it) that yields [`BufferList`](https://www.npmjs.com/package/bl) objects.

@@ -87,8 +95,10 @@ Behaves like `decode` except it only reads the exact number of bytes needed for

- `reader: Reader`: An [it-reader](https://github.com/alanshaw/it-reader)
- `opts: Object`, optional
- `maxDataLength`: If provided, will not decode messages longer than the size specified, if omitted will use the current default of 4MB.
- `maxLengthLength`: If provided, will not decode messages whose length section exceeds the size specified, if omitted will use the default of 147 bytes.
- `maxDataLength`: If provided, will not decode messages whose data section exceeds the size specified, if omitted will use the default of 4MB.
- `onData(data: BufferList)`: Called for every chunk of data that is decoded from the stream
- `lengthDecoder: Function`: See description above.

Returns a [transform](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#transform-it) that yields [`BufferList`](https://www.npmjs.com/package/bl) objects.

47 changes: 20 additions & 27 deletions src/decode.js
@@ -1,49 +1,39 @@
'use strict'

const BufferList = require('bl/BufferList')
const Varint = require('varint')
const varintDecode = require('./varint-decode')

const MSB = 0x80
const isEndByte = byte => !(byte & MSB)
// Maximum length of the length section of the message
const MAX_LENGTH_LENGTH = 8 // Varint.encode(Number.MAX_SAFE_INTEGER).length
// Maximum length of the data section of the message
const MAX_DATA_LENGTH = 1024 * 1024 * 4

const toBufferProxy = bl => new Proxy({}, {
get: (_, prop) => prop[0] === 'l' ? bl[prop] : bl.get(parseInt(prop))
})

const Empty = Buffer.alloc(0)

const ReadModes = { LENGTH: 'readLength', DATA: 'readData' }

const ReadHandlers = {
[ReadModes.LENGTH]: (chunk, buffer, state, options) => {
// console.log(ReadModes.LENGTH, chunk.length)
let endByteIndex = -1

// BufferList bytes must be accessed via .get
const getByte = chunk.get ? i => chunk.get(i) : i => chunk[i]
buffer = buffer.append(chunk)

for (let i = 0; i < chunk.length; i++) {
if (isEndByte(getByte(i))) {
endByteIndex = i
break
let dataLength
try {
dataLength = options.lengthDecoder(buffer)
} catch (err) {
if (buffer.length > options.maxLengthLength) {
throw Object.assign(err, { message: 'message length too long', code: 'ERR_MSG_LENGTH_TOO_LONG' })
}
if (err instanceof RangeError) {
return { mode: ReadModes.LENGTH, buffer }
}
throw err
}

if (endByteIndex === -1) {
return { mode: ReadModes.LENGTH, buffer: buffer.append(chunk) }
}

endByteIndex = buffer.length + endByteIndex
buffer = buffer.append(chunk)

const dataLength = Varint.decode(toBufferProxy(buffer.shallowSlice(0, endByteIndex + 1)))

if (dataLength > options.maxDataLength) {
throw Object.assign(new Error('message too long'), { code: 'ERR_MSG_TOO_LONG' })
throw Object.assign(new Error('message data too long'), { code: 'ERR_MSG_DATA_TOO_LONG' })
}

chunk = buffer.shallowSlice(endByteIndex + 1)
chunk = buffer.shallowSlice(options.lengthDecoder.bytes)
buffer = new BufferList()

if (options.onLength) options.onLength(dataLength)
@@ -77,6 +67,8 @@ const ReadHandlers = {

function decode (options) {
options = options || {}
options.lengthDecoder = options.lengthDecoder || varintDecode
options.maxLengthLength = options.maxLengthLength || MAX_LENGTH_LENGTH
options.maxDataLength = options.maxDataLength || MAX_DATA_LENGTH

return source => (async function * () {
@@ -127,4 +119,5 @@ decode.fromReader = (reader, options) => {
}

module.exports = decode
module.exports.MAX_LENGTH_LENGTH = MAX_LENGTH_LENGTH
module.exports.MAX_DATA_LENGTH = MAX_DATA_LENGTH
24 changes: 15 additions & 9 deletions src/encode.js
@@ -1,27 +1,29 @@
'use strict'

const Varint = require('varint')
const { Buffer } = require('buffer')
const BufferList = require('bl/BufferList')
const varintEncode = require('./varint-encode')

const MIN_POOL_SIZE = 147 // Varint.encode(Number.MAX_VALUE).length
const MIN_POOL_SIZE = 8 // Varint.encode(Number.MAX_SAFE_INTEGER).length
const DEFAULT_POOL_SIZE = 10 * 1024

function encode (options) {
options = options || {}
options.poolSize = Math.max(options.poolSize || DEFAULT_POOL_SIZE, MIN_POOL_SIZE)

const poolSize = Math.max(options.poolSize || DEFAULT_POOL_SIZE, options.minPoolSize || MIN_POOL_SIZE)
const encodeLength = options.lengthEncoder || varintEncode

return source => (async function * () {
let pool = Buffer.alloc(options.poolSize)
let pool = Buffer.alloc(poolSize)
let poolOffset = 0

for await (const chunk of source) {
Varint.encode(chunk.length, pool, poolOffset)
poolOffset += Varint.encode.bytes
const encodedLength = pool.slice(poolOffset - Varint.encode.bytes, poolOffset)
encodeLength(chunk.length, pool, poolOffset)
const encodedLength = pool.slice(poolOffset, poolOffset + encodeLength.bytes)
poolOffset += encodeLength.bytes

if (pool.length - poolOffset < MIN_POOL_SIZE) {
pool = Buffer.alloc(options.poolSize)
pool = Buffer.alloc(poolSize)
poolOffset = 0
}

@@ -31,7 +33,11 @@ function encode (options) {
})()
}

encode.single = c => new BufferList([Buffer.from(Varint.encode(c.length)), c])
encode.single = (chunk, options) => {
options = options || {}
const encodeLength = options.lengthEncoder || varintEncode
return new BufferList([encodeLength(chunk.length), chunk])
}

module.exports = encode
module.exports.MIN_POOL_SIZE = MIN_POOL_SIZE
6 changes: 6 additions & 0 deletions src/index.js
@@ -2,3 +2,9 @@

exports.encode = require('./encode')
exports.decode = require('./decode')

exports.varintEncode = require('./varint-encode')
exports.varintDecode = require('./varint-decode')

exports.int32BEEncode = require('./int32BE-encode')
exports.int32BEDecode = require('./int32BE-decode')
10 changes: 10 additions & 0 deletions src/int32BE-decode.js
@@ -0,0 +1,10 @@
'use strict'

const int32BEDecode = data => {
if (data.length < 4) throw RangeError('Could not decode int32BE')
return data.readInt32BE(0)
}

int32BEDecode.bytes = 4 // Always because fixed length

module.exports = int32BEDecode
13 changes: 13 additions & 0 deletions src/int32BE-encode.js
@@ -0,0 +1,13 @@
'use strict'

const { Buffer } = require('buffer')

const int32BEEncode = (value, target, offset) => {
target = target || Buffer.allocUnsafe(4)
target.writeInt32BE(value, offset)
return target
}

int32BEEncode.bytes = 4 // Always because fixed length

module.exports = int32BEEncode
15 changes: 15 additions & 0 deletions src/varint-decode.js
@@ -0,0 +1,15 @@
'use strict'

const Varint = require('varint')

const toBufferProxy = bl => new Proxy({}, {
get: (_, prop) => prop[0] === 'l' ? bl[prop] : bl.get(parseInt(prop))
})

const varintDecode = data => {
const len = Varint.decode(Buffer.isBuffer(data) ? data : toBufferProxy(data))
varintDecode.bytes = Varint.decode.bytes
return len
}

module.exports = varintDecode
14 changes: 14 additions & 0 deletions src/varint-encode.js
@@ -0,0 +1,14 @@
'use strict'

const Varint = require('varint')
const { Buffer } = require('buffer')

// Encode the passed length `value` to the `target` buffer at the given `offset`
const varintEncode = (value, target, offset) => {
const ret = Varint.encode(value, target, offset)
varintEncode.bytes = Varint.encode.bytes
// If no target, create Buffer from returned array
return target || Buffer.from(ret)
}

module.exports = varintEncode
9 changes: 9 additions & 0 deletions test/_helpers.js
@@ -0,0 +1,9 @@
'use strict'

const { map } = require('streaming-iterables')
const randomInt = require('random-int')
const randomBytes = require('random-bytes')

module.exports.toBuffer = map(c => c.slice())
module.exports.times = (n, fn) => Array.from(Array(n), fn)
module.exports.someBytes = n => randomBytes(randomInt(1, n || 32))
9 changes: 3 additions & 6 deletions test/decode.from-reader.spec.js
@@ -4,15 +4,12 @@
const pipe = require('it-pipe')
const Reader = require('it-reader')
const { expect } = require('chai')
const randomInt = require('random-int')
const randomBytes = require('random-bytes')
const { map, collect } = require('streaming-iterables')
const { collect } = require('streaming-iterables')
const Varint = require('varint')
const { toBuffer, times, someBytes } = require('./_helpers')

const lp = require('../')
const toBuffer = map(c => c.slice())
const times = (n, fn) => Array.from(Array(n), fn)
const someBytes = n => randomBytes(randomInt(1, n || 32))

describe('decode from reader', () => {
it('should be able to decode from an it-reader', async () => {
@@ -47,7 +44,7 @@ describe('decode from reader', () => {
collect
)
} catch (err) {
expect(err.code).to.equal('ERR_MSG_TOO_LONG')
expect(err.code).to.equal('ERR_MSG_DATA_TOO_LONG')
return
}

Expand Down
56 changes: 51 additions & 5 deletions test/decode.spec.js
@@ -5,14 +5,15 @@ const pipe = require('it-pipe')
const { expect } = require('chai')
const randomInt = require('random-int')
const randomBytes = require('random-bytes')
const { map, collect } = require('streaming-iterables')
const { collect } = require('streaming-iterables')
const Varint = require('varint')
const BufferList = require('bl/BufferList')
const defer = require('p-defer')
const { toBuffer, times } = require('./_helpers')

const lp = require('../')
const { MAX_DATA_LENGTH } = lp.decode
const toBuffer = map(c => c.slice())
const { MAX_LENGTH_LENGTH, MAX_DATA_LENGTH } = lp.decode
const { int32BEDecode } = lp

describe('decode', () => {
it('should decode single message', async () => {
@@ -83,7 +84,24 @@ describe('decode', () => {
expect(output.slice(-byteLength)).to.deep.equal(bytes)
})

it('should not decode a message that is too long', async () => {
it('should not decode message length that is too long', async () => {
// A value < 0x80 signifies end of varint so pass buffers of >= 0x80
// so that it will keep throwing a RangeError until we reach the max length
const lengths = times(5, () => Buffer.alloc(MAX_LENGTH_LENGTH / 4).fill(0x80))
const bytes = await randomBytes(randomInt(2, 64))

const input = [...lengths, bytes]

try {
await pipe(input, lp.decode(), toBuffer, collect)
} catch (err) {
expect(err.code).to.equal('ERR_MSG_LENGTH_TOO_LONG')
return
}
throw new Error('did not throw for too long message')
})

it('should not decode message data that is too long', async () => {
const byteLength = MAX_DATA_LENGTH + 1
const bytes = await randomBytes(byteLength)

@@ -95,7 +113,7 @@
try {
await pipe(input, lp.decode(), toBuffer, collect)
} catch (err) {
expect(err.code).to.equal('ERR_MSG_TOO_LONG')
expect(err.code).to.equal('ERR_MSG_DATA_TOO_LONG')
return
}
throw new Error('did not throw for too long message')
@@ -179,4 +197,32 @@

await Promise.all([lengthDeferred.promise, dataDeferred.promise])
})

it('should decode with custom length decoder (int32BE)', async () => {
const byteLength0 = randomInt(2, 64)
const encodedByteLength0 = Buffer.allocUnsafe(4)
encodedByteLength0.writeInt32BE(byteLength0)
const bytes0 = await randomBytes(byteLength0)

const byteLength1 = randomInt(1, 64)
const encodedByteLength1 = Buffer.allocUnsafe(4)
encodedByteLength1.writeInt32BE(byteLength1)
const bytes1 = await randomBytes(byteLength1)

const input = [
Buffer.concat([
encodedByteLength0,
bytes0.slice(0, 1)
]),
Buffer.concat([
bytes0.slice(1),
encodedByteLength1,
bytes1
])
]

const output = await pipe(input, lp.decode({ lengthDecoder: int32BEDecode }), toBuffer, collect)
expect(output[0].slice(-byteLength0)).to.deep.equal(bytes0)
expect(output[1].slice(-byteLength1)).to.deep.equal(bytes1)
})
})
