Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHubs] Buffered Producer #24362

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
import time
import queue
import logging
from threading import RLock, Condition, Semaphore
from threading import RLock
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, Callable, TYPE_CHECKING

from .._producer import EventHubProducer
from .._common import EventDataBatch
from ..exceptions import OperationTimeoutError

if TYPE_CHECKING:
from .._producer_client import SendEventTypes

Expand All @@ -21,28 +22,23 @@
class BufferedProducer:
# pylint: disable=too-many-instance-attributes
def __init__(
self,
producer: EventHubProducer,
partition_id: str,
on_success: Callable[["SendEventTypes", Optional[str]], None],
on_error: Callable[["SendEventTypes", Optional[str], Exception], None],
max_message_size_on_link: int,
executor: ThreadPoolExecutor,
*,
max_wait_time: float = 1,
max_concurrent_sends: int = 1,
max_buffer_length: int = 10
self,
producer: EventHubProducer,
partition_id: str,
on_success: Callable[["SendEventTypes", Optional[str]], None],
on_error: Callable[["SendEventTypes", Optional[str], Exception], None],
max_message_size_on_link: int,
executor: ThreadPoolExecutor,
*,
max_wait_time: float = 1,
max_buffer_length: int
):
self._buffered_queue: queue.Queue = queue.Queue()
self._max_buffer_len = max_buffer_length
self._cur_buffered_len = 0
self._executor: ThreadPoolExecutor = executor
self._producer: EventHubProducer = producer
self._lock = RLock()
self._not_empty = Condition(self._lock)
self._not_full = Condition(self._lock)
self._max_buffer_len = max_buffer_length
self._max_concurrent_sends = max_concurrent_sends
self._max_concurrent_sends_semaphore = Semaphore(self._max_concurrent_sends)
self._max_wait_time = max_wait_time
self._on_success = self.failsafe_callback(on_success)
self._on_error = self.failsafe_callback(on_error)
Expand All @@ -62,83 +58,69 @@ def start(self):
self._check_max_wait_time_future = self._executor.submit(self.check_max_wait_time_worker)

def stop(self, flush=True, timeout_time=None, raise_error=False):

self._running = False
if flush:
self.flush(timeout_time=timeout_time, raise_error=raise_error)
with self._lock:
self.flush(timeout_time=timeout_time, raise_error=raise_error)
else:
if self._cur_buffered_len:
_LOGGER.warning(
"Shutting down Partition %r. There are still %r events in the buffer which will be lost",
self.partition_id,
self._cur_buffered_len
self._cur_buffered_len,
)
if self._check_max_wait_time_future:
remain_timeout = timeout_time - time.time() if timeout_time else None
try:
with self._not_empty:
# in the stop procedure, calling notify to give check_max_wait_time_future a chance to stop
# as it is waiting for Condition self._not_empty
self._not_empty.notify()
self._check_max_wait_time_future.result(remain_timeout)
except Exception as exc: # pylint: disable=broad-except
_LOGGER.warning(
"Partition %r stopped with error %r",
self.partition_id,
exc
)
_LOGGER.warning("Partition %r stopped with error %r", self.partition_id, exc)
self._producer.close()

def put_events(self, events, timeout_time=None):
# Put single event or EventDataBatch into the queue.
# This method would raise OperationTimeout if the queue does not have enough space for the input and
# flush cannot finish in timeout.
with self._not_full:
try:
new_events_len = len(events)
except TypeError:
new_events_len = 1

if self._max_buffer_len - self._cur_buffered_len < new_events_len:
_LOGGER.info(
"The buffer for partition %r is full. Attempting to flush before adding %r events.",
self.partition_id,
new_events_len
)
# flush the buffer
try:
new_events_len = len(events)
except TypeError:
new_events_len = 1
if self._max_buffer_len - self._cur_buffered_len < new_events_len:
_LOGGER.info(
"The buffer for partition %r is full. Attempting to flush before adding %r events.",
self.partition_id,
new_events_len,
)
# flush the buffer
with self._lock:
self.flush(timeout_time=timeout_time)

