Skip to content

Commit

Permalink
Merge pull request #674 from camunda-community-hub/enable-redis-clust…
Browse files Browse the repository at this point in the history
…ers-with-connector-0.9.10

feat: upgrade to Redis exporter 0.9.10 and enable Redis cluster support
  • Loading branch information
nitram509 authored Feb 18, 2024
2 parents a2022aa + 6ce633a commit a593bfb
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 39 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,9 @@ zeebe:
worker:
redis:
connection: redis://localhost:6379
useClusterClient: false
consumer-group: simple-monitor
prefix: zeebe
xread-count: 500
xread-block-millis: 2000
Expand All @@ -292,6 +294,9 @@ zeebe-importer: redis

Refer to the [docker-compose file](docker/docker-compose.yml) for a sample configuration with the Redis importer. Profile presets: `redis,redis_in_memory`

Please be aware that when connecting to a Redis cluster you must activate
the `useClusterClient` option.

## Code of Conduct

This project adheres to the Contributor Covenant [Code of
Expand Down
5 changes: 3 additions & 2 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ services:

zeebe-redis:
container_name: zeebe-broker-redis
image: ghcr.io/camunda-community-hub/zeebe-with-redis-exporter:8.4.0
image: ghcr.io/camunda-community-hub/zeebe-with-redis-exporter:8.4.2
hostname: zeebe
environment:
- ZEEBE_REDIS_REMOTE_ADDRESS=redis://redis:6379
- ZEEBE_REDIS_MAX_TIME_TO_LIVE_IN_SECONDS=900
Expand Down Expand Up @@ -135,7 +136,7 @@ services:

simple-monitor-in-memory-redis:
container_name: zeebe-simple-monitor-in-memory-redis
image: ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.6.4
image: ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.7.1
environment:
- zeebe.client.broker.gateway-address=zeebe:26500
- zeebe-importer=redis
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<zeebe.version>8.3.4</zeebe.version>
<version.zeebe.spring>8.4.1</version.zeebe.spring>
<hazelcast.exporter.version>1.4.0</hazelcast.exporter.version>
<redis.exporter.version>0.9.9</redis.exporter.version>
<redis.exporter.version>0.9.10</redis.exporter.version>

<querydsl.version>5.0.0</querydsl.version>

Expand Down
14 changes: 14 additions & 0 deletions src/main/java/io/zeebe/monitor/config/RedisConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ public class RedisConfig {
@Value("${zeebe.client.worker.redis.connection}")
private String redisConnection;

@Value("${zeebe.client.worker.redis.useClusterClient:false}")
private boolean useClusterClient;

@Value("${zeebe.client.worker.redis.consumer-group:simple-monitor}")
private String redisConumerGroup;

Expand All @@ -20,10 +23,17 @@ public class RedisConfig {
@Value("${zeebe.client.worker.redis.xread-block-millis:2000}")
private int redisXreadBlockMillis;

@Value("${zeebe.client.worker.redis.prefix:zeebe}")
private String redisPrefix;

public String getRedisConnection() {
return redisConnection;
}

public boolean isUseClusterClient() {
return useClusterClient;
}

public String getRedisConumerGroup() {
return redisConumerGroup;
}
Expand All @@ -35,4 +45,8 @@ public int getRedisXreadCount() {
public int getRedisXreadBlockMillis() {
return redisXreadBlockMillis;
}

public String getRedisPrefix() {
return redisPrefix;
}
}
75 changes: 43 additions & 32 deletions src/main/java/io/zeebe/monitor/zeebe/redis/RedisImportService.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package io.zeebe.monitor.zeebe.redis;

import com.hazelcast.core.HazelcastInstance;
import io.lettuce.core.RedisClient;
import io.lettuce.core.cluster.RedisClusterClient;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.hazelcast.connect.java.ZeebeHazelcast;
import io.zeebe.monitor.config.RedisConfig;
import io.zeebe.monitor.entity.HazelcastConfig;
import io.zeebe.monitor.repository.HazelcastConfigRepository;
import io.zeebe.monitor.zeebe.protobuf.importers.*;
import io.zeebe.redis.connect.java.RedisConnectionBuilder;
import io.zeebe.redis.connect.java.ZeebeRedis;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
Expand All @@ -28,44 +26,57 @@ public class RedisImportService {
@Autowired private ErrorProtobufImporter errorImporter;

public ZeebeRedis importFrom(final RedisClient redisClient, RedisConfig redisConfig) {
final var builder = ZeebeRedis.newBuilder(redisClient)
.consumerGroup(redisConfig.getRedisConumerGroup())
.xreadCount(redisConfig.getRedisXreadCount()).xreadBlockMillis(redisConfig.getRedisXreadBlockMillis())
.prefix(redisConfig.getRedisPrefix());
addListener(builder);
return builder.build();
}

final var builder =
ZeebeRedis.newBuilder(redisClient)
.consumerGroup(redisConfig.getRedisConumerGroup())
.xreadCount(redisConfig.getRedisXreadCount()).xreadBlockMillis(redisConfig.getRedisXreadBlockMillis())
.addProcessListener(
record -> ifEvent(record, Schema.ProcessRecord::getMetadata, processAndElementImporter::importProcess))
public ZeebeRedis importFrom(final RedisClusterClient redisClient, RedisConfig redisConfig) {
final var builder = ZeebeRedis.newBuilder(redisClient)
.withStandardClusterOptions()
.consumerGroup(redisConfig.getRedisConumerGroup())
.xreadCount(redisConfig.getRedisXreadCount()).xreadBlockMillis(redisConfig.getRedisXreadBlockMillis())
.prefix(redisConfig.getRedisPrefix());
addListener(builder);
return builder.build();
}

private void addListener(RedisConnectionBuilder connectionBuilder) {
connectionBuilder
.addProcessListener(record ->
ifEvent(record, Schema.ProcessRecord::getMetadata, processAndElementImporter::importProcess))
.addProcessInstanceListener(
record ->
ifEvent(
record,
Schema.ProcessInstanceRecord::getMetadata,
processAndElementImporter::importProcessInstance))
record ->
ifEvent(
record,
Schema.ProcessInstanceRecord::getMetadata,
processAndElementImporter::importProcessInstance))
.addIncidentListener(
record -> ifEvent(record, Schema.IncidentRecord::getMetadata, incidentImporter::importIncident))
record -> ifEvent(record, Schema.IncidentRecord::getMetadata, incidentImporter::importIncident))
.addJobListener(
record -> ifEvent(record, Schema.JobRecord::getMetadata, jobImporter::importJob))
record -> ifEvent(record, Schema.JobRecord::getMetadata, jobImporter::importJob))
.addVariableListener(
record -> ifEvent(record, Schema.VariableRecord::getMetadata, variableImporter::importVariable))
record -> ifEvent(record, Schema.VariableRecord::getMetadata, variableImporter::importVariable))
.addTimerListener(
record -> ifEvent(record, Schema.TimerRecord::getMetadata, timerImporter::importTimer))
record -> ifEvent(record, Schema.TimerRecord::getMetadata, timerImporter::importTimer))
.addMessageListener(
record -> ifEvent(record, Schema.MessageRecord::getMetadata, messageImporter::importMessage))
record -> ifEvent(record, Schema.MessageRecord::getMetadata, messageImporter::importMessage))
.addMessageSubscriptionListener(
record ->
ifEvent(
record,
Schema.MessageSubscriptionRecord::getMetadata,
messageSubscriptionImporter::importMessageSubscription))
record ->
ifEvent(
record,
Schema.MessageSubscriptionRecord::getMetadata,
messageSubscriptionImporter::importMessageSubscription))
.addMessageStartEventSubscriptionListener(
record ->
ifEvent(
record,
Schema.MessageStartEventSubscriptionRecord::getMetadata,
messageSubscriptionImporter::importMessageStartEventSubscription))
record ->
ifEvent(
record,
Schema.MessageStartEventSubscriptionRecord::getMetadata,
messageSubscriptionImporter::importMessageStartEventSubscription))
.addErrorListener(errorImporter::importError);

return builder.build();
}

private <T> void ifEvent(
Expand Down
14 changes: 10 additions & 4 deletions src/main/java/io/zeebe/monitor/zeebe/redis/ZeebeRedisService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.RedisClusterClient;
import io.zeebe.monitor.config.RedisConfig;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
Expand Down Expand Up @@ -34,10 +35,15 @@ public void start() {
var redisUri = RedisURI.create(config.getRedisConnection());

LOG.info("Connecting to Redis {}, consumer group {}", redisUri, config.getRedisConumerGroup());
var redisClient = RedisClient.create(redisUri);

LOG.info("Importing records from Redis...");
closeable = importService.importFrom(redisClient, config);
if (config.isUseClusterClient()) {
var redisClient = RedisClusterClient.create(redisUri);
LOG.info("Importing records from Redis cluster...");
closeable = importService.importFrom(redisClient, config);
} else {
var redisClient = RedisClient.create(redisUri);
LOG.info("Importing records from Redis...");
closeable = importService.importFrom(redisClient, config);
}
}

@PreDestroy
Expand Down

0 comments on commit a593bfb

Please sign in to comment.