Skip to content

Commit b1b96d8

Browse files
committed
Draft : filter out commands to resent after a re-connect
1 parent ce92e6a commit b1b96d8

File tree

5 files changed

+169
-18
lines changed

5 files changed

+169
-18
lines changed

docs/advanced-usage.md

Lines changed: 59 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ client.setOptions(ClientOptions.builder()
327327
<tbody>
328328
<tr>
329329
<td>PING before activating connection</td>
330-
<td><code>pingBefor eActivateConnection</code></td>
330+
<td><code>pingBeforeActivateConnection</code></td>
331331
<td><code>true</code></td>
332332
</tr>
333333
<tr>
@@ -362,8 +362,21 @@ queued commands.</p>
362362
refuse commands and cancel these with an exception.</p></td>
363363
</tr>
364364
<tr>
365+
<td>Replay filter</td>
366+
<td><code>replayFilter</code></td>
367+
<td><code>(cmd) -> false</code></td>
368+
</tr>
369+
<tr>
370+
<td colspan="3"><p>Since: 6.6</p>
371+
<p>Controls which commands are to be filtered out in case the driver
372+
attempts to reconnect to the server. Returning <code>false</code> means
373+
that the command would not be filtered out.</p>
374+
<p>This flag has no effect in case the autoReconnect feature is not
375+
enabled.</p></td>
376+
</tr>
377+
<tr>
365378
<td>Cancel commands on reconnect failure</td>
366-
<td><code>cancelCommand sOnReconnectFailure</code></td>
379+
<td><code>cancelCommandsOnReconnectFailure</code></td>
367380
<td><code>false</code></td>
368381
</tr>
369382
<tr>
@@ -486,7 +499,7 @@ store/trust store.</p></td>
486499
<tr>
487500
<td>Timeout Options</td>
488501
<td><code>timeoutOptions</code></td>
489-
<td><code>Do n ot timeout commands.</code></td>
502+
<td><code>Do not timeout commands.</code></td>
490503
</tr>
491504
<tr>
492505
<td colspan="3"><p>Since: 5.1</p>
@@ -550,7 +563,7 @@ client.setOptions(ClusterClientOptions.builder()
550563
<tbody>
551564
<tr>
552565
<td>Periodic cluster topology refresh</td>
553-
<td><code>en ablePeriodicRefresh</code></td>
566+
<td><code>enablePeriodicRefresh</code></td>
554567
<td><code>false</code></td>
555568
</tr>
556569
<tr>
@@ -2399,14 +2412,14 @@ independent connections to Redis.
23992412
Lettuce provides two levels of consistency; these are the rules for
24002413
Redis command sends:
24012414

2402-
Depending on the chosen consistency level:
2415+
#### Depending on the chosen consistency level
24032416

2404-
- **at-most-once execution**, i. e. no guaranteed execution
2417+
- **at-most-once execution**, i.e. no guaranteed execution
24052418

2406-
- **at-least-once execution**, i. e. guaranteed execution (with [some
2419+
- **at-least-once execution**, i.e. guaranteed execution (with [some
24072420
exceptions](#exceptions-to-at-least-once))
24082421

2409-
Always:
2422+
#### Always
24102423

24112424
- command ordering in the order of invocations
24122425

@@ -2602,9 +2615,44 @@ re-established, queued commands are re-sent for execution. While a
26022615
connection failure persists, issued commands are buffered.
26032616

26042617
To change into *at-most-once* consistency level, disable auto-reconnect
2605-
mode. Connections cannot be longer reconnected and thus no retries are
2606-
issued. Not successfully commands are canceled. New commands are
2607-
rejected.
2618+
mode. Connections can no longer be reconnected and thus no retries are
2619+
issued. Unsuccessful commands are canceled. New commands are rejected.
2620+
2621+
#### Controlling replay of commands in *at-lease-once* mode
2622+
2623+
!!! NOTE
2624+
This feature is only available since Lettuce 6.6
2625+
2626+
One can achieve a more fine-grained control over the commands that are
2627+
replayed after a reconnection by using the option to specify a filter
2628+
predicate. This option is part of the ClientOptions configuration. See
2629+
[Client Options](advanced-usage.md#client-options) for further reference.
2630+
2631+
``` java
2632+
Predicate<RedisCommand<?, ?, ?> > filter = cmd ->
2633+
cmd.getType().toString().equalsIgnoreCase("DECR");
2634+
2635+
client.setOptions(ClientOptions.builder()
2636+
.autoReconnect(true)
2637+
.replayFilter(filter)
2638+
.build());
2639+
```
2640+
2641+
The code above would filter out all `DECR` commands from being replayed
2642+
after a reconnection. Another, perhaps more popular example, would be:
2643+
2644+
``` java
2645+
Predicate<RedisCommand<?, ?, ?> > filter = cmd -> true;
2646+
2647+
client.setOptions(ClientOptions.builder()
2648+
.autoReconnect(true)
2649+
.replayFilter(filter)
2650+
.build());
2651+
```
2652+
2653+
... which disables any command replay, but still allows the driver to
2654+
re-connect, basically providing a way to have auto-reconnect without
2655+
auto-replay of commands.
26082656

26092657
### Clustered operations
26102658

src/main/java/io/lettuce/core/ClientOptions.java

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Iterator;
2626
import java.util.ServiceConfigurationError;
2727
import java.util.ServiceLoader;
28+
import java.util.function.Predicate;
2829

2930
import io.lettuce.core.api.StatefulConnection;
3031
import io.lettuce.core.internal.LettuceAssert;
@@ -34,6 +35,7 @@
3435
import io.lettuce.core.protocol.DecodeBufferPolicy;
3536
import io.lettuce.core.protocol.ProtocolVersion;
3637
import io.lettuce.core.protocol.ReadOnlyCommands;
38+
import io.lettuce.core.protocol.RedisCommand;
3739
import io.lettuce.core.resource.ClientResources;
3840
import reactor.core.publisher.Mono;
3941

@@ -49,6 +51,8 @@ public class ClientOptions implements Serializable {
4951

5052
public static final boolean DEFAULT_AUTO_RECONNECT = true;
5153

54+
public static final Predicate<RedisCommand<?, ?, ?>> DEFAULT_REPLAY_FILTER = (cmd) -> false;
55+
5256
public static final int DEFAULT_BUFFER_USAGE_RATIO = 3;
5357

5458
public static final boolean DEFAULT_CANCEL_CMD_RECONNECT_FAIL = false;
@@ -91,6 +95,8 @@ public class ClientOptions implements Serializable {
9195

9296
private final boolean autoReconnect;
9397

98+
private final Predicate<RedisCommand<?, ?, ?>> replayFilter;
99+
94100
private final boolean cancelCommandsOnReconnectFailure;
95101

96102
private final DecodeBufferPolicy decodeBufferPolicy;
@@ -125,6 +131,7 @@ public class ClientOptions implements Serializable {
125131

126132
protected ClientOptions(Builder builder) {
127133
this.autoReconnect = builder.autoReconnect;
134+
this.replayFilter = builder.replayFilter;
128135
this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
129136
this.decodeBufferPolicy = builder.decodeBufferPolicy;
130137
this.disconnectedBehavior = builder.disconnectedBehavior;
@@ -145,6 +152,7 @@ protected ClientOptions(Builder builder) {
145152

146153
protected ClientOptions(ClientOptions original) {
147154
this.autoReconnect = original.isAutoReconnect();
155+
this.replayFilter = original.getReplayFilter();
148156
this.cancelCommandsOnReconnectFailure = original.isCancelCommandsOnReconnectFailure();
149157
this.decodeBufferPolicy = original.getDecodeBufferPolicy();
150158
this.disconnectedBehavior = original.getDisconnectedBehavior();
@@ -198,6 +206,8 @@ public static class Builder {
198206

199207
private boolean autoReconnect = DEFAULT_AUTO_RECONNECT;
200208

209+
private Predicate<RedisCommand<?, ?, ?>> replayFilter = DEFAULT_REPLAY_FILTER;
210+
201211
private boolean cancelCommandsOnReconnectFailure = DEFAULT_CANCEL_CMD_RECONNECT_FAIL;
202212

203213
private DecodeBufferPolicy decodeBufferPolicy = DecodeBufferPolicies.ratio(DEFAULT_BUFFER_USAGE_RATIO);
@@ -245,6 +255,21 @@ public Builder autoReconnect(boolean autoReconnect) {
245255
return this;
246256
}
247257

258+
/**
259+
* When {@link #autoReconnect(boolean)} is set to true, this {@link Predicate} is used to filter commands to replay when
260+
* the connection is reestablished after a disconnect. Returning <code>false</code> means the command will not be
261+
* filtered out and will be replayed. Defaults to replaying all queued commands.
262+
*
263+
* @param replayFilter a {@link Predicate} to filter commands to replay. Must not be {@code null}.
264+
* @see #DEFAULT_REPLAY_FILTER
265+
* @return {@code this}
266+
* @since 6.6
267+
*/
268+
public Builder replayFilter(Predicate<RedisCommand<?, ?, ?>> replayFilter) {
269+
this.replayFilter = replayFilter;
270+
return this;
271+
}
272+
248273
/**
249274
* Allows cancelling queued commands in case a reconnect fails.Defaults to {@code false}. See
250275
* {@link #DEFAULT_CANCEL_CMD_RECONNECT_FAIL}. <b>This flag is deprecated and should not be used as it can lead to race
@@ -526,13 +551,13 @@ public ClientOptions.Builder mutate() {
526551
Builder builder = new Builder();
527552

528553
builder.autoReconnect(isAutoReconnect()).cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure())
529-
.decodeBufferPolicy(getDecodeBufferPolicy()).disconnectedBehavior(getDisconnectedBehavior())
530-
.reauthenticateBehavior(getReauthenticateBehaviour()).readOnlyCommands(getReadOnlyCommands())
531-
.publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection())
532-
.protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize())
533-
.scriptCharset(getScriptCharset()).jsonParser(getJsonParser()).socketOptions(getSocketOptions())
534-
.sslOptions(getSslOptions()).suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure())
535-
.timeoutOptions(getTimeoutOptions());
554+
.replayFilter(getReplayFilter()).decodeBufferPolicy(getDecodeBufferPolicy())
555+
.disconnectedBehavior(getDisconnectedBehavior()).reauthenticateBehavior(getReauthenticateBehaviour())
556+
.readOnlyCommands(getReadOnlyCommands()).publishOnScheduler(isPublishOnScheduler())
557+
.pingBeforeActivateConnection(isPingBeforeActivateConnection()).protocolVersion(getConfiguredProtocolVersion())
558+
.requestQueueSize(getRequestQueueSize()).scriptCharset(getScriptCharset()).jsonParser(getJsonParser())
559+
.socketOptions(getSocketOptions()).sslOptions(getSslOptions())
560+
.suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions());
536561

537562
return builder;
538563
}
@@ -550,6 +575,16 @@ public boolean isAutoReconnect() {
550575
return autoReconnect;
551576
}
552577

578+
/**
579+
* Controls which {@link RedisCommand} will be replayed after a re-connect. The {@link Predicate} returns <code>true</code>
580+
* if command should be filtered out and not replayed. Defaults to {@link #DEFAULT_REPLAY_FILTER}.
581+
*
582+
* @return the currently set {@link Predicate} used to filter out commands to replay
583+
*/
584+
public Predicate<RedisCommand<?, ?, ?>> getReplayFilter() {
585+
return replayFilter;
586+
}
587+
553588
/**
554589
* If this flag is {@code true} any queued commands will be canceled when a reconnect fails within the activation sequence.
555590
* Default is {@code false}.

src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
3434
import java.util.concurrent.atomic.AtomicLong;
3535
import java.util.function.Consumer;
36+
import java.util.function.Predicate;
3637
import java.util.function.Supplier;
3738

3839
import io.lettuce.core.ClientOptions;
@@ -81,6 +82,8 @@ public class DefaultEndpoint implements RedisChannelWriter, Endpoint, PushHandle
8182

8283
private final Reliability reliability;
8384

85+
private final Predicate<RedisCommand<?, ?, ?>> replayFilter;
86+
8487
private final ClientOptions clientOptions;
8588

8689
private final ClientResources clientResources;
@@ -139,6 +142,7 @@ public DefaultEndpoint(ClientOptions clientOptions, ClientResources clientResour
139142
this.clientOptions = clientOptions;
140143
this.clientResources = clientResources;
141144
this.reliability = clientOptions.isAutoReconnect() ? Reliability.AT_LEAST_ONCE : Reliability.AT_MOST_ONCE;
145+
this.replayFilter = clientOptions.getReplayFilter();
142146
this.disconnectedBuffer = LettuceFactories.newConcurrentQueue(clientOptions.getRequestQueueSize());
143147
this.commandBuffer = LettuceFactories.newConcurrentQueue(clientOptions.getRequestQueueSize());
144148
this.boundedQueues = clientOptions.getRequestQueueSize() != Integer.MAX_VALUE;
@@ -343,6 +347,13 @@ private void writeToDisconnectedBuffer(RedisCommand<?, ?, ?> command) {
343347
return;
344348
}
345349

350+
if (replayFilter.test(command)) {
351+
if (debugEnabled) {
352+
logger.debug("{} writeToDisconnectedBuffer() Filtering out command {}", logPrefix(), command);
353+
}
354+
return;
355+
}
356+
346357
if (debugEnabled) {
347358
logger.debug("{} writeToDisconnectedBuffer() buffering (disconnected) command {}", logPrefix(), command);
348359
}
@@ -1033,10 +1044,16 @@ private void doComplete(Future<Void> future) {
10331044
private void potentiallyRequeueCommands(Channel channel, RedisCommand<?, ?, ?> sentCommand,
10341045
Collection<? extends RedisCommand<?, ?, ?>> sentCommands) {
10351046

1047+
// do not requeue commands that are done
10361048
if (sentCommand != null && sentCommand.isDone()) {
10371049
return;
10381050
}
10391051

1052+
// do not requeue commands that are to be filtered out
1053+
if (this.endpoint.replayFilter.test(sentCommand)) {
1054+
return;
1055+
}
1056+
10401057
if (sentCommands != null) {
10411058

10421059
boolean foundToSend = false;

src/test/java/io/lettuce/core/cluster/ClusterNodeEndpointUnitTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class ClusterNodeEndpointUnitTests {
5656
@BeforeEach
5757
void before() {
5858

59+
when(clientOptions.getReplayFilter()).thenReturn((cmd) -> false);
5960
when(clientOptions.getRequestQueueSize()).thenReturn(1000);
6061
when(clientOptions.getDisconnectedBehavior()).thenReturn(ClientOptions.DisconnectedBehavior.DEFAULT);
6162

src/test/java/io/lettuce/core/reliability/AtLeastOnceIntegrationTests.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
import java.util.concurrent.CountDownLatch;
99
import java.util.concurrent.ExecutionException;
1010
import java.util.concurrent.TimeUnit;
11+
import java.util.function.Predicate;
1112

1213
import io.lettuce.core.TimeoutOptions;
14+
import io.lettuce.core.protocol.RedisCommand;
1315
import org.junit.jupiter.api.BeforeEach;
1416
import org.junit.jupiter.api.Tag;
1517
import org.junit.jupiter.api.Test;
@@ -372,6 +374,54 @@ void retryAfterConnectionIsDisconnected() throws Exception {
372374
verificationConnection.getStatefulConnection().close();
373375
}
374376

377+
@Test
378+
void retryAfterConnectionIsDisconnectedButFiltered() throws Exception {
379+
// Do not replay DECR commands after reconnect for some reason
380+
Predicate<RedisCommand<?, ?, ?>> filter = cmd -> cmd.getType().toString().equalsIgnoreCase("DECR");
381+
382+
client.setOptions(ClientOptions.builder().autoReconnect(true).replayFilter(filter)
383+
.timeoutOptions(TimeoutOptions.builder().timeoutCommands(false).build()).build());
384+
385+
// needs to be increased on slow systems...perhaps...
386+
client.setDefaultTimeout(3, TimeUnit.SECONDS);
387+
388+
StatefulRedisConnection<String, String> connection = client.connect();
389+
RedisCommands<String, String> verificationConnection = client.connect().sync();
390+
391+
connection.sync().set(key, "1");
392+
393+
ConnectionWatchdog connectionWatchdog = ConnectionTestUtil.getConnectionWatchdog(connection);
394+
connectionWatchdog.setListenOnChannelInactive(false);
395+
396+
connection.async().quit();
397+
while (connection.isOpen()) {
398+
Delay.delay(Duration.ofMillis(100));
399+
}
400+
401+
assertThat(connection.async().incr(key).await(1, TimeUnit.SECONDS)).isFalse();
402+
assertThat(connection.async().decr(key).await(1, TimeUnit.SECONDS)).isFalse();
403+
assertThat(connection.async().decr(key).await(1, TimeUnit.SECONDS)).isFalse();
404+
405+
assertThat(verificationConnection.get("key")).isEqualTo("1");
406+
407+
assertThat(ConnectionTestUtil.getDisconnectedBuffer(connection).size()).isGreaterThan(0);
408+
assertThat(ConnectionTestUtil.getCommandBuffer(connection)).isEmpty();
409+
410+
connectionWatchdog.setListenOnChannelInactive(true);
411+
connectionWatchdog.scheduleReconnect();
412+
413+
while (!ConnectionTestUtil.getCommandBuffer(connection).isEmpty()
414+
|| !ConnectionTestUtil.getDisconnectedBuffer(connection).isEmpty()) {
415+
Delay.delay(Duration.ofMillis(10));
416+
}
417+
418+
assertThat(connection.sync().get(key)).isEqualTo("2");
419+
assertThat(verificationConnection.get(key)).isEqualTo("2");
420+
421+
connection.close();
422+
verificationConnection.getStatefulConnection().close();
423+
}
424+
375425
private Throwable getException(RedisFuture<?> command) {
376426
try {
377427
command.get();

0 commit comments

Comments
 (0)