-
Notifications
You must be signed in to change notification settings - Fork 42
/
connection.py
97 lines (71 loc) · 3.43 KB
/
connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import constants
import ismrmrd
import ctypes
import logging
import socket
import numpy as np
class Connection:
"""
This is a docstring. It should be a good one.
"""
def __init__(self, socket):
self.socket = socket
self.is_exhausted = False
self.handlers = {
constants.GADGET_MESSAGE_CONFIG_FILE: self.read_gadget_message_config_file,
constants.GADGET_MESSAGE_CONFIG_SCRIPT: self.read_gadget_message_config_script,
constants.GADGET_MESSAGE_PARAMETER_SCRIPT: self.read_gadget_message_parameter_script,
constants.GADGET_MESSAGE_CLOSE: self.read_gadget_message_close,
constants.GADGET_MESSAGE_ISMRMRD_ACQUISITION: self.read_gadget_message_ismrmrd_acquisition,
constants.GADGET_MESSAGE_ISMRMRD_WAVEFORM: self.read_gadget_message_ismrmrd_waveform,
constants.GADGET_MESSAGE_ISMRMRD_IMAGE: self.read_gadget_message_ismrmrd_image
}
def __iter__(self):
while not self.is_exhausted:
yield self.next()
def __next__(self):
return self.next()
def read(self, nbytes):
return self.socket.recv(nbytes, socket.MSG_WAITALL)
def send_image(self, image):
self.socket.send(constants.GadgetMessageIdentifier.pack(constants.GADGET_MESSAGE_ISMRMRD_IMAGE))
image.serialize_into(self.socket.send)
def send_acquisition(self, acquisition):
self.socket.send(constants.GadgetMessageIdentifier.pack(constants.GADGET_MESSAGE_ISMRMRD_ACQUISITION))
acquisition.serialize_into(self.socket.send)
def send_waveform(self, waveform):
self.socket.send(constants.GadgetMessageIdentifier.pack(constants.GADGET_MESSAGE_ISMRMRD_WAVEFORM))
waveform.serialize_into(self.socket.send)
def next(self):
id = self.read_gadget_message_identifier()
handler = self.handlers.get(id, lambda: Connection.unknown_message_identifier(id))
return handler()
@staticmethod
def unknown_message_identifier(identifier):
logging.error("Received unknown message type: %d", identifier)
raise StopIteration
def read_gadget_message_identifier(self):
identifier_bytes = self.read(constants.SIZEOF_GADGET_MESSAGE_IDENTIFIER)
return constants.GadgetMessageIdentifier.unpack(identifier_bytes)[0]
def read_gadget_message_length(self):
length_bytes = self.read(constants.SIZEOF_GADGET_MESSAGE_LENGTH)
return constants.GadgetMessageLength.unpack(length_bytes)[0]
def read_gadget_message_config_file(self):
config_file_bytes = self.read(constants.SIZEOF_GADGET_MESSAGE_CONFIGURATION_FILE)
config_file = constants.GadgetMessageConfigurationFile.unpack(config_file_bytes)[0]
return config_file
def read_gadget_message_config_script(self):
length = self.read_gadget_message_length(self)
return self.read(length)
def read_gadget_message_parameter_script(self):
length = self.read_gadget_message_length()
return self.read(length)
def read_gadget_message_close(self):
self.is_exhausted = True
raise StopIteration
def read_gadget_message_ismrmrd_acquisition(self):
return ismrmrd.Acquisition.deserialize_from(self.read)
def read_gadget_message_ismrmrd_waveform(self):
return ismrmrd.Waveform.deserialize_from(self.read)
def read_gadget_message_ismrmrd_image(self):
return ismrmrd.Image.deserialize_from(self.read)