forked from kejingjing88212/MaCA
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinterface.py
241 lines (216 loc) · 10.4 KB
/
interface.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
#! /usr/bin/env python
# -*- coding: utf-8 -*-
from pytransform import pyarmor_runtime
pyarmor_runtime()
"""
@author: Gao Fang
@contact: gaofang@cetc.com.cn
@software: PyCharm
@file: interface.py
@time: 2018/4/10 0010 8:52
@desc: environment interface
"""
import importlib
from world.em_battle import BattleField
from world.replay import Replay
from world.load_map import Map
import world.position_calc as position_calc
class Environment:
"""
Environment interface
"""
def __init__(self, map_path, side1_obs_ind, side2_obs_ind, max_step=5000, render=False, render_interval=1,
random_pos=False, log=False, random_seed=-1, external_render=False, side1_name='unknown',
side2_name='unknown'):
"""
Environment initiation
:param size_x: battlefield horizontal size. got from LoadMap.get_map_size
:param size_y: battlefield vertical size. got from LoadMap.get_map_size
:param side1_detector_list: side 1 detector configuration. got from LoadMap.get_unit_property_list
:param side1_fighter_list: side 1 fighter configuration. got from LoadMap.get_unit_property_list
:param side2_detector_list: side 2 detector configuration. got from LoadMap.get_unit_property_list
:param side2_fighter_list: side 2 fighter configuration. got from LoadMap.get_unit_property_list
:param max_step: max step,0:unlimited
:param render: display enable control, True: enable display, False: disable display
:param render_interval: display interval, skip how many steps to display a frame
:param random_pos: start location initial method. False: side 1 on right, side2 on left. True: random position on top, bottom, right and left)
:param log: log control,False:disable log,other value:the folder name of log.
:param random_seed: random digit,-1:generate a new one,other value:use an exist random digit value
"""
# load map
self.map = Map(map_path)
self.size_x, self.size_y = self.map.get_map_size()
self.side1_detector_num, self.side1_fighter_num, self.side2_detector_num, self.side2_fighter_num = self.map.get_unit_num()
# make env
self.side1_detector_list, self.side1_fighter_list, self.side2_detector_list, self.side2_fighter_list = self.map.get_unit_property_list()
self.env = BattleField(self.size_x, self.size_y, self.side1_detector_list, self.side1_fighter_list,
self.side2_detector_list, self.side2_fighter_list, max_step, render, render_interval,
random_pos, log, random_seed, external_render, side1_name=side1_name,
side2_name=side2_name)
# import obs construct class
if 'raw' == side1_obs_ind:
self.side1_obs_path = 'raw'
elif 'vector' == side1_obs_ind:
self.side1_obs_path = 'vector'
else:
self.side1_obs_path = 'obs_construct.' + side1_obs_ind + '.construct'
self.agent1_obs_module = importlib.import_module(self.side1_obs_path)
self.agent1_obs = self.agent1_obs_module.ObsConstruct(self.size_x, self.size_y, self.side1_detector_num,
self.side1_fighter_num)
if 'raw' == side2_obs_ind:
self.side2_obs_path = 'raw'
elif 'vector' == side2_obs_ind:
self.side2_obs_path = 'vector'
else:
self.side2_obs_path = 'obs_construct.' + side2_obs_ind + '.construct'
self.agent2_obs_module = importlib.import_module(self.side2_obs_path)
self.agent2_obs = self.agent2_obs_module.ObsConstruct(self.size_x, self.size_y, self.side2_detector_num,
self.side2_fighter_num)
def get_done(self):
"""
Get done
:return: done: True, False
"""
return self.env.done
def get_obs_vector(self):
side1_obs_data, side2_obs_data = self.env.get_obs_vector()
return side1_obs_data, side2_obs_data
def get_obs(self):
"""
Get image-based observation
:return: side1_obs
:return: side2_obs
"""
side1_obs_data = []
side2_obs_data = []
side1_obs_raw_dict, side2_obs_raw_dict = self.get_obs_raw()
if 'vector' == self.side1_obs_path or 'vector' == self.side2_obs_path:
side1_obs_data, side2_obs_data = self.get_obs_vector()
# side2_detector_data_obs_list, side2_fighter_data_obs_list, side2_joint_data_obs_dict = self.env.get_obs_raw()
if 'raw' == self.side1_obs_path:
side1_obs = side1_obs_raw_dict
elif 'vector' == self.side1_obs_path:
side1_obs = side1_obs_data
else:
side1_obs = self.agent1_obs.obs_construct(side1_obs_raw_dict)
if 'raw' == self.side2_obs_path:
side2_obs = side2_obs_raw_dict
elif 'vector' == self.side2_obs_path:
side2_obs = side2_obs_data
else:
side2_obs = self.agent2_obs.obs_construct(side2_obs_raw_dict)
return side1_obs, side2_obs
def get_obs_raw(self):
"""
Get raw data observation
:return: side1_detector_data
:return: side1_fighter_data
:return: side2_detector_data
:return: side2_fighter_data
detector obs:{'id':id, 'alive': alive status, 'pos_x': horizontal coordinate, 'pos_y': vertical coordinate, 'course': course, 'r_iswork': radar enable status, 'r_fre_point': radar frequency point, 'r_visible_list': radar visible enemy}
fighter obs:{'id':id, 'alive': alive status, 'pos_x': horizontal coordinate, 'pos_y': vertical coordinate, 'course': course, 'r_iswork': radar enable status, 'r_fre_point': radar frequency point, 'r_visible_list': radar visible enemy, 'j_iswork': jammer enable status, 'j_fre_point': jammer frequency point, 'j_recv_list': jammer received enemy, 'l_missile_left': long range missile left, 's_missile_left': short range missile left}
"""
side1_obs_dict = {}
side2_obs_dict = {}
side1_detector_data_obs_list, side1_fighter_data_obs_list, side1_joint_data_obs_dict, \
side2_detector_data_obs_list, side2_fighter_data_obs_list, side2_joint_data_obs_dict = self.env.get_obs_raw()
side1_obs_dict.update({'detector_obs_list': side1_detector_data_obs_list})
side1_obs_dict.update({'fighter_obs_list': side1_fighter_data_obs_list})
side1_obs_dict.update({'joint_obs_dict': side1_joint_data_obs_dict})
side2_obs_dict.update({'detector_obs_list': side2_detector_data_obs_list})
side2_obs_dict.update({'fighter_obs_list': side2_fighter_data_obs_list})
side2_obs_dict.update({'joint_obs_dict': side2_joint_data_obs_dict})
return side1_obs_dict, side2_obs_dict
def get_alive_status(self,side1_detector_obs_raw_list,side1_fighter_obs_raw_list,side2_detector_obs_raw_list,side2_fighter_obs_raw_list):
return self.env.get_alive_status(side1_detector_obs_raw_list,side1_fighter_obs_raw_list,side2_detector_obs_raw_list,side2_fighter_obs_raw_list)
def get_reward(self):
"""
get reward
:return:side1_detector:side1 detector reward,side1_fighter:side1 fighter reward,side1_round: side1 round reward, side2_detector:side2 detector reward,side2_fighter:side2 fighter reward,side2_round: side1 round reward
"""
return self.env.get_reward()
def reset(self):
"""
Reset environment
:return: none
"""
self.env.reset()
def step(self, side1_detector_action, side1_fighter_action, side2_detector_action, side2_fighter_action):
"""
Run a step
:param side1_detector_action: Numpy ndarray [detector_quantity, 2]
:param side1_fighter_action: Numpy ndarray [fighter_quantity, 4]
:param side2_detector_action: Numpy ndarray [detector_quantity, 2]
:param side2_fighter_action: Numpy ndarray [fighter_quantity, 4]
:return: True, run succeed, False, run Failed
"""
return self.env.step(side1_detector_action, side1_fighter_action, side2_detector_action, side2_fighter_action)
def get_map_size(self):
"""
Get map size
:return: size_x: horizontal size
:return: size_y: vertical size
"""
return self.map.get_map_size()
def get_unit_num(self):
"""
Get unit number
:return: side1_detector_num
:return: side1_fighter_num
:return: side2_detector_num
:return: side2_fighter_num
"""
return self.map.get_unit_num()
def get_unit_property_list(self):
"""
Get unit config information
:return: side1_detector_list, should be directly forward to Environment init interface
:return: side1_fighter_list, should be directly forward to Environment init interface
:return: side2_detector_list, should be directly forward to Environment init interface
:return: side2_fighter_list, should be directly forward to Environment init interface
"""
return self.map.get_unit_property_list()
def set_surrender(self, side):
'''
surrender
:param side: side1: 0, side2: 1
:return:
'''
return self.env.set_surrender(side)
class PlayBack:
"""
Replay
"""
def __init__(self, log_name, external_render=False, display_delay_time=0):
"""
Initial replay class
:param log_name:
:param display_delay_time:
"""
self.rp = Replay(log_name, external_render, display_delay_time)
def start(self):
"""
Replay begin
"""
self.rp.start()
# Utilities
def get_distance(a_x, a_y, b_x, b_y):
"""
Get distance between two coordinates
:param a_x: point a horizontal coordinate
:param a_y: point a vertical coordinate
:param b_x: point b horizontal coordinate
:param b_y: point b vertical coordinate
:return: distance value
"""
return position_calc.get_distance(a_x, a_y, b_x, b_y)
def angle_cal(o_x, o_y, e_x, e_y):
"""
Get a direction (angle) from a point to another point.
:param o_x: starting point horizontal coordinate
:param o_y: starting point vertical coordinate
:param e_x: end point horizontal coordinate
:param e_y: end point vertical coordinate
:return: angle value
"""
return position_calc.angle_cal(o_x, o_y, e_x, e_y)