Skip to content

Commit

Permalink
[EventHubs] merge Buffered Producer into main (#24653)
Browse files Browse the repository at this point in the history
* update buffered producer changelog and version (#24210)

* [EventHubs] Buffered Producer  (#24362)

* clean up, remove conditions, semaphores

* minor fix

* remove semaphores, conditions

* minor fixes

* minor changes on queue length

* expose buffer_concurrency

* remove max_concurrent_sends

* make buffer size reqd

* remove comment

* add locks around flush

* use the right counter to track q size

* use the correct count for the q

* locks and right q size var for async

* clean imports

* lock for bg worker

* formatting fixes for pylint

* final review

* fix pylint issues

* lint + version

* remove semaphore tests

* skip tests that flush then close

* fix for lock issue

* unskip tests

* more async updates

Co-authored-by: Kashif Khan <361477+kashifkhan@users.noreply.github.com>
Co-authored-by: Kashif Khan <kashifkhan@microsoft.com>
  • Loading branch information
3 people authored Jun 2, 2022
1 parent 169e608 commit 9a34e81
Show file tree
Hide file tree
Showing 28 changed files with 3,442 additions and 199 deletions.
22 changes: 16 additions & 6 deletions sdk/eventhub/azure-eventhub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
# Release History

## 5.9.1 (Unreleased)
## 5.10.0 (2022-06-07)

### Features Added

### Breaking Changes

### Bugs Fixed

### Other Changes
- Includes the following features related to buffered sending of events:
- A new method `send_event` on `EventHubProducerClient` which allows sending a single `EventData` or `AmqpAnnotatedMessage`.
- Buffered mode sending to `EventHubProducerClient` which is intended to allow for efficient publishing of events
without having to explicitly manage batches in the application.
- The constructor of `EventHubProducerClient` and the `from_connection_string` method take the following new keyword arguments
for configuration:
- `buffered_mode`: The flag to enable/disable buffered mode sending.
- `on_success`: The callback to be called once events have been successfully published.
- `on_error`: The callback to be called once events have failed to be published.
- `max_buffer_length`: The total number of events per partition that can be buffered before a flush will be triggered.
- `max_wait_time`: The amount of time to wait for a batch to be built with events in the buffer before publishing.
- A new method `EventHubProducerClient.flush` which flushes events in the buffer to be sent immediately.
- A new method `EventHubProducerClient.get_buffered_event_count` which returns the number of events that are buffered and waiting to be published for a given partition.
- A new property `EventHubProducerClient.total_buffered_event_count` which returns the total number of events that are currently buffered and waiting to be published, across all partitions.
- A new boolean keyword argument `flush` to `EventHubProducerClient.close` which indicates whether to flush the buffer or not while closing.

## 5.9.0 (2022-05-10)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from ._buffered_producer import BufferedProducer
from ._partition_resolver import PartitionResolver
from ._buffered_producer_dispatcher import BufferedProducerDispatcher

# Names exported when this module is wildcard-imported (`from ... import *`);
# they are the three classes imported from the sibling modules above.
__all__ = [
"BufferedProducer",
"PartitionResolver",
"BufferedProducerDispatcher",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import time
import queue
import logging
from threading import RLock
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, Callable, TYPE_CHECKING

from .._producer import EventHubProducer
from .._common import EventDataBatch
from ..exceptions import OperationTimeoutError

if TYPE_CHECKING:
from .._producer_client import SendEventTypes

_LOGGER = logging.getLogger(__name__)


class BufferedProducer:
    """Buffer events for a single partition and publish them in batches.

    Events are accumulated into a "current" ``EventDataBatch``; full batches
    (and caller-supplied batches) are parked in an internal FIFO queue.  The
    buffer is drained by :meth:`flush`, which is triggered explicitly, by
    :meth:`put_events` when the buffer has no room left, or by a background
    worker submitted to ``executor`` in :meth:`start`.

    :param producer: The producer used to send batches; closed in :meth:`stop`.
    :param partition_id: Id of the partition this instance serves (used in
        log messages and passed to the callbacks).
    :param on_success: Called as ``on_success(events, partition_id)`` after a
        batch has been sent.
    :param on_error: Called as ``on_error(events, partition_id, exc)`` after
        sending a batch raised ``exc``.
    :param max_message_size_on_link: Size passed to every ``EventDataBatch``
        created by this instance.
    :param executor: Executor on which the background worker is run.
    :keyword max_wait_time: Seconds that may elapse since the last flush
        before the worker flushes again.  A falsy value disables the worker.
    :keyword max_buffer_length: Number of buffered events at which a flush is
        forced.
    """

    # pylint: disable=too-many-instance-attributes
    def __init__(
        self,
        producer: EventHubProducer,
        partition_id: str,
        on_success: Callable[["SendEventTypes", Optional[str]], None],
        on_error: Callable[["SendEventTypes", Optional[str], Exception], None],
        max_message_size_on_link: int,
        executor: ThreadPoolExecutor,
        *,
        max_wait_time: float = 1,
        max_buffer_length: int
    ):
        self._buffered_queue: queue.Queue = queue.Queue()
        self._max_buffer_len = max_buffer_length
        # Number of events (not batches) currently buffered, including those
        # still sitting in ``_cur_batch``.
        self._cur_buffered_len = 0
        self._executor: ThreadPoolExecutor = executor
        self._producer: EventHubProducer = producer
        # Re-entrant on purpose: callers take the lock and then call flush(),
        # which takes it again.
        self._lock = RLock()
        self._max_wait_time = max_wait_time
        # User callbacks are wrapped so that an exception raised by them is
        # logged instead of propagating into the send path.
        self._on_success = self.failsafe_callback(on_success)
        self._on_error = self.failsafe_callback(on_error)
        self._last_send_time = None
        self._running = False
        self._cur_batch: Optional[EventDataBatch] = None
        self._max_message_size_on_link = max_message_size_on_link
        self._check_max_wait_time_future = None
        self.partition_id = partition_id

    def start(self):
        """Create the first batch and, if ``max_wait_time`` is set, start the worker."""
        with self._lock:
            self._cur_batch = EventDataBatch(self._max_message_size_on_link)
            self._running = True
            if self._max_wait_time:
                self._last_send_time = time.time()
                self._check_max_wait_time_future = self._executor.submit(self.check_max_wait_time_worker)

    def stop(self, flush=True, timeout_time=None, raise_error=False):
        """Stop the worker, optionally flush the buffer, and close the producer.

        :param flush: When true, flush buffered events before closing; when
            false, only log a warning if events are still buffered.
        :param timeout_time: Absolute deadline (``time.time()`` based) for the
            flush and for waiting on the worker, or ``None`` for no deadline.
        :param raise_error: Forwarded to :meth:`flush`.
        """
        # Signals the worker loop to exit after its current iteration.
        self._running = False
        if flush:
            with self._lock:
                self.flush(timeout_time=timeout_time, raise_error=raise_error)
        else:
            if self._cur_buffered_len:
                _LOGGER.warning(
                    "Shutting down Partition %r. There are still %r events in the buffer which will be lost",
                    self.partition_id,
                    self._cur_buffered_len,
                )
        if self._check_max_wait_time_future:
            remain_timeout = timeout_time - time.time() if timeout_time else None
            try:
                self._check_max_wait_time_future.result(remain_timeout)
            except Exception as exc:  # pylint: disable=broad-except
                _LOGGER.warning("Partition %r stopped with error %r", self.partition_id, exc)
        self._producer.close()

    def put_events(self, events, timeout_time=None):
        """Add a single event or a whole ``EventDataBatch`` to the buffer.

        If the buffer does not have room for ``events`` it is flushed first.

        :param events: A single event, or an ``EventDataBatch``.
        :param timeout_time: Absolute deadline (``time.time()`` based), or
            ``None`` for no deadline.
        :raises OperationTimeoutError: If the deadline has passed, or if the
            flush needed to make room could not finish in time.
        """
        # A single event has no len(); count it as one.
        try:
            new_events_len = len(events)
        except TypeError:
            new_events_len = 1
        if self._max_buffer_len - self._cur_buffered_len < new_events_len:
            _LOGGER.info(
                "The buffer for partition %r is full. Attempting to flush before adding %r events.",
                self.partition_id,
                new_events_len,
            )
            # flush the buffer
            with self._lock:
                self.flush(timeout_time=timeout_time)
        if timeout_time and time.time() > timeout_time:
            raise OperationTimeoutError("Failed to enqueue events into buffer due to timeout.")
        try:
            # add single event into current batch
            self._cur_batch.add(events)
        except AttributeError:  # if the input events is a EventDataBatch, put the whole into the buffer
            # if there are events in cur_batch, enqueue cur_batch to the buffer
            if self._cur_batch:
                self._buffered_queue.put(self._cur_batch)
            self._buffered_queue.put(events)
            # create a new batch for incoming events
            self._cur_batch = EventDataBatch(self._max_message_size_on_link)
        except ValueError:
            # add single event exceeds the cur batch size, create new batch
            self._buffered_queue.put(self._cur_batch)
            self._cur_batch = EventDataBatch(self._max_message_size_on_link)
            self._cur_batch.add(events)
        self._cur_buffered_len += new_events_len

    def failsafe_callback(self, callback):
        """Return a wrapper around ``callback`` that logs, rather than raises, its exceptions.

        :param callback: The user callback to protect.
        :return: A callable with the same arguments that returns ``None``.
        """

        def wrapper_callback(*args, **kwargs):
            try:
                callback(*args, **kwargs)
            except Exception as exc:  # pylint: disable=broad-except
                # Arguments follow the placeholder order: partition, callback,
                # exception.  Not every callable has ``__name__`` (e.g.
                # functools.partial objects), so fall back to the callable
                # itself to keep this handler from raising.
                _LOGGER.warning(
                    "On partition %r, callback %r encountered exception %r",
                    self.partition_id,
                    getattr(callback, "__name__", callback),
                    exc,
                )

        return wrapper_callback

    def flush(self, timeout_time=None, raise_error=True):
        """Send every buffered batch, invoking the callbacks per batch.

        :param timeout_time: Absolute deadline (``time.time()`` based), or
            ``None`` for no deadline.
        :param raise_error: When true, raise on timeout; when false, stop
            flushing silently and leave the remaining batches buffered.
        :raises OperationTimeoutError: If the deadline passes while events are
            still buffered and ``raise_error`` is true.
        """
        # pylint: disable=protected-access
        # try flushing all the buffered batch within given time
        with self._lock:
            _LOGGER.info("Partition: %r started flushing.", self.partition_id)
            if self._cur_batch:  # if there is batch, enqueue it to the buffer first
                self._buffered_queue.put(self._cur_batch)
                # Replace the current batch right away.  If the timeout below
                # raises, the enqueued batch must not remain the current batch,
                # otherwise the next flush would enqueue (and send) it twice
                # and drive the event counter out of sync with the queue.
                self._cur_batch = EventDataBatch(self._max_message_size_on_link)
            while self._cur_buffered_len:
                remaining_time = timeout_time - time.time() if timeout_time else None
                if (remaining_time and remaining_time > 0) or remaining_time is None:
                    batch = self._buffered_queue.get()
                    self._buffered_queue.task_done()
                    try:
                        _LOGGER.info("Partition %r is sending.", self.partition_id)
                        self._producer.send(batch, timeout=timeout_time - time.time() if timeout_time else None)
                        _LOGGER.info("Partition %r sending %r events succeeded.", self.partition_id, len(batch))
                        self._on_success(batch._internal_events, self.partition_id)
                    except Exception as exc:  # pylint: disable=broad-except
                        _LOGGER.info(
                            "Partition %r sending %r events failed due to exception: %r ",
                            self.partition_id,
                            len(batch),
                            exc,
                        )
                        self._on_error(batch._internal_events, self.partition_id, exc)
                    finally:
                        # The batch left the buffer whether or not the send
                        # succeeded; failed events are handed to on_error.
                        self._cur_buffered_len -= len(batch)
                else:
                    _LOGGER.info("Partition %r fails to flush due to timeout.", self.partition_id)
                    if raise_error:
                        raise OperationTimeoutError(
                            "Failed to flush {!r} within {}".format(self.partition_id, timeout_time)
                        )
                    break
            # after finishing flushing, reset cur batch and put it into the buffer
            self._last_send_time = time.time()
            self._cur_batch = EventDataBatch(self._max_message_size_on_link)
            _LOGGER.info("Partition %r finished flushing.", self.partition_id)

    def check_max_wait_time_worker(self):
        """Background loop that flushes when the wait time or buffer limit is exceeded.

        Runs until ``stop`` clears the running flag, sleeping
        ``min(max_wait_time, 5)`` seconds between checks.
        """
        while self._running:
            if self._cur_buffered_len > 0:
                now_time = time.time()
                _LOGGER.info("Partition %r worker is checking max_wait_time.", self.partition_id)
                # flush the partition if the producer is running beyond the waiting time
                # or the buffer is at max capacity
                if (now_time - self._last_send_time > self._max_wait_time) or (
                    self._cur_buffered_len >= self._max_buffer_len
                ):
                    # in the worker, not raising error for flush, users can not handle this
                    with self._lock:
                        self.flush(raise_error=False)
            time.sleep(min(self._max_wait_time, 5))

    @property
    def buffered_event_count(self):
        """Number of events currently buffered and not yet handed to the producer."""
        return self._cur_buffered_len
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import logging
from threading import Lock
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Optional, List, Callable, Union, TYPE_CHECKING

from ._partition_resolver import PartitionResolver
from ._buffered_producer import BufferedProducer
from .._producer import EventHubProducer
from ..exceptions import EventDataSendError, ConnectError, EventHubError

if TYPE_CHECKING:
from .._producer_client import SendEventTypes

_LOGGER = logging.getLogger(__name__)


class BufferedProducerDispatcher:
    """Route events to per-partition ``BufferedProducer`` instances.

    A ``BufferedProducer`` is created lazily for a partition the first time an
    event is enqueued for it.  ``flush`` and ``close`` fan out to every
    existing producer through the executor and collect the results.

    :param partitions: Ids of the partitions events may be routed to.
    :param on_success: Callback handed to every ``BufferedProducer``.
    :param on_error: Callback handed to every ``BufferedProducer``.
    :param create_producer: Factory called with a partition id to obtain the
        ``EventHubProducer`` for that partition.
    :param eventhub_name: Event hub name, used in error messages.
    :param max_message_size_on_link: Forwarded to every ``BufferedProducer``.
    :keyword max_buffer_length: Forwarded to every ``BufferedProducer``.
    :keyword max_wait_time: Forwarded to every ``BufferedProducer``.
    :keyword executor: An existing ``ThreadPoolExecutor`` to use (it is not
        shut down by :meth:`close`), an ``int`` worker count for a new
        executor, or a falsy value for a new executor with default settings.
    :raises TypeError: If ``executor`` is of an unsupported type.
    """

    # pylint: disable=too-many-instance-attributes
    def __init__(
        self,
        partitions: List[str],
        on_success: Callable[["SendEventTypes", Optional[str]], None],
        on_error: Callable[["SendEventTypes", Optional[str], Exception], None],
        create_producer: Callable[..., EventHubProducer],
        eventhub_name: str,
        max_message_size_on_link: int,
        *,
        max_buffer_length: int = 1500,
        max_wait_time: float = 1,
        executor: Optional[Union[ThreadPoolExecutor, int]] = None
    ):
        self._buffered_producers: Dict[str, BufferedProducer] = {}
        self._partition_ids: List[str] = partitions
        self._lock = Lock()
        self._on_success = on_success
        self._on_error = on_error
        self._create_producer = create_producer
        self._eventhub_name = eventhub_name
        self._max_message_size_on_link = max_message_size_on_link
        self._partition_resolver = PartitionResolver(self._partition_ids)
        self._max_wait_time = max_wait_time
        self._max_buffer_length = max_buffer_length
        # True when the executor was supplied by the caller and therefore must
        # not be shut down by close().
        self._existing_executor = False

        if not executor:
            self._executor = ThreadPoolExecutor()
        elif isinstance(executor, ThreadPoolExecutor):
            self._existing_executor = True
            self._executor = executor
        elif isinstance(executor, int):
            self._executor = ThreadPoolExecutor(executor)
        else:
            # Fail here rather than with an AttributeError on the missing
            # ``_executor`` attribute at first use.
            raise TypeError(
                "executor must be a ThreadPoolExecutor, an int or None, not {!r}".format(type(executor))
            )

    def _get_partition_id(self, partition_id, partition_key):
        """Pick the partition an event should be sent to.

        An explicit ``partition_id`` wins (after validation); otherwise a
        string ``partition_key`` is resolved through the partition resolver;
        otherwise the resolver supplies the next partition id.

        :raises ConnectError: If ``partition_id`` is not a known partition.
        """
        if partition_id:
            if partition_id not in self._partition_ids:
                raise ConnectError(
                    "Invalid partition {} for the event hub {}".format(
                        partition_id, self._eventhub_name
                    )
                )
            return partition_id
        if isinstance(partition_key, str):
            return self._partition_resolver.get_partition_id_by_partition_key(partition_key)
        return self._partition_resolver.get_next_partition_id()

    def enqueue_events(self, events, *, partition_id=None, partition_key=None, timeout_time=None):
        """Buffer ``events`` on the producer of the resolved partition.

        :param events: A single event or an event batch, passed to
            ``BufferedProducer.put_events``.
        :keyword partition_id: Explicit target partition, if any.
        :keyword partition_key: Key used to resolve the partition, if any.
        :keyword timeout_time: Absolute deadline forwarded to ``put_events``.
        """
        pid = self._get_partition_id(partition_id, partition_key)
        with self._lock:
            # Only the lookup is guarded: a KeyError raised from inside
            # put_events must propagate instead of silently replacing an
            # existing, running producer with a new one.
            try:
                buffered_producer = self._buffered_producers[pid]
            except KeyError:
                buffered_producer = BufferedProducer(
                    self._create_producer(pid),
                    pid,
                    self._on_success,
                    self._on_error,
                    self._max_message_size_on_link,
                    executor=self._executor,
                    max_wait_time=self._max_wait_time,
                    max_buffer_length=self._max_buffer_length
                )
                buffered_producer.start()
                self._buffered_producers[pid] = buffered_producer
            buffered_producer.put_events(events, timeout_time)

    def flush(self, timeout_time=None):
        """Flush every buffered producer; block until all finish or time out.

        :param timeout_time: Absolute deadline forwarded to each producer.
        :raises EventDataSendError: If flushing any partition raised.
        """
        # flush all the buffered producer, the method will block until finishes or times out
        with self._lock:
            futures = []
            for pid, producer in self._buffered_producers.items():
                # call each producer's flush method
                futures.append((pid, self._executor.submit(producer.flush, timeout_time=timeout_time)))

            # gather results
            exc_results = {}
            for pid, future in futures:
                try:
                    future.result()
                except Exception as exc:  # pylint: disable=broad-except
                    exc_results[pid] = exc

            if not exc_results:
                _LOGGER.info("Flushing all partitions succeeded")
                return

            _LOGGER.warning('Flushing all partitions partially failed with result %r.', exc_results)
            raise EventDataSendError(
                message="Flushing all partitions partially failed, failed partitions are {!r}"
                        " Exception details are {!r}".format(exc_results.keys(), exc_results)
            )

    def close(self, *, flush=True, timeout_time=None, raise_error=False):
        """Stop every buffered producer and shut down an internally created executor.

        :keyword flush: Forwarded to ``BufferedProducer.stop``.
        :keyword timeout_time: Absolute deadline forwarded to each producer.
        :keyword raise_error: Forwarded to each producer; when true, failures
            are also raised from this method.
        :raises EventHubError: If stopping any partition raised and
            ``raise_error`` is true.
        """
        with self._lock:

            futures = []
            # stop all buffered producers
            for pid, producer in self._buffered_producers.items():
                futures.append((pid, self._executor.submit(
                    producer.stop,
                    flush=flush,
                    timeout_time=timeout_time,
                    raise_error=raise_error
                )))

            exc_results = {}
            # gather results
            for pid, future in futures:
                try:
                    future.result()
                except Exception as exc:  # pylint: disable=broad-except
                    exc_results[pid] = exc

            if exc_results:
                _LOGGER.warning('Stopping all partitions partially failed with result %r.', exc_results)
                if raise_error:
                    raise EventHubError(
                        message="Stopping all partitions partially failed, failed partitions are {!r}"
                                " Exception details are {!r}".format(exc_results.keys(), exc_results)
                    )

            # A caller-supplied executor is left running for its owner.
            if not self._existing_executor:
                self._executor.shutdown()

    def get_buffered_event_count(self, pid):
        """Return the number of events buffered for partition ``pid`` (0 if it has no producer)."""
        try:
            return self._buffered_producers[pid].buffered_event_count
        except KeyError:
            return 0

    @property
    def total_buffered_event_count(self):
        """Total number of events buffered across all partitions that have a producer."""
        return sum(self.get_buffered_event_count(pid) for pid in self._buffered_producers)
Loading

0 comments on commit 9a34e81

Please sign in to comment.