-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreactive-status.py
90 lines (68 loc) · 2.56 KB
/
reactive-status.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import threading
import socket
import sys
import time
TELLO_COMMAND_PORT = 8889
TELLO_STATUS_PORT = 8890
TELLO_IP = "192.168.10.1"
class Drone:
"""
Creates a connection to the Tello drone, and exposes methods
for observing status updates and stopping updates
"""
def __init__(self, ip_address, command_port, status_port):
self._ip_address = ip_address
self._command_port = command_port
self._status_port = status_port
self._command_address = (self._ip_address, self._command_port)
self._stopper = threading.Event()
# create a socket to listen to the status from the drone
self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.listen_socket.bind(("", self._status_port))
# start a thread that will listen to the status reports and dump them to
# the console
self.listen_thread = threading.Thread(target=self._listen)
self.listen_thread.daemon = True
self.listen_thread.start()
# create a socket over which we can send the command
# that triggers the drone to start generating status messages
self.command_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.command_socket.bind(("", self._command_port))
def start(self):
# the 'command' command causes the tello to start broadcasting its
# status, and allows it to accept control commands
self.send_command("command")
def _listen(self):
while not self._stopper.is_set():
data, _ = self.listen_socket.recvfrom(4096)
status = status_to_dict(data.decode("utf-8"))
print(status)
def send_command(self, command):
self.command_socket.sendto(command.encode("utf-8"), self._command_address)
def stop(self):
self._stopper.set()
self.listen_socket.close()
self.command_socket.close()
def status_to_dict(status_str):
"""
Helper that converts the semicolon delimited status values to
a dict
"""
status = {}
params = [param.strip() for param in status_str.split(";") if param.strip()]
for param in params:
(key, val) = param.split(":")
status[key] = val
return status
def listen():
drone = Drone(TELLO_IP, TELLO_COMMAND_PORT, TELLO_STATUS_PORT)
input("press any key to start\n")
drone.start()
# time.sleep(5.0)
# drone.send_command("takeoff")
# time.sleep(7.0)
# drone.send_command("land")
input("press any key to stop\n")
drone.stop()
if __name__ == "__main__":
listen()