Skip to content

Commit

Permalink
backend/enhancement: Stop Detection
Browse files Browse the repository at this point in the history
  • Loading branch information
khuzema786 committed Oct 18, 2024
1 parent 77e2526 commit f00da81
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 108 deletions.
134 changes: 29 additions & 105 deletions crates/location_tracking_service/src/common/stop_detection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,82 +7,9 @@
*/

use crate::common::types::*;
use crate::common::utils::{
distance_between_in_meters, get_bucket_from_timestamp, get_bucket_weightage_from_timestamp,
};
use crate::common::utils::distance_between_in_meters;
use crate::environment::StopDetectionConfig;

/// Filters driver locations based on bucket weightage, including both current and previous bucket.
///
/// # Arguments
///
/// * `locations` - A slice of `DriverLocation` representing the driver's location history.
/// * `bucket_size` - The size of the bucket in terms of time (e.g., seconds).
/// * `curr_bucket` - The current bucket based on the latest driver's location timestamp.
/// * `curr_bucket_capacity_based_on_weightage` - The weightage used to limit the number of points in the current bucket.
///
/// # Returns
///
/// A vector containing filtered driver locations based on the bucket weightage (current and previous buckets).
fn filter_locations_based_on_bucket_weightage(
locations: &[DriverLocation],
bucket_size: u64,
curr_bucket: u64,
curr_bucket_capacity_based_on_weightage: u64,
) -> Vec<DriverLocation> {
let prev_bucket_capacity_based_on_weightage =
std::cmp::max(0, bucket_size - curr_bucket_capacity_based_on_weightage);

// Using fold to filter locations
let (mut filtered_locations, _, _) = locations.iter().rev().fold(
(
vec![],
curr_bucket_capacity_based_on_weightage,
prev_bucket_capacity_based_on_weightage,
),
|(
mut locations,
curr_bucket_capacity_based_on_weightage,
prev_bucket_capacity_based_on_weightage,
),
location| {
let bucket = get_bucket_from_timestamp(&bucket_size, location.timestamp);
if bucket == curr_bucket {
if curr_bucket_capacity_based_on_weightage > 0 {
locations.push(location.to_owned());
(
locations,
curr_bucket_capacity_based_on_weightage - 1,
prev_bucket_capacity_based_on_weightage,
)
} else {
locations.push(location.to_owned());
(
locations,
curr_bucket_capacity_based_on_weightage,
prev_bucket_capacity_based_on_weightage - 1,
)
}
} else if bucket == curr_bucket - 1 && prev_bucket_capacity_based_on_weightage > 0 {
locations.push(location.to_owned());
(
locations,
curr_bucket_capacity_based_on_weightage,
prev_bucket_capacity_based_on_weightage - 1,
)
} else {
(
locations,
curr_bucket_capacity_based_on_weightage,
prev_bucket_capacity_based_on_weightage,
)
}
},
);

filtered_locations.reverse(); // Ensure locations are returned in correct order
filtered_locations
}
use std::collections::VecDeque;

