Changefeed (#10755)
* [ChangeFeed]Add ChangeFeed Package

* test_avro failure

* update dev_requirement.txt

* change namespace to azure.storage.blob.changefeed

* address comments

* optimize memory when reading changefeed events

* namespace change

* set up package change

* fix failed tests

* readme and kwargs

* Update sdk/storage/azure-storage-blob-changefeed/azure/storage/blob/changefeed/_change_feed_client.py

Co-authored-by: Rakshith Bhyravabhotla <sabhyrav@microsoft.com>

* address comments

* 'azure-storage-blob>=12.3.0' which does not match the frozen requirement 'azure-storage-blob~=1.3'

Co-authored-by: Rakshith Bhyravabhotla <sabhyrav@microsoft.com>
xiafu-msft and Rakshith Bhyravabhotla authored Jun 24, 2020
1 parent 18adaa8 commit c5f1413
Showing 36 changed files with 81,507 additions and 56 deletions.
5 changes: 5 additions & 0 deletions doc/sphinx/package_service_mapping.json
@@ -900,6 +900,11 @@
"category": "Client",
"service_name": "Storage",
"manually_generated": true
},
"azure-storage-blob-changefeed": {
"category": "Client",
"service_name": "Storage",
"manually_generated": true
},
"azure-storage-file-share": {
"category": "Client",
1 change: 1 addition & 0 deletions eng/.docsettings.yml
@@ -110,6 +110,7 @@ known_content_issues:
- ['sdk/search/azure-search-nspkg/README.md', 'nspkg and common']
- ['sdk/storage/azure-storage-blob/samples/README.md', 'nspkg and common']
- ['sdk/storage/azure-storage-file-datalake/samples/README.md', 'nspkg and common']
- ['sdk/storage/azure-storage-blob-changefeed/samples/README.md', 'nspkg and common']
- ['sdk/storage/azure-storage-file-share/samples/README.md', 'nspkg and common']
- ['sdk/storage/azure-storage-queue/samples/README.md', 'nspkg and common']
- ['sdk/textanalytics/azure-ai-nspkg/README.md', 'nspkg and common']
8 changes: 8 additions & 0 deletions sdk/storage/azure-storage-blob-changefeed/CHANGELOG.md
@@ -0,0 +1,8 @@
## 12.0.0b1 (XX-XX-XX)
- Initial Release. Please see the README for information on the new design.
- Support for ChangeFeedClient: get change feed events by page, get all change feed events, get events in a time range

This package's
[documentation](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/storage/azure-storage-blob-changefeed/README.md)
and
[samples](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/storage/azure-storage-blob-changefeed/samples)
21 changes: 21 additions & 0 deletions sdk/storage/azure-storage-blob-changefeed/LICENSE.txt
@@ -0,0 +1,21 @@
The MIT License (MIT)

Copyright (c) 2017 Microsoft

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
7 changes: 7 additions & 0 deletions sdk/storage/azure-storage-blob-changefeed/MANIFEST.in
@@ -0,0 +1,7 @@
include *.md
include azure/__init__.py
include azure/storage/__init__.py
include azure/storage/blob/__init__.py
include LICENSE.txt
recursive-include tests *.py
recursive-include samples *.py *.md
185 changes: 185 additions & 0 deletions sdk/storage/azure-storage-blob-changefeed/README.md
@@ -0,0 +1,185 @@
# Azure Storage Blob ChangeFeed client library for Python

This preview package for Python enables users to get blob change feed events. These events can be lazily generated, iterated by page, retrieved for a specific time interval, or iterated from a specific continuation token.


[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/storage/azure-storage-blob-changefeed/azure/storage/blob/changefeed) | [Package (PyPI)](https://pypi.org/project/azure-storage-blob-changefeed/) | [API reference documentation](https://aka.ms/azsdk-python-storage-blobchangefeed-ref) | [Product documentation](https://docs.microsoft.com/azure/storage/) | [Samples](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/storage/azure-storage-blob-changefeed/samples)


## Getting started

### Prerequisites
* Python 2.7, or 3.5 or later is required to use this package.
* You must have an [Azure subscription](https://azure.microsoft.com/free/) and an
[Azure storage account](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-quickstart-create-account) to use this package.

### Install the package
Install the Azure Storage Blob ChangeFeed client library for Python with [pip](https://pypi.org/project/pip/):

```bash
pip install azure-storage-blob-changefeed --pre
```

### Create a storage account
If you wish to create a new storage account, you can use the
[Azure Portal](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-quickstart-create-account#create-an-account-using-the-azure-portal),
[Azure PowerShell](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-quickstart-create-account#create-an-account-using-powershell),
or [Azure CLI](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-quickstart-create-account#create-an-account-using-azure-cli):

```bash
# Create a new resource group to hold the storage account -
# if using an existing resource group, skip this step
az group create --name my-resource-group --location westus2

# Create the storage account
az storage account create -n my-storage-account-name -g my-resource-group --hierarchical-namespace true
```

### Authenticate the client

Interaction with the Blob ChangeFeed client library starts with an instance of the `ChangeFeedClient` class. You need an existing storage account, its URL, and a credential to instantiate the client object.

#### Get credentials

To authenticate the client you have a few options:
1. Use a SAS token string
2. Use an account shared access key
3. Use a token credential from [azure.identity](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/identity/azure-identity)

Alternatively, you can authenticate with a storage connection string using the `from_connection_string` method. See example: [Client creation with a connection string](#client-creation-with-a-connection-string).

You can omit the credential if your account URL already has a SAS token.
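
Whether the credential can be omitted therefore depends on whether the account URL already carries a SAS token in its query string. The following is a minimal Python 3 sketch of such a check using only the standard library. The helper name `url_has_sas` is hypothetical and not part of the SDK, and treating the presence of a `sig` query parameter as the marker of a SAS token is an assumption made for illustration:

```python
from urllib.parse import urlparse, parse_qs


def url_has_sas(account_url):
    """Return True if the URL's query string contains a SAS signature.

    Assumption: a SAS token is recognized by its `sig` query parameter.
    This helper is illustrative only and is not part of the SDK.
    """
    query = parse_qs(urlparse(account_url).query)
    return "sig" in query


# Placeholder URLs; the account name and token values are made up.
print(url_has_sas("https://myaccount.blob.core.windows.net/?sv=2019-12-12&sig=abc123"))
print(url_has_sas("https://myaccount.blob.core.windows.net/"))
```

A check like this could be used to decide whether to prompt for a credential before constructing the client.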

#### Create client

Once you have your account URL and credentials ready, you can create the ChangeFeedClient:

```python
from azure.storage.blob.changefeed import ChangeFeedClient

service = ChangeFeedClient(account_url="https://<my-storage-account-name>.blob.core.windows.net/", credential=credential)
```

## Key concepts

#### Clients

The Blob ChangeFeed SDK provides one client:
* ChangeFeedClient: this client allows you to get change feed events by page, get all change feed events, get events in a time range, and start listing events from a continuation token.

## Examples

The following sections provide several code snippets covering some of the most common Storage Blob ChangeFeed tasks, including:

* [Client creation with a connection string](#client-creation-with-a-connection-string)
* [Enumerating Events Within a Time Range](#enumerating-events-within-a-time-range)
* [Enumerating All Events](#enumerating-all-events)
* [Enumerating Events by Page](#enumerating-events-by-page)


### Client creation with a connection string
Create the ChangeFeedClient using the connection string to your Azure Storage account.

```python
from azure.storage.blob.changefeed import ChangeFeedClient

service = ChangeFeedClient.from_connection_string(conn_str="my_connection_string")
```
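
For orientation, a storage connection string is a semicolon-separated list of `Key=Value` settings. The sketch below shows how such a string breaks down and how a blob endpoint can be derived from it. It is not the SDK's own parser: the helper names `parse_connection_string` and `blob_endpoint` are hypothetical, and the account name and key are made-up placeholders.

```python
def parse_connection_string(conn_str):
    """Split 'Key=Value;Key=Value' settings into a dict (illustrative only)."""
    settings = {}
    for part in conn_str.split(";"):
        if not part.strip():
            continue
        # Split on the first '=' only: account keys are base64 and may end in '='.
        key, _, value = part.partition("=")
        settings[key.strip()] = value.strip()
    return settings


def blob_endpoint(settings):
    """Derive the blob endpoint when no explicit BlobEndpoint is given."""
    if "BlobEndpoint" in settings:
        return settings["BlobEndpoint"]
    return "{}://{}.blob.{}".format(
        settings.get("DefaultEndpointsProtocol", "https"),
        settings["AccountName"],
        settings.get("EndpointSuffix", "core.windows.net"),
    )


# Made-up values for illustration.
conn_str = (
    "DefaultEndpointsProtocol=https;AccountName=myaccount;"
    "AccountKey=bXlrZXk=;EndpointSuffix=core.windows.net"
)
settings = parse_connection_string(conn_str)
print(blob_endpoint(settings))
```

In practice you would pass the connection string straight to `from_connection_string`; the breakdown above is only meant to show what the string contains.
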
### Enumerating Events Within a Time Range
List all events within a time range.

```python
from datetime import datetime
from azure.storage.blob.changefeed import ChangeFeedClient

cf_client = ChangeFeedClient("https://{}.blob.core.windows.net".format("YOUR_ACCOUNT_NAME"),
                             credential="YOUR_ACCOUNT_KEY")
start_time = datetime(2020, 1, 6)
end_time = datetime(2020, 3, 4)
change_feed = cf_client.list_changes(start_time=start_time, end_time=end_time)

# print range of events
for event in change_feed:
    print(event)
```

### Enumerating All Events
List all events.

```python
from azure.storage.blob.changefeed import ChangeFeedClient

cf_client = ChangeFeedClient("https://{}.blob.core.windows.net".format("YOUR_ACCOUNT_NAME"),
                             credential="YOUR_ACCOUNT_KEY")
change_feed = cf_client.list_changes()

# print all events
for event in change_feed:
    print(event)
```
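
Because each event is returned as a dictionary, ordinary dictionary handling is enough to summarize a feed. The sketch below tallies events by type. It assumes each event carries an `eventType` key, as in the blob change feed event schema; the helper name `count_by_event_type` is hypothetical, and the sample events are made up so that the snippet runs without a storage account.

```python
from collections import Counter


def count_by_event_type(events):
    """Tally events by their 'eventType' field; events without one count as 'unknown'."""
    return Counter(event.get("eventType", "unknown") for event in events)


# Made-up sample events standing in for the dictionaries yielded by list_changes().
sample_events = [
    {"eventType": "BlobCreated", "subject": "/blobServices/default/containers/c/blobs/a.txt"},
    {"eventType": "BlobDeleted", "subject": "/blobServices/default/containers/c/blobs/a.txt"},
    {"eventType": "BlobCreated", "subject": "/blobServices/default/containers/c/blobs/b.txt"},
]
counts = count_by_event_type(sample_events)
print(counts)
```

Since the function accepts any iterable, the result of `cf_client.list_changes()` could be passed in place of `sample_events`.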

### Enumerating Events by Page
List events by page.

```python
from azure.storage.blob.changefeed import ChangeFeedClient

cf_client = ChangeFeedClient("https://{}.blob.core.windows.net".format("YOUR_ACCOUNT_NAME"),
                             credential="YOUR_ACCOUNT_KEY")

change_feed = cf_client.list_changes().by_page()

# print first page of events
change_feed_page1 = next(change_feed)
for event in change_feed_page1:
    print(event)
```
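
To read more than the first page, the same pattern extends to a nested loop: the outer loop walks pages and the inner loop walks the events in each page. The sketch below illustrates this with a stand-in iterator of lists rather than a live client, so it runs without a storage account. The helper name `process_pages` and its `max_pages` parameter are hypothetical.

```python
from itertools import islice


def process_pages(pages, max_pages=None):
    """Walk an iterator of pages and return (page_number, event) pairs.

    islice stops after max_pages pages without fetching the next one;
    max_pages=None walks every page.
    """
    seen = []
    for page_number, page in enumerate(islice(pages, max_pages), start=1):
        for event in page:
            seen.append((page_number, event))
    return seen


# Stand-in for cf_client.list_changes(results_per_page=2).by_page()
fake_pages = iter([["e1", "e2"], ["e3", "e4"], ["e5"]])
result = process_pages(fake_pages, max_pages=2)
print(result)
```

Limiting the number of pages this way leaves the remaining pages unread on the iterator, so processing can resume later from the same object.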

## Troubleshooting

### Logging
This library uses the standard
[logging](https://docs.python.org/3/library/logging.html) library for logging.
Basic information about HTTP sessions (URLs, headers, etc.) is logged at INFO
level.

Detailed DEBUG level logging, including request/response bodies and unredacted
headers, can be enabled on a client with the `logging_enable` argument:
```python
import sys
import logging
from azure.storage.blob.changefeed import ChangeFeedClient

# Create a logger for the 'azure.storage' namespace, which includes 'azure.storage.blob.changefeed'
logger = logging.getLogger('azure.storage')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

# This client will log detailed information about its HTTP sessions, at DEBUG level
service_client = ChangeFeedClient.from_connection_string("your_connection_string", logging_enable=True)
```
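
The snippet above attaches its handler to the `azure.storage` logger rather than to a changefeed-specific one. This works because of how the standard `logging` module propagates records from child loggers to their parents. A minimal sketch of that mechanism follows; the `ListHandler` class is hypothetical, and the child logger name is used only for illustration (which logger names the SDK actually emits to is not specified here).

```python
import logging


class ListHandler(logging.Handler):
    """Collect log messages in a list (for demonstration only)."""

    def __init__(self):
        super(ListHandler, self).__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append("{}: {}".format(record.name, record.getMessage()))


parent = logging.getLogger("azure.storage")
parent.setLevel(logging.DEBUG)
handler = ListHandler()
parent.addHandler(handler)

# The child logger has no handler or level of its own: it inherits the
# parent's effective level, and its records propagate up to the parent's handler.
child = logging.getLogger("azure.storage.blob.changefeed")
child.debug("example message")

print(handler.messages)
```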

## Next steps

### More sample code

Get started with our [Azure Blob ChangeFeed samples](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/storage/azure-storage-blob-changefeed/samples).

Several Storage Blob ChangeFeed Python SDK samples are available to you in the SDK's GitHub repository. These samples provide example code for additional scenarios commonly encountered while working with Blob ChangeFeed:

* [`change_feed_samples.py`](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/storage/azure-storage-blob-changefeed/samples/change_feed_samples.py) - Examples for authenticating and operating on the client:
    * list events by page
    * list all events
    * list events in a time range
    * list events starting from a continuation token


## Contributing
This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.
@@ -0,0 +1 @@
__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: str
@@ -0,0 +1 @@
__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: str
@@ -0,0 +1 @@
__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: str
@@ -0,0 +1,12 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

from ._change_feed_client import ChangeFeedClient


__all__ = [
    'ChangeFeedClient'
]
@@ -0,0 +1,120 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
# pylint: disable=too-many-lines,no-self-use
from typing import (  # pylint: disable=unused-import
    Optional, Any, TYPE_CHECKING, Dict
)

from azure.core.paging import ItemPaged
from azure.storage.blob import BlobServiceClient  # pylint: disable=no-name-in-module

from azure.storage.blob._shared.base_client import parse_connection_str
from ._models import ChangeFeedPaged
if TYPE_CHECKING:
    from datetime import datetime


class ChangeFeedClient(object):  # pylint: disable=too-many-public-methods
    """A client to interact with a specific account change feed.

    :param str account_url:
        The URI to the storage account.
    :param credential:
        The credentials with which to authenticate. This is optional if the
        account URL already has a SAS token. The value can be a SAS token string, an account
        shared access key, or an instance of a TokenCredentials class from azure.identity.
        If the URL already has a SAS token, specifying an explicit credential will take priority.
    :keyword str secondary_hostname:
        The hostname of the secondary endpoint.

    .. admonition:: Example:

        .. literalinclude:: ../samples/change_feed_samples.py
            :start-after: [START create_change_feed_client]
            :end-before: [END create_change_feed_client]
            :language: python
            :dedent: 8
            :caption: Creating the ChangeFeedClient from a URL to a public blob (no auth needed).
    """
    def __init__(
            self, account_url,  # type: str
            credential=None,  # type: Optional[Any]
            **kwargs  # type: Any
        ):
        # type: (...) -> None
        self._blob_service_client = BlobServiceClient(account_url, credential, **kwargs)

    @classmethod
    def from_connection_string(
            cls, conn_str,  # type: str
            credential=None,  # type: Optional[Any]
            **kwargs  # type: Any
        ):  # type: (...) -> ChangeFeedClient
        """Create ChangeFeedClient from a Connection String.

        :param str conn_str:
            A connection string to an Azure Storage account.
        :param credential:
            The credentials with which to authenticate. This is optional if the
            account URL already has a SAS token, or the connection string already has shared
            access key values. The value can be a SAS token string, an account shared access
            key, or an instance of a TokenCredentials class from azure.identity.
            Credentials provided here will take precedence over those in the connection string.
        :returns: A change feed client.
        :rtype: ~azure.storage.blob.changefeed.ChangeFeedClient

        .. admonition:: Example:

            .. literalinclude:: ../samples/blob_samples_authentication.py
                :start-after: [START auth_from_connection_string]
                :end-before: [END auth_from_connection_string]
                :language: python
                :dedent: 8
                :caption: Creating the BlobServiceClient using account_key as credential.
        """
        account_url, secondary, credential = parse_connection_str(conn_str, credential, 'blob')
        if 'secondary_hostname' not in kwargs:
            kwargs['secondary_hostname'] = secondary
        return cls(account_url, credential=credential, **kwargs)

    def list_changes(self, **kwargs):
        # type: (**Any) -> ItemPaged[Dict]
        """Returns a generator to list the change feed events.

        The generator will lazily follow the continuation tokens returned by
        the service.

        :keyword datetime start_time:
            Filters the results to return only events which happened after this time.
        :keyword datetime end_time:
            Filters the results to return only events which happened before this time.
        :keyword int results_per_page:
            The page size when listing events by page using the by_page() method on the generator.
        :returns: An iterable (auto-paging) response of events whose type is dictionary.
        :rtype: ~azure.core.paging.ItemPaged[dict]

        .. admonition:: Example:

            .. literalinclude:: ../samples/change_feed_samples.py
                :start-after: [START list_all_events]
                :end-before: [END list_all_events]
                :language: python
                :dedent: 8
                :caption: List all change feed events.

            .. literalinclude:: ../samples/change_feed_samples.py
                :start-after: [START list_events_by_page]
                :end-before: [END list_events_by_page]
                :language: python
                :dedent: 8
                :caption: List change feed events by page.
        """
        results_per_page = kwargs.pop('results_per_page', None)
        container_client = self._blob_service_client.get_container_client("$blobchangefeed")
        return ItemPaged(
            container_client,
            results_per_page=results_per_page,
            page_iterator_class=ChangeFeedPaged,
            **kwargs)