Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EH Pyproto] Release preparation #22433

3 changes: 2 additions & 1 deletion eng/tox/allowed_pylint_failures.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,6 @@
"azure-messaging-nspkg",
"azure-agrifood-farming",
"azure-ai-language-questionanswering",
"azure-ai-language-conversations"
"azure-ai-language-conversations",
"azure-eventhub"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we open an issue to re-enable any analysis/test passes we've opted out of?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue created: #22455

]
1 change: 0 additions & 1 deletion eng/tox/mypy_hard_failure_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

MYPY_HARD_FAILURE_OPTED = [
"azure-core",
"azure-eventhub",
"azure-identity",
"azure-keyvault-administration",
"azure-keyvault-certificates",
Expand Down
18 changes: 15 additions & 3 deletions sdk/eventhub/azure-eventhub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
# Release History

## 5.7.0 (Unreleased)
## 5.8.0a1 (Unreleased)

Version 5.8.0a1 is our first efforts to build an Azure Event Hubs client library based on pure python implemented AMQP stack.

### Breaking changes

- The following features have been temporarily pulled out which will be added back in future previews as we work towards a stable release:
- Async is not supported.
- Passing the following keyword arguments to the constructors and `from_connection_string` methods of the `EventHubProducerClient` and `EventHubConsumerClient` is not supported: `transport_type`, `http_proxy`, `custom_endpoint_address`, and `connection_verify`.

### Other Changes

- uAMQP dependency is removed.

## 5.7.0 (2022-01-11)

This version and all future versions will require Python 3.7+. Python 2.7 and 3.6 are no longer supported.

Expand All @@ -9,8 +23,6 @@ This version and all future versions will require Python 3.7+. Python 2.7 and 3.
- Added support for fixed (linear) retry backoff:
- Sync/async `EventHubProducerClient` and `EventHubConsumerClient` constructors and `from_connection_string` take `retry_mode` as a keyword argument.

### Breaking Changes

### Bugs Fixed

- Fixed a bug that `EventHubProducerClient` could be reopened for sending events instead of encountering with `KeyError` when the client is previously closed (issue #21849).
Expand Down
148 changes: 18 additions & 130 deletions sdk/eventhub/azure-eventhub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ The Azure Event Hubs client library allows for publishing and consuming of Azure

_Azure SDK Python packages support for Python 2.7 has ended 01 January 2022. For more information and questions, please refer to https://github.com/Azure/azure-sdk-for-python/issues/20691_

_This version is our first efforts to build an Azure Event Hubs client library based on pure python implemented AMQP stack.
Features like async support or AmqpOverWebsocket are unavailable for this version which will be added back in future previews as we work towards a stable release.
Please refer to the changelog for more details._


## Getting started

### Prerequisites
Expand Down Expand Up @@ -63,8 +68,6 @@ protocol. There are implementations of the `TokenCredential` protocol available
[azure-identity package](https://pypi.org/project/azure-identity/). The host name is of the format `<yournamespace.servicebus.windows.net>`.
- To use the credential types provided by `azure-identity`, please install the package:
```pip install azure-identity```
- Additionally, to use the async API, you must first install an async transport, such as [`aiohttp`](https://pypi.org/project/aiohttp/):
```pip install aiohttp```
- When using Azure Active Directory, your principal must be assigned a role which allows access to Event Hubs, such as the
Azure Event Hubs Data Owner role. For more information about using Azure Active Directory authorization with Event Hubs,
please refer to [the associated documentation](https://docs.microsoft.com/azure/event-hubs/authorize-access-azure-active-directory).
Expand Down Expand Up @@ -101,9 +104,6 @@ The following sections provide several code snippets covering some of the most c
- [Publish events to an Event Hub](#publish-events-to-an-event-hub)
- [Consume events from an Event Hub](#consume-events-from-an-event-hub)
- [Consume events from an Event Hub in batches](#consume-events-from-an-event-hub-in-batches)
- [Publish events to an Event Hub asynchronously](#publish-events-to-an-event-hub-asynchronously)
- [Consume events from an Event Hub asynchronously](#consume-events-from-an-event-hub-asynchronously)
- [Consume events from an Event Hub in batches asynchronously](#consume-events-from-an-event-hub-in-batches-asynchronously)
- [Consume events and save checkpoints using a checkpoint store](#consume-events-and-save-checkpoints-using-a-checkpoint-store)
- [Use EventHubConsumerClient to work with IoT Hub](#use-eventhubconsumerclient-to-work-with-iot-hub)

Expand Down Expand Up @@ -204,112 +204,6 @@ with client:
# client.receive_batch(on_event_batch=on_event_batch, partition_id='0')
```

### Publish events to an Event Hub asynchronously

Use the `create_batch` method on `EventHubProducer` to create an `EventDataBatch` object which can then be sent using the `send_batch` method.
Events may be added to the `EventDataBatch` using the `add` method until the maximum batch size limit in bytes has been reached.
```python
import asyncio
from azure.eventhub.aio import EventHubProducerClient # The package name suffixed with ".aio" for async
from azure.eventhub import EventData

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

async def create_batch(client):
event_data_batch = await client.create_batch()
can_add = True
while can_add:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
can_add = False # EventDataBatch object reaches max_size.
return event_data_batch

async def send():
client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
batch_data = await create_batch(client)
async with client:
await client.send_batch(batch_data)

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(send())
```

### Consume events from an Event Hub asynchronously

This SDK supports both synchronous and asyncio based code. To receive as demonstrated in the samples above, but within
aio, one would need the following:

```python
import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event(partition_context, event):
logger.info("Received event from partition {}".format(partition_context.partition_id))
await partition_context.update_checkpoint(event)

async def receive():
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
async with client:
await client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
# receive events from specified partition:
# await client.receive(on_event=on_event, partition_id='0')

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(receive())
```

### Consume events from an Event Hub in batches asynchronously

All synchronous functions are supported in aio as well. As demonstrated above for synchronous batch receipt, one can accomplish
the same within asyncio as follows:

```python
import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event_batch(partition_context, events):
logger.info("Received event from partition {}".format(partition_context.partition_id))
await partition_context.update_checkpoint()

async def receive_batch():
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
async with client:
await client.receive_batch(
on_event_batch=on_event_batch,
starting_position="-1", # "-1" is from the beginning of the partition.
)
# receive events from specified partition:
# await client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(receive_batch())
```

### Consume events and save checkpoints using a checkpoint store

`EventHubConsumerClient` is a high level construct which allows you to receive events from multiple partitions at once
Expand All @@ -322,53 +216,49 @@ a consumer group in an Event Hub instance. The `EventHubConsumerClient` uses an
and to store the relevant information required by the load balancing algorithm.

Search pypi with the prefix `azure-eventhub-checkpointstore` to
find packages that support this and use the `CheckpointStore` implementation from one such package. Please note that both sync and async libraries are provided.
find packages that support this and use the `CheckpointStore` implementation from one such package.

In the below example, we create an instance of `EventHubConsumerClient` and use a `BlobCheckpointStore`. You need
to [create an Azure Storage account](https://docs.microsoft.com/azure/storage/common/storage-quickstart-create-account?tabs=azure-portal)
and a [Blob Container](https://docs.microsoft.com/azure/storage/blobs/storage-quickstart-blobs-portal#create-a-container) to run the code.

[Azure Blob Storage Checkpoint Store Async](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio)
and [Azure Blob Storage Checkpoint Store Sync](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/eventhub/azure-eventhub-checkpointstoreblob)
are one of the `CheckpointStore` implementations we provide that applies Azure Blob Storage as the persistent store.
[Azure Blob Storage Checkpoint Store Sync](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/eventhub/azure-eventhub-checkpointstoreblob)
is one of the `CheckpointStore` implementations we provide that applies Azure Blob Storage as the persistent store.


```python
import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'

async def on_event(partition_context, event):
def on_event(partition_context, event):
# do something
await partition_context.update_checkpoint(event) # Or update_checkpoint every N events for better performance.
partition_context.update_checkpoint(event) # Or update_checkpoint every N events for better performance.

async def receive(client):
await client.receive(
def receive(client):
client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)

async def main():
def main():
checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
client = EventHubConsumerClient.from_connection_string(
connection_str,
consumer_group,
eventhub_name=eventhub_name,
checkpoint_store=checkpoint_store, # For load balancing and checkpoint. Leave None for no load balancing
)
async with client:
await receive(client)
with client:
receive(client)

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
main()
```

### Use EventHubConsumerClient to work with IoT Hub
Expand All @@ -390,8 +280,6 @@ client = EventHubConsumerClient.from_connection_string(connection_str, consumer_

partition_ids = client.get_partition_ids()
```
- Programmatically retrieve the built-in Event Hubs compatible endpoint.
Refer to [IoT Hub Connection String Sample](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/eventhub/azure-eventhub/samples/async_samples/iot_hub_connection_string_receive_async.py).

## Troubleshooting

Expand Down
4 changes: 1 addition & 3 deletions sdk/eventhub/azure-eventhub/azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from ._pyamqp import constants
from ._common import EventData, EventDataBatch
from ._version import VERSION

Expand All @@ -18,14 +17,13 @@
parse_connection_string,
EventHubConnectionStringProperties
)
from ._constants import TransportType


__all__ = [
"EventData",
"EventDataBatch",
"EventHubProducerClient",
"EventHubConsumerClient",
"TransportType",
"EventHubSharedKeyCredential",
"CheckpointStore",
"CloseReason",
Expand Down
Loading