diff --git a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java index 191063016a63b..18bf3d24746e9 100644 --- a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java +++ b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java @@ -19,6 +19,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Splitter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.Client; import io.etcd.jetcd.KeyValue; @@ -33,6 +34,7 @@ import io.etcd.jetcd.watch.WatchEvent; import lombok.Getter; import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration; import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdProperties; @@ -44,14 +46,20 @@ import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Collectors; /** * Registry repository of ETCD. */ +@Slf4j public final class EtcdRepository implements ClusterPersistRepository { + private static final ExecutorService EVENT_LISTENER_EXECUTOR = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Etcd-EventListener-%d").build()); + private Client client; private EtcdProperties etcdProps; @@ -148,8 +156,7 @@ public void watch(final String key, final DataChangedEventListener dataChangedEv for (WatchEvent each : response.getEvents()) { Type type = getEventChangedType(each); if (Type.IGNORED != type) { - dataChangedEventListener.onChange(new DataChangedEvent(each.getKeyValue().getKey().toString(StandardCharsets.UTF_8), - each.getKeyValue().getValue().toString(StandardCharsets.UTF_8), type)); + dispatchEvent(dataChangedEventListener, each, type); } } }); @@ -173,9 +180,19 @@ private Type getEventChangedType(final WatchEvent event) { } } + private void dispatchEvent(final DataChangedEventListener dataChangedEventListener, final WatchEvent event, final Type type) { + CompletableFuture.runAsync(() -> dataChangedEventListener.onChange(new DataChangedEvent(event.getKeyValue().getKey().toString(StandardCharsets.UTF_8), + event.getKeyValue().getValue().toString(StandardCharsets.UTF_8), type)), EVENT_LISTENER_EXECUTOR).whenComplete((unused, throwable) -> { + if (null != throwable) { + log.error("Dispatch event failed", throwable); + } + }); + } + @Override public void close() { client.close(); + EVENT_LISTENER_EXECUTOR.shutdown(); } @Override