
Kotlin Coroutine Flow not tracked correctly if it emits more than 1 item and takes time #8812

Closed
YuliaMittova opened this issue Jun 27, 2023 · 5 comments · Fixed by #8870
Labels
bug Something isn't working repro provided

Comments

@YuliaMittova

Describe the bug
This bug was originally created here: open-telemetry/opentelemetry-java#5543
Sample project is here: https://github.com/YuliaMittova/opentelemetry_issue_5543

Problem: if a function returns a Flow containing more than one item and takes time to produce them (like a typical DB operation), all of its spans are broken, both context propagation and timing measurement.
If you check out the project above and fetch one student (the Flow contains a single item and no delays), the report looks correct:
[Screenshot: with_flow_only_one_item_emitted]

But if you fetch all students (the Flow then has several items with delays between them; see StudentEntityRepositoryImpl.findAll for the code), the report is broken: spans have the wrong parents and are not measured correctly:
[Screenshot: with_flow_many_items]

Steps to reproduce
Create a project that uses a coroutine Flow which emits more than one item, with delays between them. The sample project is here: https://github.com/YuliaMittova/opentelemetry_issue_5543

What did you expect to see?
All spans have the correct parents and are measured correctly (parents do not finish before their children). See the picture below:
[Screenshot: with_flow_only_one_item_emitted]
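The expected behavior above can be stated as a check on exported spans: every child must start and end within its parent's time range. A minimal sketch, assuming the finished spans have been gathered with their IDs, parent IDs and start/end timestamps (for example from an in-memory exporter in a test); `SpanRecord` and `findNestingViolations` are hypothetical names, not OpenTelemetry API:

```kotlin
// Hypothetical, simplified view of a finished span; not an OpenTelemetry type.
data class SpanRecord(
    val spanId: String,
    val parentSpanId: String?,
    val name: String,
    val startNanos: Long,
    val endNanos: Long,
)

// Describes every span whose time range is not contained in its parent's time range.
fun findNestingViolations(spans: List<SpanRecord>): List<String> {
    val byId = spans.associateBy { it.spanId }
    return spans.mapNotNull { child ->
        // Root spans (or spans whose parent was not captured) are skipped.
        val parent = child.parentSpanId?.let { byId[it] } ?: return@mapNotNull null
        when {
            child.startNanos < parent.startNanos ->
                "${child.name} starts before its parent ${parent.name}"
            child.endNanos > parent.endNanos ->
                "${child.name} ends after its parent ${parent.name}"
            else -> null
        }
    }
}
```

Applied to the spans exported for one request, an empty list corresponds to the expected report above.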

What did you see instead?
The report is broken: spans have incorrect parents and are not measured correctly:
[Screenshot: with_flow_many_items_with_arrows]

What version are you using?

implementation("io.opentelemetry.instrumentation:opentelemetry-instrumentation-annotations:1.24.0")
implementation("io.opentelemetry:opentelemetry-extension-kotlin:1.27.0")
annotationProcessor("org.springframework.boot:spring-boot-configuration-processor")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.boot:spring-boot-starter-amqp")
implementation("org.springframework.boot:spring-boot-starter-data-jdbc") // Needed for Flyway
implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("org.springframework.boot:spring-boot-starter-validation")
implementation("org.springdoc:springdoc-openapi-webflux-ui:1.6.15")
implementation("com.expediagroup:graphql-kotlin-spring-server:6.4.0")
implementation("com.expediagroup:graphql-kotlin-schema-generator:6.4.0")
implementation("com.graphql-java:graphql-java-extended-scalars:20.0")
implementation("org.postgresql:r2dbc-postgresql")
implementation("org.springframework.boot:spring-boot-starter-security")
developmentOnly("org.springframework.boot:spring-boot-devtools")
implementation("com.apollographql.apollo3:apollo-runtime:3.8.0")

Environment
Docker, locally

Additional context
It looks like I'm not the only one with this problem. This issue might be related: #8659

@laurit
Contributor

laurit commented Jun 27, 2023

@YuliaMittova The @WithSpan annotation does not work with Kotlin coroutines; you'll have to instrument these methods manually.
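One plausible mechanism behind the broken spans, assuming the traced method returns a cold `Flow`: calling the method only builds the `Flow`, and the items are produced later, when the caller collects it. A span that ends when the method returns therefore closes before the delayed items (and any child spans created while producing them) exist. The sketch below reproduces that timing with a stdlib `Sequence`, which is lazy in the same way, so it runs without kotlinx.coroutines or OpenTelemetry; `EventLog`, `traceCall` and `findAllStudents` are hypothetical stand-ins for a span and a repository method:

```kotlin
// Hypothetical stand-in for a span: records "start"/"end" events in a shared log.
class EventLog {
    val events = mutableListOf<String>()
}

// Mimics a method-level span: starts before the call and ends as soon as the call returns.
fun <T> traceCall(log: EventLog, name: String, call: () -> T): T {
    log.events += "start $name"
    try {
        return call()
    } finally {
        log.events += "end $name"
    }
}

// Like a repository method returning a cold Flow: nothing runs until someone iterates.
fun findAllStudents(log: EventLog): Sequence<String> = sequence {
    for (id in 1..3) {
        log.events += "produce student $id"
        yield("student $id")
    }
}

fun methodSpanDemo(): List<String> {
    val log = EventLog()
    val students = traceCall(log, "findAll") { findAllStudents(log) }
    // The items are produced only here, after "end findAll" has already been recorded.
    students.toList()
    return log.events
}
```

`methodSpanDemo()` records `start findAll` and `end findAll` before any `produce student` event, which mirrors parents finishing before their children in the broken report.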

@yuliamitttovahuspy

@laurit, I tried the manual approach and described it in this issue: https://github.com/open-telemetry/opentelemetry-java/issues/5543
The code that works for me looks like this:

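import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanBuilder
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.context.Context
import io.opentelemetry.extension.kotlin.asContextElement
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.withContext
import kotlin.coroutines.coroutineContext

suspend fun <Result> withFixedSpan(
    name: String,
    parameters: (SpanBuilder.() -> Unit)? = null,
    parentSpan: Span? = null,
    block: suspend (span: Span?) -> Result,
): Result {
    val tracer = GlobalOpenTelemetry.getTracer("com.example", "0.0.0")
    // Use the explicit parent if given, otherwise whatever span is current in this coroutine.
    val parentSpanToSet = parentSpan ?: Span.current()
    val childSpan: Span = tracer.spanBuilder(name).run {
        parameters?.invoke(this)
        coroutineContext[CoroutineName]?.let {
            setAttribute("coroutine.name", it.name)
        }
        setParent(Context.current().with(parentSpanToSet))
        startSpan()
    }

    // Propagate the child span through the coroutine context instead of calling
    // makeCurrent(): a thread-local Scope opened before a suspension point may be
    // resumed, and closed, on a different thread.
    return withContext(childSpan.asContextElement()) {
        try {
            block(childSpan)
        } catch (throwable: Throwable) {
            childSpan.setStatus(StatusCode.ERROR)
            childSpan.recordException(throwable)
            throw throwable
        } finally {
            // End the span only after block (including any Flow collection inside it) completes.
            childSpan.end()
        }
    }
}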

To use it, wrap every function call you want to trace like this:

        return withFixedSpan(
            name = "SampleEntityRepositoryImpl.findAllBy",
        ) {
            r2dbcEntityTemplate.select(SampleEntity::class.java)
                .matching(filter.toQuery(sorting))
                .flow().toList()
        }

This approach needs lots of changes and makes code harder to read.
Are you planning to fix this bug?
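In the `withFixedSpan` usage above, `.toList()` collects the whole `Flow` inside the block, so the span covers every item, at the cost of materializing the results. An alternative is to keep the span open for the duration of iteration rather than the duration of the call. A minimal sketch of that idea, again using a stdlib `Sequence` as a stand-in for a cold `Flow` and a hypothetical `TraceLog` in place of a real span; `tracedIteration` is a hypothetical helper (with kotlinx.coroutines, the analogous hooks would be `Flow` operators such as `onStart` and `onCompletion`, not shown here):

```kotlin
// Hypothetical stand-in for span events.
class TraceLog {
    val events = mutableListOf<String>()
}

// Wraps a lazy source so the "span" starts on first iteration and ends when iteration finishes.
// Caveat: if the consumer stops early, the sequence is simply abandoned, the finally block
// never runs, and the "span" is never ended.
fun <T> Sequence<T>.tracedIteration(log: TraceLog, name: String): Sequence<T> {
    val source = this
    return sequence {
        log.events += "start $name"
        try {
            yieldAll(source)
        } finally {
            log.events += "end $name"
        }
    }
}

fun iterationSpanDemo(): List<String> {
    val log = TraceLog()
    val students = sequence {
        for (id in 1..3) {
            log.events += "produce student $id"
            yield("student $id")
        }
    }.tracedIteration(log, "findAll")
    val collected = students.toList()
    log.events += "collected ${collected.size}"
    return log.events
}
```

Here every `produce student` event falls between `start findAll` and `end findAll`, which is the nesting the expected report shows.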

@yuliamitttovahuspy

@mateuszrzeszutek @laurit I noticed that the PR above hasn't been reviewed yet. How long does it usually take for a PR to go from opened to merged? Is there any way I can help with it?

@mateuszrzeszutek
Member

That one particular PR is just super extra complex -- I'm planning to review it myself, but it might take me a while because it contains some really crazy arcane bytecode wizardry 🧙

@yuliamitttovahuspy

Thank you so much!


4 participants