Skip to content
This repository has been archived by the owner on Apr 29, 2020. It is now read-only.

feat: use a rabin chunker in wasm #31

Merged
merged 2 commits into from
Jun 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
"leadMaintainer": "Alex Potsides <alex.potsides@protocol.ai>",
"main": "src/index.js",
"browser": {
"fs": false,
"rabin": false
"fs": false
},
"scripts": {
"test": "aegir test",
Expand Down Expand Up @@ -65,10 +64,8 @@
"long": "^4.0.0",
"multicodec": "~0.5.1",
"multihashing-async": "~0.7.0",
"superstruct": "~0.6.1"
},
"optionalDependencies": {
"rabin": "^1.6.0"
"superstruct": "~0.6.1",
"rabin-wasm": "~0.0.4"
},
"contributors": [
"Alan Shaw <alan.shaw@protocol.ai>",
Expand Down
191 changes: 4 additions & 187 deletions src/chunker/rabin.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,10 @@
'use strict'

const errCode = require('err-code')
const Long = require('long')
const BufferList = require('bl')
let rabin
const { create } = require('rabin-wasm')

module.exports = async function * rabinChunker (source, options) {
if (!rabin) {
try {
rabin = nativeRabin()
} catch (_) {
// fallback to js implementation
rabin = jsRabin()
}
}
const rabin = jsRabin()

let min, max, avg

Expand All @@ -40,191 +31,17 @@ module.exports = async function * rabinChunker (source, options) {
}
}

/**
 * Builds a chunker backed by the native `rabin` module.
 *
 * `require('rabin')` is evaluated on every call, so a missing or unbuildable
 * module surfaces as an exception from this function.
 *
 * @returns {Function} async generator function `(source, options)` that
 *   yields one buffer per chunk
 * @throws {Error} with code `ERR_UNSUPPORTED` when the value exported by
 *   `rabin` is not a function
 */
const nativeRabin = () => {
  const createRabin = require('rabin')

  // Validate the factory that was just required, not any outer `rabin`
  // binding, so a usable native module is actually accepted.
  if (typeof createRabin !== 'function') {
    throw errCode(new Error('rabin was not a function'), 'ERR_UNSUPPORTED')
  }

  return async function * (source, options) {
    const rabin = createRabin(options)

    // TODO: rewrite rabin using node streams v3
    for await (const chunk of source) {
      // Keep the raw bytes so chunks can be sliced out once sizes are known
      rabin.buffers.append(chunk)
      rabin.pending.push(chunk)

      // `fingerprint` is handed this array; the loop below reads the chunk
      // lengths from it
      const sizes = []

      rabin.rabin.fingerprint(rabin.pending, sizes)
      rabin.pending = []

      for (const size of sizes) {
        const buf = rabin.buffers.slice(0, size)
        rabin.buffers.consume(size)

        yield buf
      }
    }

    // Whatever is left after the source ends is emitted as the final chunk
    if (rabin.buffers.length) {
      yield rabin.buffers.slice(0)
    }
  }
}

const jsRabin = () => {
// see https://github.com/datproject/rabin/blob/c0378395dc0a125ab21ac176ec504f9995b34e62/src/rabin.cc
/**
 * Stateful chunk-boundary finder. Bytes are pushed through a fixed-size
 * window that maintains `digest`; a chunk ends when `digest & mask` is zero
 * after at least `minSize` bytes have been counted, or as soon as `maxSize`
 * bytes have been counted.
 */
class Rabin {
/**
 * @param {object} options
 * @param {number} [options.window] - number of window slots, defaults to 64
 * @param {*} options.polynomial - modulus handed to `Long#modulo` and `deg()`
 *   (NOTE(review): presumably a Long - confirm against the caller)
 * @param {number} [options.bits] - count of low digest bits that must be zero
 *   at a boundary, defaults to 12
 * @param {number} [options.min] - minimum chunk length, defaults to 8 * 1024
 * @param {number} [options.max] - maximum chunk length, defaults to 32 * 1024
 */
constructor (options) {
this.window = new Array(options.window || 64).fill(Long.fromInt(0))
this.wpos = 0
this.count = 0
this.digest = Long.fromInt(0)
this.chunkLength = 0
this.polynomial = options.polynomial
// Hard-coded here; not derived from options.polynomial
this.polynomialDegree = 53
// Bit offset used by append() to pick the byte that indexes modTable
this.polynomialShift = this.polynomialDegree - 8
this.averageBits = options.bits || 12
this.minSize = options.min || 8 * 1024
this.maxSize = options.max || 32 * 1024
// (1 << averageBits) - 1: selects the low `averageBits` bits of the digest
this.mask = Long.fromInt(1).shiftLeft(this.averageBits).subtract(1)
this.modTable = []
this.outTable = []

this.calculateTables()
}

/**
 * Fills the two 256-entry lookup tables used by slide() and append().
 */
calculateTables () {
// outTable[i]: hash of byte i followed by (window.length - 1) zero bytes
for (let i = 0; i < 256; i++) {
let hash = Long.fromInt(0, true)

hash = this.appendByte(hash, i)

for (let j = 0; j < this.window.length - 1; j++) {
hash = this.appendByte(hash, 0)
}

this.outTable[i] = hash
}

const k = this.deg(this.polynomial)

// modTable[i]: ((i << k) mod polynomial) | (i << k)
for (let i = 0; i < 256; i++) {
const b = Long.fromInt(i, true)

this.modTable[i] = b.shiftLeft(k)
.modulo(this.polynomial)
.or(b.shiftLeft(k))
}
}

/**
 * Scans from bit 63 downwards and reports the first position at which
 * `mask & p` compares greater than 0.
 *
 * @param {*} p - value to inspect
 * @returns {Long} `63 - i` for the first matching iteration, or -1 if none
 */
deg (p) {
let mask = Long.fromString('0x8000000000000000', true, 16)

for (let i = 0; i < 64; i++) {
if (mask.and(p).greaterThan(0)) {
return Long.fromInt(63 - i)
}

mask = mask.shiftRight(1)
}

return Long.fromInt(-1)
}

/**
 * Shifts `hash` left by one byte, ORs in `b` and reduces the result modulo
 * the polynomial.
 *
 * @param {Long} hash
 * @param {number} b
 * @returns {Long}
 */
appendByte (hash, b) {
hash = hash.shiftLeft(8)
hash = hash.or(b)

return hash.modulo(this.polynomial)
}

/**
 * Consumes every buffer in `bufs` and collects the length of each chunk
 * completed along the way. `count` and `digest` persist between buffers and
 * between calls, so a chunk can span several buffers.
 *
 * @param {Array} bufs - byte buffers, processed in order
 * @returns {number[]} lengths of the chunks that were completed
 */
getFingerprints (bufs) {
const lengths = []

for (let i = 0; i < bufs.length; i++) {
let buf = bufs[i]

while (true) {
const remaining = this.nextChunk(buf)

// -1 means this buffer ran out before a boundary was found
if (remaining < 0) {
break
}

// Drop the bytes that belonged to the chunk just completed
buf = buf.slice(remaining)

lengths.push(this.chunkLength)
}
}

return lengths
}

/**
 * Feeds bytes from `buf` into the window until a boundary is hit.
 *
 * @param {*} buf - indexable byte buffer
 * @returns {number} how many bytes of `buf` were consumed up to and including
 *   the boundary, or -1 when `buf` ended without reaching one
 */
nextChunk (buf) {
for (let i = 0; i < buf.length; i++) {
const val = Long.fromInt(buf[i])

this.slide(val)

this.count++

// Boundary: masked digest is zero once minSize is reached, or maxSize is hit
if ((this.count >= this.minSize && this.digest.and(this.mask).equals(0)) || this.count >= this.maxSize) {
// Record the length before reset() zeroes the counter
this.chunkLength = this.count

this.reset()

return i + 1
}
}

return -1
}

/**
 * Replaces the window entry at `wpos` with `value`, XORs the outTable entry
 * of the replaced byte into the digest, advances `wpos` with wrap-around and
 * then appends `value` to the digest.
 *
 * @param {Long} value
 */
slide (value) {
const out = this.window[this.wpos].toInt() & 255
this.window[this.wpos] = value
this.digest = this.digest.xor(this.outTable[out])
this.wpos = (this.wpos + 1) % this.window.length

this.append(value)
}

/**
 * Clears the window, position, byte count and digest, then slides a single
 * 1 into the window. slide() does not touch `count`, so it stays at 0.
 */
reset () {
this.window = this.window.map(() => Long.fromInt(0))
this.wpos = 0
this.count = 0
this.digest = Long.fromInt(0)

this.slide(Long.fromInt(1))
}

/**
 * Shifts the digest left by one byte, ORs in `value` and XORs in the modTable
 * entry selected by the byte that sat at bit offset `polynomialShift` before
 * the shift.
 *
 * @param {Long} value
 */
append (value) {
const index = this.digest.shiftRight(this.polynomialShift).toInt() & 255
this.digest = this.digest.shiftLeft(8)
this.digest = this.digest.or(value)

const entry = this.modTable[index]

// Skipped when the table has no entry at this index
if (entry) {
this.digest = this.digest.xor(entry)
}
}
}

return async function * (source, options) {
const r = new Rabin(options)
const r = await create(options.bits, options.min, options.max, options.window)
const buffers = new BufferList()
let pending = []

for await (const chunk of source) {
buffers.append(chunk)
pending.push(chunk)

const sizes = r.getFingerprints(pending)
const sizes = r.fingerprint(Buffer.concat(pending))
pending = []

for (let i = 0; i < sizes.length; i++) {
Expand Down