-
Notifications
You must be signed in to change notification settings - Fork 81
/
device.py
1822 lines (1492 loc) · 61.7 KB
/
device.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from __future__ import annotations
import collections
import contextlib
import functools
import inspect
import itertools
import logging
import operator
import textwrap
import time as ttime
import typing
import warnings
from collections import OrderedDict, namedtuple
from collections.abc import Iterable, MutableSequence
from enum import Enum
from typing import (
Any,
Callable,
ClassVar,
DefaultDict,
Dict,
List,
Optional,
Sequence,
Tuple,
Type,
TypeVar,
Union,
)
from .ophydobj import Kind, OphydObject
from .signal import Signal
from .status import DeviceStatus, StatusBase
from .utils import (
ExceptionBundle,
RedundantStaging,
doc_annotation_forwarder,
getattrs,
underscores_to_camel_case,
)
A, B = TypeVar("A"), TypeVar("B")
ALL_COMPONENTS = object()
# This attrs are defined at instanitation time and must not
# collide with class attributes.
DEVICE_INSTANCE_ATTRS = {
"name",
"parent",
"component_names",
"_signals",
"_sig_attrs",
"_sub_devices",
}
# These attributes are part of the bluesky interface and cannot be
# used as component names.
DEVICE_RESERVED_ATTRS = {
"read",
"describe",
"trigger",
"configure",
"read_configuration",
"describe_configuration",
"describe_collect",
"set",
"stage",
"unstage",
"pause",
"resume",
"kickoff",
"complete",
"collect",
"position",
"stop",
# from OphydObject
"subscribe",
"clear_sub",
"event_types",
"root",
# for back-compat
"signal_names",
}
class OrderedDictType(Dict[A, B]):
...
logger = logging.getLogger(__name__)
class Staged(Enum):
"""Three-state switch"""
yes = "yes"
no = "no"
partially = "partially"
ComponentWalk = namedtuple("ComponentWalk", "ancestors dotted_name item")
K = TypeVar("K", bound=OphydObject)
class Component(typing.Generic[K]):
"""A descriptor representing a device component (or signal)
Unrecognized keyword arguments will be passed directly to the component
class initializer.
Parameters
----------
cls : class
Class of signal to create. The required signature of `cls.__init__` is
(if `suffix` is given)::
def __init__(self, pv_name, parent=None, **kwargs):
or (if suffix is None) ::
def __init__(self, parent=None, **kwargs):
The class may have a `wait_for_connection()` which is called during the
component instance creation.
suffix : str, optional
The PV suffix, which gets appended onto ``parent.prefix`` to generate
the final PV that the instance component will bind to.
Also see ``add_prefix``
lazy : bool, optional
Lazily instantiate the signal. If ``False``, the signal will be
instantiated upon component instantiation. Defaults to
``component.lazy_default``.
trigger_value : any, optional
Mark as a signal to be set on trigger. The value is sent to the signal
at trigger time.
add_prefix : sequence, optional
Keys in the kwargs to prefix with the Device PV prefix during creation
of the component instance.
Defaults to ``('suffix', 'write_pv', )``
doc : str, optional
string to attach to component DvcClass.component.__doc__
"""
#: Default laziness for the component class.
lazy_default: ClassVar[bool] = False
#: The attribute name of the component.
attr: Optional[str]
#: The class to instantiate when the device is created.
cls: Type[K]
#: Keyword arguments for the device creation.
kwargs: Dict[str, Any]
#: Lazily create components on access.
lazy: bool
#: PV or identifier suffix.
suffix: Optional[str]
#: Documentation string.
doc: Optional[str]
#: Value to send on ``trigger()``
trigger_value: Optional[Any]
#: The data acquisition kind.
kind: Kind
#: Names of kwarg keys to prepend the device PV prefix to.
add_prefix: Tuple[str, ...]
#: Subscription name -> subscriptions marked by decorator.
_subscriptions: DefaultDict[str, List[Callable]]
def __init__(
self,
cls: Type[K],
suffix: Optional[str] = None,
*,
lazy: Optional[bool] = None,
trigger_value: Optional[Any] = None,
add_prefix: Optional[Sequence[str]] = None,
doc: Optional[str] = None,
kind: Union[str, Kind] = Kind.normal,
**kwargs,
):
self.attr = None # attr is set later by the device when known
self.cls = cls
self.kwargs = kwargs
self.lazy = lazy if lazy is not None else self.lazy_default
self.suffix = suffix
self.doc = doc
self.trigger_value = trigger_value # TODO discuss
self.kind = Kind[kind.lower()] if isinstance(kind, str) else Kind(kind)
if add_prefix is None:
add_prefix = ("suffix", "write_pv")
self.add_prefix = tuple(add_prefix)
self._subscriptions = collections.defaultdict(list)
def _get_class_from_annotation(self) -> Optional[Type[K]]:
"""Get a class from the Component[cls] annotation."""
annotation = getattr(self, "__orig_class__", None)
if not annotation:
return None
args = typing.get_args(annotation)
if not args or not len(args) == 1:
return None
return args[0]
def __set_name__(self, owner, attr_name: str):
self.attr = attr_name
if self.doc is None:
self.doc = self.make_docstring(owner)
@property
def is_device(self):
"Does this Component contain a Device?"
return isinstance(self.cls, type) and issubclass(self.cls, Device)
@property
def is_signal(self):
"Does this Component contain a Signal?"
return isinstance(self.cls, type) and issubclass(self.cls, Signal)
def maybe_add_prefix(self, instance, kw, suffix):
"""Add prefix to a suffix if kw is in self.add_prefix
Parameters
----------
instance : Device
The instance to extract the prefix to maybe append to the
suffix from.
kw : str
The key of associated with the suffix. If this key is
self.add_prefix than prepend the prefix to the suffix and
return, else just return the suffix.
suffix : str
The suffix to maybe have something prepended to.
Returns
-------
str
"""
if kw in self.add_prefix:
return f"{instance.prefix}{suffix}"
return suffix
def create_component(self, instance):
"Instantiate the object described by this Component for a Device"
kwargs = self.kwargs.copy()
kwargs.update(
name=f"{instance.name}_{self.attr}",
kind=instance._component_kinds[self.attr],
attr_name=self.attr,
)
for kw, val in list(kwargs.items()):
kwargs[kw] = self.maybe_add_prefix(instance, kw, val)
if self.suffix is not None:
pv_name = self.maybe_add_prefix(instance, "suffix", self.suffix)
cpt_inst = self.cls(pv_name, parent=instance, **kwargs)
else:
cpt_inst = self.cls(parent=instance, **kwargs)
if self.lazy and hasattr(self.cls, "wait_for_connection"):
if getattr(instance, "lazy_wait_for_connection", True):
cpt_inst.wait_for_connection()
return cpt_inst
def make_docstring(self, parent_class):
"Create a docstring for the Component"
if self.doc is not None:
return self.doc
doc = [
"{} attribute".format(self.__class__.__name__),
"::",
"",
]
doc.append(textwrap.indent(repr(self), prefix=" " * 4))
doc.append("")
return "\n".join(doc)
def __repr__(self):
repr_dict = self.kwargs.copy()
repr_dict.pop("read_attrs", None)
repr_dict.pop("configuration_attrs", None)
repr_dict["kind"] = self.kind.name
kw_str = ", ".join(f"{k}={v!r}" for k, v in repr_dict.items())
suffix = repr(self.suffix) if self.suffix else ""
component_class = self.cls.__name__
args = ", ".join(s for s in (component_class, suffix, kw_str) if s)
this_class = self.__class__.__name__
return f"{this_class}({args})"
__str__ = __repr__
@typing.overload
def __get__(self, instance: None, owner: type) -> Component[K]:
...
@typing.overload
def __get__(self, instance: Device, owner: type) -> K:
...
def __get__(
self,
instance: Optional[Device],
owner: type,
) -> Union[Component, K]:
if instance is None:
return self
try:
return instance._signals[self.attr]
except KeyError:
return instance._instantiate_component(self.attr)
def __set__(self, instance, owner):
raise RuntimeError("Do not use setattr with components; use " "cpt.put(value)")
def subscriptions(self, event_type):
"""(Decorator) Specify subscriptions callbacks in the Device definition
Parameters
----------
event_type : str or None
Event type to subscribe to. `ophyd.Signal` supports at least
{'value', 'meta'}. An `event_type` of `None` indicates that the
default event type for the signal is to be used.
Returns
-------
subscriber : callable
Callable with signature `subscriber(func)`, where `func` is the
method to call when the subscription of event_type is fired.
"""
def subscriber(func):
self._subscriptions[event_type].append(func)
if not hasattr(func, "_subscriptions"):
func._subscriptions = []
func._subscriptions.append((self, event_type))
return func
return subscriber
def sub_default(self, func):
"Default subscription decorator"
return self.subscriptions(None)(func)
def sub_meta(self, func):
"Metadata subscription decorator"
return self.subscriptions("meta")(func)
def sub_value(self, func):
"Value subscription decorator"
return self.subscriptions("value")(func)
class FormattedComponent(Component[K]):
"""A Component which takes a dynamic format string
This differs from Component in that the parent prefix is not automatically
added onto the Component suffix. Additionally, `str.format()` style strings
are accepted, allowing access to Device instance attributes:
>>> from ophyd import (Component as Cpt, FormattedComponent as FCpt)
>>> class MyDevice(Device):
... # A normal component, where 'suffix' is added to prefix verbatim
... cpt = Cpt(EpicsSignal, 'suffix')
... # A formatted component, where 'self' refers to the Device instance
... ch = FCpt(EpicsSignal, '{self.prefix}{self._ch_name}')
... # A formatted component, where 'self' is assumed
... ch = FCpt(EpicsSignal, '{prefix}{_ch_name}')
...
... def __init__(self, prefix, ch_name=None, **kwargs):
... self._ch_name = ch_name
... super().__init__(prefix, **kwargs)
>>> dev = MyDevice('prefix:', ch_name='some_channel', name='dev')
>>> print(dev.cpt.pvname)
prefix:suffix
>>> print(dev.ch.pvname)
prefix:some_channel
For additional documentation, refer to Component.
"""
def maybe_add_prefix(self, instance, kw, suffix):
if kw not in self.add_prefix:
return suffix
format_dict = dict(instance.__dict__)
format_dict["self"] = instance
return suffix.format(**format_dict)
class DynamicDeviceComponent(Component["Device"]):
"""An Device component that dynamically creates an ophyd Device
Parameters
----------
defn : OrderedDict
The definition of all attributes to be created, in the form of::
defn['attribute_name'] = (SignalClass, pv_suffix, keyword_arg_dict)
This will create an attribute on the sub-device of type `SignalClass`,
with a suffix of pv_suffix, which looks something like this::
parent.sub.attribute_name = Cpt(SignalClass, pv_suffix, **keyword_arg_dict)
Keep in mind that this is actually done in the metaclass creation, and
not exactly as written above.
clsname : str, optional
The name of the class to be generated
This defaults to {parent_name}{this_attribute_name.capitalize()}
doc : str, optional
The docstring to put on the dynamically generated class
default_read_attrs : list, optional
A class attribute to put on the dynamically generated class
default_configuration_attrs : list, optional
A class attribute to put on the dynamically generated class
component_class : class, optional
Defaults to Component
base_class : class, optional
Defaults to Device
"""
def __init__(
self,
defn,
*,
clsname=None,
doc=None,
kind=Kind.normal,
default_read_attrs=None,
default_configuration_attrs=None,
component_class=Component,
base_class=None,
):
if isinstance(default_read_attrs, Iterable):
default_read_attrs = tuple(default_read_attrs)
if isinstance(default_configuration_attrs, Iterable):
default_configuration_attrs = tuple(default_configuration_attrs)
self.defn = defn
self.clsname = clsname
self.default_read_attrs = default_read_attrs
self.default_configuration_attrs = default_configuration_attrs
self.attrs = list(defn.keys())
self.component_class = component_class
self.base_class = base_class if base_class is not None else Device
self.components = {
attr: component_class(cls, suffix, **kwargs)
for attr, (cls, suffix, kwargs) in self.defn.items()
}
# NOTE: cls is None here, as it gets created in __set_name__, below
super().__init__(cls=None, suffix="", lazy=False, kind=kind)
# Allow easy access to all generated components directly in the
# DynamicDeviceComponent instance
for attr, cpt in self.components.items():
if not hasattr(self, attr):
setattr(self, attr, cpt)
def __getnewargs_ex__(self):
"Get arguments needed to copy this class (used for pickle/copy)"
kwargs = dict(
clsname=self.clsname,
doc=self.doc,
kind=self.kind,
default_read_attrs=self.default_read_attrs,
default_configuration_attrs=self.default_configuration_attrs,
component_class=self.component_class,
base_class=self.base_class,
)
return ((self.defn,), kwargs)
def __set_name__(self, owner, attr_name):
if self.clsname is None:
self.clsname = underscores_to_camel_case(attr_name)
super().__set_name__(owner, attr_name)
self.cls = create_device_from_components(
self.clsname,
default_read_attrs=self.default_read_attrs,
default_configuration_attrs=self.default_configuration_attrs,
base_class=self.base_class,
**self.components,
)
def __repr__(self):
return "\n".join(f"{attr} = {cpt!r}" for attr, cpt in self.components.items())
def subscriptions(self, event_type):
raise NotImplementedError(
"DynamicDeviceComponent does not yet " "support decorator subscriptions"
)
# These stub 'Interface' classes are the apex of the mro heirarchy for
# their respective methods. They make multiple interitance more
# forgiving, and let us define classes that customize these methods
# but are not full Devices.
class BlueskyInterface:
"""Classes that inherit from this can safely customize the
these methods without breaking mro.
"""
def __init__(self, *args, **kwargs):
# Subclasses can populate this with (signal, value) pairs, to be
# set by stage() and restored back by unstage().
self.stage_sigs = OrderedDict()
self._staged = Staged.no
self._original_vals = OrderedDict()
super().__init__(*args, **kwargs)
def trigger(self) -> StatusBase:
"""Trigger the device and return status object.
This method is responsible for implementing 'trigger' or
'acquire' functionality of this device.
If there is an appreciable time between triggering the device
and it being able to be read (via the
:meth:`~BlueskyInterface.read` method) then this method is
also responsible for arranging that the
:obj:`~ophyd.status.StatusBase` object returned by this method
is notified when the device is ready to be read.
If there is no delay between triggering and being readable,
then this method must return a :obj:`~ophyd.status.StatusBase`
object which is already completed.
Returns
-------
status : StatusBase
:obj:`~ophyd.status.StatusBase` object which will be marked
as complete when the device is ready to be read.
"""
pass
def read(self) -> OrderedDictType[str, Dict[str, Any]]:
"""Read data from the device.
This method is expected to be as instantaneous as possible,
with any substantial acquisition time taken care of in
:meth:`~BlueskyInterface.trigger`.
The `OrderedDict` returned by this method must have identical
keys (in the same order) as the `OrderedDict` returned by
:meth:`~BlueskyInterface.describe()`.
By convention, the first key in the return is the 'primary' key
and maybe used by heuristics in :mod:`bluesky`.
The values in the ordered dictionary must be dict (-likes) with the
keys ``{'value', 'timestamp'}``. The ``'value'`` may have any type,
the timestamp must be a float UNIX epoch timestamp in UTC.
Returns
-------
data : OrderedDict
The keys must be strings and the values must be dict-like
with the keys ``{'value', 'timestamp'}``
"""
return OrderedDict()
def describe(self) -> OrderedDictType[str, Dict[str, Any]]:
"""Provide schema and meta-data for :meth:`~BlueskyInterface.read`.
This keys in the `OrderedDict` this method returns must match the
keys in the `OrderedDict` return by :meth:`~BlueskyInterface.read`.
This provides schema related information, (ex shape, dtype), the
source (ex PV name), and if available, units, limits, precision etc.
Returns
-------
data_keys : OrderedDict
The keys must be strings and the values must be dict-like
with the ``event_model.event_descriptor.data_key`` schema.
"""
return OrderedDict()
def stage(self) -> List[object]:
"""Stage the device for data collection.
This method is expected to put the device into a state where
repeated calls to :meth:`~BlueskyInterface.trigger` and
:meth:`~BlueskyInterface.read` will 'do the right thing'.
Staging not idempotent and should raise
:obj:`RedundantStaging` if staged twice without an
intermediate :meth:`~BlueskyInterface.unstage`.
This method should be as fast as is feasible as it does not return
a status object.
The return value of this is a list of all of the (sub) devices
stage, including it's self. This is used to ensure devices
are not staged twice by the :obj:`~bluesky.run_engine.RunEngine`.
This is an optional method, if the device does not need
staging behavior it should not implement `stage` (or
`unstage`).
Returns
-------
devices : list
list including self and all child devices staged
"""
if self._staged == Staged.no:
pass # to short-circuit checking individual cases
elif self._staged == Staged.yes:
raise RedundantStaging(
"Device {!r} is already staged. " "Unstage it first.".format(self)
)
elif self._staged == Staged.partially:
raise RedundantStaging(
"Device {!r} has been partially staged. "
"Maybe the most recent unstaging "
"encountered an error before finishing. "
"Try unstaging again.".format(self)
)
self.log.debug("Staging %s", self.name)
self._staged = Staged.partially
# Resolve any stage_sigs keys given as strings: 'a.b' -> self.a.b
stage_sigs = OrderedDict()
for k, v in self.stage_sigs.items():
if isinstance(k, str):
# Device.__getattr__ handles nested attr lookup
stage_sigs[getattr(self, k)] = v
else:
stage_sigs[k] = v
# Read current values, to be restored by unstage()
original_vals = {sig: sig.get() for sig in stage_sigs}
# We will add signals and values from original_vals to
# self._original_vals one at a time so that
# we can undo our partial work in the event of an error.
# Apply settings.
devices_staged = []
try:
for sig, val in stage_sigs.items():
self.log.debug(
"Setting %s to %r (original value: %r)",
sig.name,
val,
original_vals[sig],
)
sig.set(val).wait()
# It worked -- now add it to this list of sigs to unstage.
self._original_vals[sig] = original_vals[sig]
devices_staged.append(self)
# Call stage() on child devices.
for attr in self._sub_devices:
device = getattr(self, attr)
if hasattr(device, "stage"):
device.stage()
devices_staged.append(device)
except Exception:
self.log.debug(
"An exception was raised while staging %s or "
"one of its children. Attempting to restore "
"original settings before re-raising the "
"exception.",
self.name,
)
self.unstage()
raise
else:
self._staged = Staged.yes
return devices_staged
def unstage(self) -> List[object]:
"""Unstage the device.
This method returns the device to the state it was prior to the
last `stage` call.
This method should be as fast as feasible as it does not
return a status object.
This method must be idempotent, multiple calls (without a new
call to 'stage') have no effect.
Returns
-------
devices : list
list including self and all child devices unstaged
"""
self.log.debug("Unstaging %s", self.name)
self._staged = Staged.partially
devices_unstaged = []
# Call unstage() on child devices.
for attr in self._sub_devices[::-1]:
device = getattr(self, attr)
if hasattr(device, "unstage"):
device.unstage()
devices_unstaged.append(device)
# Restore original values.
for sig, val in reversed(list(self._original_vals.items())):
self.log.debug("Setting %s back to its original value: %r", sig.name, val)
sig.set(val).wait()
self._original_vals.pop(sig)
devices_unstaged.append(self)
self._staged = Staged.no
return devices_unstaged
def pause(self) -> None:
"""Attempt to 'pause' the device.
This is called when ever the
:obj:`~bluesky.run_engine.RunEngine` is interrupted.
A device may have internal state that means plans can not
safely be re-wound. This method may: put the device in a
'paused' state and/or raise
:obj:`~bluesky.run_engine.NoReplayAllowed` to indicate that
the plan can not be rewound.
Raises
------
bluesky.run_engine.NoReplayAllowed
"""
pass
def resume(self) -> None:
"""Resume a device from a 'paused' state.
This is called by the :obj:`bluesky.run_engine.RunEngine`
when it resumes from an interruption and is responsible for
ensuring that the device is ready to take data again.
"""
pass
def _validate_kind(self, val):
return super()._validate_kind(val)
class GenerateDatumInterface:
"""Classes that inherit from this can safely customize the
`generate_datum` method without breaking mro. If used along with the
BlueskyInterface, inherit from this second."""
def generate_datum(self, key, timestamp, datum_kwargs):
pass
class Device(BlueskyInterface, OphydObject):
"""Base class for device objects
This class provides attribute access to one or more Signals, which can be
a mixture of read-only and writable. All must share the same base_name.
Parameters
----------
prefix : str, optional
The PV prefix for all components of the device
name : str, keyword only
The name of the device (as will be reported via read()`
kind : a member of the :class:`~ophydobj.Kind` :class:`~enum.IntEnum`
(or equivalent integer), optional
Default is ``Kind.normal``. See :class:`~ophydobj.Kind` for options.
read_attrs : sequence of attribute names
DEPRECATED: the components to include in a normal reading
(i.e., in ``read()``)
configuration_attrs : sequence of attribute names
DEPRECATED: the components to be read less often (i.e., in
``read_configuration()``) and to adjust via ``configure()``
parent : instance or None, optional
The instance of the parent device, if applicable
Attributes
----------
lazy_wait_for_connection : bool
When instantiating a lazy signal upon first access, wait for it to
connect before returning control to the user. See also the context
manager helpers: ``wait_for_lazy_connection`` and
``do_not_wait_for_lazy_connection``.
Subscriptions
-------------
SUB_ACQ_DONE
A one-time subscription indicating the requested trigger-based
acquisition has completed.
"""
SUB_ACQ_DONE = "acq_done" # requested acquire
# Over-ride in sub-classes to control the default contents of read and
# configuration attrs lists.
# For read attributes, if `None`, defaults to `self.component_names'
_default_read_attrs = None
# For configuration attributes, if `None`, defaults to `[]`
_default_configuration_attrs = None
# When instantiating a lazy signal upon first access, wait for it to
# connect before returning control to the user
lazy_wait_for_connection = True
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
**kwargs,
):
self._destroyed = False
# Store EpicsSignal objects (only created once they are accessed)
self._signals = {}
# Copy the Device-defined signal kinds, for user modification
self._component_kinds = self._component_kinds.copy()
# Subscriptions to run or general methods necessary to call prior to
# marking the Device as connected
self._required_for_connection = self._required_for_connection.copy()
self.prefix = prefix
if self.component_names and prefix is None:
raise ValueError("Must specify prefix if device signals are being " "used")
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
# The logic of these if blocks is:
# - If the user did not provide read_attrs, fall back on the default
# specified by the class, which ultimately falls back to Device, if no
# subclass overrides it. Either way, we now have a read_attrs.
# - If it is set to the sentinel ALL_COMPONENTS, ignore whatever the
# 'kind' settings are; just include everything. This is an escape catch
# for getting what _default_read_attrs=None used to do before 'kind'
# was implemented.
# - If it is set to a list, ignore whatever the 'kind' settings are and
# just include that list.
# - If it is set to None, respect whatever the 'kind' settings of the
# components are.
# If any sub-Devices are to be removed from configuration_attrs and
# read_attrs, we have to remove them from read_attrs first, or they
# will not allow themselves to be removed from configuration_attrs.
if read_attrs is None:
read_attrs = self._default_read_attrs
if read_attrs is ALL_COMPONENTS:
read_attrs = self.component_names
if read_attrs is not None:
self.read_attrs = list(read_attrs)
if configuration_attrs is None:
configuration_attrs = self._default_configuration_attrs
if configuration_attrs is ALL_COMPONENTS:
configuration_attrs = self.component_names
if configuration_attrs is not None:
self.configuration_attrs = list(configuration_attrs)
with do_not_wait_for_lazy_connection(self):
# Instantiate non-lazy signals and lazy signals with subscriptions
[
getattr(self, attr)
for attr, cpt in self._sig_attrs.items()
if not cpt.lazy or cpt._subscriptions
]
@classmethod
def _initialize_device(cls):
"""Initializes the Device and all of its Components
Initializes the following attributes from the Components::
- _sig_attrs - dict of attribute name to Component
- component_names - a list of attribute names used for components
- _device_tuple - An auto-generated namedtuple based on all
existing Components in the Device
- _sub_devices - a list of attributes which hold a Device
- _required_for_connection - a dictionary of object-to-description
for additional things that block this from being reported as
connected
"""
for attr in DEVICE_INSTANCE_ATTRS:
if attr in cls.__dict__:
raise TypeError(
"The attribute name %r is reserved for "
"use by the Device class. Choose a different "
"name." % attr
)
# this is so that the _sig_attrs class attribute includes the sigattrs
# from all of its class-inheritance-parents so we do not have to do
# this look up everytime we look at it.
base_devices = [
base for base in reversed(cls.__bases__) if hasattr(base, "_sig_attrs")
]
cls._sig_attrs = OrderedDict(
(attr, cpt)
for base in base_devices
for attr, cpt in base._sig_attrs.items()
if getattr(cls, attr) is not None
)
# map component classes to their attribute names from this class
this_sig_attrs = {
attr: cpt
for attr, cpt in cls.__dict__.items()
if isinstance(cpt, Component)
}
cls._sig_attrs.update(**this_sig_attrs)
# Record the class-defined kinds - these can be updated on a
# per-instance basis
cls._component_kinds = {attr: cpt.kind for attr, cpt in cls._sig_attrs.items()}
bad_attrs = set(cls._sig_attrs).intersection(DEVICE_RESERVED_ATTRS)
if bad_attrs:
raise TypeError(
f"The attribute name(s) {bad_attrs} are part of"
f" the bluesky interface and cannot be used as "
f"component names. Choose a different name."
)
# List Signal attribute names.
cls.component_names = tuple(cls._sig_attrs)
# The namedtuple associated with the device
cls._device_tuple = namedtuple(
f"{cls.__name__}Tuple",
[comp for comp in cls.component_names[:254] if not comp.startswith("_")],
)
# List the attributes that are Devices (not Signals).
# This list is used by stage/unstage. Only Devices need to be staged.
cls._sub_devices = [
attr for attr, cpt in cls._sig_attrs.items() if cpt.is_device
]
# All (obj, description) that may block the Device from being shown as
# connected:
cls._required_for_connection = dict(
obj._required_for_connection
for attr, obj in cls.__dict__.items()
if getattr(obj, "_required_for_connection", False)
)
def __init_subclass__(cls, **kwargs):
"This is called automatically in Python for all subclasses of Device"
super().__init_subclass__(**kwargs)
cls._initialize_device()
@classmethod
def walk_components(cls):
"""Walk all components in the Device hierarchy
Yields
------
ComponentWalk
Where ancestors is all ancestors of the signal, including the
top-level device `walk_components` was called on.
"""
for attr, cpt in cls._sig_attrs.items():
yield ComponentWalk(ancestors=(cls,), dotted_name=attr, item=cpt)
if issubclass(cpt.cls, Device) or hasattr(cpt.cls, "walk_components"):
sub_dev = cpt.cls
for walk in sub_dev.walk_components():
ancestors = (cls,) + walk.ancestors
dotted_name = ".".join((attr, walk.dotted_name))