Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHubs] Track2 Preview3 #7059

Merged
merged 44 commits into from
Sep 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
64da8ed
Small changes from code review
Aug 23, 2019
d951dcf
change EventData.msg_properties to private attribute
Aug 26, 2019
8bbac25
remove abstract method
Aug 27, 2019
70a33d0
code clean 1
Aug 28, 2019
abbdd25
code clean 2
Aug 28, 2019
b45d6b3
Fix pylint
Aug 29, 2019
247004a
Fix pylint
Aug 29, 2019
6ace6ce
Use properties EventData.partition_key
Aug 29, 2019
008421d
Small changes from code review
Aug 23, 2019
b8c027d
change EventData.msg_properties to private attribute
Aug 26, 2019
2489dd3
remove abstract method
Aug 27, 2019
3a2d72f
code clean 1
Aug 28, 2019
9735756
code clean 2
Aug 28, 2019
288617e
Fix pylint
Aug 29, 2019
2bdbffe
Fix pylint
Aug 29, 2019
e8ea699
Use properties EventData.partition_key
Aug 29, 2019
889597c
Merge branch 'eventhubs_preview3' of github.com:Azure/azure-sdk-for-p…
Aug 29, 2019
cb08478
Use properties EventData.partition_key
Aug 29, 2019
b3dcd07
Temporarily disable pylint errors that need refactoring
Aug 29, 2019
b85e6cc
fix pylint errors
Aug 29, 2019
92feb09
Merge branch 'master' into eventhubs_preview3
Aug 29, 2019
5e51ce2
fix pylint errors
Aug 30, 2019
726bf6f
ignore eventprocessor pylint temporarily
Aug 30, 2019
ffd8cb0
small pylint adjustment
Aug 30, 2019
2f69d65
Merge branch 'master' into eventhubs_preview3
Aug 30, 2019
e5c8d1c
Add typing for Python2.7
Aug 30, 2019
e85ac17
[EventHub] IoTHub management operations improvement and bug fixing (#…
yunhaoling Sep 2, 2019
1fb341b
[EventHub] Retry refactor (#7026)
yunhaoling Sep 3, 2019
7762130
add system_properties to EventData
Sep 3, 2019
1b10d00
Fix a small bug
Sep 4, 2019
13237b5
Refine example code
Sep 4, 2019
998eeed
Update receive method (#7064)
yunhaoling Sep 4, 2019
e13ddee
Update accessibility of class (#7091)
yunhaoling Sep 6, 2019
f616f37
Update samples and codes according to the review (#7098)
yunhaoling Sep 6, 2019
dad5baa
Python EventHubs load balancing (#6901)
YijunXieMS Sep 7, 2019
8e7e1c1
Fix a pylint error
Sep 7, 2019
13a8fe7
Eventhubs blobstorage checkpointstore merge to preview3 (#7109)
YijunXieMS Sep 7, 2019
b5c933f
exclude eventprocessor test for python27
Sep 7, 2019
7b0f5fe
exclude eventprocessor test
Sep 7, 2019
167361e
Revert "Eventhubs blobstorage checkpointstore merge to preview3 (#7109)"
Sep 7, 2019
1253983
Fix small problem in consumer iterator (#7110)
yunhaoling Sep 7, 2019
548a989
Fixed an issue that initializes partition processor multiple times
Sep 8, 2019
725b333
Update release history for 5.0.0b3
Sep 9, 2019
c359042
Update README for 5.0.0b3
Sep 9, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
ignore-patterns=test_*,conftest,setup
reports=no

# PYLINT DIRECTORY BLACKLIST. Ignore eventprocessor temporarily until new eventprocessor code is merged to master
ignore=_generated,samples,examples,test,tests,doc,.tox,eventprocessor
# PYLINT DIRECTORY BLACKLIST.
ignore=_generated,samples,examples,test,tests,doc,.tox

init-hook='import sys; sys.path.insert(0, os.path.abspath(os.getcwd().rsplit("azure-sdk-for-python", 1)[0] + "azure-sdk-for-python/scripts/pylint_custom_plugin"))'
load-plugins=pylint_guidelines_checker
Expand Down
14 changes: 14 additions & 0 deletions sdk/eventhub/azure-eventhubs/HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
# Release History
## 5.0.0b3 (2019-09-10)

**New features**
- `EventProcessor` has a load balancer that balances load among multiple EventProcessors automatically
- In addition to `SamplePartitionManager`, A new `PartitionManager` implementation that uses Azure Blob Storage is added
to centrally store the checkpoint data for event processors. It's not packaged separately as a plug-in to this package.
Refer to [Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) for details.

**Breaking changes**

- `PartitionProcessor` constructor removed argument "checkpoint_manager". Its methods (initialize, process_events,
process_error, close) added argument "partition_context", which has method update_checkpoint.
- `CheckpointManager` was replaced by `PartitionContext`
- Renamed `Sqlite3PartitionManager` to `SamplePartitionManager`

## 5.0.0b2 (2019-08-06)

Expand Down
22 changes: 9 additions & 13 deletions sdk/eventhub/azure-eventhubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,16 @@ Using an `EventHubConsumer` to consume events like in the previous examples puts

The `EventProcessor` will delegate the processing of events to a `PartitionProcessor` that you provide, allowing you to focus on business logic while the processor holds responsibility for managing the underlying consumer operations including checkpointing and load balancing.

While load balancing is a feature we will be adding in the next update, you can see how to use the `EventProcessor` in the below example, where we use an in memory `PartitionManager` that does checkpointing in memory.
You can see how to use the `EventProcessor` in the below example, where we use an in memory `PartitionManager` that does checkpointing in memory.

[Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) is another `PartitionManager` implementation that allows multiple EventProcessors to share the load balancing and checkpoint data in a central storage.


```python
import asyncio

from azure.eventhub.aio import EventHubClient
from azure.eventhub.eventprocessor import EventProcessor, PartitionProcessor, Sqlite3PartitionManager
from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor, SamplePartitionManager

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'

Expand All @@ -232,24 +235,16 @@ async def do_operation(event):
print(event)

class MyPartitionProcessor(PartitionProcessor):
def __init__(self, checkpoint_manager):
super(MyPartitionProcessor, self).__init__(checkpoint_manager)

async def process_events(self, events):
async def process_events(self, events, partition_context):
if events:
await asyncio.gather(*[do_operation(event) for event in events])
await self._checkpoint_manager.update_checkpoint(events[-1].offset, events[-1].sequence_number)

def partition_processor_factory(checkpoint_manager):
return MyPartitionProcessor(checkpoint_manager)
await partition_context.update_checkpoint(events[-1].offset, events[-1].sequence_number)

async def main():
client = EventHubClient.from_connection_string(connection_str, receive_timeout=5, retry_total=3)
partition_manager = Sqlite3PartitionManager() # in-memory PartitionManager
partition_manager = SamplePartitionManager() # in-memory PartitionManager.
try:
event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager)
# You can also define a callable object for creating PartitionProcessor like below:
# event_processor = EventProcessor(client, "$default", partition_processor_factory, partition_manager)
asyncio.ensure_future(event_processor.start())
await asyncio.sleep(60)
await event_processor.stop()
Expand All @@ -273,6 +268,7 @@ The Event Hubs APIs generate the following exceptions.
- **EventDataError:** The EventData to be sent fails data validation.
For instance, this error is raised if you try to send an EventData that is already sent.
- **EventDataSendError:** The Eventhubs service responds with an error when an EventData is sent.
- **OperationTimeoutError:** EventHubConsumer.send() times out.
- **EventHubError:** All other Eventhubs related errors. It is also the root error class of all the above mentioned errors.

## Next steps
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__version__ = "5.0.0b2"
__version__ = "5.0.0b3"
from uamqp import constants # type: ignore
from azure.eventhub.common import EventData, EventDataBatch, EventPosition
from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,11 @@
log = logging.getLogger(__name__)


def _retry_decorator(to_be_wrapped_func):
def wrapped_func(self, *args, **kwargs): # pylint:disable=unused-argument # TODO: to refactor
timeout = kwargs.pop("timeout", 100000)
if not timeout:
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number.
timeout_time = time.time() + timeout
max_retries = self.client.config.max_retries
retry_count = 0
last_exception = None
while True:
try:
return to_be_wrapped_func(self, timeout_time=timeout_time, last_exception=last_exception, **kwargs)
except Exception as exception: # pylint:disable=broad-except
last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time) # pylint:disable=protected-access
retry_count += 1
return wrapped_func


class ConsumerProducerMixin(object):
def __init__(self):
self.client = None
self._client = None
self._handler = None
self.name = None
self._name = None

def __enter__(self):
return self
Expand All @@ -44,59 +26,81 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.close(exc_val)

def _check_closed(self):
if self.error:
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self.name))
if self._error:
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self._name))

def _create_handler(self):
pass

def _redirect(self, redirect):
self.redirected = redirect
self.running = False
self._redirected = redirect
self._running = False
self._close_connection()

def _open(self, timeout_time=None): # pylint:disable=unused-argument # TODO: to refactor
def _open(self):
"""
Open the EventHubConsumer using the supplied connection.
Open the EventHubConsumer/EventHubProducer using the supplied connection.
If the handler has previously been redirected, the redirect
context will be used to create a new handler before opening it.

"""
# pylint: disable=protected-access
if not self.running:
if not self._running:
if self._handler:
self._handler.close()
if self.redirected:
if self._redirected:
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password": self.client._auth_config.get("iot_password")}
"username": self._client._auth_config.get("iot_username"),
"password": self._client._auth_config.get("iot_password")}
else:
alt_creds = {}
self._create_handler()
self._handler.open(connection=self.client._conn_manager.get_connection(
self.client.address.hostname,
self.client.get_auth(**alt_creds)
self._handler.open(connection=self._client._conn_manager.get_connection( # pylint: disable=protected-access
self._client._address.hostname,
self._client._get_auth(**alt_creds)
))
while not self._handler.client_ready():
time.sleep(0.05)
self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \
or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access
self.running = True
self._running = True

def _close_handler(self):
self._handler.close() # close the link (sharing connection) or connection (not sharing)
self.running = False
self._running = False

def _close_connection(self):
self._close_handler()
self.client._conn_manager.reset_connection_if_broken() # pylint: disable=protected-access
self._client._conn_manager.reset_connection_if_broken() # pylint: disable=protected-access

def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
if not self.running and isinstance(exception, compat.TimeoutException):
def _handle_exception(self, exception):
if not self._running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return _handle_exception(exception, retry_count, max_retries, self, timeout_time)
return _handle_exception(exception, self)

return _handle_exception(exception, self)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to have two return _handle_exception(exception, self)?


def _do_retryable_operation(self, operation, timeout=100000, **kwargs):
# pylint:disable=protected-access
timeout_time = time.time() + (
timeout if timeout else 100000) # timeout equals to 0 means no timeout, set the value to be a large number.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timeout or 100000

retried_times = 0
last_exception = kwargs.pop('last_exception', None)
operation_need_param = kwargs.pop('operation_need_param', True)

while retried_times <= self._client._config.max_retries: # pylint: disable=protected-access
try:
if operation_need_param:
return operation(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
return operation()
except Exception as exception: # pylint:disable=broad-except
last_exception = self._handle_exception(exception)
self._client._try_delay(retried_times=retried_times, last_exception=last_exception,
timeout_time=timeout_time, entity_name=self._name)
retried_times += 1

return _handle_exception(exception, retry_count, max_retries, self, timeout_time)
log.info("%r operation has exhausted retry. Last exception: %r.", self._name, last_exception)
raise last_exception

def close(self, exception=None):
# type:(Exception) -> None
Expand All @@ -118,16 +122,16 @@ def close(self, exception=None):
:caption: Close down the handler.

"""
self.running = False
if self.error: # type: ignore
self._running = False
if self._error: # type: ignore
return
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
self._redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
self._error = exception
elif exception:
self.error = EventHubError(str(exception))
self._error = EventHubError(str(exception))
else:
self.error = EventHubError("{} handler is closed.".format(self.name))
self._error = EventHubError("{} handler is closed.".format(self._name))
if self._handler:
self._handler.close() # this will close link if sharing connection. Otherwise close connection
Loading