Fix max_messages warning of Kafka ConsumeFromTopicOperator #48646
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
jason810496
left a comment
Thanks for the PR.
IMHO, instead of removing the warning, we should fix this part
airflow/providers/apache/kafka/src/airflow/providers/apache/kafka/operators/consume.py
Lines 100 to 107 in d517c63
    self.max_messages = max_messages or True
    self.max_batch_size = max_batch_size
    self.poll_timeout = poll_timeout
    if self.max_messages is True:
        self.read_to_end = True
    else:
        self.read_to_end = False
by keeping the self.max_messages attribute as int or None instead of overloading it with bool (we also have to check the further usage of self.max_messages and adapt it to this type change if needed).
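The suggestion above can be sketched roughly as follows. This is a minimal illustration, not the change that was merged: `ConsumeSettings` is a hypothetical stand-in for the operator's constructor, and the shortened warning text is an assumption. The point is only that `max_messages` stays `int` or `None`, and `read_to_end` is derived from it.

```python
from __future__ import annotations

import logging
from dataclasses import dataclass, field

log = logging.getLogger(__name__)


@dataclass
class ConsumeSettings:
    """Hypothetical stand-in for the operator's constructor arguments."""

    max_messages: int | None = None
    max_batch_size: int = 1000
    read_to_end: bool = field(init=False)

    def __post_init__(self) -> None:
        # None means "no limit": read to the end of the topic.
        self.read_to_end = self.max_messages is None
        # Compare only when a real limit was given, so None never reaches the warning.
        if self.max_messages is not None and self.max_batch_size > self.max_messages:
            log.warning(
                "max_batch_size (%s) > max_messages (%s).",
                self.max_batch_size,
                self.max_messages,
            )
```

With this shape, `ConsumeSettings()` reads to the end without logging anything, while `ConsumeSettings(max_messages=10)` still warns that the batch size exceeds the limit.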
@jason810496 Sounds ok to me if we want to go that way. I now changed the logic of
jason810496
left a comment
Thanks for the change!
Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.
The current logic of ConsumeFromTopicOperator is that
:param max_messages: defaults to None, implying read to the end of the topic.
But if max_messages=None, we also get a very misleading warning log, e.g.
max_batch_size (1000) > max_messages (True). Setting max_messages to 1000
This warning log contradicts the current logic and, in fact, we don't set max_messages to max_batch_size either.
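A small reproduction of how such a message can arise, assuming the operator compared `max_batch_size` against the value produced by `max_messages or True` (the comparison itself is not shown in this thread, so treat it as an assumption). `legacy_warning` is a hypothetical helper, not part of the provider. In Python, `bool` is a subclass of `int`, so `True` compares as `1`.

```python
from __future__ import annotations


def legacy_warning(max_messages: int | None, max_batch_size: int = 1000) -> str | None:
    """Hypothetical helper: mimic the pre-fix handling and return the warning text, if any."""
    # None (and also 0) collapse to the bool True here.
    effective = max_messages or True
    # Assumed comparison: True behaves like 1, so any batch size above 1 triggers it.
    if max_batch_size > effective:
        return (
            f"max_batch_size ({max_batch_size}) > max_messages ({effective}). "
            f"Setting max_messages to {max_batch_size}"
        )
    return None
```

Calling `legacy_warning(None)` yields the misleading text quoted above, while `legacy_warning(5000)` returns `None` because the batch size does not exceed the limit.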