Skip to content

Commit

Permalink
Merge pull request #184 from thingsboard/bugfix/jedis-cluster-topolog…
Browse files Browse the repository at this point in the history
…y-refresh

Added Redis cluster topology refresh options for Jedis implementation
  • Loading branch information
dmytro-landiak authored Nov 28, 2024
2 parents 29278f2 + 4998de1 commit 305ba25
Show file tree
Hide file tree
Showing 14 changed files with 337 additions and 68 deletions.
18 changes: 15 additions & 3 deletions application/src/main/resources/thingsboard-mqtt-broker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -613,13 +613,25 @@ lettuce:
shutdown-quiet-period: "${REDIS_LETTUCE_SHUTDOWN_QUIET_PERIOD_SEC:1}"
# The shutdown timeout for lettuce client set in seconds
shutdown-timeout: "${REDIS_LETTUCE_SHUTDOWN_TIMEOUT_SEC:10}"
cluster:
topology-refresh:
# Enables or disables periodic cluster topology updates.
# Useful for Redis Cluster setup to handle topology changes,
# such as node failover, restarts, or IP address changes
enabled: "${REDIS_LETTUCE_CLUSTER_TOPOLOGY_REFRESH_ENABLED:false}"
# Specifies the interval (in seconds) for periodic cluster topology updates
period: "${REDIS_LETTUCE_CLUSTER_TOPOLOGY_REFRESH_PERIOD_SEC:60}"

# Redis jedis configuration parameters
jedis:
cluster:
topology-refresh:
# Enables or disables periodic cluster topology updates.
# Useful for Redis Sentinel or Cluster setups to handle topology changes,
# Useful for Redis cluster setup to handle topology changes,
# such as node failover, restarts, or IP address changes
enabled: "${REDIS_LETTUCE_TOPOLOGY_REFRESH_ENABLED:false}"
enabled: "${REDIS_JEDIS_CLUSTER_TOPOLOGY_REFRESH_ENABLED:false}"
# Specifies the interval (in seconds) for periodic cluster topology updates
period: "${REDIS_LETTUCE_TOPOLOGY_REFRESH_PERIOD_SEC:60}"
period: "${REDIS_JEDIS_CLUSTER_TOPOLOGY_REFRESH_PERIOD_SEC:60}"

# SQL DAO configuration parameters
spring:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.mqtt.broker.cache;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.RedisNode;

