Skip to content

Commit

Permalink
Merge pull request #2125, binding attachments into invocation so that…
Browse files Browse the repository at this point in the history
… a rpc invoke have a same invocation.

Fixes #1978
  • Loading branch information
carryxyh authored and chickenlj committed Jul 31, 2018
1 parent e90d95c commit 971d80d
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Directory;
Expand All @@ -33,11 +35,11 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* AbstractClusterInvoker
*
*/
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {

Expand Down Expand Up @@ -92,10 +94,10 @@ public void destroy() {

/**
* Select a invoker using loadbalance policy.</br>
* a)Firstly, select an invoker using loadbalance. If this invoker is in previously selected list, or,
* a)Firstly, select an invoker using loadbalance. If this invoker is in previously selected list, or,
* if this invoker is unavailable, then continue step b (reselect), otherwise return the first selected invoker</br>
* b)Reslection, the validation rule for reselection: selected > available. This rule guarantees that
* the selected invoker has the minimum chance to be one in the previously selected list, and also
* the selected invoker has the minimum chance to be one in the previously selected list, and also
* guarantees this invoker is available.
*
* @param loadbalance load balance policy
Expand Down Expand Up @@ -225,6 +227,13 @@ private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
LoadBalance loadbalance = null;

// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}

List<Invoker<T>> invokers = list(invocation);
if (invokers != null && !invokers.isEmpty()) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
* Invoke a specific number of invokers concurrently, usually used for demanding real-time operations, but need to waste more service resources.
*
* <a href="http://en.wikipedia.org/wiki/Fork_(topology)">Fork</a>
*
*/
public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {

Expand All @@ -57,50 +56,55 @@ public ForkingClusterInvoker(Directory<T> directory) {
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
final List<Invoker<T>> selected;
final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList<Invoker<T>>();
for (int i = 0; i < forks; i++) {
// TODO. Add some comment here, refer chinese version for more details.
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
selected.add(invoker);
try {
checkInvokers(invokers, invocation);
final List<Invoker<T>> selected;
final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList<Invoker<T>>();
for (int i = 0; i < forks; i++) {
// TODO. Add some comment here, refer chinese version for more details.
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
selected.add(invoker);
}
}
}
}
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
for (final Invoker<T> invoker : selected) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch (Throwable e) {
int value = count.incrementAndGet();
if (value >= selected.size()) {
ref.offer(e);
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
for (final Invoker<T> invoker : selected) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch (Throwable e) {
int value = count.incrementAndGet();
if (value >= selected.size()) {
ref.offer(e);
}
}
}
});
}
try {
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
});
}
try {
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
return (Result) ret;
} catch (InterruptedException e) {
throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
}
return (Result) ret;
} catch (InterruptedException e) {
throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
} finally {
// clear attachments which is binding to current thread.
RpcContext.getContext().clearAttachments();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.cluster.Directory;
Expand Down Expand Up @@ -132,6 +133,32 @@ protected Result doInvoke(Invocation invocation, List invokers, LoadBalance load

}


@Test
public void testBindingAttachment() {
final String attachKey = "attach";
final String attachValue = "value";

// setup attachment
RpcContext.getContext().setAttachment(attachKey, attachValue);
Map<String, String> attachments = RpcContext.getContext().getAttachments();
Assert.assertTrue("set attachment failed!", attachments != null && attachments.size() == 1);

cluster = new AbstractClusterInvoker(dic) {
@Override
protected Result doInvoke(Invocation invocation, List invokers, LoadBalance loadbalance)
throws RpcException {
// attachment will be bind to invocation
String value = invocation.getAttachment(attachKey);
Assert.assertTrue("binding attachment failed!", value != null && value.equals(attachValue));
return null;
}
};

// invoke
cluster.invoke(invocation);
}

@Test
public void testSelect_Invokersize0() throws Exception {
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void testNoInvoke() {
* then we should reselect from the latest invokers before retry.
*/
@Test
public void testInvokerDestoryAndReList() {
public void testInvokerDestroyAndReList() {
final URL url = URL.valueOf("test://localhost/" + Demo.class.getName() + "?loadbalance=roundrobin&retries=" + retries);
RpcException exception = new RpcException(RpcException.TIMEOUT_EXCEPTION);
MockInvoker<Demo> invoker1 = new MockInvoker<Demo>(Demo.class, url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.cluster.Directory;

import org.junit.Assert;
Expand All @@ -30,14 +31,14 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertFalse;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

/**
* ForkingClusterInvokerTest
*
*/
@SuppressWarnings("unchecked")
public class ForkingClusterInvokerTest {
Expand Down Expand Up @@ -71,6 +72,7 @@ public void setUp() throws Exception {
invokers.add(invoker3);

}

private void resetInvokerToException() {
given(invoker1.invoke(invocation)).willThrow(new RuntimeException());
given(invoker1.getUrl()).willReturn(url);
Expand Down Expand Up @@ -106,7 +108,7 @@ private void resetInvokerToNoException() {
}

@Test
public void testInvokeExceptoin() {
public void testInvokeException() {
resetInvokerToException();
ForkingClusterInvoker<ForkingClusterInvokerTest> invoker = new ForkingClusterInvoker<ForkingClusterInvokerTest>(
dic);
Expand All @@ -120,8 +122,32 @@ public void testInvokeExceptoin() {
}
}

@Test
public void testClearRpcContext() {
resetInvokerToException();
ForkingClusterInvoker<ForkingClusterInvokerTest> invoker = new ForkingClusterInvoker<ForkingClusterInvokerTest>(
dic);

String attachKey = "attach";
String attachValue = "value";

RpcContext.getContext().setAttachment(attachKey, attachValue);

Map<String, String> attachments = RpcContext.getContext().getAttachments();
Assert.assertTrue("set attachment failed!", attachments != null && attachments.size() == 1);
try {
invoker.invoke(invocation);
Assert.fail();
} catch (RpcException expected) {
Assert.assertTrue("Successed to forking invoke provider !", expected.getMessage().contains("Failed to forking invoke provider"));
assertFalse(expected.getCause() instanceof RpcException);
}
Map<String, String> afterInvoke = RpcContext.getContext().getAttachments();
Assert.assertTrue("clear attachment failed!", afterInvoke != null && afterInvoke.size() == 0);
}

@Test()
public void testInvokeNoExceptoin() {
public void testInvokeNoException() {

resetInvokerToNoException();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public Result invoke(Invocation inv) throws RpcException {
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null) {
if (contextAttachments != null && contextAttachments.size() != 0) {
/**
* invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
* because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
Expand Down

0 comments on commit 971d80d

Please sign in to comment.