From 3df74dcb7ee268c57b17b8a211d8dbbaacb4b76b Mon Sep 17 00:00:00 2001 From: "ken.lj" Date: Wed, 25 Oct 2017 19:31:23 +0800 Subject: [PATCH 1/7] Fixd #672 Monitor blocks when registry unreachable --- .../support/AbstractMonitorFactory.java | 83 +++++++++++++++++-- .../dubbo/monitor/support/MonitorFilter.java | 3 + 2 files changed, 81 insertions(+), 5 deletions(-) diff --git a/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java b/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java index 3a8f477822a..4e34cbe82f5 100644 --- a/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java +++ b/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java @@ -17,6 +17,9 @@ import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; +import com.alibaba.dubbo.common.logger.Logger; +import com.alibaba.dubbo.common.logger.LoggerFactory; +import com.alibaba.dubbo.common.utils.NamedThreadFactory; import com.alibaba.dubbo.monitor.Monitor; import com.alibaba.dubbo.monitor.MonitorFactory; import com.alibaba.dubbo.monitor.MonitorService; @@ -24,7 +27,13 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; /** @@ -33,6 +42,7 @@ * @author william.liangf */ public abstract class AbstractMonitorFactory implements MonitorFactory { + private static final Logger logger = LoggerFactory.getLogger(AbstractMonitorFactory.class); // 注册中心获取过程锁 private static final ReentrantLock LOCK = new ReentrantLock(); @@ -40,6 +50,14 @@ public abstract class AbstractMonitorFactory implements MonitorFactory { // 注册中心集合 Map private static final Map MONITORS = new ConcurrentHashMap(); + private static final Map> MONITOR_CREATORS = new ConcurrentHashMap>(); + + private static final ExecutorService monitorScanner = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboMonitorScanner", true)); + + static { + monitorScanner.submit(new MonitorScanTask()); + } + public static Collection getMonitors() { return Collections.unmodifiableCollection(MONITORS.values()); } @@ -50,14 +68,26 @@ public Monitor getMonitor(URL url) { LOCK.lock(); try { Monitor monitor = MONITORS.get(key); - if (monitor != null) { + if (monitor != null || MONITOR_CREATORS.get(key) != null) { return monitor; } - monitor = createMonitor(url); - if (monitor == null) { - throw new IllegalStateException("Can not create monitor " + url); + + final URL monitorUrl = url; + FutureTask task = new FutureTask(new MonitorCreator(monitorUrl)); + Thread thread = new Thread(task); + thread.setName("DubboMointorCreator-thread-1"); + thread.setDaemon(true); + thread.start(); + try { + System.out.println("main: " + System.currentTimeMillis()); + monitor = task.get(10, TimeUnit.MILLISECONDS); + } catch (Throwable t) { + MONITOR_CREATORS.put(key, task); } - MONITORS.put(key, monitor); + if (monitor != null) { + MONITORS.put(key, monitor); + } + return monitor; } finally { // 释放锁 @@ -67,4 +97,47 @@ public Monitor getMonitor(URL url) { protected abstract Monitor createMonitor(URL url); + static class MonitorScanTask implements Runnable { + @Override + public void run() { + while (true) { + for (Map.Entry> entry : MONITOR_CREATORS.entrySet()) { + System.out.println(MONITOR_CREATORS.size()); + String key = entry.getKey(); + Future future = MONITOR_CREATORS.get(key); + if (future != null) { + try { + Monitor monitor = future.get(10, TimeUnit.MILLISECONDS); + System.out.println(monitor); + MONITORS.put(key, monitor); + MONITOR_CREATORS.remove(key); + } catch (Throwable t) { + logger.info(t); + } + } + } + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + } + } + } + } + + class MonitorCreator implements Callable { + + private URL url; + + public MonitorCreator(URL url) { + this.url = url; + } + + @Override + public Monitor call() throws Exception { + Monitor monitor = AbstractMonitorFactory.this.createMonitor(url); + System.out.println("thread: " + System.currentTimeMillis()); + return monitor; + } + } + } \ No newline at end of file diff --git a/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/MonitorFilter.java b/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/MonitorFilter.java index 7d0061c657d..702346fc15c 100644 --- a/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/MonitorFilter.java +++ b/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/MonitorFilter.java @@ -87,6 +87,9 @@ private void collect(Invoker invoker, Invocation invocation, Result result, S String method = RpcUtils.getMethodName(invocation); // 获取方法名 URL url = invoker.getUrl().getUrlParameter(Constants.MONITOR_KEY); Monitor monitor = monitorFactory.getMonitor(url); + if (monitor == null) { + return; + } int localPort; String remoteKey; String remoteValue; From 2383ec28a0121f3ecff592cd71410baa86826c49 Mon Sep 17 00:00:00 2001 From: "ken.lj" Date: Wed, 25 Oct 2017 19:54:36 +0800 Subject: [PATCH 2/7] Delete extra scan task for saving resource --- .../support/AbstractMonitorFactory.java | 51 +++++-------------- 1 file changed, 12 insertions(+), 39 deletions(-) diff --git a/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java b/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java index 4e34cbe82f5..156d6756e49 100644 --- a/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java +++ b/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java @@ -19,7 +19,6 @@ import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.logger.Logger; import com.alibaba.dubbo.common.logger.LoggerFactory; -import com.alibaba.dubbo.common.utils.NamedThreadFactory; import com.alibaba.dubbo.monitor.Monitor; import com.alibaba.dubbo.monitor.MonitorFactory; import com.alibaba.dubbo.monitor.MonitorService; @@ -29,8 +28,6 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; @@ -52,12 +49,6 @@ public abstract class AbstractMonitorFactory implements MonitorFactory { private static final Map> MONITOR_CREATORS = new ConcurrentHashMap>(); - private static final ExecutorService monitorScanner = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboMonitorScanner", true)); - - static { - monitorScanner.submit(new MonitorScanTask()); - } - public static Collection getMonitors() { return Collections.unmodifiableCollection(MONITORS.values()); } @@ -68,10 +59,21 @@ public Monitor getMonitor(URL url) { LOCK.lock(); try { Monitor monitor = MONITORS.get(key); - if (monitor != null || MONITOR_CREATORS.get(key) != null) { + if (monitor != null) { return monitor; } + Future future = MONITOR_CREATORS.get(key); + if (future != null) { + try { + monitor = future.get(100, TimeUnit.MICROSECONDS); + MONITORS.put(key, monitor); + MONITOR_CREATORS.remove(key); + return monitor; + } catch (Throwable t) { + } + } + final URL monitorUrl = url; FutureTask task = new FutureTask(new MonitorCreator(monitorUrl)); Thread thread = new Thread(task); @@ -79,7 +81,6 @@ public Monitor getMonitor(URL url) { thread.setDaemon(true); thread.start(); try { - System.out.println("main: " + System.currentTimeMillis()); monitor = task.get(10, TimeUnit.MILLISECONDS); } catch (Throwable t) { MONITOR_CREATORS.put(key, task); @@ -97,33 +98,6 @@ public Monitor getMonitor(URL url) { protected abstract Monitor createMonitor(URL url); - static class MonitorScanTask implements Runnable { - @Override - public void run() { - while (true) { - for (Map.Entry> entry : MONITOR_CREATORS.entrySet()) { - System.out.println(MONITOR_CREATORS.size()); - String key = entry.getKey(); - Future future = MONITOR_CREATORS.get(key); - if (future != null) { - try { - Monitor monitor = future.get(10, TimeUnit.MILLISECONDS); - System.out.println(monitor); - MONITORS.put(key, monitor); - MONITOR_CREATORS.remove(key); - } catch (Throwable t) { - logger.info(t); - } - } - } - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - } - } - } - } - class MonitorCreator implements Callable { private URL url; @@ -135,7 +109,6 @@ public MonitorCreator(URL url) { @Override public Monitor call() throws Exception { Monitor monitor = AbstractMonitorFactory.this.createMonitor(url); - System.out.println("thread: " + System.currentTimeMillis()); return monitor; } } From 714a7a2973bbeed5b764880efd33e4ad18c636d7 Mon Sep 17 00:00:00 2001 From: "ken.lj" Date: Thu, 26 Oct 2017 12:09:40 +0800 Subject: [PATCH 3/7] Add resource protection for async monitor --- .../support/AbstractMonitorFactory.java | 41 ++++++++++++------- .../dubbo/monitor/dubbo/DubboMonitorTest.java | 34 +++++++++------ 2 files changed, 49 insertions(+), 26 deletions(-) diff --git a/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java b/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java index 156d6756e49..6da8788b2b1 100644 --- a/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java +++ b/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java @@ -49,10 +49,18 @@ public abstract class AbstractMonitorFactory implements MonitorFactory { private static final Map> MONITOR_CREATORS = new ConcurrentHashMap>(); + private int count = 0; + public static Collection getMonitors() { return Collections.unmodifiableCollection(MONITORS.values()); } + /** + * TODO 待ListenableFuture模式优化 + * + * @param url + * @return + */ public Monitor getMonitor(URL url) { url = url.setPath(MonitorService.class.getName()).addParameter(Constants.INTERFACE_KEY, MonitorService.class.getName()); String key = url.toServiceStringWithoutResolving(); @@ -69,24 +77,29 @@ public Monitor getMonitor(URL url) { monitor = future.get(100, TimeUnit.MICROSECONDS); MONITORS.put(key, monitor); MONITOR_CREATORS.remove(key); - return monitor; } catch (Throwable t) { } + return monitor; } - final URL monitorUrl = url; - FutureTask task = new FutureTask(new MonitorCreator(monitorUrl)); - Thread thread = new Thread(task); - thread.setName("DubboMointorCreator-thread-1"); - thread.setDaemon(true); - thread.start(); - try { - monitor = task.get(10, TimeUnit.MILLISECONDS); - } catch (Throwable t) { - MONITOR_CREATORS.put(key, task); - } - if (monitor != null) { - MONITORS.put(key, monitor); + // 数量:key=注册中心地址,数量一般很少 + if (count < 10) { + final URL monitorUrl = url; + FutureTask task = new FutureTask(new MonitorCreator(monitorUrl)); + Thread thread = new Thread(task); + thread.setName("DubboMointorCreator-thread-" + ++count); + thread.setDaemon(true); + thread.start(); + try { + monitor = task.get(10, TimeUnit.MILLISECONDS); + } catch (Throwable t) { + MONITOR_CREATORS.put(key, task); + } + if (monitor != null) { + MONITORS.put(key, monitor); + } + } else { + monitor = this.createMonitor(url); } return monitor; diff --git a/dubbo-monitor/dubbo-monitor-default/src/test/java/com/alibaba/dubbo/monitor/dubbo/DubboMonitorTest.java b/dubbo-monitor/dubbo-monitor-default/src/test/java/com/alibaba/dubbo/monitor/dubbo/DubboMonitorTest.java index b7003870d98..c43502f370d 100644 --- a/dubbo-monitor/dubbo-monitor-default/src/test/java/com/alibaba/dubbo/monitor/dubbo/DubboMonitorTest.java +++ b/dubbo-monitor/dubbo-monitor-default/src/test/java/com/alibaba/dubbo/monitor/dubbo/DubboMonitorTest.java @@ -129,20 +129,30 @@ public void testMonitorFactory() throws Exception { Exporter exporter = protocol.export(proxyFactory.getInvoker(monitorService, MonitorService.class, URL.valueOf("dubbo://127.0.0.1:17979/" + MonitorService.class.getName()))); try { - Monitor monitor = monitorFactory.getMonitor(URL.valueOf("dubbo://127.0.0.1:17979?interval=10")); - try { - monitor.collect(statistics); - int i = 0; - while (monitorService.getStatistics() == null && i < 200) { - i++; - Thread.sleep(10); + Monitor monitor = null; + long start = System.currentTimeMillis(); + // 如果60s都拿不到 + while (System.currentTimeMillis() - start < 60000) { + monitor = monitorFactory.getMonitor(URL.valueOf("dubbo://127.0.0.1:17979?interval=10")); + if (monitor == null) { + continue; } - URL result = monitorService.getStatistics(); - Assert.assertEquals(1, result.getParameter(MonitorService.SUCCESS, 0)); - Assert.assertEquals(3, result.getParameter(MonitorService.ELAPSED, 0)); - } finally { - monitor.destroy(); + try { + monitor.collect(statistics); + int i = 0; + while (monitorService.getStatistics() == null && i < 200) { + i++; + Thread.sleep(10); + } + URL result = monitorService.getStatistics(); + Assert.assertEquals(1, result.getParameter(MonitorService.SUCCESS, 0)); + Assert.assertEquals(3, result.getParameter(MonitorService.ELAPSED, 0)); + } finally { + monitor.destroy(); + } + break; } + Assert.assertNotNull(monitor); } finally { exporter.unexport(); } From 9b36dc629dd38eacac03318bb27031617ffcc99f Mon Sep 17 00:00:00 2001 From: "ken.lj" Date: Thu, 26 Oct 2017 12:33:49 +0800 Subject: [PATCH 4/7] Check isDone before get --- .../monitor/support/AbstractMonitorFactory.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java b/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java index 6da8788b2b1..8245b06ce8b 100644 --- a/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java +++ b/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java @@ -73,11 +73,13 @@ public Monitor getMonitor(URL url) { Future future = MONITOR_CREATORS.get(key); if (future != null) { - try { - monitor = future.get(100, TimeUnit.MICROSECONDS); - MONITORS.put(key, monitor); - MONITOR_CREATORS.remove(key); - } catch (Throwable t) { + if (future.isDone()) { + try { + monitor = future.get(); + MONITORS.put(key, monitor); + MONITOR_CREATORS.remove(key); + } catch (Throwable t) { + } } return monitor; } From ae69842d362253bbd69cd7dffd09586144604a14 Mon Sep 17 00:00:00 2001 From: "ken.lj" Date: Thu, 26 Oct 2017 14:20:53 +0800 Subject: [PATCH 5/7] Rewrite monitor create process with ListenableFuture --- .../common/concurrent/ExecutionList.java | 159 ++++++++++++++++++ .../common/concurrent/ListenableFuture.java | 119 +++++++++++++ .../concurrent/ListenableFutureTask.java | 75 +++++++++ .../support/AbstractMonitorFactory.java | 85 +++++----- .../support/AbstractMonitorFactoryTest.java | 26 ++- 5 files changed, 417 insertions(+), 47 deletions(-) create mode 100644 dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ExecutionList.java create mode 100644 dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFuture.java create mode 100644 dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFutureTask.java diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ExecutionList.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ExecutionList.java new file mode 100644 index 00000000000..2e4b039287a --- /dev/null +++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ExecutionList.java @@ -0,0 +1,159 @@ +package com.alibaba.dubbo.common.concurrent; + +import com.alibaba.dubbo.common.logger.Logger; +import com.alibaba.dubbo.common.logger.LoggerFactory; + +import java.util.concurrent.Executor; + +/** + *

