Skip to content

Commit

Permalink
Merge pull request #777 from chickenlj:bugfix#672_MonitorFactory_block
Browse files Browse the repository at this point in the history
Fixed #672: Monitor blocks when registry is unreachable (#777)
  • Loading branch information
chickenlj authored Oct 28, 2017
1 parent be965bf commit 9581e0f
Show file tree
Hide file tree
Showing 7 changed files with 473 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package com.alibaba.dubbo.common.concurrent;

import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;

import java.util.concurrent.Executor;

/**
* <p>A list of listeners, each with an associated {@code Executor}, that
* guarantees that every {@code Runnable} that is {@linkplain #add added} will
* be executed after {@link #execute()} is called. Any {@code Runnable} added
* after the call to {@code execute} is still guaranteed to execute. There is no
* guarantee, however, that listeners will be executed in the order that they
* are added.
* <p>
* <p>Exceptions thrown by a listener will be propagated up to the executor.
* Any exception thrown during {@code Executor.execute} (e.g., a {@code
* RejectedExecutionException} or an exception thrown by {@linkplain
* MoreExecutors#sameThreadExecutor inline execution}) will be caught and
* logged.
*
* @author Nishant Thakkar
* @author Sven Mawson
* @since 1.0
*/
public final class ExecutionList {
// Logger to log exceptions caught when running runnables.
static final Logger logger = LoggerFactory.getLogger(ExecutionList.class.getName());

/**
* The runnable, executor pairs to execute. This acts as a stack threaded through the
* {@link RunnableExecutorPair#next} field.
*/
private RunnableExecutorPair runnables;

private boolean executed;

/**
* Creates a new, empty {@link ExecutionList}.
*/
public ExecutionList() {
}

/**
* Adds the {@code Runnable} and accompanying {@code Executor} to the list of
* listeners to execute. If execution has already begun, the listener is
* executed immediately.
* <p>
* <p>Note: For fast, lightweight listeners that would be safe to execute in
* any thread, consider {@link MoreExecutors#sameThreadExecutor}. For heavier
* listeners, {@code sameThreadExecutor()} carries some caveats: First, the
* thread that the listener runs in depends on whether the {@code
* ExecutionList} has been executed at the time it is added. In particular,
* listeners may run in the thread that calls {@code add}. Second, the thread
* that calls {@link #execute} may be an internal implementation thread, such
* as an RPC network thread, and {@code sameThreadExecutor()} listeners may
* run in this thread. Finally, during the execution of a {@code
* sameThreadExecutor} listener, all other registered but unexecuted
* listeners are prevented from running, even if those listeners are to run
* in other executors.
*/
public void add(Runnable runnable, Executor executor) {
// Fail fast on a null. We throw NPE here because the contract of
// Executor states that it throws NPE on null listener, so we propagate
// that contract up into the add method as well.
if (runnable == null || executor == null) {
throw new NullPointerException("Both Runnable and Executor can not be null!");
}

// Lock while we check state. We must maintain the lock while adding the
// new pair so that another thread can't run the list out from under us.
// We only add to the list if we have not yet started execution.
synchronized (this) {
if (!executed) {
runnables = new RunnableExecutorPair(runnable, executor, runnables);
return;
}
}
// Execute the runnable immediately. Because of scheduling this may end up
// getting called before some of the previously added runnables, but we're
// OK with that. If we want to change the contract to guarantee ordering
// among runnables we'd have to modify the logic here to allow it.
executeListener(runnable, executor);
}

/**
* Runs this execution list, executing all existing pairs in the order they
* were added. However, note that listeners added after this point may be
* executed before those previously added, and note that the execution order
* of all listeners is ultimately chosen by the implementations of the
* supplied executors.
* <p>
* <p>This method is idempotent. Calling it several times in parallel is
* semantically equivalent to calling it exactly once.
*
* @since 10.0 (present in 1.0 as {@code run})
*/
public void execute() {
// Lock while we update our state so the add method above will finish adding
// any listeners before we start to run them.
RunnableExecutorPair list;
synchronized (this) {
if (executed) {
return;
}
executed = true;
list = runnables;
runnables = null; // allow GC to free listeners even if this stays around for a while.
}
// If we succeeded then list holds all the runnables we to execute. The pairs in the stack are
// in the opposite order from how they were added so we need to reverse the list to fulfill our
// contract.
// This is somewhat annoying, but turns out to be very fast in practice. Alternatively, we
// could drop the contract on the method that enforces this queue like behavior since depending
// on it is likely to be a bug anyway.

// N.B. All writes to the list and the next pointers must have happened before the above
// synchronized block, so we can iterate the list without the lock held here.
RunnableExecutorPair reversedList = null;
while (list != null) {
RunnableExecutorPair tmp = list;
list = list.next;
tmp.next = reversedList;
reversedList = tmp;
}
while (reversedList != null) {
executeListener(reversedList.runnable, reversedList.executor);
reversedList = reversedList.next;
}
}

/**
* Submits the given runnable to the given {@link Executor} catching and logging all
* {@linkplain RuntimeException runtime exceptions} thrown by the executor.
*/
private static void executeListener(Runnable runnable, Executor executor) {
try {
executor.execute(runnable);
} catch (RuntimeException e) {
// Log it and keep going, bad runnable and/or executor. Don't
// punish the other runnables if we're given a bad one. We only
// catch RuntimeException because we want Errors to propagate up.
logger.error("RuntimeException while executing runnable "
+ runnable + " with executor " + executor, e);
}
}

private static final class RunnableExecutorPair {
final Runnable runnable;
final Executor executor;
RunnableExecutorPair next;

RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) {
this.runnable = runnable;
this.executor = executor;
this.next = next;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package com.alibaba.dubbo.common.concurrent;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;

/**
* A {@link Future} that accepts completion listeners. Each listener has an
* associated executor, and it is invoked using this executor once the future's
* computation is {@linkplain Future#isDone() complete}. If the computation has
* already completed when the listener is added, the listener will execute
* immediately.
* <p>
* <p>See the Guava User Guide article on <a href=
* "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained">
* {@code ListenableFuture}</a>.
* <p>
* <h3>Purpose</h3>
* <p>
* <p>Most commonly, {@code ListenableFuture} is used as an input to another
* derived {@code Future}, as in {@link Futures#allAsList(Iterable)
* Futures.allAsList}. Many such methods are impossible to implement efficiently
* without listener support.
* <p>
* <p>It is possible to call {@link #addListener addListener} directly, but this
* is uncommon because the {@code Runnable} interface does not provide direct
* access to the {@code Future} result. (Users who want such access may prefer
* {@link Futures#addCallback Futures.addCallback}.) Still, direct {@code
* addListener} calls are occasionally useful:<pre> {@code
* final String name = ...;
* inFlight.add(name);
* ListenableFuture<Result> future = service.query(name);
* future.addListener(new Runnable() {
* public void run() {
* processedCount.incrementAndGet();
* inFlight.remove(name);
* lastProcessed.set(name);
* logger.info("Done with {0}", name);
* }
* }, executor);}</pre>
* <p>
* <h3>How to get an instance</h3>
* <p>
* <p>Developers are encouraged to return {@code ListenableFuture} from their
* methods so that users can take advantages of the utilities built atop the
* class. The way that they will create {@code ListenableFuture} instances
* depends on how they currently create {@code Future} instances:
* <ul>
* <li>If they are returned from an {@code ExecutorService}, convert that
* service to a {@link ListeningExecutorService}, usually by calling {@link
* MoreExecutors#listeningDecorator(ExecutorService)
* MoreExecutors.listeningDecorator}. (Custom executors may find it more
* convenient to use {@link ListenableFutureTask} directly.)
* <li>If they are manually filled in by a call to {@link FutureTask#set} or a
* similar method, create a {@link SettableFuture} instead. (Users with more
* complex needs may prefer {@link AbstractFuture}.)
* </ul>
* <p>
* <p>Occasionally, an API will return a plain {@code Future} and it will be
* impossible to change the return type. For this case, we provide a more
* expensive workaround in {@code JdkFutureAdapters}. However, when possible, it
* is more efficient and reliable to create a {@code ListenableFuture} directly.
*
* @author Sven Mawson
* @author Nishant Thakkar
* @since 1.0
*/
public interface ListenableFuture<V> extends Future<V> {
/**
* Registers a listener to be {@linkplain Executor#execute(Runnable) run} on
* the given executor. The listener will run when the {@code Future}'s
* computation is {@linkplain Future#isDone() complete} or, if the computation
* is already complete, immediately.
* <p>
* <p>There is no guaranteed ordering of execution of listeners, but any
* listener added through this method is guaranteed to be called once the
* computation is complete.
* <p>
* <p>Exceptions thrown by a listener will be propagated up to the executor.
* Any exception thrown during {@code Executor.execute} (e.g., a {@code
* RejectedExecutionException} or an exception thrown by {@linkplain
* MoreExecutors#sameThreadExecutor inline execution}) will be caught and
* logged.
* <p>
* <p>Note: For fast, lightweight listeners that would be safe to execute in
* any thread, consider {@link MoreExecutors#sameThreadExecutor}. For heavier
* listeners, {@code sameThreadExecutor()} carries some caveats. For
* example, the listener may run on an unpredictable or undesirable thread:
* <p>
* <ul>
* <li>If this {@code Future} is done at the time {@code addListener} is
* called, {@code addListener} will execute the listener inline.
* <li>If this {@code Future} is not yet done, {@code addListener} will
* schedule the listener to be run by the thread that completes this {@code
* Future}, which may be an internal system thread such as an RPC network
* thread.
* </ul>
* <p>
* <p>Also note that, regardless of which thread executes the
* {@code sameThreadExecutor()} listener, all other registered but unexecuted
* listeners are prevented from running during its execution, even if those
* listeners are to run in other executors.
* <p>
* <p>This is the most general listener interface. For common operations
* performed using listeners, see {@link
* com.google.common.util.concurrent.Futures}. For a simplified but general
* listener interface, see {@link
* com.google.common.util.concurrent.Futures#addCallback addCallback()}.
*
* @param listener the listener to run when the computation is complete
* @param executor the executor to run the listener in
* @throws NullPointerException if the executor or listener was null
* @throws RejectedExecutionException if we tried to execute the listener
* immediately but the executor rejected it.
*/
void addListener(Runnable listener, Executor executor);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.alibaba.dubbo.common.concurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

/**
* A {@link FutureTask} that also implements the {@link ListenableFuture}
* interface. Unlike {@code FutureTask}, {@code ListenableFutureTask} does not
* provide an overrideable {@link FutureTask#done() done()} method. For similar
* functionality, call {@link #addListener}.
* <p>
* <p>
*
* @author Sven Mawson
* @since 1.0
*/
public class ListenableFutureTask<V> extends FutureTask<V>
implements ListenableFuture<V> {
// TODO(cpovirk): explore ways of making ListenableFutureTask final. There are
// some valid reasons such as BoundedQueueExecutorService to allow extends but it
// would be nice to make it final to avoid unintended usage.

// The execution list to hold our listeners.
private final ExecutionList executionList = new ExecutionList();

/**
* Creates a {@code ListenableFutureTask} that will upon running, execute the
* given {@code Callable}.
*
* @param callable the callable task
* @since 10.0
*/
public static <V> ListenableFutureTask<V> create(Callable<V> callable) {
return new ListenableFutureTask<V>(callable);
}

/**
* Creates a {@code ListenableFutureTask} that will upon running, execute the
* given {@code Runnable}, and arrange that {@code get} will return the
* given result on successful completion.
*
* @param runnable the runnable task
* @param result the result to return on successful completion. If you don't
* need a particular result, consider using constructions of the form:
* {@code ListenableFuture<?> f = ListenableFutureTask.create(runnable,
* null)}
* @since 10.0
*/
public static <V> ListenableFutureTask<V> create(
Runnable runnable, V result) {
return new ListenableFutureTask<V>(runnable, result);
}

ListenableFutureTask(Callable<V> callable) {
super(callable);
}

ListenableFutureTask(Runnable runnable, V result) {
super(runnable, result);
}

@Override
public void addListener(Runnable listener, Executor exec) {
executionList.add(listener, exec);
}

/**
* Internal implementation detail used to invoke the listeners.
*/
@Override
protected void done() {
executionList.execute();
}
}
Loading

0 comments on commit 9581e0f

Please sign in to comment.