Skip to content

Commit

Permalink
feat: encode + tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Alan Shaw committed May 4, 2019
1 parent 7090f5d commit d5f6d03
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 100 deletions.
15 changes: 0 additions & 15 deletions .circleci/config.yml

This file was deleted.

37 changes: 31 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 16 additions & 20 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "pull-length-prefixed",
"name": "it-length-prefixed",
"version": "1.3.2",
"description": "Streaming length prefixed buffers with pull-streams",
"description": "Streaming length prefixed buffers with async iterables",
"main": "src/index.js",
"scripts": {
"test": "aegir test -t node -t browser",
Expand All @@ -12,44 +12,40 @@
"release-minor": "aegir release --type minor -t node -t browser",
"release-major": "aegir release --type major -t node -t browser",
"build": "aegir build",
"coverage": "npx nyc -s npm run test:node -- --bail",
"coverage": "npx nyc --reporter=text --reporter=lcov npm run test:node -- --bail",
"coverage-publish": "npx nyc report --reporter=text-lcov > coverage.lcov && npx codecov"
},
"repository": {
"type": "git",
"url": "git+https://github.com/dignifiedquire/pull-length-prefixed.git"
"url": "git+https://github.com/alanshaw/it-length-prefixed.git"
},
"keywords": [
"varint",
"pull-stream",
"async",
"iterable",
"iterator",
"length-prefixed-stream",
"length-prefixed"
],
"author": "Friedel Ziegelmayer <dignifiedquire@gmail.com>",
"author": "Alan Shaw",
"license": "MIT",
"bugs": {
"url": "https://github.com/dignifiedquire/pull-length-prefixed/issues"
"url": "https://github.com/alanshaw/it-length-prefixed/issues"
},
"homepage": "https://github.com/dignifiedquire/pull-length-prefixed#readme",
"homepage": "https://github.com/alanshaw/it-length-prefixed#readme",
"dependencies": {
"pull-pushable": "^2.2.0",
"pull-reader": "^1.3.1",
"safe-buffer": "^5.1.2",
"varint": "^5.0.0"
},
"devDependencies": {
"aegir": "^18.2.1",
"chai": "^4.2.0",
"it-pipe": "^1.0.1",
"pull-block": "^1.4.0",
"pull-stream": "^3.6.9"
},
"contributors": [
"Dmitriy Ryajov <dryajov@gmail.com>",
"Jacob Heun <jacobheun@gmail.com>",
"Jacob Heun <jake@andyet.net>",
"Maciej Krüger <mkg20001@gmail.com>",
"Projjol Banerji <probaner23@gmail.com>",
"Richard Littauer <richard.littauer@gmail.com>",
"dignifiedquire <dignifiedquire@gmail.com>"
]
"pull-stream": "^3.6.10",
"random-bytes": "^1.0.0",
"random-int": "^2.0.0",
"streaming-iterables": "^4.1.0"
}
}
83 changes: 24 additions & 59 deletions src/encode.js
Original file line number Diff line number Diff line change
@@ -1,70 +1,35 @@
'use strict'

const Buffer = require('safe-buffer').Buffer
const Varint = require('varint')
const BufferList = require('bl')

module.exports = encode

const poolSize = 10 * 1024

function encode (opts) {
opts = Object.assign({
fixed: false
}, opts || {})

// Only needed for varint
const varint = require('varint')
let pool = opts.fixed ? null : createPool()
let used = 0

let ended = false
let first = true

return (read) => (end, cb) => {
if (end) ended = end
if (ended) return cb(ended)

read(null, (end, data) => {
if (end) ended = end
if (ended && !first) {
return cb(ended)
}

first = false
const MIN_POOL_SIZE = 147 // Varint.encode(Number.MAX_VALUE).length
const DEFAULT_POOL_SIZE = 10 * 1024

if (!ended && !Buffer.isBuffer(data)) {
ended = new Error('data must be a buffer')
return cb(ended)
}
function encode (options) {
options = options || {}
options.poolSize = Math.max(options.poolSize || DEFAULT_POOL_SIZE, MIN_POOL_SIZE)

const dataLength = ended ? 0 : data.length
return source => (async function * () {
let pool = Buffer.alloc(options.poolSize)
let poolOffset = 0

let encodedLength
if (opts.fixed) {
encodedLength = Buffer.alloc(4)
encodedLength.writeInt32BE(dataLength, 0) // writes exactly 4 bytes
} else {
varint.encode(dataLength, pool, used)
used += varint.encode.bytes
encodedLength = pool.slice(used - varint.encode.bytes, used)
for await (const chunk of source) {
Varint.encode(chunk.length, pool, poolOffset)
poolOffset += Varint.encode.bytes
const encodedLength = pool.slice(poolOffset - Varint.encode.bytes, poolOffset)

if (pool.length - used < 100) {
pool = createPool()
used = 0
}
if (pool.length - poolOffset < MIN_POOL_SIZE) {
pool = Buffer.alloc(options.poolSize)
poolOffset = 0
}

if (ended) {
return cb(null, encodedLength)
}

cb(null, Buffer.concat([
encodedLength,
data
], (opts.fixed ? 4 : varint.encode.bytes) + dataLength))
})
}
yield new BufferList().append(encodedLength).append(chunk)
// yield Buffer.concat([encodedLength, chunk])
}
})()
}

function createPool () {
return Buffer.alloc(poolSize)
}
module.exports = encode
module.exports.MIN_POOL_SIZE = MIN_POOL_SIZE
module.exports.DEFAULT_POOL_SIZE = DEFAULT_POOL_SIZE
52 changes: 52 additions & 0 deletions test/encode.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/* eslint-env mocha */
'use strict'

const pipe = require('it-pipe')
const { expect } = require('chai')
const randomInt = require('random-int')
const randomBytes = require('random-bytes')
const { map, collect } = require('streaming-iterables')
const Varint = require('varint')

const lp = require('../')
const { MIN_POOL_SIZE } = lp.encode

const times = (n, fn) => Array.from(Array(n), fn)
const someBytes = n => randomBytes(randomInt(1, n || 32))

describe('encode', () => {
it('should encode length as prefix', async () => {
const input = await Promise.all(times(randomInt(1, 10), someBytes))
const output = await pipe(input, lp.encode(), map(c => c.slice()), collect)
output.forEach((o, i) => {
const length = Varint.decode(o)
expect(length).to.equal(input[i].length)
expect(o.slice(Varint.decode.bytes)).to.deep.equal(input[i])
})
})

it('should encode zero length as prefix', async () => {
const input = [Buffer.alloc(0)]
const output = await pipe(input, lp.encode(), map(c => c.slice()), collect)
output.forEach((o, i) => {
const length = Varint.decode(o)
expect(length).to.equal(input[i].length)
expect(o.slice(Varint.decode.bytes)).to.deep.equal(input[i])
})
})

it('should re-allocate buffer pool when empty', async () => {
const input = await Promise.all(times(MIN_POOL_SIZE * 2, someBytes))
const output = await pipe(
input,
lp.encode({ poolSize: MIN_POOL_SIZE * 1.5 }),
map(c => c.slice()),
collect
)
output.forEach((o, i) => {
const length = Varint.decode(o)
expect(length).to.equal(input[i].length)
expect(o.slice(Varint.decode.bytes)).to.deep.equal(input[i])
})
})
})

0 comments on commit d5f6d03

Please sign in to comment.