From a519a351841184bcd1cbe508e66ea482b832c186 Mon Sep 17 00:00:00 2001 From: David Whittington Date: Thu, 3 Oct 2024 15:54:11 -0500 Subject: [PATCH] WIP data item flushing contention fix --- package.json | 2 +- src/database/standalone-sqlite.ts | 53 ++++++++++++++++++++++--------- 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/package.json b/package.json index 3259c07d..7cd17cf1 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,7 @@ "url": "https://github.com/ar-io/ar-io-node" }, "dependencies": { - "@ar.io/sdk": "^2.0.0", + "@ar.io/sdk": "^2.2.5", "@aws-lite/client": "^0.21.7", "@aws-lite/s3": "^0.1.21", "@clickhouse/client": "^1.3.0", diff --git a/src/database/standalone-sqlite.ts b/src/database/standalone-sqlite.ts index e72c67e6..33fc5b96 100644 --- a/src/database/standalone-sqlite.ts +++ b/src/database/standalone-sqlite.ts @@ -941,17 +941,25 @@ export class StandaloneSqliteDatabaseWorker { const endHeight = block.height - MAX_FORK_DEPTH; this.saveCoreStableDataFn(endHeight); - this.saveBundlesStableDataFn(endHeight); this.deleteCoreStaleNewDataFn( endHeight, maxStableBlockTimestamp - NEW_TX_CLEANUP_WAIT_SECS, ); - this.deleteBundlesStaleNewDataFn( - endHeight, - maxStableBlockTimestamp - NEW_DATA_ITEM_CLEANUP_WAIT_SECS, - ); + + return { endHeight, maxStableBlockTimestamp }; } + + return {}; + } + + flushStableDataItems(endHeight: number, maxStableBlockTimestamp: number) { + this.saveBundlesStableDataFn(endHeight); + + this.deleteBundlesStaleNewDataFn( + endHeight, + maxStableBlockTimestamp - NEW_DATA_ITEM_CLEANUP_WAIT_SECS, + ); } getDataAttributes(id: string) { @@ -2370,7 +2378,7 @@ const WORKER_POOL_NAMES: Array = [ 'bundles', ]; -type WorkerMethodName = keyof StandaloneSqliteDatabase; +type WorkerMethodName = keyof StandaloneSqliteDatabaseWorker; type WorkerRoleName = 'read' | 'write'; const WORKER_ROLE_NAMES: Array = ['read', 'write']; @@ -2742,16 +2750,22 @@ export class StandaloneSqliteDatabase return this.queueWrite('bundles', 'saveBundle', [bundle]); } - saveBlockAndTxs( + async saveBlockAndTxs( block: PartialJsonBlock, txs: PartialJsonTransaction[], missingTxIds: string[], ): Promise { - return this.queueWrite('core', 'saveBlockAndTxs', [ - block, - txs, - missingTxIds, - ]); + const { endHeight, maxStableBlockTimestamp } = await this.queueWrite( + 'core', + 'saveBlockAndTxs', + [block, txs, missingTxIds], + ); + if (maxStableBlockTimestamp !== undefined) { + await this.queueWrite('bundles', 'flushStableDataItems', [ + endHeight, + maxStableBlockTimestamp, + ]); + } } async getDataAttributes( @@ -3068,9 +3082,18 @@ if (!isMainThread) { parentPort?.postMessage(null); break; case 'saveBlockAndTxs': - const [block, txs, missingTxIds] = args; - worker.saveBlockAndTxs(block, txs, missingTxIds); - parentPort?.postMessage(null); + { + const [block, txs, missingTxIds] = args; + const ret = worker.saveBlockAndTxs(block, txs, missingTxIds); + parentPort?.postMessage(ret); + } + break; + case 'flushStableDataItems': + { + const [endHeight, maxStableBlockTimestamp] = args; + worker.flushStableDataItems(endHeight, maxStableBlockTimestamp); + parentPort?.postMessage(null); + } break; case 'getDataAttributes': const dataAttributes = worker.getDataAttributes(args[0]);