Skip to content

Commit

Permalink
fixes #267
Browse files Browse the repository at this point in the history
  • Loading branch information
prjemian committed Dec 3, 2019
1 parent 792b88f commit cc13bbb
Showing 1 changed file with 115 additions and 110 deletions.
225 changes: 115 additions & 110 deletions apstools/plans.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@
import sys
import time

from bluesky import preprocessors as bpp
from bluesky import plans as bp
from bluesky import plan_stubs as bps
from bluesky import preprocessors as bpp
from bluesky.callbacks.fitting import PeakStats
from ophyd import Device, Component, Signal, DeviceStatus, EpicsSignal
from ophyd.scaler import ScalerCH, ScalerChannel
from ophyd.status import Status

from . import utils as APS_utils
Expand Down Expand Up @@ -252,125 +254,128 @@ def _internal(blocking_function, *args, **kwargs):
return status


def lineup(
counter, axis, minus, plus, npts,
time_s=0.1, peak_factor=4, width_factor=0.8,
_md={}):
"""
lineup and center a given axis, relative to current position
# TODO: the lineup() plan needs access to the BestEffortCallbacks().peaks object.
# 2019-12-03,prj: removing until we can get to that object (if it even exists) from here
#-----
# def lineup(
# counter, axis, minus, plus, npts,
# time_s=0.1, peak_factor=4, width_factor=0.8,
# _md={}):
# """
# lineup and center a given axis, relative to current position

PARAMETERS
# PARAMETERS

counter : object
instance of ophyd.Signal (or subclass such as ophyd.scaler.ScalerChannel)
dependent measurement to be maximized
# counter : object
# instance of ophyd.Signal (or subclass such as ophyd.scaler.ScalerChannel)
# dependent measurement to be maximized

axis : movable object
instance of ophyd.Signal (or subclass such as EpicsMotor)
independent axis to use for alignment
# axis : movable object
# instance of ophyd.Signal (or subclass such as EpicsMotor)
# independent axis to use for alignment

minus : float
first point of scan at this offset from starting position
# minus : float
# first point of scan at this offset from starting position

plus : float
last point of scan at this offset from starting position
# plus : float
# last point of scan at this offset from starting position

npts : int
number of data points in the scan
# npts : int
# number of data points in the scan

time_s : float (default: 0.1)
count time per step (if counter is ScalerChannel object)
# time_s : float (default: 0.1)
# count time per step (if counter is ScalerChannel object)

peak_factor : float (default: 4)
maximum must be greater than 'peak_factor'*minimum
# peak_factor : float (default: 4)
# maximum must be greater than 'peak_factor'*minimum

width_factor : float (default: 0.8)
fwhm must be less than 'width_factor'*plot_range
EXAMPLE:
RE(lineup(diode, foemirror.theta, -30, 30, 30, 1.0))
"""
# first, determine if counter is part of a ScalerCH device
scaler = None
obj = counter.parent
if isinstance(counter.parent, ScalerChannel):
if hasattr(obj, "parent") and obj.parent is not None:
obj = obj.parent
if hasattr(obj, "parent") and isinstance(obj.parent, ScalerCH):
scaler = obj.parent

if scaler is not None:
old_sigs = scaler.stage_sigs
scaler.stage_sigs["preset_time"] = time_s
scaler.select_channels([counter.name])

if hasattr(axis, "position"):
old_position = axis.position
else:
old_position = axis.value

def peak_analysis():
aligned = False
if counter.name in bec.peaks["cen"]:
table = pyRestTable.Table()
table.labels = ("key", "value")
table.addRow(("axis", axis.name))
table.addRow(("detector", counter.name))
table.addRow(("starting position", old_position))
for key in bec.peaks.ATTRS:
table.addRow((key, bec.peaks[key][counter.name]))
logger.info(f"alignment scan results:\n{table}")

lo = bec.peaks["min"][counter.name][-1] # [-1] means detector
hi = bec.peaks["max"][counter.name][-1] # [0] means axis
fwhm = bec.peaks["fwhm"][counter.name]
final = bec.peaks["cen"][counter.name]

ps = list(bec._peak_stats.values())[0][counter.name] # PeakStats object
# get the X data range as received by PeakStats
x_range = abs(max(ps.x_data) - min(ps.x_data))

if final is None:
logger.error(f"centroid is None")
final = old_position
elif fwhm is None:
logger.error(f"FWHM is None")
final = old_position
elif hi < peak_factor*lo:
logger.error(f"no clear peak: {hi} < {peak_factor}*{lo}")
final = old_position
elif fwhm > width_factor*x_range:
logger.error(f"FWHM too large: {fwhm} > {width_factor}*{x_range}")
final = old_position
else:
aligned = True
# width_factor : float (default: 0.8)
# fwhm must be less than 'width_factor'*plot_range
# EXAMPLE:

# RE(lineup(diode, foemirror.theta, -30, 30, 30, 1.0))
# """
# # first, determine if counter is part of a ScalerCH device
# scaler = None
# obj = counter.parent
# if isinstance(counter.parent, ScalerChannel):
# if hasattr(obj, "parent") and obj.parent is not None:
# obj = obj.parent
# if hasattr(obj, "parent") and isinstance(obj.parent, ScalerCH):
# scaler = obj.parent

