diff --git a/.gitignore b/.gitignore index 29d788c27..6d9ff7a71 100644 --- a/.gitignore +++ b/.gitignore @@ -111,3 +111,5 @@ flow.ang *.ang.old *.sang +# local configuration file for data pipeline +**/data_pipeline_config diff --git a/flow/core/experiment.py b/flow/core/experiment.py index 7f5352c25..ceb8c7b61 100755 --- a/flow/core/experiment.py +++ b/flow/core/experiment.py @@ -1,6 +1,6 @@ """Contains an experiment class for running simulations.""" from flow.utils.registry import make_create_env -from flow.data_pipeline.data_pipeline import write_dict_to_csv, upload_to_s3, get_extra_info +from flow.data_pipeline.data_pipeline import write_dict_to_csv, upload_to_s3, get_extra_info, get_configuration from flow.data_pipeline.leaderboard_utils import network_name_translate from collections import defaultdict from datetime import datetime, timezone @@ -158,12 +158,18 @@ def rl_actions(*_): cur_datetime = datetime.now(timezone.utc) cur_date = cur_datetime.date().isoformat() cur_time = cur_datetime.time().isoformat() + # collecting information for metadata table metadata['source_id'].append(source_id) metadata['submission_time'].append(cur_time) metadata['network'].append(network_name_translate(self.env.network.name.split('_20')[0])) metadata['is_baseline'].append(str(is_baseline)) + name, strategy = get_configuration() + metadata['submitter_name'].append(name) + metadata['strategy'].append(strategy) + + if convert_to_csv and self.env.simulator == "traci": + dir_path = self.env.sim_params.emission_path - dir_path = self.env.sim_params.emission_path if not dir_path is None: trajectory_table_path = os.path.join(dir_path, '{}.csv'.format(source_id)) metadata_table_path = os.path.join(dir_path, '{}_METADATA.csv'.format(source_id)) @@ -172,6 +178,8 @@ def rl_actions(*_): ret = 0 vel = [] custom_vals = {key: [] for key in self.custom_callables.keys()} + run_id = "run_{}".format(i) + self.env.pipeline_params = (extra_info, source_id, run_id) state = self.env.reset() for j in range(num_steps): t0 = time.time() @@ -185,9 +193,7 @@ def rl_actions(*_): ret += reward # collect additional information for the data pipeline - get_extra_info(self.env.k.vehicle, extra_info, veh_ids) - extra_info["source_id"].extend([source_id] * len(veh_ids)) - extra_info["run_id"].extend(['run_{}'.format(i)] * len(veh_ids)) + get_extra_info(self.env.k.vehicle, extra_info, veh_ids, source_id, run_id) # write to disk every 100 steps if convert_to_csv and self.env.simulator == "traci" and j % 100 == 0 and not dir_path is None: @@ -230,8 +236,7 @@ def rl_actions(*_): emission_path = os.path.join(dir_path, emission_filename) # convert the emission file into a csv - # FIXME(@Brent): produce seg fault with large CSV - # emission_to_csv(emission_path) + emission_to_csv(emission_path) # Delete the .xml version of the emission file. 
os.remove(emission_path) @@ -241,12 +246,12 @@ def rl_actions(*_): if to_aws: upload_to_s3('circles.data.pipeline', - 'metadata_table/date={0}/partition_name={1}_METADATA/' - '{1}_METADATA.csv'.format(cur_date, source_id), + 'metadata_table/date={0}/partition_name={1}_METADATA/{1}_METADATA.csv'.format(cur_date, + source_id), metadata_table_path) upload_to_s3('circles.data.pipeline', 'fact_vehicle_trace/date={0}/partition_name={1}/{1}.csv'.format(cur_date, source_id), trajectory_table_path, - {'network': metadata['network'][0]}) + {'network': metadata['network'][0], 'is_baseline': metadata['is_baseline'][0]}) return info_dict diff --git a/flow/core/util.py b/flow/core/util.py index 1821a76a5..cd269e6af 100755 --- a/flow/core/util.py +++ b/flow/core/util.py @@ -47,42 +47,39 @@ def emission_to_csv(emission_path, output_path=None): path to the csv file that will be generated, default is the same directory as the emission file, with the same name """ - parser = etree.XMLParser(recover=True) - tree = ElementTree.parse(emission_path, parser=parser) - root = tree.getroot() - - # parse the xml data into a dict + context = etree.iterparse(emission_path, recover=True) out_data = [] - for time in root.findall('timestep'): - t = float(time.attrib['time']) - - for car in time: - out_data.append(dict()) - try: - out_data[-1]['time'] = t - out_data[-1]['CO'] = float(car.attrib['CO']) - out_data[-1]['y'] = float(car.attrib['y']) - out_data[-1]['CO2'] = float(car.attrib['CO2']) - out_data[-1]['electricity'] = float(car.attrib['electricity']) - out_data[-1]['type'] = car.attrib['type'] - out_data[-1]['id'] = car.attrib['id'] - out_data[-1]['eclass'] = car.attrib['eclass'] - out_data[-1]['waiting'] = float(car.attrib['waiting']) - out_data[-1]['NOx'] = float(car.attrib['NOx']) - out_data[-1]['fuel'] = float(car.attrib['fuel']) - out_data[-1]['HC'] = float(car.attrib['HC']) - out_data[-1]['x'] = float(car.attrib['x']) - out_data[-1]['route'] = car.attrib['route'] - out_data[-1]['relative_position'] = float(car.attrib['pos']) - out_data[-1]['noise'] = float(car.attrib['noise']) - out_data[-1]['angle'] = float(car.attrib['angle']) - out_data[-1]['PMx'] = float(car.attrib['PMx']) - out_data[-1]['speed'] = float(car.attrib['speed']) - out_data[-1]['edge_id'] = car.attrib['lane'].rpartition('_')[0] - out_data[-1]['lane_number'] = car.attrib['lane'].\ - rpartition('_')[-1] - except KeyError: - del out_data[-1] + for event, elem in context: + if elem.tag == "timestep": + t = float(elem.attrib['time']) + for car in elem: + out_data.append(dict()) + try: + out_data[-1]['time'] = t + out_data[-1]['CO'] = float(car.attrib['CO']) + out_data[-1]['y'] = float(car.attrib['y']) + out_data[-1]['CO2'] = float(car.attrib['CO2']) + out_data[-1]['electricity'] = float(car.attrib['electricity']) + out_data[-1]['type'] = car.attrib['type'] + out_data[-1]['id'] = car.attrib['id'] + out_data[-1]['eclass'] = car.attrib['eclass'] + out_data[-1]['waiting'] = float(car.attrib['waiting']) + out_data[-1]['NOx'] = float(car.attrib['NOx']) + out_data[-1]['fuel'] = float(car.attrib['fuel']) + out_data[-1]['HC'] = float(car.attrib['HC']) + out_data[-1]['x'] = float(car.attrib['x']) + out_data[-1]['route'] = car.attrib['route'] + out_data[-1]['relative_position'] = float(car.attrib['pos']) + out_data[-1]['noise'] = float(car.attrib['noise']) + out_data[-1]['angle'] = float(car.attrib['angle']) + out_data[-1]['PMx'] = float(car.attrib['PMx']) + out_data[-1]['speed'] = float(car.attrib['speed']) + out_data[-1]['edge_id'] = 
car.attrib['lane'].rpartition('_')[0] + out_data[-1]['lane_number'] = car.attrib['lane']. \ + rpartition('_')[-1] + except KeyError: + del out_data[-1] + elem.clear() # sort the elements of the dictionary by the vehicle id out_data = sorted(out_data, key=lambda k: k['id']) diff --git a/flow/data_pipeline/README.md b/flow/data_pipeline/README.md new file mode 100644 index 000000000..65aeb8d49 --- /dev/null +++ b/flow/data_pipeline/README.md @@ -0,0 +1,12 @@ +To run a simulation with output stored locally only: + + `python simulate.py EXP_CONFIG --gen_emission` + +To run a simulation and upload output to pipeline: + + `python simulate.py EXP_CONFIG --to_aws` + +To run a simulation, upload output to pipeline, and mark it as baseline: + + `python simulate.py EXP_CONFIG --to_aws --is_baseline` + diff --git a/flow/data_pipeline/data_pipeline.py b/flow/data_pipeline/data_pipeline.py index 50c2c8422..74070cc7a 100644 --- a/flow/data_pipeline/data_pipeline.py +++ b/flow/data_pipeline/data_pipeline.py @@ -4,6 +4,8 @@ from flow.data_pipeline.query import QueryStrings from time import time from datetime import date +import csv +from io import StringIO def generate_trajectory_table(data_path, extra_info, partition_name): @@ -77,7 +79,7 @@ def upload_to_s3(bucket_name, bucket_key, file_path, metadata={}): return -def get_extra_info(veh_kernel, extra_info, veh_ids): +def get_extra_info(veh_kernel, extra_info, veh_ids, source_id, run_id): """Get all the necessary information for the trajectory output from flow.""" for vid in veh_ids: extra_info["time_step"].append(veh_kernel.get_timestep(vid) / 1000) @@ -103,6 +105,32 @@ def get_extra_info(veh_kernel, extra_info, veh_ids): extra_info["edge_id"].append(veh_kernel.get_edge(vid)) extra_info["lane_id"].append(veh_kernel.get_lane(vid)) extra_info["distance"].append(veh_kernel.get_distance(vid)) + extra_info["relative_position"].append(veh_kernel.get_position(vid)) + extra_info["source_id"].append(source_id) + extra_info["run_id"].append(run_id) + + +def get_configuration(): + """Get configuration for the metadata table.""" + try: + config_df = pd.read_csv('./data_pipeline_config') + except FileNotFoundError: + config_df = pd.DataFrame(data={"submitter_name": [""], "strategy": [""]}) + + if not config_df['submitter_name'][0]: + name = input("Please enter your name:").strip() + while not name: + name = input("Please enter a non-empty name:").strip() + config_df['submitter_name'] = [name] + + strategy = input( + "Please enter strategy name (current: \"{}\"):".format(config_df["strategy"][0])).strip() + if strategy: + config_df['strategy'] = [strategy] + + config_df.to_csv('./data_pipeline_config', index=False) + + return config_df['submitter_name'][0], config_df['strategy'][0] def delete_obsolete_data(s3, latest_key, table, bucket="circles.data.pipeline"): @@ -114,6 +142,21 @@ def delete_obsolete_data(s3, latest_key, table, bucket="circles.data.pipeline"): s3.delete_object(Bucket=bucket, Key=key) +def update_baseline(s3, baseline_network, baseline_source_id): + obj = s3.get_object(Bucket='circles.data.pipeline', Key='baseline_table/baselines.csv')['Body'] + original_str = obj.read().decode() + reader = csv.DictReader(StringIO(original_str)) + new_str = StringIO() + writer = csv.DictWriter(new_str, fieldnames=['network', 'source_id']) + writer.writeheader() + writer.writerow({'network': baseline_network, 'source_id': baseline_source_id}) + for row in reader: + if row['network'] != baseline_network: + writer.writerow(row) + 
s3.put_object(Bucket='circles.data.pipeline', Key='baseline_table/baselines.csv', + Body=new_str.getvalue().replace('\r', '').encode()) + + class AthenaQuery: """Class used to run queries. diff --git a/flow/data_pipeline/lambda_function.py b/flow/data_pipeline/lambda_function.py index 00cf0fba5..f7a32d5db 100644 --- a/flow/data_pipeline/lambda_function.py +++ b/flow/data_pipeline/lambda_function.py @@ -1,7 +1,7 @@ """lambda function on AWS Lambda.""" import boto3 from urllib.parse import unquote_plus -from flow.data_pipeline.data_pipeline import AthenaQuery, delete_obsolete_data +from flow.data_pipeline.data_pipeline import AthenaQuery, delete_obsolete_data, update_baseline from flow.data_pipeline.query import tags, tables, network_using_edge from flow.data_pipeline.query import X_FILTER, EDGE_FILTER, WARMUP_STEPS, HORIZON_STEPS @@ -48,6 +48,8 @@ def lambda_handler(event, context): if 'network' in response["Metadata"]: if response["Metadata"]['network'] in network_using_edge: loc_filter = EDGE_FILTER + if 'is_baseline' in response['Metadata'] and response['Metadata']['is_baseline'] == 'True': + update_baseline(s3, response["Metadata"]['network'], source_id) query_dict = tags[table] diff --git a/flow/data_pipeline/leaderboard_utils.py b/flow/data_pipeline/leaderboard_utils.py index afc2fd8bc..dd7055f8b 100644 --- a/flow/data_pipeline/leaderboard_utils.py +++ b/flow/data_pipeline/leaderboard_utils.py @@ -5,7 +5,8 @@ from io import StringIO -network_name_map = {"highway-single": "Single-Lane Straight Road", +network_name_map = {"highway": "Single-Lane Straight Road", + "highway_single": "Single-Lane Straight Road", "ring": "Single-Lane Ring Road", "I-210_subnetwork": "I-210 without Ramps", "I_210_subnetwork": "I-210 without Ramps"} @@ -64,10 +65,12 @@ def get_table_disk(table_name="fact_vehicle_trace", bucket="circles.data.pipelin keys = [e["Key"] for e in response["Contents"] if e["Key"].find(table_name) == 0 and e["Key"][-4:] == ".csv"] names = [key_to_name(k) for k in keys] existing_results = os.listdir("./result/{}".format(table_name)) + updated = False for index in range(len(keys)): if names[index] not in existing_results: + updated = True s3.download_file(bucket, keys[index], "./result/{}/{}".format(table_name, names[index])) - if table_name == "leaderboard_chart_agg": + if table_name == "leaderboard_chart_agg" and updated: for p in existing_results: os.remove("./result/{}/{}".format(table_name, p)) diff --git a/flow/data_pipeline/query.py b/flow/data_pipeline/query.py index 184c7217a..d538e7d62 100644 --- a/flow/data_pipeline/query.py +++ b/flow/data_pipeline/query.py @@ -65,26 +65,46 @@ X_FILTER = "x BETWEEN 500 AND 2300" -EDGE_FILTER = "edge_id <> ANY (VALUES 'ghost0', '119257908#3')" +EDGE_FILTER = "edge_id <> ALL (VALUES 'ghost0', '119257908#3')" WARMUP_STEPS = 600 * 3 * 0.4 HORIZON_STEPS = 1000 * 3 * 0.4 -VEHICLE_POWER_DEMAND_COMBUSTION_FINAL_SELECT = """ +VEHICLE_POWER_DEMAND_TACOMA_FINAL_SELECT = """ SELECT id, time_step, speed, acceleration, road_grade, - GREATEST(0, 1200 * speed * (( + GREATEST(0, 2041 * speed * (( CASE WHEN acceleration > 0 THEN 1 WHEN acceleration < 0 THEN 0 ELSE 0.5 - END * (1 - {0}) + {0}) * acceleration + 9.81 * SIN(road_grade) - ) + 1200 * 9.81 * 0.005 * speed + 0.5 * 1.225 * 2.6 * 0.3 * POW(speed,3)) AS power, + END * (1 - {0}) + {0}) * acceleration + 9.807 * SIN(road_grade) + ) + 2041 * 9.807 * 0.0027 * speed + 0.5 * 1.225 * 3.2 * 0.4 * POW(speed,3)) AS power, + \'{1}\' AS energy_model_id, + source_id + FROM {2} + ORDER BY id, time_step + """ + 
+VEHICLE_POWER_DEMAND_PRIUS_FINAL_SELECT = """ + SELECT + id, + time_step, + speed, + acceleration, + road_grade, + GREATEST(-2.8 * speed, 1663 * speed * (( + CASE + WHEN acceleration > 0 THEN 1 + WHEN acceleration < 0 THEN 0 + ELSE 0.5 + END * (1 - {0}) + {0}) * acceleration + 9.807 * SIN(road_grade) + ) + 1663 * 9.807 * 0.007 * speed + 0.5 * 1.225 * 2.4 * 0.24 * POW(speed,3)) AS power, \'{1}\' AS energy_model_id, source_id FROM {2} @@ -122,9 +142,9 @@ class QueryStrings(Enum): AND date = \'{{date}}\' AND partition_name=\'{{partition}}\' ) - {}""".format(VEHICLE_POWER_DEMAND_COMBUSTION_FINAL_SELECT.format(1, - 'POWER_DEMAND_MODEL', - 'regular_cte')) + {}""".format(VEHICLE_POWER_DEMAND_TACOMA_FINAL_SELECT.format(1, + 'POWER_DEMAND_MODEL', + 'regular_cte')) POWER_DEMAND_MODEL_DENOISED_ACCEL = """ WITH denoised_accel_cte AS ( @@ -142,9 +162,9 @@ class QueryStrings(Enum): AND date = \'{{date}}\' AND partition_name=\'{{partition}}\' ) - {}""".format(VEHICLE_POWER_DEMAND_COMBUSTION_FINAL_SELECT.format(1, - 'POWER_DEMAND_MODEL_DENOISED_ACCEL', - 'denoised_accel_cte')) + {}""".format(VEHICLE_POWER_DEMAND_TACOMA_FINAL_SELECT.format(1, + 'POWER_DEMAND_MODEL_DENOISED_ACCEL', + 'denoised_accel_cte')) POWER_DEMAND_MODEL_DENOISED_ACCEL_VEL = """ WITH lagged_timestep AS ( @@ -175,9 +195,9 @@ class QueryStrings(Enum): source_id FROM lagged_timestep ) - {}""".format(VEHICLE_POWER_DEMAND_COMBUSTION_FINAL_SELECT.format(1, - 'POWER_DEMAND_MODEL_DENOISED_ACCEL_VEL', - 'denoised_speed_cte')) + {}""".format(VEHICLE_POWER_DEMAND_TACOMA_FINAL_SELECT.format(1, + 'POWER_DEMAND_MODEL_DENOISED_ACCEL_VEL', + 'denoised_speed_cte')) FACT_NETWORK_THROUGHPUT_AGG = """ WITH min_time AS ( @@ -305,6 +325,7 @@ class QueryStrings(Enum): FROM min_max_time_step WHERE 1 = 1 AND min_time_step >= {start_filter} + AND min_time_step < {stop_filter} GROUP BY 1, 2 ), outflows AS ( SELECT @@ -313,11 +334,14 @@ class QueryStrings(Enum): 60 * COUNT(DISTINCT id) AS outflow_rate FROM min_max_time_step WHERE 1 = 1 + AND max_time_step >= {start_filter} AND max_time_step < {stop_filter} GROUP BY 1, 2 ) SELECT - COALESCE(i.time_step, o.time_step) AS time_step, + COALESCE(i.time_step, o.time_step) - MIN(COALESCE(i.time_step, o.time_step)) + OVER (PARTITION BY COALESCE(i.source_id, o.source_id) + ORDER BY COALESCE(i.time_step, o.time_step) ASC) AS time_step, COALESCE(i.source_id, o.source_id) AS source_id, COALESCE(i.inflow_rate, 0) AS inflow_rate, COALESCE(o.outflow_rate, 0) AS outflow_rate @@ -434,7 +458,8 @@ class QueryStrings(Enum): SELECT vt.id, vt.source_id, - vt.time_step, + vt.time_step - FIRST_VALUE(vt.time_step) + OVER (PARTITION BY vt.id, vt.source_id ORDER BY vt.time_step ASC) AS time_step, energy_model_id, et.speed, et.acceleration, @@ -528,17 +553,44 @@ class QueryStrings(Enum): ;""" LEADERBOARD_CHART_AGG = """ + WITH agg AS ( + SELECT + l.date AS submission_date, + m.submission_time, + l.source_id, + m.submitter_name, + m.strategy, + m.network, + m.is_baseline, + l.energy_model_id, + l.efficiency_meters_per_joules, + l.efficiency_miles_per_gallon, + l.throughput_per_hour, + b.source_id AS baseline_source_id + FROM leaderboard_chart AS l, metadata_table AS m, baseline_table as b + WHERE 1 = 1 + AND l.source_id = m.source_id + AND m.network = b.network + AND (m.is_baseline='False' + OR (m.is_baseline='True' + AND m.source_id = b.source_id)) + ) SELECT - l.date AS submission_date, - l.source_id, - m.network, - m.is_baseline, - l.energy_model_id, - l.efficiency_meters_per_joules, - l.efficiency_miles_per_gallon, - l.throughput_per_hour 
- FROM leaderboard_chart AS l, metadata_table AS m - WHERE 1 = 1 - AND l.source_id = m.source_id - ORDER BY l.date, m.submission_time, l.source_id ASC + agg.submission_date, + agg.source_id, + agg.submitter_name, + agg.strategy, + agg.network, + agg.is_baseline, + agg.energy_model_id, + agg.efficiency_meters_per_joules, + agg.efficiency_miles_per_gallon, + 100 * (1 - baseline.efficiency_miles_per_gallon / agg.efficiency_miles_per_gallon) AS percent_improvement, + agg.throughput_per_hour + FROM agg + JOIN agg AS baseline ON 1 = 1 + AND agg.network = baseline.network + AND baseline.is_baseline = 'True' + AND agg.baseline_source_id = baseline.source_id + ORDER BY agg.submission_date, agg.submission_time ASC ;""" diff --git a/flow/envs/base.py b/flow/envs/base.py index 1e739faba..9dec30025 100644 --- a/flow/envs/base.py +++ b/flow/envs/base.py @@ -26,6 +26,8 @@ from flow.core.kernel import Kernel from flow.utils.exceptions import FatalFlowError +from flow.data_pipeline.data_pipeline import get_extra_info + class Env(gym.Env, metaclass=ABCMeta): """Base environment class. @@ -578,6 +580,14 @@ def reset(self): # perform (optional) warm-up steps before training for _ in range(self.env_params.warmup_steps): observation, _, _, _ = self.step(rl_actions=None) + # collect data for pipeline during the warmup period + try: + extra_info, source_id, run_id = self.pipeline_params + veh_ids = self.k.vehicle.get_ids() + get_extra_info(self.k.vehicle, extra_info, veh_ids, source_id, run_id) + # in case the attribute `pipeline_params` is not added to this instance + except AttributeError as e: + pass # render a frame self.render(reset=True) diff --git a/flow/visualize/i210_replay.py b/flow/visualize/i210_replay.py index b2e22d5b3..fb6792c11 100644 --- a/flow/visualize/i210_replay.py +++ b/flow/visualize/i210_replay.py @@ -32,7 +32,7 @@ from examples.exp_configs.rl.multiagent.multiagent_i210 import flow_params as I210_MA_DEFAULT_FLOW_PARAMS from examples.exp_configs.rl.multiagent.multiagent_i210 import custom_callables -from flow.data_pipeline.data_pipeline import write_dict_to_csv, upload_to_s3, get_extra_info +from flow.data_pipeline.data_pipeline import write_dict_to_csv, upload_to_s3, get_extra_info, get_configuration from flow.data_pipeline.leaderboard_utils import network_name_translate import uuid @@ -221,6 +221,9 @@ def replay(args, flow_params, output_dir=None, transfer_test=None, rllib_config= metadata['submission_time'].append(cur_time) metadata['network'].append(network_name_translate(env.network.name.split('_20')[0])) metadata['is_baseline'].append(str(args.is_baseline)) + name, strategy = get_configuration() + metadata['submitter_name'].append(name) + metadata['strategy'].append(strategy) i = 0 while i < args.num_rollouts: @@ -231,6 +234,8 @@ def replay(args, flow_params, output_dir=None, transfer_test=None, rllib_config= completed_vehicle_avg_energy = {} completed_vehicle_travel_time = {} custom_vals = {key: [] for key in custom_callables.keys()} + run_id = "run_{}".format(i) + env.pipeline_params = (extra_info, source_id, run_id) state = env.reset() initial_vehicles = set(env.k.vehicle.get_ids()) for _ in range(env_params.horizon): @@ -260,10 +265,8 @@ def replay(args, flow_params, output_dir=None, transfer_test=None, rllib_config= veh_ids = env.k.vehicle.get_ids() vel.append(np.mean(env.k.vehicle.get_speed(veh_ids))) - # Collect information from flow for the trajectory output - get_extra_info(env.k.vehicle, extra_info, veh_ids) - extra_info["source_id"].extend([source_id] * len(veh_ids)) - 
extra_info["run_id"].extend(['run_{}'.format(i)] * len(veh_ids)) + # collect additional information for the data pipeline + get_extra_info(env.k.vehicle, extra_info, veh_ids, source_id, run_id) # Compute the results for the custom callables. for (key, lambda_func) in custom_callables.items(): diff --git a/flow/visualize/visualizer_rllib.py b/flow/visualize/visualizer_rllib.py index 0ab658f75..059cabbbd 100644 --- a/flow/visualize/visualizer_rllib.py +++ b/flow/visualize/visualizer_rllib.py @@ -33,6 +33,11 @@ from flow.utils.rllib import get_rllib_config from flow.utils.rllib import get_rllib_pkl +from flow.data_pipeline.data_pipeline import write_dict_to_csv, upload_to_s3, get_extra_info, get_configuration +from flow.data_pipeline.leaderboard_utils import network_name_translate +from collections import defaultdict +from datetime import datetime, timezone +import uuid EXAMPLE_USAGE = """ example usage: @@ -207,6 +212,23 @@ def visualizer_rllib(args): if not sim_params.restart_instance: env.restart_simulation(sim_params=sim_params, render=sim_params.render) + # data pipeline + extra_info = defaultdict(lambda: []) + source_id = 'flow_{}'.format(uuid.uuid4().hex) + metadata = defaultdict(lambda: []) + # collect current time + cur_datetime = datetime.now(timezone.utc) + cur_date = cur_datetime.date().isoformat() + cur_time = cur_datetime.time().isoformat() + # collecting information for metadata table + metadata['source_id'].append(source_id) + metadata['submission_time'].append(cur_time) + metadata['network'].append(network_name_translate(env.network.name.split('_20')[0])) + metadata['is_baseline'].append(str(args.is_baseline)) + name, strategy = get_configuration() + metadata['submitter_name'].append(name) + metadata['strategy'].append(strategy) + # Simulate and collect metrics final_outflows = [] final_inflows = [] @@ -216,6 +238,8 @@ def visualizer_rllib(args): std_speed = [] for i in range(args.num_rollouts): vel = [] + run_id = "run_{}".format(i) + env.pipeline_params = (extra_info, source_id, run_id) state = env.reset() if multiagent: ret = {key: [0] for key in rets.keys()} @@ -246,6 +270,10 @@ def visualizer_rllib(args): else: action = agent.compute_action(state) state, reward, done, _ = env.step(action) + + # collect data for data pipeline + get_extra_info(vehicles, extra_info, vehicles.get_ids(), source_id, run_id) + if multiagent: for actor, rew in reward.items(): ret[policy_map_fn(actor)][0] += rew @@ -341,6 +369,22 @@ def visualizer_rllib(args): # delete the .xml version of the emission file os.remove(emission_path) + # generate datapipeline output + trajectory_table_path = os.path.join(dir_path, '{}.csv'.format(source_id)) + metadata_table_path = os.path.join(dir_path, '{}_METADATA.csv'.format(source_id)) + write_dict_to_csv(trajectory_table_path, extra_info, True) + write_dict_to_csv(metadata_table_path, metadata, True) + + if args.to_aws: + upload_to_s3('circles.data.pipeline', + 'metadata_table/date={0}/partition_name={1}_METADATA/{1}_METADATA.csv'.format(cur_date, + source_id), + metadata_table_path) + upload_to_s3('circles.data.pipeline', + 'fact_vehicle_trace/date={0}/partition_name={1}/{1}.csv'.format(cur_date, source_id), + trajectory_table_path, + {'network': metadata['network'][0]}) + def create_parser(): """Create the parser to capture CLI arguments.""" @@ -394,6 +438,18 @@ def create_parser(): '--horizon', type=int, help='Specifies the horizon.') + parser.add_argument( + '--is_baseline', + action='store_true', + help='specifies whether this is a baseline run' + ) + 
parser.add_argument( + '--to_aws', + type=str, nargs='?', default=None, const="default", + help='Specifies the name of the partition to store the output ' + 'file on S3. Passing a non-None value for this argument ' + 'automatically sets gen_emission to True.' + ) return parser
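Note on the local configuration file: get_configuration() above reads and rewrites a two-column CSV named data_pipeline_config in the working directory (kept out of version control by the new .gitignore entry), prompting interactively for the submitter name and strategy. A minimal sketch of seeding that file ahead of a run, using a hypothetical submitter name and strategy label:

    import pandas as pd

    # hypothetical example values; get_configuration() prompts for them when the
    # file is absent or the submitter_name field is empty
    pd.DataFrame({"submitter_name": ["Jane Doe"],
                  "strategy": ["follower_stopper_v1"]}).to_csv("./data_pipeline_config", index=False)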
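With the new --is_baseline and --to_aws arguments added to visualizer_rllib.py, a rendered rollout can feed the same pipeline. A sketch of the invocation, assuming the script's existing positional arguments (a result directory and checkpoint number) and using placeholder values:

    `python flow/visualize/visualizer_rllib.py /path/to/result_dir 200 --to_aws`

Adding --is_baseline additionally marks the run as a baseline in the generated metadata table.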