diff --git a/Cargo.lock b/Cargo.lock index 3b62f3001a9..cfa63cbb8f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6077,6 +6077,7 @@ dependencies = [ "oximeter-instruments", "oximeter-producer", "oxnet", + "oxql-types", "parse-display", "paste", "pem", @@ -6927,6 +6928,7 @@ dependencies = [ "omicron-workspace-hack", "oximeter", "oximeter-test-utils", + "oxql-types", "peg", "reedline", "regex", @@ -7103,6 +7105,20 @@ dependencies = [ "serde_json", ] +[[package]] +name = "oxql-types" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "highway", + "num", + "omicron-workspace-hack", + "oximeter-types", + "schemars", + "serde", +] + [[package]] name = "p256" version = "0.13.2" diff --git a/Cargo.toml b/Cargo.toml index ea687936e3f..cf3165bd269 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,6 +80,7 @@ members = [ "oximeter/instruments", "oximeter/oximeter-macro-impl", "oximeter/oximeter", + "oximeter/oxql-types", "oximeter/producer", "oximeter/schema", "oximeter/test-utils", @@ -196,6 +197,7 @@ default-members = [ "oximeter/instruments", "oximeter/oximeter-macro-impl", "oximeter/oximeter", + "oximeter/oxql-types", "oximeter/producer", "oximeter/schema", "oximeter/test-utils", @@ -470,6 +472,7 @@ oximeter-schema = { path = "oximeter/schema" } oximeter-test-utils = { path = "oximeter/test-utils" } oximeter-timeseries-macro = { path = "oximeter/timeseries-macro" } oximeter-types = { path = "oximeter/types" } +oxql-types = { path = "oximeter/oxql-types" } p256 = "0.13" parse-display = "0.10.0" partial-io = { version = "0.5.4", features = ["proptest1", "tokio1"] } diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index 89775075053..5b181c7fa00 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -57,6 +57,7 @@ openssl.workspace = true oximeter-client.workspace = true oximeter-db = { workspace = true, default-features = false, features = [ "oxql" ] } oxnet.workspace = true +oxql-types.workspace = true parse-display.workspace = true paste.workspace = true # See omicron-rpaths for more about the "pq-sys" dependency. diff --git a/nexus/src/app/metrics.rs b/nexus/src/app/metrics.rs index 3728a3bdc15..3a6e7e27be8 100644 --- a/nexus/src/app/metrics.rs +++ b/nexus/src/app/metrics.rs @@ -14,7 +14,7 @@ use nexus_db_queries::{ }; use omicron_common::api::external::{Error, InternalContext}; use oximeter_db::{ - oxql, Measurement, TimeseriesSchema, TimeseriesSchemaPaginationParams, + Measurement, TimeseriesSchema, TimeseriesSchemaPaginationParams, }; use std::num::NonZeroU32; @@ -138,7 +138,7 @@ impl super::Nexus { &self, opctx: &OpContext, query: impl AsRef, - ) -> Result, Error> { + ) -> Result, Error> { // Must be a fleet user to list timeseries schema. // // TODO-security: We need to figure out how to implement proper security diff --git a/nexus/src/external_api/http_entrypoints.rs b/nexus/src/external_api/http_entrypoints.rs index 8e8b63229bd..df522f18ab9 100644 --- a/nexus/src/external_api/http_entrypoints.rs +++ b/nexus/src/external_api/http_entrypoints.rs @@ -6386,7 +6386,7 @@ async fn timeseries_schema_list( async fn timeseries_query( rqctx: RequestContext, body: TypedBody, -) -> Result>, HttpError> { +) -> Result>, HttpError> { let apictx = rqctx.context(); let handler = async { let nexus = &apictx.context.nexus; diff --git a/nexus/tests/integration_tests/metrics.rs b/nexus/tests/integration_tests/metrics.rs index 9cfa0350e8c..e24de2a3ad3 100644 --- a/nexus/tests/integration_tests/metrics.rs +++ b/nexus/tests/integration_tests/metrics.rs @@ -284,7 +284,7 @@ async fn test_timeseries_schema_list( pub async fn timeseries_query( cptestctx: &ControlPlaneTestContext, query: impl ToString, -) -> Vec { +) -> Vec { // first, make sure the latest timeseries have been collected. cptestctx.oximeter.force_collect().await; @@ -429,11 +429,11 @@ async fn test_instance_watcher_metrics( #[track_caller] fn count_state( - table: &oximeter_db::oxql::Table, + table: &oxql_types::Table, instance_id: InstanceUuid, state: &'static str, ) -> i64 { - use oximeter_db::oxql::point::ValueArray; + use oxql_types::point::ValueArray; let uuid = FieldValue::Uuid(instance_id.into_untyped_uuid()); let state = FieldValue::String(state.into()); let mut timeserieses = table.timeseries().filter(|ts| { diff --git a/openapi/nexus.json b/openapi/nexus.json index 27e2870b6ee..a0cbfa2f639 100644 --- a/openapi/nexus.json +++ b/openapi/nexus.json @@ -20131,10 +20131,20 @@ "type": "object", "properties": { "metric_type": { - "$ref": "#/components/schemas/MetricType" + "description": "The type of this metric.", + "allOf": [ + { + "$ref": "#/components/schemas/MetricType" + } + ] }, "values": { - "$ref": "#/components/schemas/ValueArray" + "description": "The data values.", + "allOf": [ + { + "$ref": "#/components/schemas/ValueArray" + } + ] } }, "required": [ diff --git a/oximeter/db/Cargo.toml b/oximeter/db/Cargo.toml index 6a7cedbc226..2a9c615da23 100644 --- a/oximeter/db/Cargo.toml +++ b/oximeter/db/Cargo.toml @@ -24,6 +24,7 @@ num.workspace = true omicron-common.workspace = true omicron-workspace-hack.workspace = true oximeter.workspace = true +oxql-types.workspace = true regex.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs index 176e1bd5f8d..c2b07ebaa6f 100644 --- a/oximeter/db/src/client/mod.rs +++ b/oximeter/db/src/client/mod.rs @@ -22,8 +22,6 @@ use crate::Error; use crate::Metric; use crate::Target; use crate::Timeseries; -use crate::TimeseriesKey; -use crate::TimeseriesName; use crate::TimeseriesPageSelector; use crate::TimeseriesScanParams; use crate::TimeseriesSchema; @@ -31,7 +29,9 @@ use dropshot::EmptyScanParams; use dropshot::PaginationOrder; use dropshot::ResultsPage; use dropshot::WhichPage; +use oximeter::schema::TimeseriesKey; use oximeter::types::Sample; +use oximeter::TimeseriesName; use regex::Regex; use regex::RegexBuilder; use slog::debug; diff --git a/oximeter/db/src/client/oxql.rs b/oximeter/db/src/client/oxql.rs index 29586b8189b..4005fa873e8 100644 --- a/oximeter/db/src/client/oxql.rs +++ b/oximeter/db/src/client/oxql.rs @@ -18,7 +18,7 @@ use crate::query::field_table_name; use crate::Error; use crate::Metric; use crate::Target; -use crate::TimeseriesKey; +use oximeter::schema::TimeseriesKey; use oximeter::TimeseriesSchema; use slog::debug; use slog::trace; @@ -68,7 +68,7 @@ pub struct OxqlResult { pub query_summaries: Vec, /// The list of OxQL tables returned from the query. - pub tables: Vec, + pub tables: Vec, } /// The maximum number of data values fetched from the database for an OxQL @@ -479,7 +479,9 @@ impl Client { query_id, total_duration: query_start.elapsed(), query_summaries, - tables: vec![oxql::Table::new(schema.timeseries_name.as_str())], + tables: vec![oxql_types::Table::new( + schema.timeseries_name.as_str(), + )], }; return Ok(result); } @@ -503,7 +505,7 @@ impl Client { // At this point, let's construct a set of tables and run the results // through the transformation pipeline. - let mut tables = vec![oxql::Table::from_timeseries( + let mut tables = vec![oxql_types::Table::from_timeseries( schema.timeseries_name.as_str(), timeseries_by_key.into_values(), )?]; @@ -553,7 +555,7 @@ impl Client { limit: Option, total_rows_fetched: &mut u64, ) -> Result< - (Vec, BTreeMap), + (Vec, BTreeMap), Error, > { // We'll create timeseries for each key on the fly. To enable computing @@ -624,25 +626,25 @@ impl Client { for (key, measurements) in measurements_by_key.into_iter() { // Constuct a new timeseries, from the target/metric info. let (target, metric) = info.get(&key).unwrap(); - let mut timeseries = oxql::Timeseries::new( + let mut timeseries = oxql_types::Timeseries::new( target .fields .iter() .chain(metric.fields.iter()) .map(|field| (field.name.clone(), field.value.clone())), - oxql::point::DataType::try_from(schema.datum_type)?, + oxql_types::point::DataType::try_from(schema.datum_type)?, if schema.datum_type.is_cumulative() { - oxql::point::MetricType::Delta + oxql_types::point::MetricType::Delta } else { - oxql::point::MetricType::Gauge + oxql_types::point::MetricType::Gauge }, )?; // Covert its oximeter measurements into OxQL data types. let points = if schema.datum_type.is_cumulative() { - oxql::point::Points::delta_from_cumulative(&measurements)? + oxql_types::point::Points::delta_from_cumulative(&measurements)? } else { - oxql::point::Points::gauge_from_gauge(&measurements)? + oxql_types::point::Points::gauge_from_gauge(&measurements)? }; timeseries.points = points; debug!( @@ -1108,10 +1110,7 @@ fn update_total_rows_and_check( mod tests { use super::ConsistentKeyGroup; use crate::client::oxql::chunk_consistent_key_groups_impl; - use crate::{ - oxql::{point::Points, Table, Timeseries}, - Client, DbWrite, - }; + use crate::{Client, DbWrite}; use crate::{Metric, Target}; use chrono::{DateTime, Utc}; use dropshot::test_util::LogContext; @@ -1119,6 +1118,7 @@ mod tests { use omicron_test_utils::dev::test_setup_log; use oximeter::{types::Cumulative, FieldValue}; use oximeter::{DatumType, Sample}; + use oxql_types::{point::Points, Table, Timeseries}; use std::collections::BTreeMap; use std::time::Duration; diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index 9ad382c97da..5d56d802c9b 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -14,6 +14,7 @@ use dropshot::EmptyScanParams; use dropshot::PaginationParams; pub use oximeter::schema::FieldSchema; pub use oximeter::schema::FieldSource; +use oximeter::schema::TimeseriesKey; pub use oximeter::schema::TimeseriesName; pub use oximeter::schema::TimeseriesSchema; pub use oximeter::DatumType; @@ -267,8 +268,6 @@ pub async fn make_client( Ok(client) } -pub(crate) type TimeseriesKey = u64; - // TODO-cleanup: Add the timeseries version in to the computation of the key. // This will require a full drop of the database, since we're changing the // sorting key and the timeseries key on each past sample. See diff --git a/oximeter/db/src/model.rs b/oximeter/db/src/model.rs index 3e34ad10e3b..986bf00225e 100644 --- a/oximeter/db/src/model.rs +++ b/oximeter/db/src/model.rs @@ -11,13 +11,13 @@ use crate::FieldSchema; use crate::FieldSource; use crate::Metric; use crate::Target; -use crate::TimeseriesKey; use crate::TimeseriesSchema; use bytes::Bytes; use chrono::DateTime; use chrono::Utc; use num::traits::Zero; use oximeter::histogram::Histogram; +use oximeter::schema::TimeseriesKey; use oximeter::traits; use oximeter::types::Cumulative; use oximeter::types::Datum; diff --git a/oximeter/db/src/oxql/ast/table_ops/align.rs b/oximeter/db/src/oxql/ast/table_ops/align.rs index cf54ebc3125..b0cd7d80f12 100644 --- a/oximeter/db/src/oxql/ast/table_ops/align.rs +++ b/oximeter/db/src/oxql/ast/table_ops/align.rs @@ -6,19 +6,19 @@ // Copyright 2024 Oxide Computer Company -use crate::oxql::point::DataType; -use crate::oxql::point::MetricType; -use crate::oxql::point::Points; -use crate::oxql::point::ValueArray; -use crate::oxql::point::Values; -use crate::oxql::query::Alignment; -use crate::oxql::Error; -use crate::oxql::Table; -use crate::oxql::Timeseries; use anyhow::Context; +use anyhow::Error; use chrono::DateTime; use chrono::TimeDelta; use chrono::Utc; +use oxql_types::point::DataType; +use oxql_types::point::MetricType; +use oxql_types::point::Points; +use oxql_types::point::ValueArray; +use oxql_types::point::Values; +use oxql_types::Alignment; +use oxql_types::Table; +use oxql_types::Timeseries; use std::time::Duration; // The maximum factor by which an alignment operation may upsample data. @@ -144,7 +144,7 @@ fn align_mean_within( "Alignment by mean requires a gauge or delta metric, not {}", metric_type, ); - verify_max_upsampling_ratio(&points.timestamps, &period)?; + verify_max_upsampling_ratio(points.timestamps(), &period)?; // Always convert the output to doubles, when computing the mean. The // output is always a gauge, so we do not need the start times of the @@ -179,7 +179,7 @@ fn align_mean_within( // - Compute the mean of those. let period_ = TimeDelta::from_std(*period).context("time delta out of range")?; - let first_timestamp = points.timestamps[0]; + let first_timestamp = points.timestamps()[0]; let mut ix: u32 = 0; loop { // Compute the next output timestamp, by shifting the query end time @@ -220,15 +220,15 @@ fn align_mean_within( // entries. let output_value = if matches!(metric_type, MetricType::Gauge) { mean_gauge_value_in_window( - &points.timestamps, + points.timestamps(), &input_points, window_start, output_time, ) } else { mean_delta_value_in_window( - points.start_times.as_ref().unwrap(), - &points.timestamps, + points.start_times().unwrap(), + points.timestamps(), &input_points, window_start, output_time, @@ -255,10 +255,9 @@ fn align_mean_within( ValueArray::Double(output_values.into_iter().rev().collect()); let timestamps = output_timestamps.into_iter().rev().collect(); let values = Values { values, metric_type: MetricType::Gauge }; - new_timeseries.points = - Points { start_times: None, timestamps, values: vec![values] }; - new_timeseries.alignment = - Some(Alignment { end_time: *query_end, period: *period }); + new_timeseries.points = Points::new(None, timestamps, vec![values]); + new_timeseries + .set_alignment(Alignment { end_time: *query_end, period: *period }); output_table.insert(new_timeseries).unwrap(); } Ok(output_table) diff --git a/oximeter/db/src/oxql/ast/table_ops/filter.rs b/oximeter/db/src/oxql/ast/table_ops/filter.rs index b6fc533e4d8..ad398da9831 100644 --- a/oximeter/db/src/oxql/ast/table_ops/filter.rs +++ b/oximeter/db/src/oxql/ast/table_ops/filter.rs @@ -12,18 +12,18 @@ use crate::oxql::ast::literal::Literal; use crate::oxql::ast::logical_op::LogicalOp; use crate::oxql::ast::table_ops::limit::Limit; use crate::oxql::ast::table_ops::limit::LimitKind; -use crate::oxql::point::DataType; -use crate::oxql::point::MetricType; -use crate::oxql::point::Points; -use crate::oxql::point::ValueArray; use crate::oxql::Error; -use crate::oxql::Table; -use crate::oxql::Timeseries; use crate::shells::special_idents; use chrono::DateTime; use chrono::Utc; use oximeter::FieldType; use oximeter::FieldValue; +use oxql_types::point::DataType; +use oxql_types::point::MetricType; +use oxql_types::point::Points; +use oxql_types::point::ValueArray; +use oxql_types::Table; +use oxql_types::Timeseries; use regex::Regex; use std::collections::BTreeSet; use std::fmt; @@ -340,16 +340,13 @@ impl Filter { // Apply the filter to the data points as well. let points = self.filter_points(&input.points)?; - // Similar to above, if the filter removes all data points in - // the timeseries, let's remove the timeseries altogether. - if points.is_empty() { - continue; + if let Some(new_timeseries) = input.copy_with_points(points) { + timeseries.push(new_timeseries); + } else { + // None means that the filter removed all data points in + // the timeseries. In that case, we remove the timeseries + // altogether. } - timeseries.push(Timeseries { - fields: input.fields.clone(), - points, - alignment: input.alignment, - }) } output_tables.push(Table::from_timeseries( table.name(), @@ -823,7 +820,7 @@ impl SimpleFilter { ) -> Result, Error> { let ident = self.ident.as_str(); if ident == "timestamp" { - self.filter_points_by_timestamp(negated, &points.timestamps) + self.filter_points_by_timestamp(negated, points.timestamps()) } else if ident == "datum" { anyhow::ensure!( points.dimensionality() == 1, @@ -1151,15 +1148,15 @@ impl SimpleFilter { mod tests { use crate::oxql::ast::grammar::query_parser; use crate::oxql::ast::logical_op::LogicalOp; - use crate::oxql::point::DataType; - use crate::oxql::point::MetricType; - use crate::oxql::point::Points; - use crate::oxql::point::ValueArray; - use crate::oxql::point::Values; - use crate::oxql::Table; - use crate::oxql::Timeseries; use chrono::Utc; use oximeter::FieldValue; + use oxql_types::point::DataType; + use oxql_types::point::MetricType; + use oxql_types::point::Points; + use oxql_types::point::ValueArray; + use oxql_types::point::Values; + use oxql_types::Table; + use oxql_types::Timeseries; use std::time::Duration; use uuid::Uuid; @@ -1172,7 +1169,7 @@ mod tests { values: ValueArray::Double(vec![Some(0.0), Some(2.0)]), metric_type: MetricType::Gauge, }]; - let points = Points { start_times, timestamps, values }; + let points = Points::new(start_times, timestamps, values); // This filter should remove the first point based on its timestamp. let t = Utc::now() + Duration::from_secs(10); @@ -1205,7 +1202,7 @@ mod tests { values: ValueArray::Double(vec![Some(0.0), Some(2.0)]), metric_type: MetricType::Gauge, }]; - let points = Points { start_times, timestamps, values }; + let points = Points::new(start_times, timestamps, values); let filter = query_parser::filter("filter datum < \"something\"").unwrap(); diff --git a/oximeter/db/src/oxql/ast/table_ops/group_by.rs b/oximeter/db/src/oxql/ast/table_ops/group_by.rs index f40572d762c..c48804a788e 100644 --- a/oximeter/db/src/oxql/ast/table_ops/group_by.rs +++ b/oximeter/db/src/oxql/ast/table_ops/group_by.rs @@ -10,13 +10,13 @@ use chrono::DateTime; use chrono::Utc; use crate::oxql::ast::ident::Ident; -use crate::oxql::point::DataType; -use crate::oxql::point::MetricType; -use crate::oxql::point::ValueArray; -use crate::oxql::Error; -use crate::oxql::Table; -use crate::oxql::Timeseries; -use crate::TimeseriesKey; +use anyhow::Error; +use oximeter::schema::TimeseriesKey; +use oxql_types::point::DataType; +use oxql_types::point::MetricType; +use oxql_types::point::ValueArray; +use oxql_types::Table; +use oxql_types::Timeseries; use std::collections::btree_map::Entry; use std::collections::BTreeMap; @@ -98,7 +98,7 @@ impl GroupBy { ValueArray::Double(new_values), ValueArray::Double(existing_values), ) => { - let new_timestamps = &dropped.points.timestamps; + let new_timestamps = dropped.points.timestamps(); // We will be merging the new data with the // existing, but borrow-checking limits the degree @@ -106,7 +106,7 @@ impl GroupBy { // entry in the output table. Instead, aggregate // everything into a copy of the expected data. let mut timestamps = - existing.points.timestamps.clone(); + existing.points.timestamps().to_owned(); let mut values = existing_values.clone(); // Merge in the new values, so long as they actually @@ -152,10 +152,7 @@ impl GroupBy { // Replace the existing output timeseries's // timestamps and data arrays. - std::mem::swap( - &mut existing.points.timestamps, - &mut timestamps, - ); + existing.points.set_timestamps(timestamps); existing .points .values_mut(0) @@ -166,7 +163,7 @@ impl GroupBy { ValueArray::Integer(new_values), ValueArray::Integer(existing_values), ) => { - let new_timestamps = &dropped.points.timestamps; + let new_timestamps = dropped.points.timestamps(); // We will be merging the new data with the // existing, but borrow-checking limits the degree @@ -174,7 +171,7 @@ impl GroupBy { // entry in the output table. Instead, aggregate // everything into a copy of the expected data. let mut timestamps = - existing.points.timestamps.clone(); + existing.points.timestamps().to_owned(); let mut values = existing_values.clone(); // Merge in the new values, so long as they actually @@ -220,10 +217,7 @@ impl GroupBy { // Replace the existing output timeseries's // timestamps and data arrays. - std::mem::swap( - &mut existing.points.timestamps, - &mut timestamps, - ); + existing.points.set_timestamps(timestamps); existing .points .values_mut(0) @@ -286,14 +280,15 @@ impl GroupBy { else { unreachable!(); }; - let new_timestamps = &new_points.timestamps; + let new_timestamps = new_points.timestamps(); // We will be merging the new data with the // existing, but borrow-checking limits the degree // to which we can easily do this on the `existing` // entry in the output table. Instead, aggregate // everything into a copy of the expected data. - let mut timestamps = existing.points.timestamps.clone(); + let mut timestamps = + existing.points.timestamps().to_owned(); let mut values = existing .points .values(0) @@ -360,10 +355,7 @@ impl GroupBy { // Replace the existing output timeseries's // timestamps and data arrays. - std::mem::swap( - &mut existing.points.timestamps, - &mut timestamps, - ); + existing.points.set_timestamps(timestamps); existing .points .values_mut(0) @@ -388,7 +380,7 @@ impl GroupBy { // _zero_ for any where the values are none. let counts = new_timeseries .points - .timestamps + .timestamps() .iter() .zip(values) .map(|(timestamp, maybe_value)| { @@ -434,16 +426,16 @@ pub enum Reducer { #[cfg(test)] mod tests { use super::{GroupBy, Reducer}; - use crate::oxql::{ - ast::{ - ident::Ident, - table_ops::align::{Align, AlignmentMethod}, - }, - point::{DataType, MetricType, ValueArray}, - Table, Timeseries, + use crate::oxql::ast::{ + ident::Ident, + table_ops::align::{Align, AlignmentMethod}, }; use chrono::{DateTime, Utc}; use oximeter::FieldValue; + use oxql_types::{ + point::{DataType, MetricType, ValueArray}, + Table, Timeseries, + }; use std::{collections::BTreeMap, time::Duration}; // Which timeseries the second data point is missing from. @@ -495,8 +487,8 @@ mod tests { MetricType::Gauge, ) .unwrap(); - ts0.points.start_times = None; - ts0.points.timestamps.clone_from(×tamps); + ts0.points.clear_start_times(); + ts0.points.set_timestamps(timestamps.clone()); *ts0.points.values_mut(0).unwrap() = ValueArray::Double(vec![ Some(1.0), if matches!( @@ -527,7 +519,7 @@ mod tests { MetricType::Gauge, ) .unwrap(); - ts1.points.start_times = None; + ts1.points.clear_start_times(); // Non-overlapping in this test setup means that we just shift one // value from this array backward in time by one additional second. @@ -538,7 +530,7 @@ mod tests { // // When reducing, t0 is never changed, and t1-t2 are always reduced // together, if the values are present. - ts1.points.timestamps = if cfg.overlapping_times { + let new_timestamps = if cfg.overlapping_times { timestamps.clone() } else { let mut new_timestamps = timestamps.clone(); @@ -546,6 +538,7 @@ mod tests { timestamps.insert(0, new_timestamps[0]); new_timestamps }; + ts1.points.set_timestamps(new_timestamps); *ts1.points.values_mut(0).unwrap() = ValueArray::Double(vec![ Some(2.0), if matches!(cfg.missing_value, MissingValue::Both) { @@ -604,11 +597,13 @@ mod tests { let points = &grouped_timeseries.points; assert_eq!(points.dimensionality(), 1, "Points should still be 1D"); assert_eq!( - points.start_times, None, + points.start_times(), + None, "Points should not have start times" ); assert_eq!( - points.timestamps, test.timestamps, + points.timestamps(), + test.timestamps, "Points do not have correct timestamps" ); diff --git a/oximeter/db/src/oxql/ast/table_ops/join.rs b/oximeter/db/src/oxql/ast/table_ops/join.rs index 3c150a4acf9..2893f6cf3e0 100644 --- a/oximeter/db/src/oxql/ast/table_ops/join.rs +++ b/oximeter/db/src/oxql/ast/table_ops/join.rs @@ -6,12 +6,10 @@ // Copyright 2024 Oxide Computer Company -use crate::oxql::point::MetricType; -use crate::oxql::point::Points; -use crate::oxql::point::Values; -use crate::oxql::Error; -use crate::oxql::Table; use anyhow::Context; +use anyhow::Error; +use oxql_types::point::MetricType; +use oxql_types::Table; /// An AST node for a natural inner join. #[derive(Clone, Copy, Debug, PartialEq)] @@ -80,10 +78,8 @@ impl Join { // 1. They have the same alignment, and // 2. We merge the timepoints rather than simply creating a // ragged array of points. - timeseries.points = inner_join_point_arrays( - ×eries.points, - &next_timeseries.points, - )?; + timeseries.points = + timeseries.points.inner_join(&next_timeseries.points)?; } // We'll also update the name, to indicate the joined data. out.name.push(','); @@ -93,101 +89,6 @@ impl Join { } } -// Given two arrays of points, stack them together at matching timepoints. -// -// For time points in either which do not have a corresponding point in the -// other, the entire time point is elided. -fn inner_join_point_arrays( - left: &Points, - right: &Points, -) -> Result { - // Create an output array with roughly the right capacity, and double the - // number of dimensions. We're trying to stack output value arrays together - // along the dimension axis. - let data_types = - left.data_types().chain(right.data_types()).collect::>(); - let metric_types = - left.metric_types().chain(right.metric_types()).collect::>(); - let mut out = Points::with_capacity( - left.len().max(right.len()), - data_types.iter().copied(), - metric_types.iter().copied(), - )?; - - // Iterate through each array until one is exhausted. We're only inserting - // values from both arrays where the timestamps actually match, since this - // is an inner join. We may want to insert missing values where timestamps - // do not match on either side, when we support an outer join of some kind. - let n_left_dim = left.values.len(); - let mut left_ix = 0; - let mut right_ix = 0; - while left_ix < left.len() && right_ix < right.len() { - let left_timestamp = left.timestamps[left_ix]; - let right_timestamp = right.timestamps[right_ix]; - if left_timestamp == right_timestamp { - out.timestamps.push(left_timestamp); - push_concrete_values( - &mut out.values[..n_left_dim], - &left.values, - left_ix, - ); - push_concrete_values( - &mut out.values[n_left_dim..], - &right.values, - right_ix, - ); - left_ix += 1; - right_ix += 1; - } else if left_timestamp < right_timestamp { - left_ix += 1; - } else { - right_ix += 1; - } - } - Ok(out) -} - -// Push the `i`th value from each dimension of `from` onto `to`. -fn push_concrete_values(to: &mut [Values], from: &[Values], i: usize) { - assert_eq!(to.len(), from.len()); - for (output, input) in to.iter_mut().zip(from.iter()) { - let input_array = &input.values; - let output_array = &mut output.values; - assert_eq!(input_array.data_type(), output_array.data_type()); - if let Ok(ints) = input_array.as_integer() { - output_array.as_integer_mut().unwrap().push(ints[i]); - continue; - } - if let Ok(doubles) = input_array.as_double() { - output_array.as_double_mut().unwrap().push(doubles[i]); - continue; - } - if let Ok(bools) = input_array.as_boolean() { - output_array.as_boolean_mut().unwrap().push(bools[i]); - continue; - } - if let Ok(strings) = input_array.as_string() { - output_array.as_string_mut().unwrap().push(strings[i].clone()); - continue; - } - if let Ok(dists) = input_array.as_integer_distribution() { - output_array - .as_integer_distribution_mut() - .unwrap() - .push(dists[i].clone()); - continue; - } - if let Ok(dists) = input_array.as_double_distribution() { - output_array - .as_double_distribution_mut() - .unwrap() - .push(dists[i].clone()); - continue; - } - unreachable!(); - } -} - // Return an error if any metric types are not suitable for joining. fn ensure_all_metric_types( mut metric_types: impl ExactSizeIterator, @@ -200,186 +101,3 @@ fn ensure_all_metric_types( ); Ok(()) } - -#[cfg(test)] -mod tests { - use super::*; - use crate::oxql::point::DataType; - use crate::oxql::point::Datum; - use crate::oxql::point::ValueArray; - use chrono::Utc; - use std::time::Duration; - - #[test] - fn test_push_concrete_values() { - let mut points = Points::with_capacity( - 2, - [DataType::Integer, DataType::Double].into_iter(), - [MetricType::Gauge, MetricType::Gauge].into_iter(), - ) - .unwrap(); - - // Push a concrete value for the integer dimension - let from_ints = vec![Values { - values: ValueArray::Integer(vec![Some(1)]), - metric_type: MetricType::Gauge, - }]; - push_concrete_values(&mut points.values[..1], &from_ints, 0); - - // And another for the double dimension. - let from_doubles = vec![Values { - values: ValueArray::Double(vec![Some(2.0)]), - metric_type: MetricType::Gauge, - }]; - push_concrete_values(&mut points.values[1..], &from_doubles, 0); - - assert_eq!( - points.dimensionality(), - 2, - "Points should have 2 dimensions", - ); - let ints = points.values[0].values.as_integer().unwrap(); - assert_eq!( - ints.len(), - 1, - "Should have pushed one point in the first dimension" - ); - assert_eq!( - ints[0], - Some(1), - "Should have pushed 1 onto the first dimension" - ); - let doubles = points.values[1].values.as_double().unwrap(); - assert_eq!( - doubles.len(), - 1, - "Should have pushed one point in the second dimension" - ); - assert_eq!( - doubles[0], - Some(2.0), - "Should have pushed 2.0 onto the second dimension" - ); - } - - #[test] - fn test_join_point_arrays() { - let now = Utc::now(); - - // Create a set of integer points to join with. - // - // This will have two timestamps, one of which will match the points - // below that are merged in. - let int_points = Points { - start_times: None, - timestamps: vec![ - now - Duration::from_secs(3), - now - Duration::from_secs(2), - now, - ], - values: vec![Values { - values: ValueArray::Integer(vec![Some(1), Some(2), Some(3)]), - metric_type: MetricType::Gauge, - }], - }; - - // Create an additional set of double points. - // - // This also has two timepoints, one of which matches with the above, - // and one of which does not. - let double_points = Points { - start_times: None, - timestamps: vec![ - now - Duration::from_secs(3), - now - Duration::from_secs(1), - now, - ], - values: vec![Values { - values: ValueArray::Double(vec![ - Some(4.0), - Some(5.0), - Some(6.0), - ]), - metric_type: MetricType::Gauge, - }], - }; - - // Merge the arrays. - let merged = - inner_join_point_arrays(&int_points, &double_points).unwrap(); - - // Basic checks that we merged in the right values and have the right - // types and dimensions. - assert_eq!( - merged.dimensionality(), - 2, - "Should have appended the dimensions from each input array" - ); - assert_eq!(merged.len(), 2, "Should have merged two common points",); - assert_eq!( - merged.data_types().collect::>(), - &[DataType::Integer, DataType::Double], - "Should have combined the data types of the input arrays" - ); - assert_eq!( - merged.metric_types().collect::>(), - &[MetricType::Gauge, MetricType::Gauge], - "Should have combined the metric types of the input arrays" - ); - - // Check the actual values of the array. - let mut points = merged.iter_points(); - - // The first and last timepoint overlapped between the two arrays, so we - // should have both of them as concrete samples. - let pt = points.next().unwrap(); - assert_eq!(pt.start_time, None, "Gauges don't have a start time"); - assert_eq!( - *pt.timestamp, int_points.timestamps[0], - "Should have taken the first input timestamp from both arrays", - ); - assert_eq!( - *pt.timestamp, double_points.timestamps[0], - "Should have taken the first input timestamp from both arrays", - ); - let values = pt.values; - assert_eq!(values.len(), 2, "Should have 2 dimensions"); - assert_eq!( - &values[0], - &(Datum::Integer(Some(&1)), MetricType::Gauge), - "Should have pulled value from first integer array." - ); - assert_eq!( - &values[1], - &(Datum::Double(Some(&4.0)), MetricType::Gauge), - "Should have pulled value from second double array." - ); - - // And the next point - let pt = points.next().unwrap(); - assert_eq!(pt.start_time, None, "Gauges don't have a start time"); - assert_eq!( - *pt.timestamp, int_points.timestamps[2], - "Should have taken the input timestamp from both arrays", - ); - assert_eq!( - *pt.timestamp, double_points.timestamps[2], - "Should have taken the input timestamp from both arrays", - ); - let values = pt.values; - assert_eq!(values.len(), 2, "Should have 2 dimensions"); - assert_eq!( - &values[0], - &(Datum::Integer(Some(&3)), MetricType::Gauge), - "Should have pulled value from first integer array." - ); - assert_eq!( - &values[1], - &(Datum::Double(Some(&6.0)), MetricType::Gauge), - "Should have pulled value from second double array." - ); - - // And there should be no other values. - assert!(points.next().is_none(), "There should be no more points"); - } -} diff --git a/oximeter/db/src/oxql/ast/table_ops/limit.rs b/oximeter/db/src/oxql/ast/table_ops/limit.rs index 0205868f5c9..89afb31a7ce 100644 --- a/oximeter/db/src/oxql/ast/table_ops/limit.rs +++ b/oximeter/db/src/oxql/ast/table_ops/limit.rs @@ -6,12 +6,8 @@ // Copyright 2024 Oxide Computer Company -use crate::oxql::point::Points; -use crate::oxql::point::ValueArray; -use crate::oxql::point::Values; -use crate::oxql::Error; -use crate::oxql::Table; -use crate::oxql::Timeseries; +use anyhow::Error; +use oxql_types::Table; use std::num::NonZeroUsize; /// The kind of limiting operation @@ -65,58 +61,7 @@ impl Limit { } }; - // Slice the various data arrays. - let start_times = input_points - .start_times - .as_ref() - .map(|s| s[start..end].to_vec()); - let timestamps = - input_points.timestamps[start..end].to_vec(); - let values = input_points - .values - .iter() - .map(|vals| { - let values = match &vals.values { - ValueArray::Integer(inner) => { - ValueArray::Integer( - inner[start..end].to_vec(), - ) - } - ValueArray::Double(inner) => { - ValueArray::Double( - inner[start..end].to_vec(), - ) - } - ValueArray::Boolean(inner) => { - ValueArray::Boolean( - inner[start..end].to_vec(), - ) - } - ValueArray::String(inner) => { - ValueArray::String( - inner[start..end].to_vec(), - ) - } - ValueArray::IntegerDistribution(inner) => { - ValueArray::IntegerDistribution( - inner[start..end].to_vec(), - ) - } - ValueArray::DoubleDistribution(inner) => { - ValueArray::DoubleDistribution( - inner[start..end].to_vec(), - ) - } - }; - Values { values, metric_type: vals.metric_type } - }) - .collect(); - let points = Points { start_times, timestamps, values }; - Timeseries { - fields: timeseries.fields.clone(), - points, - alignment: timeseries.alignment, - } + timeseries.limit(start, end) }); Table::from_timeseries(table.name(), timeseries) }) @@ -127,9 +72,12 @@ impl Limit { #[cfg(test)] mod tests { use super::*; - use crate::oxql::point::{DataType, MetricType}; use chrono::Utc; use oximeter::FieldValue; + use oxql_types::{ + point::{DataType, MetricType}, + Timeseries, + }; use std::{collections::BTreeMap, time::Duration}; fn test_tables() -> Vec { @@ -150,12 +98,14 @@ mod tests { MetricType::Gauge, ) .unwrap(); - timeseries.points.timestamps.clone_from(×tamps); - timeseries.points.values[0].values.as_integer_mut().unwrap().extend([ - Some(1), - Some(2), - Some(3), - ]); + timeseries.points.set_timestamps(timestamps.clone()); + timeseries + .points + .values_mut(0) + .unwrap() + .as_integer_mut() + .unwrap() + .extend([Some(1), Some(2), Some(3)]); let table1 = Table::from_timeseries("first", std::iter::once(timeseries)) .unwrap(); @@ -166,12 +116,14 @@ mod tests { MetricType::Gauge, ) .unwrap(); - timeseries.points.timestamps.clone_from(×tamps); - timeseries.points.values[0].values.as_integer_mut().unwrap().extend([ - Some(4), - Some(5), - Some(6), - ]); + timeseries.points.set_timestamps(timestamps.clone()); + timeseries + .points + .values_mut(0) + .unwrap() + .as_integer_mut() + .unwrap() + .extend([Some(4), Some(5), Some(6)]); let table2 = Table::from_timeseries("second", std::iter::once(timeseries)) .unwrap(); @@ -223,7 +175,8 @@ mod tests { "Limited table should have the same fields" ); assert_eq!( - timeseries.alignment, limited_timeseries.alignment, + timeseries.alignment(), + limited_timeseries.alignment(), "Limited timeseries should have the same alignment" ); assert_eq!( @@ -237,14 +190,15 @@ mod tests { // These depend on the limit operation. let points = ×eries.points; let limited_points = &limited_timeseries.points; - assert_eq!(points.start_times, limited_points.start_times); + assert_eq!(points.start_times(), limited_points.start_times()); assert_eq!( - points.timestamps[start..end], - limited_points.timestamps + &points.timestamps()[start..end], + limited_points.timestamps() ); assert_eq!( - limited_points.values[0].values.as_integer().unwrap(), - &points.values[0].values.as_integer().unwrap()[start..end], + limited_points.values(0).unwrap().as_integer().unwrap(), + &points.values(0).unwrap().as_integer().unwrap() + [start..end], "Points should be limited to [{start}..{end}]", ); } diff --git a/oximeter/db/src/oxql/ast/table_ops/mod.rs b/oximeter/db/src/oxql/ast/table_ops/mod.rs index 46f5106a089..8b8d4cbe1b5 100644 --- a/oximeter/db/src/oxql/ast/table_ops/mod.rs +++ b/oximeter/db/src/oxql/ast/table_ops/mod.rs @@ -20,10 +20,10 @@ use self::join::Join; use self::limit::Limit; use crate::oxql::ast::Query; use crate::oxql::Error; -use crate::oxql::Table; use chrono::DateTime; use chrono::Utc; use oximeter::TimeseriesName; +use oxql_types::Table; /// A basic table operation, the atoms of an OxQL query. #[derive(Clone, Debug, PartialEq)] diff --git a/oximeter/db/src/oxql/mod.rs b/oximeter/db/src/oxql/mod.rs index 3961fae1cce..fcdfb783c5b 100644 --- a/oximeter/db/src/oxql/mod.rs +++ b/oximeter/db/src/oxql/mod.rs @@ -10,13 +10,9 @@ use peg::error::ParseError as PegError; use peg::str::LineCol; pub mod ast; -pub mod point; pub mod query; -pub mod table; pub use self::query::Query; -pub use self::table::Table; -pub use self::table::Timeseries; pub use anyhow::Error; /// Format a PEG parsing error into a nice anyhow error. diff --git a/oximeter/db/src/oxql/query/mod.rs b/oximeter/db/src/oxql/query/mod.rs index e1fada9f2aa..46c9bbc92c8 100644 --- a/oximeter/db/src/oxql/query/mod.rs +++ b/oximeter/db/src/oxql/query/mod.rs @@ -23,7 +23,6 @@ use crate::oxql::Error; use crate::TimeseriesName; use chrono::DateTime; use chrono::Utc; -use std::time::Duration; /// A parsed OxQL query. #[derive(Clone, Debug, PartialEq)] @@ -391,15 +390,6 @@ fn restrict_filter_idents( } } -/// Describes the time alignment for an OxQL query. -#[derive(Clone, Copy, Debug, PartialEq)] -pub struct Alignment { - /// The end time of the query, which the temporal reference point. - pub end_time: DateTime, - /// The alignment period, the interval on which values are produced. - pub period: Duration, -} - #[cfg(test)] mod tests { use super::Filter; diff --git a/oximeter/db/src/query.rs b/oximeter/db/src/query.rs index ceabf008883..556ced04375 100644 --- a/oximeter/db/src/query.rs +++ b/oximeter/db/src/query.rs @@ -6,11 +6,12 @@ // Copyright 2021 Oxide Computer Company use crate::{ - Error, FieldSchema, FieldSource, TimeseriesKey, TimeseriesSchema, - DATABASE_NAME, DATABASE_SELECT_FORMAT, + Error, FieldSchema, FieldSource, TimeseriesSchema, DATABASE_NAME, + DATABASE_SELECT_FORMAT, }; use chrono::{DateTime, Utc}; use dropshot::PaginationOrder; +use oximeter::schema::TimeseriesKey; use oximeter::types::{DatumType, FieldType, FieldValue}; use oximeter::{Metric, Target}; use regex::Regex; diff --git a/oximeter/db/src/shells/oxql.rs b/oximeter/db/src/shells/oxql.rs index 0f23ea7d64a..f46d08c0cfc 100644 --- a/oximeter/db/src/shells/oxql.rs +++ b/oximeter/db/src/shells/oxql.rs @@ -7,9 +7,10 @@ // Copyright 2024 Oxide Computer use super::{list_timeseries, prepare_columns}; -use crate::{make_client, oxql::Table, Client, OxqlResult}; +use crate::{make_client, Client, OxqlResult}; use clap::Args; use crossterm::style::Stylize; +use oxql_types::Table; use reedline::DefaultPrompt; use reedline::DefaultPromptSegment; use reedline::Reedline; diff --git a/oximeter/oxql-types/Cargo.toml b/oximeter/oxql-types/Cargo.toml new file mode 100644 index 00000000000..da7c7bcd1c6 --- /dev/null +++ b/oximeter/oxql-types/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "oxql-types" +version = "0.1.0" +edition = "2021" +license = "MPL-2.0" + +[lints] +workspace = true + +[dependencies] +anyhow.workspace = true +chrono.workspace = true +highway.workspace = true +num.workspace = true +omicron-workspace-hack.workspace = true +oximeter-types.workspace = true +schemars.workspace = true +serde.workspace = true diff --git a/oximeter/oxql-types/src/lib.rs b/oximeter/oxql-types/src/lib.rs new file mode 100644 index 00000000000..00468705a93 --- /dev/null +++ b/oximeter/oxql-types/src/lib.rs @@ -0,0 +1,23 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Core types for OxQL. + +use chrono::{DateTime, Utc}; +use std::time::Duration; + +pub mod point; +pub mod table; + +pub use self::table::Table; +pub use self::table::Timeseries; + +/// Describes the time alignment for an OxQL query. +#[derive(Clone, Copy, Debug, PartialEq)] +pub struct Alignment { + /// The end time of the query, which the temporal reference point. + pub end_time: DateTime, + /// The alignment period, the interval on which values are produced. + pub period: Duration, +} diff --git a/oximeter/db/src/oxql/point.rs b/oximeter/oxql-types/src/point.rs similarity index 82% rename from oximeter/db/src/oxql/point.rs rename to oximeter/oxql-types/src/point.rs index e04193e8b83..6e3c7143dc1 100644 --- a/oximeter/db/src/oxql/point.rs +++ b/oximeter/oxql-types/src/point.rs @@ -6,15 +6,15 @@ // Copyright 2024 Oxide Computer Company -use super::Error; use anyhow::Context; +use anyhow::Error; use chrono::DateTime; use chrono::Utc; use num::ToPrimitive; -use oximeter::traits::HistogramSupport; -use oximeter::DatumType; -use oximeter::Measurement; -use oximeter::Quantile; +use oximeter_types::traits::HistogramSupport; +use oximeter_types::DatumType; +use oximeter_types::Measurement; +use oximeter_types::Quantile; use schemars::JsonSchema; use serde::Deserialize; use serde::Serialize; @@ -131,32 +131,32 @@ impl CumulativeDatum { // not cumulative. fn from_cumulative(meas: &Measurement) -> Result { let datum = match meas.datum() { - oximeter::Datum::CumulativeI64(val) => { + oximeter_types::Datum::CumulativeI64(val) => { CumulativeDatum::Integer(val.value()) } - oximeter::Datum::CumulativeU64(val) => { + oximeter_types::Datum::CumulativeU64(val) => { let int = val .value() .try_into() .context("Overflow converting u64 to i64")?; CumulativeDatum::Integer(int) } - oximeter::Datum::CumulativeF32(val) => { + oximeter_types::Datum::CumulativeF32(val) => { CumulativeDatum::Double(val.value().into()) } - oximeter::Datum::CumulativeF64(val) => { + oximeter_types::Datum::CumulativeF64(val) => { CumulativeDatum::Double(val.value()) } - oximeter::Datum::HistogramI8(hist) => hist.into(), - oximeter::Datum::HistogramU8(hist) => hist.into(), - oximeter::Datum::HistogramI16(hist) => hist.into(), - oximeter::Datum::HistogramU16(hist) => hist.into(), - oximeter::Datum::HistogramI32(hist) => hist.into(), - oximeter::Datum::HistogramU32(hist) => hist.into(), - oximeter::Datum::HistogramI64(hist) => hist.into(), - oximeter::Datum::HistogramU64(hist) => hist.try_into()?, - oximeter::Datum::HistogramF32(hist) => hist.into(), - oximeter::Datum::HistogramF64(hist) => hist.into(), + oximeter_types::Datum::HistogramI8(hist) => hist.into(), + oximeter_types::Datum::HistogramU8(hist) => hist.into(), + oximeter_types::Datum::HistogramI16(hist) => hist.into(), + oximeter_types::Datum::HistogramU16(hist) => hist.into(), + oximeter_types::Datum::HistogramI32(hist) => hist.into(), + oximeter_types::Datum::HistogramU32(hist) => hist.into(), + oximeter_types::Datum::HistogramI64(hist) => hist.into(), + oximeter_types::Datum::HistogramU64(hist) => hist.try_into()?, + oximeter_types::Datum::HistogramF32(hist) => hist.into(), + oximeter_types::Datum::HistogramF64(hist) => hist.into(), other => anyhow::bail!( "Input datum of type {} is not cumulative", other.datum_type(), @@ -169,10 +169,10 @@ impl CumulativeDatum { /// A single list of values, for one dimension of a timeseries. #[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] pub struct Values { - // The data values. - pub(super) values: ValueArray, - // The type of this metric. - pub(super) metric_type: MetricType, + /// The data values. + pub values: ValueArray, + /// The type of this metric. + pub metric_type: MetricType, } impl Values { @@ -285,14 +285,23 @@ impl<'a> fmt::Display for Datum<'a> { #[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] pub struct Points { // The start time points for cumulative or delta metrics. - pub(super) start_times: Option>>, + pub(crate) start_times: Option>>, // The timestamp of each value. - pub(super) timestamps: Vec>, + pub(crate) timestamps: Vec>, // The array of data values, one for each dimension. - pub(super) values: Vec, + pub(crate) values: Vec, } impl Points { + /// Construct a new `Points` with the provided data. + pub fn new( + start_times: Option>>, + timestamps: Vec>, + values: Vec, + ) -> Self { + Self { start_times, timestamps, values } + } + /// Construct an empty array of points to hold data of the provided type. pub fn empty(data_type: DataType, metric_type: MetricType) -> Self { Self::with_capacity( @@ -303,8 +312,28 @@ impl Points { .unwrap() } - // Return a mutable reference to the value array of the specified dimension, if any. - pub(super) fn values_mut(&mut self, dim: usize) -> Option<&mut ValueArray> { + /// Return the start times of the points, if any. + pub fn start_times(&self) -> Option<&[DateTime]> { + self.start_times.as_deref() + } + + /// Clear the start times of the points. + pub fn clear_start_times(&mut self) { + self.start_times = None; + } + + /// Return the timestamps of the points. + pub fn timestamps(&self) -> &[DateTime] { + &self.timestamps + } + + pub fn set_timestamps(&mut self, timestamps: Vec>) { + self.timestamps = timestamps; + } + + /// Return a mutable reference to the value array of the specified + /// dimension, if any. + pub fn values_mut(&mut self, dim: usize) -> Option<&mut ValueArray> { self.values.get_mut(dim).map(|val| &mut val.values) } @@ -563,8 +592,8 @@ impl Points { }) } - // Filter points in self to those where `to_keep` is true. - pub(crate) fn filter(&self, to_keep: Vec) -> Result { + /// Filter points in self to those where `to_keep` is true. + pub fn filter(&self, to_keep: Vec) -> Result { anyhow::ensure!( to_keep.len() == self.len(), "Filter array must be the same length as self", @@ -646,8 +675,8 @@ impl Points { Ok(out) } - // Return a new set of points, with the values casted to the provided types. - pub(crate) fn cast(&self, types: &[DataType]) -> Result { + /// Return a new set of points, with the values casted to the provided types. + pub fn cast(&self, types: &[DataType]) -> Result { anyhow::ensure!( types.len() == self.dimensionality(), "Cannot cast to {} types, the data has dimensionality {}", @@ -863,12 +892,104 @@ impl Points { Ok(Self { start_times, timestamps, values: new_values }) } + /// Given two arrays of points, stack them together at matching timepoints. + /// + /// For time points in either which do not have a corresponding point in + /// the other, the entire time point is elided. + pub fn inner_join(&self, right: &Points) -> Result { + // Create an output array with roughly the right capacity, and double the + // number of dimensions. We're trying to stack output value arrays together + // along the dimension axis. + let data_types = + self.data_types().chain(right.data_types()).collect::>(); + let metric_types = + self.metric_types().chain(right.metric_types()).collect::>(); + let mut out = Points::with_capacity( + self.len().max(right.len()), + data_types.iter().copied(), + metric_types.iter().copied(), + )?; + + // Iterate through each array until one is exhausted. We're only inserting + // values from both arrays where the timestamps actually match, since this + // is an inner join. We may want to insert missing values where timestamps + // do not match on either side, when we support an outer join of some kind. + let n_left_dim = self.dimensionality(); + let mut left_ix = 0; + let mut right_ix = 0; + while left_ix < self.len() && right_ix < right.len() { + let left_timestamp = self.timestamps()[left_ix]; + let right_timestamp = right.timestamps()[right_ix]; + if left_timestamp == right_timestamp { + out.timestamps.push(left_timestamp); + push_concrete_values( + &mut out.values[..n_left_dim], + &self.values, + left_ix, + ); + push_concrete_values( + &mut out.values[n_left_dim..], + &right.values, + right_ix, + ); + left_ix += 1; + right_ix += 1; + } else if left_timestamp < right_timestamp { + left_ix += 1; + } else { + right_ix += 1; + } + } + Ok(out) + } + /// Return true if self contains no data points. pub fn is_empty(&self) -> bool { self.len() == 0 } } +// Push the `i`th value from each dimension of `from` onto `to`. +fn push_concrete_values(to: &mut [Values], from: &[Values], i: usize) { + assert_eq!(to.len(), from.len()); + for (output, input) in to.iter_mut().zip(from.iter()) { + let input_array = &input.values; + let output_array = &mut output.values; + assert_eq!(input_array.data_type(), output_array.data_type()); + if let Ok(ints) = input_array.as_integer() { + output_array.as_integer_mut().unwrap().push(ints[i]); + continue; + } + if let Ok(doubles) = input_array.as_double() { + output_array.as_double_mut().unwrap().push(doubles[i]); + continue; + } + if let Ok(bools) = input_array.as_boolean() { + output_array.as_boolean_mut().unwrap().push(bools[i]); + continue; + } + if let Ok(strings) = input_array.as_string() { + output_array.as_string_mut().unwrap().push(strings[i].clone()); + continue; + } + if let Ok(dists) = input_array.as_integer_distribution() { + output_array + .as_integer_distribution_mut() + .unwrap() + .push(dists[i].clone()); + continue; + } + if let Ok(dists) = input_array.as_double_distribution() { + output_array + .as_double_distribution_mut() + .unwrap() + .push(dists[i].clone()); + continue; + } + unreachable!(); + } +} + /// List of data values for one timeseries. /// /// Each element is an option, where `None` represents a missing sample. @@ -900,8 +1021,8 @@ impl ValueArray { } } - // Return the data type in self. - pub(super) fn data_type(&self) -> DataType { + /// Return the data type in self. + pub fn data_type(&self) -> DataType { match self { ValueArray::Integer(_) => DataType::Integer, ValueArray::Double(_) => DataType::Double, @@ -947,10 +1068,8 @@ impl ValueArray { Ok(inner) } - // Access the inner array of integers, if possible. - pub(super) fn as_integer_mut( - &mut self, - ) -> Result<&mut Vec>, Error> { + /// Access the inner array of integers, if possible. + pub fn as_integer_mut(&mut self) -> Result<&mut Vec>, Error> { let ValueArray::Integer(inner) = self else { anyhow::bail!( "Cannot access value array as integer type, it has type {}", @@ -1107,91 +1226,97 @@ impl ValueArray { // Push a value directly from a datum, without modification. fn push_value_from_datum( &mut self, - datum: &oximeter::Datum, + datum: &oximeter_types::Datum, ) -> Result<(), Error> { match datum { - oximeter::Datum::Bool(b) => self.as_boolean_mut()?.push(Some(*b)), - oximeter::Datum::I8(i) => { + oximeter_types::Datum::Bool(b) => { + self.as_boolean_mut()?.push(Some(*b)) + } + oximeter_types::Datum::I8(i) => { self.as_integer_mut()?.push(Some(i64::from(*i))) } - oximeter::Datum::U8(i) => { + oximeter_types::Datum::U8(i) => { self.as_integer_mut()?.push(Some(i64::from(*i))) } - oximeter::Datum::I16(i) => { + oximeter_types::Datum::I16(i) => { self.as_integer_mut()?.push(Some(i64::from(*i))) } - oximeter::Datum::U16(i) => { + oximeter_types::Datum::U16(i) => { self.as_integer_mut()?.push(Some(i64::from(*i))) } - oximeter::Datum::I32(i) => { + oximeter_types::Datum::I32(i) => { self.as_integer_mut()?.push(Some(i64::from(*i))) } - oximeter::Datum::U32(i) => { + oximeter_types::Datum::U32(i) => { self.as_integer_mut()?.push(Some(i64::from(*i))) } - oximeter::Datum::I64(i) => self.as_integer_mut()?.push(Some(*i)), - oximeter::Datum::U64(i) => { + oximeter_types::Datum::I64(i) => { + self.as_integer_mut()?.push(Some(*i)) + } + oximeter_types::Datum::U64(i) => { let i = i.to_i64().context("Failed to convert u64 datum to i64")?; self.as_integer_mut()?.push(Some(i)); } - oximeter::Datum::F32(f) => { + oximeter_types::Datum::F32(f) => { self.as_double_mut()?.push(Some(f64::from(*f))) } - oximeter::Datum::F64(f) => self.as_double_mut()?.push(Some(*f)), - oximeter::Datum::String(s) => { + oximeter_types::Datum::F64(f) => { + self.as_double_mut()?.push(Some(*f)) + } + oximeter_types::Datum::String(s) => { self.as_string_mut()?.push(Some(s.clone())) } - oximeter::Datum::Bytes(_) => { + oximeter_types::Datum::Bytes(_) => { anyhow::bail!("Bytes data types are not yet supported") } - oximeter::Datum::CumulativeI64(c) => { + oximeter_types::Datum::CumulativeI64(c) => { self.as_integer_mut()?.push(Some(c.value())) } - oximeter::Datum::CumulativeU64(c) => { + oximeter_types::Datum::CumulativeU64(c) => { let c = c .value() .to_i64() .context("Failed to convert u64 datum to i64")?; self.as_integer_mut()?.push(Some(c)); } - oximeter::Datum::CumulativeF32(c) => { + oximeter_types::Datum::CumulativeF32(c) => { self.as_double_mut()?.push(Some(f64::from(c.value()))) } - oximeter::Datum::CumulativeF64(c) => { + oximeter_types::Datum::CumulativeF64(c) => { self.as_double_mut()?.push(Some(c.value())) } - oximeter::Datum::HistogramI8(h) => self + oximeter_types::Datum::HistogramI8(h) => self .as_integer_distribution_mut()? .push(Some(Distribution::from(h))), - oximeter::Datum::HistogramU8(h) => self + oximeter_types::Datum::HistogramU8(h) => self .as_integer_distribution_mut()? .push(Some(Distribution::from(h))), - oximeter::Datum::HistogramI16(h) => self + oximeter_types::Datum::HistogramI16(h) => self .as_integer_distribution_mut()? .push(Some(Distribution::from(h))), - oximeter::Datum::HistogramU16(h) => self + oximeter_types::Datum::HistogramU16(h) => self .as_integer_distribution_mut()? .push(Some(Distribution::from(h))), - oximeter::Datum::HistogramI32(h) => self + oximeter_types::Datum::HistogramI32(h) => self .as_integer_distribution_mut()? .push(Some(Distribution::from(h))), - oximeter::Datum::HistogramU32(h) => self + oximeter_types::Datum::HistogramU32(h) => self .as_integer_distribution_mut()? .push(Some(Distribution::from(h))), - oximeter::Datum::HistogramI64(h) => self + oximeter_types::Datum::HistogramI64(h) => self .as_integer_distribution_mut()? .push(Some(Distribution::from(h))), - oximeter::Datum::HistogramU64(h) => self + oximeter_types::Datum::HistogramU64(h) => self .as_integer_distribution_mut()? .push(Some(Distribution::try_from(h)?)), - oximeter::Datum::HistogramF32(h) => self + oximeter_types::Datum::HistogramF32(h) => self .as_double_distribution_mut()? .push(Some(Distribution::from(h))), - oximeter::Datum::HistogramF64(h) => self + oximeter_types::Datum::HistogramF64(h) => self .as_double_distribution_mut()? .push(Some(Distribution::from(h))), - oximeter::Datum::Missing(missing) => { + oximeter_types::Datum::Missing(missing) => { self.push_missing(missing.datum_type())? } } @@ -1216,7 +1341,7 @@ impl ValueArray { fn push_diff_from_last_to_datum( &mut self, last_datum: &Option, - new_datum: &oximeter::Datum, + new_datum: &oximeter_types::Datum, data_type: DataType, ) -> Result<(), Error> { match (last_datum.as_ref(), new_datum.is_missing()) { @@ -1253,49 +1378,49 @@ impl ValueArray { match (last_datum, new_datum) { ( CumulativeDatum::Integer(last), - oximeter::Datum::I8(new), + oximeter_types::Datum::I8(new), ) => { let new = i64::from(*new); self.as_integer_mut()?.push(Some(new - last)); } ( CumulativeDatum::Integer(last), - oximeter::Datum::U8(new), + oximeter_types::Datum::U8(new), ) => { let new = i64::from(*new); self.as_integer_mut()?.push(Some(new - last)); } ( CumulativeDatum::Integer(last), - oximeter::Datum::I16(new), + oximeter_types::Datum::I16(new), ) => { let new = i64::from(*new); self.as_integer_mut()?.push(Some(new - last)); } ( CumulativeDatum::Integer(last), - oximeter::Datum::U16(new), + oximeter_types::Datum::U16(new), ) => { let new = i64::from(*new); self.as_integer_mut()?.push(Some(new - last)); } ( CumulativeDatum::Integer(last), - oximeter::Datum::I32(new), + oximeter_types::Datum::I32(new), ) => { let new = i64::from(*new); self.as_integer_mut()?.push(Some(new - last)); } ( CumulativeDatum::Integer(last), - oximeter::Datum::U32(new), + oximeter_types::Datum::U32(new), ) => { let new = i64::from(*new); self.as_integer_mut()?.push(Some(new - last)); } ( CumulativeDatum::Integer(last), - oximeter::Datum::I64(new), + oximeter_types::Datum::I64(new), ) => { let diff = new .checked_sub(*last) @@ -1304,7 +1429,7 @@ impl ValueArray { } ( CumulativeDatum::Integer(last), - oximeter::Datum::U64(new), + oximeter_types::Datum::U64(new), ) => { let new = new .to_i64() @@ -1316,20 +1441,20 @@ impl ValueArray { } ( CumulativeDatum::Double(last), - oximeter::Datum::F32(new), + oximeter_types::Datum::F32(new), ) => { self.as_double_mut()? .push(Some(f64::from(*new) - last)); } ( CumulativeDatum::Double(last), - oximeter::Datum::F64(new), + oximeter_types::Datum::F64(new), ) => { self.as_double_mut()?.push(Some(new - last)); } ( CumulativeDatum::Integer(last), - oximeter::Datum::CumulativeI64(new), + oximeter_types::Datum::CumulativeI64(new), ) => { let new = new.value(); let diff = new @@ -1339,7 +1464,7 @@ impl ValueArray { } ( CumulativeDatum::Integer(last), - oximeter::Datum::CumulativeU64(new), + oximeter_types::Datum::CumulativeU64(new), ) => { let new = new .value() @@ -1352,20 +1477,20 @@ impl ValueArray { } ( CumulativeDatum::Double(last), - oximeter::Datum::CumulativeF32(new), + oximeter_types::Datum::CumulativeF32(new), ) => { self.as_double_mut()? .push(Some(f64::from(new.value()) - last)); } ( CumulativeDatum::Double(last), - oximeter::Datum::CumulativeF64(new), + oximeter_types::Datum::CumulativeF64(new), ) => { self.as_double_mut()?.push(Some(new.value() - last)); } ( CumulativeDatum::IntegerDistribution(last), - oximeter::Datum::HistogramI8(new), + oximeter_types::Datum::HistogramI8(new), ) => { let new = Distribution::from(new); self.as_integer_distribution_mut()? @@ -1373,7 +1498,7 @@ impl ValueArray { } ( CumulativeDatum::IntegerDistribution(last), - oximeter::Datum::HistogramU8(new), + oximeter_types::Datum::HistogramU8(new), ) => { let new = Distribution::from(new); self.as_integer_distribution_mut()? @@ -1381,7 +1506,7 @@ impl ValueArray { } ( CumulativeDatum::IntegerDistribution(last), - oximeter::Datum::HistogramI16(new), + oximeter_types::Datum::HistogramI16(new), ) => { let new = Distribution::from(new); self.as_integer_distribution_mut()? @@ -1389,7 +1514,7 @@ impl ValueArray { } ( CumulativeDatum::IntegerDistribution(last), - oximeter::Datum::HistogramU16(new), + oximeter_types::Datum::HistogramU16(new), ) => { let new = Distribution::from(new); self.as_integer_distribution_mut()? @@ -1397,7 +1522,7 @@ impl ValueArray { } ( CumulativeDatum::IntegerDistribution(last), - oximeter::Datum::HistogramI32(new), + oximeter_types::Datum::HistogramI32(new), ) => { let new = Distribution::from(new); self.as_integer_distribution_mut()? @@ -1405,7 +1530,7 @@ impl ValueArray { } ( CumulativeDatum::IntegerDistribution(last), - oximeter::Datum::HistogramU32(new), + oximeter_types::Datum::HistogramU32(new), ) => { let new = Distribution::from(new); self.as_integer_distribution_mut()? @@ -1413,7 +1538,7 @@ impl ValueArray { } ( CumulativeDatum::IntegerDistribution(last), - oximeter::Datum::HistogramI64(new), + oximeter_types::Datum::HistogramI64(new), ) => { let new = Distribution::from(new); self.as_integer_distribution_mut()? @@ -1421,7 +1546,7 @@ impl ValueArray { } ( CumulativeDatum::IntegerDistribution(last), - oximeter::Datum::HistogramU64(new), + oximeter_types::Datum::HistogramU64(new), ) => { let new = Distribution::try_from(new)?; self.as_integer_distribution_mut()? @@ -1429,7 +1554,7 @@ impl ValueArray { } ( CumulativeDatum::DoubleDistribution(last), - oximeter::Datum::HistogramF32(new), + oximeter_types::Datum::HistogramF32(new), ) => { let new = Distribution::::from(new); self.as_double_distribution_mut()? @@ -1437,7 +1562,7 @@ impl ValueArray { } ( CumulativeDatum::DoubleDistribution(last), - oximeter::Datum::HistogramF64(new), + oximeter_types::Datum::HistogramF64(new), ) => { let new = Distribution::::from(new); self.as_double_distribution_mut()? @@ -1486,8 +1611,8 @@ impl ValueArray { } } - // Swap the value in self with other, asserting they're the same type. - pub(crate) fn swap(&mut self, mut values: ValueArray) { + /// Swap the value in self with other, asserting they're the same type. + pub fn swap(&mut self, mut values: ValueArray) { use std::mem::swap; match (self, &mut values) { (ValueArray::Integer(x), ValueArray::Integer(y)) => swap(x, y), @@ -1733,8 +1858,10 @@ where macro_rules! i64_dist_from { ($t:ty) => { - impl From<&oximeter::histogram::Histogram<$t>> for Distribution { - fn from(hist: &oximeter::histogram::Histogram<$t>) -> Self { + impl From<&oximeter_types::histogram::Histogram<$t>> + for Distribution + { + fn from(hist: &oximeter_types::histogram::Histogram<$t>) -> Self { let (bins, counts) = hist.bins_and_counts(); Self { bins: bins.into_iter().map(i64::from).collect(), @@ -1750,8 +1877,10 @@ macro_rules! i64_dist_from { } } - impl From<&oximeter::histogram::Histogram<$t>> for CumulativeDatum { - fn from(hist: &oximeter::histogram::Histogram<$t>) -> Self { + impl From<&oximeter_types::histogram::Histogram<$t>> + for CumulativeDatum + { + fn from(hist: &oximeter_types::histogram::Histogram<$t>) -> Self { CumulativeDatum::IntegerDistribution(hist.into()) } } @@ -1766,10 +1895,10 @@ i64_dist_from!(i32); i64_dist_from!(u32); i64_dist_from!(i64); -impl TryFrom<&oximeter::histogram::Histogram> for Distribution { +impl TryFrom<&oximeter_types::histogram::Histogram> for Distribution { type Error = Error; fn try_from( - hist: &oximeter::histogram::Histogram, + hist: &oximeter_types::histogram::Histogram, ) -> Result { let (bins, counts) = hist.bins_and_counts(); let bins = bins @@ -1791,10 +1920,10 @@ impl TryFrom<&oximeter::histogram::Histogram> for Distribution { } } -impl TryFrom<&oximeter::histogram::Histogram> for CumulativeDatum { +impl TryFrom<&oximeter_types::histogram::Histogram> for CumulativeDatum { type Error = Error; fn try_from( - hist: &oximeter::histogram::Histogram, + hist: &oximeter_types::histogram::Histogram, ) -> Result { hist.try_into().map(CumulativeDatum::IntegerDistribution) } @@ -1802,8 +1931,10 @@ impl TryFrom<&oximeter::histogram::Histogram> for CumulativeDatum { macro_rules! f64_dist_from { ($t:ty) => { - impl From<&oximeter::histogram::Histogram<$t>> for Distribution { - fn from(hist: &oximeter::histogram::Histogram<$t>) -> Self { + impl From<&oximeter_types::histogram::Histogram<$t>> + for Distribution + { + fn from(hist: &oximeter_types::histogram::Histogram<$t>) -> Self { let (bins, counts) = hist.bins_and_counts(); Self { bins: bins.into_iter().map(f64::from).collect(), @@ -1819,8 +1950,10 @@ macro_rules! f64_dist_from { } } - impl From<&oximeter::histogram::Histogram<$t>> for CumulativeDatum { - fn from(hist: &oximeter::histogram::Histogram<$t>) -> Self { + impl From<&oximeter_types::histogram::Histogram<$t>> + for CumulativeDatum + { + fn from(hist: &oximeter_types::histogram::Histogram<$t>) -> Self { CumulativeDatum::DoubleDistribution(hist.into()) } } @@ -1833,9 +1966,9 @@ f64_dist_from!(f64); #[cfg(test)] mod tests { use super::{Distribution, MetricType, Points, Values}; - use crate::oxql::point::{DataType, ValueArray}; + use crate::point::{push_concrete_values, DataType, Datum, ValueArray}; use chrono::{DateTime, Utc}; - use oximeter::{ + use oximeter_types::{ histogram::Record, types::Cumulative, Measurement, Quantile, }; use std::time::Duration; @@ -1939,12 +2072,12 @@ mod tests { let now = Utc::now(); let current1 = now + Duration::from_secs(1); let mut hist1 = - oximeter::histogram::Histogram::new(&[0i64, 10, 20]).unwrap(); + oximeter_types::histogram::Histogram::new(&[0i64, 10, 20]).unwrap(); hist1.sample(1).unwrap(); hist1.set_start_time(current1); let current2 = now + Duration::from_secs(2); let mut hist2 = - oximeter::histogram::Histogram::new(&[0i64, 10, 20]).unwrap(); + oximeter_types::histogram::Histogram::new(&[0i64, 10, 20]).unwrap(); hist2.sample(5).unwrap(); hist2.sample(10).unwrap(); hist2.sample(15).unwrap(); @@ -2273,4 +2406,176 @@ mod tests { .cast(&[DataType::DoubleDistribution, DataType::DoubleDistribution]) .is_err()); } + + #[test] + fn test_push_concrete_values() { + let mut points = Points::with_capacity( + 2, + [DataType::Integer, DataType::Double].into_iter(), + [MetricType::Gauge, MetricType::Gauge].into_iter(), + ) + .unwrap(); + + // Push a concrete value for the integer dimension + let from_ints = vec![Values { + values: ValueArray::Integer(vec![Some(1)]), + metric_type: MetricType::Gauge, + }]; + push_concrete_values(&mut points.values[..1], &from_ints, 0); + + // And another for the double dimension. + let from_doubles = vec![Values { + values: ValueArray::Double(vec![Some(2.0)]), + metric_type: MetricType::Gauge, + }]; + push_concrete_values(&mut points.values[1..], &from_doubles, 0); + + assert_eq!( + points.dimensionality(), + 2, + "Points should have 2 dimensions", + ); + let ints = points.values[0].values.as_integer().unwrap(); + assert_eq!( + ints.len(), + 1, + "Should have pushed one point in the first dimension" + ); + assert_eq!( + ints[0], + Some(1), + "Should have pushed 1 onto the first dimension" + ); + let doubles = points.values[1].values.as_double().unwrap(); + assert_eq!( + doubles.len(), + 1, + "Should have pushed one point in the second dimension" + ); + assert_eq!( + doubles[0], + Some(2.0), + "Should have pushed 2.0 onto the second dimension" + ); + } + + #[test] + fn test_join_point_arrays() { + let now = Utc::now(); + + // Create a set of integer points to join with. + // + // This will have two timestamps, one of which will match the points + // below that are merged in. + let int_points = Points { + start_times: None, + timestamps: vec![ + now - Duration::from_secs(3), + now - Duration::from_secs(2), + now, + ], + values: vec![Values { + values: ValueArray::Integer(vec![Some(1), Some(2), Some(3)]), + metric_type: MetricType::Gauge, + }], + }; + + // Create an additional set of double points. + // + // This also has two timepoints, one of which matches with the above, + // and one of which does not. + let double_points = Points { + start_times: None, + timestamps: vec![ + now - Duration::from_secs(3), + now - Duration::from_secs(1), + now, + ], + values: vec![Values { + values: ValueArray::Double(vec![ + Some(4.0), + Some(5.0), + Some(6.0), + ]), + metric_type: MetricType::Gauge, + }], + }; + + // Merge the arrays. + let merged = int_points.inner_join(&double_points).unwrap(); + + // Basic checks that we merged in the right values and have the right + // types and dimensions. + assert_eq!( + merged.dimensionality(), + 2, + "Should have appended the dimensions from each input array" + ); + assert_eq!(merged.len(), 2, "Should have merged two common points",); + assert_eq!( + merged.data_types().collect::>(), + &[DataType::Integer, DataType::Double], + "Should have combined the data types of the input arrays" + ); + assert_eq!( + merged.metric_types().collect::>(), + &[MetricType::Gauge, MetricType::Gauge], + "Should have combined the metric types of the input arrays" + ); + + // Check the actual values of the array. + let mut points = merged.iter_points(); + + // The first and last timepoint overlapped between the two arrays, so we + // should have both of them as concrete samples. + let pt = points.next().unwrap(); + assert_eq!(pt.start_time, None, "Gauges don't have a start time"); + assert_eq!( + *pt.timestamp, int_points.timestamps[0], + "Should have taken the first input timestamp from both arrays", + ); + assert_eq!( + *pt.timestamp, double_points.timestamps[0], + "Should have taken the first input timestamp from both arrays", + ); + let values = pt.values; + assert_eq!(values.len(), 2, "Should have 2 dimensions"); + assert_eq!( + &values[0], + &(Datum::Integer(Some(&1)), MetricType::Gauge), + "Should have pulled value from first integer array." + ); + assert_eq!( + &values[1], + &(Datum::Double(Some(&4.0)), MetricType::Gauge), + "Should have pulled value from second double array." + ); + + // And the next point + let pt = points.next().unwrap(); + assert_eq!(pt.start_time, None, "Gauges don't have a start time"); + assert_eq!( + *pt.timestamp, int_points.timestamps[2], + "Should have taken the input timestamp from both arrays", + ); + assert_eq!( + *pt.timestamp, double_points.timestamps[2], + "Should have taken the input timestamp from both arrays", + ); + let values = pt.values; + assert_eq!(values.len(), 2, "Should have 2 dimensions"); + assert_eq!( + &values[0], + &(Datum::Integer(Some(&3)), MetricType::Gauge), + "Should have pulled value from first integer array." + ); + assert_eq!( + &values[1], + &(Datum::Double(Some(&6.0)), MetricType::Gauge), + "Should have pulled value from second double array." + ); + + // And there should be no other values. + assert!(points.next().is_none(), "There should be no more points"); + } } diff --git a/oximeter/db/src/oxql/table.rs b/oximeter/oxql-types/src/table.rs similarity index 75% rename from oximeter/db/src/oxql/table.rs rename to oximeter/oxql-types/src/table.rs index 2cd141d2faa..f37992942fd 100644 --- a/oximeter/db/src/oxql/table.rs +++ b/oximeter/oxql-types/src/table.rs @@ -6,14 +6,16 @@ // Copyright 2024 Oxide Computer Company -use super::point::DataType; -use super::point::MetricType; -use super::point::Points; -use super::query::Alignment; -use super::Error; -use crate::TimeseriesKey; +use crate::point::DataType; +use crate::point::MetricType; +use crate::point::Points; +use crate::point::ValueArray; +use crate::point::Values; +use crate::Alignment; +use anyhow::Error; use highway::HighwayHasher; -use oximeter::FieldValue; +use oximeter_types::schema::TimeseriesKey; +use oximeter_types::FieldValue; use schemars::JsonSchema; use serde::Deserialize; use serde::Serialize; @@ -67,10 +69,20 @@ impl Timeseries { hasher.finish() } + /// Return the alignment of this timeseries, if any. + pub fn alignment(&self) -> Option { + self.alignment + } + + /// Set the alignment of this timeseries. + pub fn set_alignment(&mut self, alignment: Alignment) { + self.alignment = Some(alignment); + } + /// Return a copy of the timeseries, keeping only the provided fields. /// /// An error is returned if the timeseries does not contain those fields. - pub(crate) fn copy_with_fields( + pub fn copy_with_fields( &self, kept_fields: &[&str], ) -> Result { @@ -88,6 +100,20 @@ impl Timeseries { }) } + /// Return a copy of the timeseries, keeping only the provided points. + /// + /// Returns `None` if `kept_points` is empty. + pub fn copy_with_points(&self, kept_points: Points) -> Option { + if kept_points.is_empty() { + return None; + } + Some(Self { + fields: self.fields.clone(), + points: kept_points, + alignment: self.alignment, + }) + } + // Return `true` if the schema in `other` matches that of `self`. fn matches_schema(&self, other: &Timeseries) -> bool { if self.fields.len() != other.fields.len() { @@ -125,7 +151,7 @@ impl Timeseries { /// This returns an error if the points cannot be so cast, or the /// dimensionality of the types requested differs from the dimensionality of /// the points themselves. - pub(crate) fn cast(&self, types: &[DataType]) -> Result { + pub fn cast(&self, types: &[DataType]) -> Result { let fields = self.fields.clone(); Ok(Self { fields, @@ -133,6 +159,49 @@ impl Timeseries { alignment: self.alignment, }) } + + /// Return a new timeseries, with the points limited to the provided range. + pub fn limit(&self, start: usize, end: usize) -> Self { + let input_points = &self.points; + + // Slice the various data arrays. + let start_times = + input_points.start_times().map(|s| s[start..end].to_vec()); + let timestamps = input_points.timestamps()[start..end].to_vec(); + let values = input_points + .values + .iter() + .map(|vals| { + let values = match &vals.values { + ValueArray::Integer(inner) => { + ValueArray::Integer(inner[start..end].to_vec()) + } + ValueArray::Double(inner) => { + ValueArray::Double(inner[start..end].to_vec()) + } + ValueArray::Boolean(inner) => { + ValueArray::Boolean(inner[start..end].to_vec()) + } + ValueArray::String(inner) => { + ValueArray::String(inner[start..end].to_vec()) + } + ValueArray::IntegerDistribution(inner) => { + ValueArray::IntegerDistribution( + inner[start..end].to_vec(), + ) + } + ValueArray::DoubleDistribution(inner) => { + ValueArray::DoubleDistribution( + inner[start..end].to_vec(), + ) + } + }; + Values { values, metric_type: vals.metric_type } + }) + .collect(); + let points = Points::new(start_times, timestamps, values); + Self { fields: self.fields.clone(), points, alignment: self.alignment } + } } /// A table represents one or more timeseries with the same schema. @@ -146,7 +215,7 @@ pub struct Table { // // This starts as the name of the timeseries schema the data is derived // from, but can be modified as operations are done. - pub(super) name: String, + pub name: String, // The set of timeseries in the table, ordered by key. timeseries: BTreeMap, } diff --git a/oximeter/types/src/schema.rs b/oximeter/types/src/schema.rs index 2efd5265ffc..80aaa6f1014 100644 --- a/oximeter/types/src/schema.rs +++ b/oximeter/types/src/schema.rs @@ -28,6 +28,8 @@ use std::num::NonZeroU8; pub const SCHEMA_DIRECTORY: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/../oximeter/schema"); +pub type TimeseriesKey = u64; + /// The name and type information for a field of a timeseries schema. #[derive( Clone,