-
Notifications
You must be signed in to change notification settings - Fork 0
/
databridge.py
127 lines (120 loc) · 4.6 KB
/
databridge.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import tkinter as tk
from tkinter import ttk
import sv_ttk
import serial.tools.list_ports
import serial
import time
import threading
import json
# root = tk.Tk()
def getPorts():
    """Return the device names of every serial (COM) port currently detected.

    Returns:
        list: the ``device`` attribute of each entry reported by
        ``serial.tools.list_ports.comports()``, in the order reported.
    """
    detected = serial.tools.list_ports.comports()
    devices = []
    for info in detected:
        # Keep only the port name; the rest of the port info is not needed.
        devices.append(info.device)
    return devices
def testConnection(com):
    """Check whether the device on serial port *com* answers the Ping/Pong handshake.

    The port is opened at 115200 baud, ``Ping`` is sent after a 3 second
    pause, and a single reply line is read back.

    Args:
        com: Port name handed to ``serial.Serial``.

    Returns:
        ``True`` when the reply line is exactly ``Pong``.
        ``False`` for any other reply, including no reply within the
        3 second read timeout.
        ``str`` holding the exception message when opening the port or
        talking to it fails. NOTE: a non-empty string is truthy, so callers
        must compare against ``True`` rather than rely on truthiness.
    """
    # The previous implementation armed a threading.Timer whose callback
    # raised TimeoutError. That exception surfaced only inside the timer
    # thread and could never interrupt this function, so it gave no
    # protection. Bounding the blocking calls on the port itself does:
    # 3 s pause + 4 s write timeout + 3 s read timeout keeps the whole probe
    # within the originally intended ~10 seconds.
    try:
        # The context manager closes the port on every exit path, so no
        # manual close()/finally bookkeeping is needed.
        with serial.Serial(com, 115200, timeout=3, write_timeout=4) as ard_obj:
            # Presumably gives the board time to finish resetting after the
            # port is opened -- TODO confirm against the firmware.
            time.sleep(3)
            ard_obj.write('Ping\n'.encode())
            receive_msg = ard_obj.readline().decode().strip()
            return receive_msg == "Pong"
    except Exception as e:
        # Deliberate best-effort probe: report the failure text instead of
        # raising, as callers of the original function expect.
        return str(e)
def _draw_progress(canvas, percent):
    """Redraw the green bar and the percentage label on the 200x20 canvas."""
    canvas.coords("progress", (0, 0, percent * 2, 20))
    canvas.delete("text")
    canvas.create_text(100, 10, text=f"{int(percent)}%", tags="text")


def _await_step_result(ser):
    """Read reply lines until the device says "success" (True) or "end" (False)."""
    # NOTE(review): with a 3 s read timeout readline() returns an empty line
    # when the device is silent, so this waits indefinitely for a reply --
    # same as the original loop.
    while True:
        response = ser.readline().decode().strip()
        print(response)
        if response == "success":
            return True
        if response == "end":
            return False