if timeout_time and time.time() > timeout_time:
raise OperationTimeoutError("Failed to enqueue events into buffer due to timeout.")

try:
# add single event into current batch
self._cur_batch.add(events)
except AttributeError: # if the input events is a EventDataBatch, put the whole into the buffer
# if there are events in cur_batch, enqueue cur_batch to the buffer
if self._cur_batch:
self._buffered_queue.put(self._cur_batch)
self._buffered_queue.put(events)
# create a new batch for incoming events
self._cur_batch = EventDataBatch(self._max_message_size_on_link)
except ValueError:
# add single event exceeds the cur batch size, create new batch
if timeout_time and time.time() > timeout_time:
raise OperationTimeoutError("Failed to enqueue events into buffer due to timeout.")
try:
# add single event into current batch
self._cur_batch.add(events)
except AttributeError: # if the input events is a EventDataBatch, put the whole into the buffer
# if there are events in cur_batch, enqueue cur_batch to the buffer
if self._cur_batch:
self._buffered_queue.put(self._cur_batch)
self._cur_batch = EventDataBatch(self._max_message_size_on_link)
self._cur_batch.add(events)
self._cur_buffered_len += new_events_len
# notify the max_wait_time worker
self._not_empty.notify()
self._buffered_queue.put(events)
# create a new batch for incoming events
self._cur_batch = EventDataBatch(self._max_message_size_on_link)
except ValueError:
# add single event exceeds the cur batch size, create new batch
self._buffered_queue.put(self._cur_batch)
self._cur_batch = EventDataBatch(self._max_message_size_on_link)
self._cur_batch.add(events)
self._cur_buffered_len += new_events_len

def failsafe_callback(self, callback):
def wrapper_callback(*args, **kwargs):
try:
callback(*args, **kwargs)
except Exception as exc: # pylint: disable=broad-except
_LOGGER.warning(
"On partition %r, callback %r encountered exception %r",
callback.__name__,
exc,
self.partition_id
"On partition %r, callback %r encountered exception %r", callback.__name__, exc, self.partition_id
)

return wrapper_callback
Expand All @@ -147,66 +129,53 @@ def flush(self, timeout_time=None, raise_error=True):
# pylint: disable=protected-access
# try flushing all the buffered batch within given time
_LOGGER.info("Partition: %r started flushing.", self.partition_id)
with self._not_empty:
if self._cur_batch: # if there is batch, enqueue it to the buffer first
self._buffered_queue.put(self._cur_batch)
self._cur_batch = EventDataBatch(self._max_message_size_on_link)
while self._cur_buffered_len:
remaining_time = timeout_time - time.time() if timeout_time else None
# If flush could get the semaphore, perform sending
if ((remaining_time and remaining_time > 0) or remaining_time is None) and \
self._max_concurrent_sends_semaphore.acquire(timeout=remaining_time):
batch = self._buffered_queue.get()
self._buffered_queue.task_done()
try:
_LOGGER.info("Partition %r is sending.", self.partition_id)
self._producer.send(
batch,
timeout=timeout_time - time.time() if timeout_time else None
)
_LOGGER.info(
"Partition %r sending %r events succeeded.",
self.partition_id,
len(batch)
)
self._on_success(batch._internal_events, self.partition_id)
except Exception as exc: # pylint: disable=broad-except
_LOGGER.info(
"Partition %r sending %r events failed due to exception: %r ",
self.partition_id,
len(batch),
exc
)
self._on_error(batch._internal_events, self.partition_id, exc)
finally:
self._cur_buffered_len -= len(batch)
self._max_concurrent_sends_semaphore.release()
self._not_full.notify()
# If flush could not get the semaphore, we log and raise error if wanted
else:
if self._cur_batch: # if there is batch, enqueue it to the buffer first
self._buffered_queue.put(self._cur_batch)
while self._cur_buffered_len:
remaining_time = timeout_time - time.time() if timeout_time else None
if (remaining_time and remaining_time > 0) or remaining_time is None:
batch = self._buffered_queue.get()
self._buffered_queue.task_done()
try:
_LOGGER.info("Partition %r is sending.", self.partition_id)
self._producer.send(batch, timeout=timeout_time - time.time() if timeout_time else None)
_LOGGER.info("Partition %r sending %r events succeeded.", self.partition_id, len(batch))
self._on_success(batch._internal_events, self.partition_id)
except Exception as exc: # pylint: disable=broad-except
_LOGGER.info(
"Partition %r fails to flush due to timeout.",
self.partition_id
"Partition %r sending %r events failed due to exception: %r ",
self.partition_id,
len(batch),
exc,
)
self._on_error(batch._internal_events, self.partition_id, exc)
finally:
self._cur_buffered_len -= len(batch)
else:
_LOGGER.info("Partition %r fails to flush due to timeout.", self.partition_id)
if raise_error:
raise OperationTimeoutError(
"Failed to flush {!r} within {}".format(self.partition_id, timeout_time)
)
if raise_error:
raise OperationTimeoutError("Failed to flush {!r}".format(self.partition_id))
break
break
# after finishing flushing, reset cur batch and put it into the buffer
self._last_send_time = time.time()
self._cur_batch = EventDataBatch(self._max_message_size_on_link)
annatisch marked this conversation as resolved.
Show resolved Hide resolved
_LOGGER.info("Partition %r finished flushing.", self.partition_id)

