diff --git a/flow/data_pipeline/data_pipeline.py b/flow/data_pipeline/data_pipeline.py
index 858640914..626c59e39 100644
--- a/flow/data_pipeline/data_pipeline.py
+++ b/flow/data_pipeline/data_pipeline.py
@@ -1,11 +1,13 @@
 """contains class and helper functions for the data pipeline."""
 import pandas as pd
 import boto3
-from flow.data_pipeline.query import QueryStrings
+from botocore.exceptions import ClientError
+from flow.data_pipeline.query import QueryStrings, prerequisites
 from time import time
 from datetime import date
 import csv
 from io import StringIO
+import json
 
 
 def generate_trajectory_table(data_path, extra_info, partition_name):
@@ -158,6 +160,42 @@ def update_baseline(s3, baseline_network, baseline_source_id):
                   Body=new_str.getvalue().replace('\r', '').encode())
 
 
+def get_completed_queries(s3, source_id):
+    """Return the set of queries already completed for source_id, deserialized from S3."""
+    try:
+        completed_queries_obj = \
+            s3.get_object(Bucket='circles.data.pipeline', Key='lambda_temp/{}'.format(source_id))['Body']
+        completed_queries = json.loads(completed_queries_obj.read().decode('utf-8'))
+    except ClientError as e:
+        if e.response['Error']['Code'] == 'NoSuchKey':
+            completed_queries = set()
+        else:
+            raise
+    return set(completed_queries)
+
+
+def put_completed_queries(s3, completed_queries):
+    """Store each source_id's set of completed queries in S3 as serialized JSON."""
+    for source_id, completed_queries_set in completed_queries.items():
+        completed_queries_list = list(completed_queries_set)
+        completed_queries_json = json.dumps(completed_queries_list)
+        s3.put_object(Bucket='circles.data.pipeline', Key='lambda_temp/{}'.format(source_id),
+                      Body=completed_queries_json.encode('utf-8'))
+
+
+def get_ready_queries(completed_queries, new_query):
+    """Return the (query, table) pairs that become ready to run once new_query completes."""
+    readied_queries = []
+    unfinished_queries = prerequisites.keys() - completed_queries
+    updated_completed_queries = completed_queries.copy()
+    updated_completed_queries.add(new_query)
+    for query_name in unfinished_queries:
+        if not prerequisites[query_name][1].issubset(completed_queries):
+            if prerequisites[query_name][1].issubset(updated_completed_queries):
+                readied_queries.append((query_name, prerequisites[query_name][0]))
+    return readied_queries
+
+
 class AthenaQuery:
     """Class used to run queries.
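For reference, `get_ready_queries` returns only the queries that were blocked before `new_query` finished and are fully unblocked afterwards, so each downstream query is launched exactly once. Below is a minimal standalone sketch of that behavior; it is not part of the patch, and a trimmed two-entry copy of the `prerequisites` mapping from query.py is inlined so the example runs without the flow package:

```python
# A trimmed two-entry copy of the `prerequisites` mapping from query.py,
# inlined so this sketch runs without the flow package.
prerequisites = {
    "POWER_DEMAND_MODEL_DENOISED_ACCEL": (
        "fact_energy_trace", {"FACT_VEHICLE_TRACE"}
    ),
    "FACT_VEHICLE_FUEL_EFFICIENCY_AGG": (
        "fact_vehicle_fuel_efficiency_agg", {"FACT_VEHICLE_TRACE",
                                             "POWER_DEMAND_MODEL_DENOISED_ACCEL"}
    ),
}


def get_ready_queries(completed_queries, new_query):
    """Mirror the patched helper: return (query, table) pairs unblocked by new_query."""
    readied_queries = []
    unfinished_queries = prerequisites.keys() - completed_queries
    updated_completed_queries = completed_queries.copy()
    updated_completed_queries.add(new_query)
    for query_name in unfinished_queries:
        # ready only if the query was blocked before new_query finished
        # and every prerequisite is satisfied afterwards
        if not prerequisites[query_name][1].issubset(completed_queries):
            if prerequisites[query_name][1].issubset(updated_completed_queries):
                readied_queries.append((query_name, prerequisites[query_name][0]))
    return readied_queries


# the vehicle trace alone readies the energy query, but not the downstream agg
print(get_ready_queries(set(), "FACT_VEHICLE_TRACE"))
# -> [('POWER_DEMAND_MODEL_DENOISED_ACCEL', 'fact_energy_trace')]

# once the energy query also completes, the fuel-efficiency agg becomes ready
print(get_ready_queries({"FACT_VEHICLE_TRACE"}, "POWER_DEMAND_MODEL_DENOISED_ACCEL"))
# -> [('FACT_VEHICLE_FUEL_EFFICIENCY_AGG', 'fact_vehicle_fuel_efficiency_agg')]
```

Because a query is reported ready only on this transition, repeated S3 events for the same partition (which Lambda can deliver) do not relaunch downstream queries; the completed sets persisted by `put_completed_queries` make this hold across invocations, as the handler below relies on.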
diff --git a/flow/data_pipeline/lambda_function.py b/flow/data_pipeline/lambda_function.py
index 97f625eab..1d813f98b 100644
--- a/flow/data_pipeline/lambda_function.py
+++ b/flow/data_pipeline/lambda_function.py
@@ -1,9 +1,9 @@
 """lambda function on AWS Lambda."""
 import boto3
 from urllib.parse import unquote_plus
-from flow.data_pipeline.data_pipeline import AthenaQuery, delete_obsolete_data, update_baseline
-from flow.data_pipeline.query import tags, tables, network_using_edge, summary_tables
-from flow.data_pipeline.query import X_FILTER, EDGE_FILTER, WARMUP_STEPS, HORIZON_STEPS
+from flow.data_pipeline.data_pipeline import AthenaQuery, delete_obsolete_data, update_baseline, \
+    get_ready_queries, get_completed_queries, put_completed_queries
+from flow.data_pipeline.query import tables, network_filters, summary_tables, triggers
 
 s3 = boto3.client('s3')
 queryEngine = AthenaQuery()
@@ -11,6 +11,8 @@
 
 def lambda_handler(event, context):
     """Handle S3 put event on AWS Lambda."""
+    # stores the set of completed queries for each source_id
+    completed = {}
     records = []
-    # do a pre-sweep to handle tasks other than initalizing a query
+    # do a pre-sweep to handle tasks other than initializing a query
     for record in event['Records']:
@@ -19,58 +21,55 @@
         table = key.split('/')[0]
         if table not in tables:
             continue
-        # delete unwanted metadata files
-        if (key[-9:] == '.metadata'):
-            s3.delete_object(Bucket=bucket, Key=key)
-            continue
-
+        s3.delete_object(Bucket=bucket, Key=(key + '.metadata'))
         # load the partition for newly added table
         query_date = key.split('/')[-3].split('=')[-1]
         partition = key.split('/')[-2].split('=')[-1]
+        source_id = "flow_{}".format(partition.split('_')[1])
+        if table == "fact_vehicle_trace":
+            query_name = "FACT_VEHICLE_TRACE"
+        else:
+            query_name = partition.replace(source_id, "")[1:]
         queryEngine.repair_partition(table, query_date, partition)
-        # delete obsolete data
         if table in summary_tables:
            delete_obsolete_data(s3, key, table)
-        # add table that need to start a query to list
-        if table in tags.keys():
-            records.append((bucket, key, table, query_date, partition))
+        if query_name in triggers:
+            records.append((bucket, key, table, query_name, query_date, partition, source_id))
 
     # initialize the queries
-    start_filter = WARMUP_STEPS
-    stop_filter = WARMUP_STEPS + HORIZON_STEPS
-    for bucket, key, table, query_date, partition in records:
-        source_id = "flow_{}".format(partition.split('_')[1])
+    for bucket, key, table, query_name, query_date, partition, source_id in records:
+        # retrieve the set of completed queries for this source_id if not already available
+        if source_id not in completed.keys():
+            completed[source_id] = get_completed_queries(s3, source_id)
+        # if this query was already recorded, skip it; this tolerates repeated executions by Lambda
+        if query_name in completed[source_id]:
+            continue
+        # retrieve metadata and use it to determine the right per-network filters
         metadata_key = "fact_vehicle_trace/date={0}/partition_name={1}/{1}.csv".format(query_date, source_id)
         response = s3.head_object(Bucket=bucket, Key=metadata_key)
-        loc_filter = X_FILTER
-        if 'network' in response["Metadata"]:
-            if response["Metadata"]['network'] in network_using_edge:
-                loc_filter = EDGE_FILTER
+        # network_filters is a defaultdict, so a missing or unknown network falls back to the defaults
+        network = response["Metadata"].get('network', None)
+        loc_filter = network_filters[network]['loc_filter']
+        start_filter = network_filters[network]['warmup_steps']
+        stop_filter = network_filters[network]['horizon_steps']
+
+        # update baseline if needed
         if table == 'fact_vehicle_trace' \
                 and 'is_baseline' in response['Metadata'] and response['Metadata']['is_baseline'] == 'True':
-            update_baseline(s3, response["Metadata"]['network'], source_id)
-
-        query_dict = tags[table]
-
-        # handle different energy models
-        if table == "fact_energy_trace":
-            energy_model_id = partition.replace(source_id, "")[1:]
-            query_dict = tags[energy_model_id]
+            update_baseline(s3, network, source_id)
+        readied_queries = get_ready_queries(completed[source_id], query_name)
+        completed[source_id].add(query_name)
 
         # initialize queries and store them at appropriate locations
-        for table_name, query_list in query_dict.items():
-            for query_name in query_list:
-                result_location = 's3://circles.data.pipeline/{}/date={}/partition_name={}_{}'.format(table_name,
-                                                                                                      query_date,
-                                                                                                      source_id,
-                                                                                                      query_name)
-                queryEngine.run_query(query_name,
-                                      result_location,
-                                      query_date,
-                                      partition,
-                                      loc_filter=loc_filter,
-                                      start_filter=start_filter,
-                                      stop_filter=stop_filter)
+        for readied_query_name, table_name in readied_queries:
+            result_location = 's3://circles.data.pipeline/{}/date={}/partition_name={}_{}'.format(table_name,
+                                                                                                  query_date,
+                                                                                                  source_id,
+                                                                                                  readied_query_name)
+            queryEngine.run_query(readied_query_name, result_location, query_date, partition, loc_filter=loc_filter,
+                                  start_filter=start_filter, stop_filter=stop_filter)
+    # store the updated sets of completed queries back to S3
+    put_completed_queries(s3, completed)
diff --git a/flow/data_pipeline/query.py b/flow/data_pipeline/query.py
index 302048632..adc472176 100644
--- a/flow/data_pipeline/query.py
+++ b/flow/data_pipeline/query.py
@@ -1,78 +1,90 @@
 """stores all the pre-defined query strings."""
+from collections import defaultdict
 from enum import Enum
 
-# tags for different queries
-tags = {
-    "fact_vehicle_trace": {
-        "fact_energy_trace": [
-            "POWER_DEMAND_MODEL",
-            "POWER_DEMAND_MODEL_DENOISED_ACCEL",
-            "POWER_DEMAND_MODEL_DENOISED_ACCEL_VEL"
-        ],
-        "fact_safety_metrics": [
-            "FACT_SAFETY_METRICS"
-        ],
-        "fact_network_throughput_agg": [
-            "FACT_NETWORK_THROUGHPUT_AGG"
-        ],
-        "fact_network_inflows_outflows": [
-            "FACT_NETWORK_INFLOWS_OUTFLOWS"
-        ],
-        "fact_vehicle_counts_by_time": [
-            "FACT_VEHICLE_COUNTS_BY_TIME"
-        ]
-    },
-    "fact_energy_trace": {},
-    "fact_vehicle_counts_by_time": {},
-    "fact_safety_metrics": {
-        "fact_safety_metrics_agg": [
-            "FACT_SAFETY_METRICS_AGG"
-        ]
-    },
-    "POWER_DEMAND_MODEL_DENOISED_ACCEL": {
-        "fact_vehicle_fuel_efficiency_agg": [
-            "FACT_VEHICLE_FUEL_EFFICIENCY_AGG"
-        ],
-        "fact_network_metrics_by_distance_agg": [
-            "FACT_NETWORK_METRICS_BY_DISTANCE_AGG"
-        ],
-        "fact_network_metrics_by_time_agg": [
-            "FACT_NETWORK_METRICS_BY_TIME_AGG"
-        ]
-    },
-    "POWER_DEMAND_MODEL": {},
-    "POWER_DEMAND_MODEL_DENOISED_ACCEL_VEL": {},
-    "fact_vehicle_fuel_efficiency_agg": {
"fact_network_fuel_efficiency_agg": [ - "FACT_NETWORK_FUEL_EFFICIENCY_AGG" - ] - }, - "fact_network_fuel_efficiency_agg": { - "leaderboard_chart": [ - "LEADERBOARD_CHART" - ] - }, - "leaderboard_chart": { - "leaderboard_chart_agg": [ - "LEADERBOARD_CHART_AGG" - ] - }, - "leaderboard_chart_agg": { - "fact_top_scores": [ - "FACT_TOP_SCORES" - ] - } +prerequisites = { + "POWER_DEMAND_MODEL": ( + "fact_energy_trace", {"FACT_VEHICLE_TRACE"} + ), + "POWER_DEMAND_MODEL_DENOISED_ACCEL": ( + "fact_energy_trace", {"FACT_VEHICLE_TRACE"} + ), + "POWER_DEMAND_MODEL_DENOISED_ACCEL_VEL": ( + "fact_energy_trace", {"FACT_VEHICLE_TRACE"} + ), + "FACT_SAFETY_METRICS": ( + "fact_safety_metrics", {"FACT_VEHICLE_TRACE"} + ), + "FACT_NETWORK_THROUGHPUT_AGG": ( + "fact_network_throughput_agg", {"FACT_VEHICLE_TRACE"} + ), + "FACT_NETWORK_INFLOWS_OUTFLOWS": ( + "fact_network_inflows_outflows", {"FACT_VEHICLE_TRACE"} + ), + "FACT_VEHICLE_COUNTS_BY_TIME": ( + "fact_vehicle_counts_by_time", {"FACT_VEHICLE_TRACE"} + ), + "FACT_VEHICLE_FUEL_EFFICIENCY_AGG": ( + "fact_vehicle_fuel_efficiency_agg", {"FACT_VEHICLE_TRACE", + "POWER_DEMAND_MODEL_DENOISED_ACCEL"} + ), + "FACT_NETWORK_METRICS_BY_DISTANCE_AGG": ( + "fact_network_metrics_by_distance_agg", {"FACT_VEHICLE_TRACE", + "POWER_DEMAND_MODEL_DENOISED_ACCEL"} + ), + "FACT_NETWORK_METRICS_BY_TIME_AGG": ( + "fact_network_metrics_by_time_agg", {"FACT_VEHICLE_TRACE", + "POWER_DEMAND_MODEL_DENOISED_ACCEL"} + ), + "FACT_VEHICLE_FUEL_EFFICIENCY_BINNED": ( + "fact_vehicle_fuel_efficiency_binned", {"FACT_VEHICLE_FUEL_EFFICIENCY_AGG"} + ), + "FACT_NETWORK_FUEL_EFFICIENCY_AGG": ( + "fact_network_fuel_efficiency_agg", {"FACT_VEHICLE_FUEL_EFFICIENCY_AGG"} + ), + "FACT_SAFETY_METRICS_AGG": ( + "fact_safety_metrics_agg", {"FACT_SAFETY_METRICS"} + ), + "FACT_SAFETY_METRICS_BINNED": ( + "fact_safety_metrics_binned", {"FACT_SAFETY_METRICS"} + ), + "LEADERBOARD_CHART": ( + "leaderboard_chart", {"FACT_NETWORK_THROUGHPUT_AGG", + "FACT_NETWORK_FUEL_EFFICIENCY_AGG", + "FACT_SAFETY_METRICS_AGG"} + ), + "LEADERBOARD_CHART_AGG": ( + "leaderboard_chart_agg", {"LEADERBOARD_CHART"} + ), + "FACT_TOP_SCORES": ( + "fact_top_scores", {"LEADERBOARD_CHART_AGG"} + ), } +triggers = [ + "FACT_VEHICLE_TRACE", + "POWER_DEMAND_MODEL_DENOISED_ACCEL", + "FACT_VEHICLE_FUEL_EFFICIENCY_AGG", + "FACT_SAFETY_METRICS", + "FACT_NETWORK_THROUGHPUT_AGG", + "FACT_NETWORK_FUEL_EFFICIENCY_AGG", + "FACT_SAFETY_METRICS_AGG", + "LEADERBOARD_CHART", + "LEADERBOARD_CHART_AGG" +] + tables = [ "fact_vehicle_trace", "fact_energy_trace", "fact_vehicle_counts_by_time", "fact_safety_metrics", "fact_safety_metrics_agg", + "fact_safety_metrics_binned", "fact_network_throughput_agg", "fact_network_inflows_outflows", "fact_vehicle_fuel_efficiency_agg", + "fact_vehicle_fuel_efficiency_binned", "fact_network_metrics_by_distance_agg", "fact_network_metrics_by_time_agg", "fact_network_fuel_efficiency_agg", @@ -84,15 +96,16 @@ summary_tables = ["leaderboard_chart_agg", "fact_top_scores"] -network_using_edge = ["I-210 without Ramps"] - -X_FILTER = "x BETWEEN 500 AND 2300" - -EDGE_FILTER = "edge_id <> ALL (VALUES 'ghost0', '119257908#3')" - -WARMUP_STEPS = 600 * 3 * 0.4 - -HORIZON_STEPS = 1000 * 3 * 0.4 +network_filters = defaultdict(lambda: { + 'loc_filter': "x BETWEEN 500 AND 2300", + 'warmup_steps': 500 * 3 * 0.4, + 'horizon_steps': 1000 * 3 * 0.4 + }) +network_filters['I-210 without Ramps'] = { + 'loc_filter': "edge_id <> ALL (VALUES 'ghost0', '119257908#3')", + 'warmup_steps': 600 * 3 * 0.4, + 'horizon_steps': 1000 * 3 * 0.4 + } 
 
 VEHICLE_POWER_DEMAND_TACOMA_FINAL_SELECT = """
     SELECT
@@ -231,7 +244,7 @@ class QueryStrings(Enum):
                     value_lower_right*(headway-headway_lower)*(rel_speed_upper-leader_rel_speed) +
                     value_upper_left*(headway_upper-headway)*(leader_rel_speed-rel_speed_lower) +
                     value_upper_right*(headway-headway_lower)*(leader_rel_speed-rel_speed_lower)
-                ) / ((headway_upper-headway_lower)*(rel_speed_upper-rel_speed_lower)), 200) AS safety_value,
+                ) / ((headway_upper-headway_lower)*(rel_speed_upper-rel_speed_lower)), 200.0) AS safety_value,
             vt.source_id
         FROM fact_vehicle_trace vt
         LEFT OUTER JOIN fact_safety_matrix sm ON 1 = 1
@@ -248,13 +261,42 @@ class QueryStrings(Enum):
     FACT_SAFETY_METRICS_AGG = """
         SELECT
             source_id,
-            SUM(CASE WHEN safety_value < 0 THEN 1 ELSE 0 END) * 100 / COUNT() safety_rate,
+            SUM(CASE WHEN safety_value < 0 THEN 1.0 ELSE 0.0 END) * 100.0 / COUNT() safety_rate,
             MAX(safety_value) AS safety_value_max
         FROM fact_safety_metrics
         WHERE 1 = 1
             AND date = \'{date}\'
             AND partition_name = \'{partition}_FACT_SAFETY_METRICS\'
         GROUP BY 1
+        ;
+    """
+
+    FACT_SAFETY_METRICS_BINNED = """
+        WITH unfilter_bins AS (
+            SELECT
+                ROW_NUMBER() OVER() - 51 AS lb,
+                ROW_NUMBER() OVER() - 50 AS ub
+            FROM fact_safety_metrics
+        ), bins AS (
+            SELECT
+                lb,
+                ub
+            FROM unfilter_bins
+            WHERE 1=1
+                AND lb >= -10
+                AND ub <= 10
+        )
+        SELECT
+            CONCAT('[', CAST(bins.lb AS VARCHAR), ', ', CAST(bins.ub AS VARCHAR), ')') AS safety_value_bin,
+            COUNT() AS count
+        FROM bins, fact_safety_metrics fsm
+        WHERE 1 = 1
+            AND fsm.date = \'{date}\'
+            AND fsm.partition_name = \'{partition}_FACT_SAFETY_METRICS\'
+            AND fsm.safety_value >= bins.lb
+            AND fsm.safety_value < bins.ub
+        GROUP BY 1
+        ;
     """
 
     FACT_NETWORK_THROUGHPUT_AGG = """
@@ -326,6 +368,35 @@ class QueryStrings(Enum):
         ;
     """
 
+    FACT_VEHICLE_FUEL_EFFICIENCY_BINNED = """
+        WITH unfilter_bins AS (
+            SELECT
+                ROW_NUMBER() OVER() - 1 AS lb,
+                ROW_NUMBER() OVER() AS ub
+            FROM fact_safety_metrics
+        ), bins AS (
+            SELECT
+                lb,
+                ub
+            FROM unfilter_bins
+            WHERE 1=1
+                AND lb >= 0
+                AND ub <= 20
+        )
+        SELECT
+            CONCAT('[', CAST(bins.lb AS VARCHAR), ', ', CAST(bins.ub AS VARCHAR), ')') AS fuel_efficiency_bin,
+            COUNT() AS count
+        FROM bins, fact_vehicle_fuel_efficiency_agg agg
+        WHERE 1 = 1
+            AND agg.date = \'{date}\'
+            AND agg.partition_name = \'{partition}_FACT_VEHICLE_FUEL_EFFICIENCY_AGG\'
+            AND agg.energy_model_id = 'POWER_DEMAND_MODEL_DENOISED_ACCEL'
+            AND 1000 * agg.efficiency_meters_per_joules >= bins.lb
+            AND 1000 * agg.efficiency_meters_per_joules < bins.ub
+        GROUP BY 1
+        ;
+    """
+
     FACT_NETWORK_FUEL_EFFICIENCY_AGG = """
         SELECT
             source_id,
@@ -701,7 +772,7 @@ class QueryStrings(Enum):
             SELECT
                 network,
                 submission_date,
-                LAG(max_score, 1) OVER (PARTITION BY network ORDER BY submission_date ASC) AS max_score
+                LAG(max_score IGNORE NULLS, 1) OVER (PARTITION BY network ORDER BY submission_date ASC) AS max_score
             FROM curr_max
         ), unioned AS (
             SELECT * FROM curr_max
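A note on the `network_filters` change: replacing the scalar `X_FILTER`/`EDGE_FILTER`/`WARMUP_STEPS`/`HORIZON_STEPS` constants with a `defaultdict` lets the handler look up the location filter and warmup/horizon step bounds in one place, with unknown or missing network names silently falling back to the defaults. A minimal standalone sketch of that lookup behavior (the 'ring road' key below is hypothetical):

```python
from collections import defaultdict

# same shape as the patched definitions in query.py
network_filters = defaultdict(lambda: {
        'loc_filter': "x BETWEEN 500 AND 2300",
        'warmup_steps': 500 * 3 * 0.4,
        'horizon_steps': 1000 * 3 * 0.4
    })
network_filters['I-210 without Ramps'] = {
        'loc_filter': "edge_id <> ALL (VALUES 'ghost0', '119257908#3')",
        'warmup_steps': 600 * 3 * 0.4,
        'horizon_steps': 1000 * 3 * 0.4
    }

# the I-210 network gets its edge-based filter and longer warmup
print(network_filters['I-210 without Ramps']['warmup_steps'])  # 720.0

# any other key, including None when no network metadata is present,
# falls back to the x-position defaults ('ring road' is hypothetical)
print(network_filters['ring road']['loc_filter'])              # x BETWEEN 500 AND 2300
print(network_filters[None]['warmup_steps'])                   # 600.0
```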