Skip to content

Conversation

@prdai
Copy link
Contributor

@prdai prdai commented Jul 15, 2025

Add Azure Service Bus Queue and Subscription triggers

This PR adds two new triggers for monitoring Azure Service Bus:

  • AzureServiceBusQueueTrigger - monitors Service Bus queues for messages
  • AzureServiceBusSubscriptionTrigger - monitors topic subscriptions for messages

These triggers enable event-driven DAG execution when messages arrive in Azure Service Bus, supporting async message processing workflows.

Changes

  • New trigger classes with async message processing
  • Base trigger class for shared functionality
  • Documentation with usage examples
  • Comprehensive test coverage

Testing

  • Unit tests for both trigger classes
  • Integration scenarios and edge cases
  • Async test patterns with proper mocking

related: AIP-82 (#52712)

Copy link
Member

@jason810496 jason810496 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! Thanks for the PR.

Would it be simpler if the BaseAzureServiceBusTrigger just raised the TriggerEvent with the service_bus_client directly?
I understand the idea behind using asyncio.Queue here, but IMO, we don't need to be constrained by the current methods available in MessageHook or use an asyncio.Queue just to accommodate the MessageHook interface.

@prdai
Copy link
Contributor Author

prdai commented Jul 27, 2025

Hi, Sorry for the Delayed Response.

Would it be simpler if the BaseAzureServiceBusTrigger just raised the TriggerEvent with the service_bus_client directly?

it would be simpler the only reason we have the base class is so that we can handle both the queue's and subscriptions and since that their hooks are different.

I understand the idea behind using asyncio.Queue here, but IMO, we don't need to be constrained by the current methods available in MessageHook or use an asyncio.Queue just to accommodate the MessageHook interface.

what is the best way you would recommend? I built it around the hooks so that it is easier but it is a bit more coupled together

@jason810496
Copy link
Member

what is the best way you would recommend? I built it around the hooks so that it is easier but it is a bit more coupled together

IMHO, add a new method for just receive message without execution callback can make the trigger more simpler.
WDYT?

  • First, please add a new read_message method (or a better name if you have one) that allows reading a message without the ability to execute a callback.
  • Second, the trigger can then simply raise the message, similar to how it is handled in the Redis implementation.

async def run(self):
hook = RedisHook(redis_conn_id=self.redis_conn_id).get_conn().pubsub()
hook.subscribe(self.channels)
async_get_message = sync_to_async(hook.get_message)
while True:
message = await async_get_message()
if message and message["type"] == "message":
yield TriggerEvent(message)
break
await asyncio.sleep(self.poll_interval)

@prdai
Copy link
Contributor Author

prdai commented Aug 8, 2025

hi, thanks for the recommendation.

i looked into the read_message method you had mentioned (similar to the redis implementation), the only issue is that the Azure Service Bus hook handle's providing the messages it receives much differently as far as I understand to redis, where redis return the specific message with the hook.get_message, ASB always goes for the callback method and does not really return anything else (the only way we can make it work similar to the redis implementation is by updating the hook but it seems out of scope for this). I get the concern of the coupling with the MessageHook interfaces but given its implementation its really hard to decouple anything that we build using it.

How should I proceed?

thanks :)

@jason810496
Copy link
Member

Sorry for the late reply.

ASB always goes for the callback method and does not really return anything else (the only way we can make it work similar to the redis implementation is by updating the hook but it seems out of scope for this)

IMO, this is what I mention "no need to be restricted with existed implementation of BaseAzureServiceBusHook". Adding a new method that just raise TriggerEvent without callback will be much more simpler and align behavior of other providers.

I get the concern of the coupling with the MessageHook interfaces but given its implementation its really hard to decouple anything that we build using it.

Yes, so this is why I suggest to add a new method ( maybe named as read_message ) to decouple the "callback" and get rid of asyncio.Queue.

WDYT?

@prdai
Copy link
Contributor Author

prdai commented Aug 9, 2025

hi, thanks for the clarification, i had misunderstood what you meant by the read_message method :)

@jason810496
Copy link
Member

hi, thanks for the clarification, i had misunderstood what you meant by the read_message method :)

Oh, sorry I should have described it more clearly.
Please feel free to raise any concerns at any time!

@prdai
Copy link
Contributor Author

prdai commented Aug 9, 2025

Oh, sorry I should have described it more clearly.
Please feel free to raise any concerns at any time!

sure will do, and sorry for not asking for clarification before :)

@prdai prdai requested a review from jason810496 August 19, 2025 13:16
Copy link
Member

@jason810496 jason810496 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! LGTM overall.
It seems some part of the documentation should be covered in further Queue Provider PR.

Just FYI, the "Queue URI" should be replace by "scheme" in further Queue Provider PR.
You may refer to 5c03795 as an example of how to adapt to “scheme”.

prdai and others added 5 commits August 20, 2025 23:29
…e/triggers/message_bus.py

Co-authored-by: LIU ZHE YOU <68415893+jason810496@users.noreply.github.com>
…e/triggers/message_bus.py

Co-authored-by: LIU ZHE YOU <68415893+jason810496@users.noreply.github.com>
…e/triggers/message_bus.py

Co-authored-by: LIU ZHE YOU <68415893+jason810496@users.noreply.github.com>
@prdai prdai requested a review from jason810496 August 20, 2025 18:09
Copy link
Member

@jason810496 jason810496 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CI failure still need to be resolved, thanks!

  - Replace Mock object string representations with proper Mock(body=value) pattern
  - Fix str(message.body) calls returning mock object strings instead of expected message content
  - Apply standard Azure provider mocking conventions consistent with other tests
  - Remove unnecessary comments and custom MockMessage classes
@prdai prdai requested a review from jason810496 September 9, 2025 14:41
Copy link
Member

