-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Add KafkaMessageQueueTrigger for enhanced message queue trigger usability on Kafka queue
#51718
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add KafkaMessageQueueTrigger for enhanced message queue trigger usability on Kafka queue
#51718
Conversation
|
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
|
8dba86d to
462cb79
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces the KafkaMessageQueueTrigger to provide a more specific interface for Kafka-based message queues in Airflow, along with a suite of tests, example DAGs, and documentation updates.
- Adds unit and system tests to verify the new trigger’s behavior.
- Implements KafkaMessageQueueTrigger and updates provider metadata, documentation, and version numbers accordingly.
Reviewed Changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| providers/apache/kafka/tests/unit/apache/kafka/triggers/test_msg_queue.py | Adds tests to verify queue URI generation and trigger serialization/run behavior for KafkaMessageQueueTrigger. |
| providers/apache/kafka/tests/system/apache/kafka/example_dag_message_queue_trigger.py | Updates example DAG to reference the correct apply_function for MessageQueueTrigger usage. |
| providers/apache/kafka/tests/system/apache/kafka/example_dag_kafka_message_queue_trigger.py | Introduces an example DAG demonstrating usage of KafkaMessageQueueTrigger. |
| providers/apache/kafka/src/airflow/providers/apache/kafka/triggers/msg_queue.py | Implements the KafkaMessageQueueTrigger class with Kafka-specific logic for queue URI generation and connection handling. |
| providers/apache/kafka/src/airflow/providers/apache/kafka/get_provider_info.py | Updates provider info to include the new trigger module. |
| providers/apache/kafka/src/airflow/providers/apache/kafka/init.py | Bumps the version from 1.9.0 to 1.9.1. |
| providers/apache/kafka/pyproject.toml | Updates package version and documentation/changelog URLs. |
| providers/apache/kafka/provider.yaml | Lists the new version 1.9.1. |
| providers/apache/kafka/docs/triggers.rst | Adds documentation for KafkaMessageQueueTrigger. |
| providers/apache/kafka/docs/message-queues/index.rst | Updates message-queue docs to reference the new trigger. |
| providers/apache/kafka/docs/index.rst | Updates version information. |
| providers/apache/kafka/docs/changelog.rst | Adds changelog entries for 1.9.1. |
| providers/apache/kafka/README.rst | Updates version and documentation URLs. |
providers/apache/kafka/tests/unit/apache/kafka/triggers/test_msg_queue.py
Outdated
Show resolved
Hide resolved
providers/apache/kafka/tests/unit/apache/kafka/triggers/test_msg_queue.py
Outdated
Show resolved
Hide resolved
|
TBR |
d83536d to
03cebb3
Compare
vincbeck
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nits
providers/apache/kafka/src/airflow/providers/apache/kafka/triggers/msg_queue.py
Outdated
Show resolved
Hide resolved
providers/apache/kafka/src/airflow/providers/apache/kafka/triggers/msg_queue.py
Outdated
Show resolved
Hide resolved
03cebb3 to
482387d
Compare
|
@vincbeck Thanks for the reviews 😄 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR!
After your change, the usage is much more clear.
IMO, we should also update the KafkaMessageQueueProvider to reduce the inconsistency on both side.
We could reuse your get_kafka_queue_uri in KafkaMessageQueueProvider.trigger_kwargs as well.
@jason810496 TIA. ACK. I think we might still need to keep the feasibility for the direct access Perhaps should we open new PRs for it? |
ACK. I think we might still need to keep the feasibility for the direct access Sure, having a common module as utils is good and refactoring in followup PR is more suitable. |
ea7a1ec to
84332c1
Compare
84332c1 to
8e22018
Compare
|
Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions. |
…bility on Kafka queue (apache#51718) * WIP feat: add KafkaMessageQueueTrigger for enhanced Kafka message queue operations * fix: update provider.yaml, tests and lint * revert manually updated and bumped kafka provider version * add airflow version checking for tests * correct formatting in docs * make linting happy
Closes #515225.
Why
While using the
MessageQueueTriggerin Airflow 3.0+, the parameters in the common message queue trigger are not specific enough, leading to potential issues like unclear usage, difficulty in understanding, or unexpected behavior when using it on KafkaWhat
This PR introduces the
KafkaMessageQueueTriggerclass to specify the parameters for the message queue trigger to enhance the usability