def check_max_wait_time_worker(self):
while self._running:
with self._not_empty:
if not self._cur_buffered_len:
_LOGGER.info("Partition %r worker is awaiting data.", self.partition_id)
self._not_empty.wait()
if self._cur_buffered_len > 0:
now_time = time.time()
_LOGGER.info("Partition %r worker is checking max_wait_time.", self.partition_id)
if now_time - self._last_send_time > self._max_wait_time and self._running:
# flush the partition if the producer is running beyond the waiting time
# or the buffer is at max capacity
if (now_time - self._last_send_time > self._max_wait_time) or (
self._cur_buffered_len >= self._max_buffer_len
):
# in the worker, not raising error for flush, users can not handle this
self.flush(raise_error=False)
with self._lock:
self.flush(raise_error=False)
time.sleep(min(self._max_wait_time, 5))

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
from threading import Lock
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Optional, List, Callable, TYPE_CHECKING
from typing import Dict, Optional, List, Callable, Union, TYPE_CHECKING

from ._partition_resolver import PartitionResolver
from ._buffered_producer import BufferedProducer
Expand All @@ -31,9 +31,7 @@ def __init__(
*,
max_buffer_length: int = 1500,
max_wait_time: float = 1,
max_concurrent_sends: int = 1,
executor: Optional[ThreadPoolExecutor] = None,
max_worker: Optional[int] = None
executor: Optional[Union[ThreadPoolExecutor, int]] = None
):
self._buffered_producers: Dict[str, BufferedProducer] = {}
self._partition_ids: List[str] = partitions
Expand All @@ -46,9 +44,15 @@ def __init__(
self._partition_resolver = PartitionResolver(self._partition_ids)
self._max_wait_time = max_wait_time
self._max_buffer_length = max_buffer_length
self._max_concurrent_sends = max_concurrent_sends
self._existing_executor = bool(executor)
self._executor = executor or ThreadPoolExecutor(max_worker)
self._existing_executor = False

if not executor:
self._executor = ThreadPoolExecutor()
elif isinstance(executor, ThreadPoolExecutor):
self._existing_executor = True
self._executor = executor
elif isinstance(executor, int):
self._executor = ThreadPoolExecutor(executor)

def _get_partition_id(self, partition_id, partition_key):
if partition_id:
Expand Down Expand Up @@ -77,7 +81,6 @@ def enqueue_events(self, events, *, partition_id=None, partition_key=None, timeo
self._max_message_size_on_link,
executor=self._executor,
max_wait_time=self._max_wait_time,
max_concurrent_sends=self._max_concurrent_sends,
max_buffer_length=self._max_buffer_length
)
buffered_producer.start()
Expand Down
Loading