@jason810496 jason810496 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! Thanks for the update! LGTM overall.

Copy link
Contributor

@vincbeck vincbeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be done as a follow-up PR, but it would be also nice to add these 2 services as part of the unified MessageQueueTrigger . Please read documentation on how to do it.

The goal is to have one MessageQueueTrigger that is compatible with different queue providers, today we have SQS and Kafka. Would be awesome to add Azure as well

@jscheffl
Copy link
Contributor

@dabla or @bolkedebruin Do you have any knowledge about Azure Service Bus to review?

@dabla
Copy link
Contributor

dabla commented Oct 20, 2025

@dabla or @bolkedebruin Do you have any knowledge about Azure Service Bus to review?

Hallo @jscheffl , I don’t have any knowledge about this one but can also look into it, but I do agree with comments @vincbeck though! It should be implemented in such a way that it can be used by the generic MessgeQueueTrigger, otherwise we will end up having specific implementations per provider like with the SQL operators in the past, it should be provider agnostic. I actually did the same for MongoDB, still have to open a PR for that one.

@prdai
Copy link
Contributor Author

prdai commented Nov 22, 2025

It could be done as a follow-up PR, but it would be also nice to add these 2 services as part of the unified MessageQueueTrigger . Please read documentation on how to do it.

hi, can we take it up as a follow up PR?


i have addressed the other comment that was added, anything else that needs to be worked on within this PR? :) lmk thanks! and sorry for the delay

@prdai prdai requested a review from vincbeck November 22, 2025 04:49
Copy link
Member

@jason810496 jason810496 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update!
LGTM if the CI pass.

hi, can we take it up as a follow up PR?

Sure! We definitely can take this one just for adding Trigger of Azure Service Bus Queue, then the MessageQueueTrigger can be done in follow-up one.

prdai and others added 3 commits November 22, 2025 23:47
Copy link
Member

@potiuk potiuk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. I have no big experience with Azure Service Bus but it looks sound and there are no red flags.

@potiuk potiuk merged commit dd10952 into apache:main Nov 24, 2025
119 checks passed
Copilot AI pushed a commit to jason810496/airflow that referenced this pull request Dec 5, 2025
…ge processing (apache#53356)

* Add Azure Service Bus integration with triggers and documentation

* Implement Azure Service Bus Queue and Subscription triggers with async message processing and unit tests

* feat(Azure): Add methods to read messages from Service Bus queue and subscription

* Update providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/message_bus.py

Co-authored-by: LIU ZHE YOU <68415893+jason810496@users.noreply.github.com>

* Update providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/message_bus.py

Co-authored-by: LIU ZHE YOU <68415893+jason810496@users.noreply.github.com>

* Update providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/message_bus.py

Co-authored-by: LIU ZHE YOU <68415893+jason810496@users.noreply.github.com>

* Remove Queue URI format documentation for Azure Service Bus Queue and Subscription Providers

* Enhance Azure Service Bus integration by adding new message types and updating trigger initialization tests

* Fix Azure Service Bus message trigger test failures

- Simplified mocking to avoid connection and import issues
- Fixed mock assertions for MessageHook initialization
- Addressed compatibility issues with older Airflow versions
- Used context managers for proper mock lifecycle management

* Fix Azure Service Bus trigger test mocking issues

  - Replace Mock object string representations with proper Mock(body=value) pattern
  - Fix str(message.body) calls returning mock object strings instead of expected message content
  - Apply standard Azure provider mocking conventions consistent with other tests
  - Remove unnecessary comments and custom MockMessage classes

* Update the Trigger Event Message to Include the Actual Content

* resolve review comments

* chore: change BaseTrigger to BaseEventTrigger

* Update providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/message_bus.py

Co-authored-by: LIU ZHE YOU <68415893+jason810496@users.noreply.github.com>

* chore: fix ci/cd

---------

Co-authored-by: LIU ZHE YOU <68415893+jason810496@users.noreply.github.com>
itayweb pushed a commit to itayweb/airflow that referenced this pull request Dec 6, 2025
…ge processing (apache#53356)

* Add Azure Service Bus integration with triggers and documentation

* Implement Azure Service Bus Queue and Subscription triggers with async message processing and unit tests

* feat(Azure): Add methods to read messages from Service Bus queue and subscription

* Update providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/message_bus.py

Co-authored-by: LIU ZHE YOU <68415893+jason810496@users.noreply.github.com>

* Update providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/message_bus.py

Co-authored-by: LIU ZHE YOU <68415893+jason810496@users.noreply.github.com>

* Update providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/message_bus.py

Co-authored-by: LIU ZHE YOU <68415893+jason810496@users.noreply.github.com>

* Remove Queue URI format documentation for Azure Service Bus Queue and Subscription Providers

* Enhance Azure Service Bus integration by adding new message types and updating trigger initialization tests

* Fix Azure Service Bus message trigger test failures

- Simplified mocking to avoid connection and import issues
- Fixed mock assertions for MessageHook initialization
- Addressed compatibility issues with older Airflow versions
- Used context managers for proper mock lifecycle management

* Fix Azure Service Bus trigger test mocking issues

  - Replace Mock object string representations with proper Mock(body=value) pattern
  - Fix str(message.body) calls returning mock object strings instead of expected message content
  - Apply standard Azure provider mocking conventions consistent with other tests
  - Remove unnecessary comments and custom MockMessage classes

* Update the Trigger Event Message to Include the Actual Content

* resolve review comments

* chore: change BaseTrigger to BaseEventTrigger

* Update providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/message_bus.py

Co-authored-by: LIU ZHE YOU <68415893+jason810496@users.noreply.github.com>

* chore: fix ci/cd

---------

Co-authored-by: LIU ZHE YOU <68415893+jason810496@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants