Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RUM-6235 Handle sse request #2270

Merged
merged 3 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions detekt_custom.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1237,6 +1237,7 @@ datadog:
- "okhttp3.Response.code()"
- "okhttp3.Response.header(kotlin.String, kotlin.String?)"
- "okhttp3.ResponseBody.contentLength()"
- "okhttp3.ResponseBody.contentType()"
- "okio.Buffer.constructor()"
# endregion
# region org.json
Expand Down
15 changes: 11 additions & 4 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ timber = "5.0.1"
coroutines = "1.4.2"

# Local Server
ktor = "1.6.0"
ktor = "1.6.8"
ktorServer = "3.0.0-rc-1"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a concern if we are using a rc here ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lib is only used for the sample app, so it's ok


# Otel
jctools = "3.3.0"
Expand Down Expand Up @@ -233,8 +234,10 @@ coroutinesCore = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", ver

# Local Server
ktorCore = { module = "io.ktor:ktor", version.ref = "ktor" }
ktorNetty = { module = "io.ktor:ktor-server-netty", version.ref = "ktor" }
ktorGson = { module = "io.ktor:ktor-gson", version.ref = "ktor" }
ktorServerCore = { module = "io.ktor:ktor-server-core", version.ref = "ktorServer" }
ktorServerNetty = { module = "io.ktor:ktor-server-netty", version.ref = "ktorServer" }
ktorServerSSE = { module = "io.ktor:ktor-server-sse", version.ref = "ktorServer" }

