11import datadog.trace.agent.test.AgentTestRunner
22import datadog.trace.agent.test.asserts.TraceAssert
3+ import datadog.trace.agent.test.base.HttpServerTest
4+ import datadog.trace.agent.test.base.OkHttpWebsocketClient
5+ import datadog.trace.agent.test.base.WebsocketServer
36import datadog.trace.api.DDSpanTypes
47import datadog.trace.api.DDTags
58import datadog.trace.bootstrap.instrumentation.api.Tags
@@ -9,6 +12,9 @@ import dd.trace.instrumentation.springwebflux.server.EchoHandlerFunction
912import dd.trace.instrumentation.springwebflux.server.FooModel
1013import dd.trace.instrumentation.springwebflux.server.SpringWebFluxTestApplication
1114import dd.trace.instrumentation.springwebflux.server.TestController
15+ import dd.trace.instrumentation.springwebflux.server.WsHandler
16+ import net.bytebuddy.utility.RandomString
17+ import org.springframework.beans.factory.annotation.Autowired
1218import org.springframework.boot.test.context.SpringBootTest
1319import org.springframework.boot.test.context.TestConfiguration
1420import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory
@@ -18,12 +24,25 @@ import org.springframework.http.client.reactive.ReactorClientHttpConnector
1824import org.springframework.web.reactive.function.BodyExtractors
1925import org.springframework.web.reactive.function.BodyInserters
2026import org.springframework.web.reactive.function.client.WebClient
27+ import org.springframework.web.reactive.socket.WebSocketHandler
28+ import org.springframework.web.reactive.socket.WebSocketSession
29+ import org.springframework.web.reactive.socket.client.WebSocketClient
2130import org.springframework.web.server.ResponseStatusException
2231import reactor.core.publisher.Mono
2332
33+ import static datadog.trace.agent.test.base.HttpServerTest.ServerEndpoint.WEBSOCKET
34+ import static datadog.trace.agent.test.base.HttpServerTest.ServerEndpoint.WEBSOCKET
35+ import static datadog.trace.agent.test.base.HttpServerTest.ServerEndpoint.WEBSOCKET
36+ import static datadog.trace.agent.test.base.HttpServerTest.websocketCloseSpan
37+ import static datadog.trace.agent.test.base.HttpServerTest.websocketReceiveSpan
38+ import static datadog.trace.agent.test.base.HttpServerTest.websocketSendSpan
39+ import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
40+ import static org.junit.Assume.assumeTrue
41+ import static org.junit.Assume.assumeTrue
42+
2443@SpringBootTest (webEnvironment = SpringBootTest.WebEnvironment .RANDOM_PORT ,
25- classes = [SpringWebFluxTestApplication , ForceNettyAutoConfiguration ],
26- properties = " server.http2.enabled=true" )
44+ classes = [SpringWebFluxTestApplication , ForceNettyAutoConfiguration ],
45+ properties = " server.http2.enabled=true" )
2746class SpringWebfluxTest extends AgentTestRunner {
2847
2948 @TestConfiguration
@@ -40,13 +59,22 @@ class SpringWebfluxTest extends AgentTestRunner {
4059 @LocalServerPort
4160 int port
4261
43- WebClient client = WebClient . builder(). clientConnector (new ReactorClientHttpConnector ()). build()
62+ @Autowired
63+ private WsHandler wsHandler
64+
65+ WebClient client = WebClient . builder(). clientConnector(new ReactorClientHttpConnector ()). build()
4466
4567 @Override
4668 boolean useStrictTraceWrites () {
4769 false
4870 }
4971
72+ @Override
73+ protected void configurePreAgent () {
74+ super . configurePreAgent()
75+ injectSysConfig(" trace.websocket.messages.enabled" , " true" )
76+ }
77+
5078 def " Basic GET test #testName" () {
5179 setup :
5280 String url = " http://localhost:$port $urlPath "
@@ -61,7 +89,7 @@ class SpringWebfluxTest extends AgentTestRunner {
6189 sortSpansByStart()
6290 trace(2 ) {
6391 clientSpan(it, null , " http.request" , " spring-webflux-client" , " GET" , URI . create(url))
64- traceParent = clientSpan(it, span(0 ), " netty.client.request" , " netty-client" , " GET" , URI . create(url))
92+ traceParent = clientSpan(it, span(0 ), " netty.client.request" , " netty-client" , " GET" , URI . create(url))
6593 }
6694 trace(2 ) {
6795 span {
@@ -142,7 +170,7 @@ class SpringWebfluxTest extends AgentTestRunner {
142170 def traceParent
143171 trace(2 ) {
144172 clientSpan(it, null , " http.request" , " spring-webflux-client" , " GET" , URI . create(url))
145- traceParent = clientSpan(it, span(0 ), " netty.client.request" , " netty-client" , " GET" , URI . create(url))
173+ traceParent = clientSpan(it, span(0 ), " netty.client.request" , " netty-client" , " GET" , URI . create(url))
146174 }
147175 trace(3 ) {
148176 span {
@@ -237,7 +265,7 @@ class SpringWebfluxTest extends AgentTestRunner {
237265 def traceParent
238266 trace(2 ) {
239267 clientSpan(it, null , " http.request" , " spring-webflux-client" , " GET" , URI . create(url))
240- traceParent = clientSpan(it, span(0 ), " netty.client.request" , " netty-client" , " GET" , URI . create(url))
268+ traceParent = clientSpan(it, span(0 ), " netty.client.request" , " netty-client" , " GET" , URI . create(url))
241269 }
242270 trace(3 ) {
243271 span {
@@ -285,7 +313,7 @@ class SpringWebfluxTest extends AgentTestRunner {
285313 def traceParent
286314 trace(2 ) {
287315 clientSpan(it, null , " http.request" , " spring-webflux-client" , " GET" , URI . create(url), 404 , true )
288- traceParent = clientSpan(it, span(0 ), " netty.client.request" , " netty-client" , " GET" , URI . create(url), 404 , true )
316+ traceParent = clientSpan(it, span(0 ), " netty.client.request" , " netty-client" , " GET" , URI . create(url), 404 , true )
289317 }
290318 trace(2 ) {
291319 span {
@@ -331,7 +359,7 @@ class SpringWebfluxTest extends AgentTestRunner {
331359 String url = " http://localhost:$port /echo"
332360
333361 when :
334- def response = client. post(). uri(url). body(BodyInserters . fromPublisher(Mono . just(echoString),String )). exchange(). block()
362+ def response = client. post(). uri(url). body(BodyInserters . fromPublisher(Mono . just(echoString), String )). exchange(). block()
335363
336364 then :
337365 response. statusCode(). value() == 202
@@ -341,7 +369,7 @@ class SpringWebfluxTest extends AgentTestRunner {
341369 def traceParent
342370 trace(2 ) {
343371 clientSpan(it, null , " http.request" , " spring-webflux-client" , " POST" , URI . create(url), 202 )
344- traceParent = clientSpan(it, span(0 ), " netty.client.request" , " netty-client" , " POST" , URI . create(url), 202 )
372+ traceParent = clientSpan(it, span(0 ), " netty.client.request" , " netty-client" , " POST" , URI . create(url), 202 )
345373 }
346374 trace(3 ) {
347375 span {
@@ -406,7 +434,7 @@ class SpringWebfluxTest extends AgentTestRunner {
406434 def traceParent
407435 trace(2 ) {
408436 clientSpan(it, null , " http.request" , " spring-webflux-client" , " GET" , URI . create(url), 500 )
409- traceParent = clientSpan(it, span(0 ), " netty.client.request" , " netty-client" , " GET" , URI . create(url), 500 )
437+ traceParent = clientSpan(it, span(0 ), " netty.client.request" , " netty-client" , " GET" , URI . create(url), 500 )
410438 }
411439 trace(2 ) {
412440 span {
@@ -479,13 +507,13 @@ class SpringWebfluxTest extends AgentTestRunner {
479507
480508 when :
481509 def response = client. get(). uri(url). exchange()
482- .flatMap(response -> {
483- if (response. statusCode(). is3xxRedirection()) {
484- String redirectUrl = response. headers(). header(" Location" ). get(0 )
485- return response. bodyToMono(Void . class). then(client. get(). uri(URI . create(" http://localhost:$port " ). resolve(redirectUrl)). exchange())
486- }
487- return Mono . just(response)
488- }). block()
510+ .flatMap(response -> {
511+ if (response. statusCode(). is3xxRedirection()) {
512+ String redirectUrl = response. headers(). header(" Location" ). get(0 )
513+ return response. bodyToMono(Void . class). then(client. get(). uri(URI . create(" http://localhost:$port " ). resolve(redirectUrl)). exchange())
514+ }
515+ return Mono . just(response)
516+ }). block()
489517
490518 then :
491519 response. statusCode(). value() == 200
@@ -495,7 +523,7 @@ class SpringWebfluxTest extends AgentTestRunner {
495523 trace(2 ) {
496524 sortSpansByStart()
497525 clientSpan(it, null , " http.request" , " spring-webflux-client" , " GET" , URI . create(url), 307 )
498- traceParent1 = clientSpan(it, span(0 ), " netty.client.request" , " netty-client" , " GET" , URI . create(url), 307 )
526+ traceParent1 = clientSpan(it, span(0 ), " netty.client.request" , " netty-client" , " GET" , URI . create(url), 307 )
499527 }
500528
501529 trace(2 ) {
@@ -531,7 +559,7 @@ class SpringWebfluxTest extends AgentTestRunner {
531559 " request.predicate" " (GET && /double-greet-redirect)"
532560 " handler.type" { String tagVal ->
533561 return (tagVal. contains(INNER_HANDLER_FUNCTION_CLASS_TAG_PREFIX )
534- || tagVal. contains(" Lambda" ))
562+ || tagVal. contains(" Lambda" ))
535563 }
536564 defaultTags()
537565 }
@@ -540,7 +568,7 @@ class SpringWebfluxTest extends AgentTestRunner {
540568 trace(2 ) {
541569 sortSpansByStart()
542570 clientSpan(it, null , " http.request" , " spring-webflux-client" , " GET" , URI . create(finalUrl))
543- traceParent2 = clientSpan(it, span(0 ), " netty.client.request" , " netty-client" , " GET" , URI . create(finalUrl))
571+ traceParent2 = clientSpan(it, span(0 ), " netty.client.request" , " netty-client" , " GET" , URI . create(finalUrl))
544572 }
545573 trace(2 ) {
546574 sortSpansByStart()
@@ -599,7 +627,7 @@ class SpringWebfluxTest extends AgentTestRunner {
599627 def traceParent
600628 trace(2 ) {
601629 clientSpan(it, null , " http.request" , " spring-webflux-client" , " GET" , URI . create(url))
602- traceParent = clientSpan(it, span(0 ), " netty.client.request" , " netty-client" , " GET" , URI . create(url))
630+ traceParent = clientSpan(it, span(0 ), " netty.client.request" , " netty-client" , " GET" , URI . create(url))
603631 }
604632 trace(2 ) {
605633 span {
@@ -660,6 +688,73 @@ class SpringWebfluxTest extends AgentTestRunner {
660688 " annotation API delayed response" | " /foo-delayed" | " /foo-delayed" | " getFooDelayed" | new FooModel (3L , " delayed" ). toString()
661689 }
662690
691+ def ' test websocket server receive #msgType message of size #size and #chunks chunks' () {
692+ when :
693+ String url = " http://localhost:$port /websocket"
694+ def wsClient = new OkHttpWebsocketClient ()
695+ wsClient. connect(url)
696+ wsHandler. awaitConnected()
697+ if (message instanceof String ) {
698+ wsClient. send(message as String )
699+ } else {
700+ wsClient. send(message as byte [])
701+ }
702+ wsHandler. awaitExchangeComplete()
703+ wsClient. close(1001 , " goodbye" )
704+
705+ then :
706+ assertTraces(3 , {
707+ DDSpan handshake
708+ trace(2 ) {
709+ sortSpansByStart()
710+ handshake = span(0 )
711+ span {
712+ resourceName " GET /websocket"
713+ operationName " netty.request"
714+ spanType DDSpanTypes . HTTP_SERVER
715+ tags {
716+ " $Tags . COMPONENT " " netty"
717+ " $Tags . SPAN_KIND " Tags . SPAN_KIND_SERVER
718+ " $Tags . PEER_HOST_IPV4 " " 127.0.0.1"
719+ " $Tags . PEER_PORT " Integer
720+ " $Tags . HTTP_URL " url
721+ " $Tags . HTTP_HOSTNAME " " localhost"
722+ " $Tags . HTTP_METHOD " " GET"
723+ " $Tags . HTTP_STATUS " 101
724+ " $Tags . HTTP_USER_AGENT " String
725+ " $Tags . HTTP_CLIENT_IP " " 127.0.0.1"
726+ " $Tags . HTTP_ROUTE " " /websocket"
727+ defaultTags()
728+ }
729+ }
730+ span {
731+ resourceName " WsHandler.handle"
732+ operationName " WsHandler.handle"
733+ spanType DDSpanTypes . HTTP_SERVER
734+ childOfPrevious()
735+ tags {
736+ " $Tags . COMPONENT " " spring-webflux-controller"
737+ " $Tags . SPAN_KIND " Tags . SPAN_KIND_SERVER
738+ " handler.type" WsHandler . getName()
739+ defaultTags()
740+ }
741+ }
742+ }
743+ trace(2 ) {
744+ sortSpansByStart()
745+ websocketReceiveSpan(it, handshake, msgType, size, chunks)
746+ websocketSendSpan(it, handshake, msgType, size, chunks)
747+ }
748+ trace(1 ) {
749+ websocketCloseSpan(it, handshake, false , 1001 , " goodbye" )
750+ }
751+ })
752+ where :
753+ message | msgType | chunks | size
754+ RandomString . make(10 ) | " text" | 1 | 10
755+ RandomString . make(20 ). getBytes(" UTF-8" ) | " binary" | 1 | 20
756+ }
757+
663758 def clientSpan (
664759 TraceAssert trace ,
665760 Object parentSpan ,
0 commit comments