-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpysparc_daq
executable file
·335 lines (256 loc) · 11.3 KB
/
pysparc_daq
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
#!/usr/bin/env python
"""HiSPARC data acquisition in Python (PySPARC)"""
import logging
import os
import time
import ConfigParser
import pkg_resources
import schedule
from pysparc.hardware import HiSPARCII, HiSPARCIII, TrimbleGPS
from pysparc.ftdi_chip import DeviceNotFoundError
from pysparc.align_adcs import AlignADCs, AlignADCsPrimarySecondary
from pysparc.events import Stew, ConfigEvent, Mixer
from pysparc import messages, storage, monitor
# Configuration file bundled with the pysparc package.
SYSTEM_CONFIGFILE = pkg_resources.resource_filename('pysparc', 'config.ini')
# Per-user configuration file; DataAcquisition.write_config() writes to it.
CONFIGFILE = os.path.expanduser('~/.pysparc')
# Local data file, used when the 'store_data_in_file' option is enabled.
DATAFILE = os.path.expanduser('~/hisparc.h5')
# Files are read in this order, so values from the user file override the
# values from the bundled file.
ALL_CONFIG_FILES = [SYSTEM_CONFIGFILE, CONFIGFILE]
def run_once(func, *args, **kwargs):
    """Call ``func`` with the given arguments, then cancel the job.

    Meant to be handed to ``schedule``: a job function that returns
    ``schedule.CancelJob`` is removed by the scheduler, so ``func`` is not
    called a second time.

    """
    func(*args, **kwargs)
    cancel = schedule.CancelJob
    return cancel
