|
| 1 | +##################################################################### |
| 2 | +# # |
| 3 | +# /labscript_devices/PrawnBlaster/blacs_tab.py # |
| 4 | +# # |
| 5 | +# Copyright 2021, Philip Starkey # |
| 6 | +# # |
| 7 | +# This file is part of labscript_devices, in the labscript suite # |
| 8 | +# (see http://labscriptsuite.org), and is licensed under the # |
| 9 | +# Simplified BSD License. See the license.txt file in the root of # |
| 10 | +# the project for the full license. # |
| 11 | +# # |
| 12 | +##################################################################### |
| 13 | +from blacs.device_base_class import ( |
| 14 | + DeviceTab, |
| 15 | + define_state, |
| 16 | + MODE_BUFFERED, |
| 17 | + MODE_MANUAL, |
| 18 | + MODE_TRANSITION_TO_BUFFERED, |
| 19 | + MODE_TRANSITION_TO_MANUAL, |
| 20 | +) |
| 21 | +import labscript_utils.properties |
| 22 | + |
| 23 | +from qtutils.qt import QtWidgets |
| 24 | + |
| 25 | + |
| 26 | +class PrawnBlasterTab(DeviceTab): |
| 27 | + def initialise_GUI(self): |
| 28 | + self.connection_table_properties = ( |
| 29 | + self.settings["connection_table"].find_by_name(self.device_name).properties |
| 30 | + ) |
| 31 | + |
| 32 | + digital_outs = {} |
| 33 | + for pin in self.connection_table_properties["out_pins"]: |
| 34 | + digital_outs[f"GPIO {pin:02d}"] = {} |
| 35 | + |
| 36 | + # Create a single digital output |
| 37 | + self.create_digital_outputs(digital_outs) |
| 38 | + # Create widgets for output objects |
| 39 | + _, _, do_widgets = self.auto_create_widgets() |
| 40 | + # and auto place the widgets in the UI |
| 41 | + self.auto_place_widgets(("Flags", do_widgets)) |
| 42 | + |
| 43 | + # Create status labels |
| 44 | + self.status_label = QtWidgets.QLabel("Status: Unknown") |
| 45 | + self.clock_status_label = QtWidgets.QLabel("Clock status: Unknown") |
| 46 | + self.get_tab_layout().addWidget(self.status_label) |
| 47 | + self.get_tab_layout().addWidget(self.clock_status_label) |
| 48 | + |
| 49 | + # Set the capabilities of this device |
| 50 | + self.supports_smart_programming(True) |
| 51 | + |
| 52 | + # Create status monitor timout |
| 53 | + self.statemachine_timeout_add(2000, self.status_monitor) |
| 54 | + |
| 55 | + def get_child_from_connection_table(self, parent_device_name, port): |
| 56 | + # Pass down channel name search to the pseudoclocks (so we can find the |
| 57 | + # clocklines) |
| 58 | + if parent_device_name == self.device_name: |
| 59 | + device = self.connection_table.find_by_name(self.device_name) |
| 60 | + |
| 61 | + for pseudoclock_name, pseudoclock in device.child_list.items(): |
| 62 | + for child_name, child in pseudoclock.child_list.items(): |
| 63 | + # store a reference to the internal clockline |
| 64 | + if child.parent_port == port: |
| 65 | + return DeviceTab.get_child_from_connection_table( |
| 66 | + self, pseudoclock.name, port |
| 67 | + ) |
| 68 | + |
| 69 | + return None |
| 70 | + |
| 71 | + def initialise_workers(self): |
| 72 | + # Find the COM port to be used |
| 73 | + com_port = str( |
| 74 | + self.settings["connection_table"] |
| 75 | + .find_by_name(self.device_name) |
| 76 | + .BLACS_connection |
| 77 | + ) |
| 78 | + |
| 79 | + worker_initialisation_kwargs = { |
| 80 | + "com_port": com_port, |
| 81 | + "num_pseudoclocks": self.connection_table_properties["num_pseudoclocks"], |
| 82 | + "out_pins": self.connection_table_properties["out_pins"], |
| 83 | + "in_pins": self.connection_table_properties["in_pins"], |
| 84 | + } |
| 85 | + self.create_worker( |
| 86 | + "main_worker", |
| 87 | + "labscript_devices.PrawnBlaster.blacs_workers.PrawnBlasterWorker", |
| 88 | + worker_initialisation_kwargs, |
| 89 | + ) |
| 90 | + self.primary_worker = "main_worker" |
| 91 | + |
| 92 | + @define_state( |
| 93 | + MODE_MANUAL |
| 94 | + | MODE_BUFFERED |
| 95 | + | MODE_TRANSITION_TO_BUFFERED |
| 96 | + | MODE_TRANSITION_TO_MANUAL, |
| 97 | + True, |
| 98 | + ) |
| 99 | + def status_monitor(self, notify_queue=None): |
| 100 | + # When called with a queue, this function writes to the queue |
| 101 | + # when the pulseblaster is waiting. This indicates the end of |
| 102 | + # an experimental run. |
| 103 | + status, clock_status, waits_pending = yield ( |
| 104 | + self.queue_work(self.primary_worker, "check_status") |
| 105 | + ) |
| 106 | + |
| 107 | + # Manual mode or aborted |
| 108 | + done_condition = status == 0 or status == 5 |
| 109 | + |
| 110 | + # Update GUI status/clock status widgets |
| 111 | + self.status_label.setText(f"Status: {status}") |
| 112 | + self.clock_status_label.setText(f"Clock status: {clock_status}") |
| 113 | + |
| 114 | + if notify_queue is not None and done_condition and not waits_pending: |
| 115 | + # Experiment is over. Tell the queue manager about it, then |
| 116 | + # set the status checking timeout back to every 2 seconds |
| 117 | + # with no queue. |
| 118 | + notify_queue.put("done") |
| 119 | + self.statemachine_timeout_remove(self.status_monitor) |
| 120 | + self.statemachine_timeout_add(2000, self.status_monitor) |
| 121 | + |
| 122 | + @define_state(MODE_BUFFERED, True) |
| 123 | + def start_run(self, notify_queue): |
| 124 | + self.statemachine_timeout_remove(self.status_monitor) |
| 125 | + yield (self.queue_work(self.primary_worker, "start_run")) |
| 126 | + self.status_monitor() |
| 127 | + self.statemachine_timeout_add(100, self.status_monitor, notify_queue) |
0 commit comments