Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 107 additions & 4 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@
import redis.clients.jedis.resps.*;
import redis.clients.jedis.resps.LCSMatchResult.MatchedPosition;
import redis.clients.jedis.resps.LCSMatchResult.Position;
import redis.clients.jedis.util.DoublePrecision;
import redis.clients.jedis.util.JedisByteHashMap;
import redis.clients.jedis.util.KeyValue;
import redis.clients.jedis.util.SafeEncoder;
import redis.clients.jedis.util.*;

public final class BuilderFactory {

Expand Down Expand Up @@ -1809,6 +1806,112 @@ public String toString() {
}
};

public static final Builder<List<StreamEntryBinary>> STREAM_ENTRY_BINARY_LIST = new Builder<List<StreamEntryBinary>>() {
@Override
@SuppressWarnings("unchecked")
public List<StreamEntryBinary> build(Object data) {
if (null == data) {
return null;
}
List<ArrayList<Object>> objectList = (List<ArrayList<Object>>) data;

List<StreamEntryBinary> responses = new ArrayList<>(objectList.size() / 2);
if (objectList.isEmpty()) {
return responses;
}

for (ArrayList<Object> res : objectList) {
if (res == null) {
responses.add(null);
continue;
}
String entryIdString = SafeEncoder.encode((byte[]) res.get(0));
StreamEntryID entryID = new StreamEntryID(entryIdString);
List<byte[]> hash = (List<byte[]>) res.get(1);
if (hash == null) {
responses.add(new StreamEntryBinary(entryID, null));
continue;
}

Iterator<byte[]> hashIterator = hash.iterator();
Map<byte[], byte[]> map = new JedisByteHashMap();
while (hashIterator.hasNext()) {
map.put(BINARY.build(hashIterator.next()), BINARY.build(hashIterator.next()));
}
responses.add(new StreamEntryBinary(entryID, map));
}

return responses;
}

@Override
public String toString() {
return "List<StreamEntryBinary>";
}
};

public static final Builder<Map<byte[], List<StreamEntryBinary>>> STREAM_READ_BINARY_MAP_RESPONSE
= new Builder<Map<byte[], List<StreamEntryBinary>>>() {
@Override
@SuppressWarnings("unchecked")
public Map<byte[], List<StreamEntryBinary>> build(Object data) {
if (data == null) return null;
List list = (List) data;
if (list.isEmpty()) return Collections.emptyMap();

JedisByteMap<List<StreamEntryBinary>> result = new JedisByteMap<>();
if (list.get(0) instanceof KeyValue) {
((List<KeyValue>) list).forEach(kv -> result.put(BINARY.build(kv.getKey()), STREAM_ENTRY_BINARY_LIST.build(kv.getValue())));
return result;
} else {
for (Object anObj : list) {
List<Object> streamObj = (List<Object>) anObj;
byte[] streamKey = (byte[]) streamObj.get(0);
List<StreamEntryBinary> streamEntries = STREAM_ENTRY_BINARY_LIST.build(streamObj.get(1));
result.put(streamKey, streamEntries);
}
return result;
}
}

@Override
public String toString() {
return "Map<byte[], List<StreamEntryBinary>>";
}
};

public static final Builder<List<Map.Entry<byte[], List<StreamEntryBinary>>>> STREAM_READ_BINARY_RESPONSE
= new Builder<List<Map.Entry<byte[], List<StreamEntryBinary>>>>() {
@Override
@SuppressWarnings("unchecked")
public List<Map.Entry<byte[], List<StreamEntryBinary>>> build(Object data) {
if (data == null) return null;
List list = (List) data;
if (list.isEmpty()) return Collections.emptyList();

if (list.get(0) instanceof KeyValue) {
return ((List<KeyValue>) list).stream()
.map(kv -> new KeyValue<>(BINARY.build(kv.getKey()),
STREAM_ENTRY_BINARY_LIST.build(kv.getValue())))
.collect(Collectors.toList());
} else {
List<Map.Entry<byte[], List<StreamEntryBinary>>> result = new ArrayList<>(list.size());
for (Object anObj : list) {
List<Object> streamObj = (List<Object>) anObj;
byte[] streamKey = BINARY.build(streamObj.get(0));
List<StreamEntryBinary> streamEntries = STREAM_ENTRY_BINARY_LIST.build(streamObj.get(1));
result.add(KeyValue.of(streamKey, streamEntries));
}
return result;
}
}

@Override
public String toString() {
return "List<Entry<byte[], List<StreamEntryBinary>>>";
}
};

