
Kafka consumer: proper parallelism for partition processing #814

Merged 1 commit into master on May 18, 2024

Conversation

@FZambia (Member) commented May 18, 2024

Proposed changes

Pause a partition while it is being processed, so that polling is not blocked for a long time and the partition is not polled again while its messages are still in flight. This improves the parallelism of partition processing and avoids a theoretical stall of Kafka polling when a partition takes a long time to process a message.

@FZambia FZambia merged commit 03cdba5 into master May 18, 2024
4 checks passed
@FZambia FZambia deleted the parallel_partition_processing branch May 18, 2024 17:25
Comment on lines +64 to +68
partitioner := kgo.BasicConsistentPartitioner(func(topic string) func(*kgo.Record, int) int {
	return func(r *kgo.Record, n int) int {
		return int(r.Partition)
	}
})
twmb commented:
I think this is just the kgo.ManualPartitioner()?

FZambia (Member, Author) replied:

Many thanks @twmb - fixing in #823

What do you think about the approach here, by the way? We originally implemented the consumer based on the example in the franz-go repo, but without a channel buffer for the partition consumer (i.e. the recs channel). This quickly resulted in poll blocking when partition processing was long, and the consumer being excluded from the group by Kafka due to poll timeout. franz-go uses a buffered channel for the partition consumer, but it seems to have the same issue, just with lower probability: as soon as the channel buffer (5 in the example) is full, poll will block. I believe the approach here fully solves these concerns. WDYT?
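The failure mode described here can be shown with a stdlib-only snippet (the channel name recs mirrors the franz-go example, but this is not its code): once the per-partition buffer fills, the poll loop's next hand-off blocks exactly as it would with no buffer at all.

```go
package main

import "fmt"

func main() {
	recs := make(chan string, 5) // buffer of 5, as in the franz-go example

	// A slow partition worker never drains the channel, so the poll loop's
	// hand-offs fill the buffer.
	for i := 0; i < 5; i++ {
		recs <- fmt.Sprintf("batch-%d", i)
	}

	// The next hand-off would block; probe it without blocking.
	select {
	case recs <- "batch-5":
		fmt.Println("sent without blocking")
	default:
		fmt.Println("send would block: the poll loop stalls here")
	}
	// → send would block: the poll loop stalls here
}
```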

twmb replied:

I think it works. It may cause more bandwidth to be used, but there isn't really a way to avoid that.

There may be data already buffered, or a fetch already in progress, for partitions you are pausing. That data is just dropped when polled. Once you resume the partition, it is re-fetched at the prior offset.

It may be worth having a buffer of 1 or 5 on the channel, to allow for the chance that processing finishes before you need to pause.

Another thing to do may be a select/default case before the existing select:

// Try a non-blocking hand-off first; pause only if it would block.
select {
case <-done:
	return
case <-quit:
	return
case processor <- records:
	// delivered without pausing
default:
	cl.Pause...

	select {
	case <-done:
		return
	case <-quit:
		return
	case processor <- records:
	}
}

The first select default would allow you to avoid pausing if things are keeping up, then you only pause if needed when things are backed up.
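The suggestion can be sketched stdlib-only (handOff and the fake pause func are illustrative stand-ins for the real consumer loop and the client's pause call, not franz-go API): the non-blocking try-send avoids pausing while the worker keeps up, and pausing happens only once the hand-off would block.

```go
package main

import "fmt"

// handOff tries a non-blocking send first; only if the processor is backed up
// does it call pause (standing in for the client's pause call) and then block.
func handOff(processor chan string, records string, pause func()) {
	select {
	case processor <- records:
		return // keeping up: no pause needed
	default:
	}
	pause()
	processor <- records // blocking hand-off while the partition is paused
}

func main() {
	processor := make(chan string, 1)
	pauses := 0
	pause := func() {
		pauses++
		// Stand-in for the worker draining a batch and the partition resuming.
		go func() { <-processor }()
	}

	handOff(processor, "a", pause) // buffer has room: delivered, no pause
	handOff(processor, "b", pause) // buffer full: pause, drain, then deliver
	fmt.Println("pauses:", pauses)
	// → pauses: 1
}
```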

FZambia (Member, Author) replied:

Thanks a lot, yeah - looks like using a buffer makes sense, addressing in #829
