Skip to content

Commit

Permalink
Refresh invocation's attachments in each invoke.
Browse files Browse the repository at this point in the history
Fix bug apache#2981
  • Loading branch information
carryxyh committed Dec 28, 2018
1 parent 5ea4fa5 commit d0514a6
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,18 @@ public void destroy() {
* @param invokers invoker candidates
* @param selected exclude selected invokers or not
* @return the invoker which will final to do invoke.
* @throws RpcException
* @throws RpcException exception
*/
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

if (CollectionUtils.isEmpty(invokers)) {
return null;
}
String methodName = invocation == null ? StringUtils.EMPTY : invocation.getMethodName();

boolean sticky = invokers.get(0).getUrl()
.getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
.getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);

//ignore overloaded method
if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
Expand All @@ -142,7 +142,7 @@ protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
}

private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

if (CollectionUtils.isEmpty(invokers)) {
return null;
Expand Down Expand Up @@ -180,19 +180,20 @@ private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
* Reselect, use invokers not in `selected` first, if all invokers are in `selected`,
* just pick an available one using loadbalance policy.
*
* @param loadbalance
* @param invocation
* @param invokers
* @param selected
* @return
* @throws RpcException
* @param loadbalance load balance policy
* @param invocation invocation
* @param invokers invoker candidates
* @param selected exclude selected invokers or not
* @param availablecheck check invoker available if true
* @return the reselect result to do invoke
* @throws RpcException exception
*/
private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException {
List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException {

//Allocating one in advance, this list is certain to be used.
List<Invoker<T>> reselectInvokers = new ArrayList<>(
invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());
invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());

// First, try picking a invoker not in `selected`.
for (Invoker<T> invoker : invokers) {
Expand Down Expand Up @@ -233,6 +234,12 @@ public Result invoke(final Invocation invocation) throws RpcException {
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
// We need to remove the interface because the interface here may not come from last invoke.
//
// See https://github.com/apache/incubator-dubbo/issues/2981
if (invocation.getAttachments() != null) {
invocation.getAttachments().remove(Constants.INTERFACE_KEY);
}
}

List<Invoker<T>> invokers = list(invocation);
Expand All @@ -242,7 +249,6 @@ public Result invoke(final Invocation invocation) throws RpcException {
}

protected void checkWhetherDestroyed() {

if (destroyed.get()) {
throw new RpcException("Rpc cluster invoker for " + getInterface() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,7 @@ public Result invoke(Invocation inv) throws RpcException {
}
RpcInvocation invocation = (RpcInvocation) inv;
invocation.setInvoker(this);
if (attachment != null && attachment.size() > 0) {
invocation.addAttachmentsIfAbsent(attachment);
}

Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
/**
Expand All @@ -151,6 +149,14 @@ public Result invoke(Invocation inv) throws RpcException {
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

// If we have attachment, refresh the attachment to ensure that the last invoke's attach does not affect this call.
// In most cases, attachments contains interface, timeout, group, and token.
// These info should be refreshed in each invoke.
//
// See https://github.com/apache/incubator-dubbo/issues/2981
if (attachment != null && attachment.size() > 0) {
invocation.addAttachments(attachment);
}

try {
return doInvoke(invocation);
Expand Down

0 comments on commit d0514a6

Please sign in to comment.