Skip to content

Commit

Permalink
Added sync ops and expose snapshots, share snapshots between iterators
Browse files Browse the repository at this point in the history
  • Loading branch information
Frank Böhmer committed Jun 3, 2018
1 parent 82b9675 commit a78ddc1
Show file tree
Hide file tree
Showing 11 changed files with 406 additions and 2 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ leakydb
npm-debug.log
prebuilds/
yarn.lock
package-lock.json
1 change: 1 addition & 0 deletions binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
, "src/database_async.cc"
, "src/iterator.cc"
, "src/iterator_async.cc"
, "src/database_snapshot.cc"
, "src/leveldown.cc"
, "src/leveldown_async.cc"
]
Expand Down
17 changes: 17 additions & 0 deletions leveldown.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const AbstractLevelDOWN = require('abstract-leveldown').AbstractLevelDOWN
const binding = require('bindings')('leveldown').leveldown
const ChainedBatch = require('./chained-batch')
const Iterator = require('./iterator')
const Snapshot = require('./snapshot')

function LevelDOWN (location) {
if (!(this instanceof LevelDOWN)) {
Expand Down Expand Up @@ -77,6 +78,22 @@ LevelDOWN.prototype._iterator = function (options) {
return new Iterator(this, options)
}

LevelDOWN.prototype.getSync = function (key) {
return this.binding.getSync(key);
}

LevelDOWN.prototype.putSync = function (key, value) {
return this.binding.putSync(key, value);
}

LevelDOWN.prototype.deleteSync = function (key) {
return this.binding.deleteSync(key);
}

LevelDOWN.prototype.snapshot = function (options) {
return new Snapshot(this, options);
}

LevelDOWN.destroy = function (location, callback) {
if (arguments.length < 2) {
throw new Error('destroy() requires `location` and `callback` arguments')
Expand Down
28 changes: 28 additions & 0 deletions snapshot.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
const extend = require('xtend'),
IteratorStream = require('level-iterator-stream');

const Iterator = require('./iterator');

function Snapshot (db, options) {
this.db = db;
this.binding = db.binding.databaseSnapshot(options)
}

Snapshot.prototype.getSync = function (key) {
return this.binding.getSync(key);
}

Snapshot.prototype.iterator = function (options) {
options = this.db._setupIteratorOptions(options);
console.log(JSON.stringify(options));
options = extend({ databaseSnapshot: this.binding }, options)
return new Iterator(this.db, options);
}

Snapshot.prototype.createReadStream = function(options) {
options = extend({ keys: true, values: true }, options)
if (typeof options.limit !== 'number') { options.limit = -1 }
return new IteratorStream(this.iterator(options), options)
}

module.exports = Snapshot
113 changes: 113 additions & 0 deletions src/database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Database::Database (const v8::Local<v8::Value>& from)
: location(new Nan::Utf8String(from))
, db(NULL)
, currentIteratorId(0)
, currentDatabaseSnapshotId(0)
, pendingCloseWorker(NULL)
, blockCache(NULL)
, filterPolicy(NULL) {};
Expand Down Expand Up @@ -151,6 +152,9 @@ void Database::Init () {
Nan::SetPrototypeMethod(tpl, "compactRange", Database::CompactRange);
Nan::SetPrototypeMethod(tpl, "getProperty", Database::GetProperty);
Nan::SetPrototypeMethod(tpl, "iterator", Database::Iterator);
Nan::SetPrototypeMethod(tpl, "getSync", Database::GetSync);
Nan::SetPrototypeMethod(tpl, "putSync", Database::PutSync);
Nan::SetPrototypeMethod(tpl, "databaseSnapshot", Database::DatabaseSnapshot);
}

NAN_METHOD(Database::New) {
Expand Down Expand Up @@ -515,5 +519,114 @@ NAN_METHOD(Database::Iterator) {
info.GetReturnValue().Set(iteratorHandle);
}

NAN_METHOD(Database::GetSync) {

if (info.Length() != 1) \
return Nan::ThrowError("getSync() requires a single argument: key");

leveldown::Database* database =
Nan::ObjectWrap::Unwrap<leveldown::Database>(info.This());

v8::Local<v8::Object> keyHandle = info[0].As<v8::Object>();
LD_STRING_OR_BUFFER_TO_SLICE(key, keyHandle, key);

leveldb::ReadOptions options;
options.fill_cache = true;

std::string value;
leveldb::Status status = database->GetFromDatabase(&options, key, value);

DisposeStringOrBufferFromSlice(keyHandle, key);

if (!status.ok())
return Nan::ThrowError(status.ToString().c_str());

v8::Local<v8::Value> returnValue
= Nan::New<v8::String>((char*)value.data(), value.size()).ToLocalChecked();

info.GetReturnValue().Set(returnValue);
}

NAN_METHOD(Database::PutSync) {

if (info.Length() != 2) \
return Nan::ThrowError("putSync() requires two arguments: key, value");

leveldown::Database* database =
Nan::ObjectWrap::Unwrap<leveldown::Database>(info.This());

v8::Local<v8::Object> keyHandle = info[0].As<v8::Object>();
LD_STRING_OR_BUFFER_TO_SLICE(key, keyHandle, key);

v8::Local<v8::Object> valueHandle = info[1].As<v8::Object>();
LD_STRING_OR_BUFFER_TO_SLICE(value, valueHandle, value);

leveldb::WriteOptions options;

leveldb::Status status = database->PutToDatabase(&options, key, value);

DisposeStringOrBufferFromSlice(keyHandle, key);
DisposeStringOrBufferFromSlice(valueHandle, value);

if (!status.ok())
return Nan::ThrowError(status.ToString().c_str());
}

NAN_METHOD(Database::DeleteSync) {

if (info.Length() != 1) \
return Nan::ThrowError("deleteSync() requires one argument: key");

leveldown::Database* database =
Nan::ObjectWrap::Unwrap<leveldown::Database>(info.This());

v8::Local<v8::Object> keyHandle = info[0].As<v8::Object>();
LD_STRING_OR_BUFFER_TO_SLICE(key, keyHandle, key);

leveldb::WriteOptions options;

leveldb::Status status = database->DeleteFromDatabase(&options, key);

DisposeStringOrBufferFromSlice(keyHandle, key);

if (!status.ok())
return Nan::ThrowError(status.ToString().c_str());
}

NAN_METHOD(Database::DatabaseSnapshot) {

Database* database = Nan::ObjectWrap::Unwrap<Database>(info.This());

v8::Local<v8::Object> optionsObj;
if (info.Length() > 0 && info[0]->IsObject()) {
optionsObj = v8::Local<v8::Object>::Cast(info[0]);
}

// each databaseSnapshot gets a unique id for this Database, so we can
// easily store & lookup on our `iterators` map
uint32_t id = database->currentDatabaseSnapshotId++;
Nan::TryCatch try_catch;
v8::Local<v8::Object> databaseSnapshotHandle = DatabaseSnapshot::NewInstance(
info.This()
, Nan::New<v8::Number>(id)
, optionsObj
);
if (try_catch.HasCaught()) {
// NB: node::FatalException can segfault here if there is no room on stack.
return Nan::ThrowError("Fatal Error in Database::DatabaseSnapshot!");
}

leveldown::DatabaseSnapshot *databaseSnapshot =
Nan::ObjectWrap::Unwrap<leveldown::DatabaseSnapshot>(databaseSnapshotHandle);

database->databaseSnapshots[id] = databaseSnapshot;

info.GetReturnValue().Set(databaseSnapshotHandle);
}


void Database::ReleaseDatabaseSnapshot (uint32_t id) {
databaseSnapshots.erase(id);
}

} // namespace leveldown
8 changes: 8 additions & 0 deletions src/database.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "leveldown.h"
#include "iterator.h"
#include "database_snapshot.h"

namespace leveldown {

Expand Down Expand Up @@ -75,6 +76,7 @@ class Database : public Nan::ObjectWrap {
void ReleaseSnapshot (const leveldb::Snapshot* snapshot);
void CloseDatabase ();
void ReleaseIterator (uint32_t id);
void ReleaseDatabaseSnapshot (uint32_t id);

Database (const v8::Local<v8::Value>& from);
~Database ();
Expand All @@ -83,11 +85,13 @@ class Database : public Nan::ObjectWrap {
Nan::Utf8String* location;
leveldb::DB* db;
uint32_t currentIteratorId;
uint32_t currentDatabaseSnapshotId;
void(*pendingCloseWorker);
leveldb::Cache* blockCache;
const leveldb::FilterPolicy* filterPolicy;

std::map< uint32_t, leveldown::Iterator * > iterators;
std::map< uint32_t, leveldown::DatabaseSnapshot * > databaseSnapshots;

static void WriteDoing(uv_work_t *req);
static void WriteAfter(uv_work_t *req);
Expand All @@ -104,6 +108,10 @@ class Database : public Nan::ObjectWrap {
static NAN_METHOD(ApproximateSize);
static NAN_METHOD(CompactRange);
static NAN_METHOD(GetProperty);
static NAN_METHOD(GetSync);
static NAN_METHOD(PutSync);
static NAN_METHOD(DeleteSync);
static NAN_METHOD(DatabaseSnapshot);
};

} // namespace leveldown
Expand Down
146 changes: 146 additions & 0 deletions src/database_snapshot.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/* Copyright (c) 2012-2018 LevelDOWN contributors
* See list at <https://github.com/level/leveldown#contributing>
* MIT License <https://github.com/level/leveldown/blob/master/LICENSE.md>
*/

#include <node.h>
#include <node_buffer.h>

#include "database.h"
#include "database_snapshot.h"
#include "common.h"

namespace leveldown {

static Nan::Persistent<v8::FunctionTemplate> database_snapshot_constructor;

DatabaseSnapshot::DatabaseSnapshot (
Database* database
, uint32_t id
, bool fillCache
, bool keyAsBuffer
, bool valueAsBuffer
) : database(database)
, id(id)
, keyAsBuffer(keyAsBuffer)
, valueAsBuffer(valueAsBuffer)
{
Nan::HandleScope scope;

options = new leveldb::ReadOptions();
options->fill_cache = fillCache;
// get a snapshot of the current state
options->snapshot = database->NewSnapshot();
};

DatabaseSnapshot::~DatabaseSnapshot () {
delete options;
};

void DatabaseSnapshot::Release () {
database->ReleaseDatabaseSnapshot(id);
};

leveldb::Snapshot* DatabaseSnapshot::GetSnapshot() {
return (leveldb::Snapshot*)options->snapshot;
};

NAN_METHOD(DatabaseSnapshot::GetSync) {

if (info.Length() != 1) \
return Nan::ThrowError("getSync() requires a single argument: key");

leveldown::DatabaseSnapshot* databaseSnapshot =
Nan::ObjectWrap::Unwrap<leveldown::DatabaseSnapshot>(info.This());

v8::Local<v8::Object> keyHandle = info[0].As<v8::Object>();
LD_STRING_OR_BUFFER_TO_SLICE(key, keyHandle, key);

leveldb::ReadOptions options;
options.fill_cache = true;

std::string value;
leveldb::Status status = databaseSnapshot->database->GetFromDatabase(databaseSnapshot->options, key, value);

DisposeStringOrBufferFromSlice(keyHandle, key);

if (!status.ok())
return Nan::ThrowError(status.ToString().c_str());

v8::Local<v8::Value> returnValue
= Nan::New<v8::String>((char*)value.data(), value.size()).ToLocalChecked();

info.GetReturnValue().Set(returnValue);
}

NAN_METHOD(DatabaseSnapshot::Iterator) {

}

void DatabaseSnapshot::Init () {
v8::Local<v8::FunctionTemplate> tpl =
Nan::New<v8::FunctionTemplate>(DatabaseSnapshot::New);
database_snapshot_constructor.Reset(tpl);
tpl->SetClassName(Nan::New("DatabaseSnapshot").ToLocalChecked());
tpl->InstanceTemplate()->SetInternalFieldCount(1);

Nan::SetPrototypeMethod(tpl, "getSync", DatabaseSnapshot::GetSync);
Nan::SetPrototypeMethod(tpl, "iterator", DatabaseSnapshot::Iterator);
}

v8::Local<v8::Object> DatabaseSnapshot::NewInstance (
v8::Local<v8::Object> database
, v8::Local<v8::Number> id
, v8::Local<v8::Object> optionsObj
) {
Nan::EscapableHandleScope scope;

Nan::MaybeLocal<v8::Object> maybeInstance;
v8::Local<v8::Object> instance;
v8::Local<v8::FunctionTemplate> constructorHandle =
Nan::New<v8::FunctionTemplate>(database_snapshot_constructor);

if (optionsObj.IsEmpty()) {
v8::Local<v8::Value> argv[2] = { database, id };
maybeInstance = Nan::NewInstance(constructorHandle->GetFunction(), 2, argv);
} else {
v8::Local<v8::Value> argv[3] = { database, id, optionsObj };
maybeInstance = Nan::NewInstance(constructorHandle->GetFunction(), 3, argv);
}

if (maybeInstance.IsEmpty())
Nan::ThrowError("Could not create new Snapshot instance");
else
instance = maybeInstance.ToLocalChecked();

return scope.Escape(instance);
}

NAN_METHOD(DatabaseSnapshot::New) {
Database* database = Nan::ObjectWrap::Unwrap<Database>(info[0]->ToObject());

v8::Local<v8::Value> id = info[1];

v8::Local<v8::Object> optionsObj;
if (info.Length() > 1 && info[2]->IsObject()) {
optionsObj = v8::Local<v8::Object>::Cast(info[2]);
}

bool keyAsBuffer = BooleanOptionValue(optionsObj, "keyAsBuffer", true);
bool valueAsBuffer = BooleanOptionValue(optionsObj, "valueAsBuffer", true);
bool fillCache = BooleanOptionValue(optionsObj, "fillCache");

DatabaseSnapshot* databaseSnapshot = new DatabaseSnapshot(
database
, (uint32_t)id->Int32Value()
, fillCache
, keyAsBuffer
, valueAsBuffer
);

databaseSnapshot->Wrap(info.This());

info.GetReturnValue().Set(info.This());
}

} // namespace leveldown
Loading

0 comments on commit a78ddc1

Please sign in to comment.