# if scaler is not None:
# old_sigs = scaler.stage_sigs
# scaler.stage_sigs["preset_time"] = time_s
# scaler.select_channels([counter.name])

# if hasattr(axis, "position"):
# old_position = axis.position
# else:
# old_position = axis.value

# def peak_analysis():
# aligned = False
# if counter.name in bec.peaks["cen"]:
# table = pyRestTable.Table()
# table.labels = ("key", "value")
# table.addRow(("axis", axis.name))
# table.addRow(("detector", counter.name))
# table.addRow(("starting position", old_position))
# for key in bec.peaks.ATTRS:
# table.addRow((key, bec.peaks[key][counter.name]))
# logger.info(f"alignment scan results:\n{table}")

# lo = bec.peaks["min"][counter.name][-1] # [-1] means detector
# hi = bec.peaks["max"][counter.name][-1] # [0] means axis
# fwhm = bec.peaks["fwhm"][counter.name]
# final = bec.peaks["cen"][counter.name]

# ps = list(bec._peak_stats.values())[0][counter.name] # PeakStats object
# # get the X data range as received by PeakStats
# x_range = abs(max(ps.x_data) - min(ps.x_data))

# if final is None:
# logger.error(f"centroid is None")
# final = old_position
# elif fwhm is None:
# logger.error(f"FWHM is None")
# final = old_position
# elif hi < peak_factor*lo:
# logger.error(f"no clear peak: {hi} < {peak_factor}*{lo}")
# final = old_position
# elif fwhm > width_factor*x_range:
# logger.error(f"FWHM too large: {fwhm} > {width_factor}*{x_range}")
# final = old_position
# else:
# aligned = True

logger.info(f"moving {axis.name} to {final} (aligned: {aligned})")
yield from bps.mv(axis, final)
else:
logger.error("no statistical analysis of scan peak!")
yield from bps.null()

# too sneaky? We're modifying this structure locally
bec.peaks.aligned = aligned
bec.peaks.ATTRS = ('com', 'cen', 'max', 'min', 'fwhm')

md = dict(_md)
md["purpose"] = "alignment"
yield from bp.rel_scan([counter], axis, minus, plus, npts, md=md)
yield from peak_analysis()

if bec.peaks.aligned:
# again, tweak axis to maximize
md["purpose"] = "alignment - fine"
fwhm = bec.peaks["fwhm"][counter.name]
yield from bp.rel_scan([counter], axis, -fwhm, fwhm, npts, md=md)
yield from peak_analysis()

if scaler is not None:
scaler.select_channels()
scaler.stage_sigs = old_sigs
# logger.info(f"moving {axis.name} to {final} (aligned: {aligned})")
# yield from bps.mv(axis, final)
# else:
# logger.error("no statistical analysis of scan peak!")
# yield from bps.null()

# # too sneaky? We're modifying this structure locally
# bec.peaks.aligned = aligned
# bec.peaks.ATTRS = ('com', 'cen', 'max', 'min', 'fwhm')

# md = dict(_md)
# md["purpose"] = "alignment"
# yield from bp.rel_scan([counter], axis, minus, plus, npts, md=md)
# yield from peak_analysis()

# if bec.peaks.aligned:
# # again, tweak axis to maximize
# md["purpose"] = "alignment - fine"
# fwhm = bec.peaks["fwhm"][counter.name]
# yield from bp.rel_scan([counter], axis, -fwhm, fwhm, npts, md=md)
# yield from peak_analysis()

# if scaler is not None:
# scaler.select_channels()
# scaler.stage_sigs = old_sigs


def nscan(detectors, *motor_sets, num=11, per_step=None, md=None):
Expand Down

0 comments on commit cc13bbb

Please sign in to comment.