Skip to content

Commit

Permalink
Add support for WAITAOF command (#3393)
Browse files Browse the repository at this point in the history
* #3344 Add support for new redis command WAITAOF

* PR Comments (#3344)

* Change casted type in builder (#3344)

* Fix test (#3344)

---------

Co-authored-by: komp15 <77535280+komp15@users.noreply.github.com>
Co-authored-by: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com>
  • Loading branch information
3 people authored May 8, 2023
1 parent a916994 commit 8a5740d
Show file tree
Hide file tree
Showing 18 changed files with 146 additions and 7 deletions.
15 changes: 10 additions & 5 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,12 @@
import java.util.*;
import java.util.stream.Collectors;

import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.resps.StreamConsumerFullInfo;
import redis.clients.jedis.resps.StreamFullInfo;
import redis.clients.jedis.resps.StreamGroupFullInfo;
import redis.clients.jedis.resps.LCSMatchResult.MatchedPosition;
import redis.clients.jedis.resps.LCSMatchResult.Position;
import redis.clients.jedis.resps.*;
import redis.clients.jedis.search.aggr.AggregationResult;
import redis.clients.jedis.timeseries.TSKeyedElements;
import redis.clients.jedis.timeseries.TSElement;
import redis.clients.jedis.timeseries.TSKeyValue;
import redis.clients.jedis.util.DoublePrecision;
import redis.clients.jedis.util.JedisByteHashMap;
import redis.clients.jedis.util.KeyValue;
Expand Down Expand Up @@ -421,6 +416,16 @@ public String toString() {
}
};

public static final Builder<KeyValue<Long, Long>> LONG_LONG_PAIR = new Builder<KeyValue<Long, Long>>() {
@Override
@SuppressWarnings("unchecked")
public KeyValue<Long, Long> build(Object data) {
if (data == null) return null;
List<Object> dataList = (List<Object>) data;
return new KeyValue<>(LONG.build(dataList.get(0)), LONG.build(dataList.get(1)));
}
};

public static final Builder<List<KeyValue<String, List<String>>>> KEYED_STRING_LIST_LIST
= new Builder<List<KeyValue<String, List<String>>>>() {
@Override
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/redis/clients/jedis/ClusterCommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import redis.clients.jedis.params.ScanParams;
import redis.clients.jedis.resps.ScanResult;
import redis.clients.jedis.util.JedisClusterHashTag;
import redis.clients.jedis.util.KeyValue;

public class ClusterCommandObjects extends CommandObjects {

Expand Down Expand Up @@ -97,4 +98,10 @@ public final CommandObject<ScanResult<byte[]>> scan(byte[] cursor, ScanParams pa
public final CommandObject<Long> waitReplicas(int replicas, long timeout) {
throw new UnsupportedOperationException(CLUSTER_UNSUPPORTED_MESSAGE);
}

@Override
public CommandObject<KeyValue<Long, Long>> waitAOF(long numLocal, long numReplicas, long timeout) {
throw new UnsupportedOperationException(CLUSTER_UNSUPPORTED_MESSAGE);
}

}
12 changes: 12 additions & 0 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -3132,6 +3132,18 @@ public final CommandObject<Long> waitReplicas(byte[] sampleKey, int replicas, lo
return new CommandObject<>(commandArguments(WAIT).add(replicas).add(timeout).processKey(sampleKey), BuilderFactory.LONG);
}

public CommandObject<KeyValue<Long, Long>> waitAOF(long numLocal, long numReplicas, long timeout) {
return new CommandObject<>(commandArguments(WAITAOF).add(numLocal).add(numReplicas).add(timeout), BuilderFactory.LONG_LONG_PAIR);
}

public CommandObject<KeyValue<Long, Long>> waitAOF(byte[] sampleKey, long numLocal, long numReplicas, long timeout) {
return new CommandObject<>(commandArguments(WAITAOF).add(numLocal).add(numReplicas).add(timeout).processKey(sampleKey), BuilderFactory.LONG_LONG_PAIR);
}

public CommandObject<KeyValue<Long, Long>> waitAOF(String sampleKey, long numLocal, long numReplicas, long timeout) {
return new CommandObject<>(commandArguments(WAITAOF).add(numLocal).add(numReplicas).add(timeout).processKey(sampleKey), BuilderFactory.LONG_LONG_PAIR);
}

public final CommandObject<Long> publish(String channel, String message) {
return new CommandObject<>(commandArguments(PUBLISH).add(channel).add(message), BuilderFactory.LONG);
}
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4389,6 +4389,13 @@ public long waitReplicas(final int replicas, final long timeout) {
return connection.getIntegerReply();
}

@Override
public KeyValue<Long, Long> waitAOF(long numLocal, long numReplicas, long timeout) {
checkIsInMultiOrPipeline();
connection.sendCommand(WAITAOF, toByteArray(numLocal), toByteArray(numReplicas), toByteArray(timeout));
return BuilderFactory.LONG_LONG_PAIR.build(connection.getOne());
}

@Override
public long pfadd(final byte[] key, final byte[]... elements) {
checkIsInMultiOrPipeline();
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/redis/clients/jedis/MultiNodePipelineBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -1688,6 +1688,11 @@ public Response<Long> waitReplicas(String sampleKey, int replicas, long timeout)
return appendCommand(commandObjects.waitReplicas(sampleKey, replicas, timeout));
}

@Override
public Response<KeyValue<Long, Long>> waitAOF(String sampleKey, long numLocal, long numReplicas, long timeout) {
return appendCommand(commandObjects.waitAOF(sampleKey, numLocal, numReplicas, timeout));
}

@Override
public Response<Object> eval(String script, String sampleKey) {
return appendCommand(commandObjects.eval(script, sampleKey));
Expand Down Expand Up @@ -2509,6 +2514,11 @@ public Response<Long> waitReplicas(byte[] sampleKey, int replicas, long timeout)
return appendCommand(commandObjects.waitReplicas(sampleKey, replicas, timeout));
}

@Override
public Response<KeyValue<Long, Long>> waitAOF(byte[] sampleKey, long numLocal, long numReplicas, long timeout) {
return appendCommand(commandObjects.waitAOF(sampleKey, numLocal, numReplicas, timeout));
}

@Override
public Response<Object> eval(byte[] script, byte[] sampleKey) {
return appendCommand(commandObjects.eval(script, sampleKey));
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/redis/clients/jedis/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -1639,6 +1639,11 @@ public Response<Long> waitReplicas(String sampleKey, int replicas, long timeout)
return appendCommand(commandObjects.waitReplicas(sampleKey, replicas, timeout));
}

@Override
public Response<KeyValue<Long, Long>> waitAOF(String sampleKey, long numLocal, long numReplicas, long timeout) {
return appendCommand(commandObjects.waitAOF(sampleKey, numLocal, numReplicas, timeout));
}

@Override
public Response<Object> eval(String script, String sampleKey) {
return appendCommand(commandObjects.eval(script, sampleKey));
Expand Down Expand Up @@ -2460,6 +2465,11 @@ public Response<Long> waitReplicas(byte[] sampleKey, int replicas, long timeout)
return appendCommand(commandObjects.waitReplicas(sampleKey, replicas, timeout));
}

@Override
public Response<KeyValue<Long, Long>> waitAOF(byte[] sampleKey, long numLocal, long numReplicas, long timeout) {
return appendCommand(commandObjects.waitAOF(sampleKey, numLocal, numReplicas, timeout));
}

@Override
public Response<Object> eval(byte[] script, byte[] sampleKey) {
return appendCommand(commandObjects.eval(script, sampleKey));
Expand Down Expand Up @@ -4312,6 +4322,10 @@ public Response<Long> waitReplicas(int replicas, long timeout) {
return appendCommand(commandObjects.waitReplicas(replicas, timeout));
}

public Response<KeyValue<Long, Long>> waitAOF(long numLocal, long numReplicas, long timeout) {
return appendCommand(commandObjects.waitAOF(numLocal, numReplicas, timeout));
}

public Response<List<String>> time() {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.TIME), BuilderFactory.STRING_LIST));
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public static enum Command implements ProtocolCommand {
XADD, XLEN, XDEL, XTRIM, XRANGE, XREVRANGE, XREAD, XACK, XGROUP, XREADGROUP, XPENDING, XCLAIM,
XAUTOCLAIM, XINFO, BITFIELD_RO, ROLE, FAILOVER, GEOSEARCH, GEOSEARCHSTORE, EVAL_RO, EVALSHA_RO,
LOLWUT, EXPIRETIME, PEXPIRETIME, FUNCTION, FCALL, FCALL_RO, LMPOP, BLMPOP, ZMPOP, BZMPOP,
COMMAND, LATENCY, @Deprecated STRALGO;
COMMAND, LATENCY, WAITAOF, @Deprecated STRALGO;

private final byte[] raw;

Expand Down
6 changes: 6 additions & 0 deletions src/main/java/redis/clients/jedis/ShardedCommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import redis.clients.jedis.resps.ScanResult;
import redis.clients.jedis.util.Hashing;
import redis.clients.jedis.util.JedisClusterHashTag;
import redis.clients.jedis.util.KeyValue;

public class ShardedCommandObjects extends CommandObjects {

Expand Down Expand Up @@ -109,4 +110,9 @@ public final CommandObject<ScanResult<byte[]>> scan(byte[] cursor, ScanParams pa
public final CommandObject<Long> waitReplicas(int replicas, long timeout) {
throw new UnsupportedOperationException();
}

@Override
public CommandObject<KeyValue<Long, Long>> waitAOF(long numLocal, long numReplicas, long timeout) {
throw new UnsupportedOperationException();
}
}
10 changes: 10 additions & 0 deletions src/main/java/redis/clients/jedis/TransactionBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -1738,6 +1738,11 @@ public Response<Long> waitReplicas(String sampleKey, int replicas, long timeout)
return appendCommand(commandObjects.waitReplicas(sampleKey, replicas, timeout));
}

@Override
public Response<KeyValue<Long, Long>> waitAOF(String sampleKey, long numLocal, long numReplicas, long timeout) {
return appendCommand(commandObjects.waitAOF(sampleKey, numLocal, numReplicas, timeout));
}

@Override
public Response<Object> eval(String script, String sampleKey) {
return appendCommand(commandObjects.eval(script, sampleKey));
Expand Down Expand Up @@ -2559,6 +2564,11 @@ public Response<Long> waitReplicas(byte[] sampleKey, int replicas, long timeout)
return appendCommand(commandObjects.waitReplicas(sampleKey, replicas, timeout));
}

@Override
public Response<KeyValue<Long, Long>> waitAOF(byte[] sampleKey, long numLocal, long numReplicas, long timeout) {
return appendCommand(commandObjects.waitAOF(sampleKey, numLocal, numReplicas, timeout));
}

@Override
public Response<Object> eval(byte[] script, byte[] sampleKey) {
return appendCommand(commandObjects.eval(script, sampleKey));
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -3414,6 +3414,16 @@ public long waitReplicas(byte[] sampleKey, int replicas, long timeout) {
return executeCommand(commandObjects.waitReplicas(sampleKey, replicas, timeout));
}

@Override
public KeyValue<Long, Long> waitAOF(String sampleKey, long numLocal, long numReplicas, long timeout) {
return executeCommand(commandObjects.waitAOF(sampleKey, numLocal, numReplicas, timeout));
}

@Override
public KeyValue<Long, Long> waitAOF(byte[] sampleKey, long numLocal, long numReplicas, long timeout) {
return executeCommand(commandObjects.waitAOF(sampleKey, numLocal, numReplicas, timeout));
}

@Override
public Object eval(String script, String sampleKey) {
return executeCommand(commandObjects.eval(script, sampleKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

import java.util.List;
import redis.clients.jedis.args.FlushMode;
import redis.clients.jedis.util.KeyValue;

public interface SampleBinaryKeyedCommands {

long waitReplicas(byte[] sampleKey, int replicas, long timeout);

KeyValue<Long, Long> waitAOF(byte[] sampleKey, long numLocal, long numReplicas, long timeout);

Object eval(byte[] script, byte[] sampleKey);

Object evalsha(byte[] sha1, byte[] sampleKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
import java.util.List;
import redis.clients.jedis.Response;
import redis.clients.jedis.args.FlushMode;
import redis.clients.jedis.util.KeyValue;

public interface SampleBinaryKeyedPipelineCommands {

Response<Long> waitReplicas(byte[] sampleKey, int replicas, long timeout);

Response<KeyValue<Long, Long>> waitAOF(byte[] sampleKey, long numLocal, long numReplicas, long timeout);

Response<Object> eval(byte[] script, byte[] sampleKey);

Response<Object> evalsha(byte[] sha1, byte[] sampleKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

import java.util.List;
import redis.clients.jedis.args.FlushMode;
import redis.clients.jedis.util.KeyValue;

public interface SampleKeyedCommands {

long waitReplicas(String sampleKey, int replicas, long timeout);

KeyValue<Long, Long> waitAOF(String sampleKey, long numLocal, long numReplicas, long timeout);

Object eval(String script, String sampleKey);

Object evalsha(String sha1, String sampleKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
import java.util.List;
import redis.clients.jedis.Response;
import redis.clients.jedis.args.FlushMode;
import redis.clients.jedis.util.KeyValue;

public interface SampleKeyedPipelineCommands {

Response<Long> waitReplicas(String sampleKey, int replicas, long timeout);

Response<KeyValue<Long, Long>> waitAOF(String sampleKey, long numLocal, long numReplicas, long timeout);

Response<Object> eval(String script, String sampleKey);

Response<Object> evalsha(String sha1, String sampleKey);
Expand Down
17 changes: 16 additions & 1 deletion src/main/java/redis/clients/jedis/commands/ServerCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.params.LolwutParams;
import redis.clients.jedis.params.ShutdownParams;
import redis.clients.jedis.util.KeyValue;

public interface ServerCommands {

Expand Down Expand Up @@ -211,7 +212,7 @@ public interface ServerCommands {
String replicaofNoOne();

/**
* Syncrhonous replication of Redis as described here: http://antirez.com/news/66.
* Synchronous replication of Redis as described here: http://antirez.com/news/66.
* <p>
* Blocks until all the previous write commands are successfully transferred and acknowledged by
* at least the specified number of replicas. If the timeout, specified in milliseconds, is
Expand All @@ -226,6 +227,20 @@ public interface ServerCommands {
*/
long waitReplicas(int replicas, long timeout);

/**
* Blocks the current client until all the previous write commands are acknowledged as having been
* fsynced to the AOF of the local Redis and/or at least the specified number of replicas.
* <a href="https://redis.io/commands/waitaof/">Redis Documentation</a>
* @param numLocal Number of local instances that are required to acknowledge the sync (0 or 1),
* cannot be non-zero if the local Redis does not have AOF enabled
* @param numReplicas Number of replicas that are required to acknowledge the sync
* @param timeout Timeout in millis of the operation - if 0 timeout is unlimited. If the timeout is reached,
* the command returns even if the specified number of acknowledgments has not been met.
* @return KeyValue where Key is number of local Redises (0 or 1) that have fsynced to AOF all writes
* performed in the context of the current connection, and the value is the number of replicas that have acknowledged doing the same.
*/
KeyValue<Long, Long> waitAOF(long numLocal, long numReplicas, long timeout);

String lolwut();

String lolwut(LolwutParams lolwutParams);
Expand Down
13 changes: 13 additions & 0 deletions src/test/java/redis/clients/jedis/PipeliningTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,19 @@ public void waitReplicas() {
}
}

@Test
public void waitAof() {
Pipeline p = jedis.pipelined();
p.set("wait", "aof");
p.waitAOF(1L, 0L, 0L);
p.sync();

try (Jedis j = new Jedis(HostAndPorts.getRedisServers().get(4))) {
j.auth("foobared");
assertEquals("aof", j.get("wait"));
}
}

@Test
public void setGet() {
Pipeline p = jedis.pipelined();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.HostAndPorts;
import redis.clients.jedis.util.AssertUtil;
import redis.clients.jedis.util.KeyValue;
import redis.clients.jedis.util.SafeEncoder;

public class ControlCommandsTest extends JedisCommandsTestBase {
Expand Down Expand Up @@ -248,6 +249,11 @@ public void waitReplicas() {
assertEquals(1, jedis.waitReplicas(1, 100));
}

@Test
public void waitAof() {
assertEquals(KeyValue.of(0L, 0L), jedis.waitAOF(0L, 0L, 100L));
}

@Test
public void clientPause() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,16 @@ public void testWaitReplicas() {
List<Object> results = pipeline.syncAndReturnAll();
assertEquals(Long.valueOf(0), results.get(3));
}

@Test
public void testWaitAof() {
Pipeline pipeline = new Pipeline(c);
pipeline.set("x", "1");
pipeline.graphProfile("social", "CREATE (:Person {name:'a'})");
pipeline.graphProfile("g", "CREATE (:Person {name:'a'})");
pipeline.waitAOF(1L, 0L, 100L);
List<Object> results = pipeline.syncAndReturnAll();
assertEquals(0L, results.get(3));
}

}

0 comments on commit 8a5740d

Please sign in to comment.