Skip to content

Commit 78f40fb

Browse files
sokomishalovmp911de
authored andcommitted
Implement command listeners API #1382
Original pull request: #1424.
1 parent 7410f5d commit 78f40fb

File tree

12 files changed

+681
-1
lines changed

12 files changed

+681
-1
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.lettuce.core;
18+
19+
import io.lettuce.core.models.events.CommandFailedEvent;
20+
import io.lettuce.core.models.events.CommandStartedEvent;
21+
import io.lettuce.core.models.events.CommandSucceededEvent;
22+
23+
/**
24+
* @author Mikhael Sokolov
25+
*/
26+
public interface CommandListener {
27+
28+
/**
29+
* Listener for command started events.
30+
*
31+
* @param event the event
32+
*/
33+
default <K, V, T> void commandStarted(CommandStartedEvent<K, V, T> event) {
34+
}
35+
36+
/**
37+
* Listener for command completed events
38+
*
39+
* @param event the event
40+
*/
41+
default <K, V, T> void commandSucceeded(CommandSucceededEvent<K, V, T> event) {
42+
}
43+
44+
/**
45+
* Listener for command failure events
46+
*
47+
* @param event the event
48+
*/
49+
default <K, V, T> void commandFailed(CommandFailedEvent<K, V, T> event) {
50+
}
51+
52+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.lettuce.core;
18+
19+
import io.lettuce.core.models.events.CommandFailedEvent;
20+
import io.lettuce.core.models.events.CommandStartedEvent;
21+
import io.lettuce.core.models.events.CommandSucceededEvent;
22+
23+
import java.util.List;
24+
25+
/**
26+
* Wraps multiple command listeners into one multicaster
27+
*
28+
* @author Mikhael Sokolov
29+
*/
30+
public class CommandListenerMulticaster implements CommandListener {
31+
private final List<CommandListener> listeners;
32+
33+
public CommandListenerMulticaster(List<CommandListener> listeners) {
34+
this.listeners = listeners;
35+
}
36+
37+
@Override
38+
public <K, V, T> void commandStarted(CommandStartedEvent<K, V, T> event) {
39+
for (CommandListener listener : listeners) {
40+
listener.commandStarted(event);
41+
}
42+
}
43+
44+
@Override
45+
public <K, V, T> void commandSucceeded(CommandSucceededEvent<K, V, T> event) {
46+
for (CommandListener listener : listeners) {
47+
listener.commandSucceeded(event);
48+
}
49+
}
50+
51+
@Override
52+
public <K, V, T> void commandFailed(CommandFailedEvent<K, V, T> event) {
53+
for (CommandListener listener : listeners) {
54+
listener.commandFailed(event);
55+
}
56+
}
57+
58+
public List<CommandListener> getListeners() {
59+
return listeners;
60+
}
61+
}
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.lettuce.core;
18+
19+
import io.lettuce.core.internal.LettuceAssert;
20+
import io.lettuce.core.models.events.CommandFailedEvent;
21+
import io.lettuce.core.models.events.CommandStartedEvent;
22+
import io.lettuce.core.models.events.CommandSucceededEvent;
23+
import io.lettuce.core.output.CommandOutput;
24+
import io.lettuce.core.protocol.*;
25+
import io.lettuce.core.resource.ClientResources;
26+
import io.netty.buffer.ByteBuf;
27+
28+
import java.util.ArrayList;
29+
import java.util.Collection;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.concurrent.CompletableFuture;
33+
34+
/**
35+
* Writer for command listeners.
36+
*
37+
* @author Mikhael Sokolov
38+
*/
39+
public class CommandListenerWriter implements RedisChannelWriter {
40+
41+
private final CommandListener listener;
42+
private final RedisChannelWriter delegate;
43+
44+
public CommandListenerWriter(RedisChannelWriter delegate, CommandListener listener) {
45+
this.delegate = delegate;
46+
this.listener = listener;
47+
}
48+
49+
/**
50+
* Check whether {@link ClientResources} is configured to listen commands.
51+
*
52+
* @param clientResources must not be {@code null}.
53+
* @return {@code true} if {@link ClientResources} are configured to listen commands.
54+
*/
55+
public static boolean isSupported(ClientResources clientResources) {
56+
LettuceAssert.notNull(clientResources, "ClientResources must not be null");
57+
58+
return !clientResources.commandListeners().isEmpty();
59+
}
60+
61+
62+
@Override
63+
public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
64+
long now = System.currentTimeMillis();
65+
CommandStartedEvent<K, V, T> startedEvent = new CommandStartedEvent<>(command, now);
66+
listener.commandStarted(startedEvent);
67+
68+
return delegate.write(new RedisCommandListenerCommand<>(command, startedEvent.getContext(), now, listener));
69+
}
70+
71+
@Override
72+
public <K, V> Collection<RedisCommand<K, V, ?>> write(Collection<? extends RedisCommand<K, V, ?>> redisCommands) {
73+
List<RedisCommandListenerCommand<K, V, ?>> listenedCommands = new ArrayList<>();
74+
for (RedisCommand<K, V, ?> redisCommand : redisCommands) {
75+
long now = System.currentTimeMillis();
76+
CommandStartedEvent<K, V, ?> startedEvent = new CommandStartedEvent<>(redisCommand, now);
77+
listener.commandStarted(startedEvent);
78+
RedisCommandListenerCommand<K, V, ?> command = new RedisCommandListenerCommand<>(redisCommand, startedEvent.getContext(), now, listener);
79+
listenedCommands.add(command);
80+
}
81+
82+
return delegate.write(listenedCommands);
83+
}
84+
85+
@Override
86+
public void close() {
87+
delegate.close();
88+
}
89+
90+
@Override
91+
public CompletableFuture<Void> closeAsync() {
92+
return delegate.closeAsync();
93+
}
94+
95+
@Override
96+
@SuppressWarnings("deprecation")
97+
public void reset() {
98+
delegate.reset();
99+
}
100+
101+
@Override
102+
public void setConnectionFacade(ConnectionFacade connection) {
103+
delegate.setConnectionFacade(connection);
104+
}
105+
106+
@Override
107+
public void setAutoFlushCommands(boolean autoFlush) {
108+
delegate.setAutoFlushCommands(autoFlush);
109+
}
110+
111+
@Override
112+
public void flushCommands() {
113+
delegate.flushCommands();
114+
}
115+
116+
@Override
117+
public ClientResources getClientResources() {
118+
return delegate.getClientResources();
119+
}
120+
121+
122+
private static class RedisCommandListenerCommand<K, V, T> implements RedisCommand<K, V, T>, DecoratedCommand<K, V, T> {
123+
124+
private final RedisCommand<K, V, T> command;
125+
private final Map<String, ?> context;
126+
private final long startedAt;
127+
private final CommandListener listener;
128+
129+
public RedisCommandListenerCommand(RedisCommand<K, V, T> command, Map<String, ?> context, long startedAt, CommandListener listener) {
130+
this.command = command;
131+
this.context = context;
132+
this.startedAt = startedAt;
133+
this.listener = listener;
134+
}
135+
136+
@Override
137+
public RedisCommand<K, V, T> getDelegate() {
138+
return command;
139+
}
140+
141+
@Override
142+
public CommandOutput<K, V, T> getOutput() {
143+
return command.getOutput();
144+
}
145+
146+
@Override
147+
public void complete() {
148+
if (getOutput().hasError()) {
149+
CommandFailedEvent<K, V, T> failedEvent = new CommandFailedEvent<>(command, context, new RedisCommandExecutionException(getOutput().getError()));
150+
listener.commandFailed(failedEvent);
151+
} else {
152+
long now = System.currentTimeMillis();
153+
CommandSucceededEvent<K, V, T> succeededEvent = new CommandSucceededEvent<>(command, context, startedAt, now);
154+
listener.commandSucceeded(succeededEvent);
155+
}
156+
command.complete();
157+
}
158+
159+
@Override
160+
public void cancel() {
161+
command.cancel();
162+
}
163+
164+
@Override
165+
public CommandArgs<K, V> getArgs() {
166+
return command.getArgs();
167+
}
168+
169+
@Override
170+
public boolean completeExceptionally(Throwable throwable) {
171+
CommandFailedEvent<K, V, T> failedEvent = new CommandFailedEvent<>(command, context, throwable);
172+
listener.commandFailed(failedEvent);
173+
174+
return command.completeExceptionally(throwable);
175+
}
176+
177+
@Override
178+
public ProtocolKeyword getType() {
179+
return command.getType();
180+
}
181+
182+
@Override
183+
public void encode(ByteBuf buf) {
184+
command.encode(buf);
185+
}
186+
187+
@Override
188+
public boolean isCancelled() {
189+
return command.isCancelled();
190+
}
191+
192+
@Override
193+
public boolean isDone() {
194+
return command.isDone();
195+
}
196+
197+
@Override
198+
public void setOutput(CommandOutput<K, V, T> output) {
199+
command.setOutput(output);
200+
}
201+
}
202+
}

