Merge pull request #795 from shankari/add_infinite_scroll
Server side changes to support infinite scroll
shankari authored Feb 17, 2021
2 parents c42e075 + 56da76b commit b53f8df
Showing 8 changed files with 115 additions and 10 deletions.
42 changes: 38 additions & 4 deletions emission/net/api/cfc_webapp.py
@@ -195,6 +195,15 @@ def getPipelineState():
    user_uuid = getUUID(request)
    return {"complete_ts": pipeline.get_complete_ts(user_uuid)}

@post("/pipeline/get_range_ts")
def getPipelineState():
user_uuid = getUUID(request)
(start_ts, end_ts) = pipeline.get_range(user_uuid)
return {
"start_ts": start_ts,
"end_ts": end_ts
}
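
For reference, a minimal client-side sketch of calling the new endpoint (not part of this commit; the URL, port, and token are assumptions, and it assumes getUUID() reads the user token from the 'user' field of the JSON body):

import requests

resp = requests.post("http://localhost:8080/pipeline/get_range_ts",
                     json={"user": "test_user_token"})
resp.raise_for_status()
r = resp.json()
print("first trip starts at %s, last trip ends at %s" % (r["start_ts"], r["end_ts"]))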

@post("/datastreams/find_entries/<time_type>")
def getTimeseriesEntries(time_type):
    if 'user' not in request.json:
@@ -206,21 +215,46 @@ def getTimeseriesEntries(time_type):
    if 'from_local_date' in request.json and 'to_local_date' in request.json:
        start_time = request.json['from_local_date']
        end_time = request.json['to_local_date']
        time_query = esttc.TimeComponentQuery("metadata.write_ts",
        time_key = request.json.get('key_local_date', 'metadata.write_ts')
        time_query = esttc.TimeComponentQuery(time_key,
                                              start_time,
                                              end_time)
    else:
        start_time = request.json['start_time']
        end_time = request.json['end_time']
        time_query = estt.TimeQuery("metadata.write_ts",
                                    start_time,
                                    end_time)
        time_key = request.json.get('key_time', 'metadata.write_ts')
        time_query = estt.TimeQuery(time_key,
                                    start_time,
                                    end_time)
    # Note that queries from the usercache are limited to 100,000 entries
    # and entries from the timeseries are limited to 250,000, so we will
    # return at most 350,000 entries. This means that we don't need
    # additional filtering, but the limit should be documented in the API.
    data_list = esdc.find_entries(user_uuid, key_list, time_query)
    if 'max_entries' in request.json:
        me = request.json['max_entries']
        if (type(me) != int):
            logging.error("aborting: max entry count is %s, type %s, expected int" % (me, type(me)))
            abort(500, "Invalid max_entries %s" % me)

        if len(data_list) > me:
            if request.json['trunc_method'] == 'first':
                logging.debug("first n entries is %s" % me)
                data_list = data_list[:me]
            elif request.json['trunc_method'] == 'last':
                logging.debug("last n entries is %s" % me)
                data_list = data_list[-me:]
            elif request.json["trunc_method"] == "sample":
                sample_rate = len(data_list)//me + 1
                logging.debug("sampling rate is %s" % sample_rate)
                data_list = data_list[::sample_rate]
            else:
                logging.error("aborting: unexpected truncation method %s" % request.json["trunc_method"])
                abort(500, "truncation method not specified while retrieving limited data")
        else:
            logging.debug("Found %d entries < %s, no truncation" % (len(data_list), me))
    logging.debug("successfully returning list of size %s" % len(data_list))
    return {'phone_data': data_list}
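
As a worked example of the sampling arithmetic above: with 1,000 matching entries and max_entries = 300, sample_rate = 1000 // 300 + 1 = 4, so data_list[::4] returns 250 evenly spaced entries. A hedged example request body follows (field names are taken from the handler above; the token and key list are illustrative assumptions):

payload = {
    "user": "test_user_token",              # illustrative token
    "key_list": ["background/location"],    # illustrative key
    "start_time": 0,
    "end_time": 1440729334,
    "key_time": "metadata.write_ts",
    "max_entries": 300,
    "trunc_method": "sample"                # or 'first' / 'last'
}
# e.g. requests.post(SERVER_URL + "/datastreams/find_entries/timestamp", json=payload)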

@post('/usercache/get')
25 changes: 25 additions & 0 deletions emission/net/api/pipeline.py
@@ -6,10 +6,35 @@
standard_library.install_aliases()
from builtins import *
import logging
import pymongo

import emission.storage.pipeline_queries as esp
import emission.storage.timeseries.abstract_timeseries as esta

def get_complete_ts(user_id):
    complete_ts = esp.get_complete_ts(user_id)
    logging.debug("Returning complete_ts = %s" % complete_ts)
    return complete_ts

def get_range(user_id):
    ts = esta.TimeSeries.get_time_series(user_id)
    start_ts = ts.get_first_value_for_field("analysis/confirmed_trip", "data.start_ts", pymongo.ASCENDING)
    if start_ts == -1:
        start_ts = ts.get_first_value_for_field("analysis/cleaned_trip", "data.start_ts", pymongo.ASCENDING)
        if start_ts == -1:
            start_ts = None

    end_ts = ts.get_first_value_for_field("analysis/confirmed_trip", "data.end_ts", pymongo.DESCENDING)
    if end_ts == -1:
        end_ts = ts.get_first_value_for_field("analysis/cleaned_trip", "data.end_ts", pymongo.DESCENDING)
        if end_ts == -1:
            end_ts = None

    complete_ts = get_complete_ts(user_id)
    if complete_ts is not None and end_ts is not None\
            and (end_ts != (complete_ts - esp.END_FUZZ_AVOID_LTE)):
        logging.warning("end_ts %s != complete_ts no fuzz %s" %
            (end_ts, (complete_ts - esp.END_FUZZ_AVOID_LTE)))

    logging.debug("Returning range (%s, %s)" % (start_ts, end_ts))
    return (start_ts, end_ts)
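
To illustrate the intended use (a hedged sketch, not part of this commit): an infinite-scroll client would call get_range() once to learn the overall extent of the analysed data, then fetch one time window at a time as the user scrolls backwards. WINDOW_SECS and the generator below are assumptions, not code from this repository.

WINDOW_SECS = 7 * 24 * 60 * 60  # one week per scroll "page"

def scroll_windows(start_ts, end_ts):
    """Yield (window_start, window_end) pairs from newest to oldest."""
    curr_end = end_ts
    while curr_end > start_ts:
        curr_start = max(start_ts, curr_end - WINDOW_SECS)
        yield (curr_start, curr_end)
        curr_end = curr_start
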
2 changes: 1 addition & 1 deletion emission/net/usercache/builtin_usercache_handler.py
@@ -180,7 +180,7 @@ def storeTimelineToCache(self, time_query):
        trip_gj_list = self.get_trip_list_for_seven_days(start_ts)
        if len(trip_gj_list) == 0:
            ts = etsa.TimeSeries.get_time_series(self.user_id)
            max_loc_ts = ts.get_max_value_for_field("background/filtered_location", "data.ts")
            max_loc_ts = ts.get_first_value_for_field("background/filtered_location", "data.ts", pymongo.DESCENDING)
            if max_loc_ts == -1:
                logging.warning("No entries for user %s, early return " % self.user_id)
                return
4 changes: 3 additions & 1 deletion emission/storage/timeseries/abstract_timeseries.py
@@ -61,13 +61,15 @@ def get_data_df(self, key, time_query = None, geo_query=None, extra_query_list=N
"""
pass

    def get_max_value_for_field(self, key, field):
    def get_first_value_for_field(self, key, field, sort_order):
        """
        Currently used to get the max value of the location values so that we can send data
        that actually exists into the usercache. Is that too much of a corner case? Do we want to do
        this in some other way?
        :param key: the metadata key for the entries, used to identify the stream
        :param field: the field in the stream whose first value we want.
        :param time_query: the time range in which to search the stream
        :param sort_order: pymongo.ASCENDING or pymongo.DESCENDING
        It is assumed that the values for the field are sortable.
        :return: the first value for the field, in the given sort order, in the stream identified by key. -1 if there are no entries for the key.
        """
5 changes: 3 additions & 2 deletions emission/storage/timeseries/builtin_timeseries.py
@@ -290,19 +290,20 @@ def to_data_df(key, entry_it, map_fn = None):
        return deduped_df.reset_index(drop=True)


    def get_max_value_for_field(self, key, field, time_query=None):
    def get_first_value_for_field(self, key, field, sort_order, time_query=None):
        """
        Currently used to get the max value of the location values so that we can send data
        that actually exists into the usercache. Is that too much of a corner case? Do we want to do
        this in some other way?
        :param key: the metadata key for the entries, used to identify the stream
        :param field: the field in the stream whose first value we want.
        :param time_query: the time range in which to search the stream
        :param sort_order: pymongo.ASCENDING or pymongo.DESCENDING
        It is assumed that the values for the field are sortable.
        :return: the first value for the field, in the given sort order, in the stream identified by key. -1 if there are no entries for the key.
        """
        result_it = self.get_timeseries_db(key).find(self._get_query([key], time_query),
            {"_id": False, field: True}).sort(field, pymongo.DESCENDING).limit(1)
            {"_id": False, field: True}).sort(field, sort_order).limit(1)
        result_list = list(result_it)
        if len(result_list) == 0:
            return -1
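
A short usage sketch of the generalized method (assuming a user_id already in scope; this mirrors the call sites elsewhere in this commit): the same method now returns either extreme of a sortable field, depending on the sort order passed in.

import pymongo
import emission.storage.timeseries.abstract_timeseries as esta

ts = esta.TimeSeries.get_time_series(user_id)
first_ts = ts.get_first_value_for_field(
    "background/filtered_location", "data.ts", pymongo.ASCENDING)
last_ts = ts.get_first_value_for_field(
    "background/filtered_location", "data.ts", pymongo.DESCENDING)
# Both calls return -1 if the stream has no entries for the key.
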
2 changes: 1 addition & 1 deletion emission/storage/timeseries/non_user_timeseries.py
@@ -43,7 +43,7 @@ def find_entries(self, key_list = None, time_query = None, geo_query = None,
    # get_entry_at_ts is unchanged
    # get_data_df is unchanged
    # to_data_df is unchanged
    # get_first_value_for_field is unchanged
    # bulk_insert is unchanged

    def insert(self, entry):
42 changes: 42 additions & 0 deletions emission/tests/netTests/TestPipeline.py
@@ -0,0 +1,42 @@
import unittest
import logging
import arrow
import os

import emission.core.get_database as edb
import emission.core.wrapper.localdate as ecwl
import emission.tests.common as etc

from emission.net.api import pipeline

class TestPipeline(unittest.TestCase):
    def setUp(self):
        etc.setupRealExample(self,
            "emission/tests/data/real_examples/shankari_2015-aug-21")
        self.testUUID1 = self.testUUID
        etc.setupRealExample(self,
            "emission/tests/data/real_examples/shankari_2015-aug-27")

    def tearDown(self):
        self.clearRelatedDb()

    def clearRelatedDb(self):
        edb.get_timeseries_db().delete_many({"user_id": self.testUUID})
        edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUUID})
        edb.get_pipeline_state_db().delete_many({"user_id": self.testUUID})
        edb.get_timeseries_db().delete_many({"user_id": self.testUUID1})
        edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUUID1})
        edb.get_pipeline_state_db().delete_many({"user_id": self.testUUID1})

    def testNoAnalysisResults(self):
        self.assertEqual(pipeline.get_range(self.testUUID), (None, None))

    def testAnalysisResults(self):
        self.assertEqual(pipeline.get_range(self.testUUID), (None, None))
        etc.runIntakePipeline(self.testUUID)
        (start_ts, end_ts) = pipeline.get_range(self.testUUID)
        self.assertAlmostEqual(start_ts, 1440688739.672)
        self.assertAlmostEqual(end_ts, 1440729142.709)

if __name__ == '__main__':
    import emission.tests.common as etc
    etc.configLogging()
    unittest.main()
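
The new test can presumably be run standalone with the project's usual unittest invocation, e.g. python -m unittest emission.tests.netTests.TestPipeline (assuming a configured test database).
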
3 changes: 2 additions & 1 deletion emission/tests/storageTests/TestTimeSeries.py
@@ -10,6 +10,7 @@
import datetime as pydt
import logging
import json
import pymongo

# Our imports
import emission.core.get_database as edb
@@ -56,7 +57,7 @@ def testGetEntryAtTs(self):

    def testGetMaxValueForField(self):
        ts = esta.TimeSeries.get_time_series(self.testUUID)
        self.assertEqual(ts.get_max_value_for_field("background/filtered_location", "data.ts"), 1440729334.797)
        self.assertEqual(ts.get_first_value_for_field("background/filtered_location", "data.ts", pymongo.DESCENDING), 1440729334.797)

    def testGetDataDf(self):
        ts = esta.TimeSeries.get_time_series(self.testUUID)
