-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Add validation for commit_cadence in Kafka ConsumeFromTopicOperator #52015
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add validation for commit_cadence in Kafka ConsumeFromTopicOperator #52015
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds validation for the commit_cadence parameter in the Kafka consumer operator and ensures that the Kafka connection configuration adheres to the expected auto-commit settings.
- Added a private _validate_commit_cadence method in the operator code that validates commit_cadence and logs warnings when necessary.
- Updated tests to cover various commit_cadence and enable.auto.commit configuration combinations.
- Updated documentation to emphasize the need to set enable.auto.commit to false when commit_cadence is used.
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| providers/apache/kafka/tests/unit/apache/kafka/operators/test_consume.py | Added parameterized tests to validate commit_cadence configuration and warning behavior. |
| providers/apache/kafka/src/airflow/providers/apache/kafka/operators/consume.py | Introduced _validate_commit_cadence method and refactored validation logic for commit_cadence and enable.auto.commit. |
| providers/apache/kafka/docs/operators/index.rst | Documented the required Kafka connection configuration when using commit_cadence. |
Comments suppressed due to low confidence (1)
providers/apache/kafka/src/airflow/providers/apache/kafka/operators/consume.py:207
- Consider adding an inline comment explaining why the 'never' commit_cadence is converted to None, clarifying the intended behavior for future maintainers.
if self.commit_cadence == "never":
c806005 to
3f82e50
Compare
providers/apache/kafka/src/airflow/providers/apache/kafka/operators/consume.py
Show resolved
Hide resolved
amoghrajesh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice test coverage! LGTM +1 (few nits)
providers/apache/kafka/src/airflow/providers/apache/kafka/operators/consume.py
Show resolved
Hide resolved
providers/apache/kafka/src/airflow/providers/apache/kafka/operators/consume.py
Show resolved
Hide resolved
providers/apache/kafka/src/airflow/providers/apache/kafka/operators/consume.py
Outdated
Show resolved
Hide resolved
77864c0 to
9fcee63
Compare
amoghrajesh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM +1
@eladkal ?
- modularize mock consumer - add test_commit_cadence_behavior
Co-authored-by: Amogh Desai <amoghrajesh1999@gmail.com>
9fcee63 to
deed801
Compare
|
Just rebased and resolved conflict. |
closes: #34213
Why
Based on the discussion in the issue
commit_cadence="never", "end_of_batch", "end_of_operator"enable.auto.commitoption in Kafka Connection tofalseenable.auto.commitwould be on by default, and the consumer will auto commit the offset every 5 seconds.What
commit_cadencewithenable.auto.commitoption with corresponding Kafka Connectioncommit_cadenceis set butenable.auto.commitis notfalse