From c6c2e34be67216427b7066ede4512346ced3be04 Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Thu, 28 Nov 2024 11:02:12 +0200 Subject: [PATCH 1/2] Added refresher for jedis cluster topology refresh --- .../resources/thingsboard-mqtt-broker.yml | 17 ++- .../broker/cache/JedisClusterNodesUtil.java | 59 ++++++++++ .../cache/JedisClusterTopologyRefresher.java | 96 +++++++++++++++ .../mqtt/broker/cache/LettuceConfig.java | 16 ++- .../cache/LettuceTopologyRefreshConfig.java | 26 ----- .../cache/TBRedisCacheConfiguration.java | 22 ++-- .../cache/TBRedisClusterConfiguration.java | 21 ++++ .../cache/JedisClusterNodesUtilTest.java | 109 ++++++++++++++++++ .../cache/TbRedisCacheConfigurationTest.java | 8 +- .../resources/application-test.properties | 4 +- docker/cache-redis-sentinel.env | 1 - k8s/aws/tb-broker-cache-configmap.yml | 2 +- k8s/azure/tb-broker-cache-configmap.yml | 2 +- 13 files changed, 329 insertions(+), 54 deletions(-) create mode 100644 common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/JedisClusterNodesUtil.java create mode 100644 common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/JedisClusterTopologyRefresher.java delete mode 100644 common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/LettuceTopologyRefreshConfig.java create mode 100644 common/cache/src/test/java/org/thingsboard/mqtt/broker/cache/JedisClusterNodesUtilTest.java diff --git a/application/src/main/resources/thingsboard-mqtt-broker.yml b/application/src/main/resources/thingsboard-mqtt-broker.yml index 43aa181db..885441416 100644 --- a/application/src/main/resources/thingsboard-mqtt-broker.yml +++ b/application/src/main/resources/thingsboard-mqtt-broker.yml @@ -613,13 +613,24 @@ lettuce: shutdown-quiet-period: "${REDIS_LETTUCE_SHUTDOWN_QUIET_PERIOD_SEC:1}" # The shutdown timeout for lettuce client set in seconds shutdown-timeout: "${REDIS_LETTUCE_SHUTDOWN_TIMEOUT_SEC:10}" + cluster: + topology-refresh: + # Enables or disables periodic cluster topology updates. + # Useful for Redis Cluster setup to handle topology changes, + # such as node failover, restarts, or IP address changes + enabled: "${REDIS_LETTUCE_TOPOLOGY_REFRESH_ENABLED:false}" + # Specifies the interval (in seconds) for periodic cluster topology updates + period: "${REDIS_LETTUCE_TOPOLOGY_REFRESH_PERIOD_SEC:60}" + +jedis: + cluster: topology-refresh: # Enables or disables periodic cluster topology updates. - # Useful for Redis Sentinel or Cluster setups to handle topology changes, + # Useful for Redis cluster setup to handle topology changes, # such as node failover, restarts, or IP address changes - enabled: "${REDIS_LETTUCE_TOPOLOGY_REFRESH_ENABLED:false}" + enabled: "${REDIS_JEDIS_TOPOLOGY_REFRESH_ENABLED:false}" # Specifies the interval (in seconds) for periodic cluster topology updates - period: "${REDIS_LETTUCE_TOPOLOGY_REFRESH_PERIOD_SEC:60}" + period: "${REDIS_JEDIS_TOPOLOGY_REFRESH_PERIOD_SEC:60}" # SQL DAO configuration parameters spring: diff --git a/common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/JedisClusterNodesUtil.java b/common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/JedisClusterNodesUtil.java new file mode 100644 index 000000000..2aa42a3db --- /dev/null +++ b/common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/JedisClusterNodesUtil.java @@ -0,0 +1,59 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt.broker.cache; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.connection.RedisNode; + +@Slf4j +public class JedisClusterNodesUtil { + + public static RedisNode parseClusterNodeLine(String line) { + String[] parts = line.split(" "); + if (parts.length < 8) { + throw new IllegalArgumentException("Invalid cluster node line format: " + line); + } + try { + // Node ID + String id = parts[0]; + + // Extract host and port from + String[] hostPort = parts[1].split(":"); + String host = hostPort[0]; + int port = Integer.parseInt(hostPort[1].split("@")[0]); + + // Flags to determine node type + String flags = parts[2]; + RedisNode.NodeType type = flags.contains("master") ? + RedisNode.NodeType.MASTER : RedisNode.NodeType.REPLICA; + + RedisNode.RedisNodeBuilder redisNodeBuilder = RedisNode.newRedisNode() + .listeningAt(host, port) + .withId(id) + .promotedAs(type); + + String masterId = parts[3]; + boolean masterIdUnknown = "-".equals(masterId); + if (masterIdUnknown) { + return redisNodeBuilder.build(); + } + return redisNodeBuilder.replicaOf(masterId).build(); + } catch (Exception e) { + throw new RuntimeException("Error parsing cluster node line: " + line, e); + } + } + +} diff --git a/common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/JedisClusterTopologyRefresher.java b/common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/JedisClusterTopologyRefresher.java new file mode 100644 index 000000000..6a0bea749 --- /dev/null +++ b/common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/JedisClusterTopologyRefresher.java @@ -0,0 +1,96 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt.broker.cache; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Profile; +import org.springframework.data.redis.connection.RedisClusterConfiguration; +import org.springframework.data.redis.connection.RedisNode; +import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import redis.clients.jedis.Jedis; + +import java.util.Arrays; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +// TODO: replace jedis from TBMQ implementation and use only lettuce. +@Slf4j +@Component +@Profile("!install") +@RequiredArgsConstructor +@ConditionalOnExpression("'cluster'.equals('${redis.connection.type}') and 'true'.equals('${jedis.cluster.topology-refresh.enabled}')") +public class JedisClusterTopologyRefresher { + + private final JedisConnectionFactory factory; + + @Scheduled(initialDelayString = "${jedis.cluster.topology-refresh.period}", fixedDelayString = "${jedis.cluster.topology-refresh.period}", timeUnit = TimeUnit.SECONDS) + public void refreshTopology() { + if (!factory.isRedisClusterAware()) { + log.trace("Redis cluster configuration is not set!"); + return; + } + try { + RedisClusterConfiguration clusterConfig = factory.getClusterConfiguration(); + Set currentNodes = clusterConfig.getClusterNodes(); + log.trace("Current Redis cluster nodes: {}", currentNodes); + + for (RedisNode node : currentNodes) { + if (!node.hasValidHost()) { + log.debug("Skip Redis node with invalid host: {}", node); + continue; + } + if (node.getPort() == null) { + log.debug("Skip Redis node with null port: {}", node); + continue; + } + try (Jedis jedis = new Jedis(node.getHost(), node.getPort())) { + if (factory.getPassword() != null) { + jedis.auth(factory.getPassword()); + } + Set redisNodes = getRedisNodes(node, jedis); + if (currentNodes.equals(redisNodes)) { + log.trace("Redis cluster topology is up to date!"); + break; + } + clusterConfig.setClusterNodes(redisNodes); + log.trace("Successfully updated Redis cluster topology, nodes: {}", redisNodes); + break; + } catch (Exception e) { + log.debug("Failed to refresh cluster topology using node: {}", node.getHost(), e); + } + } + } catch (Exception e) { + log.warn("Failed to refresh cluster topology", e); + } + } + + private Set getRedisNodes(RedisNode node, Jedis jedis) { + String clusterNodes = jedis.clusterNodes(); + log.trace("Caller Redis node: {}:{} CLUSTER NODES output:{}{}", node.getHost(), node.getPort(), System.lineSeparator(), clusterNodes); + // Split the clusterNodes string into individual lines and parse each line + // Each line is composed of the following fields: + // ... + return Arrays.stream(clusterNodes.split(System.lineSeparator())) + .map(JedisClusterNodesUtil::parseClusterNodeLine) + .collect(Collectors.toSet()); + } + +} diff --git a/common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/LettuceConfig.java b/common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/LettuceConfig.java index 369834ac1..8d307a666 100644 --- a/common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/LettuceConfig.java +++ b/common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/LettuceConfig.java @@ -26,6 +26,20 @@ public class LettuceConfig { private int shutdownQuietPeriod; private int shutdownTimeout; - private LettuceTopologyRefreshConfig topologyRefresh; + private ClusterConfig cluster; + + @Data + public static class ClusterConfig { + private LettuceTopologyRefreshConfig topologyRefresh; + + @Data + public static class LettuceTopologyRefreshConfig { + + private boolean enabled; + private int period; + + } + + } } diff --git a/common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/LettuceTopologyRefreshConfig.java b/common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/LettuceTopologyRefreshConfig.java deleted file mode 100644 index 5d7e36049..000000000 --- a/common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/LettuceTopologyRefreshConfig.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Copyright © 2016-2024 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.mqtt.broker.cache; - -import lombok.Data; - -@Data -public class LettuceTopologyRefreshConfig { - - private boolean enabled; - private int period; - -} diff --git a/common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/TBRedisCacheConfiguration.java b/common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/TBRedisCacheConfiguration.java index 9b507b738..2e4611bb2 100644 --- a/common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/TBRedisCacheConfiguration.java +++ b/common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/TBRedisCacheConfiguration.java @@ -18,9 +18,8 @@ import io.github.bucket4j.distributed.serialization.Mapper; import io.github.bucket4j.redis.jedis.Bucket4jJedis; import io.github.bucket4j.redis.jedis.cas.JedisBasedProxyManager; +import io.lettuce.core.ClientOptions; import io.lettuce.core.TimeoutOptions; -import io.lettuce.core.cluster.ClusterClientOptions; -import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; import lombok.Data; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.springframework.beans.factory.annotation.Value; @@ -59,7 +58,7 @@ public abstract class TBRedisCacheConfiguration { private final CacheSpecsMap cacheSpecsMap; - private final LettuceConfig lettuceConfig; + protected final LettuceConfig lettuceConfig; @Value("${redis.pool_config.maxTotal:128}") private int maxTotal; @@ -112,22 +111,15 @@ public LettuceConnectionFactory lettuceConnectionFactory() { lettucePoolingClientConfigBuilder.shutdownQuietPeriod(Duration.ofSeconds(lettuceConfig.getShutdownQuietPeriod())); lettucePoolingClientConfigBuilder.shutdownTimeout(Duration.ofSeconds(lettuceConfig.getShutdownTimeout())); - ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder() - .enablePeriodicRefresh(lettuceConfig.getTopologyRefresh().isEnabled()) - .refreshPeriod(Duration.ofSeconds(lettuceConfig.getTopologyRefresh().getPeriod())) - .enableAllAdaptiveRefreshTriggers() - .build(); - - ClusterClientOptions clientOptions = ClusterClientOptions - .builder() - .timeoutOptions(TimeoutOptions.enabled()) - .topologyRefreshOptions(topologyRefreshOptions) - .build(); - lettucePoolingClientConfigBuilder.clientOptions(clientOptions); + lettucePoolingClientConfigBuilder.clientOptions(getLettuceClientOptions()); return new LettuceConnectionFactory(getRedisConfiguration(), lettucePoolingClientConfigBuilder.build()); } + protected ClientOptions getLettuceClientOptions() { + return ClientOptions.builder().timeoutOptions(TimeoutOptions.enabled()).build(); + } + protected abstract JedisConnectionFactory loadFactory(); protected abstract boolean useDefaultPoolConfig(); diff --git a/common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/TBRedisClusterConfiguration.java b/common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/TBRedisClusterConfiguration.java index 3d9d65424..7d3133df8 100644 --- a/common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/TBRedisClusterConfiguration.java +++ b/common/cache/src/main/java/org/thingsboard/mqtt/broker/cache/TBRedisClusterConfiguration.java @@ -15,6 +15,10 @@ */ package org.thingsboard.mqtt.broker.cache; +import io.lettuce.core.ClientOptions; +import io.lettuce.core.TimeoutOptions; +import io.lettuce.core.cluster.ClusterClientOptions; +import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Configuration; @@ -25,6 +29,8 @@ import redis.clients.jedis.JedisCluster; import redis.clients.jedis.UnifiedJedis; +import java.time.Duration; + @Configuration @ConditionalOnProperty(prefix = "redis.connection", value = "type", havingValue = "cluster") public class TBRedisClusterConfiguration extends TBRedisCacheConfiguration { @@ -73,4 +79,19 @@ protected RedisClusterConfiguration getRedisConfiguration() { return clusterConfiguration; } + @Override + protected ClientOptions getLettuceClientOptions() { + ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder() + .enablePeriodicRefresh(lettuceConfig.getCluster().getTopologyRefresh().isEnabled()) + .refreshPeriod(Duration.ofSeconds(lettuceConfig.getCluster().getTopologyRefresh().getPeriod())) + .enableAllAdaptiveRefreshTriggers() + .build(); + + return ClusterClientOptions + .builder() + .timeoutOptions(TimeoutOptions.enabled()) + .topologyRefreshOptions(topologyRefreshOptions) + .build(); + } + } diff --git a/common/cache/src/test/java/org/thingsboard/mqtt/broker/cache/JedisClusterNodesUtilTest.java b/common/cache/src/test/java/org/thingsboard/mqtt/broker/cache/JedisClusterNodesUtilTest.java new file mode 100644 index 000000000..328269ea8 --- /dev/null +++ b/common/cache/src/test/java/org/thingsboard/mqtt/broker/cache/JedisClusterNodesUtilTest.java @@ -0,0 +1,109 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt.broker.cache; + +import org.junit.jupiter.api.Test; +import org.springframework.data.redis.connection.RedisNode; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatException; + +class JedisClusterNodesUtilTest { + + @Test + void testParseClusterNodeLineMasterNode() { + String line = "e91061e872bf29860c974fe719865a022f3ef7be 172.21.0.10:6379@16379 master - 0 1732720493433 1 connected 0-5460"; + + RedisNode redisNode = JedisClusterNodesUtil.parseClusterNodeLine(line); + + assertThat(redisNode).isNotNull(); + assertThat(redisNode.getId()).isEqualTo("e91061e872bf29860c974fe719865a022f3ef7be"); + assertThat(redisNode.getHost()).isEqualTo("172.21.0.10"); + assertThat(redisNode.getPort()).isEqualTo(6379); + assertThat(redisNode.isMaster()).isTrue(); + assertThat(redisNode.getMasterId()).isNull(); + } + + @Test + void testParseClusterNodeLineReplicaNode() { + String line = "d704a7a52d73ee3f969728b9cc7eae61f06f992f 172.21.0.6:6379@16379 slave e91061e872bf29860c974fe719865a022f3ef7be 0 1732720492429 1 connected"; + + RedisNode redisNode = JedisClusterNodesUtil.parseClusterNodeLine(line); + + assertThat(redisNode).isNotNull(); + assertThat(redisNode.getId()).isEqualTo("d704a7a52d73ee3f969728b9cc7eae61f06f992f"); + assertThat(redisNode.getHost()).isEqualTo("172.21.0.6"); + assertThat(redisNode.getPort()).isEqualTo(6379); + assertThat(redisNode.isReplica()).isTrue(); + assertThat(redisNode.getMasterId()).isNotNull().isEqualTo("e91061e872bf29860c974fe719865a022f3ef7be"); + } + + @Test + void testParseClusterNodeLineWithHostname() { + String line = "07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:30004@31004,hostname4 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238317239 4 connected"; + + RedisNode redisNode = JedisClusterNodesUtil.parseClusterNodeLine(line); + + assertThat(redisNode).isNotNull(); + assertThat(redisNode.getId()).isEqualTo("07c37dfeb235213a872192d90877d0cd55635b91"); + assertThat(redisNode.getHost()).isEqualTo("127.0.0.1"); + assertThat(redisNode.getPort()).isEqualTo(30004); + assertThat(redisNode.isReplica()).isTrue(); + assertThat(redisNode.getMasterId()).isNotNull().isEqualTo("e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca"); + } + + @Test + void testParseClusterNodeLineWithSlots() { + // Arrange + String line = "67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1:30002@31002 master - 0 1426238316232 2 connected 5461-10922"; + + // Act + RedisNode redisNode = JedisClusterNodesUtil.parseClusterNodeLine(line); + + // Assert + assertThat(redisNode).isNotNull(); + assertThat(redisNode.getId()).isEqualTo("67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1"); + assertThat(redisNode.getHost()).isEqualTo("127.0.0.1"); + assertThat(redisNode.getPort()).isEqualTo(30002); + assertThat(redisNode.isMaster()).isTrue(); + assertThat(redisNode.getMasterId()).isNull(); + } + + @Test + void testParseClusterNodeLineWithMultipleFlags() { + // Arrange + String line = "e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001,hostname1 myself,master - 0 0 1 connected 0-5460"; + + // Act + RedisNode redisNode = JedisClusterNodesUtil.parseClusterNodeLine(line); + + // Assert + assertThat(redisNode).isNotNull(); + assertThat(redisNode.getId()).isEqualTo("e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca"); + assertThat(redisNode.getHost()).isEqualTo("127.0.0.1"); + assertThat(redisNode.getPort()).isEqualTo(30001); + assertThat(redisNode.isMaster()).isTrue(); + assertThat(redisNode.getMasterId()).isNull(); + } + + @Test + void testParseClusterNodeLineIncompleteFields() { + String line = "e91061e872bf29860c974fe719865a022f3ef7be 172.21.0.10:6379@16379 master -"; + + assertThatException().isThrownBy(() -> JedisClusterNodesUtil.parseClusterNodeLine(line)) + .isInstanceOf(IllegalArgumentException.class).withMessage("Invalid cluster node line format: " + line); + } +} diff --git a/common/cache/src/test/java/org/thingsboard/mqtt/broker/cache/TbRedisCacheConfigurationTest.java b/common/cache/src/test/java/org/thingsboard/mqtt/broker/cache/TbRedisCacheConfigurationTest.java index fe0917e66..c0371e57c 100644 --- a/common/cache/src/test/java/org/thingsboard/mqtt/broker/cache/TbRedisCacheConfigurationTest.java +++ b/common/cache/src/test/java/org/thingsboard/mqtt/broker/cache/TbRedisCacheConfigurationTest.java @@ -44,8 +44,8 @@ "cache.specs.mqttClientCredentials.timeToLiveInMinutes=1440", "lettuce.config.shutdown-quiet-period=1", "lettuce.config.shutdown-timeout=10", - "lettuce.config.topology-refresh.enabled=false", - "lettuce.config.topology-refresh.period=60" + "lettuce.config.cluster.topology-refresh.enabled=false", + "lettuce.config.cluster.topology-refresh.period=60" }) @Slf4j public class TbRedisCacheConfigurationTest { @@ -94,8 +94,8 @@ public void verifyLettuceConnectionFactoryProperties() { var topologyRefreshOptions = clusterClientOptions.getTopologyRefreshOptions(); assertThat(topologyRefreshOptions).isNotNull(); - assertThat(topologyRefreshOptions.isPeriodicRefreshEnabled()).isEqualTo(lettuceConfig.getTopologyRefresh().isEnabled()); - assertThat(topologyRefreshOptions.getRefreshPeriod()).isEqualTo(Duration.ofSeconds(lettuceConfig.getTopologyRefresh().getPeriod())); + assertThat(topologyRefreshOptions.isPeriodicRefreshEnabled()).isEqualTo(lettuceConfig.getCluster().getTopologyRefresh().isEnabled()); + assertThat(topologyRefreshOptions.getRefreshPeriod()).isEqualTo(Duration.ofSeconds(lettuceConfig.getCluster().getTopologyRefresh().getPeriod())); assertThat(topologyRefreshOptions.getAdaptiveRefreshTriggers()).isEqualTo(EnumSet.allOf(ClusterTopologyRefreshOptions.RefreshTrigger.class)); var timeoutOptions = clusterClientOptions.getTimeoutOptions(); diff --git a/dao/src/test/resources/application-test.properties b/dao/src/test/resources/application-test.properties index 5eb752dba..d00e5db11 100644 --- a/dao/src/test/resources/application-test.properties +++ b/dao/src/test/resources/application-test.properties @@ -13,8 +13,8 @@ lettuce.auto-flush=true #lettuce.flush-interval-ms=5 lettuce.config.shutdown-quiet-period=1 lettuce.config.shutdown-timeout=10 -lettuce.config.topology-refresh.enabled=false -lettuce.config.topology-refresh.period=60 +lettuce.config.cluster.topology-refresh.enabled=false +lettuce.config.cluster.topology-refresh.period=60 mqtt.persistent-session.device.persisted-messages.ttl=600 mqtt.persistent-session.device.persisted-messages.limit=10 diff --git a/docker/cache-redis-sentinel.env b/docker/cache-redis-sentinel.env index 5f086269b..256773284 100644 --- a/docker/cache-redis-sentinel.env +++ b/docker/cache-redis-sentinel.env @@ -4,4 +4,3 @@ REDIS_SENTINELS=redis-sentinel:26379 REDIS_SENTINEL_PASSWORD=sentinel REDIS_SENTINEL_USE_DEFAULT_POOL_CONFIG=false REDIS_PASSWORD=thingsboard -REDIS_LETTUCE_TOPOLOGY_REFRESH_ENABLED=true diff --git a/k8s/aws/tb-broker-cache-configmap.yml b/k8s/aws/tb-broker-cache-configmap.yml index 70a8bb584..d1c435c9a 100644 --- a/k8s/aws/tb-broker-cache-configmap.yml +++ b/k8s/aws/tb-broker-cache-configmap.yml @@ -29,5 +29,5 @@ data: REDIS_HOST: "YOUR_REDIS_ENDPOINT_URL_WITHOUT_PORT" #REDIS_PASSWORD: "YOUR_REDIS_PASSWORD" - # Recommended to enable in Kubernetes environments in case of Redis cluster or master-replica setups to handle dynamic IP changes and nodes failover + # Recommended to enable in Kubernetes environments in case of Redis cluster setup to handle dynamic IP changes and nodes failover #REDIS_LETTUCE_TOPOLOGY_REFRESH_ENABLED: "true" diff --git a/k8s/azure/tb-broker-cache-configmap.yml b/k8s/azure/tb-broker-cache-configmap.yml index 70a8bb584..d1c435c9a 100644 --- a/k8s/azure/tb-broker-cache-configmap.yml +++ b/k8s/azure/tb-broker-cache-configmap.yml @@ -29,5 +29,5 @@ data: REDIS_HOST: "YOUR_REDIS_ENDPOINT_URL_WITHOUT_PORT" #REDIS_PASSWORD: "YOUR_REDIS_PASSWORD" - # Recommended to enable in Kubernetes environments in case of Redis cluster or master-replica setups to handle dynamic IP changes and nodes failover + # Recommended to enable in Kubernetes environments in case of Redis cluster setup to handle dynamic IP changes and nodes failover #REDIS_LETTUCE_TOPOLOGY_REFRESH_ENABLED: "true" From 4998de1abab618c6a6637d2283c0da5091f16310 Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Thu, 28 Nov 2024 14:22:25 +0200 Subject: [PATCH 2/2] fixed env variables naming & fixed redis cache configuration test --- .../main/resources/thingsboard-mqtt-broker.yml | 9 +++++---- .../cache/TbRedisCacheConfigurationTest.java | 16 +++------------- docker/cache-redis-cluster.env | 3 ++- k8s/aws/tb-broker-cache-configmap.yml | 3 ++- k8s/azure/tb-broker-cache-configmap.yml | 3 ++- 5 files changed, 14 insertions(+), 20 deletions(-) diff --git a/application/src/main/resources/thingsboard-mqtt-broker.yml b/application/src/main/resources/thingsboard-mqtt-broker.yml index 885441416..7d43347c4 100644 --- a/application/src/main/resources/thingsboard-mqtt-broker.yml +++ b/application/src/main/resources/thingsboard-mqtt-broker.yml @@ -618,19 +618,20 @@ lettuce: # Enables or disables periodic cluster topology updates. # Useful for Redis Cluster setup to handle topology changes, # such as node failover, restarts, or IP address changes - enabled: "${REDIS_LETTUCE_TOPOLOGY_REFRESH_ENABLED:false}" + enabled: "${REDIS_LETTUCE_CLUSTER_TOPOLOGY_REFRESH_ENABLED:false}" # Specifies the interval (in seconds) for periodic cluster topology updates - period: "${REDIS_LETTUCE_TOPOLOGY_REFRESH_PERIOD_SEC:60}" + period: "${REDIS_LETTUCE_CLUSTER_TOPOLOGY_REFRESH_PERIOD_SEC:60}" +# Redis jedis configuration parameters jedis: cluster: topology-refresh: # Enables or disables periodic cluster topology updates. # Useful for Redis cluster setup to handle topology changes, # such as node failover, restarts, or IP address changes - enabled: "${REDIS_JEDIS_TOPOLOGY_REFRESH_ENABLED:false}" + enabled: "${REDIS_JEDIS_CLUSTER_TOPOLOGY_REFRESH_ENABLED:false}" # Specifies the interval (in seconds) for periodic cluster topology updates - period: "${REDIS_JEDIS_TOPOLOGY_REFRESH_PERIOD_SEC:60}" + period: "${REDIS_JEDIS_CLUSTER_TOPOLOGY_REFRESH_PERIOD_SEC:60}" # SQL DAO configuration parameters spring: diff --git a/common/cache/src/test/java/org/thingsboard/mqtt/broker/cache/TbRedisCacheConfigurationTest.java b/common/cache/src/test/java/org/thingsboard/mqtt/broker/cache/TbRedisCacheConfigurationTest.java index c0371e57c..cc9126934 100644 --- a/common/cache/src/test/java/org/thingsboard/mqtt/broker/cache/TbRedisCacheConfigurationTest.java +++ b/common/cache/src/test/java/org/thingsboard/mqtt/broker/cache/TbRedisCacheConfigurationTest.java @@ -15,8 +15,7 @@ */ package org.thingsboard.mqtt.broker.cache; -import io.lettuce.core.cluster.ClusterClientOptions; -import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; +import io.lettuce.core.ClientOptions; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -32,7 +31,6 @@ import org.springframework.test.context.junit.jupiter.SpringExtension; import java.time.Duration; -import java.util.EnumSet; import static org.assertj.core.api.Assertions.assertThat; @@ -88,17 +86,9 @@ public void verifyLettuceConnectionFactoryProperties() { assertThat(clientOptionsOpt).isNotEmpty(); var clientOptions = clientOptionsOpt.get(); - assertThat(clientOptions).isInstanceOf(ClusterClientOptions.class); + assertThat(clientOptions).isInstanceOf(ClientOptions.class); - var clusterClientOptions = (ClusterClientOptions) clientOptions; - - var topologyRefreshOptions = clusterClientOptions.getTopologyRefreshOptions(); - assertThat(topologyRefreshOptions).isNotNull(); - assertThat(topologyRefreshOptions.isPeriodicRefreshEnabled()).isEqualTo(lettuceConfig.getCluster().getTopologyRefresh().isEnabled()); - assertThat(topologyRefreshOptions.getRefreshPeriod()).isEqualTo(Duration.ofSeconds(lettuceConfig.getCluster().getTopologyRefresh().getPeriod())); - assertThat(topologyRefreshOptions.getAdaptiveRefreshTriggers()).isEqualTo(EnumSet.allOf(ClusterTopologyRefreshOptions.RefreshTrigger.class)); - - var timeoutOptions = clusterClientOptions.getTimeoutOptions(); + var timeoutOptions = clientOptions.getTimeoutOptions(); assertThat(timeoutOptions).isNotNull(); assertThat(timeoutOptions.isTimeoutCommands()).isTrue(); diff --git a/docker/cache-redis-cluster.env b/docker/cache-redis-cluster.env index 39251fad7..ece311331 100644 --- a/docker/cache-redis-cluster.env +++ b/docker/cache-redis-cluster.env @@ -2,4 +2,5 @@ REDIS_CONNECTION_TYPE=cluster REDIS_NODES=redis-node-0:6379,redis-node-1:6379,redis-node-2:6379,redis-node-3:6379,redis-node-4:6379,redis-node-5:6379 REDIS_CLUSTER_USE_DEFAULT_POOL_CONFIG=false REDIS_PASSWORD=thingsboard -REDIS_LETTUCE_TOPOLOGY_REFRESH_ENABLED=true +REDIS_LETTUCE_CLUSTER_TOPOLOGY_REFRESH_ENABLED=true +REDIS_JEDIS_CLUSTER_TOPOLOGY_REFRESH_ENABLED=true diff --git a/k8s/aws/tb-broker-cache-configmap.yml b/k8s/aws/tb-broker-cache-configmap.yml index d1c435c9a..e42487955 100644 --- a/k8s/aws/tb-broker-cache-configmap.yml +++ b/k8s/aws/tb-broker-cache-configmap.yml @@ -30,4 +30,5 @@ data: #REDIS_PASSWORD: "YOUR_REDIS_PASSWORD" # Recommended to enable in Kubernetes environments in case of Redis cluster setup to handle dynamic IP changes and nodes failover - #REDIS_LETTUCE_TOPOLOGY_REFRESH_ENABLED: "true" + #REDIS_LETTUCE_CLUSTER_TOPOLOGY_REFRESH_ENABLED: "true" + #REDIS_JEDIS_CLUSTER_TOPOLOGY_REFRESH_ENABLED: "true" diff --git a/k8s/azure/tb-broker-cache-configmap.yml b/k8s/azure/tb-broker-cache-configmap.yml index d1c435c9a..e42487955 100644 --- a/k8s/azure/tb-broker-cache-configmap.yml +++ b/k8s/azure/tb-broker-cache-configmap.yml @@ -30,4 +30,5 @@ data: #REDIS_PASSWORD: "YOUR_REDIS_PASSWORD" # Recommended to enable in Kubernetes environments in case of Redis cluster setup to handle dynamic IP changes and nodes failover - #REDIS_LETTUCE_TOPOLOGY_REFRESH_ENABLED: "true" + #REDIS_LETTUCE_CLUSTER_TOPOLOGY_REFRESH_ENABLED: "true" + #REDIS_JEDIS_CLUSTER_TOPOLOGY_REFRESH_ENABLED: "true"