Skip to content

Commit

Permalink
fix: sync streams and empty streams
Browse files Browse the repository at this point in the history
* support for empty streams
* return immediately on empty stream
* compute data.length only once
* added test for zero-length streams
* fix: properly handle sync streams

Co-Authored-By: jacobheun <jacobheun@gmail.com>
  • Loading branch information
jacobheun authored and dignifiedquire committed Apr 4, 2019
1 parent accb16b commit 33e3a64
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 10 deletions.
36 changes: 31 additions & 5 deletions src/decode.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,37 @@ function decode (opts) {

return (read) => {
reader(read)

// this function has to be written without recursion
// or it blows the stack in case of sync stream
function next () {
_decodeFromReader(reader, opts, (err, msg) => {
if (err) return p.end(err)
let doNext = true
let decoded = false

p.push(msg)
next()
})
const decodeCb = (err, msg) => {
decoded = true
if (err) {
p.end(err)
doNext = false
} else {
p.push(msg)
if (!doNext) {
next()
}
}
}

while (doNext) {
decoded = false
_decodeFromReader(reader, opts, decodeCb)
if (!decoded) {
doNext = false
}
}
}

next()

return p
}
}
Expand Down Expand Up @@ -100,6 +121,11 @@ function readVarintMessage (reader, maxLength, cb) {
if (msgSize > maxLength) {
return cb(new Error('size longer than max permitted length of ' + maxLength + '!'))
}

if (msgSize <= 0) {
return cb(true) // eslint-disable-line standard/no-callback-literal
}

readMessage(reader, msgSize, (err, msg) => {
if (err) {
return cb(err)
Expand Down
21 changes: 16 additions & 5 deletions src/encode.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,33 @@ function encode (opts) {
let used = 0

let ended = false
let first = true

return (read) => (end, cb) => {
if (end) ended = end
if (ended) return cb(ended)

read(null, (end, data) => {
if (end) ended = end
if (ended) return cb(ended)
if (ended && !first) {
return cb(ended)
}

first = false

if (!Buffer.isBuffer(data)) {
if (!ended && !Buffer.isBuffer(data)) {
ended = new Error('data must be a buffer')
return cb(ended)
}

const dataLength = ended ? 0 : data.length

let encodedLength
if (opts.fixed) {
encodedLength = Buffer.alloc(4)
encodedLength.writeInt32BE(data.length, 0) // writes exactly 4 bytes
encodedLength.writeInt32BE(dataLength, 0) // writes exactly 4 bytes
} else {
varint.encode(data.length, pool, used)
varint.encode(dataLength, pool, used)
used += varint.encode.bytes
encodedLength = pool.slice(used - varint.encode.bytes, used)

Expand All @@ -46,10 +53,14 @@ function encode (opts) {
}
}

if (ended) {
return cb(null, encodedLength)
}

cb(null, Buffer.concat([
encodedLength,
data
]))
], (opts.fixed ? 4 : varint.encode.bytes) + dataLength))
})
}
}
Expand Down
29 changes: 29 additions & 0 deletions test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,35 @@ describe('pull-length-prefixed', () => {
)
})

it('zero length', (done) => {
pull(
pull.values(),
lp.encode(),
pull.collect((err, encoded) => {
if (err) throw err

expect(
encoded
).to.be.eql([Buffer.alloc(1, 0)])

pull(
pull.values(),
lp.encode(),
lp.decode(),
pull.collect((err, decoded) => {
if (err) throw err

expect(
decoded
).to.be.eql([])

done()
})
)
})
)
})

it('push time based', (done) => {
const p = new Pushable()
const input = []
Expand Down
49 changes: 49 additions & 0 deletions test/syncStream.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/* eslint-env mocha */
'use strict'

const Buffer = require('safe-buffer').Buffer

const pull = require('pull-stream')
const expect = require('chai').expect
const varint = require('varint')

const lp = require('../src')

describe('pull-length-prefixed', () => {
it('sync stream', (done) => {
const input = [...Array(500).keys()].map(() => Buffer.from('payload'))

pull(
pull.values(input),
lp.encode(),
pull.collect((err, encoded) => {
if (err) throw err

expect(
encoded
).to.be.eql(
input.map(data => {
const len = varint.encode(data.length)
return Buffer.concat([
Buffer.alloc(len.length, len, 'utf8'),
Buffer.alloc(data.length, data, 'utf8')
])
}))

pull(
pull.values(encoded),
lp.decode(),
pull.collect((err, output) => {
if (err) throw err
expect(
input
).to.be.eql(
output
)
done()
})
)
})
)
})
})

0 comments on commit 33e3a64

Please sign in to comment.