Skip to content

Commit

Permalink
WIP data item flushing contention fix
Browse files Browse the repository at this point in the history
  • Loading branch information
djwhitt committed Oct 4, 2024
1 parent 6c29438 commit a519a35
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 16 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"url": "https://github.com/ar-io/ar-io-node"
},
"dependencies": {
"@ar.io/sdk": "^2.0.0",
"@ar.io/sdk": "^2.2.5",
"@aws-lite/client": "^0.21.7",
"@aws-lite/s3": "^0.1.21",
"@clickhouse/client": "^1.3.0",
Expand Down
53 changes: 38 additions & 15 deletions src/database/standalone-sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -941,17 +941,25 @@ export class StandaloneSqliteDatabaseWorker {
const endHeight = block.height - MAX_FORK_DEPTH;

this.saveCoreStableDataFn(endHeight);
this.saveBundlesStableDataFn(endHeight);

this.deleteCoreStaleNewDataFn(
endHeight,
maxStableBlockTimestamp - NEW_TX_CLEANUP_WAIT_SECS,
);
this.deleteBundlesStaleNewDataFn(
endHeight,
maxStableBlockTimestamp - NEW_DATA_ITEM_CLEANUP_WAIT_SECS,
);

return { endHeight, maxStableBlockTimestamp };
}

return {};
}

flushStableDataItems(endHeight: number, maxStableBlockTimestamp: number) {
this.saveBundlesStableDataFn(endHeight);

this.deleteBundlesStaleNewDataFn(
endHeight,
maxStableBlockTimestamp - NEW_DATA_ITEM_CLEANUP_WAIT_SECS,
);
}

getDataAttributes(id: string) {
Expand Down Expand Up @@ -2370,7 +2378,7 @@ const WORKER_POOL_NAMES: Array<WorkerPoolName> = [
'bundles',
];

type WorkerMethodName = keyof StandaloneSqliteDatabase;
type WorkerMethodName = keyof StandaloneSqliteDatabaseWorker;

type WorkerRoleName = 'read' | 'write';
const WORKER_ROLE_NAMES: Array<WorkerRoleName> = ['read', 'write'];
Expand Down Expand Up @@ -2742,16 +2750,22 @@ export class StandaloneSqliteDatabase
return this.queueWrite('bundles', 'saveBundle', [bundle]);
}

saveBlockAndTxs(
async saveBlockAndTxs(
block: PartialJsonBlock,
txs: PartialJsonTransaction[],
missingTxIds: string[],
): Promise<void> {
return this.queueWrite('core', 'saveBlockAndTxs', [
block,
txs,
missingTxIds,
]);
const { endHeight, maxStableBlockTimestamp } = await this.queueWrite(
'core',
'saveBlockAndTxs',
[block, txs, missingTxIds],
);
if (maxStableBlockTimestamp !== undefined) {
await this.queueWrite('bundles', 'flushStableDataItems', [
endHeight,
maxStableBlockTimestamp,
]);
}
}

async getDataAttributes(
Expand Down Expand Up @@ -3068,9 +3082,18 @@ if (!isMainThread) {
parentPort?.postMessage(null);
break;
case 'saveBlockAndTxs':
const [block, txs, missingTxIds] = args;
worker.saveBlockAndTxs(block, txs, missingTxIds);
parentPort?.postMessage(null);
{
const [block, txs, missingTxIds] = args;
const ret = worker.saveBlockAndTxs(block, txs, missingTxIds);
parentPort?.postMessage(ret);
}
break;
case 'flushStableDataItems':
{
const [endHeight, maxStableBlockTimestamp] = args;
worker.flushStableDataItems(endHeight, maxStableBlockTimestamp);
parentPort?.postMessage(null);
}
break;
case 'getDataAttributes':
const dataAttributes = worker.getDataAttributes(args[0]);
Expand Down

0 comments on commit a519a35

Please sign in to comment.