diff --git a/emission/analysis/intake/segmentation/trip_segmentation.py b/emission/analysis/intake/segmentation/trip_segmentation.py
index d6828af77..e233183a8 100644
--- a/emission/analysis/intake/segmentation/trip_segmentation.py
+++ b/emission/analysis/intake/segmentation/trip_segmentation.py
@@ -7,6 +7,7 @@ from builtins import *
 from builtins import object
 import logging
+import time
 
 import emission.storage.timeseries.abstract_timeseries as esta
 import emission.storage.decorations.place_queries as esdp
@@ -23,6 +24,9 @@ import emission.analysis.intake.segmentation.restart_checking as eaisr
 
 import emission.core.common as ecc
 
+import emission.storage.decorations.stats_queries as esds
+import emission.core.timer as ect
+import emission.core.wrapper.pipelinestate as ecwp
 
 class TripSegmentationMethod(object):
     def segment_into_trips(self, timeseries, time_query):
@@ -47,68 +51,172 @@ def segment_into_trips(self, timeseries, time_query):
         pass
 
 def segment_current_trips(user_id):
-    ts = esta.TimeSeries.get_time_series(user_id)
-    time_query = epq.get_time_range_for_segmentation(user_id)
+    with ect.Timer() as timer_get_time_series:
+        ts = esta.TimeSeries.get_time_series(user_id)
+    esds.store_pipeline_time(
+        user_id,
+        ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/get_time_series",
+        time.time(),
+        timer_get_time_series.elapsed
+    )
+
+    with ect.Timer() as timer_get_time_range:
+        time_query = epq.get_time_range_for_segmentation(user_id)
+    esds.store_pipeline_time(
+        user_id,
+        ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/get_time_range",
+        time.time(),
+        timer_get_time_range.elapsed
+    )
 
     import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_time_filter as dstf
     import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_dist_filter as dsdf
-    dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold = 5 * 60, # 5 mins
-                                              point_threshold = 9,
-                                              distance_threshold = 100) # 100 m
-
-    dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold = 10 * 60, # 10 mins
-                                              point_threshold = 9,
-                                              distance_threshold = 50) # 50 m
-
-    filter_methods = {"time": dstfsm, "distance": dsdfsm}
-    filter_method_names = {"time": "DwellSegmentationTimeFilter", "distance": "DwellSegmentationDistFilter"}
-    # We need to use the appropriate filter based on the incoming data
-    # So let's read in the location points for the specified query
-    loc_df = ts.get_data_df("background/filtered_location", time_query)
+
+
+    with ect.Timer() as timer_initialize_filters:
+        dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold=5 * 60,  # 5 mins
+                                                  point_threshold=9,
+                                                  distance_threshold=100)  # 100 m
+
+        dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold=10 * 60,  # 10 mins
+                                                  point_threshold=9,
+                                                  distance_threshold=50)  # 50 m
+    esds.store_pipeline_time(
+        user_id,
+        ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/initialize_filters",
+        time.time(),
+        timer_initialize_filters.elapsed
+    )
+
+    with ect.Timer() as timer_setup_filter_methods:
+        filter_methods = {"time": dstfsm, "distance": dsdfsm}
+        filter_method_names = {"time": "DwellSegmentationTimeFilter", "distance": "DwellSegmentationDistFilter"}
+        # We need to use the appropriate filter based on the incoming data
+        # So let's read in the location points for the specified query
+    esds.store_pipeline_time(
+        user_id,
+        ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/setup_filter_methods",
+        time.time(),
+        timer_setup_filter_methods.elapsed
+    )
+
+    with ect.Timer() as timer_fetch_location_data:
+        loc_df = ts.get_data_df("background/filtered_location", time_query)
+    esds.store_pipeline_time(
+        user_id,
+        ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/fetch_location_data",
+        time.time(),
+        timer_fetch_location_data.elapsed
+    )
+
     if len(loc_df) == 0:
         # no new segments, no need to keep looking at these again
         logging.debug("len(loc_df) == 0, early return")
         epq.mark_segmentation_done(user_id, None)
         return
 
-    out_of_order_points = loc_df[loc_df.ts.diff() < 0]
+    with ect.Timer() as timer_check_out_of_order:
+        out_of_order_points = loc_df[loc_df.ts.diff() < 0]
+    esds.store_pipeline_time(
+        user_id,
+        ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/check_out_of_order_points",
+        time.time(),
+        timer_check_out_of_order.elapsed
+    )
+
     if len(out_of_order_points) > 0:
