From 380857d7238a3f1469db5c9ec06a0ab76dd8d9a8 Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Sat, 8 Apr 2023 19:26:44 +0600 Subject: [PATCH] Support round-robining for timeseries commands --- .../redis/clients/jedis/UnifiedJedis.java | 12 ++ .../jedis/timeseries/TsMGetRoundRobin.java | 33 +++++ .../jedis/timeseries/TsMRangeRoundRobin.java | 38 ++++++ .../modules/timeseries/TimeSeriesTest.java | 113 +++++++++++++++++- 4 files changed, 190 insertions(+), 6 deletions(-) create mode 100644 src/main/java/redis/clients/jedis/timeseries/TsMGetRoundRobin.java create mode 100644 src/main/java/redis/clients/jedis/timeseries/TsMRangeRoundRobin.java diff --git a/src/main/java/redis/clients/jedis/UnifiedJedis.java b/src/main/java/redis/clients/jedis/UnifiedJedis.java index f49af3fdd8..efd8404675 100644 --- a/src/main/java/redis/clients/jedis/UnifiedJedis.java +++ b/src/main/java/redis/clients/jedis/UnifiedJedis.java @@ -4207,6 +4207,10 @@ public List tsMRange(TSMRangeParams multiRangeParams) { return executeCommand(commandObjects.tsMRange(multiRangeParams)); } + public TsMRangeRoundRobin tsMRangeRoundRobin(TSMRangeParams multiRangeParams) { + return new TsMRangeRoundRobin(provider, false, multiRangeParams); + } + @Override public List tsMRevRange(long fromTimestamp, long toTimestamp, String... filters) { return executeCommand(commandObjects.tsMRevRange(fromTimestamp, toTimestamp, filters)); @@ -4217,6 +4221,10 @@ public List tsMRevRange(TSMRangeParams multiRangeParams) { return executeCommand(commandObjects.tsMRevRange(multiRangeParams)); } + public TsMRangeRoundRobin tsMRevRangeRoundRobin(TSMRangeParams multiRangeParams) { + return new TsMRangeRoundRobin(provider, true, multiRangeParams); + } + @Override public TSElement tsGet(String key) { return executeCommand(commandObjects.tsGet(key)); @@ -4232,6 +4240,10 @@ public List> tsMGet(TSMGetParams multiGetParams, String... return executeCommand(commandObjects.tsMGet(multiGetParams, filters)); } + public TsMGetRoundRobin tsMGetRoundRobin(TSMGetParams multiGetParams, String... filters) { + return new TsMGetRoundRobin(provider, multiGetParams, filters); + } + @Override public String tsCreateRule(String sourceKey, String destKey, AggregationType aggregationType, long timeBucket) { return executeCommand(commandObjects.tsCreateRule(sourceKey, destKey, aggregationType, timeBucket)); diff --git a/src/main/java/redis/clients/jedis/timeseries/TsMGetRoundRobin.java b/src/main/java/redis/clients/jedis/timeseries/TsMGetRoundRobin.java new file mode 100644 index 0000000000..160d16340a --- /dev/null +++ b/src/main/java/redis/clients/jedis/timeseries/TsMGetRoundRobin.java @@ -0,0 +1,33 @@ +package redis.clients.jedis.timeseries; + +import java.util.List; + +import redis.clients.jedis.CommandArguments; +import redis.clients.jedis.providers.ConnectionProvider; +import redis.clients.jedis.util.JedisRoundRobinBase; + +public class TsMGetRoundRobin extends JedisRoundRobinBase>> { + + private final CommandArguments args; + + public TsMGetRoundRobin(ConnectionProvider connectionProvider, TSMGetParams multiGetParams, String... filters) { + super(connectionProvider, TimeSeriesBuilderFactory.TIMESERIES_MGET_RESPONSE); + this.args = new CommandArguments(TimeSeriesProtocol.TimeSeriesCommand.MGET).addParams(multiGetParams) + .add(TimeSeriesProtocol.TimeSeriesKeyword.FILTER).addObjects((Object[]) filters); + } + + @Override + protected boolean isIterationCompleted(List> reply) { + return reply != null; + } + + @Override + protected CommandArguments initCommandArguments() { + return args; + } + + @Override + protected CommandArguments nextCommandArguments(List> lastReply) { + throw new IllegalStateException(); + } +} diff --git a/src/main/java/redis/clients/jedis/timeseries/TsMRangeRoundRobin.java b/src/main/java/redis/clients/jedis/timeseries/TsMRangeRoundRobin.java new file mode 100644 index 0000000000..057ffa19a5 --- /dev/null +++ b/src/main/java/redis/clients/jedis/timeseries/TsMRangeRoundRobin.java @@ -0,0 +1,38 @@ +package redis.clients.jedis.timeseries; + +import java.util.List; + +import redis.clients.jedis.CommandArguments; +import redis.clients.jedis.providers.ConnectionProvider; +import redis.clients.jedis.timeseries.TimeSeriesProtocol.TimeSeriesCommand; +import redis.clients.jedis.util.JedisRoundRobinBase; + +public class TsMRangeRoundRobin extends JedisRoundRobinBase> { + + private final CommandArguments args; + + /** + * @param connectionProvider connection provider + * @param reverse {@code false} means TS.MRANGE command; {@code true} means TS.MREVRANGE command + * @param multiRangeParams optional arguments and parameters + */ + public TsMRangeRoundRobin(ConnectionProvider connectionProvider, boolean reverse, TSMRangeParams multiRangeParams) { + super(connectionProvider, TimeSeriesBuilderFactory.TIMESERIES_MRANGE_RESPONSE); + this.args = new CommandArguments(!reverse ? TimeSeriesCommand.MRANGE : TimeSeriesCommand.MREVRANGE).addParams(multiRangeParams); + } + + @Override + protected boolean isIterationCompleted(List reply) { + return reply != null; + } + + @Override + protected CommandArguments initCommandArguments() { + return args; + } + + @Override + protected CommandArguments nextCommandArguments(List lastReply) { + throw new IllegalStateException(); + } +} diff --git a/src/test/java/redis/clients/jedis/modules/timeseries/TimeSeriesTest.java b/src/test/java/redis/clients/jedis/modules/timeseries/TimeSeriesTest.java index f0b8c2b99a..c352af6875 100644 --- a/src/test/java/redis/clients/jedis/modules/timeseries/TimeSeriesTest.java +++ b/src/test/java/redis/clients/jedis/modules/timeseries/TimeSeriesTest.java @@ -1,11 +1,6 @@ package redis.clients.jedis.modules.timeseries; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import java.util.*; import org.junit.BeforeClass; @@ -533,6 +528,71 @@ public void mrangeFilterBy() { assertEquals(Arrays.asList(rawValues[0]), range.get(0).getValue()); } + @Test + public void mrangeFilterByRoundRobin() { + TsMRangeRoundRobin rr; + List range; + + Map labels = Collections.singletonMap("label", "multi"); + client.tsCreate("ts1", TSCreateParams.createParams().labels(labels)); + client.tsCreate("ts2", TSCreateParams.createParams().labels(labels)); + String filter = "label=multi"; + + TSElement[] rawValues = new TSElement[]{ + new TSElement(1000L, 1.0), + new TSElement(2000L, 0.9), + new TSElement(3200L, 1.1), + new TSElement(4500L, -1.1) + }; + + client.tsAdd("ts1", rawValues[0].getTimestamp(), rawValues[0].getValue()); + client.tsAdd("ts2", rawValues[1].getTimestamp(), rawValues[1].getValue()); + client.tsAdd("ts2", rawValues[2].getTimestamp(), rawValues[2].getValue()); + client.tsAdd("ts1", rawValues[3].getTimestamp(), rawValues[3].getValue()); + + // MRANGE + rr = client.tsMRangeRoundRobin(TSMRangeParams.multiRangeParams(0L, 5000L) + .filterByTS(1000L, 2000L).filter(filter)); + assertFalse(rr.isRoundRobinCompleted()); + range = rr.get(); + assertEquals("ts1", range.get(0).getKey()); + assertEquals(Arrays.asList(rawValues[0]), range.get(0).getValue()); + assertEquals("ts2", range.get(1).getKey()); + assertEquals(Arrays.asList(rawValues[1]), range.get(1).getValue()); + assertTrue(rr.isRoundRobinCompleted()); + + rr = client.tsMRangeRoundRobin(TSMRangeParams.multiRangeParams(0L, 5000L) + .filterByValues(1.0, 1.2).filter(filter)); + assertFalse(rr.isRoundRobinCompleted()); + range = rr.get(); + assertEquals("ts1", range.get(0).getKey()); + assertEquals(Arrays.asList(rawValues[0]), range.get(0).getValue()); + assertEquals("ts2", range.get(1).getKey()); + assertEquals(Arrays.asList(rawValues[2]), range.get(1).getValue()); + assertTrue(rr.isRoundRobinCompleted()); + + // MREVRANGE + rr = client.tsMRevRangeRoundRobin(TSMRangeParams.multiRangeParams(0L, 5000L) + .filterByTS(1000L, 2000L).filter(filter)); + assertFalse(rr.isRoundRobinCompleted()); + range = rr.get(); + assertEquals("ts1", range.get(0).getKey()); + assertEquals(Arrays.asList(rawValues[0]), range.get(0).getValue()); + assertEquals("ts2", range.get(1).getKey()); + assertEquals(Arrays.asList(rawValues[1]), range.get(1).getValue()); + assertTrue(rr.isRoundRobinCompleted()); + + rr = client.tsMRevRangeRoundRobin(TSMRangeParams.multiRangeParams(0L, 5000L) + .filterByValues(1.0, 1.2).filter(filter)); + assertFalse(rr.isRoundRobinCompleted()); + range = rr.get(); + assertEquals("ts1", range.get(0).getKey()); + assertEquals(Arrays.asList(rawValues[0]), range.get(0).getValue()); + assertEquals("ts2", range.get(1).getKey()); + assertEquals(Arrays.asList(rawValues[2]), range.get(1).getValue()); + assertTrue(rr.isRoundRobinCompleted()); + } + @Test public void groupByReduce() { client.tsCreate("ts1", TSCreateParams.createParams().labels(convertMap("metric", "cpu", "metric_name", "system"))); @@ -624,6 +684,47 @@ public void testMGet() { assertNull(ranges3.get(1).getValue()); } + @Test + public void mgetRoundRobin() { + TsMGetRoundRobin rr; + Map labels = new HashMap<>(); + labels.put("l1", "v1"); + labels.put("l2", "v2"); + assertEquals("OK", client.tsCreate("seriesMGet1", TSCreateParams.createParams() + .retention(100 * 1000 /*100sec retentionTime*/).labels(labels))); + assertEquals("OK", client.tsCreate("seriesMGet2", TSCreateParams.createParams() + .retention(100 * 1000 /*100sec retentionTime*/).labels(labels))); + + // Test for empty result + rr = client.tsMGetRoundRobin(TSMGetParams.multiGetParams().withLabels(false), "l1=v2"); + assertFalse(rr.isRoundRobinCompleted()); + List> ranges1 = rr.get(); + assertEquals(0, ranges1.size()); + assertTrue(rr.isRoundRobinCompleted()); + + // Test for empty ranges + rr = client.tsMGetRoundRobin(TSMGetParams.multiGetParams().withLabels(true), "l1=v1"); + assertFalse(rr.isRoundRobinCompleted()); + List> ranges2 = rr.get(); + assertEquals(2, ranges2.size()); + assertEquals(labels, ranges2.get(0).getLabels()); + assertEquals(labels, ranges2.get(1).getLabels()); + assertNull(ranges2.get(0).getValue()); + assertTrue(rr.isRoundRobinCompleted()); + + // Test for returned result on MGet + client.tsAdd("seriesMGet1", 1500, 1.3); + rr = client.tsMGetRoundRobin(TSMGetParams.multiGetParams().withLabels(false), "l1=v1"); + assertFalse(rr.isRoundRobinCompleted()); + List> ranges3 = rr.get(); + assertEquals(2, ranges3.size()); + assertEquals(Collections.emptyMap(), ranges3.get(0).getLabels()); + assertEquals(Collections.emptyMap(), ranges3.get(1).getLabels()); + assertEquals(new TSElement(1500, 1.3), ranges3.get(0).getValue()); + assertNull(ranges3.get(1).getValue()); + assertTrue(rr.isRoundRobinCompleted()); + } + @Test public void testQueryIndex() {