# Otel
jctools = { module = "org.jctools:jctools-core", version.ref = "jctools" }
Expand Down Expand Up @@ -306,8 +309,12 @@ glide = [

ktor = [
"ktorCore",
"ktorNetty",
"ktorGson"
"ktorGson",
]
ktorServer = [
"ktorServerCore",
"ktorServerNetty",
"ktorServerSSE"
]

traceCore = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,13 +357,21 @@ internal constructor(

private fun getBodyLength(response: Response, internalLogger: InternalLogger): Long? {
return try {
val body = response.body ?: return null
val body = response.body
val contentType = body?.contentType()?.let {
// manually rebuild the mimetype as `toString()` can also include the charsets
it.type + "/" + it.subtype
}
val isStream = contentType in STREAM_CONTENT_TYPES
val isWebSocket = !response.header(WEBSOCKET_ACCEPT_HEADER, null).isNullOrBlank()
if (body == null || isStream || isWebSocket) {
return null
}
// if there is a Content-Length available, we can read it directly
// however, OkHttp will drop Content-Length header if transparent compression is
// used (since the value reported cannot be applied to decompressed body), so to be
// able to still read it, we force decompression by calling peekBody
body.contentLengthOrNull() ?: response.peekBody(MAX_BODY_PEEK)
.contentLengthOrNull()
body.contentLengthOrNull() ?: response.peekBody(MAX_BODY_PEEK).contentLengthOrNull()
} catch (e: IOException) {
internalLogger.log(
InternalLogger.Level.ERROR,
Expand Down Expand Up @@ -481,6 +489,15 @@ internal constructor(

internal companion object {

internal val STREAM_CONTENT_TYPES = setOf(
"text/event-stream",
"application/grpc",
"application/grpc+proto",
"application/grpc+json"
)

internal const val WEBSOCKET_ACCEPT_HEADER = "Sec-WebSocket-Accept"

internal const val WARN_RUM_DISABLED =
"You set up a DatadogInterceptor for %s, but RUM features are disabled. " +
"Make sure you initialized the Datadog SDK with a valid Application Id, " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import fr.xgouchet.elmyr.junit5.ForgeConfiguration
import fr.xgouchet.elmyr.junit5.ForgeExtension
import io.opentracing.Tracer
import okhttp3.MediaType
import okhttp3.MediaType.Companion.toMediaType
import okhttp3.Protocol
import okhttp3.Response
import okhttp3.ResponseBody
Expand Down Expand Up @@ -221,6 +222,88 @@ internal class DatadogInterceptorTest : TracingInterceptorNotSendingSpanTest() {
}
}

@Test
fun `M start and stop RUM Resource W intercept() {successful streaming request}`(
@IntForgery(min = 200, max = 300) statusCode: Int,
forge: Forge
) {
// Given
val mimeType = forge.anElementFrom(DatadogInterceptor.STREAM_CONTENT_TYPES)
fakeMediaType = mimeType.toMediaType()
stubChain(mockChain, statusCode)
val expectedStartAttrs = emptyMap<String, Any?>()
val expectedStopAttrs = mapOf(
RumAttributes.TRACE_ID to fakeTraceIdAsString,
RumAttributes.SPAN_ID to fakeSpanId,
RumAttributes.RULE_PSR to fakeTracingSampleRate
) + fakeAttributes
val kind = RumResourceKind.NATIVE

// When
testedInterceptor.intercept(mockChain)

// Then
inOrder(rumMonitor.mockInstance) {
argumentCaptor<ResourceId> {
verify(rumMonitor.mockInstance).startResource(
capture(),
eq(fakeMethod),
eq(fakeUrl),
eq(expectedStartAttrs)
)
verify(rumMonitor.mockInstance).stopResource(
capture(),
eq(statusCode),
eq(null),
eq(kind),
eq(expectedStopAttrs)
)
assertThat(firstValue).isEqualTo(secondValue)
}
}
}

@Test
fun `M start and stop RUM Resource W intercept() {successful websocket request}`(
@IntForgery(min = 200, max = 300) statusCode: Int,
@StringForgery websocketHash: String
) {
// Given
stubChain(mockChain, statusCode) {
header("Sec-WebSocket-Accept", websocketHash)
}
val expectedStartAttrs = emptyMap<String, Any?>()
val expectedStopAttrs = mapOf(
RumAttributes.TRACE_ID to fakeTraceIdAsString,
RumAttributes.SPAN_ID to fakeSpanId,
RumAttributes.RULE_PSR to fakeTracingSampleRate
) + fakeAttributes
val kind = RumResourceKind.NATIVE

// When
testedInterceptor.intercept(mockChain)

// Then
inOrder(rumMonitor.mockInstance) {
argumentCaptor<ResourceId> {
verify(rumMonitor.mockInstance).startResource(
capture(),
eq(fakeMethod),
eq(fakeUrl),
eq(expectedStartAttrs)
)
verify(rumMonitor.mockInstance).stopResource(
capture(),
eq(statusCode),
eq(null),
eq(kind),
eq(expectedStopAttrs)
)
assertThat(firstValue).isEqualTo(secondValue)
}
}
}

@Test
fun `M start and stop RUM Resource W intercept() {successful request, unknown method}`(
@IntForgery(min = 200, max = 300) statusCode: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1346,8 +1346,12 @@ internal open class TracingInterceptorNotSendingSpanTest {

// region Internal

internal fun stubChain(chain: Interceptor.Chain, statusCode: Int) {
return stubChain(chain) { forgeResponse(statusCode) }
internal fun stubChain(
chain: Interceptor.Chain,
statusCode: Int,
responseBuilder: Response.Builder.() -> Unit = {}
) {
return stubChain(chain) { forgeResponse(statusCode, responseBuilder) }
}

internal fun stubChain(
Expand Down Expand Up @@ -1400,7 +1404,7 @@ internal open class TracingInterceptorNotSendingSpanTest {
return builder.build()
}

private fun forgeResponse(statusCode: Int): Response {
private fun forgeResponse(statusCode: Int, additionalConfig: Response.Builder.() -> Unit = {}): Response {
val builder = Response.Builder()
.request(fakeRequest)
.protocol(Protocol.HTTP_2)
Expand All @@ -1410,6 +1414,7 @@ internal open class TracingInterceptorNotSendingSpanTest {
if (fakeMediaType != null) {
builder.header(TracingInterceptor.HEADER_CT, fakeMediaType?.type.orEmpty())
}
builder.additionalConfig()
return builder.build()
}

Expand Down
1 change: 1 addition & 0 deletions sample/kotlin/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ dependencies {
implementation("com.squareup.retrofit2:converter-gson:2.9.0")
implementation(libs.okHttp)
implementation(libs.gson)
implementation("com.launchdarkly:okhttp-eventsource:2.5.0")

// Misc
implementation(libs.timber)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ internal class TracesFragment : Fragment(), View.OnClickListener {
rootView.findViewById<Button>(R.id.start_coroutine_operation).setOnClickListener(this)
rootView.findViewById<Button>(R.id.start_request).setOnClickListener(this)
rootView.findViewById<Button>(R.id.start_404_request).setOnClickListener(this)
rootView.findViewById<Button>(R.id.start_sse_request).setOnClickListener(this)
progressBarAsync = rootView.findViewById(R.id.spinner_async)
progressBarCoroutine = rootView.findViewById(R.id.spinner_coroutine)
progressBarRequest = rootView.findViewById(R.id.spinner_request)
Expand Down Expand Up @@ -73,6 +74,7 @@ internal class TracesFragment : Fragment(), View.OnClickListener {

// region View.OnClickListener

@Suppress("LongMethod")
override fun onClick(v: View?) {
when (v?.id) {
R.id.start_async_operation -> {
Expand Down Expand Up @@ -123,6 +125,17 @@ internal class TracesFragment : Fragment(), View.OnClickListener {
}
)
}
R.id.start_sse_request -> {
setInProgress()
viewModel.startSseRequest(
onResponse = {
setCompleteStatus(R.drawable.ic_check_circle_green_24dp)
},
onException = {
setCompleteStatus(R.drawable.ic_error_red_24dp)
}
)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import com.datadog.android.trace.coroutines.launchTraced
import com.datadog.android.trace.coroutines.withContextTraced
import com.datadog.android.trace.withinSpan
import com.datadog.android.vendor.sample.LocalServer
import com.launchdarkly.eventsource.EventHandler
import com.launchdarkly.eventsource.EventSource
import com.launchdarkly.eventsource.MessageEvent
import io.opentracing.Span
import io.opentracing.log.Fields
import io.opentracing.util.GlobalTracer
Expand All @@ -33,10 +36,12 @@ import kotlinx.coroutines.flow.map
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import java.net.URI
import java.time.Duration
import java.util.Locale
import java.util.Random

@Suppress("DEPRECATION")
@Suppress("DEPRECATION", "StringLiteralDuplication", "TooManyFunctions")
internal class TracesViewModel(
private val okHttpClient: OkHttpClient,
private val localServer: LocalServer
Expand Down Expand Up @@ -80,7 +85,7 @@ internal class TracesViewModel(
onException: (Throwable) -> Unit,
onCancel: () -> Unit
) {
networkRequestTask = RequestTask(
networkRequestTask = GetRequestTask(
localServer.getUrl(),
okHttpClient,
onResponse,
Expand All @@ -95,7 +100,7 @@ internal class TracesViewModel(
onException: (Throwable) -> Unit,
onCancel: () -> Unit
) {
networkRequestTask = RequestTask(
networkRequestTask = GetRequestTask(
"https://www.datadoghq.com/notfound",
okHttpClient,
onResponse,
Expand All @@ -105,6 +110,19 @@ internal class TracesViewModel(
networkRequestTask?.execute()
}

fun startSseRequest(
onResponse: () -> Unit,
onException: (Throwable) -> Unit
) {
networkRequestTask = SSERequestTask(
localServer.sseUrl(),
okHttpClient,
onResponse,
onException
)
networkRequestTask?.execute()
}

fun stopAsyncOperations() {
asyncOperationTask?.cancel(true)
networkRequestTask?.cancel(true)
Expand Down Expand Up @@ -184,9 +202,9 @@ internal class TracesViewModel(

// endregion

// region RequestTask
// region GetRequestTask

private class RequestTask(
private class GetRequestTask(
private val url: String,
private val okHttpClient: OkHttpClient,
private val onResponse: (Response) -> Unit,
Expand Down Expand Up @@ -259,6 +277,64 @@ internal class TracesViewModel(

// endregion

// region SSERequestTask

private class SSERequestTask(
private val url: String,
private val okHttpClient: OkHttpClient,
private val onResponse: () -> Unit,
private val onException: (Throwable) -> Unit
) : AsyncTask<Unit, Unit, Result>(), EventHandler {
private var currentActiveMainSpan: Span? = null

@Deprecated("Deprecated in Java")
override fun onPreExecute() {
super.onPreExecute()
currentActiveMainSpan = GlobalTracer.get().activeSpan()
}

@Deprecated("Deprecated in Java")
@Suppress("TooGenericExceptionCaught", "LogNotTimber", "MagicNumber")
override fun doInBackground(vararg params: Unit?): Result {
return try {
val eventSourceSse = EventSource.Builder(this, URI.create(url))
.client(okHttpClient)
.connectTimeout(Duration.ofSeconds(3))
.backoffResetThreshold(Duration.ofSeconds(3))
.build()

eventSourceSse?.start()
Result.Success("")
} catch (e: Exception) {
Log.e("Response", "Error", e)
Result.Failure(throwable = e)
}
}

override fun onOpen() {
Log.i("SSE", "onOpen")
}

override fun onError(e: Throwable?) {
Log.e("SSE", "onError", e)
e?.let { onException(it) }
}

override fun onComment(comment: String?) {
Log.i("SSE", "onComment: $comment")
}

override fun onMessage(message: String?, event: MessageEvent?) {
Log.i("SSE", "onMessage: $message | $event")
}

override fun onClosed() {
onResponse()
}
}

// endregion

// region AsyncOperationTask

private class AsyncOperationTask(
Expand Down
Loading