From cc13bbb8badc5387601abe3181a5ae50e378c46d Mon Sep 17 00:00:00 2001 From: Pete Jemian Date: Tue, 3 Dec 2019 16:16:07 -0600 Subject: [PATCH] fixes #267 --- apstools/plans.py | 225 +++++++++++++++++++++++----------------------- 1 file changed, 115 insertions(+), 110 deletions(-) diff --git a/apstools/plans.py b/apstools/plans.py index 94f47a6d7..7cf9cdc43 100644 --- a/apstools/plans.py +++ b/apstools/plans.py @@ -40,10 +40,12 @@ import sys import time -from bluesky import preprocessors as bpp +from bluesky import plans as bp from bluesky import plan_stubs as bps +from bluesky import preprocessors as bpp from bluesky.callbacks.fitting import PeakStats from ophyd import Device, Component, Signal, DeviceStatus, EpicsSignal +from ophyd.scaler import ScalerCH, ScalerChannel from ophyd.status import Status from . import utils as APS_utils @@ -252,125 +254,128 @@ def _internal(blocking_function, *args, **kwargs): return status -def lineup( - counter, axis, minus, plus, npts, - time_s=0.1, peak_factor=4, width_factor=0.8, - _md={}): - """ - lineup and center a given axis, relative to current position +# TODO: the lineup() plan needs access to the BestEffortCallbacks().peaks object. +# 2019-12-03,prj: removing until we can get to that object (if it even exists) from here +#----- +# def lineup( +# counter, axis, minus, plus, npts, +# time_s=0.1, peak_factor=4, width_factor=0.8, +# _md={}): +# """ +# lineup and center a given axis, relative to current position - PARAMETERS +# PARAMETERS - counter : object - instance of ophyd.Signal (or subclass such as ophyd.scaler.ScalerChannel) - dependent measurement to be maximized +# counter : object +# instance of ophyd.Signal (or subclass such as ophyd.scaler.ScalerChannel) +# dependent measurement to be maximized - axis : movable object - instance of ophyd.Signal (or subclass such as EpicsMotor) - independent axis to use for alignment +# axis : movable object +# instance of ophyd.Signal (or subclass such as EpicsMotor) +# independent axis to use for alignment - minus : float - first point of scan at this offset from starting position +# minus : float +# first point of scan at this offset from starting position - plus : float - last point of scan at this offset from starting position +# plus : float +# last point of scan at this offset from starting position - npts : int - number of data points in the scan +# npts : int +# number of data points in the scan - time_s : float (default: 0.1) - count time per step (if counter is ScalerChannel object) +# time_s : float (default: 0.1) +# count time per step (if counter is ScalerChannel object) - peak_factor : float (default: 4) - maximum must be greater than 'peak_factor'*minimum +# peak_factor : float (default: 4) +# maximum must be greater than 'peak_factor'*minimum - width_factor : float (default: 0.8) - fwhm must be less than 'width_factor'*plot_range - - EXAMPLE: - - RE(lineup(diode, foemirror.theta, -30, 30, 30, 1.0)) - """ - # first, determine if counter is part of a ScalerCH device - scaler = None - obj = counter.parent - if isinstance(counter.parent, ScalerChannel): - if hasattr(obj, "parent") and obj.parent is not None: - obj = obj.parent - if hasattr(obj, "parent") and isinstance(obj.parent, ScalerCH): - scaler = obj.parent - - if scaler is not None: - old_sigs = scaler.stage_sigs - scaler.stage_sigs["preset_time"] = time_s - scaler.select_channels([counter.name]) - - if hasattr(axis, "position"): - old_position = axis.position - else: - old_position = axis.value - - def peak_analysis(): - aligned = False - if counter.name in bec.peaks["cen"]: - table = pyRestTable.Table() - table.labels = ("key", "value") - table.addRow(("axis", axis.name)) - table.addRow(("detector", counter.name)) - table.addRow(("starting position", old_position)) - for key in bec.peaks.ATTRS: - table.addRow((key, bec.peaks[key][counter.name])) - logger.info(f"alignment scan results:\n{table}") - - lo = bec.peaks["min"][counter.name][-1] # [-1] means detector - hi = bec.peaks["max"][counter.name][-1] # [0] means axis - fwhm = bec.peaks["fwhm"][counter.name] - final = bec.peaks["cen"][counter.name] - - ps = list(bec._peak_stats.values())[0][counter.name] # PeakStats object - # get the X data range as received by PeakStats - x_range = abs(max(ps.x_data) - min(ps.x_data)) - - if final is None: - logger.error(f"centroid is None") - final = old_position - elif fwhm is None: - logger.error(f"FWHM is None") - final = old_position - elif hi < peak_factor*lo: - logger.error(f"no clear peak: {hi} < {peak_factor}*{lo}") - final = old_position - elif fwhm > width_factor*x_range: - logger.error(f"FWHM too large: {fwhm} > {width_factor}*{x_range}") - final = old_position - else: - aligned = True +# width_factor : float (default: 0.8) +# fwhm must be less than 'width_factor'*plot_range + +# EXAMPLE: + +# RE(lineup(diode, foemirror.theta, -30, 30, 30, 1.0)) +# """ +# # first, determine if counter is part of a ScalerCH device +# scaler = None +# obj = counter.parent +# if isinstance(counter.parent, ScalerChannel): +# if hasattr(obj, "parent") and obj.parent is not None: +# obj = obj.parent +# if hasattr(obj, "parent") and isinstance(obj.parent, ScalerCH): +# scaler = obj.parent + +# if scaler is not None: +# old_sigs = scaler.stage_sigs +# scaler.stage_sigs["preset_time"] = time_s +# scaler.select_channels([counter.name]) + +# if hasattr(axis, "position"): +# old_position = axis.position +# else: +# old_position = axis.value + +# def peak_analysis(): +# aligned = False +# if counter.name in bec.peaks["cen"]: +# table = pyRestTable.Table() +# table.labels = ("key", "value") +# table.addRow(("axis", axis.name)) +# table.addRow(("detector", counter.name)) +# table.addRow(("starting position", old_position)) +# for key in bec.peaks.ATTRS: +# table.addRow((key, bec.peaks[key][counter.name])) +# logger.info(f"alignment scan results:\n{table}") + +# lo = bec.peaks["min"][counter.name][-1] # [-1] means detector +# hi = bec.peaks["max"][counter.name][-1] # [0] means axis +# fwhm = bec.peaks["fwhm"][counter.name] +# final = bec.peaks["cen"][counter.name] + +# ps = list(bec._peak_stats.values())[0][counter.name] # PeakStats object +# # get the X data range as received by PeakStats +# x_range = abs(max(ps.x_data) - min(ps.x_data)) + +# if final is None: +# logger.error(f"centroid is None") +# final = old_position +# elif fwhm is None: +# logger.error(f"FWHM is None") +# final = old_position +# elif hi < peak_factor*lo: +# logger.error(f"no clear peak: {hi} < {peak_factor}*{lo}") +# final = old_position +# elif fwhm > width_factor*x_range: +# logger.error(f"FWHM too large: {fwhm} > {width_factor}*{x_range}") +# final = old_position +# else: +# aligned = True - logger.info(f"moving {axis.name} to {final} (aligned: {aligned})") - yield from bps.mv(axis, final) - else: - logger.error("no statistical analysis of scan peak!") - yield from bps.null() - - # too sneaky? We're modifying this structure locally - bec.peaks.aligned = aligned - bec.peaks.ATTRS = ('com', 'cen', 'max', 'min', 'fwhm') - - md = dict(_md) - md["purpose"] = "alignment" - yield from bp.rel_scan([counter], axis, minus, plus, npts, md=md) - yield from peak_analysis() - - if bec.peaks.aligned: - # again, tweak axis to maximize - md["purpose"] = "alignment - fine" - fwhm = bec.peaks["fwhm"][counter.name] - yield from bp.rel_scan([counter], axis, -fwhm, fwhm, npts, md=md) - yield from peak_analysis() - - if scaler is not None: - scaler.select_channels() - scaler.stage_sigs = old_sigs +# logger.info(f"moving {axis.name} to {final} (aligned: {aligned})") +# yield from bps.mv(axis, final) +# else: +# logger.error("no statistical analysis of scan peak!") +# yield from bps.null() + +# # too sneaky? We're modifying this structure locally +# bec.peaks.aligned = aligned +# bec.peaks.ATTRS = ('com', 'cen', 'max', 'min', 'fwhm') + +# md = dict(_md) +# md["purpose"] = "alignment" +# yield from bp.rel_scan([counter], axis, minus, plus, npts, md=md) +# yield from peak_analysis() + +# if bec.peaks.aligned: +# # again, tweak axis to maximize +# md["purpose"] = "alignment - fine" +# fwhm = bec.peaks["fwhm"][counter.name] +# yield from bp.rel_scan([counter], axis, -fwhm, fwhm, npts, md=md) +# yield from peak_analysis() + +# if scaler is not None: +# scaler.select_channels() +# scaler.stage_sigs = old_sigs def nscan(detectors, *motor_sets, num=11, per_step=None, md=None):