-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDeposition_Control.py
394 lines (325 loc) · 16.5 KB
/
Deposition_Control.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
import sys
from PyQt5 import uic
from PyQt5.QtCore import QTimer, QObject, QRegExp, QThread, pyqtSignal
from PyQt5.QtGui import QFont, QIntValidator, QDoubleValidator, QStandardItem, QStandardItemModel
from PyQt5.QtWidgets import (QCheckBox, QFileDialog, QLabel, QLineEdit, QVBoxLayout,
QWidget, QMessageBox, QFormLayout, QFrame, QPushButton, QListView, QListWidgetItem,
QListWidget, QComboBox, QApplication, QMainWindow)
import os
import xml.etree.ElementTree as ET
from RPi_Hardware import RPiHardware
from Laser_Hardware import CompexLaser
from math import trunc
# ToDo: Write validators for steps
class DepStepItem(QListWidgetItem):
    """List item that carries the parameters of a single deposition step.

    The parameters live in the ``step_params`` dict. ``step_name`` and
    ``step_index`` are serialised as attributes of the ``<step>`` XML element,
    every other key as a child element (see ``get_xml_element``).
    """

    # Keys written as attributes of <step>; all remaining keys become child elements.
    _XML_ATTRIBUTE_KEYS = ('step_name', 'step_index')

    def __init__(self, *__args, copy_idx=None):
        """Create the item, optionally copying the parameters of a sibling item.

        :param __args: positional arguments forwarded to ``QListWidgetItem``.
        :param copy_idx: row of the item in the owning list widget whose
            parameters should be copied. Ignored unless it is an ``int`` and
            the item already belongs to a list widget.
        """
        super().__init__(*__args)
        # Always start from the placeholder values so step_params exists even
        # when the requested copy cannot be performed.
        self.step_params = {
            'step_index': None,
            'step_name': 'New Step',
            'target': 1,
            'raster': True,
            'tts_distance': 100,
            'num_pulses': 100,
            'reprate': 5,
            'time_on_step': 20,
            'delay': 0,
            'man_action': ''
        }
        if isinstance(copy_idx, int) and not isinstance(copy_idx, bool):
            # QListWidgetItem has no parentWidget(); the owning list is
            # listWidget(), which is None until the item has been inserted.
            owner = self.listWidget()
            source = owner.item(copy_idx) if owner is not None else None
            if source is not None:
                # Copy the dict so the two items do not share one mutable object.
                self.set_params(dict(source.get_params()))

    def get_params(self):
        """Return the dict holding this step's parameters (not a copy)."""
        return self.step_params

    def set_params(self, in_step_params):
        """Replace the whole parameter dict with ``in_step_params``."""
        self.step_params = in_step_params

    def set_params_from_xml(self, xml):
        """Load the step parameters from a ``<step>`` element.

        Values are stored as the strings found in the XML. A child element
        that is missing leaves the current value of that parameter in place;
        a child with no text yields an empty string. Elements with any other
        tag are rejected with a printed message.
        """
        if xml.tag != 'step':
            print('Invalid xml element provided to load step from')
            return

        def child_text(tag):
            node = xml.find('./' + tag)
            if node is None:
                return self.step_params.get(tag)
            return node.text if node.text is not None else ''

        self.step_params = {
            'step_index': xml.get('step_index'),
            'step_name': xml.get('step_name'),
            'target': child_text('target'),
            'raster': child_text('raster'),
            'tts_distance': child_text('tts_distance'),
            'num_pulses': child_text('num_pulses'),
            'reprate': child_text('reprate'),
            'time_on_step': child_text('time_on_step'),
            'delay': child_text('delay'),
            # Store the text, not the Element object itself.
            'man_action': child_text('man_action')
        }
        # Keep the visible label in step with the loaded name.
        if self.step_params['step_name'] is not None:
            self.setText(self.step_params['step_name'])

    def set_step_index(self, step_index):
        """Store the position of this step within the deposition."""
        self.step_params['step_index'] = step_index

    def set_step_name(self, name):
        """Store the step name and show it as the item's text."""
        self.step_params['step_name'] = name
        self.setText(name)

    def set_target(self, target):
        """Store the target selected for this step."""
        self.step_params['target'] = target

    def set_raster(self, raster):
        """Store whether the target should be rastered during this step."""
        self.step_params['raster'] = raster

    def calc_time_on_step(self):
        """Recompute ``time_on_step`` as ``int(num_pulses / reprate)``.

        Both values are converted with ``float`` first because they are held
        as strings once they have passed through the editor or an XML file.

        :raises ValueError: if either value is not numeric.
        :raises ZeroDivisionError: if ``reprate`` is zero.
        """
        pulses = float(self.step_params['num_pulses'])
        rate = float(self.step_params['reprate'])
        self.step_params['time_on_step'] = int(pulses / rate)

    def set_delay(self, delay):
        """Store the delay associated with this step."""
        self.step_params['delay'] = delay

    def get_xml_element(self, get_subelement_dict=False):
        """Serialise the step to a ``<step>`` element.

        :param get_subelement_dict: when ``True`` also return the dict that
            maps parameter name to its child element, which makes the children
            easier to manipulate elsewhere.
        :return: the ``<step>`` element, or ``(element, subelement_dict)``.
        """
        step_xml_root = ET.Element('step')
        step_xml_subelements = {key: ET.SubElement(step_xml_root, key)
                                for key in self.step_params
                                if key not in self._XML_ATTRIBUTE_KEYS}

        for key in self._XML_ATTRIBUTE_KEYS:
            step_xml_root.set(key, str(self.step_params[key]))

        for key, element in step_xml_subelements.items():
            element.text = str(self.step_params[key])

        if get_subelement_dict is True:
            return step_xml_root, step_xml_subelements
        return step_xml_root
