Skip to content

Commit

Permalink
Merge pull request #57 from cjtitus/event_page_rechunk
Browse files Browse the repository at this point in the history
Dynamic EventPage Rechunking
  • Loading branch information
tacaswell authored Apr 9, 2024
2 parents 5b692e3 + bf89cfc commit 2aaf2e1
Showing 1 changed file with 18 additions and 1 deletion.
19 changes: 18 additions & 1 deletion bluesky_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
from bluesky.run_engine import Dispatcher, DocumentNames
from suitcase import mongo_normalized

from confluent_kafka import KafkaError, KafkaException

from event_model import rechunk_event_pages

from ._version import get_versions

__version__ = get_versions()["version"]
Expand Down Expand Up @@ -141,7 +145,20 @@ def __call__(self, name, doc):
event-model document dictionary
"""
self.produce(message=(name, doc))
try:
self.produce(message=(name, doc))
except KafkaException as e:
KErr = e.args[0]
if KErr.code() == KafkaError.MSG_SIZE_TOO_LARGE:
if name == "event_page":
page_len = len(doc['seq_num'])
if page_len == 1:
raise
new_event_list = rechunk_event_pages([doc], (page_len + 1)//2)
for event in new_event_list:
self.__call__(name, event)
else:
raise e
if self._flush_on_stop_doc and name == "stop":
self.flush()

Expand Down

0 comments on commit 2aaf2e1

Please sign in to comment.