diff --git a/CHANGES.rst b/CHANGES.rst index 5095b95c9..6619fc5ee 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -6,23 +6,23 @@ Change History :1.1.16: expected by *tba* : + * `#265 `_ + refactor of #264 * `#264 `_ Limit number of traces shown on a plot - use a FIFO + * `#262 `_ + add `lineup()` plan (from APS 8-ID-I XPCS) :1.1.15: expected by *2019-11-21* : bug fixes, adds asyn record support * `#259 `_ resolve AssertionError from setup_lorentzian_swait - * `#258 `_ swait record does not units, some other fields - * `#255 `_ plans: resolve indentation error - * `#254 `_ add computed APS cycle as signal - * `#252 `_ synApps: add asyn record support diff --git a/apstools/plans.py b/apstools/plans.py index 9df0ce8c8..94f47a6d7 100644 --- a/apstools/plans.py +++ b/apstools/plans.py @@ -6,6 +6,7 @@ ~addDeviceDataAsStream ~execute_command_list ~get_command_list + ~lineup ~nscan ~parse_Excel_command_file ~parse_text_command_file @@ -251,6 +252,127 @@ def _internal(blocking_function, *args, **kwargs): return status +def lineup( + counter, axis, minus, plus, npts, + time_s=0.1, peak_factor=4, width_factor=0.8, + _md={}): + """ + lineup and center a given axis, relative to current position + + PARAMETERS + + counter : object + instance of ophyd.Signal (or subclass such as ophyd.scaler.ScalerChannel) + dependent measurement to be maximized + + axis : movable object + instance of ophyd.Signal (or subclass such as EpicsMotor) + independent axis to use for alignment + + minus : float + first point of scan at this offset from starting position + + plus : float + last point of scan at this offset from starting position + + npts : int + number of data points in the scan + + time_s : float (default: 0.1) + count time per step (if counter is ScalerChannel object) + + peak_factor : float (default: 4) + maximum must be greater than 'peak_factor'*minimum + + width_factor : float (default: 0.8) + fwhm must be less than 'width_factor'*plot_range + + EXAMPLE: + + RE(lineup(diode, foemirror.theta, -30, 30, 30, 1.0)) + """ + # first, determine if counter is part of a ScalerCH device + scaler = None + obj = counter.parent + if isinstance(counter.parent, ScalerChannel): + if hasattr(obj, "parent") and obj.parent is not None: + obj = obj.parent + if hasattr(obj, "parent") and isinstance(obj.parent, ScalerCH): + scaler = obj.parent + + if scaler is not None: + old_sigs = scaler.stage_sigs + scaler.stage_sigs["preset_time"] = time_s + scaler.select_channels([counter.name]) + + if hasattr(axis, "position"): + old_position = axis.position + else: + old_position = axis.value + + def peak_analysis(): + aligned = False + if counter.name in bec.peaks["cen"]: + table = pyRestTable.Table() + table.labels = ("key", "value") + table.addRow(("axis", axis.name)) + table.addRow(("detector", counter.name)) + table.addRow(("starting position", old_position)) + for key in bec.peaks.ATTRS: + table.addRow((key, bec.peaks[key][counter.name])) + logger.info(f"alignment scan results:\n{table}") + + lo = bec.peaks["min"][counter.name][-1] # [-1] means detector + hi = bec.peaks["max"][counter.name][-1] # [0] means axis + fwhm = bec.peaks["fwhm"][counter.name] + final = bec.peaks["cen"][counter.name] + + ps = list(bec._peak_stats.values())[0][counter.name] # PeakStats object + # get the X data range as received by PeakStats + x_range = abs(max(ps.x_data) - min(ps.x_data)) + + if final is None: + logger.error(f"centroid is None") + final = old_position + elif fwhm is None: + logger.error(f"FWHM is None") + final = old_position + elif hi < peak_factor*lo: + logger.error(f"no clear peak: {hi} < {peak_factor}*{lo}") + final = old_position + elif fwhm > width_factor*x_range: + logger.error(f"FWHM too large: {fwhm} > {width_factor}*{x_range}") + final = old_position + else: + aligned = True + + logger.info(f"moving {axis.name} to {final} (aligned: {aligned})") + yield from bps.mv(axis, final) + else: + logger.error("no statistical analysis of scan peak!") + yield from bps.null() + + # too sneaky? We're modifying this structure locally + bec.peaks.aligned = aligned + bec.peaks.ATTRS = ('com', 'cen', 'max', 'min', 'fwhm') + + md = dict(_md) + md["purpose"] = "alignment" + yield from bp.rel_scan([counter], axis, minus, plus, npts, md=md) + yield from peak_analysis() + + if bec.peaks.aligned: + # again, tweak axis to maximize + md["purpose"] = "alignment - fine" + fwhm = bec.peaks["fwhm"][counter.name] + yield from bp.rel_scan([counter], axis, -fwhm, fwhm, npts, md=md) + yield from peak_analysis() + + if scaler is not None: + scaler.select_channels() + scaler.stage_sigs = old_sigs + + def nscan(detectors, *motor_sets, num=11, per_step=None, md=None): """ Scan over ``n`` variables moved together, each in equally spaced steps.