private static final List<Builder> BACKUP_BUILDERS_FOR_DECODING_FUNCTIONS
= Arrays.asList(STRING, LONG, DOUBLE);

Expand Down
106 changes: 106 additions & 0 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -2881,6 +2881,11 @@ public final CommandObject<Map<String, List<StreamEntry>>> xreadGroupAsMap(
return new CommandObject<>(args, BuilderFactory.STREAM_READ_MAP_RESPONSE);
}

/**
* @deprecated As of Jedis 6.1.0, replaced by {@link #xreadBinary(XReadParams, Map)} or
* {@link #xreadBinaryAsMap(XReadParams, Map)} for type safety and better stream entry parsing.
*/
@Deprecated
public final CommandObject<List<Object>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
for (Map.Entry<byte[], byte[]> entry : streams) {
Expand All @@ -2892,6 +2897,35 @@ public final CommandObject<List<Object>> xread(XReadParams xReadParams, Map.Entr
return new CommandObject<>(args, BuilderFactory.RAW_OBJECT_LIST);
}

public final CommandObject<List<Map.Entry<byte[], List<StreamEntryBinary>>>> xreadBinary(
XReadParams xReadParams, Map.Entry<byte[], StreamEntryID>... streams) {
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
for (Map.Entry<byte[], StreamEntryID> entry : streams) {
args.key(entry.getKey());
}
for (Map.Entry<byte[], StreamEntryID> entry : streams) {
args.add(entry.getValue());
}
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_RESPONSE);
}

public final CommandObject<Map<byte[], List<StreamEntryBinary>>> xreadBinaryAsMap(
XReadParams xReadParams, Map.Entry<byte[], StreamEntryID>... streams) {
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
for (Map.Entry<byte[], StreamEntryID> entry : streams) {
args.key(entry.getKey());
}
for (Map.Entry<byte[], StreamEntryID> entry : streams) {
args.add(entry.getValue());
}
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_MAP_RESPONSE);
}

/**
* @deprecated As of Jedis 6.1.0, use {@link #xreadGroupBinary(byte[], byte[], XReadGroupParams, Map)} or
* {@link #xreadGroupBinaryAsMap(byte[], byte[], XReadGroupParams, Map)} instead.
*/
@Deprecated
public final CommandObject<List<Object>> xreadGroup(byte[] groupName, byte[] consumer,
XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
CommandArguments args = commandArguments(XREADGROUP)
Expand All @@ -2905,6 +2939,78 @@ public final CommandObject<List<Object>> xreadGroup(byte[] groupName, byte[] con
}
return new CommandObject<>(args, BuilderFactory.RAW_OBJECT_LIST);
}

public final CommandObject<List<Map.Entry<byte[], List<StreamEntryBinary>>>> xreadGroupBinary(
byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
Map.Entry<byte[], StreamEntryID>... streams) {
CommandArguments args = commandArguments(XREADGROUP)
.add(GROUP).add(groupName).add(consumer)
.addParams(xReadGroupParams).add(STREAMS);
for (Map.Entry<byte[], StreamEntryID> entry : streams) {
args.key(entry.getKey());
}
for (Map.Entry<byte[], StreamEntryID> entry : streams) {
args.add(entry.getValue());
}
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_RESPONSE);
}

public final CommandObject<Map<byte[], List<StreamEntryBinary>>> xreadGroupBinaryAsMap(
byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
Map.Entry<byte[], StreamEntryID>... streams) {
CommandArguments args = commandArguments(XREADGROUP)
.add(GROUP).add(groupName).add(consumer)
.addParams(xReadGroupParams).add(STREAMS);
for (Map.Entry<byte[], StreamEntryID> entry : streams) {
args.key(entry.getKey());
}
for (Map.Entry<byte[], StreamEntryID> entry : streams) {
args.add(entry.getValue());
}
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_MAP_RESPONSE);
}

