
[receiver/kafkareceiver] autocommit set false does not take effect when exporter failed #37136

Open
ChrisYe2015 opened this issue on Jan 10, 2025 · 5 comments
Labels: bug (Something isn't working), needs triage (New item requiring triage), receiver/kafka

Comments

@ChrisYe2015

Component(s)

receiver/kafka

What happened?

Description

I used the Kafka receiver and the Jaeger exporter, and turned off autocommit in otelcol-config.yml.
When Jaeger is unavailable, I expect Kafka messages not to be consumed and the consumer lag to increase, but that is not what happens.
Is my configuration incorrect, or does the receiver currently not prevent offset commits when the exporter fails?

Steps to Reproduce

1. Stop Jaeger
2. Disable autocommit and restart the otel-collector
3. Send messages to Kafka

Expected Result

Lag increases while Jaeger is down, and consumption resumes when Jaeger is ready.

Actual Result

Lag = 0 and the offset is committed.

Collector version

v0.116.0

Environment information

Environment

linux docker

OpenTelemetry Collector configuration

receivers:
  kafka/traces:
    brokers: ["kafka:9092"]
    topic: otlp_spans
    autocommit:
      enable: false
    message_marking: 
      after: true
      on_error: false

exporters:
  debug:
  #verbosity: detailed
  otlp:
    endpoint: "jaeger:8027"
    tls:
      insecure: true

service:
  telemetry:
    logs:
      level: debug
  pipelines:
    traces:
      receivers: [kafka/traces]
      processors: [batch]
      exporters: [otlp, debug]
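
For reference, here is the same receiver block annotated with my reading of each knob (the comments are my own gloss of the kafkareceiver README, not collector output). The intent of this combination is that offsets are marked only after the downstream pipeline succeeds and are never committed automatically on a timer:

receivers:
  kafka/traces:
    brokers: ["kafka:9092"]
    topic: otlp_spans
    autocommit:
      enable: false      # do not periodically auto-commit marked offsets back to the broker
    message_marking:
      after: true        # mark a message only after the pipeline has processed it
      on_error: false    # do not mark a message when the pipeline returns an error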

Log output

2025-01-10T06:08:39.495Z        warn    grpc@v1.68.1/clientconn.go:1384 [core] [Channel #1 SubChannel #2]grpc: addrConn.createTransport failed to connect to {Addr: "xxx.xx.xx.xx:8027", ServerName: "xxx.xx.xx.xx:8027", }. Err: connection error: desc = "transport: Error while dialing: dial tcp xxx.xx.xx.xx:8027: connect: connection refused"     {"grpc_log": true}
2025-01-10T06:08:39.495Z        info    grpc@v1.68.1/clientconn.go:1204 [core] [Channel #1 SubChannel #2]Subchannel Connectivity change to TRANSIENT_FAILURE, last error: connection error: desc = "transport: Error while dialing: dial tcp xxx.xx.xx.xx:8027: connect: connection refused"       {"grpc_log": true}
2025-01-10T06:08:39.495Z        info    pickfirst/pickfirst.go:184      [pick-first-lb] [pick-first-lb 0xc001a13a70] Received SubConn state update: 0xc001a13b00, {ConnectivityState:TRANSIENT_FAILURE ConnectionError:connection error: desc = "transport: Error while dialing: dial tcp xxx.xx.xx.xx:8027: connect: connection refused" connectedAddress:{Addr: ServerName: Attributes:<nil> BalancerAttributes:<nil> Metadata:<nil>}}   {"grpc_log": true}
2025-01-10T06:08:50.430Z        debug   kafkareceiver@v0.116.0/kafka_receiver.go:550    Kafka message claimed   {"kind": "receiver", "name": "kafka/traces", "data_type": "traces", "value": ..."

Additional context

No response

ChrisYe2015 added the bug and needs triage labels on Jan 10, 2025

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

@apolzek (Contributor) commented on Jan 10, 2025

@ChrisYe2015 could you show the details of how your messages are published to Kafka? Since you haven’t set an encoding, I believe you are using the default, which is otlp_proto. If you could help me reproduce the issue, I’m interested in your case. I believe it might be more related to the Kafka configuration itself rather than the collector. For example, have you tried retention configurations like KAFKA_LOG_RETENTION_HOURS?
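
For what it’s worth, a minimal sketch of pinning the encoding explicitly on the receiver, so that variable can be ruled out (otlp_proto is the documented default, so this should be a no-op):

receivers:
  kafka/traces:
    brokers: ["kafka:9092"]
    topic: otlp_spans
    encoding: otlp_proto   # explicit; same as the documented default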

@ChrisYe2015 (Author) commented on Jan 12, 2025

@apolzek Thank you for your prompt reply. KAFKA_LOG_RETENTION_HOURS is already set, and I use the following architecture to collect traces:
OpenTelemetry SDK -> OpenTelemetry Collector1 (unified collection) -> Kafka -> OpenTelemetry Collector2 (unified processing) -> Jaeger. The configuration of Collector2 was provided earlier; the Collector1 config is:

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318
        cors:
          allowed_origins:
            - "http://*"
            - "https://*"
exporters:
  debug:
  #verbosity: detailed
  kafka:
    brokers:
      - xx.xx.xx.xx:8092
 
processors:
  batch:
 
service:
  telemetry:
    logs:
      level: info
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [kafka]
    metrics:
      receivers: [otlp]
      processors: [batch]
      exporters: [kafka]
    logs:
      receivers: [otlp]
      processors: [batch]

kafka docker-compose.yml:

services:
  # Kafka Zookeeper
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_DATA_DIR: /var/lib/zookeeper/data
    ports:
      - "8081:2181"
    volumes:
      - ./kafka/config:/etc/kafka
      - ./kafka/zk_data:/var/lib/zookeeper/data
 
  # Kafka Broker
  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_LOG_RETENTION_HOURS: 168
    ports:
      - "8092:9092"
    volumes:
      - ./kafka/data:/var/lib/kafka/data 
      - ./kafka/config:/etc/kafka

There is no other additional config. Please help check my settings, or share any other suggestions. Thanks!

@shivanshuraj1333 (Member) commented:

I'll be taking a look into this over the weekend.

@apolzek (Contributor) commented on Jan 25, 2025

@ChrisYe2015 I’m still analyzing the collector source code to confirm what I’m about to say, but regardless of whether autocommit is enabled or not, the messages are not lost. From what I understand, this configuration will only have an impact if the test involves a failure in the second OpenTelemetry Collector (the one consuming from Kafka). The scenario you expect, with an increase in lag in the Kafka consumer, didn’t happen in cases of failure with Jaeger. I believe this is for a couple of reasons. The first might be related to the offset control, where the collector knows up to which point the message has been successfully processed and exported. The second guess is that the collector has a buffer that holds these messages. I’m not an expert on the collector, but I’ll have more information soon. If anyone with experience comes by to contribute, that would be great; otherwise, please wait while I look for evidence. My test is accessible here.

It’s important to note that the lag only occurs when there is a very high volume of messages per second. It’s not directly related to a failure in the Jaeger exporter, at least based on my tests 🤔
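
On the second guess: the otlp exporter is built on the collector’s exporterhelper, whose sending_queue is enabled by default, and a successful enqueue already reports success back upstream before Jaeger is ever contacted. If that is what is happening here, the Kafka receiver never sees an export error, so it marks the message and the offset still gets committed. A sketch of exporter settings that would instead surface the failure to the receiver (illustrative values, not a confirmed fix for this issue):

exporters:
  otlp:
    endpoint: "jaeger:8027"
    tls:
      insecure: true
    sending_queue:
      enabled: false       # default is true; while the queue is on, enqueueing succeeds even when Jaeger is down
    retry_on_failure:
      enabled: false       # default is true; disabling lets the error propagate back to the receiver immediately

With the queue disabled, the export error is returned to the Kafka receiver, and with message_marking.on_error: false the message should not be marked; whether the offset then stays uncommitted is exactly what this issue is about.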
