Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ members = [
"oximeter/instruments",
"oximeter/oximeter-macro-impl",
"oximeter/oximeter",
"oximeter/oxql-types",
"oximeter/producer",
"oximeter/schema",
"oximeter/test-utils",
Expand Down Expand Up @@ -196,6 +197,7 @@ default-members = [
"oximeter/instruments",
"oximeter/oximeter-macro-impl",
"oximeter/oximeter",
"oximeter/oxql-types",
"oximeter/producer",
"oximeter/schema",
"oximeter/test-utils",
Expand Down Expand Up @@ -470,6 +472,7 @@ oximeter-schema = { path = "oximeter/schema" }
oximeter-test-utils = { path = "oximeter/test-utils" }
oximeter-timeseries-macro = { path = "oximeter/timeseries-macro" }
oximeter-types = { path = "oximeter/types" }
oxql-types = { path = "oximeter/oxql-types" }
p256 = "0.13"
parse-display = "0.10.0"
partial-io = { version = "0.5.4", features = ["proptest1", "tokio1"] }
Expand Down
1 change: 1 addition & 0 deletions nexus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ openssl.workspace = true
oximeter-client.workspace = true
oximeter-db = { workspace = true, default-features = false, features = [ "oxql" ] }
oxnet.workspace = true
oxql-types.workspace = true
parse-display.workspace = true
paste.workspace = true
# See omicron-rpaths for more about the "pq-sys" dependency.
Expand Down
4 changes: 2 additions & 2 deletions nexus/src/app/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use nexus_db_queries::{
};
use omicron_common::api::external::{Error, InternalContext};
use oximeter_db::{
oxql, Measurement, TimeseriesSchema, TimeseriesSchemaPaginationParams,
Measurement, TimeseriesSchema, TimeseriesSchemaPaginationParams,
};
use std::num::NonZeroU32;

Expand Down Expand Up @@ -138,7 +138,7 @@ impl super::Nexus {
&self,
opctx: &OpContext,
query: impl AsRef<str>,
) -> Result<Vec<oxql::Table>, Error> {
) -> Result<Vec<oxql_types::Table>, Error> {
// Must be a fleet user to list timeseries schema.
//
// TODO-security: We need to figure out how to implement proper security
Expand Down
2 changes: 1 addition & 1 deletion nexus/src/external_api/http_entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6386,7 +6386,7 @@ async fn timeseries_schema_list(
async fn timeseries_query(
rqctx: RequestContext<ApiContext>,
body: TypedBody<params::TimeseriesQuery>,
) -> Result<HttpResponseOk<Vec<oximeter_db::oxql::Table>>, HttpError> {
) -> Result<HttpResponseOk<Vec<oxql_types::Table>>, HttpError> {
let apictx = rqctx.context();
let handler = async {
let nexus = &apictx.context.nexus;
Expand Down
6 changes: 3 additions & 3 deletions nexus/tests/integration_tests/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ async fn test_timeseries_schema_list(
pub async fn timeseries_query(
cptestctx: &ControlPlaneTestContext<omicron_nexus::Server>,
query: impl ToString,
) -> Vec<oximeter_db::oxql::Table> {
) -> Vec<oxql_types::Table> {
// first, make sure the latest timeseries have been collected.
cptestctx.oximeter.force_collect().await;

Expand Down Expand Up @@ -429,11 +429,11 @@ async fn test_instance_watcher_metrics(

#[track_caller]
fn count_state(
table: &oximeter_db::oxql::Table,
table: &oxql_types::Table,
instance_id: InstanceUuid,
state: &'static str,
) -> i64 {
use oximeter_db::oxql::point::ValueArray;
use oxql_types::point::ValueArray;
let uuid = FieldValue::Uuid(instance_id.into_untyped_uuid());
let state = FieldValue::String(state.into());
let mut timeserieses = table.timeseries().filter(|ts| {
Expand Down
14 changes: 12 additions & 2 deletions openapi/nexus.json
Original file line number Diff line number Diff line change
Expand Up @@ -20131,10 +20131,20 @@
"type": "object",
"properties": {
"metric_type": {
"$ref": "#/components/schemas/MetricType"
"description": "The type of this metric.",
"allOf": [
{
"$ref": "#/components/schemas/MetricType"
}
]
},
"values": {
"$ref": "#/components/schemas/ValueArray"
"description": "The data values.",
"allOf": [
{
"$ref": "#/components/schemas/ValueArray"
}
]
}
},
"required": [
Expand Down
1 change: 1 addition & 0 deletions oximeter/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ num.workspace = true
omicron-common.workspace = true
omicron-workspace-hack.workspace = true
oximeter.workspace = true
oxql-types.workspace = true
regex.workspace = true
serde.workspace = true
serde_json.workspace = true
Expand Down
4 changes: 2 additions & 2 deletions oximeter/db/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ use crate::Error;
use crate::Metric;
use crate::Target;
use crate::Timeseries;
use crate::TimeseriesKey;
use crate::TimeseriesName;
use crate::TimeseriesPageSelector;
use crate::TimeseriesScanParams;
use crate::TimeseriesSchema;
use dropshot::EmptyScanParams;
use dropshot::PaginationOrder;
use dropshot::ResultsPage;
use dropshot::WhichPage;
use oximeter::schema::TimeseriesKey;
use oximeter::types::Sample;
use oximeter::TimeseriesName;
use regex::Regex;
use regex::RegexBuilder;
use slog::debug;
Expand Down
30 changes: 15 additions & 15 deletions oximeter/db/src/client/oxql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::query::field_table_name;
use crate::Error;
use crate::Metric;
use crate::Target;
use crate::TimeseriesKey;
use oximeter::schema::TimeseriesKey;
use oximeter::TimeseriesSchema;
use slog::debug;
use slog::trace;
Expand Down Expand Up @@ -68,7 +68,7 @@ pub struct OxqlResult {
pub query_summaries: Vec<QuerySummary>,

/// The list of OxQL tables returned from the query.
pub tables: Vec<oxql::Table>,
pub tables: Vec<oxql_types::Table>,
}

/// The maximum number of data values fetched from the database for an OxQL
Expand Down Expand Up @@ -479,7 +479,9 @@ impl Client {
query_id,
total_duration: query_start.elapsed(),
query_summaries,
tables: vec![oxql::Table::new(schema.timeseries_name.as_str())],
tables: vec![oxql_types::Table::new(
schema.timeseries_name.as_str(),
)],
};
return Ok(result);
}
Expand All @@ -503,7 +505,7 @@ impl Client {

// At this point, let's construct a set of tables and run the results
// through the transformation pipeline.
let mut tables = vec![oxql::Table::from_timeseries(
let mut tables = vec![oxql_types::Table::from_timeseries(
schema.timeseries_name.as_str(),
timeseries_by_key.into_values(),
)?];
Expand Down Expand Up @@ -553,7 +555,7 @@ impl Client {
limit: Option<Limit>,
total_rows_fetched: &mut u64,
) -> Result<
(Vec<QuerySummary>, BTreeMap<TimeseriesKey, oxql::Timeseries>),
(Vec<QuerySummary>, BTreeMap<TimeseriesKey, oxql_types::Timeseries>),
Error,
> {
// We'll create timeseries for each key on the fly. To enable computing
Expand Down Expand Up @@ -624,25 +626,25 @@ impl Client {
for (key, measurements) in measurements_by_key.into_iter() {
// Constuct a new timeseries, from the target/metric info.
let (target, metric) = info.get(&key).unwrap();
let mut timeseries = oxql::Timeseries::new(
let mut timeseries = oxql_types::Timeseries::new(
target
.fields
.iter()
.chain(metric.fields.iter())
.map(|field| (field.name.clone(), field.value.clone())),
oxql::point::DataType::try_from(schema.datum_type)?,
oxql_types::point::DataType::try_from(schema.datum_type)?,
if schema.datum_type.is_cumulative() {
oxql::point::MetricType::Delta
oxql_types::point::MetricType::Delta
} else {
oxql::point::MetricType::Gauge
oxql_types::point::MetricType::Gauge
},
)?;

// Covert its oximeter measurements into OxQL data types.
let points = if schema.datum_type.is_cumulative() {
oxql::point::Points::delta_from_cumulative(&measurements)?
oxql_types::point::Points::delta_from_cumulative(&measurements)?
} else {
oxql::point::Points::gauge_from_gauge(&measurements)?
oxql_types::point::Points::gauge_from_gauge(&measurements)?
};
timeseries.points = points;
debug!(
Expand Down Expand Up @@ -1108,17 +1110,15 @@ fn update_total_rows_and_check(
mod tests {
use super::ConsistentKeyGroup;
use crate::client::oxql::chunk_consistent_key_groups_impl;
use crate::{
oxql::{point::Points, Table, Timeseries},
Client, DbWrite,
};
use crate::{Client, DbWrite};
use crate::{Metric, Target};
use chrono::{DateTime, Utc};
use dropshot::test_util::LogContext;
use omicron_test_utils::dev::clickhouse::ClickHouseInstance;
use omicron_test_utils::dev::test_setup_log;
use oximeter::{types::Cumulative, FieldValue};
use oximeter::{DatumType, Sample};
use oxql_types::{point::Points, Table, Timeseries};
use std::collections::BTreeMap;
use std::time::Duration;

Expand Down
3 changes: 1 addition & 2 deletions oximeter/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use dropshot::EmptyScanParams;
use dropshot::PaginationParams;
pub use oximeter::schema::FieldSchema;
pub use oximeter::schema::FieldSource;
use oximeter::schema::TimeseriesKey;
pub use oximeter::schema::TimeseriesName;
pub use oximeter::schema::TimeseriesSchema;
pub use oximeter::DatumType;
Expand Down Expand Up @@ -267,8 +268,6 @@ pub async fn make_client(
Ok(client)
}

pub(crate) type TimeseriesKey = u64;

// TODO-cleanup: Add the timeseries version in to the computation of the key.
// This will require a full drop of the database, since we're changing the
// sorting key and the timeseries key on each past sample. See
Expand Down
2 changes: 1 addition & 1 deletion oximeter/db/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ use crate::FieldSchema;
use crate::FieldSource;
use crate::Metric;
use crate::Target;
use crate::TimeseriesKey;
use crate::TimeseriesSchema;
use bytes::Bytes;
use chrono::DateTime;
use chrono::Utc;
use num::traits::Zero;
use oximeter::histogram::Histogram;
use oximeter::schema::TimeseriesKey;
use oximeter::traits;
use oximeter::types::Cumulative;
use oximeter::types::Datum;
Expand Down
35 changes: 17 additions & 18 deletions oximeter/db/src/oxql/ast/table_ops/align.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@

// Copyright 2024 Oxide Computer Company

use crate::oxql::point::DataType;
use crate::oxql::point::MetricType;
use crate::oxql::point::Points;
use crate::oxql::point::ValueArray;
use crate::oxql::point::Values;
use crate::oxql::query::Alignment;
use crate::oxql::Error;
use crate::oxql::Table;
use crate::oxql::Timeseries;
use anyhow::Context;
use anyhow::Error;
use chrono::DateTime;
use chrono::TimeDelta;
use chrono::Utc;
use oxql_types::point::DataType;
use oxql_types::point::MetricType;
use oxql_types::point::Points;
use oxql_types::point::ValueArray;
use oxql_types::point::Values;
use oxql_types::Alignment;
use oxql_types::Table;
use oxql_types::Timeseries;
use std::time::Duration;

// The maximum factor by which an alignment operation may upsample data.
Expand Down Expand Up @@ -144,7 +144,7 @@ fn align_mean_within(
"Alignment by mean requires a gauge or delta metric, not {}",
metric_type,
);
verify_max_upsampling_ratio(&points.timestamps, &period)?;
verify_max_upsampling_ratio(points.timestamps(), &period)?;

// Always convert the output to doubles, when computing the mean. The
// output is always a gauge, so we do not need the start times of the
Expand Down Expand Up @@ -179,7 +179,7 @@ fn align_mean_within(
// - Compute the mean of those.
let period_ =
TimeDelta::from_std(*period).context("time delta out of range")?;
let first_timestamp = points.timestamps[0];
let first_timestamp = points.timestamps()[0];
let mut ix: u32 = 0;
loop {
// Compute the next output timestamp, by shifting the query end time
Expand Down Expand Up @@ -220,15 +220,15 @@ fn align_mean_within(
// entries.
let output_value = if matches!(metric_type, MetricType::Gauge) {
mean_gauge_value_in_window(
&points.timestamps,
points.timestamps(),
&input_points,
window_start,
output_time,
)
} else {
mean_delta_value_in_window(
points.start_times.as_ref().unwrap(),
&points.timestamps,
points.start_times().unwrap(),
points.timestamps(),
&input_points,
window_start,
output_time,
Expand All @@ -255,10 +255,9 @@ fn align_mean_within(
ValueArray::Double(output_values.into_iter().rev().collect());
let timestamps = output_timestamps.into_iter().rev().collect();
let values = Values { values, metric_type: MetricType::Gauge };
new_timeseries.points =
Points { start_times: None, timestamps, values: vec![values] };
new_timeseries.alignment =
Some(Alignment { end_time: *query_end, period: *period });
new_timeseries.points = Points::new(None, timestamps, vec![values]);
new_timeseries
.set_alignment(Alignment { end_time: *query_end, period: *period });
output_table.insert(new_timeseries).unwrap();
}
Ok(output_table)
Expand Down
Loading