Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed #672: Monitor blocks when registry is unreachable #777

Merged
merged 7 commits into from
Oct 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package com.alibaba.dubbo.common.concurrent;

import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;

import java.util.concurrent.Executor;

/**
* <p>A list of listeners, each with an associated {@code Executor}, that
* guarantees that every {@code Runnable} that is {@linkplain #add added} will
* be executed after {@link #execute()} is called. Any {@code Runnable} added
* after the call to {@code execute} is still guaranteed to execute. There is no
* guarantee, however, that listeners will be executed in the order that they
* are added.
* <p>
* <p>Exceptions thrown by a listener will be propagated up to the executor.
* Any exception thrown during {@code Executor.execute} (e.g., a {@code
* RejectedExecutionException} or an exception thrown by {@linkplain
* MoreExecutors#sameThreadExecutor inline execution}) will be caught and
* logged.
*
* @author Nishant Thakkar
* @author Sven Mawson
* @since 1.0
*/
public final class ExecutionList {
// Logger to log exceptions caught when running runnables.
static final Logger logger = LoggerFactory.getLogger(ExecutionList.class.getName());

/**
* The runnable, executor pairs to execute. This acts as a stack threaded through the
* {@link RunnableExecutorPair#next} field.
*/
private RunnableExecutorPair runnables;

private boolean executed;

/**
* Creates a new, empty {@link ExecutionList}.
*/
public ExecutionList() {
}

/**
* Adds the {@code Runnable} and accompanying {@code Executor} to the list of
* listeners to execute. If execution has already begun, the listener is
* executed immediately.
* <p>
* <p>Note: For fast, lightweight listeners that would be safe to execute in
* any thread, consider {@link MoreExecutors#sameThreadExecutor}. For heavier
* listeners, {@code sameThreadExecutor()} carries some caveats: First, the
* thread that the listener runs in depends on whether the {@code
* ExecutionList} has been executed at the time it is added. In particular,
* listeners may run in the thread that calls {@code add}. Second, the thread
* that calls {@link #execute} may be an internal implementation thread, such
* as an RPC network thread, and {@code sameThreadExecutor()} listeners may
* run in this thread. Finally, during the execution of a {@code
* sameThreadExecutor} listener, all other registered but unexecuted
* listeners are prevented from running, even if those listeners are to run
* in other executors.
*/
public void add(Runnable runnable, Executor executor) {
// Fail fast on a null. We throw NPE here because the contract of
// Executor states that it throws NPE on null listener, so we propagate
// that contract up into the add method as well.
if (runnable == null || executor == null) {
throw new NullPointerException("Both Runnable and Executor can not be null!");
}

// Lock while we check state. We must maintain the lock while adding the
// new pair so that another thread can't run the list out from under us.
// We only add to the list if we have not yet started execution.
synchronized (this) {
if (!executed) {
runnables = new RunnableExecutorPair(runnable, executor, runnables);
return;
}
}
// Execute the runnable immediately. Because of scheduling this may end up
// getting called before some of the previously added runnables, but we're
// OK with that. If we want to change the contract to guarantee ordering
// among runnables we'd have to modify the logic here to allow it.
executeListener(runnable, executor);
}

/**
* Runs this execution list, executing all existing pairs in the order they
* were added. However, note that listeners added after this point may be
* executed before those previously added, and note that the execution order
* of all listeners is ultimately chosen by the implementations of the
* supplied executors.
* <p>
* <p>This method is idempotent. Calling it several times in parallel is
* semantically equivalent to calling it exactly once.
*
* @since 10.0 (present in 1.0 as {@code run})
*/
public void execute() {
// Lock while we update our state so the add method above will finish adding
// any listeners before we start to run them.
RunnableExecutorPair list;
synchronized (this) {
if (executed) {
return;
}
executed = true;
list = runnables;
runnables = null; // allow GC to free listeners even if this stays around for a while.
}
// If we succeeded then list holds all the runnables we to execute. The pairs in the stack are
// in the opposite order from how they were added so we need to reverse the list to fulfill our
// contract.
// This is somewhat annoying, but turns out to be very fast in practice. Alternatively, we
// could drop the contract on the method that enforces this queue like behavior since depending
// on it is likely to be a bug anyway.

// N.B. All writes to the list and the next pointers must have happened before the above
// synchronized block, so we can iterate the list without the lock held here.
RunnableExecutorPair reversedList = null;
while (list != null) {
RunnableExecutorPair tmp = list;
list = list.next;
tmp.next = reversedList;
reversedList = tmp;
}
while (reversedList != null) {
executeListener(reversedList.runnable, reversedList.executor);
reversedList = reversedList.next;
}
}

/**
* Submits the given runnable to the given {@link Executor} catching and logging all
* {@linkplain RuntimeException runtime exceptions} thrown by the executor.
*/
private static void executeListener(Runnable runnable, Executor executor) {
try {
executor.execute(runnable);
} catch (RuntimeException e) {
// Log it and keep going, bad runnable and/or executor. Don't
// punish the other runnables if we're given a bad one. We only
// catch RuntimeException because we want Errors to propagate up.
logger.error("RuntimeException while executing runnable "
+ runnable + " with executor " + executor, e);
}
}

private static final class RunnableExecutorPair {
final Runnable runnable;
final Executor executor;
RunnableExecutorPair next;

RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) {
this.runnable = runnable;
this.executor = executor;
this.next = next;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package com.alibaba.dubbo.common.concurrent;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;

/**
* A {@link Future} that accepts completion listeners. Each listener has an
* associated executor, and it is invoked using this executor once the future's
* computation is {@linkplain Future#isDone() complete}. If the computation has
* already completed when the listener is added, the listener will execute
* immediately.
* <p>
* <p>See the Guava User Guide article on <a href=
* "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained">
* {@code ListenableFuture}</a>.
* <p>
* <h3>Purpose</h3>
* <p>
* <p>Most commonly, {@code ListenableFuture} is used as an input to another
* derived {@code Future}, as in {@link Futures#allAsList(Iterable)
* Futures.allAsList}. Many such methods are impossible to implement efficiently
* without listener support.
* <p>
* <p>It is possible to call {@link #addListener addListener} directly, but this
* is uncommon because the {@code Runnable} interface does not provide direct
* access to the {@code Future} result. (Users who want such access may prefer
* {@link Futures#addCallback Futures.addCallback}.) Still, direct {@code
* addListener} calls are occasionally useful:<pre> {@code
* final String name = ...;
* inFlight.add(name);
* ListenableFuture<Result> future = service.query(name);
* future.addListener(new Runnable() {
* public void run() {
* processedCount.incrementAndGet();
* inFlight.remove(name);
* lastProcessed.set(name);
* logger.info("Done with {0}", name);
* }
* }, executor);}</pre>
* <p>
* <h3>How to get an instance</h3>
* <p>
* <p>Developers are encouraged to return {@code ListenableFuture} from their
* methods so that users can take advantages of the utilities built atop the
* class. The way that they will create {@code ListenableFuture} instances
* depends on how they currently create {@code Future} instances:
* <ul>
* <li>If they are returned from an {@code ExecutorService}, convert that
* service to a {@link ListeningExecutorService}, usually by calling {@link
* MoreExecutors#listeningDecorator(ExecutorService)
* MoreExecutors.listeningDecorator}. (Custom executors may find it more
* convenient to use {@link ListenableFutureTask} directly.)
* <li>If they are manually filled in by a call to {@link FutureTask#set} or a
* similar method, create a {@link SettableFuture} instead. (Users with more
* complex needs may prefer {@link AbstractFuture}.)
* </ul>
* <p>
* <p>Occasionally, an API will return a plain {@code Future} and it will be
* impossible to change the return type. For this case, we provide a more
* expensive workaround in {@code JdkFutureAdapters}. However, when possible, it
* is more efficient and reliable to create a {@code ListenableFuture} directly.
*
* @author Sven Mawson
* @author Nishant Thakkar
* @since 1.0
*/
public interface ListenableFuture<V> extends Future<V> {
/**
* Registers a listener to be {@linkplain Executor#execute(Runnable) run} on
* the given executor. The listener will run when the {@code Future}'s
* computation is {@linkplain Future#isDone() complete} or, if the computation
* is already complete, immediately.
* <p>
* <p>There is no guaranteed ordering of execution of listeners, but any
* listener added through this method is guaranteed to be called once the
* computation is complete.
* <p>
* <p>Exceptions thrown by a listener will be propagated up to the executor.
* Any exception thrown during {@code Executor.execute} (e.g., a {@code
* RejectedExecutionException} or an exception thrown by {@linkplain
* MoreExecutors#sameThreadExecutor inline execution}) will be caught and
* logged.
* <p>
* <p>Note: For fast, lightweight listeners that would be safe to execute in
* any thread, consider {@link MoreExecutors#sameThreadExecutor}. For heavier
* listeners, {@code sameThreadExecutor()} carries some caveats. For
* example, the listener may run on an unpredictable or undesirable thread:
* <p>
* <ul>
* <li>If this {@code Future} is done at the time {@code addListener} is
* called, {@code addListener} will execute the listener inline.
* <li>If this {@code Future} is not yet done, {@code addListener} will
* schedule the listener to be run by the thread that completes this {@code
* Future}, which may be an internal system thread such as an RPC network
* thread.
* </ul>
* <p>
* <p>Also note that, regardless of which thread executes the
* {@code sameThreadExecutor()} listener, all other registered but unexecuted
* listeners are prevented from running during its execution, even if those
* listeners are to run in other executors.
* <p>
* <p>This is the most general listener interface. For common operations
* performed using listeners, see {@link
* com.google.common.util.concurrent.Futures}. For a simplified but general
* listener interface, see {@link
* com.google.common.util.concurrent.Futures#addCallback addCallback()}.
*
* @param listener the listener to run when the computation is complete
* @param executor the executor to run the listener in
* @throws NullPointerException if the executor or listener was null
* @throws RejectedExecutionException if we tried to execute the listener
* immediately but the executor rejected it.
*/
void addListener(Runnable listener, Executor executor);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.alibaba.dubbo.common.concurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

/**
* A {@link FutureTask} that also implements the {@link ListenableFuture}
* interface. Unlike {@code FutureTask}, {@code ListenableFutureTask} does not
* provide an overrideable {@link FutureTask#done() done()} method. For similar
* functionality, call {@link #addListener}.
* <p>
* <p>
*
* @author Sven Mawson
* @since 1.0
*/
public class ListenableFutureTask<V> extends FutureTask<V>
implements ListenableFuture<V> {
// TODO(cpovirk): explore ways of making ListenableFutureTask final. There are
// some valid reasons such as BoundedQueueExecutorService to allow extends but it
// would be nice to make it final to avoid unintended usage.

// The execution list to hold our listeners.
private final ExecutionList executionList = new ExecutionList();

/**
* Creates a {@code ListenableFutureTask} that will upon running, execute the
* given {@code Callable}.
*
* @param callable the callable task
* @since 10.0
*/
public static <V> ListenableFutureTask<V> create(Callable<V> callable) {
return new ListenableFutureTask<V>(callable);
}

/**
* Creates a {@code ListenableFutureTask} that will upon running, execute the
* given {@code Runnable}, and arrange that {@code get} will return the
* given result on successful completion.
*
* @param runnable the runnable task
* @param result the result to return on successful completion. If you don't
* need a particular result, consider using constructions of the form:
* {@code ListenableFuture<?> f = ListenableFutureTask.create(runnable,
* null)}
* @since 10.0
*/
public static <V> ListenableFutureTask<V> create(
Runnable runnable, V result) {
return new ListenableFutureTask<V>(runnable, result);
}

ListenableFutureTask(Callable<V> callable) {
super(callable);
}

ListenableFutureTask(Runnable runnable, V result) {
super(runnable, result);
}

@Override
public void addListener(Runnable listener, Executor exec) {
executionList.add(listener, exec);
}

/**
* Internal implementation detail used to invoke the listeners.
*/
@Override
protected void done() {
executionList.execute();
}
}
Loading