Skip to content

Commit

Permalink
fixes #192
Browse files Browse the repository at this point in the history
  • Loading branch information
prjemian committed Jul 20, 2019
1 parent 85dc225 commit 816d54b
Showing 1 changed file with 81 additions and 55 deletions.
136 changes: 81 additions & 55 deletions apstools/migration/spec2ophyd.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,43 @@ def __init__(self, config_text):
self.prefix = prefix
self.index = None
self.num_channels = int(num_channels)
self.ophyd_device = None

def __str__(self):
fmt = "SpecDeviceBase(index={}, name={}, prefix={}, num_channels={})"
return fmt.format(
self.index,
self.name,
self.prefix,
self.num_channels
)
items = [f"{k}={self.__getattribute__(k)}" for k in "index name prefix num_channels".split()]
return f"{self.__class__.__name__}({', '.join(items)})"


class ItemNameBase(object):
def item_name_value(self, item):
if hasattr(self, item):
return f"{item}={self.__getattribute__(item)}"

def ophyd_config(self):
return f"{self.mne} = {self} # FIXME: not valid ophyd"


class SpecSignal(ItemNameBase):
"""
SPEC configuration of a single EPICS PV
"""

def __init__(self, mne, nm, pvname):
"""data provided by caller"""
self.mne = mne
self.name = nm
self.pvname = pvname
self.signal_name = "EpicsSignal"

def __str__(self):
items = [f"{k}={self.__getattribute__(k)}" for k in "mne name pvname signal_name".split()]
return f"{self.__class__.__name__}({', '.join(items)})"

def ophyd_config(self):
s = f"{self.mne} = {self.signal_name}('{self.pvname}', name='{self.mne}')"
if self.mne != self.name:
s += f" # {self.name}"
return s


class SpecMotor(ItemNameBase):
Expand All @@ -53,7 +75,7 @@ class SpecMotor(ItemNameBase):

def __init__(self, config_text):
"""parse the line from the SPEC config file"""
# Motor cntrl steps sign slew base backl accel nada flags mne name
# Motor ctrl steps sign slew base backl accel nada flags mne name
# MOT002 = EPICS_M2:0/3 2000 1 2000 200 50 125 0 0x003 my my
lr = config_text.split(sep="=", maxsplit=1)
self.index = int(lr[0].strip("MOT"))
Expand All @@ -66,7 +88,7 @@ def pop_word(line, int_result=False):
l = int(l)
return l, r

self.cntrl, r = pop_word(lr[1])
self.ctrl, r = pop_word(lr[1])
self.steps, r = pop_word(r, True)
self.sign, r = pop_word(r, True)
self.slew, r = pop_word(r, True)
Expand All @@ -86,21 +108,30 @@ def __str__(self):
if txt is not None:
items.append(txt)
else:
items.append(self.item_name_value("cntrl"))
return "SpecMotor({})".format(", ".join(items))
items.append(self.item_name_value("ctrl"))
return f"{self.__class__.__name__}({', '.join(items)})"

def setDevice(self, devices):
if self.cntrl.startswith("EPICS_M2"):
if self.ctrl.startswith("EPICS_M2"):
device_list = devices.get("VM_EPICS_M1")
if device_list is not None:
uc_str = self.cntrl[len("EPICS_M2:"):]
uc_str = self.ctrl[len("EPICS_M2:"):]
unit, chan = list(map(int, uc_str.split("/")))
self.device = device_list[unit]
self.pvname = "{}m{}".format(self.device.prefix, chan)
elif self.cntrl.startswith("MAC_MOT"):
elif self.ctrl.startswith("MAC_MOT"):
pass # TODO:
elif self.cntrl.startswith("NONE"):
elif self.ctrl.startswith("NONE"):
pass # TODO:

def ophyd_config(self):
s = f"{self.mne} = EpicsMotor('{self.pvname}', name='{self.mne}')"
if self.mne != self.name:
s += f" # {self.name}"
if len(self.motpar) > 0:
s += f" # {', '.join(self.motpar)}"
return s



class SpecCounter(ItemNameBase):
Expand Down Expand Up @@ -132,15 +163,16 @@ def pop_word(line, int_result=False):
self.mne, self.name = pop_word(r)
self.device = None
self.pvname = None
self.reported_pvs = []

