Skip to content

Commit a33e422

Browse files
authored
Trace websocket for spring webflux reactive handlers (#8831)
* Trace websocket for spring webflux reactive handlers * format
1 parent 2dab68d commit a33e422

File tree

10 files changed

+405
-52
lines changed

10 files changed

+405
-52
lines changed

dd-java-agent/instrumentation/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/NettyChannelPipelineInstrumentation.java

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
import datadog.trace.instrumentation.netty41.server.HttpServerResponseTracingHandler;
2424
import datadog.trace.instrumentation.netty41.server.HttpServerTracingHandler;
2525
import datadog.trace.instrumentation.netty41.server.MaybeBlockResponseHandler;
26-
import datadog.trace.instrumentation.netty41.server.websocket.WebSocketServerRequestTracingHandler;
27-
import datadog.trace.instrumentation.netty41.server.websocket.WebSocketServerResponseTracingHandler;
26+
import datadog.trace.instrumentation.netty41.server.websocket.WebSocketServerInboundTracingHandler;
27+
import datadog.trace.instrumentation.netty41.server.websocket.WebSocketServerOutboundTracingHandler;
2828
import datadog.trace.instrumentation.netty41.server.websocket.WebSocketServerTracingHandler;
2929
import io.netty.channel.ChannelHandler;
3030
import io.netty.channel.ChannelPipeline;
@@ -34,6 +34,8 @@
3434
import io.netty.handler.codec.http.HttpResponseDecoder;
3535
import io.netty.handler.codec.http.HttpResponseEncoder;
3636
import io.netty.handler.codec.http.HttpServerCodec;
37+
import io.netty.handler.codec.http.websocketx.WebSocketFrameDecoder;
38+
import io.netty.handler.codec.http.websocketx.WebSocketFrameEncoder;
3739
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
3840
import io.netty.util.Attribute;
3941
import net.bytebuddy.asm.Advice;
@@ -82,8 +84,8 @@ public String[] helperClassNames() {
8284
packageName + ".server.HttpServerTracingHandler",
8385
packageName + ".server.MaybeBlockResponseHandler",
8486
packageName + ".server.websocket.WebSocketServerTracingHandler",
85-
packageName + ".server.websocket.WebSocketServerResponseTracingHandler",
86-
packageName + ".server.websocket.WebSocketServerRequestTracingHandler",
87+
packageName + ".server.websocket.WebSocketServerOutboundTracingHandler",
88+
packageName + ".server.websocket.WebSocketServerInboundTracingHandler",
8789
packageName + ".NettyHttp2Helper",
8890
packageName + ".NettyPipelineHelper",
8991
};
@@ -162,23 +164,31 @@ public static void addHandler(
162164
HttpServerResponseTracingHandler.INSTANCE,
163165
MaybeBlockResponseHandler.INSTANCE);
164166
} else if (handler instanceof WebSocketServerProtocolHandler) {
165-
if (InstrumenterConfig.get().isWebsocketTracingEnabled()) {
166-
if (pipeline.get(HttpServerTracingHandler.class) != null) {
167-
NettyPipelineHelper.addHandlerAfter(
168-
pipeline, "HttpServerTracingHandler#0", new WebSocketServerTracingHandler());
167+
if (InstrumenterConfig.get().isWebsocketTracingEnabled()
168+
&& pipeline.get(HttpServerTracingHandler.class) != null) {
169+
// remove single websocket handler if added before
170+
if (pipeline.get(WebSocketServerInboundTracingHandler.class) != null) {
171+
pipeline.remove(WebSocketServerInboundTracingHandler.class);
169172
}
170-
if (pipeline.get(HttpServerRequestTracingHandler.class) != null) {
171-
NettyPipelineHelper.addHandlerAfter(
172-
pipeline,
173-
"HttpServerRequestTracingHandler#0",
174-
WebSocketServerRequestTracingHandler.INSTANCE);
175-
}
176-
if (pipeline.get(HttpServerResponseTracingHandler.class) != null) {
177-
NettyPipelineHelper.addHandlerAfter(
178-
pipeline,
179-
"HttpServerResponseTracingHandler#0",
180-
WebSocketServerResponseTracingHandler.INSTANCE);
173+
if (pipeline.get(WebSocketServerOutboundTracingHandler.class) != null) {
174+
pipeline.remove(WebSocketServerOutboundTracingHandler.class);
181175
}
176+
NettyPipelineHelper.addHandlerAfter(
177+
pipeline,
178+
pipeline.get(HttpServerTracingHandler.class),
179+
new WebSocketServerTracingHandler());
180+
}
181+
} else if (handler instanceof WebSocketFrameDecoder) {
182+
if (InstrumenterConfig.get().isWebsocketTracingEnabled()
183+
&& pipeline.get(WebSocketServerTracingHandler.class) == null) {
184+
NettyPipelineHelper.addHandlerAfter(
185+
pipeline, handler, WebSocketServerInboundTracingHandler.INSTANCE);
186+
}
187+
} else if (handler instanceof WebSocketFrameEncoder) {
188+
if (InstrumenterConfig.get().isWebsocketTracingEnabled()
189+
&& pipeline.get(WebSocketServerTracingHandler.class) == null) {
190+
NettyPipelineHelper.addHandlerAfter(
191+
pipeline, handler, WebSocketServerOutboundTracingHandler.INSTANCE);
182192
}
183193
}
184194
// Client pipeline handlers
Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,12 @@
1717
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
1818

1919
@ChannelHandler.Sharable
20-
public class WebSocketServerRequestTracingHandler extends ChannelInboundHandlerAdapter {
21-
public static WebSocketServerRequestTracingHandler INSTANCE =
22-
new WebSocketServerRequestTracingHandler();
20+
public class WebSocketServerInboundTracingHandler extends ChannelInboundHandlerAdapter {
21+
public static WebSocketServerInboundTracingHandler INSTANCE =
22+
new WebSocketServerInboundTracingHandler();
2323

2424
@Override
2525
public void channelRead(ChannelHandlerContext ctx, Object frame) {
26-
2726
if (frame instanceof WebSocketFrame) {
2827
Channel channel = ctx.channel();
2928
HandlerContext.Receiver receiverContext =
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
1818

1919
@ChannelHandler.Sharable
20-
public class WebSocketServerResponseTracingHandler extends ChannelOutboundHandlerAdapter {
21-
public static WebSocketServerResponseTracingHandler INSTANCE =
22-
new WebSocketServerResponseTracingHandler();
20+
public class WebSocketServerOutboundTracingHandler extends ChannelOutboundHandlerAdapter {
21+
public static WebSocketServerOutboundTracingHandler INSTANCE =
22+
new WebSocketServerOutboundTracingHandler();
2323

2424
@Override
2525
public void write(ChannelHandlerContext ctx, Object frame, ChannelPromise promise)

dd-java-agent/instrumentation/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/server/websocket/WebSocketServerTracingHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44

55
public class WebSocketServerTracingHandler
66
extends CombinedChannelDuplexHandler<
7-
WebSocketServerRequestTracingHandler, WebSocketServerResponseTracingHandler> {
7+
WebSocketServerInboundTracingHandler, WebSocketServerOutboundTracingHandler> {
88

99
public WebSocketServerTracingHandler() {
1010
super(
11-
WebSocketServerRequestTracingHandler.INSTANCE,
12-
WebSocketServerResponseTracingHandler.INSTANCE);
11+
WebSocketServerInboundTracingHandler.INSTANCE,
12+
WebSocketServerOutboundTracingHandler.INSTANCE);
1313
}
1414
}

dd-java-agent/instrumentation/spring-webflux-5/src/bootTest/groovy/SpringWebfluxTest.groovy

Lines changed: 95 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import datadog.trace.agent.test.AgentTestRunner
22
import datadog.trace.agent.test.asserts.TraceAssert
3+
import datadog.trace.agent.test.base.OkHttpWebsocketClient
34
import datadog.trace.api.DDSpanTypes
45
import datadog.trace.api.DDTags
56
import datadog.trace.bootstrap.instrumentation.api.Tags
@@ -9,6 +10,9 @@ import dd.trace.instrumentation.springwebflux.server.EchoHandlerFunction
910
import dd.trace.instrumentation.springwebflux.server.FooModel
1011
import dd.trace.instrumentation.springwebflux.server.SpringWebFluxTestApplication
1112
import dd.trace.instrumentation.springwebflux.server.TestController
13+
import dd.trace.instrumentation.springwebflux.server.WsHandler
14+
import net.bytebuddy.utility.RandomString
15+
import org.springframework.beans.factory.annotation.Autowired
1216
import org.springframework.boot.test.context.SpringBootTest
1317
import org.springframework.boot.test.context.TestConfiguration
1418
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory
@@ -21,6 +25,10 @@ import org.springframework.web.reactive.function.client.WebClient
2125
import org.springframework.web.server.ResponseStatusException
2226
import reactor.core.publisher.Mono
2327

28+
import static datadog.trace.agent.test.base.HttpServerTest.websocketCloseSpan
29+
import static datadog.trace.agent.test.base.HttpServerTest.websocketReceiveSpan
30+
import static datadog.trace.agent.test.base.HttpServerTest.websocketSendSpan
31+
2432
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
2533
classes = [SpringWebFluxTestApplication, ForceNettyAutoConfiguration],
2634
properties = "server.http2.enabled=true")
@@ -40,13 +48,22 @@ class SpringWebfluxTest extends AgentTestRunner {
4048
@LocalServerPort
4149
int port
4250

43-
WebClient client = WebClient.builder().clientConnector (new ReactorClientHttpConnector()).build()
51+
@Autowired
52+
private WsHandler wsHandler
53+
54+
WebClient client = WebClient.builder().clientConnector(new ReactorClientHttpConnector()).build()
4455

4556
@Override
4657
boolean useStrictTraceWrites() {
4758
false
4859
}
4960

61+
@Override
62+
protected void configurePreAgent() {
63+
super.configurePreAgent()
64+
injectSysConfig("trace.websocket.messages.enabled", "true")
65+
}
66+
5067
def "Basic GET test #testName"() {
5168
setup:
5269
String url = "http://localhost:$port$urlPath"
@@ -61,7 +78,7 @@ class SpringWebfluxTest extends AgentTestRunner {
6178
sortSpansByStart()
6279
trace(2) {
6380
clientSpan(it, null, "http.request", "spring-webflux-client", "GET", URI.create(url))
64-
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url))
81+
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url))
6582
}
6683
trace(2) {
6784
span {
@@ -142,7 +159,7 @@ class SpringWebfluxTest extends AgentTestRunner {
142159
def traceParent
143160
trace(2) {
144161
clientSpan(it, null, "http.request", "spring-webflux-client", "GET", URI.create(url))
145-
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url))
162+
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url))
146163
}
147164
trace(3) {
148165
span {
@@ -237,7 +254,7 @@ class SpringWebfluxTest extends AgentTestRunner {
237254
def traceParent
238255
trace(2) {
239256
clientSpan(it, null, "http.request", "spring-webflux-client", "GET", URI.create(url))
240-
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url))
257+
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url))
241258
}
242259
trace(3) {
243260
span {
@@ -285,7 +302,7 @@ class SpringWebfluxTest extends AgentTestRunner {
285302
def traceParent
286303
trace(2) {
287304
clientSpan(it, null, "http.request", "spring-webflux-client", "GET", URI.create(url), 404, true)
288-
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url), 404, true)
305+
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url), 404, true)
289306
}
290307
trace(2) {
291308
span {
@@ -331,7 +348,7 @@ class SpringWebfluxTest extends AgentTestRunner {
331348
String url = "http://localhost:$port/echo"
332349

333350
when:
334-
def response = client.post().uri(url).body(BodyInserters.fromPublisher(Mono.just(echoString),String)).exchange().block()
351+
def response = client.post().uri(url).body(BodyInserters.fromPublisher(Mono.just(echoString), String)).exchange().block()
335352

336353
then:
337354
response.statusCode().value() == 202
@@ -341,7 +358,7 @@ class SpringWebfluxTest extends AgentTestRunner {
341358
def traceParent
342359
trace(2) {
343360
clientSpan(it, null, "http.request", "spring-webflux-client", "POST", URI.create(url), 202)
344-
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "POST", URI.create(url), 202)
361+
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "POST", URI.create(url), 202)
345362
}
346363
trace(3) {
347364
span {
@@ -406,7 +423,7 @@ class SpringWebfluxTest extends AgentTestRunner {
406423
def traceParent
407424
trace(2) {
408425
clientSpan(it, null, "http.request", "spring-webflux-client", "GET", URI.create(url), 500)
409-
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url), 500)
426+
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url), 500)
410427
}
411428
trace(2) {
412429
span {
@@ -495,7 +512,7 @@ class SpringWebfluxTest extends AgentTestRunner {
495512
trace(2) {
496513
sortSpansByStart()
497514
clientSpan(it, null, "http.request", "spring-webflux-client", "GET", URI.create(url), 307)
498-
traceParent1 = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url), 307)
515+
traceParent1 = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url), 307)
499516
}
500517

501518
trace(2) {
@@ -540,7 +557,7 @@ class SpringWebfluxTest extends AgentTestRunner {
540557
trace(2) {
541558
sortSpansByStart()
542559
clientSpan(it, null, "http.request", "spring-webflux-client", "GET", URI.create(finalUrl))
543-
traceParent2 = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(finalUrl))
560+
traceParent2 = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(finalUrl))
544561
}
545562
trace(2) {
546563
sortSpansByStart()
@@ -599,7 +616,7 @@ class SpringWebfluxTest extends AgentTestRunner {
599616
def traceParent
600617
trace(2) {
601618
clientSpan(it, null, "http.request", "spring-webflux-client", "GET", URI.create(url))
602-
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url))
619+
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url))
603620
}
604621
trace(2) {
605622
span {
@@ -660,6 +677,73 @@ class SpringWebfluxTest extends AgentTestRunner {
660677
"annotation API delayed response" | "/foo-delayed" | "/foo-delayed" | "getFooDelayed" | new FooModel(3L, "delayed").toString()
661678
}
662679

680+
def 'test websocket server receive #msgType message of size #size and #chunks chunks'() {
681+
when:
682+
String url = "http://localhost:$port/websocket"
683+
def wsClient = new OkHttpWebsocketClient()
684+
wsClient.connect(url)
685+
wsHandler.awaitConnected()
686+
if (message instanceof String) {
687+
wsClient.send(message as String)
688+
} else {
689+
wsClient.send(message as byte[])
690+
}
691+
wsHandler.awaitExchangeComplete()
692+
wsClient.close(1001, "goodbye")
693+
694+
then:
695+
assertTraces(3, {
696+
DDSpan handshake
697+
trace(2) {
698+
sortSpansByStart()
699+
handshake = span(0)
700+
span {
701+
resourceName "GET /websocket"
702+
operationName "netty.request"
703+
spanType DDSpanTypes.HTTP_SERVER
704+
tags {
705+
"$Tags.COMPONENT" "netty"
706+
"$Tags.SPAN_KIND" Tags.SPAN_KIND_SERVER
707+
"$Tags.PEER_HOST_IPV4" "127.0.0.1"
708+
"$Tags.PEER_PORT" Integer
709+
"$Tags.HTTP_URL" url
710+
"$Tags.HTTP_HOSTNAME" "localhost"
711+
"$Tags.HTTP_METHOD" "GET"
712+
"$Tags.HTTP_STATUS" 101
713+
"$Tags.HTTP_USER_AGENT" String
714+
"$Tags.HTTP_CLIENT_IP" "127.0.0.1"
715+
"$Tags.HTTP_ROUTE" "/websocket"
716+
defaultTags()
717+
}
718+
}
719+
span {
720+
resourceName "WsHandler.handle"
721+
operationName "WsHandler.handle"
722+
spanType DDSpanTypes.HTTP_SERVER
723+
childOfPrevious()
724+
tags {
725+
"$Tags.COMPONENT" "spring-webflux-controller"
726+
"$Tags.SPAN_KIND" Tags.SPAN_KIND_SERVER
727+
"handler.type" WsHandler.getName()
728+
defaultTags()
729+
}
730+
}
731+
}
732+
trace(2) {
733+
sortSpansByStart()
734+
websocketReceiveSpan(it, handshake, msgType, size, chunks)
735+
websocketSendSpan(it, handshake, msgType, size, chunks)
736+
}
737+
trace(1) {
738+
websocketCloseSpan(it, handshake, false, 1001, "goodbye")
739+
}
740+
})
741+
where:
742+
message | msgType | chunks | size
743+
RandomString.make(10) | "text" | 1 | 10
744+
RandomString.make(20).getBytes("UTF-8") | "binary" | 1 | 20
745+
}
746+
663747
def clientSpan(
664748
TraceAssert trace,
665749
Object parentSpan,

dd-java-agent/instrumentation/spring-webflux-5/src/bootTest/groovy/dd/trace/instrumentation/springwebflux/server/SpringWebFluxTestApplication.groovy

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,15 @@ import org.springframework.boot.autoconfigure.SpringBootApplication
55
import org.springframework.context.annotation.Bean
66
import org.springframework.http.MediaType
77
import org.springframework.stereotype.Component
8+
import org.springframework.web.reactive.HandlerMapping
89
import org.springframework.web.reactive.function.BodyInserters
910
import org.springframework.web.reactive.function.server.HandlerFunction
1011
import org.springframework.web.reactive.function.server.RouterFunction
1112
import org.springframework.web.reactive.function.server.ServerRequest
1213
import org.springframework.web.reactive.function.server.ServerResponse
14+
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping
15+
import org.springframework.web.reactive.socket.WebSocketHandler
16+
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter
1317
import reactor.core.publisher.Mono
1418

1519
import java.time.Duration
@@ -26,6 +30,22 @@ class SpringWebFluxTestApplication {
2630
return route(POST("/echo"), new EchoHandlerFunction(echoHandler))
2731
}
2832

33+
@Bean
34+
WebSocketHandlerAdapter webSocketHandlerAdapter() {
35+
return new WebSocketHandlerAdapter()
36+
}
37+
38+
@Bean
39+
HandlerMapping wsHandlerMapping(WsHandler wsHandler) {
40+
Map<String, WebSocketHandler> map = new HashMap<>()
41+
map.put("/websocket", wsHandler)
42+
43+
SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping()
44+
handlerMapping.setOrder(1)
45+
handlerMapping.setUrlMap(map)
46+
return handlerMapping
47+
}
48+
2949
@Bean
3050
RouterFunction<ServerResponse> greetRouterFunction(GreetingHandler greetingHandler) {
3151
return route(GET("/greet"), new HandlerFunction<ServerResponse>() {

0 commit comments

Comments
 (0)