Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add opt-in multithreading #85

Merged
merged 20 commits into from
Nov 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ The optional `options` object may contain:
- `createIfMissing` (boolean, default: `true`): If `true`, create an empty database if one doesn't already exist. If `false` and the database doesn't exist, opening will fail.
- `errorIfExists` (boolean, default: `false`): If `true` and the database already exists, opening will fail.
- `passive` (boolean, default: `false`): Wait for, but do not initiate, opening of the database.
- `multithreading` (boolean, default: `false`): Allow multiple threads to access the database. This is only relevant when using [worker threads](https://nodejs.org/api/worker_threads.html).

For advanced performance tuning, the `options` object may also contain the following. Modify these options only if you can prove actual benefit for your particular application.

Expand Down
103 changes: 93 additions & 10 deletions binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,36 @@

#include <map>
#include <vector>
#include <mutex>

/**
* Forward declarations.
*/
struct Database;
struct Iterator;
static void iterator_close_do (napi_env env, Iterator* iterator, napi_value cb);
static leveldb::Status threadsafe_open(const leveldb::Options &options,
bool multithreading,
Database &db_instance);
static leveldb::Status threadsafe_close(Database &db_instance);

/**
* Macros.
* Global declarations for multi-threaded access. These are not context-aware
* by definition and are specifically meant to allow cross-thread access to the
* single database handle.
*/
// Process-wide registry entry: one open leveldb handle plus the number of
// Database instances currently sharing it.
struct LevelDbHandle
{
leveldb::DB *db;
size_t open_handle_count;
};
// Serializes all access to db_handles across threads.
static std::mutex handles_mutex;
// only access this when protected by the handles_mutex!
static std::map<std::string, LevelDbHandle> db_handles;

/**
* Macros.
*/
#define NAPI_DB_CONTEXT() \
Database* database = NULL; \
NAPI_STATUS_THROWS(napi_get_value_external(env, argv[0], (void**)&database));
Expand Down Expand Up @@ -495,19 +513,21 @@ struct Database {

~Database () {
if (db_ != NULL) {
delete db_;
db_ = NULL;
threadsafe_close(*this);
}
}

leveldb::Status Open (const leveldb::Options& options,
const char* location) {
return leveldb::DB::Open(options, location, &db_);
const std::string &location,
bool multithreading) {
location_ = location;
return threadsafe_open(options, multithreading, *this);
}

void CloseDatabase () {
delete db_;
db_ = NULL;
if (db_ != NULL) {
threadsafe_close(*this);
}
if (blockCache_) {
delete blockCache_;
blockCache_ = NULL;
Expand Down Expand Up @@ -600,8 +620,66 @@ struct Database {

private:
uint32_t priorityWork_;
std::string location_;

// For separation of concerns, the threadsafe functionality was kept at the
// global level and made a friend so it is explicit where the threadsafe
// boundary exists.
friend leveldb::Status threadsafe_open(const leveldb::Options &options,
bool multithreading,
Database &db_instance);
friend leveldb::Status threadsafe_close(Database &db_instance);
};


// Opens (or joins) the leveldb database at db_instance.location_ and stores
// the resulting handle in db_instance.db_. With multithreading enabled, the
// process-wide db_handles registry (guarded by handles_mutex) lets multiple
// Database instances share a single leveldb::DB per location; without it,
// leveldb is opened directly and the registry is never consulted.
leveldb::Status threadsafe_open(const leveldb::Options &options,
bool multithreading,
Database &db_instance) {
// Bypass lock and handles if multithreading is disabled
if (!multithreading) {
return leveldb::DB::Open(options, db_instance.location_, &db_instance.db_);
}

std::unique_lock<std::mutex> lock(handles_mutex);

auto it = db_handles.find(db_instance.location_);
if (it == db_handles.end()) {
// Database not opened yet for this location, unless it was with
// multithreading disabled, in which case we're expected to fail here.
LevelDbHandle handle = {nullptr, 0};
leveldb::Status status = leveldb::DB::Open(options, db_instance.location_, &handle.db);

if (status.ok()) {
// First opener: register the shared handle with a refcount of one.
handle.open_handle_count++;
db_instance.db_ = handle.db;
db_handles[db_instance.location_] = handle;
}

return status;
}

// A handle is already registered for this location: share it and bump the
// refcount so threadsafe_close only deletes it when the last user closes.
++(it->second.open_handle_count);
db_instance.db_ = it->second.db;

return leveldb::Status::OK();
}

// Releases db_instance's claim on its leveldb handle. If the location is
// registered in the shared-handle map (i.e. it was opened with
// multithreading), the refcount is decremented and the underlying database
// is deleted only when the last sharer closes; otherwise the instance owns
// the pointer exclusively and deletes it directly. In all cases the
// instance's db_ pointer is nulled out afterwards.
leveldb::Status threadsafe_close(Database &db_instance) {
  std::lock_guard<std::mutex> guard(handles_mutex);

  const auto handle_it = db_handles.find(db_instance.location_);
  if (handle_it == db_handles.end()) {
    // Not in the registry: opened without multithreading, sole owner.
    delete db_instance.db_;
  } else if (--(handle_it->second.open_handle_count) == 0) {
    // Last shared user gone; tear down the database and drop the entry.
    delete handle_it->second.db;
    db_handles.erase(handle_it);
  }

  // ensure db_ pointer is nullified in Database instance
  db_instance.db_ = NULL;
  return leveldb::Status::OK();
}

/**
* Base worker class for doing async work that defers closing the database.
*/
Expand Down Expand Up @@ -974,13 +1052,15 @@ struct OpenWorker final : public BaseWorker {
const bool createIfMissing,
const bool errorIfExists,
const bool compression,
const bool multithreading,
const uint32_t writeBufferSize,
const uint32_t blockSize,
const uint32_t maxOpenFiles,
const uint32_t blockRestartInterval,
const uint32_t maxFileSize)
: BaseWorker(env, database, callback, "classic_level.db.open"),
location_(location) {
location_(location),
multithreading_(multithreading) {
options_.block_cache = database->blockCache_;
options_.filter_policy = database->filterPolicy_;
options_.create_if_missing = createIfMissing;
Expand All @@ -998,11 +1078,12 @@ struct OpenWorker final : public BaseWorker {
~OpenWorker () {}

void DoExecute () override {
SetStatus(database_->Open(options_, location_.c_str()));
SetStatus(database_->Open(options_, location_, multithreading_));
}

leveldb::Options options_;
std::string location_;
bool multithreading_;
};

/**
Expand All @@ -1017,6 +1098,7 @@ NAPI_METHOD(db_open) {
const bool createIfMissing = BooleanProperty(env, options, "createIfMissing", true);
const bool errorIfExists = BooleanProperty(env, options, "errorIfExists", false);
const bool compression = BooleanProperty(env, options, "compression", true);
const bool multithreading = BooleanProperty(env, options, "multithreading", false);

const uint32_t cacheSize = Uint32Property(env, options, "cacheSize", 8 << 20);
const uint32_t writeBufferSize = Uint32Property(env, options , "writeBufferSize" , 4 << 20);
Expand All @@ -1031,7 +1113,8 @@ NAPI_METHOD(db_open) {
napi_value callback = argv[3];
OpenWorker* worker = new OpenWorker(env, database, callback, location,
createIfMissing, errorIfExists,
compression, writeBufferSize, blockSize,
compression, multithreading,
writeBufferSize, blockSize,
maxOpenFiles, blockRestartInterval,
maxFileSize);
worker->Queue(env);
Expand Down
8 changes: 8 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,14 @@ export interface OpenOptions extends AbstractOpenOptions {
* @defaultValue `2 * 1024 * 1024`
*/
maxFileSize?: number | undefined

/**
* Allows multi-threaded access to a single DB instance for sharing a DB
* across multiple worker threads within the same process.
*
* @defaultValue `false`
*/
multithreading?: boolean | undefined
}

/**
Expand Down
185 changes: 185 additions & 0 deletions test/multithreading-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
'use strict'

const test = require('tape')
const tempy = require('tempy')
const path = require('path')
const { Worker } = require('worker_threads')
const { ClassicLevel } = require('..')
const {
MIN_KEY,
MID_KEY,
MAX_KEY,
CLOSED_DB_MESSAGE,
WORKER_CREATING_KEYS_MESSAGE,
WORKER_READY_TO_READ_MESSAGE,
WORKER_ERROR_MESSAGE,
START_READING_MESSAGE,
createRandomKeys,
getRandomKeys
} = require('./worker-utils')

/**
 * Makes sure that the multithreading flag is working as expected: every
 * instance sharing a location must opt in, and a non-multithreaded open of
 * an already-open location must fail with a lock error.
 */
test('check multithreading flag works as expected', async function (t) {
  t.plan(9)
  const location = tempy.directory()
  const db1 = new ClassicLevel(location)
  const db2 = new ClassicLevel(location)

  // check that must set multithreading flag on all instances
  await db1.open()
  t.is(db1.location, location)
  try {
    await db2.open({ multithreading: true })
    // Fail loudly instead of relying on the plan-count mismatch.
    t.fail('second instance should not have opened')
  } catch (err) {
    t.is(err.code, 'LEVEL_DATABASE_NOT_OPEN', 'second instance failed to open')
    t.is(err.cause.code, 'LEVEL_LOCKED', 'second instance got lock error')
  }
  await db1.close()

  await db1.open({ multithreading: true })
  t.is(db1.location, location)
  await db2.open({ multithreading: true })
  t.is(db2.location, location)
  // test that passing to the constructor works
  const db3 = new ClassicLevel(location, { multithreading: true })
  await db3.open()
  t.is(db3.location, location)
  const db4 = new ClassicLevel(location)
  try {
    await db4.open({ location, multithreading: false })
    t.fail('fourth instance should not have opened')
  } catch (err) {
    t.is(err.code, 'LEVEL_DATABASE_NOT_OPEN', 'fourth instance failed to open')
    // Fixed copy-paste: this assertion checks the FOURTH instance.
    t.is(err.cause.code, 'LEVEL_LOCKED', 'fourth instance got lock error')
  }
  await db1.close()
  await db2.close()
  await db3.close()

  // With all multithreaded handles closed, a plain open works again.
  const db5 = new ClassicLevel(location)
  await db5.open({ location, multithreading: false })
  t.is(db5.location, location)
  await db5.close()
})

/**
 * Tests for interleaved opening and closing of the database to check
 * that the mutex for guarding the handles is working as expected. Creates
 * many workers that only open and then close the db after a random delay. Goal
 * is to interleave the open and close processes to ensure that the mutex is
 * guarding the handles correctly. After all workers have completed the main
 * thread closes the db and then opens it again as a non-multi-threaded instance
 * to make sure the handle was deleted correctly.
 */
test('open/close mutex works as expected', async function (t) {
  t.plan(3)
  const location = tempy.directory()
  const db1 = new ClassicLevel(location)
  await db1.open({ multithreading: true })
  t.is(db1.location, location)

  const activeWorkers = []

  for (let i = 0; i < 100; i++) {
    const worker = new Worker(path.join(__dirname, 'worker.js'), {
      workerData: { location, workerStartup: true }
    })

    activeWorkers.push(
      new Promise((resolve, reject) => {
        // A worker that crashes during startup emits 'error' and never posts
        // a message; without this listener the promise would hang forever.
        worker.once('error', reject)
        worker.once('message', ({ message, error }) => {
          if (message === WORKER_ERROR_MESSAGE) {
            return reject(error)
          }
          if (message === CLOSED_DB_MESSAGE) {
            return resolve()
          }
          return reject(new Error('unexpected error\n>>> ' + error))
        })
      })
    )
  }

  const results = await Promise.allSettled(activeWorkers)
  const rejected = results.filter((res) => res.status === 'rejected')
  t.is(rejected.length, 0)
  await db1.close()

  // reopen the db non-multithreaded to check that the handle record was fully
  // deleted from the handle map
  await db1.open({ multithreading: false })
  t.is(db1.location, location)
  await db1.close()
})

/**
 * Tests for reading and writing to a single db from multiple threads.
 *
 * Starts by setting up worker and then worker reports its ready and immediately
 * starts writing to the database. Main thread gets message and also writes to
 * the same db but to a different key space. Goal is to concurrently write
 * consecutively numbered records. Once records are all written the worker
 * reports to the main thread and the main thread waits until both threads are
 * complete with the writing process. When both are ready they concurrently read
 * random records from the full key space for a set interval.
 */
test('allow multi-threading by same process', async function (t) {
try {
const location = tempy.directory()
const db = new ClassicLevel(location, { multithreading: true })
await db.open()

const worker = new Worker(path.join(__dirname, 'worker.js'), {
workerData: { location, readWrite: true }
})

// Detach listeners and stop the worker; rethrows so the surrounding
// try/catch can fail the test.
// NOTE(review): when invoked from an event listener (below), the rethrow
// escapes the try/catch and would crash the process — confirm intended.
function cleanup (err) {
worker.removeAllListeners('message')
worker.removeAllListeners('error')
worker.terminate()
if (err) {
throw err
}
}

worker.on('error', cleanup)
worker.on('message', ({ message, error }) => {
if (message === WORKER_ERROR_MESSAGE) {
cleanup(new Error(error))
}
})

// Concurrently write keys to the db on both thread and wait
// until ready before attempting to concurrently read keys
const workerReady = new Promise((resolve) => {
let mainThreadReady = false
worker.on('message', ({ message }) => {
if (message === WORKER_CREATING_KEYS_MESSAGE) {
// Worker writes [MIN_KEY, MID_KEY); main thread fills the
// complementary half of the key space.
createRandomKeys(db, MID_KEY, MAX_KEY).then(() => {
mainThreadReady = true
})
} else if (message === WORKER_READY_TO_READ_MESSAGE) {
// Poll until the main thread's writes also finished, so both
// sides start the read phase together.
const interval = setInterval(() => {
if (mainThreadReady) {
clearInterval(interval)
resolve()
}
}, 100)
}
})
})

await workerReady

// once db is seeded start reading keys from both threads
worker.postMessage({ message: START_READING_MESSAGE })
await getRandomKeys(db, MIN_KEY, MAX_KEY)
await db.close()

t.end()
} catch (error) {
t.fail(error.message)
t.end()
}
})
Loading
Loading