diff --git a/src/decode.js b/src/decode.js index 4aa55a1..b055ac8 100644 --- a/src/decode.js +++ b/src/decode.js @@ -20,16 +20,37 @@ function decode (opts) { return (read) => { reader(read) + + // this function has to be written without recursion + // or it blows the stack in case of sync stream function next () { - _decodeFromReader(reader, opts, (err, msg) => { - if (err) return p.end(err) + let doNext = true + let decoded = false - p.push(msg) - next() - }) + const decodeCb = (err, msg) => { + decoded = true + if (err) { + p.end(err) + doNext = false + } else { + p.push(msg) + if (!doNext) { + next() + } + } + } + + while (doNext) { + decoded = false + _decodeFromReader(reader, opts, decodeCb) + if (!decoded) { + doNext = false + } + } } next() + return p } } @@ -100,6 +121,11 @@ function readVarintMessage (reader, maxLength, cb) { if (msgSize > maxLength) { return cb(new Error('size longer than max permitted length of ' + maxLength + '!')) } + + if (msgSize <= 0) { + return cb(true) // eslint-disable-line standard/no-callback-literal + } + readMessage(reader, msgSize, (err, msg) => { if (err) { return cb(err) diff --git a/src/encode.js b/src/encode.js index 23c48b9..9664c0e 100644 --- a/src/encode.js +++ b/src/encode.js @@ -17,6 +17,7 @@ function encode (opts) { let used = 0 let ended = false + let first = true return (read) => (end, cb) => { if (end) ended = end @@ -24,19 +25,25 @@ function encode (opts) { read(null, (end, data) => { if (end) ended = end - if (ended) return cb(ended) + if (ended && !first) { + return cb(ended) + } + + first = false - if (!Buffer.isBuffer(data)) { + if (!ended && !Buffer.isBuffer(data)) { ended = new Error('data must be a buffer') return cb(ended) } + const dataLength = ended ? 0 : data.length + let encodedLength if (opts.fixed) { encodedLength = Buffer.alloc(4) - encodedLength.writeInt32BE(data.length, 0) // writes exactly 4 bytes + encodedLength.writeInt32BE(dataLength, 0) // writes exactly 4 bytes } else { - varint.encode(data.length, pool, used) + varint.encode(dataLength, pool, used) used += varint.encode.bytes encodedLength = pool.slice(used - varint.encode.bytes, used) @@ -46,10 +53,14 @@ function encode (opts) { } } + if (ended) { + return cb(null, encodedLength) + } + cb(null, Buffer.concat([ encodedLength, data - ])) + ], (opts.fixed ? 4 : varint.encode.bytes) + dataLength)) }) } } diff --git a/test/index.spec.js b/test/index.spec.js index 54ef6b7..8e20887 100644 --- a/test/index.spec.js +++ b/test/index.spec.js @@ -96,6 +96,35 @@ describe('pull-length-prefixed', () => { ) }) + it('zero length', (done) => { + pull( + pull.values(), + lp.encode(), + pull.collect((err, encoded) => { + if (err) throw err + + expect( + encoded + ).to.be.eql([Buffer.alloc(1, 0)]) + + pull( + pull.values(), + lp.encode(), + lp.decode(), + pull.collect((err, decoded) => { + if (err) throw err + + expect( + decoded + ).to.be.eql([]) + + done() + }) + ) + }) + ) + }) + it('push time based', (done) => { const p = new Pushable() const input = [] diff --git a/test/syncStream.spec.js b/test/syncStream.spec.js new file mode 100644 index 0000000..96e4b87 --- /dev/null +++ b/test/syncStream.spec.js @@ -0,0 +1,49 @@ +/* eslint-env mocha */ +'use strict' + +const Buffer = require('safe-buffer').Buffer + +const pull = require('pull-stream') +const expect = require('chai').expect +const varint = require('varint') + +const lp = require('../src') + +describe('pull-length-prefixed', () => { + it('sync stream', (done) => { + const input = [...Array(500).keys()].map(() => Buffer.from('payload')) + + pull( + pull.values(input), + lp.encode(), + pull.collect((err, encoded) => { + if (err) throw err + + expect( + encoded + ).to.be.eql( + input.map(data => { + const len = varint.encode(data.length) + return Buffer.concat([ + Buffer.alloc(len.length, len, 'utf8'), + Buffer.alloc(data.length, data, 'utf8') + ]) + })) + + pull( + pull.values(encoded), + lp.decode(), + pull.collect((err, output) => { + if (err) throw err + expect( + input + ).to.be.eql( + output + ) + done() + }) + ) + }) + ) + }) +})