
Parallel Consumer does not get records from poll after OFFSET_OUT_OF_RANGE in auto.offset.reset = earliest #352

Closed
jikeshdpatel opened this issue Jul 14, 2022 Discussed in #351 · 9 comments
Labels
duplicate This issue or pull request already exists

Comments

@jikeshdpatel

jikeshdpatel commented Jul 14, 2022

[maintainer note]:
May have been a symptom of #365

Discussed in #351

Originally posted by jikeshdpatel July 14, 2022
Hi @astubbs, this is a great library and it improves performance drastically. However, I am seeing a strange issue, which I suspect is caused by auto_offset_reset = earliest.

  • Populated events in the topic.
  • Started the parallel consumer with auto_offset_reset = earliest, and it started consuming from the beginning.
  • Stopped the process at, say, offset 5000 on partition 3, with PC 1000 events behind. The other 5 partitions had similar stats.
  • Waited for the topic's retention period to elapse.
  • Produced more records, waited for the retention period again, and then produced more records.
  • Started PC, and it only consumed from partition 5 (the last partition). I am not sure whether PC was lagging behind on partition 5.
  • Started a Spring Kafka listener (with auto_offset_reset = earliest) for partition 3, and it started consuming.
  • Stopped Spring Kafka and started PC (with auto_offset_reset = earliest), and it now consumed from partitions 5 and 3.
  • Stopped PC and restarted it with auto_offset_reset = latest, and now it consumes from all partitions.

The processing order was based on KEY.

Can you please advise how I should debug this further?

Thank you for your help!

Below is the config:

2022-07-14 17:17:32.036 INFO [main] [org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:372)] - ConsumerConfig values: 
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [broker01:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = client-01
        client.rack = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = app-group-01
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = [hidden]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = PLAIN
        security.protocol = SASL_SSL
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2022-07-14 17:17:32.038 DEBUG [main] [org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:696)] - [Consumer clientId=client-01, groupId=app-group-01] Initializing the Kafka consumer
2022-07-14 17:17:32.135 INFO [main] [org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:61)] - Successfully logged in.
2022-07-14 17:17:32.141 DEBUG [main] [org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createSSLContext(DefaultSslEngineFactory.java:264)] - Created SSL context with keystore null, truststore null, provider SunJSSE.
2022-07-14 17:17:32.192 INFO [main] [org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:119)] - Kafka version: 6.2.1-ccs
2022-07-14 17:17:32.192 INFO [main] [org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:120)] - Kafka commitId: fa4bec046a2df3a6
2022-07-14 17:17:32.192 INFO [main] [org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:121)] - Kafka startTimeMs: 1657833452190
2022-07-14 17:17:32.193 DEBUG [main] [org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:815)] - [Consumer clientId=client-01, groupId=app-group01] Kafka consumer initialized
2022-07-14 17:17:32.199 INFO [main] [io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.<init>(AbstractParallelEoSStreamProcessor.java:231)] - Confluent Parallel Consumer initialise... Options: ParallelConsumerOptions(consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6c42c856, producer=null, managedExecutorService=java:comp/DefaultManagedExecutorService, managedThreadFactory=java:comp/DefaultManagedThreadFactory, ordering=KEY, commitMode=PERIODIC_CONSUMER_ASYNCHRONOUS, maxConcurrency=100, defaultMessageRetryDelay=PT1S, retryDelayProvider=null, sendTimeout=PT10S, offsetCommitTimeout=PT10S, batchSize=1, thresholdForTimeSpendInQueueWarning=PT10S, maxFailureHistory=10)

Below are the logs for the partition that hit OFFSET_OUT_OF_RANGE. The consumer does reset the offset, but it does not get any records, even though there are records at the new offset.

2022-07-14 17:17:39.407 DEBUG [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:314)] - [Consumer clientId=client-01, groupId=app-group-01] Fetch READ_UNCOMMITTED at offset 842285 for partition test-topic-1 returned fetch data (error=OFFSET_OUT_OF_RANGE, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, preferredReadReplica = absent, abortedTransactions = null, divergingEpoch =Optional.empty, recordsSizeInBytes=0)
2022-07-14 17:17:39.407 INFO [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1360)] - [Consumer clientId=client-01, groupId=app-group-01] Fetch position FetchPosition{offset=842285, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[borker01:9092 (id: 21 rack: 0)], epoch=12}} is out of range for partition test-topic-1, resetting offset
2022-07-14 17:17:39.407 DEBUG [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.Fetcher.sendListOffsetRequest(Fetcher.java:983)] - [Consumer clientId=client-01, groupId=app-group-01] Sending ListOffsetRequest ListOffsetsRequestData(replicaId=-1, isolationLevel=0, topics=[ListOffsetsTopic(name='test-topic', partitions=[ListOffsetsPartition(partitionIndex=1, currentLeaderEpoch=12, timestamp=-2, maxNumOffsets=1)])]) to broker borker01:9092 (id: 21 rack: 0)
2022-07-14 17:17:39.407 DEBUG [pc-broker-poll] [org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:522)] - [Consumer clientId=client-01, groupId=app-group-01] Sending LIST_OFFSETS request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=6, clientId=client-01, correlationId=25) and timeout 30000 to node 21: ListOffsetsRequestData(replicaId=-1, isolationLevel=0, topics=[ListOffsetsTopic(name='test-topic', partitions=[ListOffsetsPartition(partitionIndex=1, currentLeaderEpoch=12, timestamp=-2, maxNumOffsets=1)])])
2022-07-14 17:17:39.451 DEBUG [pc-broker-poll] [org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:880)] - [Consumer clientId=client-01, groupId=app-group-01] Received LIST_OFFSETS response from node 21 for request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=6, clientId=client-01, correlationId=25): ListOffsetsResponseData(throttleTimeMs=0, topics=[ListOffsetsTopicResponse(name='test-topic', partitions=[ListOffsetsPartitionResponse(partitionIndex=1, errorCode=0, oldStyleOffsets=[], timestamp=-1, offset=1197212, leaderEpoch=7)])])
2022-07-14 17:17:39.451 DEBUG [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.Fetcher.handleListOffsetResponse(Fetcher.java:1035)] - [Consumer clientId=client-01, groupId=app-group-01] Handling ListOffsetResponse response for test-topic-1. Fetched offset 1197212, timestamp -1
2022-07-14 17:17:39.451 DEBUG [pc-broker-poll] [org.apache.kafka.clients.Metadata.updateLastSeenEpochIfNewer(Metadata.java:183)] - [Consumer clientId=client-01, groupId=app-group-01] Not replacing existing epoch 12 with new epoch 7 for partition test-topic-1
2022-07-14 17:17:39.451 INFO [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.SubscriptionState.maybeSeekUnvalidated(SubscriptionState.java:398)] - [Consumer clientId=client-01, groupId=app-group-01] Resetting offset for partition test-topic-1 to position FetchPosition{offset=1197212, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[borker01:9092 (id: 21 rack: 0)], epoch=12}}.
2022-07-14 17:17:39.452 DEBUG [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.Fetcher.prepareFetchRequests(Fetcher.java:1198)] - [Consumer clientId=client-01, groupId=app-group-01] Added READ_UNCOMMITTED fetch request for partition test-topic-1 at position FetchPosition{offset=1197212, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[borker01:9092 (id: 21 rack: 0)], epoch=12}} to node borker01:9092 (id: 21 rack: 0)
2022-07-14 17:17:39.452 DEBUG [pc-broker-poll] [org.apache.kafka.clients.FetchSessionHandler$Builder.build(FetchSessionHandler.java:259)] - [Consumer clientId=client-01, groupId=app-group-01] Built incremental fetch (sessionId=1279947631, epoch=1) for node 21. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
2022-07-14 17:17:39.452 DEBUG [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.Fetcher.sendFetches(Fetcher.java:265)] - [Consumer clientId=client-01, groupId=app-group-01] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(test-topic-1), toForget=(), implied=()) to broker borker01:9092 (id: 21 rack: 0)
2022-07-14 17:17:39.452 DEBUG [pc-broker-poll] [org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:522)] - [Consumer clientId=client-01, groupId=app-group-01] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=client-01, correlationId=26) and timeout 30000 to node 21: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=1279947631, sessionEpoch=1, topics=[FetchTopic(topic='test-topic', partitions=[FetchPartition(partition=1, currentLeaderEpoch=12, fetchOffset=1197212, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[], rackId='')
2022-07-14 17:17:39.608 DEBUG [pc-broker-poll] [io.confluent.parallelconsumer.internal.ConsumerManager.poll(ConsumerManager.java:55)] - Poll completed normally (after timeout of PT2S) and returned 0...
2022-07-14 17:17:39.608 DEBUG [pc-broker-poll] [io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:185)] - Poll completed
2022-07-14 17:17:39.608 DEBUG [pc-broker-poll] [io.confluent.parallelconsumer.internal.BrokerPollSystem.handlePoll(BrokerPollSystem.java:142)] - Got 0 records in poll result
2022-07-14 17:17:39.608 DEBUG [pc-broker-poll] [io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:176)] - Subscriptions are paused: false
2022-07-14 17:17:39.608 DEBUG [pc-broker-poll] [io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:182)] - Long polling broker with timeout PT2S, might appear to sleep here if subs are paused, or no data available on broker. Run state: running

@astubbs
Contributor

astubbs commented Jul 15, 2022

Are the offsets for your consumer group expiring? The default retention for committed offsets used to be 24 hours. Once the offsets are gone, the consumer should start from the beginning, as if no offsets had ever been committed.

I'm not quite following what you're describing. Are you able to write this as an integration test? You can use TransactionMarkersTest from the latest version of the tests as a template.

@astubbs astubbs added the wait for info Waiting for additional info from user label Jul 15, 2022
@jikeshdpatel
Author

jikeshdpatel commented Jul 15, 2022

Thanks @astubbs for your help, I really appreciate it! I am working on the integration tests, and reproducing this through tests might take time. In the meantime: the offsets of my consumer group for the topic have expired, and I expect the consumer to start from the earliest available offset. I can see that in the logs below, but it did not fetch any records. However, when I start Spring Kafka with the same consumer group, I see the same logs as with the parallel consumer, but it does fetch records from the new offset returned by Kafka.

2022-07-14 17:17:39.407 DEBUG [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:314)] - [Consumer clientId=client-01, groupId=app-group-01] Fetch READ_UNCOMMITTED at offset **842285** for partition test-topic-1 returned fetch data (error=**OFFSET_OUT_OF_RANGE**, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, preferredReadReplica = absent, abortedTransactions = null, divergingEpoch =Optional.empty, recordsSizeInBytes=0)

2022-07-14 17:17:39.451 INFO [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.SubscriptionState.maybeSeekUnvalidated(SubscriptionState.java:398)] - [Consumer clientId=client-01, groupId=app-group-01] **Resetting offset** for partition test-topic-1 to position FetchPosition{offset=**1197212**, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[borker01:9092 (id: 21 rack: 0)], epoch=12}}. 

2022-07-14 17:17:39.452 DEBUG [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.Fetcher.prepareFetchRequests(Fetcher.java:1198)] - [Consumer clientId=client-01, groupId=app-group-01] Added READ_UNCOMMITTED fetch request for partition test-topic-1 at position FetchPosition{offset=1197212, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[borker01:9092 (id: 21 rack: 0)], epoch=12}} to node borker01:9092 (id: 21 rack: 0) 

2022-07-14 17:17:39.452 DEBUG [pc-broker-poll] [org.apache.kafka.clients.FetchSessionHandler$Builder.build(FetchSessionHandler.java:259)] - [Consumer clientId=client-01, groupId=app-group-01] Built incremental fetch (sessionId=1279947631, epoch=1) for node 21. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s) 

2022-07-14 17:17:39.452 DEBUG [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.Fetcher.sendFetches(Fetcher.java:265)] - [Consumer clientId=client-01, groupId=app-group-01] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(test-topic-1), toForget=(), implied=()) to broker borker01:9092 (id: 21 rack: 0) 

2022-07-14 17:17:39.452 DEBUG [pc-broker-poll] [org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:522)] - [Consumer clientId=client-01, groupId=app-group-01] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=client-01, correlationId=26) and timeout 30000 to node 21: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=1279947631, sessionEpoch=1, topics=[FetchTopic(topic='test-topic', partitions=[FetchPartition(partition=1, currentLeaderEpoch=12, fetchOffset=1197212, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[], rackId='') 

2022-07-14 17:17:39.608 DEBUG [pc-broker-poll] [io.confluent.parallelconsumer.internal.ConsumerManager.poll(ConsumerManager.java:55)] - Poll completed normally (after timeout of PT2S) and **returned 0**... 

2022-07-14 17:17:39.608 DEBUG [pc-broker-poll] [io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:185)] - Poll completed 

2022-07-14 17:17:39.608 DEBUG [pc-broker-poll] [io.confluent.parallelconsumer.internal.BrokerPollSystem.handlePoll(BrokerPollSystem.java:142)] - Got 0 records in poll result 

2022-07-14 17:17:39.608 DEBUG [pc-broker-poll] [io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:176)] - Subscriptions are paused: false 

2022-07-14 17:17:39.608 DEBUG [pc-broker-poll] [io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:182)] - Long polling broker with timeout PT2S, might appear to sleep here if subs are paused, or no data available on broker. Run state: running

@jikeshdpatel
Author

Hi @astubbs, I have tested it further and it looks like the parallel consumer is getting records. The issue I see is that the current offset in confluent cloud > consumer lag for its group does not move when a parallel consumer instance fails to commit the processed records because the partition was reassigned. What could be the reason the current offset does not move in Confluent Cloud, even though I see records from that partition being processed?

@astubbs
Contributor

astubbs commented Jul 20, 2022

Hi, I suspect this is because PC doesn't know about the "reset to earliest", so the underlying consumer re-downloads the records, but PC has already seen them and so ignores them.

If the offset gets reset and the consumer starts from earliest again, do you want PC to also reprocess everything?

Was PC stopped, then the offsets expired, then PC was started again? Or did the offsets expire while PC was running?

@jikeshdpatel
Author

jikeshdpatel commented Jul 20, 2022

Hi @astubbs, thanks for the response. Yes, I want at-least-once processing behavior. If the last committed offset is reset and the system is down for 24 hours, it will reprocess everything from the earliest available event (not yet deleted from the topic) instead of processing only the latest events. This should rarely happen in prod, but I still don't want to lose events by using offset reset to latest.

When I looked at my logs again, it looks like this happens when some processed offsets are waiting locally to be committed to the broker but are not committed, due to a rebalance after scaling or some other reason. Next time, PC somehow knows that they were processed even though the offset commit failed, and does not reprocess them; but from then on, the committed offset stays at that last uncommitted offset, even though processing has already moved on. I have attached the last commit failures for partitions 0 - 4, and those are the offsets I see in Confluent Cloud as the current offset. (I was unable to upload a Confluent Cloud screenshot.)

cp-enricher-pc-60-auth5-1.log:ERROR 2022-07-19T16:59:47.204Z [pc-broker-poll][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB] Offset commit failed on partition sqs-jcard-riskcontrol-enricher-3 at offset 977994: The coordinator is not aware of this member.

cp-enricher-pc-60-auth5-2.log:ERROR 2022-07-19T16:59:47.141Z [pc-broker-poll][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB] Offset commit failed on partition sqs-jcard-riskcontrol-enricher-1 at offset 1269418: The coordinator is not aware of this member.

cp-enricher-pc-60-auth5-2020220720115455104.log.gz:ERROR 2022-07-20T15:43:00.207Z [pc-broker-poll][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB]Offset commit failed on partition sqs-jcard-riskcontrol-enricher-2 at offset 1124610: The coordinator is not aware of this member.

cp-enricher-pc-60-auth5-2020220720115455104.log.gz:ERROR 2022-07-20T15:51:58.043Z [pc-broker-poll][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB] Offset commit failed on partition sqs-jcard-riskcontrol-enricher-4 at offset 1072807: The coordinator is not aware of this member.

cp-enricher-pc-60-auth5-2120220720120005253.log.gz:ERROR 2022-07-20T15:51:57.968Z [pc-broker-poll][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB] Offset commit failed on partition sqs-jcard-riskcontrol-enricher-2 at offset 1124610: The coordinator is not aware of this member.

cp-enricher-pc-60-auth5-2120220720120005253.log.gz:ERROR 2022-07-20T15:51:58.044Z [pc-broker-poll][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB] Offset commit failed on partition sqs-jcard-riskcontrol-enricher-2 at offset 1124610: The coordinator is not aware of this member.

cp-enricher-pc-60-auth5-3.log:ERROR 2022-07-19T16:59:47.161Z [pc-broker-poll][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB] Offset commit failed on partition sqs-jcard-riskcontrol-enricher-2 at offset 1121707: The coordinator is not aware of this member.

cp-enricher-pc-60-auth5-4.log:ERROR 2022-07-19T16:59:55.376Z [kafka-coordinator-heartbeat-thread | fs-async-events-pc-auth5][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB]Offset commit failed on partition sqs-jcard-riskcontrol-enricher-4 at offset 1072807: The coordinator is not aware of this member.

cp-enricher-pc-60-auth5-5.log:ERROR 2022-07-19T16:59:47.061Z [pc-broker-poll][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB]Offset commit failed on partition sqs-jcard-riskcontrol-enricher-0 at offset 1205613: The coordinator is not aware of this member.

cp-enricher-pc-60-earlist-0.log:ERROR 2022-07-19T20:03:53.771Z [pc-broker-poll][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB] Offset commit failed on partition sqs-jcard-riskcontrol-enricher-1 at offset 1270220: The coordinator is not aware of this member.

Below are some additional logs. Please let me know if you want the committedMetadata hash.

cp-enricher-pc-60-auth5-1.log:ERROR 2022-07-19T16:50:56.605Z [pc-pool-3-thread-1][consumer1][partition=3][offset=977994]: Retriable exception to process message at 977994 . 

cp-enricher-pc-60-auth5-1.log:ERROR 2022-07-19T16:59:47.204Z [pc-broker-poll][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB] Offset commit failed on partition topic-name-3 at offset 977994: The coordinator is not aware of this member.

ERROR 2022-07-19T16:59:47.209Z [pc-broker-poll][BrokerPollSystem][][][][] : Unknown error
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1262) ~[kafka-clients-6.2.1-ccs.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1163) ~[kafka-clients-6.2.1-ccs.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1180) ~[kafka-clients-6.2.1-ccs.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1155) ~[kafka-clients-6.2.1-ccs.jar:?]

@astubbs
Contributor

astubbs commented Jul 20, 2022

At least once? Are you saying PC has been skipping records?

@jikeshdpatel
Author

Hi @astubbs, sorry for the confusion; I did not mean PC is skipping records. I meant what would happen if I used auto.offset.reset = latest instead of earliest. I guess that's not related to PC.

The issue I am seeing is that once PC is unable to commit the offset, due to rebalancing or some other reason, the current offset does not move. By "does not move" I mean that I keep seeing that uncommitted offset in the Confluent Cloud consumer dashboard. However, that record is not being reprocessed as a duplicate.

@jikeshdpatel
Author

jikeshdpatel commented Aug 1, 2022

Hi @astubbs, just an update on this. It happens when the last committed offset is out of range. Checking for out-of-range offsets during startup, and resetting them before starting PC, seems to solve the problem.

    import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG;

    import java.time.Duration;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    // Runs once at startup, before PC is created. populateConsumerProperties, pcKafkaProperties and
    // logger are members of the enclosing class (not shown); the method name is illustrative.
    private void resetExpiredOffsetsToEarliest() {
        Properties consumerProperties = populateConsumerProperties(pcKafkaProperties);
        consumerProperties.put(MAX_POLL_RECORDS_CONFIG, 1);

        AtomicBoolean partitionsAssigned = new AtomicBoolean(false);
        // try-with-resources closes the consumer, so no extra finally { close() } is needed
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProperties)) {
            kafkaConsumer.subscribe(List.of(pcKafkaProperties.getInputTopics().split(";")), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    logger.info("Partitions are revoked while resetting offset to earliest during start up {} ",
                            partitions);
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    Map<TopicPartition, Long> earliestOffsets = new HashMap<>();
                    Map<TopicPartition, OffsetAndMetadata> committedOffsets = kafkaConsumer.committed(new HashSet<>(partitions));
                    Map<TopicPartition, OffsetAndMetadata> resetOffsets = new HashMap<>();
                    kafkaConsumer.seekToBeginning(partitions);
                    for (TopicPartition partition : partitions) {
                        long nextOffset = kafkaConsumer.position(partition); // get the beginning offset of the partition
                        logger.info("Partition {} position remote call is made and got nextOffset {} ",
                                partition, nextOffset);
                        earliestOffsets.put(partition, nextOffset);
                        if (committedOffsets == null ||
                                committedOffsets.get(partition) == null ||
                                nextOffset > committedOffsets.get(partition).offset()) {
                            // earliest offset is ahead of last committed offset, it means committed offset needs to be reset
                            resetOffsets.put(partition, new OffsetAndMetadata(nextOffset));
                        }
                    }
                    if (!resetOffsets.isEmpty()) {
                        kafkaConsumer.commitSync(resetOffsets);
                        logger.warn("Offsets are reset to {} . This should not happen in prod, please evaluate" +
                                " apps downtime", resetOffsets);
                    } else {
                        logger.info("All good! No need to reset offsets {} ", resetOffsets);
                    }
                    logger.info("Earliest offsets {} ", earliestOffsets);
                    logger.info("Last committed offsets {} ", committedOffsets);

                    partitionsAssigned.set(true);
                }
            });

            // Poll until the rebalance listener has run; records returned by these polls are not processed
            while (!partitionsAssigned.get()) {
                logger.info("Partitions are not yet assigned during offset reset step");
                kafkaConsumer.poll(Duration.ofMillis(1000L));
            }
        }
    }

@astubbs
Contributor

astubbs commented Aug 18, 2022

I’m afraid this all might have been confused by issue #365 (Received duplicated records when rebalance occurs), which is fixed in release 0.5.2.1 (the latest is 0.5.2.2).

What is the retention period of the input topic? OFFSET_OUT_OF_RANGE can be returned for an offset below what’s available, not only above it. OFFSET_OUT_OF_RANGE is a normal error.
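To make the "below as well as above" point concrete, here is a minimal Java sketch of how a consumer might pick its fetch position for one partition. It is a simplified model, not the Kafka client's actual code: `OffsetResetSketch`, `resolveFetchOffset` and `ResetPolicy` are hypothetical names, and the end offset 1300000 is a made-up value. The other two offsets come from the logs in this issue (committed 842285, earliest available 1197212).

```java
import java.util.OptionalLong;

// Hypothetical model (not the Kafka client's code) of choosing the fetch position
// for one partition from the committed offset and the auto.offset.reset policy.
class OffsetResetSketch {

    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * @param committed      last committed offset for the group, empty if none (e.g. expired)
     * @param logStartOffset first offset still in the log (moves up as retention deletes data)
     * @param logEndOffset   offset of the next record to be written (treated as the high watermark)
     * @param policy         the auto.offset.reset setting
     * @return the offset the next fetch would start from
     */
    static long resolveFetchOffset(OptionalLong committed, long logStartOffset,
                                   long logEndOffset, ResetPolicy policy) {
        if (committed.isPresent()) {
            long offset = committed.getAsLong();
            // In range: resume exactly where the group left off
            if (offset >= logStartOffset && offset <= logEndOffset) {
                return offset;
            }
            // Otherwise the broker answers OFFSET_OUT_OF_RANGE, whether the offset is below or above the log
        }
        switch (policy) {
            case EARLIEST:
                return logStartOffset;
            case LATEST:
                return logEndOffset;
            default:
                // The real client throws an exception here instead of resetting
                throw new IllegalStateException("no valid offset and auto.offset.reset=none");
        }
    }

    public static void main(String[] args) {
        // 842285 and 1197212 are from the logs above; 1300000 is a made-up end offset
        long resumed = resolveFetchOffset(OptionalLong.of(842285L), 1197212L, 1300000L, ResetPolicy.EARLIEST);
        System.out.println(resumed); // 1197212
    }
}
```

With the offsets from this issue, the committed offset 842285 is below the earliest available offset 1197212, so an `earliest` policy resumes at 1197212, which matches the "Resetting offset for partition test-topic-1" line in the logs. An expired (missing) committed offset takes the same reset path.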

> The issue I see is that the current offset in confluent cloud > consumer lag

That statement doesn’t compile: consumer lag is the difference between the partition head offset and the consumer's position in the partition. You cannot compare those two things the way you are.
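A minimal sketch of that definition, assuming partitions are identified by number for brevity (real clients key them by `TopicPartition`); `ConsumerLagSketch` and its methods are hypothetical. In a real application the end offsets could come from `KafkaConsumer#endOffsets` and the position from `KafkaConsumer#position`, or be approximated by the committed offset, which is what the `kafka-consumer-groups` tool reports as CURRENT-OFFSET.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: partitions keyed by number instead of TopicPartition.
class ConsumerLagSketch {

    /** Lag per partition = head (end) offset minus the consumer's position, never negative. */
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> endOffsets, Map<Integer, Long> positions) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            Long position = positions.get(entry.getKey());
            if (position == null) {
                continue; // no position known for this partition, so no lag to report
            }
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - position));
        }
        return lag;
    }

    /** Total lag across all partitions that have a known position. */
    static long totalLag(Map<Integer, Long> lagPerPartition) {
        long total = 0L;
        for (long partitionLag : lagPerPartition.values()) {
            total += partitionLag;
        }
        return total;
    }
}
```

Lag is a difference of two offsets per partition, so a single offset shown in a dashboard cannot by itself be compared against it.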

> does not move when the parallel consumer instance failed to commit the processed records due to the partition was reassigned

That was the regression bug #365 :/ Sorry about that.

> If last committed offset is reset and system is down for 24hrs then it will reprocess everything from earliest available event

Just to check: you know you can increase the offset expiry time to whatever you want? It used to default to 24 hours and is now 7 days, but a lot of people set it to effectively infinite and only delete offsets on request (in production).

And if you want this behaviour, as with a normal consumer setup, just set the reset policy to earliest, as you have.

> PC somehow knows that it was processed even though the offset commit was failed and don't reprocess

Ah yes, probably because an earlier offset commit’s metadata included the status of the completed records (in other words, they were already committed earlier in a metadata commit); see https://github.com/confluentinc/parallel-consumer#offset_map for more information.
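The effect can be illustrated with a minimal Java sketch, assuming the commit metadata can be modeled as a plain set of completed offsets above the committed offset. This is a hypothetical model (`OffsetMapSketch` and `offsetsStillToProcess` are illustrative names), not Parallel Consumer's real encoding, which is compact and described in the README linked above. In this model the committed offset stays put, while records already marked complete above it are skipped rather than reprocessed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of "completed offsets stored in commit metadata";
// NOT Parallel Consumer's actual encoding.
class OffsetMapSketch {

    /**
     * @param committedOffset offset committed to Kafka (in this model, the lowest offset not known complete)
     * @param completedAbove  offsets above committedOffset already marked complete in the metadata
     * @param endOffset       first offset not yet in the partition
     * @return offsets that still need processing after a restart or rebalance
     */
    static List<Long> offsetsStillToProcess(long committedOffset, Set<Long> completedAbove, long endOffset) {
        List<Long> pending = new ArrayList<>();
        for (long offset = committedOffset; offset < endOffset; offset++) {
            // Skip records the metadata says were already completed
            if (!completedAbove.contains(offset)) {
                pending.add(offset);
            }
        }
        return pending;
    }
}
```

For example, with a committed offset of 10, offsets 11 and 12 marked complete, and an end offset of 15, only offsets 10, 13 and 14 would be processed again; 11 and 12 are not redelivered to user code even though the committed offset never moved past them.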

> I meant if I use auto.offset.reset as latest instead of earliest. I guess that's not related to PC.

Yes, that’s unrelated, as above.

> The issue I am seeing is once the PC is unable to commit the offset due to rebalancing or other reason then the current offset does not move. What I mean by does not move is I see that uncommitted offset in confluent cloud consumer dashboard. However that record is not being processed as duplicated.

Yeah, it's probably the issue above, but as mentioned, offsets may also have been marked as completed in the metadata of older commits.

> consumerProperties.put(MAX_POLL_RECORDS_CONFIG, 1);

Why do you set this to one?

> code snippet

Do you have this in a repo I can see?

> nextOffset > committedOffsets.get(partition).offset()) {
>       // earliest offset is ahead of last committed offset, it means committed offset needs to be reset
>       resetOffsets.put(partition, new OffsetAndMetadata(nextOffset));

This code seems odd: how is nextOffset the same as earliest? They are not.

Why do you think there would be a problem? Generally, position() returns the offset of the next record that would be returned by the broker; it is not the last committed offset.
It is totally expected that position() could return an offset higher than committed() immediately after assignment: if a partition is reassigned to the same consumer, position() might be higher than committed, and record compaction, record retention periods, etc. can also cause this.
If this occurs, it does not need to be handled manually; the next commit will simply be correct.

Have a play with the latest version and let me know if you still see any issues.
