rove_connector: handle SpaceSpec::All case
intarga committed Dec 3, 2024
1 parent ff195f8 commit dddfc5a
Showing 3 changed files with 173 additions and 22 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions rove_connector/Cargo.toml
@@ -9,6 +9,7 @@ bb8.workspace = true
bb8-postgres.workspace = true
chrono.workspace = true
chronoutil.workspace = true
+postgres-types.workspace = true
rove.workspace = true
thiserror.workspace = true
tokio-postgres.workspace = true
193 changes: 171 additions & 22 deletions rove_connector/src/lib.rs
@@ -4,7 +4,7 @@ use chrono::{DateTime, TimeZone, Utc};
use chronoutil::RelativeDuration;
use rove::data_switch::{self, DataCache, DataConnector, SpaceSpec, TimeSpec, Timeseries};
use thiserror::Error;
-use tokio_postgres::NoTls;
+use tokio_postgres::{types::FromSql, NoTls};

#[derive(Error, Debug)]
#[non_exhaustive]
@@ -20,26 +20,27 @@ pub struct Connector {
pool: PgConnectionPool,
}

#[derive(Debug, FromSql)]
struct Obs {
value: f32,
time: DateTime<Utc>,
}

// TODO: this should probably live somewhere else
#[derive(Debug, FromSql)]
#[postgres(name = "location")]
pub struct Location {
lat: Option<f32>,
lon: Option<f32>,
hamsl: Option<f32>,
_hag: Option<f32>,
}
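// Editor's aside, not part of the commit: #[derive(FromSql)] maps a struct onto a
// *named* Postgres composite type, so these two structs assume something like the
// following (hypothetical, inferred) DDL exists in the schema:
//
//   CREATE TYPE obs AS (value REAL, time TIMESTAMPTZ);
//   CREATE TYPE location AS (lat REAL, lon REAL, hamsl REAL, hag REAL);
//
// Location pins its type name via #[postgres(name = "location")]; Obs relies on the
// default name matching, which may need the same treatment, and the field names would
// need to line up exactly with the Rust side (see the TODO below on whether the input
// type matches postgres-types).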

fn extract_time_spec(
time_spec: &TimeSpec,
num_leading_points: u8,
num_trailing_points: u8,
-) -> Result<(DateTime<Utc>, DateTime<Utc>, &str), data_switch::Error> {
-// TODO: matching intervals like this is a hack, but currently necessary to avoid
-// SQL injection. Ideally we could pass an interval type as a query param, which would
-// also save us the query_string allocation, but no ToSql implementations for intervals
-// currently exist in tokio_postgres, so we need to implement it ourselves.
-let interval = match time_spec.time_resolution {
-x if x == RelativeDuration::minutes(1) => "1 minute",
-x if x == RelativeDuration::hours(1) => "1 hour",
-x if x == RelativeDuration::days(1) => "1 day",
-_ => {
-return Err(data_switch::Error::Other(Box::new(
-Error::UnhandledTimeResolution(time_spec.time_resolution),
-)))
-}
-};
-
+) -> Result<(DateTime<Utc>, DateTime<Utc>), data_switch::Error> {
// TODO: should time_spec just use chrono timestamps instead of unix?
// IIRC the reason for unix timestamps was easy compatibility with protobuf, but that's
// less of a priority now
@@ -50,7 +50,41 @@ fn extract_time_spec(
let end_time = Utc.timestamp_opt(time_spec.timerange.end.0, 0).unwrap()
+ (time_spec.time_resolution * num_trailing_points.into());
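// (Editor's note: e.g. a timerange [T0, T1] at 1 hour resolution with one leading and
// one trailing point yields a fetch window of [T0 - 1h, T1 + 1h].)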

-Ok((start_time, end_time, interval))
+Ok((start_time, end_time))
}

// TODO: does the input type match postgres-types?
fn regularize(
obses: Vec<Obs>,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
time_resolution: RelativeDuration,
expected_len: usize,
) -> Vec<Option<f32>> {
let mut out = Vec::with_capacity(expected_len);
let mut curr_obs_time = start_time;

for obs in obses {
while curr_obs_time < obs.time {
out.push(None);
curr_obs_time = curr_obs_time + time_resolution;
}
if curr_obs_time == obs.time {
out.push(Some(obs.value));
curr_obs_time = curr_obs_time + time_resolution;
} else {
// In this case the observation is misaligned, so we should skip it. There's a case
// to be made for returning an error, but I think we ought to be more robust.
continue;
}
}

while curr_obs_time <= end_time {
out.push(None);
curr_obs_time = curr_obs_time + time_resolution;
}

out
}
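// A minimal sketch of regularize's alignment behavior (editor's example, not part of
// the commit): gaps in the grid become None, aligned values are kept, and misaligned
// observations are dropped.
#[cfg(test)]
mod regularize_sketch {
    use super::*;

    #[test]
    fn fills_gaps_and_drops_misaligned() {
        let start = Utc.with_ymd_and_hms(2024, 12, 1, 0, 0, 0).unwrap();
        let end = Utc.with_ymd_and_hms(2024, 12, 1, 3, 0, 0).unwrap();
        let res = RelativeDuration::hours(1);
        let obses = vec![
            // aligned with the grid: kept
            Obs { value: 1.0, time: start },
            // 30 minutes off the grid: skipped
            Obs { value: 2.0, time: start + RelativeDuration::minutes(30) },
            // aligned again: kept, with a None filled in for the missing 01:00 slot
            Obs { value: 3.0, time: Utc.with_ymd_and_hms(2024, 12, 1, 2, 0, 0).unwrap() },
        ];
        // grid: 00:00, 01:00, 02:00, 03:00 -> 4 expected points
        assert_eq!(
            regularize(obses, start, end, res, 4),
            vec![Some(1.0), None, Some(3.0), None]
        );
    }
}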

impl Connector {
@@ -61,9 +96,29 @@ impl Connector {
num_leading_points: u8,
num_trailing_points: u8,
) -> Result<DataCache, data_switch::Error> {
-let (start_time, end_time, interval) =
+// TODO: matching intervals like this is a hack, but currently necessary to avoid
+// SQL injection. Ideally we could pass an interval type as a query param, which would
+// also save us the query_string allocation, but no ToSql implementations for intervals
+// currently exist in tokio_postgres, so we need to implement it ourselves.
+let interval = match time_spec.time_resolution {
+x if x == RelativeDuration::minutes(1) => "1 minute",
+x if x == RelativeDuration::hours(1) => "1 hour",
+x if x == RelativeDuration::days(1) => "1 day",
+_ => {
+return Err(data_switch::Error::Other(Box::new(
+Error::UnhandledTimeResolution(time_spec.time_resolution),
+)))
+}
+};
+
+let (start_time, end_time) =
extract_time_spec(time_spec, num_leading_points, num_trailing_points)?;

// TODO: should this contain an ORDER BY?
// TODO: should we drop ts_rule.timestamp from the SELECT? we don't seem to use it
// TODO: should we make this like the fetch_all query and regularize outside the query?
// I think this query might perform badly because the join against the generated series
// doesn't use the index optimally. Doing this would also save us the "interval" mess
let query_string = format!("SELECT data.obsvalue, ts_rule.timestamp \
FROM (SELECT data.obsvalue, data.obstime FROM data WHERE data.timeseries = $1) as data
RIGHT JOIN generate_series($2::timestamptz, $3::timestamptz, interval '{}') AS ts_rule(timestamp) \
@@ -106,6 +161,98 @@ impl Connector {

Ok(cache)
}

async fn fetch_all(
&self,
time_spec: &TimeSpec,
num_leading_points: u8,
num_trailing_points: u8,
) -> Result<DataCache, data_switch::Error> {
let (start_time, end_time) =
extract_time_spec(time_spec, num_leading_points, num_trailing_points)?;

let conn = self
.pool
.get()
.await
.map_err(|e| data_switch::Error::Other(Box::new(e)))?;

let data_results = conn
.query(
"
SELECT timeseries.id, data.values, timeseries.loc \
FROM ( \
SELECT timeseries, ARRAY_AGG ((value, timestamp) ORDER BY timestamp ASC) as values \
FROM data \
WHERE obstime BETWEEN $1 AND $2 \
GROUP BY timeseries \
) as data \
JOIN timeseries \
ON data.timeseries = timeseries.id \
",
&[&start_time, &end_time],
)
.await
.map_err(|e| data_switch::Error::Other(Box::new(e)))?;

let cache = {
let mut data = Vec::with_capacity(data_results.len());
let mut lats = Vec::with_capacity(data_results.len());
let mut lons = Vec::with_capacity(data_results.len());
let mut elevs = Vec::with_capacity(data_results.len());

let ts_length = {
let mut ts_length = 0;
let mut curr_time = start_time;
while curr_time <= end_time {
ts_length += 1;
curr_time = curr_time + time_spec.time_resolution;
}
ts_length
};
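// Editor's note: counted with a loop rather than computed arithmetically because
// RelativeDuration can be calendar-relative (months vary in length), so the number
// of grid points has no fixed divisor.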

for row in data_results {
let ts_id: i32 = row.get(0);
let raw_values: Vec<Obs> = row.get(1);
let loc: Location = row.get(2);

// TODO: is there a better way to handle this? If we insert with default latlon we
// risk corrupting spatial checks, if not we miss QCing data we probably should be
// QCing... Perhaps we can change the definition of DataCache to accommodate this
// better?
if loc.lat.is_none() || loc.lon.is_none() || loc.hamsl.is_none() {
continue;
}

data.push(Timeseries {
tag: ts_id.to_string(),
values: regularize(
raw_values,
start_time,
end_time,
time_spec.time_resolution,
ts_length,
),
});
lats.push(loc.lat.unwrap());
lons.push(loc.lon.unwrap());
elevs.push(loc.hamsl.unwrap());
}

DataCache::new(
data,
lats,
lons,
elevs,
time_spec.timerange.start,
time_spec.time_resolution,
num_leading_points,
num_trailing_points,
)
};

Ok(cache)
}
}
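// Editor's sketch, not part of the commit: the interval-matching TODO above exists
// because tokio_postgres ships no ToSql impl for intervals. A hand-rolled one might
// look like the following. PgInterval is a hypothetical newtype (it assumes a direct
// dependency on the bytes crate), and the field order follows Postgres's binary
// interval encoding (time in microseconds, then days, then months, big-endian),
// which would want verifying against the wire protocol docs.
use bytes::BufMut;
use postgres_types::{to_sql_checked, IsNull, ToSql, Type};

#[derive(Debug)]
struct PgInterval {
    microseconds: i64,
    days: i32,
    months: i32,
}

impl ToSql for PgInterval {
    fn to_sql(
        &self,
        _ty: &Type,
        out: &mut bytes::BytesMut,
    ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> {
        // Postgres's binary interval layout: i64 microseconds, i32 days, i32 months.
        out.put_i64(self.microseconds);
        out.put_i32(self.days);
        out.put_i32(self.months);
        Ok(IsNull::No)
    }

    fn accepts(ty: &Type) -> bool {
        *ty == Type::INTERVAL
    }

    to_sql_checked!();
}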

#[async_trait]
@@ -130,9 +277,11 @@ impl DataConnector for Connector {
)
.await
}
-// TODO: We should handle at least the All case, Polygon can be left unimplemented for
-// now
-_ => todo!(),
+SpaceSpec::Polygon(_) => unimplemented!(),
+SpaceSpec::All => {
+self.fetch_all(time_spec, num_leading_points, num_trailing_points)
+.await
+}
}
}
}
