InOrderConsumer doesn't work as intended. #913

Merged

Conversation

AdrielVelazquez
Contributor

@AdrielVelazquez AdrielVelazquez commented May 7, 2024

💸 TL;DR + Details

The InOrderConsumer Kafka class makes several assumptions that are false, and it interacts inefficiently with how Kafka processes events.

The current logic of using only one consumer and committing offsets back doesn't account for the fact that upstream producers send messages to each partition randomly.

You would need to enable transactions upstream for the producer, as well as deliver messages with a key. This would guarantee two things (a sketch of this setup follows the list below):

  1. All messages with a given key are guaranteed to land in a single partition.

https://docs.confluent.io/platform/current/clients/producer.html#concepts

The partitioners shipped with Kafka guarantee that all messages with the same non-empty key will be sent to the same partition.

If the key is provided, the partitioner will hash the key with murmur2 algorithm and divide it by the number of partitions. The result is that the same key is always assigned to the same partition.

  2. Consumers can switch to reading committed transactions instead of uncommitted transactions.

https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#isolation-level
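For illustration, here is a minimal sketch of that upstream setup using the confluent-kafka Python client; the topic name, group id, transactional.id, and key are assumptions for the example, not values from this PR:

```python
from confluent_kafka import Consumer, Producer

# Hypothetical transactional producer that keys every message.
producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "transactional.id": "example-producer",  # assumed id; enables transactions
})
producer.init_transactions()
producer.begin_transaction()
# Messages sharing the key b"user-123" hash (murmur2) to the same partition,
# so they are read back in the order they were produced.
producer.produce("example-topic", key=b"user-123", value=b"event-1")
producer.produce("example-topic", key=b"user-123", value=b"event-2")
producer.commit_transaction()

# Hypothetical consumer that only sees committed transactions.
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "example-group",
    "isolation.level": "read_committed",
})
consumer.subscribe(["example-topic"])
```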

📜 Details

So what is this PR doing so differently?

Technically, this keeps the exact same APIs in place and solves just one problem: stop committing back to Kafka so frequently.

Instead of manually committing every offset, which swarms Kafka with requests, store the offsets locally and commit the last max seen offset per partition back to Kafka at the auto.commit interval.

This preserves the effect of accounting for every offset, while only delivering commits to Kafka every couple of seconds.
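A minimal sketch of that commit pattern with confluent-kafka (the config keys are standard librdkafka settings; the topic, group id, and handle() helper are assumptions for the example, and the actual change lives in baseplate/frameworks/queue_consumer/kafka.py):

```python
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "example-group",
    "enable.auto.commit": True,         # background commit every auto.commit.interval.ms
    "enable.auto.offset.store": False,  # we choose which offsets are safe to commit
})
consumer.subscribe(["example-topic"])

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue
    handle(msg)  # hypothetical message handler
    # Mark the offset as processed locally; the client commits the highest
    # stored offset per partition back to Kafka on the auto-commit interval.
    consumer.store_offsets(message=msg)
```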

One More Detail

My linter went a little crazy when committing this. I can revert that.

@AdrielVelazquez AdrielVelazquez requested a review from a team as a code owner May 7, 2024 23:39
@AdrielVelazquez AdrielVelazquez requested a review from chriskuehl May 7, 2024 23:39
Member

@chriskuehl chriskuehl left a comment

This change looks reasonable to me, but I'm not very familiar with Kafka so I can't really provide insight into the decision itself. Could you nominate an additional reviewer (maybe someone from your team) who can double-check that the change looks right before we merge it?

Contributor

@tomncooper tomncooper left a comment

The linting changes do make it harder to see what has logically changed. I am assuming the main changes are lines: 476, 477 and 496.

The only comments I have are a few language nits on the docstring and a question over the value of auto.commit.interval.ms. If the consumer crashes, there is now a chance of duplicate message reads. The worst case is crashing just before the end of an interval, in which case you would replay auto.commit.interval.ms worth of messages. Which, for what it is worth, is pretty standard practice for Kafka consumers.

Do we want to surface that, or is auto.commit.interval.ms already set to a low value? Do we want to allow users to tune that?
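For reference, a hedged sketch of what surfacing that knob could look like in plain consumer config (5000 ms is librdkafka's default; this is an illustration, not the baseplate API):

```python
consumer_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "example-group",
    "enable.auto.commit": True,
    # Worst case, a crash replays up to this much of the stream per partition.
    "auto.commit.interval.ms": 5000,  # default; lower it to bound replay
}
```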


@skykistler skykistler left a comment

Just one clarification to the comment

Update baseplate/frameworks/queue_consumer/kafka.py

Co-authored-by: Thomas Cooper <code@tomcooper.dev>

Update baseplate/frameworks/queue_consumer/kafka.py

Co-authored-by: Thomas Cooper <code@tomcooper.dev>

Update baseplate/frameworks/queue_consumer/kafka.py

Co-authored-by: Thomas Cooper <code@tomcooper.dev>

Update baseplate/frameworks/queue_consumer/kafka.py

Co-authored-by: Sky Kistler <skyler.kistler@gmail.com>

Linting errors

Formatting?

Black formatting

Black formatting

Black formatting

Linting

Black linting again

Another attempt
@AdrielVelazquez AdrielVelazquez force-pushed the DPWF-1052_inorder_inefficient branch from a47e209 to b1c8e96 Compare May 10, 2024 20:29
@AdrielVelazquez
Contributor Author

@tomncooper and @skykistler This is good to review again.

I mentioned the config and the default in the docstring, but I would rather not be explicit on the value.

@AdrielVelazquez
Contributor Author

@chriskuehl Sorry, I should have tagged you also. I cleaned up all the linting/formatting changes my setup had made.

@chriskuehl chriskuehl merged commit 0b6e40d into reddit:develop May 20, 2024
5 checks passed