public final CommandObject<List<Map.Entry<byte[], List<StreamEntryBinary>>>> xreadBinary(
XReadParams xReadParams, Map<byte[], StreamEntryID> streams) {
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
Set<Map.Entry<byte[], StreamEntryID>> entrySet = streams.entrySet();
entrySet.forEach(entry -> args.key(entry.getKey()));
entrySet.forEach(entry -> args.add(entry.getValue()));
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_RESPONSE);
}

public final CommandObject<Map<byte[], List<StreamEntryBinary>>> xreadBinaryAsMap(
XReadParams xReadParams, Map<byte[], StreamEntryID> streams) {
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
Set<Map.Entry<byte[], StreamEntryID>> entrySet = streams.entrySet();
entrySet.forEach(entry -> args.key(entry.getKey()));
entrySet.forEach(entry -> args.add(entry.getValue()));
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_MAP_RESPONSE);
}

public final CommandObject<List<Map.Entry<byte[], List<StreamEntryBinary>>>> xreadGroupBinary(
byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
Map<byte[], StreamEntryID> streams) {
CommandArguments args = commandArguments(XREADGROUP)
.add(GROUP).add(groupName).add(consumer)
.addParams(xReadGroupParams).add(STREAMS);
Set<Map.Entry<byte[], StreamEntryID>> entrySet = streams.entrySet();
entrySet.forEach(entry -> args.key(entry.getKey()));
entrySet.forEach(entry -> args.add(entry.getValue()));
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_RESPONSE);
}

public final CommandObject<Map<byte[], List<StreamEntryBinary>>> xreadGroupBinaryAsMap(
byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
Map<byte[], StreamEntryID> streams) {
CommandArguments args = commandArguments(XREADGROUP)
.add(GROUP).add(groupName).add(consumer)
.addParams(xReadGroupParams).add(STREAMS);
Set<Map.Entry<byte[], StreamEntryID>> entrySet = streams.entrySet();
entrySet.forEach(entry -> args.key(entry.getKey()));
entrySet.forEach(entry -> args.add(entry.getValue()));
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_MAP_RESPONSE);
}
// Stream commands

// Scripting commands
Expand Down
47 changes: 44 additions & 3 deletions src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,7 @@ public long hsetex(byte[] key, HSetExParams params, byte[] field, byte[] value)
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.hsetex(key, params, field, value));
}

@Override
public long hsetex(byte[] key, HSetExParams params, Map<byte[], byte[]> hash){
checkIsInMultiOrPipeline();
Expand Down Expand Up @@ -1200,13 +1200,13 @@ public List<byte[]> hgetex(byte[] key, HGetExParams params, byte[]... fields){
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.hgetex(key, params, fields));
}

@Override
public List<byte[]> hgetdel(byte[] key, byte[]... fields){
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.hgetdel(key, fields));
}

/**
* Set the specified hash field to the specified value if the field not exists. <b>Time
* complexity:</b> O(1)
Expand Down Expand Up @@ -4770,19 +4770,60 @@ public List<Long> hpersist(byte[] key, byte[]... fields) {
return connection.executeCommand(commandObjects.hpersist(key, fields));
}

/**
* @deprecated As of Jedis 6.1.0, use
* {@link #xreadBinary(XReadParams, Map)} or
* {@link #xreadBinaryAsMap(XReadParams, Map)} for type safety and better stream entry
* parsing.
*/
@Deprecated
@Override
public List<Object> xread(XReadParams xReadParams, Entry<byte[], byte[]>... streams) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xread(xReadParams, streams));
}

/**
* @deprecated As of Jedis 6.1.0, use
* {@link #xreadGroupBinary(byte[], byte[], XReadGroupParams, Map)} or
* {@link #xreadGroupBinaryAsMap(byte[], byte[], XReadGroupParams, Map)} instead.
*/
@Deprecated
@Override
public List<Object> xreadGroup(byte[] groupName, byte[] consumer,
XReadGroupParams xReadGroupParams, Entry<byte[], byte[]>... streams) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
}