class DataAcquisition(object):

    """HiSPARC data acquisition using a single (primary) device.

    Creating an instance reads the configuration files, opens and
    configures the hardware and sets up event storage and monitoring.
    Call :meth:`run` to take data and :meth:`close` to shut down.

    """

    def __init__(self):
        """Read configuration and set up hardware, storage and monitor."""
        self.config = ConfigParser.ConfigParser()
        logging.info("Reading config from file")
        self.config.read(ALL_CONFIG_FILES)

        self.open_hisparc_hardware()
        self.gps = TrimbleGPS()
        self.initialize_hardware()

        station_name = self.config.get('DAQ', 'station_name')
        station_number = self.config.getint('DAQ', 'station_number')
        station_password = self.config.get('DAQ', 'station_password')

        self.primary_stew = Stew()

        self.storage_manager = storage.StorageManager()
        self.datastore = storage.NikhefDataStore(station_number,
                                                 station_password)
        self.storage_manager.add_datastore(self.datastore, 'queue_nikhef')

        store_data_in_file = self.config.getboolean('DAQ',
                                                    'store_data_in_file')
        if store_data_in_file:
            self.filestore = storage.TablesDataStore(DATAFILE)
            self.storage_manager.add_datastore(self.filestore, 'queue_file')

        self.monitor = monitor.Monitor(station_name)

    def open_hisparc_hardware(self):
        """Open the primary device: try HiSPARC III first, then HiSPARC II."""
        try:
            self.primary = HiSPARCIII()
        except DeviceNotFoundError:
            self.primary = HiSPARCII()
        # Give hardware 20 seconds to start up: a timestamp in the future
        # postpones the moment check_silent_devices() considers it silent.
        self.t_last_msg = time.time() + 20

    def initialize_hardware(self):
        """Configure the devices and perform any forced one-off actions.

        The ``force_reset_gps`` and ``force_align_adcs`` options in the
        ``DAQ`` section each trigger their action; the option is then
        switched off again.  Finally, the configuration is written to disk.

        """
        logging.info("Initializing device configuration")
        self.configure_devices()

        if self.config.getboolean('DAQ', 'force_reset_gps'):
            logging.info("Force reset GPS to factory defaults.")
            self.gps.reset_defaults()
            # Store option values as strings: a bool is written to file the
            # same way, but cannot be read back with getboolean().
            self.config.set('DAQ', 'force_reset_gps', 'False')

        if self.config.getboolean('DAQ', 'force_align_adcs'):
            logging.info("Force aligning ADCs.")
            self.align_adcs()
            self.config.set('DAQ', 'force_align_adcs', 'False')

        self.write_config()

    def configure_devices(self):
        """Read configuration into device"""
        self.primary.config.read_config(self.config)

    def align_adcs(self):
        """Align ADCs"""
        align_adcs = AlignADCs(self.primary)
        align_adcs.align()

    def run(self):
        """Take data, process and store events

        Runs until interrupted by the user (``KeyboardInterrupt``).

        """
        # The first configuration message does not include GPS information.
        # Flush it, and possibly other outdated messages, and request it later.
        self.flush_devices()
        self.schedule_jobs()

        logging.info("Taking data.")
        try:
            while True:
                self.read_and_process_messages()
                schedule.run_pending()
        except KeyboardInterrupt:
            logging.info("Interrupted by user.")

    def schedule_jobs(self):
        """Schedule jobs for the data run"""
        schedule.every(1).seconds.do(self.process_and_store_events)
        schedule.every(30).seconds.do(self.request_config_from_device)
        schedule.every(30).seconds.do(self.check_silent_devices)
        schedule.every().minute.do(self.send_monitor_messages)
        schedule.every().minute.do(self.log_status)
        # Zero-padded HH:MM: releases of schedule that validate the time
        # format reject '2:00'.
        schedule.every().day.at('02:00').do(self.store_config_event)
        # After 1 minute, store the configuration *once*
        schedule.every().minute.do(run_once, self.store_config_event)

    def read_and_process_messages(self):
        """Read messages from the hardware and process them"""
        msg = self.primary.read_message()
        if msg is not None:
            self.t_last_msg = time.time()
            self.process_message(msg, self.primary_stew)

    def process_message(self, msg, stew):
        """Process a hardware message and throw it in the stew.

        :param msg: message read from a device.
        :param stew: the stew that collects the messages of that device.

        """
        if isinstance(msg, messages.MeasuredDataMessage):
            stew.add_event_message(msg)
        elif isinstance(msg, messages.OneSecondMessage):
            stew.add_one_second_message(msg)
            logging.debug("One-second received: %d (%d %d %d %d)",
                          msg.timestamp, msg.count_ch1_low, msg.count_ch1_high,
                          msg.count_ch2_low, msg.count_ch2_high)
        elif isinstance(msg, messages.ControlParameterList):
            # No need to process this message. This is already done in the
            # hardware class
            pass

    def process_and_store_events(self):
        """Process events from the stew and store them in the datastore."""
        self.primary_stew.stir()
        events = self.primary_stew.serve_events()
        self.store_events(events)
        self.primary_stew.drain()

    def send_monitor_messages(self):
        """Send all monitor messages."""
        self.monitor.send_uptime()
        self.monitor.send_cpu_load()
        self.monitor.send_trigger_rate(self.primary_stew.event_rate())

    def log_status(self):
        """Log the current event rate."""
        logging.info("Event rate: %.1f Hz", self.primary_stew.event_rate())

    def request_config_from_device(self):
        """Request configuration from device.

        This includes gps positions and PMT currents. When the hardware
        responds, the config objects are automatically updated by the hardware
        classes when the message is read, and are available a short time after
        calling this method.

        """
        self.primary.send_message(messages.GetControlParameterList())

    def check_silent_devices(self):
        """Reset the hardware if no message was read for over 20 seconds."""
        if time.time() - self.t_last_msg > 20:
            logging.error("Hardware is silent, resetting.")
            self.reset_devices()

    def flush_devices(self):
        """Flush devices"""
        self.primary.flush_device()

    def reset_devices(self):
        """Reset hardware devices"""
        self.primary.reset_hardware()

    def store_events(self, events):
        """Hand each event to the storage manager.

        An event that fails to store is logged and skipped, so one bad
        event does not prevent the remaining events from being stored.

        :param events: sequence of events to store.

        """
        for event in events:
            try:
                self.storage_manager.store_event(event)
            except Exception as e:
                logging.error(str(e))
        logging.debug("Stored %d events.", len(events))

    def store_config_event(self):
        """Store the configuration of the device as a config event."""
        config = ConfigEvent(self.primary.config)
        self.storage_manager.store_event(config)
        logging.info("Sent configuration message.")

    def write_config(self):
        """Write the device configuration and save the config to file."""
        self.primary.config.write_config(self.config)
        with open(CONFIGFILE, 'w') as f:
            self.config.write(f)

    def close(self):
        """Close the hardware and the storage.

        Every close step is attempted, even when an earlier one raises; the
        exception still propagates to the caller afterwards.

        """
        logging.info("Closing down")
        # NOTE(review): the optional filestore is not closed explicitly here
        # -- confirm that storage_manager.close() takes care of it.
        try:
            self.primary.close()
        finally:
            try:
                self.storage_manager.close()
            finally:
                self.datastore.close()
