Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prevent recursive try-catch memory leak in mergeInternals #7335

Open
wants to merge 1 commit into
base: 7.x
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
57 changes: 28 additions & 29 deletions src/internal/operators/mergeInternals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,16 @@ export function mergeInternals<T, R>(
// This is checked during finalization to see if we should
// move to the next item in the buffer, if there is on.
let innerComplete = false;


let projected;
try {
projected = project(value, index++);
} catch (err) {
subscriber.error(err)
return
}
// Start our inner subscription.
innerFrom(project(value, index++)).subscribe(
innerFrom(projected).subscribe(
createOperatorSubscriber(
subscriber,
(innerValue) => {
Expand Down Expand Up @@ -97,35 +104,27 @@ export function mergeInternals<T, R>(
// cancelled), then we want to try the next item in the buffer if
// there is one.
if (innerComplete) {
// We have to wrap this in a try/catch because it happens during
// finalization, possibly asynchronously, and we want to pass
// any errors that happen (like in a projection function) to
// the outer Subscriber.
try {
// INNER SOURCE COMPLETE
// Decrement the active count to ensure that the next time
// we try to call `doInnerSub`, the number is accurate.
active--;
// If we have more values in the buffer, try to process those
// Note that this call will increment `active` ahead of the
// next conditional, if there were any more inner subscriptions
// to start.
while (buffer.length && active < concurrent) {
const bufferedValue = buffer.shift()!;
// Particularly for `expand`, we need to check to see if a scheduler was provided
// for when we want to start our inner subscription. Otherwise, we just start
// are next inner subscription.
if (innerSubScheduler) {
executeSchedule(subscriber, innerSubScheduler, () => doInnerSub(bufferedValue));
} else {
doInnerSub(bufferedValue);
}
// INNER SOURCE COMPLETE
// Decrement the active count to ensure that the next time
// we try to call `doInnerSub`, the number is accurate.
active--;
// If we have more values in the buffer, try to process those
// Note that this call will increment `active` ahead of the
// next conditional, if there were any more inner subscriptions
// to start.
while (buffer.length && active < concurrent) {
const bufferedValue = buffer.shift()!;
// Particularly for `expand`, we need to check to see if a scheduler was provided
// for when we want to start our inner subscription. Otherwise, we just start
// are next inner subscription.
if (innerSubScheduler) {
executeSchedule(subscriber, innerSubScheduler, () => doInnerSub(bufferedValue));
} else {
doInnerSub(bufferedValue);
}
// Check to see if we can complete, and complete if so.
checkComplete();
} catch (err) {
subscriber.error(err);
}
// Check to see if we can complete, and complete if so.
checkComplete();
}
}
)
Expand Down