-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathescape_eschaton.py
166 lines (130 loc) · 5.36 KB
/
escape_eschaton.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
#!/usr/bin/env python
"""
Main executable for the finding the optimal escape path
"""
import os
import logging
import argparse
import shutil
import json
from lib import utils
CURVERSION = "0.1"
def usage():
buf = '''
Computes an optimal route to escape Eschaton by flying through
the gaps in the asteroids and exiting the asteroid belt.
Run:
$ ./escape_eschaton.py <chart.json> [optional args]
or
$ python<3.x> escape_eschaton.py <chart.json> [optional args]
'''
return buf
def solve(args):
"""
State: (p,v,t)
p: position
v: velocity
t: time instance
State Stack stores a tuple with:
1. acceleration value at a state,
2. flag to mark if a state was complete
3. the state tuple
eg (1, False, (p,v,t))
Input:
args: input arguments supplied to the executable that contains
json file location that holds the state chart.
Returns:
Optimal Escape Path: List of int as acceleration values for
each time period.
"""
blast_time_step, asteroids = utils.load_json_chart(args.chart)
state_stack = []
course_sofar = []
exhausted_states = set([])
initial_state = (0,0,0)
state_stack.append((None, False, initial_state))
best_course = []
# perform an iterative DFS on the state space to find the optimal solution
while state_stack != []:
curr_a, exhausted, curr_state = state_stack.pop()
# store exhausted state to prune and backtrack on the search space later
if exhausted:
exhausted_states.add(curr_state)
course_sofar.pop()
continue
# backtrack if current state was already exhausted
if curr_state in exhausted_states:
continue
# backtrack if current solution is already longer than best solution so far
if best_course and len(course_sofar) >= len(best_course):
continue
course_sofar.append(curr_a)
# escape condition
if curr_state[0] >= len(asteroids):
# if current course is shorter than best course so far, make it best course
if not best_course or len(course_sofar) < len(best_course):
best_course = course_sofar.copy()
course_sofar.pop()
continue
# add state back to stack with exhaused flag as True. When it is popped again,
# we will know all states downstream from here were exhausted
state_stack.append((curr_a, True, curr_state))
# add all valid downstream states from here (continue DFS)
for next_a, next_state in utils.get_nxt_states(curr_state, blast_time_step, asteroids):
state_stack.append((next_a, False, next_state))
return best_course[1:]
def setup(args):
"""
Staging area that sets out output folder and logging infoormation
and then class the 'solve' routine
Input: Object with Input arguements
Returns: None
"""
output_folder = args.output_folder
if os.path.exists(output_folder) and args.xforce:
shutil.rmtree(output_folder)
os.makedirs(output_folder)
elif not os.path.exists(output_folder):
os.makedirs(output_folder)
else:
print("Output Path already exists.")
print("Please use the -x option to overwrite in the existing path")
exit(0)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)
filelog_path = os.path.join(output_folder, args.log)
filelog = logging.FileHandler(filelog_path, mode="w")
filelog.setFormatter(formatter)
logger.addHandler(filelog)
logger.info('Logging enabled')
logger.info('Created Output Folder: {}'.format(args.output_folder))
logger.info('Evaluating Optimal Path')
res = solve(args)
logger.info('Optimal path:' )
logger.info(res)
output_course = os.path.join(output_folder, "course.json")
with open(output_course, 'w') as outfile:
json.dump(res, outfile)
logger.info("Successfully written optimal path to output course.json")
def main(argstr=None):
parser = argparse.ArgumentParser(description='Input Parameters', usage=usage())
parser.add_argument('chart', help="json file containing state chart for blast velocity and asteroids around Eschaton")
parser.add_argument("-o", "--output-folder", dest="output_folder", default='escape_result',
help="Output folder for results json and logs")
parser.add_argument("-x", "--xforce", dest="xforce", action="store_true", \
help="force overwrite the output, if the folder already exists")
parser.add_argument("-l", "--log", dest="log", default="escape_eschaton.log", nargs="?", \
help="log to file (Default: escape_eschaton.log)")
parser.add_argument("-v", "--version", action="version", version="Escapeing Eschaton Current version {0}".format(CURVERSION))
if argstr:
args = parser.parse_args(argstr)
else:
args = parser.parse_args()
return setup(args)
if __name__ == '__main__':
main()