def send_coordinates(json_file, serial_port, master):
    """Stream the task stored in *json_file* to the device on *serial_port*.

    The JSON document is read as ``data[0]['task data']``, a list of steps.
    Each step is handled as one of:

    * a step containing the key ``end``: ``end`` is sent and the task stops;
    * a step containing the key ``wait``: sleep for that many seconds;
    * anything else: a move, sent as ``\\n<arm><s1><s2><s3><s4><s5>\\n`` with
      each ``s*`` value formatted as a zero-padded 3-digit integer, after
      which the function waits for the device to answer ``success``
      (continue) or ``end`` (stop immediately).

    A small always-on-top progress window is shown while the task runs.

    Args:
        json_file: Path of the task file.
        serial_port: Port name handed to ``serial.Serial`` (115200 baud).
        master: Tk widget that owns the progress window.

    Raises:
        OSError, json.JSONDecodeError, KeyError, IndexError: if the task file
            cannot be read or does not have the expected layout.
        serial.SerialException: if the port cannot be opened or used.
    """
    # Load the task first so a bad file does not leave an orphaned window.
    with open(json_file) as file:
        data = json.load(file)
    steps = data[0]['task data']
    total_steps = len(steps)
    # A single-step task would otherwise divide by zero below.
    last_index = max(total_steps - 1, 1)

    progress_view = tk.Toplevel(master)
    progress_view.title("Progress")
    progress_view.resizable(False, False)
    # Window-manager attributes are spelled with a leading dash; the former
    # "=topmost" is not a valid attribute name.
    progress_view.attributes("-topmost", 1)
    progress_frame = tk.Frame(progress_view)
    progress_frame.grid()
    progress_bar = tk.Canvas(progress_frame, width=200, height=20)
    progress_bar.create_rectangle(0, 0, 200, 20, fill="grey")
    progress_bar.create_rectangle(0, 0, 0, 20, fill="green", tags="progress")
    progress_bar.grid(row=2, column=1)
    progressing = tk.Label(progress_frame, text="Task is running, please wait....")
    progressing.grid(row=3, column=1)
    sv_ttk.set_theme("dark")

    ser = None
    try:
        ser = serial.Serial(serial_port, baudrate=115200, timeout=3)
        # Paint the window before blocking in sleep(); otherwise nothing is
        # drawn until the first step completes.
        progress_view.update()
        # Presumably lets the board settle after the port opens -- TODO confirm.
        time.sleep(3)
        for step_index, step in enumerate(steps):
            if 'end' in step:
                time.sleep(3)
                ser.write("end".encode())
                break
            if 'wait' in step:
                time.sleep(step['wait'])
            else:
                # Only move steps need an arm number, so look it up here
                # rather than demanding it on wait steps too.
                arm_number = step['arm']
                message = f"\n{arm_number}{step['s1']:03d}{step['s2']:03d}{step['s3']:03d}{step['s4']:03d}{step['s5']:03d}\n"
                ser.write(message.encode())
                print(message)
                if not _await_step_result(ser):
                    # Device ended the task early; cleanup happens in finally.
                    return
                time.sleep(3)
            _draw_progress(progress_bar, (step_index / last_index) * 100)
            progress_view.update()
        _draw_progress(progress_bar, 100)
    finally:
        # Runs on normal completion, on the early "end" return and on errors,
        # so the port and the window are never leaked.
        if ser is not None:
            ser.close()
        try:
            progress_view.destroy()
        except tk.TclError:
            # Window (or the whole application) is already gone; do not mask
            # whatever exception is propagating.
            pass
    # No mainloop() here: the window has just been destroyed, and starting a
    # nested event loop would keep this call from returning to its caller.
def retrv_cur_pos(arm_no, com_port):
    """Ask the device for the current position of arm *arm_no*.

    Opens *com_port* at 115200 baud, sends ``<arm_no>curpos`` followed by a
    newline, waits 3 seconds and reads one reply line.

    Args:
        arm_no: Arm identifier placed in front of the ``curpos`` command.
        com_port: Port name handed to ``serial.Serial``.

    Returns:
        str: the reply line with surrounding whitespace stripped; empty if
        nothing arrived within the 3 second read timeout.

    Raises:
        serial.SerialException: if the port cannot be opened or used.
    """
    # The context manager closes the port even when write/readline/decode
    # raises; the original leaked the open port in that case.
    with serial.Serial(com_port, baudrate=115200, timeout=3) as ser:
        # Presumably lets the board settle after the port opens -- TODO confirm.
        time.sleep(3)
        ser.write(f"{arm_no}curpos\n".encode())
        time.sleep(3)
        return ser.readline().decode().strip()
def move_by_one(arm_ang, amt, com_port):
    """Send a single move command ``<arm_ang><amt as 3 digits>`` to the device.

    Args:
        arm_ang: Prefix placed in front of the value (presumably identifies
            the arm and joint -- confirm against the firmware protocol).
        amt: Value to send; converted with ``int()`` and zero-padded to
            three digits.
        com_port: Port name handed to ``serial.Serial`` (115200 baud).

    Raises:
        ValueError, TypeError: if *amt* cannot be converted to ``int``.
        serial.SerialException: if the port cannot be opened or used.
    """
    # Build the message before touching the hardware so that a bad *amt*
    # fails fast instead of leaving an opened port behind.
    ang = int(amt)
    message = f"{arm_ang}{ang:03d}\n"
    # The context manager guarantees the port is closed on every path.
    with serial.Serial(com_port, baudrate=115200, timeout=3) as ser:
        # Presumably lets the board settle after the port opens -- TODO confirm.
        time.sleep(3)
        ser.write(message.encode())
        # Wait until the bytes have actually been written out before the
        # port is closed right after.
        ser.flush()