src/main/java/io/lettuce/core/RedisClient.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,9 @@ private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandalone
273273
if (CommandExpiryWriter.isSupported(getOptions())) {
274274
writer = new CommandExpiryWriter(writer, getOptions(), getResources());
275275
}
276+
if (CommandListenerWriter.isSupported(getResources())) {
277+
writer = new CommandListenerWriter(writer, new CommandListenerMulticaster(getResources().commandListeners()));
278+
}
276279

277280
StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, endpoint, codec, timeout);
278281
ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI,
@@ -401,6 +404,9 @@ private <K, V> ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectPubS
401404
if (CommandExpiryWriter.isSupported(getOptions())) {
402405
writer = new CommandExpiryWriter(writer, getOptions(), getResources());
403406
}
407+
if (CommandListenerWriter.isSupported(getResources())) {
408+
writer = new CommandListenerWriter(writer, new CommandListenerMulticaster(getResources().commandListeners()));
409+
}
404410

405411
StatefulRedisPubSubConnectionImpl<K, V> connection = newStatefulRedisPubSubConnection(endpoint, writer, codec, timeout);
406412

@@ -564,6 +570,9 @@ private <K, V> ConnectionFuture<StatefulRedisSentinelConnection<K, V>> doConnect
564570
if (CommandExpiryWriter.isSupported(getOptions())) {
565571
writer = new CommandExpiryWriter(writer, getOptions(), getResources());
566572
}
573+
if (CommandListenerWriter.isSupported(getResources())) {
574+
writer = new CommandListenerWriter(writer, new CommandListenerMulticaster(getResources().commandListeners()));
575+
}
567576

