diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index b1ca7455b9..15bb0122cd 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -8,10 +8,7 @@ import redis.clients.jedis.resps.*; import redis.clients.jedis.resps.LCSMatchResult.MatchedPosition; import redis.clients.jedis.resps.LCSMatchResult.Position; -import redis.clients.jedis.util.DoublePrecision; -import redis.clients.jedis.util.JedisByteHashMap; -import redis.clients.jedis.util.KeyValue; -import redis.clients.jedis.util.SafeEncoder; +import redis.clients.jedis.util.*; public final class BuilderFactory { @@ -1809,6 +1806,112 @@ public String toString() { } }; + public static final Builder> STREAM_ENTRY_BINARY_LIST = new Builder>() { + @Override + @SuppressWarnings("unchecked") + public List build(Object data) { + if (null == data) { + return null; + } + List> objectList = (List>) data; + + List responses = new ArrayList<>(objectList.size() / 2); + if (objectList.isEmpty()) { + return responses; + } + + for (ArrayList res : objectList) { + if (res == null) { + responses.add(null); + continue; + } + String entryIdString = SafeEncoder.encode((byte[]) res.get(0)); + StreamEntryID entryID = new StreamEntryID(entryIdString); + List hash = (List) res.get(1); + if (hash == null) { + responses.add(new StreamEntryBinary(entryID, null)); + continue; + } + + Iterator hashIterator = hash.iterator(); + Map map = new JedisByteHashMap(); + while (hashIterator.hasNext()) { + map.put(BINARY.build(hashIterator.next()), BINARY.build(hashIterator.next())); + } + responses.add(new StreamEntryBinary(entryID, map)); + } + + return responses; + } + + @Override + public String toString() { + return "List"; + } + }; + + public static final Builder>> STREAM_READ_BINARY_MAP_RESPONSE + = new Builder>>() { + @Override + @SuppressWarnings("unchecked") + public Map> build(Object data) { + if (data == null) return null; + List list = (List) data; + if (list.isEmpty()) return Collections.emptyMap(); + + JedisByteMap> result = new JedisByteMap<>(); + if (list.get(0) instanceof KeyValue) { + ((List) list).forEach(kv -> result.put(BINARY.build(kv.getKey()), STREAM_ENTRY_BINARY_LIST.build(kv.getValue()))); + return result; + } else { + for (Object anObj : list) { + List streamObj = (List) anObj; + byte[] streamKey = (byte[]) streamObj.get(0); + List streamEntries = STREAM_ENTRY_BINARY_LIST.build(streamObj.get(1)); + result.put(streamKey, streamEntries); + } + return result; + } + } + + @Override + public String toString() { + return "Map>"; + } + }; + + public static final Builder>>> STREAM_READ_BINARY_RESPONSE + = new Builder>>>() { + @Override + @SuppressWarnings("unchecked") + public List>> build(Object data) { + if (data == null) return null; + List list = (List) data; + if (list.isEmpty()) return Collections.emptyList(); + + if (list.get(0) instanceof KeyValue) { + return ((List) list).stream() + .map(kv -> new KeyValue<>(BINARY.build(kv.getKey()), + STREAM_ENTRY_BINARY_LIST.build(kv.getValue()))) + .collect(Collectors.toList()); + } else { + List>> result = new ArrayList<>(list.size()); + for (Object anObj : list) { + List streamObj = (List) anObj; + byte[] streamKey = BINARY.build(streamObj.get(0)); + List streamEntries = STREAM_ENTRY_BINARY_LIST.build(streamObj.get(1)); + result.add(KeyValue.of(streamKey, streamEntries)); + } + return result; + } + } + + @Override + public String toString() { + return "List>>"; + } + }; + private static final List BACKUP_BUILDERS_FOR_DECODING_FUNCTIONS = Arrays.asList(STRING, LONG, DOUBLE); diff --git a/src/main/java/redis/clients/jedis/CommandObjects.java b/src/main/java/redis/clients/jedis/CommandObjects.java index 6588303f14..ea4930f894 100644 --- a/src/main/java/redis/clients/jedis/CommandObjects.java +++ b/src/main/java/redis/clients/jedis/CommandObjects.java @@ -2881,6 +2881,11 @@ public final CommandObject>> xreadGroupAsMap( return new CommandObject<>(args, BuilderFactory.STREAM_READ_MAP_RESPONSE); } + /** + * @deprecated As of Jedis 6.1.0, replaced by {@link #xreadBinary(XReadParams, Map)} or + * {@link #xreadBinaryAsMap(XReadParams, Map)} for type safety and better stream entry parsing. + */ + @Deprecated public final CommandObject> xread(XReadParams xReadParams, Map.Entry... streams) { CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS); for (Map.Entry entry : streams) { @@ -2892,6 +2897,35 @@ public final CommandObject> xread(XReadParams xReadParams, Map.Entr return new CommandObject<>(args, BuilderFactory.RAW_OBJECT_LIST); } + public final CommandObject>>> xreadBinary( + XReadParams xReadParams, Map.Entry... streams) { + CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS); + for (Map.Entry entry : streams) { + args.key(entry.getKey()); + } + for (Map.Entry entry : streams) { + args.add(entry.getValue()); + } + return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_RESPONSE); + } + + public final CommandObject>> xreadBinaryAsMap( + XReadParams xReadParams, Map.Entry... streams) { + CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS); + for (Map.Entry entry : streams) { + args.key(entry.getKey()); + } + for (Map.Entry entry : streams) { + args.add(entry.getValue()); + } + return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_MAP_RESPONSE); + } + + /** + * @deprecated As of Jedis 6.1.0, use {@link #xreadGroupBinary(byte[], byte[], XReadGroupParams, Map)} or + * {@link #xreadGroupBinaryAsMap(byte[], byte[], XReadGroupParams, Map)} instead. + */ + @Deprecated public final CommandObject> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry... streams) { CommandArguments args = commandArguments(XREADGROUP) @@ -2905,6 +2939,78 @@ public final CommandObject> xreadGroup(byte[] groupName, byte[] con } return new CommandObject<>(args, BuilderFactory.RAW_OBJECT_LIST); } + + public final CommandObject>>> xreadGroupBinary( + byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, + Map.Entry... streams) { + CommandArguments args = commandArguments(XREADGROUP) + .add(GROUP).add(groupName).add(consumer) + .addParams(xReadGroupParams).add(STREAMS); + for (Map.Entry entry : streams) { + args.key(entry.getKey()); + } + for (Map.Entry entry : streams) { + args.add(entry.getValue()); + } + return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_RESPONSE); + } + + public final CommandObject>> xreadGroupBinaryAsMap( + byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, + Map.Entry... streams) { + CommandArguments args = commandArguments(XREADGROUP) + .add(GROUP).add(groupName).add(consumer) + .addParams(xReadGroupParams).add(STREAMS); + for (Map.Entry entry : streams) { + args.key(entry.getKey()); + } + for (Map.Entry entry : streams) { + args.add(entry.getValue()); + } + return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_MAP_RESPONSE); + } + + public final CommandObject>>> xreadBinary( + XReadParams xReadParams, Map streams) { + CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS); + Set> entrySet = streams.entrySet(); + entrySet.forEach(entry -> args.key(entry.getKey())); + entrySet.forEach(entry -> args.add(entry.getValue())); + return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_RESPONSE); + } + + public final CommandObject>> xreadBinaryAsMap( + XReadParams xReadParams, Map streams) { + CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS); + Set> entrySet = streams.entrySet(); + entrySet.forEach(entry -> args.key(entry.getKey())); + entrySet.forEach(entry -> args.add(entry.getValue())); + return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_MAP_RESPONSE); + } + + public final CommandObject>>> xreadGroupBinary( + byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, + Map streams) { + CommandArguments args = commandArguments(XREADGROUP) + .add(GROUP).add(groupName).add(consumer) + .addParams(xReadGroupParams).add(STREAMS); + Set> entrySet = streams.entrySet(); + entrySet.forEach(entry -> args.key(entry.getKey())); + entrySet.forEach(entry -> args.add(entry.getValue())); + return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_RESPONSE); + } + + public final CommandObject>> xreadGroupBinaryAsMap( + byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, + Map streams) { + CommandArguments args = commandArguments(XREADGROUP) + .add(GROUP).add(groupName).add(consumer) + .addParams(xReadGroupParams).add(STREAMS); + Set> entrySet = streams.entrySet(); + entrySet.forEach(entry -> args.key(entry.getKey())); + entrySet.forEach(entry -> args.add(entry.getValue())); + return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_MAP_RESPONSE); + } // Stream commands // Scripting commands diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index e47ea960b2..e19a4fa619 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -1172,7 +1172,7 @@ public long hsetex(byte[] key, HSetExParams params, byte[] field, byte[] value) checkIsInMultiOrPipeline(); return connection.executeCommand(commandObjects.hsetex(key, params, field, value)); } - + @Override public long hsetex(byte[] key, HSetExParams params, Map hash){ checkIsInMultiOrPipeline(); @@ -1200,13 +1200,13 @@ public List hgetex(byte[] key, HGetExParams params, byte[]... fields){ checkIsInMultiOrPipeline(); return connection.executeCommand(commandObjects.hgetex(key, params, fields)); } - + @Override public List hgetdel(byte[] key, byte[]... fields){ checkIsInMultiOrPipeline(); return connection.executeCommand(commandObjects.hgetdel(key, fields)); } - + /** * Set the specified hash field to the specified value if the field not exists. Time * complexity: O(1) @@ -4770,12 +4770,25 @@ public List hpersist(byte[] key, byte[]... fields) { return connection.executeCommand(commandObjects.hpersist(key, fields)); } + /** + * @deprecated As of Jedis 6.1.0, use + * {@link #xreadBinary(XReadParams, Map)} or + * {@link #xreadBinaryAsMap(XReadParams, Map)} for type safety and better stream entry + * parsing. + */ + @Deprecated @Override public List xread(XReadParams xReadParams, Entry... streams) { checkIsInMultiOrPipeline(); return connection.executeCommand(commandObjects.xread(xReadParams, streams)); } + /** + * @deprecated As of Jedis 6.1.0, use + * {@link #xreadGroupBinary(byte[], byte[], XReadGroupParams, Map)} or + * {@link #xreadGroupBinaryAsMap(byte[], byte[], XReadGroupParams, Map)} instead. + */ + @Deprecated @Override public List xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Entry... streams) { @@ -4783,6 +4796,34 @@ public List xreadGroup(byte[] groupName, byte[] consumer, return connection.executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams)); } + @Override + public List>> xreadBinary(XReadParams xReadParams, + Map streams) { + checkIsInMultiOrPipeline(); + return connection.executeCommand(commandObjects.xreadBinary(xReadParams, streams)); + } + + @Override + public Map> xreadBinaryAsMap(XReadParams xReadParams, + Map streams) { + checkIsInMultiOrPipeline(); + return connection.executeCommand(commandObjects.xreadBinaryAsMap(xReadParams, streams)); + } + + @Override + public List>> xreadGroupBinary(byte[] groupName, byte[] consumer, + XReadGroupParams xReadGroupParams, Map streams) { + checkIsInMultiOrPipeline(); + return connection.executeCommand(commandObjects.xreadGroupBinary(groupName, consumer, xReadGroupParams, streams)); + } + + @Override + public Map> xreadGroupBinaryAsMap(byte[] groupName, byte[] consumer, + XReadGroupParams xReadGroupParams, Map streams) { + checkIsInMultiOrPipeline(); + return connection.executeCommand(commandObjects.xreadGroupBinaryAsMap(groupName, consumer, xReadGroupParams, streams)); + } + @Override public byte[] xadd(final byte[] key, final XAddParams params, final Map hash) { checkIsInMultiOrPipeline(); diff --git a/src/main/java/redis/clients/jedis/PipeliningBase.java b/src/main/java/redis/clients/jedis/PipeliningBase.java index 728c99c109..f7b974f552 100644 --- a/src/main/java/redis/clients/jedis/PipeliningBase.java +++ b/src/main/java/redis/clients/jedis/PipeliningBase.java @@ -3359,17 +3359,56 @@ public Response> xinfoConsumers(byte[] key, byte[] group) { return appendCommand(commandObjects.xinfoConsumers(key, group)); } + /** + * @deprecated As of Jedis 6.1.0, use {@link #xreadBinary(XReadParams, Map)} or + * {@link #xreadBinaryAsMap(XReadParams, Map)} for type safety and better stream entry + * parsing. + */ + @Deprecated @Override - public Response> xread(XReadParams xReadParams, Map.Entry... streams) { + public Response> xread(XReadParams xReadParams, + Map.Entry... streams) { return appendCommand(commandObjects.xread(xReadParams, streams)); } + /** + * @deprecated As of Jedis 6.1.0, use + * {@link #xreadGroupBinary(byte[], byte[], XReadGroupParams, Map)} or + * {@link #xreadGroupBinaryAsMap(byte[], byte[], XReadGroupParams, Map)} instead. + */ + @Deprecated @Override public Response> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry... streams) { return appendCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams)); } + @Override + public Response>>> xreadBinary(XReadParams xReadParams, + Map streams) { + return appendCommand(commandObjects.xreadBinary(xReadParams, streams)); + } + + @Override + public Response>> xreadBinaryAsMap(XReadParams xReadParams, + Map streams) { + return appendCommand(commandObjects.xreadBinaryAsMap(xReadParams, streams)); + } + + @Override + public Response>>> xreadGroupBinary(byte[] groupName, + byte[] consumer, XReadGroupParams xReadGroupParams, Map streams) { + return appendCommand( + commandObjects.xreadGroupBinary(groupName, consumer, xReadGroupParams, streams)); + } + + @Override + public Response>> xreadGroupBinaryAsMap(byte[] groupName, + byte[] consumer, XReadGroupParams xReadGroupParams, Map streams) { + return appendCommand( + commandObjects.xreadGroupBinaryAsMap(groupName, consumer, xReadGroupParams, streams)); + } + @Override public Response set(byte[] key, byte[] value) { return appendCommand(commandObjects.set(key, value)); diff --git a/src/main/java/redis/clients/jedis/UnifiedJedis.java b/src/main/java/redis/clients/jedis/UnifiedJedis.java index 4b2e38faac..e3960862fa 100644 --- a/src/main/java/redis/clients/jedis/UnifiedJedis.java +++ b/src/main/java/redis/clients/jedis/UnifiedJedis.java @@ -1548,7 +1548,7 @@ public byte[] hget(byte[] key, byte[] field) { public List hgetex(byte[] key, HGetExParams params, byte[]... fields) { return executeCommand(commandObjects.hgetex(key, params, fields)); } - + @Override public List hgetdel(byte[] key, byte[]... fields) { return executeCommand(commandObjects.hgetdel(key, fields)); @@ -3451,14 +3451,55 @@ public List xinfoConsumers(byte[] key, byte[] group) { return executeCommand(commandObjects.xinfoConsumers(key, group)); } + /** + * @deprecated As of Jedis 6.1.0, use + * {@link #xreadBinary(XReadParams, Map)} or + * {@link #xreadBinaryAsMap(XReadParams, Map)} for type safety and better stream entry + * parsing. + */ + @Deprecated @Override public List xread(XReadParams xReadParams, Map.Entry... streams) { return executeCommand(commandObjects.xread(xReadParams, streams)); } + /** + * @deprecated As of Jedis 6.1.0, use + * {@link #xreadGroupBinary(byte[], byte[], XReadGroupParams, Map)} or + * {@link #xreadGroupBinaryAsMap(byte[], byte[], XReadGroupParams, Map)} instead. + */ + @Deprecated @Override - public List xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry... streams) { - return executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams)); + public List xreadGroup(byte[] groupName, byte[] consumer, + XReadGroupParams xReadGroupParams, Map.Entry... streams) { + return executeCommand( + commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams)); + } + + @Override + public List>> xreadBinary(XReadParams xReadParams, + Map streams) { + return executeCommand(commandObjects.xreadBinary(xReadParams, streams)); + } + + @Override + public Map> xreadBinaryAsMap(XReadParams xReadParams, + Map streams) { + return executeCommand(commandObjects.xreadBinaryAsMap(xReadParams, streams)); + } + + @Override + public List>> xreadGroupBinary(byte[] groupName, + byte[] consumer, XReadGroupParams xReadGroupParams, Map streams) { + return executeCommand( + commandObjects.xreadGroupBinary(groupName, consumer, xReadGroupParams, streams)); + } + + @Override + public Map> xreadGroupBinaryAsMap(byte[] groupName, + byte[] consumer, XReadGroupParams xReadGroupParams, Map streams) { + return executeCommand( + commandObjects.xreadGroupBinaryAsMap(groupName, consumer, xReadGroupParams, streams)); } // Stream commands diff --git a/src/main/java/redis/clients/jedis/commands/StreamBinaryCommands.java b/src/main/java/redis/clients/jedis/commands/StreamBinaryCommands.java index 01dec14d2d..5db025ef2d 100644 --- a/src/main/java/redis/clients/jedis/commands/StreamBinaryCommands.java +++ b/src/main/java/redis/clients/jedis/commands/StreamBinaryCommands.java @@ -3,7 +3,9 @@ import java.util.List; import java.util.Map; +import redis.clients.jedis.StreamEntryID; import redis.clients.jedis.params.*; +import redis.clients.jedis.resps.StreamEntryBinary; public interface StreamBinaryCommands { @@ -74,9 +76,64 @@ List xautoclaimJustId(byte[] key, byte[] groupName, byte[] consumerName, List xinfoConsumers(byte[] key, byte[] group); + /** + * @deprecated As of Jedis 6.1.0, replaced by {@link #xreadBinary(XReadParams, Map)} or + * {@link #xreadBinaryAsMap(XReadParams, Map)} for type safety and better stream entry parsing. + */ + @Deprecated List xread(XReadParams xReadParams, Map.Entry... streams); + /** + * @deprecated As of Jedis 6.1.0, use {@link #xreadGroupBinary(byte[], byte[], XReadGroupParams, Map)} or + * {@link #xreadGroupBinaryAsMap(byte[], byte[], XReadGroupParams, Map)} instead. + */ + @Deprecated List xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry... streams); + /** + * Read from one or more streams. + * @param xReadParams {@link XReadParams} + * @param streams Map of stream name and ID to read from. + * @return List of entries. Each entry in the list is a pair of stream name and the entries + * reported for that key. + */ + List>> xreadBinary(XReadParams xReadParams, + Map streams); + + /** + * Read from one or more streams and return a map of stream name to list of entries. + * @param xReadParams {@link XReadParams} + * @param streams Map of stream name and ID to read from. + * @return Map of stream name to list of entries. key is the stream name and value is the list of + * entries reported for that key. + */ + Map> xreadBinaryAsMap(XReadParams xReadParams, + Map streams); + + /** + * Read from one or more streams as a consumer group. + * @param groupName Consumer group name. + * @param consumer Consumer name. + * @param xReadGroupParams {@link XReadGroupParams} + * @param streams Map of stream name and ID to read from. + * @return List of entries. Each entry in the list is a pair of stream name and the entries + * reported for that key. + */ + List>> xreadGroupBinary(byte[] groupName, + byte[] consumer, XReadGroupParams xReadGroupParams, Map streams); + + /** + * Read from one or more streams as a consumer group and return a map of stream name to list of + * entries. + * @param groupName Consumer group name. + * @param consumer Consumer name. + * @param xReadGroupParams {@link XReadGroupParams} + * @param streams Map of stream name and ID to read from. + * @return Map of stream name to list of entries. key is the stream name and value is the list of + * entries reported for that key. + */ + Map> xreadGroupBinaryAsMap(byte[] groupName, byte[] consumer, + XReadGroupParams xReadGroupParams, Map streams); + } diff --git a/src/main/java/redis/clients/jedis/commands/StreamPipelineBinaryCommands.java b/src/main/java/redis/clients/jedis/commands/StreamPipelineBinaryCommands.java index 8198443009..3cda8f079a 100644 --- a/src/main/java/redis/clients/jedis/commands/StreamPipelineBinaryCommands.java +++ b/src/main/java/redis/clients/jedis/commands/StreamPipelineBinaryCommands.java @@ -4,7 +4,9 @@ import java.util.Map; import redis.clients.jedis.Response; +import redis.clients.jedis.StreamEntryID; import redis.clients.jedis.params.*; +import redis.clients.jedis.resps.StreamEntryBinary; public interface StreamPipelineBinaryCommands { @@ -75,9 +77,66 @@ Response> xautoclaimJustId(byte[] key, byte[] groupName, byte[] con Response> xinfoConsumers(byte[] key, byte[] group); + /** + * @deprecated As of Jedis 6.1.0, use {@link #xreadBinary(XReadParams, Map)} or + * {@link #xreadBinaryAsMap(XReadParams, Map)} for type safety and better stream entry + * parsing. + */ + @Deprecated Response> xread(XReadParams xReadParams, Map.Entry... streams); + /** + * @deprecated As of Jedis 6.1.0, use + * {@link #xreadGroupBinary(byte[], byte[], XReadGroupParams, Map)} or + * {@link #xreadGroupBinaryAsMap(byte[], byte[], XReadGroupParams, Map)} instead. + */ + @Deprecated Response> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry... streams); + + /** + * Read from one or more streams. + * + * @param xReadParams {@link XReadParams} + * @param streams Map of stream name and ID to read from. + * @return List of entries. Each entry in the list is a pair of stream name and the entries reported for that key. + */ + Response>>> xreadBinary(XReadParams xReadParams, + Map streams); + + /** + * Read from one or more streams and return a map of stream name to list of entries. + * + * @param xReadParams {@link XReadParams} + * @param streams Map of stream name and ID to read from. + * @return Map of stream name to list of entries. + */ + Response>> xreadBinaryAsMap(XReadParams xReadParams, + Map streams); + + /** + * Read from one or more streams using a consumer group. + * + * @param groupName Consumer group name + * @param consumer Consumer name + * @param xReadGroupParams {@link XReadGroupParams} + * @param streams Map of stream name and ID to read from. + * @return List of entries. Each entry in the list is a pair of stream name and the entries reported for that key. + */ + Response>>> xreadGroupBinary(byte[] groupName, byte[] consumer, + XReadGroupParams xReadGroupParams, Map streams); + + /** + * Read from one or more streams using a consumer group and return a map of stream name to list of entries. + * + * @param groupName Consumer group name + * @param consumer Consumer name + * @param xReadGroupParams {@link XReadGroupParams} + * @param streams Map of stream name and ID to read from. + * @return Map of stream name to list of entries. + */ + Response>> xreadGroupBinaryAsMap(byte[] groupName, byte[] consumer, + XReadGroupParams xReadGroupParams, Map streams); + } diff --git a/src/main/java/redis/clients/jedis/resps/StreamEntryBinary.java b/src/main/java/redis/clients/jedis/resps/StreamEntryBinary.java new file mode 100644 index 0000000000..24f0b5f5fb --- /dev/null +++ b/src/main/java/redis/clients/jedis/resps/StreamEntryBinary.java @@ -0,0 +1,42 @@ +package redis.clients.jedis.resps; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; +import redis.clients.jedis.StreamEntryID; + +public class StreamEntryBinary implements Serializable { + + private static final long serialVersionUID = 1L; + + private StreamEntryID id; + private Map fields; + + public StreamEntryBinary(StreamEntryID id, Map fields) { + this.id = id; + this.fields = fields; + } + + public StreamEntryID getID() { + return id; + } + + public Map getFields() { + return fields; + } + + @Override + public String toString() { + return id + " " + fields; + } + + private void writeObject(java.io.ObjectOutputStream out) throws IOException { + out.writeUnshared(this.id); + out.writeUnshared(this.fields); + } + + private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { + this.id = (StreamEntryID) in.readUnshared(); + this.fields = (Map) in.readUnshared(); + } +} \ No newline at end of file diff --git a/src/main/java/redis/clients/jedis/util/JedisByteMap.java b/src/main/java/redis/clients/jedis/util/JedisByteMap.java new file mode 100644 index 0000000000..5dfff673da --- /dev/null +++ b/src/main/java/redis/clients/jedis/util/JedisByteMap.java @@ -0,0 +1,142 @@ +package redis.clients.jedis.util; + +import java.io.Serializable; +import java.util.*; + +public class JedisByteMap implements Map, Cloneable, Serializable { + private static final long serialVersionUID = -6971431362627219416L; + private final Map internalMap = new HashMap<>(); + + @Override + public void clear() { + internalMap.clear(); + } + + @Override + public boolean containsKey(Object key) { + if (key instanceof byte[]) return internalMap.containsKey(new ByteArrayWrapper((byte[]) key)); + return internalMap.containsKey(key); + } + + @Override + public boolean containsValue(Object value) { + return internalMap.containsValue(value); + } + + @Override + public Set> entrySet() { + Iterator> iterator = internalMap.entrySet() + .iterator(); + HashSet> hashSet = new HashSet<>(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + hashSet.add(new JedisByteEntry(entry.getKey().data, entry.getValue())); + } + return hashSet; + } + + @Override + public T get(Object key) { + if (key instanceof byte[]) return internalMap.get(new ByteArrayWrapper((byte[]) key)); + return internalMap.get(key); + } + + @Override + public boolean isEmpty() { + return internalMap.isEmpty(); + } + + @Override + public Set keySet() { + Set keySet = new HashSet<>(); + Iterator iterator = internalMap.keySet().iterator(); + while (iterator.hasNext()) { + keySet.add(iterator.next().data); + } + return keySet; + } + + @Override + public T put(byte[] key, T value) { + return internalMap.put(new ByteArrayWrapper(key), value); + } + + @Override + @SuppressWarnings("unchecked") + public void putAll(Map m) { + Iterator iterator = m.entrySet().iterator(); + while (iterator.hasNext()) { + Entry next = (Entry) iterator + .next(); + internalMap.put(new ByteArrayWrapper(next.getKey()), next.getValue()); + } + } + + @Override + public T remove(Object key) { + if (key instanceof byte[]) return internalMap.remove(new ByteArrayWrapper((byte[]) key)); + return internalMap.remove(key); + } + + @Override + public int size() { + return internalMap.size(); + } + + @Override + public Collection values() { + return internalMap.values(); + } + + private static final class ByteArrayWrapper implements Serializable { + private final byte[] data; + + public ByteArrayWrapper(byte[] data) { + if (data == null) { + throw new NullPointerException(); + } + this.data = data; + } + + @Override + public boolean equals(Object other) { + if (other == null) return false; + if (other == this) return true; + if (!(other instanceof ByteArrayWrapper)) return false; + + return Arrays.equals(data, ((ByteArrayWrapper) other).data); + } + + @Override + public int hashCode() { + return Arrays.hashCode(data); + } + } + + private static final class JedisByteEntry implements Entry { + private final byte[] key; + private T value; + + public JedisByteEntry(byte[] key, T value) { + this.key = key; + this.value = value; + } + + @Override + public byte[] getKey() { + return this.key; + } + + @Override + public T getValue() { + return this.value; + } + + @Override + public T setValue(T value) { + this.value = value; + return value; + } + + } +} diff --git a/src/test/java/redis/clients/jedis/collections/JedisByteHashMapTest.java b/src/test/java/redis/clients/jedis/collections/JedisByteHashMapTest.java index f89da29b34..08654dc89b 100644 --- a/src/test/java/redis/clients/jedis/collections/JedisByteHashMapTest.java +++ b/src/test/java/redis/clients/jedis/collections/JedisByteHashMapTest.java @@ -13,6 +13,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import redis.clients.jedis.util.JedisByteHashMap; +import redis.clients.jedis.util.JedisByteMap; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -21,6 +22,7 @@ public class JedisByteHashMapTest { private static JedisByteHashMap map = new JedisByteHashMap(); + private static JedisByteMap map2 = new JedisByteMap<>(); private byte[][] keys = { { 'k', 'e', 'y', '1' }, { 'k', 'e', 'y', '2' }, { 'k', 'e', 'y', '3' } }; private byte[][] vals = { { 'v', 'a', 'l', '1' }, { 'v', 'a', 'l', '2' }, { 'v', 'a', 'l', '3' } }; @@ -28,6 +30,7 @@ public class JedisByteHashMapTest { @BeforeEach public void before() throws Exception { map.clear(); + map2.clear(); } private boolean arrayContainsKey(byte[][] arr, byte[] key) { @@ -131,4 +134,73 @@ public void serialize() throws Exception { assertTrue(entrySetSame(map.entrySet(), mapRead.entrySet())); } + + @Test + public void map2Operations() { + // put + map2.put(keys[0], vals[0]); + assertEquals(1, map2.size()); + + // putAll + Map kvMap = new HashMap<>(); + kvMap.put(keys[1], vals[1]); + kvMap.put(keys[2], vals[2]); + map2.putAll(kvMap); + assertEquals(3, map2.size()); + + // containsKey + assertTrue(map2.containsKey(keys[0])); + + // containsValue + assertTrue(map2.containsValue(vals[0])); + + // entrySet + Set> entries = map2.entrySet(); + assertEquals(3, entries.size()); + for (Map.Entry entry : entries) { + assertTrue(arrayContainsKey(keys, entry.getKey())); + assertTrue(arrayContainsKey(vals, entry.getValue())); + } + + // get + assertArrayEquals(vals[0], map2.get(keys[0])); + + // isEmpty + assertFalse(map2.isEmpty()); + + // keySet + for (byte[] key : map2.keySet()) { + assertTrue(arrayContainsKey(keys, key)); + } + + // values + for (byte[] value : map2.values()) { + assertTrue(arrayContainsKey(vals, value)); + } + + // remove + map2.remove(keys[0]); + assertEquals(2, map2.size()); + + // clear + map2.clear(); + assertEquals(0, map2.size()); + } + + @Test + public void serialize2() throws Exception { + for (int i = 0; i < keys.length; i++) { + map2.put(keys[i], vals[i]); + } + + ByteArrayOutputStream byteOut = new ByteArrayOutputStream(); + ObjectOutputStream objOut = new ObjectOutputStream(byteOut); + objOut.writeObject(map2); + + ByteArrayInputStream byteIn = new ByteArrayInputStream(byteOut.toByteArray()); + ObjectInputStream objIn = new ObjectInputStream(byteIn); + JedisByteMap mapRead = (JedisByteMap) objIn.readObject(); + + assertTrue(entrySetSame(map2.entrySet(), mapRead.entrySet())); + } } diff --git a/src/test/java/redis/clients/jedis/commands/jedis/StreamsBinaryCommandsTest.java b/src/test/java/redis/clients/jedis/commands/jedis/StreamsBinaryCommandsTest.java new file mode 100644 index 0000000000..e53b4d87ba --- /dev/null +++ b/src/test/java/redis/clients/jedis/commands/jedis/StreamsBinaryCommandsTest.java @@ -0,0 +1,256 @@ +package redis.clients.jedis.commands.jedis; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedClass; +import org.junit.jupiter.params.provider.MethodSource; +import redis.clients.jedis.RedisProtocol; +import redis.clients.jedis.StreamEntryID; +import redis.clients.jedis.exceptions.JedisDataException; +import redis.clients.jedis.params.XAddParams; +import redis.clients.jedis.params.XReadGroupParams; +import redis.clients.jedis.params.XReadParams; +import redis.clients.jedis.resps.StreamEntryBinary; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.singletonMap; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static redis.clients.jedis.StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY; +import static redis.clients.jedis.util.StreamEntryBinaryListMatcher.equalsStreamEntries; + +/** + * Test replicated from {@link redis.clients.jedis.commands.unified.StreamsBinaryCommandsTestBase} + * but adapted to use Jedis commands. Note: Consider merging with the unified test class if + * possible. e.g., by using a common base class + */ +@ParameterizedClass +@MethodSource("redis.clients.jedis.commands.CommandsTestsParameters#respVersions") +public class StreamsBinaryCommandsTest extends JedisCommandsTestBase { + + protected static final byte[] STREAM_KEY_1 = "{binary-stream}-1".getBytes(); + protected static final byte[] STREAM_KEY_2 = "{binary-stream}-2".getBytes(); + protected static final byte[] GROUP_NAME = "group-1".getBytes(); + protected static final byte[] CONSUMER_NAME = "consumer-1".getBytes(); + + protected static final byte[] FIELD_KEY_1 = "binary-field-1".getBytes(); + // Test with invalid UTF-8 characters + protected static final byte[] BINARY_VALUE_1 = new byte[] { 0x00, 0x01, 0x02, 0x03, (byte) 0xFF }; + + protected static final byte[] FIELD_KEY_2 = "binary-field-1".getBytes(); + protected static final byte[] BINARY_VALUE_2 = "binary-value-2".getBytes(); + protected static final Map HASH_1 = singletonMap(FIELD_KEY_1, BINARY_VALUE_1); + protected static final Map HASH_2 = singletonMap(FIELD_KEY_2, BINARY_VALUE_2); + + protected static final List stream1Entries = new ArrayList<>(); + protected static final List stream2Entries = new ArrayList<>(); + + static { + stream1Entries.add(new StreamEntryBinary(new StreamEntryID("0-1"), HASH_1)); + stream1Entries.add(new StreamEntryBinary(new StreamEntryID("0-3"), HASH_2)); + + stream2Entries.add(new StreamEntryBinary(new StreamEntryID("0-2"), HASH_1)); + } + + public StreamsBinaryCommandsTest(RedisProtocol protocol) { + super(protocol); + } + + /** + * Creates a map of stream keys to StreamEntryID objects. + * @param streamOffsets Array of stream key and offset pairs + * @return Map of stream keys to StreamEntryID objects + */ + public static Map offsets(Object... streamOffsets) { + if (streamOffsets.length % 2 != 0) { + throw new IllegalArgumentException("Stream offsets must be provided as key-value pairs"); + } + + Map result = new HashMap<>(); + for (int i = 0; i < streamOffsets.length; i += 2) { + byte[] key = (byte[]) streamOffsets[i]; + Object value = streamOffsets[i + 1]; + + StreamEntryID id; + if (value instanceof String) { + id = new StreamEntryID((String) value); + } else if (value instanceof StreamEntryID) { + id = (StreamEntryID) value; + } else { + throw new IllegalArgumentException("Offset must be a String or StreamEntryID"); + } + + result.put(key, id); + } + + return result; + } + + @BeforeEach + public void setUpTestStream() { + jedis.del(STREAM_KEY_1); + jedis.del(STREAM_KEY_2); + try { + jedis.xgroupCreate(STREAM_KEY_1, GROUP_NAME, + StreamEntryID.XGROUP_LAST_ENTRY.toString().getBytes(), true); + } catch (JedisDataException e) { + if (!e.getMessage().contains("BUSYGROUP")) { + throw e; + } + } + try { + jedis.xgroupCreate(STREAM_KEY_2, GROUP_NAME, + StreamEntryID.XGROUP_LAST_ENTRY.toString().getBytes(), true); + } catch (JedisDataException e) { + if (!e.getMessage().contains("BUSYGROUP")) { + throw e; + } + } + } + + @Test + public void xreadBinaryNoEntries() { + List>> actualEntries = jedis.xreadBinary( + XReadParams.xReadParams(), offsets(STREAM_KEY_1, "0-0")); + + assertNull(actualEntries); + } + + @Test + public void xreadBinary() { + + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + + List>> actualEntries = jedis.xreadBinary( + XReadParams.xReadParams(), offsets(STREAM_KEY_1, "0-0")); + + assertThat(actualEntries, hasSize(1)); + assertArrayEquals(STREAM_KEY_1, actualEntries.get(0).getKey()); + assertThat(actualEntries.get(0).getValue(), equalsStreamEntries(stream1Entries)); + } + + @Test + public void xreadBinaryCount() { + + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + + List>> actualEntries = jedis.xreadBinary( + XReadParams.xReadParams().count(1), offsets(STREAM_KEY_1, "0-0")); + + assertThat(actualEntries, hasSize(1)); + assertArrayEquals(STREAM_KEY_1, actualEntries.get(0).getKey()); + assertThat(actualEntries.get(0).getValue(), equalsStreamEntries(stream1Entries.subList(0, 1))); + } + + @Test + public void xreadBinaryAsMapNoEntries() { + Map> actualEntries = jedis.xreadBinaryAsMap( + XReadParams.xReadParams(), offsets(STREAM_KEY_1, "0-0")); + + assertNull(actualEntries); + } + + @Test + public void xreadBinaryAsMap() { + + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + + Map> actualEntries = jedis.xreadBinaryAsMap( + XReadParams.xReadParams(), offsets(STREAM_KEY_1, "0-0")); + + assertThat(actualEntries.entrySet(), hasSize(1)); + assertThat(actualEntries.get(STREAM_KEY_1), equalsStreamEntries(stream1Entries)); + } + + @Test + public void xreadBinaryAsMapCount() { + + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + + Map> actualEntries = jedis.xreadBinaryAsMap( + XReadParams.xReadParams().count(1), offsets(STREAM_KEY_1, "0-0")); + + assertThat(actualEntries.entrySet(), hasSize(1)); + assertThat(actualEntries.get(STREAM_KEY_1), equalsStreamEntries(stream1Entries.subList(0, 1))); + } + + @Test + public void xreadBinaryAsMapWithMultipleStreams() { + + // Add entries to the streams + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + stream2Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_2, new XAddParams().id(entry.getID()), entry.getFields())); + + Map> actualEntries = jedis.xreadBinaryAsMap( + XReadParams.xReadParams(), offsets(STREAM_KEY_1, "0-0", STREAM_KEY_2, "0-0")); + + assertThat(actualEntries.entrySet(), hasSize(2)); + + assertThat(actualEntries.get(STREAM_KEY_1), equalsStreamEntries(stream1Entries)); + assertThat(actualEntries.get(STREAM_KEY_2), equalsStreamEntries(stream2Entries)); + } + + @Test + public void xreadGroupBinary() { + // Add entries to the streams + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + + List>> actualEntries = jedis.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams(), + offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY)); + + // verify the result contains entries from one stream + // and is under the expected stream key + assertThat(actualEntries, hasSize(1)); + assertArrayEquals(STREAM_KEY_1, actualEntries.get(0).getKey()); + + assertThat(actualEntries.get(0).getValue(), equalsStreamEntries(stream1Entries)); + } + + @Test + public void xreadGroupBinaryAsMap() { + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + + Map> actualEntries = jedis.xreadGroupBinaryAsMap(GROUP_NAME, + CONSUMER_NAME, XReadGroupParams.xReadGroupParams(), + offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY)); + + assertThat(actualEntries.entrySet(), hasSize(1)); + + assertThat(actualEntries.get(STREAM_KEY_1), equalsStreamEntries(stream1Entries)); + } + + @Test + public void xreadGroupBinaryAsMapMultipleStreams() { + // Add entries to the streams + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + stream2Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_2, new XAddParams().id(entry.getID()), entry.getFields())); + + Map> actualEntries = jedis.xreadGroupBinaryAsMap(GROUP_NAME, + CONSUMER_NAME, XReadGroupParams.xReadGroupParams(), + offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY, STREAM_KEY_2, + XREADGROUP_UNDELIVERED_ENTRY)); + + assertThat(actualEntries.entrySet(), hasSize(2)); + + assertThat(actualEntries.get(STREAM_KEY_1), equalsStreamEntries(stream1Entries)); + assertThat(actualEntries.get(STREAM_KEY_2), equalsStreamEntries(stream2Entries)); + } + +} diff --git a/src/test/java/redis/clients/jedis/commands/unified/StreamsBinaryCommandsTestBase.java b/src/test/java/redis/clients/jedis/commands/unified/StreamsBinaryCommandsTestBase.java new file mode 100644 index 0000000000..816dd8e6ac --- /dev/null +++ b/src/test/java/redis/clients/jedis/commands/unified/StreamsBinaryCommandsTestBase.java @@ -0,0 +1,255 @@ +package redis.clients.jedis.commands.unified; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import redis.clients.jedis.RedisProtocol; +import redis.clients.jedis.StreamEntryID; +import redis.clients.jedis.exceptions.JedisDataException; +import redis.clients.jedis.params.XAddParams; +import redis.clients.jedis.params.XReadGroupParams; +import redis.clients.jedis.params.XReadParams; +import redis.clients.jedis.resps.StreamEntryBinary; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.singletonMap; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static redis.clients.jedis.StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY; +import static redis.clients.jedis.util.StreamEntryBinaryListMatcher.equalsStreamEntries; + +public abstract class StreamsBinaryCommandsTestBase extends UnifiedJedisCommandsTestBase { + + protected static final byte[] STREAM_KEY_1 = "{binary-stream}-1".getBytes(); + protected static final byte[] STREAM_KEY_2 = "{binary-stream}-2".getBytes(); + protected static final byte[] GROUP_NAME = "group-1".getBytes(); + protected static final byte[] CONSUMER_NAME = "consumer-1".getBytes(); + + protected static final byte[] FIELD_KEY_1 = "binary-field-1".getBytes(); + // Test with invalid UTF-8 characters + protected static final byte[] BINARY_VALUE_1 = new byte[] { 0x00, 0x01, 0x02, 0x03, (byte) 0xFF }; + + protected static final byte[] FIELD_KEY_2 = "binary-field-1".getBytes(); + protected static final byte[] BINARY_VALUE_2 = "binary-value-2".getBytes(); + protected static final Map HASH_1 = singletonMap(FIELD_KEY_1, BINARY_VALUE_1); + protected static final Map HASH_2 = singletonMap(FIELD_KEY_2, BINARY_VALUE_2); + + protected static final List stream1Entries = new ArrayList<>(); + protected static final List stream2Entries = new ArrayList<>(); + + static { + stream1Entries.add(new StreamEntryBinary(new StreamEntryID("0-1"), HASH_1)); + stream1Entries.add(new StreamEntryBinary(new StreamEntryID("0-3"), HASH_2)); + + stream2Entries.add(new StreamEntryBinary(new StreamEntryID("0-2"), HASH_1)); + } + + public StreamsBinaryCommandsTestBase(RedisProtocol protocol) { + super(protocol); + } + + /** + * Creates a map of stream keys to StreamEntryID objects. + * @param streamOffsets Array of stream key and offset pairs + * @return Map of stream keys to StreamEntryID objects + */ + public static Map offsets(Object... streamOffsets) { + if (streamOffsets.length % 2 != 0) { + throw new IllegalArgumentException("Stream offsets must be provided as key-value pairs"); + } + + Map result = new HashMap<>(); + for (int i = 0; i < streamOffsets.length; i += 2) { + byte[] key = (byte[]) streamOffsets[i]; + Object value = streamOffsets[i + 1]; + + StreamEntryID id; + if (value instanceof String) { + id = new StreamEntryID((String) value); + } else if (value instanceof StreamEntryID) { + id = (StreamEntryID) value; + } else { + throw new IllegalArgumentException("Offset must be a String or StreamEntryID"); + } + + result.put(key, id); + } + + return result; + } + + @BeforeEach + public void setUp() { + setUpTestClient(); + setUpTestStream(); + } + + protected void setUpTestClient() { + } + + public void setUpTestStream() { + jedis.del(STREAM_KEY_1); + jedis.del(STREAM_KEY_2); + try { + jedis.xgroupCreate(STREAM_KEY_1, GROUP_NAME, + StreamEntryID.XGROUP_LAST_ENTRY.toString().getBytes(), true); + } catch (JedisDataException e) { + if (!e.getMessage().contains("BUSYGROUP")) { + throw e; + } + } + try { + jedis.xgroupCreate(STREAM_KEY_2, GROUP_NAME, + StreamEntryID.XGROUP_LAST_ENTRY.toString().getBytes(), true); + } catch (JedisDataException e) { + if (!e.getMessage().contains("BUSYGROUP")) { + throw e; + } + } + } + + @Test + public void xreadBinaryNoEntries() { + List>> actualEntries = jedis.xreadBinary( + XReadParams.xReadParams(), offsets(STREAM_KEY_1, "0-0")); + + assertNull(actualEntries); + } + + @Test + public void xreadBinary() { + + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + + List>> actualEntries = jedis.xreadBinary( + XReadParams.xReadParams(), offsets(STREAM_KEY_1, "0-0")); + + assertThat(actualEntries, hasSize(1)); + assertArrayEquals(STREAM_KEY_1, actualEntries.get(0).getKey()); + assertThat(actualEntries.get(0).getValue(), equalsStreamEntries(stream1Entries)); + } + + @Test + public void xreadBinaryCount() { + + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + + List>> actualEntries = jedis.xreadBinary( + XReadParams.xReadParams().count(1), offsets(STREAM_KEY_1, "0-0")); + + assertThat(actualEntries, hasSize(1)); + assertArrayEquals(STREAM_KEY_1, actualEntries.get(0).getKey()); + assertThat(actualEntries.get(0).getValue(), equalsStreamEntries(stream1Entries.subList(0, 1))); + } + + @Test + public void xreadBinaryAsMapNoEntries() { + Map> actualEntries = jedis.xreadBinaryAsMap( + XReadParams.xReadParams(), offsets(STREAM_KEY_1, "0-0")); + + assertNull(actualEntries); + } + + @Test + public void xreadBinaryAsMap() { + + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + + Map> actualEntries = jedis.xreadBinaryAsMap( + XReadParams.xReadParams(), offsets(STREAM_KEY_1, "0-0")); + + assertThat(actualEntries.entrySet(), hasSize(1)); + assertThat(actualEntries.get(STREAM_KEY_1), equalsStreamEntries(stream1Entries)); + } + + @Test + public void xreadBinaryAsMapCount() { + + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + + Map> actualEntries = jedis.xreadBinaryAsMap( + XReadParams.xReadParams().count(1), offsets(STREAM_KEY_1, "0-0")); + + assertThat(actualEntries.entrySet(), hasSize(1)); + assertThat(actualEntries.get(STREAM_KEY_1), equalsStreamEntries(stream1Entries.subList(0, 1))); + } + + @Test + public void xreadBinaryAsMapWithMultipleStreams() { + + // Add entries to the streams + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + stream2Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_2, new XAddParams().id(entry.getID()), entry.getFields())); + + Map> actualEntries = jedis.xreadBinaryAsMap( + XReadParams.xReadParams(), offsets(STREAM_KEY_1, "0-0", STREAM_KEY_2, "0-0")); + + assertThat(actualEntries.entrySet(), hasSize(2)); + + assertThat(actualEntries.get(STREAM_KEY_1), equalsStreamEntries(stream1Entries)); + assertThat(actualEntries.get(STREAM_KEY_2), equalsStreamEntries(stream2Entries)); + } + + @Test + public void xreadGroupBinary() { + // Add entries to the streams + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + + List>> actualEntries = jedis.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams(), + offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY)); + + // verify the result contains entries from one stream + // and is under the expected stream key + assertThat(actualEntries, hasSize(1)); + assertArrayEquals(STREAM_KEY_1, actualEntries.get(0).getKey()); + + assertThat(actualEntries.get(0).getValue(), equalsStreamEntries(stream1Entries)); + } + + @Test + public void xreadGroupBinaryAsMap() { + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + + Map> actualEntries = jedis.xreadGroupBinaryAsMap(GROUP_NAME, + CONSUMER_NAME, XReadGroupParams.xReadGroupParams(), + offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY)); + + assertThat(actualEntries.entrySet(), hasSize(1)); + + assertThat(actualEntries.get(STREAM_KEY_1), equalsStreamEntries(stream1Entries)); + } + + @Test + public void xreadGroupBinaryAsMapMultipleStreams() { + // Add entries to the streams + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + stream2Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_2, new XAddParams().id(entry.getID()), entry.getFields())); + + Map> actualEntries = jedis.xreadGroupBinaryAsMap(GROUP_NAME, + CONSUMER_NAME, XReadGroupParams.xReadGroupParams(), + offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY, STREAM_KEY_2, + XREADGROUP_UNDELIVERED_ENTRY)); + + assertThat(actualEntries.entrySet(), hasSize(2)); + + assertThat(actualEntries.get(STREAM_KEY_1), equalsStreamEntries(stream1Entries)); + assertThat(actualEntries.get(STREAM_KEY_2), equalsStreamEntries(stream2Entries)); + } + +} diff --git a/src/test/java/redis/clients/jedis/commands/unified/cluster/ClusterStreamsBinaryCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/cluster/ClusterStreamsBinaryCommandsTest.java new file mode 100644 index 0000000000..e651c1a4c1 --- /dev/null +++ b/src/test/java/redis/clients/jedis/commands/unified/cluster/ClusterStreamsBinaryCommandsTest.java @@ -0,0 +1,28 @@ +package redis.clients.jedis.commands.unified.cluster; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.params.ParameterizedClass; +import org.junit.jupiter.params.provider.MethodSource; +import redis.clients.jedis.RedisProtocol; +import redis.clients.jedis.commands.unified.StreamsBinaryCommandsTestBase; + +@ParameterizedClass +@MethodSource("redis.clients.jedis.commands.CommandsTestsParameters#respVersions") +public class ClusterStreamsBinaryCommandsTest extends StreamsBinaryCommandsTestBase { + + public ClusterStreamsBinaryCommandsTest(RedisProtocol protocol) { + super(protocol); + } + + @Override + protected void setUpTestClient() { + jedis = ClusterCommandsTestHelper.getCleanCluster(protocol); + } + + @AfterEach + public void tearDown() { + jedis.close(); + ClusterCommandsTestHelper.clearClusterData(); + } + +} diff --git a/src/test/java/redis/clients/jedis/commands/unified/pipeline/BinaryStreamsPipelineCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/pipeline/BinaryStreamsPipelineCommandsTest.java new file mode 100644 index 0000000000..c22559e7a2 --- /dev/null +++ b/src/test/java/redis/clients/jedis/commands/unified/pipeline/BinaryStreamsPipelineCommandsTest.java @@ -0,0 +1,224 @@ +package redis.clients.jedis.commands.unified.pipeline; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedClass; +import org.junit.jupiter.params.provider.MethodSource; +import redis.clients.jedis.RedisProtocol; +import redis.clients.jedis.Response; +import redis.clients.jedis.StreamEntryID; +import redis.clients.jedis.exceptions.JedisDataException; +import redis.clients.jedis.params.XAddParams; +import redis.clients.jedis.params.XReadGroupParams; +import redis.clients.jedis.params.XReadParams; +import redis.clients.jedis.resps.StreamEntryBinary; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.singletonMap; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static redis.clients.jedis.StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY; +import static redis.clients.jedis.util.StreamEntryBinaryListMatcher.equalsStreamEntries; + +@ParameterizedClass +@MethodSource("redis.clients.jedis.commands.CommandsTestsParameters#respVersions") +public class BinaryStreamsPipelineCommandsTest extends PipelineCommandsTestBase { + protected static final byte[] STREAM_KEY_1 = "{binary-stream}-1".getBytes(); + protected static final byte[] STREAM_KEY_2 = "{binary-stream}-2".getBytes(); + protected static final byte[] GROUP_NAME = "group-1".getBytes(); + protected static final byte[] CONSUMER_NAME = "consumer-1".getBytes(); + + protected static final byte[] FIELD_KEY_1 = "binary-field-1".getBytes(); + protected static final byte[] BINARY_VALUE_1 = new byte[] { 0x00, 0x01, 0x02, 0x03, (byte) 0xFF }; + + protected static final byte[] FIELD_KEY_2 = "binary-field-1".getBytes(); + protected static final byte[] BINARY_VALUE_2 = "binary-value-2".getBytes(); + protected static final Map HASH_1 = singletonMap(FIELD_KEY_1, BINARY_VALUE_1); + protected static final Map HASH_2 = singletonMap(FIELD_KEY_2, BINARY_VALUE_2); + + protected static final List stream1Entries = new ArrayList<>(); + protected static final List stream2Entries = new ArrayList<>(); + + static { + stream1Entries.add(new StreamEntryBinary(new StreamEntryID("0-1"), HASH_1)); + stream1Entries.add(new StreamEntryBinary(new StreamEntryID("0-3"), HASH_2)); + + stream2Entries.add(new StreamEntryBinary(new StreamEntryID("0-2"), HASH_1)); + } + + public BinaryStreamsPipelineCommandsTest(RedisProtocol protocol) { + super(protocol); + } + + /** + * Creates a map of stream keys to StreamEntryID objects. + * @param streamOffsets Array of stream key and offset pairs + * @return Map of stream keys to StreamEntryID objects + */ + public static Map offsets(Object... streamOffsets) { + if (streamOffsets.length % 2 != 0) { + throw new IllegalArgumentException("Stream offsets must be provided as key-value pairs"); + } + + Map result = new HashMap<>(); + for (int i = 0; i < streamOffsets.length; i += 2) { + byte[] key = (byte[]) streamOffsets[i]; + Object value = streamOffsets[i + 1]; + + StreamEntryID id; + if (value instanceof String) { + id = new StreamEntryID((String) value); + } else if (value instanceof StreamEntryID) { + id = (StreamEntryID) value; + } else { + throw new IllegalArgumentException("Offset must be a String or StreamEntryID"); + } + + result.put(key, id); + } + + return result; + } + + @BeforeEach + public void setUpTestStream() { + jedis.del(STREAM_KEY_1); + jedis.del(STREAM_KEY_2); + try { + jedis.xgroupCreate(STREAM_KEY_1, GROUP_NAME, + StreamEntryID.XGROUP_LAST_ENTRY.toString().getBytes(), true); + } catch (JedisDataException e) { + if (!e.getMessage().contains("BUSYGROUP")) { + throw e; + } + } + try { + jedis.xgroupCreate(STREAM_KEY_2, GROUP_NAME, + StreamEntryID.XGROUP_LAST_ENTRY.toString().getBytes(), true); + } catch (JedisDataException e) { + if (!e.getMessage().contains("BUSYGROUP")) { + throw e; + } + } + } + + @Test + public void xreadBinary() { + + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + + Response>>> response = pipe.xreadBinary( + XReadParams.xReadParams(), offsets(STREAM_KEY_1, "0-0")); + + pipe.sync(); + List>> actualEntries = response.get(); + + assertThat(actualEntries, hasSize(1)); + assertArrayEquals(STREAM_KEY_1, actualEntries.get(0).getKey()); + assertThat(actualEntries.get(0).getValue(), equalsStreamEntries(stream1Entries)); + } + + @Test + public void xreadBinaryAsMap() { + + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + + Response>> response = pipe.xreadBinaryAsMap( + XReadParams.xReadParams(), offsets(STREAM_KEY_1, "0-0")); + + pipe.sync(); + Map> actualEntries = response.get(); + + assertThat(actualEntries.entrySet(), hasSize(1)); + assertThat(actualEntries.get(STREAM_KEY_1), equalsStreamEntries(stream1Entries)); + } + + @Test + public void xreadBinaryAsMapWithMultipleStreams() { + + // Add entries to the streams + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + stream2Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_2, new XAddParams().id(entry.getID()), entry.getFields())); + + Response>> response = pipe.xreadBinaryAsMap( + XReadParams.xReadParams(), offsets(STREAM_KEY_1, "0-0", STREAM_KEY_2, "0-0")); + + pipe.sync(); + Map> actualEntries = response.get(); + + assertThat(actualEntries.entrySet(), hasSize(2)); + + assertThat(actualEntries.get(STREAM_KEY_1), equalsStreamEntries(stream1Entries)); + assertThat(actualEntries.get(STREAM_KEY_2), equalsStreamEntries(stream2Entries)); + } + + @Test + public void xreadGroupBinary() { + // Add entries to the streams + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + + Response>>> response = pipe.xreadGroupBinary( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams(), + offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY)); + + pipe.sync(); + List>> actualEntries = response.get(); + + // verify the result contains entries from one stream + // and is under the expected stream key + assertThat(actualEntries, hasSize(1)); + assertArrayEquals(STREAM_KEY_1, actualEntries.get(0).getKey()); + + assertThat(actualEntries.get(0).getValue(), equalsStreamEntries(stream1Entries)); + } + + @Test + public void xreadGroupBinaryAsMap() { + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + + Response>> response = pipe.xreadGroupBinaryAsMap( + GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams(), + offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY)); + + pipe.sync(); + Map> actualEntries = response.get(); + + assertThat(actualEntries.entrySet(), hasSize(1)); + + assertThat(actualEntries.get(STREAM_KEY_1), equalsStreamEntries(stream1Entries)); + } + + @Test + public void xreadGroupBinaryAsMapMultipleStreams() { + // Add entries to the streams + stream1Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_1, new XAddParams().id(entry.getID()), entry.getFields())); + stream2Entries.forEach( + entry -> jedis.xadd(STREAM_KEY_2, new XAddParams().id(entry.getID()), entry.getFields())); + + Response>> response = pipe.xreadGroupBinaryAsMap(GROUP_NAME, + CONSUMER_NAME, XReadGroupParams.xReadGroupParams(), + offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY, STREAM_KEY_2, + XREADGROUP_UNDELIVERED_ENTRY)); + + pipe.sync(); + Map> actualEntries = response.get(); + + assertThat(actualEntries.entrySet(), hasSize(2)); + + assertThat(actualEntries.get(STREAM_KEY_1), equalsStreamEntries(stream1Entries)); + assertThat(actualEntries.get(STREAM_KEY_2), equalsStreamEntries(stream2Entries)); + } + +} diff --git a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledStreamsBinaryCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledStreamsBinaryCommandsTest.java new file mode 100644 index 0000000000..93f4279b4a --- /dev/null +++ b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledStreamsBinaryCommandsTest.java @@ -0,0 +1,27 @@ +package redis.clients.jedis.commands.unified.pooled; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.params.ParameterizedClass; +import org.junit.jupiter.params.provider.MethodSource; +import redis.clients.jedis.RedisProtocol; +import redis.clients.jedis.commands.unified.StreamsBinaryCommandsTestBase; + +@ParameterizedClass +@MethodSource("redis.clients.jedis.commands.CommandsTestsParameters#respVersions") +public class PooledStreamsBinaryCommandsTest extends StreamsBinaryCommandsTestBase { + + public PooledStreamsBinaryCommandsTest(RedisProtocol protocol) { + super(protocol); + } + + @Override + public void setUpTestClient() { + jedis = PooledCommandsTestHelper.getPooled(protocol); + } + + @AfterEach + public void tearDown() { + jedis.close(); + } + +} diff --git a/src/test/java/redis/clients/jedis/util/ByteArrayMapMatcher.java b/src/test/java/redis/clients/jedis/util/ByteArrayMapMatcher.java new file mode 100644 index 0000000000..f653607b6e --- /dev/null +++ b/src/test/java/redis/clients/jedis/util/ByteArrayMapMatcher.java @@ -0,0 +1,41 @@ +package redis.clients.jedis.util; +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; +import java.util.Map; +import java.util.Arrays; + +public class ByteArrayMapMatcher extends TypeSafeMatcher> { + + private final Map expected; + + public ByteArrayMapMatcher(Map expected) { + this.expected = expected; + } + + @Override + protected boolean matchesSafely(Map actual) { + if (actual.size() != expected.size()) return false; + + outer: + for (Map.Entry expectedEntry : expected.entrySet()) { + for (Map.Entry actualEntry : actual.entrySet()) { + if (Arrays.equals(expectedEntry.getKey(), actualEntry.getKey()) && + Arrays.equals(expectedEntry.getValue(), actualEntry.getValue())) { + continue outer; + } + } + return false; + } + + return true; + } + + @Override + public void describeTo(Description description) { + description.appendText("maps to be equal by byte[] content"); + } + + public static ByteArrayMapMatcher equalToByteArrayMap(Map expected) { + return new ByteArrayMapMatcher(expected); + } +} \ No newline at end of file diff --git a/src/test/java/redis/clients/jedis/util/StreamEntryBinaryListMatcher.java b/src/test/java/redis/clients/jedis/util/StreamEntryBinaryListMatcher.java new file mode 100644 index 0000000000..e3a1245e2a --- /dev/null +++ b/src/test/java/redis/clients/jedis/util/StreamEntryBinaryListMatcher.java @@ -0,0 +1,59 @@ +package redis.clients.jedis.util; + +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; +import redis.clients.jedis.resps.StreamEntryBinary; + +import java.util.List; +import java.util.Map; +import java.util.Arrays; + +public class StreamEntryBinaryListMatcher extends TypeSafeMatcher> { + + private final List expected; + + public StreamEntryBinaryListMatcher(List expected) { + this.expected = expected; + } + + @Override + protected boolean matchesSafely(List actual) { + if (actual.size() != expected.size()) return false; + + for (int i = 0; i < expected.size(); i++) { + StreamEntryBinary e = expected.get(i); + StreamEntryBinary a = actual.get(i); + + if (!e.getID().equals(a.getID())) return false; + if (!mapsEqual(e.getFields(), a.getFields())) return false; + } + + return true; + } + + private boolean mapsEqual(Map m1, Map m2) { + if (m1.size() != m2.size()) return false; + + outer: + for (Map.Entry e1 : m1.entrySet()) { + for (Map.Entry e2 : m2.entrySet()) { + if (Arrays.equals(e1.getKey(), e2.getKey()) && + Arrays.equals(e1.getValue(), e2.getValue())) { + continue outer; + } + } + return false; + } + + return true; + } + + @Override + public void describeTo(Description description) { + description.appendText("StreamEntryBinary lists to match by ID and field content"); + } + + public static StreamEntryBinaryListMatcher equalsStreamEntries(List expected) { + return new StreamEntryBinaryListMatcher(expected); + } +} \ No newline at end of file