forked from mattsoulanille/nimbie-py
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdriver.py
271 lines (218 loc) · 8.69 KB
/
driver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
import usb.core
import usb.util
array = usb.util.array.array
import sys
from eject import open_tray, close_tray
from typing import List, Union, Dict
# Largest incoming packet in bytes as listed in the endpoint descriptor
IN_SIZE = 64
# The array -> string decoder's error message
class NotStringError(TypeError):
pass
# If the state of the hardware does not support
# the requested operation
class HardwareStateError(Exception):
pass
# The tray already has a disk
class DiskInTrayError(HardwareStateError):
pass
# The tray has no disk in it
class NoDiskInTrayError(HardwareStateError):
pass
# No disk available in the input queue
class NoDiskError(HardwareStateError):
pass
# The tray is closed or opened when it should be the opposite
class TrayInvalidStateError(HardwareStateError):
pass
# An error involving the state of the dropper
# Perhaps it is missing a disk
# Perhaps you're trying to place another disk
# while the dropper is still up.
class DropperError(HardwareStateError):
pass
class Nimbie:
def __init__(self):
"""Detect the connected Nimbie"""
dev = usb.core.find(idVendor=0x1723, idProduct=0x0945)
if dev is None:
raise ValueError('Device not found')# was it found?
dev.set_configuration() # There's only one config so use that one
# get an endpoint instance
cfg = dev.get_active_configuration()
intf = cfg[(0,0)]
self.in_ep = usb.util.find_descriptor(
intf,
# match the first OUT endpoint
# OUT means out of the computer and into the device
custom_match = \
lambda e: \
usb.util.endpoint_direction(e.bEndpointAddress) == \
usb.util.ENDPOINT_IN)
self.out_ep = usb.util.find_descriptor(
intf,
# match the first OUT endpoint
# OUT means out of the computer and into the device
custom_match = \
lambda e: \
usb.util.endpoint_direction(e.bEndpointAddress) == \
usb.util.ENDPOINT_OUT)
def send_command(self, *command: int) -> str:
"""
Send a command of up to six bytes to the Nimbie
"""
if len(args) > 6:
raise Exception("Too many arguments. Maximum of 6")
message = bytearray(8)
for i in range(len(args)):
message[i + 2] = args[i]
self.out_ep.write(message)
response = self.get_response()
return self.extract_statuscode(response)
def get_response(self, minimum=1) -> List[str]:
"""
Get the Nimbie's raw response to a command
The nimbie sends several messages in response to a command.
This function reads messages from the Nimbie until it receives
an empty message. Since the first message is usually empty,
the `minimum` variable specifies a minimum number of messages
to read.
"""
# Get at least `minimum` messages
messages = []
# Get the minimum number of messages
for i in range(minimum):
message = self.read()
messages.append(message)
# Get any more messages that aren't null
message = self.read()
while len(message) > 0:
messages.append(message)
message = self.read()
return messages
@staticmethod
def extract_statuscode(response_list: List[str]) -> str:
"""Attempt to extract the Nimbie's status code from a sequence of its messages
The Nimbie responds to commands with an undefined number of
empty messages, then the message "OK", and finally its status code.
This function extracts and returns the status code.
"""
try:
ok_index = response_list.index("OK")
except ValueError:
raise ValueError("Expected message 'OK' from nimbie "
+"but did not receive message. Instead got "
+str(response_list))
return response_list[ok_index + 1]
@staticmethod
def array_to_string(array: array) -> str:
"""Attempt to parse an array of integers as a null terminated ASCII string"""
if (len(array) == 0):
return ""
# Expect null termination if nonempty string
if array[-1] != 0:
raise NotStringError("Expected array to be null terminated but got " + str(array[-1]))
return "".join([chr(x) for x in array][:-1])
def read_data(self) -> array:
"""Read the next message from the Nimbie as an array of integers"""
# Maybe have the timeout be an option instead of just 20 seconds?
return self.in_ep.read(IN_SIZE, 20000)
def read(self) -> Union[str, array]:
"""Attempt to read a null terminated string from the Nimbie
Returns an array of integers if it is not null terminated
"""
data = self.read_data()
try:
return self.array_to_string(data)
except NotStringError:
return data
@staticmethod
def decode_statuscode(statuscode: str) -> Union[Exception, str]:
"""Decode one of the Nimbie's status codes
Returns an exception on error codes and a string on status codes.
"""
assert statuscode[0:3] == "AT+" # The prefix for all status codes
code = statuscode[3:] # The part that changes
if (code == "S12"):
return DiskInTrayError("The tray already has a disk")
if (code == "S14"):
return NoDiskError("No disk in disk queue")
if (code == "S10"):
return TrayInvalidStateError("The tray is in the "
+"opposite state it should be in")
if (code == "S03"):
return DropperError("The dropper has an error (maybe it's "
+"missing a disk. Maybe you're attempting "
+"to place a disk on it while it's still up).")
if (code == "S00"):
return NoDiskInTrayError("The tray has no disk in it")
if (code == "O"):
return "Dropper success (lifting or dropping)"
if (code == "S07"):
return "Successfully placed disk on tray"
return "Unknown status code"
# Try the command and throw an error if we get an error code
def try_command(self, *command: int) -> str:
"""Try the command, throwing an error if the Nimbie throws one"""
result = self.send_command(*command)
decoded = self.decode_statuscode(result)
if isinstance(decoded, Exception):
raise decoded
return decoded
def place_disk(self) -> str:
"""Place the next disk from the queue into the tray"""
return self.try_command(0x52, 0x01)
def lift_disk(self) -> str:
"""Lift the disk from the tray"""
return self.try_command(0x47, 0x01)
def accept_disk(self) -> str:
"""Drop the lifted disk into the accept pile"""
return self.try_command(0x52, 0x02)
def reject_disk(self) -> str:
"""Drop the lifted disk into the reject pile"""
return self.try_command(0x52, 0x03)
def get_state(self) -> Dict[str, bool]:
"""Gets the state of the Nimbie hardware
The state is a dictionary of boolean values with
the following strings as keys:
disk_available
disk_in_open_tray
disk_lifted
tray_out
"""
state_str = self.send_command(0x43)
return {"disk_available": state_str[2] == "1",
"disk_in_open_tray": state_str[4] == "1",
"disk_lifted": state_str[5] == "1",
"tray_out": state_str[6] == "1",
}
def disk_available(self) -> bool:
"""Whether or not a disk is available in the input queue"""
return self.get_state()["disk_available"]
def load_next_disk(self) -> None:
"""Load the next disk into the reader
Ejects the tray, places the disk, and returns the tray.
"""
open_tray()
self.place_disk()
close_tray()
def accept_current_disk(self) -> None:
"""Accept the currently loaded disk
Ejects the disk, picks it up, and drops it into
the accept pile.
"""
open_tray()
self.lift_disk()
close_tray()
self.accept_disk()
def reject_current_disk(self) -> None:
"""Reject the currently loaded disk
Ejects the disk, picks it up, and drops it into
the reject pile.
"""
open_tray()
self.lift_disk()
close_tray()
self.reject_disk()
if __name__ == "__main__":
n = Nimbie()