Enhancement Request: Add Key-Based Prefetch Parameter to groupBy Operator #3937

Closed
fcappi opened this issue Nov 15, 2024 · 2 comments
Labels
status/invalid We don't feel this issue is valid, or the root cause was found outside of Reactor

Comments

fcappi commented Nov 15, 2024

Motivation

The current implementation of the groupBy operator in Project Reactor allows for a fixed prefetch size, which is not optimal for scenarios where different group keys have varying data volumes and characteristics. This can lead to inefficient resource management and suboptimal performance.

Desired solution

I propose extending the groupBy operator to accept a function that determines the prefetch size based on the group key. This function would take the group key as an input and return an integer value representing the prefetch size for that specific group.

Example API

<K> Flux<GroupedFlux<K, T>> groupBy(Function<? super T, ? extends K> keyMapper,
                                    Function<? super K, Integer> prefetchMapper);
  • keyMapper: Function to extract the group key from each element.
  • prefetchMapper: Function to determine the prefetch size based on the group key.
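
A prefetchMapper of this shape could also be built from a lookup table instead of hand-written branches. The sketch below is plain Java and only illustrates the proposed function argument. The class PrefetchMappers and the method withDefault are hypothetical names, not part of Reactor. It rejects non-positive values on the assumption that a per-key prefetch would have to be positive, like the existing prefetch argument.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, not part of Reactor: builds a key -> prefetch function
// from a table of per-key overrides plus a default value.
public class PrefetchMappers {

    static <K> Function<K, Integer> withDefault(Map<K, Integer> overrides, int defaultPrefetch) {
        if (defaultPrefetch <= 0) {
            throw new IllegalArgumentException("defaultPrefetch must be > 0");
        }
        for (Integer value : overrides.values()) {
            if (value == null || value <= 0) {
                throw new IllegalArgumentException("every override must be > 0");
            }
        }
        // Defensive copy so later changes to the caller's map do not leak in.
        Map<K, Integer> copy = new HashMap<>(overrides);
        return key -> copy.getOrDefault(key, defaultPrefetch);
    }

    public static void main(String[] args) {
        Function<String, Integer> prefetchMapper =
                withDefault(Map.of("highVolumeKey", 256), 32);
        System.out.println(prefetchMapper.apply("highVolumeKey")); // prints 256
        System.out.println(prefetchMapper.apply("anyOtherKey"));   // prints 32
    }
}
```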

Example Usage

Flux<MyData> dataFlux = ...;

dataFlux.groupBy(
    MyData::getKey,
    key -> {
        if ("highVolumeKey".equals(key)) {
            return 256; // larger prefetch for high volume key
        } else {
            return 32; // default prefetch for other keys
        }
    })
    .flatMap(groupedFlux -> groupedFlux.collectList())
    .subscribe(result -> System.out.println("Grouped result: " + result));

Considered alternatives

An alternative approach is to manually implement the grouping logic with custom prefetch management outside the groupBy operator. However, this approach would be more complex and less efficient compared to having built-in support within the groupBy operator.

Additional context

This feature would be beneficial for applications processing streams with highly variable group sizes and characteristics. It would provide a more flexible and optimized way to handle such scenarios, leading to better performance and resource utilization.

@fcappi fcappi added the type/enhancement A general enhancement label Nov 15, 2024
Member

chemicL commented Nov 26, 2024

Hey @fcappi !

Thanks for the detailed description and the proposal.

I had a look at the existing implementation and one thing surprised me - the UnicastGroupedFlux.limit field is not used! It is derived from the original prefetch that the FluxGroupBy receives but eventually is ignored.

What I find interesting is closely related. My understanding of the groupBy operator is that it has a single upstream source which produces items sequentially. These items are then mapped to a key and pushed into the relevant group's inner queue. The operators that follow need to be capable of dealing with all the groups to avoid stalling the processing. However, once the cardinality of groups is dealt with, there is no individuality in how these groups emit: only the upstream prefetch is relevant, as the upstream is the source of every group's data. The groups are not at all independent. From my understanding, that makes an individual group's prefetch a non-existent concern.
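
The reasoning above can be illustrated with a toy model in plain Java. This is a sketch under stated assumptions, not Reactor's implementation: the class SharedUpstreamModel, the method route, and the parameter upstreamPrefetch are made-up names. A single sequential source is routed into per-key queues, and the only limit on what any group receives is how many items were pulled from that one source.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Function;

// Toy model only (not Reactor code): one sequential source feeds every group.
public class SharedUpstreamModel {

    // Pulls at most upstreamPrefetch items from the source and routes each
    // item to the queue that belongs to its key.
    static <T, K> Map<K, Queue<T>> route(Iterable<T> source,
                                         Function<T, K> keyMapper,
                                         int upstreamPrefetch) {
        Map<K, Queue<T>> groups = new LinkedHashMap<>();
        int pulled = 0;
        for (T item : source) {
            if (pulled == upstreamPrefetch) {
                break; // upstream demand is used up, so no group gets more data
            }
            groups.computeIfAbsent(keyMapper.apply(item), k -> new ArrayDeque<>())
                  .add(item);
            pulled++;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> source = List.of("a1", "b1", "a2", "a3", "b2", "a4");
        // Group by the first character; only 4 items are pulled upstream.
        Map<String, Queue<String>> groups = route(source, s -> s.substring(0, 1), 4);
        System.out.println(groups); // prints {a=[a1, a2, a3], b=[b1]}
    }
}
```

In this model there is no per-group knob to turn: group b holds one item only because the shared source stopped after four pulls, which is the sense in which the groups are not independent.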

We recently had a very useful contribution related to this nature of groupBy: #3872.

Can you please comment on the above?

@chemicL chemicL added status/need-user-input This needs user input to proceed status/invalid We don't feel this issue is valid, or the root cause was found outside of Reactor and removed type/enhancement A general enhancement status/need-user-input This needs user input to proceed labels Nov 26, 2024
Member

chemicL commented Dec 2, 2024

I'll close the issue with the above explanation. Feel free to reopen should you have feedback.

@chemicL chemicL closed this as not planned Dec 2, 2024