From 4bf29b0c98c0c4f7b0b006d79cf1fc5a9f38186d Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Mon, 9 May 2022 10:56:02 -0500 Subject: [PATCH 01/18] clean up, remove conditions, semaphores --- .../_buffered_producer/_buffered_producer.py | 174 ++++++++---------- 1 file changed, 76 insertions(+), 98 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py index 64ef3c7c7711..b7efe634e705 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py @@ -31,18 +31,14 @@ def __init__( *, max_wait_time: float = 1, max_concurrent_sends: int = 1, - max_buffer_length: int = 10 + max_buffer_length: int = 1500 ): - self._buffered_queue: queue.Queue = queue.Queue() + self._buffered_queue: queue.Queue = queue.Queue(max_buffer_length) self._cur_buffered_len = 0 self._executor: ThreadPoolExecutor = executor self._producer: EventHubProducer = producer self._lock = RLock() - self._not_empty = Condition(self._lock) - self._not_full = Condition(self._lock) - self._max_buffer_len = max_buffer_length self._max_concurrent_sends = max_concurrent_sends - self._max_concurrent_sends_semaphore = Semaphore(self._max_concurrent_sends) self._max_wait_time = max_wait_time self._on_success = self.failsafe_callback(on_success) self._on_error = self.failsafe_callback(on_error) @@ -54,12 +50,11 @@ def __init__( self.partition_id = partition_id def start(self): - with self._lock: - self._cur_batch = EventDataBatch(self._max_message_size_on_link) - self._running = True - if self._max_wait_time: - self._last_send_time = time.time() - self._check_max_wait_time_future = self._executor.submit(self.check_max_wait_time_worker) + self._cur_batch = EventDataBatch(self._max_message_size_on_link) + self._running = True + if self._max_wait_time: + 
self._last_send_time = time.time() + self._check_max_wait_time_future = self._executor.submit(self.check_max_wait_time_worker) def stop(self, flush=True, timeout_time=None, raise_error=False): self._running = False @@ -75,10 +70,6 @@ def stop(self, flush=True, timeout_time=None, raise_error=False): if self._check_max_wait_time_future: remain_timeout = timeout_time - time.time() if timeout_time else None try: - with self._not_empty: - # in the stop procedure, calling notify to give check_max_wait_time_future a chance to stop - # as it is waiting for Condition self._not_empty - self._not_empty.notify() self._check_max_wait_time_future.result(remain_timeout) except Exception as exc: # pylint: disable=broad-except _LOGGER.warning( @@ -92,42 +83,36 @@ def put_events(self, events, timeout_time=None): # Put single event or EventDataBatch into the queue. # This method would raise OperationTimeout if the queue does not have enough space for the input and # flush cannot finish in timeout. - with self._not_full: - try: - new_events_len = len(events) - except TypeError: - new_events_len = 1 - - if self._max_buffer_len - self._cur_buffered_len < new_events_len: - _LOGGER.info( - "The buffer for partition %r is full. 
Attempting to flush before adding %r events.", - self.partition_id, - new_events_len - ) - # flush the buffer - self.flush(timeout_time=timeout_time) - - if timeout_time and time.time() > timeout_time: - raise OperationTimeoutError("Failed to enqueue events into buffer due to timeout.") - - try: - # add single event into current batch - self._cur_batch.add(events) - except AttributeError: # if the input events is a EventDataBatch, put the whole into the buffer - # if there are events in cur_batch, enqueue cur_batch to the buffer - if self._cur_batch: - self._buffered_queue.put(self._cur_batch) - self._buffered_queue.put(events) - # create a new batch for incoming events - self._cur_batch = EventDataBatch(self._max_message_size_on_link) - except ValueError: - # add single event exceeds the cur batch size, create new batch + try: + new_events_len = len(events) + except TypeError: + new_events_len = 1 + if self._buffered_queue.maxsize - self._cur_buffered_len < new_events_len: + _LOGGER.info( + "The buffer for partition %r is full. 
Attempting to flush before adding %r events.", + self.partition_id, + new_events_len + ) + # flush the buffer + self.flush(timeout_time=timeout_time) + if timeout_time and time.time() > timeout_time: + raise OperationTimeoutError("Failed to enqueue events into buffer due to timeout.") + try: + # add single event into current batch + self._cur_batch.add(events) + except AttributeError: # if the input events is a EventDataBatch, put the whole into the buffer + # if there are events in cur_batch, enqueue cur_batch to the buffer + if self._cur_batch: self._buffered_queue.put(self._cur_batch) - self._cur_batch = EventDataBatch(self._max_message_size_on_link) - self._cur_batch.add(events) - self._cur_buffered_len += new_events_len - # notify the max_wait_time worker - self._not_empty.notify() + self._buffered_queue.put(events) + # create a new batch for incoming events + self._cur_batch = EventDataBatch(self._max_message_size_on_link) + except ValueError: + # add single event exceeds the cur batch size, create new batch + self._buffered_queue.put(self._cur_batch) + self._cur_batch = EventDataBatch(self._max_message_size_on_link) + self._cur_batch.add(events) + self._cur_buffered_len += new_events_len def failsafe_callback(self, callback): def wrapper_callback(*args, **kwargs): @@ -147,50 +132,45 @@ def flush(self, timeout_time=None, raise_error=True): # pylint: disable=protected-access # try flushing all the buffered batch within given time _LOGGER.info("Partition: %r started flushing.", self.partition_id) - with self._not_empty: - if self._cur_batch: # if there is batch, enqueue it to the buffer first - self._buffered_queue.put(self._cur_batch) - self._cur_batch = EventDataBatch(self._max_message_size_on_link) - while self._cur_buffered_len: - remaining_time = timeout_time - time.time() if timeout_time else None - # If flush could get the semaphore, perform sending - if ((remaining_time and remaining_time > 0) or remaining_time is None) and \ - 
self._max_concurrent_sends_semaphore.acquire(timeout=remaining_time): - batch = self._buffered_queue.get() - self._buffered_queue.task_done() - try: - _LOGGER.info("Partition %r is sending.", self.partition_id) - self._producer.send( - batch, - timeout=timeout_time - time.time() if timeout_time else None - ) - _LOGGER.info( - "Partition %r sending %r events succeeded.", - self.partition_id, - len(batch) - ) - self._on_success(batch._internal_events, self.partition_id) - except Exception as exc: # pylint: disable=broad-except - _LOGGER.info( - "Partition %r sending %r events failed due to exception: %r ", - self.partition_id, - len(batch), - exc - ) - self._on_error(batch._internal_events, self.partition_id, exc) - finally: - self._cur_buffered_len -= len(batch) - self._max_concurrent_sends_semaphore.release() - self._not_full.notify() - # If flush could not get the semaphore, we log and raise error if wanted - else: + if self._cur_batch: # if there is batch, enqueue it to the buffer first + self._buffered_queue.put(self._cur_batch) + self._cur_batch = EventDataBatch(self._max_message_size_on_link) + while self._cur_buffered_len: + remaining_time = timeout_time - time.time() if timeout_time else None + # If flush could get the semaphore, perform sending + if ((remaining_time and remaining_time > 0) or remaining_time is None): + batch = self._buffered_queue.get() + self._buffered_queue.task_done() + try: + _LOGGER.info("Partition %r is sending.", self.partition_id) + self._producer.send( + batch, + timeout=timeout_time - time.time() if timeout_time else None + ) _LOGGER.info( - "Partition %r fails to flush due to timeout.", - self.partition_id + "Partition %r sending %r events succeeded.", + self.partition_id, + len(batch) ) - if raise_error: - raise OperationTimeoutError("Failed to flush {!r}".format(self.partition_id)) - break + self._on_success(batch._internal_events, self.partition_id) + except Exception as exc: # pylint: disable=broad-except + _LOGGER.info( + 
"Partition %r sending %r events failed due to exception: %r ", + self.partition_id, + len(batch), + exc + ) + self._on_error(batch._internal_events, self.partition_id, exc) + finally: + self._cur_buffered_len -= len(batch) + else: + _LOGGER.info( + "Partition %r fails to flush due to timeout.", + self.partition_id + ) + if raise_error: + raise OperationTimeoutError("Failed to flush {!r}".format(self.partition_id)) + break # after finishing flushing, reset cur batch and put it into the buffer self._last_send_time = time.time() self._cur_batch = EventDataBatch(self._max_message_size_on_link) @@ -198,13 +178,11 @@ def flush(self, timeout_time=None, raise_error=True): def check_max_wait_time_worker(self): while self._running: - with self._not_empty: - if not self._cur_buffered_len: - _LOGGER.info("Partition %r worker is awaiting data.", self.partition_id) - self._not_empty.wait() + if not self._buffered_queue.empty(): now_time = time.time() _LOGGER.info("Partition %r worker is checking max_wait_time.", self.partition_id) - if now_time - self._last_send_time > self._max_wait_time and self._running: + #flush the partition if its beyond the waiting time or the buffer is at max capacity + if (now_time - self._last_send_time > self._max_wait_time and self._running) or (self._buffered_queue.qsize == self._buffered_queue.maxsize and self._running): # in the worker, not raising error for flush, users can not handle this self.flush(raise_error=False) time.sleep(min(self._max_wait_time, 5)) From 98ad0e084e705a4c987a3368cbfc9e0231067957 Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Mon, 9 May 2022 11:05:44 -0500 Subject: [PATCH 02/18] minor fix --- .../azure/eventhub/_buffered_producer/_buffered_producer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py index b7efe634e705..807631ca6712 
100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py @@ -33,7 +33,7 @@ def __init__( max_concurrent_sends: int = 1, max_buffer_length: int = 1500 ): - self._buffered_queue: queue.Queue = queue.Queue(max_buffer_length) + self._buffered_queue: queue.Queue = queue.Queue(maxsize=max_buffer_length) self._cur_buffered_len = 0 self._executor: ThreadPoolExecutor = executor self._producer: EventHubProducer = producer From 723ec2a5dcdf856ae0ef451aab7236afe1262b33 Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Mon, 9 May 2022 11:10:17 -0500 Subject: [PATCH 03/18] remove semaphores, conditions --- .../_buffered_producer_async.py | 152 ++++++++---------- 1 file changed, 66 insertions(+), 86 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py index b170fbdad140..8d11d329d92c 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py @@ -32,17 +32,13 @@ def __init__( *, max_wait_time: float = 1, max_concurrent_sends: int = 1, - max_buffer_length: int = 10 + max_buffer_length: int = 1500 ): - self._buffered_queue: queue.Queue = queue.Queue() + self._buffered_queue: queue.Queue = queue.Queue(maxsize=max_buffer_length) self._cur_buffered_len = 0 self._producer: EventHubProducer = producer self._lock = Lock() - self._not_empty = Condition(self._lock) - self._not_full = Condition(self._lock) - self._max_buffer_len = max_buffer_length self._max_concurrent_sends = max_concurrent_sends - self._max_concurrent_sends_semaphore = Semaphore(self._max_concurrent_sends) self._max_wait_time = max_wait_time self._on_success = 
self.failsafe_callback(on_success) self._on_error = self.failsafe_callback(on_error) @@ -75,10 +71,6 @@ async def stop(self, flush=True, timeout_time=None, raise_error=False): ) if self._check_max_wait_time_future: try: - async with self._not_empty: - # in the stop procedure, calling notify to give check_max_wait_time_future a chance to stop - # as it is waiting for Condition self._not_empty - self._not_empty.notify() await self._check_max_wait_time_future except Exception as exc: # pylint: disable=broad-except _LOGGER.warning( @@ -97,7 +89,7 @@ async def put_events(self, events, timeout_time=None): except TypeError: new_events_len = 1 - if self._max_buffer_len - self._cur_buffered_len < new_events_len: + if self._buffered_queue.maxsize - self._cur_buffered_len < new_events_len: _LOGGER.info( "The buffer for partition %r is full. Attempting to flush before adding %r events.", self.partition_id, @@ -106,28 +98,25 @@ async def put_events(self, events, timeout_time=None): # flush the buffer await self.flush(timeout_time=timeout_time) - async with self._not_full: - if timeout_time and time.time() > timeout_time: - raise OperationTimeoutError("Failed to enqueue events into buffer due to timeout.") - - try: - # add single event into current batch - self._cur_batch.add(events) - except AttributeError: # if the input events is a EventDataBatch, put the whole into the buffer - # if there are events in cur_batch, enqueue cur_batch to the buffer - if self._cur_batch: - self._buffered_queue.put(self._cur_batch) - self._buffered_queue.put(events) - # create a new batch for incoming events - self._cur_batch = EventDataBatch(self._max_message_size_on_link) - except ValueError: - # add single event exceeds the cur batch size, create new batch + + if timeout_time and time.time() > timeout_time: + raise OperationTimeoutError("Failed to enqueue events into buffer due to timeout.") + try: + # add single event into current batch + self._cur_batch.add(events) + except AttributeError: # 
if the input events is a EventDataBatch, put the whole into the buffer + # if there are events in cur_batch, enqueue cur_batch to the buffer + if self._cur_batch: self._buffered_queue.put(self._cur_batch) - self._cur_batch = EventDataBatch(self._max_message_size_on_link) - self._cur_batch.add(events) - self._cur_buffered_len += new_events_len - # notify the max_wait_time worker - self._not_empty.notify() + self._buffered_queue.put(events) + # create a new batch for incoming events + self._cur_batch = EventDataBatch(self._max_message_size_on_link) + except ValueError: + # add single event exceeds the cur batch size, create new batch + self._buffered_queue.put(self._cur_batch) + self._cur_batch = EventDataBatch(self._max_message_size_on_link) + self._cur_batch.add(events) + self._cur_buffered_len += new_events_len def failsafe_callback(self, callback): async def wrapper_callback(*args, **kwargs): @@ -147,53 +136,46 @@ async def flush(self, timeout_time=None, raise_error=True): # pylint: disable=protected-access # try flushing all the buffered batch within given time _LOGGER.info("Partition: %r started flushing.", self.partition_id) - async with self._not_empty: - if self._cur_batch: # if there is batch, enqueue it to the buffer first - self._buffered_queue.put(self._cur_batch) - self._cur_batch = EventDataBatch(self._max_message_size_on_link) - while self._cur_buffered_len: - remaining_time = timeout_time - time.time() if timeout_time else None - # If flush could get the semaphore, perform sending - if ((remaining_time and remaining_time > 0) or remaining_time is None) and \ - await semaphore_acquire_with_timeout( - self._max_concurrent_sends_semaphore, - timeout=remaining_time - ): - batch = self._buffered_queue.get() - self._buffered_queue.task_done() - try: - _LOGGER.info("Partition %r is sending.", self.partition_id) - await self._producer.send( - batch, - timeout=timeout_time - time.time() if timeout_time else None - ) - _LOGGER.info( - "Partition %r sending %r 
events succeeded.", - self.partition_id, - len(batch) - ) - await self._on_success(batch._internal_events, self.partition_id) - except Exception as exc: # pylint: disable=broad-except - _LOGGER.info( - "Partition %r sending %r events failed due to exception: %r", - self.partition_id, - len(batch), - exc - ) - await self._on_error(batch._internal_events, self.partition_id, exc) - finally: - self._cur_buffered_len -= len(batch) - self._max_concurrent_sends_semaphore.release() - self._not_full.notify() - # If flush could not get the semaphore, we log and raise error if wanted - else: + if self._cur_batch: # if there is batch, enqueue it to the buffer first + self._buffered_queue.put(self._cur_batch) + self._cur_batch = EventDataBatch(self._max_message_size_on_link) + while self._cur_buffered_len: + remaining_time = timeout_time - time.time() if timeout_time else None + # If flush could get the semaphore, perform sending + if ((remaining_time and remaining_time > 0) or remaining_time is None): + batch = self._buffered_queue.get() + self._buffered_queue.task_done() + try: + _LOGGER.info("Partition %r is sending.", self.partition_id) + await self._producer.send( + batch, + timeout=timeout_time - time.time() if timeout_time else None + ) + _LOGGER.info( + "Partition %r sending %r events succeeded.", + self.partition_id, + len(batch) + ) + await self._on_success(batch._internal_events, self.partition_id) + except Exception as exc: # pylint: disable=broad-except _LOGGER.info( - "Partition %r fails to flush due to timeout.", - self.partition_id + "Partition %r sending %r events failed due to exception: %r", + self.partition_id, + len(batch), + exc ) - if raise_error: - raise OperationTimeoutError("Failed to flush {!r}".format(self.partition_id)) - break + await self._on_error(batch._internal_events, self.partition_id, exc) + finally: + self._cur_buffered_len -= len(batch) + # If flush could not get the semaphore, we log and raise error if wanted + else: + _LOGGER.info( + 
"Partition %r fails to flush due to timeout.", + self.partition_id + ) + if raise_error: + raise OperationTimeoutError("Failed to flush {!r}".format(self.partition_id)) + break # after finishing flushing, reset cur batch and put it into the buffer self._last_send_time = time.time() self._cur_batch = EventDataBatch(self._max_message_size_on_link) @@ -201,15 +183,13 @@ async def flush(self, timeout_time=None, raise_error=True): async def check_max_wait_time_worker(self): while self._running: - async with self._not_empty: - if not self._cur_buffered_len: - _LOGGER.info("Partition %r worker is awaiting data.", self.partition_id) - await self._not_empty.wait() - now_time = time.time() - _LOGGER.info("Partition %r worker is checking max_wait_time.", self.partition_id) - if now_time - self._last_send_time > self._max_wait_time and self._running: - # in the worker, not raising error for flush, users can not handle this - await self.flush(raise_error=False) + if not self._buffered_queue.empty(): + now_time = time.time() + _LOGGER.info("Partition %r worker is checking max_wait_time.", self.partition_id) + #flush the partition if its beyond the waiting time or the buffer is at max capacity + if (now_time - self._last_send_time > self._max_wait_time and self._running) or (self._buffered_queue.qsize == self._buffered_queue.maxsize and self._running): + # in the worker, not raising error for flush, users can not handle this + await self.flush(raise_error=False) await asyncio.sleep(min(self._max_wait_time, 5)) @property From 569cc38d4c5e6a0d472fbdc43e9b89ec73303581 Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Mon, 9 May 2022 11:10:36 -0500 Subject: [PATCH 04/18] minor fixes --- .../eventhub/_buffered_producer/_buffered_producer.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py 
index 807631ca6712..dc3604bc1070 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py @@ -50,11 +50,12 @@ def __init__( self.partition_id = partition_id def start(self): - self._cur_batch = EventDataBatch(self._max_message_size_on_link) - self._running = True - if self._max_wait_time: - self._last_send_time = time.time() - self._check_max_wait_time_future = self._executor.submit(self.check_max_wait_time_worker) + with self._lock: + self._cur_batch = EventDataBatch(self._max_message_size_on_link) + self._running = True + if self._max_wait_time: + self._last_send_time = time.time() + self._check_max_wait_time_future = self._executor.submit(self.check_max_wait_time_worker) def stop(self, flush=True, timeout_time=None, raise_error=False): self._running = False From 759c2c649c92b47df57a47fe2acb00318eb47526 Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Mon, 9 May 2022 12:43:14 -0500 Subject: [PATCH 05/18] minor changs on queue length --- .../azure/eventhub/_buffered_producer/_buffered_producer.py | 5 +++-- .../aio/_buffered_producer/_buffered_producer_async.py | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py index dc3604bc1070..3b9df7e90eea 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py @@ -33,7 +33,8 @@ def __init__( max_concurrent_sends: int = 1, max_buffer_length: int = 1500 ): - self._buffered_queue: queue.Queue = queue.Queue(maxsize=max_buffer_length) + self._buffered_queue: queue.Queue = queue.Queue() + self._max_buffer_len = max_buffer_length self._cur_buffered_len = 0 self._executor: ThreadPoolExecutor 
= executor self._producer: EventHubProducer = producer @@ -183,7 +184,7 @@ def check_max_wait_time_worker(self): now_time = time.time() _LOGGER.info("Partition %r worker is checking max_wait_time.", self.partition_id) #flush the partition if its beyond the waiting time or the buffer is at max capacity - if (now_time - self._last_send_time > self._max_wait_time and self._running) or (self._buffered_queue.qsize == self._buffered_queue.maxsize and self._running): + if (now_time - self._last_send_time > self._max_wait_time and self._running) or (self._cur_buffered_len >= self._max_buffer_len and self._running): # in the worker, not raising error for flush, users can not handle this self.flush(raise_error=False) time.sleep(min(self._max_wait_time, 5)) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py index 8d11d329d92c..12da884a12a7 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py @@ -34,7 +34,8 @@ def __init__( max_concurrent_sends: int = 1, max_buffer_length: int = 1500 ): - self._buffered_queue: queue.Queue = queue.Queue(maxsize=max_buffer_length) + self._buffered_queue: queue.Queue = queue.Queue() + self._max_buffer_len = max_buffer_length self._cur_buffered_len = 0 self._producer: EventHubProducer = producer self._lock = Lock() @@ -187,7 +188,7 @@ async def check_max_wait_time_worker(self): now_time = time.time() _LOGGER.info("Partition %r worker is checking max_wait_time.", self.partition_id) #flush the partition if its beyond the waiting time or the buffer is at max capacity - if (now_time - self._last_send_time > self._max_wait_time and self._running) or (self._buffered_queue.qsize == self._buffered_queue.maxsize and self._running): + if (now_time - 
self._last_send_time > self._max_wait_time and self._running) or (self._cur_buffered_len >= self._max_buffer_len and self._running): # in the worker, not raising error for flush, users can not handle this await self.flush(raise_error=False) await asyncio.sleep(min(self._max_wait_time, 5)) From b7ca17f05da08acc6958d7e3cd5211f111e53f3c Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Mon, 9 May 2022 15:10:37 -0500 Subject: [PATCH 06/18] expose buffer_concurrency --- .../_buffered_producer_dispatcher.py | 12 +++++++++--- .../azure/eventhub/_producer_client.py | 13 +++++++++---- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer_dispatcher.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer_dispatcher.py index 7c8777ce16fb..9ab305baa379 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer_dispatcher.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer_dispatcher.py @@ -5,7 +5,7 @@ import logging from threading import Lock from concurrent.futures import ThreadPoolExecutor -from typing import Dict, Optional, List, Callable, TYPE_CHECKING +from typing import Dict, Optional, List, Callable, Union, TYPE_CHECKING from ._partition_resolver import PartitionResolver from ._buffered_producer import BufferedProducer @@ -32,7 +32,7 @@ def __init__( max_buffer_length: int = 1500, max_wait_time: float = 1, max_concurrent_sends: int = 1, - executor: Optional[ThreadPoolExecutor] = None, + executor: Optional[Union[ThreadPoolExecutor, int]] = None, max_worker: Optional[int] = None ): self._buffered_producers: Dict[str, BufferedProducer] = {} @@ -48,7 +48,13 @@ def __init__( self._max_buffer_length = max_buffer_length self._max_concurrent_sends = max_concurrent_sends self._existing_executor = bool(executor) - self._executor = executor or ThreadPoolExecutor(max_worker) + + if not 
executor: + self._executor = ThreadPoolExecutor(max_worker) + elif isinstance(executor, ThreadPoolExecutor): + self._executor = executor + elif isinstance(executor, int): + self._executor = ThreadPoolExecutor(executor) def _get_partition_id(self, partition_id, partition_key): if partition_id: diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py index 3ccdcbd76a43..3b1399143257 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py @@ -2,6 +2,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- +from concurrent.futures import ThreadPoolExecutor import logging import threading import time @@ -53,6 +54,9 @@ class EventHubProducerClient(ClientBase): # pylint: disable=client-accepts-api or ~azure.core.credentials.AzureNamedKeyCredential :keyword bool buffered_mode: If True, the producer client will collect events in a buffer, efficiently batch, then publish. Default is False. + :keyword Union[ThreadPoolExecutor, int] buffer_concurrency: The ThreadPoolExecutor to be used for publishing events + or the number of workers for the ThreadPoolExecutor. Default is none and a ThreadPoolExecutor with the default number of workers + will be created https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor :keyword on_success: The callback to be called once a batch has been successfully published. 
The callback takes two parameters: - `events`: The list of events that have been successfully published @@ -147,6 +151,7 @@ def __init__( credential: "CredentialTypes", *, buffered_mode: Literal[True], + buffer_concurrency: Union[ThreadPoolExecutor, int] = None, on_error: Callable[[SendEventTypes, Optional[str], Exception], None], on_success: Callable[[SendEventTypes, Optional[str]], None], max_buffer_length: int = 1500, @@ -187,10 +192,8 @@ def __init__( self._buffered_producer_dispatcher = None self._max_wait_time = max_wait_time self._max_buffer_length = max_buffer_length - # the following two parameters are not part of the public api yet - # which could be exposed in the future if needed - self._executor = kwargs.get("executor") - self._max_worker = kwargs.get("max_worker") + self._executor = kwargs.get("buffer_concurrency") + self._max_worker = None if self._buffered_mode: setattr(self, "send_batch", self._buffered_send_batch) @@ -211,6 +214,8 @@ def __init__( self._max_buffer_length = 1500 if self._max_buffer_length <= 0: raise ValueError("'max_buffer_length' must be an integer greater than 0 in buffered mode") + if isinstance(self._executor, int) and self._executor <= 0: + raise ValueError("'buffer_concurrency' must be an integer greater than 0 in buffered mode") def __enter__(self): return self From 46a74314209f53ee7d7a3cf709533ae0db7d134b Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Mon, 23 May 2022 08:42:50 -0700 Subject: [PATCH 07/18] remove max_concurrent_sends --- .../azure/eventhub/_buffered_producer/_buffered_producer.py | 2 -- .../_buffered_producer/_buffered_producer_dispatcher.py | 3 --- .../_buffered_producer/_buffered_producer_dispatcher_async.py | 3 --- 3 files changed, 8 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py index 3b9df7e90eea..b0f9da3a3d72 100644 --- 
a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py @@ -30,7 +30,6 @@ def __init__( executor: ThreadPoolExecutor, *, max_wait_time: float = 1, - max_concurrent_sends: int = 1, max_buffer_length: int = 1500 ): self._buffered_queue: queue.Queue = queue.Queue() @@ -39,7 +38,6 @@ def __init__( self._executor: ThreadPoolExecutor = executor self._producer: EventHubProducer = producer self._lock = RLock() - self._max_concurrent_sends = max_concurrent_sends self._max_wait_time = max_wait_time self._on_success = self.failsafe_callback(on_success) self._on_error = self.failsafe_callback(on_error) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer_dispatcher.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer_dispatcher.py index 9ab305baa379..31da0ad9f10b 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer_dispatcher.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer_dispatcher.py @@ -31,7 +31,6 @@ def __init__( *, max_buffer_length: int = 1500, max_wait_time: float = 1, - max_concurrent_sends: int = 1, executor: Optional[Union[ThreadPoolExecutor, int]] = None, max_worker: Optional[int] = None ): @@ -46,7 +45,6 @@ def __init__( self._partition_resolver = PartitionResolver(self._partition_ids) self._max_wait_time = max_wait_time self._max_buffer_length = max_buffer_length - self._max_concurrent_sends = max_concurrent_sends self._existing_executor = bool(executor) if not executor: @@ -83,7 +81,6 @@ def enqueue_events(self, events, *, partition_id=None, partition_key=None, timeo self._max_message_size_on_link, executor=self._executor, max_wait_time=self._max_wait_time, - max_concurrent_sends=self._max_concurrent_sends, max_buffer_length=self._max_buffer_length ) buffered_producer.start() diff --git 
a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_dispatcher_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_dispatcher_async.py index 619c28f5f2a9..6d67d2fd8ab0 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_dispatcher_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_dispatcher_async.py @@ -31,7 +31,6 @@ def __init__( *, max_buffer_length: int = 1500, max_wait_time: float = 1, - max_concurrent_sends: int = 1 ): self._buffered_producers: Dict[str, BufferedProducer] = {} self._partition_ids: List[str] = partitions @@ -44,7 +43,6 @@ def __init__( self._partition_resolver = PartitionResolver(self._partition_ids) self._max_wait_time = max_wait_time self._max_buffer_length = max_buffer_length - self._max_concurrent_sends = max_concurrent_sends async def _get_partition_id(self, partition_id, partition_key): if partition_id: @@ -72,7 +70,6 @@ async def enqueue_events(self, events, *, partition_id=None, partition_key=None, self._on_error, self._max_message_size_on_link, max_wait_time=self._max_wait_time, - max_concurrent_sends=self._max_concurrent_sends, max_buffer_length=self._max_buffer_length ) await buffered_producer.start() From 5c5c18d4897ad1f2d5075a3507e0efd8946a54db Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Mon, 23 May 2022 08:44:46 -0700 Subject: [PATCH 08/18] make buffer size reqd --- .../azure/eventhub/_buffered_producer/_buffered_producer.py | 2 +- .../aio/_buffered_producer/_buffered_producer_async.py | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py index b0f9da3a3d72..d176d6a334d3 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py +++ 
b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py @@ -30,7 +30,7 @@ def __init__( executor: ThreadPoolExecutor, *, max_wait_time: float = 1, - max_buffer_length: int = 1500 + max_buffer_length: int ): self._buffered_queue: queue.Queue = queue.Queue() self._max_buffer_len = max_buffer_length diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py index 12da884a12a7..3d11ef3003b1 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py @@ -31,15 +31,13 @@ def __init__( max_message_size_on_link: int, *, max_wait_time: float = 1, - max_concurrent_sends: int = 1, - max_buffer_length: int = 1500 + max_buffer_length: int ): self._buffered_queue: queue.Queue = queue.Queue() self._max_buffer_len = max_buffer_length self._cur_buffered_len = 0 self._producer: EventHubProducer = producer self._lock = Lock() - self._max_concurrent_sends = max_concurrent_sends self._max_wait_time = max_wait_time self._on_success = self.failsafe_callback(on_success) self._on_error = self.failsafe_callback(on_error) From 1d5e9d6ccfa9aeb04658e2c8ab85b4b18b4741ef Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Mon, 23 May 2022 08:46:11 -0700 Subject: [PATCH 09/18] remove comment --- .../azure/eventhub/_buffered_producer/_buffered_producer.py | 1 - .../eventhub/aio/_buffered_producer/_buffered_producer_async.py | 1 - 2 files changed, 2 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py index d176d6a334d3..85a0db439abb 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py +++ 
b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py @@ -137,7 +137,6 @@ def flush(self, timeout_time=None, raise_error=True): self._cur_batch = EventDataBatch(self._max_message_size_on_link) while self._cur_buffered_len: remaining_time = timeout_time - time.time() if timeout_time else None - # If flush could get the semaphore, perform sending if ((remaining_time and remaining_time > 0) or remaining_time is None): batch = self._buffered_queue.get() self._buffered_queue.task_done() diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py index 3d11ef3003b1..8b7df0b5facc 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py @@ -140,7 +140,6 @@ async def flush(self, timeout_time=None, raise_error=True): self._cur_batch = EventDataBatch(self._max_message_size_on_link) while self._cur_buffered_len: remaining_time = timeout_time - time.time() if timeout_time else None - # If flush could get the semaphore, perform sending if ((remaining_time and remaining_time > 0) or remaining_time is None): batch = self._buffered_queue.get() self._buffered_queue.task_done() From ff82f189555d9e6cb606060bd35ffde623bd67b5 Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Mon, 23 May 2022 09:33:12 -0700 Subject: [PATCH 10/18] add locks around flush --- .../_buffered_producer/_buffered_producer.py | 13 +++++++------ .../_buffered_producer_dispatcher.py | 3 ++- .../azure/eventhub/_producer_client.py | 4 ++-- .../_buffered_producer/_buffered_producer_async.py | 2 +- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py 
b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py index 85a0db439abb..65e910eaa41e 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py @@ -59,7 +59,8 @@ def start(self): def stop(self, flush=True, timeout_time=None, raise_error=False): self._running = False if flush: - self.flush(timeout_time=timeout_time, raise_error=raise_error) + with self._lock: + self.flush(timeout_time=timeout_time, raise_error=raise_error) else: if self._cur_buffered_len: _LOGGER.warning( @@ -94,7 +95,8 @@ def put_events(self, events, timeout_time=None): new_events_len ) # flush the buffer - self.flush(timeout_time=timeout_time) + with self._lock: + self.flush(timeout_time=timeout_time) if timeout_time and time.time() > timeout_time: raise OperationTimeoutError("Failed to enqueue events into buffer due to timeout.") try: @@ -134,7 +136,6 @@ def flush(self, timeout_time=None, raise_error=True): _LOGGER.info("Partition: %r started flushing.", self.partition_id) if self._cur_batch: # if there is batch, enqueue it to the buffer first self._buffered_queue.put(self._cur_batch) - self._cur_batch = EventDataBatch(self._max_message_size_on_link) while self._cur_buffered_len: remaining_time = timeout_time - time.time() if timeout_time else None if ((remaining_time and remaining_time > 0) or remaining_time is None): @@ -168,7 +169,7 @@ def flush(self, timeout_time=None, raise_error=True): self.partition_id ) if raise_error: - raise OperationTimeoutError("Failed to flush {!r}".format(self.partition_id)) + raise OperationTimeoutError("Failed to flush {!r} within {}".format(self.partition_id, timeout_time)) break # after finishing flushing, reset cur batch and put it into the buffer self._last_send_time = time.time() @@ -180,8 +181,8 @@ def check_max_wait_time_worker(self): if not self._buffered_queue.empty(): now_time = time.time() 
_LOGGER.info("Partition %r worker is checking max_wait_time.", self.partition_id) - #flush the partition if its beyond the waiting time or the buffer is at max capacity - if (now_time - self._last_send_time > self._max_wait_time and self._running) or (self._cur_buffered_len >= self._max_buffer_len and self._running): + #flush the partition if the producer is running beyond the waiting time or the buffer is at max capacity + if (now_time - self._last_send_time > self._max_wait_time) or (self._cur_buffered_len >= self._max_buffer_len): # in the worker, not raising error for flush, users can not handle this self.flush(raise_error=False) time.sleep(min(self._max_wait_time, 5)) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer_dispatcher.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer_dispatcher.py index 31da0ad9f10b..96be4e84ec11 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer_dispatcher.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer_dispatcher.py @@ -45,13 +45,14 @@ def __init__( self._partition_resolver = PartitionResolver(self._partition_ids) self._max_wait_time = max_wait_time self._max_buffer_length = max_buffer_length - self._existing_executor = bool(executor) + self._existing_executor = False if not executor: self._executor = ThreadPoolExecutor(max_worker) elif isinstance(executor, ThreadPoolExecutor): self._executor = executor elif isinstance(executor, int): + self._existing_executor = True self._executor = ThreadPoolExecutor(executor) def _get_partition_id(self, partition_id, partition_key): diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py index 3b1399143257..0d2f1117b845 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py +++ 
b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py @@ -55,8 +55,8 @@ class EventHubProducerClient(ClientBase): # pylint: disable=client-accepts-api :keyword bool buffered_mode: If True, the producer client will collect events in a buffer, efficiently batch, then publish. Default is False. :keyword Union[ThreadPoolExecutor, int] buffer_concurrency: The ThreadPoolExecutor to be used for publishing events - or the number of workers for the ThreadPoolExecutor. Default is none and a ThreadPoolExecutor with the default number of workers - will be created https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor + or the number of workers for the ThreadPoolExecutor. Default is none and a ThreadPoolExecutor with the default number of workers + will be created https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor :keyword on_success: The callback to be called once a batch has been successfully published. 
The callback takes two parameters: - `events`: The list of events that have been successfully published diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py index 8b7df0b5facc..08d61870448b 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py @@ -172,7 +172,7 @@ async def flush(self, timeout_time=None, raise_error=True): self.partition_id ) if raise_error: - raise OperationTimeoutError("Failed to flush {!r}".format(self.partition_id)) + raise OperationTimeoutError("Failed to flush {!r} within {}".format(self.partition_id, timeout_time)) break # after finishing flushing, reset cur batch and put it into the buffer self._last_send_time = time.time() From 4f27390c298e879e7e77cb0ceb1ad3cbb04795ea Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Tue, 24 May 2022 10:48:32 -0700 Subject: [PATCH 11/18] use the right counter to track q size --- .../azure/eventhub/_buffered_producer/_buffered_producer.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py index 65e910eaa41e..e03e583dc0f7 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py @@ -57,6 +57,7 @@ def start(self): self._check_max_wait_time_future = self._executor.submit(self.check_max_wait_time_worker) def stop(self, flush=True, timeout_time=None, raise_error=False): + self._running = False if flush: with self._lock: @@ -178,13 +179,14 @@ def flush(self, timeout_time=None, raise_error=True): def 
check_max_wait_time_worker(self): while self._running: - if not self._buffered_queue.empty(): + if self._cur_buffered_len > 0: now_time = time.time() _LOGGER.info("Partition %r worker is checking max_wait_time.", self.partition_id) #flush the partition if the producer is running beyond the waiting time or the buffer is at max capacity if (now_time - self._last_send_time > self._max_wait_time) or (self._cur_buffered_len >= self._max_buffer_len): # in the worker, not raising error for flush, users can not handle this self.flush(raise_error=False) + time.sleep(min(self._max_wait_time, 5)) @property From 9216245d8dd990dbad1ddd04a181609ce3d7cfde Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Tue, 24 May 2022 15:49:48 -0700 Subject: [PATCH 12/18] use the correct count for the q --- .../azure/eventhub/_buffered_producer/_buffered_producer.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py index e03e583dc0f7..185eacca78cd 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py @@ -89,7 +89,7 @@ def put_events(self, events, timeout_time=None): new_events_len = len(events) except TypeError: new_events_len = 1 - if self._buffered_queue.maxsize - self._cur_buffered_len < new_events_len: + if self._max_buffer_len - self._cur_buffered_len < new_events_len: _LOGGER.info( "The buffer for partition %r is full. 
Attempting to flush before adding %r events.", self.partition_id, @@ -186,7 +186,6 @@ def check_max_wait_time_worker(self): if (now_time - self._last_send_time > self._max_wait_time) or (self._cur_buffered_len >= self._max_buffer_len): # in the worker, not raising error for flush, users can not handle this self.flush(raise_error=False) - time.sleep(min(self._max_wait_time, 5)) @property From c4a03491ab488bba31f7de363913b1d5e750930d Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Tue, 24 May 2022 16:22:30 -0700 Subject: [PATCH 13/18] locks and right q size var for async --- .../aio/_buffered_producer/_buffered_producer_async.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py index 08d61870448b..61de52505329 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py @@ -59,7 +59,8 @@ async def start(self): async def stop(self, flush=True, timeout_time=None, raise_error=False): self._running = False if flush: - await self.flush(timeout_time=timeout_time, raise_error=raise_error) + async with self._lock: + await self.flush(timeout_time=timeout_time, raise_error=raise_error) else: if self._cur_buffered_len: _LOGGER.warning( @@ -88,14 +89,15 @@ async def put_events(self, events, timeout_time=None): except TypeError: new_events_len = 1 - if self._buffered_queue.maxsize - self._cur_buffered_len < new_events_len: + if self._max_buffer_len - self._cur_buffered_len < new_events_len: _LOGGER.info( "The buffer for partition %r is full. 
Attempting to flush before adding %r events.", self.partition_id, new_events_len ) # flush the buffer - await self.flush(timeout_time=timeout_time) + async with self._lock: + await self.flush(timeout_time=timeout_time) if timeout_time and time.time() > timeout_time: @@ -181,7 +183,7 @@ async def flush(self, timeout_time=None, raise_error=True): async def check_max_wait_time_worker(self): while self._running: - if not self._buffered_queue.empty(): + if self._max_buffer_len > 0: now_time = time.time() _LOGGER.info("Partition %r worker is checking max_wait_time.", self.partition_id) #flush the partition if its beyond the waiting time or the buffer is at max capacity From 17db49eb34702fd44e1162a5a95a85cc2d7110ad Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Tue, 24 May 2022 16:23:39 -0700 Subject: [PATCH 14/18] clean imports --- .../azure/eventhub/_buffered_producer/_buffered_producer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py index 185eacca78cd..020f1a1cd5d8 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py @@ -5,7 +5,7 @@ import time import queue import logging -from threading import RLock, Condition, Semaphore +from threading import RLock from concurrent.futures import ThreadPoolExecutor from typing import Optional, Callable, TYPE_CHECKING From 24c1b90d6c3c2a4ddcf59dad00d04417b791dddb Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Wed, 25 May 2022 09:44:17 -0700 Subject: [PATCH 15/18] lock for bg worker --- .../azure/eventhub/_buffered_producer/_buffered_producer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py 
b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py index 020f1a1cd5d8..b3e719b701cb 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py @@ -185,7 +185,8 @@ def check_max_wait_time_worker(self): #flush the partition if the producer is running beyond the waiting time or the buffer is at max capacity if (now_time - self._last_send_time > self._max_wait_time) or (self._cur_buffered_len >= self._max_buffer_len): # in the worker, not raising error for flush, users can not handle this - self.flush(raise_error=False) + with self._lock: + self.flush(raise_error=False) time.sleep(min(self._max_wait_time, 5)) @property From 1af3ddb06f82fe017feb40d29407f97da2fbf59a Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Wed, 25 May 2022 16:40:13 -0700 Subject: [PATCH 16/18] formatting fixes for pylin --- .../_buffered_producer/_buffered_producer.py | 68 +++++++--------- .../azure/eventhub/_producer_client.py | 81 +++++-------------- .../_buffered_producer_async.py | 73 +++++++---------- 3 files changed, 75 insertions(+), 147 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py index b3e719b701cb..ec8936c33716 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py @@ -12,6 +12,7 @@ from .._producer import EventHubProducer from .._common import EventDataBatch from ..exceptions import OperationTimeoutError + if TYPE_CHECKING: from .._producer_client import SendEventTypes @@ -21,16 +22,16 @@ class BufferedProducer: # pylint: disable=too-many-instance-attributes def __init__( - self, - producer: EventHubProducer, - partition_id: str, - on_success: 
Callable[["SendEventTypes", Optional[str]], None], - on_error: Callable[["SendEventTypes", Optional[str], Exception], None], - max_message_size_on_link: int, - executor: ThreadPoolExecutor, - *, - max_wait_time: float = 1, - max_buffer_length: int + self, + producer: EventHubProducer, + partition_id: str, + on_success: Callable[["SendEventTypes", Optional[str]], None], + on_error: Callable[["SendEventTypes", Optional[str], Exception], None], + max_message_size_on_link: int, + executor: ThreadPoolExecutor, + *, + max_wait_time: float = 1, + max_buffer_length: int ): self._buffered_queue: queue.Queue = queue.Queue() self._max_buffer_len = max_buffer_length @@ -57,7 +58,7 @@ def start(self): self._check_max_wait_time_future = self._executor.submit(self.check_max_wait_time_worker) def stop(self, flush=True, timeout_time=None, raise_error=False): - + self._running = False if flush: with self._lock: @@ -67,18 +68,14 @@ def stop(self, flush=True, timeout_time=None, raise_error=False): _LOGGER.warning( "Shutting down Partition %r. There are still %r events in the buffer which will be lost", self.partition_id, - self._cur_buffered_len + self._cur_buffered_len, ) if self._check_max_wait_time_future: remain_timeout = timeout_time - time.time() if timeout_time else None try: self._check_max_wait_time_future.result(remain_timeout) except Exception as exc: # pylint: disable=broad-except - _LOGGER.warning( - "Partition %r stopped with error %r", - self.partition_id, - exc - ) + _LOGGER.warning("Partition %r stopped with error %r", self.partition_id, exc) self._producer.close() def put_events(self, events, timeout_time=None): @@ -93,7 +90,7 @@ def put_events(self, events, timeout_time=None): _LOGGER.info( "The buffer for partition %r is full. 
Attempting to flush before adding %r events.", self.partition_id, - new_events_len + new_events_len, ) # flush the buffer with self._lock: @@ -123,10 +120,7 @@ def wrapper_callback(*args, **kwargs): callback(*args, **kwargs) except Exception as exc: # pylint: disable=broad-except _LOGGER.warning( - "On partition %r, callback %r encountered exception %r", - callback.__name__, - exc, - self.partition_id + "On partition %r, callback %r encountered exception %r", callback.__name__, exc, self.partition_id ) return wrapper_callback @@ -139,38 +133,30 @@ def flush(self, timeout_time=None, raise_error=True): self._buffered_queue.put(self._cur_batch) while self._cur_buffered_len: remaining_time = timeout_time - time.time() if timeout_time else None - if ((remaining_time and remaining_time > 0) or remaining_time is None): + if (remaining_time and remaining_time > 0) or remaining_time is None: batch = self._buffered_queue.get() self._buffered_queue.task_done() try: _LOGGER.info("Partition %r is sending.", self.partition_id) - self._producer.send( - batch, - timeout=timeout_time - time.time() if timeout_time else None - ) - _LOGGER.info( - "Partition %r sending %r events succeeded.", - self.partition_id, - len(batch) - ) + self._producer.send(batch, timeout=timeout_time - time.time() if timeout_time else None) + _LOGGER.info("Partition %r sending %r events succeeded.", self.partition_id, len(batch)) self._on_success(batch._internal_events, self.partition_id) except Exception as exc: # pylint: disable=broad-except _LOGGER.info( "Partition %r sending %r events failed due to exception: %r ", self.partition_id, len(batch), - exc + exc, ) self._on_error(batch._internal_events, self.partition_id, exc) finally: self._cur_buffered_len -= len(batch) else: - _LOGGER.info( - "Partition %r fails to flush due to timeout.", - self.partition_id - ) + _LOGGER.info("Partition %r fails to flush due to timeout.", self.partition_id) if raise_error: - raise OperationTimeoutError("Failed to flush 
{!r} within {}".format(self.partition_id, timeout_time)) + raise OperationTimeoutError( + "Failed to flush {!r} within {}".format(self.partition_id, timeout_time) + ) break # after finishing flushing, reset cur batch and put it into the buffer self._last_send_time = time.time() @@ -182,8 +168,10 @@ def check_max_wait_time_worker(self): if self._cur_buffered_len > 0: now_time = time.time() _LOGGER.info("Partition %r worker is checking max_wait_time.", self.partition_id) - #flush the partition if the producer is running beyond the waiting time or the buffer is at max capacity - if (now_time - self._last_send_time > self._max_wait_time) or (self._cur_buffered_len >= self._max_buffer_len): + # flush the partition if the producer is running beyond the waiting time or the buffer is at max capacity + if (now_time - self._last_send_time > self._max_wait_time) or ( + self._cur_buffered_len >= self._max_buffer_len + ): # in the worker, not raising error for flush, users can not handle this with self._lock: self.flush(raise_error=False) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py index 0d2f1117b845..199917dc08d5 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py @@ -38,7 +38,7 @@ _LOGGER = logging.getLogger(__name__) -class EventHubProducerClient(ClientBase): # pylint: disable=client-accepts-api-version-keyword +class EventHubProducerClient(ClientBase): # pylint: disable=client-accepts-api-version-keyword # pylint: disable=too-many-instance-attributes """The EventHubProducerClient class defines a high level interface for sending events to the Azure Event Hubs service. 
@@ -180,9 +180,7 @@ def __init__( network_tracing=kwargs.get("logging_enable"), **kwargs ) - self._producers = { - ALL_PARTITIONS: self._create_producer() - } # type: Dict[str, Optional[EventHubProducer]] + self._producers = {ALL_PARTITIONS: self._create_producer()} # type: Dict[str, Optional[EventHubProducer]] self._max_message_size_on_link = 0 self._partition_ids = None # Optional[List[str]] self._lock = threading.Lock() @@ -239,7 +237,7 @@ def _buffered_send(self, events, **kwargs): max_wait_time=self._max_wait_time, max_buffer_length=self._max_buffer_length, executor=self._executor, - max_worker=self._max_worker + max_worker=self._max_worker, ) self._buffered_producer_dispatcher.enqueue_events(events, **kwargs) @@ -255,12 +253,8 @@ def _batch_preparer(self, event_data_batch, **kwargs): ) to_send_batch = event_data_batch else: - to_send_batch = self.create_batch( - partition_id=partition_id, partition_key=partition_key - ) - to_send_batch._load_events( # pylint:disable=protected-access - event_data_batch - ) + to_send_batch = self.create_batch(partition_id=partition_id, partition_key=partition_key) + to_send_batch._load_events(event_data_batch) # pylint:disable=protected-access return to_send_batch, to_send_batch._partition_id, partition_key # pylint:disable=protected-access @@ -273,12 +267,7 @@ def _buffered_send_batch(self, event_data_batch, **kwargs): timeout = kwargs.get("timeout") timeout_time = time.time() + timeout if timeout else None - self._buffered_send( - event_data_batch, - partition_id=pid, - partition_key=pkey, - timeout_time=timeout_time - ) + self._buffered_send(event_data_batch, partition_id=pid, partition_key=pkey, timeout_time=timeout_time) def _buffered_send_event(self, event, **kwargs): partition_key = kwargs.get("partition_key") @@ -286,10 +275,7 @@ def _buffered_send_event(self, event, **kwargs): timeout = kwargs.get("timeout") timeout_time = time.time() + timeout if timeout else None self._buffered_send( - event, - 
partition_id=kwargs.get("partition_id"), - partition_key=partition_key, - timeout_time=timeout_time + event, partition_id=kwargs.get("partition_id"), partition_key=partition_key, timeout_time=timeout_time ) def _get_partitions(self): @@ -304,13 +290,9 @@ def _get_max_message_size(self): # pylint: disable=protected-access,line-too-long with self._lock: if not self._max_message_size_on_link: - cast( - EventHubProducer, self._producers[ALL_PARTITIONS] - )._open_with_retry() + cast(EventHubProducer, self._producers[ALL_PARTITIONS])._open_with_retry() self._max_message_size_on_link = ( - self._producers[ # type: ignore - ALL_PARTITIONS - ]._handler.message_handler._link.peer_max_message_size + self._producers[ALL_PARTITIONS]._handler.message_handler._link.peer_max_message_size # type: ignore or constants.MAX_MESSAGE_LENGTH_BYTES ) @@ -318,33 +300,21 @@ def _start_producer(self, partition_id, send_timeout): # type: (str, Optional[Union[int, float]]) -> None with self._lock: self._get_partitions() - if ( - partition_id not in cast(List[str], self._partition_ids) - and partition_id != ALL_PARTITIONS - ): + if partition_id not in cast(List[str], self._partition_ids) and partition_id != ALL_PARTITIONS: raise ConnectError( - "Invalid partition {} for the event hub {}".format( - partition_id, self.eventhub_name - ) + "Invalid partition {} for the event hub {}".format(partition_id, self.eventhub_name) ) - if ( - not self._producers[partition_id] - or cast(EventHubProducer, self._producers[partition_id]).closed - ): + if not self._producers[partition_id] or cast(EventHubProducer, self._producers[partition_id]).closed: self._producers[partition_id] = self._create_producer( - partition_id=( - None if partition_id == ALL_PARTITIONS else partition_id - ), + partition_id=(None if partition_id == ALL_PARTITIONS else partition_id), send_timeout=send_timeout, ) def _create_producer(self, partition_id=None, send_timeout=None): # type: (Optional[str], Optional[Union[int, float]]) -> 
EventHubProducer target = "amqps://{}{}".format(self._address.hostname, self._address.path) - send_timeout = ( - self._config.send_timeout if send_timeout is None else send_timeout - ) + send_timeout = self._config.send_timeout if send_timeout is None else send_timeout handler = EventHubProducer( self, @@ -751,9 +721,7 @@ def get_partition_properties(self, partition_id): :rtype: Dict[str, Any] :raises: :class:`EventHubError` """ - return super(EventHubProducerClient, self)._get_partition_properties( - partition_id - ) + return super(EventHubProducerClient, self)._get_partition_properties(partition_id) def flush(self, **kwargs: Any) -> None: """ @@ -770,12 +738,7 @@ def flush(self, **kwargs: Any) -> None: timeout_time = time.time() + timeout if timeout else None self._buffered_producer_dispatcher.flush(timeout_time=timeout_time) - def close( - self, - *, - flush: bool = True, - **kwargs: Any - ) -> None: + def close(self, *, flush: bool = True, **kwargs: Any) -> None: """Close the Producer client underlying AMQP connection and links. :keyword bool flush: Buffered mode only. 
If set to True, events in the buffer will be sent @@ -821,10 +784,9 @@ def get_buffered_event_count(self, partition_id: str) -> Optional[int]: return None try: - return cast( - BufferedProducerDispatcher, - self._buffered_producer_dispatcher - ).get_buffered_event_count(partition_id) + return cast(BufferedProducerDispatcher, self._buffered_producer_dispatcher).get_buffered_event_count( + partition_id + ) except AttributeError: return 0 @@ -840,9 +802,6 @@ def total_buffered_event_count(self) -> Optional[int]: return None try: - return cast( - BufferedProducerDispatcher, - self._buffered_producer_dispatcher - ).total_buffered_event_count + return cast(BufferedProducerDispatcher, self._buffered_producer_dispatcher).total_buffered_event_count except AttributeError: return 0 diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py index 61de52505329..014787bb5ddd 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py @@ -6,10 +6,9 @@ import logging import queue import time -from asyncio import Lock, Condition, Semaphore +from asyncio import Lock from typing import Optional, Callable, Awaitable, TYPE_CHECKING -from .._async_utils import semaphore_acquire_with_timeout from .._producer_async import EventHubProducer from ..._common import EventDataBatch from ...exceptions import OperationTimeoutError @@ -23,15 +22,15 @@ class BufferedProducer: # pylint: disable=too-many-instance-attributes def __init__( - self, - producer: EventHubProducer, - partition_id: str, - on_success: Callable[["SendEventTypes", Optional[str]], Awaitable[None]], - on_error: Callable[["SendEventTypes", Optional[str], Exception], Awaitable[None]], - max_message_size_on_link: int, - *, - max_wait_time: float = 
1, - max_buffer_length: int + self, + producer: EventHubProducer, + partition_id: str, + on_success: Callable[["SendEventTypes", Optional[str]], Awaitable[None]], + on_error: Callable[["SendEventTypes", Optional[str], Exception], Awaitable[None]], + max_message_size_on_link: int, + *, + max_wait_time: float = 1, + max_buffer_length: int ): self._buffered_queue: queue.Queue = queue.Queue() self._max_buffer_len = max_buffer_length @@ -64,20 +63,15 @@ async def stop(self, flush=True, timeout_time=None, raise_error=False): else: if self._cur_buffered_len: _LOGGER.warning( - "Shutting down Partition %r." - " There are still %r events in the buffer which will be lost", + "Shutting down Partition %r." " There are still %r events in the buffer which will be lost", self.partition_id, - self._cur_buffered_len + self._cur_buffered_len, ) if self._check_max_wait_time_future: try: await self._check_max_wait_time_future except Exception as exc: # pylint: disable=broad-except - _LOGGER.warning( - "Partition %r stopped with error %r", - self.partition_id, - exc - ) + _LOGGER.warning("Partition %r stopped with error %r", self.partition_id, exc) await self._producer.close() async def put_events(self, events, timeout_time=None): @@ -93,13 +87,12 @@ async def put_events(self, events, timeout_time=None): _LOGGER.info( "The buffer for partition %r is full. 
Attempting to flush before adding %r events.",
                 self.partition_id,
-                new_events_len
+                new_events_len,
             )
             # flush the buffer
             async with self._lock:
                 await self.flush(timeout_time=timeout_time)
-
         if timeout_time and time.time() > timeout_time:
             raise OperationTimeoutError("Failed to enqueue events into buffer due to timeout.")
         try:
@@ -125,10 +118,7 @@ async def wrapper_callback(*args, **kwargs):
                 await callback(*args, **kwargs)
             except Exception as exc:  # pylint: disable=broad-except
                 _LOGGER.warning(
-                    "On partition %r, callback %r encountered exception %r",
-                    callback.__name__,
-                    exc,
-                    self.partition_id
+                    "On partition %r, callback %r encountered exception %r", self.partition_id, callback.__name__, exc
                 )

         return wrapper_callback
@@ -142,39 +132,28 @@ async def flush(self, timeout_time=None, raise_error=True):
         self._cur_batch = EventDataBatch(self._max_message_size_on_link)
         while self._cur_buffered_len:
             remaining_time = timeout_time - time.time() if timeout_time else None
-            if ((remaining_time and remaining_time > 0) or remaining_time is None):
+            if (remaining_time and remaining_time > 0) or remaining_time is None:
                 batch = self._buffered_queue.get()
                 self._buffered_queue.task_done()
                 try:
                     _LOGGER.info("Partition %r is sending.", self.partition_id)
-                    await self._producer.send(
-                        batch,
-                        timeout=timeout_time - time.time() if timeout_time else None
-                    )
-                    _LOGGER.info(
-                        "Partition %r sending %r events succeeded.",
-                        self.partition_id,
-                        len(batch)
-                    )
+                    await self._producer.send(batch, timeout=timeout_time - time.time() if timeout_time else None)
+                    _LOGGER.info("Partition %r sending %r events succeeded.", self.partition_id, len(batch))
                     await self._on_success(batch._internal_events, self.partition_id)
                 except Exception as exc:  # pylint: disable=broad-except
                     _LOGGER.info(
-                        "Partition %r sending %r events failed due to exception: %r",
-                        self.partition_id,
-                        len(batch),
-                        exc
+                        "Partition %r sending %r events failed due to exception: %r", self.partition_id, len(batch), exc
                     )
await self._on_error(batch._internal_events, self.partition_id, exc) finally: self._cur_buffered_len -= len(batch) # If flush could not get the semaphore, we log and raise error if wanted else: - _LOGGER.info( - "Partition %r fails to flush due to timeout.", - self.partition_id - ) + _LOGGER.info("Partition %r fails to flush due to timeout.", self.partition_id) if raise_error: - raise OperationTimeoutError("Failed to flush {!r} within {}".format(self.partition_id, timeout_time)) + raise OperationTimeoutError( + "Failed to flush {!r} within {}".format(self.partition_id, timeout_time) + ) break # after finishing flushing, reset cur batch and put it into the buffer self._last_send_time = time.time() @@ -186,8 +165,10 @@ async def check_max_wait_time_worker(self): if self._max_buffer_len > 0: now_time = time.time() _LOGGER.info("Partition %r worker is checking max_wait_time.", self.partition_id) - #flush the partition if its beyond the waiting time or the buffer is at max capacity - if (now_time - self._last_send_time > self._max_wait_time and self._running) or (self._cur_buffered_len >= self._max_buffer_len and self._running): + # flush the partition if its beyond the waiting time or the buffer is at max capacity + if (now_time - self._last_send_time > self._max_wait_time and self._running) or ( + self._cur_buffered_len >= self._max_buffer_len and self._running + ): # in the worker, not raising error for flush, users can not handle this await self.flush(raise_error=False) await asyncio.sleep(min(self._max_wait_time, 5)) From e2af87950304b5f5def6f4f60fc29c221b93e914 Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Thu, 26 May 2022 08:05:44 -0700 Subject: [PATCH 17/18] final review --- .../_buffered_producer/_buffered_producer_dispatcher.py | 7 +++---- .../azure-eventhub/azure/eventhub/_producer_client.py | 6 ++---- .../aio/_buffered_producer/_buffered_producer_async.py | 3 ++- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git 
a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer_dispatcher.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer_dispatcher.py index 96be4e84ec11..3f58dc0f70f3 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer_dispatcher.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer_dispatcher.py @@ -31,8 +31,7 @@ def __init__( *, max_buffer_length: int = 1500, max_wait_time: float = 1, - executor: Optional[Union[ThreadPoolExecutor, int]] = None, - max_worker: Optional[int] = None + executor: Optional[Union[ThreadPoolExecutor, int]] = None ): self._buffered_producers: Dict[str, BufferedProducer] = {} self._partition_ids: List[str] = partitions @@ -48,11 +47,11 @@ def __init__( self._existing_executor = False if not executor: - self._executor = ThreadPoolExecutor(max_worker) + self._executor = ThreadPoolExecutor() elif isinstance(executor, ThreadPoolExecutor): + self._existing_executor = True self._executor = executor elif isinstance(executor, int): - self._existing_executor = True self._executor = ThreadPoolExecutor(executor) def _get_partition_id(self, partition_id, partition_key): diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py index 199917dc08d5..d1ebbe800dce 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py @@ -191,7 +191,6 @@ def __init__( self._max_wait_time = max_wait_time self._max_buffer_length = max_buffer_length self._executor = kwargs.get("buffer_concurrency") - self._max_worker = None if self._buffered_mode: setattr(self, "send_batch", self._buffered_send_batch) @@ -236,9 +235,8 @@ def _buffered_send(self, events, **kwargs): self._max_message_size_on_link, max_wait_time=self._max_wait_time, max_buffer_length=self._max_buffer_length, 
- executor=self._executor, - max_worker=self._max_worker, - ) + executor=self._executor + ) self._buffered_producer_dispatcher.enqueue_events(events, **kwargs) def _batch_preparer(self, event_data_batch, **kwargs): diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py index 014787bb5ddd..93efa215bcfb 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py @@ -170,7 +170,8 @@ async def check_max_wait_time_worker(self): self._cur_buffered_len >= self._max_buffer_len and self._running ): # in the worker, not raising error for flush, users can not handle this - await self.flush(raise_error=False) + async with self._lock: + await self.flush(raise_error=False) await asyncio.sleep(min(self._max_wait_time, 5)) @property From 590809f153aa697ad031a400cc3039ec2e03fdbc Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Thu, 26 May 2022 08:47:14 -0700 Subject: [PATCH 18/18] fix pylint issues --- .../azure/eventhub/_buffered_producer/_buffered_producer.py | 3 ++- .../azure-eventhub/azure/eventhub/_producer_client.py | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py index ec8936c33716..6cb39659a83a 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py @@ -168,7 +168,8 @@ def check_max_wait_time_worker(self): if self._cur_buffered_len > 0: now_time = time.time() _LOGGER.info("Partition %r worker is checking max_wait_time.", self.partition_id) - # flush the partition if 
the producer is running beyond the waiting time or the buffer is at max capacity
+                # flush the partition if the producer is running beyond the waiting time
+                # or the buffer is at max capacity
                 if (now_time - self._last_send_time > self._max_wait_time) or (
                     self._cur_buffered_len >= self._max_buffer_len
                 ):
diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py
index d1ebbe800dce..aa9ada4764af 100644
--- a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py
+++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py
@@ -55,8 +55,9 @@ class EventHubProducerClient(ClientBase):  # pylint: disable=client-accepts-api-
     :keyword bool buffered_mode: If True, the producer client will collect events in a buffer, efficiently batch,
      then publish. Default is False.
     :keyword Union[ThreadPoolExecutor, int] buffer_concurrency: The ThreadPoolExecutor to be used for publishing events
-     or the number of workers for the ThreadPoolExecutor. Default is none and a ThreadPoolExecutor with the default number of workers
-     will be created https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
+     or the number of workers for the ThreadPoolExecutor.
+     Default is None and a ThreadPoolExecutor with the default number of workers will be created
+     per https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
     :keyword on_success: The callback to be called once a batch has been successfully published.
      The callback takes two parameters:
      - `events`: The list of events that have been successfully published