Skip to content

Commit 42f45f2

Browse files
committed
Fix error mapping for ChangeStreamInvalidatedError.
1 parent 8335e29 commit 42f45f2

File tree

3 files changed

+22
-12
lines changed

3 files changed

+22
-12
lines changed

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { isMongoNetworkTimeoutError, mongo } from '@powersync/lib-service-mongodb';
1+
import { isMongoNetworkTimeoutError, isMongoServerError, mongo } from '@powersync/lib-service-mongodb';
22
import {
33
container,
44
DatabaseConnectionError,
@@ -605,13 +605,7 @@ export class ChangeStream {
605605
}
606606

607607
const originalChangeDocument = await stream.tryNext().catch((e) => {
608-
if (isMongoNetworkTimeoutError(e)) {
609-
// This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out".
610-
// We wrap the error to make it more useful.
611-
throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e);
612-
} else {
613-
throw new DatabaseConnectionError(ErrorCode.PSYNC_S1346, `Error reading MongoDB ChangeStream`, e);
614-
}
608+
throw mapChangeStreamError(e);
615609
});
616610
// The stream was closed, we will only ever receive `null` from it
617611
if (!originalChangeDocument && stream.closed) {
@@ -793,3 +787,21 @@ async function touch() {
793787
// or reduce PING_INTERVAL here.
794788
return container.probes.touch();
795789
}
790+
791+
function mapChangeStreamError(e: any) {
792+
if (isMongoNetworkTimeoutError(e)) {
793+
// This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out".
794+
// We wrap the error to make it more useful.
795+
throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e);
796+
} else if (
797+
isMongoServerError(e) &&
798+
e.codeName == 'NoMatchingDocument' &&
799+
e.errmsg?.includes('post-image was not found')
800+
) {
801+
throw new ChangeStreamInvalidatedError(e.errmsg, e);
802+
} else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) {
803+
throw new ChangeStreamInvalidatedError(e.message, e);
804+
} else {
805+
throw new DatabaseConnectionError(ErrorCode.PSYNC_S1346, `Error reading MongoDB ChangeStream`, e);
806+
}
807+
}

modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,6 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
8686
}
8787
if (e instanceof ChangeStreamInvalidatedError) {
8888
throw e;
89-
} else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) {
90-
throw new ChangeStreamInvalidatedError(e.message, e);
9189
} else {
9290
// Report the error if relevant, before retrying
9391
container.reporter.captureException(e, {

modules/module-mongodb/test/src/resume.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { describe, expect, test, vi } from 'vitest';
88
import { ChangeStreamTestContext } from './change_stream_utils.js';
99
import { env } from './env.js';
1010
import { INITIALIZED_MONGO_STORAGE_FACTORY, INITIALIZED_POSTGRES_STORAGE_FACTORY } from './util.js';
11+
import { ChangeStreamInvalidatedError } from '@module/replication/ChangeStream.js';
1112

1213
describe('mongo lsn', () => {
1314
test('LSN with resume tokens should be comparable', () => {
@@ -145,8 +146,7 @@ function defineResumeTest(factoryGenerator: (options?: TestStorageOptions) => Pr
145146
context2.storage = factory.getInstance(activeContent!);
146147

147148
const error = await context2.startStreaming().catch((ex) => ex);
148-
expect(error).exist;
149149
// The ChangeStreamReplicationJob will detect this and throw a ChangeStreamInvalidatedError
150-
expect(isMongoServerError(error) && error.hasErrorLabel('NonResumableChangeStreamError'));
150+
expect(error).toBeInstanceOf(ChangeStreamInvalidatedError);
151151
});
152152
}

0 commit comments

Comments
 (0)