def __str__(self):
items = [self.item_name_value(k) for k in "index mne name".split()]
items = [self.item_name_value(k) for k in "mne name unit chan".split()]
txt = self.item_name_value("pvname")
if txt is not None:
items.append(txt)
else:
items.append(self.item_name_value("ctrl"))
return "SpecCounter({})".format(", ".join(items))
return f"{self.__class__.__name__}({', '.join(items)})"

def setDevice(self, devices):
if self.ctrl.startswith("EPICS_SC"):
Expand All @@ -149,10 +181,17 @@ def setDevice(self, devices):
self.device = device_list[self.unit]
# scalers are goofy, SPEC uses 0-based numbering, scaler uses 1-based
self.pvname = "{}.S{}".format(self.device.prefix, self.chan+1)
elif self.cntrl.startswith("EPICS_PV"):
pass # TODO:
elif self.cntrl.startswith("NONE"):
elif self.ctrl.startswith("EPICS_PV"):
device_list = devices.get("VM_EPICS_PV")
if device_list is not None:
self.device = device_list[self.unit]
self.pvname = self.device.prefix
self.ophyd_device = "EpicsSignal"
elif self.ctrl.startswith("NONE"):
pass # TODO:

def ophyd_config(self):
return f"# counter: {self.mne} = {self}"


class SpecConfig(object):
Expand All @@ -165,6 +204,9 @@ def __init__(self, config_file):
self.devices = OrderedDict()
self.motors = OrderedDict()
self.counters = OrderedDict()
self.signals = OrderedDict()
self.scalers = []
self.collection = []
self.unhandled = []

def read_config(self, config_file=None):
Expand All @@ -191,50 +233,34 @@ def read_config(self, config_file=None):
elif re.match("CNT\d*", line) is not None:
counter = SpecCounter(line)
counter.setDevice(self.devices)
self.counters[counter.mne] = counter
if counter.ctrl == "EPICS_PV":
signal = SpecSignal(counter.mne, counter.name, counter.pvname)
self.signals[signal.mne] = signal
self.collection.append(signal)
else:
if counter.pvname is not None:
pvname = counter.pvname.split(".")[0]
if pvname not in self.scalers:
mne = pvname.lower().split(":")[-1]
scaler = SpecSignal(mne, mne, pvname)
scaler.signal_name = "ScalerCH"
self.scalers.append(pvname)
self.collection.append(scaler)

self.counters[counter.mne] = counter
self.collection.append(counter)
elif re.match("MOT\d*", line) is not None:
motor = SpecMotor(line)
motor.setDevice(self.devices)
self.motors[motor.mne] = motor
self.collection.append(motor)
else:
self.unhandled.append(line)


def create_ophyd_setup(spec_config):
# ophyd configures the counters by device, not by channel
device_list = spec_config.devices.get("VM_EPICS_SC")
if device_list is not None:
import_shown = False
for device in device_list:
mne = "scaler{}".format(device.index)
if not import_shown:
print("from ophyd.scaler import ScalerCH")
import_shown = True
print(f"{mne} = ScalerCH('{device.prefix}', name='{mne}')")
chans = []
for counter in spec_config.counters.values():
if counter.device == device:
key = "chan%02d" % (counter.chan+1)
print(f"# {key} : {counter.mne} ({counter.name})")
chans.append(key)
if len(chans) > 0:
print(f"{mne}.channels.read_attrs = {chans}")

mne_list = []
for mne, motor in sorted(spec_config.motors.items()):
if motor.pvname is not None:
mne = mne.replace(".", "_")
mne_list.append(mne)
msg = f"{mne} = EpicsMotor('{motor.pvname}', name='{mne}')"
comments = []
if motor.mne != motor.name:
comments.append(f"{motor.name}")
if len(motor.motpar) > 0:
comments.append(f" TODO: {', '.join(motor.motpar)}")
if len(comments) > 0:
msg += f" # {(' '.join(comments)).strip()}"
print(msg)

for device in spec_config.collection:
print(f"{device.ophyd_config()}")


def main():
Expand Down

0 comments on commit 816d54b

Please sign in to comment.