-
Notifications
You must be signed in to change notification settings - Fork 0
/
covers
executable file
·137 lines (108 loc) · 4.34 KB
/
covers
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python3
#
# This file is part of the Robotic Observatory Control Kit (rockit)
#
# rockit is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# rockit is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with rockit. If not, see <http://www.gnu.org/licenses/>.
"""Commandline client for controlling the W1m mirror covers"""
import glob
import os
import sys
import Pyro4
from rockit.common import print
from rockit.covers import Config, CommandStatus, CoversState
SCRIPT_NAME = os.path.basename(sys.argv[0])
sys.excepthook = Pyro4.util.excepthook
def run_command(command, args):
    """Resolve the daemon config, run `command`, and exit with its status code.

    The config path is read from the COVERSD_CONFIG_PATH environment variable
    when it is set. Otherwise exactly one *.json file must exist in the default
    system location (/etc/coversd/); if zero or multiple are found an error is
    printed and 1 is returned without running the command.

    :param command: callable invoked as command(config, args); its return
        value is used as the status code
    :param args: remaining command line arguments, passed through unchanged
    :return: 1 when no config file could be selected; otherwise the process
        exits via sys.exit with the status code
    """
    config_path = os.environ.get('COVERSD_CONFIG_PATH')
    if config_path is None:
        candidates = glob.glob("/etc/coversd/*.json")
        if len(candidates) != 1:
            print('error: failed to guess the default config file. ' +
                  'Run as COVERSD_CONFIG_PATH=/path/to/config.json covers <command>')
            return 1
        config_path = candidates[0]

    config = Config(config_path)

    # A failure to communicate with the daemon is reported as status -101
    try:
        status = command(config, args)
    except Pyro4.errors.CommunicationError:
        status = -101

    # -1 means the command already reported its own error; 0 needs no message
    if status not in (-1, 0):
        print(CommandStatus.message(status))

    sys.exit(status)
def print_status(config, _):
    """Query the daemon for its status report and print the covers state.

    :param config: object whose daemon.connect() yields a proxy to the daemon
    :param _: unused command line arguments
    :return: always 0
    """
    with config.daemon.connect() as proxy:
        report = proxy.report_status()

    # Only the "state" entry of the report is displayed
    state_label = CoversState.label(report["state"], formatting=True)
    print(f'Covers are {state_label}')
    return 0
def open_covers(config, _):
    """Ask the daemon to open the covers.

    The connection timeout is the daemon's default timeout extended by the
    configured move timeout. A ctrl-c while the request is running opens a
    fresh connection and asks the daemon to stop the covers instead.

    :param config: object providing daemon.connect(), daemon.default_timeout
        and move_timeout
    :param _: unused command line arguments
    :return: the value returned by the daemon's open_covers() call, or by
        stop_covers() if the command was interrupted
    """
    try:
        timeout = config.daemon.default_timeout + config.move_timeout
        with config.daemon.connect(timeout=timeout) as proxy:
            return proxy.open_covers()
    except KeyboardInterrupt:
        # ctrl-c aborts the running command: request an immediate stop
        with config.daemon.connect() as proxy:
            return proxy.stop_covers()
def close_covers(config, _):
    """Ask the daemon to close the covers.

    The connection timeout is the daemon's default timeout extended by the
    configured move timeout. A ctrl-c while the request is running opens a
    fresh connection and asks the daemon to stop the covers instead.

    :param config: object providing daemon.connect(), daemon.default_timeout
        and move_timeout
    :param _: unused command line arguments
    :return: the value returned by the daemon's close_covers() call, or by
        stop_covers() if the command was interrupted
    """
    try:
        timeout = config.daemon.default_timeout + config.move_timeout
        with config.daemon.connect(timeout=timeout) as proxy:
            return proxy.close_covers()
    except KeyboardInterrupt:
        # ctrl-c aborts the running command: request an immediate stop
        with config.daemon.connect() as proxy:
            return proxy.stop_covers()
def stop_covers(config, _):
    """Ask the daemon to stop the covers immediately.

    :param config: object whose daemon.connect() yields a proxy to the daemon
    :param _: unused command line arguments
    :return: the value returned by the daemon's stop_covers() call
    """
    daemon = config.daemon
    with daemon.connect() as proxy:
        return proxy.stop_covers()
def initialize(config, _):
    """Ask the daemon to connect to the covers controller.

    :param config: object whose daemon.connect() yields a proxy to the daemon
    :param _: unused command line arguments
    :return: the value returned by the daemon's initialize() call
    """
    daemon = config.daemon
    with daemon.connect() as proxy:
        return proxy.initialize()
def shutdown(config, *_):
    """Ask the daemon to disconnect from the covers controller.

    :param config: object whose daemon.connect() yields a proxy to the daemon
    :param _: any further positional arguments are accepted and ignored
    :return: the value returned by the daemon's shutdown() call
    """
    daemon = config.daemon
    with daemon.connect() as proxy:
        return proxy.shutdown()
def print_usage():
    """Print the list of supported commands.

    :return: always 0, so the result can be used directly as an exit code
    """
    general = (
        ' open request the covers to be open',
        ' close request the covers to be closed',
        ' stop stop cover movement',
        ' status print a human-readable summary of the covers status',
    )
    engineering = (
        ' init connect to the covers controller',
        ' kill disconnect from covers controller',
    )

    print(f'usage: {SCRIPT_NAME} <command>')
    print()
    # One print call per line, in the same order as the help text reads
    for line in ('general commands:', *general, 'engineering commands:', *engineering):
        print(line)
    print()
    return 0
if __name__ == '__main__':
    # Maps each command name accepted on the command line to its handler
    commands = {
        'open': open_covers,
        'close': close_covers,
        'stop': stop_covers,
        'status': print_status,
        'init': initialize,
        'kill': shutdown,
    }

    requested = sys.argv[1] if len(sys.argv) >= 2 else None
    handler = commands.get(requested)

    if handler is None:
        # Missing or unrecognised command: show the help text
        sys.exit(print_usage())
    else:
        # Everything after the command name is forwarded to the handler
        sys.exit(run_command(handler, sys.argv[2:]))