From d92e9d7e8ecd33ccf52dd169380192d7416726b0 Mon Sep 17 00:00:00 2001 From: Thach Le Date: Sat, 6 Apr 2024 16:09:54 +0700 Subject: [PATCH 1/2] StreamEntry support Binary --- .../jedis/resps/StreamEntryBinary.java | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 src/main/java/redis/clients/jedis/resps/StreamEntryBinary.java diff --git a/src/main/java/redis/clients/jedis/resps/StreamEntryBinary.java b/src/main/java/redis/clients/jedis/resps/StreamEntryBinary.java new file mode 100644 index 0000000000..1cc0ea360c --- /dev/null +++ b/src/main/java/redis/clients/jedis/resps/StreamEntryBinary.java @@ -0,0 +1,43 @@ +package redis.clients.jedis.resps; + +import redis.clients.jedis.StreamEntryID; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; + +public class StreamEntryBinary implements Serializable { + + private static final long serialVersionUID = 1L; + + private StreamEntryID id; + private Map fields; + + public StreamEntryBinary(StreamEntryID id, Map fields) { + this.id = id; + this.fields = fields; + } + + public StreamEntryID getID() { + return id; + } + + public Map getFields() { + return fields; + } + + @Override + public String toString() { + return id + " " + fields; + } + + private void writeObject(java.io.ObjectOutputStream out) throws IOException { + out.writeUnshared(this.id); + out.writeUnshared(this.fields); + } + + private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { + this.id = (StreamEntryID) in.readUnshared(); + this.fields = (Map) in.readUnshared(); + } +} From ee1a2db503cb4027af96675eddc7279b94bfa500 Mon Sep 17 00:00:00 2001 From: Thach Le Date: Mon, 30 Sep 2024 22:30:44 +0700 Subject: [PATCH 2/2] Implement JedisBinaryStream --- .../redis/clients/jedis/BuilderFactory.java | 32 +++++++++++++++++++ .../clients/jedis/JedisBinaryStream.java | 21 ++++++++++++ .../clients/jedis/JedisBinaryStreamTest.java | 12 +++++++ 3 files changed, 65 insertions(+) create mode 100644 src/main/java/redis/clients/jedis/JedisBinaryStream.java create mode 100644 src/test/java/redis/clients/jedis/JedisBinaryStreamTest.java diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index adce27f1f4..6848def3ae 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -1317,6 +1317,38 @@ public String toString() { } }; + public static final Builder> STREAM_ENTRY_BINARY = new Builder>() { + @Override + @SuppressWarnings("unchecked") + public List build(Object data) { + List result = new ArrayList<>(); + if (data == null) { + return result; + } + List objectList = (List) data; + + for (Object obj : objectList) { + List entry = (List) obj; + String entryIdString = SafeEncoder.encode((byte[]) entry.get(0)); + StreamEntryID entryID = new StreamEntryID(entryIdString); + List hash = (List) entry.get(1); + + Iterator hashIterator = hash.iterator(); + Map map = new HashMap<>(hash.size() / 2, 1f); + while (hashIterator.hasNext()) { + map.put(hashIterator.next(), hashIterator.next()); + } + result.add(new StreamEntryBinary(entryID, map)); + } + return result; + } + + @Override + public String toString() { + return "StreamEntryBinary"; + } + }; + public static final Builder> STREAM_ENTRY_LIST = new Builder>() { @Override @SuppressWarnings("unchecked") diff --git a/src/main/java/redis/clients/jedis/JedisBinaryStream.java b/src/main/java/redis/clients/jedis/JedisBinaryStream.java new file mode 100644 index 0000000000..414439bd67 --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisBinaryStream.java @@ -0,0 +1,21 @@ +package redis.clients.jedis; + +import redis.clients.jedis.resps.StreamEntryBinary; + +import java.util.List; + +public class JedisBinaryStream extends Jedis { + public JedisBinaryStream(String host) { + super(host); + } + + public List xrangeBinary(byte[] key, byte[] start, byte[] end, int count) { + List rawResponse = super.xrange(key, start, end, count); + return BuilderFactory.STREAM_ENTRY_BINARY.build(rawResponse); + } + + public List xrangeBinary(byte[] key, byte[] start, byte[] end) { + List rawResponse = super.xrange(key, start, end); + return BuilderFactory.STREAM_ENTRY_BINARY.build(rawResponse); + } +} diff --git a/src/test/java/redis/clients/jedis/JedisBinaryStreamTest.java b/src/test/java/redis/clients/jedis/JedisBinaryStreamTest.java new file mode 100644 index 0000000000..2976f7bd6a --- /dev/null +++ b/src/test/java/redis/clients/jedis/JedisBinaryStreamTest.java @@ -0,0 +1,12 @@ +package redis.clients.jedis; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import redis.clients.jedis.commands.jedis.JedisCommandsTestBase; + +@RunWith(Parameterized.class) +public class JedisBinaryStreamTest extends JedisCommandsTestBase { + public JedisBinaryStreamTest(RedisProtocol protocol) { + super(protocol); + } +}