Refactor mget Method in RedisAdvancedClusterReactiveCommandsImpl.java to Use Java Streams for Improved Readability and Efficiency #3060

Open
ori0o0p opened this issue Nov 23, 2024 · 0 comments · May be fixed by #3061
Labels: for: team-attention (An issue we need to discuss as a team to make progress), status: waiting-for-triage

ori0o0p commented Nov 23, 2024

Feature Request

Is your feature request related to a problem? Please describe

Currently, the mget method in RedisAdvancedClusterReactiveCommandsImpl.java uses a for loop that manually adds publishers to a list. This makes the code harder to read and maintain, especially where it handles partitioned data, and may reduce efficiency.

Describe the solution you'd like

I propose refactoring the mget method in RedisAdvancedClusterReactiveCommandsImpl.java to use Java 8 Streams to build the list of publishers more concisely. Instead of iterating over the partitions and adding each publisher by hand, we can stream over partitioned.values(), map each partition's key list to a publisher with map(), and collect the results into a list. Additionally, I suggest emitting the final result with flatMapMany(Flux::fromIterable) instead of flatMapIterable, which I find more readable and expect to be more efficient.
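To make the loop-versus-stream comparison concrete, here is a minimal, stdlib-only sketch. It is not Lettuce code: the class PartitionMapping, the methods viaLoop and viaStream, and the fetch function are hypothetical names for illustration, and fetch returns a plain String as a stand-in for the Publisher that super.mget would return.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class PartitionMapping {

    // Loop form: walk the entries and append one result per partition.
    static <K, R> List<R> viaLoop(Map<Integer, List<K>> partitioned, Function<List<K>, R> fetch) {
        List<R> results = new ArrayList<>();
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            results.add(fetch.apply(entry.getValue()));
        }
        return results;
    }

    // Stream form: map each partition's key list to a result and collect.
    static <K, R> List<R> viaStream(Map<Integer, List<K>> partitioned, Function<List<K>, R> fetch) {
        return partitioned.values().stream()
                .map(fetch)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical slot numbers; a LinkedHashMap keeps insertion order.
        Map<Integer, List<String>> partitioned = new LinkedHashMap<>();
        partitioned.put(100, Arrays.asList("a", "c"));
        partitioned.put(200, Arrays.asList("b"));

        // Assumption: stand-in for super.mget(...), which would return a publisher.
        Function<List<String>, String> fetch = keys -> "MGET " + String.join(" ", keys);

        System.out.println(viaLoop(partitioned, fetch));   // prints [MGET a c, MGET b]
        System.out.println(viaStream(partitioned, fetch)); // prints [MGET a c, MGET b]
    }
}
```

Both forms visit the partitions in the map's iteration order, so the resulting lists line up element for element. The difference is one of style rather than behavior.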

Describe alternatives you've considered

An alternative approach would be to keep the current for loop implementation, but this would result in more verbose and harder-to-understand code. Using Stream and flatMapMany provides a more modern, readable, and efficient solution.

Teachability, Documentation, Adoption, Migration Strategy

Users will be able to use this improved method by simply updating their codebase with the refactored logic. No additional changes to the public API are required.
The updated code will be easier to maintain and understand, particularly for new developers.
The documentation will need to reflect the updated code flow, especially around how partitioning and merging are handled.
Migration should be smooth, as the method signature remains unchanged; only the internal implementation is improved.
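Because only the internals change, one way to gain confidence in the migration is to check the reordering step in isolation. The sketch below is an assumption-laden illustration, not Lettuce code: RestoreKeyOrder and restoreOrder are hypothetical names, plain Strings stand in for KeyValue instances, and merged represents the per-partition results concatenated in partition order. The index arithmetic follows the reordering loop in mget.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RestoreKeyOrder {

    // Places each merged value back at the position of its key in keyList.
    static <K, V> List<V> restoreOrder(List<K> keyList, Map<Integer, List<K>> partitioned, List<V> merged) {
        // Pre-fill with nulls so positions can be set by index.
        List<V> values = new ArrayList<>(Collections.<V>nCopies(keyList.size(), null));
        int offset = 0; // start of the current partition's results within merged

        for (List<K> partitionKeys : partitioned.values()) {
            for (int i = 0; i < keyList.size(); i++) {
                int index = partitionKeys.indexOf(keyList.get(i));
                if (index != -1) {
                    values.set(i, merged.get(offset + index));
                }
            }
            offset += partitionKeys.size();
        }
        return values;
    }

    public static void main(String[] args) {
        List<String> keyList = Arrays.asList("a", "b", "c");

        // Hypothetical slots: "a" and "c" share one slot, "b" lives in another.
        Map<Integer, List<String>> partitioned = new LinkedHashMap<>();
        partitioned.put(100, Arrays.asList("a", "c"));
        partitioned.put(200, Arrays.asList("b"));

        // Results as they would arrive in partition order: slot 100 first, then slot 200.
        List<String> merged = Arrays.asList("va", "vc", "vb");

        System.out.println(restoreOrder(keyList, partitioned, merged)); // prints [va, vb, vc]
    }
}
```

A test along these lines could be run against both the loop-based and the stream-based version to confirm that callers still receive values in the order of the keys they passed in.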

Existing Code

https://github.com/redis/lettuce/blob/main/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java

@SuppressWarnings({ "unchecked", "rawtypes" })
public Flux<KeyValue<K, V>> mget(Iterable<K> keys) {
    List<K> keyList = LettuceLists.newList(keys);
    Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keyList);
    
    if (partitioned.size() < 2) {
        return super.mget(keyList);
    }
    
    List<Publisher<KeyValue<K, V>>> publishers = new ArrayList<>();
    
    for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
        publishers.add(super.mget(entry.getValue()));
    }
    
    Flux<KeyValue<K, V>> fluxes = Flux.mergeSequential(publishers);
    
    Mono<List<KeyValue<K, V>>> map = fluxes.collectList().map(vs -> {
        KeyValue<K, V>[] values = new KeyValue[vs.size()];
        int offset = 0;
        
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            for (int i = 0; i < keyList.size(); i++) {
                int index = entry.getValue().indexOf(keyList.get(i));
                if (index == -1) {
                    continue;
                }
                
                values[i] = vs.get(offset + index);
            }
            offset += entry.getValue().size();
        }
        
        return Arrays.asList(values);
    });
    
    return map.flatMapIterable(keyValues -> keyValues);
}

Improved Code

@SuppressWarnings({ "unchecked", "rawtypes" })
public Flux<KeyValue<K, V>> mget(Iterable<K> keys) {
    List<K> keyList = LettuceLists.newList(keys);
    // Group the keys by hash slot.
    Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keyList);

    // Fewer than two partitions: a single MGET is enough.
    if (partitioned.size() < 2) {
        return super.mget(keyList);
    }

    // One MGET publisher per slot partition.
    List<Publisher<KeyValue<K, V>>> publishers = partitioned.values().stream()
            .map(super::mget)
            .collect(Collectors.toList());

    // mergeSequential subscribes eagerly but emits results in partition order,
    // so each partition's results start at a known offset in the collected list.
    return Flux.mergeSequential(publishers)
               .collectList()
               .map(results -> {
                   KeyValue<K, V>[] values = new KeyValue[keyList.size()];
                   int offset = 0;

                   // Put each result back at the position of its key in keyList.
                   for (List<K> partitionKeys : partitioned.values()) {
                       for (int i = 0; i < keyList.size(); i++) {
                           int index = partitionKeys.indexOf(keyList.get(i));
                           if (index != -1) {
                               values[i] = results.get(offset + index);
                           }
                       }
                       offset += partitionKeys.size();
                   }

                   return Arrays.asList(values);
               })
               .flatMapMany(Flux::fromIterable);
}

Thank you :)

@ori0o0p changed the title from "Refactor mget Method in RedisAdvancedClusterReactiveCommandsImpl.java to Use Java 8 Streams for Improved Readability and Efficiency" to "Refactor mget Method in RedisAdvancedClusterReactiveCommandsImpl.java to Use Java Streams for Improved Readability and Efficiency" on Nov 23, 2024
@tishun tishun added for: team-attention An issue we need to discuss as a team to make progress status: waiting-for-triage labels Nov 25, 2024