@Override
public List<Map.Entry<byte[], List<StreamEntryBinary>>> xreadBinary(XReadParams xReadParams,
Map<byte[], StreamEntryID> streams) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xreadBinary(xReadParams, streams));
}

@Override
public Map<byte[], List<StreamEntryBinary>> xreadBinaryAsMap(XReadParams xReadParams,
Map<byte[], StreamEntryID> streams) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xreadBinaryAsMap(xReadParams, streams));
}

@Override
public List<Map.Entry<byte[], List<StreamEntryBinary>>> xreadGroupBinary(byte[] groupName, byte[] consumer,
XReadGroupParams xReadGroupParams, Map<byte[], StreamEntryID> streams) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xreadGroupBinary(groupName, consumer, xReadGroupParams, streams));
}

@Override
public Map<byte[], List<StreamEntryBinary>> xreadGroupBinaryAsMap(byte[] groupName, byte[] consumer,
XReadGroupParams xReadGroupParams, Map<byte[], StreamEntryID> streams) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xreadGroupBinaryAsMap(groupName, consumer, xReadGroupParams, streams));
}

@Override
public byte[] xadd(final byte[] key, final XAddParams params, final Map<byte[], byte[]> hash) {
checkIsInMultiOrPipeline();
Expand Down
41 changes: 40 additions & 1 deletion src/main/java/redis/clients/jedis/PipeliningBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -3359,17 +3359,56 @@ public Response<List<Object>> xinfoConsumers(byte[] key, byte[] group) {
return appendCommand(commandObjects.xinfoConsumers(key, group));
}

/**
* @deprecated As of Jedis 6.1.0, use {@link #xreadBinary(XReadParams, Map)} or
* {@link #xreadBinaryAsMap(XReadParams, Map)} for type safety and better stream entry
* parsing.
*/
@Deprecated
@Override
public Response<List<Object>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
public Response<List<Object>> xread(XReadParams xReadParams,
Map.Entry<byte[], byte[]>... streams) {
return appendCommand(commandObjects.xread(xReadParams, streams));
}

/**
* @deprecated As of Jedis 6.1.0, use
* {@link #xreadGroupBinary(byte[], byte[], XReadGroupParams, Map)} or
* {@link #xreadGroupBinaryAsMap(byte[], byte[], XReadGroupParams, Map)} instead.
*/
@Deprecated
@Override
public Response<List<Object>> xreadGroup(byte[] groupName, byte[] consumer,
XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
return appendCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
}

@Override
public Response<List<Map.Entry<byte[], List<StreamEntryBinary>>>> xreadBinary(XReadParams xReadParams,
Map<byte[], StreamEntryID> streams) {
return appendCommand(commandObjects.xreadBinary(xReadParams, streams));
}

@Override
public Response<Map<byte[], List<StreamEntryBinary>>> xreadBinaryAsMap(XReadParams xReadParams,
Map<byte[], StreamEntryID> streams) {
return appendCommand(commandObjects.xreadBinaryAsMap(xReadParams, streams));
}

@Override
public Response<List<Map.Entry<byte[], List<StreamEntryBinary>>>> xreadGroupBinary(byte[] groupName,
byte[] consumer, XReadGroupParams xReadGroupParams, Map<byte[], StreamEntryID> streams) {
return appendCommand(
commandObjects.xreadGroupBinary(groupName, consumer, xReadGroupParams, streams));
}

@Override
public Response<Map<byte[], List<StreamEntryBinary>>> xreadGroupBinaryAsMap(byte[] groupName,
byte[] consumer, XReadGroupParams xReadGroupParams, Map<byte[], StreamEntryID> streams) {
return appendCommand(
commandObjects.xreadGroupBinaryAsMap(groupName, consumer, xReadGroupParams, streams));
}

@Override
public Response<String> set(byte[] key, byte[] value) {
return appendCommand(commandObjects.set(key, value));
Expand Down
Loading
Loading