Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add disk metrics endpoint #1348

Merged
merged 21 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

123 changes: 117 additions & 6 deletions nexus/src/app/oximeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use crate::authz;
use crate::context::OpContext;
use crate::db;
use crate::db::identity::Asset;
use crate::external_api::params::ResourceMetrics;
use crate::internal_api::params::OximeterInfo;
use dropshot::PaginationParams;
use internal_dns_client::{
multiclient::{ResolveError, Resolver},
names::{ServiceName, SRV},
Expand All @@ -21,6 +23,8 @@ use omicron_common::api::external::PaginationOrder;
use omicron_common::api::internal::nexus;
use omicron_common::backoff;
use oximeter_client::Client as OximeterClient;
use oximeter_db::query::Timestamp;
use oximeter_db::Measurement;
use oximeter_db::TimeseriesSchema;
use oximeter_db::TimeseriesSchemaPaginationParams;
use oximeter_producer::register;
Expand Down Expand Up @@ -212,14 +216,112 @@ impl super::Nexus {
.map_err(|e| Error::internal_error(&e.to_string()))?
.timeseries_schema_list(&pag_params.page, limit)
.await
.map_err(|e| match e {
oximeter_db::Error::DatabaseUnavailable(_) => {
Error::ServiceUnavailable {
internal_message: e.to_string(),
}
.map_err(map_oximeter_err)
}

/// Returns results from the timeseries DB based on the provided query
/// parameters.
///
/// * `timeseries_name`: The "target:metric" name identifying the metric to
/// be queried.
/// * `criteria`: Any additional parameters to help narrow down the query
smklein marked this conversation as resolved.
Show resolved Hide resolved
/// selection further. These parameters are passed directly to
/// [oximeter_db::Client::select_timeseries_with].
/// * `query_params`: Pagination parameter, identifying which page of
/// results to return.
/// * `limit`: The maximum number of results to return in a paginated
/// request.
pub async fn select_timeseries(
&self,
timeseries_name: &str,
criteria: &[&str],
query_params: PaginationParams<ResourceMetrics, ResourceMetrics>,
limit: NonZeroU32,
) -> Result<dropshot::ResultsPage<Measurement>, Error> {
#[inline]
fn no_results() -> dropshot::ResultsPage<Measurement> {
dropshot::ResultsPage { next_page: None, items: Vec::new() }
}

let (start_time, end_time, query) = match query_params.page {
// Generally, we want the time bounds to be inclusive for the
// start time, and exclusive for the end time...
dropshot::WhichPage::First(query) => (
Timestamp::Inclusive(query.start_time),
Timestamp::Exclusive(query.end_time),
query,
),
// ... but for subsequent pages, we use the "last observed"
// timestamp as the start time. If we used an inclusive bound,
// we'd duplicate the returned measurement. To return each
// measurement exactly once, we make the start time "exclusive"
// on all "next" pages.
dropshot::WhichPage::Next(query) => (
Timestamp::Exclusive(query.start_time),
smklein marked this conversation as resolved.
Show resolved Hide resolved
Timestamp::Exclusive(query.end_time),
query,
),
};
if query.start_time >= query.end_time {
return Ok(no_results());
}

let timeseries_list = self
.timeseries_client
.get()
.await
.map_err(|e| {
Error::internal_error(&format!(
"Cannot access timeseries DB: {}",
e
))
})?
.select_timeseries_with(
timeseries_name,
criteria,
Some(start_time),
Some(end_time),
Some(limit),
)
.await
.or_else(|err| {
// If the timeseries name exists in the API, but not in Clickhouse,
// it might just not have been populated yet.
match err {
oximeter_db::Error::TimeseriesNotFound(_) => Ok(vec![]),
_ => Err(err),
}
_ => Error::InternalError { internal_message: e.to_string() },
})
.map_err(map_oximeter_err)?;

if timeseries_list.len() > 1 {
return Err(Error::internal_error(&format!(
"expected 1 timeseries but got {} ({:?} {:?})",
timeseries_list.len(),
timeseries_name,
criteria
)));
}

// If we received no data, exit early.
let timeseries =
if let Some(timeseries) = timeseries_list.into_iter().next() {
timeseries
} else {
return Ok(no_results());
};

Ok(dropshot::ResultsPage::new(
timeseries.measurements,
&query,
|last_measurement: &Measurement, query: &ResourceMetrics| {
ResourceMetrics {
start_time: last_measurement.timestamp(),
end_time: query.end_time,
}
},
)
.unwrap())
}

// Internal helper to build an Oximeter client from its ID and address (common data between
Expand Down Expand Up @@ -259,3 +361,12 @@ impl super::Nexus {
Ok((self.build_oximeter_client(&id, address), id))
}
}

fn map_oximeter_err(error: oximeter_db::Error) -> Error {
match error {
oximeter_db::Error::DatabaseUnavailable(_) => {
Error::ServiceUnavailable { internal_message: error.to_string() }
}
_ => Error::InternalError { internal_message: error.to_string() },
}
}
70 changes: 70 additions & 0 deletions nexus/src/external_api/http_entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ use omicron_common::api::external::Saga;
use omicron_common::api::external::VpcFirewallRuleUpdateParams;
use omicron_common::api::external::VpcFirewallRules;
use omicron_common::bail_unless;
use parse_display::Display;
use ref_cast::RefCast;
use schemars::JsonSchema;
use serde::Deserialize;
Expand Down Expand Up @@ -115,6 +116,7 @@ pub fn external_api() -> NexusApiDescription {
api.register(disk_view)?;
api.register(disk_view_by_id)?;
api.register(disk_delete)?;
api.register(disk_metrics_list)?;

api.register(instance_list)?;
api.register(instance_create)?;
Expand Down Expand Up @@ -1515,6 +1517,65 @@ async fn disk_delete(
apictx.external_latencies.instrument_dropshot_handler(&rqctx, handler).await
}

// Metric names accepted as the `{metric_name}` path segment of the disk
// metrics endpoint. Variants deserialize from, and display as, snake_case
// (e.g. `ReadBytes` <-> `read_bytes`); the displayed form is appended to
// `crucible_upstairs:` to build the name of the timeseries that is queried.
#[derive(Display, Deserialize, JsonSchema)]
#[display(style = "snake_case")]
#[serde(rename_all = "snake_case")]
pub enum DiskMetricName {
smklein marked this conversation as resolved.
Show resolved Hide resolved
Activated,
Flush,
Read,
ReadBytes,
Write,
WriteBytes,
}

/// Fetch metrics for a disk.
#[endpoint {
method = GET,
path = "/organizations/{organization_name}/projects/{project_name}/disks/{disk_name}/metrics/{metric_name}",
tags = ["disks"],
}]
async fn disk_metrics_list(
rqctx: Arc<RequestContext<Arc<ServerContext>>>,
path_params: Path<MetricsPathParam<DiskPathParam, DiskMetricName>>,
query_params: Query<
PaginationParams<params::ResourceMetrics, params::ResourceMetrics>,
>,
) -> Result<HttpResponseOk<ResultsPage<oximeter_db::Measurement>>, HttpError> {
let apictx = rqctx.context();
let nexus = &apictx.nexus;

let path = path_params.into_inner();
let organization_name = &path.inner.organization_name;
let project_name = &path.inner.project_name;
let disk_name = &path.inner.disk_name;
let metric_name = path.metric_name;

let query = query_params.into_inner();
let limit = rqctx.page_limit(&query)?;

let handler = async {
let opctx = OpContext::for_external_api(&rqctx).await?;

// This ensures the user is authorized on Action::Read for this disk
let disk = nexus
.disk_fetch(&opctx, organization_name, project_name, disk_name)
.await?;
let upstairs_uuid = disk.id();
let result = nexus
.select_timeseries(
&format!("crucible_upstairs:{}", metric_name),
&[&format!("upstairs_uuid=={}", upstairs_uuid)],
query,
limit,
)
.await?;

Ok(HttpResponseOk(result))
};
apictx.external_latencies.instrument_dropshot_handler(&rqctx, handler).await
}

// Instances

/// List instances in a project.
Expand Down Expand Up @@ -4093,6 +4154,15 @@ async fn session_sshkey_delete(
apictx.external_latencies.instrument_dropshot_handler(&rqctx, handler).await
}

/// Path parameters for metrics requests where `/metrics/{metric_name}` is
/// appended to an existing path parameter type
// `T` is the path parameter type of the parent resource and `M` is the type
// of the metric name segment; the disk metrics endpoint instantiates this as
// `MetricsPathParam<DiskPathParam, DiskMetricName>`.
#[derive(Deserialize, JsonSchema)]
struct MetricsPathParam<T, M> {
// Flattened so the parent resource's fields are read from the same set of
// path parameters as `metric_name`, rather than from a nested object.
#[serde(flatten)]
inner: T,
// Value of the trailing `{metric_name}` path segment.
metric_name: M,
}

#[cfg(test)]
mod test {
use super::external_api;
Expand Down
23 changes: 20 additions & 3 deletions nexus/test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ pub async fn test_setup_with_config(
)
.await
.unwrap();
register_test_producer(&producer).unwrap();

ControlPlaneTestContext {
server,
Expand Down Expand Up @@ -253,6 +254,10 @@ impl oximeter::Producer for IntegrationProducer {
}
}

/// Creates and starts a producer server.
///
/// Actual producers can be registered with the [`register_producer`]
/// helper function.
pub async fn start_producer_server(
nexus_address: SocketAddr,
id: Uuid,
Expand Down Expand Up @@ -281,9 +286,22 @@ pub async fn start_producer_server(
};
let server =
ProducerServer::start(&config).await.map_err(|e| e.to_string())?;
Ok(server)
}

/// Registers an arbitrary producer with the test server.
///
/// The producer is added to the server's registry; a registration failure is
/// returned as its display string.
pub fn register_producer(
    server: &ProducerServer,
    producer: impl oximeter::Producer,
) -> Result<(), String> {
    match server.registry().register_producer(producer) {
        Ok(_) => Ok(()),
        Err(e) => Err(e.to_string()),
    }
}

/// Registers a sample-generating test-specific producer.
pub fn register_test_producer(server: &ProducerServer) -> Result<(), String> {
// Create and register an actual metric producer.
let producer = IntegrationProducer {
let test_producer = IntegrationProducer {
target: IntegrationTarget {
name: "integration-test-target".to_string(),
},
Expand All @@ -292,8 +310,7 @@ pub async fn start_producer_server(
datum: 0,
},
};
server.registry().register_producer(producer).map_err(|e| e.to_string())?;
Ok(server)
register_producer(server, test_producer)
}

/// Returns whether the two identity metadata objects are identical.
Expand Down
13 changes: 10 additions & 3 deletions nexus/test-utils/src/resource_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,28 +188,35 @@ pub async fn create_disk(
.await
}

/// Creates an instance with a default NIC and no disks.
///
/// Wrapper around [`create_instance_with`].
pub async fn create_instance(
client: &ClientTestContext,
organization_name: &str,
project_name: &str,
instance_name: &str,
) -> Instance {
create_instance_with_nics(
create_instance_with(
client,
organization_name,
project_name,
instance_name,
&params::InstanceNetworkInterfaceAttachment::Default,
// Disks=
vec![],
)
.await
}

pub async fn create_instance_with_nics(
/// Creates an instance with attached resources.
pub async fn create_instance_with(
client: &ClientTestContext,
organization_name: &str,
project_name: &str,
instance_name: &str,
nics: &params::InstanceNetworkInterfaceAttachment,
disks: Vec<params::InstanceDiskAttachment>,
) -> Instance {
let url = format!(
"/organizations/{}/projects/{}/instances",
Expand All @@ -231,7 +238,7 @@ pub async fn create_instance_with_nics(
.to_vec(),
network_interfaces: nics.clone(),
external_ips: vec![],
disks: vec![],
disks,
},
)
.await
Expand Down
Loading