Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHubs] SDK layer exception parity for uamqp/pyamqp #26229

Closed
3 tasks done
swathipil opened this issue Sep 14, 2022 · 2 comments
Closed
3 tasks done

[EventHubs] SDK layer exception parity for uamqp/pyamqp #26229

swathipil opened this issue Sep 14, 2022 · 2 comments
Labels
Client This issue points to a problem in the data-plane of the library. Event Hubs Messaging Messaging crew

Comments

@swathipil
Copy link
Member

swathipil commented Sep 14, 2022

  • EH error story: add a bunch of tests to for all test scenarios. and check that uamqp and pyamqp raise the same errors. check that we have tests for optional params, like timeout flags etc.

Test cases:

  • 1) Sending EventHubProducerClient(...connection_verify="cacert.pem").
  • 2) Buffered Producer with send_batch(), (potentially on both sync/async unsure) doesn't send messages on AttributeError("'list' object has no attribute '_internal_events'")}
  • 3) Setting retry_total>0 and 0 and have consumer client error in 2 ways:
    • retry_total>0; pass in a SASTokenCredential that expires quickly. See how client + eventprocessor behaves. should call error callback --> Issue created for this ([EventHubs] retry_total>0 sync consumer hangs when exception #27137)
    • retry_total=0, Force link detach by updating properties of EH while receiving. If retry set to 0, should call error callback right away and continue running.
    • should these close and re-open consumer and partition?
    • should only non-retryable errors close the partition?
import os
import sys
from azure.eventhub import EventHubConsumerClient

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

import logging
#uamqp_logger = logging.getLogger('uamqp')
#uamqp_logger.setLevel(logging.DEBUG)

# Configure a console output
#handler = logging.StreamHandler(stream=sys.stdout)
#uamqp_logger.addHandler(handler)

def on_event(partition_context, event):
    # Put your code here.
    # If the operation is i/o intensive, multi-thread will have better performance.
    print("Received event from partition: {}.".format(partition_context.partition_id))


def on_partition_initialize(partition_context):
    # Put your code here.
    print("Partition: {} has been initialized.".format(partition_context.partition_id))


def on_partition_close(partition_context, reason):
    # Put your code here.
    print("Partition: {} has been closed, reason for closing: {}.".format(
        partition_context.partition_id,
        reason
    ))
    consumer_client.close()


def on_error(partition_context, error):
    # Put your code here. partition_context can be None in the on_error callback.
    if partition_context:
        print("An exception: {} occurred during receiving from Partition: {}.".format(
            partition_context.partition_id,
            error
        ))
    else:
        print("An exception: {} occurred during the load balance process.".format(error))

def update_entity(interval):    # forces service link detach
    subscription_id = os.environ['SUBSCRIPTION_ID']
    live_eventhub = {
        "resource_group" : 'swathip-test',
        "namespace": 'swathip-test-eventhubs',
        "event_hub": "eventhub-test"
    }
    from azure.mgmt.eventhub import EventHubManagementClient
    from azure.identity import EnvironmentCredential
    import threading
    mgmt_client = EventHubManagementClient(EnvironmentCredential(), subscription_id)
    def _schedule_update_properties():
        eventhub = mgmt_client.event_hubs.get(
            live_eventhub["resource_group"],
            live_eventhub["namespace"],
            live_eventhub["event_hub"]
        )
        properties = eventhub.as_dict()
        if properties["message_retention_in_days"] == 1:
            properties["message_retention_in_days"] = 2
        else:
            properties["message_retention_in_days"] = 1
        mgmt_client.event_hubs.create_or_update(
            live_eventhub["resource_group"],
            live_eventhub["namespace"],
            live_eventhub["event_hub"],
            properties
        )
        print('updating')
    t = threading.Timer(interval, _schedule_update_properties)
    t.start()


if __name__ == '__main__':
    update_entity(4)
    update_entity(6)
    update_entity(10)
    consumer_client = EventHubConsumerClient.from_connection_string(
        conn_str=CONNECTION_STR,
        consumer_group='$Default',
        eventhub_name=EVENTHUB_NAME,
        uamqp_transport=True,
        retry_total=0,
        logging_enable=True
    )

    #import time
    #from azure.eventhub import EventHubSharedKeyCredential
    #from azure.eventhub._client_base import EventHubSASTokenCredential
    #from azure.identity import EnvironmentCredential
    #credential = EventHubSharedKeyCredential(os.environ["EVENT_HUB_SAS_POLICY"], os.environ['EVENT_HUB_SAS_KEY'])
    #auth_uri = "sb://{}/{}".format(os.environ['EVENT_HUB_HOSTNAME'], os.environ['EVENT_HUB_NAME'])
    #token = credential.get_token(auth_uri).token
    #consumer_client = EventHubConsumerClient(
    #    fully_qualified_namespace=os.environ['EVENT_HUB_HOSTNAME'],
    #    eventhub_name=os.environ['EVENT_HUB_NAME'],
    #    consumer_group='$Default',
    #    credential=EventHubSASTokenCredential(token, time.time() + 8),
    #    uamqp_transport=True,
    #    retry_total=1,
    #    logging_enable=True
    #)

    try:
        with consumer_client:
            consumer_client.receive(
                on_event=on_event,
                on_partition_initialize=on_partition_initialize,
                on_partition_close=on_partition_close,
                on_error=on_error,
                starting_position="-1",  # "-1" is from the beginning of the partition.
            )
    except KeyboardInterrupt:
        print('Stopped receiving.')
@swathipil swathipil mentioned this issue Sep 14, 2022
12 tasks
@github-actions github-actions bot added the needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. label Sep 14, 2022
@azure-sdk azure-sdk added Client This issue points to a problem in the data-plane of the library. Event Hubs needs-team-triage Workflow: This issue needs the team to triage. labels Sep 14, 2022
@ghost ghost removed the needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. label Sep 14, 2022
@kristapratico kristapratico removed the needs-team-triage Workflow: This issue needs the team to triage. label Sep 14, 2022
@swathipil
Copy link
Member Author

swathipil commented Oct 19, 2022

x = not implemented
y = passed
n = not passed

API to test kwargs/exception test file test case passed?
BUFFERED_MODE = TRUE
EHProducerClient buffer_concurrency=2 test_buffered_producer.py + async test_producer_client_constructor y
EHProducerClient on_success=callback, on_error=callback test_buffered_producer.py, test_buffered_producer.py test_producer_client_constructor y
EHProducerClient on_success=callback w/ invalid # params, on_error=callback w/ invalid # params passes and logs error test_buffered_producer.py async test_producer_client_constructor y
EHProducerClient max_buffer_length=10/100 test_buffered_producer.py + _async.py test_long_wait_small_buffer/with_timing_config y
EHProducerClient, max_wait_time=0/10/1000 test_buffered_producer.py + _async.py test_send_with_timing_config, etc. y
EHProducerClient, retry_total=3, retry_backoff_factor=0.01 test_buffered_producer.py + _async.py test_long_wait_small_buffer y
EHProducerClient, auth_timeout=3 test_buffered_producer.py + _async.py test_long_wait_small_buffer y
EHProducerClient.get_buffered_event_count partition_id=pid test_buffered_producer.py + async test_basic_send_single_events_round_robin y
EHProducerClient on_error=on_error test_bp + async test_producer_client_constructor y
send_event timeout = -1 raises OperationTimeoutError test_negative + async test_client_send_timeout y
send_batch timeout = -1 raises OperationTimeoutError test_negative + async test_client_send_timeout y
BUFFERED_MODE = FALSE
EHProducerClient fully_qualified_namespace = invalid test_negative.py + async test_client_invalid_credential y
EHProducerClient eventhub_name = invalid test_negative.py + async test_client_invalid_credential y
EHProducerClient credential = invalid, expired test_negative.py + async test_client_invalid_credential y
EHProducerClient buffered_mode=False, on_success=callback, on_error=callback test_send.py, test_send_async.py test_send_with_callback y
EHProducerClient logging_enable=True x x x
EHProducerClient auth_timeout = 3 test_negative + async test_client_secret_credential y
EHProducerClient user_agent="customized information" test_negative.py + async test_client_secret_credential y
EHProducerClient retry_total=0, retry_mode='exponential' + RetryMode.Exponential, retry_backoff_factor=0.02 test_negative.py + async test_client_invalid_credential y
EHProducerClient idle_timeout=10 test_reconnect.py, test_reconnect_async.py test_send_connection_idle_timeout y
EHProducerClient transport_type=Amqp (pass in), uamqp.AmqpOverWebsocket, AmqpOverWebsocket test_send/receive.py, test_send/receive_async.py test_send/receive_over_websocket, test_send_list, test_send_multiple_partition_with_app_prop y
EHProducerClient http_proxy={...} x x x
EHProducerClient custom_endpoint_address=valid x x x
EHProducerClient custom_endpoint_address=invalid raises AuthenticationError test_negative.py + async test_client_invalid_credential y
EHProducerClient connection_verify=valid test_client_creation.py + async test_custom_certificate y
EHProducerClient connection_verify=invalid raises AuthenticationError test_auth.py + async test_client_identity_credential y
EHProducerClient without send claim (link detach) raises ConnectionLostError local testing x y
EHProducerClient disable entity (vendor link detach) raises ConnectError local testing x y
from_connection_string invalid hostname/key/param test_negative + async test_create/send/receive_batch_invalid_hostname, test_send_batch_invalid_partition y
create_batch max_batch_size=too large/invalid partition test_negative + async test_send_to_invalid_partitions/test_send_too_large_message_async y
send_event timeout = -1 raises OperationTimeoutError test_negative + async test_client_send_timeout y
send_batch timeout = -1 raises OperationTimeoutError test_negative + async test_client_send_timeout y

@swathipil
Copy link
Member Author

swathipil commented Oct 20, 2022

ConsumerClient:

  • mostly same as producer.
  • other kwargs for receive are being tested already

credentials:

  • EventHubSharedKeyCredential:
kwargs/exception test file test case passed?
invalid key_name, invalid access_key test_negative + async ... y

@kashifkhan kashifkhan added the Messaging Messaging crew label Jan 13, 2023
@github-actions github-actions bot locked and limited conversation to collaborators Aug 7, 2023
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Client This issue points to a problem in the data-plane of the library. Event Hubs Messaging Messaging crew
Projects
None yet
Development

No branches or pull requests

4 participants