Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add replay utility. #155

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions event_model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1822,3 +1822,42 @@ def default(self, obj):
return obj.item()
return obj.tolist()
return json.JSONEncoder.default(self, obj)


def replay(gen, callback, burst=False):
"""
Emit documents to a callback with realistic time spacing.

Parameters
----------
gen: iterable
Expected to yield (name, doc) pairs
callback: callable
Expected signature: callback(name, doc)
burst: bool, optional
If True, emit as fast as possible, ignoring timing. False by default.
"""
DOCUMENTS_WITHOUT_A_TIME = {'datum', 'datum_page', 'resource'}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a list. Why the curly braces?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Micro-optimization to use a set instead of a list. It make the init a bit slower, makes x in DOCUMENTS_WITHOUT_A_TIME a bit faster. Removes the [n] access because there are no meaningful semantics in the order of these values.

cache = collections.deque()
name, doc = next(gen)
if name != 'start':
raise ValueError("Expected gen to start with a RunStart document")
# Compute time difference between now and the time that this run started.
offset = ttime.time() - doc['time']
callback(name, doc)
for name, doc in gen:
if name in DOCUMENTS_WITHOUT_A_TIME:
# The bluesky RunEngine emits these documents immediately
# before emitting an Event, which does have a time. Lacking
# more specific timing info, we'll cache these and then emit
# them in a burst before the next document with an associated time.
cache.append((name, doc))
else:
if not burst:
delay = max(0, offset - (ttime.time() - doc['time']))
ttime.sleep(delay)
while cache:
# Emit any cached documents without a time in a burst.
callback(*cache.popleft())
# Emit this document.
callback(name, doc)
49 changes: 49 additions & 0 deletions event_model/tests/test_em.py
Original file line number Diff line number Diff line change
Expand Up @@ -1214,3 +1214,52 @@ def test_round_trip_datum_page_with_empty_data():

page_again = event_model.pack_datum_page(*datums)
assert page_again == datum_page


def test_replay():
bundle = event_model.compose_run()
collector = []
start_doc, compose_descriptor, compose_resource, compose_stop = bundle
collector.append(('start', start_doc))
bundle = compose_descriptor(
data_keys={'motor': {'shape': [], 'dtype': 'number', 'source': '...'},
'image': {'shape': [512, 512], 'dtype': 'number',
'source': '...', 'external': 'FILESTORE:'}},
name='primary')
descriptor_doc, compose_event, compose_event_page = bundle
collector.append(('descriptor', descriptor_doc))
bundle = compose_resource(
spec='TIFF', root='/tmp', resource_path='stack.tiff',
resource_kwargs={})
resource_doc, compose_datum, compose_datum_page = bundle
collector.append(('resource', resource_doc))
datum_doc = compose_datum(datum_kwargs={'slice': 5})
collector.append(('datum', datum_doc))
event_doc = compose_event(
data={'motor': 0, 'image': datum_doc['datum_id']},
timestamps={'motor': 0, 'image': 0}, filled={'image': False})
collector.append(('event', event_doc))
datum_page = compose_datum_page(datum_kwargs={'slice': [10, 15]})
collector.append(('datum_page', datum_page))
event_page = compose_event_page(data={'motor': [1, 2], 'image':
datum_page['datum_id']},
timestamps={'motor': [0, 0],
'image': [0, 0]},
filled={'image': [False, False]},
seq_num=[1, 2])
collector.append(('event_page', event_page))
stop_doc = compose_stop()
collector.append(('stop', stop_doc))

def gen():
for name, doc in collector:
yield name, doc

actual = []

def callback(name, doc):
actual.append((name, doc))

event_model.replay(gen(), callback)

assert actual == collector