From d5eebe4f3abb2e0902fcd5f24454f953d3f92a9c Mon Sep 17 00:00:00 2001 From: Matthew Keil Date: Thu, 26 Oct 2023 17:16:01 +0200 Subject: [PATCH 01/20] feat: threadsafe open and close to allow workers access to already opened db --- binding.cc | 79 ++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 71 insertions(+), 8 deletions(-) diff --git a/binding.cc b/binding.cc index cf66715..f6235c9 100644 --- a/binding.cc +++ b/binding.cc @@ -18,11 +18,26 @@ struct Database; struct Iterator; static void iterator_close_do (napi_env env, Iterator* iterator, napi_value cb); +static leveldb::Status threadsafe_open(const leveldb::Options &options, Database &db_instance); +static leveldb::Status threadsafe_close(Database &db_instance); /** - * Macros. + * Global declarations for multi-threaded access. These are not context-aware + * by definition and is specifically to allow for cross thread access to the + * single database handle. */ +struct LevelDbHandle +{ + leveldb::DB *db; + size_t open_handle_count; +}; +static std::mutex handles_mutex; +// only access this when protected by the handles_mutex!!! +static std::map db_handles; +/** + * Macros. + */ #define NAPI_DB_CONTEXT() \ Database* database = NULL; \ NAPI_STATUS_THROWS(napi_get_value_external(env, argv[0], (void**)&database)); @@ -495,19 +510,20 @@ struct Database { ~Database () { if (db_ != NULL) { - delete db_; - db_ = NULL; + threadsafe_close(*this); } } leveldb::Status Open (const leveldb::Options& options, - const char* location) { - return leveldb::DB::Open(options, location, &db_); + const std::string &location) { + location_ = location; + return threadsafe_open(options, *this); } void CloseDatabase () { - delete db_; - db_ = NULL; + if (db_ != NULL) { + threadsafe_close(*this); + } if (blockCache_) { delete blockCache_; blockCache_ = NULL; @@ -600,8 +616,55 @@ struct Database { private: uint32_t priorityWork_; + std::string location_; + + // for separation of concerns the threadsafe functionality was kept at the global + // level and made a friend so it is explict where the threadsafe boundary exists + friend leveldb::Status threadsafe_open(const leveldb::Options &options, Database &db_instance); + friend leveldb::Status threadsafe_close(Database &db_instance); }; + +leveldb::Status threadsafe_open(const leveldb::Options &options, Database &db_instance) { + std::unique_lock lock(handles_mutex); + + auto it = db_handles.find(db_instance.location_); + if (it != db_handles.end()) { + // Database already opened for this location + ++(it->second.open_handle_count); + db_instance.db_ = it->second.db; + } else { + // Database not opened yet for this location + LevelDbHandle handle = {nullptr, 0}; + leveldb::Status status = leveldb::DB::Open(options, db_instance.location_, &handle.db); + if (!status.ok()) { + return status; + } + handle.open_handle_count++; + db_instance.db_ = handle.db; + db_handles[db_instance.location_] = handle; + } + + return leveldb::Status::OK(); +} + +leveldb::Status threadsafe_close(Database &db_instance) { + std::unique_lock lock(handles_mutex); + db_instance.db_ = NULL; // ensure db_ pointer is nullified in Database instance + + auto it = db_handles.find(db_instance.location_); + if (it != db_handles.end()) { + if (--(it->second.open_handle_count) == 0) { + delete it->second.db; + db_handles.erase(it); + } + } else { + return leveldb::Status::NotFound("Database handle not found for the given location"); + } + + return leveldb::Status::OK(); +} + /** * Base worker class for doing async work that defers closing the database. */ @@ -998,7 +1061,7 @@ struct OpenWorker final : public BaseWorker { ~OpenWorker () {} void DoExecute () override { - SetStatus(database_->Open(options_, location_.c_str())); + SetStatus(database_->Open(options_, location_)); } leveldb::Options options_; From 44784df8fda7fbc656eb6b3fcc678ee96000c8d7 Mon Sep 17 00:00:00 2001 From: Matthew Keil Date: Thu, 26 Oct 2023 17:51:43 +0200 Subject: [PATCH 02/20] fix: add mutex include --- binding.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/binding.cc b/binding.cc index f6235c9..0653434 100644 --- a/binding.cc +++ b/binding.cc @@ -11,6 +11,7 @@ #include #include +#include /** * Forward declarations. From 1115e8b352f55f8dc324b7ebdaecaca6a09a835f Mon Sep 17 00:00:00 2001 From: Matthew Keil Date: Fri, 27 Oct 2023 23:26:27 +0200 Subject: [PATCH 03/20] feat: add allowMultiThreading option --- binding.cc | 31 +++++++++++++++++++++++-------- index.d.ts | 8 ++++++++ 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/binding.cc b/binding.cc index 0653434..b3e4056 100644 --- a/binding.cc +++ b/binding.cc @@ -19,7 +19,9 @@ struct Database; struct Iterator; static void iterator_close_do (napi_env env, Iterator* iterator, napi_value cb); -static leveldb::Status threadsafe_open(const leveldb::Options &options, Database &db_instance); +static leveldb::Status threadsafe_open(const leveldb::Options &options, + bool allow_multi_threading, + Database &db_instance); static leveldb::Status threadsafe_close(Database &db_instance); /** @@ -516,9 +518,10 @@ struct Database { } leveldb::Status Open (const leveldb::Options& options, - const std::string &location) { + const std::string &location, + bool allow_multi_threading) { location_ = location; - return threadsafe_open(options, *this); + return threadsafe_open(options, allow_multi_threading, *this); } void CloseDatabase () { @@ -621,16 +624,23 @@ struct Database { // for separation of concerns the threadsafe functionality was kept at the global // level and made a friend so it is explict where the threadsafe boundary exists - friend leveldb::Status threadsafe_open(const leveldb::Options &options, Database &db_instance); + friend leveldb::Status threadsafe_open(const leveldb::Options &options, + bool allow_multi_threading, + Database &db_instance); friend leveldb::Status threadsafe_close(Database &db_instance); }; -leveldb::Status threadsafe_open(const leveldb::Options &options, Database &db_instance) { +leveldb::Status threadsafe_open(const leveldb::Options &options, + bool allow_multi_threading, + Database &db_instance) { std::unique_lock lock(handles_mutex); auto it = db_handles.find(db_instance.location_); if (it != db_handles.end()) { + if (!allow_multi_threading) { + return leveldb::Status::IOError("lock " + db_instance.location_, "already held by process"); + } // Database already opened for this location ++(it->second.open_handle_count); db_instance.db_ = it->second.db; @@ -1038,13 +1048,15 @@ struct OpenWorker final : public BaseWorker { const bool createIfMissing, const bool errorIfExists, const bool compression, + const bool allowMultiThreading, const uint32_t writeBufferSize, const uint32_t blockSize, const uint32_t maxOpenFiles, const uint32_t blockRestartInterval, const uint32_t maxFileSize) : BaseWorker(env, database, callback, "classic_level.db.open"), - location_(location) { + location_(location), + allow_multi_threading_(allowMultiThreading) { options_.block_cache = database->blockCache_; options_.filter_policy = database->filterPolicy_; options_.create_if_missing = createIfMissing; @@ -1062,11 +1074,12 @@ struct OpenWorker final : public BaseWorker { ~OpenWorker () {} void DoExecute () override { - SetStatus(database_->Open(options_, location_)); + SetStatus(database_->Open(options_, location_, allow_multi_threading_)); } leveldb::Options options_; std::string location_; + bool allow_multi_threading_; }; /** @@ -1081,6 +1094,7 @@ NAPI_METHOD(db_open) { const bool createIfMissing = BooleanProperty(env, options, "createIfMissing", true); const bool errorIfExists = BooleanProperty(env, options, "errorIfExists", false); const bool compression = BooleanProperty(env, options, "compression", true); + const bool allowMultiThreading = BooleanProperty(env, options, "allowMultiThreading", false); const uint32_t cacheSize = Uint32Property(env, options, "cacheSize", 8 << 20); const uint32_t writeBufferSize = Uint32Property(env, options , "writeBufferSize" , 4 << 20); @@ -1095,7 +1109,8 @@ NAPI_METHOD(db_open) { napi_value callback = argv[3]; OpenWorker* worker = new OpenWorker(env, database, callback, location, createIfMissing, errorIfExists, - compression, writeBufferSize, blockSize, + compression, allowMultiThreading, + writeBufferSize, blockSize, maxOpenFiles, blockRestartInterval, maxFileSize); worker->Queue(env); diff --git a/index.d.ts b/index.d.ts index 6476950..6e757ae 100644 --- a/index.d.ts +++ b/index.d.ts @@ -213,6 +213,14 @@ export interface OpenOptions extends AbstractOpenOptions { * @defaultValue `2 * 1024 * 1024` */ maxFileSize?: number | undefined + + /** + * Allows multi-threaded access to a single DB instance for sharing a DB + * across multiple worker threads within the same process. + * + * NOTE: Does not work for child process + */ + allowMultiThreading?: boolean | undefined } /** From 884d1a05395bb327ca2f6a9b0c1b97831d98f1be Mon Sep 17 00:00:00 2001 From: Matthew Keil Date: Fri, 27 Oct 2023 23:27:47 +0200 Subject: [PATCH 04/20] test: add multithreading-test and worker-test and worker-utils --- test/multithreading-test.js | 44 +++++++++++++++++++++++++++++++++++++ test/worker-test.js | 15 +++++++++++++ test/worker-utils.js | 21 ++++++++++++++++++ 3 files changed, 80 insertions(+) create mode 100644 test/multithreading-test.js create mode 100644 test/worker-test.js create mode 100644 test/worker-utils.js diff --git a/test/multithreading-test.js b/test/multithreading-test.js new file mode 100644 index 0000000..2ee3831 --- /dev/null +++ b/test/multithreading-test.js @@ -0,0 +1,44 @@ +"use strict"; + +const test = require("tape"); +const tempy = require("tempy"); +const path = require("path"); +const { Worker } = require("worker_threads"); +const { ClassicLevel } = require(".."); +const { createRandomKeys, getRandomKeys } = require("./worker-utils"); + +test("allow multi-threading by same process", async function (t) { + t.plan(2); + const location = tempy.directory(); + const db = new ClassicLevel(location); + await db.open(); + await createRandomKeys(db); + + const worker = new Worker(path.join(__dirname, "worker-test.js"), { + workerData: { location }, + }); + + function onMessage(_) { + getRandomKeys(db, "main").catch((err) => { + worker.removeListener("error", onError); + onError(err); + }); + } + worker.on("message", onMessage); + + function onError(err) { + worker.removeListener("message", onMessage); + worker.removeListener("exit", onExit); + t.ifError(err, "worker error"); + db.close(t.ifError.bind(t)); + } + worker.once("error", onError); + + function onExit(code) { + worker.removeListener("message", onMessage); + worker.removeListener("error", onError); + t.equal(code, 0, 'child exited normally'); + db.close(t.ifError.bind(t)); + } + worker.once("exit", onExit); +}); diff --git a/test/worker-test.js b/test/worker-test.js new file mode 100644 index 0000000..2541664 --- /dev/null +++ b/test/worker-test.js @@ -0,0 +1,15 @@ +"use strict"; + +const { parentPort, workerData } = require("worker_threads"); +const { ClassicLevel } = require(".."); +const { getRandomKeys } = require("./worker-utils"); + +(async function main() { + const db = new ClassicLevel(workerData.location); + await db.open({ allowMultiThreading: true }); + + parentPort.postMessage("starting"); + + await getRandomKeys(db, "worker"); + await db.close(); +})(); diff --git a/test/worker-utils.js b/test/worker-utils.js new file mode 100644 index 0000000..389bd75 --- /dev/null +++ b/test/worker-utils.js @@ -0,0 +1,21 @@ +const MIN_KEY = 1; +const MAX_KEY = 100; +const TEST_INTERVAL_MS = 1000; + +exports.createRandomKeys = async (db) => { + for (let i = MIN_KEY; i <= MAX_KEY; i++) { + await db.put(`key${i}`, `value${i}`); + } +}; + +exports.getRandomKeys = async (db, thread) => { + const start = Date.now(); + while (Date.now() - start < TEST_INTERVAL_MS) { + const randomKey = Math.floor( + Math.random() * (MAX_KEY - MIN_KEY + 1) + MIN_KEY + ); + // console.log(thread + ": got " + + await db.get(`key${randomKey}`); + // ); + } +}; From f243ce962a7183b7c381789c80bf0f19e6ffc7e6 Mon Sep 17 00:00:00 2001 From: Matthew Keil Date: Fri, 3 Nov 2023 19:32:03 +0300 Subject: [PATCH 05/20] test: clean up unit tests for worker --- test/multithreading-test.js | 97 ++++++++++++++++++++++++++----------- test/worker-test.js | 11 +++-- test/worker-utils.js | 25 ++-------- 3 files changed, 79 insertions(+), 54 deletions(-) diff --git a/test/multithreading-test.js b/test/multithreading-test.js index 2ee3831..2a5f1a3 100644 --- a/test/multithreading-test.js +++ b/test/multithreading-test.js @@ -5,40 +5,79 @@ const tempy = require("tempy"); const path = require("path"); const { Worker } = require("worker_threads"); const { ClassicLevel } = require(".."); -const { createRandomKeys, getRandomKeys } = require("./worker-utils"); +const { CLOSED_DB_MESSAGE } = require("./worker-utils"); -test("allow multi-threading by same process", async function (t) { +/** + * Makes sure that the allowMultiThreading flag is working as expected + */ +test("check allowMultiThreading flag works as expected", async function (t) { + t.plan(5); + const location = tempy.directory(); + const db1 = new ClassicLevel(location); + await db1.open({ location }); + t.is(db1.location, location); + + const db2 = new ClassicLevel(location); + await db2.open({ location, allowMultiThreading: true }); + t.is(db2.location, location); + + const db3 = new ClassicLevel(location); + try { + await db3.open({ location, allowMultiThreading: false }); + } catch (err) { + t.is(err.code, "LEVEL_DATABASE_NOT_OPEN", "third instance failed to open"); + t.is(err.cause.code, "LEVEL_LOCKED", "third instance got lock error"); + } + + await db1.close(); + await db2.close(); + + const db4 = new ClassicLevel(location); + await db4.open({ location, allowMultiThreading: false }); + t.is(db4.location, location); + await db4.close(); +}); + +/** + * Tests for interleaved opening and closing of the database to check + * that the mutex for guarding the handles is working as expected + */ +test("open/close mutex works as expected", async function (t) { t.plan(2); const location = tempy.directory(); - const db = new ClassicLevel(location); - await db.open(); - await createRandomKeys(db); - - const worker = new Worker(path.join(__dirname, "worker-test.js"), { - workerData: { location }, - }); - - function onMessage(_) { - getRandomKeys(db, "main").catch((err) => { - worker.removeListener("error", onError); - onError(err); + const db1 = new ClassicLevel(location); + await db1.open({ location }); + t.is(db1.location, location); + + const activeWorkers = []; + + for (let i = 0; i < 100; i++) { + const worker = new Worker(path.join(__dirname, "worker-test.js"), { + workerData: { + location, + }, }); - } - worker.on("message", onMessage); - function onError(err) { - worker.removeListener("message", onMessage); - worker.removeListener("exit", onExit); - t.ifError(err, "worker error"); - db.close(t.ifError.bind(t)); + activeWorkers.push( + new Promise((resolve, reject) => { + worker.once("error", (err) => { + worker.removeAllListeners("message"); + reject(err); + }); + worker.once("message", (message) => { + if (message !== CLOSED_DB_MESSAGE) { + return reject("did not receive correct message"); + } + worker.removeAllListeners("error"); + resolve(); + }); + }) + ); } - worker.once("error", onError); - function onExit(code) { - worker.removeListener("message", onMessage); - worker.removeListener("error", onError); - t.equal(code, 0, 'child exited normally'); - db.close(t.ifError.bind(t)); - } - worker.once("exit", onExit); + const results = await Promise.allSettled(activeWorkers); + const rejected = results.filter((res) => res.status === "rejected"); + t.is(rejected.length, 0); + + await db1.close(); }); diff --git a/test/worker-test.js b/test/worker-test.js index 2541664..469da7d 100644 --- a/test/worker-test.js +++ b/test/worker-test.js @@ -2,14 +2,15 @@ const { parentPort, workerData } = require("worker_threads"); const { ClassicLevel } = require(".."); -const { getRandomKeys } = require("./worker-utils"); +const { CLOSED_DB_MESSAGE, getRandomValue } = require("./worker-utils"); (async function main() { const db = new ClassicLevel(workerData.location); await db.open({ allowMultiThreading: true }); - parentPort.postMessage("starting"); - - await getRandomKeys(db, "worker"); - await db.close(); + setTimeout(() => { + db.close().then(() => { + parentPort.postMessage(CLOSED_DB_MESSAGE); + }); + }, getRandomValue(1, 100)); })(); diff --git a/test/worker-utils.js b/test/worker-utils.js index 389bd75..3bca598 100644 --- a/test/worker-utils.js +++ b/test/worker-utils.js @@ -1,21 +1,6 @@ -const MIN_KEY = 1; -const MAX_KEY = 100; -const TEST_INTERVAL_MS = 1000; +exports.CLOSED_DB_MESSAGE = "closed db"; -exports.createRandomKeys = async (db) => { - for (let i = MIN_KEY; i <= MAX_KEY; i++) { - await db.put(`key${i}`, `value${i}`); - } -}; - -exports.getRandomKeys = async (db, thread) => { - const start = Date.now(); - while (Date.now() - start < TEST_INTERVAL_MS) { - const randomKey = Math.floor( - Math.random() * (MAX_KEY - MIN_KEY + 1) + MIN_KEY - ); - // console.log(thread + ": got " + - await db.get(`key${randomKey}`); - // ); - } -}; +function getRandomValue(minValue, maxValue) { + return Math.floor(Math.random() * (maxValue - minValue + 1) + minValue); +} +exports.getRandomValue = getRandomValue; From f8ea4074ecf2bf4822248ab76159de4a3da2bdc1 Mon Sep 17 00:00:00 2001 From: Matthew Keil Date: Sun, 5 Nov 2023 14:01:29 +0300 Subject: [PATCH 06/20] test: add read/write multithreading case --- test/multithreading-test.js | 92 +++++++++++++++++++++++++++++++------ test/worker-test.js | 83 +++++++++++++++++++++++++++++++-- test/worker-utils.js | 24 ++++++++++ 3 files changed, 181 insertions(+), 18 deletions(-) diff --git a/test/multithreading-test.js b/test/multithreading-test.js index 2a5f1a3..8149c71 100644 --- a/test/multithreading-test.js +++ b/test/multithreading-test.js @@ -5,7 +5,18 @@ const tempy = require("tempy"); const path = require("path"); const { Worker } = require("worker_threads"); const { ClassicLevel } = require(".."); -const { CLOSED_DB_MESSAGE } = require("./worker-utils"); +const { + MIN_KEY, + MID_KEY, + MAX_KEY, + CLOSED_DB_MESSAGE, + WORKER_CREATING_KEYS_MESSAGE, + WORKER_READY_TO_READ_MESSAGE, + WORKER_ERROR_MESSAGE, + START_READING_MESSAGE, + createRandomKeys, + getRandomKeys, +} = require("./worker-utils"); /** * Makes sure that the allowMultiThreading flag is working as expected @@ -53,23 +64,19 @@ test("open/close mutex works as expected", async function (t) { for (let i = 0; i < 100; i++) { const worker = new Worker(path.join(__dirname, "worker-test.js"), { - workerData: { - location, - }, + workerData: { location, workerStartup: true }, }); activeWorkers.push( new Promise((resolve, reject) => { - worker.once("error", (err) => { - worker.removeAllListeners("message"); - reject(err); - }); - worker.once("message", (message) => { - if (message !== CLOSED_DB_MESSAGE) { - return reject("did not receive correct message"); + worker.once("message", ({ message, error }) => { + if (message === WORKER_ERROR_MESSAGE) { + return reject(error); + } + if (message === CLOSED_DB_MESSAGE) { + return resolve(); } - worker.removeAllListeners("error"); - resolve(); + return reject("unexpected error\n>>> " + error); }); }) ); @@ -81,3 +88,62 @@ test("open/close mutex works as expected", async function (t) { await db1.close(); }); + +test("allow multi-threading by same process", async function (t) { + try { + const location = tempy.directory(); + const db = new ClassicLevel(location); + + const worker = new Worker(path.join(__dirname, "worker-test.js"), { + workerData: { location, readWrite: true }, + }); + + function cleanup(err) { + worker.removeAllListeners("message"); + worker.removeAllListeners("error"); + worker.terminate(); + if (err) { + throw err; + } + } + + worker.on("error", cleanup); + worker.on("message", ({ message, error }) => { + if (message === WORKER_ERROR_MESSAGE) { + cleanup(new Error(error)); + } + }); + + // Concurrently write keys to the db on both thread and wait + // until ready before attempting to concurrently read keys + const workerReady = new Promise((resolve) => { + let mainThreadReady = false; + worker.on("message", ({ message }) => { + if (message === WORKER_CREATING_KEYS_MESSAGE) { + createRandomKeys(db, MID_KEY, MAX_KEY).then(() => { + mainThreadReady = true; + }); + } else if (message === WORKER_READY_TO_READ_MESSAGE) { + const interval = setInterval(() => { + if (mainThreadReady) { + clearInterval(interval); + resolve(); + } + }, 100); + } + }); + }); + + await workerReady; + + // once db is seeded start reading keys from both threads + worker.postMessage({ message: START_READING_MESSAGE }); + await getRandomKeys(db, MIN_KEY, MAX_KEY); + await db.close(); + + t.end(); + } catch (error) { + t.fail(error.message); + t.end(); + } +}); diff --git a/test/worker-test.js b/test/worker-test.js index 469da7d..67fe889 100644 --- a/test/worker-test.js +++ b/test/worker-test.js @@ -2,15 +2,88 @@ const { parentPort, workerData } = require("worker_threads"); const { ClassicLevel } = require(".."); -const { CLOSED_DB_MESSAGE, getRandomValue } = require("./worker-utils"); +const { + MIN_KEY, + MID_KEY, + MAX_KEY, + CLOSED_DB_MESSAGE, + WORKER_CREATING_KEYS_MESSAGE, + WORKER_READY_TO_READ_MESSAGE, + WORKER_ERROR_MESSAGE, + START_READING_MESSAGE, + getRandomValue, + createRandomKeys, + getRandomKeys, +} = require("./worker-utils"); (async function main() { const db = new ClassicLevel(workerData.location); await db.open({ allowMultiThreading: true }); - setTimeout(() => { - db.close().then(() => { - parentPort.postMessage(CLOSED_DB_MESSAGE); + try { + /** + * test "open/close mutex works as expected" + */ + if (workerData.workerStartup) { + setTimeout(() => { + db.close() + .catch((err) => { + parentPort.postMessage({ + message: WORKER_ERROR_MESSAGE, + error: err.message, + }); + }) + .then(() => { + parentPort.postMessage({ + message: CLOSED_DB_MESSAGE, + }); + }); + }, getRandomValue(1, 100)); + return; + } + + /** + * test "allow multi-threading by same process" + */ + if (workerData.readWrite) { + parentPort.once("message", ({ message }) => { + if (message !== START_READING_MESSAGE) { + return parentPort.postMessage({ + message: WORKER_ERROR_MESSAGE, + error: `did not receive '${START_READING_MESSAGE}' message`, + }); + } + getRandomKeys(db, MIN_KEY, MAX_KEY) + .then(() => db.close()) + .catch((err) => + parentPort.postMessage({ + message: WORKER_ERROR_MESSAGE, + error: err.message, + }) + ); + }); + + parentPort.postMessage({ message: WORKER_CREATING_KEYS_MESSAGE }); + await createRandomKeys(db, MIN_KEY, MID_KEY).catch((err) => { + parentPort.removeAllListeners("message"); + parentPort.postMessage({ + message: WORKER_ERROR_MESSAGE, + error: err.message, + }); + }); + parentPort.postMessage({ message: WORKER_READY_TO_READ_MESSAGE }); + + return; + } + + parentPort.postMessage({ + message: WORKER_ERROR_MESSAGE, + error: "invalid workerData", + }); + } catch (err) { + parentPort.postMessage({ + message: WORKER_ERROR_MESSAGE, + error: err.message, }); - }, getRandomValue(1, 100)); + } })(); diff --git a/test/worker-utils.js b/test/worker-utils.js index 3bca598..486d1aa 100644 --- a/test/worker-utils.js +++ b/test/worker-utils.js @@ -1,6 +1,30 @@ +exports.TEST_INTERVAL_MS = 1000; + +exports.MIN_KEY = 1; +exports.MAX_KEY = 1000; +exports.MID_KEY = exports.MAX_KEY / 2; + exports.CLOSED_DB_MESSAGE = "closed db"; +exports.WORKER_CREATING_KEYS_MESSAGE = "worker creating keys"; +exports.WORKER_READY_TO_READ_MESSAGE = "worker ready to read keys"; +exports.WORKER_ERROR_MESSAGE = "worker error"; +exports.START_READING_MESSAGE = "start reading"; function getRandomValue(minValue, maxValue) { return Math.floor(Math.random() * (maxValue - minValue + 1) + minValue); } exports.getRandomValue = getRandomValue; + +exports.createRandomKeys = async (db, minKey, maxKey) => { + for (let i = minKey; i <= maxKey; i++) { + await db.put(`key${i}`, `value${i}`); + } +}; + +exports.getRandomKeys = async (db, minKey, maxKey) => { + const start = Date.now(); + while (Date.now() - start < exports.TEST_INTERVAL_MS) { + const randomKey = getRandomValue(minKey, maxKey); + await db.get(`key${randomKey}`); + } +}; From 3519bc9ff078973b6213dca061999f5224e081af Mon Sep 17 00:00:00 2001 From: Matthew Keil Date: Mon, 6 Nov 2023 18:10:30 +0300 Subject: [PATCH 07/20] chore: lint new code --- test/multithreading-test.js | 166 ++++++++++++++++++------------------ test/worker-test.js | 66 +++++++------- test/worker-utils.js | 36 ++++---- 3 files changed, 134 insertions(+), 134 deletions(-) diff --git a/test/multithreading-test.js b/test/multithreading-test.js index 8149c71..ad357af 100644 --- a/test/multithreading-test.js +++ b/test/multithreading-test.js @@ -1,10 +1,10 @@ -"use strict"; +'use strict' -const test = require("tape"); -const tempy = require("tempy"); -const path = require("path"); -const { Worker } = require("worker_threads"); -const { ClassicLevel } = require(".."); +const test = require('tape') +const tempy = require('tempy') +const path = require('path') +const { Worker } = require('worker_threads') +const { ClassicLevel } = require('..') const { MIN_KEY, MID_KEY, @@ -15,135 +15,135 @@ const { WORKER_ERROR_MESSAGE, START_READING_MESSAGE, createRandomKeys, - getRandomKeys, -} = require("./worker-utils"); + getRandomKeys +} = require('./worker-utils') /** * Makes sure that the allowMultiThreading flag is working as expected */ -test("check allowMultiThreading flag works as expected", async function (t) { - t.plan(5); - const location = tempy.directory(); - const db1 = new ClassicLevel(location); - await db1.open({ location }); - t.is(db1.location, location); - - const db2 = new ClassicLevel(location); - await db2.open({ location, allowMultiThreading: true }); - t.is(db2.location, location); - - const db3 = new ClassicLevel(location); +test('check allowMultiThreading flag works as expected', async function (t) { + t.plan(5) + const location = tempy.directory() + const db1 = new ClassicLevel(location) + await db1.open({ location }) + t.is(db1.location, location) + + const db2 = new ClassicLevel(location) + await db2.open({ location, allowMultiThreading: true }) + t.is(db2.location, location) + + const db3 = new ClassicLevel(location) try { - await db3.open({ location, allowMultiThreading: false }); + await db3.open({ location, allowMultiThreading: false }) } catch (err) { - t.is(err.code, "LEVEL_DATABASE_NOT_OPEN", "third instance failed to open"); - t.is(err.cause.code, "LEVEL_LOCKED", "third instance got lock error"); + t.is(err.code, 'LEVEL_DATABASE_NOT_OPEN', 'third instance failed to open') + t.is(err.cause.code, 'LEVEL_LOCKED', 'third instance got lock error') } - await db1.close(); - await db2.close(); + await db1.close() + await db2.close() - const db4 = new ClassicLevel(location); - await db4.open({ location, allowMultiThreading: false }); - t.is(db4.location, location); - await db4.close(); -}); + const db4 = new ClassicLevel(location) + await db4.open({ location, allowMultiThreading: false }) + t.is(db4.location, location) + await db4.close() +}) /** * Tests for interleaved opening and closing of the database to check * that the mutex for guarding the handles is working as expected */ -test("open/close mutex works as expected", async function (t) { - t.plan(2); - const location = tempy.directory(); - const db1 = new ClassicLevel(location); - await db1.open({ location }); - t.is(db1.location, location); +test('open/close mutex works as expected', async function (t) { + t.plan(2) + const location = tempy.directory() + const db1 = new ClassicLevel(location) + await db1.open({ location }) + t.is(db1.location, location) - const activeWorkers = []; + const activeWorkers = [] for (let i = 0; i < 100; i++) { - const worker = new Worker(path.join(__dirname, "worker-test.js"), { - workerData: { location, workerStartup: true }, - }); + const worker = new Worker(path.join(__dirname, 'worker-test.js'), { + workerData: { location, workerStartup: true } + }) activeWorkers.push( new Promise((resolve, reject) => { - worker.once("message", ({ message, error }) => { + worker.once('message', ({ message, error }) => { if (message === WORKER_ERROR_MESSAGE) { - return reject(error); + return reject(error) } if (message === CLOSED_DB_MESSAGE) { - return resolve(); + return resolve() } - return reject("unexpected error\n>>> " + error); - }); + return reject(new Error('unexpected error\n>>> ' + error)) + }) }) - ); + ) } - const results = await Promise.allSettled(activeWorkers); - const rejected = results.filter((res) => res.status === "rejected"); - t.is(rejected.length, 0); + const results = await Promise.allSettled(activeWorkers) + const rejected = results.filter((res) => res.status === 'rejected') + t.is(rejected.length, 0) - await db1.close(); -}); + await db1.close() +}) -test("allow multi-threading by same process", async function (t) { +test('allow multi-threading by same process', async function (t) { try { - const location = tempy.directory(); - const db = new ClassicLevel(location); + const location = tempy.directory() + const db = new ClassicLevel(location) - const worker = new Worker(path.join(__dirname, "worker-test.js"), { - workerData: { location, readWrite: true }, - }); + const worker = new Worker(path.join(__dirname, 'worker-test.js'), { + workerData: { location, readWrite: true } + }) - function cleanup(err) { - worker.removeAllListeners("message"); - worker.removeAllListeners("error"); - worker.terminate(); + function cleanup (err) { + worker.removeAllListeners('message') + worker.removeAllListeners('error') + worker.terminate() if (err) { - throw err; + throw err } } - worker.on("error", cleanup); - worker.on("message", ({ message, error }) => { + worker.on('error', cleanup) + worker.on('message', ({ message, error }) => { if (message === WORKER_ERROR_MESSAGE) { - cleanup(new Error(error)); + cleanup(new Error(error)) } - }); + }) // Concurrently write keys to the db on both thread and wait // until ready before attempting to concurrently read keys const workerReady = new Promise((resolve) => { - let mainThreadReady = false; - worker.on("message", ({ message }) => { + let mainThreadReady = false + worker.on('message', ({ message }) => { if (message === WORKER_CREATING_KEYS_MESSAGE) { createRandomKeys(db, MID_KEY, MAX_KEY).then(() => { - mainThreadReady = true; - }); + mainThreadReady = true + }) } else if (message === WORKER_READY_TO_READ_MESSAGE) { const interval = setInterval(() => { if (mainThreadReady) { - clearInterval(interval); - resolve(); + clearInterval(interval) + resolve() } - }, 100); + }, 100) } - }); - }); + }) + }) - await workerReady; + await workerReady // once db is seeded start reading keys from both threads - worker.postMessage({ message: START_READING_MESSAGE }); - await getRandomKeys(db, MIN_KEY, MAX_KEY); - await db.close(); + worker.postMessage({ message: START_READING_MESSAGE }) + await getRandomKeys(db, MIN_KEY, MAX_KEY) + await db.close() - t.end(); + t.end() } catch (error) { - t.fail(error.message); - t.end(); + t.fail(error.message) + t.end() } -}); +}) diff --git a/test/worker-test.js b/test/worker-test.js index 67fe889..6b77e8d 100644 --- a/test/worker-test.js +++ b/test/worker-test.js @@ -1,7 +1,7 @@ -"use strict"; +'use strict' -const { parentPort, workerData } = require("worker_threads"); -const { ClassicLevel } = require(".."); +const { parentPort, workerData } = require('worker_threads') +const { ClassicLevel } = require('..') const { MIN_KEY, MID_KEY, @@ -13,12 +13,12 @@ const { START_READING_MESSAGE, getRandomValue, createRandomKeys, - getRandomKeys, -} = require("./worker-utils"); + getRandomKeys +} = require('./worker-utils'); -(async function main() { - const db = new ClassicLevel(workerData.location); - await db.open({ allowMultiThreading: true }); +(async function main () { + const db = new ClassicLevel(workerData.location) + await db.open({ allowMultiThreading: true }) try { /** @@ -30,60 +30,60 @@ const { .catch((err) => { parentPort.postMessage({ message: WORKER_ERROR_MESSAGE, - error: err.message, - }); + error: err.message + }) }) .then(() => { parentPort.postMessage({ - message: CLOSED_DB_MESSAGE, - }); - }); - }, getRandomValue(1, 100)); - return; + message: CLOSED_DB_MESSAGE + }) + }) + }, getRandomValue(1, 100)) + return } /** * test "allow multi-threading by same process" */ if (workerData.readWrite) { - parentPort.once("message", ({ message }) => { + parentPort.once('message', ({ message }) => { if (message !== START_READING_MESSAGE) { return parentPort.postMessage({ message: WORKER_ERROR_MESSAGE, - error: `did not receive '${START_READING_MESSAGE}' message`, - }); + error: `did not receive '${START_READING_MESSAGE}' message` + }) } getRandomKeys(db, MIN_KEY, MAX_KEY) .then(() => db.close()) .catch((err) => parentPort.postMessage({ message: WORKER_ERROR_MESSAGE, - error: err.message, + error: err.message }) - ); - }); + ) + }) - parentPort.postMessage({ message: WORKER_CREATING_KEYS_MESSAGE }); + parentPort.postMessage({ message: WORKER_CREATING_KEYS_MESSAGE }) await createRandomKeys(db, MIN_KEY, MID_KEY).catch((err) => { - parentPort.removeAllListeners("message"); + parentPort.removeAllListeners('message') parentPort.postMessage({ message: WORKER_ERROR_MESSAGE, - error: err.message, - }); - }); - parentPort.postMessage({ message: WORKER_READY_TO_READ_MESSAGE }); + error: err.message + }) + }) + parentPort.postMessage({ message: WORKER_READY_TO_READ_MESSAGE }) - return; + return } parentPort.postMessage({ message: WORKER_ERROR_MESSAGE, - error: "invalid workerData", - }); + error: 'invalid workerData' + }) } catch (err) { parentPort.postMessage({ message: WORKER_ERROR_MESSAGE, - error: err.message, - }); + error: err.message + }) } -})(); +})() diff --git a/test/worker-utils.js b/test/worker-utils.js index 486d1aa..0907731 100644 --- a/test/worker-utils.js +++ b/test/worker-utils.js @@ -1,30 +1,30 @@ -exports.TEST_INTERVAL_MS = 1000; +exports.TEST_INTERVAL_MS = 1000 -exports.MIN_KEY = 1; -exports.MAX_KEY = 1000; -exports.MID_KEY = exports.MAX_KEY / 2; +exports.MIN_KEY = 1 +exports.MAX_KEY = 1000 +exports.MID_KEY = exports.MAX_KEY / 2 -exports.CLOSED_DB_MESSAGE = "closed db"; -exports.WORKER_CREATING_KEYS_MESSAGE = "worker creating keys"; -exports.WORKER_READY_TO_READ_MESSAGE = "worker ready to read keys"; -exports.WORKER_ERROR_MESSAGE = "worker error"; -exports.START_READING_MESSAGE = "start reading"; +exports.CLOSED_DB_MESSAGE = 'closed db' +exports.WORKER_CREATING_KEYS_MESSAGE = 'worker creating keys' +exports.WORKER_READY_TO_READ_MESSAGE = 'worker ready to read keys' +exports.WORKER_ERROR_MESSAGE = 'worker error' +exports.START_READING_MESSAGE = 'start reading' -function getRandomValue(minValue, maxValue) { - return Math.floor(Math.random() * (maxValue - minValue + 1) + minValue); +function getRandomValue (minValue, maxValue) { + return Math.floor(Math.random() * (maxValue - minValue + 1) + minValue) } -exports.getRandomValue = getRandomValue; +exports.getRandomValue = getRandomValue exports.createRandomKeys = async (db, minKey, maxKey) => { for (let i = minKey; i <= maxKey; i++) { - await db.put(`key${i}`, `value${i}`); + await db.put(`key${i}`, `value${i}`) } -}; +} exports.getRandomKeys = async (db, minKey, maxKey) => { - const start = Date.now(); + const start = Date.now() while (Date.now() - start < exports.TEST_INTERVAL_MS) { - const randomKey = getRandomValue(minKey, maxKey); - await db.get(`key${randomKey}`); + const randomKey = getRandomValue(minKey, maxKey) + await db.get(`key${randomKey}`) } -}; +} From db8828547224d4cd29060d6ee05231f30dabc85e Mon Sep 17 00:00:00 2001 From: Matthew Keil Date: Mon, 6 Nov 2023 18:14:20 +0300 Subject: [PATCH 08/20] refactor: rename worker-test to worker --- test/multithreading-test.js | 4 ++-- test/{worker-test.js => worker.js} | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename test/{worker-test.js => worker.js} (100%) diff --git a/test/multithreading-test.js b/test/multithreading-test.js index ad357af..227e946 100644 --- a/test/multithreading-test.js +++ b/test/multithreading-test.js @@ -63,7 +63,7 @@ test('open/close mutex works as expected', async function (t) { const activeWorkers = [] for (let i = 0; i < 100; i++) { - const worker = new Worker(path.join(__dirname, 'worker-test.js'), { + const worker = new Worker(path.join(__dirname, 'worker.js'), { workerData: { location, workerStartup: true } }) @@ -94,7 +94,7 @@ test('allow multi-threading by same process', async function (t) { const location = tempy.directory() const db = new ClassicLevel(location) - const worker = new Worker(path.join(__dirname, 'worker-test.js'), { + const worker = new Worker(path.join(__dirname, 'worker.js'), { workerData: { location, readWrite: true } }) diff --git a/test/worker-test.js b/test/worker.js similarity index 100% rename from test/worker-test.js rename to test/worker.js From 3709516e57e0150b6ab50c798b55f2773ea9ff08 Mon Sep 17 00:00:00 2001 From: Matthew Keil Date: Mon, 6 Nov 2023 18:29:10 +0300 Subject: [PATCH 09/20] test: add case for passing allowMultiThreading to constructor --- test/multithreading-test.js | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/test/multithreading-test.js b/test/multithreading-test.js index 227e946..37615d1 100644 --- a/test/multithreading-test.js +++ b/test/multithreading-test.js @@ -22,19 +22,23 @@ const { * Makes sure that the allowMultiThreading flag is working as expected */ test('check allowMultiThreading flag works as expected', async function (t) { - t.plan(5) + t.plan(6) const location = tempy.directory() const db1 = new ClassicLevel(location) - await db1.open({ location }) + await db1.open() t.is(db1.location, location) const db2 = new ClassicLevel(location) - await db2.open({ location, allowMultiThreading: true }) + await db2.open({ allowMultiThreading: true }) t.is(db2.location, location) - const db3 = new ClassicLevel(location) + const db3 = new ClassicLevel(location, { allowMultiThreading: true }) + await db3.open() + t.is(db3.location, location) + + const db4 = new ClassicLevel(location) try { - await db3.open({ location, allowMultiThreading: false }) + await db4.open({ location, allowMultiThreading: false }) } catch (err) { t.is(err.code, 'LEVEL_DATABASE_NOT_OPEN', 'third instance failed to open') t.is(err.cause.code, 'LEVEL_LOCKED', 'third instance got lock error') @@ -42,11 +46,12 @@ test('check allowMultiThreading flag works as expected', async function (t) { await db1.close() await db2.close() + await db3.close() - const db4 = new ClassicLevel(location) - await db4.open({ location, allowMultiThreading: false }) - t.is(db4.location, location) - await db4.close() + const db5 = new ClassicLevel(location) + await db5.open({ location, allowMultiThreading: false }) + t.is(db5.location, location) + await db5.close() }) /** From fb1a25cadc414d111c11ff14ee299b0ad23da9b6 Mon Sep 17 00:00:00 2001 From: Matthew Keil Date: Mon, 6 Nov 2023 18:34:08 +0300 Subject: [PATCH 10/20] docs: add flag to README --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 252aad0..719f87d 100644 --- a/README.md +++ b/README.md @@ -201,6 +201,7 @@ The optional `options` object may contain: - `createIfMissing` (boolean, default: `true`): If `true`, create an empty database if one doesn't already exist. If `false` and the database doesn't exist, opening will fail. - `errorIfExists` (boolean, default: `false`): If `true` and the database already exists, opening will fail. - `passive` (boolean, default: `false`): Wait for, but do not initiate, opening of the database. +- `allowMultiThreading` (boolean, default: `false`): Allow multiple threads to access the database. This is only relevant when using [`node.js worker_threads`](https://nodejs.org/api/worker_threads.html) For advanced performance tuning, the `options` object may also contain the following. Modify these options only if you can prove actual benefit for your particular application. From 1f4f0c03f86da603f9deabead7f7d662481f8f6f Mon Sep 17 00:00:00 2001 From: Matthew Keil Date: Mon, 6 Nov 2023 18:42:17 +0300 Subject: [PATCH 11/20] fix: remove nested conditional --- binding.cc | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/binding.cc b/binding.cc index b3e4056..28f9011 100644 --- a/binding.cc +++ b/binding.cc @@ -637,10 +637,7 @@ leveldb::Status threadsafe_open(const leveldb::Options &options, std::unique_lock lock(handles_mutex); auto it = db_handles.find(db_instance.location_); - if (it != db_handles.end()) { - if (!allow_multi_threading) { - return leveldb::Status::IOError("lock " + db_instance.location_, "already held by process"); - } + if (it != db_handles.end() && allow_multi_threading) { // Database already opened for this location ++(it->second.open_handle_count); db_instance.db_ = it->second.db; @@ -664,13 +661,13 @@ leveldb::Status threadsafe_close(Database &db_instance) { db_instance.db_ = NULL; // ensure db_ pointer is nullified in Database instance auto it = db_handles.find(db_instance.location_); - if (it != db_handles.end()) { - if (--(it->second.open_handle_count) == 0) { - delete it->second.db; - db_handles.erase(it); - } - } else { + if (it == db_handles.end()) { return leveldb::Status::NotFound("Database handle not found for the given location"); + } + + if (--(it->second.open_handle_count) == 0) { + delete it->second.db; + db_handles.erase(it); } return leveldb::Status::OK(); From 7e41b4bce4c041939a1f12bb32ff3219a18760b6 Mon Sep 17 00:00:00 2001 From: Matthew Keil Date: Tue, 7 Nov 2023 09:39:15 +0300 Subject: [PATCH 12/20] refactor: change allowMultiThreading flag to multithreading --- README.md | 2 +- binding.cc | 24 ++++++++++++------------ index.d.ts | 2 +- test/multithreading-test.js | 12 ++++++------ test/worker.js | 2 +- 5 files changed, 21 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 719f87d..bf8911d 100644 --- a/README.md +++ b/README.md @@ -201,7 +201,7 @@ The optional `options` object may contain: - `createIfMissing` (boolean, default: `true`): If `true`, create an empty database if one doesn't already exist. If `false` and the database doesn't exist, opening will fail. - `errorIfExists` (boolean, default: `false`): If `true` and the database already exists, opening will fail. - `passive` (boolean, default: `false`): Wait for, but do not initiate, opening of the database. -- `allowMultiThreading` (boolean, default: `false`): Allow multiple threads to access the database. This is only relevant when using [`node.js worker_threads`](https://nodejs.org/api/worker_threads.html) +- `multithreading` (boolean, default: `false`): Allow multiple threads to access the database. This is only relevant when using [`node.js worker_threads`](https://nodejs.org/api/worker_threads.html) For advanced performance tuning, the `options` object may also contain the following. Modify these options only if you can prove actual benefit for your particular application. diff --git a/binding.cc b/binding.cc index 28f9011..9c16ca9 100644 --- a/binding.cc +++ b/binding.cc @@ -20,7 +20,7 @@ struct Database; struct Iterator; static void iterator_close_do (napi_env env, Iterator* iterator, napi_value cb); static leveldb::Status threadsafe_open(const leveldb::Options &options, - bool allow_multi_threading, + bool multithreading, Database &db_instance); static leveldb::Status threadsafe_close(Database &db_instance); @@ -519,9 +519,9 @@ struct Database { leveldb::Status Open (const leveldb::Options& options, const std::string &location, - bool allow_multi_threading) { + bool multithreading) { location_ = location; - return threadsafe_open(options, allow_multi_threading, *this); + return threadsafe_open(options, multithreading, *this); } void CloseDatabase () { @@ -625,19 +625,19 @@ struct Database { // for separation of concerns the threadsafe functionality was kept at the global // level and made a friend so it is explict where the threadsafe boundary exists friend leveldb::Status threadsafe_open(const leveldb::Options &options, - bool allow_multi_threading, + bool multithreading, Database &db_instance); friend leveldb::Status threadsafe_close(Database &db_instance); }; leveldb::Status threadsafe_open(const leveldb::Options &options, - bool allow_multi_threading, + bool multithreading, Database &db_instance) { std::unique_lock lock(handles_mutex); auto it = db_handles.find(db_instance.location_); - if (it != db_handles.end() && allow_multi_threading) { + if (it != db_handles.end() && multithreading) { // Database already opened for this location ++(it->second.open_handle_count); db_instance.db_ = it->second.db; @@ -1045,7 +1045,7 @@ struct OpenWorker final : public BaseWorker { const bool createIfMissing, const bool errorIfExists, const bool compression, - const bool allowMultiThreading, + const bool multithreading, const uint32_t writeBufferSize, const uint32_t blockSize, const uint32_t maxOpenFiles, @@ -1053,7 +1053,7 @@ struct OpenWorker final : public BaseWorker { const uint32_t maxFileSize) : BaseWorker(env, database, callback, "classic_level.db.open"), location_(location), - allow_multi_threading_(allowMultiThreading) { + multithreading_(multithreading) { options_.block_cache = database->blockCache_; options_.filter_policy = database->filterPolicy_; options_.create_if_missing = createIfMissing; @@ -1071,12 +1071,12 @@ struct OpenWorker final : public BaseWorker { ~OpenWorker () {} void DoExecute () override { - SetStatus(database_->Open(options_, location_, allow_multi_threading_)); + SetStatus(database_->Open(options_, location_, multithreading_)); } leveldb::Options options_; std::string location_; - bool allow_multi_threading_; + bool multithreading_; }; /** @@ -1091,7 +1091,7 @@ NAPI_METHOD(db_open) { const bool createIfMissing = BooleanProperty(env, options, "createIfMissing", true); const bool errorIfExists = BooleanProperty(env, options, "errorIfExists", false); const bool compression = BooleanProperty(env, options, "compression", true); - const bool allowMultiThreading = BooleanProperty(env, options, "allowMultiThreading", false); + const bool multithreading = BooleanProperty(env, options, "multithreading", false); const uint32_t cacheSize = Uint32Property(env, options, "cacheSize", 8 << 20); const uint32_t writeBufferSize = Uint32Property(env, options , "writeBufferSize" , 4 << 20); @@ -1106,7 +1106,7 @@ NAPI_METHOD(db_open) { napi_value callback = argv[3]; OpenWorker* worker = new OpenWorker(env, database, callback, location, createIfMissing, errorIfExists, - compression, allowMultiThreading, + compression, multithreading, writeBufferSize, blockSize, maxOpenFiles, blockRestartInterval, maxFileSize); diff --git a/index.d.ts b/index.d.ts index 6e757ae..1759048 100644 --- a/index.d.ts +++ b/index.d.ts @@ -220,7 +220,7 @@ export interface OpenOptions extends AbstractOpenOptions { * * NOTE: Does not work for child process */ - allowMultiThreading?: boolean | undefined + multithreading?: boolean | undefined } /** diff --git a/test/multithreading-test.js b/test/multithreading-test.js index 37615d1..dedf568 100644 --- a/test/multithreading-test.js +++ b/test/multithreading-test.js @@ -19,9 +19,9 @@ const { } = require('./worker-utils') /** - * Makes sure that the allowMultiThreading flag is working as expected + * Makes sure that the multithreading flag is working as expected */ -test('check allowMultiThreading flag works as expected', async function (t) { +test('check multithreading flag works as expected', async function (t) { t.plan(6) const location = tempy.directory() const db1 = new ClassicLevel(location) @@ -29,16 +29,16 @@ test('check allowMultiThreading flag works as expected', async function (t) { t.is(db1.location, location) const db2 = new ClassicLevel(location) - await db2.open({ allowMultiThreading: true }) + await db2.open({ multithreading: true }) t.is(db2.location, location) - const db3 = new ClassicLevel(location, { allowMultiThreading: true }) + const db3 = new ClassicLevel(location, { multithreading: true }) await db3.open() t.is(db3.location, location) const db4 = new ClassicLevel(location) try { - await db4.open({ location, allowMultiThreading: false }) + await db4.open({ location, multithreading: false }) } catch (err) { t.is(err.code, 'LEVEL_DATABASE_NOT_OPEN', 'third instance failed to open') t.is(err.cause.code, 'LEVEL_LOCKED', 'third instance got lock error') @@ -49,7 +49,7 @@ test('check allowMultiThreading flag works as expected', async function (t) { await db3.close() const db5 = new ClassicLevel(location) - await db5.open({ location, allowMultiThreading: false }) + await db5.open({ location, multithreading: false }) t.is(db5.location, location) await db5.close() }) diff --git a/test/worker.js b/test/worker.js index 6b77e8d..6f447d2 100644 --- a/test/worker.js +++ b/test/worker.js @@ -18,7 +18,7 @@ const { (async function main () { const db = new ClassicLevel(workerData.location) - await db.open({ allowMultiThreading: true }) + await db.open({ multithreading: true }) try { /** From 4aa3c125b56e22f5ef2da284778c41bc5e5f8fb6 Mon Sep 17 00:00:00 2001 From: Matthew Keil Date: Tue, 7 Nov 2023 09:47:39 +0300 Subject: [PATCH 13/20] docs: PR comment docstring changes --- README.md | 2 +- binding.cc | 2 +- index.d.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index bf8911d..3221bd6 100644 --- a/README.md +++ b/README.md @@ -201,7 +201,7 @@ The optional `options` object may contain: - `createIfMissing` (boolean, default: `true`): If `true`, create an empty database if one doesn't already exist. If `false` and the database doesn't exist, opening will fail. - `errorIfExists` (boolean, default: `false`): If `true` and the database already exists, opening will fail. - `passive` (boolean, default: `false`): Wait for, but do not initiate, opening of the database. -- `multithreading` (boolean, default: `false`): Allow multiple threads to access the database. This is only relevant when using [`node.js worker_threads`](https://nodejs.org/api/worker_threads.html) +- `multithreading` (boolean, default: `false`): Allow multiple threads to access the database. This is only relevant when using [worker threads](https://nodejs.org/api/worker_threads.html) For advanced performance tuning, the `options` object may also contain the following. Modify these options only if you can prove actual benefit for your particular application. diff --git a/binding.cc b/binding.cc index 9c16ca9..9701ba3 100644 --- a/binding.cc +++ b/binding.cc @@ -35,7 +35,7 @@ struct LevelDbHandle size_t open_handle_count; }; static std::mutex handles_mutex; -// only access this when protected by the handles_mutex!!! +// only access this when protected by the handles_mutex! static std::map db_handles; /** diff --git a/index.d.ts b/index.d.ts index 1759048..4f3cfb0 100644 --- a/index.d.ts +++ b/index.d.ts @@ -218,7 +218,7 @@ export interface OpenOptions extends AbstractOpenOptions { * Allows multi-threaded access to a single DB instance for sharing a DB * across multiple worker threads within the same process. * - * NOTE: Does not work for child process + * @defaultValue `false` */ multithreading?: boolean | undefined } From f47200b5b98049be0e32d01fd1cdf52dedd66c02 Mon Sep 17 00:00:00 2001 From: Matthew Keil Date: Tue, 7 Nov 2023 09:48:48 +0300 Subject: [PATCH 14/20] fix: change NotFound to OK status for invalid handle map iterator --- binding.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/binding.cc b/binding.cc index 9701ba3..1668492 100644 --- a/binding.cc +++ b/binding.cc @@ -662,7 +662,9 @@ leveldb::Status threadsafe_close(Database &db_instance) { auto it = db_handles.find(db_instance.location_); if (it == db_handles.end()) { - return leveldb::Status::NotFound("Database handle not found for the given location"); + // this should never happen in theory but silently fail and return OK to + // prevent segfault if it does + return leveldb::Status::OK(); } if (--(it->second.open_handle_count) == 0) { From 5d4d40e7e4bd4ca01421d4854e4ffa48f8ace677 Mon Sep 17 00:00:00 2001 From: Matthew Keil Date: Wed, 8 Nov 2023 14:44:41 +0300 Subject: [PATCH 15/20] feat: update multithreading so 'false' removes ability to open multiple --- binding.cc | 18 +++++++---- test/multithreading-test.js | 61 ++++++++++++++++++++++++++++++------- 2 files changed, 62 insertions(+), 17 deletions(-) diff --git a/binding.cc b/binding.cc index 1668492..df23a3b 100644 --- a/binding.cc +++ b/binding.cc @@ -33,6 +33,7 @@ struct LevelDbHandle { leveldb::DB *db; size_t open_handle_count; + bool multithreading; }; static std::mutex handles_mutex; // only access this when protected by the handles_mutex! @@ -637,13 +638,9 @@ leveldb::Status threadsafe_open(const leveldb::Options &options, std::unique_lock lock(handles_mutex); auto it = db_handles.find(db_instance.location_); - if (it != db_handles.end() && multithreading) { - // Database already opened for this location - ++(it->second.open_handle_count); - db_instance.db_ = it->second.db; - } else { + if (it == db_handles.end()) { // Database not opened yet for this location - LevelDbHandle handle = {nullptr, 0}; + LevelDbHandle handle = {nullptr, 0, multithreading}; leveldb::Status status = leveldb::DB::Open(options, db_instance.location_, &handle.db); if (!status.ok()) { return status; @@ -651,8 +648,17 @@ leveldb::Status threadsafe_open(const leveldb::Options &options, handle.open_handle_count++; db_instance.db_ = handle.db; db_handles[db_instance.location_] = handle; + return leveldb::Status::OK(); } + if (!(it->second.multithreading && multithreading)) { + // Database already opened for this location that disallows multithreading + return leveldb::Status::InvalidArgument("Database already opened. Must set multithreading flag to true for all instances"); + } + + ++(it->second.open_handle_count); + db_instance.db_ = it->second.db; + return leveldb::Status::OK(); } diff --git a/test/multithreading-test.js b/test/multithreading-test.js index dedf568..d5ee6ef 100644 --- a/test/multithreading-test.js +++ b/test/multithreading-test.js @@ -22,28 +22,45 @@ const { * Makes sure that the multithreading flag is working as expected */ test('check multithreading flag works as expected', async function (t) { - t.plan(6) + t.plan(9) const location = tempy.directory() const db1 = new ClassicLevel(location) + const db2 = new ClassicLevel(location) + + // check that must set multithreading flag on all instances await db1.open() t.is(db1.location, location) + try { + await db2.open({ multithreading: true }) + } catch (err) { + t.is(err.code, 'LEVEL_DATABASE_NOT_OPEN', 'third instance failed to open') + t.is( + err.cause.message, + 'Invalid argument: Database already opened. Must set multithreading flag to true for all instances', + 'third instance got lock error' + ) + } + await db1.close() - const db2 = new ClassicLevel(location) + await db1.open({ multithreading: true }) + t.is(db1.location, location) await db2.open({ multithreading: true }) t.is(db2.location, location) - + // test that passing to the constructor works const db3 = new ClassicLevel(location, { multithreading: true }) await db3.open() t.is(db3.location, location) - const db4 = new ClassicLevel(location) try { await db4.open({ location, multithreading: false }) } catch (err) { t.is(err.code, 'LEVEL_DATABASE_NOT_OPEN', 'third instance failed to open') - t.is(err.cause.code, 'LEVEL_LOCKED', 'third instance got lock error') + t.is( + err.cause.message, + 'Invalid argument: Database already opened. Must set multithreading flag to true for all instances', + 'third instance got lock error' + ) } - await db1.close() await db2.close() await db3.close() @@ -56,13 +73,18 @@ test('check multithreading flag works as expected', async function (t) { /** * Tests for interleaved opening and closing of the database to check - * that the mutex for guarding the handles is working as expected + * that the mutex for guarding the handles is working as expected. Creates + * many workers that only open and then close the db after a random delay. Goal + * is to interleave the open and close processes to ensure that the mutex is + * guarding the handles correctly. After all workers have completed the main + * thread closes the db and then opens it again as a non-multi-threaded instance + * to make sure the handle was deleted correctly. */ test('open/close mutex works as expected', async function (t) { - t.plan(2) + t.plan(3) const location = tempy.directory() const db1 = new ClassicLevel(location) - await db1.open({ location }) + await db1.open({ multithreading: true }) t.is(db1.location, location) const activeWorkers = [] @@ -90,14 +112,31 @@ test('open/close mutex works as expected', async function (t) { const results = await Promise.allSettled(activeWorkers) const rejected = results.filter((res) => res.status === 'rejected') t.is(rejected.length, 0) - + await db1.close() + + // reopen the db non-multithreaded to check that the handle record was fully + // deleted from the handle map + await db1.open({ multithreading: false }) + t.is(db1.location, location) await db1.close() }) +/** + * Tests for reading and writing to a single db from multiple threads. + * + * Starts by setting up worker and then worker reports its ready and immediately + * starts writing to the database. Main thread gets message and also writes to + * the same db but to a different key space. Goal is to concurrently write + * consecutively numbered records. Once records are all written the worker + * reports to the main thread and the main thread waits until both threads are + * complete with the writing process. When both are ready they concurrently read + * random records from the full key space for a set interval. + */ test('allow multi-threading by same process', async function (t) { try { const location = tempy.directory() - const db = new ClassicLevel(location) + const db = new ClassicLevel(location, { multithreading: true }) + await db.open() const worker = new Worker(path.join(__dirname, 'worker.js'), { workerData: { location, readWrite: true } From 8427a707180e1d1f60f4e2c306c524f017bc971f Mon Sep 17 00:00:00 2001 From: Matthew Keil Date: Wed, 8 Nov 2023 14:46:18 +0300 Subject: [PATCH 16/20] chore: run lint --- test/multithreading-test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/multithreading-test.js b/test/multithreading-test.js index d5ee6ef..50ea45a 100644 --- a/test/multithreading-test.js +++ b/test/multithreading-test.js @@ -113,7 +113,7 @@ test('open/close mutex works as expected', async function (t) { const rejected = results.filter((res) => res.status === 'rejected') t.is(rejected.length, 0) await db1.close() - + // reopen the db non-multithreaded to check that the handle record was fully // deleted from the handle map await db1.open({ multithreading: false }) From b60e6eea28e5e77bb21e1314a087b3f4cb36b7f1 Mon Sep 17 00:00:00 2001 From: Matthew Keil Date: Wed, 8 Nov 2023 14:49:14 +0300 Subject: [PATCH 17/20] fix: change test error strings --- test/multithreading-test.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/multithreading-test.js b/test/multithreading-test.js index 50ea45a..9b0ea6c 100644 --- a/test/multithreading-test.js +++ b/test/multithreading-test.js @@ -37,7 +37,7 @@ test('check multithreading flag works as expected', async function (t) { t.is( err.cause.message, 'Invalid argument: Database already opened. Must set multithreading flag to true for all instances', - 'third instance got lock error' + 'second instance got lock error' ) } await db1.close() @@ -58,7 +58,7 @@ test('check multithreading flag works as expected', async function (t) { t.is( err.cause.message, 'Invalid argument: Database already opened. Must set multithreading flag to true for all instances', - 'third instance got lock error' + 'fourth instance got lock error' ) } await db1.close() From 8d45bbaa1077b42b7278160b533603506cceb5a0 Mon Sep 17 00:00:00 2001 From: Matthew Keil Date: Wed, 8 Nov 2023 14:49:46 +0300 Subject: [PATCH 18/20] fix: change test error strings --- test/multithreading-test.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/multithreading-test.js b/test/multithreading-test.js index 9b0ea6c..872516a 100644 --- a/test/multithreading-test.js +++ b/test/multithreading-test.js @@ -33,7 +33,7 @@ test('check multithreading flag works as expected', async function (t) { try { await db2.open({ multithreading: true }) } catch (err) { - t.is(err.code, 'LEVEL_DATABASE_NOT_OPEN', 'third instance failed to open') + t.is(err.code, 'LEVEL_DATABASE_NOT_OPEN', 'second instance failed to open') t.is( err.cause.message, 'Invalid argument: Database already opened. Must set multithreading flag to true for all instances', @@ -54,7 +54,7 @@ test('check multithreading flag works as expected', async function (t) { try { await db4.open({ location, multithreading: false }) } catch (err) { - t.is(err.code, 'LEVEL_DATABASE_NOT_OPEN', 'third instance failed to open') + t.is(err.code, 'LEVEL_DATABASE_NOT_OPEN', 'fourth instance failed to open') t.is( err.cause.message, 'Invalid argument: Database already opened. Must set multithreading flag to true for all instances', From 860daf1c8180e3699bdf21d306a1a01be470211f Mon Sep 17 00:00:00 2001 From: Matthew Keil Date: Wed, 8 Nov 2023 14:51:42 +0300 Subject: [PATCH 19/20] fix: change expected error in lock-test --- test/lock-test.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/test/lock-test.js b/test/lock-test.js index cbc0aeb..7991b91 100644 --- a/test/lock-test.js +++ b/test/lock-test.js @@ -18,7 +18,11 @@ test('lock held by same process', async function (t) { await db2.open() } catch (err) { t.is(err.code, 'LEVEL_DATABASE_NOT_OPEN', 'second instance failed to open') - t.is(err.cause.code, 'LEVEL_LOCKED', 'second instance got lock error') + t.is( + err.cause.message, + 'Invalid argument: Database already opened. Must set multithreading flag to true for all instances', + 'second instance got lock error' + ) } return db1.close() From 70da60acf6e938ffdafd47afbb1ba45a91a99c93 Mon Sep 17 00:00:00 2001 From: Matthew Keil Date: Thu, 9 Nov 2023 17:00:24 +0300 Subject: [PATCH 20/20] refactor: update multithreading handling per PR --- binding.cc | 41 ++++++++++++++++++------------------- test/lock-test.js | 6 +----- test/multithreading-test.js | 12 ++--------- 3 files changed, 23 insertions(+), 36 deletions(-) diff --git a/binding.cc b/binding.cc index df23a3b..015b283 100644 --- a/binding.cc +++ b/binding.cc @@ -33,7 +33,6 @@ struct LevelDbHandle { leveldb::DB *db; size_t open_handle_count; - bool multithreading; }; static std::mutex handles_mutex; // only access this when protected by the handles_mutex! @@ -635,25 +634,27 @@ struct Database { leveldb::Status threadsafe_open(const leveldb::Options &options, bool multithreading, Database &db_instance) { + // Bypass lock and handles if multithreading is disabled + if (!multithreading) { + return leveldb::DB::Open(options, db_instance.location_, &db_instance.db_); + } + std::unique_lock lock(handles_mutex); auto it = db_handles.find(db_instance.location_); if (it == db_handles.end()) { - // Database not opened yet for this location - LevelDbHandle handle = {nullptr, 0, multithreading}; + // Database not opened yet for this location, unless it was with + // multithreading disabled, in which case we're expected to fail here. + LevelDbHandle handle = {nullptr, 0}; leveldb::Status status = leveldb::DB::Open(options, db_instance.location_, &handle.db); - if (!status.ok()) { - return status; - } - handle.open_handle_count++; - db_instance.db_ = handle.db; - db_handles[db_instance.location_] = handle; - return leveldb::Status::OK(); - } - if (!(it->second.multithreading && multithreading)) { - // Database already opened for this location that disallows multithreading - return leveldb::Status::InvalidArgument("Database already opened. Must set multithreading flag to true for all instances"); + if (status.ok()) { + handle.open_handle_count++; + db_instance.db_ = handle.db; + db_handles[db_instance.location_] = handle; + } + + return status; } ++(it->second.open_handle_count); @@ -664,20 +665,18 @@ leveldb::Status threadsafe_open(const leveldb::Options &options, leveldb::Status threadsafe_close(Database &db_instance) { std::unique_lock lock(handles_mutex); - db_instance.db_ = NULL; // ensure db_ pointer is nullified in Database instance auto it = db_handles.find(db_instance.location_); if (it == db_handles.end()) { - // this should never happen in theory but silently fail and return OK to - // prevent segfault if it does - return leveldb::Status::OK(); - } - - if (--(it->second.open_handle_count) == 0) { + // Was not opened with multithreading enabled + delete db_instance.db_; + } else if (--(it->second.open_handle_count) == 0) { delete it->second.db; db_handles.erase(it); } + // ensure db_ pointer is nullified in Database instance + db_instance.db_ = NULL; return leveldb::Status::OK(); } diff --git a/test/lock-test.js b/test/lock-test.js index 7991b91..cbc0aeb 100644 --- a/test/lock-test.js +++ b/test/lock-test.js @@ -18,11 +18,7 @@ test('lock held by same process', async function (t) { await db2.open() } catch (err) { t.is(err.code, 'LEVEL_DATABASE_NOT_OPEN', 'second instance failed to open') - t.is( - err.cause.message, - 'Invalid argument: Database already opened. Must set multithreading flag to true for all instances', - 'second instance got lock error' - ) + t.is(err.cause.code, 'LEVEL_LOCKED', 'second instance got lock error') } return db1.close() diff --git a/test/multithreading-test.js b/test/multithreading-test.js index 872516a..74e58d1 100644 --- a/test/multithreading-test.js +++ b/test/multithreading-test.js @@ -34,11 +34,7 @@ test('check multithreading flag works as expected', async function (t) { await db2.open({ multithreading: true }) } catch (err) { t.is(err.code, 'LEVEL_DATABASE_NOT_OPEN', 'second instance failed to open') - t.is( - err.cause.message, - 'Invalid argument: Database already opened. Must set multithreading flag to true for all instances', - 'second instance got lock error' - ) + t.is(err.cause.code, 'LEVEL_LOCKED', 'second instance got lock error') } await db1.close() @@ -55,11 +51,7 @@ test('check multithreading flag works as expected', async function (t) { await db4.open({ location, multithreading: false }) } catch (err) { t.is(err.code, 'LEVEL_DATABASE_NOT_OPEN', 'fourth instance failed to open') - t.is( - err.cause.message, - 'Invalid argument: Database already opened. Must set multithreading flag to true for all instances', - 'fourth instance got lock error' - ) + t.is(err.cause.code, 'LEVEL_LOCKED', 'second instance got lock error') } await db1.close() await db2.close()