Add replay script #674

Merged
merged 4 commits into bluesky:v2.0.0-dev on Feb 7, 2022

Conversation

@AbbyGi (Contributor) commented Sep 29, 2021

Description

Adds a script that replays Bluesky runs with the correct timing.

Motivation and Context

This simulates live streaming data so we can develop and test streaming code without needing to be at a beamline watching new data arrive.

How Has This Been Tested?

This has been tested interactively while developing GUI applications.

@AbbyGi requested a review from danielballan on September 29, 2021 19:36
@gwbischof (Contributor):

How are Kafka and ZMQ used here?

@gwbischof (Contributor) commented Sep 29, 2021

Could replay take a list of generators, so I can simulate multiple runs from different beamlines at the same time? Or maybe we should make a new function that replays multiple generators?

@tacaswell (Contributor):

You probably want to lift the time-sorting logic in databroker that merges documents from multiple collections into a time-ordered stream, and write a function like

def merge_runs(*runs):
    for name, doc in sort_and_collate(runs):
        yield name, doc
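
For illustration, a minimal sketch of what such a helper could look like, assuming each run is a generator of (name, doc) pairs whose documents carry a "time" key. Note that sort_and_collate is the hypothetical name from the snippet above, not an existing databroker function:

import heapq

def sort_and_collate(runs):
    # Hypothetical helper: interleave several (name, doc) generators into one
    # stream ordered by each document's "time" field. Real "datum" and
    # "resource" documents have no "time" key, so a production version would
    # need special handling for those.
    yield from heapq.merge(*runs, key=lambda pair: pair[1]["time"])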

@gwbischof (Contributor) commented Sep 29, 2021

There is this function here that does something like that:

def _interlace(*gens, strict_order=True):

Which reminds me that we may need to interlace the event/datum_pages for a single run if we want them to be in the right order. But if the generator is canonical(), then that should already be done.

@danielballan (Member):

> How are Kafka and ZMQ used here?

The CLI portion of this (in main()) is designed to replay the documents over the network, via 0MQ or Kafka, so that they can be consumed by some other process. The underlying function with the "brains" (replay) can also be used directly in process, without a CLI or network, if that's better for your use case.
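
A minimal in-process sketch, assuming replay takes a (name, doc) generator plus a callback; the import path and signature here are guesses based on this PR's description, not a documented API:

from databroker import Broker
from databroker.replay import replay   # hypothetical import path for the new script

db = Broker.named("example")            # assumes a configured catalog that already has runs
header = db[-1]                         # the most recent run
gen = header.documents(fill="no")       # (name, doc) pairs in time order

def consume(name, doc):
    # Stand-in consumer; a real one might update a GUI or publish to 0MQ/Kafka.
    print(name, doc.get("uid", ""))

replay(gen, consume)                    # re-emit documents with their original relative timing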

> Could replay take a list of generators, so I can simulate multiple runs from different beamlines at the same time?

I see a way to do this that is simpler and also more realistic: a separate process per beamline, each running replay on one run at a time, and each (naturally) making reference to the same clock. Within a Run there are strict ordering guarantees, but across beamlines there is a little more slip in the order of things, and the relative timing does not have to be exactly perfect. Assuming your goal is load-testing Mongo, hitting it with ~28 processes all writing concurrently is more realistic than funneling everything through one process.
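
A rough sketch of that pattern, building on the hedged in-process example above; threads stand in for the separate OS processes here, and run_uids is a placeholder for whichever runs you pick for the test:

import threading

def replay_one(header, publish):
    # Each replayer handles one run; all of them share the same wall clock.
    replay(header.documents(fill="no"), publish)

headers = [db[uid] for uid in run_uids]   # run_uids: hypothetical list of run uids
threads = [threading.Thread(target=replay_one, args=(h, consume)) for h in headers]
for t in threads:
    t.start()
for t in threads:
    t.join()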

> Which reminds me that we may need to interlace the event/datum_pages for a single run if we want them to be in the right order. But if the generator is canonical(), then that should already be done.

This now happens here. It turns out it's a lot simpler to do this at the single-document level, straight from Mongo, before anything could have been grouped into pages.

def single_documents(self, fill):
    if fill:
        raise NotImplementedError("Only fill=False is implemented.")
    external_fields = {}  # map descriptor uid to set of external fields
    datum_cache = {}  # map datum_id to datum document
    # Track which Resource and Datum documents we have yielded so far.
    resource_uids = set()
    datum_ids = set()
    # Interleave the documents from the streams in time order.
    merged_iter = toolz.itertoolz.merge_sorted(
        *(stream.iter_descriptors_and_events() for stream in self.values()),
        key=lambda item: item[1]["time"],
    )
    yield ("start", self.metadata["start"])
    for name, doc in merged_iter:
        # Insert Datum, Resource as needed, and then yield (name, doc).
        if name == "event":
            for field in external_fields[doc["descriptor"]]:
                datum_id = doc["data"][field]
                if datum_id not in datum_ids:
                    # We haven't yielded this Datum yet. Look it up, and yield it.
                    try:
                        # Check to see if it's been pre-fetched.
                        datum = datum_cache.pop(datum_id)
                    except KeyError:
                        resource_uid = self.lookup_resource_for_datum(datum_id)
                        if resource_uid not in resource_uids:
                            # We haven't yielded this Resource yet. Look it up, and yield it.
                            resource = self.get_resource(resource_uid)
                            resource_uids.add(resource_uid)
                            yield ("resource", resource)
                        # Pre-fetch *all* the Datum documents for this resource in one query.
                        datum_cache.update(
                            {
                                doc["datum_id"]: doc
                                for doc in self.get_datum_for_resource(resource_uid)
                            }
                        )
                        # Now get the Datum we originally were looking for.
                        datum = datum_cache.pop(datum_id)
                    datum_ids.add(datum_id)
                    yield ("datum", datum)
        elif name == "descriptor":
            # Track which fields ("data keys") hold references to external data.
            external_fields[doc["uid"]] = {
                key
                for key, value in doc["data_keys"].items()
                if value.get("external")
            }
        yield name, doc
    stop_doc = self.metadata["stop"]
    if stop_doc is not None:
        yield ("stop", stop_doc)

Replay can count on the documents coming out of databroker in strict time order.
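
For anyone building on that guarantee, a small illustrative check using the single_documents generator shown above (run here is a stand-in for any object that exposes that method):

last_time = float("-inf")
for name, doc in run.single_documents(fill=False):
    if name in ("descriptor", "event"):
        # Descriptors and events should arrive with non-decreasing "time".
        assert doc["time"] >= last_time, "documents arrived out of time order"
        last_time = doc["time"]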

@danielballan marked this pull request as ready for review February 7, 2022 20:34
@danielballan merged commit fa785b1 into bluesky:v2.0.0-dev on Feb 7, 2022
@mrakitin (Member) commented Apr 7, 2022

@danielballan (Member):

Certainly relevant to bluesky/event-model#150 and bluesky/event-model#155. We may need to think about whether something in event-model itself (or bluesky-live?) would be useful, in addition to this tool in databroker. Probably not? Needs thought.
