From 1bdc4b477cb3b3c7b2f34a9c0b3e7f3deb451332 Mon Sep 17 00:00:00 2001 From: Charles Titus Date: Wed, 6 Mar 2024 16:58:31 -0500 Subject: [PATCH 1/2] Added try/except to Publisher to handle case where EventPages are too large, and dynamically re-chunk them --- bluesky_kafka/__init__.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/bluesky_kafka/__init__.py b/bluesky_kafka/__init__.py index 948b467..47a214a 100644 --- a/bluesky_kafka/__init__.py +++ b/bluesky_kafka/__init__.py @@ -9,6 +9,10 @@ from bluesky.run_engine import Dispatcher, DocumentNames from suitcase import mongo_normalized +from confluent_kafka import KafkaError, KafkaException + +from event_model import rechunk_event_pages + from ._version import get_versions __version__ = get_versions()["version"] @@ -141,7 +145,18 @@ def __call__(self, name, doc): event-model document dictionary """ - self.produce(message=(name, doc)) + try: + self.produce(message=(name, doc)) + except KafkaException as e: + KErr = e.args[0] + if KErr.code() == KafkaError.MSG_SIZE_TOO_LARGE: + if name == "event_page": + page_len = len(doc['seq_num']) + new_event_list = rechunk_event_pages([doc], (page_len + 1)//2) + for event in new_event_list: + self.__call__(name, event) + else: + raise e if self._flush_on_stop_doc and name == "stop": self.flush() From bf89cfc085a08446f098f62a24084dfac566e0fb Mon Sep 17 00:00:00 2001 From: Jamie Date: Tue, 26 Mar 2024 12:55:20 -0400 Subject: [PATCH 2/2] Update bluesky_kafka/__init__.py Prevent potential infinite recursion by checking page_len Co-authored-by: Thomas A Caswell --- bluesky_kafka/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bluesky_kafka/__init__.py b/bluesky_kafka/__init__.py index 47a214a..75d0172 100644 --- a/bluesky_kafka/__init__.py +++ b/bluesky_kafka/__init__.py @@ -152,6 +152,8 @@ def __call__(self, name, doc): if KErr.code() == KafkaError.MSG_SIZE_TOO_LARGE: if name == "event_page": page_len = len(doc['seq_num']) + if page_len == 1: + raise new_event_list = rechunk_event_pages([doc], (page_len + 1)//2) for event in new_event_list: self.__call__(name, event)