class DepControlBox(QWidget):
    """Editor widget for building, editing and launching a deposition.

    The steps are ``DepStepItem`` objects held in a ``QListWidget``. Running a
    deposition hands the serialised steps to a ``DepositionWorker`` that lives
    on ``deposition_thread``.
    """

    # Emitted to ask the worker to halt the running deposition.
    stop_deposition = pyqtSignal()
    # Emitted whenever a step was committed or loaded through a selection change.
    deposition_changed = pyqtSignal()

    def __init__(self, laser: CompexLaser, brain: RPiHardware, parent: QMainWindow):
        """Load the UI file, discover the controls and create the worker thread.

        :param laser: laser object forwarded to the deposition worker.
        :param brain: hardware object forwarded to the deposition worker.
        :param parent: main window that becomes this widget's parent.
        """
        super().__init__()
        self.setParent(parent)

        # Load ui file and discover controls
        uic.loadUi('./src/ui/pld_deposition_editor.ui', self)
        self.btns = self._find_prefixed(QPushButton, 'btn_')
        self.lines = self._find_prefixed(QLineEdit, 'line_')
        self.labels = self._find_prefixed(QLabel, 'lbl_')
        self.checks = self._find_prefixed(QCheckBox, 'check_')
        self.checks['raster'].setTristate(False)
        self.combos = self._find_prefixed(QComboBox, 'combo_')

        # Set the available list of targets in the combo box
        # Todo: get targets and their locations
        self.update_target_roster()
        self.list_view = self.findChildren(QListWidget, QRegExp('list_dep_steps'))[0]
        self.list_view.setSelectionMode(QListView.ExtendedSelection)
        # FIXME: Build in the ability to load from an xml

        # Create a thread to use for running depositions and move the deposition worker to it.
        self.deposition_thread = QThread()
        self.dep_worker_obj = DepositionWorker(laser, brain)
        self.dep_worker_obj.moveToThread(self.deposition_thread)

        self.init_connections()

    def _find_prefixed(self, widget_type, prefix):
        """Map the part of the object name after ``prefix`` to each matching child widget."""
        return {widget.objectName().split(prefix)[1]: widget
                for widget in self.findChildren(widget_type, QRegExp(prefix + '*'))}

    @staticmethod
    def _as_bool(value):
        """Interpret a stored raster flag, which may be a bool, a number or their text form."""
        if isinstance(value, str):
            return value.strip().lower() not in ('', '0', 'false', 'none', 'no')
        return bool(value)

    def init_connections(self):
        """Wire up the widget signals and the cross-thread worker signals."""
        # TODO: Connect everything once supporting functions are written
        self.btns['add_step'].clicked.connect(self.add_deposition_step)
        self.btns['delete_steps'].clicked.connect(self.delete_selected_steps)
        self.btns['copy_step'].clicked.connect(self.copy_deposition_step)
        self.lines['step_name'].editingFinished.connect(self.update_item_name)
        self.list_view.currentItemChanged.connect(self.on_item_change)
        QApplication.instance().instrument_settings.settings_applied.connect(self.update_target_roster)

        # Cross thread communications
        self.btns['run_dep'].clicked.connect(self.run_deposition)
        self.dep_worker_obj.deposition_interrupted.connect(self.deposition_thread.quit)
        self.dep_worker_obj.deposition_finished.connect(self.deposition_thread.quit)
        self.stop_deposition.connect(self.dep_worker_obj.halt_dep)
        self.deposition_thread.started.connect(self.dep_worker_obj.start_deposition)
        # Put the run button back to its idle state however the thread ends.
        self.deposition_thread.finished.connect(self._reset_run_button)

    def _reset_run_button(self):
        """Return the run button to its idle (unchecked) appearance."""
        self.btns['run_dep'].setChecked(False)
        self.btns['run_dep'].setText('Run Current Deposition')

    def on_item_change(self, current, previous):
        """Commit the edits of the previously selected step and load the new one."""
        updated = False
        # Commit the changes by the user to the previously selected step
        if previous is not None:
            self.commit_changes(previous)
            updated = True

        # Update indices for all items before loading the new item. This will make indices make sense
        self.update_step_indices()

        # Load the values from the new steps into the controls
        if current is not None:
            self.load_step_params(current)
            updated = True

        if updated:
            self.deposition_changed.emit()

    def update_step_indices(self):
        """Renumber every step so its index matches its row in the list."""
        for index in range(self.list_view.count()):
            self.list_view.item(index).set_step_index(index)

    def update_item_name(self):
        """Commit the controls to the current step and refresh its label."""
        item = self.list_view.currentItem()
        if item is None:
            # Nothing is selected, so there is no step to rename.
            return
        self.commit_changes(item)
        item.setText(item.step_params['step_name'])

    def load_step_params(self, item):
        """Show the parameters of ``item`` in the editor controls."""
        params = item.get_params()
        # Build display strings separately; the item's own dict is left untouched.
        as_text = {key: '' if value is None else str(value)
                   for key, value in params.items()}

        # Apply parameters to controls
        self.lines['step_name'].setText(item.text())
        self.combos['select_target'].setCurrentText(as_text['target'])
        # bool() of a non-empty string is always True, so parse the flag explicitly.
        self.checks['raster'].setChecked(self._as_bool(params['raster']))
        self.lines['tts_distance'].setText(as_text['tts_distance'])
        self.lines['num_pulses'].setText(as_text['num_pulses'])
        self.lines['reprate'].setText(as_text['reprate'])
        self.lines['delay'].setText(as_text['delay'])
        self.lines['man_action'].setText(as_text['man_action'])

    def commit_changes(self, item):
        """Copy the current control values into ``item``.

        If the pulse count or repetition rate is not a valid non-zero integer
        the item is left unchanged and a message is printed.
        """
        if item is None:
            return
        try:
            num_pulses = self.lines['num_pulses'].text()
            reprate = self.lines['reprate'].text()
            # Whole seconds, matching DepStepItem.calc_time_on_step.
            time_on_step = str(int(int(num_pulses) / int(reprate)))
            step_params = {
                'step_index': self.list_view.row(item),
                'step_name': self.lines['step_name'].text(),
                'target': self.combos['select_target'].currentText(),
                # Store a plain bool rather than a Qt check-state value.
                'raster': self.checks['raster'].isChecked(),
                'tts_distance': self.lines['tts_distance'].text(),
                'num_pulses': num_pulses,
                'reprate': reprate,
                'time_on_step': time_on_step,
                'delay': self.lines['delay'].text(),
                'man_action': self.lines['man_action'].text()
            }
        except (ValueError, ZeroDivisionError) as err:
            print(err)
            print('Failed to set step parameters for {}, '
                  'please check that all values are valid and try again'.format(item.text()))
        else:
            item.set_params(step_params)

    def update_target_roster(self):
        """Refill the target combo box from the application's instrument settings."""
        self.combos['select_target'].clear()
        self.combos['select_target'].addItems(QApplication.instance().instrument_settings.get_target_roster(
            formatlist=['number', 'composition', 'diameter']))

    def add_deposition_step(self):
        """Append a new step with placeholder parameters."""
        self.list_view.addItem(DepStepItem('New Step {}'.format(self.list_view.count() + 1), copy_idx=None))
        self.update_step_indices()

    def copy_deposition_step(self):
        """Append a copy of every selected step to the end of the list."""
        for item in self.list_view.selectedItems():
            clone = DepStepItem(item.text(), copy_idx=None)
            # Give the clone its own dict so later edits do not affect the source step.
            clone.set_params(dict(item.get_params()))
            self.list_view.addItem(clone)
        self.update_step_indices()

    def delete_selected_steps(self):
        """Remove every selected step from the list."""
        for item in self.list_view.selectedItems():
            self.list_view.takeItem(self.list_view.row(item))
        self.update_step_indices()

    def clear_deposition(self):
        """Remove all steps."""
        self.list_view.clear()
        self.update_step_indices()

    def update_time_on_step(self):
        """Commit the controls and show the resulting time on step in its label."""
        item = self.list_view.currentItem()
        if item is None:
            return
        # commit_changes already recomputes time_on_step from the controls.
        self.commit_changes(item)
        self.labels['time_on_step'].setText(str(item.get_params()['time_on_step']))

    def get_dep_xml(self):
        """Return a ``<deposition>`` element containing one ``<step>`` per list row."""
        dep_xml_root = ET.Element('deposition')
        for index in range(self.list_view.count()):
            dep_xml_root.append(self.list_view.item(index).get_xml_element())
        return dep_xml_root

    def load_xml_dep(self, xml: ET.Element):
        """Append the ``<step>`` children of ``xml`` to the list, ordered by index."""

        def index_of(step):
            # Compare numerically ('10' sorts before '2' as text); steps
            # without a usable index keep their document order at the end.
            try:
                return int(step.get('step_index'))
            except (TypeError, ValueError):
                return float('inf')

        for step in sorted(xml.findall('./step'), key=index_of):
            label = step.get('step_name') or 'New Step {}'.format(self.list_view.count() + 1)
            new_item = DepStepItem(label, copy_idx=None)
            new_item.set_params_from_xml(step)
            self.list_view.addItem(new_item)
        self.update_step_indices()

    def run_deposition(self):
        """Start the deposition, or request a stop if one is already running.

        The decision is based on whether the worker thread is running rather
        than on the button's check state, which has already toggled by the
        time ``clicked`` is delivered for a checkable button.
        """
        run_button = self.btns['run_dep']
        if self.deposition_thread.isRunning():
            run_button.setChecked(False)
            run_button.setText('Run Current Deposition')
            self.stop_deposition.emit()
        else:
            # Make sure edits to the step currently shown are part of the run.
            current = self.list_view.currentItem()
            if current is not None:
                self.commit_changes(current)
            self.update_step_indices()
            # Hand the steps to the worker before its thread starts; the
            # worker has no parent widget it could query for them.
            self.dep_worker_obj.steps = self.get_dep_xml().findall('./step')
            run_button.setChecked(True)
            run_button.setText('Stop Deposition')
            self.deposition_thread.start()

        ET.dump(self.get_dep_xml())
        # ToDo: Write run deposition for real
