-
Notifications
You must be signed in to change notification settings - Fork 0
/
sumoenv.py
executable file
·95 lines (76 loc) · 3.05 KB
/
sumoenv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import numpy as np
# Put $SUMO_HOME/tools on the module search path, or abort with a message
# when SUMO_HOME is not defined in the environment.
if 'SUMO_HOME' not in os.environ:
    sys.exit("please declare environment variable 'SUMO_HOME'")
tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
sys.path.append(tools)
import traci
class SumoEnv:
    """Small TraCI-driven environment around a single traffic light.

    The observation is a flat float32 vector: ``lane_len`` cells for each
    entry of ``lane_ids`` (vehicle occupancy, linearly split between the two
    nearest cells) followed by a one-hot encoding of the current traffic
    light phase.
    """

    # Lane discretisation, in the units returned by traci.vehicle.getPosition.
    place_len = 7.5      # length of one cell
    place_offset = 8.50  # coordinate at which cell 0 starts
    lane_len = 10        # number of cells per lane
    lane_ids = ['-gneE0_0', '-gneE0_1', '-gneE0_2',
                '-gneE1_0', '-gneE1_1', '-gneE1_2',
                '-gneE2_0', '-gneE2_1', '-gneE2_2',
                '-gneE3_0', '-gneE3_1', '-gneE3_2']

    tls_id = 'gneJ00'     # traffic light controlled by the agent
    n_phases = 4          # width of the one-hot phase encoding
    steps_per_action = 2  # simulation steps executed per call to step_d
    reward_scale = 0.004  # factor applied to the waiting-time change
    max_cars = 250        # episode is done once more cars than this departed

    def __init__(self, label='default', gui_f=False,
                 cfg_file='intersection.sumocfg'):
        """Prepare the SUMO command line; the simulation starts in reset().

        Args:
            label: TraCI connection label passed to ``traci.start``.
            gui_f: launch ``sumo-gui`` instead of ``sumo`` when true.
            cfg_file: SUMO configuration file handed to ``-c``.

        Raises:
            KeyError: if the ``SUMO_HOME`` environment variable is not set.
        """
        self.label = label
        self.wt_last = 0.  # total waiting time seen at the previous step
        self.ncars = 0     # vehicles departed since the last reset
        exe = 'sumo-gui' if gui_f else 'sumo'
        sumo_binary = os.path.join(os.environ['SUMO_HOME'], 'bin', exe)
        self.sumoCmd = [sumo_binary, '-c', cfg_file]

    def _lane_position(self, ilane, xcar, ycar):
        """Return a vehicle's fractional cell index on lane number ``ilane``.

        Lanes 0-2 are measured along +y, 3-5 along +x, 6-8 along -y and every
        later lane along -x, so the grouping is tied to the twelve default
        ``lane_ids``.
        NOTE(review): presumably the junction sits at the origin, making this
        the distance from the junction in cells -- confirm against the net.
        """
        if ilane < 3:
            coord = ycar
        elif ilane < 6:
            coord = xcar
        elif ilane < 9:
            coord = -ycar
        else:
            coord = -xcar
        return (coord - self.place_offset) / self.place_len

    def _advance(self, n_steps):
        """Run ``n_steps`` simulation steps, counting departures after each.

        getDepartedNumber() only covers the most recent step, so it has to be
        read after every simulationStep() or departures are lost.
        """
        for _ in range(n_steps):
            traci.simulationStep()
            self.ncars += traci.simulation.getDepartedNumber()

    def get_state_d(self):
        """Build the observation vector from the current simulation state.

        Returns:
            float32 array of length ``lane_len * len(lane_ids) + n_phases``.
        """
        n_cells = self.lane_len * len(self.lane_ids)
        state = np.zeros(n_cells + self.n_phases, dtype=np.float32)
        last_cell = self.lane_len - 1.
        for ilane, lane_id in enumerate(self.lane_ids):
            base = ilane * self.lane_len
            for car_id in traci.lane.getLastStepVehicleIDs(lane_id):
                xcar, ycar = traci.vehicle.getPosition(car_id)
                pos = self._lane_position(ilane, xcar, ycar)
                if pos > last_cell:
                    # Beyond the discretised stretch of the lane: ignored.
                    continue
                # Keep ipos + 1 inside this lane's slice of the vector.
                pos = np.clip(pos, 0., last_cell - 1e-6)
                ipos = int(pos)
                frac = pos - ipos
                # Split the vehicle between the two neighbouring cells.
                state[base + ipos] += 1. - frac
                state[base + ipos + 1] += frac
        phase = traci.trafficlight.getPhase(self.tls_id)
        state[n_cells:] = np.eye(self.n_phases, dtype=np.float32)[phase]
        return state

    def step_d(self, action):
        """Apply a phase, advance the simulation and score the outcome.

        Args:
            action: phase index for the traffic light; a scalar or a
                size-one array.

        Returns:
            ``(state, reward, done, info)`` where ``reward`` is minus the
            change in summed lane waiting time since the previous step, scaled
            by ``reward_scale``, ``done`` is true once more than ``max_cars``
            vehicles have departed, and ``info`` is ``np.array([[reward]])``.
        """
        # TraCI expects a plain integer, not a 0-d numpy array.
        phase = int(np.squeeze(action))
        traci.trafficlight.setPhase(self.tls_id, phase)
        self._advance(self.steps_per_action)
        state = self.get_state_d()
        wt = sum(traci.lane.getWaitingTime(lane_id)
                 for lane_id in self.lane_ids)
        reward = -(wt - self.wt_last) * self.reward_scale
        # Remember the total so the next reward is a genuine difference.
        self.wt_last = wt
        done = self.ncars > self.max_cars
        return state, reward, done, np.array([[reward]])

    def reset(self):
        """Start a fresh SUMO run and return the initial observation.

        NOTE(review): traci.start is called on every reset, so the previous
        connection presumably has to be closed with close() first -- confirm
        against the caller.
        """
        self.wt_last = 0.
        self.ncars = 0
        traci.start(self.sumoCmd, label=self.label)
        traci.trafficlight.setProgram(self.tls_id, '0')
        self._advance(1)
        return self.get_state_d()

    def close(self):
        """Close the TraCI connection via ``traci.close()``."""
        traci.close()