Multiple trace Id's generated from kafka consumer #2531

Open
kurvatch opened this issue Mar 8, 2021 · 17 comments
Labels
bug Something isn't working repro provided

Comments

@kurvatch

kurvatch commented Mar 8, 2021

Describe the bug
We have deployed three services: Service A, Service B and Service C. Service A calls Service B via an API and publishes a message to a Kafka topic, which is consumed by Service C. Service C consumes the record, calls an external API and inserts data into a database. We observe that the database and external API calls made from the Kafka consumer each get a new trace ID, instead of being recorded as new spans (new span IDs) within the existing trace.

Service interaction diagram: https://github.com/sunvuz/OpenTelemetry-poc/blob/30cdb74b4ac53bbba08fbc510e5900b3dc6bdf12/Kafka%20Consumer%20-New%20Traceid%20issue.PNG

Zipkin screenshot showing multiple trace IDs from the consumer: https://github.com/sunvuz/OpenTelemetry-poc/blob/30cdb74b4ac53bbba08fbc510e5900b3dc6bdf12/Kafka%20Consumer%20-New%20Traceid%20issue%20-Zipkin.PNG

Steps to reproduce
Sample services reproducing the behavior above are available at: https://github.com/sunvuz/OpenTelemetry-poc

What did you expect to see?
The database and external API calls should appear as spans (new span IDs) within the existing trace, not as new traces.

What did you see instead?
New trace IDs are generated for the database and external API interactions.

What version are you using?
v1.0.0; the behavior is the same with 0.x.x versions.

Environment
JDK 8+
OS: Linux


@kurvatch kurvatch added the bug Something isn't working label Mar 8, 2021
@previousdeveloper

We also have the same problem. We use a Spring Boot Kafka consumer. When the consumer listens to messages in batch mode, every operation gets its own trace ID instead of a span ID within the existing trace. There is no problem with a non-batch consumer. I think it's related to #1890.

@sunvuz

sunvuz commented Mar 10, 2021

@previousdeveloper
I tried the following options:

  1. In Spring Boot application.yml, setting spring.kafka.listener.type = single
  2. Setting org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG = 1

I still have the same issue.

Please share your configuration for the non-batch message consumer.
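For reference, here is a minimal sketch of the two settings from the list above expressed as Java `Properties`. The keys are the standard ones (`ConsumerConfig.MAX_POLL_RECORDS_CONFIG` is the constant for `max.poll.records`, and Spring Boot's `spring.kafka.listener.type` accepts `single` or `batch`); the helper class name is hypothetical, and, as reported above, these settings did not resolve the issue.

```java
import java.util.Properties;

// Hypothetical helper: collects the two single-record settings tried above.
class SingleRecordConsumerSettings {

    /** Spring Boot property switching the listener container from batch to single-record mode. */
    static Properties springBootProperties() {
        Properties props = new Properties();
        props.setProperty("spring.kafka.listener.type", "single");
        return props;
    }

    /**
     * Plain Kafka consumer config limiting each poll() to one record.
     * With kafka-clients on the classpath, ConsumerConfig.MAX_POLL_RECORDS_CONFIG is this key.
     */
    static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("max.poll.records", "1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(springBootProperties());
        System.out.println(consumerProperties());
    }
}
```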

@dquagebeur

Hi,
I don't know if it is the same problem, but it looks very similar. In our case, we are using Kafka Connect.

In the first step, a connector produces a message and inserts it into Kafka. The message contains a "traceparent" header (visible in AKHQ).

The second connector consumes the message to make external HTTP requests.
The first HTTP request contains one traceparent header, the second contains two traceparent headers (not a second value, but two separate headers named traceparent with different values), and so on.

We have upgraded to version 1.0.0 and the problem is still there.
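As background, a W3C `traceparent` value has the form `version-traceid-parentid-flags`. A downstream call should keep the trace ID and carry a new parent (span) ID, and the header should be set, replacing any existing value, rather than appended. The stdlib-only sketch below illustrates both points. The class and method names are hypothetical, it only handles version `00` and does not reject all-zero trace IDs, and it is not the OpenTelemetry propagator API. It does not diagnose why Kafka Connect accumulates headers here; appending to a reused header collection is just one assumption that would produce this symptom.

```java
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified traceparent handling (version 00 only); not OpenTelemetry's API.
class TraceparentSketch {
    private static final Pattern TRACEPARENT =
            Pattern.compile("00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})");
    private static final SecureRandom RANDOM = new SecureRandom();

    /** Returns {traceId, parentId, flags}, or null if the value is not a valid version-00 header. */
    static String[] parse(String traceparent) {
        Matcher m = TRACEPARENT.matcher(traceparent);
        return m.matches() ? new String[] {m.group(1), m.group(2), m.group(3)} : null;
    }

    /** A child keeps the trace ID and flags; only the parent (span) ID is new. */
    static String child(String traceparent) {
        String[] parts = parse(traceparent);
        if (parts == null) {
            throw new IllegalArgumentException("invalid traceparent: " + traceparent);
        }
        return "00-" + parts[0] + "-" + newSpanId() + "-" + parts[2];
    }

    /** 8 random bytes as 16 lowercase hex chars; the all-zero span ID is invalid, so retry on it. */
    static String newSpanId() {
        String id;
        do {
            id = String.format("%016x", RANDOM.nextLong());
        } while (id.equals("0000000000000000"));
        return id;
    }

    /** "Set" semantics: any existing traceparent is replaced, never appended to. */
    static void setTraceparent(Map<String, List<String>> headers, String value) {
        List<String> values = new ArrayList<>();
        values.add(value);
        headers.put("traceparent", values);
    }

    public static void main(String[] args) {
        String incoming = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
        Map<String, List<String>> headers = new HashMap<>(); // deliberately reused across calls
        for (int call = 1; call <= 3; call++) {
            setTraceparent(headers, child(incoming));
            System.out.println("call " + call + ": " + headers.get("traceparent"));
        }
    }
}
```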

@previousdeveloper

@anuraaga hey, do you have any idea about this issue?

@anuraaga
Contributor

@previousdeveloper I'm not an expert in Kafka, so it will take a bit to dig in. But the main thing that comes to mind is that I don't think we currently have instrumentation for reactive streams with the Kafka client

https://github.com/open-telemetry/opentelemetry-java-instrumentation/tree/main/instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients

which is used in the sample app

https://github.com/sunvuz/OpenTelemetry-poc/blob/master/consumeMicroService/src/main/java/com/sample/awsplayground/consumer/SampleConsumer.java#L114

I wouldn't be surprised if this breaks context propagation into the consumer, which would mean we need to implement support for it.

@trask
Member

trask commented May 3, 2022

If you are still having this issue, can you try the latest version, 1.13.1? There were a couple of related Kafka instrumentation fixes that could resolve it.

@m-kay

m-kay commented May 16, 2022

I have tried version 1.13.1 of the javaagent and it still does not work with batch listeners.

Is there a possibility to manually add the links from all the messages in the batch to the auto generated spans?

@mateuszrzeszutek
Member

Hey @m-kay ,
Are you using spring-kafka? What version?

@m-kay

m-kay commented May 16, 2022

I'm using spring-kafka 2.8.5 and spring-cloud-stream-binder-kafka 3.2.2, so my consumer is defined as a function bean rather than with a @KafkaListener-annotated method. Could that be causing the problem?

@mateuszrzeszutek
Member

Hmm, that's possible - we're only testing plain spring-kafka usage. Can you prepare a small repro app? That'd really help us pinpoint the issue.

@m-kay

m-kay commented May 16, 2022

I have created a demo app with 2 consumers, one is consuming the messages in batch mode and the other one as single messages: https://github.com/m-kay/otel-demo-http-sink

When a message with a traceparent header is sent to the topic demo-input-topic, the trace context is picked up correctly by the single-message consumer but not by the batch consumer.

@m-kay

m-kay commented May 16, 2022

I also added a batch listener using the annotation @KafkaListener but that doesn't work either 🤔

@m-kay

m-kay commented May 30, 2022

Is there something I could do with manual instrumentation to work around this issue? Maybe add the traceparents of all the messages in the batch as links to the current span generated by the auto-instrumentation?
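One possible manual workaround for the batch case, sketched under assumptions: read the `traceparent` header of every record in the batch and collect the distinct (trace ID, span ID) pairs. With the OpenTelemetry API, each pair could then become a `SpanContext` via `SpanContext.createFromRemoteParent(...)` and be attached with `SpanBuilder.addLink(...)`. Links added through `SpanBuilder` are set when the span is created, so this assumes a span you start yourself rather than the already-started auto-instrumented one. The stdlib-only sketch below covers just the collection step; the record representation (one header map per record) and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: gathers link targets from the headers of a batch of records.
class BatchLinkCollector {
    private static final Pattern TRACEPARENT =
            Pattern.compile("00-([0-9a-f]{32})-([0-9a-f]{16})-[0-9a-f]{2}");

    /**
     * Returns distinct "traceId/spanId" pairs, in batch order, for records whose
     * traceparent header is a valid version-00 value; other records are skipped.
     */
    static List<String> collectLinkTargets(List<Map<String, String>> batchHeaders) {
        Set<String> targets = new LinkedHashSet<>(); // keeps first-seen order, drops duplicates
        for (Map<String, String> headers : batchHeaders) {
            String value = headers.get("traceparent");
            if (value == null) {
                continue;
            }
            Matcher m = TRACEPARENT.matcher(value);
            if (m.matches()) {
                targets.add(m.group(1) + "/" + m.group(2));
            }
        }
        return new ArrayList<>(targets);
    }

    public static void main(String[] args) {
        List<Map<String, String>> batch = Arrays.asList(
                Collections.singletonMap("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"),
                Collections.singletonMap("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"),
                Collections.<String, String>emptyMap()); // record without a traceparent header
        System.out.println(collectLinkTargets(batch));
    }
}
```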

@m-kay

m-kay commented Mar 6, 2023

@mateuszrzeszutek did you already have a chance to look into this?

@mateuszrzeszutek
Member

Hey @m-kay ,
Sorry, I haven't had the time to take a look at this.

@jbaris

jbaris commented Apr 28, 2023

Has there been any progress on this? We have the same issue...

@jbaris

jbaris commented Apr 29, 2023

I think the issue is in instrumentation/kafka/kafka-clients/kafka-clients-0.11. There is an instrumentation called ConsumerRecordsInstrumentation. It has an inner class IteratorAdvice that is applied to the org.apache.kafka.clients.consumer.ConsumerRecords#iterator() method. Basically, the advice wraps the result of iterator() in a TracingIterator object. That object has two main overrides:

  • next(): it closes the current scope and ends the current span, and then creates and start a new one.
  • hasNext(): it closes the current scope and ends the current span.

In our case, we are using Camel as an abstraction over Kafka, but the issue can apply to other libraries/frameworks. Camel processes the ConsumerRecords in KafkaRecordProcessorFacade#processPolledRecords(). The root cause of the issue is that hasNext() is called twice per record:

```java
while (!lastResult.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) {
   ConsumerRecord<Object, Object> record = recordIterator.next();
   // recordIterator.hasNext() is evaluated again here, as an argument, before processRecord(...) runs
   lastResult = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(), lastResult, kafkaRecordProcessor, record);
   ...
}
```

So the process span (and its context) is closed earlier than expected, and a new trace ID is created for the following spans.
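To make the failure mode concrete, here is a simplified, stdlib-only model of the TracingIterator behavior described above. It is not the agent's actual class: a string field stands in for the current span/context, and the loop and handler methods are hypothetical. With a loop that calls hasNext() only at the top, the per-record work runs inside the process span. With the Camel-style loop, the extra hasNext() call between next() and processing ends the span first, so the work sees no parent, which matches the new trace IDs reported in this issue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simplified model of the described TracingIterator; NOT the real agent class.
class TracingIteratorModel implements Iterator<String> {
    private final Iterator<String> delegate;
    private final List<String> log;
    String currentSpan; // stand-in for the current context: active "process" span, or null

    TracingIteratorModel(Iterator<String> delegate, List<String> log) {
        this.delegate = delegate;
        this.log = log;
    }

    @Override
    public boolean hasNext() {
        endCurrentSpan(); // as described: hasNext() closes the scope and ends the span
        return delegate.hasNext();
    }

    @Override
    public String next() {
        endCurrentSpan(); // as described: next() ends the previous span...
        String record = delegate.next();
        currentSpan = "process " + record; // ...and starts a new one for this record
        log.add("start " + currentSpan);
        return record;
    }

    private void endCurrentSpan() {
        if (currentSpan != null) {
            log.add("end " + currentSpan);
            currentSpan = null;
        }
    }

    /** Hypothetical handler: logs which span its downstream calls would be parented to. */
    static void processRecord(String record, TracingIteratorModel it, List<String> log) {
        log.add("work " + record + " parent=" + it.currentSpan);
    }

    /** hasNext() is called only at the top of the loop. */
    static List<String> plainLoop(List<String> records) {
        List<String> log = new ArrayList<>();
        TracingIteratorModel it = new TracingIteratorModel(records.iterator(), log);
        while (it.hasNext()) {
            String record = it.next();
            processRecord(record, it, log);
        }
        return log;
    }

    /** Camel-style: hasNext() is evaluated again before the record is processed. */
    static List<String> camelStyleLoop(List<String> records) {
        List<String> log = new ArrayList<>();
        TracingIteratorModel it = new TracingIteratorModel(records.iterator(), log);
        while (it.hasNext()) {
            String record = it.next();
            // Camel passes this look-ahead into processRecord(...); here it only triggers the early end
            boolean hasMore = it.hasNext();
            processRecord(record, it, log);
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("r1", "r2");
        System.out.println(plainLoop(records));
        System.out.println(camelStyleLoop(records));
    }
}
```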

UPDATE:
I made this ugly change, but it works as expected for me:
jbaris@77fd797


9 participants