diff --git a/Cargo.lock b/Cargo.lock index 5d1c8e90d62..9c87ed750f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2717,6 +2717,81 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "ereport-test-utils" +version = "0.1.0" +dependencies = [ + "anyhow", + "camino", + "clap", + "dropshot", + "ereporter", + "ereporter-client", + "nexus-types", + "omicron-common", + "omicron-workspace-hack", + "schemars", + "serde", + "serde_json", + "slog", + "slog-async", + "slog-term", + "tokio", + "uuid", +] + +[[package]] +name = "ereporter" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "clap", + "dropshot", + "ereporter-api", + "internal-dns", + "nexus-client", + "omicron-common", + "omicron-workspace-hack", + "serde", + "slog", + "slog-async", + "slog-dtrace", + "slog-error-chain", + "slog-term", + "thiserror", + "tokio", + "uuid", +] + +[[package]] +name = "ereporter-api" +version = "0.1.0" +dependencies = [ + "chrono", + "dropshot", + "omicron-common", + "omicron-workspace-hack", + "schemars", + "serde", + "uuid", +] + +[[package]] +name = "ereporter-client" +version = "0.1.0" +dependencies = [ + "chrono", + "futures", + "omicron-common", + "omicron-workspace-hack", + "progenitor", + "reqwest 0.12.7", + "serde", + "slog", + "uuid", +] + [[package]] name = "errno" version = "0.3.9" @@ -7152,6 +7227,7 @@ dependencies = [ "cockroach-admin-api", "dns-server-api", "dropshot", + "ereporter-api", "fs-err", "gateway-api", "indent_write", diff --git a/Cargo.toml b/Cargo.toml index 7d096c67878..de4bd9a4d6c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "clients/ddm-admin-client", "clients/dns-service-client", "clients/dpd-client", + "clients/ereporter-client", "clients/gateway-client", "clients/installinator-client", "clients/nexus-client", @@ -41,6 +42,9 @@ members = [ "dns-server", "dns-server-api", "end-to-end-tests", + "ereporter", + "ereporter/api", + "ereporter/test-utils", "gateway", "gateway-api", "gateway-cli", @@ -132,6 +136,7 @@ default-members = [ "clients/ddm-admin-client", "clients/dns-service-client", "clients/dpd-client", + "clients/ereporter-client", "clients/gateway-client", "clients/installinator-client", "clients/nexus-client", @@ -164,6 +169,9 @@ default-members = [ "dns-server", "dns-server-api", "end-to-end-tests", + "ereporter", + "ereporter/api", + "ereporter/test-utils", "gateway", "gateway-api", "gateway-cli", @@ -346,6 +354,9 @@ dpd-client = { path = "clients/dpd-client" } dropshot = { version = "0.12.0", features = [ "usdt-probes" ] } dyn-clone = "1.0.17" either = "1.13.0" +ereporter-api = { path = "ereporter/api" } +ereporter-client = { path = "clients/ereporter-client" } +ereporter = { path = "ereporter" } expectorate = "1.1.0" fatfs = "0.3.6" filetime = "0.2.25" diff --git a/clients/ereporter-client/Cargo.toml b/clients/ereporter-client/Cargo.toml new file mode 100644 index 00000000000..52cc0073da7 --- /dev/null +++ b/clients/ereporter-client/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "ereporter-client" +version = "0.1.0" +edition = "2021" +license = "MPL-2.0" + +[lints] +workspace = true + +[dependencies] +chrono.workspace = true +futures.workspace = true +omicron-common.workspace = true +progenitor.workspace = true +reqwest = { workspace = true, features = ["json", "rustls-tls", "stream"] } +serde.workspace = true +slog.workspace = true +uuid.workspace = true +omicron-workspace-hack.workspace = true diff --git a/clients/ereporter-client/src/lib.rs b/clients/ereporter-client/src/lib.rs new file mode 100644 index 00000000000..2f55b4e316b --- /dev/null +++ b/clients/ereporter-client/src/lib.rs @@ -0,0 +1,32 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +// Copyright 2021 Oxide Computer Company + +//! Interface for API requests to an ereporter. + +progenitor::generate_api!( + spec = "../../openapi/ereporter.json", + inner_type = slog::Logger, + pre_hook = (|log: &slog::Logger, request: &reqwest::Request| { + slog::debug!(log, "client request"; + "method" => %request.method(), + "uri" => %request.url(), + "body" => ?&request.body(), + ); + }), + post_hook = (|log: &slog::Logger, result: &Result<_, _>| { + slog::debug!(log, "client response"; "result" => ?result); + }), + + replace = { + Generation = omicron_common::api::external::Generation, + }, +); + +impl omicron_common::api::external::ClientError for types::Error { + fn message(&self) -> String { + self.message.clone() + } +} diff --git a/dev-tools/openapi-manager/Cargo.toml b/dev-tools/openapi-manager/Cargo.toml index 211e1340161..b751b2aec40 100644 --- a/dev-tools/openapi-manager/Cargo.toml +++ b/dev-tools/openapi-manager/Cargo.toml @@ -17,6 +17,7 @@ cockroach-admin-api.workspace = true clap.workspace = true dns-server-api.workspace = true dropshot.workspace = true +ereporter-api.workspace = true fs-err.workspace = true gateway-api.workspace = true indent_write.workspace = true diff --git a/dev-tools/openapi-manager/src/spec.rs b/dev-tools/openapi-manager/src/spec.rs index 7d734218fc8..545e20ab325 100644 --- a/dev-tools/openapi-manager/src/spec.rs +++ b/dev-tools/openapi-manager/src/spec.rs @@ -129,6 +129,15 @@ pub fn all_apis() -> Vec { filename: "wicketd.json", extra_validation: None, }, + ApiSpec { + title: "Oxide Error Reporter API", + version: "0.0.1", + description: "API for collecting error reports from a reporter.", + boundary: ApiBoundary::Internal, + api_description: ereporter_api::ereporter_api_mod::stub_api_description, + filename: "ereporter.json", + extra_validation: None, + }, // Add your APIs here! Please keep this list sorted by filename. ] } diff --git a/ereporter/Cargo.toml b/ereporter/Cargo.toml new file mode 100644 index 00000000000..d92509c9ead --- /dev/null +++ b/ereporter/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "ereporter" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow.workspace = true +chrono.workspace = true +dropshot.workspace = true +ereporter-api.workspace = true +internal-dns.workspace = true +nexus-client.workspace = true +omicron-common.workspace = true +omicron-workspace-hack.workspace = true +uuid.workspace = true +serde.workspace = true +slog.workspace = true +slog-error-chain.workspace = true +slog-dtrace.workspace = true +thiserror.workspace = true +tokio = { workspace = true, features = ["sync"] } + +[dev-dependencies] +clap.workspace = true +slog.workspace = true +slog-async.workspace = true +slog-term.workspace = true +tokio = { workspace = true, features = ["sync", "macros", "full"] } + +[lints] +workspace = true diff --git a/ereporter/api/Cargo.toml b/ereporter/api/Cargo.toml new file mode 100644 index 00000000000..23df8a790ca --- /dev/null +++ b/ereporter/api/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "ereporter-api" +version = "0.1.0" +edition = "2021" + +[lints] +workspace = true + +[dependencies] +chrono.workspace = true +dropshot.workspace = true +omicron-common.workspace = true +omicron-workspace-hack.workspace = true +schemars.workspace = true +serde.workspace = true +uuid.workspace = true diff --git a/ereporter/api/src/lib.rs b/ereporter/api/src/lib.rs new file mode 100644 index 00000000000..fb434a8cbb3 --- /dev/null +++ b/ereporter/api/src/lib.rs @@ -0,0 +1,105 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use chrono::{DateTime, Utc}; +use dropshot::{ + EmptyScanParams, HttpError, HttpResponseDeleted, HttpResponseOk, + PaginationParams, Query, RequestContext, ResultsPage, +}; +use omicron_common::api::external::Generation; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use uuid::Uuid; + +#[dropshot::api_description] +pub trait EreporterApi { + type Context; + + /// Get a list of ereports from `reporter_id`, paginated by sequence number. + #[endpoint { + method = GET, + path = "/ereports/{reporter_id}" + }] + async fn ereports_list( + request_context: RequestContext, + path: dropshot::Path, + query: Query>, + ) -> Result>, HttpError>; + + /// Informs the reporter with the given `reporter_id` that its ereports up + /// to the sequence number `seq` have been successfully ingested into + /// persistant storage. + /// + /// The reporter may now freely discard ereports with sequence numbers less + /// than or equal to `seq`. + #[endpoint { + method = DELETE, + path = "/ereports/{reporter_id}/{seq}" + }] + async fn ereports_acknowledge( + request_context: RequestContext, + path: dropshot::Path, + ) -> Result; +} + +/// Path parameters to the [`EreporterApi::ereports_list`] endpoint. +#[derive(Clone, Copy, Debug, Deserialize, JsonSchema, Serialize)] +pub struct ListPathParams { + pub reporter_id: Uuid, +} + +/// Path parameters to the [`EreporterApi::ereports_acknowledge`] endpoint. +#[derive(Clone, Copy, Debug, Deserialize, JsonSchema, Serialize)] +pub struct AcknowledgePathParams { + pub reporter_id: Uuid, + pub seq: Generation, +} + +/// An entry in the ereport batch returned by a reporter. +#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)] +pub struct Entry { + /// UUID of the entity that generated this ereport. + pub reporter_id: Uuid, + /// The ereport's sequence number, unique with regards to ereports generated + /// by the entity with the `reporter_id`. + pub seq: Generation, + + pub value: EntryKind, +} + +/// Kinds of entry in a batch. +#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum EntryKind { + /// An ereport. + Ereport(Ereport), + /// Ereports may have been lost. + DataLoss { + /// The number of ereports that were discarded, if it is known. + /// + /// If ereports are dropped because a buffer has reached its capacity, + /// the reporter is strongly encouraged to attempt to count the number + /// of ereports lost. In other cases, such as a reporter crashing and + /// restarting, the reporter may not be capable of determining the + /// number of ereports that were lost, or even *if* data loss actually + /// occurred. Therefore, a `None` here indicates *possible* data loss, + /// while a `Some(u32)` indicates *known* data loss. + dropped: Option, + }, +} + +/// An error report. +#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)] +pub struct Ereport { + /// A string indicating the kind of ereport. + /// + /// This may be used by diagnosis engines as an indication of what `facts` + /// to expect. + pub class: String, + /// The UTC timestamp when this ereport was observed, as determined by the reporter. + pub time_created: DateTime, + /// The set of facts (key-value pairs) associated with this ereport. + pub facts: HashMap, +} diff --git a/ereporter/src/buffer.rs b/ereporter/src/buffer.rs new file mode 100644 index 00000000000..d0e2c8a221e --- /dev/null +++ b/ereporter/src/buffer.rs @@ -0,0 +1,172 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use crate::server; +use crate::EreportData; +use omicron_common::api::external::Error; +use omicron_common::api::external::Generation; +use std::collections::VecDeque; +use tokio::sync::{mpsc, oneshot, watch}; +use uuid::Uuid; + +pub(crate) enum ServerReq { + TruncateTo { + seq: Generation, + tx: oneshot::Sender>, + }, + List { + start_seq: Option, + limit: usize, + tx: oneshot::Sender>, + }, +} + +pub(crate) struct BufferWorker { + seq: Generation, + buf: VecDeque, + log: slog::Logger, + id: Uuid, + ereports: mpsc::Receiver, + requests: mpsc::Receiver, +} + +#[derive(Debug)] +pub(crate) struct Handle { + pub(crate) requests: mpsc::Sender, + pub(crate) ereports: mpsc::Sender, + pub(crate) task: tokio::task::JoinHandle<()>, +} + +impl BufferWorker { + pub(crate) fn spawn( + id: Uuid, + log: &slog::Logger, + buffer_capacity: usize, + mut server: watch::Receiver>, + ) -> Handle { + let (requests_tx, requests) = mpsc::channel(128); + let (ereports_tx, ereports) = mpsc::channel(buffer_capacity); + let log = log.new(slog::o!("reporter_id" => id.to_string())); + let task = tokio::task::spawn(async move { + // Wait for the server to come up, and then register the reporter. + let seq = loop { + let state = server.borrow_and_update().as_ref().cloned(); + if let Some(server) = state { + break server.register_reporter(&log, id).await; + } + if server.changed().await.is_err() { + slog::warn!(log, "server disappeared surprisingly before we could register the reporter!"); + return; + } + }; + // Start running the buffer worker. + let worker = Self { + seq, + buf: VecDeque::with_capacity(buffer_capacity), + log, + id, + ereports, + requests, + }; + worker.run().await + }); + Handle { ereports: ereports_tx, requests: requests_tx, task } + } + + pub(crate) async fn run(mut self) { + while let Some(req) = self.requests.recv().await { + match req { + // Asked to list ereports! + ServerReq::List { start_seq, limit, tx } => { + // First, grab any new ereports and stick them in our cache. + while let Ok(ereport) = self.ereports.try_recv() { + self.push_ereport(ereport); + } + + let mut list = { + let cap = std::cmp::min(limit, self.buf.len()); + Vec::with_capacity(cap) + }; + + match start_seq { + // Start at lowest sequence number. + None => { + list.extend( + self.buf.iter().by_ref().take(limit).cloned(), + ); + } + Some(seq) => { + todo!( + "eliza: draw the rest of the pagination {seq}" + ) + } + } + slog::info!( + self.log, + "produced ereport batch from {start_seq:?}"; + "start" => ?start_seq, + "len" => list.len(), + "limit" => limit + ); + if tx.send(list).is_err() { + slog::warn!(self.log, "client canceled list request"); + } + } + ServerReq::TruncateTo { seq, tx } if seq > self.seq => { + if tx.send(Err(Error::invalid_value( + "seq", + "cannot truncate to a sequence number greater than the current maximum" + ))).is_err() { + // If the receiver no longer cares about the response to + // this request, no biggie. + slog::warn!(self.log, "client canceled truncate request"); + } + } + ServerReq::TruncateTo { seq, tx } => { + let prev_len = self.buf.len(); + self.buf.retain(|ereport| ereport.seq > seq); + + slog::info!( + self.log, + "truncated ereports up to {seq}"; + "seq" => ?seq, + "dropped" => prev_len - self.buf.len(), + "remaining" => self.buf.len(), + ); + + if tx.send(Ok(())).is_err() { + // If the receiver no longer cares about the response to + // this request, no biggie. + slog::warn!( + self.log, + "client canceled truncate request" + ); + } + } + } + } + + slog::info!(self.log, "server requests channel closed, shutting down"); + } + + fn push_ereport(&mut self, ereport: EreportData) { + let EreportData { facts, class, time_created } = ereport; + let seq = self.seq; + self.buf.push_back(ereporter_api::Entry { + seq, + reporter_id: self.id, + value: ereporter_api::EntryKind::Ereport(ereporter_api::Ereport { + facts, + class, + time_created, + }), + }); + self.seq = seq.next(); + slog::trace!( + self.log, + "recorded ereport"; + "seq" => %seq, + ); + } +} diff --git a/ereporter/src/lib.rs b/ereporter/src/lib.rs new file mode 100644 index 00000000000..6ba58ca5fdc --- /dev/null +++ b/ereporter/src/lib.rs @@ -0,0 +1,86 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use chrono::DateTime; +use chrono::Utc; +use std::collections::HashMap; +use tokio::sync::mpsc; + +mod buffer; +pub mod proxy; +pub mod registry; +pub mod server; +pub use self::registry::ReporterRegistry; + +#[must_use = "an `EreportBuilder` does nothing unless `submit()` is called"] +pub struct EreportBuilder { + data: EreportData, + permit: mpsc::OwnedPermit, +} + +impl EreportBuilder { + /// Reserve capacity for at least `facts` facts. + pub fn reserve(&mut self, facts: usize) { + self.data.facts.reserve(facts); + } + + pub fn time_created( + &mut self, + time_created: chrono::DateTime, + ) -> &mut Self { + self.data.time_created = time_created; + self + } + + pub fn fact( + &mut self, + name: impl ToString, + value: impl ToString, + ) -> Option { + self.data.facts.insert(name.to_string(), value.to_string()) + } + + pub fn facts( + &mut self, + facts: impl Iterator, + ) { + self.data + .facts + .extend(facts.map(|(k, v)| (k.to_string(), v.to_string()))) + } + + pub fn submit(self) { + self.permit.send(self.data); + } +} + +/// A reporter handle used to generate ereports. +#[derive(Clone, Debug)] +pub struct Reporter(pub(crate) mpsc::Sender); + +impl Reporter { + /// Begin constructing a new ereport, returning an [`EreportBuilder`]. + pub async fn report( + &self, + class: impl ToString, + ) -> Result { + let time_created = Utc::now(); + let permit = self.0.clone().reserve_owned().await.map_err(|_| ())?; + Ok(EreportBuilder { + data: EreportData { + class: class.to_string(), + time_created, + facts: HashMap::new(), + }, + permit, + }) + } +} + +/// An ereport. +pub(crate) struct EreportData { + pub(crate) class: String, + pub(crate) facts: HashMap, + pub(crate) time_created: DateTime, +} diff --git a/ereporter/src/proxy.rs b/ereporter/src/proxy.rs new file mode 100644 index 00000000000..0d2971644eb --- /dev/null +++ b/ereporter/src/proxy.rs @@ -0,0 +1,55 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +#![allow(dead_code)] // TODO(eliza): i'm still working on this bit +use ereporter_api::Entry; +use omicron_common::api::external::Error; +use omicron_common::api::external::Generation; +use std::future::Future; +use tokio::sync::{mpsc, watch}; +use uuid::Uuid; + +pub trait ReporterProxy { + fn list( + &self, + starting_seq: Option, + limit: Option, + ) -> impl Future, Error>> + Send; + + fn acknowledge( + &self, + seq: Generation, + ) -> impl Future> + Send; + + fn recover_seq( + &self, + seq: Generation, + ) -> impl Future> + Send; +} + +pub(crate) struct ProxyWorker

{ + proxy: P, + log: slog::Logger, + id: Uuid, + server: watch::Receiver>, + server_reqs: mpsc::Receiver, + seqs: mpsc::Receiver, +} + +impl ProxyWorker

{ + async fn run(mut self) { + loop { + tokio::select! { + biased; + seq = self.seqs.recv() => { + if let Some(seq) = seq { + if let Err(_e) = self.proxy.recover_seq(seq).await { + // TODO(eliza): waht do about error + } + } + + }, + } + } + } +} diff --git a/ereporter/src/registry.rs b/ereporter/src/registry.rs new file mode 100644 index 00000000000..00ed682217c --- /dev/null +++ b/ereporter/src/registry.rs @@ -0,0 +1,135 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use crate::buffer; +use crate::server; +use crate::Reporter; +use omicron_common::FileKv; +use slog::debug; +use slog::error; +use slog::Drain; +use slog::Logger; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use tokio::sync::mpsc; +use tokio::sync::watch; +use uuid::Uuid; + +// Our public interface depends directly or indirectly on these types; we +// export them so that consumers need not depend on dropshot themselves and +// to simplify how we stage incompatible upgrades. +pub use dropshot::ConfigLogging; +pub use dropshot::ConfigLoggingIfExists; +pub use dropshot::ConfigLoggingLevel; + +#[derive(Clone, Debug)] +pub struct ReporterRegistry(pub(crate) Arc); + +/// Either configuration for building a logger, or an actual logger already +/// instantiated. +/// +/// This can be used to start a [`ReporterRegistry`] with a new logger or a child of a +/// parent logger if desired. +#[derive(Debug, Clone)] +pub enum LogConfig { + /// Configuration for building a new logger. + Config(ConfigLogging), + /// An explicit logger to use. + Logger(slog::Logger), +} + +#[derive(Debug)] +pub(crate) struct RegistryInner { + reporters: RwLock>, + buffer_capacity: usize, + // Used for registering reporters as they are inserted. + pub(crate) log: slog::Logger, + pub(crate) server_tx: watch::Sender>, + server_state: watch::Receiver>, +} + +impl ReporterRegistry { + pub fn new(log: LogConfig, buffer_capacity: usize) -> anyhow::Result { + let log = { + let base_logger = match log { + LogConfig::Config(conf) => conf.to_logger("ereporter")?, + LogConfig::Logger(log) => log.clone(), + }; + let (drain, registration) = slog_dtrace::with_drain(base_logger); + let log = Logger::root(drain.fuse(), slog::o!(FileKv)); + if let slog_dtrace::ProbeRegistration::Failed(e) = registration { + error!(log, "failed to register DTrace probes: {e}",); + } else { + debug!(log, "registered DTrace probes"); + } + log + }; + + anyhow::ensure!( + buffer_capacity > 0, + "a 0-capacity ereport buffer is nonsensical" + ); + + let (server_tx, server_state) = tokio::sync::watch::channel(None); + Ok(Self(Arc::new(RegistryInner { + reporters: RwLock::new(HashMap::new()), + buffer_capacity, + log, + server_tx, + server_state, + }))) + } + + pub fn register_reporter(&self, id: Uuid) -> Reporter { + let mut reporters = self.0.reporters.write().unwrap(); + let ereports = reporters + .entry(id) + .or_insert_with(|| { + buffer::BufferWorker::spawn( + id, + &self.0.log, + self.0.buffer_capacity, + self.0.server_state.clone(), + ) + }) + .ereports + .clone(); + Reporter(ereports) + } + + pub fn register_proxy(&self, _id: Uuid, _proxy: ()) { + unimplemented!( + "TODO(eliza): this will eventually take a trait representing how \ + to proxy requests to a non-local reporter, for use by e.g. MGS", + ); + } + + pub(crate) fn get_worker( + &self, + id: &Uuid, + ) -> Option> { + self.0 + .reporters + .read() + .unwrap() + .get(id) + .map(|handle| &handle.requests) + .cloned() + } +} + +impl Drop for RegistryInner { + // Abbort all spawned buffer tasks when the reporter registry is dropped. + fn drop(&mut self) { + let reporters = self + .reporters + .get_mut() + // if the lock is poisoned, don't panic, because we are in a `Drop` + // impl and that would cause a double-panic. + .unwrap_or_else(|e| e.into_inner()); + for (_, reporter) in reporters.drain() { + reporter.task.abort(); + } + } +} diff --git a/ereporter/src/server.rs b/ereporter/src/server.rs new file mode 100644 index 00000000000..9af4cdcd698 --- /dev/null +++ b/ereporter/src/server.rs @@ -0,0 +1,298 @@ +use crate::buffer; +use crate::registry::ReporterRegistry; +use dropshot::{ + ConfigDropshot, EmptyScanParams, HttpError, HttpResponseDeleted, + HttpResponseOk, PaginationParams, Path, Query, RequestContext, ResultsPage, + WhichPage, +}; +use ereporter_api::ListPathParams; +use internal_dns::resolver::Resolver; +use internal_dns::ServiceName; +use omicron_common::api::external::Error; +use omicron_common::api::external::Generation; +use omicron_common::backoff; +use omicron_common::backoff::BackoffError; +use slog::debug; +use slog::warn; +use slog_error_chain::SlogInlineError; +use std::net::IpAddr; +use std::net::SocketAddr; +use tokio::sync::oneshot; +use uuid::Uuid; + +#[derive(Clone, serde::Serialize, serde::Deserialize)] +pub struct Config { + pub server_address: SocketAddr, + /// How to discover the Nexus API to register the reporter. + pub registration_address: Option, + // /// The maximum size of Dropshot requests. + pub request_body_max_bytes: usize, +} + +pub struct RunningServer { + _server: dropshot::HttpServer, +} + +#[derive(Clone, Debug)] +pub(crate) struct State { + server_address: SocketAddr, + nexus: NexusDiscovery, +} + +impl ReporterRegistry { + pub fn start_server( + &self, + config: Config, + dns_resolver: Option, + ) -> anyhow::Result { + let log = &self.0.log; + + let server = { + let dropshot_cfg = ConfigDropshot { + bind_address: config.server_address, + request_body_max_bytes: config.request_body_max_bytes, + default_handler_task_mode: dropshot::HandlerTaskMode::Detached, + log_headers: vec![], + }; + let log = log.new(slog::o!("component" => "dropshot")); + let api = ereporter_api::ereporter_api_mod::api_description::< + EreporterApiImpl, + >()?; + dropshot::HttpServerStarter::new( + &dropshot_cfg, + api, + self.clone(), + &log, + ) + .map_err(|e| { + anyhow::anyhow!("could not start dropshot server: {e}") + })? + .start() + }; + + // Create a resolver if needed, or use Nexus's address directly. + let nexus = match (config.registration_address, dns_resolver) { + (Some(addr), _) => { + if addr.port() == 0 { + anyhow::bail!( + "Nexus registration address must have a real port" + ); + } + debug!( + self.0.log, + "Nexus IP provided explicitly, registering with it"; + "addr" => %addr, + ); + NexusDiscovery::Addr(addr) + } + (None, None) => { + // Ensure that we've been provided with an IPv6 address if we're + // using DNS to resolve Nexus. That's required because we need + // to use the /48 to find our DNS server itself. + let IpAddr::V6(our_addr) = config.server_address.ip() else { + anyhow::bail!("server address must be IPv6 in order to resolve Nexus from DNS") + }; + debug!( + self.0.log, + "Nexus IP not provided, will create an internal \ + DNS resolver to resolve it" + ); + + let resolver = Resolver::new_from_ip( + self.0 + .log + .new(slog::o!("component" => "internal-dns-resolver")), + our_addr, + )?; + NexusDiscovery::Dns(resolver) + } + (None, Some(resolver)) => { + debug!( + self.0.log, + "Nexus IP not provided, will use DNS to resolve it" + ); + NexusDiscovery::Dns(resolver) + } + }; + + self.0 + .server_tx + .send(Some(State { + // Use the server's actual address once it has been bound, + // rather than the configured address, since the configured port + // may be :0. + server_address: server.local_addr(), + nexus, + })) + .expect("receivers should never be dropped"); + Ok(RunningServer { _server: server }) + } +} + +struct EreporterApiImpl; +impl ereporter_api::EreporterApi for EreporterApiImpl { + type Context = ReporterRegistry; + + async fn ereports_list( + reqctx: RequestContext, + path: Path, + query: Query>, + ) -> Result>, HttpError> + { + let registry = reqctx.context(); + let pagination = query.into_inner(); + let limit = reqctx.page_limit(&pagination)?.get() as usize; + let ereporter_api::ListPathParams { reporter_id } = path.into_inner(); + + let start_seq = match pagination.page { + WhichPage::First(..) => None, + WhichPage::Next(seq) => Some(seq), + }; + + slog::debug!( + reqctx.log, + "received ereport list request"; + "reporter_id" => %reporter_id, + "start_seq" => ?start_seq, + "limit" => limit, + ); + let worker = registry.get_worker(&reporter_id).ok_or( + HttpError::for_not_found( + Some("NO_REPORTER".to_string()), + format!("no reporter with ID {reporter_id}"), + ), + )?; + let (tx, rx) = oneshot::channel(); + worker + .send(buffer::ServerReq::List { start_seq, limit, tx }) + .await + .map_err(|_| Error::internal_error("server shutting down"))?; + let list = rx.await.map_err(|_| { + // This shouldn't happen! + Error::internal_error("buffer canceled request rudely!") + })?; + let page = ResultsPage::new( + list, + &EmptyScanParams {}, + |entry: &ereporter_api::Entry, _| entry.seq, + )?; + Ok(HttpResponseOk(page)) + } + + async fn ereports_acknowledge( + reqctx: RequestContext, + path: Path, + ) -> Result { + let registry = reqctx.context(); + let ereporter_api::AcknowledgePathParams { reporter_id, seq } = + path.into_inner(); + slog::debug!( + reqctx.log, + "received ereport acknowledge request"; + "reporter_id" => %reporter_id, + "seq" => ?seq, + ); + + let worker = registry.get_worker(&reporter_id).ok_or( + HttpError::for_not_found( + Some("NO_REPORTER".to_string()), + format!("no reporter with ID {reporter_id}"), + ), + )?; + let (tx, rx) = oneshot::channel(); + worker + .send(buffer::ServerReq::TruncateTo { seq, tx }) + .await + .map_err(|_| Error::internal_error("server shutting down"))?; + rx.await.map_err(|_| { + // This shouldn't happen! + Error::internal_error("buffer canceled request rudely!") + })??; + Ok(HttpResponseDeleted()) + } +} + +/// How to discover Nexus' IP for registration. +#[derive(Clone)] +enum NexusDiscovery { + Addr(SocketAddr), + Dns(Resolver), +} + +impl State { + pub(crate) async fn register_reporter( + &self, + log: &slog::Logger, + reporter_id: Uuid, + ) -> Generation { + #[derive(Debug, thiserror::Error, SlogInlineError)] + enum RegistrationError { + #[error(transparent)] + Dns(#[from] internal_dns::resolver::ResolveError), + #[error(transparent)] + Client(#[from] nexus_client::Error), + } + + let info = nexus_client::types::EreporterInfo { + reporter_id, + address: self.server_address.to_string(), + }; + let do_register = || async { + let nexus_addr = self.nexus.nexus_addr().await.map_err(|e| { + BackoffError::transient(RegistrationError::from(e)) + })?; + let nexus_client = nexus_client::Client::new( + &format!("http://{nexus_addr}"), + log.clone(), + ); + let nexus_client::types::EreporterRegistered { seq } = nexus_client + .cpapi_ereporters_post(&info) + .await + .map_err(|e| { + BackoffError::transient(RegistrationError::from(e)) + })? + .into_inner(); + Ok(seq) + }; + let log_failure = |error: RegistrationError, delay| { + warn!( + log, + "failed to register ereporter with Nexus; retrying..."; + "reporter_id" => %reporter_id, + "reporter_address" => %self.server_address, + "error" => error, + "delay" => ?delay, + ); + }; + backoff::retry_notify( + backoff::retry_policy_internal_service(), + do_register, + log_failure, + ) + .await + .expect("Expected infinite retry loop registering ereporter") + } +} + +impl NexusDiscovery { + async fn nexus_addr( + &self, + ) -> Result { + match self { + NexusDiscovery::Addr(addr) => Ok(*addr), + NexusDiscovery::Dns(resolver) => resolver + .lookup_socket_v6(ServiceName::Nexus) + .await + .map(Into::into), + } + } +} + +impl std::fmt::Debug for NexusDiscovery { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Addr(addr) => f.debug_tuple("Addr").field(&addr).finish(), + Self::Dns(_) => f.debug_tuple("Dns").finish(), + } + } +} diff --git a/ereporter/test-utils/Cargo.toml b/ereporter/test-utils/Cargo.toml new file mode 100644 index 00000000000..65a4b1e292d --- /dev/null +++ b/ereporter/test-utils/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "ereport-test-utils" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow.workspace = true +camino.workspace = true +clap.workspace = true +dropshot.workspace = true +ereporter.workspace = true +ereporter-client.workspace = true +nexus-types.workspace = true +omicron-common.workspace = true +omicron-workspace-hack.workspace = true +uuid.workspace = true +schemars.workspace = true +serde.workspace = true +serde_json.workspace = true +slog.workspace = true +slog-async.workspace = true +slog-term.workspace = true +tokio = { workspace = true, features = ["sync", "macros", "full"] } + +[lints] +workspace = true diff --git a/ereporter/test-utils/src/bin/ereport-demo.rs b/ereporter/test-utils/src/bin/ereport-demo.rs new file mode 100644 index 00000000000..5c1ff02460e --- /dev/null +++ b/ereporter/test-utils/src/bin/ereport-demo.rs @@ -0,0 +1,77 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +use clap::Parser; +use std::net::{IpAddr, Ipv6Addr, SocketAddr}; +use uuid::Uuid; + +/// A demo binary that generates a single ereport and then waits for it to be collected. +#[derive(Clone, Parser)] +struct Args { + #[arg(long, short)] + uuid: Uuid, + + /// The address for the mock Nexus server used to register. + #[arg( + long, + default_value_t = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 12345) + )] + nexus_address: SocketAddr, + + /// The address for the mock Nexus server used to register. + #[arg( + long, + default_value_t = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 0), + )] + server_address: SocketAddr, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let Args { uuid, nexus_address, server_address } = Args::parse(); + let log = { + use omicron_common::FileKv; + use slog::Drain; + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::CompactFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + // let drain = slog::LevelFilter::new(drain, log_level).fuse(); + slog::Logger::root(drain.fuse(), slog::o!(FileKv)) + }; + let registry = ereporter::ReporterRegistry::new( + ereporter::registry::LogConfig::Logger(log.clone()), + 128, + )?; + let reporter = registry.register_reporter(uuid); + let _server = registry.start_server( + ereporter::server::Config { + server_address, + registration_address: Some(nexus_address), + request_body_max_bytes: 1024 * 8, + }, + None, + )?; + + let mut report = reporter + .report("list.suspect") + .await + .map_err(|_| anyhow::anyhow!("couldnt get ye report"))?; + // https://www.youtube.com/watch?v=TSOtPQ-K3Ds + report.fact("de.version", "0"); + report.fact("de.mod-name", "hal-9000-diagnosis"); + report.fact("de.mod-version", "1.0"); + report.fact("de.authority.product-id", "HAL 9000-series computer"); + report.fact("de.authority.server-id", "HAL 9000"); + report.fact("fault-list.0.unit", "AE-35"); + report.fact("fault-list.0.class", "defect.discovery-one.ae-32-fault"); + report.fact("fault-list.0.certainty", "0x64"); + report.fact("fault-list.0.100-failure-in", "72h"); + report.fact( + "fault-list.0.nosub_class", + "ereport.discovery-one.communications-dish.ae-35-unit", + ); + report.submit(); + + tokio::signal::ctrl_c().await?; + Ok(()) +} diff --git a/ereporter/test-utils/src/bin/ingester-tester.rs b/ereporter/test-utils/src/bin/ingester-tester.rs new file mode 100644 index 00000000000..dd93b17a8f5 --- /dev/null +++ b/ereporter/test-utils/src/bin/ingester-tester.rs @@ -0,0 +1,33 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use clap::Parser; +#[derive(Clone, Debug, Parser)] +struct Args { + /// The log level. + #[arg(long, default_value_t = slog::Level::Info, value_parser = parse_log_level)] + log_level: slog::Level, + + #[clap(flatten)] + ingester: ereport_test_utils::IngesterConfig, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let Args { log_level, ingester } = Args::parse(); + let log = { + use omicron_common::FileKv; + use slog::Drain; + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::CompactFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + let drain = slog::LevelFilter::new(drain, log_level).fuse(); + slog::Logger::root(drain.fuse(), slog::o!(FileKv)) + }; + ingester.run(log).await +} + +fn parse_log_level(s: &str) -> Result { + s.parse().map_err(|_| format!("invalid log level {s:?}")) +} diff --git a/ereporter/test-utils/src/lib.rs b/ereporter/test-utils/src/lib.rs new file mode 100644 index 00000000000..d59b33ec45a --- /dev/null +++ b/ereporter/test-utils/src/lib.rs @@ -0,0 +1,371 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +use anyhow::Context; +use camino::{Utf8Path, Utf8PathBuf}; +use clap::Parser; +use dropshot::{ + endpoint, HttpError, HttpResponseOk, RequestContext, TypedBody, +}; +use nexus_types::internal_api::params::EreporterInfo; +use omicron_common::api::external::Generation; +use schemars::JsonSchema; +use serde::Serialize; +use std::collections::hash_map::{Entry, HashMap}; +use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6}; +use std::time::Duration; +use tokio::io::AsyncWriteExt; +use tokio::sync::{mpsc, oneshot}; +use tokio::time; +use uuid::Uuid; + +/// Configuration for a standalone ereport ingester. +#[derive(Clone, Debug, Parser)] +pub struct IngesterConfig { + /// Directory in which to store ingested ereports. + pub data_dir: camino::Utf8PathBuf, + + /// The address for the mock Nexus server used to register. + /// + /// This program starts a mock version of Nexus, which is used only to + /// register the producers and collectors. This allows them to operate + /// as they usually would, registering each other with Nexus so that an + /// assignment between them can be made. + #[arg( + long, + default_value_t = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 12345, 0, 0) + )] + pub nexus: SocketAddrV6, + + /// Interval at which ereports are ingested. + #[arg(long, default_value_t = 5)] + pub interval_secs: usize, +} + +impl IngesterConfig { + pub async fn run(self, log: slog::Logger) -> anyhow::Result<()> { + let (registration_tx, registrations) = mpsc::channel(128); + let ingester = Ingester { + reporters: HashMap::new(), + log: log.new(slog::o!("component" => "ingester")), + path: self.data_dir, + registrations, + interval: Duration::from_secs(self.interval_secs as u64), + }; + let ingester = tokio::spawn(ingester.run()); + + let _server = { + let apictx = ServerContext { registration_tx }; + let mut api = dropshot::ApiDescription::new(); + api.register(cpapi_ereporters_post)?; + dropshot::HttpServerStarter::new( + &dropshot::ConfigDropshot { + default_handler_task_mode: + dropshot::HandlerTaskMode::Detached, + bind_address: self.nexus.into(), + ..Default::default() + }, + api, + apictx, + &log.new(slog::o!("component" => "standalone-nexus")), + ) + .map_err(|e| { + anyhow::anyhow!("failed to start standalone nexus server: {e}") + })? + .start() + }; + slog::info!( + log, + "created standalone nexus server for ereporter registration"; + "address" => %self.nexus, + ); + + ingester.await.context("ingester task panicked")??; + Ok(()) + } +} + +struct Ingester { + reporters: HashMap, + registrations: mpsc::Receiver, + log: slog::Logger, + path: Utf8PathBuf, + interval: Duration, +} + +struct Reporter { + path: Utf8PathBuf, + clients: HashMap, +} + +struct Registration { + uuid: Uuid, + addr: SocketAddr, + tx: oneshot::Sender, +} + +impl Ingester { + async fn run(mut self) -> anyhow::Result<()> { + if !self.path.exists() { + std::fs::create_dir_all(&self.path).with_context(|| { + format!("couldn't create directory {}", self.path) + })?; + slog::info!(self.log, "created data dir"; "path" => %self.path); + } + let mut interval = time::interval(self.interval); + slog::info!(self.log, "ingesting ereports every {:?}", self.interval); + loop { + tokio::select! { + biased; + req = self.registrations.recv() => { + let Some(reg) = req else { + slog::warn!(self.log, "the registration-request sender has gone away?"); + anyhow::bail!("the registration request sender went away unexpectedly"); + }; + self.register_client(reg).await?; + }, + _ = interval.tick() => { + self.collect().await?; + } + } + } + } + + async fn collect(&mut self) -> anyhow::Result<()> { + slog::debug!(self.log, "collecting ereports..."); + // TODO(eliza): perhaps we should try to do every reporter in parallel + // at some point? + for (id, reporter) in &self.reporters { + slog::debug!(self.log, "collecting ereports from {id}"); + let mut saw_reports = false; + for (addr, client) in &reporter.clients { + let reports = match client.ereports_list(id, None, None).await { + Ok(e) => e.into_inner(), + Err(e) => { + slog::error!(self.log, + "error collecting ereports"; + "reporter_id" => %id, + "reporter_addr" => %addr, + "error" => %e, + ); + continue; + } + }; + for ereporter_client::types::Entry { + seq, + value, + reporter_id, + } in reports.items + { + saw_reports = true; + match value { + ereporter_client::types::EntryKind::Ereport(report) => { + let path = + reporter.path.join(&format!("{seq}.json")); + if path.exists() { + slog::info!( + self.log, + "we are already familiar with ereport \ + {seq} from {reporter_id}, ignoring it"; + "reporter_id" => %reporter_id, + "seq" => %seq, + ); + continue; + } + + slog::info!( + &self.log, + "ereport {seq} from {reporter_id}: {report:#?}"; + "reporter_id" => %reporter_id, + "seq" => %seq, + ); + let mut f = tokio::fs::File::create_new(&path) + .await + .with_context(|| { + format!("failed to create file {path}") + })?; + + let bytes = serde_json::to_vec_pretty(&report) + .with_context(|| format!("failed to serialize ereport {seq} from {reporter_id}"))?; + + f + .write_all(&bytes) + .await + .with_context(|| format!("failed to write ereport {seq} from {reporter_id}"))?; + } + + ereporter_client::types::EntryKind::DataLoss { + dropped, + } => { + slog::warn!(self.log, + "reporter {reporter_id} reports data loss at seq {seq}"; + "reporter_id" => %reporter_id, + "seq" => %seq, + "dropped" => ?dropped, + ); + } + } + } + } + + if saw_reports { + // All ereports ingested for this reporter ID. Now, ack them up to the + // latest seq. + let seq = latest_seq(&reporter.path) + .await + .with_context(|| { + format!("couldn't determine latest seq for {id}") + })? + .unwrap_or_else(|| Generation::new()); + for (addr, client) in &reporter.clients { + match client.ereports_acknowledge(id, &seq).await { + Ok(_) => { + slog::info!( + &self.log, + "acked reports"; + "reporter_id" => %id, + "reporter_addr" => ?addr, + "seq" => %seq, + ); + } + Err(e) => { + slog::warn!( + &self.log, + "failed to ack reports"; + "reporter_id" => %id, + "reporter_addr" => ?addr, + "seq" => %seq, + "error" => %e, + ); + } + } + } + } + } + + Ok(()) + } + + async fn register_client( + &mut self, + Registration { uuid, addr, tx }: Registration, + ) -> anyhow::Result<()> { + let reporter = self.reporters.entry(uuid).or_insert_with(|| { + let path = self.path.join(uuid.to_string()); + Reporter { path, clients: HashMap::new() } + }); + std::fs::create_dir_all(&reporter.path).with_context(|| { + format!( + "couldn't create reporter ereport directory {}", + reporter.path + ) + })?; + let seq = latest_seq(&reporter.path) + .await? + // If there is an existing sequence number for this reporter, return + // the next one; otherwise, start at sequence 0. + .map(|seq| seq.next()) + .unwrap_or_else(|| Generation::new()); + match reporter.clients.entry(addr) { + Entry::Occupied(_) => { + slog::info!( + self.log, + "recovering sequence for reporter"; + "reporter_id" => %uuid, + "address" => %addr, + "seq" => %seq, + ); + } + Entry::Vacant(e) => { + slog::info!( + self.log, + "registered new endpoint for reporter"; + "reporter_id" => %uuid, + "address" => %addr, + "seq" => %seq, + ); + let log = self.log.new(slog::o!( + "reporter_id" => uuid.to_string(), + "reporter_addr" => addr.to_string(), + )); + let client = ereporter_client::Client::new( + &format!("http://{addr}"), + log, + ); + e.insert(client); + } + } + + if tx.send(seq).is_err() { + slog::warn!( + self.log, + "reporter gave up on registration attempt unexpectedly" + ); + } + Ok(()) + } +} + +async fn latest_seq(path: &Utf8PathBuf) -> anyhow::Result> { + let mut dir = tokio::fs::read_dir(path) + .await + .with_context(|| format!("failed to read {path}"))?; + let mut max = None; + while let Some(entry) = dir + .next_entry() + .await + .with_context(|| format!("failed to get next entry in {path}"))? + { + let path = entry.path(); + let path = Utf8Path::from_path(path.as_ref()) + .with_context(|| format!("path {} was not utf8", path.display()))?; + if path.is_dir() { + continue; + } + if let Some(file) = path.file_stem() { + match file.parse::() { + Ok(seq) => max = std::cmp::max(Some(seq), max), + Err(_) => { + continue; + } + } + } + } + + Ok(max.map(Generation::from_u32)) +} + +#[derive(Clone)] +struct ServerContext { + registration_tx: mpsc::Sender, +} + +/// Register an error reporter with Nexus, returning the next sequence +/// number for an error report from that reporter. +#[endpoint { + method = POST, + path = "/ereport/reporters", +}] +async fn cpapi_ereporters_post( + rqctx: RequestContext, + identity: TypedBody, +) -> Result, HttpError> { + let ctx = rqctx.context(); + let EreporterInfo { reporter_id, address } = identity.into_inner(); + let (tx, rx) = oneshot::channel(); + ctx.registration_tx + .send(Registration { addr: address, uuid: reporter_id, tx }) + .await + .expect("the main task should not have gone away"); + let seq = rx.await.expect("the main task should not have given up on us"); + Ok(HttpResponseOk(EreporterRegistered { seq })) +} + +/// Response to error reporter registration requests. +#[derive(Clone, Debug, Serialize, JsonSchema)] +pub struct EreporterRegistered { + /// The starting sequence number of the next error report from this + /// reporter. If the reporter has not been seen by Nexus previously, this + /// may be 0. + pub seq: Generation, +} diff --git a/nexus/internal-api/src/lib.rs b/nexus/internal-api/src/lib.rs index 044537946b2..e80fb80f456 100644 --- a/nexus/internal-api/src/lib.rs +++ b/nexus/internal-api/src/lib.rs @@ -20,15 +20,16 @@ use nexus_types::{ }, internal_api::{ params::{ - InstanceMigrateRequest, OximeterInfo, RackInitializationRequest, - SledAgentInfo, SwitchPutRequest, SwitchPutResponse, + EreporterInfo, InstanceMigrateRequest, OximeterInfo, + RackInitializationRequest, SledAgentInfo, SwitchPutRequest, + SwitchPutResponse, }, views::{BackgroundTask, DemoSaga, Ipv4NatEntryView, Saga}, }, }; use omicron_common::{ api::{ - external::{http_pagination::PaginatedById, Instance}, + external::{http_pagination::PaginatedById, Generation, Instance}, internal::nexus::{ DiskRuntimeState, DownstairsClientStopRequest, DownstairsClientStopped, ProducerEndpoint, @@ -530,6 +531,19 @@ pub trait NexusInternalApi { path_params: Path, query_params: Query, ) -> Result>, HttpError>; + + // Error reports + + /// Register an error reporter with Nexus, returning the next sequence + /// number for an error report from that reporter. + #[endpoint { + method = POST, + path = "/ereport/reporters", + }] + async fn cpapi_ereporters_post( + request_context: RequestContext, + identity: TypedBody, + ) -> Result, HttpError>; } /// Path parameters for Sled Agent requests (internal API) @@ -649,3 +663,12 @@ pub struct SledId { pub struct ProbePathParam { pub sled: Uuid, } + +/// Response to error reporter registration requests. +#[derive(Clone, Debug, Serialize, JsonSchema)] +pub struct EreporterRegistered { + /// The starting sequence number of the next error report from this + /// reporter. If the reporter has not been seen by Nexus previously, this + /// may be 0. + pub seq: Generation, +} diff --git a/nexus/src/internal_api/http_entrypoints.rs b/nexus/src/internal_api/http_entrypoints.rs index 2829fdb2a63..5a2f0c040d3 100644 --- a/nexus/src/internal_api/http_entrypoints.rs +++ b/nexus/src/internal_api/http_entrypoints.rs @@ -30,6 +30,7 @@ use nexus_types::external_api::params::UninitializedSledId; use nexus_types::external_api::shared::ProbeInfo; use nexus_types::external_api::shared::UninitializedSled; use nexus_types::external_api::views::SledPolicy; +use nexus_types::internal_api::params::EreporterInfo; use nexus_types::internal_api::params::InstanceMigrateRequest; use nexus_types::internal_api::params::SledAgentInfo; use nexus_types::internal_api::params::SwitchPutRequest; @@ -939,4 +940,23 @@ impl NexusInternalApi for NexusInternalApiImpl { .instrument_dropshot_handler(&rqctx, handler) .await } + + async fn cpapi_ereporters_post( + rqctx: RequestContext, + _identity: TypedBody, + ) -> Result, HttpError> { + let apictx = &rqctx.context().context; + let handler = async { + // TODO(eliza): ACTUALLY IMPLEMENT THIS PART LOL + Err(HttpError::for_unavail( + Some("NOT YET IMPLEMENTED LMAO".to_string()), + "TODO eliza actually implement this part lol lmao :)" + .to_string(), + )) + }; + apictx + .internal_latencies + .instrument_dropshot_handler(&rqctx, handler) + .await + } } diff --git a/nexus/types/src/internal_api/params.rs b/nexus/types/src/internal_api/params.rs index c803f003f1d..9eba13ca009 100644 --- a/nexus/types/src/internal_api/params.rs +++ b/nexus/types/src/internal_api/params.rs @@ -214,3 +214,14 @@ pub struct InstanceMigrateRequest { /// The ID of the sled to which to migrate the target instance. pub dst_sled_id: Uuid, } + +/// Message sent by an error reporter to register itself with Nexus. +#[derive(Debug, Clone, Copy, JsonSchema, Serialize, Deserialize)] +pub struct EreporterInfo { + /// The UUID of the reporting entity. + pub reporter_id: Uuid, + + /// The address on which the error reporter server listens for ingestion + /// requests. + pub address: SocketAddr, +} diff --git a/openapi/ereporter.json b/openapi/ereporter.json new file mode 100644 index 00000000000..5b0c838c47a --- /dev/null +++ b/openapi/ereporter.json @@ -0,0 +1,265 @@ +{ + "openapi": "3.0.3", + "info": { + "title": "Oxide Error Reporter API", + "description": "API for collecting error reports from a reporter.", + "contact": { + "url": "https://oxide.computer", + "email": "api@oxide.computer" + }, + "version": "0.0.1" + }, + "paths": { + "/ereports/{reporter_id}": { + "get": { + "summary": "Get a list of ereports from `reporter_id`, paginated by sequence number.", + "operationId": "ereports_list", + "parameters": [ + { + "in": "path", + "name": "reporter_id", + "required": true, + "schema": { + "type": "string", + "format": "uuid" + } + }, + { + "in": "query", + "name": "limit", + "description": "Maximum number of items returned by a single call", + "schema": { + "nullable": true, + "type": "integer", + "format": "uint32", + "minimum": 1 + } + }, + { + "in": "query", + "name": "page_token", + "description": "Token returned by previous call to retrieve the subsequent page", + "schema": { + "nullable": true, + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/EntryResultsPage" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + }, + "x-dropshot-pagination": { + "required": [] + } + } + }, + "/ereports/{reporter_id}/{seq}": { + "delete": { + "summary": "Informs the reporter with the given `reporter_id` that its ereports up", + "description": "to the sequence number `seq` have been successfully ingested into persistant storage.\n\nThe reporter may now freely discard ereports with sequence numbers less than or equal to `seq`.", + "operationId": "ereports_acknowledge", + "parameters": [ + { + "in": "path", + "name": "reporter_id", + "required": true, + "schema": { + "type": "string", + "format": "uuid" + } + }, + { + "in": "path", + "name": "seq", + "required": true, + "schema": { + "$ref": "#/components/schemas/Generation" + } + } + ], + "responses": { + "204": { + "description": "successful deletion" + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + } + }, + "components": { + "schemas": { + "Entry": { + "description": "An entry in the ereport batch returned by a reporter.", + "type": "object", + "properties": { + "reporter_id": { + "description": "UUID of the entity that generated this ereport.", + "type": "string", + "format": "uuid" + }, + "seq": { + "description": "The ereport's sequence number, unique with regards to ereports generated by the entity with the `reporter_id`.", + "allOf": [ + { + "$ref": "#/components/schemas/Generation" + } + ] + }, + "value": { + "$ref": "#/components/schemas/EntryKind" + } + }, + "required": [ + "reporter_id", + "seq", + "value" + ] + }, + "EntryKind": { + "description": "Kinds of entry in a batch.", + "oneOf": [ + { + "description": "An ereport.", + "type": "object", + "properties": { + "ereport": { + "$ref": "#/components/schemas/Ereport" + } + }, + "required": [ + "ereport" + ], + "additionalProperties": false + }, + { + "description": "Ereports may have been lost.", + "type": "object", + "properties": { + "data_loss": { + "type": "object", + "properties": { + "dropped": { + "nullable": true, + "description": "The number of ereports that were discarded, if it is known.\n\nIf ereports are dropped because a buffer has reached its capacity, the reporter is strongly encouraged to attempt to count the number of ereports lost. In other cases, such as a reporter crashing and restarting, the reporter may not be capable of determining the number of ereports that were lost, or even *if* data loss actually occurred. Therefore, a `None` here indicates *possible* data loss, while a `Some(u32)` indicates *known* data loss.", + "type": "integer", + "format": "uint32", + "minimum": 0 + } + } + } + }, + "required": [ + "data_loss" + ], + "additionalProperties": false + } + ] + }, + "EntryResultsPage": { + "description": "A single page of results", + "type": "object", + "properties": { + "items": { + "description": "list of items on this page of results", + "type": "array", + "items": { + "$ref": "#/components/schemas/Entry" + } + }, + "next_page": { + "nullable": true, + "description": "token used to fetch the next page of results (if any)", + "type": "string" + } + }, + "required": [ + "items" + ] + }, + "Ereport": { + "description": "An error report.", + "type": "object", + "properties": { + "class": { + "description": "A string indicating the kind of ereport.\n\nThis may be used by diagnosis engines as an indication of what `facts` to expect.", + "type": "string" + }, + "facts": { + "description": "The set of facts (key-value pairs) associated with this ereport.", + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "time_created": { + "description": "The UTC timestamp when this ereport was observed, as determined by the reporter.", + "type": "string", + "format": "date-time" + } + }, + "required": [ + "class", + "facts", + "time_created" + ] + }, + "Error": { + "description": "Error information from a response.", + "type": "object", + "properties": { + "error_code": { + "type": "string" + }, + "message": { + "type": "string" + }, + "request_id": { + "type": "string" + } + }, + "required": [ + "message", + "request_id" + ] + }, + "Generation": { + "description": "Generation numbers stored in the database, used for optimistic concurrency control", + "type": "integer", + "format": "uint64", + "minimum": 0 + } + }, + "responses": { + "Error": { + "description": "Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/Error" + } + } + } + } + } + } +} diff --git a/openapi/nexus-internal.json b/openapi/nexus-internal.json index 9226b9d3190..0156e602246 100644 --- a/openapi/nexus-internal.json +++ b/openapi/nexus-internal.json @@ -746,6 +746,41 @@ } } }, + "/ereport/reporters": { + "post": { + "summary": "Register an error reporter with Nexus, returning the next sequence", + "description": "number for an error report from that reporter.", + "operationId": "cpapi_ereporters_post", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/EreporterInfo" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/EreporterRegistered" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/instances/{instance_id}/migrate": { "post": { "operationId": "instance_migrate", @@ -3264,6 +3299,42 @@ "secs" ] }, + "EreporterInfo": { + "description": "Message sent by an error reporter to register itself with Nexus.", + "type": "object", + "properties": { + "address": { + "description": "The address on which the error reporter server listens for ingestion requests.", + "type": "string" + }, + "reporter_id": { + "description": "The UUID of the reporting entity.", + "type": "string", + "format": "uuid" + } + }, + "required": [ + "address", + "reporter_id" + ] + }, + "EreporterRegistered": { + "description": "Response to error reporter registration requests.", + "type": "object", + "properties": { + "seq": { + "description": "The starting sequence number of the next error report from this reporter. If the reporter has not been seen by Nexus previously, this may be 0.", + "allOf": [ + { + "$ref": "#/components/schemas/Generation" + } + ] + } + }, + "required": [ + "seq" + ] + }, "Error": { "description": "Error information from a response.", "type": "object",