Skip to content

Commit

Permalink
fix: use scan command to process data collection in batches
Browse files Browse the repository at this point in the history
  • Loading branch information
emptyOVO committed Aug 29, 2024
1 parent 16a0924 commit 28a32a8
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
2 changes: 0 additions & 2 deletions inlong-agent/agent-plugins/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
<debezium.version>1.8.0.Final</debezium.version>
<ojdbc.version>19.3.0.0</ojdbc.version>
<darwinsys.version>1.5.1</darwinsys.version>
<jedis.version>4.3.1</jedis.version>
</properties>

<dependencies>
Expand All @@ -46,7 +45,6 @@
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;

import java.io.IOException;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -114,10 +116,19 @@ protected void initSource(InstanceProfile profile) {
}

private void startJedisSynchronize() {
for (String key : keys) {
String data = jedisExecuteCommand(redisCommand, key, fieldOrMember);
synchronizeData(data);
}
int cursor = 0;
// Each scan processes up to 100 keys
ScanParams scanParams = new ScanParams().count(100);
do {
ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
for (String key : scanResult.getResult()) {
if (!keys.contains(key)) {
String data = jedisExecuteCommand(redisCommand, key, fieldOrMember);
synchronizeData(data);
}
}
cursor = scanResult.getCursor();
} while (cursor != 0);
}

private void synchronizeData(String data) {
Expand Down

0 comments on commit 28a32a8

Please sign in to comment.