
expandDeep uses unbounded queue which results in OutOfMemoryException #3411

Closed
troydm opened this issue Mar 22, 2023 · 7 comments
Labels
status/need-design This needs more in depth design work type/enhancement A general enhancement

Comments


troydm commented Mar 22, 2023

expandDeep uses an unbounded queue internally, which results in an OutOfMemoryError when the subscriber is not fast enough to keep up with the publisher.

Expected Behavior

not sure

Actual Behavior

An OutOfMemoryError is thrown.

Steps to Reproduce

@Test
fun reproCase() {
    // each call allocates an array of ~100M elements (several hundred MB retained)
    val genBytes = { Array(1024 * 1024 * 100) { 0 } }
    val i = AtomicInteger(0)

    Flux.just(genBytes())
        .expandDeep({
            if (i.incrementAndGet() > 100)
                Flux.empty()
            else
                Flux.just(genBytes())
        }, 1)
        .delayElements(Duration.ofMinutes(5)) // slow consumer: expandDeep's internal queue keeps growing
        .subscribe {
            println("byte array processed")
        }
}

Possible Solution

Add variants of expand and expandDeep that use a bounded queue, in order to limit memory usage.

Your Environment

  • Reactor version(s) used: 3.4.27
  • Other relevant libraries versions (eg. netty, ...):
  • JVM version (java -version): 11
  • OS and version (eg uname -a): Windows 10
@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Mar 22, 2023
@OlegDokuka OlegDokuka added type/enhancement A general enhancement and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Mar 23, 2023
@OlegDokuka OlegDokuka added this to the 3.4.x Backlog milestone Mar 23, 2023
@injae-kim
Contributor

Supplier<List<Byte>> genBytes = () -> Arrays.asList(new Byte[1024 * 1024 * 100]);
Flux.just(genBytes.get()).subscribe(); // OOM

Hmm, maybe expand's unbounded queue is not the root cause of the OOM?

As shown above, Flux.just(genBytes()) also triggers an OOM in my local environment, without expandDeep().
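For context, a back-of-the-envelope estimate (my addition, not from either commenter): `new Byte[1024 * 1024 * 100]` allocates a reference array of ~100 million slots before any element is even boxed, so a single genBytes() call already retains hundreds of MB and can exhaust a modest heap on its own.

```java
public class ArraySizeEstimate {
    public static void main(String[] args) {
        long elements = 1024L * 1024 * 100;     // 104,857,600 array slots
        long refBytes = 4;                      // assuming compressed oops (4-byte references)
        long arrayBytes = elements * refBytes;  // size of the reference array alone
        System.out.println(elements);                            // 104857600
        System.out.println(arrayBytes / (1024 * 1024) + " MB");  // 400 MB
    }
}
```

So whether the repro OOMs before ever reaching expandDeep depends heavily on the configured heap size, which would explain the differing local results.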

Author

troydm commented Feb 17, 2024

@injae-kim strange, genBytes() generates a ~1 GB array, which shouldn't cause an OOM on its own. How much memory do you have set as the maximum? You can modify it to generate smaller arrays. The point of this issue still lies in expandDeep, which internally uses an unbounded queue.

Contributor

injae-kim commented Feb 19, 2024

this.queue = Queues.<Publisher<? extends T>>unbounded(capacityHint).get();

this.subscriptionStack = new ArrayDeque<>(capacityHint);

the point of this issue still lays in expandDeep which internally is using unbounded queue

Oh, I see. I checked that expand() and expandDeep() use an unbounded queue and deque internally, and that is the root cause of the OOM in this case.

My Idea

public final Flux<T> expandDeep(Function<? super T, ? extends Publisher<? extends T>> expander, int capacityHint,
        int maxCapacity, BiConsumer<? super Queue, ? super T> onMaxCapacity) { // 👈👈
    return onAssembly(new FluxExpand<>(this, expander, false, capacityHint, maxCapacity, onMaxCapacity));
}

// e.g.
Flux.just(..)
    .expandDeep(publisher -> .., capacityHint,
                /* maxCapacity */ 1024,
                /* onMaxCapacity */ (queue, elem) -> { /* just drop or replace last element, .. */ }); // 👈👈

If we use a bounded queue/deque in expand/expandDeep, I think we can allow the user to set maxCapacity and a custom onMaxCapacity strategy. The user can then decide what the queue/deque should do when it reaches maxCapacity (e.g. drop the new element, replace the last element, log, ..).

Or we could simply drop the new element when the queue/deque reaches maxCapacity, and not allow the user to set an onMaxCapacity strategy.
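To make the proposed overflow semantics concrete, here is a hypothetical stdlib-only sketch (BoundedBuffer and the callback shape are my invention, not Reactor's actual internals): a bounded buffer that invokes a user callback instead of growing without limit.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BiConsumer;

// Hypothetical sketch of the proposed overflow behavior: when the buffer is
// full, a user-supplied callback decides what happens instead of enqueueing.
public class BoundedBuffer<T> {
    private final Deque<T> deque = new ArrayDeque<>();
    private final int maxCapacity;
    private final BiConsumer<Deque<T>, T> onMaxCapacity;

    public BoundedBuffer(int maxCapacity, BiConsumer<Deque<T>, T> onMaxCapacity) {
        this.maxCapacity = maxCapacity;
        this.onMaxCapacity = onMaxCapacity;
    }

    public void offer(T elem) {
        if (deque.size() >= maxCapacity) {
            onMaxCapacity.accept(deque, elem); // user decides: drop, replace last, log, ...
        } else {
            deque.addLast(elem);
        }
    }

    public int size() {
        return deque.size();
    }

    public static void main(String[] args) {
        // drop-newest strategy: the callback simply ignores the overflowing element
        BoundedBuffer<Integer> buf = new BoundedBuffer<>(2, (q, e) -> {});
        for (int i = 0; i < 5; i++) {
            buf.offer(i);
        }
        System.out.println(buf.size()); // 2: memory stays bounded
    }
}
```

A replace-last strategy would be `(q, e) -> { q.pollLast(); q.addLast(e); }`; the simpler alternative in the comment above corresponds to hard-coding the empty drop callback.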

Hi @OlegDokuka , can you share your opinion please? 🙇
After checking maintainer's opinion, I'll create PR :) thank you!

Member

chemicL commented Feb 21, 2024

@injae-kim thanks for taking the time to analyze this issue 👏
@troydm can you please have a look and provide a potential use case for the proposed API?

To be honest, I'm a bit unclear about the practical applications of this, and I wonder what sort of algorithms could be implemented on top of a bounded-capacity variant that includes a strategy for dealing with overflows. If there's a practical use for it, I suppose we can base the actual design on the use cases, instead of first designing and then considering what can be done with the result :)

Thanks in advance for providing use cases that are currently impossible, and for describing how a bounded-capacity implementation would be used, regardless of the final design -> we can see what the final requirements are once we see what the overflow strategies dictate.

@chemicL chemicL added status/need-design This needs more in depth design work status/need-user-input This needs user input to proceed for/user-attention This issue needs user attention (feedback, rework, etc...) labels Feb 21, 2024
@chemicL chemicL modified the milestones: 3.4.x Backlog, 3.5.x backlog Feb 21, 2024
Member

chemicL commented Mar 20, 2024

Closing due to inactivity for a month. Please reopen in case there's feedback to the above.

@chemicL chemicL closed this as not planned (won't fix, can't repro, duplicate, stale) Mar 20, 2024
@chemicL chemicL removed this from the 3.5.x Backlog milestone Mar 20, 2024
@chemicL chemicL removed for/user-attention This issue needs user attention (feedback, rework, etc...) status/need-user-input This needs user input to proceed labels Mar 20, 2024
Author

troydm commented Mar 29, 2024

@chemicL my concrete problem was caused by a fast producer combined with a slow consumer (this happened in a production system processing big chunks of data). I'm not sure that having a strategy is a good use case; from my point of view, just having a maxCapacity option for use cases like mine would suffice, so that when the producer hits the queue capacity limit it waits for the consumer to empty the queue before inserting a new object into it.
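The behavior described here (producer waits when the queue is full) is what a blocking bounded queue provides. A minimal stdlib sketch of that semantics, unrelated to Reactor's internals:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingProducerDemo {
    public static void main(String[] args) throws InterruptedException {
        // Capacity 2: put() blocks the producer whenever the queue is full.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    queue.put(i); // waits here while the consumer lags behind
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int consumed = 0;
        for (int i = 0; i < 5; i++) {
            queue.take(); // consumer drains one element at a time
            consumed++;
        }
        producer.join();
        System.out.println("consumed=" + consumed); // consumed=5
    }
}
```

Note that blocking a producer thread like this runs against the non-blocking contract of Reactive Streams; in Reactor the analogous control is exercised through request(n) demand rather than thread blocking.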

Member

chemicL commented Aug 19, 2024

Hey, @troydm. I missed your comment in this issue. From what you say, the standard mechanisms of reactive programming should prevent such a case, namely the natural backpressure mechanism: once the producer is signalled that it can produce, it will. Having a bound on the queue means that at some point, if the producer was asked to produce more but the queue in the middle is full, something needs to happen - e.g. discarding the newly produced data. There is no means to delay on the producer side - the delay is a consequence of having no request(N) calls - and if the data was produced, that means the request(N) calls were made. Please consider looking into other means of controlling the backpressure, e.g. with limitRate(N).
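The request(n)-driven flow described above can be sketched without Reactor (a toy model; the Demand class and all names are mine, not a Reactive Streams API): the producer emits only while outstanding demand is positive, so no unbounded buffering is ever needed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of Reactive Streams demand: the producer may emit one element
// per unit of demand the consumer has signalled via request(n).
class Demand {
    private long outstanding = 0;

    void request(long n) {
        outstanding += n;
    }

    boolean tryClaim() {
        if (outstanding == 0) return false; // no demand: producer must wait
        outstanding--;
        return true;
    }
}

public class BackpressureSketch {
    public static void main(String[] args) {
        Demand demand = new Demand();
        List<Integer> received = new ArrayList<>();

        demand.request(3); // consumer asks for 3 elements
        int next = 0;
        while (demand.tryClaim()) {
            received.add(next++); // producer emits only what was requested
        }
        System.out.println(received); // [0, 1, 2]
    }
}
```

Operators like limitRate(N) work on this principle: they cap how much demand is propagated upstream at a time, which bounds how far the producer can run ahead of the consumer.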
