-
Notifications
You must be signed in to change notification settings - Fork 1
/
stellarium_alpaca.py
executable file
·151 lines (116 loc) · 3.8 KB
/
stellarium_alpaca.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/usr/bin/env python3
import sys
assert sys.version_info >= (3, 0)
import warnings
warnings.filterwarnings("ignore", module="urllib3")
import os
import getopt
import re
import asyncio
import time
import requests
from datetime import datetime
from datetime import timezone
from math import pi
# https://rhodesmill.org/pyephem
import ephem
####### Globals
local_port = 10001
alpaca_server = '127.0.0.1'
alpaca_port = 5555
alpaca_client_id = 65432
alpaca_transaction_id = 123
LOGGING = False
DEBUG = False
TESTS = False
####### Alpaca
def alpaca_goto(ra, dec):
rest_cmd = '/api/v1/telescope/0/slewtocoordinatesasync'
url = 'http://' + alpaca_server + ':' + str(alpaca_port) + rest_cmd
payload = {'ClientTransactionID' : str(alpaca_transaction_id) , 'ClientID' : str(alpaca_client_id), 'RightAscension' : str(ra), 'Declination' : str(dec) }
if LOGGING:
print(f">>> Alpaca: {url}: {payload}")
try:
r = requests.put(url, data=payload)
if DEBUG:
print(f"alpaca_goto ra={ra} decl={dec} http status code={r.status_code} response={r.content}")
except Exception as error:
print(f"Error {error}")
####### Stellarium
def dec2dms(dd):
is_positive = dd >= 0
dd = abs(dd)
minutes,seconds = divmod(dd*3600,60)
degrees,minutes = divmod(minutes,60)
degrees = degrees if is_positive else -degrees
return f"{int(degrees)}:{int(minutes)}:{seconds:.2f}"
def dms2dec(dms):
(degree, minute, second, frac_seconds) = re.split('\D+', dms, maxsplit=4)
return int(degree) + float(minute) / 60 + float(second) / 3600 + float(frac_seconds) / 360000
def decode_stellarium_packet(s):
t = int.from_bytes(s[4:11], byteorder='little')
ra = int.from_bytes(s[12:16], byteorder='little')
dec = int.from_bytes(s[16:20], byteorder='little', signed=True)
ra = (24*ra)/0x100000000
dec = (90*dec)/0x40000000
if DEBUG:
print(f"<<< Stellarium: t={t} ra={ra} dec={dec}")
return (ra, dec)
####### main loop
async def mainloop():
while True:
await asyncio.sleep(1)
####### network
async def handle_local_input(reader, writer):
while True:
data = await reader.read(256)
if not data:
break
if DEBUG:
print(f"<<< Stellarium: {':'.join(('0'+hex(x)[2:])[-2:] for x in data)}")
(ra, dec) = decode_stellarium_packet(data)
alpaca_goto(ra, dec)
async def main(argv):
global LOGGING, DEBUG, TESTS
global local_port
global alpaca_port
usage = f"{os.path.basename(sys.argv[0])} [-dhl] --StellariumPort <Stellarium port> --AlpacaPort <Alpca port>"
try:
opts, args = getopt.getopt(argv,"dhl",["StellariumPort=", "AlpacaPort="])
except getopt.GetoptError:
print (usage)
sys.exit(2)
for opt, arg in opts:
if opt == '-h':
print (usage)
sys.exit()
elif opt == "--StellariumPort":
local_port = int(arg)
elif opt == "--AlpacaPort":
alpaca_port = int(arg)
elif opt == "-l":
LOGGING = True
elif opt == "-d":
DEBUG = True
print (f"Stellarium port={local_port}")
print (f"Alpaca port={alpaca_port}")
if LOGGING:
print("Logging is on")
if DEBUG:
print("Debug is on")
local_server = await asyncio.start_server(lambda reader, writer: handle_local_input(reader, writer), 'localhost', local_port)
tasks = [
mainloop()
]
async with local_server:
await asyncio.gather(*tasks)
#######
if __name__ == "__main__":
try:
asyncio.run(main(sys.argv[1:]))
except ValueError as value:
print(f"{value}\nQuit.")
except Exception as error:
print(f"Error {error}, quit.")
except KeyboardInterrupt as kb:
print("Keyboard interrupt.")