From 6c40cff6e92b82a3882f6e2102fecffd5363fe98 Mon Sep 17 00:00:00 2001 From: zhaojinchao Date: Tue, 16 Jan 2024 16:32:49 +0800 Subject: [PATCH 1/3] Refactor EtcdRepository watch consume --- .../cluster/etcd/EtcdRepository.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java index 191063016a63b..e3fde28fbb214 100644 --- a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java +++ b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java @@ -19,6 +19,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Splitter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.Client; import io.etcd.jetcd.KeyValue; @@ -33,6 +34,7 @@ import io.etcd.jetcd.watch.WatchEvent; import lombok.Getter; import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration; import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdProperties; @@ -44,12 +46,16 @@ import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Collectors; /** * Registry repository of ETCD. */ +@Slf4j public final class EtcdRepository implements ClusterPersistRepository { private Client client; @@ -59,6 +65,8 @@ public final class EtcdRepository implements ClusterPersistRepository { @Getter private DistributedLockHolder distributedLockHolder; + private static final ExecutorService EVENT_LISTENER_EXECUTOR = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Etcd-EventListener-%d").build()); + @Override public void init(final ClusterPersistRepositoryConfiguration config) { etcdProps = new EtcdProperties(config.getProps()); @@ -148,8 +156,13 @@ public void watch(final String key, final DataChangedEventListener dataChangedEv for (WatchEvent each : response.getEvents()) { Type type = getEventChangedType(each); if (Type.IGNORED != type) { - dataChangedEventListener.onChange(new DataChangedEvent(each.getKeyValue().getKey().toString(StandardCharsets.UTF_8), - each.getKeyValue().getValue().toString(StandardCharsets.UTF_8), type)); + CompletableFuture.runAsync(() -> + dataChangedEventListener.onChange(new DataChangedEvent(each.getKeyValue().getKey().toString(StandardCharsets.UTF_8), + each.getKeyValue().getValue().toString(StandardCharsets.UTF_8), type)), EVENT_LISTENER_EXECUTOR).whenComplete((unused, throwable) -> { + if (null != throwable) { + log.error("Consume event failed", throwable); + } + }); } } }); @@ -176,6 +189,7 @@ private Type getEventChangedType(final WatchEvent event) { @Override public void close() { client.close(); + EVENT_LISTENER_EXECUTOR.shutdown(); } @Override From 5855ef89d5013e65ef654351251174c9ffb85db2 Mon Sep 17 00:00:00 2001 From: zhaojinchao Date: Tue, 16 Jan 2024 16:46:32 +0800 Subject: [PATCH 2/3] Fix checkstyle --- .../cluster/etcd/EtcdRepository.java | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java index e3fde28fbb214..a87265dc17482 100644 --- a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java +++ b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java @@ -58,6 +58,8 @@ @Slf4j public final class EtcdRepository implements ClusterPersistRepository { + private static final ExecutorService EVENT_LISTENER_EXECUTOR = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Etcd-EventListener-%d").build()); + private Client client; private EtcdProperties etcdProps; @@ -65,8 +67,6 @@ public final class EtcdRepository implements ClusterPersistRepository { @Getter private DistributedLockHolder distributedLockHolder; - private static final ExecutorService EVENT_LISTENER_EXECUTOR = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Etcd-EventListener-%d").build()); - @Override public void init(final ClusterPersistRepositoryConfiguration config) { etcdProps = new EtcdProperties(config.getProps()); @@ -156,13 +156,7 @@ public void watch(final String key, final DataChangedEventListener dataChangedEv for (WatchEvent each : response.getEvents()) { Type type = getEventChangedType(each); if (Type.IGNORED != type) { - CompletableFuture.runAsync(() -> - dataChangedEventListener.onChange(new DataChangedEvent(each.getKeyValue().getKey().toString(StandardCharsets.UTF_8), - each.getKeyValue().getValue().toString(StandardCharsets.UTF_8), type)), EVENT_LISTENER_EXECUTOR).whenComplete((unused, throwable) -> { - if (null != throwable) { - log.error("Consume event failed", throwable); - } - }); + dispatchEvent(dataChangedEventListener, each, type); } } }); @@ -186,6 +180,16 @@ private Type getEventChangedType(final WatchEvent event) { } } + private void dispatchEvent(final DataChangedEventListener dataChangedEventListener, final WatchEvent event, final Type type) { + CompletableFuture.runAsync(() -> + dataChangedEventListener.onChange(new DataChangedEvent(event.getKeyValue().getKey().toString(StandardCharsets.UTF_8), + event.getKeyValue().getValue().toString(StandardCharsets.UTF_8), type)), EVENT_LISTENER_EXECUTOR).whenComplete((unused, throwable) -> { + if (null != throwable) { + log.error("Dispatch event failed", throwable); + } + }); + } + @Override public void close() { client.close(); From 3b7414468576639ecb557359c9a61d089af98a06 Mon Sep 17 00:00:00 2001 From: zhaojinchao Date: Tue, 16 Jan 2024 16:52:52 +0800 Subject: [PATCH 3/3] Fix spotless --- .../repository/cluster/etcd/EtcdRepository.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java index a87265dc17482..18bf3d24746e9 100644 --- a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java +++ b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java @@ -181,13 +181,12 @@ private Type getEventChangedType(final WatchEvent event) { } private void dispatchEvent(final DataChangedEventListener dataChangedEventListener, final WatchEvent event, final Type type) { - CompletableFuture.runAsync(() -> - dataChangedEventListener.onChange(new DataChangedEvent(event.getKeyValue().getKey().toString(StandardCharsets.UTF_8), - event.getKeyValue().getValue().toString(StandardCharsets.UTF_8), type)), EVENT_LISTENER_EXECUTOR).whenComplete((unused, throwable) -> { - if (null != throwable) { - log.error("Dispatch event failed", throwable); - } - }); + CompletableFuture.runAsync(() -> dataChangedEventListener.onChange(new DataChangedEvent(event.getKeyValue().getKey().toString(StandardCharsets.UTF_8), + event.getKeyValue().getValue().toString(StandardCharsets.UTF_8), type)), EVENT_LISTENER_EXECUTOR).whenComplete((unused, throwable) -> { + if (null != throwable) { + log.error("Dispatch event failed", throwable); + } + }); } @Override