Skip to content
This repository has been archived by the owner on Apr 29, 2020. It is now read-only.

Commit

Permalink
feat: use a rabin chunker in wasm (#31)
Browse files Browse the repository at this point in the history
* feat: use a rabin chunker in wasm
* chore: fix package.json
  • Loading branch information
hugomrdias authored and achingbrain committed Jun 4, 2019
1 parent f024451 commit d4021db
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 193 deletions.
9 changes: 3 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
"leadMaintainer": "Alex Potsides <alex.potsides@protocol.ai>",
"main": "src/index.js",
"browser": {
"fs": false,
"rabin": false
"fs": false
},
"scripts": {
"test": "aegir test",
Expand Down Expand Up @@ -65,10 +64,8 @@
"long": "^4.0.0",
"multicodec": "~0.5.1",
"multihashing-async": "~0.7.0",
"superstruct": "~0.6.1"
},
"optionalDependencies": {
"rabin": "^1.6.0"
"superstruct": "~0.6.1",
"rabin-wasm": "~0.0.4"
},
"contributors": [
"Alan Shaw <alan.shaw@protocol.ai>",
Expand Down
191 changes: 4 additions & 187 deletions src/chunker/rabin.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,10 @@
'use strict'

const errCode = require('err-code')
const Long = require('long')
const BufferList = require('bl')
let rabin
const { create } = require('rabin-wasm')

module.exports = async function * rabinChunker (source, options) {
if (!rabin) {
try {
rabin = nativeRabin()
} catch (_) {
// fallback to js implementation
rabin = jsRabin()
}
}
const rabin = jsRabin()

let min, max, avg

Expand All @@ -40,191 +31,17 @@ module.exports = async function * rabinChunker (source, options) {
}
}

/**
 * Builds a chunker backed by the native `rabin` module.
 *
 * @returns {Function} an async generator function `(source, options)` that
 *   yields one buffer per chunk boundary reported by the native fingerprinter,
 *   followed by any bytes still buffered once `source` is exhausted
 * @throws {Error} with code `ERR_UNSUPPORTED` when `require('rabin')` does not
 *   return a function
 */
const nativeRabin = () => {
  const createRabin = require('rabin')

  // Validate the value that was just required. Checking the module-level
  // `rabin` variable here would test a different binding and say nothing
  // about whether the native module actually loaded as a factory.
  if (typeof createRabin !== 'function') {
    throw errCode(new Error(`rabin was not a function`), 'ERR_UNSUPPORTED')
  }

  return async function * (source, options) {
    // Named `stream` so it does not shadow the module-level `rabin` binding
    const stream = createRabin(options)

    // TODO: rewrite rabin using node streams v3
    for await (const chunk of source) {
      stream.buffers.append(chunk)
      stream.pending.push(chunk)

      // `fingerprint` is handed this array; the loop below reads the chunk
      // lengths from it
      const sizes = []

      stream.rabin.fingerprint(stream.pending, sizes)
      stream.pending = []

      for (const size of sizes) {
        const buf = stream.buffers.slice(0, size)
        stream.buffers.consume(size)

        yield buf
      }
    }

    // Flush whatever is left after the last reported boundary
    if (stream.buffers.length) {
      yield stream.buffers.slice(0)
    }
  }
}

const jsRabin = () => {
// see https://github.com/datproject/rabin/blob/c0378395dc0a125ab21ac176ec504f9995b34e62/src/rabin.cc
/**
 * Rolling-hash chunk boundary finder built on 64-bit `Long` arithmetic.
 *
 * Bytes are fed through a sliding window; a boundary is reported when the
 * digest's low `averageBits` bits are all zero (once `minSize` bytes have been
 * seen) or when `maxSize` bytes have been seen. State is kept on the instance,
 * so a chunk may span several input buffers.
 */
class Rabin {
/**
 * @param {object} options
 * @param {number} [options.window] - sliding window length in bytes (falls back to 64)
 * @param {*} options.polynomial - modulus passed to `Long#modulo`; no default is applied here
 * @param {number} [options.bits] - number of low digest bits that must be zero at a boundary (falls back to 12)
 * @param {number} [options.min] - minimum chunk length (falls back to 8 KiB)
 * @param {number} [options.max] - maximum chunk length (falls back to 32 KiB)
 *
 * Note: defaults use `||`, so a falsy value such as 0 also selects the default.
 */
constructor (options) {
this.window = new Array(options.window || 64).fill(Long.fromInt(0))
this.wpos = 0
this.count = 0
this.digest = Long.fromInt(0)
this.chunkLength = 0
this.polynomial = options.polynomial
// The degree is hard-coded rather than derived from `options.polynomial`
this.polynomialDegree = 53
this.polynomialShift = this.polynomialDegree - 8
this.averageBits = options.bits || 12
this.minSize = options.min || 8 * 1024
this.maxSize = options.max || 32 * 1024
// (1 << averageBits) - 1: selects the low `averageBits` bits of the digest
this.mask = Long.fromInt(1).shiftLeft(this.averageBits).subtract(1)
this.modTable = []
this.outTable = []

this.calculateTables()
}

/**
 * Fills the two 256-entry lookup tables used by `slide` and `append`.
 */
calculateTables () {
// outTable[i]: hash of byte `i` followed by (window length - 1) zero bytes,
// i.e. the value XORed out of the digest when `i` leaves the window
for (let i = 0; i < 256; i++) {
let hash = Long.fromInt(0, true)

hash = this.appendByte(hash, i)

for (let j = 0; j < this.window.length - 1; j++) {
hash = this.appendByte(hash, 0)
}

this.outTable[i] = hash
}

const k = this.deg(this.polynomial)

// modTable[i]: ((i << k) mod polynomial) | (i << k)
for (let i = 0; i < 256; i++) {
const b = Long.fromInt(i, true)

this.modTable[i] = b.shiftLeft(k)
.modulo(this.polynomial)
.or(b.shiftLeft(k))
}
}

/**
 * Scans `p` from bit 63 downwards for the first set bit.
 *
 * @param {*} p - value ANDed against a single-bit mask
 * @returns {Long} index of the first bit found set, or -1 if none is
 */
deg (p) {
let mask = Long.fromString('0x8000000000000000', true, 16)

for (let i = 0; i < 64; i++) {
if (mask.and(p).greaterThan(0)) {
return Long.fromInt(63 - i)
}

mask = mask.shiftRight(1)
}

return Long.fromInt(-1)
}

/**
 * Shifts `hash` left by one byte, ORs in `b` and reduces modulo the polynomial.
 *
 * @param {Long} hash
 * @param {number} b - byte value to append
 * @returns {Long} the reduced hash
 */
appendByte (hash, b) {
hash = hash.shiftLeft(8)
hash = hash.or(b)

return hash.modulo(this.polynomial)
}

/**
 * Runs every buffer through `nextChunk` and collects the length of each chunk
 * completed along the way.
 *
 * @param {Array} bufs - buffers to scan, in order
 * @returns {number[]} lengths of the chunks whose boundary was found; bytes
 *   after the last boundary stay counted on the instance for the next call
 */
getFingerprints (bufs) {
const lengths = []

for (let i = 0; i < bufs.length; i++) {
let buf = bufs[i]

while (true) {
const remaining = this.nextChunk(buf)

// -1 means this buffer ended without hitting a boundary
if (remaining < 0) {
break
}

// Despite the name, `remaining` is the offset just past the boundary byte
buf = buf.slice(remaining)

lengths.push(this.chunkLength)
}
}

return lengths
}

/**
 * Feeds bytes from `buf` into the rolling hash until a boundary is hit.
 *
 * On a boundary, stores the chunk length in `this.chunkLength` and resets the
 * hash state.
 *
 * @param {*} buf - indexable byte sequence with a `length`
 * @returns {number} offset in `buf` just past the boundary byte, or -1 if the
 *   buffer was consumed without finding one
 */
nextChunk (buf) {
for (let i = 0; i < buf.length; i++) {
const val = Long.fromInt(buf[i])

this.slide(val)

this.count++

// Boundary: masked digest bits are zero after at least minSize bytes,
// or the chunk has reached maxSize
if ((this.count >= this.minSize && this.digest.and(this.mask).equals(0)) || this.count >= this.maxSize) {
this.chunkLength = this.count

this.reset()

return i + 1
}
}

return -1
}

/**
 * Moves the window forward by one byte: XORs the outgoing byte's table entry
 * out of the digest, stores `value` in its slot, then appends `value`.
 *
 * @param {Long} value - incoming byte
 */
slide (value) {
const out = this.window[this.wpos].toInt() & 255
this.window[this.wpos] = value
this.digest = this.digest.xor(this.outTable[out])
this.wpos = (this.wpos + 1) % this.window.length

this.append(value)
}

/**
 * Clears window, position, byte count and digest, then slides in a single 1
 * so the state after a reset is seeded with that value.
 * Note that `calculateTables` runs in the constructor but `reset` does not.
 */
reset () {
this.window = this.window.map(() => Long.fromInt(0))
this.wpos = 0
this.count = 0
this.digest = Long.fromInt(0)

this.slide(Long.fromInt(1))
}

/**
 * Shifts `value` into the digest and XORs in the `modTable` entry selected by
 * the byte taken from the digest at `polynomialShift` before the shift.
 *
 * @param {Long} value - incoming byte
 */
append (value) {
const index = this.digest.shiftRight(this.polynomialShift).toInt() & 255
this.digest = this.digest.shiftLeft(8)
this.digest = this.digest.or(value)

const entry = this.modTable[index]

if (entry) {
this.digest = this.digest.xor(entry)
}
}
}

return async function * (source, options) {
const r = new Rabin(options)
const r = await create(options.bits, options.min, options.max, options.window)
const buffers = new BufferList()
let pending = []

for await (const chunk of source) {
buffers.append(chunk)
pending.push(chunk)

const sizes = r.getFingerprints(pending)
const sizes = r.fingerprint(Buffer.concat(pending))
pending = []

for (let i = 0; i < sizes.length; i++) {
Expand Down

0 comments on commit d4021db

Please sign in to comment.