From f81fe387ef7abf809da912e93c0b57ad448ad5df Mon Sep 17 00:00:00 2001 From: Pete R Jemian Date: Sat, 2 Dec 2023 13:03:45 -0600 Subject: [PATCH] MNT #893 add template support --- apstools/callbacks/nexus_writer.py | 86 ++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/apstools/callbacks/nexus_writer.py b/apstools/callbacks/nexus_writer.py index 413d48f4f..0b1cdc611 100644 --- a/apstools/callbacks/nexus_writer.py +++ b/apstools/callbacks/nexus_writer.py @@ -9,6 +9,7 @@ """ import datetime +import json import logging import pathlib import time @@ -94,6 +95,7 @@ def my_plan(dets, n=5): ~create_NX_group ~get_sample_title ~get_stream_link + ~resolve_class_path ~wait_writer ~wait_writer_plan_stub ~write_data @@ -108,6 +110,7 @@ def my_plan(dets, n=5): ~write_slits ~write_source ~write_streams + ~write_templates ~write_user New with apstools release 1.3.0. @@ -123,6 +126,9 @@ def my_plan(dets, n=5): nxdata_signal_axes = None # name of dataset for X axis on plot root = None # instance of h5py.File + template_key = "nxwriter_template" # TODO: refactor template from dict to list + """The template (dict) is written as a JSON string to this metadata key.""" + _external_file_read_timeout = 20 _external_file_read_retry_delay = 0.5 _writer_active = False @@ -246,6 +252,34 @@ def h5string(self, text): text = text or "" return text.encode("utf8") + def resolve_class_path(self, class_path): + """ + Parse the class path, make any groups, return the HDF5 address. + + New with apstools release 1.6.18. + """ + addr = "" + for level in class_path.split("/"): + if ":" in level: + group_name, nx_class = level.split(":") + if not nx_class.startswith("NX"): + raise ValueError(f"nx_class must start with 'NX'. Received {nx_class=!r}") + if group_name not in self.root[addr]: + # fmt: off + logger.info( + "make HDF5 group with @NX_class=%r at address '%s/%s'", + nx_class, addr, group_name + ) + # fmt: on + self.create_NX_group(self.root[addr], level) + addr += f"/{group_name}" + else: + addr += f"/{level}" + addr = addr.replace("//", "/") + + logger.debug("HDF5 address=%r", addr) + return addr + def wait_writer(self): """ Wait for the writer to finish. For interactive use (Not in a plan). @@ -414,6 +448,11 @@ def write_entry(self): nxentry["plan_name"] = self.root["/entry/instrument/bluesky/metadata/plan_name"] nxentry["entry_identifier"] = self.root["/entry/instrument/bluesky/uid"] + try: + self.write_templates() + except Exception as exc: + logger.warning("Problem writing template(s): %s", exc) + return nxentry def write_instrument(self, parent): @@ -753,6 +792,53 @@ def write_streams(self, parent): return bluesky + def write_templates(self): + """ + Process any link templates provided as run metadata. + + New in v1.6.18. + """ + addr = f"/entry/instrument/bluesky/metadata/{self.template_key}" + if addr not in self.root: + return + templates = json.loads(self.root[addr][()]) + + for source, target in templates: + if "/@" in source: + p = source.rfind("/") + if source.find("/@") != p: # more than one match + raise ValueError(f"Only one attribute can be named. Received: {source!r}") + h5addr = self.resolve_class_path(source[:p]) + attr = source[p + 2 :] + logger.debug("Set attribute: group=%r attr=%r value=%r", h5addr, attr, target) + if h5addr in self.root: + self.root[h5addr].attrs[attr] = target + else: + logger.warning("group %r not in root %r", h5addr, self.root.name) + elif source.endswith("="): + p = source.rfind("/") + h5addr = self.resolve_class_path(source[:p]) + field = source.split("/")[-1].rstrip("=") + if h5addr in self.root: + if isinstance(target, (int, float)): + target = [target] + ds = self.root[h5addr].create_dataset(field, data=target) + ds.attrs["target"] = ds.name + # fmt: off + logger.info( + "Set constant field: group=%r field=%r value=%r", + h5addr, field, ds[()] + ) + # fmt: on + else: + logger.warning("group %r not in root %r", h5addr, self.root.name) + elif source in self.root: + h5addr = self.resolve_class_path(target) + self.root[h5addr] = self.root[source] + logger.debug("Template: Linked %r to %r", source, h5addr) + else: + logger.warning("Not handled: source=%r target=%r", source, target) + def write_user(self, parent): """ group: /entry/contact:NXuser