Commit da532c8
Skip small buckets in checksum pre-calculations (#378)
* Rewrite checksum pre-calculation to ignore small buckets.
* Update changeset.
* Fix count query.
* Confirm in tests that small buckets are skipped.
1 parent a98cecb commit da532c8

File tree: 8 files changed, +127 -112 lines changed
.changeset/seven-mangos-sleep.md

Lines changed: 1 addition & 1 deletion
@@ -4,4 +4,4 @@
 '@powersync/service-image': patch
 ---
-[MongoDB Storage] Only compact modified buckets. Add partial index on bucket_state to handle large numbers of buckets when pre-calculating checksums or compacting.
+[MongoDB Storage] Only compact modified buckets. Add index on bucket_state to handle large numbers of buckets when pre-calculating checksums or compacting, and skip small buckets.
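For context, a minimal sketch of the dirty-bucket lookup this index is built for. The connection string and database name are placeholder assumptions; the collection, field names, and index definition mirror the diffs below.

// Sketch: the query shape served by the new index (mongodb Node.js driver).
// The db/collection names here are illustrative, not taken from this commit.
import { MongoClient } from 'mongodb';

async function findDirtyBuckets(groupId: number, minBucketChanges: number) {
  const client = await MongoClient.connect('mongodb://localhost:27017');
  try {
    const bucketState = client.db('powersync').collection('bucket_state');
    // Compound index: equality on the group id, then the change counter descending.
    // A single index scan serves both the $gte filter and the sort below.
    await bucketState.createIndex(
      { '_id.g': 1, 'estimate_since_compact.count': -1 },
      { name: 'dirty_count' }
    );
    // Buckets with fewer than minBucketChanges changes are skipped entirely.
    return await bucketState
      .find(
        { '_id.g': groupId, 'estimate_since_compact.count': { $gte: minBucketChanges } },
        { projection: { _id: 1 }, sort: { 'estimate_since_compact.count': -1 }, limit: 5_000 }
      )
      .toArray();
  } finally {
    await client.close();
  }
}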

modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts

Lines changed: 78 additions & 101 deletions
@@ -1,6 +1,13 @@
 import { mongo, MONGO_OPERATION_TIMEOUT_MS } from '@powersync/lib-service-mongodb';
 import { logger, ReplicationAssertionError, ServiceAssertionError } from '@powersync/lib-services-framework';
-import { addChecksums, InternalOpId, isPartialChecksum, storage, utils } from '@powersync/service-core';
+import {
+  addChecksums,
+  InternalOpId,
+  isPartialChecksum,
+  PopulateChecksumCacheResults,
+  storage,
+  utils
+} from '@powersync/service-core';
 
 import { PowerSyncMongo } from './db.js';
 import { BucketDataDocument, BucketDataKey, BucketStateDocument } from './models.js';
@@ -10,6 +17,7 @@ import { cacheKey } from './OperationBatch.js';
 interface CurrentBucketState {
   /** Bucket name */
   bucket: string;
+
   /**
    * Rows seen in the bucket, with the last op_id of each.
    */
@@ -96,75 +104,56 @@ export class MongoCompactor {
         // We can make this more efficient later on by iterating
         // through the buckets in a single query.
         // That makes batching more tricky, so we leave for later.
-        await this.compactInternal(bucket);
+        await this.compactSingleBucket(bucket);
       }
     } else {
       await this.compactDirtyBuckets();
     }
   }
 
   private async compactDirtyBuckets() {
-    for await (let buckets of this.iterateDirtyBuckets()) {
+    while (!this.signal?.aborted) {
+      // Process all buckets with 1 or more changes since last time
+      const buckets = await this.dirtyBucketBatch({ minBucketChanges: 1 });
+      if (buckets.length == 0) {
+        // All done
+        break;
+      }
       for (let bucket of buckets) {
-        await this.compactInternal(bucket);
+        await this.compactSingleBucket(bucket);
       }
     }
   }
 
-  private async compactInternal(bucket: string | undefined) {
+  private async compactSingleBucket(bucket: string) {
     const idLimitBytes = this.idLimitBytes;
 
-    let currentState: CurrentBucketState | null = null;
-
-    let bucketLower: string | mongo.MinKey;
-    let bucketUpper: string | mongo.MaxKey;
+    let currentState: CurrentBucketState = {
+      bucket,
+      seen: new Map(),
+      trackingSize: 0,
+      lastNotPut: null,
+      opsSincePut: 0,
 
-    if (bucket == null) {
-      bucketLower = new mongo.MinKey();
-      bucketUpper = new mongo.MaxKey();
-    } else if (bucket.includes('[')) {
-      // Exact bucket name
-      bucketLower = bucket;
-      bucketUpper = bucket;
-    } else {
-      // Bucket definition name
-      bucketLower = `${bucket}[`;
-      bucketUpper = `${bucket}[\uFFFF`;
-    }
+      checksum: 0,
+      opCount: 0,
+      opBytes: 0
+    };
 
     // Constant lower bound
     const lowerBound: BucketDataKey = {
       g: this.group_id,
-      b: bucketLower as string,
+      b: bucket,
       o: new mongo.MinKey() as any
     };
 
     // Upper bound is adjusted for each batch
     let upperBound: BucketDataKey = {
       g: this.group_id,
-      b: bucketUpper as string,
+      b: bucket,
       o: new mongo.MaxKey() as any
     };
 
-    const doneWithBucket = async () => {
-      if (currentState == null) {
-        return;
-      }
-      // Free memory before clearing bucket
-      currentState.seen.clear();
-      if (currentState.lastNotPut != null && currentState.opsSincePut >= 1) {
-        logger.info(
-          `Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations`
-        );
-        // Need flush() before clear()
-        await this.flush();
-        await this.clearBucket(currentState);
-      }
-
-      // Do this _after_ clearBucket so that we have accurate counts.
-      this.updateBucketChecksums(currentState);
-    };
-
     while (!this.signal?.aborted) {
       // Query one batch at a time, to avoid cursor timeouts
       const cursor = this.db.bucket_data.aggregate<BucketDataDocument & { size: number | bigint }>(
@@ -211,22 +200,6 @@ export class MongoCompactor {
       upperBound = batch[batch.length - 1]._id;
 
       for (let doc of batch) {
-        if (currentState == null || doc._id.b != currentState.bucket) {
-          await doneWithBucket();
-
-          currentState = {
-            bucket: doc._id.b,
-            seen: new Map(),
-            trackingSize: 0,
-            lastNotPut: null,
-            opsSincePut: 0,
-
-            checksum: 0,
-            opCount: 0,
-            opBytes: 0
-          };
-        }
-
         if (doc._id.o > this.maxOpId) {
           continue;
         }
@@ -297,12 +270,22 @@
         }
       }
 
-      if (currentState != null) {
-        logger.info(`Processed batch of length ${batch.length} current bucket: ${currentState.bucket}`);
-      }
+      logger.info(`Processed batch of length ${batch.length} current bucket: ${bucket}`);
     }
 
-    await doneWithBucket();
+    // Free memory before clearing bucket
+    currentState.seen.clear();
+    if (currentState.lastNotPut != null && currentState.opsSincePut >= 1) {
+      logger.info(
+        `Inserting CLEAR at ${this.group_id}:${bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations`
+      );
+      // Need flush() before clear()
+      await this.flush();
+      await this.clearBucket(currentState);
+    }
+
+    // Do this _after_ clearBucket so that we have accurate counts.
+    this.updateBucketChecksums(currentState);
 
     // Need another flush after updateBucketChecksums()
     await this.flush();
@@ -490,61 +473,55 @@ export class MongoCompactor {
   /**
    * Subset of compact, only populating checksums where relevant.
    */
-  async populateChecksums() {
-    for await (let buckets of this.iterateDirtyBuckets()) {
+  async populateChecksums(options: { minBucketChanges: number }): Promise<PopulateChecksumCacheResults> {
+    let count = 0;
+    while (!this.signal?.aborted) {
+      const buckets = await this.dirtyBucketBatch(options);
+      if (buckets.length == 0) {
+        // All done
+        break;
+      }
       const start = Date.now();
       logger.info(`Calculating checksums for batch of ${buckets.length} buckets, starting at ${buckets[0]}`);
 
       await this.updateChecksumsBatch(buckets);
       logger.info(`Updated checksums for batch of ${buckets.length} buckets in ${Date.now() - start}ms`);
+      count += buckets.length;
     }
+    return { buckets: count };
   }
 
-  private async *iterateDirtyBuckets(): AsyncGenerator<string[]> {
-    // This is updated after each batch
-    let lowerBound: BucketStateDocument['_id'] = {
-      g: this.group_id,
-      b: new mongo.MinKey() as any
-    };
-    // This is static
-    const upperBound: BucketStateDocument['_id'] = {
-      g: this.group_id,
-      b: new mongo.MaxKey() as any
-    };
-    while (!this.signal?.aborted) {
-      // By filtering buckets, we effectively make this "resumeable".
-      const filter: mongo.Filter<BucketStateDocument> = {
-        _id: {
-          $gt: lowerBound,
-          $lt: upperBound
-        },
-        // Partial index exists on this
-        'estimate_since_compact.count': { $gt: 0 }
-      };
-
-      const dirtyBuckets = await this.db.bucket_state
-        .find(filter, {
+  /**
+   * Returns a batch of dirty buckets - buckets with most changes first.
+   *
+   * This cannot be used to iterate on its own - the client is expected to process these buckets and
+   * set estimate_since_compact.count: 0 when done, before fetching the next batch.
+   */
+  private async dirtyBucketBatch(options: { minBucketChanges: number }): Promise<string[]> {
+    if (options.minBucketChanges <= 0) {
+      throw new ReplicationAssertionError('minBucketChanges must be >= 1');
+    }
+    // We make use of an index on {_id.g: 1, 'estimate_since_compact.count': -1}
+    const dirtyBuckets = await this.db.bucket_state
+      .find(
+        {
+          '_id.g': this.group_id,
+          'estimate_since_compact.count': { $gte: options.minBucketChanges }
        },
+        {
          projection: {
            _id: 1
          },
          sort: {
-            _id: 1
+            'estimate_since_compact.count': -1
          },
          limit: 5_000,
-          maxTimeMS: MONGO_OPERATION_TIMEOUT_MS,
-          // Make sure we use the partial index
-          hint: 'dirty_buckets'
-        })
-        .toArray();
-
-      if (dirtyBuckets.length == 0) {
-        break;
-      }
-
-      yield dirtyBuckets.map((bucket) => bucket._id.b);
+          maxTimeMS: MONGO_OPERATION_TIMEOUT_MS
+        }
+      )
+      .toArray();
 
-      lowerBound = dirtyBuckets[dirtyBuckets.length - 1]._id;
-    }
+    return dirtyBuckets.map((bucket) => bucket._id.b);
   }
 
   private async updateChecksumsBatch(buckets: string[]) {
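The doc comment on dirtyBucketBatch above captures the key contract: the query always returns the dirtiest buckets first, so the caller must reset each bucket's counter after processing it, otherwise the next call returns the same batch. A sketch of that loop under stated assumptions: the explicit updateMany reset here is for illustration only (in this module the counter is cleared as part of persisting the updated bucket state), and the types are approximations of those imported above.

// Sketch of the process-then-reset loop that dirtyBucketBatch relies on.
async function drainDirtyBuckets(
  bucketState: mongo.Collection<BucketStateDocument>,
  groupId: number,
  minBucketChanges: number,
  processBatch: (buckets: string[]) => Promise<void>
): Promise<number> {
  let total = 0;
  while (true) {
    const batch = await bucketState
      .find(
        { '_id.g': groupId, 'estimate_since_compact.count': { $gte: minBucketChanges } },
        { projection: { _id: 1 }, sort: { 'estimate_since_compact.count': -1 }, limit: 5_000 }
      )
      .toArray();
    if (batch.length == 0) {
      break; // nothing dirty enough remains
    }
    const names = batch.map((b) => b._id.b);
    await processBatch(names);
    // Without a reset, the next find() would return the same buckets forever.
    await bucketState.updateMany(
      { '_id.g': groupId, '_id.b': { $in: names } },
      { $set: { 'estimate_since_compact.count': 0 } }
    );
    total += names.length;
  }
  return total;
}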

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 9 additions & 2 deletions
@@ -16,6 +16,8 @@ import {
   InternalOpId,
   internalToExternalOpId,
   maxLsn,
+  PopulateChecksumCacheOptions,
+  PopulateChecksumCacheResults,
   ProtocolOpId,
   ReplicationCheckpoint,
   storage,
@@ -665,7 +667,7 @@ export class MongoSyncBucketStorage
     }
   }
 
-  async populatePersistentChecksumCache(options: Required<Pick<CompactOptions, 'signal' | 'maxOpId'>>): Promise<void> {
+  async populatePersistentChecksumCache(options: PopulateChecksumCacheOptions): Promise<PopulateChecksumCacheResults> {
     logger.info(`Populating persistent checksum cache...`);
     const start = Date.now();
     // We do a minimal compact here.
@@ -676,9 +678,14 @@ export class MongoSyncBucketStorage
       memoryLimitMB: 0
     });
 
-    await compactor.populateChecksums();
+    const result = await compactor.populateChecksums({
+      // There are cases with millions of small buckets, in which case it can take very long to
+      // populate the checksums, with minimal benefit. We skip the small buckets here.
+      minBucketChanges: options.minBucketChanges ?? 10
+    });
     const duration = Date.now() - start;
     logger.info(`Populated persistent checksum cache in ${(duration / 1000).toFixed(1)}s`);
+    return result;
   }
 
   /**
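Two call-site consequences of the new signature: signal is no longer a required option, and the default minBucketChanges of 10 means a plain call skips small buckets. A usage sketch, assuming bucketStorage and checkpoint are in scope as in the updated test below:

// Default threshold (10): small buckets are skipped entirely.
const skipped = await bucketStorage.populatePersistentChecksumCache({ maxOpId: checkpoint });

// Explicit threshold: cache checksums for every bucket with at least one change.
const all = await bucketStorage.populatePersistentChecksumCache({
  maxOpId: checkpoint,
  minBucketChanges: 1
});
// Each result reports the number of buckets whose checksums were populated.
console.log(skipped.buckets, all.buckets);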

modules/module-mongodb-storage/src/storage/implementation/db.ts

Lines changed: 3 additions & 3 deletions
@@ -148,10 +148,10 @@ export class PowerSyncMongo {
     // TODO: Implement a better mechanism to use migrations in tests
     await this.bucket_state.createIndex(
       {
-        _id: 1,
-        'estimate_since_compact.count': 1
+        '_id.g': 1,
+        'estimate_since_compact.count': -1
       },
-      { name: 'dirty_buckets', partialFilterExpression: { 'estimate_since_compact.count': { $gt: 0 } } }
+      { name: 'dirty_count' }
     );
   }
 }
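With the descending key on the change counter, a single index scan serves both the $gte filter and the most-changes-first sort, so the partial filter expression and the explicit hint from the old query are no longer needed. A quick verification sketch, assuming a bucketState collection handle; the explain() output shape is the standard queryPlanner format:

// Sketch: confirm the dirty_count index serves the query without an in-memory sort.
const plan = await bucketState
  .find(
    { '_id.g': 1, 'estimate_since_compact.count': { $gte: 10 } },
    { projection: { _id: 1 }, sort: { 'estimate_since_compact.count': -1 }, limit: 5_000 }
  )
  .explain('queryPlanner');
// Expect an IXSCAN on 'dirty_count' and no SORT stage: the index key order
// ('_id.g' ascending, counter descending) already matches the requested sort.
console.log(JSON.stringify(plan.queryPlanner.winningPlan, null, 2));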

modules/module-mongodb-storage/test/src/storage_compacting.test.ts

Lines changed: 17 additions & 2 deletions
@@ -97,10 +97,25 @@ bucket_definitions:
     await populate(bucketStorage);
     const { checkpoint } = await bucketStorage.getCheckpoint();
 
-    await bucketStorage.populatePersistentChecksumCache({
+    // Default is to skip small buckets - should be a no-op
+    const result0 = await bucketStorage.populatePersistentChecksumCache({
+      maxOpId: checkpoint
+    });
+    expect(result0.buckets).toEqual(0);
+
+    // This should cache the checksums for the two buckets
+    const result1 = await bucketStorage.populatePersistentChecksumCache({
+      maxOpId: checkpoint,
+      minBucketChanges: 1
+    });
+    expect(result1.buckets).toEqual(2);
+
+    // This should be a no-op, as the checksums are already cached
+    const result2 = await bucketStorage.populatePersistentChecksumCache({
       maxOpId: checkpoint,
-      signal: new AbortController().signal
+      minBucketChanges: 1
     });
+    expect(result2.buckets).toEqual(0);
 
     const checksumAfter = await bucketStorage.getChecksums(checkpoint, ['by_user2["u1"]', 'by_user2["u2"]']);
     expect(checksumAfter.get('by_user2["u1"]')).toEqual({

modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts

Lines changed: 4 additions & 1 deletion
@@ -11,6 +11,8 @@ import {
   LastValueSink,
   maxLsn,
   PartialChecksum,
+  PopulateChecksumCacheOptions,
+  PopulateChecksumCacheResults,
   ReplicationCheckpoint,
   storage,
   utils,
@@ -112,8 +114,9 @@ export class PostgresSyncRulesStorage
     return new PostgresCompactor(this.db, this.group_id, options).compact();
   }
 
-  async populatePersistentChecksumCache(options: Pick<CompactOptions, 'signal' | 'maxOpId'>): Promise<void> {
+  async populatePersistentChecksumCache(options: PopulateChecksumCacheOptions): Promise<PopulateChecksumCacheResults> {
     // no-op - checksum cache is not implemented for Postgres yet
+    return { buckets: 0 };
   }
 
   lastWriteCheckpoint(filters: storage.SyncStorageLastWriteCheckpointFilters): Promise<bigint | null> {

packages/service-core/src/entry/commands/compact-action.ts

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ const COMPACT_MEMORY_LIMIT_MB = Math.min(HEAP_LIMIT / 1024 / 1024 - 128, 1024);
 export function registerCompactAction(program: Command) {
   const compactCommand = program
     .command(COMMAND_NAME)
-    .option(`-b, --buckets [buckets]`, 'Bucket or bucket definition name (optional, comma-separate multiple names)');
+    .option(`-b, --buckets [buckets]`, 'Bucket name (optional, comma-separate multiple names)');
 
   wrapConfigCommand(compactCommand);
