-
Notifications
You must be signed in to change notification settings - Fork 9
/
nbclient.py
executable file
·148 lines (126 loc) · 4.83 KB
/
nbclient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import time
from threading import Event
from py9b.link.base import LinkOpenException, LinkTimeoutException
from py9b.transport.base import BaseTransport as BT
from py9b.command.regio import ReadRegs, WriteRegs
from kivy.utils import platform
from kivy.event import EventDispatcher
from kivy.clock import mainthread
from kivy.properties import BooleanProperty, StringProperty, ObjectProperty
from utils import tprint, specialthread
import asyncio
from threading import Event
class Client(EventDispatcher):
    """Manage the lifecycle of a py9b link/transport pair for the UI.

    The interface (``link``), protocol (``transport``) and optional device
    ``address`` are selected through Kivy properties. ``connect()`` builds
    the link, ``_connect_inner()`` opens it and attaches the transport, and
    ``disconnect()`` tears everything down again. Failures are reported by
    dispatching the ``on_error`` event with ``repr(exception)``.
    """

    # Lifecycle state; this class sets 'connecting', 'connected',
    # 'disconnecting' and 'disconnected'.
    state = StringProperty('disconnected')
    # Protocol name; 'ninebot' and 'xiaomi' are the values handled here.
    transport = StringProperty('')
    # Address passed to link.open(); when empty, the first scan result is used.
    address = ObjectProperty('')
    # Interface name; 'ble', 'tcp', 'serial' and 'mock' are handled here.
    link = StringProperty('')

    # Currently adopted link / transport objects (None while disconnected).
    _link = None
    _tran = None

    def __init__(self):
        # The event type is registered before the base initializer runs,
        # as in the original implementation.
        self.register_event_type('on_error')
        super(Client, self).__init__()
        self.loop = None
        self.stay_connected = False

    def _make_link(self):
        """Instantiate the link object for the selected interface.

        Link modules are imported lazily so that only the backend actually
        in use has to be importable on the current platform.

        Returns:
            A new, not yet entered link object, or ``None`` when the
            selected interface is unset, unknown or unavailable on this
            platform. The reason is reported through ``tprint``.
        """
        kind = self.link
        if kind == 'ble':
            if platform == 'android':
                from py9b.link.droidble import BLELink
                return BLELink()
            from py9b.link.bleak import BLELink
            if self.loop is None:
                self.loop = asyncio.get_event_loop()
            return BLELink(loop=self.loop)
        if kind == 'tcp':
            from py9b.link.tcp import TCPLink
            return TCPLink()
        if kind == 'serial':
            if platform == 'android':
                tprint('Serial on Android is only available in Pro')
                return None
            from py9b.link.serial import SerialLink
            return SerialLink(timeout=1.0)
        if kind == 'mock':
            from mocklink import MockLink
            return MockLink()
        # Empty or unrecognised interface name.
        tprint('select interface first')
        return None

    @mainthread
    def connect(self):
        """Create the selected link and start the connection sequence.

        If the interface or protocol is missing, the reason is printed, the
        client is returned to the 'disconnected' state and nothing else
        happens. Any exception is dispatched as ``on_error`` and re-raised.
        """
        self.update_state('connecting')
        try:
            link = self._make_link()
            if not self.transport:
                tprint('select protocol first')
            if link is None or not self.transport:
                # Bail out instead of calling methods on a missing link.
                self.disconnect()
                tprint('One or more parameters are not set')
                return
            link.__enter__()
            # This is split into two parts due to some link implementations
            # (namely droidble) requiring some initialization in main thread...
            self._connect_inner(link)
        except Exception as exc:
            self.dispatch('on_error', repr(exc))
            raise

    @specialthread
    def _connect_inner(self, link):
        """Open ``link``, attach the selected transport and mark connected.

        Decorated with the project's ``specialthread`` helper (presumably so
        the blocking work runs off the main thread - confirm in ``utils``).

        Args:
            link: An entered link object as produced by ``connect()``.

        Returns:
            The transport instance bound to ``link``.

        Raises:
            ValueError: If ``self.transport`` names an unsupported protocol.
            Exception: If scanning finds no device, or anything raised by
                the link/transport. Every exception is dispatched as
                ``on_error`` before being re-raised.
        """
        try:
            # Resolve the protocol first so an unsupported name fails before
            # the link is opened.
            if self.transport == 'ninebot':
                from py9b.transport.ninebot import (
                    NinebotTransport as transport_cls)
            elif self.transport == 'xiaomi':
                from py9b.transport.xiaomi import (
                    XiaomiTransport as transport_cls)
            else:
                raise ValueError('Unknown protocol: %r' % (self.transport,))

            if not self.address:
                # No address chosen: scan and take the first device found.
                ports = link.scan()
                if not link.scanned.is_set():
                    link.scanned.wait(link.timeout)
                if not ports:
                    raise Exception('No devices found')
                first = ports[0]
                # Scan results are either plain addresses or tuples whose
                # second item is the address.
                self.address = first[1] if isinstance(first, tuple) else first
            link.open(self.address)

            transport = transport_cls(link)

            # NOTE(review): register 0x68 / threshold 0x081 presumably gate
            # the key exchange on newer BLE firmware - confirm against py9b.
            # The register is read for every interface, as before.
            reg_value = transport.execute(ReadRegs(BT.ESC, 0x68, "<H"))[0]
            if reg_value > 0x081 and self.link == 'ble':
                transport.keys = link.fetch_keys()
                transport.recover_keys()
                tprint('Keys recovered')
                tprint('functionality may be limited for now')

            self._tran = transport
            self._link = link

            # Give the link up to 1.5x its timeout to report a connection.
            if not self._link.connected.is_set():
                self._link.connected.wait(self._link.timeout * 1.5)
            if self._link.connected.is_set():
                self.update_state('connected')
            else:
                self.disconnect()
            return transport
        except Exception as exc:
            self.dispatch('on_error', repr(exc))
            raise

    @mainthread
    def update_state(self, state):
        """Print the new state and store it in the ``state`` property."""
        print('Current state:', state)
        self.state = state

    def disconnect(self):
        """Close the current link (if any) and reset connection state.

        Closing is best-effort: a failure in ``close()`` is reported through
        ``tprint`` rather than raised, and the link/transport references are
        dropped regardless, so a broken link is never kept around.
        """
        self.update_state('disconnecting')
        self.address = ''
        link = self._link
        self._link = None
        self._tran = None
        if link is not None:
            try:
                link.close()
            except Exception as exc:
                tprint('Error while closing link: ' + repr(exc))
        self.update_state('disconnected')

    def on_error(self, *args):
        """Default ``on_error`` handler, required for event dispatch.

        Disconnects, then reconnects when ``stay_connected`` is set.
        """
        self.disconnect()
        if self.stay_connected:
            self.connect()