568577
StatefulRedisSentinelConnectionImpl<K, V> connection = newStatefulRedisSentinelConnection(writer, codec, timeout);
569578
ConnectionState state = connection.getConnectionState();

src/main/java/io/lettuce/core/cluster/RedisClusterClient.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,9 @@ <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectToNodeAsync(RedisC
531531
if (CommandExpiryWriter.isSupported(getClusterClientOptions())) {
532532
writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources());
533533
}
534+
if (CommandListenerWriter.isSupported(getResources())) {
535+
writer = new CommandListenerWriter(writer, new CommandListenerMulticaster(getResources().commandListeners()));
536+
}
534537

535538
StatefulRedisConnectionImpl<K, V> connection = new StatefulRedisConnectionImpl<>(writer, endpoint, codec,
536539
getDefaultTimeout());
@@ -573,6 +576,9 @@ <K, V> ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectPubSubToNode
573576
if (CommandExpiryWriter.isSupported(getClusterClientOptions())) {
574577
writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources());
575578
}
579+
if (CommandListenerWriter.isSupported(getResources())) {
580+
writer = new CommandListenerWriter(writer, new CommandListenerMulticaster(getResources().commandListeners()));
581+
}
576582

577583
StatefulRedisPubSubConnectionImpl<K, V> connection = new StatefulRedisPubSubConnectionImpl<>(endpoint, writer, codec,
578584
getDefaultTimeout());
@@ -612,6 +618,9 @@ private <K, V> CompletableFuture<StatefulRedisClusterConnection<K, V>> connectCl
612618
if (CommandExpiryWriter.isSupported(getClusterClientOptions())) {
613619
writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources());
614620
}
621+
if (CommandListenerWriter.isSupported(getResources())) {
622+
writer = new CommandListenerWriter(writer, new CommandListenerMulticaster(getResources().commandListeners()));
623+
}
615624

616625
ClusterDistributionChannelWriter clusterWriter = new ClusterDistributionChannelWriter(getClusterClientOptions(), writer,
617626
topologyRefreshScheduler);
@@ -695,6 +704,9 @@ private <K, V> CompletableFuture<StatefulRedisClusterPubSubConnection<K, V>> con
695704
if (CommandExpiryWriter.isSupported(getClusterClientOptions())) {
696705
writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources());
697706
}
707+
if (CommandListenerWriter.isSupported(getResources())) {
708+
writer = new CommandListenerWriter(writer, new CommandListenerMulticaster(getResources().commandListeners()));
709+
}
698710

699711
ClusterDistributionChannelWriter clusterWriter = new ClusterDistributionChannelWriter(getClusterClientOptions(), writer,
700712
topologyRefreshScheduler);

0 commit comments

Comments
 (0)