-        logging.info("Found out of order points!")
-        logging.info("%s" % out_of_order_points)
-        # drop from the table
-        loc_df = loc_df.drop(out_of_order_points.index.tolist())
-        loc_df.reset_index(inplace=True)
-        # invalidate in the database.
-        out_of_order_id_list = out_of_order_points["_id"].tolist()
-        logging.debug("out_of_order_id_list = %s" % out_of_order_id_list)
-        for ooid in out_of_order_id_list:
-            ts.invalidate_raw_entry(ooid)
-
-    filters_in_df = loc_df["filter"].dropna().unique()
-    logging.debug("Filters in the dataframe = %s" % filters_in_df)
+        with ect.Timer() as timer_handle_out_of_order:
+            logging.info("Found out of order points!")
+            logging.info("%s" % out_of_order_points)
+            # drop from the table
+            loc_df = loc_df.drop(out_of_order_points.index.tolist())
+            loc_df.reset_index(inplace=True)
+            # invalidate in the database.
+            out_of_order_id_list = out_of_order_points["_id"].tolist()
+            logging.debug("out_of_order_id_list = %s" % out_of_order_id_list)
+            for ooid in out_of_order_id_list:
+                ts.invalidate_raw_entry(ooid)
+        esds.store_pipeline_time(
+            user_id,
+            ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/handle_out_of_order_points",
+            time.time(),
+            timer_handle_out_of_order.elapsed
+        )
+
+    with ect.Timer() as timer_identify_filters:
+        filters_in_df = loc_df["filter"].dropna().unique()
+        logging.debug("Filters in the dataframe = %s" % filters_in_df)
+    esds.store_pipeline_time(
+        user_id,
+        ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/identify_active_filters",
+        time.time(),
+        timer_identify_filters.elapsed
+    )
+
     if len(filters_in_df) == 1:
-        # Common case - let's make it easy
-
-        segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(ts,
-            time_query)
+        with ect.Timer() as timer_segment_single_filter:
+            segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(ts, time_query)
+        esds.store_pipeline_time(
+            user_id,
+            ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/segment_single_filter",
+            time.time(),
+            timer_segment_single_filter.elapsed
+        )
     else:
-        segmentation_points = get_combined_segmentation_points(ts, loc_df, time_query,
-                                                               filters_in_df,
-                                                               filter_methods)
-    # Create and store trips and places based on the segmentation points
-    if segmentation_points is None:
-        epq.mark_segmentation_failed(user_id)
-    elif len(segmentation_points) == 0:
-        # no new segments, no need to keep looking at these again
-        logging.debug("len(segmentation_points) == 0, early return")
-        epq.mark_segmentation_done(user_id, None)
-    else:
-        try:
-            create_places_and_trips(user_id, segmentation_points, filter_method_names[filters_in_df[0]])
-            epq.mark_segmentation_done(user_id, get_last_ts_processed(filter_methods))
-        except:
-            logging.exception("Trip generation failed for user %s" % user_id)
+        with ect.Timer() as timer_segment_combined_filters:
+            segmentation_points = get_combined_segmentation_points(ts, loc_df, time_query,
+                                                                   filters_in_df,
+                                                                   filter_methods)
+        esds.store_pipeline_time(
+            user_id,
+            ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/segment_combined_filters",
+            time.time(),
+            timer_segment_combined_filters.elapsed
+        )
+
+    with ect.Timer() as timer_process_segmentation:
+        if segmentation_points is None:
+            epq.mark_segmentation_failed(user_id)
+        elif len(segmentation_points) == 0:
+            # no new segments, no need to keep looking at these again
+            logging.debug("len(segmentation_points) == 0, early return")
+            epq.mark_segmentation_done(user_id, None)
+        else:
+            try:
+                with ect.Timer() as timer_create_places_trips:
+                    create_places_and_trips(user_id, segmentation_points, filter_method_names[filters_in_df[0]])
+                esds.store_pipeline_time(
+                    user_id,
+                    ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/create_places_and_trips",
+                    time.time(),
+                    timer_create_places_trips.elapsed
+                )
+
+                with ect.Timer() as timer_mark_done:
+                    epq.mark_segmentation_done(user_id, get_last_ts_processed(filter_methods))
+                esds.store_pipeline_time(
+                    user_id,
+                    ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/mark_segmentation_done",
+                    time.time(),
+                    timer_mark_done.elapsed
+                )
+            except:
+                with ect.Timer() as timer_handle_failure:
+                    logging.exception("Trip generation failed for user %s" % user_id)
+                    epq.mark_segmentation_failed(user_id)
+                esds.store_pipeline_time(
+                    user_id,
+                    ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/handle_segmentation_failure",
+                    time.time(),
+                    timer_handle_failure.elapsed
+                )
+    esds.store_pipeline_time(
+        user_id,
+        ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/SEGMENT_CURRENT_TRIPS/process_segmentation_points",
+        time.time(),
+        timer_process_segmentation.elapsed
+    )
+
 
 def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods):
     """
@@ -181,13 +289,40 @@ def create_places_and_trips(user_id, segmentation_points, segmentation_method_na
     # description of dealing with gaps in tracking can be found in the wiki.
     # Let us first deal with the easy case.
     # restart_events_df = get_restart_events(ts, time_query)
-    ts = esta.TimeSeries.get_time_series(user_id)
-    last_place_entry = esdp.get_last_place_entry(esda.RAW_PLACE_KEY, user_id)
+    with ect.Timer() as timer_retrieve_ts:
+        ts = esta.TimeSeries.get_time_series(user_id)
+    esds.store_pipeline_time(
+        user_id,
+        ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/CREATE_PLACES_TRIPS/retrieve_time_series",
+        time.time(),
+        timer_retrieve_ts.elapsed
+    )
+
+    with ect.Timer() as timer_get_last_place:
+        last_place_entry = esdp.get_last_place_entry(esda.RAW_PLACE_KEY, user_id)
+    esds.store_pipeline_time(
+        user_id,
+        ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/CREATE_PLACES_TRIPS/get_last_place_entry",
+        time.time(),
+        timer_get_last_place.elapsed
+    )
+
     if last_place_entry is None:
-        last_place = start_new_chain(user_id)
-        last_place.source = segmentation_method_name
-        last_place_entry = ecwe.Entry.create_entry(user_id,
-            "segmentation/raw_place", last_place, create_id = True)
+        with ect.Timer() as timer_start_new_chain:
+            last_place = start_new_chain(user_id)
+            last_place.source = segmentation_method_name
+            last_place_entry = ecwe.Entry.create_entry(
+                user_id,
+                "segmentation/raw_place",
+                last_place,
+                create_id=True
+            )
+        esds.store_pipeline_time(
+            user_id,
+            ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/CREATE_PLACES_TRIPS/start_new_chain",
+            time.time(),
+            timer_start_new_chain.elapsed
+        )
     else:
         last_place = last_place_entry.data
 
@@ -204,45 +339,97 @@ def create_places_and_trips(user_id, segmentation_points, segmentation_method_na
         logging.debug("start_loc = %s, end_loc = %s" % (start_loc, end_loc))
 
         # Stitch together the last place and the current trip
-        curr_trip = ecwrt.Rawtrip()
-        curr_trip.source = segmentation_method_name
-        curr_trip_entry = ecwe.Entry.create_entry(user_id,
-            "segmentation/raw_trip", curr_trip, create_id = True)
-
-        new_place = ecwrp.Rawplace()
-        new_place.source = segmentation_method_name
-        new_place_entry = ecwe.Entry.create_entry(user_id,
-            "segmentation/raw_place", new_place, create_id = True)
-
-        if found_untracked_period(ts, last_place_entry.data, start_loc, segmentation_method_name):
-            # Fill in the gap in the chain with an untracked period
-            curr_untracked = ecwut.Untrackedtime()
-            curr_untracked.source = segmentation_method_name
-            curr_untracked_entry = ecwe.Entry.create_entry(user_id,
-                "segmentation/raw_untracked", curr_untracked, create_id=True)
-
-            restarted_place = ecwrp.Rawplace()
-            restarted_place.source = segmentation_method_name
-            restarted_place_entry = ecwe.Entry.create_entry(user_id,
-                "segmentation/raw_place", restarted_place, create_id=True)
-
-            untracked_start_loc = ecwe.Entry(ts.get_entry_at_ts("background/filtered_location",
-                "data.ts", last_place_entry.data.enter_ts)).data
-            untracked_start_loc["ts"] = untracked_start_loc.ts + epq.END_FUZZ_AVOID_LTE
-            _link_and_save(ts, last_place_entry, curr_untracked_entry, restarted_place_entry,
-                untracked_start_loc, start_loc)
-            logging.debug("Created untracked period %s from %s to %s" %
-                (curr_untracked_entry.get_id(), curr_untracked_entry.data.start_ts, curr_untracked_entry.data.end_ts))
-            logging.debug("Resetting last_place_entry from %s to %s" %
-                (last_place_entry, restarted_place_entry))
-            last_place_entry = restarted_place_entry
-
-        _link_and_save(ts, last_place_entry, curr_trip_entry, new_place_entry, start_loc, end_loc)
-        last_place_entry = new_place_entry
-
-    # The last last_place hasn't been stitched together yet, but we
-    # need to save it so that it can be the last_place for the next run
-    ts.insert(last_place_entry)
+        with ect.Timer() as timer_stitch:
+            curr_trip = ecwrt.Rawtrip()
+            curr_trip.source = segmentation_method_name
+            curr_trip_entry = ecwe.Entry.create_entry(
+                user_id,
+                "segmentation/raw_trip",
+                curr_trip,
+                create_id=True
+            )
+
+            new_place = ecwrp.Rawplace()
+            new_place.source = segmentation_method_name
+            new_place_entry = ecwe.Entry.create_entry(
+                user_id,
+                "segmentation/raw_place",
+                new_place,
+                create_id=True
+            )
+
+            if found_untracked_period(ts, last_place_entry.data, start_loc, segmentation_method_name):
+                # Fill in the gap in the chain with an untracked period
+                curr_untracked = ecwut.Untrackedtime()
+                curr_untracked.source = segmentation_method_name
+                curr_untracked_entry = ecwe.Entry.create_entry(
+                    user_id,
+                    "segmentation/raw_untracked",
+                    curr_untracked,
+                    create_id=True
+                )
+
+                restarted_place = ecwrp.Rawplace()
+                restarted_place.source = segmentation_method_name
+                restarted_place_entry = ecwe.Entry.create_entry(
+                    user_id,
+                    "segmentation/raw_place",
+                    restarted_place,
+                    create_id=True
+                )
+
+                untracked_start_loc = ecwe.Entry(
+                    ts.get_entry_at_ts(
+                        "background/filtered_location",
+                        "data.ts",
+                        last_place_entry.data.enter_ts
+                    )
+                ).data
+                untracked_start_loc["ts"] = untracked_start_loc.ts + epq.END_FUZZ_AVOID_LTE
+                _link_and_save(
+                    ts,
+                    last_place_entry,
+                    curr_untracked_entry,
+                    restarted_place_entry,
+                    untracked_start_loc,
+                    start_loc
+                )
+                logging.debug(
+                    "Created untracked period %s from %s to %s" %
+                    (curr_untracked_entry.get_id(), curr_untracked_entry.data.start_ts, curr_untracked_entry.data.end_ts)
+                )
+                logging.debug(
+                    "Resetting last_place_entry from %s to %s" %
+                    (last_place_entry, restarted_place_entry)
+                )
+                last_place_entry = restarted_place_entry
+
+            _link_and_save(
+                ts,
+                last_place_entry,
+                curr_trip_entry,
+                new_place_entry,
+                start_loc,
+                end_loc
+            )
+            last_place_entry = new_place_entry
+        esds.store_pipeline_time(
+            user_id,
+            ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/CREATE_PLACES_TRIPS/stitch_trip_place",
+            time.time(),
+            timer_stitch.elapsed
+        )
+
+    with ect.Timer() as timer_insert_last_place:
+        # The last last_place hasn't been stitched together yet, but we
+        # need to save it so that it can be the last_place for the next run
+        ts.insert(last_place_entry)
+    esds.store_pipeline_time(
+        user_id,
+        ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/CREATE_PLACES_TRIPS/insert_last_place",
+        time.time(),
+        timer_insert_last_place.elapsed
+    )
+
 
 def _link_and_save(ts, last_place_entry, curr_trip_entry, new_place_entry, start_loc, end_loc):
     stitch_together_start(last_place_entry, curr_trip_entry, start_loc)