/// Calculates the mean location (latitude and longitude) based on a set of driver locations.
///
Expand All @@ -94,7 +21,7 @@ fn filter_locations_based_on_bucket_weightage(
/// # Returns
///
/// A `Point` representing the mean location.
fn calculate_mean_location(locations: &[DriverLocation], total_points: usize) -> Point {
fn calculate_mean_location(locations: &VecDeque<DriverLocation>, total_points: usize) -> Point {
let location_sum = locations.iter().fold(
Point {
lat: Latitude(0.0),
Expand All @@ -118,6 +45,7 @@ fn calculate_mean_location(locations: &[DriverLocation], total_points: usize) ->
/// * `mean_location` - The calculated mean location of driver pings.
/// * `total_points` - The total number of points used to calculate the mean.
/// * `latest_location` - The most recent driver location.
/// * `speed` - The speed in m/s of the most recent driver location.
/// * `config` - Configuration for stop detection.
///
/// # Returns
Expand All @@ -127,11 +55,15 @@ fn is_stop_detected(
mean_location: &Point,
total_points: usize,
latest_location: &Point,
speed: Option<SpeedInMeterPerSecond>,
config: &StopDetectionConfig,
) -> bool {
let distance = distance_between_in_meters(mean_location, latest_location);
distance < config.radius_threshold_meters as f64
&& total_points >= config.min_points_within_radius_threshold
&& speed.map_or(false, |speed| {
speed >= SpeedInMeterPerSecond(config.max_eligible_stop_speed_threshold)
})
}

/// Detects whether the driver has stopped based on a sliding window of driver locations and the stop detection configuration.
Expand All @@ -152,56 +84,48 @@ pub fn detect_stop(
driver_ride_status: Option<&RideStatus>,
stop_detection: Option<StopDetection>,
latest_driver_location: DriverLocation,
speed: Option<SpeedInMeterPerSecond>,
config: &StopDetectionConfig,
) -> (Option<Point>, Option<StopDetection>) {
if driver_ride_status != Some(&RideStatus::NEW) {
return (None, None);
}

// Determine the current time bucket and the associated weightage
let curr_bucket = get_bucket_from_timestamp(
&config.duration_threshold_seconds,
latest_driver_location.timestamp,
);

let curr_bucket_weightage = get_bucket_weightage_from_timestamp(
&config.duration_threshold_seconds,
latest_driver_location.timestamp,
);

let curr_bucket_max_points_to_consider =
curr_bucket_weightage * config.min_points_within_radius_threshold as u64;

if let Some(stop_detection) = stop_detection {
let mut locations = filter_locations_based_on_bucket_weightage(
&stop_detection.locations,
config.duration_threshold_seconds,
curr_bucket,
curr_bucket_max_points_to_consider,
);

let mean_location = calculate_mean_location(&locations, locations.len());
if let Some(mut stop_detection) = stop_detection {
let mean_location =
calculate_mean_location(&stop_detection.locations, stop_detection.locations.len());

let stop_detected = if is_stop_detected(
&mean_location,
locations.len(),
stop_detection.locations.len(),
&latest_driver_location.location,
speed,
config,
) {
locations.clear(); // Clear the history if a stop is detected
// Clear the history if a stop is detected
stop_detection.locations.clear();
Some(mean_location)
} else {
locations.push(latest_driver_location); // Add the latest location
if stop_detection.locations.len() >= config.min_points_within_radius_threshold {
// Remove oldest location to always maintain a window of `min_points_within_radius_threshold`
stop_detection.locations.pop_front();
}
// Add the latest location
stop_detection.locations.push_back(latest_driver_location);
None
};

(stop_detected, Some(StopDetection { locations }))
(
stop_detected,
Some(StopDetection {
locations: stop_detection.locations,
}),
)
} else {
// First time: start with the latest location
(
None,
Some(StopDetection {
locations: vec![latest_driver_location],
locations: VecDeque::new(),
}),
)
}
Expand Down
4 changes: 3 additions & 1 deletion crates/location_tracking_service/src/common/types.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::VecDeque;

/* Copyright 2022-23, Juspay India Pvt Ltd
This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License
as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program
Expand Down Expand Up @@ -193,7 +195,7 @@ pub struct DriverLocation {

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct StopDetection {
pub locations: Vec<DriverLocation>,
pub locations: VecDeque<DriverLocation>,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ async fn process_driver_locations(
location: latest_driver_location.pt.to_owned(),
timestamp: latest_driver_location.ts,
},
latest_driver_location.v,
&data.stop_detection,
);

Expand Down
2 changes: 1 addition & 1 deletion crates/location_tracking_service/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub struct RedisConfig {
pub struct StopDetectionConfig {
#[serde(deserialize_with = "deserialize_url")]
pub stop_detection_update_callback_url: Url,
pub duration_threshold_seconds: u64,
pub max_eligible_stop_speed_threshold: f64,
pub radius_threshold_meters: u64,
pub min_points_within_radius_threshold: usize,
}
Expand Down
2 changes: 1 addition & 1 deletion dhall-configs/dev/location_tracking_service.dhall
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ let logger_cfg = {

let stop_detection_config = {
stop_detection_update_callback_url = "http://127.0.0.1:8016/internal/stopDetection",
duration_threshold_seconds = 60,
max_eligible_stop_speed_threshold = 2,
radius_threshold_meters = 25,
min_points_within_radius_threshold = 5,
}
Expand Down

0 comments on commit f00da81

Please sign in to comment.