Merged

36 commits
de06725
fix i210_replay and get_table_disk
brentgryffindor Jun 2, 2020
603b86c
add custom configuration setting tool
brentgryffindor Jun 2, 2020
3b15f8d
add configuration tool to i210 replay too
brentgryffindor Jun 2, 2020
9ca3199
Update README.md
liljonnystyle Jun 2, 2020
086e0c8
modify request for user input
liljonnystyle Jun 2, 2020
061b5f7
fix network_name_map
liljonnystyle Jun 2, 2020
5fc9a09
fix network_name_map
liljonnystyle Jun 2, 2020
21600ba
add percent_improvement to leaderboard_chart_agg
liljonnystyle Jun 2, 2020
c95752d
add submitter_name and strategy to leaderboard
liljonnystyle Jun 2, 2020
8563138
fix time issues with inflows and metrics-by-time plots
liljonnystyle Jun 3, 2020
824c37d
zero out inflows/outflows time axis
liljonnystyle Jun 3, 2020
d6cc6da
add data collection process to visualizer_rllib, and to warmup period…
brentgryffindor Jun 4, 2020
1c13fa2
Merge branch 'datapipeline_dev_v2' of https://github.com/flow-project…
brentgryffindor Jun 4, 2020
cbc9f9c
Merge branch 'i210_dev' into datapipeline_dev_v2
liljonnystyle Jun 5, 2020
c0e474b
implement emission_to_csv_large to deal with memory insufficiency iss…
brentgryffindor Jun 10, 2020
a225720
fix some minor issue with I210 replay and add pipeline support to vis…
brentgryffindor Jun 10, 2020
c58474a
add data collection during the warm-up period
brentgryffindor Jun 10, 2020
19aa292
fix the edge filter issue
brentgryffindor Jun 10, 2020
e3b8af9
Merge branch 'datapipeline_dev_v2' of https://github.com/flow-project…
brentgryffindor Jun 10, 2020
1cc6f91
fix style
brentgryffindor Jun 10, 2020
904c94b
style fix
brentgryffindor Jun 10, 2020
f90610f
Update leaderboard_utils.py
brentgryffindor Jun 10, 2020
1210e6a
fix minor error with emission_to_csv_large
brentgryffindor Jun 10, 2020
7baa3c5
Merge branch 'datapipeline_dev_v2' of https://github.com/flow-project…
brentgryffindor Jun 10, 2020
0a1089b
fix minor error for query
brentgryffindor Jun 10, 2020
1a968e7
add comment for error catching, merge emission_to_csv_large to emissi…
brentgryffindor Jun 11, 2020
dfddec0
use etree with recover enabled instead of ElementTree
brentgryffindor Jun 11, 2020
3e87cd5
fix pandas SettingWithCopyWarning
brentgryffindor Jun 11, 2020
452a1e0
fix minor error in query
brentgryffindor Jun 11, 2020
263ca54
add is_baseline to metadata
brentgryffindor Jun 11, 2020
937ff08
update power demand models w/ prius/tacoma params
liljonnystyle Jun 11, 2020
e554f24
add update baseline feature
brentgryffindor Jun 11, 2020
b7c472b
Merge branch 'datapipeline_dev_v2' of https://github.com/flow-project…
brentgryffindor Jun 11, 2020
7bebec6
update leaderboard agg to omit old baseline
brentgryffindor Jun 11, 2020
b171a98
update EV energy model MVP
liljonnystyle Jun 17, 2020
1a47179
Merge branch 'i210_dev' into datapipeline_dev_v2
eugenevinitsky Jun 18, 2020
2 changes: 2 additions & 0 deletions .gitignore
@@ -111,3 +111,5 @@ flow.ang
*.ang.old
*.sang

# local configuration file for data pipeline
**/data_pipeline_config
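The newly ignored data_pipeline_config file is the small CSV that get_configuration (added in flow/data_pipeline/data_pipeline.py further down) reads and rewrites. A plausible example of its contents, with hypothetical values:

```
submitter_name,strategy
Jane Doe,follower-stopper-v1
```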
25 changes: 15 additions & 10 deletions flow/core/experiment.py
@@ -1,6 +1,6 @@
"""Contains an experiment class for running simulations."""
from flow.utils.registry import make_create_env
from flow.data_pipeline.data_pipeline import write_dict_to_csv, upload_to_s3, get_extra_info
from flow.data_pipeline.data_pipeline import write_dict_to_csv, upload_to_s3, get_extra_info, get_configuration
from flow.data_pipeline.leaderboard_utils import network_name_translate
from collections import defaultdict
from datetime import datetime, timezone
@@ -158,12 +158,18 @@ def rl_actions(*_):
cur_datetime = datetime.now(timezone.utc)
cur_date = cur_datetime.date().isoformat()
cur_time = cur_datetime.time().isoformat()
# collecting information for metadata table
metadata['source_id'].append(source_id)
metadata['submission_time'].append(cur_time)
metadata['network'].append(network_name_translate(self.env.network.name.split('_20')[0]))
metadata['is_baseline'].append(str(is_baseline))
name, strategy = get_configuration()
metadata['submitter_name'].append(name)
metadata['strategy'].append(strategy)

if convert_to_csv and self.env.simulator == "traci":
dir_path = self.env.sim_params.emission_path

dir_path = self.env.sim_params.emission_path
if not dir_path is None:
trajectory_table_path = os.path.join(dir_path, '{}.csv'.format(source_id))
metadata_table_path = os.path.join(dir_path, '{}_METADATA.csv'.format(source_id))
@@ -172,6 +178,8 @@ def rl_actions(*_):
ret = 0
vel = []
custom_vals = {key: [] for key in self.custom_callables.keys()}
run_id = "run_{}".format(i)
self.env.pipeline_params = (extra_info, source_id, run_id)
state = self.env.reset()
for j in range(num_steps):
t0 = time.time()
@@ -185,9 +193,7 @@ def rl_actions(*_):
ret += reward

# collect additional information for the data pipeline
get_extra_info(self.env.k.vehicle, extra_info, veh_ids)
extra_info["source_id"].extend([source_id] * len(veh_ids))
extra_info["run_id"].extend(['run_{}'.format(i)] * len(veh_ids))
get_extra_info(self.env.k.vehicle, extra_info, veh_ids, source_id, run_id)

# write to disk every 100 steps
if convert_to_csv and self.env.simulator == "traci" and j % 100 == 0 and not dir_path is None:
@@ -230,8 +236,7 @@ def rl_actions(*_):
emission_path = os.path.join(dir_path, emission_filename)

# convert the emission file into a csv
# FIXME(@Brent): produce seg fault with large CSV
# emission_to_csv(emission_path)
emission_to_csv(emission_path)

# Delete the .xml version of the emission file.
os.remove(emission_path)
@@ -241,12 +246,12 @@ def rl_actions(*_):

if to_aws:
upload_to_s3('circles.data.pipeline',
'metadata_table/date={0}/partition_name={1}_METADATA/'
'{1}_METADATA.csv'.format(cur_date, source_id),
'metadata_table/date={0}/partition_name={1}_METADATA/{1}_METADATA.csv'.format(cur_date,
source_id),
metadata_table_path)
upload_to_s3('circles.data.pipeline',
'fact_vehicle_trace/date={0}/partition_name={1}/{1}.csv'.format(cur_date, source_id),
trajectory_table_path,
{'network': metadata['network'][0]})
{'network': metadata['network'][0], 'is_baseline': metadata['is_baseline'][0]})

return info_dict
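A side note on the two upload_to_s3 calls above: the bucket keys follow Athena-style date/partition_name partitioning. A minimal sketch of the resulting layout, using a hypothetical source_id (only the key formats are taken from the diff):

```python
# Hypothetical values for illustration.
cur_date = "2020-06-18"
source_id = "flow_ring_20200618-0000"

meta_key = ('metadata_table/date={0}/partition_name={1}_METADATA/'
            '{1}_METADATA.csv'.format(cur_date, source_id))
trace_key = 'fact_vehicle_trace/date={0}/partition_name={1}/{1}.csv'.format(cur_date, source_id)

# meta_key:
#   metadata_table/date=2020-06-18/partition_name=flow_ring_20200618-0000_METADATA/
#   flow_ring_20200618-0000_METADATA.csv
# trace_key:
#   fact_vehicle_trace/date=2020-06-18/partition_name=flow_ring_20200618-0000/
#   flow_ring_20200618-0000.csv
```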
67 changes: 32 additions & 35 deletions flow/core/util.py
@@ -47,42 +47,39 @@ def emission_to_csv(emission_path, output_path=None):
path to the csv file that will be generated, default is the same
directory as the emission file, with the same name
"""
parser = etree.XMLParser(recover=True)
tree = ElementTree.parse(emission_path, parser=parser)
root = tree.getroot()

# parse the xml data into a dict
context = etree.iterparse(emission_path, recover=True)
out_data = []
for time in root.findall('timestep'):
t = float(time.attrib['time'])

for car in time:
out_data.append(dict())
try:
out_data[-1]['time'] = t
out_data[-1]['CO'] = float(car.attrib['CO'])
out_data[-1]['y'] = float(car.attrib['y'])
out_data[-1]['CO2'] = float(car.attrib['CO2'])
out_data[-1]['electricity'] = float(car.attrib['electricity'])
out_data[-1]['type'] = car.attrib['type']
out_data[-1]['id'] = car.attrib['id']
out_data[-1]['eclass'] = car.attrib['eclass']
out_data[-1]['waiting'] = float(car.attrib['waiting'])
out_data[-1]['NOx'] = float(car.attrib['NOx'])
out_data[-1]['fuel'] = float(car.attrib['fuel'])
out_data[-1]['HC'] = float(car.attrib['HC'])
out_data[-1]['x'] = float(car.attrib['x'])
out_data[-1]['route'] = car.attrib['route']
out_data[-1]['relative_position'] = float(car.attrib['pos'])
out_data[-1]['noise'] = float(car.attrib['noise'])
out_data[-1]['angle'] = float(car.attrib['angle'])
out_data[-1]['PMx'] = float(car.attrib['PMx'])
out_data[-1]['speed'] = float(car.attrib['speed'])
out_data[-1]['edge_id'] = car.attrib['lane'].rpartition('_')[0]
out_data[-1]['lane_number'] = car.attrib['lane'].\
rpartition('_')[-1]
except KeyError:
del out_data[-1]
for event, elem in context:
if elem.tag == "timestep":
t = float(elem.attrib['time'])
for car in elem:
out_data.append(dict())
try:
out_data[-1]['time'] = t
out_data[-1]['CO'] = float(car.attrib['CO'])
out_data[-1]['y'] = float(car.attrib['y'])
out_data[-1]['CO2'] = float(car.attrib['CO2'])
out_data[-1]['electricity'] = float(car.attrib['electricity'])
out_data[-1]['type'] = car.attrib['type']
out_data[-1]['id'] = car.attrib['id']
out_data[-1]['eclass'] = car.attrib['eclass']
out_data[-1]['waiting'] = float(car.attrib['waiting'])
out_data[-1]['NOx'] = float(car.attrib['NOx'])
out_data[-1]['fuel'] = float(car.attrib['fuel'])
out_data[-1]['HC'] = float(car.attrib['HC'])
out_data[-1]['x'] = float(car.attrib['x'])
out_data[-1]['route'] = car.attrib['route']
out_data[-1]['relative_position'] = float(car.attrib['pos'])
out_data[-1]['noise'] = float(car.attrib['noise'])
out_data[-1]['angle'] = float(car.attrib['angle'])
out_data[-1]['PMx'] = float(car.attrib['PMx'])
out_data[-1]['speed'] = float(car.attrib['speed'])
out_data[-1]['edge_id'] = car.attrib['lane'].rpartition('_')[0]
out_data[-1]['lane_number'] = car.attrib['lane']. \
rpartition('_')[-1]
except KeyError:
del out_data[-1]
elem.clear()

# sort the elements of the dictionary by the vehicle id
out_data = sorted(out_data, key=lambda k: k['id'])
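The rewrite above replaces a full ElementTree.parse of the emission file with lxml's streaming iterparse, which is what addresses the segfault noted in the old FIXME for large files. A minimal standalone sketch of the same pattern (not the project's code; assumes only lxml):

```python
from lxml import etree

def count_timesteps(emission_path):
    """Count <timestep> elements without loading the whole file into memory."""
    # iterparse yields each element once its closing tag has been read;
    # recover=True tolerates a truncated final element (e.g. if SUMO was
    # killed mid-write), and elem.clear() frees each subtree before moving on.
    count = 0
    for _event, elem in etree.iterparse(emission_path, recover=True):
        if elem.tag == "timestep":
            count += 1
            elem.clear()
    return count
```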
12 changes: 12 additions & 0 deletions flow/data_pipeline/README.md
@@ -0,0 +1,12 @@
To run a simulation with output stored locally only:

`python simulate.py EXP_CONFIG --gen_emission`

To run a simulation and upload the output to the data pipeline:

`python simulate.py EXP_CONFIG --to_aws`

To run a simulation, upload the output to the data pipeline, and mark it as a baseline:

`python simulate.py EXP_CONFIG --to_aws --is_baseline`
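
For example, with a hypothetical experiment config name `ring`, a local run would be:

`python simulate.py ring --gen_emission`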

45 changes: 44 additions & 1 deletion flow/data_pipeline/data_pipeline.py
@@ -4,6 +4,8 @@
from flow.data_pipeline.query import QueryStrings
from time import time
from datetime import date
import csv
from io import StringIO


def generate_trajectory_table(data_path, extra_info, partition_name):
Expand Down Expand Up @@ -77,7 +79,7 @@ def upload_to_s3(bucket_name, bucket_key, file_path, metadata={}):
return


def get_extra_info(veh_kernel, extra_info, veh_ids):
def get_extra_info(veh_kernel, extra_info, veh_ids, source_id, run_id):
"""Get all the necessary information for the trajectory output from flow."""
for vid in veh_ids:
extra_info["time_step"].append(veh_kernel.get_timestep(vid) / 1000)
@@ -103,6 +105,32 @@ def get_extra_info(veh_kernel, extra_info, veh_ids):
extra_info["edge_id"].append(veh_kernel.get_edge(vid))
extra_info["lane_id"].append(veh_kernel.get_lane(vid))
extra_info["distance"].append(veh_kernel.get_distance(vid))
extra_info["relative_position"].append(veh_kernel.get_position(vid))
extra_info["source_id"].append(source_id)
extra_info["run_id"].append(run_id)


def get_configuration():
"""Get configuration for the metadata table."""
try:
config_df = pd.read_csv('./data_pipeline_config')
except FileNotFoundError:
config_df = pd.DataFrame(data={"submitter_name": [""], "strategy": [""]})

if not config_df['submitter_name'][0]:
name = input("Please enter your name:").strip()
while not name:
name = input("Please enter a non-empty name:").strip()
config_df['submitter_name'] = [name]

strategy = input(
"Please enter strategy name (current: \"{}\"):".format(config_df["strategy"][0])).strip()
if strategy:
config_df['strategy'] = [strategy]

config_df.to_csv('./data_pipeline_config', index=False)

return config_df['submitter_name'][0], config_df['strategy'][0]


def delete_obsolete_data(s3, latest_key, table, bucket="circles.data.pipeline"):
@@ -114,6 +142,21 @@ def delete_obsolete_data(s3, latest_key, table, bucket="circles.data.pipeline"):
s3.delete_object(Bucket=bucket, Key=key)


def update_baseline(s3, baseline_network, baseline_source_id):
obj = s3.get_object(Bucket='circles.data.pipeline', Key='baseline_table/baselines.csv')['Body']
original_str = obj.read().decode()
reader = csv.DictReader(StringIO(original_str))
new_str = StringIO()
writer = csv.DictWriter(new_str, fieldnames=['network', 'source_id'])
writer.writeheader()
writer.writerow({'network': baseline_network, 'source_id': baseline_source_id})
for row in reader:
if row['network'] != baseline_network:
writer.writerow(row)
s3.put_object(Bucket='circles.data.pipeline', Key='baseline_table/baselines.csv',
Body=new_str.getvalue().replace('\r', '').encode())


class AthenaQuery:
"""Class used to run queries.

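A minimal, S3-free sketch of the rewrite update_baseline performs on baseline_table/baselines.csv (sample rows are hypothetical): the new baseline row is written first, and any previous row for the same network is dropped, so the table keeps exactly one source_id per network.

```python
import csv
from io import StringIO

# Pretend this came from s3.get_object(...)['Body'].read().decode().
original_str = "network,source_id\r\nring,src_old\r\nI-210_subnetwork,src_210\r\n"

reader = csv.DictReader(StringIO(original_str))
new_str = StringIO()
writer = csv.DictWriter(new_str, fieldnames=['network', 'source_id'])
writer.writeheader()
writer.writerow({'network': 'ring', 'source_id': 'src_new'})  # new baseline first
for row in reader:
    if row['network'] != 'ring':  # skip the stale baseline for this network
        writer.writerow(row)

print(new_str.getvalue().replace('\r', ''))
# network,source_id
# ring,src_new
# I-210_subnetwork,src_210
```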
4 changes: 3 additions & 1 deletion flow/data_pipeline/lambda_function.py
@@ -1,7 +1,7 @@
"""lambda function on AWS Lambda."""
import boto3
from urllib.parse import unquote_plus
from flow.data_pipeline.data_pipeline import AthenaQuery, delete_obsolete_data
from flow.data_pipeline.data_pipeline import AthenaQuery, delete_obsolete_data, update_baseline
from flow.data_pipeline.query import tags, tables, network_using_edge
from flow.data_pipeline.query import X_FILTER, EDGE_FILTER, WARMUP_STEPS, HORIZON_STEPS

@@ -48,6 +48,8 @@ def lambda_handler(event, context):
if 'network' in response["Metadata"]:
if response["Metadata"]['network'] in network_using_edge:
loc_filter = EDGE_FILTER
if 'is_baseline' in response['Metadata'] and response['Metadata']['is_baseline'] == 'True':
update_baseline(s3, response["Metadata"]['network'], source_id)

query_dict = tags[table]

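For context on the lambda_handler change above: the function is triggered by S3 PUT notifications, and the standard event shape it unpacks looks roughly like this (bucket and key values hypothetical). The object key is URL-decoded via unquote_plus, and response['Metadata'] carries the metadata attached by upload_to_s3.

```python
# Hypothetical S3 notification event (standard AWS format).
event = {
    "Records": [{
        "s3": {
            "bucket": {"name": "circles.data.pipeline"},
            "object": {"key": "fact_vehicle_trace/date=2020-06-18/"
                              "partition_name=src_abc/src_abc.csv"}
        }
    }]
}
```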
7 changes: 5 additions & 2 deletions flow/data_pipeline/leaderboard_utils.py
@@ -5,7 +5,8 @@
from io import StringIO


network_name_map = {"highway-single": "Single-Lane Straight Road",
network_name_map = {"highway": "Single-Lane Straight Road",
"highway_single": "Single-Lane Straight Road",
"ring": "Single-Lane Ring Road",
"I-210_subnetwork": "I-210 without Ramps",
"I_210_subnetwork": "I-210 without Ramps"}
@@ -64,10 +65,12 @@ def get_table_disk(table_name="fact_vehicle_trace", bucket="circles.data.pipelin
keys = [e["Key"] for e in response["Contents"] if e["Key"].find(table_name) == 0 and e["Key"][-4:] == ".csv"]
names = [key_to_name(k) for k in keys]
existing_results = os.listdir("./result/{}".format(table_name))
updated = False
for index in range(len(keys)):
if names[index] not in existing_results:
updated = True
s3.download_file(bucket, keys[index], "./result/{}/{}".format(table_name, names[index]))
if table_name == "leaderboard_chart_agg":
if table_name == "leaderboard_chart_agg" and updated:
for p in existing_results:
os.remove("./result/{}/{}".format(table_name, p))

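A quick check of the expanded network_name_map above — both highway spellings now resolve to the same display name:

```python
from flow.data_pipeline.leaderboard_utils import network_name_map

assert network_name_map["highway"] == "Single-Lane Straight Road"
assert network_name_map["highway_single"] == "Single-Lane Straight Road"
```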