From f051afc567a435d87660038bc2857a3dcd9d2922 Mon Sep 17 00:00:00 2001 From: jonapgar-groupby <128390641+jonapgar-groupby@users.noreply.github.com> Date: Mon, 11 Sep 2023 13:00:42 -0400 Subject: [PATCH] fix: prevent recursive try-catch memory leak in mergeInternals fixes #7334 --- src/internal/operators/mergeInternals.ts | 57 ++++++++++++------------ 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/src/internal/operators/mergeInternals.ts b/src/internal/operators/mergeInternals.ts index dab3a2b4db..c0e2b894d7 100644 --- a/src/internal/operators/mergeInternals.ts +++ b/src/internal/operators/mergeInternals.ts @@ -66,9 +66,16 @@ export function mergeInternals( // This is checked during finalization to see if we should // move to the next item in the buffer, if there is on. let innerComplete = false; - + + let projected; + try { + projected = project(value, index++); + } catch (err) { + subscriber.error(err) + return + } // Start our inner subscription. - innerFrom(project(value, index++)).subscribe( + innerFrom(projected).subscribe( createOperatorSubscriber( subscriber, (innerValue) => { @@ -97,35 +104,27 @@ export function mergeInternals( // cancelled), then we want to try the next item in the buffer if // there is one. if (innerComplete) { - // We have to wrap this in a try/catch because it happens during - // finalization, possibly asynchronously, and we want to pass - // any errors that happen (like in a projection function) to - // the outer Subscriber. - try { - // INNER SOURCE COMPLETE - // Decrement the active count to ensure that the next time - // we try to call `doInnerSub`, the number is accurate. - active--; - // If we have more values in the buffer, try to process those - // Note that this call will increment `active` ahead of the - // next conditional, if there were any more inner subscriptions - // to start. - while (buffer.length && active < concurrent) { - const bufferedValue = buffer.shift()!; - // Particularly for `expand`, we need to check to see if a scheduler was provided - // for when we want to start our inner subscription. Otherwise, we just start - // are next inner subscription. - if (innerSubScheduler) { - executeSchedule(subscriber, innerSubScheduler, () => doInnerSub(bufferedValue)); - } else { - doInnerSub(bufferedValue); - } + // INNER SOURCE COMPLETE + // Decrement the active count to ensure that the next time + // we try to call `doInnerSub`, the number is accurate. + active--; + // If we have more values in the buffer, try to process those + // Note that this call will increment `active` ahead of the + // next conditional, if there were any more inner subscriptions + // to start. + while (buffer.length && active < concurrent) { + const bufferedValue = buffer.shift()!; + // Particularly for `expand`, we need to check to see if a scheduler was provided + // for when we want to start our inner subscription. Otherwise, we just start + // are next inner subscription. + if (innerSubScheduler) { + executeSchedule(subscriber, innerSubScheduler, () => doInnerSub(bufferedValue)); + } else { + doInnerSub(bufferedValue); } - // Check to see if we can complete, and complete if so. - checkComplete(); - } catch (err) { - subscriber.error(err); } + // Check to see if we can complete, and complete if so. + checkComplete(); } } )