Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Lexicographic iteration #104

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
7 changes: 4 additions & 3 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ var bulk = require('bulk-write-stream')
var events = require('events')
var sodium = require('sodium-universal')
var alru = require('array-lru')
var pathBuilder = require('./lib/path')
var inherits = require('inherits')
var hash = require('./lib/hash')
var iterator = require('./lib/iterator')
var differ = require('./lib/differ')
var history = require('./lib/history')
Expand Down Expand Up @@ -56,6 +56,7 @@ function HyperDB (storage, key, opts) {
this.id = Buffer.alloc(32)
sodium.randombytes_buf(this.id)

this._path = pathBuilder(opts)
this._storage = createStorage(storage)
this._contentStorage = typeof opts.contentFeed === 'function'
? opts.contentFeed
Expand Down Expand Up @@ -106,7 +107,7 @@ HyperDB.prototype.batch = function (batch, cb) {
if (err) return done(err)

if (node) {
node.path = hash(node.key, true)
node.path = self._path(node.key, true)
heads = [node]
}

Expand Down Expand Up @@ -756,7 +757,7 @@ Writer.prototype._decode = function (seq, buf, cb) {
var val = messages.Entry.decode(buf)
val[util.inspect.custom] = inspect
val.seq = seq
val.path = hash(val.key, true)
val.path = this._db._path(val.key, true)
val.value = val.value && this._db._valueEncoding.decode(val.value)

if (this._feedsMessage && this._feedsLoaded === val.inflate) {
Expand Down
3 changes: 1 addition & 2 deletions lib/get.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
var hash = require('./hash')
var options = require('./options')

module.exports = get
Expand All @@ -17,7 +16,7 @@ function GetRequest (db, key, opts) {
this._callback = noop
this._options = opts || null
this._prefixed = !!(opts && opts.prefix)
this._path = (opts && opts.path) || hash(key, !this._prefixed)
this._path = (opts && opts.path) || db._path(key, !this._prefixed)
this._db = db
this._error = null
this._active = 0
Expand Down
44 changes: 0 additions & 44 deletions lib/hash.js

This file was deleted.

36 changes: 14 additions & 22 deletions lib/iterator.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
var nanoiterator = require('nanoiterator')
var inherits = require('inherits')
var hash = require('./hash')
var path = require('./path')
var options = require('./options')

var SORT_GT = [3, 2, 1, 0]
var SORT_GTE = [3, 2, 1, 0, 4]
var SORT_GT = [3, 2, 1, 0, path.SEPARATE]
var SORT_GTE = [3, 2, 1, 0, path.SEPARATE, path.TERMINATE]

module.exports = Iterator

Expand All @@ -16,15 +16,14 @@ function Iterator (db, prefix, opts) {

this._db = db
this._stack = [{
path: prefix ? hash(prefix, false) : [],
path: prefix ? this._db._path(prefix, false) : [],
node: null,
i: 0
}]

this._recursive = opts.recursive !== false
this._gt = !!opts.gt
this._start = this._stack[0].path.length
this._end = this._recursive ? Infinity : this._start + hash.LENGTH
this._map = options.map(opts, db)
this._reduce = options.reduce(opts, db)
this._collisions = []
Expand Down Expand Up @@ -72,9 +71,10 @@ Iterator.prototype._pushPrefix = function (path, i, val) {
// fast case
Iterator.prototype._singleNode = function (top, cb) {
var node = top.node
var end = Math.min(this._end, node.trie.length)

for (var i = top.i; i < end; i++) {
for (var i = top.i; i < node.trie.length; i++) {
if (!this._recursive && node.path[i - 1] === path.SEPARATE) break

var bucket = i < node.trie.length && node.trie[i]
if (!bucket) continue

Expand Down Expand Up @@ -114,14 +114,12 @@ Iterator.prototype._multiNode = function (path, nodes, cb) {

var ptr = path.length

if (ptr < this._end) {
var order = this._sortOrder(ptr)
var order = this._sortOrder(ptr)

for (var i = 0; i < order.length; i++) {
var sortValue = order[i]
if (!visitTrie(nodes, ptr, sortValue)) continue
this._pushPrefix(path, path.length, sortValue)
}
for (var i = 0; i < order.length; i++) {
var sortValue = order[i]
if (!visitTrie(nodes, ptr, sortValue)) continue
this._pushPrefix(path, ptr, sortValue)
}

nodes = this._filterResult(nodes, ptr)
Expand All @@ -132,11 +130,10 @@ Iterator.prototype._multiNode = function (path, nodes, cb) {
Iterator.prototype._filterResult = function (nodes, i) {
var result = null

nodes.sort(byKey)

for (var j = 0; j < nodes.length; j++) {
var node = nodes[j]
if (node.path.length !== i && i !== this._end) continue
if (this._recursive && node.path.length !== i) continue
if (!this._recursive && node.path[i - 1] !== path.SEPARATE) continue
if (!isPrefix(node.key, this._prefix)) continue

if (!result) result = []
Expand Down Expand Up @@ -190,11 +187,6 @@ Iterator.prototype._sortOrder = function (i) {
return gt && this._start === i ? SORT_GT : SORT_GTE
}

function byKey (a, b) {
var k = b.key.localeCompare(a.key)
return k || b.feed - a.feed
}

function allDeletes (nodes) {
for (var i = 0; i < nodes.length; i++) {
if (nodes[i].value) return false
Expand Down
77 changes: 77 additions & 0 deletions lib/path.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
var sodium = require('sodium-universal')

var KEY = Buffer.alloc(16)
var OUT = Buffer.alloc(8)

path.TERMINATE = 5
path.SEPARATE = 4

function path (opts) {
if (!opts) return lexint
return opts.lexint ? lexint : hash
}

module.exports = path

function lexint (keys, terminate) {
if (typeof keys === 'string') keys = split(keys)
if (!keys.length) return []

var lengths = keys.map(k => k.length)
var totalSize = lengths.reduce(sum, 0) * 4

var all = new Array(totalSize + keys.length + (terminate ? 1 : 0))

var offset = 0
for (var i = 0; i < keys.length; i++) {
var buf = Buffer.from(keys[i])
expandComponent(buf, all, offset)
offset += lengths[i] * 4
all[offset++] = path.SEPARATE
}

if (terminate) all[all.length - 1] = path.TERMINATE

return all
}

function hash (keys, terminate) {
if (typeof keys === 'string') keys = split(keys)
if (!keys.length) return []

var all = new Array(keys.length * 32 + keys.length + (terminate ? 1 : 0))

var offset = 0
for (var i = 0; i < keys.length; i++) {
sodium.crypto_shorthash(OUT, Buffer.from(keys[i]), KEY)
expandComponent(OUT, all, offset)
offset += 32
all[offset++] = path.SEPARATE
}

if (terminate) all[all.length - 1] = path.TERMINATE

return all
}

function expandComponent (next, out, offset) {
for (var i = 0; i < next.length; i++) {
var n = next[i]

for (var j = 3; j >= 0; j--) {
var r = n & 3
out[offset + 4 * i + j] = r
n -= r
n /= 4
}
}
}

function sum (s, n) { return s + n }

function split (key) {
var list = key.split('/')
if (list[0] === '') list.shift()
if (list[list.length - 1] === '') list.pop()
return list
}
10 changes: 5 additions & 5 deletions lib/put.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
var hash = require('./hash')
var path = require('./path')

module.exports = put

Expand All @@ -16,7 +16,7 @@ function PutRequest (db, key, value, clock) {
this._error = null
this._callback = noop
this._db = db
this._path = hash(key, true)
this._path = db._path(key, true)
this._trie = []
}

Expand Down Expand Up @@ -107,13 +107,13 @@ PutRequest.prototype._copyTrie = function (worker, bucket, val) {
// check if we are the closest node, if so skip this
// except if we are terminating the val. if so we
// need to check for collions before making the decision
if (i === val && val !== 4) continue
if (i === val && val !== path.TERMINATE) continue

var ptrs = bucket[i] || []
for (var k = 0; k < ptrs.length; k++) {
var ptr = ptrs[k]
// if termination value, push if get(ptr).key !== key
if (val === 4) this._checkCollision(worker, i, ptr.feed, ptr.seq)
if (val === path.TERMINATE) this._checkCollision(worker, i, ptr.feed, ptr.seq)
else this._push(worker, i, ptr.feed, ptr.seq)
}
}
Expand All @@ -125,7 +125,7 @@ PutRequest.prototype._splitTrie = function (worker, bucket, val) {

// check if we need to split the trie at all
// i.e. is head still closest and is head not a conflict
if (headVal === val && (headVal < 4 || head.key === this.key)) return
if (headVal === val && (headVal < path.TERMINATE || head.key === this.key)) return

// push head to the trie
this._push(worker, headVal, head.feed, head.seq)
Expand Down
36 changes: 25 additions & 11 deletions test/helpers/create.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,32 @@ var reduce = (a, b) => a

exports.one = function (key, opts) {
if (!opts) opts = {}
opts.reduce = reduce
opts.valueEncoding = opts.valueEncoding || 'utf-8'
var storage = opts.latency ? name => latency(opts.latency, ram()) : ram
return hyperdb(storage, key, opts)
var options = Object.assign({
reduce,
valueEncoding: 'utf-8'
}, opts)
var storage = options.latency ? name => latency(options.latency, ram()) : ram
return hyperdb(storage, key, options)
}

exports.two = function (cb) {
createMany(2, function (err, dbs, replicateByIndex) {
exports.two = function (opts, cb) {
if (typeof opts === 'function') {
cb = opts
opts = {}
}
createMany(2, opts, function (err, dbs, replicateByIndex) {
if (err) return cb(err)
dbs.push(replicateByIndex.bind(null, [0, 1]))
return cb.apply(null, dbs)
})
}

exports.three = function (cb) {
createMany(3, function (err, dbs, replicateByIndex) {
exports.three = function (opts, cb) {
if (typeof opts === 'function') {
cb = opts
opts = {}
}
createMany(3, opts, function (err, dbs, replicateByIndex) {
if (err) return cb(err)
dbs.push(replicateByIndex.bind(null, [0, 1, 2]))
return cb.apply(null, dbs)
Expand All @@ -30,11 +40,15 @@ exports.three = function (cb) {

exports.many = createMany

function createMany (count, cb) {
function createMany (count, opts, cb) {
if (typeof opts === 'function') return createMany(count, {}, opts)

var dbs = []
var remaining = count - 1

var first = hyperdb(ram, { valueEncoding: 'utf-8' })
var options = Object.assign({}, { valueEncoding: 'utf-8' }, opts)

var first = hyperdb(ram, options)
first.ready(function (err) {
if (err) return cb(err)
dbs.push(first)
Expand All @@ -49,7 +63,7 @@ function createMany (count, cb) {
return cb(null, dbs, replicateByIndex)
})
}
var db = hyperdb(ram, first.key, { valueEncoding: 'utf-8' })
var db = hyperdb(ram, first.key, options)
db.ready(function (err) {
if (err) return cb(err)
first.authorize(db.local.key, function (err) {
Expand Down
Loading