
chore: Use Lettuce client to run clear keys across Redis nodes #36862

Merged · 4 commits · Oct 14, 2024
Changes from 3 commits
@@ -788,7 +788,7 @@ public void addPluginNameForGoogleSheets(MongoTemplate mongoTemplate) {
        mongoTemplate.save(googleSheetsPlugin);
    }

-    protected static void doClearRedisKeys(ReactiveRedisOperations<String, String> reactiveRedisOperations) {
+    public static void doClearRedisKeys(ReactiveRedisOperations<String, String> reactiveRedisOperations) {
        final String script = "for _,k in ipairs(redis.call('keys','spring:session:sessions:*'))"
                + " do redis.call('del',k) " + "end";
        final Flux<Object> flushdb = reactiveRedisOperations.execute(RedisScript.of(script));
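As an aside (a sketch, not code from this PR): the Lua script above runs KEYS spring:session:sessions:* followed by DEL entirely server-side in a single round trip. The same cleanup can also be expressed with the template-level API alone; the helper name below is hypothetical and the usual Spring Data Redis and Reactor imports are assumed.

    // Sketch only, not part of this PR: a Lua-free variant of the same cleanup.
    // KEYS is O(N) and blocks the Redis server while it runs, so this carries the same
    // caveat as the script above; the SCAN-based change unit later in this PR avoids it.
    static Mono<Long> clearSessionKeysWithoutLua(ReactiveRedisOperations<String, String> operations) {
        return operations.delete(operations.keys("spring:session:sessions:*"));
    }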
@@ -5,7 +5,12 @@
import io.mongock.api.annotations.RollbackExecution;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.data.redis.core.ReactiveRedisTemplate;
+import org.springframework.data.redis.core.ReactiveRedisOperations;
+import org.springframework.data.redis.core.ScanOptions;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.nio.ByteBuffer;

@Slf4j
@ChangeUnit(order = "063", id = "reset_session_oauth2_spring_3_3")
@@ -16,12 +21,25 @@ public void rollbackExecution() {}

    @Execution
    public void execute(
-            @Qualifier("reactiveRedisTemplate") final ReactiveRedisTemplate<String, Object> reactiveRedisTemplate) {
-        reactiveRedisTemplate
-                .getConnectionFactory()
-                .getReactiveConnection()
-                .serverCommands()
-                .flushDb()
-                .block();
+            @Qualifier("reactiveRedisOperations") ReactiveRedisOperations<String, Object> reactiveRedisOperations) {
+        scanForKeysAcrossCluster(reactiveRedisOperations, "*").block();
+    }
+
+    private Mono<Void> scanForKeysAcrossCluster(
+            ReactiveRedisOperations<String, Object> reactiveRedisOperations, String pattern) {
+        return reactiveRedisOperations
+                .execute(connection -> {
+                    Flux<ByteBuffer> scanFlux = connection
+                            .keyCommands()
+                            .scan(ScanOptions.scanOptions()
+                                    .match(pattern)
+                                    .count(1000)
+                                    .build());
+                    return scanFlux.flatMap(scannedKey -> {
+                                return connection.keyCommands().del(scannedKey);
+                            })
+                            .then();
+                })
+                .then();
Comment on lines +28 to +43

🛠️ Refactor suggestion

Optimizing Key Scanning and Deletion in Redis

You've effectively encapsulated the Redis key scanning and deletion logic within the scanForKeysAcrossCluster method. Here are some suggestions to enhance its performance and reliability:

  1. Refine the Scan Pattern: Scanning with a wildcard "*" can be resource-intensive, especially in a production environment with a large number of keys. If possible, narrow down the pattern to target only the relevant keys that need deletion.

  2. Batch Deletion for Efficiency: Deleting keys individually may lead to increased overhead. Batching the keys and performing a bulk delete can improve performance significantly.

  3. Handle Potential Exceptions: It's important to anticipate and handle any exceptions that might occur during the scanning and deletion process to prevent unexpected failures.

Here's how you could modify the method to incorporate these suggestions:

 private Mono<Void> scanForKeysAcrossCluster(
         ReactiveRedisOperations<String, Object> reactiveRedisOperations, String pattern) {
     return reactiveRedisOperations
             .execute(connection -> {
                 Flux<ByteBuffer> scanFlux = connection
                         .keyCommands()
                         .scan(ScanOptions.scanOptions()
                                 .match(pattern)
                                 .count(1000)
                                 .build());
-                return scanFlux.flatMap(scannedKey -> {
-                            return connection.keyCommands().del(scannedKey);
-                        })
-                        .then();
+                return scanFlux.buffer(1000)
+                        .flatMap(keys -> connection.keyCommands().mDel(keys))
+                        .onErrorContinue((throwable, obj) -> {
+                            log.error("Error deleting keys: {}", throwable.getMessage());
+                        });
             })
             .then();
 }
  • Batching with buffer(1000): This groups the keys into batches of 1000 before deletion.
  • Bulk Deletion: Deleting the keys in bulk reduces the number of network calls and improves efficiency.
  • Error Handling with onErrorContinue: This ensures that if an error occurs during deletion, it logs the error but continues processing the remaining keys.

If you'd like further assistance in implementing these changes or have questions about Redis key management, please let me know!
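As a small illustration of the first point only (a sketch, not part of the review or of the PR), the change unit's execute method could pass the session-key pattern already used by doClearRedisKeys instead of the "*" wildcard, assuming only Spring Session keys need to be cleared here:

    // Sketch: narrow the SCAN pattern to the keys this migration actually targets.
    // Whether "spring:session:sessions:*" is sufficient for this change unit is an assumption.
    @Execution
    public void execute(
            @Qualifier("reactiveRedisOperations") ReactiveRedisOperations<String, Object> reactiveRedisOperations) {
        scanForKeysAcrossCluster(reactiveRedisOperations, "spring:session:sessions:*").block();
    }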


    }
}