@@ -10,12 +10,12 @@ import { promisify } from 'util';
10
10
import {
11
11
AbstractCursor ,
12
12
ChangeStream ,
13
+ ChangeStreamDocument ,
13
14
ChangeStreamOptions ,
14
15
Collection ,
15
16
CommandStartedEvent ,
16
17
Db ,
17
18
Long ,
18
- MongoAPIError ,
19
19
MongoChangeStreamError ,
20
20
MongoClient ,
21
21
MongoServerError ,
@@ -989,137 +989,45 @@ describe('Change Streams', function () {
989
989
async function ( ) {
990
990
changeStream = collection . watch ( [ ] ) ;
991
991
await initIteratorMode ( changeStream ) ;
992
- const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
993
992
994
993
const docs = [ { city : 'New York City' } , { city : 'Seattle' } , { city : 'Boston' } ] ;
995
994
await collection . insertMany ( docs ) ;
996
995
997
- for ( const doc of docs ) {
998
- const change = await changeStreamIterator . next ( ) ;
999
- const { fullDocument } = change . value ;
1000
- expect ( fullDocument . city ) . to . equal ( doc . city ) ;
996
+ for await ( const change of changeStream ) {
997
+ const { fullDocument } = change ;
998
+ const expectedDoc = docs . shift ( ) ;
999
+ expect ( fullDocument . city ) . to . equal ( expectedDoc . city ) ;
1000
+ if ( docs . length === 0 ) {
1001
+ break ;
1002
+ }
1001
1003
}
1004
+ expect ( docs ) . to . have . length ( 0 , 'expected to find all docs before exiting loop' ) ;
1002
1005
}
1003
1006
) ;
1004
1007
1005
1008
it (
1006
- 'should close the change stream when return is called ' ,
1009
+ 'cannot resume from partial iteration ' ,
1007
1010
{ requires : { topology : '!single' } } ,
1008
1011
async function ( ) {
1009
1012
changeStream = collection . watch ( [ ] ) ;
1010
1013
await initIteratorMode ( changeStream ) ;
1011
- const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1012
1014
1013
1015
const docs = [ { city : 'New York City' } , { city : 'Seattle' } , { city : 'Boston' } ] ;
1014
1016
await collection . insertMany ( docs ) ;
1015
1017
1016
- await changeStreamIterator . next ( ) ;
1017
- await changeStreamIterator . return ( ) ;
1018
- expect ( changeStream . closed ) . to . be . true ;
1019
- expect ( changeStream . cursor . closed ) . to . be . true ;
1020
- }
1021
- ) ;
1022
-
1023
- it (
1024
- 'should close the change stream when an error is thrown' ,
1025
- { requires : { topology : '!single' , mongodb : '>=4.2' } } ,
1026
- async function ( ) {
1027
- changeStream = collection . watch ( [ ] ) ;
1028
- await initIteratorMode ( changeStream ) ;
1029
- const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1030
-
1031
- const unresumableErrorCode = 1000 ;
1032
- await client . db ( 'admin' ) . command ( {
1033
- configureFailPoint : is4_2Server ( this . configuration . version )
1034
- ? 'failCommand'
1035
- : 'failGetMoreAfterCursorCheckout' ,
1036
- mode : { times : 1 } ,
1037
- data : {
1038
- failCommands : [ 'getMore' ] ,
1039
- errorCode : unresumableErrorCode
1040
- }
1041
- } as FailPoint ) ;
1042
-
1043
- await collection . insertOne ( { city : 'New York City' } ) ;
1044
- try {
1045
- await changeStreamIterator . next ( ) ;
1046
- expect . fail (
1047
- 'Change stream did not throw unresumable error and did not produce any events'
1048
- ) ;
1049
- } catch ( error ) {
1050
- expect ( changeStream . closed ) . to . be . true ;
1051
- expect ( changeStream . cursor . closed ) . to . be . true ;
1018
+ for await ( const change of changeStream ) {
1019
+ const { fullDocument } = change ;
1020
+ const expectedDoc = docs . shift ( ) ;
1021
+ expect ( fullDocument . city ) . to . equal ( expectedDoc . city ) ;
1022
+ break ;
1052
1023
}
1053
- }
1054
- ) ;
1055
-
1056
- it (
1057
- 'should not produce events on closed stream' ,
1058
- { requires : { topology : '!single' } } ,
1059
- async function ( ) {
1060
- changeStream = collection . watch ( [ ] ) ;
1061
- changeStream . close ( ) ;
1062
1024
1063
- const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1064
- const change = await changeStreamIterator . next ( ) ;
1065
-
1066
- expect ( change . value ) . to . be . undefined ;
1067
- }
1068
- ) ;
1069
-
1070
- it (
1071
- 'cannot be used with emitter-based iteration' ,
1072
- { requires : { topology : '!single' } } ,
1073
- async function ( ) {
1074
- changeStream = collection . watch ( [ ] ) ;
1075
- changeStream . on ( 'change' , sinon . stub ( ) ) ;
1076
- const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1077
-
1078
- const error = await changeStreamIterator . next ( ) . catch ( e => e ) ;
1079
- expect ( error ) . to . be . instanceOf ( MongoAPIError ) ;
1080
- }
1081
- ) ;
1082
-
1083
- it (
1084
- 'can be used with raw iterator API' ,
1085
- { requires : { topology : '!single' } } ,
1086
- async function ( ) {
1087
- changeStream = collection . watch ( [ ] ) ;
1088
- await initIteratorMode ( changeStream ) ;
1089
- const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1090
-
1091
- const docs = [ { city : 'Los Angeles' } , { city : 'Miami' } ] ;
1092
- await collection . insertMany ( docs ) ;
1093
-
1094
- await changeStream . next ( ) ;
1095
-
1096
- try {
1097
- const change = await changeStreamIterator . next ( ) ;
1098
- expect ( change . value ) . to . not . be . undefined ;
1099
-
1100
- const { fullDocument } = change . value ;
1101
- expect ( fullDocument . city ) . to . equal ( docs [ 1 ] . city ) ;
1102
- } catch ( error ) {
1103
- expect . fail ( 'Async could not be used with raw iterator API' ) ;
1025
+ // eslint-disable-next-line @typescript-eslint/no-unused-vars
1026
+ for await ( const change of changeStream ) {
1027
+ expect . fail ( 'Change stream resumed from partial iteration' ) ;
1104
1028
}
1105
- }
1106
- ) ;
1107
1029
1108
- it (
1109
- 'ignores errors thrown from close' ,
1110
- { requires : { topology : '!single' } } ,
1111
- async function ( ) {
1112
- changeStream = collection . watch ( [ ] ) ;
1113
- await initIteratorMode ( changeStream ) ;
1114
- const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1115
-
1116
- sinon . stub ( changeStream . cursor , 'close' ) . throws ( new MongoAPIError ( 'testing' ) ) ;
1117
-
1118
- try {
1119
- await changeStreamIterator . return ( ) ;
1120
- } catch ( error ) {
1121
- expect . fail ( 'Async iterator threw an error on close' ) ;
1122
- }
1030
+ expect ( docs ) . to . have . length ( 2 , 'expected to find remaining docs after partial iteration' ) ;
1123
1031
}
1124
1032
) ;
1125
1033
} ) ;
@@ -2352,7 +2260,6 @@ describe('ChangeStream resumability', function () {
2352
2260
async function ( ) {
2353
2261
changeStream = collection . watch ( [ ] ) ;
2354
2262
await initIteratorMode ( changeStream ) ;
2355
- const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
2356
2263
2357
2264
await client . db ( 'admin' ) . command ( {
2358
2265
configureFailPoint : is4_2Server ( this . configuration . version )
@@ -2366,10 +2273,18 @@ describe('ChangeStream resumability', function () {
2366
2273
}
2367
2274
} as FailPoint ) ;
2368
2275
2369
- await collection . insertOne ( { city : 'New York City' } ) ;
2370
- await changeStreamIterator . next ( ) ;
2276
+ const docs = [ { city : 'New York City' } , { city : 'Seattle' } , { city : 'Boston' } ] ;
2277
+ await collection . insertMany ( docs ) ;
2371
2278
2372
- expect ( aggregateEvents ) . to . have . lengthOf ( 2 ) ;
2279
+ for await ( const change of changeStream ) {
2280
+ const { fullDocument } = change ;
2281
+ const expectedDoc = docs . shift ( ) ;
2282
+ expect ( fullDocument . city ) . to . equal ( expectedDoc . city ) ;
2283
+ if ( docs . length === 0 ) {
2284
+ break ;
2285
+ }
2286
+ }
2287
+ expect ( docs ) . to . have . length ( 0 , 'expected to find all docs before exiting loop' ) ;
2373
2288
}
2374
2289
) ;
2375
2290
}
@@ -2383,12 +2298,15 @@ describe('ChangeStream resumability', function () {
2383
2298
await initIteratorMode ( changeStream ) ;
2384
2299
const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
2385
2300
2301
+ const docs = [ { city : 'New York City' } , { city : 'Seattle' } , { city : 'Boston' } ] ;
2302
+ await collection . insertMany ( docs ) ;
2303
+
2386
2304
// on 3.6 servers, no postBatchResumeToken is sent back in the initial aggregate response.
2387
2305
// This means that a resume token isn't cached until the first change has been iterated.
2388
2306
// In order to test the resume, we need to ensure that at least one document has
2389
2307
// been iterated so we have a resume token to resume on.
2390
- await collection . insertOne ( { city : 'New York City' } ) ;
2391
2308
await changeStreamIterator . next ( ) ;
2309
+ docs . shift ( ) ;
2392
2310
2393
2311
const mock = sinon
2394
2312
. stub ( changeStream . cursor , '_getMore' )
@@ -2399,10 +2317,16 @@ describe('ChangeStream resumability', function () {
2399
2317
callback ( error ) ;
2400
2318
} ) ;
2401
2319
2402
- await collection . insertOne ( { city : 'New York City' } ) ;
2403
- await changeStreamIterator . next ( ) ;
2320
+ for await ( const change of changeStream ) {
2321
+ const { fullDocument } = change ;
2322
+ const expectedDoc = docs . shift ( ) ;
2323
+ expect ( fullDocument . city ) . to . equal ( expectedDoc . city ) ;
2324
+ if ( docs . length === 0 ) {
2325
+ break ;
2326
+ }
2327
+ }
2404
2328
2405
- expect ( aggregateEvents ) . to . have . lengthOf ( 2 ) ;
2329
+ expect ( docs ) . to . have . length ( 0 , 'expected to find all docs before exiting loop' ) ;
2406
2330
}
2407
2331
) ;
2408
2332
}
@@ -2447,25 +2371,34 @@ describe('ChangeStream resumability', function () {
2447
2371
async function ( ) {
2448
2372
changeStream = collection . watch ( [ ] ) ;
2449
2373
await initIteratorMode ( changeStream ) ;
2450
- const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
2451
2374
2452
- const unresumableErrorCode = 1000 ;
2453
- await client . db ( 'admin' ) . command ( {
2454
- configureFailPoint : is4_2Server ( this . configuration . version )
2455
- ? 'failCommand'
2456
- : 'failGetMoreAfterCursorCheckout' ,
2457
- mode : { times : 1 } ,
2458
- data : {
2459
- failCommands : [ 'getMore' ] ,
2460
- errorCode : unresumableErrorCode
2461
- }
2462
- } as FailPoint ) ;
2375
+ const docs = [ { city : 'New York City' } , { city : 'Seattle' } , { city : 'Boston' } ] ;
2376
+ await collection . insertMany ( docs ) ;
2463
2377
2464
- await collection . insertOne ( { city : 'New York City' } ) ;
2378
+ try {
2379
+ for await ( const change of changeStream ) {
2380
+ const { fullDocument } = change ;
2381
+ const expectedDoc = docs . shift ( ) ;
2382
+ expect ( fullDocument . city ) . to . equal ( expectedDoc . city ) ;
2383
+
2384
+ const unresumableErrorCode = 1000 ;
2385
+ await client . db ( 'admin' ) . command ( {
2386
+ configureFailPoint : is4_2Server ( this . configuration . version )
2387
+ ? 'failCommand'
2388
+ : 'failGetMoreAfterCursorCheckout' ,
2389
+ mode : { times : 1 } ,
2390
+ data : {
2391
+ failCommands : [ 'getMore' ] ,
2392
+ errorCode : unresumableErrorCode
2393
+ }
2394
+ } as FailPoint ) ;
2395
+ }
2465
2396
2466
- const error = await changeStreamIterator . next ( ) . catch ( e => e ) ;
2467
- expect ( error ) . to . be . instanceOf ( MongoServerError ) ;
2468
- expect ( aggregateEvents ) . to . have . lengthOf ( 1 ) ;
2397
+ expect . fail ( 'Async did not throw on an unresumable error' ) ;
2398
+ } catch ( error ) {
2399
+ expect ( error ) . to . be . instanceOf ( MongoServerError ) ;
2400
+ expect ( aggregateEvents ) . to . have . lengthOf ( 1 ) ;
2401
+ }
2469
2402
}
2470
2403
) ;
2471
2404
} ) ;
0 commit comments