Skip to content

Commit 9ab7241

Browse files
authored
Make OpenTelemetryTracingBackend generic - working for any effect (#2494)
1 parent 3fad599 commit 9ab7241

File tree

6 files changed

+154
-92
lines changed

6 files changed

+154
-92
lines changed

build.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ val zio2InteropRsVersion = "2.0.2"
152152

153153
val oxVersion = "0.5.7"
154154
val sttpModelVersion = "1.7.12"
155-
val sttpSharedVersion = "1.4.2"
155+
val sttpSharedVersion = "1.5.0"
156156

157157
val logback = "ch.qos.logback" % "logback-classic" % "1.5.14"
158158

docs/backends/wrappers/opentelemetry.md

+8-9
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
Currently, the following OpenTelemetry features are supported:
44

55
- metrics using `OpenTelemetryMetricsBackend`, wrapping any other backend
6-
- tracing using `OpenTelemetryTracingSyncBackend`, wrapping a synchronous backend
6+
- tracing using `OpenTelemetryTracingBackend`, wrapping a synchronous backend
77
- tracing using `OpenTelemetryTracingZioBackend`, wrapping any ZIO2 backend
88
- tracing using [trace4cats](https://github.com/trace4cats/trace4cats), wrapping a cats-effect backend
99

@@ -51,7 +51,7 @@ OpenTelemetryMetricsBackend(
5151
)
5252
```
5353

54-
## Tracing (synchronous)
54+
## Tracing
5555

5656
To use, add the following dependency to your project:
5757

@@ -71,11 +71,10 @@ Other aspects of the backend can be configured as well:
7171
* how request, response, error attributes are computed
7272

7373
```{note}
74-
The backend relies on context passed using the default mechanism in OpenTelemetry SDK for Java, that is using
75-
`ThreadLocal`s. This means that the backend will **not** work properly when combined with any asynchronous
76-
execution mechanisms, such as `Future`s.
77-
78-
Moreover, for Java 21+, note that `ThreadLocal`s are not inherited when spawning a new virtual thread.
74+
Relies on the built-in OpenTelemetry Java SDK `ContextStorage` mechanism of propagating the tracing context;
75+
by default, this is using `ThreadLocal`s, which works with synchronous/direct-style environments. `Future`s are
76+
supported through instrumentation provided by the OpenTelemetry javaagent. For functional effect systems, usually
77+
a dedicated integration library is required.
7978
```
8079

8180
Example usage:
@@ -88,9 +87,9 @@ import io.opentelemetry.api.OpenTelemetry
8887
val sttpBackend: SyncBackend = ???
8988
val openTelemetry: OpenTelemetry = ???
9089

91-
OpenTelemetryTracingSyncBackend(
90+
OpenTelemetryTracingBackend(
9291
sttpBackend,
93-
OpenTelemetryTracingSyncConfig(
92+
OpenTelemetryTracingConfig(
9493
openTelemetry,
9594
spanName = request => request.uri.pathSegments.segments.headOption.map(_.v).getOrElse("root")
9695
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package sttp.client4.opentelemetry
2+
3+
import io.opentelemetry.api.OpenTelemetry
4+
import io.opentelemetry.context.Context
5+
import io.opentelemetry.context.propagation.TextMapSetter
6+
import sttp.capabilities.Effect
7+
import sttp.client4.Backend
8+
import sttp.client4.GenericBackend
9+
import sttp.client4.GenericRequest
10+
import sttp.client4.Response
11+
import sttp.client4.ResponseException
12+
import sttp.client4.StreamBackend
13+
import sttp.client4.SyncBackend
14+
import sttp.client4.WebSocketBackend
15+
import sttp.client4.WebSocketStreamBackend
16+
import sttp.client4.wrappers.DelegateBackend
17+
import sttp.client4.wrappers.FollowRedirectsBackend
18+
import sttp.monad.syntax._
19+
20+
import scala.collection.mutable
21+
22+
/** A backend wrapper which traces requests using OpenTelemetry.
23+
*
24+
* Span names and attributes are calculated using the provided [[OpenTelemetryTracingConfig]].
25+
*
26+
* To use, wrap your backend using this one, e.g.:
27+
*
28+
* {{{
29+
* val rawBackend: SyncBackend = ???
30+
* val openTelemetry: OpenTelemetry = ???
31+
*
32+
* val tracingBackend = OpenTelemetryTracingBackend(rawBackend, openTelemetry)
33+
* }}}
34+
*
35+
* Relies on the built-in OpenTelemetry Java SDK [[io.opentelemetry.context.ContextStorage]] mechanism of propagating
36+
* the tracing context; by default, this is using [[ThreadLocal]]s, which works with synchronous/direct-style
37+
* environments. [[scala.concurrent.Future]]s are supported through instrumentation provided by the OpenTelemetry
38+
* javaagent. For functional effect systems, usually a dedicated integration library is required.
39+
*/
40+
class OpenTelemetryTracingBackend[F[_], P](delegate: GenericBackend[F, P], config: OpenTelemetryTracingConfig)
41+
extends DelegateBackend[F, P](delegate) {
42+
43+
private val setter = new TextMapSetter[mutable.Map[String, String]] {
44+
def set(carrier: mutable.Map[String, String], key: String, value: String): Unit = {
45+
val _ = carrier.put(key, value)
46+
}
47+
}
48+
49+
override def send[T](request: GenericRequest[T, P with Effect[F]]): F[Response[T]] = {
50+
monad
51+
.eval {
52+
config.tracer
53+
.spanBuilder(config.spanName(request))
54+
.setAllAttributes(config.requestAttributes(request))
55+
.startSpan()
56+
}
57+
.flatMap { span =>
58+
monad.ensure2(
59+
{
60+
val scope = span.makeCurrent()
61+
try {
62+
val carrier = mutable.Map.empty[String, String]
63+
config.propagators.getTextMapPropagator().inject(Context.current(), carrier, setter)
64+
65+
val requestWithTraceContext = request.headers(carrier.toMap)
66+
67+
delegate
68+
.send(requestWithTraceContext)
69+
.map { response =>
70+
span.setAllAttributes(config.responseAttributes(request, response))
71+
response
72+
}
73+
.handleError { case e: Throwable =>
74+
ResponseException.find(e) match {
75+
case Some(re) =>
76+
span.setAllAttributes(
77+
config.responseAttributes(request, Response((), re.response.code, request.onlyMetadata))
78+
)
79+
case _ =>
80+
span.setAllAttributes(config.errorAttributes(e))
81+
}
82+
monad.error(e)
83+
}
84+
} finally {
85+
scope.close()
86+
}
87+
}, {
88+
monad.eval(span.end())
89+
}
90+
)
91+
}
92+
}
93+
}
94+
95+
object OpenTelemetryTracingBackend {
96+
def apply(delegate: SyncBackend, openTelemetry: OpenTelemetry): SyncBackend =
97+
apply(delegate, OpenTelemetryTracingConfig(openTelemetry))
98+
99+
def apply[F[_]](delegate: Backend[F], openTelemetry: OpenTelemetry): Backend[F] =
100+
apply(delegate, OpenTelemetryTracingConfig(openTelemetry))
101+
102+
def apply[F[_]](delegate: WebSocketBackend[F], openTelemetry: OpenTelemetry): WebSocketBackend[F] =
103+
apply(delegate, OpenTelemetryTracingConfig(openTelemetry))
104+
105+
def apply[F[_], S](delegate: StreamBackend[F, S], openTelemetry: OpenTelemetry): StreamBackend[F, S] =
106+
apply(delegate, OpenTelemetryTracingConfig(openTelemetry))
107+
108+
def apply[F[_], S](
109+
delegate: WebSocketStreamBackend[F, S],
110+
openTelemetry: OpenTelemetry
111+
): WebSocketStreamBackend[F, S] =
112+
apply(delegate, OpenTelemetryTracingConfig(openTelemetry))
113+
114+
def apply(delegate: SyncBackend, config: OpenTelemetryTracingConfig): SyncBackend = {
115+
// redirects should be handled before tracing
116+
FollowRedirectsBackend(new OpenTelemetryTracingBackend(delegate, config) with SyncBackend)
117+
}
118+
119+
def apply[F[_]](delegate: Backend[F], config: OpenTelemetryTracingConfig): Backend[F] = {
120+
FollowRedirectsBackend(new OpenTelemetryTracingBackend(delegate, config) with Backend[F])
121+
}
122+
123+
def apply[F[_]](delegate: WebSocketBackend[F], config: OpenTelemetryTracingConfig): WebSocketBackend[F] = {
124+
FollowRedirectsBackend(new OpenTelemetryTracingBackend(delegate, config) with WebSocketBackend[F])
125+
}
126+
127+
def apply[F[_], S](delegate: StreamBackend[F, S], config: OpenTelemetryTracingConfig): StreamBackend[F, S] = {
128+
FollowRedirectsBackend(new OpenTelemetryTracingBackend(delegate, config) with StreamBackend[F, S])
129+
}
130+
131+
def apply[F[_], S](
132+
delegate: WebSocketStreamBackend[F, S],
133+
config: OpenTelemetryTracingConfig
134+
): WebSocketStreamBackend[F, S] = {
135+
FollowRedirectsBackend(new OpenTelemetryTracingBackend(delegate, config) with WebSocketStreamBackend[F, S])
136+
}
137+
}

observability/opentelemetry-backend/src/main/scala/sttp/client4/opentelemetry/OpenTelemetryTracingSyncConfig.scala observability/opentelemetry-backend/src/main/scala/sttp/client4/opentelemetry/OpenTelemetryTracingConfig.scala

+5-5
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import sttp.client4._
88

99
import java.time.Clock
1010

11-
case class OpenTelemetryTracingSyncConfig(
11+
case class OpenTelemetryTracingConfig(
1212
tracer: Tracer,
1313
propagators: ContextPropagators,
1414
clock: Clock,
@@ -18,7 +18,7 @@ case class OpenTelemetryTracingSyncConfig(
1818
errorAttributes: Throwable => Attributes
1919
)
2020

21-
object OpenTelemetryTracingSyncConfig {
21+
object OpenTelemetryTracingConfig {
2222
def apply(
2323
openTelemetry: OpenTelemetry,
2424
clock: Clock = Clock.systemUTC(),
@@ -27,7 +27,7 @@ object OpenTelemetryTracingSyncConfig {
2727
responseAttributes: (GenericRequest[_, _], Response[_]) => Attributes =
2828
OpenTelemetryDefaults.responseAttributes _,
2929
errorAttributes: Throwable => Attributes = OpenTelemetryDefaults.errorAttributes _
30-
): OpenTelemetryTracingSyncConfig = usingTracer(
30+
): OpenTelemetryTracingConfig = usingTracer(
3131
openTelemetry
3232
.tracerBuilder(OpenTelemetryDefaults.instrumentationScopeName)
3333
.setInstrumentationVersion(OpenTelemetryDefaults.instrumentationScopeVersion)
@@ -49,8 +49,8 @@ object OpenTelemetryTracingSyncConfig {
4949
responseAttributes: (GenericRequest[_, _], Response[_]) => Attributes =
5050
OpenTelemetryDefaults.responseAttributes _,
5151
errorAttributes: Throwable => Attributes = OpenTelemetryDefaults.errorAttributes _
52-
): OpenTelemetryTracingSyncConfig =
53-
OpenTelemetryTracingSyncConfig(
52+
): OpenTelemetryTracingConfig =
53+
OpenTelemetryTracingConfig(
5454
tracer,
5555
propagators,
5656
clock,

observability/opentelemetry-backend/src/main/scala/sttp/client4/opentelemetry/OpenTelemetryTracingSyncBackend.scala

-74
This file was deleted.
+3-3
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,15 @@ import io.opentelemetry.semconv.HttpAttributes
1313
import io.opentelemetry.semconv.ErrorAttributes
1414
import scala.collection.JavaConverters._
1515

16-
class OpenTelemetryTracingSyncBackendTest extends AnyFlatSpec with Matchers {
16+
class OpenTelemetryTracingBackendTest extends AnyFlatSpec with Matchers {
1717
it should "capture successful spans" in {
1818
// given
1919
val testExporter = InMemorySpanExporter.create()
2020
val tracerProvider = SdkTracerProvider.builder().addSpanProcessor(SimpleSpanProcessor.create(testExporter)).build();
2121
val otel = OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build()
2222

2323
val stubBackend = SyncBackendStub.whenAnyRequest.thenRespondOk()
24-
val wrappedBackend = OpenTelemetryTracingSyncBackend(stubBackend, OpenTelemetryTracingSyncConfig(otel))
24+
val wrappedBackend = OpenTelemetryTracingBackend(stubBackend, OpenTelemetryTracingConfig(otel))
2525

2626
// when
2727
basicRequest.get(uri"http://test.com/foo").send(wrappedBackend)
@@ -46,7 +46,7 @@ class OpenTelemetryTracingSyncBackendTest extends AnyFlatSpec with Matchers {
4646
val otel = OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build()
4747

4848
val stubBackend = SyncBackendStub.whenAnyRequest.thenRespond(throw new RuntimeException("test"))
49-
val wrappedBackend = OpenTelemetryTracingSyncBackend(stubBackend, OpenTelemetryTracingSyncConfig(otel))
49+
val wrappedBackend = OpenTelemetryTracingBackend(stubBackend, OpenTelemetryTracingConfig(otel))
5050

5151
// when
5252
intercept[RuntimeException] {

0 commit comments

Comments
 (0)