class PrimarySecondaryDataAcquisition(DataAcquisition):

    """HiSPARC data acquisition using a primary/secondary setup

    Extends the primary-only data acquisition with a second device: every
    hardware operation is performed on both devices, and the events of both
    devices are combined by a mixer before they are stored.

    """

    def __init__(self):
        """Set up the base data acquisition, the secondary stew and mixer."""
        super(PrimarySecondaryDataAcquisition, self).__init__()
        self.secondary_stew = Stew()
        self.mixer = Mixer()

    def open_hisparc_hardware(self):
        """Open the secondary device, then the primary device.

        For each device HiSPARC III is tried first, then HiSPARC II.  If
        the primary device cannot be opened, the secondary device is closed
        again before the exception propagates.

        """
        try:
            self.secondary = HiSPARCIII(secondary=True)
        except DeviceNotFoundError:
            self.secondary = HiSPARCII(secondary=True)

        primary_opened = False
        try:
            super(PrimarySecondaryDataAcquisition, self).open_hisparc_hardware()
            primary_opened = True
        finally:
            if not primary_opened:
                # Setup is being abandoned: do not leave the secondary open.
                self.secondary.close()
        # Give hardware 20 seconds to start up
        self.t_last_secondary_msg = time.time() + 20

    def configure_devices(self):
        """Read configuration into device"""
        super(PrimarySecondaryDataAcquisition, self).configure_devices()
        self.secondary.config.read_config(self.config)

    def align_adcs(self):
        """Align ADCs"""
        align_adcs = AlignADCsPrimarySecondary(self.primary, self.secondary)
        align_adcs.align()

    def read_and_process_messages(self):
        """Read messages from the hardware and process them"""
        super(PrimarySecondaryDataAcquisition, self).read_and_process_messages()
        msg = self.secondary.read_message()
        if msg is not None:
            self.t_last_secondary_msg = time.time()
            self.process_message(msg, self.secondary_stew)

    def process_and_store_events(self):
        """Process events from the stew and store them in the datastore."""
        self.primary_stew.stir()
        self.secondary_stew.stir()

        primary_events = self.primary_stew.serve_events()
        self.mixer.add_primary_events(primary_events)
        secondary_events = self.secondary_stew.serve_events()
        self.mixer.add_secondary_events(secondary_events)

        self.mixer.mix()
        events = self.mixer.serve_events()
        self.store_events(events)

        self.primary_stew.drain()
        self.secondary_stew.drain()

    def request_config_from_device(self):
        """Request configuration from device.

        This includes gps positions and PMT currents. When the hardware
        responds, the config objects are automatically updated by the hardware
        classes when the message is read, and are available a short time after
        calling this method.

        """
        super(PrimarySecondaryDataAcquisition, self).request_config_from_device()
        self.secondary.send_message(messages.GetControlParameterList())

    def check_silent_devices(self):
        """Reset both devices if either was silent for over 20 seconds."""
        now = time.time()
        if (now - self.t_last_msg > 20) or (now - self.t_last_secondary_msg > 20):
            logging.error("Hardware is silent, resetting.")
            self.reset_devices()

    def flush_devices(self):
        """Flush devices"""
        super(PrimarySecondaryDataAcquisition, self).flush_devices()
        self.secondary.flush_device()

    def reset_devices(self):
        """Reset hardware devices"""
        super(PrimarySecondaryDataAcquisition, self).reset_devices()
        self.secondary.reset_hardware()

    def store_config_event(self):
        """Store the configuration of both devices as a config event."""
        config = ConfigEvent(self.primary.config, self.secondary.config)
        self.storage_manager.store_event(config)
        logging.info("Sent configuration message.")

    def write_config(self):
        """Write the configuration of both devices and save it to file."""
        self.secondary.config.write_config(self.config)
        super(PrimarySecondaryDataAcquisition, self).write_config()

    def close(self):
        """Close the primary device and storage, then the secondary device.

        The secondary device is closed even when closing the rest raises.

        """
        try:
            super(PrimarySecondaryDataAcquisition, self).close()
        finally:
            self.secondary.close()
if __name__ == '__main__':
    logging.basicConfig(
        format='%(asctime)s %(levelname)s:%(name)s:%(message)s',
        level=logging.INFO)

    # Raise the log level of the urllib3 and schedule modules to WARNING
    for module_name in ("urllib3", "schedule"):
        logging.getLogger(module_name).setLevel(logging.WARNING)

    # Try a primary/secondary setup first; when a device cannot be found,
    # try a primary-only setup instead.
    try:
        app = PrimarySecondaryDataAcquisition()
    except DeviceNotFoundError:
        app = DataAcquisition()

    # Always close down the application, however the data run ends.
    try:
        app.run()
    finally:
        app.close()