diff --git a/.changeset/sweet-years-tickle.md b/.changeset/sweet-years-tickle.md new file mode 100644 index 000000000..02cc3f6ef --- /dev/null +++ b/.changeset/sweet-years-tickle.md @@ -0,0 +1,8 @@ +--- +'@powersync/service-module-mongodb': patch +'@powersync/lib-service-mongodb': patch +'@powersync/service-core': patch +'@powersync/service-image': patch +--- + +[MongoDB Replication] Fix resumeTokens going back in time on busy change streams. diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index f85de233b..9d0b19821 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -947,6 +947,16 @@ export class ChangeStream { timestamp: changeDocument.clusterTime!, resume_token: changeDocument._id }); + if (batch.lastCheckpointLsn != null && lsn < batch.lastCheckpointLsn) { + // Checkpoint out of order - should never happen with MongoDB. + // If it does happen, we throw an error to stop the replication - restarting should recover. + // Since we use batch.lastCheckpointLsn for the next resumeAfter, this should not result in an infinite loop. + // This is a workaround for the issue below, but we can keep this as a safety-check even if the issue is fixed. + // Driver issue report: https://jira.mongodb.org/browse/NODE-7042 + throw new ReplicationAssertionError( + `Change resumeToken ${(changeDocument._id as any)._data} (${timestampToDate(changeDocument.clusterTime!).toISOString()}) is less than last checkpoint LSN ${batch.lastCheckpointLsn}. Restarting replication.` + ); + } if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) { waitForCheckpointLsn = null; diff --git a/modules/module-mongodb/src/replication/MongoErrorRateLimiter.ts b/modules/module-mongodb/src/replication/MongoErrorRateLimiter.ts index 17b65c66c..3c770fb9b 100644 --- a/modules/module-mongodb/src/replication/MongoErrorRateLimiter.ts +++ b/modules/module-mongodb/src/replication/MongoErrorRateLimiter.ts @@ -28,7 +28,7 @@ export class MongoErrorRateLimiter implements ErrorRateLimiter { // Could be fail2ban or similar this.setDelay(120_000); } else { - this.setDelay(30_000); + this.setDelay(5_000); } }