Skip to content

Commit

Permalink
Support round-robining for timeseries commands
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 committed Apr 8, 2023
1 parent 014182d commit 380857d
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 6 deletions.
12 changes: 12 additions & 0 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4207,6 +4207,10 @@ public List<TSKeyedElements> tsMRange(TSMRangeParams multiRangeParams) {
return executeCommand(commandObjects.tsMRange(multiRangeParams));
}

public TsMRangeRoundRobin tsMRangeRoundRobin(TSMRangeParams multiRangeParams) {
return new TsMRangeRoundRobin(provider, false, multiRangeParams);
}

@Override
public List<TSKeyedElements> tsMRevRange(long fromTimestamp, long toTimestamp, String... filters) {
return executeCommand(commandObjects.tsMRevRange(fromTimestamp, toTimestamp, filters));
Expand All @@ -4217,6 +4221,10 @@ public List<TSKeyedElements> tsMRevRange(TSMRangeParams multiRangeParams) {
return executeCommand(commandObjects.tsMRevRange(multiRangeParams));
}

public TsMRangeRoundRobin tsMRevRangeRoundRobin(TSMRangeParams multiRangeParams) {
return new TsMRangeRoundRobin(provider, true, multiRangeParams);
}

@Override
public TSElement tsGet(String key) {
return executeCommand(commandObjects.tsGet(key));
Expand All @@ -4232,6 +4240,10 @@ public List<TSKeyValue<TSElement>> tsMGet(TSMGetParams multiGetParams, String...
return executeCommand(commandObjects.tsMGet(multiGetParams, filters));
}

public TsMGetRoundRobin tsMGetRoundRobin(TSMGetParams multiGetParams, String... filters) {
return new TsMGetRoundRobin(provider, multiGetParams, filters);
}

@Override
public String tsCreateRule(String sourceKey, String destKey, AggregationType aggregationType, long timeBucket) {
return executeCommand(commandObjects.tsCreateRule(sourceKey, destKey, aggregationType, timeBucket));
Expand Down
33 changes: 33 additions & 0 deletions src/main/java/redis/clients/jedis/timeseries/TsMGetRoundRobin.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package redis.clients.jedis.timeseries;

import java.util.List;

import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.providers.ConnectionProvider;
import redis.clients.jedis.util.JedisRoundRobinBase;

public class TsMGetRoundRobin extends JedisRoundRobinBase<List<TSKeyValue<TSElement>>> {

private final CommandArguments args;

public TsMGetRoundRobin(ConnectionProvider connectionProvider, TSMGetParams multiGetParams, String... filters) {
super(connectionProvider, TimeSeriesBuilderFactory.TIMESERIES_MGET_RESPONSE);
this.args = new CommandArguments(TimeSeriesProtocol.TimeSeriesCommand.MGET).addParams(multiGetParams)
.add(TimeSeriesProtocol.TimeSeriesKeyword.FILTER).addObjects((Object[]) filters);
}

@Override
protected boolean isIterationCompleted(List<TSKeyValue<TSElement>> reply) {
return reply != null;
}

@Override
protected CommandArguments initCommandArguments() {
return args;
}

@Override
protected CommandArguments nextCommandArguments(List<TSKeyValue<TSElement>> lastReply) {
throw new IllegalStateException();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package redis.clients.jedis.timeseries;

import java.util.List;

import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.providers.ConnectionProvider;
import redis.clients.jedis.timeseries.TimeSeriesProtocol.TimeSeriesCommand;
import redis.clients.jedis.util.JedisRoundRobinBase;

public class TsMRangeRoundRobin extends JedisRoundRobinBase<List<TSKeyedElements>> {

private final CommandArguments args;

/**
* @param connectionProvider connection provider
* @param reverse {@code false} means TS.MRANGE command; {@code true} means TS.MREVRANGE command
* @param multiRangeParams optional arguments and parameters
*/
public TsMRangeRoundRobin(ConnectionProvider connectionProvider, boolean reverse, TSMRangeParams multiRangeParams) {
super(connectionProvider, TimeSeriesBuilderFactory.TIMESERIES_MRANGE_RESPONSE);
this.args = new CommandArguments(!reverse ? TimeSeriesCommand.MRANGE : TimeSeriesCommand.MREVRANGE).addParams(multiRangeParams);
}

@Override
protected boolean isIterationCompleted(List<TSKeyedElements> reply) {
return reply != null;
}

@Override
protected CommandArguments initCommandArguments() {
return args;
}

@Override
protected CommandArguments nextCommandArguments(List<TSKeyedElements> lastReply) {
throw new IllegalStateException();
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
package redis.clients.jedis.modules.timeseries;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;

import java.util.*;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -533,6 +528,71 @@ public void mrangeFilterBy() {
assertEquals(Arrays.asList(rawValues[0]), range.get(0).getValue());
}

@Test
public void mrangeFilterByRoundRobin() {
TsMRangeRoundRobin rr;
List<TSKeyedElements> range;

Map<String, String> labels = Collections.singletonMap("label", "multi");
client.tsCreate("ts1", TSCreateParams.createParams().labels(labels));
client.tsCreate("ts2", TSCreateParams.createParams().labels(labels));
String filter = "label=multi";

TSElement[] rawValues = new TSElement[]{
new TSElement(1000L, 1.0),
new TSElement(2000L, 0.9),
new TSElement(3200L, 1.1),
new TSElement(4500L, -1.1)
};

client.tsAdd("ts1", rawValues[0].getTimestamp(), rawValues[0].getValue());
client.tsAdd("ts2", rawValues[1].getTimestamp(), rawValues[1].getValue());
client.tsAdd("ts2", rawValues[2].getTimestamp(), rawValues[2].getValue());
client.tsAdd("ts1", rawValues[3].getTimestamp(), rawValues[3].getValue());

// MRANGE
rr = client.tsMRangeRoundRobin(TSMRangeParams.multiRangeParams(0L, 5000L)
.filterByTS(1000L, 2000L).filter(filter));
assertFalse(rr.isRoundRobinCompleted());
range = rr.get();
assertEquals("ts1", range.get(0).getKey());
assertEquals(Arrays.asList(rawValues[0]), range.get(0).getValue());
assertEquals("ts2", range.get(1).getKey());
assertEquals(Arrays.asList(rawValues[1]), range.get(1).getValue());
assertTrue(rr.isRoundRobinCompleted());

rr = client.tsMRangeRoundRobin(TSMRangeParams.multiRangeParams(0L, 5000L)
.filterByValues(1.0, 1.2).filter(filter));
assertFalse(rr.isRoundRobinCompleted());
range = rr.get();
assertEquals("ts1", range.get(0).getKey());
assertEquals(Arrays.asList(rawValues[0]), range.get(0).getValue());
assertEquals("ts2", range.get(1).getKey());
assertEquals(Arrays.asList(rawValues[2]), range.get(1).getValue());
assertTrue(rr.isRoundRobinCompleted());

// MREVRANGE
rr = client.tsMRevRangeRoundRobin(TSMRangeParams.multiRangeParams(0L, 5000L)
.filterByTS(1000L, 2000L).filter(filter));
assertFalse(rr.isRoundRobinCompleted());
range = rr.get();
assertEquals("ts1", range.get(0).getKey());
assertEquals(Arrays.asList(rawValues[0]), range.get(0).getValue());
assertEquals("ts2", range.get(1).getKey());
assertEquals(Arrays.asList(rawValues[1]), range.get(1).getValue());
assertTrue(rr.isRoundRobinCompleted());

rr = client.tsMRevRangeRoundRobin(TSMRangeParams.multiRangeParams(0L, 5000L)
.filterByValues(1.0, 1.2).filter(filter));
assertFalse(rr.isRoundRobinCompleted());
range = rr.get();
assertEquals("ts1", range.get(0).getKey());
assertEquals(Arrays.asList(rawValues[0]), range.get(0).getValue());
assertEquals("ts2", range.get(1).getKey());
assertEquals(Arrays.asList(rawValues[2]), range.get(1).getValue());
assertTrue(rr.isRoundRobinCompleted());
}

@Test
public void groupByReduce() {
client.tsCreate("ts1", TSCreateParams.createParams().labels(convertMap("metric", "cpu", "metric_name", "system")));
Expand Down Expand Up @@ -624,6 +684,47 @@ public void testMGet() {
assertNull(ranges3.get(1).getValue());
}

@Test
public void mgetRoundRobin() {
TsMGetRoundRobin rr;
Map<String, String> labels = new HashMap<>();
labels.put("l1", "v1");
labels.put("l2", "v2");
assertEquals("OK", client.tsCreate("seriesMGet1", TSCreateParams.createParams()
.retention(100 * 1000 /*100sec retentionTime*/).labels(labels)));
assertEquals("OK", client.tsCreate("seriesMGet2", TSCreateParams.createParams()
.retention(100 * 1000 /*100sec retentionTime*/).labels(labels)));

// Test for empty result
rr = client.tsMGetRoundRobin(TSMGetParams.multiGetParams().withLabels(false), "l1=v2");
assertFalse(rr.isRoundRobinCompleted());
List<TSKeyValue<TSElement>> ranges1 = rr.get();
assertEquals(0, ranges1.size());
assertTrue(rr.isRoundRobinCompleted());

// Test for empty ranges
rr = client.tsMGetRoundRobin(TSMGetParams.multiGetParams().withLabels(true), "l1=v1");
assertFalse(rr.isRoundRobinCompleted());
List<TSKeyValue<TSElement>> ranges2 = rr.get();
assertEquals(2, ranges2.size());
assertEquals(labels, ranges2.get(0).getLabels());
assertEquals(labels, ranges2.get(1).getLabels());
assertNull(ranges2.get(0).getValue());
assertTrue(rr.isRoundRobinCompleted());

// Test for returned result on MGet
client.tsAdd("seriesMGet1", 1500, 1.3);
rr = client.tsMGetRoundRobin(TSMGetParams.multiGetParams().withLabels(false), "l1=v1");
assertFalse(rr.isRoundRobinCompleted());
List<TSKeyValue<TSElement>> ranges3 = rr.get();
assertEquals(2, ranges3.size());
assertEquals(Collections.emptyMap(), ranges3.get(0).getLabels());
assertEquals(Collections.emptyMap(), ranges3.get(1).getLabels());
assertEquals(new TSElement(1500, 1.3), ranges3.get(0).getValue());
assertNull(ranges3.get(1).getValue());
assertTrue(rr.isRoundRobinCompleted());
}

@Test
public void testQueryIndex() {

Expand Down

0 comments on commit 380857d

Please sign in to comment.