class DepositionWorker(QObject):
    """Runs the steps of a deposition one after another.

    The steps are ``<step>`` XML elements taken from the ``steps`` attribute,
    which the owner assigns before triggering ``start_deposition``.
    """

    # Emitted when a run ends early (stop request, error, abort or cancel).
    deposition_interrupted = pyqtSignal()
    # Emitted when every step has completed.
    deposition_finished = pyqtSignal()

    def __init__(self, laser: CompexLaser, brain: RPiHardware):
        """Store the hardware handles and reset the run state."""
        super().__init__()
        self.laser = laser
        self.brain = brain
        self.stop = False
        self.prev_tts = None
        self.prev_target = None
        # Index of the step being run; left set when a run is interrupted so
        # the next run can offer to resume from it.
        self.curr_step_idx = None
        # Iterable of <step> elements to run, assigned by the owner.
        self.steps = None

        self.init_connections()

    def init_connections(self):
        """Placeholder for signal connections; nothing is connected yet."""
        pass

    @staticmethod
    def _child_text(step, tag):
        """Return the text of child ``tag`` of ``step``, or '' when it is missing or empty."""
        node = step.find('./' + tag)
        if node is None or node.text is None:
            return ''
        return node.text

    @staticmethod
    def _step_index(step):
        """Return the ``step_index`` attribute of ``step`` as an int."""
        return int(step.get('step_index'))

    @staticmethod
    def _is_truthy(text):
        """Interpret the text of a boolean parameter such as 'True', 'False', '0' or '2'."""
        return text.strip().lower() not in ('', '0', 'false', 'none', 'no')

    def start_deposition(self):
        """Run all steps in index order, offering to resume an interrupted run.

        Emits ``deposition_finished`` when every step completed and
        ``deposition_interrupted`` when the run ended early.
        """
        # A stop request belongs to the run it was issued for.
        self.stop = False

        if not self.steps:
            print('No deposition steps have been supplied to the worker')
            self.deposition_interrupted.emit()
            return
        steps = sorted(self.steps, key=self._step_index)

        if self.curr_step_idx is not None:
            # NOTE(review): Qt only supports widgets on the GUI thread; this
            # dialog should eventually be raised by the owning widget instead.
            # The parent is None because this object is not a QWidget.
            resume = QMessageBox.warning(None, 'Previous Deposition Aborted...',
                                         'The previous deposition was aborted, would you like to resume? Press yes '
                                         'to resume, no to restart from the beginning, and cancel to '
                                         'take no action.',
                                         QMessageBox.Yes | QMessageBox.No | QMessageBox.Cancel,
                                         QMessageBox.No)
            if resume == QMessageBox.Yes:
                steps = [x for x in steps if self._step_index(x) >= self.curr_step_idx]
            elif resume == QMessageBox.No:
                # Restart from the beginning with the full step list.
                self.curr_step_idx = None
            else:
                # Cancel: run nothing, but let listeners know this run is over.
                self.deposition_interrupted.emit()
                return

        for step in steps:
            # Kept as an int so the resume filter above compares like with like.
            self.curr_step_idx = self._step_index(step)
            if not self._run_step(step):
                self.abort_all()
                return

        # Completed normally, so there is nothing to resume next time.
        self.curr_step_idx = None
        self.deposition_finished.emit()

    def _run_step(self, step):
        """Execute one step; return ``False`` if the deposition must be aborted."""
        target_text = self._child_text(step, 'target')
        target = int(target_text)
        # Move the target carousel
        if self.prev_target != target_text:
            self.brain.move_to_target(target)
            self.prev_target = target_text

        # Move the Substrate if necessary
        tts_text = self._child_text(step, 'tts_distance')
        if self.prev_tts != tts_text:
            # ToDo: Find a way to convert stepper steps to z translation
            self.brain.sub_position(int(tts_text))
            self.prev_tts = tts_text

        # Check that the sub and target positions are achieved. block until it is done
        count = 0
        while (self.brain.targets_running() or self.brain.substrate_running()) and not self.stop:
            count += 1
            # Make sure that the stop signal gets read before the motors come to completion.
            QApplication.processEvents()
            if count % 2 == 0:
                print('Waiting for motors to finish movement')
        if self.stop:
            return False

        # Abort if brain and step have mismatched targets
        if self.brain.current_target != target:
            print('Target setting error')
            self.stop = True
            return False
        # Start rastering target, if necessary.
        if self._is_truthy(self._child_text(step, 'raster')):
            self.brain.raster_current_target()

        # Start pulsing
        self.brain.start_pulsing(self._child_text(step, 'reprate'), self._child_text(step, 'num_pulses'))

        # Block until laser is finished
        while self.brain.laser_running() and not self.stop:
            count += 1
            # Make sure that the stop signal gets read before the laser comes to completion.
            QApplication.processEvents()
            if count % 10 == 0:
                print('Laser pulses under way')
        if self.stop:
            return False

        # If there is a manual action item, pop up a box for the user
        man_action = self._child_text(step, 'man_action').strip()
        if man_action:
            # NOTE(review): same GUI-thread caveat as the resume dialog.
            manual_action = QMessageBox.warning(None, 'Manual action required...',
                                                'The previous step was flagged as needing manual action. '
                                                'Please {} before continuing.'.format(man_action),
                                                QMessageBox.Ok | QMessageBox.Abort,
                                                QMessageBox.Ok)
            if manual_action == QMessageBox.Abort:
                self.stop = True
                return False
        return True

    def abort_all(self):
        """Halt the substrate, the target carousel and the pulsing, then signal the interruption."""
        self.brain.halt_sub()
        self.brain.halt_target()
        self.brain.stop_pulsing()
        # A halted move may not have reached its goal; forget the remembered
        # positions so a resumed run issues the moves again.
        self.prev_target = None
        self.prev_tts = None
        self.deposition_interrupted.emit()

    def halt_dep(self):
        """Request that the running deposition stops at its next check."""
        self.stop = True