diff --git a/CHANGES.md b/CHANGES.md index 11dcc86166..5627ea6ad0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -18,6 +18,7 @@ Release Notes. * Archive the expired plugins `impala-jdbc-2.6.x-plugin`. * Fix a bug in Spring Cloud Gateway if HttpClientFinalizer#send does not invoke, the span created at NettyRoutingFilterInterceptor can not stop. * Fix not tracing in HttpClient v5 when HttpHost(arg[0]) is null but `RoutingSupport#determineHost` works. +* Support across thread tracing for SOFA-RPC. #### Documentation * Update docs to describe `expired-plugins`. diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java new file mode 100644 index 0000000000..cf806cb592 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.sofarpc; + +import com.alipay.remoting.InvokeCallback; +import java.util.concurrent.Executor; +import lombok.AccessLevel; +import lombok.Getter; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; + +public class InvokeCallbackWrapper implements InvokeCallback { + + @Getter(AccessLevel.PACKAGE) + private ContextSnapshot contextSnapshot; + @Getter(AccessLevel.PACKAGE) + private final InvokeCallback invokeCallback; + + public InvokeCallbackWrapper(InvokeCallback invokeCallback) { + if (ContextManager.isActive()) { + this.contextSnapshot = ContextManager.capture(); + } + this.invokeCallback = invokeCallback; + } + + @Override + public void onResponse(final Object o) { + ContextManager.createLocalSpan("Thread/" + invokeCallback.getClass().getName() + "/onResponse"); + if (contextSnapshot != null) { + ContextManager.continued(contextSnapshot); + } + try { + invokeCallback.onResponse(o); + } catch (Throwable t) { + ContextManager.activeSpan().log(t); + throw t; + } finally { + contextSnapshot = null; + ContextManager.stopSpan(); + } + + } + + @Override + public void onException(final Throwable throwable) { + ContextManager.createLocalSpan("Thread/" + invokeCallback.getClass().getName() + "/onException"); + if (contextSnapshot != null) { + ContextManager.continued(contextSnapshot); + } + if (throwable != null) { + AbstractSpan abstractSpan = ContextManager.activeSpan(); + if (abstractSpan != null) { + abstractSpan.log(throwable); + } + } + try { + invokeCallback.onException(throwable); + } catch (Throwable t) { + ContextManager.activeSpan().log(t); + throw t; + } finally { + contextSnapshot = null; + ContextManager.stopSpan(); + } + } + + @Override + public Executor getExecutor() { + return invokeCallback.getExecutor(); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInstrumentation.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInstrumentation.java new file mode 100644 index 0000000000..6547042bc6 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInstrumentation.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.sofarpc; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; +import org.apache.skywalking.apm.agent.core.plugin.match.NameMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +public class SofaBoltCallbackInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + + private static final String ENHANCE_CLASS = "com.alipay.remoting.BaseRemoting"; + private static final String INVOKE_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.sofarpc.SofaBoltCallbackInvokeInterceptor"; + private static final String INVOKE_METHOD = "invokeWithCallback"; + + @Override + protected ClassMatch enhanceClass() { + return NameMatch.byName(ENHANCE_CLASS); + } + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return null; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named(INVOKE_METHOD).and( + takesArguments(4)); + } + + @Override + public String getMethodsInterceptor() { + return INVOKE_METHOD_INTERCEPTOR; + } + + @Override + public boolean isOverrideArgs() { + return true; + } + } + }; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptor.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptor.java new file mode 100644 index 0000000000..c890b4a90d --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptor.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.sofarpc; + +import com.alipay.remoting.InvokeCallback; +import java.lang.reflect.Method; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +public class SofaBoltCallbackInvokeInterceptor implements InstanceMethodsAroundInterceptor { + @Override + public void beforeMethod(EnhancedInstance objInst, + Method method, + Object[] allArguments, + Class[] argumentsTypes, + MethodInterceptResult result) { + if (allArguments[2] instanceof InvokeCallback) { + allArguments[2] = new InvokeCallbackWrapper((InvokeCallback) allArguments[2]); + } + } + + @Override + public Object afterMethod(EnhancedInstance objInst, + Method method, + Object[] allArguments, + Class[] argumentsTypes, + Object ret) { + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, + Method method, + Object[] allArguments, + Class[] argumentsTypes, + Throwable t) { + } +} diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/resources/skywalking-plugin.def index 9850487d56..72682ac2bc 100644 --- a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/resources/skywalking-plugin.def +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/resources/skywalking-plugin.def @@ -16,3 +16,4 @@ sofarpc=org.apache.skywalking.apm.plugin.sofarpc.SofaRpcConsumerInstrumentation sofarpc=org.apache.skywalking.apm.plugin.sofarpc.SofaRpcProviderInstrumentation +sofarpc=org.apache.skywalking.apm.plugin.sofarpc.SofaBoltCallbackInstrumentation diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapperTest.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapperTest.java new file mode 100644 index 0000000000..ef9c9f3cb0 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapperTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.sofarpc; + +import com.alipay.remoting.InvokeCallback; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; +import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; +import org.apache.skywalking.apm.agent.test.helper.SegmentHelper; +import org.apache.skywalking.apm.agent.test.helper.SpanHelper; +import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule; +import org.apache.skywalking.apm.agent.test.tools.SegmentStorage; +import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint; +import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +@RunWith(TracingSegmentRunner.class) +public class InvokeCallbackWrapperTest { + + @SegmentStoragePoint + private SegmentStorage segmentStorage; + + private Executor executor = Executors.newFixedThreadPool(1); + + @Rule + public AgentServiceRule agentServiceRule = new AgentServiceRule(); + @Rule + public MockitoRule rule = MockitoJUnit.rule(); + + private InvokeCallback callback; + + @Before + public void before() { + callback = new InvokeCallback() { + @Override + public void onResponse(final Object o) { + } + + @Override + public void onException(final Throwable throwable) { + } + + @Override + public Executor getExecutor() { + return null; + } + }; + } + + static class WrapperWrapper implements InvokeCallback { + + private InvokeCallback callback; + + private CountDownLatch countDownLatch; + + public CountDownLatch getCountDownLatch() { + return countDownLatch; + } + + public WrapperWrapper(InvokeCallback callback) { + this.countDownLatch = new CountDownLatch(1); + this.callback = callback; + } + + @Override + public void onResponse(final Object o) { + callback.onResponse(o); + countDownLatch.countDown(); + } + + @Override + public void onException(final Throwable throwable) { + callback.onException(throwable); + countDownLatch.countDown(); + } + + @Override + public Executor getExecutor() { + return null; + } + } + + @Test + public void testConstruct() { + InvokeCallbackWrapper wrapper = new InvokeCallbackWrapper(callback); + Assert.assertSame(callback, wrapper.getInvokeCallback()); + Assert.assertNull(wrapper.getContextSnapshot()); + + ContextManager.createEntrySpan("sofarpc", null); + wrapper = new InvokeCallbackWrapper(callback); + Assert.assertSame(callback, wrapper.getInvokeCallback()); + Assert.assertEquals(ContextManager.getGlobalTraceId(), wrapper.getContextSnapshot().getTraceId().getId()); + Assert.assertEquals("sofarpc", wrapper.getContextSnapshot().getParentEndpoint()); + ContextManager.stopSpan(); + } + + @Test + public void testOnResponse() throws InterruptedException { + ContextManager.createEntrySpan("sofarpc", null); + InvokeCallbackWrapper wrapper = new InvokeCallbackWrapper(callback); + final WrapperWrapper wrapperWrapper = new WrapperWrapper(wrapper); + executor.execute(() -> wrapperWrapper.onResponse(null)); + ContextManager.stopSpan(); + wrapperWrapper.getCountDownLatch().await(); + + assertThat(segmentStorage.getTraceSegments().size(), is(2)); + TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); + List spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + + TraceSegment traceSegment2 = segmentStorage.getTraceSegments().get(1); + List spans2 = SegmentHelper.getSpans(traceSegment2); + assertThat(spans2.size(), is(1)); + assertEquals("sofarpc", traceSegment2.getRef().getParentEndpoint()); + } + + @Test + public void testOnException() throws InterruptedException { + ContextManager.createEntrySpan("sofarpc", null); + InvokeCallbackWrapper wrapper = new InvokeCallbackWrapper(callback); + final WrapperWrapper wrapperWrapper = new WrapperWrapper(wrapper); + final Throwable throwable = new Throwable(); + executor.execute(() -> wrapperWrapper.onException(throwable)); + ContextManager.stopSpan(); + wrapperWrapper.getCountDownLatch().await(); + + assertThat(segmentStorage.getTraceSegments().size(), is(2)); + TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); + List spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + + TraceSegment traceSegment2 = segmentStorage.getTraceSegments().get(1); + List spans2 = SegmentHelper.getSpans(traceSegment2); + assertThat(spans2.size(), is(1)); + assertThat(SpanHelper.getLogs(spans2.get(0)).size(), is(1)); + + } + +} \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptorTest.java new file mode 100644 index 0000000000..bc082d7d2b --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptorTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.sofarpc; + +import com.alipay.remoting.InvokeCallback; +import java.util.concurrent.Executor; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class SofaBoltCallbackInvokeInterceptorTest { + InvokeCallback callback; + Object obj; + Object[] matchArgs; + Object[] mismatchArgs; + + @Before + public void before() { + callback = new InvokeCallback() { + @Override + public void onResponse(final Object o) { + + } + + @Override + public void onException(final Throwable throwable) { + + } + + @Override + public Executor getExecutor() { + return null; + } + }; + + obj = new Object(); + + matchArgs = new Object[] { + null, + null, + callback, + null + }; + mismatchArgs = new Object[] { + null, + null, + obj, + null + }; + } + + @Test + public void testOverrideArguments() { + final SofaBoltCallbackInvokeInterceptor interceptor = new SofaBoltCallbackInvokeInterceptor(); + interceptor.beforeMethod(null, null, matchArgs, null, null); + Assert.assertTrue(matchArgs[2] instanceof InvokeCallbackWrapper); + Assert.assertSame(callback, ((InvokeCallbackWrapper) matchArgs[2]).getInvokeCallback()); + + interceptor.beforeMethod(null, null, mismatchArgs, null, null); + Assert.assertSame(obj, mismatchArgs[2]); + } + +} \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcConsumerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcConsumerInterceptorTest.java index 3dc0903ec0..c0c3420966 100644 --- a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcConsumerInterceptorTest.java +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcConsumerInterceptorTest.java @@ -18,11 +18,11 @@ package org.apache.skywalking.apm.plugin.sofarpc; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.when; +import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.context.RpcInternalContext; +import com.alipay.sofa.rpc.core.request.SofaRequest; +import com.alipay.sofa.rpc.core.response.SofaResponse; +import com.alipay.sofa.rpc.filter.ConsumerInvoker; import java.util.List; import org.apache.skywalking.apm.agent.core.conf.Config; import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; @@ -50,11 +50,12 @@ import org.mockito.Mockito; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; -import com.alipay.sofa.rpc.client.ProviderInfo; -import com.alipay.sofa.rpc.context.RpcInternalContext; -import com.alipay.sofa.rpc.core.request.SofaRequest; -import com.alipay.sofa.rpc.core.response.SofaResponse; -import com.alipay.sofa.rpc.filter.ConsumerInvoker; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; @RunWith(TracingSegmentRunner.class) public class SofaRpcConsumerInterceptorTest { @@ -121,7 +122,8 @@ public void setUp() throws Exception { @Test public void testConsumerWithAttachment() throws Throwable { - sofaRpcConsumerInterceptor.beforeMethod(enhancedInstance, null, allArguments, argumentTypes, methodInterceptResult); + sofaRpcConsumerInterceptor.beforeMethod( + enhancedInstance, null, allArguments, argumentTypes, methodInterceptResult); sofaRpcConsumerInterceptor.afterMethod(enhancedInstance, null, allArguments, argumentTypes, sofaResponse); assertThat(segmentStorage.getTraceSegments().size(), is(1)); @@ -133,8 +135,10 @@ public void testConsumerWithAttachment() throws Throwable { @Test public void testConsumerWithException() throws Throwable { - sofaRpcConsumerInterceptor.beforeMethod(enhancedInstance, null, allArguments, argumentTypes, methodInterceptResult); - sofaRpcConsumerInterceptor.handleMethodException(enhancedInstance, null, allArguments, argumentTypes, new RuntimeException()); + sofaRpcConsumerInterceptor.beforeMethod( + enhancedInstance, null, allArguments, argumentTypes, methodInterceptResult); + sofaRpcConsumerInterceptor.handleMethodException( + enhancedInstance, null, allArguments, argumentTypes, new RuntimeException()); sofaRpcConsumerInterceptor.afterMethod(enhancedInstance, null, allArguments, argumentTypes, sofaResponse); assertThat(segmentStorage.getTraceSegments().size(), is(1)); TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); @@ -146,7 +150,8 @@ public void testConsumerWithResultHasException() throws Throwable { when(sofaResponse.isError()).thenReturn(true); when(sofaResponse.getAppResponse()).thenReturn(new RuntimeException()); - sofaRpcConsumerInterceptor.beforeMethod(enhancedInstance, null, allArguments, argumentTypes, methodInterceptResult); + sofaRpcConsumerInterceptor.beforeMethod( + enhancedInstance, null, allArguments, argumentTypes, methodInterceptResult); sofaRpcConsumerInterceptor.afterMethod(enhancedInstance, null, allArguments, argumentTypes, sofaResponse); assertThat(segmentStorage.getTraceSegments().size(), is(1)); @@ -180,8 +185,11 @@ private void assertCommonsAttribute(AbstractTracingSpan span) { assertThat(tags.size(), is(1)); assertThat(SpanHelper.getLayer(span), CoreMatchers.is(SpanLayer.RPC_FRAMEWORK)); assertThat(SpanHelper.getComponentId(span), is(43)); - assertThat(tags.get(0) - .getValue(), is("bolt://127.0.0.1:12200/org.apache.skywalking.apm.test.TestSofaRpcService.test(String)")); + assertThat( + tags.get(0) + .getValue(), + is("bolt://127.0.0.1:12200/org.apache.skywalking.apm.test.TestSofaRpcService.test(String)") + ); assertThat(span.getOperationName(), is("org.apache.skywalking.apm.test.TestSofaRpcService.test(String)")); } } diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcProviderInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcProviderInterceptorTest.java index 43c2938e5e..80edc8016a 100644 --- a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcProviderInterceptorTest.java +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcProviderInterceptorTest.java @@ -18,10 +18,11 @@ package org.apache.skywalking.apm.plugin.sofarpc; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.when; +import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.context.RpcInternalContext; +import com.alipay.sofa.rpc.core.request.SofaRequest; +import com.alipay.sofa.rpc.core.response.SofaResponse; +import com.alipay.sofa.rpc.filter.ProviderInvoker; import java.util.List; import org.apache.skywalking.apm.agent.core.conf.Config; import org.apache.skywalking.apm.agent.core.context.SW8CarrierItem; @@ -51,11 +52,11 @@ import org.mockito.Mockito; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; -import com.alipay.sofa.rpc.client.ProviderInfo; -import com.alipay.sofa.rpc.context.RpcInternalContext; -import com.alipay.sofa.rpc.core.request.SofaRequest; -import com.alipay.sofa.rpc.core.response.SofaResponse; -import com.alipay.sofa.rpc.filter.ProviderInvoker; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; @RunWith(TracingSegmentRunner.class) public class SofaRpcProviderInterceptorTest { diff --git a/test/plugin/scenarios/sofarpc-scenario/config/expectedData.yaml b/test/plugin/scenarios/sofarpc-scenario/config/expectedData.yaml index f8d0f4f807..c6655d281b 100644 --- a/test/plugin/scenarios/sofarpc-scenario/config/expectedData.yaml +++ b/test/plugin/scenarios/sofarpc-scenario/config/expectedData.yaml @@ -34,6 +34,24 @@ segmentItems: refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null, parentServiceInstance: not null, parentService: sofarpc-scenario, traceId: not null} skipAnalysis: 'false' + - segmentId: not null + spans: + - operationName: Thread/com.alipay.sofa.rpc.message.bolt.BoltInvokerCallback/onResponse + parentSpanId: -1 + spanId: 0 + spanLayer: Unknown + startTime: nq 0 + endTime: nq 0 + componentId: 0 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + refs: + - {parentEndpoint: 'GET:/sofarpc-scenario/case/sofarpc', networkAddress: '', + refType: CrossThread, parentSpanId: 2, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: sofarpc-scenario, + traceId: not null} - segmentId: not null spans: - operationName: org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService.hello(java.lang.String) @@ -49,6 +67,19 @@ segmentItems: tags: - {key: url, value: 'bolt://127.0.0.1:12200/org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService.hello(java.lang.String)'} skipAnalysis: 'false' + - operationName: org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService.callback(java.lang.String) + parentSpanId: 0 + spanId: 2 + spanLayer: RPCFramework + startTime: nq 0 + endTime: nq 0 + componentId: 43 + isError: false + spanType: Exit + peer: 127.0.0.1:12200 + tags: + - { key: url, value: 'bolt://127.0.0.1:12200/org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService.callback(java.lang.String)' } + skipAnalysis: 'false' - operationName: GET:/sofarpc-scenario/case/sofarpc parentSpanId: -1 spanId: 0 @@ -64,3 +95,20 @@ segmentItems: - {key: http.method, value: GET} - {key: http.status_code, value: '200'} skipAnalysis: 'false' + - segmentId: not null + spans: + - operationName: org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService.callback(java.lang.String) + parentSpanId: -1 + spanId: 0 + spanLayer: RPCFramework + startTime: nq 0 + endTime: nq 0 + componentId: 43 + isError: false + spanType: Entry + peer: '' + refs: + - { parentEndpoint: GET:/sofarpc-scenario/case/sofarpc, networkAddress: '127.0.0.1:12200', + refType: CrossProcess, parentSpanId: 2, parentTraceSegmentId: not null, parentServiceInstance: not + null, parentService: sofarpc-scenario, traceId: not null } + skipAnalysis: 'false' diff --git a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/SofaRpcApplication.java b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/SofaRpcApplication.java index faa8bb8639..609a0e5266 100644 --- a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/SofaRpcApplication.java +++ b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/SofaRpcApplication.java @@ -18,6 +18,7 @@ package org.apache.skywalking.apm.testcase.sofarpc; +import com.alipay.sofa.rpc.common.RpcConstants; import com.alipay.sofa.rpc.config.ConsumerConfig; import com.alipay.sofa.rpc.config.ProviderConfig; import com.alipay.sofa.rpc.config.ServerConfig; @@ -42,8 +43,9 @@ public static class SofaRpcConfiguration { public ProviderConfig provider() { ServerConfig config = new ServerConfig().setProtocol("bolt").setPort(12200).setDaemon(true); - ProviderConfig providerConfig = new ProviderConfig().setInterfaceId(SofaRpcDemoService.class - .getName()).setRef(new SofaRpcDemoServiceImpl()).setServer(config); + ProviderConfig providerConfig = new ProviderConfig().setInterfaceId( + SofaRpcDemoService.class + .getName()).setRef(new SofaRpcDemoServiceImpl()).setServer(config); providerConfig.export(); return providerConfig; @@ -55,5 +57,13 @@ public ConsumerConfig consumer() { .setProtocol("bolt") .setDirectUrl("bolt://127.0.0.1:12200"); } + + @Bean + public ConsumerConfig callbackConsumer() { + return new ConsumerConfig().setInterfaceId(SofaRpcDemoService.class.getName()) + .setProtocol("bolt") + .setInvokeType(RpcConstants.INVOKER_TYPE_CALLBACK) + .setDirectUrl("bolt://127.0.0.1:12200"); + } } } diff --git a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/callback/TestCallback.java b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/callback/TestCallback.java new file mode 100644 index 0000000000..81da5ba588 --- /dev/null +++ b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/callback/TestCallback.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.testcase.sofarpc.callback; + +import com.alipay.sofa.rpc.core.exception.SofaRpcException; +import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback; +import com.alipay.sofa.rpc.core.request.RequestBase; +import org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService; + +public class TestCallback implements SofaResponseCallback { + + private SofaRpcDemoService service; + + public TestCallback(final SofaRpcDemoService service) { + this.service = service; + } + + @Override + public void onAppResponse(final Object o, final String s, final RequestBase requestBase) { + } + + @Override + public void onAppException(final Throwable throwable, final String s, final RequestBase requestBase) { + + } + + @Override + public void onSofaException(final SofaRpcException e, final String s, final RequestBase requestBase) { + + } +} diff --git a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/controller/CaseController.java b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/controller/CaseController.java index ca9320001e..fff7b5ac34 100644 --- a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/controller/CaseController.java +++ b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/controller/CaseController.java @@ -19,8 +19,11 @@ package org.apache.skywalking.apm.testcase.sofarpc.controller; import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.context.RpcInvokeContext; +import org.apache.skywalking.apm.testcase.sofarpc.callback.TestCallback; import org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; @@ -32,8 +35,13 @@ public class CaseController { private static final String SUCCESS = "Success"; @Autowired + @Qualifier("consumer") private ConsumerConfig consumerConfig; + @Autowired + @Qualifier("callbackConsumer") + private ConsumerConfig callbackConsumer; + @RequestMapping("/healthCheck") @ResponseBody public String healthCheck() { @@ -45,6 +53,11 @@ public String healthCheck() { public String sofarpc() { SofaRpcDemoService service = consumerConfig.refer(); service.hello("sofarpc"); + + SofaRpcDemoService callbackService = callbackConsumer.refer(); + RpcInvokeContext invokeCtx = RpcInvokeContext.peekContext(); + invokeCtx.setResponseCallback(new TestCallback(service)); + callbackService.callback("sofarpc"); return SUCCESS; } } diff --git a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/interfaces/SofaRpcDemoService.java b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/interfaces/SofaRpcDemoService.java index cd541109b7..f2d0f93e9e 100644 --- a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/interfaces/SofaRpcDemoService.java +++ b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/interfaces/SofaRpcDemoService.java @@ -21,4 +21,6 @@ public interface SofaRpcDemoService { String hello(String name); + + String callback(String name); } diff --git a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/service/SofaRpcDemoServiceImpl.java b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/service/SofaRpcDemoServiceImpl.java index 9fed6ddc3d..b946932117 100644 --- a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/service/SofaRpcDemoServiceImpl.java +++ b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/service/SofaRpcDemoServiceImpl.java @@ -25,4 +25,9 @@ public class SofaRpcDemoServiceImpl implements SofaRpcDemoService { public String hello(String name) { return "hello, " + name; } + + @Override + public String callback(String name) { + return "hello, " + name; + } }