Skip to content

Commit 5328802

Browse files
authored
[MongoDB Storage] Only compact changed buckets / Indexed bucket_state (#375)
* Add partial index for bucket_state checksum calculations. * Only compact dirty buckets. * Include more logging when pre-calculating checksums. * Changeset. * Log current bucket when pre-populating checksums.
1 parent da7ecc6 commit 5328802

File tree

5 files changed

+89
-11
lines changed

5 files changed

+89
-11
lines changed

.changeset/seven-mangos-sleep.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@powersync/service-module-mongodb-storage': patch
3+
'@powersync/service-core': patch
4+
'@powersync/service-image': patch
5+
---
6+
7+
[MongoDB Storage] Only compact modified buckets. Add partial index on bucket_state to handle large numbers of buckets when pre-calculating checksums or compacting.
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { migrations } from '@powersync/service-core';
2+
import * as storage from '../../../storage/storage-index.js';
3+
import { MongoStorageConfig } from '../../../types/types.js';
4+
5+
const INDEX_NAME = 'dirty_buckets';
6+
7+
export const up: migrations.PowerSyncMigrationFunction = async (context) => {
8+
const {
9+
service_context: { configuration }
10+
} = context;
11+
const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);
12+
13+
try {
14+
await db.createBucketStateIndex2();
15+
} finally {
16+
await db.client.close();
17+
}
18+
};
19+
20+
export const down: migrations.PowerSyncMigrationFunction = async (context) => {
21+
const {
22+
service_context: { configuration }
23+
} = context;
24+
25+
const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);
26+
27+
try {
28+
if (await db.bucket_state.indexExists(INDEX_NAME)) {
29+
await db.bucket_state.dropIndex(INDEX_NAME);
30+
}
31+
} finally {
32+
await db.client.close();
33+
}
34+
};

modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,19 @@ export class MongoCompactor {
9999
await this.compactInternal(bucket);
100100
}
101101
} else {
102-
await this.compactInternal(undefined);
102+
await this.compactDirtyBuckets();
103103
}
104104
}
105105

106-
async compactInternal(bucket: string | undefined) {
106+
private async compactDirtyBuckets() {
107+
for await (let buckets of this.iterateDirtyBuckets()) {
108+
for (let bucket of buckets) {
109+
await this.compactInternal(bucket);
110+
}
111+
}
112+
}
113+
114+
private async compactInternal(bucket: string | undefined) {
107115
const idLimitBytes = this.idLimitBytes;
108116

109117
let currentState: CurrentBucketState | null = null;
@@ -483,6 +491,16 @@ export class MongoCompactor {
483491
* Subset of compact, only populating checksums where relevant.
484492
*/
485493
async populateChecksums() {
494+
for await (let buckets of this.iterateDirtyBuckets()) {
495+
const start = Date.now();
496+
logger.info(`Calculating checksums for batch of ${buckets.length} buckets, starting at ${buckets[0]}`);
497+
498+
await this.updateChecksumsBatch(buckets);
499+
logger.info(`Updated checksums for batch of ${buckets.length} buckets in ${Date.now() - start}ms`);
500+
}
501+
}
502+
503+
private async *iterateDirtyBuckets(): AsyncGenerator<string[]> {
486504
// This is updated after each batch
487505
let lowerBound: BucketStateDocument['_id'] = {
488506
g: this.group_id,
@@ -500,10 +518,11 @@ export class MongoCompactor {
500518
$gt: lowerBound,
501519
$lt: upperBound
502520
},
503-
compacted_state: { $exists: false }
521+
// Partial index exists on this
522+
'estimate_since_compact.count': { $gt: 0 }
504523
};
505524

506-
const bucketsWithoutChecksums = await this.db.bucket_state
525+
const dirtyBuckets = await this.db.bucket_state
507526
.find(filter, {
508527
projection: {
509528
_id: 1
@@ -512,19 +531,19 @@ export class MongoCompactor {
512531
_id: 1
513532
},
514533
limit: 5_000,
515-
maxTimeMS: MONGO_OPERATION_TIMEOUT_MS
534+
maxTimeMS: MONGO_OPERATION_TIMEOUT_MS,
535+
// Make sure we use the partial index
536+
hint: 'dirty_buckets'
516537
})
517538
.toArray();
518-
if (bucketsWithoutChecksums.length == 0) {
519-
// All done
539+
540+
if (dirtyBuckets.length == 0) {
520541
break;
521542
}
522543

523-
logger.info(`Calculating checksums for batch of ${bucketsWithoutChecksums.length} buckets`);
524-
525-
await this.updateChecksumsBatch(bucketsWithoutChecksums.map((b) => b._id.b));
544+
yield dirtyBuckets.map((bucket) => bucket._id.b);
526545

527-
lowerBound = bucketsWithoutChecksums[bucketsWithoutChecksums.length - 1]._id;
546+
lowerBound = dirtyBuckets[dirtyBuckets.length - 1]._id;
528547
}
529548
}
530549

@@ -559,6 +578,10 @@ export class MongoCompactor {
559578
count: bucketChecksum.count,
560579
checksum: BigInt(bucketChecksum.checksum),
561580
bytes: null
581+
},
582+
estimate_since_compact: {
583+
count: 0,
584+
bytes: 0
562585
}
563586
}
564587
},

modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export const MongoTestStorageFactoryGenerator = (factoryOptions: MongoTestStorag
2525
// Full migrations are not currently run for tests, so we manually create the important ones
2626
await db.createCheckpointEventsCollection();
2727
await db.createBucketStateIndex();
28+
await db.createBucketStateIndex2();
2829

2930
return new MongoBucketStorage(db, { slot_name_prefix: 'test_' }, factoryOptions.internalOptions);
3031
};

modules/module-mongodb-storage/src/storage/implementation/db.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,19 @@ export class PowerSyncMongo {
141141
{ name: 'bucket_updates', unique: true }
142142
);
143143
}
144+
/**
145+
* Only use in migrations and tests.
146+
*/
147+
async createBucketStateIndex2() {
148+
// TODO: Implement a better mechanism to use migrations in tests
149+
await this.bucket_state.createIndex(
150+
{
151+
_id: 1,
152+
'estimate_since_compact.count': 1
153+
},
154+
{ name: 'dirty_buckets', partialFilterExpression: { 'estimate_since_compact.count': { $gt: 0 } } }
155+
);
156+
}
144157
}
145158

146159
export function createPowerSyncMongo(config: MongoStorageConfig, options?: lib_mongo.MongoConnectionOptions) {

0 commit comments

Comments
 (0)