Peeking message on subscriptions with sessions enabled fails when using NEXT_AVAILABLE_SESSION #30907

Closed
ManuelEibl opened this issue Jun 27, 2023 · 11 comments
Labels: Client, customer-reported, issue-addressed, Messaging, question, Service Bus

Comments

@ManuelEibl

  • Package Name: servicebus
  • Package Version: 7.11.0 (I also tested with 7.10.0 and 7.9.0 to rule out pyamqp as the cause)
  • Operating System: Ubuntu 22.04
  • Python Version: 3.11.3

Describe the bug
Peeking a message on a Service Bus subscription with sessions enabled fails.
The received error is azure.servicebus._pyamqp.error.AMQPLinkError: Error condition: com.microsoft:timeout.

To Reproduce
Steps to reproduce the behavior:

  1. Create an instance of a ServiceBusClient from a connection string
  2. Call get_subscription_receiver on the client instance to get a receiver, passing NEXT_AVAILABLE_SESSION as the session_id
  3. Call peek_messages
```python
from azure.servicebus import ServiceBusClient, NEXT_AVAILABLE_SESSION

conn_str = "the-connection-string"
topic = "some-topic"
subscription = "subscription-with-sessions-enabled"

sb_client = ServiceBusClient.from_connection_string(conn_str)
receiver = sb_client.get_subscription_receiver(topic, subscription, session_id=NEXT_AVAILABLE_SESSION)
receiver.peek_messages()
```

Expected behavior
I expect to receive a list of the messages on the subscription for the next available session.

Additional context
At first glance, this sounds like a simple connection issue caused by Azure. However, in a related .NET project we can work with the same sessions using the Azure.Messaging.ServiceBus.ServiceBusSessionProcessor class, so I don't expect it to be a temporary connection issue. It also works with a hard-coded session_id such as "abc", which immediately returns a result (e.g. an empty list if the session does not exist or has no waiting messages).
Therefore, I assume that the usage of NEXT_AVAILABLE_SESSION as session_id causes this issue in some way.

Complete stack trace:

```
Unable to attach new link: ValueError('Invalid link')
Traceback (most recent call last):
  File "/some-path/service_bus_issue_poc.py", line 11, in <module>
    result = receiver.peek_messages()
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/some-path/.venv/lib/python3.11/site-packages/azure/servicebus/_servicebus_receiver.py", line 804, in peek_messages
    self._open()
  File "/some-path/.venv/lib/python3.11/site-packages/azure/servicebus/_servicebus_receiver.py", line 365, in _open
    while not self._handler.client_ready():
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/some-path/.venv/lib/python3.11/site-packages/azure/servicebus/_pyamqp/client.py", line 396, in client_ready
    if not self._client_ready():
           ^^^^^^^^^^^^^^^^^^^^
  File "/some-path/.venv/lib/python3.11/site-packages/azure/servicebus/_pyamqp/client.py", line 826, in _client_ready
    if self._link.get_state().value != 3:  # ATTACHED
       ^^^^^^^^^^^^^^^^^^^^^^
  File "/some-path/.venv/lib/python3.11/site-packages/azure/servicebus/_pyamqp/link.py", line 107, in get_state
    raise self._error
azure.servicebus._pyamqp.error.AMQPLinkError: Error condition: com.microsoft:timeout
 Error Description: The operation did not complete within the allotted timeout of 00:01:05. The time allotted to this operation may have been a portion of a longer timeout. For more information on exception types and proper exception handling, please refer to https://aka.ms/ServiceBusExceptions
```

github-actions bot added the Client, customer-reported, needs-team-triage, question, and Service Bus labels Jun 27, 2023
@kashifkhan
Member

Thank you for the feedback, @ManuelEibl. We will investigate and get back to you.

Just a few follow ups

  • Are you getting this error when there are sessions present with messages in them?
  • Does the .NET SDK return an empty list when the same situation exists?

kashifkhan added the Messaging and needs-author-feedback labels Jun 27, 2023
xiangyan99 removed the needs-team-triage label Jun 27, 2023
@github-actions

Hi @ManuelEibl. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.

@ManuelEibl
Author

No, you are right. When there are messages, I get the messages.
The behavior in the .NET SDK is actually the same; with the processor it just wasn't readily visible.

So this is intended behavior?
As a user, should I set a timeout to define how long I want to wait when peeking?

To be honest, I expected that it would not wait at all, but simply peek and, if there is nothing, return an empty list. Is there another method I could use to achieve this? Guessing a timeout that is long enough not to cut off an incoming message, yet short enough not to wait unnecessarily, seems a bit awkward.

github-actions bot added the needs-team-attention label and removed the needs-author-feedback label Jun 27, 2023
@l0lawrence
Member

Hi @ManuelEibl, yes, this is expected behavior.

When a topic subscription has no messages and session_id is set to NEXT_AVAILABLE_SESSION, the receiver tries to connect to the next available session. Since there are no messages, there is no available session, so the target for the receiving link cannot be established; the service returns a timeout error and the peek operation fails.

On the other hand, if you set a specific session_id (e.g. session_id="0"), you are specifying the target you are trying to reach, so the link is established and you receive an empty list because the session has no messages.

As for an alternative that avoids the wait: at present there is nothing in our code that supports it, but we will investigate and get back to you.
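As a rough illustration of the difference described above, the following toy model contrasts attaching to a named session with attaching to the next available one. It does not talk to Service Bus: `attach_receiver`, `peek`, the `sessions` dict, the `NEXT_AVAILABLE` sentinel, and `AttachTimeout` are hypothetical stand-ins, and the model ignores details such as session locks held by other receivers. The 65-second default only mirrors the timeout reported in the error message.

```python
import time

# Hypothetical sentinel standing in for azure.servicebus.NEXT_AVAILABLE_SESSION.
NEXT_AVAILABLE = object()


class AttachTimeout(Exception):
    """Stand-in for the service's com.microsoft:timeout link error."""


def attach_receiver(sessions, session_id, timeout=65.0, poll_interval=0.01):
    """Toy model of attaching a receiving link; returns the attached session id.

    sessions maps a session id to its list of pending messages.
    """
    if session_id is not NEXT_AVAILABLE:
        # A named session is a concrete target: the link attaches even
        # when that session has no messages (or does not exist yet).
        return session_id
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        for sid, messages in sessions.items():
            if messages:  # only a session with messages counts as "available"
                return sid
        time.sleep(poll_interval)
    raise AttachTimeout(f"no available session within {timeout}s")


def peek(sessions, session_id, timeout=65.0):
    """Toy peek: attach first, then return (without removing) pending messages."""
    attached = attach_receiver(sessions, session_id, timeout)
    return list(sessions.get(attached, []))
```

In this model, `peek({}, "abc")` returns `[]` immediately, while `peek({"s1": []}, NEXT_AVAILABLE, timeout=0.05)` raises `AttachTimeout`, which mirrors the behavior reported in this thread.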

@ManuelEibl
Author

Thanks for your clarifications.
Just to add some more context: the reason I don't want to wait indefinitely for the next available session is that I am using a receiver pool. I first check for messages on my existing receivers (and their respective sessions), and if there is nothing new there, I check for the next available session. However, while that check is waiting, a new message arriving on one of the existing sessions, which already has an initialized receiver, has to wait as well.

So I wonder what the best flow would be in such a scenario.
I guess at the moment the only solution is a timeout for checking the next available session, but a flag that controls whether the call returns when no session is available or keeps looking might be helpful in the future.
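The flow described above can be sketched as one scheduling round over the pool: existing sessions are drained first, and the search for a new session is bounded by a short timeout. This is a hypothetical sketch: `pool_round`, `probe_next_session`, and the callables standing in for receivers are not part of azure-servicebus. With the SDK, the probe would roughly correspond to opening a receiver with `session_id=NEXT_AVAILABLE_SESSION` and reporting its timeout error as `TimeoutError`; note that later comments in this thread report that this link-attach wait (about 65 seconds) is not affected by the `timeout` passed to `peek_messages`.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical stand-ins: a "receiver" is a callable returning the messages
# currently pending on its session; a probe returns (session_id, receiver)
# for a new session or raises TimeoutError if none appears within the timeout.
Receiver = Callable[[], List[str]]
Probe = Callable[[float], Tuple[str, Receiver]]


def pool_round(pool: Dict[str, Receiver], probe_next_session: Probe,
               probe_timeout: float = 2.0) -> Dict[str, List[str]]:
    """Run one round over a session receiver pool and return what was received.

    Existing sessions are drained first, so a message arriving on one of them
    waits roughly one probe_timeout (plus processing time) for the next round.
    """
    received = {sid: receive() for sid, receive in pool.items()}
    try:
        sid, receive = probe_next_session(probe_timeout)
    except TimeoutError:
        # No non-empty session right now; the next round will look again.
        return received
    pool[sid] = receive  # keep the new session's receiver for later rounds
    received[sid] = receive()
    return received
```

The design choice is simply to keep the probe short and repeat it, so no single wait for a new session can starve the sessions the pool already holds.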

@ManuelEibl
Author

I have now tried passing a timeout to the peek_messages call (e.g. receiver.peek_messages(timeout=1)), but the result is the same. It "freezes" on peek_messages and returns the same error, azure.servicebus._pyamqp.error.AMQPLinkError: Error condition: com.microsoft:timeout, after 1 min.

Surely this is not intended, is it? I would need to forcefully kill the peek process if I wanted to avoid this.
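Until the link-attach wait can be shortened from the client, one generic workaround is to stop waiting on the blocking call after a deadline by running it in a worker thread. This is a sketch, not an SDK feature: `call_with_deadline` is a hypothetical helper built on the standard library's `concurrent.futures`. It does not cancel the attach: the worker thread keeps running until the service-side timeout fires, so the receiver should not be used from another thread in the meantime.

```python
import concurrent.futures

# A long-lived pool: leaving a `with ThreadPoolExecutor()` block waits for all
# submitted work, which would block on a stuck call again.
_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def call_with_deadline(fn, deadline, *args, **kwargs):
    """Run fn(*args, **kwargs) in a worker thread and wait at most `deadline` seconds.

    Raises concurrent.futures.TimeoutError if fn has not returned in time.
    The worker thread is not killed: fn keeps running in the background.
    Exceptions raised by fn are re-raised here.
    """
    future = _EXECUTOR.submit(fn, *args, **kwargs)
    return future.result(timeout=deadline)
```

For example, `call_with_deadline(receiver.peek_messages, 5)` would give up waiting after five seconds and raise `concurrent.futures.TimeoutError`, while the stuck attach finishes (or fails) in the background.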

@l0lawrence
Member

Hi @ManuelEibl, sorry for the delayed response.

Regarding your second point:

> I have now tried passing a timeout to the peek_messages call (e.g. receiver.peek_messages(timeout=1)), but the result is the same. It "freezes" on peek_messages and returns the same error, azure.servicebus._pyamqp.error.AMQPLinkError: Error condition: com.microsoft:timeout, after 1 min.
>
> Surely this is not intended, is it? I would need to forcefully kill the peek process if I wanted to avoid this.

The timeout in peek_messages applies to the peek operation itself. The AMQPLinkError, however, comes from an earlier step: attempting to open the receiver link to the NEXT_AVAILABLE_SESSION, which happens before the peek operation starts.

I am currently working on a PR that will add a timeout that lets you control how long to wait for a session (allowing a wait shorter than 65 seconds), and I am optimistic that this will help with the issue you described above.

@sudharsan2020

Hi @l0lawrence,
I am trying to receive from sessions using a receiver with a max_wait_time of 2 minutes.
However, when there are no more messages, the receiver exits with the output below.

```
All messages received for session id: 63f5d5231c7c208a64777fa9-7
Unable to attach new link: ValueError('Invalid link')
Unable to attach new link: ValueError('Invalid link')
Unable to attach new link: ValueError('Invalid link')
Unable to attach new link: ValueError('Invalid link')
If timeout occurs during connecting to a session,It indicates that there might be no non-empty sessions remaining; exiting.This may present as a UserError in the azure portal metric.

Process finished with exit code 0
```

Is there a way to handle this exception and monitor for session-enabled messages 24/7?
In my case, the messages may arrive at any time and in random order.
How would you suggest handling this?

```python
import json
import logging

from azure.servicebus import NEXT_AVAILABLE_SESSION
from azure.servicebus.exceptions import (
    OperationTimeoutError,
    ServiceBusConnectionError,
    SessionLockLostError,
)

# logging replaces the `msg` helper, which is not defined in this snippet
logger = logging.getLogger(__name__)


async def message_processing(servicebus_client, topic_name):
    results_list = []
    while True:
        try:
            async with servicebus_client.get_subscription_receiver(
                    topic_name=topic_name,
                    subscription_name='my-demo-subscription-name',
                    max_wait_time=120,
                    session_id=NEXT_AVAILABLE_SESSION
            ) as receiver:
                await receiver.session.set_state("OPEN")
                _prev_session_id = ''
                _expected_records_count = 0
                async for message in receiver:
                    _response: dict = json.loads(str(message))

                    # Whenever the session id changes, check the previous session's batch
                    if _prev_session_id and _prev_session_id != message.session_id:
                        logger.info(f'Session ID changed. Storing the results received so far: {len(results_list)}')
                        if _expected_records_count != len(results_list):
                            logger.error(f'Not all entities received. Expected count: {_expected_records_count} Actual count: {len(results_list)}')
                        results_list = []

                    # Total number of records expected for the current session
                    _expected_records_count = int(_response.get('expected_records_count'))
                    results_list.append(_response)

                    await receiver.complete_message(message)

                    # Store the session id
                    _prev_session_id = message.session_id

                    if len(results_list) == _expected_records_count:
                        print(f'All messages received for session id: {message.session_id}')
                        await receiver.session.set_state("CLOSED")
                        results_list = []
                        break
        except OperationTimeoutError:
            print("If timeout occurs during connecting to a session,"
                  "It indicates that there might be no non-empty sessions remaining; exiting."
                  "This may present as a UserError in the azure portal metric.")
            return
        except (ServiceBusConnectionError, SessionLockLostError) as ex:
            logger.error(f'Exception occurred for the document: {ex.__cause__}')
            continue
```

@l0lawrence
Member

Hi @sudharsan2020, there is nothing in the SDK that periodically looks up sessions for you. In your own code you could try to receive, catch the OperationTimeoutError if one is raised, and try again. With the PR that is in progress, you will be able to specify a smaller timeout and therefore retry sooner.
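A minimal sketch of that suggestion: instead of returning on the timeout, back off briefly and open a new receiver for the next available session. To keep it self-contained and testable, the SDK pieces are injected: `monitor_sessions` and its parameters are hypothetical names, `open_receiver` would wrap something like `servicebus_client.get_subscription_receiver(topic_name=..., subscription_name=..., session_id=NEXT_AVAILABLE_SESSION, max_wait_time=...)`, and `retry_errors` would be `OperationTimeoutError` from `azure.servicebus.exceptions`.

```python
import asyncio


async def monitor_sessions(open_receiver, handle_message, retry_errors,
                           retry_delay=5.0, max_rounds=None):
    """Keep picking up sessions instead of exiting when none is available.

    open_receiver: zero-argument callable returning an async context manager
        that yields an async-iterable receiver.
    handle_message: coroutine function called as handle_message(receiver, message).
    retry_errors: exception type (or tuple of types) that means "try again",
        e.g. a timeout while attaching to the next available session.
    max_rounds: stop after this many attempts; None means run forever.
    """
    rounds = 0
    while max_rounds is None or rounds < max_rounds:
        rounds += 1
        try:
            async with open_receiver() as receiver:
                # Ends when the receiver stops yielding messages; the loop
                # then opens a receiver for the next available session.
                async for message in receiver:
                    await handle_message(receiver, message)
        except retry_errors:
            # No session could be attached (or another retryable error
            # occurred): back off briefly, then look again instead of returning.
            await asyncio.sleep(retry_delay)
```

With the SDK, `handle_message` is where you would call `await receiver.complete_message(message)`. Other transient errors, such as `ServiceBusConnectionError`, could be added to the tuple passed as `retry_errors` if they should also trigger a retry.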

l0lawrence added the issue-addressed label Jul 31, 2023
github-actions bot removed the needs-team-attention label Jul 31, 2023
@github-actions

Hi @ManuelEibl. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text "/unresolve" to remove the "issue-addressed" label and continue the conversation.

@github-actions

github-actions bot commented Aug 8, 2023

Hi @ManuelEibl, since you haven’t asked that we /unresolve the issue, we’ll close this out. If you believe further discussion is needed, please add a comment /unresolve to reopen the issue.
