Skip to content

Commit

Permalink
Support [BF/CF].[SCANDUMP/LOADCHUNK] commands (#3029)
Browse files Browse the repository at this point in the history
Support 4 RedisBloom commands:
- BF.SCANDUMP
- BF.LOADCHUNK
- CF.SCANDUMP
- CF.LOADCHUNK
  • Loading branch information
sazzad16 authored Jun 16, 2022
1 parent fbb2be4 commit 87320a9
Show file tree
Hide file tree
Showing 13 changed files with 193 additions and 117 deletions.
8 changes: 8 additions & 0 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -1701,6 +1701,14 @@ public List<TSKeyValue<TSElement>> build(Object data) {
}
};

public static final Builder<Map.Entry<Long, byte[]>> BLOOM_SCANDUMP_RESPONSE = new Builder<Map.Entry<Long, byte[]>>() {
@Override
public Map.Entry<Long, byte[]> build(Object data) {
List<Object> list = (List<Object>) data;
return new KeyValue<>(LONG.build(list.get(0)), BINARY.build(list.get(1)));
}
};

/**
* A decorator to implement Set from List. Assume that given List do not contains duplicated
* values. The resulting set displays the same ordering, concurrency, and performance
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -3579,6 +3579,14 @@ public final CommandObject<List<Boolean>> bfMExists(String key, String... items)
addObjects((Object[]) items), BuilderFactory.BOOLEAN_LIST);
}

public final CommandObject<Map.Entry<Long, byte[]>> bfScanDump(String key, long iterator) {
return new CommandObject<>(commandArguments(BloomFilterCommand.SCANDUMP).key(key).add(iterator), BuilderFactory.BLOOM_SCANDUMP_RESPONSE);
}

public final CommandObject<String> bfLoadChunk(String key, long iterator, byte[] data) {
return new CommandObject<>(commandArguments(BloomFilterCommand.LOADCHUNK).key(key).add(iterator).add(data), BuilderFactory.STRING);
}

public final CommandObject<Map<String, Object>> bfInfo(String key) {
return new CommandObject<>(commandArguments(BloomFilterCommand.INFO).key(key), BuilderFactory.ENCODED_OBJECT_MAP);
}
Expand Down Expand Up @@ -3636,6 +3644,14 @@ public final CommandObject<Long> cfCount(String key, String item) {
return new CommandObject<>(commandArguments(CuckooFilterCommand.COUNT).key(key).add(item), BuilderFactory.LONG);
}

public final CommandObject<Map.Entry<Long, byte[]>> cfScanDump(String key, long iterator) {
return new CommandObject<>(commandArguments(CuckooFilterCommand.SCANDUMP).key(key).add(iterator), BuilderFactory.BLOOM_SCANDUMP_RESPONSE);
}

public final CommandObject<String> cfLoadChunk(String key, long iterator, byte[] data) {
return new CommandObject<>(commandArguments(CuckooFilterCommand.LOADCHUNK).key(key).add(iterator).add(data), BuilderFactory.STRING);
}

public final CommandObject<Map<String, Object>> cfInfo(String key) {
return new CommandObject<>(commandArguments(CuckooFilterCommand.INFO).key(key), BuilderFactory.ENCODED_OBJECT_MAP);
}
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/redis/clients/jedis/MultiNodePipelineBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -3857,6 +3857,16 @@ public Response<List<Boolean>> bfMExists(String key, String... items) {
return appendCommand(commandObjects.bfMExists(key, items));
}

@Override
public Response<Map.Entry<Long, byte[]>> bfScanDump(String key, long iterator) {
return appendCommand(commandObjects.bfScanDump(key, iterator));
}

@Override
public Response<String> bfLoadChunk(String key, long iterator, byte[] data) {
return appendCommand(commandObjects.bfLoadChunk(key, iterator, data));
}

@Override
public Response<Map<String, Object>> bfInfo(String key) {
return appendCommand(commandObjects.bfInfo(key));
Expand Down Expand Up @@ -3917,6 +3927,16 @@ public Response<Long> cfCount(String key, String item) {
return appendCommand(commandObjects.cfCount(key, item));
}

@Override
public Response<Map.Entry<Long, byte[]>> cfScanDump(String key, long iterator) {
return appendCommand(commandObjects.cfScanDump(key, iterator));
}

@Override
public Response<String> cfLoadChunk(String key, long iterator, byte[] data) {
return appendCommand(commandObjects.cfLoadChunk(key, iterator, data));
}

@Override
public Response<Map<String, Object>> cfInfo(String key) {
return appendCommand(commandObjects.cfInfo(key));
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/redis/clients/jedis/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -3859,6 +3859,16 @@ public Response<List<Boolean>> bfMExists(String key, String... items) {
return appendCommand(commandObjects.bfMExists(key, items));
}

@Override
public Response<Map.Entry<Long, byte[]>> bfScanDump(String key, long iterator) {
return appendCommand(commandObjects.bfScanDump(key, iterator));
}

@Override
public Response<String> bfLoadChunk(String key, long iterator, byte[] data) {
return appendCommand(commandObjects.bfLoadChunk(key, iterator, data));
}

@Override
public Response<Map<String, Object>> bfInfo(String key) {
return appendCommand(commandObjects.bfInfo(key));
Expand Down Expand Up @@ -3919,6 +3929,16 @@ public Response<Long> cfCount(String key, String item) {
return appendCommand(commandObjects.cfCount(key, item));
}

@Override
public Response<Map.Entry<Long, byte[]>> cfScanDump(String key, long iterator) {
return appendCommand(commandObjects.cfScanDump(key, iterator));
}

@Override
public Response<String> cfLoadChunk(String key, long iterator, byte[] data) {
return appendCommand(commandObjects.cfLoadChunk(key, iterator, data));
}

@Override
public Response<Map<String, Object>> cfInfo(String key) {
return appendCommand(commandObjects.cfInfo(key));
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/redis/clients/jedis/TransactionBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -3927,6 +3927,16 @@ public Response<List<Boolean>> bfMExists(String key, String... items) {
return appendCommand(commandObjects.bfMExists(key, items));
}

@Override
public Response<Map.Entry<Long, byte[]>> bfScanDump(String key, long iterator) {
return appendCommand(commandObjects.bfScanDump(key, iterator));
}

@Override
public Response<String> bfLoadChunk(String key, long iterator, byte[] data) {
return appendCommand(commandObjects.bfLoadChunk(key, iterator, data));
}

@Override
public Response<Map<String, Object>> bfInfo(String key) {
return appendCommand(commandObjects.bfInfo(key));
Expand Down Expand Up @@ -3987,6 +3997,16 @@ public Response<Long> cfCount(String key, String item) {
return appendCommand(commandObjects.cfCount(key, item));
}

@Override
public Response<Map.Entry<Long, byte[]>> cfScanDump(String key, long iterator) {
return appendCommand(commandObjects.cfScanDump(key, iterator));
}

@Override
public Response<String> cfLoadChunk(String key, long iterator, byte[] data) {
return appendCommand(commandObjects.cfLoadChunk(key, iterator, data));
}

@Override
public Response<Map<String, Object>> cfInfo(String key) {
return appendCommand(commandObjects.cfInfo(key));
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -3998,6 +3998,16 @@ public List<Boolean> bfMExists(String key, String... items) {
return executeCommand(commandObjects.bfMExists(key, items));
}

@Override
public Map.Entry<Long, byte[]> bfScanDump(String key, long iterator) {
return executeCommand(commandObjects.bfScanDump(key, iterator));
}

@Override
public String bfLoadChunk(String key, long iterator, byte[] data) {
return executeCommand(commandObjects.bfLoadChunk(key, iterator, data));
}

@Override
public Map<String, Object> bfInfo(String key) {
return executeCommand(commandObjects.bfInfo(key));
Expand Down Expand Up @@ -4063,6 +4073,16 @@ public long cfCount(String key, String item) {
return executeCommand(commandObjects.cfCount(key, item));
}

@Override
public Map.Entry<Long, byte[]> cfScanDump(String key, long iterator) {
return executeCommand(commandObjects.cfScanDump(key, iterator));
}

@Override
public String cfLoadChunk(String key, long iterator, byte[] data) {
return executeCommand(commandObjects.cfLoadChunk(key, iterator, data));
}

@Override
public Map<String, Object> cfInfo(String key) {
return executeCommand(commandObjects.cfInfo(key));
Expand Down
20 changes: 18 additions & 2 deletions src/main/java/redis/clients/jedis/bloom/BloomFilterCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,24 @@ public interface BloomFilterCommands {
*/
List<Boolean> bfMExists(String key, String... items);

// BF.SCANDUMP {key} {iter}
// BF.LOADCHUNK {key} {iter} {data}
/**
* {@code BF.SCANDUMP {key} {iterator}}
*
* @param key
* @param iterator
* @return Pair of next iterator and current data
*/
Map.Entry<Long, byte[]> bfScanDump(String key, long iterator);

/**
* {@code BF.LOADCHUNK {key} {iterator} {data}}
*
* @param key
* @param iterator
* @param data
* @return OK
*/
String bfLoadChunk(String key, long iterator, byte[] data);

Map<String, Object> bfInfo(String key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,9 @@ public interface BloomFilterPipelineCommands {

Response<List<Boolean>> bfMExists(String key, String... items);

Response<Map.Entry<Long, byte[]>> bfScanDump(String key, long iterator);

Response<String> bfLoadChunk(String key, long iterator, byte[] data);

Response<Map<String, Object>> bfInfo(String key);
}
40 changes: 25 additions & 15 deletions src/main/java/redis/clients/jedis/bloom/CuckooFilterCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,21 +137,21 @@ public interface CuckooFilterCommands {
* @return The number of times the item exists in the filter
*/
long cfCount(String key, String item);
//
// /**
// * CF.SCANDUMP Begins an incremental save of the cuckoo filter. This is useful
// * for large cuckoo filters which cannot fit into the normal SAVE and RESTORE
// * model.
// *
// * The Iterator is passed as input to the next invocation of SCANDUMP . If
// * Iterator is 0, the iteration has completed.
// *
// * @param key Name of the filter
// * @param iterator This is either 0, or the iterator from a previous invocation
// * of this command
// * @return a Map.Entry containing the Iterator and Data.
// */
// Map.Entry<Long, byte[]> cfScanDump(String key, long iterator);

/**
* CF.SCANDUMP Begins an incremental save of the cuckoo filter. This is useful
* for large cuckoo filters which cannot fit into the normal SAVE and RESTORE
* model.
*
* The Iterator is passed as input to the next invocation of SCANDUMP . If
* Iterator is 0, the iteration has completed.
*
* @param key Name of the filter
* @param iterator This is either 0, or the iterator from a previous invocation
* of this command
* @return a Map.Entry containing the Iterator and Data.
*/
Map.Entry<Long, byte[]> cfScanDump(String key, long iterator);
//
// /**
// * CF.SCANDUMP Begins an incremental save of the cuckoo filter. This is useful
Expand All @@ -177,6 +177,16 @@ public interface CuckooFilterCommands {
// * @return A sequential Stream of Pair of iterator and data
// */
// Stream<Map.Entry<Long, byte[]>> cfScanDumpStream(String key);

/**
* CF.LOADCHUNK Restores a filter previously saved using SCANDUMP.
*
* @param key Name of the filter to restore
* @param iterator Iterator from CF.SCANDUMP
* @param data Data from CF.SCANDUMP
* @return OK
*/
String cfLoadChunk(String key, long iterator, byte[] data);
//
// /**
// * CF.LOADCHUNK Restores a filter previously saved using SCANDUMP . See the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,9 @@ public interface CuckooFilterPipelineCommands {

Response<Long> cfCount(String key, String item);

Response<Map.Entry<Long, byte[]>> cfScanDump(String key, long iterator);

Response<String> cfLoadChunk(String key, long iterator, byte[] data);

Response<Map<String, Object>> cfInfo(String key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public enum BloomFilterCommand implements ProtocolCommand {
EXISTS("BF.EXISTS"),
MEXISTS("BF.MEXISTS"),
INSERT("BF.INSERT"),
SCANDUMP("BF.SCANDUMP"),
LOADCHUNK("BF.LOADCHUNK"),
INFO("BF.INFO");

private final byte[] raw;
Expand Down
18 changes: 18 additions & 0 deletions src/test/java/redis/clients/jedis/modules/bloom/BloomTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,4 +214,22 @@ public void insertExpansion() {
"o", "i", "u", "y", "t", "r", "e", "w", "q");
assertEquals(20, insert.size());
}

@Test(timeout = 2000L)
public void testScanDumpAndLoadChunk() {
client.bfAdd("bloom-dump", "a");

long iterator = 0;
while (true) {
Map.Entry<Long, byte[]> chunkData = client.bfScanDump("bloom-dump", iterator);
iterator = chunkData.getKey();
if (iterator == 0L) break;
assertEquals("OK", client.bfLoadChunk("bloom-load", iterator, chunkData.getValue()));
}

// check for properties
assertEquals(client.bfInfo("bloom-dump"), client.bfInfo("bloom-load"));
// check for existing items
assertTrue(client.bfExists("bloom-load", "a"));
}
}
Loading

0 comments on commit 87320a9

Please sign in to comment.