From 97ec1d54a411f4ead3aa92e30c17cddfb9986375 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 28 Aug 2025 14:59:39 +0200 Subject: [PATCH 01/11] Optimize checksums calculations after initial replication. --- .../storage/implementation/MongoCompactor.ts | 96 +++++++- .../implementation/MongoSyncBucketStorage.ts | 213 +++++++++++++----- .../src/storage/implementation/util.ts | 43 +++- packages/service-core/src/util/utils.ts | 28 ++- 4 files changed, 311 insertions(+), 69 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts index f6bf44f2c..4262efb3b 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts @@ -1,11 +1,12 @@ -import { mongo } from '@powersync/lib-service-mongodb'; +import { mongo, MONGO_OPERATION_TIMEOUT_MS } from '@powersync/lib-service-mongodb'; import { logger, ReplicationAssertionError, ServiceAssertionError } from '@powersync/lib-services-framework'; -import { addChecksums, InternalOpId, storage, utils } from '@powersync/service-core'; +import { addChecksums, InternalOpId, isPartialChecksum, storage, utils } from '@powersync/service-core'; import { PowerSyncMongo } from './db.js'; import { BucketDataDocument, BucketDataKey, BucketStateDocument } from './models.js'; import { cacheKey } from './OperationBatch.js'; import { readSingleBatch } from './util.js'; +import { MongoSyncBucketStorage } from './MongoSyncBucketStorage.js'; interface CurrentBucketState { /** Bucket name */ @@ -68,12 +69,14 @@ export class MongoCompactor { private maxOpId: bigint; private buckets: string[] | undefined; private signal?: AbortSignal; + private group_id: number; constructor( + private storage: MongoSyncBucketStorage, private db: PowerSyncMongo, - private group_id: number, options?: MongoCompactOptions ) { + this.group_id = storage.group_id; this.idLimitBytes = (options?.memoryLimitMB ?? DEFAULT_MEMORY_LIMIT_MB) * 1024 * 1024; this.moveBatchLimit = options?.moveBatchLimit ?? DEFAULT_MOVE_BATCH_LIMIT; this.moveBatchQueryLimit = options?.moveBatchQueryLimit ?? DEFAULT_MOVE_BATCH_QUERY_LIMIT; @@ -475,4 +478,91 @@ export class MongoCompactor { await session.endSession(); } } + + /** + * Subset of compact, only populating checksums where relevant. + */ + async populateChecksums() { + let lastId: BucketStateDocument['_id'] | null = null; + while (!this.signal?.aborted) { + // By filtering buckets, we effectively make this "resumeable". 
+ let filter: mongo.Filter = { + compacted_state: { $exists: false } + }; + if (lastId) { + filter._id = { $gt: lastId }; + } + + const bucketsWithoutChecksums = await this.db.bucket_state + .find(filter, { + projection: { + _id: 1 + }, + sort: { + _id: 1 + }, + limit: 5_000, + maxTimeMS: MONGO_OPERATION_TIMEOUT_MS + }) + .toArray(); + if (bucketsWithoutChecksums.length == 0) { + // All done + break; + } + + logger.info(`Calculating checksums for batch of ${bucketsWithoutChecksums.length} buckets`); + + await this.updateChecksumsBatch(bucketsWithoutChecksums.map((b) => b._id.b)); + + lastId = bucketsWithoutChecksums[bucketsWithoutChecksums.length - 1]._id; + } + } + + private async updateChecksumsBatch(buckets: string[]) { + const checksums = await this.storage.queryPartialChecksums( + buckets.map((bucket) => { + return { + bucket, + end: this.maxOpId + }; + }) + ); + + for (let bucketChecksum of checksums.values()) { + if (isPartialChecksum(bucketChecksum)) { + // Should never happen since we don't specify `start` + throw new ServiceAssertionError(`Full checksum expected for bucket ${bucketChecksum.bucket}`); + } + + this.bucketStateUpdates.push({ + updateOne: { + filter: { + _id: { + g: this.group_id, + b: bucketChecksum.bucket + } + }, + update: { + $set: { + compacted_state: { + op_id: this.maxOpId, + count: bucketChecksum.count, + checksum: BigInt(bucketChecksum.checksum), + bytes: 0 // We don't calculate that here + } + }, + $setOnInsert: { + // Only set this if we're creating the document. + // In all other cases, the replication process will have a set a more accurate id. + last_op: this.maxOpId + } + }, + // We generally expect this to have been created before, but do handle cases of old unchanged buckets + upsert: true + } + }); + } + + await this.flush(); + } } diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index 781fc3cc8..09f7b0c3d 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -2,11 +2,14 @@ import * as lib_mongo from '@powersync/lib-service-mongodb'; import { mongo } from '@powersync/lib-service-mongodb'; import { BaseObserver, + DatabaseQueryError, + ErrorCode, logger, ReplicationAbortedError, ServiceAssertionError } from '@powersync/lib-services-framework'; import { + addBucketChecksums, addPartialChecksums, BroadcastIterable, BucketChecksum, @@ -19,6 +22,7 @@ import { internalToExternalOpId, maxLsn, PartialChecksum, + PartialOrFullChecksum, ProtocolOpId, ReplicationCheckpoint, storage, @@ -37,7 +41,14 @@ import { MongoBucketBatch } from './MongoBucketBatch.js'; import { MongoCompactor } from './MongoCompactor.js'; import { MongoParameterCompactor } from './MongoParameterCompactor.js'; import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js'; -import { idPrefixFilter, mapOpEntry, readSingleBatch, setSessionSnapshotTime } from './util.js'; +import { + CHECKSUM_QUERY_GROUP_STAGE, + checksumFromAggregate, + idPrefixFilter, + mapOpEntry, + readSingleBatch, + setSessionSnapshotTime +} from './util.js'; export class MongoSyncBucketStorage extends BaseObserver @@ -538,8 +549,7 @@ export class MongoSyncBucketStorage } } - const filters: any[] = []; - for (let request of batch) { + const mappedRequests = batch.map((request) => { let start = request.start; if (start == null) { const preState = 
preStates.get(request.bucket); @@ -547,13 +557,51 @@ export class MongoSyncBucketStorage start = preState.opId; } } + return { + ...request, + start + }; + }); + + const queriedChecksums = await this.queryPartialChecksums(mappedRequests); + + return new Map( + batch.map((request) => { + const bucket = request.bucket; + // Could be null if this is either (1) a partial request, or (2) no compacted checksum was available + const preState = preStates.get(bucket); + // Could be null if we got no data + const partialChecksum = queriedChecksums.get(bucket); + const merged = addPartialChecksums(bucket, preState?.checksum ?? null, partialChecksum ?? null); + return [bucket, merged]; + }) + ); + } + + async queryPartialChecksums(batch: storage.FetchPartialBucketChecksum[]): Promise { + try { + return await this.queryPartialChecksumsInternal(batch); + } catch (e) { + if (e.codeName == 'MaxTimeMSExpired') { + // Timeout - try the slower but more robust version + return await this.queryPartialChecksumsFallback(batch); + } + throw lib_mongo.mapQueryError(e, 'while reading checksums'); + } + } + + private async queryPartialChecksumsInternal( + batch: storage.FetchPartialBucketChecksum[] + ): Promise { + const filters: any[] = []; + for (let request of batch) { filters.push({ _id: { $gt: { g: this.group_id, b: request.bucket, - o: start ?? new bson.MinKey() + o: request.start ?? new bson.MinKey() }, $lte: { g: this.group_id, @@ -564,6 +612,20 @@ export class MongoSyncBucketStorage }); } + const group = { + _id: '$_id.b', + // Historically, checksum may be stored as 'int' or 'double'. + // More recently, this should be a 'long'. + // $toLong ensures that we always sum it as a long, avoiding inaccuracies in the calculations. + checksum_total: { $sum: { $toLong: '$checksum' } }, + count: { $sum: 1 }, + has_clear_op: { + $max: { + $cond: [{ $eq: ['$op', 'CLEAR'] }, 1, 0] + } + } + }; + const aggregate = await this.db.bucket_data .aggregate( [ @@ -573,63 +635,105 @@ export class MongoSyncBucketStorage } }, { - $group: { - _id: '$_id.b', - // Historically, checksum may be stored as 'int' or 'double'. - // More recently, this should be a 'long'. - // $toLong ensures that we always sum it as a long, avoiding inaccuracies in the calculations. - checksum_total: { $sum: { $toLong: '$checksum' } }, - count: { $sum: 1 }, - has_clear_op: { - $max: { - $cond: [{ $eq: ['$op', 'CLEAR'] }, 1, 0] - } - } - } + $group: group } ], { session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.db.MONGO_CHECKSUM_TIMEOUT_MS } ) .toArray() - .catch((e) => { - throw lib_mongo.mapQueryError(e, 'while reading checksums'); + .catch((err) => { + throw lib_mongo.mapQueryError(err, 'while reading checksums'); }); const partialChecksums = new Map( aggregate.map((doc) => { - const partialChecksum = Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff; const bucket = doc._id; - return [ - bucket, - doc.has_clear_op == 1 - ? 
({ - // full checksum - replaces any previous one - bucket, - checksum: partialChecksum, - count: doc.count - } satisfies BucketChecksum) - : ({ - // partial checksum - is added to a previous one - bucket, - partialCount: doc.count, - partialChecksum - } satisfies PartialChecksum) - ]; + return [bucket, checksumFromAggregate(doc)]; }) ); - return new Map( - batch.map((request) => { - const bucket = request.bucket; - // Could be null if this is either (1) a partial request, or (2) no compacted checksum was available - const preState = preStates.get(bucket); - // Could be null if we got no data - const partialChecksum = partialChecksums.get(bucket); - const merged = addPartialChecksums(bucket, preState?.checksum ?? null, partialChecksum ?? null); + return partialChecksums; + } - return [bucket, merged]; - }) - ); + /** + * Checksums for large buckets can run over the query timeout. + * To avoid this, we query in batches. + * This version can handle larger amounts of data, but is slower, especially for many buckets. + */ + async queryPartialChecksumsFallback( + batch: storage.FetchPartialBucketChecksum[] + ): Promise { + const partialChecksums = new Map(); + for (let request of batch) { + const checksum = await this.slowChecksum(request); + partialChecksums.set(request.bucket, checksum); + } + + return partialChecksums; + } + + private async slowChecksum(request: storage.FetchPartialBucketChecksum): Promise { + const batchLimit = 100_000; + + let lowerBound = 0n; + const bucket = request.bucket; + + let runningChecksum: PartialOrFullChecksum = { + bucket, + partialCount: 0, + partialChecksum: 0 + }; + if (request.start == null) { + runningChecksum = { + bucket, + count: 0, + checksum: 0 + }; + } + + while (true) { + const filter = { + _id: { + $gt: { + g: this.group_id, + b: bucket, + o: lowerBound + }, + $lte: { + g: this.group_id, + b: bucket, + o: request.end + } + } + }; + const docs = await this.db.bucket_data + .aggregate( + [ + { + $match: filter + }, + // sort and limit _before_ grouping + { $sort: { _id: 1 } }, + { $limit: batchLimit }, + CHECKSUM_QUERY_GROUP_STAGE + ], + { session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.MONGO_CHECKSUM_TIMEOUT_MS } + ) + .toArray(); + const doc = docs[0]; + if (doc == null) { + return runningChecksum; + } + const partial = checksumFromAggregate(doc); + runningChecksum = addPartialChecksums(bucket, runningChecksum, partial); + const isFinal = doc.count == batchLimit; + if (isFinal) { + break; + } else { + lowerBound = doc.last_op; + } + } + return runningChecksum; } async terminate(options?: storage.TerminateOptions) { @@ -779,22 +883,25 @@ export class MongoSyncBucketStorage const checkpoint = await this.getCheckpointInternal(); maxOpId = checkpoint?.checkpoint ?? undefined; } - await new MongoCompactor(this.db, this.group_id, { ...options, maxOpId }).compact(); + await new MongoCompactor(this, this.db, { ...options, maxOpId }).compact(); + if (maxOpId != null && options?.compactParameterData) { await new MongoParameterCompactor(this.db, this.group_id, maxOpId, options).compact(); } } - async populatePersistentChecksumCache(options: Pick): Promise { + async populatePersistentChecksumCache(options: Required>): Promise { + logger.info(`Populating persistent checksum cache...`); const start = Date.now(); - // We do a minimal compact, primarily to populate the checksum cache - await this.compact({ + // We do a minimal compact here. + // We can optimize this in the future. 
+ const compactor = new MongoCompactor(this, this.db, { ...options, - // Skip parameter data - compactParameterData: false, // Don't track updates for MOVE compacting memoryLimitMB: 0 }); + + await compactor.populateChecksums(); const duration = Date.now() - start; logger.info(`Populated persistent checksum cache in ${(duration / 1000).toFixed(1)}s`); } diff --git a/modules/module-mongodb-storage/src/storage/implementation/util.ts b/modules/module-mongodb-storage/src/storage/implementation/util.ts index 9cf5eabd6..9f80fb146 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/util.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/util.ts @@ -3,7 +3,7 @@ import * as crypto from 'crypto'; import * as uuid from 'uuid'; import { mongo } from '@powersync/lib-service-mongodb'; -import { storage, utils } from '@powersync/service-core'; +import { BucketChecksum, PartialChecksum, PartialOrFullChecksum, storage, utils } from '@powersync/service-core'; import { PowerSyncMongo } from './db.js'; import { BucketDataDocument } from './models.js'; @@ -130,3 +130,44 @@ export function setSessionSnapshotTime(session: mongo.ClientSession, time: bson. throw new ServiceAssertionError(`Session snapshotTime is already set`); } } + +export const CHECKSUM_QUERY_GROUP_STAGE = { + $group: { + _id: '$_id.b', + // Historically, checksum may be stored as 'int' or 'double'. + // More recently, this should be a 'long'. + // $toLong ensures that we always sum it as a long, avoiding inaccuracies in the calculations. + checksum_total: { $sum: { $toLong: '$checksum' } }, + count: { $sum: 1 }, + has_clear_op: { + $max: { + $cond: [{ $eq: ['$op', 'CLEAR'] }, 1, 0] + } + }, + last_op: { $max: '$_id.o' } + } +}; + +/** + * Convert output of CHECKSUM_QUERY_GROUP_STAGE into a checksum. 
+ */ +export function checksumFromAggregate(doc: bson.Document): PartialOrFullChecksum { + const partialChecksum = Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff; + const bucket = doc._id; + + if (doc.has_clear_op == 1) { + return { + // full checksum - replaces any previous one + bucket, + checksum: partialChecksum, + count: doc.count + } satisfies BucketChecksum; + } else { + return { + // partial checksum - is added to a previous one + bucket, + partialCount: doc.count, + partialChecksum + } satisfies PartialChecksum; + } +} diff --git a/packages/service-core/src/util/utils.ts b/packages/service-core/src/util/utils.ts index 85b9e2c75..411f9b49d 100644 --- a/packages/service-core/src/util/utils.ts +++ b/packages/service-core/src/util/utils.ts @@ -113,26 +113,30 @@ export function addBucketChecksums(a: BucketChecksum, b: PartialChecksum | Bucke export function addPartialChecksums( bucket: string, - a: BucketChecksum | null, + a: PartialChecksum | BucketChecksum | null, b: PartialChecksum | BucketChecksum | null ): PartialChecksum | BucketChecksum { if (a != null && b != null) { if (!isPartialChecksum(b)) { - // Replaces preState + // Replaces a return b; } // merge - return { - bucket, - checksum: addChecksums(a.checksum, b.partialChecksum), - count: a.count + b.partialCount - }; + if (!isPartialChecksum(a)) { + return { + bucket, + checksum: addChecksums(a.checksum, b.partialChecksum), + count: a.count + b.partialCount + }; + } else { + return { + bucket, + partialChecksum: addChecksums(a.partialChecksum, b.partialChecksum), + partialCount: a.partialCount + b.partialCount + }; + } } else if (a != null) { - return { - bucket, - checksum: a.checksum, - count: a.count - }; + return a; } else if (b != null) { return b; } else { From 32307ec1cb2da416d74883edea7562bb83716eec Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 28 Aug 2025 15:19:22 +0200 Subject: [PATCH 02/11] Fix partial checksum issue. 
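A full-range checksum request (one with no `start` bound) is expected to produce a full
BucketChecksum, but the aggregation added in the previous commit returned a PartialChecksum
whenever the bucket contained no CLEAR operation, and buckets with no matching rows were
missing from the result entirely. Consumers such as the compactor's populateChecksums()
assert that a full checksum comes back in that case. This commit fills in a zero entry for
empty buckets and promotes a partial result to a full checksum when no `start` was given.

The sketch below illustrates the intended merge semantics with simplified local types; it is
not the service-core implementation. addChecksums32 is a stand-in for addChecksums and is
assumed to be 32-bit wrapping addition, matching the `& 0xffffffff` handling used by the
aggregation code in this series.

// Simplified sketch of the checksum merge semantics (reduced types; the real
// types and addChecksums live in @powersync/service-core).
interface FullChecksum {
  bucket: string;
  count: number;
  checksum: number;
}

interface PartialChecksum {
  bucket: string;
  partialCount: number;
  partialChecksum: number;
}

type AnyChecksum = FullChecksum | PartialChecksum;

function isPartial(c: AnyChecksum): c is PartialChecksum {
  return 'partialChecksum' in c;
}

// Assumption: checksums add modulo 2^32, with a signed 32-bit result.
function addChecksums32(a: number, b: number): number {
  return (a + b) & 0xffffffff;
}

// A full checksum replaces whatever came before it (e.g. after a CLEAR op);
// a partial checksum is added onto the previous state.
function merge(base: AnyChecksum | null, next: AnyChecksum | null): AnyChecksum | null {
  if (next == null) return base;
  if (base == null || !isPartial(next)) return next;
  if (isPartial(base)) {
    return {
      bucket: base.bucket,
      partialCount: base.partialCount + next.partialCount,
      partialChecksum: addChecksums32(base.partialChecksum, next.partialChecksum)
    };
  }
  return {
    bucket: base.bucket,
    count: base.count + next.partialCount,
    checksum: addChecksums32(base.checksum, next.partialChecksum)
  };
}

// When the query covered the bucket from its first op (start == null), a
// partial result is really a full checksum and can be promoted.
function promoteFullRange(c: PartialChecksum): FullChecksum {
  return { bucket: c.bucket, count: c.partialCount, checksum: c.partialChecksum };
}
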
--- .../storage/implementation/MongoCompactor.ts | 4 +-- .../implementation/MongoSyncBucketStorage.ts | 25 ++++++++++++++++++- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts index 4262efb3b..e5b0f8a74 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts @@ -4,9 +4,9 @@ import { addChecksums, InternalOpId, isPartialChecksum, storage, utils } from '@ import { PowerSyncMongo } from './db.js'; import { BucketDataDocument, BucketDataKey, BucketStateDocument } from './models.js'; +import { MongoSyncBucketStorage } from './MongoSyncBucketStorage.js'; import { cacheKey } from './OperationBatch.js'; import { readSingleBatch } from './util.js'; -import { MongoSyncBucketStorage } from './MongoSyncBucketStorage.js'; interface CurrentBucketState { /** Bucket name */ @@ -531,7 +531,7 @@ export class MongoCompactor { for (let bucketChecksum of checksums.values()) { if (isPartialChecksum(bucketChecksum)) { // Should never happen since we don't specify `start` - throw new ServiceAssertionError(`Full checksum expected for bucket ${bucketChecksum.bucket}`); + throw new ServiceAssertionError(`Full checksum expected, got ${JSON.stringify(bucketChecksum)}`); } this.bucketStateUpdates.push({ diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index 09f7b0c3d..22fb8015a 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -20,6 +20,7 @@ import { GetCheckpointChangesOptions, InternalOpId, internalToExternalOpId, + isPartialChecksum, maxLsn, PartialChecksum, PartialOrFullChecksum, @@ -652,7 +653,29 @@ export class MongoSyncBucketStorage }) ); - return partialChecksums; + return new Map( + batch.map((request) => { + const bucket = request.bucket; + // Could be null if we got no data + let partialChecksum = partialChecksums.get(bucket); + if (partialChecksum == null) { + partialChecksum = { + bucket, + partialCount: 0, + partialChecksum: 0 + }; + } + if (request.start == null && isPartialChecksum(partialChecksum)) { + partialChecksum = { + bucket, + count: partialChecksum.partialChecksum, + checksum: partialChecksum.partialChecksum + }; + } + + return [bucket, partialChecksum]; + }) + ); } /** From 0b13b250b6636050ea9f7bfc196bc49a2e329b7f Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 28 Aug 2025 15:22:06 +0200 Subject: [PATCH 03/11] Reduce checksum query time limits again. --- libs/lib-mongodb/src/db/mongo.ts | 10 ++-------- .../storage/implementation/MongoSyncBucketStorage.ts | 1 + 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/libs/lib-mongodb/src/db/mongo.ts b/libs/lib-mongodb/src/db/mongo.ts index 6bd1f1a79..0f8f9dffd 100644 --- a/libs/lib-mongodb/src/db/mongo.ts +++ b/libs/lib-mongodb/src/db/mongo.ts @@ -9,11 +9,8 @@ export const MONGO_CONNECT_TIMEOUT_MS = 10_000; /** * Time for individual requests to timeout the socket. - * - * Currently increased to cater for slow checksum calculations - may be reduced to 60s again - * if we optimize those. 
*/ -export const MONGO_SOCKET_TIMEOUT_MS = 90_000; +export const MONGO_SOCKET_TIMEOUT_MS = 60_000; /** * Time for individual requests to timeout the operation. @@ -30,11 +27,8 @@ export const MONGO_OPERATION_TIMEOUT_MS = 40_000; * This is time spent on the cursor, not total time. * * Must be less than MONGO_SOCKET_TIMEOUT_MS to ensure proper error handling. - * - * This is temporarily increased to cater for slow checksum calculations, - * may be reduced to MONGO_OPERATION_TIMEOUT_MS again if we optimize those. */ -export const MONGO_CHECKSUM_TIMEOUT_MS = 80_000; +export const MONGO_CHECKSUM_TIMEOUT_MS = 50_000; /** * Same as above, but specifically for clear operations. diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index 22fb8015a..f2469b7c9 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -585,6 +585,7 @@ export class MongoSyncBucketStorage return await this.queryPartialChecksumsInternal(batch); } catch (e) { if (e.codeName == 'MaxTimeMSExpired') { + logger.warn(`Checksum query timed out; falling back to slower version`, e); // Timeout - try the slower but more robust version return await this.queryPartialChecksumsFallback(batch); } From c6fb6f06e75aedec741759835721106129694a7a Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 28 Aug 2025 15:30:30 +0200 Subject: [PATCH 04/11] Fixes. --- .../implementation/MongoSyncBucketStorage.ts | 30 ++++--------------- 1 file changed, 6 insertions(+), 24 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index f2469b7c9..f8b913671 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -614,20 +614,6 @@ export class MongoSyncBucketStorage }); } - const group = { - _id: '$_id.b', - // Historically, checksum may be stored as 'int' or 'double'. - // More recently, this should be a 'long'. - // $toLong ensures that we always sum it as a long, avoiding inaccuracies in the calculations. 
- checksum_total: { $sum: { $toLong: '$checksum' } }, - count: { $sum: 1 }, - has_clear_op: { - $max: { - $cond: [{ $eq: ['$op', 'CLEAR'] }, 1, 0] - } - } - }; - const aggregate = await this.db.bucket_data .aggregate( [ @@ -636,16 +622,12 @@ export class MongoSyncBucketStorage $or: filters } }, - { - $group: group - } + CHECKSUM_QUERY_GROUP_STAGE ], - { session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.db.MONGO_CHECKSUM_TIMEOUT_MS } + { session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.MONGO_CHECKSUM_TIMEOUT_MS } ) - .toArray() - .catch((err) => { - throw lib_mongo.mapQueryError(err, 'while reading checksums'); - }); + // Don't map the error here - we want to keep timeout errors as-is + .toArray(); const partialChecksums = new Map( aggregate.map((doc) => { @@ -669,7 +651,7 @@ export class MongoSyncBucketStorage if (request.start == null && isPartialChecksum(partialChecksum)) { partialChecksum = { bucket, - count: partialChecksum.partialChecksum, + count: partialChecksum.partialCount, checksum: partialChecksum.partialChecksum }; } @@ -750,7 +732,7 @@ export class MongoSyncBucketStorage } const partial = checksumFromAggregate(doc); runningChecksum = addPartialChecksums(bucket, runningChecksum, partial); - const isFinal = doc.count == batchLimit; + const isFinal = doc.count != batchLimit; if (isFinal) { break; } else { From 7d16815d2e10b0a847f35f8749c104e259451cca Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 28 Aug 2025 15:31:35 +0200 Subject: [PATCH 05/11] Further reduce batch limit. --- .../src/storage/implementation/MongoSyncBucketStorage.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index f8b913671..6bf01c681 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -679,7 +679,7 @@ export class MongoSyncBucketStorage } private async slowChecksum(request: storage.FetchPartialBucketChecksum): Promise { - const batchLimit = 100_000; + const batchLimit = 50_000; let lowerBound = 0n; const bucket = request.bucket; From 92ce5b3d7e9ffd9d6da6a970c3ab448cd9b135ad Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 28 Aug 2025 15:36:24 +0200 Subject: [PATCH 06/11] Fix "Replication error Unable to do postgres query on ended connection". --- modules/module-postgres/src/replication/WalStream.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 3813caa0a..b5e26db4f 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -840,9 +840,14 @@ WHERE oid = $1::regclass`, try { // If anything errors here, the entire replication process is halted, and // all connections automatically closed, including this one. 
- const replicationConnection = await this.connections.replicationConnection(); - await this.initReplication(replicationConnection); - await this.streamChanges(replicationConnection); + const initReplicationConnection = await this.connections.replicationConnection(); + await this.initReplication(initReplicationConnection); + await initReplicationConnection.end(); + + // At this point, the above connection has often timed out, so we start a new one + const streamReplicationConnection = await this.connections.replicationConnection(); + await this.streamChanges(streamReplicationConnection); + await streamReplicationConnection.end(); } catch (e) { await this.storage.reportError(e); throw e; From d74d874c157662b98f0c7b5fd257cd33ce29452d Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 28 Aug 2025 15:41:59 +0200 Subject: [PATCH 07/11] Changeset. --- .changeset/chilly-melons-check.md | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 .changeset/chilly-melons-check.md diff --git a/.changeset/chilly-melons-check.md b/.changeset/chilly-melons-check.md new file mode 100644 index 000000000..1f77e4d1e --- /dev/null +++ b/.changeset/chilly-melons-check.md @@ -0,0 +1,9 @@ +--- +'@powersync/service-module-mongodb-storage': patch +'@powersync/service-module-postgres': patch +'@powersync/service-core': patch +'@powersync/lib-service-mongodb': patch +'@powersync/service-image': patch +--- + +Fix pre-computing of checksums after intial replication causing replication timeouts From 8e4fddc93c699855961d6f45b4a7113333cb234d Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Fri, 29 Aug 2025 11:21:54 +0200 Subject: [PATCH 08/11] Fix compact performance issues. --- .../storage/implementation/MongoCompactor.ts | 109 +++++++++--------- .../src/storage/implementation/models.ts | 2 +- 2 files changed, 56 insertions(+), 55 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts index e5b0f8a74..4f85e3c6b 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts @@ -139,33 +139,57 @@ export class MongoCompactor { o: new mongo.MaxKey() as any }; + const doneWithBucket = async () => { + // Free memory before clearing bucket + if (currentState == null) { + return; + } + currentState.seen.clear(); + if (currentState.lastNotPut != null && currentState.opsSincePut >= 1) { + logger.info( + `Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations` + ); + // Need flush() before clear() + await this.flush(); + await this.clearBucket(currentState); + } + + // Do this _after_ clearBucket so that we have accurate counts. 
+ this.updateBucketChecksums(currentState); + }; + while (!this.signal?.aborted) { // Query one batch at a time, to avoid cursor timeouts - const cursor = this.db.bucket_data.aggregate([ - { - $match: { - _id: { - $gte: lowerBound, - $lt: upperBound + const cursor = this.db.bucket_data.aggregate( + [ + { + $match: { + _id: { + $gte: lowerBound, + $lt: upperBound + } + } + }, + { $sort: { _id: -1 } }, + { $limit: this.moveBatchQueryLimit }, + { + $project: { + _id: 1, + op: 1, + table: 1, + row_id: 1, + source_table: 1, + source_key: 1, + checksum: 1, + size: { $bsonSize: '$$ROOT' } } } - }, - { $sort: { _id: -1 } }, - { $limit: this.moveBatchQueryLimit }, - { - $project: { - _id: 1, - op: 1, - table: 1, - row_id: 1, - source_table: 1, - source_key: 1, - checksum: 1, - size: { $bsonSize: '$$ROOT' } - } - } - ]); - const { data: batch } = await readSingleBatch(cursor); + ], + { batchSize: this.moveBatchQueryLimit } + ); + // We don't limit to a single batch here, since that often causes MongoDB to scan through more than it returns. + // Instead, we load up to the limit. + const batch = await cursor.toArray(); if (batch.length == 0) { // We've reached the end @@ -177,24 +201,8 @@ export class MongoCompactor { for (let doc of batch) { if (currentState == null || doc._id.b != currentState.bucket) { - if (currentState != null) { - if (currentState.lastNotPut != null && currentState.opsSincePut >= 1) { - // Important to flush before clearBucket() - // Does not have to happen before flushBucketChecksums() - await this.flush(); - logger.info( - `Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations` - ); - - // Free memory before clearing bucket - currentState!.seen.clear(); - - await this.clearBucket(currentState); - } + doneWithBucket(); - // Should happen after clearBucket() for accurate stats - this.updateBucketChecksums(currentState); - } currentState = { bucket: doc._id.b, seen: new Map(), @@ -277,21 +285,14 @@ export class MongoCompactor { await this.flush(); } } - } - currentState?.seen.clear(); - if (currentState?.lastNotPut != null && currentState?.opsSincePut > 1) { - logger.info( - `Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations` - ); - // Need flush() before clear() - await this.flush(); - await this.clearBucket(currentState); - } - if (currentState != null) { - // Do this _after_ clearBucket so that we have accurate counts. 
- this.updateBucketChecksums(currentState); + if (currentState != null) { + logger.info(`Processed batch of length ${batch.length} current bucket: ${currentState.bucket}`); + } } + + doneWithBucket(); + // Need another flush after updateBucketChecksums() await this.flush(); } @@ -548,7 +549,7 @@ export class MongoCompactor { op_id: this.maxOpId, count: bucketChecksum.count, checksum: BigInt(bucketChecksum.checksum), - bytes: 0 // We don't calculate that here + bytes: null } }, $setOnInsert: { diff --git a/modules/module-mongodb-storage/src/storage/implementation/models.ts b/modules/module-mongodb-storage/src/storage/implementation/models.ts index b877f7f98..998deec2c 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/models.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/models.ts @@ -106,7 +106,7 @@ export interface BucketStateDocument { op_id: InternalOpId; count: number; checksum: bigint; - bytes: number; + bytes: number | null; }; estimate_since_compact?: { From 0911bf840e258f4de50144a890f41d13b51f2777 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Fri, 29 Aug 2025 11:29:48 +0200 Subject: [PATCH 09/11] Refactor checksums queries into a separate class. --- .../storage/implementation/MongoChecksums.ts | 320 ++++++++++++++++++ .../storage/implementation/MongoCompactor.ts | 2 +- .../implementation/MongoSyncBucketStorage.ts | 261 +------------- .../src/storage/implementation/util.ts | 41 --- 4 files changed, 327 insertions(+), 297 deletions(-) create mode 100644 modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts new file mode 100644 index 000000000..ef7c41c03 --- /dev/null +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts @@ -0,0 +1,320 @@ +import { + addPartialChecksums, + bson, + BucketChecksum, + ChecksumCache, + ChecksumMap, + FetchPartialBucketChecksum, + InternalOpId, + isPartialChecksum, + PartialChecksum, + PartialChecksumMap, + PartialOrFullChecksum +} from '@powersync/service-core'; +import * as lib_mongo from '@powersync/lib-service-mongodb'; +import { logger } from '@powersync/lib-services-framework'; +import { PowerSyncMongo } from './db.js'; + +/** + * Checksum query implementation. + */ +export class MongoChecksums { + private cache = new ChecksumCache({ + fetchChecksums: (batch) => { + return this.getChecksumsInternal(batch); + } + }); + + constructor( + private db: PowerSyncMongo, + private group_id: number + ) {} + + /** + * Calculate checksums, utilizing the cache. + */ + async getChecksums(checkpoint: InternalOpId, buckets: string[]): Promise { + return this.cache.getChecksumMap(checkpoint, buckets); + } + + clearCache() { + this.cache.clear(); + } + + /** + * Calculate (partial) checksums from bucket_state and the data collection. + * + * Results are not cached. + */ + private async getChecksumsInternal(batch: FetchPartialBucketChecksum[]): Promise { + if (batch.length == 0) { + return new Map(); + } + + const preFilters: any[] = []; + for (let request of batch) { + if (request.start == null) { + preFilters.push({ + _id: { + g: this.group_id, + b: request.bucket + }, + 'compacted_state.op_id': { $exists: true, $lte: request.end } + }); + } + } + + const preStates = new Map(); + + if (preFilters.length > 0) { + // For un-cached bucket checksums, attempt to use the compacted state first. 
+ const states = await this.db.bucket_state + .find({ + $or: preFilters + }) + .toArray(); + for (let state of states) { + const compactedState = state.compacted_state!; + preStates.set(state._id.b, { + opId: compactedState.op_id, + checksum: { + bucket: state._id.b, + checksum: Number(compactedState.checksum), + count: compactedState.count + } + }); + } + } + + const mappedRequests = batch.map((request) => { + let start = request.start; + if (start == null) { + const preState = preStates.get(request.bucket); + if (preState != null) { + start = preState.opId; + } + } + return { + ...request, + start + }; + }); + + const queriedChecksums = await this.queryPartialChecksums(mappedRequests); + + return new Map( + batch.map((request) => { + const bucket = request.bucket; + // Could be null if this is either (1) a partial request, or (2) no compacted checksum was available + const preState = preStates.get(bucket); + // Could be null if we got no data + const partialChecksum = queriedChecksums.get(bucket); + const merged = addPartialChecksums(bucket, preState?.checksum ?? null, partialChecksum ?? null); + + return [bucket, merged]; + }) + ); + } + + /** + * Calculate (partial) checksums from the data collection directly. + */ + async queryPartialChecksums(batch: FetchPartialBucketChecksum[]): Promise { + try { + return await this.queryPartialChecksumsInternal(batch); + } catch (e) { + if (e.codeName == 'MaxTimeMSExpired') { + logger.warn(`Checksum query timed out; falling back to slower version`, e); + // Timeout - try the slower but more robust version + return await this.queryPartialChecksumsFallback(batch); + } + throw lib_mongo.mapQueryError(e, 'while reading checksums'); + } + } + + private async queryPartialChecksumsInternal(batch: FetchPartialBucketChecksum[]): Promise { + const filters: any[] = []; + for (let request of batch) { + filters.push({ + _id: { + $gt: { + g: this.group_id, + b: request.bucket, + o: request.start ?? new bson.MinKey() + }, + $lte: { + g: this.group_id, + b: request.bucket, + o: request.end + } + } + }); + } + + const aggregate = await this.db.bucket_data + .aggregate( + [ + { + $match: { + $or: filters + } + }, + CHECKSUM_QUERY_GROUP_STAGE + ], + { session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.MONGO_CHECKSUM_TIMEOUT_MS } + ) + // Don't map the error here - we want to keep timeout errors as-is + .toArray(); + + const partialChecksums = new Map( + aggregate.map((doc) => { + const bucket = doc._id; + return [bucket, checksumFromAggregate(doc)]; + }) + ); + + return new Map( + batch.map((request) => { + const bucket = request.bucket; + // Could be null if we got no data + let partialChecksum = partialChecksums.get(bucket); + if (partialChecksum == null) { + partialChecksum = { + bucket, + partialCount: 0, + partialChecksum: 0 + }; + } + if (request.start == null && isPartialChecksum(partialChecksum)) { + partialChecksum = { + bucket, + count: partialChecksum.partialCount, + checksum: partialChecksum.partialChecksum + }; + } + + return [bucket, partialChecksum]; + }) + ); + } + + /** + * Checksums for large buckets can run over the query timeout. + * To avoid this, we query in batches. + * This version can handle larger amounts of data, but is slower, especially for many buckets. 
+ */ + async queryPartialChecksumsFallback(batch: FetchPartialBucketChecksum[]): Promise { + const partialChecksums = new Map(); + for (let request of batch) { + const checksum = await this.slowChecksum(request); + partialChecksums.set(request.bucket, checksum); + } + + return partialChecksums; + } + + private async slowChecksum(request: FetchPartialBucketChecksum): Promise { + const batchLimit = 50_000; + + let lowerBound = 0n; + const bucket = request.bucket; + + let runningChecksum: PartialOrFullChecksum = { + bucket, + partialCount: 0, + partialChecksum: 0 + }; + if (request.start == null) { + runningChecksum = { + bucket, + count: 0, + checksum: 0 + }; + } + + while (true) { + const filter = { + _id: { + $gt: { + g: this.group_id, + b: bucket, + o: lowerBound + }, + $lte: { + g: this.group_id, + b: bucket, + o: request.end + } + } + }; + const docs = await this.db.bucket_data + .aggregate( + [ + { + $match: filter + }, + // sort and limit _before_ grouping + { $sort: { _id: 1 } }, + { $limit: batchLimit }, + CHECKSUM_QUERY_GROUP_STAGE + ], + { session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.MONGO_CHECKSUM_TIMEOUT_MS } + ) + .toArray(); + const doc = docs[0]; + if (doc == null) { + return runningChecksum; + } + const partial = checksumFromAggregate(doc); + runningChecksum = addPartialChecksums(bucket, runningChecksum, partial); + const isFinal = doc.count != batchLimit; + if (isFinal) { + break; + } else { + lowerBound = doc.last_op; + } + } + return runningChecksum; + } +} + +const CHECKSUM_QUERY_GROUP_STAGE = { + $group: { + _id: '$_id.b', + // Historically, checksum may be stored as 'int' or 'double'. + // More recently, this should be a 'long'. + // $toLong ensures that we always sum it as a long, avoiding inaccuracies in the calculations. + checksum_total: { $sum: { $toLong: '$checksum' } }, + count: { $sum: 1 }, + has_clear_op: { + $max: { + $cond: [{ $eq: ['$op', 'CLEAR'] }, 1, 0] + } + }, + last_op: { $max: '$_id.o' } + } +}; + +/** + * Convert output of CHECKSUM_QUERY_GROUP_STAGE into a checksum. 
+ */ +function checksumFromAggregate(doc: bson.Document): PartialOrFullChecksum { + const partialChecksum = Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff; + const bucket = doc._id; + + if (doc.has_clear_op == 1) { + return { + // full checksum - replaces any previous one + bucket, + checksum: partialChecksum, + count: doc.count + } satisfies BucketChecksum; + } else { + return { + // partial checksum - is added to a previous one + bucket, + partialCount: doc.count, + partialChecksum + } satisfies PartialChecksum; + } +} diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts index 4f85e3c6b..3ca4dbc5c 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts @@ -520,7 +520,7 @@ export class MongoCompactor { } private async updateChecksumsBatch(buckets: string[]) { - const checksums = await this.storage.queryPartialChecksums( + const checksums = await this.storage.checksums.queryPartialChecksums( buckets.map((bucket) => { return { bucket, diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index 6bf01c681..e566620e9 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -2,17 +2,12 @@ import * as lib_mongo from '@powersync/lib-service-mongodb'; import { mongo } from '@powersync/lib-service-mongodb'; import { BaseObserver, - DatabaseQueryError, - ErrorCode, logger, ReplicationAbortedError, ServiceAssertionError } from '@powersync/lib-services-framework'; import { - addBucketChecksums, - addPartialChecksums, BroadcastIterable, - BucketChecksum, CHECKPOINT_INVALIDATE_ALL, CheckpointChanges, CompactOptions, @@ -20,10 +15,7 @@ import { GetCheckpointChangesOptions, InternalOpId, internalToExternalOpId, - isPartialChecksum, maxLsn, - PartialChecksum, - PartialOrFullChecksum, ProtocolOpId, ReplicationCheckpoint, storage, @@ -39,28 +31,18 @@ import { MongoBucketStorage } from '../MongoBucketStorage.js'; import { PowerSyncMongo } from './db.js'; import { BucketDataDocument, BucketDataKey, BucketStateDocument, SourceKey, SourceTableDocument } from './models.js'; import { MongoBucketBatch } from './MongoBucketBatch.js'; +import { MongoChecksums } from './MongoChecksums.js'; import { MongoCompactor } from './MongoCompactor.js'; import { MongoParameterCompactor } from './MongoParameterCompactor.js'; import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js'; -import { - CHECKSUM_QUERY_GROUP_STAGE, - checksumFromAggregate, - idPrefixFilter, - mapOpEntry, - readSingleBatch, - setSessionSnapshotTime -} from './util.js'; +import { idPrefixFilter, mapOpEntry, readSingleBatch, setSessionSnapshotTime } from './util.js'; export class MongoSyncBucketStorage extends BaseObserver implements storage.SyncRulesBucketStorage { private readonly db: PowerSyncMongo; - private checksumCache = new storage.ChecksumCache({ - fetchChecksums: (batch) => { - return this.getChecksumsInternal(batch); - } - }); + readonly checksums: MongoChecksums; private parsedSyncRulesCache: { parsed: SqlSyncRules; options: storage.ParseSyncRulesOptions } | undefined; private writeCheckpointAPI: MongoWriteCheckpointAPI; @@ -74,6 +56,7 @@ 
export class MongoSyncBucketStorage ) { super(); this.db = factory.db; + this.checksums = new MongoChecksums(this.db, this.group_id); this.writeCheckpointAPI = new MongoWriteCheckpointAPI({ db: this.db, mode: writeCheckpointMode, @@ -503,243 +486,11 @@ export class MongoSyncBucketStorage } async getChecksums(checkpoint: utils.InternalOpId, buckets: string[]): Promise { - return this.checksumCache.getChecksumMap(checkpoint, buckets); + return this.checksums.getChecksums(checkpoint, buckets); } clearChecksumCache() { - this.checksumCache.clear(); - } - - private async getChecksumsInternal(batch: storage.FetchPartialBucketChecksum[]): Promise { - if (batch.length == 0) { - return new Map(); - } - - const preFilters: any[] = []; - for (let request of batch) { - if (request.start == null) { - preFilters.push({ - _id: { - g: this.group_id, - b: request.bucket - }, - 'compacted_state.op_id': { $exists: true, $lte: request.end } - }); - } - } - - const preStates = new Map(); - - if (preFilters.length > 0) { - // For un-cached bucket checksums, attempt to use the compacted state first. - const states = await this.db.bucket_state - .find({ - $or: preFilters - }) - .toArray(); - for (let state of states) { - const compactedState = state.compacted_state!; - preStates.set(state._id.b, { - opId: compactedState.op_id, - checksum: { - bucket: state._id.b, - checksum: Number(compactedState.checksum), - count: compactedState.count - } - }); - } - } - - const mappedRequests = batch.map((request) => { - let start = request.start; - if (start == null) { - const preState = preStates.get(request.bucket); - if (preState != null) { - start = preState.opId; - } - } - return { - ...request, - start - }; - }); - - const queriedChecksums = await this.queryPartialChecksums(mappedRequests); - - return new Map( - batch.map((request) => { - const bucket = request.bucket; - // Could be null if this is either (1) a partial request, or (2) no compacted checksum was available - const preState = preStates.get(bucket); - // Could be null if we got no data - const partialChecksum = queriedChecksums.get(bucket); - const merged = addPartialChecksums(bucket, preState?.checksum ?? null, partialChecksum ?? null); - - return [bucket, merged]; - }) - ); - } - - async queryPartialChecksums(batch: storage.FetchPartialBucketChecksum[]): Promise { - try { - return await this.queryPartialChecksumsInternal(batch); - } catch (e) { - if (e.codeName == 'MaxTimeMSExpired') { - logger.warn(`Checksum query timed out; falling back to slower version`, e); - // Timeout - try the slower but more robust version - return await this.queryPartialChecksumsFallback(batch); - } - throw lib_mongo.mapQueryError(e, 'while reading checksums'); - } - } - - private async queryPartialChecksumsInternal( - batch: storage.FetchPartialBucketChecksum[] - ): Promise { - const filters: any[] = []; - for (let request of batch) { - filters.push({ - _id: { - $gt: { - g: this.group_id, - b: request.bucket, - o: request.start ?? 
new bson.MinKey() - }, - $lte: { - g: this.group_id, - b: request.bucket, - o: request.end - } - } - }); - } - - const aggregate = await this.db.bucket_data - .aggregate( - [ - { - $match: { - $or: filters - } - }, - CHECKSUM_QUERY_GROUP_STAGE - ], - { session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.MONGO_CHECKSUM_TIMEOUT_MS } - ) - // Don't map the error here - we want to keep timeout errors as-is - .toArray(); - - const partialChecksums = new Map( - aggregate.map((doc) => { - const bucket = doc._id; - return [bucket, checksumFromAggregate(doc)]; - }) - ); - - return new Map( - batch.map((request) => { - const bucket = request.bucket; - // Could be null if we got no data - let partialChecksum = partialChecksums.get(bucket); - if (partialChecksum == null) { - partialChecksum = { - bucket, - partialCount: 0, - partialChecksum: 0 - }; - } - if (request.start == null && isPartialChecksum(partialChecksum)) { - partialChecksum = { - bucket, - count: partialChecksum.partialCount, - checksum: partialChecksum.partialChecksum - }; - } - - return [bucket, partialChecksum]; - }) - ); - } - - /** - * Checksums for large buckets can run over the query timeout. - * To avoid this, we query in batches. - * This version can handle larger amounts of data, but is slower, especially for many buckets. - */ - async queryPartialChecksumsFallback( - batch: storage.FetchPartialBucketChecksum[] - ): Promise { - const partialChecksums = new Map(); - for (let request of batch) { - const checksum = await this.slowChecksum(request); - partialChecksums.set(request.bucket, checksum); - } - - return partialChecksums; - } - - private async slowChecksum(request: storage.FetchPartialBucketChecksum): Promise { - const batchLimit = 50_000; - - let lowerBound = 0n; - const bucket = request.bucket; - - let runningChecksum: PartialOrFullChecksum = { - bucket, - partialCount: 0, - partialChecksum: 0 - }; - if (request.start == null) { - runningChecksum = { - bucket, - count: 0, - checksum: 0 - }; - } - - while (true) { - const filter = { - _id: { - $gt: { - g: this.group_id, - b: bucket, - o: lowerBound - }, - $lte: { - g: this.group_id, - b: bucket, - o: request.end - } - } - }; - const docs = await this.db.bucket_data - .aggregate( - [ - { - $match: filter - }, - // sort and limit _before_ grouping - { $sort: { _id: 1 } }, - { $limit: batchLimit }, - CHECKSUM_QUERY_GROUP_STAGE - ], - { session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.MONGO_CHECKSUM_TIMEOUT_MS } - ) - .toArray(); - const doc = docs[0]; - if (doc == null) { - return runningChecksum; - } - const partial = checksumFromAggregate(doc); - runningChecksum = addPartialChecksums(bucket, runningChecksum, partial); - const isFinal = doc.count != batchLimit; - if (isFinal) { - break; - } else { - lowerBound = doc.last_op; - } - } - return runningChecksum; + this.checksums.clearCache(); } async terminate(options?: storage.TerminateOptions) { diff --git a/modules/module-mongodb-storage/src/storage/implementation/util.ts b/modules/module-mongodb-storage/src/storage/implementation/util.ts index 9f80fb146..cae1a5b9f 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/util.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/util.ts @@ -130,44 +130,3 @@ export function setSessionSnapshotTime(session: mongo.ClientSession, time: bson. 
throw new ServiceAssertionError(`Session snapshotTime is already set`); } } - -export const CHECKSUM_QUERY_GROUP_STAGE = { - $group: { - _id: '$_id.b', - // Historically, checksum may be stored as 'int' or 'double'. - // More recently, this should be a 'long'. - // $toLong ensures that we always sum it as a long, avoiding inaccuracies in the calculations. - checksum_total: { $sum: { $toLong: '$checksum' } }, - count: { $sum: 1 }, - has_clear_op: { - $max: { - $cond: [{ $eq: ['$op', 'CLEAR'] }, 1, 0] - } - }, - last_op: { $max: '$_id.o' } - } -}; - -/** - * Convert output of CHECKSUM_QUERY_GROUP_STAGE into a checksum. - */ -export function checksumFromAggregate(doc: bson.Document): PartialOrFullChecksum { - const partialChecksum = Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff; - const bucket = doc._id; - - if (doc.has_clear_op == 1) { - return { - // full checksum - replaces any previous one - bucket, - checksum: partialChecksum, - count: doc.count - } satisfies BucketChecksum; - } else { - return { - // partial checksum - is added to a previous one - bucket, - partialCount: doc.count, - partialChecksum - } satisfies PartialChecksum; - } -} From 40cf5f0101034c5659979ef0768c7cb162261f3e Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Fri, 29 Aug 2025 11:34:03 +0200 Subject: [PATCH 10/11] Another changeset. --- .changeset/late-queens-sell.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changeset/late-queens-sell.md diff --git a/.changeset/late-queens-sell.md b/.changeset/late-queens-sell.md new file mode 100644 index 000000000..fc643adf2 --- /dev/null +++ b/.changeset/late-queens-sell.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-module-mongodb-storage': patch +'@powersync/service-core': patch +'@powersync/service-image': patch +--- + +Improve performance of the compact job From 4f4b3d548a0dcdf346900110bdd4236a5d22c174 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Fri, 29 Aug 2025 11:46:16 +0200 Subject: [PATCH 11/11] Add missing await. --- .../src/storage/implementation/MongoCompactor.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts index 3ca4dbc5c..7c62717af 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts @@ -140,10 +140,10 @@ export class MongoCompactor { }; const doneWithBucket = async () => { - // Free memory before clearing bucket if (currentState == null) { return; } + // Free memory before clearing bucket currentState.seen.clear(); if (currentState.lastNotPut != null && currentState.opsSincePut >= 1) { logger.info( @@ -201,7 +201,7 @@ export class MongoCompactor { for (let doc of batch) { if (currentState == null || doc._id.b != currentState.bucket) { - doneWithBucket(); + await doneWithBucket(); currentState = { bucket: doc._id.b, @@ -291,7 +291,7 @@ export class MongoCompactor { } } - doneWithBucket(); + await doneWithBucket(); // Need another flush after updateBucketChecksums() await this.flush();
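
For reference, the timeout fallback added in MongoChecksums.queryPartialChecksumsFallback()
reduces to the pattern below: walk one bucket at a time in op-id batches, sorting and limiting
before the $group stage so each aggregation scans a bounded key range, and fold each batch into
a running checksum. This is a minimal standalone sketch, not code from the series: the
'powersync' and 'bucket_data' names and the slowBucketChecksum helper are illustrative
assumptions; only the pipeline shape, the 50_000 batch limit and the CLEAR handling mirror the
patches above.

import { MongoClient, MinKey } from 'mongodb';

// Group stage shared with the main checksum query; $toLong avoids precision
// loss for checksums historically stored as 'int' or 'double'.
const GROUP_STAGE = {
  $group: {
    _id: '$_id.b',
    checksum_total: { $sum: { $toLong: '$checksum' } },
    count: { $sum: 1 },
    has_clear_op: { $max: { $cond: [{ $eq: ['$op', 'CLEAR'] }, 1, 0] } },
    last_op: { $max: '$_id.o' }
  }
};

async function slowBucketChecksum(client: MongoClient, groupId: number, bucket: string, end: bigint) {
  // Database and collection names are assumptions for this sketch.
  const data = client.db('powersync').collection('bucket_data');
  const batchLimit = 50_000;
  let lowerBound: unknown = new MinKey();
  let checksum = 0;
  let count = 0;

  while (true) {
    const [doc] = await data
      .aggregate(
        [
          {
            $match: {
              _id: {
                $gt: { g: groupId, b: bucket, o: lowerBound },
                $lte: { g: groupId, b: bucket, o: end }
              }
            }
          },
          // Sort and limit _before_ grouping, so each batch scans a bounded range.
          { $sort: { _id: 1 } },
          { $limit: batchLimit },
          GROUP_STAGE
        ],
        { maxTimeMS: 50_000 }
      )
      .toArray();

    if (doc == null) {
      break; // No more operations in range.
    }
    const batchChecksum = Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff;
    if (doc.has_clear_op == 1) {
      // A CLEAR op resets the bucket: this batch replaces the running totals.
      checksum = batchChecksum;
      count = doc.count;
    } else {
      checksum = (checksum + batchChecksum) & 0xffffffff;
      count += doc.count;
    }
    if (doc.count < batchLimit) {
      break; // Final (partially filled) batch for this bucket.
    }
    lowerBound = doc.last_op;
  }
  return { bucket, count, checksum };
}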