From 35ab214d20c9fba236ec7f0016125b949787f82b Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Wed, 4 Mar 2020 11:19:51 -0500 Subject: [PATCH] Add replay utility. --- event_model/__init__.py | 39 ++++++++++++++++++++++++++++ event_model/tests/test_em.py | 49 ++++++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+) diff --git a/event_model/__init__.py b/event_model/__init__.py index 584c04bac..33eadeb7b 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -1822,3 +1822,42 @@ def default(self, obj): return obj.item() return obj.tolist() return json.JSONEncoder.default(self, obj) + + +def replay(gen, callback, burst=False): + """ + Emit documents to a callback with realistic time spacing. + + Parameters + ---------- + gen: iterable + Expected to yield (name, doc) pairs + callback: callable + Expected signature: callback(name, doc) + burst: bool, optional + If True, emit as fast as possible, ignoring timing. False by default. + """ + DOCUMENTS_WITHOUT_A_TIME = {'datum', 'datum_page', 'resource'} + cache = collections.deque() + name, doc = next(gen) + if name != 'start': + raise ValueError("Expected gen to start with a RunStart document") + # Compute time difference between now and the time that this run started. + offset = ttime.time() - doc['time'] + callback(name, doc) + for name, doc in gen: + if name in DOCUMENTS_WITHOUT_A_TIME: + # The bluesky RunEngine emits these documents immediately + # before emitting an Event, which does have a time. Lacking + # more specific timing info, we'll cache these and then emit + # them in a burst before the next document with an associated time. + cache.append((name, doc)) + else: + if not burst: + delay = max(0, offset - (ttime.time() - doc['time'])) + ttime.sleep(delay) + while cache: + # Emit any cached documents without a time in a burst. + callback(*cache.popleft()) + # Emit this document. + callback(name, doc) diff --git a/event_model/tests/test_em.py b/event_model/tests/test_em.py index d2ece6805..04c72d05e 100644 --- a/event_model/tests/test_em.py +++ b/event_model/tests/test_em.py @@ -1214,3 +1214,52 @@ def test_round_trip_datum_page_with_empty_data(): page_again = event_model.pack_datum_page(*datums) assert page_again == datum_page + + +def test_replay(): + bundle = event_model.compose_run() + collector = [] + start_doc, compose_descriptor, compose_resource, compose_stop = bundle + collector.append(('start', start_doc)) + bundle = compose_descriptor( + data_keys={'motor': {'shape': [], 'dtype': 'number', 'source': '...'}, + 'image': {'shape': [512, 512], 'dtype': 'number', + 'source': '...', 'external': 'FILESTORE:'}}, + name='primary') + descriptor_doc, compose_event, compose_event_page = bundle + collector.append(('descriptor', descriptor_doc)) + bundle = compose_resource( + spec='TIFF', root='/tmp', resource_path='stack.tiff', + resource_kwargs={}) + resource_doc, compose_datum, compose_datum_page = bundle + collector.append(('resource', resource_doc)) + datum_doc = compose_datum(datum_kwargs={'slice': 5}) + collector.append(('datum', datum_doc)) + event_doc = compose_event( + data={'motor': 0, 'image': datum_doc['datum_id']}, + timestamps={'motor': 0, 'image': 0}, filled={'image': False}) + collector.append(('event', event_doc)) + datum_page = compose_datum_page(datum_kwargs={'slice': [10, 15]}) + collector.append(('datum_page', datum_page)) + event_page = compose_event_page(data={'motor': [1, 2], 'image': + datum_page['datum_id']}, + timestamps={'motor': [0, 0], + 'image': [0, 0]}, + filled={'image': [False, False]}, + seq_num=[1, 2]) + collector.append(('event_page', event_page)) + stop_doc = compose_stop() + collector.append(('stop', stop_doc)) + + def gen(): + for name, doc in collector: + yield name, doc + + actual = [] + + def callback(name, doc): + actual.append((name, doc)) + + event_model.replay(gen(), callback) + + assert actual == collector