A list of listeners, each with an associated {@code Executor}, that + * guarantees that every {@code Runnable} that is {@linkplain #add added} will + * be executed after {@link #execute()} is called. Any {@code Runnable} added + * after the call to {@code execute} is still guaranteed to execute. There is no + * guarantee, however, that listeners will be executed in the order that they + * are added. + *

+ *

Exceptions thrown by a listener will be propagated up to the executor. + * Any exception thrown during {@code Executor.execute} (e.g., a {@code + * RejectedExecutionException} or an exception thrown by {@linkplain + * MoreExecutors#sameThreadExecutor inline execution}) will be caught and + * logged. + * + * @author Nishant Thakkar + * @author Sven Mawson + * @since 1.0 + */ +public final class ExecutionList { + // Logger to log exceptions caught when running runnables. + static final Logger logger = LoggerFactory.getLogger(ExecutionList.class.getName()); + + /** + * The runnable, executor pairs to execute. This acts as a stack threaded through the + * {@link RunnableExecutorPair#next} field. + */ + private RunnableExecutorPair runnables; + + private boolean executed; + + /** + * Creates a new, empty {@link ExecutionList}. + */ + public ExecutionList() { + } + + /** + * Adds the {@code Runnable} and accompanying {@code Executor} to the list of + * listeners to execute. If execution has already begun, the listener is + * executed immediately. + *

+ *

Note: For fast, lightweight listeners that would be safe to execute in + * any thread, consider {@link MoreExecutors#sameThreadExecutor}. For heavier + * listeners, {@code sameThreadExecutor()} carries some caveats: First, the + * thread that the listener runs in depends on whether the {@code + * ExecutionList} has been executed at the time it is added. In particular, + * listeners may run in the thread that calls {@code add}. Second, the thread + * that calls {@link #execute} may be an internal implementation thread, such + * as an RPC network thread, and {@code sameThreadExecutor()} listeners may + * run in this thread. Finally, during the execution of a {@code + * sameThreadExecutor} listener, all other registered but unexecuted + * listeners are prevented from running, even if those listeners are to run + * in other executors. + */ + public void add(Runnable runnable, Executor executor) { + // Fail fast on a null. We throw NPE here because the contract of + // Executor states that it throws NPE on null listener, so we propagate + // that contract up into the add method as well. + if (runnable == null || executor == null) { + throw new NullPointerException("Both Runnable and Executor can not be null!"); + } + + // Lock while we check state. We must maintain the lock while adding the + // new pair so that another thread can't run the list out from under us. + // We only add to the list if we have not yet started execution. + synchronized (this) { + if (!executed) { + runnables = new RunnableExecutorPair(runnable, executor, runnables); + return; + } + } + // Execute the runnable immediately. Because of scheduling this may end up + // getting called before some of the previously added runnables, but we're + // OK with that. If we want to change the contract to guarantee ordering + // among runnables we'd have to modify the logic here to allow it. + executeListener(runnable, executor); + } + + /** + * Runs this execution list, executing all existing pairs in the order they + * were added. However, note that listeners added after this point may be + * executed before those previously added, and note that the execution order + * of all listeners is ultimately chosen by the implementations of the + * supplied executors. + *

+ *

This method is idempotent. Calling it several times in parallel is + * semantically equivalent to calling it exactly once. + * + * @since 10.0 (present in 1.0 as {@code run}) + */ + public void execute() { + // Lock while we update our state so the add method above will finish adding + // any listeners before we start to run them. + RunnableExecutorPair list; + synchronized (this) { + if (executed) { + return; + } + executed = true; + list = runnables; + runnables = null; // allow GC to free listeners even if this stays around for a while. + } + // If we succeeded then list holds all the runnables we to execute. The pairs in the stack are + // in the opposite order from how they were added so we need to reverse the list to fulfill our + // contract. + // This is somewhat annoying, but turns out to be very fast in practice. Alternatively, we + // could drop the contract on the method that enforces this queue like behavior since depending + // on it is likely to be a bug anyway. + + // N.B. All writes to the list and the next pointers must have happened before the above + // synchronized block, so we can iterate the list without the lock held here. + RunnableExecutorPair reversedList = null; + while (list != null) { + RunnableExecutorPair tmp = list; + list = list.next; + tmp.next = reversedList; + reversedList = tmp; + } + while (reversedList != null) { + executeListener(reversedList.runnable, reversedList.executor); + reversedList = reversedList.next; + } + } + + /** + * Submits the given runnable to the given {@link Executor} catching and logging all + * {@linkplain RuntimeException runtime exceptions} thrown by the executor. + */ + private static void executeListener(Runnable runnable, Executor executor) { + try { + executor.execute(runnable); + } catch (RuntimeException e) { + // Log it and keep going, bad runnable and/or executor. Don't + // punish the other runnables if we're given a bad one. We only + // catch RuntimeException because we want Errors to propagate up. + logger.error("RuntimeException while executing runnable " + + runnable + " with executor " + executor, e); + } + } + + private static final class RunnableExecutorPair { + final Runnable runnable; + final Executor executor; + RunnableExecutorPair next; + + RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) { + this.runnable = runnable; + this.executor = executor; + this.next = next; + } + } +} diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFuture.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFuture.java new file mode 100644 index 00000000000..7b1ff0e9d77 --- /dev/null +++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFuture.java @@ -0,0 +1,119 @@ +package com.alibaba.dubbo.common.concurrent; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RejectedExecutionException; + +/** + * A {@link Future} that accepts completion listeners. Each listener has an + * associated executor, and it is invoked using this executor once the future's + * computation is {@linkplain Future#isDone() complete}. If the computation has + * already completed when the listener is added, the listener will execute + * immediately. + *

+ *

See the Guava User Guide article on + * {@code ListenableFuture}. + *

+ *

Purpose

+ *

+ *

Most commonly, {@code ListenableFuture} is used as an input to another + * derived {@code Future}, as in {@link Futures#allAsList(Iterable) + * Futures.allAsList}. Many such methods are impossible to implement efficiently + * without listener support. + *

+ *

It is possible to call {@link #addListener addListener} directly, but this + * is uncommon because the {@code Runnable} interface does not provide direct + * access to the {@code Future} result. (Users who want such access may prefer + * {@link Futures#addCallback Futures.addCallback}.) Still, direct {@code + * addListener} calls are occasionally useful:

   {@code
+ *   final String name = ...;
+ *   inFlight.add(name);
+ *   ListenableFuture future = service.query(name);
+ *   future.addListener(new Runnable() {
+ *     public void run() {
+ *       processedCount.incrementAndGet();
+ *       inFlight.remove(name);
+ *       lastProcessed.set(name);
+ *       logger.info("Done with {0}", name);
+ *     }
+ *   }, executor);}
+ *

+ *

How to get an instance

+ *

+ *

Developers are encouraged to return {@code ListenableFuture} from their + * methods so that users can take advantages of the utilities built atop the + * class. The way that they will create {@code ListenableFuture} instances + * depends on how they currently create {@code Future} instances: + *

    + *
  • If they are returned from an {@code ExecutorService}, convert that + * service to a {@link ListeningExecutorService}, usually by calling {@link + * MoreExecutors#listeningDecorator(ExecutorService) + * MoreExecutors.listeningDecorator}. (Custom executors may find it more + * convenient to use {@link ListenableFutureTask} directly.) + *
  • If they are manually filled in by a call to {@link FutureTask#set} or a + * similar method, create a {@link SettableFuture} instead. (Users with more + * complex needs may prefer {@link AbstractFuture}.) + *
+ *

+ *

Occasionally, an API will return a plain {@code Future} and it will be + * impossible to change the return type. For this case, we provide a more + * expensive workaround in {@code JdkFutureAdapters}. However, when possible, it + * is more efficient and reliable to create a {@code ListenableFuture} directly. + * + * @author Sven Mawson + * @author Nishant Thakkar + * @since 1.0 + */ +public interface ListenableFuture extends Future { + /** + * Registers a listener to be {@linkplain Executor#execute(Runnable) run} on + * the given executor. The listener will run when the {@code Future}'s + * computation is {@linkplain Future#isDone() complete} or, if the computation + * is already complete, immediately. + *

+ *

There is no guaranteed ordering of execution of listeners, but any + * listener added through this method is guaranteed to be called once the + * computation is complete. + *

+ *

Exceptions thrown by a listener will be propagated up to the executor. + * Any exception thrown during {@code Executor.execute} (e.g., a {@code + * RejectedExecutionException} or an exception thrown by {@linkplain + * MoreExecutors#sameThreadExecutor inline execution}) will be caught and + * logged. + *

+ *

Note: For fast, lightweight listeners that would be safe to execute in + * any thread, consider {@link MoreExecutors#sameThreadExecutor}. For heavier + * listeners, {@code sameThreadExecutor()} carries some caveats. For + * example, the listener may run on an unpredictable or undesirable thread: + *

+ *

    + *
  • If this {@code Future} is done at the time {@code addListener} is + * called, {@code addListener} will execute the listener inline. + *
  • If this {@code Future} is not yet done, {@code addListener} will + * schedule the listener to be run by the thread that completes this {@code + * Future}, which may be an internal system thread such as an RPC network + * thread. + *
+ *

+ *

Also note that, regardless of which thread executes the + * {@code sameThreadExecutor()} listener, all other registered but unexecuted + * listeners are prevented from running during its execution, even if those + * listeners are to run in other executors. + *

+ *

This is the most general listener interface. For common operations + * performed using listeners, see {@link + * com.google.common.util.concurrent.Futures}. For a simplified but general + * listener interface, see {@link + * com.google.common.util.concurrent.Futures#addCallback addCallback()}. + * + * @param listener the listener to run when the computation is complete + * @param executor the executor to run the listener in + * @throws NullPointerException if the executor or listener was null + * @throws RejectedExecutionException if we tried to execute the listener + * immediately but the executor rejected it. + */ + void addListener(Runnable listener, Executor executor); +} diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFutureTask.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFutureTask.java new file mode 100644 index 00000000000..b77d173b6b7 --- /dev/null +++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFutureTask.java @@ -0,0 +1,75 @@ +package com.alibaba.dubbo.common.concurrent; + +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.FutureTask; + +/** + * A {@link FutureTask} that also implements the {@link ListenableFuture} + * interface. Unlike {@code FutureTask}, {@code ListenableFutureTask} does not + * provide an overrideable {@link FutureTask#done() done()} method. For similar + * functionality, call {@link #addListener}. + *

+ *

+ * + * @author Sven Mawson + * @since 1.0 + */ +public class ListenableFutureTask extends FutureTask + implements ListenableFuture { + // TODO(cpovirk): explore ways of making ListenableFutureTask final. There are + // some valid reasons such as BoundedQueueExecutorService to allow extends but it + // would be nice to make it final to avoid unintended usage. + + // The execution list to hold our listeners. + private final ExecutionList executionList = new ExecutionList(); + + /** + * Creates a {@code ListenableFutureTask} that will upon running, execute the + * given {@code Callable}. + * + * @param callable the callable task + * @since 10.0 + */ + public static ListenableFutureTask create(Callable callable) { + return new ListenableFutureTask(callable); + } + + /** + * Creates a {@code ListenableFutureTask} that will upon running, execute the + * given {@code Runnable}, and arrange that {@code get} will return the + * given result on successful completion. + * + * @param runnable the runnable task + * @param result the result to return on successful completion. If you don't + * need a particular result, consider using constructions of the form: + * {@code ListenableFuture f = ListenableFutureTask.create(runnable, + * null)} + * @since 10.0 + */ + public static ListenableFutureTask create( + Runnable runnable, V result) { + return new ListenableFutureTask(runnable, result); + } + + ListenableFutureTask(Callable callable) { + super(callable); + } + + ListenableFutureTask(Runnable runnable, V result) { + super(runnable, result); + } + + @Override + public void addListener(Runnable listener, Executor exec) { + executionList.add(listener, exec); + } + + /** + * Internal implementation detail used to invoke the listeners. + */ + @Override + protected void done() { + executionList.execute(); + } +} \ No newline at end of file diff --git a/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java b/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java index 8245b06ce8b..d23a7bb3895 100644 --- a/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java +++ b/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java @@ -17,8 +17,11 @@ import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; +import com.alibaba.dubbo.common.concurrent.ListenableFuture; +import com.alibaba.dubbo.common.concurrent.ListenableFutureTask; import com.alibaba.dubbo.common.logger.Logger; import com.alibaba.dubbo.common.logger.LoggerFactory; +import com.alibaba.dubbo.common.utils.NamedThreadFactory; import com.alibaba.dubbo.monitor.Monitor; import com.alibaba.dubbo.monitor.MonitorFactory; import com.alibaba.dubbo.monitor.MonitorService; @@ -28,9 +31,10 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; /** @@ -47,64 +51,36 @@ public abstract class AbstractMonitorFactory implements MonitorFactory { // 注册中心集合 Map private static final Map MONITORS = new ConcurrentHashMap(); - private static final Map> MONITOR_CREATORS = new ConcurrentHashMap>(); + private static final Map> FUTURES = new ConcurrentHashMap>(); + + private static final ExecutorService creatorExecutor = Executors.newCachedThreadPool(new NamedThreadFactory("DubboMonitorCreator", true)); + private static final ExecutorService callbackExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboMonitorCallback", true)); private int count = 0; + public static Collection getMonitors() { return Collections.unmodifiableCollection(MONITORS.values()); } - /** - * TODO 待ListenableFuture模式优化 - * - * @param url - * @return - */ public Monitor getMonitor(URL url) { url = url.setPath(MonitorService.class.getName()).addParameter(Constants.INTERFACE_KEY, MonitorService.class.getName()); String key = url.toServiceStringWithoutResolving(); LOCK.lock(); try { Monitor monitor = MONITORS.get(key); - if (monitor != null) { - return monitor; - } - - Future future = MONITOR_CREATORS.get(key); - if (future != null) { - if (future.isDone()) { - try { - monitor = future.get(); - MONITORS.put(key, monitor); - MONITOR_CREATORS.remove(key); - } catch (Throwable t) { - } - } + Future future = FUTURES.get(key); + if (monitor != null || future != null) { return monitor; } - // 数量:key=注册中心地址,数量一般很少 - if (count < 10) { - final URL monitorUrl = url; - FutureTask task = new FutureTask(new MonitorCreator(monitorUrl)); - Thread thread = new Thread(task); - thread.setName("DubboMointorCreator-thread-" + ++count); - thread.setDaemon(true); - thread.start(); - try { - monitor = task.get(10, TimeUnit.MILLISECONDS); - } catch (Throwable t) { - MONITOR_CREATORS.put(key, task); - } - if (monitor != null) { - MONITORS.put(key, monitor); - } - } else { - monitor = this.createMonitor(url); - } + final URL monitorUrl = url; + final ListenableFutureTask listenableFutureTask = ListenableFutureTask.create(new MonitorCreator(monitorUrl)); + listenableFutureTask.addListener(new MonitorListener(key), callbackExecutor); + creatorExecutor.execute(listenableFutureTask); + FUTURES.put(key, listenableFutureTask); - return monitor; + return null; } finally { // 释放锁 LOCK.unlock(); @@ -128,4 +104,27 @@ public Monitor call() throws Exception { } } + class MonitorListener implements Runnable { + + private String key; + + public MonitorListener(String key) { + this.key = key; + } + + @Override + public void run() { + try { + ListenableFuture listenableFuture = AbstractMonitorFactory.FUTURES.get(key); + AbstractMonitorFactory.MONITORS.put(key, listenableFuture.get()); + AbstractMonitorFactory.FUTURES.remove(key); + } catch (InterruptedException e) { + logger.warn("Thread was interrupted unexpectedly, monitor will never be got."); + AbstractMonitorFactory.FUTURES.remove(key); + } catch (ExecutionException e) { + logger.warn("Create monitor failed, monitor data will not be collected until you fix this problem. ", e); + } + } + } + } \ No newline at end of file diff --git a/dubbo-monitor/dubbo-monitor-api/src/test/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactoryTest.java b/dubbo-monitor/dubbo-monitor-api/src/test/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactoryTest.java index ce3f2e8e89d..481baefec38 100644 --- a/dubbo-monitor/dubbo-monitor-api/src/test/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactoryTest.java +++ b/dubbo-monitor/dubbo-monitor-api/src/test/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactoryTest.java @@ -64,20 +64,38 @@ public void testMonitorFactoryCache() throws Exception { URL url = URL.valueOf("dubbo://" + NetUtils.getLocalAddress().getHostAddress() + ":2233"); Monitor monitor1 = monitorFactory.getMonitor(url); Monitor monitor2 = monitorFactory.getMonitor(url); + if (monitor1 == null || monitor2 == null) { + Thread.sleep(2000); + monitor1 = monitorFactory.getMonitor(url); + monitor2 = monitorFactory.getMonitor(url); + } Assert.assertEquals(monitor1, monitor2); } @Test public void testMonitorFactoryIpCache() throws Exception { - Monitor monitor1 = monitorFactory.getMonitor(URL.valueOf("dubbo://" + NetUtils.getLocalAddress().getHostName() + ":2233")); - Monitor monitor2 = monitorFactory.getMonitor(URL.valueOf("dubbo://" + NetUtils.getLocalAddress().getHostName() + ":2233")); + URL url = URL.valueOf("dubbo://" + NetUtils.getLocalAddress().getHostName() + ":2233"); + Monitor monitor1 = monitorFactory.getMonitor(url); + Monitor monitor2 = monitorFactory.getMonitor(url); + if (monitor1 == null || monitor2 == null) { + Thread.sleep(2000); + monitor1 = monitorFactory.getMonitor(url); + monitor2 = monitorFactory.getMonitor(url); + } Assert.assertEquals(monitor1, monitor2); } @Test public void testMonitorFactoryGroupCache() throws Exception { - Monitor monitor1 = monitorFactory.getMonitor(URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":2233?group=aaa")); - Monitor monitor2 = monitorFactory.getMonitor(URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":2233?group=bbb")); + URL url1 = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":2233?group=aaa"); + URL url2 = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":2233?group=bbb"); + Monitor monitor1 = monitorFactory.getMonitor(url1); + Monitor monitor2 = monitorFactory.getMonitor(url2); + if (monitor1 == null || monitor2 == null) { + Thread.sleep(2000); + monitor1 = monitorFactory.getMonitor(url1); + monitor2 = monitorFactory.getMonitor(url2); + } Assert.assertNotSame(monitor1, monitor2); } From 075ce83ed39b7bd0adc2327c22fe7915d61bdc15 Mon Sep 17 00:00:00 2001 From: "ken.lj" Date: Sat, 28 Oct 2017 17:47:24 +0800 Subject: [PATCH 6/7] Reduce the times entering synchronized area --- .../support/AbstractMonitorFactory.java | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java b/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java index d23a7bb3895..e2ec10a7934 100644 --- a/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java +++ b/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java @@ -33,8 +33,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; /** @@ -53,11 +55,7 @@ public abstract class AbstractMonitorFactory implements MonitorFactory { private static final Map> FUTURES = new ConcurrentHashMap>(); - private static final ExecutorService creatorExecutor = Executors.newCachedThreadPool(new NamedThreadFactory("DubboMonitorCreator", true)); - private static final ExecutorService callbackExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboMonitorCallback", true)); - - private int count = 0; - + private static final ExecutorService executor = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue(), new NamedThreadFactory("DubboMonitorCreator", true)); public static Collection getMonitors() { return Collections.unmodifiableCollection(MONITORS.values()); @@ -66,18 +64,24 @@ public static Collection getMonitors() { public Monitor getMonitor(URL url) { url = url.setPath(MonitorService.class.getName()).addParameter(Constants.INTERFACE_KEY, MonitorService.class.getName()); String key = url.toServiceStringWithoutResolving(); + Monitor monitor = MONITORS.get(key); + Future future = FUTURES.get(key); + if (monitor != null || future != null) { + return monitor; + } + LOCK.lock(); try { - Monitor monitor = MONITORS.get(key); - Future future = FUTURES.get(key); + monitor = MONITORS.get(key); + future = FUTURES.get(key); if (monitor != null || future != null) { return monitor; } final URL monitorUrl = url; final ListenableFutureTask listenableFutureTask = ListenableFutureTask.create(new MonitorCreator(monitorUrl)); - listenableFutureTask.addListener(new MonitorListener(key), callbackExecutor); - creatorExecutor.execute(listenableFutureTask); + listenableFutureTask.addListener(new MonitorListener(key), executor); + executor.execute(listenableFutureTask); FUTURES.put(key, listenableFutureTask); return null; @@ -120,7 +124,7 @@ public void run() { AbstractMonitorFactory.FUTURES.remove(key); } catch (InterruptedException e) { logger.warn("Thread was interrupted unexpectedly, monitor will never be got."); - AbstractMonitorFactory.FUTURES.remove(key); +// AbstractMonitorFactory.FUTURES.remove(key); } catch (ExecutionException e) { logger.warn("Create monitor failed, monitor data will not be collected until you fix this problem. ", e); } From a5aa6fafa8036d0a522728aeadd7d7f77bfc9eda Mon Sep 17 00:00:00 2001 From: "ken.lj" Date: Sat, 28 Oct 2017 05:05:37 -0500 Subject: [PATCH 7/7] Update AbstractMonitorFactory.java --- .../alibaba/dubbo/monitor/support/AbstractMonitorFactory.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java b/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java index e2ec10a7934..d4bda27fe54 100644 --- a/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java +++ b/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java @@ -124,11 +124,11 @@ public void run() { AbstractMonitorFactory.FUTURES.remove(key); } catch (InterruptedException e) { logger.warn("Thread was interrupted unexpectedly, monitor will never be got."); -// AbstractMonitorFactory.FUTURES.remove(key); + AbstractMonitorFactory.FUTURES.remove(key); } catch (ExecutionException e) { logger.warn("Create monitor failed, monitor data will not be collected until you fix this problem. ", e); } } } -} \ No newline at end of file +}