@Slf4j
public class JedisClusterNodesUtil {

public static RedisNode parseClusterNodeLine(String line) {
String[] parts = line.split(" ");
if (parts.length < 8) {
throw new IllegalArgumentException("Invalid cluster node line format: " + line);
}
try {
// Node ID
String id = parts[0];

// Extract host and port from <ip:port@cport>
String[] hostPort = parts[1].split(":");
String host = hostPort[0];
int port = Integer.parseInt(hostPort[1].split("@")[0]);

// Flags to determine node type
String flags = parts[2];
RedisNode.NodeType type = flags.contains("master") ?
RedisNode.NodeType.MASTER : RedisNode.NodeType.REPLICA;

RedisNode.RedisNodeBuilder redisNodeBuilder = RedisNode.newRedisNode()
.listeningAt(host, port)
.withId(id)
.promotedAs(type);

String masterId = parts[3];
boolean masterIdUnknown = "-".equals(masterId);
if (masterIdUnknown) {
return redisNodeBuilder.build();
}
return redisNodeBuilder.replicaOf(masterId).build();
} catch (Exception e) {
throw new RuntimeException("Error parsing cluster node line: " + line, e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.mqtt.broker.cache;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Profile;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;

import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// TODO: replace jedis from TBMQ implementation and use only lettuce.
@Slf4j
@Component
@Profile("!install")
@RequiredArgsConstructor
@ConditionalOnExpression("'cluster'.equals('${redis.connection.type}') and 'true'.equals('${jedis.cluster.topology-refresh.enabled}')")
public class JedisClusterTopologyRefresher {

private final JedisConnectionFactory factory;

@Scheduled(initialDelayString = "${jedis.cluster.topology-refresh.period}", fixedDelayString = "${jedis.cluster.topology-refresh.period}", timeUnit = TimeUnit.SECONDS)
public void refreshTopology() {
if (!factory.isRedisClusterAware()) {
log.trace("Redis cluster configuration is not set!");
return;
}
try {
RedisClusterConfiguration clusterConfig = factory.getClusterConfiguration();
Set<RedisNode> currentNodes = clusterConfig.getClusterNodes();
log.trace("Current Redis cluster nodes: {}", currentNodes);

for (RedisNode node : currentNodes) {
if (!node.hasValidHost()) {
log.debug("Skip Redis node with invalid host: {}", node);
continue;
}
if (node.getPort() == null) {
log.debug("Skip Redis node with null port: {}", node);
continue;
}
try (Jedis jedis = new Jedis(node.getHost(), node.getPort())) {
if (factory.getPassword() != null) {
jedis.auth(factory.getPassword());
}
Set<RedisNode> redisNodes = getRedisNodes(node, jedis);
if (currentNodes.equals(redisNodes)) {
log.trace("Redis cluster topology is up to date!");
break;
}
clusterConfig.setClusterNodes(redisNodes);
log.trace("Successfully updated Redis cluster topology, nodes: {}", redisNodes);
break;
} catch (Exception e) {
log.debug("Failed to refresh cluster topology using node: {}", node.getHost(), e);
}
}
} catch (Exception e) {
log.warn("Failed to refresh cluster topology", e);
}
}

private Set<RedisNode> getRedisNodes(RedisNode node, Jedis jedis) {
String clusterNodes = jedis.clusterNodes();
log.trace("Caller Redis node: {}:{} CLUSTER NODES output:{}{}", node.getHost(), node.getPort(), System.lineSeparator(), clusterNodes);
// Split the clusterNodes string into individual lines and parse each line
// Each line is composed of the following fields:
// <id> <ip:port@cport[,hostname]> <flags> <master> <ping-sent> <pong-recv> <config-epoch> <link-state> <slot> <slot> ... <slot>
return Arrays.stream(clusterNodes.split(System.lineSeparator()))
.map(JedisClusterNodesUtil::parseClusterNodeLine)
.collect(Collectors.toSet());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@ public class LettuceConfig {

private int shutdownQuietPeriod;
private int shutdownTimeout;
private LettuceTopologyRefreshConfig topologyRefresh;
private ClusterConfig cluster;

@Data
public static class ClusterConfig {
private LettuceTopologyRefreshConfig topologyRefresh;

@Data
public static class LettuceTopologyRefreshConfig {

private boolean enabled;
private int period;

}

}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
import io.github.bucket4j.distributed.serialization.Mapper;
import io.github.bucket4j.redis.jedis.Bucket4jJedis;
import io.github.bucket4j.redis.jedis.cas.JedisBasedProxyManager;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import lombok.Data;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Value;
Expand Down Expand Up @@ -59,7 +58,7 @@
public abstract class TBRedisCacheConfiguration<C extends RedisConfiguration> {

private final CacheSpecsMap cacheSpecsMap;
private final LettuceConfig lettuceConfig;
protected final LettuceConfig lettuceConfig;

@Value("${redis.pool_config.maxTotal:128}")
private int maxTotal;
Expand Down Expand Up @@ -112,22 +111,15 @@ public LettuceConnectionFactory lettuceConnectionFactory() {
lettucePoolingClientConfigBuilder.shutdownQuietPeriod(Duration.ofSeconds(lettuceConfig.getShutdownQuietPeriod()));
lettucePoolingClientConfigBuilder.shutdownTimeout(Duration.ofSeconds(lettuceConfig.getShutdownTimeout()));

ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
.enablePeriodicRefresh(lettuceConfig.getTopologyRefresh().isEnabled())
.refreshPeriod(Duration.ofSeconds(lettuceConfig.getTopologyRefresh().getPeriod()))
.enableAllAdaptiveRefreshTriggers()
.build();

ClusterClientOptions clientOptions = ClusterClientOptions
.builder()
.timeoutOptions(TimeoutOptions.enabled())
.topologyRefreshOptions(topologyRefreshOptions)
.build();
lettucePoolingClientConfigBuilder.clientOptions(clientOptions);
lettucePoolingClientConfigBuilder.clientOptions(getLettuceClientOptions());

return new LettuceConnectionFactory(getRedisConfiguration(), lettucePoolingClientConfigBuilder.build());
}

protected ClientOptions getLettuceClientOptions() {
return ClientOptions.builder().timeoutOptions(TimeoutOptions.enabled()).build();
}

protected abstract JedisConnectionFactory loadFactory();

protected abstract boolean useDefaultPoolConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
*/
package org.thingsboard.mqtt.broker.cache;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
Expand All @@ -25,6 +29,8 @@
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.UnifiedJedis;

import java.time.Duration;

@Configuration
@ConditionalOnProperty(prefix = "redis.connection", value = "type", havingValue = "cluster")
public class TBRedisClusterConfiguration extends TBRedisCacheConfiguration<RedisClusterConfiguration> {
Expand Down Expand Up @@ -73,4 +79,19 @@ protected RedisClusterConfiguration getRedisConfiguration() {
return clusterConfiguration;
}

@Override
protected ClientOptions getLettuceClientOptions() {
ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
.enablePeriodicRefresh(lettuceConfig.getCluster().getTopologyRefresh().isEnabled())
.refreshPeriod(Duration.ofSeconds(lettuceConfig.getCluster().getTopologyRefresh().getPeriod()))
.enableAllAdaptiveRefreshTriggers()
.build();

return ClusterClientOptions
.builder()
.timeoutOptions(TimeoutOptions.enabled())
.topologyRefreshOptions(topologyRefreshOptions)
.build();
}

}
Loading

0 comments on commit 305ba25

Please sign in to comment.