Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory memory management for flatMap #376

Merged
merged 2 commits into from
Jan 19, 2025

Conversation

thepont
Copy link
Contributor

@thepont thepont commented Oct 7, 2024

Description:

This change prevents us from requesting a value from the outer iterator if we are currently maxed out for concurrency on our inner iterators, This prevents a potential OOM exception where the inner iterators process at a slower rate the the outer iterator can produce.

Related issue (if exists): #375

prevent the flatMap operator from accumulating in an internal array while concurrency is maxed out
Comment on lines 124 to 125
// add the outer iterator to the race
results[0] = outer.next();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is safe, since here we are still potentially waiting on the latest promise from the outer iterator to resolve.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean. I'll see if I can't figure out a better way of doing it without potentially loosing values.

Copy link
Contributor Author

@thepont thepont Oct 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @trxcllnt thanks for pointing this out, 8f90bc7 resolves the tests locally for me, as you Identified some were failing because I was loosing items due to the above calling next when a previous item still remained unresolved and other tests were failing because next was called on the outer after it had been completed above.

Copy link
Member

@trxcllnt trxcllnt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests currently failing, possibly due to L124-L125 changes. Let me know if you'd like help running or debugging the tests.

Comment on lines +96 to -99
results[0] = outer.next();
} else {
// remove the outer iterator from the race, we're full
results[0] = NEVER_PROMISE;
outerValues.push(value as TSource);
}
results[0] = outer.next();
Copy link
Member

@trxcllnt trxcllnt Oct 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for calling this out.

This operator is technically working as designed, i.e. we are intentionally pulling and buffering values from the outer sequence as quickly as they're emitted, but only flattening concurrent number of inner sequences at a time (similar to the Observable implementation).

But I can see how the internal buffering is problematic, and I agree that we shouldn't need to buffer the outer sequence values.

Copy link
Contributor Author

@thepont thepont Oct 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @trxcllnt, I manged to get the tests running locally, thanks for your help identifying the problems with my code.

When I started looking at the actual code I figured it must have been by design.

My intent was to fan out to do multiple API requests ( to AWS SQS) concurrently based on the result of of a dynamodb scan and the speed that I could processes the items is reduced significantly when I removed the concurrency and tried a concatMap ( although this technically works, and solves my memory issues, although a little slower for my application).

My initial motivation for moving from RX to IX was to fix this issue, figuring it would be difficult to prevent my memory issues unless I had a way of throttling the producer, I assume another approach that would work as the library currently stands is to use a throttle before the flatMap and attempt to tweak that time so the consumer can keep up, and the buffer never explodes.

On the tests, I managed to run the unit tests although the contribution guide mentions performance tests but I seem unable to locate these?

 * loosing values due to using .next while still waiting for outer iterator
 * adding completed iterator to race resulting in exception
@trxcllnt
Copy link
Member

@thepont Apologies for the delay, I somehow lost track of this PR. I'll try to get this merged and publish a new version later today. Thanks for understanding!

@trxcllnt trxcllnt merged commit 7221be2 into ReactiveX:master Jan 19, 2025
72 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants