Skip to content

Commit

Permalink
Merge pull request #829 from corinne-hcr/modeling_and_functions
Browse files Browse the repository at this point in the history
Modeling and functions
  • Loading branch information
shankari authored Aug 19, 2021
2 parents 6126201 + 10772f8 commit bb83b42
Show file tree
Hide file tree
Showing 32 changed files with 2,406 additions and 145 deletions.
67 changes: 67 additions & 0 deletions bin/build_label_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""
Script to build and save the labeling model.
"""
import logging

import argparse
import uuid
import copy

import emission.pipeline.reset as epr
import emission.core.get_database as edb
import emission.core.wrapper.user as ecwu
import emission.storage.timeseries.abstract_timeseries as esta
import emission.analysis.modelling.tour_model_first_only.build_save_model as eamtb

def _get_user_list(args):
if args.all:
return _find_all_users()
elif args.platform:
return _find_platform_users(args.platform)
elif args.email_list:
return _email_2_user_list(args.email_list)
else:
assert args.user_list is not None
return [uuid.UUID(u) for u in args.user_list]

def _find_platform_users(platform):
# Since all new clients register a profile with the server, we don't have
# to run a 'distinct' query over the entire contents of the timeseries.
# Instead, we can simply query from the profile users, which is
# significantly faster
# Use the commented out line instead for better performance.
# Soon, we can move to the more performant option, because there will be
# no users that don't have a profile
# return edb.get_timeseries_db().find({'metadata.platform': platform}).distinct(
# 'user_id')
return edb.get_profile_db().find({"curr_platform": platform}).distinct("user_id")

def _find_all_users():
return esta.TimeSeries.get_uuid_list()

def _email_2_user_list(email_list):
return [ecwu.User.fromEmail(e).uuid for e in email_list]

if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s',
level=logging.DEBUG)

parser = argparse.ArgumentParser()
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("-a", "--all", action="store_true", default=False,
help="build the model for all users")
group.add_argument("-p", "--platform", choices = ['android', 'ios'],
help="build the model for all on the specified platform")
group.add_argument("-u", "--user_list", nargs='+',
help="user ids to build the model for")
group.add_argument("-e", "--email_list", nargs='+',
help="email addresses to build the model for")

args = parser.parse_args()
print(args)

user_list = _get_user_list(args)
logging.info("received list with %s users" % user_list)
for user_id in user_list:
logging.info("building model for user %s" % user_id)
eamtb.build_user_model(user_id)
41 changes: 41 additions & 0 deletions bin/debug/label_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import emission.core.get_database as edb
import uuid
import argparse


parser = argparse.ArgumentParser(prog="intake_single_user")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("-e", "--user_email")
group.add_argument("-u", "--user_uuid")

args = parser.parse_args()
if args.user_uuid:
sel_uuid = uuid.UUID(args.user_uuid)
else:
sel_uuid = ecwu.User.fromEmail(args.user_email).uuid

print("All inferred trips %s" % edb.get_analysis_timeseries_db().count_documents({"metadata.key": "analysis/inferred_trip", "user_id": sel_uuid}))

print("Inferred trips with inferences %s" % edb.get_analysis_timeseries_db().find({"metadata.key": "analysis/inferred_trip", "user_id": sel_uuid, "data.inferred_labels": {"$ne": []}}).count())

print("All expected trips %s" % edb.get_analysis_timeseries_db().count_documents({"metadata.key": "analysis/expected_trip", "user_id": sel_uuid}))

print("Expected trips with inferences %s" % edb.get_analysis_timeseries_db().find({"metadata.key": "analysis/expected_trip", "user_id": sel_uuid, "data.expectation": {"$exists": True}}).count())

for t in list(edb.get_analysis_timeseries_db().find({"metadata.key": "analysis/inferred_trip", "user_id": sel_uuid})):
if t["data"]["inferred_labels"] != []:
confirmed_trip = edb.get_analysis_timeseries_db().find_one({"user_id": t["user_id"],
"metadata.key": "analysis/confirmed_trip",
"data.start_ts": t["data"]["start_ts"]})
if confirmed_trip is None:
print("No matching confirmed trip for %s" % t["data"]["start_fmt_time"])
continue

if confirmed_trip["data"]["user_input"] == {}:
print("Found confirmed trip with matching inferred trip, without user labels")

print("all inferred trips %s" % (edb.get_analysis_timeseries_db().find({"metadata.key": "analysis/inferred_trip", "user_id": sel_uuid}).count()))
print("all confirmed trips %s" % (edb.get_analysis_timeseries_db().find({"metadata.key": "analysis/confirmed_trip", "user_id": sel_uuid}).count()))
print("confirmed trips with inferred labels %s" % (edb.get_analysis_timeseries_db().find({"metadata.key": "analysis/confirmed_trip", "user_id": sel_uuid, "data.inferred_labels": {"$ne": []}}).count()))
print("confirmed trips without inferred labels %s" % (edb.get_analysis_timeseries_db().find({"metadata.key": "analysis/confirmed_trip", "user_id": sel_uuid, "data.inferred_labels": []}).count()))
print("confirmed trips with expectation %s" % (edb.get_analysis_timeseries_db().find({"metadata.key": "analysis/confirmed_trip", "user_id": sel_uuid, "data.expectation": {"$exists": True}}).count()))
29 changes: 29 additions & 0 deletions bin/debug/reset_partial_label_testing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import json
import logging
import argparse
import numpy as np
import uuid

import emission.core.get_database as edb
import emission.storage.decorations.analysis_timeseries_queries as esda


parser = argparse.ArgumentParser(prog="reset_partial_label_testing")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("-i", "--inferred", action='store_true')
group.add_argument("-c", "--confirmed", action='store_true')

args = parser.parse_args()

if args.inferred:
print(edb.get_analysis_timeseries_db().delete_many({"metadata.key": esda.INFERRED_TRIP_KEY}).raw_result)
print(edb.get_analysis_timeseries_db().delete_many({"metadata.key": esda.EXPECTED_TRIP_KEY}).raw_result)
print(edb.get_analysis_timeseries_db().delete_many({"metadata.key": "inference/labels"}).raw_result)
print(edb.get_analysis_timeseries_db().delete_many({"metadata.key": "analysis/inferred_labels"}).raw_result)
print(edb.get_pipeline_state_db().delete_many({"pipeline_stage": {"$in": [14,15]}}).raw_result)

if args.confirmed:
print(edb.get_analysis_timeseries_db().delete_many({"metadata.key": esda.EXPECTED_TRIP_KEY}).raw_result)
print(edb.get_analysis_timeseries_db().delete_many({"metadata.key": esda.CONFIRMED_TRIP_KEY}).raw_result)
print(edb.get_pipeline_state_db().delete_many({"pipeline_stage": {"$in": [13]}}).raw_result)

30 changes: 30 additions & 0 deletions emission/analysis/classification/inference/labels/ensembles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# This file encapsulates the various ensemble algorithms that take a trip and a list of primary predictions and return a label data structure

import copy
import logging

import emission.core.wrapper.labelprediction as ecwl

# This placeholder ensemble simply returns the first prediction run
def ensemble_first_prediction(trip, predictions):
# Since this is not a real ensemble yet, we will not mark it as such
# algorithm_id = ecwl.AlgorithmTypes.ENSEMBLE
algorithm_id = ecwl.AlgorithmTypes(predictions[0]["algorithm_id"]);
prediction = copy.copy(predictions[0]["prediction"])
return algorithm_id, prediction

# If we get a real prediction, use it, otherwise fallback to the placeholder
def ensemble_real_and_placeholder(trip, predictions):
if predictions[0]["prediction"] != []:
sel_prediction = predictions[0]
logging.debug(f"Found real prediction {sel_prediction}, using that preferentially")
# assert sel_prediction.algorithm_id == ecwl.AlgorithmTypes.TWO_STAGE_BIN_CLUSTER
else:
sel_prediction = predictions[1]
logging.debug(f"No real prediction found, using placeholder prediction {sel_prediction}")
# Use a not equal assert since we may want to change the placeholder
assert sel_prediction.algorithm_id != ecwl.AlgorithmTypes.TWO_STAGE_BIN_CLUSTER

algorithm_id = ecwl.AlgorithmTypes(sel_prediction["algorithm_id"])
prediction = copy.copy(sel_prediction["prediction"])
return algorithm_id, prediction
153 changes: 153 additions & 0 deletions emission/analysis/classification/inference/labels/inferrers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
# This file encapsulates the various prediction algorithms that take a trip and return a label data structure
# Named "inferrers.py" instead of "predictors.py" to avoid a name collection in our abbreviated import convention

import logging
import random
import copy

import emission.analysis.modelling.tour_model_first_only.load_predict as lp

# A set of placeholder predictors to allow pipeline development without a real inference algorithm.
# For the moment, the system is configured to work with two labels, "mode_confirm" and
# "purpose_confirm", so I'll do that.

# The first placeholder scenario represents a case where it is hard to distinguish between
# biking and walking (e.g., because the user is a very slow biker) and hard to distinguish
# between work and shopping at the grocery store (e.g., because the user works at the
# grocery store), but whenever the user bikes to the location it is to work and whenever
# the user walks to the location it is to shop (e.g., because they don't have a basket on
# their bike), and the user bikes to the location four times more than they walk there.
# Obviously, it is a simplification.
def placeholder_predictor_0(trip):
return [
{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
]


# The next placeholder scenario provides that same set of labels in 75% of cases and no
# labels in the rest.
def placeholder_predictor_1(trip):
return [
{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
] if random.random() > 0.25 else []


# This third scenario provides labels designed to test the soundness and resilience of
# the client-side inference processing algorithms.
def placeholder_predictor_2(trip):
# Timestamp2index gives us a deterministic way to match test trips with labels
# Hardcoded to match "test_july_22" -- clearly, this is just for testing
timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
return [
[

],
[
{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
],
[
{"labels": {"mode_confirm": "drove_alone"}, "p": 0.8},
],
[
{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
],
[
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
{"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
{"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
],
[
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
{"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
{"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
]
][index]


# This fourth scenario provides labels designed to test the expectation and notification system.
def placeholder_predictor_3(trip):
timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
return [
[
{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
],
[
{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
],
[
{"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "entertainment"}, "p": 0.70},
],
[
{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.96},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.04}
],
[
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
{"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
{"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
],
[
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.60},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.25},
{"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.11},
{"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.04}
]
][index]

# Placeholder that is suitable for a demo.
# Finds all unique label combinations for this user and picks one randomly
def placeholder_predictor_demo(trip):
import random

import emission.core.get_database as edb
user = trip["user_id"]
unique_user_inputs = edb.get_analysis_timeseries_db().find({"user_id": user}).distinct("data.user_input")
if len(unique_user_inputs) == 0:
return []
random_user_input = random.choice(unique_user_inputs) if random.randrange(0,10) > 0 else []

logging.debug(f"In placeholder_predictor_demo: found {len(unique_user_inputs)} for user {user}, returning value {random_user_input}")
return [{"labels": random_user_input, "p": random.random()}]

# Non-placeholder implementation. First bins the trips, and then clusters every bin
# See emission.analysis.modelling.tour_model for more details
# Assumes that pre-built models are stored in working directory
# Models are built using evaluation_pipeline.py and build_save_model.py
# This algorithm is now DEPRECATED in favor of predict_cluster_confidence_discounting (see https://github.com/e-mission/e-mission-docs/issues/663)
def predict_two_stage_bin_cluster(trip):
return lp.predict_labels(trip)

# Reduce the confidence of the clustering prediction when the number of trips in the cluster is small
# See https://github.com/e-mission/e-mission-docs/issues/663
def n_to_confidence_coeff(n, max_confidence=None, first_confidence=None, confidence_multiplier=None):
if max_confidence is None: max_confidence = 0.99 # Confidence coefficient for n approaching infinity -- in the GitHub issue, this is 1-A
if first_confidence is None: first_confidence = 0.80 # Confidence coefficient for n = 1 -- in the issue, this is B
if confidence_multiplier is None: confidence_multiplier = 0.30 # How much of the remaining removable confidence to remove between n = k and n = k+1 -- in the issue, this is C
return max_confidence-(max_confidence-first_confidence)*(1-confidence_multiplier)**(n-1) # This is the u = ... formula in the issue

# predict_two_stage_bin_cluster but with the above reduction in confidence
def predict_cluster_confidence_discounting(trip, max_confidence=None, first_confidence=None, confidence_multiplier=None):
labels, n = lp.predict_labels_with_n(trip)
if n <= 0: # No model data or trip didn't match a cluster
logging.debug(f"In predict_cluster_confidence_discounting: n={n}; returning as-is")
return labels

confidence_coeff = n_to_confidence_coeff(n, max_confidence, first_confidence, confidence_multiplier)
logging.debug(f"In predict_cluster_confidence_discounting: n={n}; discounting with coefficient {confidence_coeff}")

labels = copy.deepcopy(labels)
for l in labels: l["p"] *= confidence_coeff
return labels
Loading

0 comments on commit bb83b42

Please sign in to comment.