From 08513397b3e9e52633b6fbb72aeb95dd426e164c Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 10 Oct 2024 10:56:57 -0700 Subject: [PATCH 01/17] [ereport] initial sketch of reporter API --- Cargo.lock | 21 +++ Cargo.toml | 6 + clients/ereporter-client/Cargo.toml | 19 +++ clients/ereporter-client/src/lib.rs | 28 ++++ dev-tools/openapi-manager/Cargo.toml | 1 + dev-tools/openapi-manager/src/spec.rs | 9 ++ ereporter/Cargo.toml | 10 ++ ereporter/api/Cargo.toml | 16 +++ ereporter/api/src/lib.rs | 66 +++++++++ ereporter/src/lib.rs | 6 + openapi/ereporter.json | 194 ++++++++++++++++++++++++++ 11 files changed, 376 insertions(+) create mode 100644 clients/ereporter-client/Cargo.toml create mode 100644 clients/ereporter-client/src/lib.rs create mode 100644 ereporter/Cargo.toml create mode 100644 ereporter/api/Cargo.toml create mode 100644 ereporter/api/src/lib.rs create mode 100644 ereporter/src/lib.rs create mode 100644 openapi/ereporter.json diff --git a/Cargo.lock b/Cargo.lock index 5d1c8e90d62..98ef0916c12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2717,6 +2717,26 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "ereporter" +version = "0.1.0" +dependencies = [ + "ereporter-api", +] + +[[package]] +name = "ereporter-api" +version = "0.1.0" +dependencies = [ + "chrono", + "dropshot", + "omicron-common", + "omicron-workspace-hack", + "schemars", + "serde", + "uuid", +] + [[package]] name = "errno" version = "0.3.9" @@ -7152,6 +7172,7 @@ dependencies = [ "cockroach-admin-api", "dns-server-api", "dropshot", + "ereporter-api", "fs-err", "gateway-api", "indent_write", diff --git a/Cargo.toml b/Cargo.toml index 7d096c67878..292b31d296a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,8 @@ members = [ "dns-server", "dns-server-api", "end-to-end-tests", + "ereporter", + "ereporter/api", "gateway", "gateway-api", "gateway-cli", @@ -164,6 +166,8 @@ default-members = [ "dns-server", "dns-server-api", "end-to-end-tests", + "ereporter", + "ereporter/api", "gateway", "gateway-api", "gateway-cli", @@ -346,6 +350,8 @@ dpd-client = { path = "clients/dpd-client" } dropshot = { version = "0.12.0", features = [ "usdt-probes" ] } dyn-clone = "1.0.17" either = "1.13.0" +ereporter-api = { path = "ereporter/api" } +ereporter = { path = "ereporter" } expectorate = "1.1.0" fatfs = "0.3.6" filetime = "0.2.25" diff --git a/clients/ereporter-client/Cargo.toml b/clients/ereporter-client/Cargo.toml new file mode 100644 index 00000000000..52cc0073da7 --- /dev/null +++ b/clients/ereporter-client/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "ereporter-client" +version = "0.1.0" +edition = "2021" +license = "MPL-2.0" + +[lints] +workspace = true + +[dependencies] +chrono.workspace = true +futures.workspace = true +omicron-common.workspace = true +progenitor.workspace = true +reqwest = { workspace = true, features = ["json", "rustls-tls", "stream"] } +serde.workspace = true +slog.workspace = true +uuid.workspace = true +omicron-workspace-hack.workspace = true diff --git a/clients/ereporter-client/src/lib.rs b/clients/ereporter-client/src/lib.rs new file mode 100644 index 00000000000..35da6012b6f --- /dev/null +++ b/clients/ereporter-client/src/lib.rs @@ -0,0 +1,28 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +// Copyright 2021 Oxide Computer Company + +//! Interface for API requests to an ereporter. + +progenitor::generate_api!( + spec = "../../openapi/ereporter.json", + inner_type = slog::Logger, + pre_hook = (|log: &slog::Logger, request: &reqwest::Request| { + slog::debug!(log, "client request"; + "method" => %request.method(), + "uri" => %request.url(), + "body" => ?&request.body(), + ); + }), + post_hook = (|log: &slog::Logger, result: &Result<_, _>| { + slog::debug!(log, "client response"; "result" => ?result); + }), +); + +impl omicron_common::api::external::ClientError for types::Error { + fn message(&self) -> String { + self.message.clone() + } +} diff --git a/dev-tools/openapi-manager/Cargo.toml b/dev-tools/openapi-manager/Cargo.toml index 211e1340161..b751b2aec40 100644 --- a/dev-tools/openapi-manager/Cargo.toml +++ b/dev-tools/openapi-manager/Cargo.toml @@ -17,6 +17,7 @@ cockroach-admin-api.workspace = true clap.workspace = true dns-server-api.workspace = true dropshot.workspace = true +ereporter-api.workspace = true fs-err.workspace = true gateway-api.workspace = true indent_write.workspace = true diff --git a/dev-tools/openapi-manager/src/spec.rs b/dev-tools/openapi-manager/src/spec.rs index 7d734218fc8..545e20ab325 100644 --- a/dev-tools/openapi-manager/src/spec.rs +++ b/dev-tools/openapi-manager/src/spec.rs @@ -129,6 +129,15 @@ pub fn all_apis() -> Vec { filename: "wicketd.json", extra_validation: None, }, + ApiSpec { + title: "Oxide Error Reporter API", + version: "0.0.1", + description: "API for collecting error reports from a reporter.", + boundary: ApiBoundary::Internal, + api_description: ereporter_api::ereporter_api_mod::stub_api_description, + filename: "ereporter.json", + extra_validation: None, + }, // Add your APIs here! Please keep this list sorted by filename. ] } diff --git a/ereporter/Cargo.toml b/ereporter/Cargo.toml new file mode 100644 index 00000000000..a9e7a4c3998 --- /dev/null +++ b/ereporter/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "ereporter" +version = "0.1.0" +edition = "2021" + +[dependencies] +ereporter-api = { workspace = true } + +[lints] +workspace = true diff --git a/ereporter/api/Cargo.toml b/ereporter/api/Cargo.toml new file mode 100644 index 00000000000..23df8a790ca --- /dev/null +++ b/ereporter/api/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "ereporter-api" +version = "0.1.0" +edition = "2021" + +[lints] +workspace = true + +[dependencies] +chrono.workspace = true +dropshot.workspace = true +omicron-common.workspace = true +omicron-workspace-hack.workspace = true +schemars.workspace = true +serde.workspace = true +uuid.workspace = true diff --git a/ereporter/api/src/lib.rs b/ereporter/api/src/lib.rs new file mode 100644 index 00000000000..85039be1ac6 --- /dev/null +++ b/ereporter/api/src/lib.rs @@ -0,0 +1,66 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use chrono::{DateTime, Utc}; +use dropshot::{ + EmptyScanParams, HttpError, HttpResponseDeleted, HttpResponseOk, + PaginationParams, Query, RequestContext, ResultsPage, +}; +use omicron_common::api::external::Generation; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; +use uuid::Uuid; + +#[dropshot::api_description] +pub trait EreporterApi { + type Context; + + /// Get a list of ereports, paginated by sequence number. + #[endpoint { + method = GET, + path = "/ereports" + }] + async fn ereports_list( + request_context: RequestContext, + query: Query>, + ) -> Result>, HttpError>; + + /// Informs the reporter that it may freely discard ereports with sequence + /// numbers less than or equal to `seq`. + #[endpoint { + method = DELETE, + path = "/ereports/{seq}" + }] + async fn ereports_truncate( + request_context: RequestContext, + path: dropshot::Path, + ) -> Result; +} + +/// Path parameter to select a sequence number for +/// [`EreporterApi::ereports_truncate`]. +#[derive(Clone, Copy, Debug, Deserialize, JsonSchema, Serialize)] +pub struct SeqPathParam { + pub seq: Generation, +} + +/// An error report. +#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)] +pub struct Ereport { + /// UUID of the entity that generated this ereport. + pub reporter_id: Uuid, + /// The ereport's sequence number, unique with regards to ereports generated + /// by the entity with the `reporter_id`. + pub seq: Generation, + /// A string indicating the kind of ereport. + /// + /// This may be used by diagnosis engines as an indication of what `facts` + /// to expect. + pub class: String, + /// The UTC timestamp when this ereport was observed, as determined by the reporter. + pub time_created: DateTime, + /// The set of facts (key-value pairs) associated with this ereport. + pub facts: BTreeMap, +} diff --git a/ereporter/src/lib.rs b/ereporter/src/lib.rs new file mode 100644 index 00000000000..e4694062382 --- /dev/null +++ b/ereporter/src/lib.rs @@ -0,0 +1,6 @@ +use std::collections::BTreeMap; + +pub struct Ereport { + class: String, + facts: BTreeMap, +} diff --git a/openapi/ereporter.json b/openapi/ereporter.json new file mode 100644 index 00000000000..b160d577fae --- /dev/null +++ b/openapi/ereporter.json @@ -0,0 +1,194 @@ +{ + "openapi": "3.0.3", + "info": { + "title": "Oxide Error Reporter API", + "description": "API for collecting error reports from a reporter.", + "contact": { + "url": "https://oxide.computer", + "email": "api@oxide.computer" + }, + "version": "0.0.1" + }, + "paths": { + "/ereports": { + "get": { + "summary": "Get a list of ereports , paginated by sequence number.", + "operationId": "ereports_list", + "parameters": [ + { + "in": "query", + "name": "limit", + "description": "Maximum number of items returned by a single call", + "schema": { + "nullable": true, + "type": "integer", + "format": "uint32", + "minimum": 1 + } + }, + { + "in": "query", + "name": "page_token", + "description": "Token returned by previous call to retrieve the subsequent page", + "schema": { + "nullable": true, + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/EreportResultsPage" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + }, + "x-dropshot-pagination": { + "required": [] + } + } + }, + "/ereports/{seq}": { + "delete": { + "summary": "Truncate ereports with sequence numbers less than or equal to `seq`.", + "operationId": "ereports_truncate", + "parameters": [ + { + "in": "path", + "name": "seq", + "required": true, + "schema": { + "$ref": "#/components/schemas/Generation" + } + } + ], + "responses": { + "204": { + "description": "successful deletion" + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + } + }, + "components": { + "schemas": { + "Ereport": { + "description": "An error report.", + "type": "object", + "properties": { + "class": { + "description": "A string indicating the kind of ereport.\n\nThis may be used by diagnosis engines as an indication of what `facts` to expect.", + "type": "string" + }, + "facts": { + "description": "The set of facts (key-value pairs) associated with this ereport.", + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "reporter_id": { + "description": "UUID of the entity that generated this ereport.", + "type": "string", + "format": "uuid" + }, + "seq": { + "description": "The ereport's sequence number, unique with regards to ereports generated by the entity with the `reporter_id`.", + "allOf": [ + { + "$ref": "#/components/schemas/Generation" + } + ] + }, + "time_created": { + "description": "The UTC timestamp when this ereport was observed, as determined by the reporter.", + "type": "string", + "format": "date-time" + } + }, + "required": [ + "class", + "facts", + "reporter_id", + "seq", + "time_created" + ] + }, + "EreportResultsPage": { + "description": "A single page of results", + "type": "object", + "properties": { + "items": { + "description": "list of items on this page of results", + "type": "array", + "items": { + "$ref": "#/components/schemas/Ereport" + } + }, + "next_page": { + "nullable": true, + "description": "token used to fetch the next page of results (if any)", + "type": "string" + } + }, + "required": [ + "items" + ] + }, + "Error": { + "description": "Error information from a response.", + "type": "object", + "properties": { + "error_code": { + "type": "string" + }, + "message": { + "type": "string" + }, + "request_id": { + "type": "string" + } + }, + "required": [ + "message", + "request_id" + ] + }, + "Generation": { + "description": "Generation numbers stored in the database, used for optimistic concurrency control", + "type": "integer", + "format": "uint64", + "minimum": 0 + } + }, + "responses": { + "Error": { + "description": "Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/Error" + } + } + } + } + } + } +} From 0a74e78f5e862abd9297564f8eb07b3c9aa3a26e Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sat, 12 Oct 2024 11:23:13 -0700 Subject: [PATCH 02/17] [ereport] sketch out reporter lib --- Cargo.lock | 8 ++ ereporter/Cargo.toml | 8 ++ ereporter/api/src/lib.rs | 4 +- ereporter/src/buffer.rs | 128 ++++++++++++++++++++++++++++++ ereporter/src/lib.rs | 85 +++++++++++++++++++- ereporter/src/server.rs | 167 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 394 insertions(+), 6 deletions(-) create mode 100644 ereporter/src/buffer.rs create mode 100644 ereporter/src/server.rs diff --git a/Cargo.lock b/Cargo.lock index 98ef0916c12..ad3e5e723ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2721,7 +2721,15 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" name = "ereporter" version = "0.1.0" dependencies = [ + "anyhow", + "chrono", + "dropshot", "ereporter-api", + "omicron-common", + "omicron-workspace-hack", + "slog", + "tokio", + "uuid", ] [[package]] diff --git a/ereporter/Cargo.toml b/ereporter/Cargo.toml index a9e7a4c3998..de065e48daa 100644 --- a/ereporter/Cargo.toml +++ b/ereporter/Cargo.toml @@ -4,7 +4,15 @@ version = "0.1.0" edition = "2021" [dependencies] +anyhow.workspace = true +chrono.workspace = true +dropshot.workspace = true ereporter-api = { workspace = true } +tokio = { workspace = true, features = ["sync"] } +omicron-common.workspace = true +omicron-workspace-hack.workspace = true +uuid.workspace = true +slog.workspace = true [lints] workspace = true diff --git a/ereporter/api/src/lib.rs b/ereporter/api/src/lib.rs index 85039be1ac6..f8ee95a4feb 100644 --- a/ereporter/api/src/lib.rs +++ b/ereporter/api/src/lib.rs @@ -10,7 +10,7 @@ use dropshot::{ use omicron_common::api::external::Generation; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; +use std::collections::HashMap; use uuid::Uuid; #[dropshot::api_description] @@ -62,5 +62,5 @@ pub struct Ereport { /// The UTC timestamp when this ereport was observed, as determined by the reporter. pub time_created: DateTime, /// The set of facts (key-value pairs) associated with this ereport. - pub facts: BTreeMap, + pub facts: HashMap, } diff --git a/ereporter/src/buffer.rs b/ereporter/src/buffer.rs new file mode 100644 index 00000000000..93e04d87e1c --- /dev/null +++ b/ereporter/src/buffer.rs @@ -0,0 +1,128 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use crate::EreportData; +use omicron_common::api::external::Error; +use omicron_common::api::external::Generation; +use std::collections::VecDeque; +use tokio::sync::{mpsc, oneshot}; +use uuid::Uuid; + +pub(crate) enum ServerReq { + TruncateTo { + seq: Generation, + tx: oneshot::Sender>, + }, + List { + start_seq: Option, + limit: usize, + tx: oneshot::Sender>, + }, +} + +pub(crate) struct Buffer { + pub(crate) seq: Generation, + pub(crate) buf: VecDeque, + pub(crate) log: slog::Logger, + pub(crate) id: Uuid, + pub(crate) ereports: mpsc::Receiver, + pub(crate) requests: mpsc::Receiver, +} + +impl Buffer { + pub(crate) async fn run(mut self) { + while let Some(req) = self.requests.recv().await { + match req { + // Asked to list ereports! + ServerReq::List { start_seq, limit, tx } => { + // First, grab any new ereports and stick them in our cache. + while let Ok(ereport) = self.ereports.try_recv() { + self.push_ereport(ereport); + } + + let mut list = { + let cap = std::cmp::min(limit, self.buf.len()); + Vec::with_capacity(cap) + }; + + match start_seq { + // Start at lowest sequence number. + None => { + list.extend( + self.buf.iter().by_ref().take(limit).cloned(), + ); + } + Some(seq) => { + todo!( + "eliza: draw the rest of the pagination {seq}" + ) + } + } + slog::info!( + self.log, + "produced ereport batch from {start_seq:?}"; + "start" => ?start_seq, + "len" => list.len(), + "limit" => limit + ); + if tx.send(list).is_err() { + slog::warn!(self.log, "client canceled list request"); + } + } + ServerReq::TruncateTo { seq, tx } if seq > self.seq => { + if tx.send(Err(Error::invalid_value( + "seq", + "cannot truncate to a sequence number greater than the current maximum" + ))).is_err() { + // If the receiver no longer cares about the response to + // this request, no biggie. + slog::warn!(self.log, "client canceled truncate request"); + } + } + ServerReq::TruncateTo { seq, tx } => { + let prev_len = self.buf.len(); + self.buf.retain(|ereport| ereport.seq <= seq); + + slog::info!( + self.log, + "truncated ereports up to {seq}"; + "seq" => ?seq, + "dropped" => prev_len - self.buf.len(), + "remaining" => self.buf.len(), + ); + + if tx.send(Ok(())).is_err() { + // If the receiver no longer cares about the response to + // this request, no biggie. + slog::warn!( + self.log, + "client canceled truncate request" + ); + } + todo!() + } + } + } + + slog::info!(self.log, "server requests channel closed, shutting down"); + } + + fn push_ereport(&mut self, ereport: EreportData) { + let EreportData { facts, class, time_created } = ereport; + let seq = self.seq; + self.buf.push_back(ereporter_api::Ereport { + facts, + class, + time_created, + reporter_id: self.id, + seq, + }); + self.seq = self.seq.next(); + slog::trace!( + self.log, + "recorded ereport"; + "seq" => %seq, + ); + } +} diff --git a/ereporter/src/lib.rs b/ereporter/src/lib.rs index e4694062382..e9f544a64f5 100644 --- a/ereporter/src/lib.rs +++ b/ereporter/src/lib.rs @@ -1,6 +1,83 @@ -use std::collections::BTreeMap; +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. -pub struct Ereport { - class: String, - facts: BTreeMap, +use chrono::DateTime; +use chrono::Utc; +use std::collections::HashMap; +use tokio::sync::mpsc; + +mod buffer; +pub mod server; + +#[must_use = "an `EreportBuilder` does nothing unless `submit()` is called"] +pub struct EreportBuilder { + data: EreportData, + permit: mpsc::OwnedPermit, +} + +impl EreportBuilder { + /// Reserve capacity for at least `facts` facts. + pub fn reserve(&mut self, facts: usize) { + self.data.facts.reserve(facts); + } + + pub fn time_created( + &mut self, + time_created: chrono::DateTime, + ) -> &mut Self { + self.data.time_created = time_created; + self + } + + pub fn fact( + &mut self, + name: impl ToString, + value: impl ToString, + ) -> Option { + self.data.facts.insert(name.to_string(), value.to_string()) + } + + pub fn facts( + &mut self, + facts: impl Iterator, + ) { + self.data + .facts + .extend(facts.map(|(k, v)| (k.to_string(), v.to_string()))) + } + + pub fn submit(self) { + self.permit.send(self.data); + } +} + +/// A reporter handle used to generate ereports. +#[derive(Clone, Debug)] +pub struct Reporter(pub(crate) tokio::sync::mpsc::Sender); + +impl Reporter { + /// Begin constructing a new ereport, returning an [`EreportBuilder`]. + pub async fn report( + &mut self, + class: impl ToString, + ) -> Result { + let time_created = Utc::now(); + let permit = self.0.clone().reserve_owned().await.map_err(|_| ())?; + Ok(EreportBuilder { + data: EreportData { + class: class.to_string(), + time_created, + facts: HashMap::new(), + }, + permit, + }) + } +} + +/// An ereport. +pub(crate) struct EreportData { + pub(crate) class: String, + pub(crate) facts: HashMap, + pub(crate) time_created: DateTime, } diff --git a/ereporter/src/server.rs b/ereporter/src/server.rs new file mode 100644 index 00000000000..d43623273a2 --- /dev/null +++ b/ereporter/src/server.rs @@ -0,0 +1,167 @@ +use crate::buffer; +use crate::EreportData; +use dropshot::{ + ConfigLogging, EmptyScanParams, HttpError, HttpResponseDeleted, + HttpResponseOk, PaginationParams, Query, RequestContext, ResultsPage, + WhichPage, +}; +use omicron_common::api::external::Error; +use omicron_common::api::external::Generation; +use slog::Logger; +use std::collections::VecDeque; +use std::net::SocketAddr; +use tokio::sync::{mpsc, oneshot}; +use uuid::Uuid; + +pub struct ServerStarter { + config: Config, + ctx: ServerContext, + ereports: mpsc::Receiver, + requests: mpsc::Receiver, +} + +pub struct RunningServer { + // Handle to the buffer task. + buffer_task: tokio::task::JoinHandle<()>, + // TODO(eliza): hang onto the running dropshot stuff. +} + +pub struct ReporterIdentity { + /// The UUID of the reporter endpoint. + pub id: Uuid, + /// The socket address on which to bind the reporter endpoint. + pub addr: SocketAddr, +} + +pub struct Config { + pub reporter: ReporterIdentity, + /// The address at which we attempt to register as a producer. + /// + /// If the address is not provided, the address of Nexus will be resolved + /// using internal DNS, based on the local address of the server being + /// configured. + pub registration_address: Option, + /// The maximum size of Dropshot requests. + pub request_body_max_bytes: usize, + /// The maximum number of ereports to buffer before exerting backpressure on producers. + pub buffer_capacity: usize, + pub request_channel_capacity: usize, + /// The logging configuration or actual logger used to emit logs. + pub log: LogConfig, +} + +/// Either configuration for building a logger, or an actual logger already +/// instantiated. +/// +/// This can be used to start a [`Server`] with a new logger or a child of a +/// parent logger if desired. +#[derive(Debug, Clone)] +pub enum LogConfig { + /// Configuration for building a new logger. + Config(ConfigLogging), + /// An explicit logger to use. + Logger(Logger), +} + +#[derive(Clone)] +struct ServerContext { + tx: mpsc::Sender, +} + +struct EreporterApiImpl; + +impl ServerStarter { + pub fn new(config: Config) -> (crate::Reporter, Self) { + let (ereport_tx, ereports) = mpsc::channel(config.buffer_capacity); + let (tx, requests) = mpsc::channel(128); + let this = + Self { config, ereports, ctx: ServerContext { tx }, requests }; + (crate::Reporter(ereport_tx), this) + } + + pub async fn start(self) -> anyhow::Result { + let Self { config, ctx, ereports, requests } = self; + let log = todo!("eliza: log config"); + // TODO: + // 1. discover nexus + // 2. register server and recover sequence number + let seq = todo!("eliza: discover sequence number"); + // 3. spawn buffer task + let buffer_task = tokio::spawn( + crate::buffer::Buffer { + seq, + buf: VecDeque::with_capacity(config.buffer_capacity), + log, + id: config.reporter.id, + ereports, + requests, + } + .run(), + ); + // 4. spawn dropshot server + todo!("eliza: dropshot server"); + + Ok(RunningServer { buffer_task }) + } +} + +impl ereporter_api::EreporterApi for EreporterApiImpl { + type Context = ServerContext; + + async fn ereports_list( + reqctx: RequestContext, + query: Query>, + ) -> Result>, HttpError> + { + let ctx = reqctx.context(); + + let pagination = query.into_inner(); + let limit = reqctx.page_limit(&pagination)?.get() as usize; + + let start_seq = match pagination.page { + WhichPage::First(..) => None, + WhichPage::Next(seq) => Some(seq), + }; + + slog::debug!( + reqctx.log, + "received ereport list request"; + "start_seq" => ?start_seq, + "limit" => limit, + ); + let (tx, rx) = oneshot::channel(); + ctx.tx + .send(buffer::ServerReq::List { start_seq, limit, tx }) + .await + .map_err(|_| Error::internal_error("server shutting down"))?; + let list = rx.await.map_err(|_| { + // This shouldn't happen! + Error::internal_error("buffer canceled request rudely!") + })?; + let page = ResultsPage::new( + list, + &EmptyScanParams {}, + |ereport: &ereporter_api::Ereport, _| ereport.seq, + )?; + Ok(HttpResponseOk(page)) + } + + async fn ereports_truncate( + reqctx: RequestContext, + path: dropshot::Path, + ) -> Result { + let ctx = reqctx.context(); + let ereporter_api::SeqPathParam { seq } = path.into_inner(); + slog::debug!(reqctx.log, "received ereport truncate request"; "seq" => ?seq); + let (tx, rx) = oneshot::channel(); + ctx.tx + .send(buffer::ServerReq::TruncateTo { seq, tx }) + .await + .map_err(|_| Error::internal_error("server shutting down"))?; + rx.await.map_err(|_| { + // This shouldn't happen! + Error::internal_error("buffer canceled request rudely!") + })??; + Ok(HttpResponseDeleted()) + } +} From a6dd46d3eee7cd7f3e938e4acaf6a6066e714584 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sat, 12 Oct 2024 12:41:38 -0700 Subject: [PATCH 03/17] [ereport] a bunch more machinery --- Cargo.lock | 4 + ereporter/Cargo.toml | 8 +- ereporter/src/server.rs | 203 +++++++++++++++++++-- nexus/internal-api/src/lib.rs | 29 ++- nexus/src/internal_api/http_entrypoints.rs | 20 ++ nexus/types/src/internal_api/params.rs | 11 ++ openapi/ereporter.json | 5 +- openapi/nexus-internal.json | 71 +++++++ 8 files changed, 326 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ad3e5e723ef..b5c9616095c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2725,9 +2725,13 @@ dependencies = [ "chrono", "dropshot", "ereporter-api", + "internal-dns", + "nexus-client", "omicron-common", "omicron-workspace-hack", + "serde", "slog", + "slog-dtrace", "tokio", "uuid", ] diff --git a/ereporter/Cargo.toml b/ereporter/Cargo.toml index de065e48daa..09b7bb4a91d 100644 --- a/ereporter/Cargo.toml +++ b/ereporter/Cargo.toml @@ -7,12 +7,16 @@ edition = "2021" anyhow.workspace = true chrono.workspace = true dropshot.workspace = true -ereporter-api = { workspace = true } -tokio = { workspace = true, features = ["sync"] } +ereporter-api.workspace = true +internal-dns.workspace = true +nexus-client.workspace = true omicron-common.workspace = true omicron-workspace-hack.workspace = true uuid.workspace = true +serde.workspace = true slog.workspace = true +slog-dtrace.workspace = true +tokio = { workspace = true, features = ["sync"] } [lints] workspace = true diff --git a/ereporter/src/server.rs b/ereporter/src/server.rs index d43623273a2..25213f530e6 100644 --- a/ereporter/src/server.rs +++ b/ereporter/src/server.rs @@ -1,23 +1,40 @@ use crate::buffer; use crate::EreportData; use dropshot::{ - ConfigLogging, EmptyScanParams, HttpError, HttpResponseDeleted, - HttpResponseOk, PaginationParams, Query, RequestContext, ResultsPage, - WhichPage, + EmptyScanParams, HttpError, HttpResponseDeleted, HttpResponseOk, + PaginationParams, Query, RequestContext, ResultsPage, WhichPage, }; +use internal_dns::resolver::Resolver; +use internal_dns::ServiceName; use omicron_common::api::external::Error; use omicron_common::api::external::Generation; +use omicron_common::backoff; +use omicron_common::backoff::BackoffError; +use omicron_common::FileKv; +use slog::debug; +use slog::error; +use slog::warn; +use slog::Drain; use slog::Logger; use std::collections::VecDeque; +use std::net::IpAddr; use std::net::SocketAddr; use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; +// Our public interface depends directly or indirectly on these types; we +// export them so that consumers need not depend on dropshot themselves and +// to simplify how we stage incompatible upgrades. +pub use dropshot::ConfigLogging; +pub use dropshot::ConfigLoggingIfExists; +pub use dropshot::ConfigLoggingLevel; + pub struct ServerStarter { config: Config, ctx: ServerContext, ereports: mpsc::Receiver, requests: mpsc::Receiver, + dns_resolver: Option, } pub struct RunningServer { @@ -26,20 +43,17 @@ pub struct RunningServer { // TODO(eliza): hang onto the running dropshot stuff. } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] pub struct ReporterIdentity { - /// The UUID of the reporter endpoint. + /// UUID of the reporter. pub id: Uuid, - /// The socket address on which to bind the reporter endpoint. - pub addr: SocketAddr, + /// The address to listen for ereport collection requests on. + pub address: SocketAddr, } pub struct Config { pub reporter: ReporterIdentity, - /// The address at which we attempt to register as a producer. - /// - /// If the address is not provided, the address of Nexus will be resolved - /// using internal DNS, based on the local address of the server being - /// configured. + /// How to discover the Nexus API to register the reporter. pub registration_address: Option, /// The maximum size of Dropshot requests. pub request_body_max_bytes: usize, @@ -63,6 +77,15 @@ pub enum LogConfig { Logger(Logger), } +/// How to discover Nexus' IP for registration. +#[derive(Clone)] +enum NexusDiscovery { + /// Use the provided socket address for the Nexus API. + Addr(SocketAddr), + /// Discover Nexus from internal DNS + Dns(Resolver), +} + #[derive(Clone)] struct ServerContext { tx: mpsc::Sender, @@ -74,18 +97,93 @@ impl ServerStarter { pub fn new(config: Config) -> (crate::Reporter, Self) { let (ereport_tx, ereports) = mpsc::channel(config.buffer_capacity); let (tx, requests) = mpsc::channel(128); - let this = - Self { config, ereports, ctx: ServerContext { tx }, requests }; + let this = Self { + config, + ereports, + ctx: ServerContext { tx }, + requests, + dns_resolver: None, + }; (crate::Reporter(ereport_tx), this) } + /// Use the provided internal DNS resolver rather than creating a new one. + pub fn with_resolver(self, resolver: Resolver) -> Self { + Self { dns_resolver: Some(resolver), ..self } + } + pub async fn start(self) -> anyhow::Result { - let Self { config, ctx, ereports, requests } = self; - let log = todo!("eliza: log config"); - // TODO: + let Self { config, ctx, ereports, requests, dns_resolver } = self; + let log = { + let base_logger = match config.log { + LogConfig::Config(conf) => conf.to_logger("ereporter")?, + LogConfig::Logger(log) => log.clone(), + }; + let (drain, registration) = slog_dtrace::with_drain(base_logger); + let log = Logger::root(drain.fuse(), slog::o!(FileKv)); + if let slog_dtrace::ProbeRegistration::Failed(e) = registration { + error!(log, "failed to register DTrace probes: {e}",); + } else { + debug!(log, "registered DTrace probes"); + } + log + }; + // 1. discover nexus + + // Create a resolver if needed, or use Nexus's address directly. + let discovery = match (config.registration_address, dns_resolver) { + (Some(addr), _) => { + if addr.port() == 0 { + anyhow::bail!( + "Nexus registration address must have a real port" + ); + } + debug!( + log, + "Nexus IP provided explicitly, registering with it"; + "addr" => %addr, + ); + NexusDiscovery::Addr(addr) + } + (None, None) => { + // Ensure that we've been provided with an IPv6 address if we're + // using DNS to resolve Nexus. That's required because we need + // to use the /48 to find our DNS server itself. + let IpAddr::V6(our_addr) = config.reporter.address.ip() else { + anyhow::bail!("server address must be IPv6 in order to resolve Nexus from DNS") + }; + debug!( + log, + "Nexus IP not provided, will create an internal \ + DNS resolver to resolve it" + ); + + let resolver = Resolver::new_from_ip( + log.new(slog::o!("component" => "internal-dns-resolver")), + our_addr, + )?; + NexusDiscovery::Dns(resolver) + } + (None, Some(resolver)) => { + debug!( + log, + "Nexus IP not provided, will use DNS to resolve it" + ); + NexusDiscovery::Dns(resolver) + } + }; + let nexus_addr = discovery.nexus_addr(&log).await; + let nexus_client = nexus_client::Client::new( + &format!("http://{nexus_addr}"), + log.clone(), + ); + // 2. register server and recover sequence number - let seq = todo!("eliza: discover sequence number"); + // TODO(eliza): perhaps registrations should be periodically refreshed? + let nexus_client::types::EreporterRegistered { seq } = + config.reporter.register(&log, &nexus_client).await; + // 3. spawn buffer task let buffer_task = tokio::spawn( crate::buffer::Buffer { @@ -98,13 +196,82 @@ impl ServerStarter { } .run(), ); + // 4. spawn dropshot server - todo!("eliza: dropshot server"); + // TODO(eliza): actually do that Ok(RunningServer { buffer_task }) } } +impl NexusDiscovery { + async fn nexus_addr(&self, log: &Logger) -> SocketAddr { + match self { + Self::Addr(addr) => *addr, + Self::Dns(resolver) => { + let log_failure = |error, delay| { + warn!( + log, + "failed to lookup Nexus IP, will retry"; + "delay" => ?delay, + "error" => ?error, + ); + }; + let do_lookup = || async { + resolver + .lookup_socket_v6(ServiceName::Nexus) + .await + .map_err(|e| BackoffError::transient(e.to_string())) + .map(Into::into) + }; + backoff::retry_notify( + backoff::retry_policy_internal_service(), + do_lookup, + log_failure, + ) + .await + .expect("Expected infinite retry loop resolving Nexus address") + } + } + } +} + +impl ReporterIdentity { + async fn register( + &self, + log: &Logger, + client: &nexus_client::Client, + ) -> nexus_client::types::EreporterRegistered { + let log_failure = |error, delay| { + warn!( + log, + "failed to register ereporter with Nexus, will retry"; + "delay" => ?delay, + "error" => ?error, + ); + }; + let info = nexus_client::types::EreporterInfo { + address: self.address.to_string(), + reporter_id: self.id, + }; + + let do_register = || async { + client + .cpapi_ereporters_post(&info) + .await + .map(|response| response.into_inner()) + .map_err(|e| BackoffError::transient(e)) + }; + backoff::retry_notify( + backoff::retry_policy_internal_service(), + do_register, + log_failure, + ) + .await + .expect("Expected infinite retry loop registering ereporter") + } +} + impl ereporter_api::EreporterApi for EreporterApiImpl { type Context = ServerContext; diff --git a/nexus/internal-api/src/lib.rs b/nexus/internal-api/src/lib.rs index 044537946b2..e80fb80f456 100644 --- a/nexus/internal-api/src/lib.rs +++ b/nexus/internal-api/src/lib.rs @@ -20,15 +20,16 @@ use nexus_types::{ }, internal_api::{ params::{ - InstanceMigrateRequest, OximeterInfo, RackInitializationRequest, - SledAgentInfo, SwitchPutRequest, SwitchPutResponse, + EreporterInfo, InstanceMigrateRequest, OximeterInfo, + RackInitializationRequest, SledAgentInfo, SwitchPutRequest, + SwitchPutResponse, }, views::{BackgroundTask, DemoSaga, Ipv4NatEntryView, Saga}, }, }; use omicron_common::{ api::{ - external::{http_pagination::PaginatedById, Instance}, + external::{http_pagination::PaginatedById, Generation, Instance}, internal::nexus::{ DiskRuntimeState, DownstairsClientStopRequest, DownstairsClientStopped, ProducerEndpoint, @@ -530,6 +531,19 @@ pub trait NexusInternalApi { path_params: Path, query_params: Query, ) -> Result>, HttpError>; + + // Error reports + + /// Register an error reporter with Nexus, returning the next sequence + /// number for an error report from that reporter. + #[endpoint { + method = POST, + path = "/ereport/reporters", + }] + async fn cpapi_ereporters_post( + request_context: RequestContext, + identity: TypedBody, + ) -> Result, HttpError>; } /// Path parameters for Sled Agent requests (internal API) @@ -649,3 +663,12 @@ pub struct SledId { pub struct ProbePathParam { pub sled: Uuid, } + +/// Response to error reporter registration requests. +#[derive(Clone, Debug, Serialize, JsonSchema)] +pub struct EreporterRegistered { + /// The starting sequence number of the next error report from this + /// reporter. If the reporter has not been seen by Nexus previously, this + /// may be 0. + pub seq: Generation, +} diff --git a/nexus/src/internal_api/http_entrypoints.rs b/nexus/src/internal_api/http_entrypoints.rs index 2829fdb2a63..5a2f0c040d3 100644 --- a/nexus/src/internal_api/http_entrypoints.rs +++ b/nexus/src/internal_api/http_entrypoints.rs @@ -30,6 +30,7 @@ use nexus_types::external_api::params::UninitializedSledId; use nexus_types::external_api::shared::ProbeInfo; use nexus_types::external_api::shared::UninitializedSled; use nexus_types::external_api::views::SledPolicy; +use nexus_types::internal_api::params::EreporterInfo; use nexus_types::internal_api::params::InstanceMigrateRequest; use nexus_types::internal_api::params::SledAgentInfo; use nexus_types::internal_api::params::SwitchPutRequest; @@ -939,4 +940,23 @@ impl NexusInternalApi for NexusInternalApiImpl { .instrument_dropshot_handler(&rqctx, handler) .await } + + async fn cpapi_ereporters_post( + rqctx: RequestContext, + _identity: TypedBody, + ) -> Result, HttpError> { + let apictx = &rqctx.context().context; + let handler = async { + // TODO(eliza): ACTUALLY IMPLEMENT THIS PART LOL + Err(HttpError::for_unavail( + Some("NOT YET IMPLEMENTED LMAO".to_string()), + "TODO eliza actually implement this part lol lmao :)" + .to_string(), + )) + }; + apictx + .internal_latencies + .instrument_dropshot_handler(&rqctx, handler) + .await + } } diff --git a/nexus/types/src/internal_api/params.rs b/nexus/types/src/internal_api/params.rs index c803f003f1d..9eba13ca009 100644 --- a/nexus/types/src/internal_api/params.rs +++ b/nexus/types/src/internal_api/params.rs @@ -214,3 +214,14 @@ pub struct InstanceMigrateRequest { /// The ID of the sled to which to migrate the target instance. pub dst_sled_id: Uuid, } + +/// Message sent by an error reporter to register itself with Nexus. +#[derive(Debug, Clone, Copy, JsonSchema, Serialize, Deserialize)] +pub struct EreporterInfo { + /// The UUID of the reporting entity. + pub reporter_id: Uuid, + + /// The address on which the error reporter server listens for ingestion + /// requests. + pub address: SocketAddr, +} diff --git a/openapi/ereporter.json b/openapi/ereporter.json index b160d577fae..81c50dc308a 100644 --- a/openapi/ereporter.json +++ b/openapi/ereporter.json @@ -12,7 +12,7 @@ "paths": { "/ereports": { "get": { - "summary": "Get a list of ereports , paginated by sequence number.", + "summary": "Get a list of ereports, paginated by sequence number.", "operationId": "ereports_list", "parameters": [ { @@ -61,7 +61,8 @@ }, "/ereports/{seq}": { "delete": { - "summary": "Truncate ereports with sequence numbers less than or equal to `seq`.", + "summary": "Informs the reporter that it may freely discard ereports with sequence", + "description": "numbers less than or equal to `seq`.", "operationId": "ereports_truncate", "parameters": [ { diff --git a/openapi/nexus-internal.json b/openapi/nexus-internal.json index 9226b9d3190..0156e602246 100644 --- a/openapi/nexus-internal.json +++ b/openapi/nexus-internal.json @@ -746,6 +746,41 @@ } } }, + "/ereport/reporters": { + "post": { + "summary": "Register an error reporter with Nexus, returning the next sequence", + "description": "number for an error report from that reporter.", + "operationId": "cpapi_ereporters_post", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/EreporterInfo" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/EreporterRegistered" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/instances/{instance_id}/migrate": { "post": { "operationId": "instance_migrate", @@ -3264,6 +3299,42 @@ "secs" ] }, + "EreporterInfo": { + "description": "Message sent by an error reporter to register itself with Nexus.", + "type": "object", + "properties": { + "address": { + "description": "The address on which the error reporter server listens for ingestion requests.", + "type": "string" + }, + "reporter_id": { + "description": "The UUID of the reporting entity.", + "type": "string", + "format": "uuid" + } + }, + "required": [ + "address", + "reporter_id" + ] + }, + "EreporterRegistered": { + "description": "Response to error reporter registration requests.", + "type": "object", + "properties": { + "seq": { + "description": "The starting sequence number of the next error report from this reporter. If the reporter has not been seen by Nexus previously, this may be 0.", + "allOf": [ + { + "$ref": "#/components/schemas/Generation" + } + ] + } + }, + "required": [ + "seq" + ] + }, "Error": { "description": "Error information from a response.", "type": "object", From 4a170d76f56cde942ac51a81d1e8e69f6f41b37e Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 15 Oct 2024 10:02:44 -0700 Subject: [PATCH 04/17] [ereporter] data loss as a first class citizen --- ereporter/api/src/lib.rs | 34 ++++++++++++-- ereporter/src/buffer.rs | 16 ++++--- openapi/ereporter.json | 98 ++++++++++++++++++++++++++++++---------- 3 files changed, 115 insertions(+), 33 deletions(-) diff --git a/ereporter/api/src/lib.rs b/ereporter/api/src/lib.rs index f8ee95a4feb..94936eb3d78 100644 --- a/ereporter/api/src/lib.rs +++ b/ereporter/api/src/lib.rs @@ -25,7 +25,7 @@ pub trait EreporterApi { async fn ereports_list( request_context: RequestContext, query: Query>, - ) -> Result>, HttpError>; + ) -> Result>, HttpError>; /// Informs the reporter that it may freely discard ereports with sequence /// numbers less than or equal to `seq`. @@ -46,14 +46,42 @@ pub struct SeqPathParam { pub seq: Generation, } -/// An error report. +/// An entry in the ereport batch returned by a reporter. #[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)] -pub struct Ereport { +pub struct Entry { /// UUID of the entity that generated this ereport. pub reporter_id: Uuid, /// The ereport's sequence number, unique with regards to ereports generated /// by the entity with the `reporter_id`. pub seq: Generation, + + pub value: EntryKind, +} + +/// Kinds of entry in a batch. +#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum EntryKind { + /// An ereport. + Ereport(Ereport), + /// Ereports may have been lost. + DataLoss { + /// The number of ereports that were discarded, if it is known. + /// + /// If ereports are dropped because a buffer has reached its capacity, + /// the reporter is strongly encouraged to attempt to count the number + /// of ereports lost. In other cases, such as a reporter crashing and + /// restarting, the reporter may not be capable of determining the + /// number of ereports that were lost, or even *if* data loss actually + /// occurred. Therefore, a `None` here indicates *possible* data loss, + /// while a `Some(u32)` indicates *known* data loss. + dropped: Option, + }, +} + +/// An error report. +#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)] +pub struct Ereport { /// A string indicating the kind of ereport. /// /// This may be used by diagnosis engines as an indication of what `facts` diff --git a/ereporter/src/buffer.rs b/ereporter/src/buffer.rs index 93e04d87e1c..28d17113b09 100644 --- a/ereporter/src/buffer.rs +++ b/ereporter/src/buffer.rs @@ -17,13 +17,13 @@ pub(crate) enum ServerReq { List { start_seq: Option, limit: usize, - tx: oneshot::Sender>, + tx: oneshot::Sender>, }, } pub(crate) struct Buffer { pub(crate) seq: Generation, - pub(crate) buf: VecDeque, + pub(crate) buf: VecDeque, pub(crate) log: slog::Logger, pub(crate) id: Uuid, pub(crate) ereports: mpsc::Receiver, @@ -111,12 +111,14 @@ impl Buffer { fn push_ereport(&mut self, ereport: EreportData) { let EreportData { facts, class, time_created } = ereport; let seq = self.seq; - self.buf.push_back(ereporter_api::Ereport { - facts, - class, - time_created, - reporter_id: self.id, + self.buf.push_back(ereporter_api::Entry { seq, + reporter_id: self.id, + value: ereporter_api::EntryKind::Ereport(ereporter_api::Ereport { + facts, + class, + time_created, + }), }); self.seq = self.seq.next(); slog::trace!( diff --git a/openapi/ereporter.json b/openapi/ereporter.json index 81c50dc308a..624a7d9ecc3 100644 --- a/openapi/ereporter.json +++ b/openapi/ereporter.json @@ -42,7 +42,7 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/EreportResultsPage" + "$ref": "#/components/schemas/EntryResultsPage" } } } @@ -90,21 +90,10 @@ }, "components": { "schemas": { - "Ereport": { - "description": "An error report.", + "Entry": { + "description": "An entry in the ereport batch returned by a reporter.", "type": "object", "properties": { - "class": { - "description": "A string indicating the kind of ereport.\n\nThis may be used by diagnosis engines as an indication of what `facts` to expect.", - "type": "string" - }, - "facts": { - "description": "The set of facts (key-value pairs) associated with this ereport.", - "type": "object", - "additionalProperties": { - "type": "string" - } - }, "reporter_id": { "description": "UUID of the entity that generated this ereport.", "type": "string", @@ -118,21 +107,57 @@ } ] }, - "time_created": { - "description": "The UTC timestamp when this ereport was observed, as determined by the reporter.", - "type": "string", - "format": "date-time" + "value": { + "$ref": "#/components/schemas/EntryKind" } }, "required": [ - "class", - "facts", "reporter_id", "seq", - "time_created" + "value" + ] + }, + "EntryKind": { + "description": "Kinds of entry in a batch.", + "oneOf": [ + { + "description": "An ereport.", + "type": "object", + "properties": { + "ereport": { + "$ref": "#/components/schemas/Ereport" + } + }, + "required": [ + "ereport" + ], + "additionalProperties": false + }, + { + "description": "Ereports may have been lost.", + "type": "object", + "properties": { + "data_loss": { + "type": "object", + "properties": { + "dropped": { + "nullable": true, + "description": "The number of ereports that were discarded, if it is known.\n\nIf ereports are dropped because a buffer has reached its capacity, the reporter is strongly encouraged to attempt to count the number of ereports lost. In other cases, such as a reporter crashing and restarting, the reporter may not be capable of determining the number of ereports that were lost, or even *if* data loss actually occurred. Therefore, a `None` here indicates *possible* data loss, while a `Some(u32)` indicates *known* data loss.", + "type": "integer", + "format": "uint32", + "minimum": 0 + } + } + } + }, + "required": [ + "data_loss" + ], + "additionalProperties": false + } ] }, - "EreportResultsPage": { + "EntryResultsPage": { "description": "A single page of results", "type": "object", "properties": { @@ -140,7 +165,7 @@ "description": "list of items on this page of results", "type": "array", "items": { - "$ref": "#/components/schemas/Ereport" + "$ref": "#/components/schemas/Entry" } }, "next_page": { @@ -153,6 +178,33 @@ "items" ] }, + "Ereport": { + "description": "An error report.", + "type": "object", + "properties": { + "class": { + "description": "A string indicating the kind of ereport.\n\nThis may be used by diagnosis engines as an indication of what `facts` to expect.", + "type": "string" + }, + "facts": { + "description": "The set of facts (key-value pairs) associated with this ereport.", + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "time_created": { + "description": "The UTC timestamp when this ereport was observed, as determined by the reporter.", + "type": "string", + "format": "date-time" + } + }, + "required": [ + "class", + "facts", + "time_created" + ] + }, "Error": { "description": "Error information from a response.", "type": "object", From 673259fb6ff01c76f419079ff0b4e827c249f100 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 15 Oct 2024 13:35:08 -0700 Subject: [PATCH 05/17] allow having multiple UUIDs --- ereporter/api/src/lib.rs | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/ereporter/api/src/lib.rs b/ereporter/api/src/lib.rs index 94936eb3d78..0ec3858f6f0 100644 --- a/ereporter/api/src/lib.rs +++ b/ereporter/api/src/lib.rs @@ -20,29 +20,36 @@ pub trait EreporterApi { /// Get a list of ereports, paginated by sequence number. #[endpoint { method = GET, - path = "/ereports" + path = "/ereports/{reporter_id}" }] async fn ereports_list( request_context: RequestContext, + path: dropshot::Path, query: Query>, ) -> Result>, HttpError>; - /// Informs the reporter that it may freely discard ereports with sequence - /// numbers less than or equal to `seq`. + /// Informs the reporter with the given UUID that it may freely discard + /// ereports with sequence numbers less than or equal to `seq`. #[endpoint { method = DELETE, - path = "/ereports/{seq}" + path = "/ereports/{reporter_id}/{seq}" }] async fn ereports_truncate( request_context: RequestContext, - path: dropshot::Path, + path: dropshot::Path, ) -> Result; } -/// Path parameter to select a sequence number for -/// [`EreporterApi::ereports_truncate`]. +/// Path parameters to the [`EreporterAPi::ereports_list`] endpoint. #[derive(Clone, Copy, Debug, Deserialize, JsonSchema, Serialize)] -pub struct SeqPathParam { +pub struct ListPathParams { + pub reporter_id: Uuid, +} + +/// Path parameters to the [`EreporterApi::ereports_truncate`] endpoint. +#[derive(Clone, Copy, Debug, Deserialize, JsonSchema, Serialize)] +pub struct TruncatePathParams { + pub reporter_id: Uuid, pub seq: Generation, } From c96876d0ea89515981f280a22220b09b2b8f9c73 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 16 Oct 2024 09:40:49 -0700 Subject: [PATCH 06/17] [ereport] rename truncate to acknowledge thanks @cbiffle for the suggestion --- ereporter/api/src/lib.rs | 18 +++++++++++------- openapi/ereporter.json | 30 ++++++++++++++++++++++++------ 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/ereporter/api/src/lib.rs b/ereporter/api/src/lib.rs index 0ec3858f6f0..1ff5c1c43e2 100644 --- a/ereporter/api/src/lib.rs +++ b/ereporter/api/src/lib.rs @@ -17,7 +17,7 @@ use uuid::Uuid; pub trait EreporterApi { type Context; - /// Get a list of ereports, paginated by sequence number. + /// Get a list of ereports from `reporter_id`, paginated by sequence number. #[endpoint { method = GET, path = "/ereports/{reporter_id}" @@ -28,15 +28,19 @@ pub trait EreporterApi { query: Query>, ) -> Result>, HttpError>; - /// Informs the reporter with the given UUID that it may freely discard - /// ereports with sequence numbers less than or equal to `seq`. + /// Informs the reporter with the given `reporter_id` that its ereports up + /// to the sequence number `seq` have been successfully ingested into + /// persistant storage. + /// + /// The reporter may now freely discard ereports with sequence numbers less + /// than or equal to `seq`. #[endpoint { method = DELETE, path = "/ereports/{reporter_id}/{seq}" }] - async fn ereports_truncate( + async fn ereports_acknowledge( request_context: RequestContext, - path: dropshot::Path, + path: dropshot::Path, ) -> Result; } @@ -46,9 +50,9 @@ pub struct ListPathParams { pub reporter_id: Uuid, } -/// Path parameters to the [`EreporterApi::ereports_truncate`] endpoint. +/// Path parameters to the [`EreporterApi::ereports_acknowledge`] endpoint. #[derive(Clone, Copy, Debug, Deserialize, JsonSchema, Serialize)] -pub struct TruncatePathParams { +pub struct AcknowledgePathParams { pub reporter_id: Uuid, pub seq: Generation, } diff --git a/openapi/ereporter.json b/openapi/ereporter.json index 624a7d9ecc3..5b0c838c47a 100644 --- a/openapi/ereporter.json +++ b/openapi/ereporter.json @@ -10,11 +10,20 @@ "version": "0.0.1" }, "paths": { - "/ereports": { + "/ereports/{reporter_id}": { "get": { - "summary": "Get a list of ereports, paginated by sequence number.", + "summary": "Get a list of ereports from `reporter_id`, paginated by sequence number.", "operationId": "ereports_list", "parameters": [ + { + "in": "path", + "name": "reporter_id", + "required": true, + "schema": { + "type": "string", + "format": "uuid" + } + }, { "in": "query", "name": "limit", @@ -59,12 +68,21 @@ } } }, - "/ereports/{seq}": { + "/ereports/{reporter_id}/{seq}": { "delete": { - "summary": "Informs the reporter that it may freely discard ereports with sequence", - "description": "numbers less than or equal to `seq`.", - "operationId": "ereports_truncate", + "summary": "Informs the reporter with the given `reporter_id` that its ereports up", + "description": "to the sequence number `seq` have been successfully ingested into persistant storage.\n\nThe reporter may now freely discard ereports with sequence numbers less than or equal to `seq`.", + "operationId": "ereports_acknowledge", "parameters": [ + { + "in": "path", + "name": "reporter_id", + "required": true, + "schema": { + "type": "string", + "format": "uuid" + } + }, { "in": "path", "name": "seq", From 3d940578d65203ccef056e366443ed49ba5979f0 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 16 Oct 2024 12:30:47 -0700 Subject: [PATCH 07/17] [ereport] redesign registry to support multiple producers --- Cargo.lock | 2 + ereporter/Cargo.toml | 2 + ereporter/src/buffer.rs | 63 +++++- ereporter/src/lib.rs | 4 +- ereporter/src/proxy.rs | 0 ereporter/src/registry.rs | 135 +++++++++++++ ereporter/src/server.rs | 392 +++++++++++++++++--------------------- 7 files changed, 370 insertions(+), 228 deletions(-) create mode 100644 ereporter/src/proxy.rs create mode 100644 ereporter/src/registry.rs diff --git a/Cargo.lock b/Cargo.lock index b5c9616095c..c15f9faf0a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2732,6 +2732,8 @@ dependencies = [ "serde", "slog", "slog-dtrace", + "slog-error-chain", + "thiserror", "tokio", "uuid", ] diff --git a/ereporter/Cargo.toml b/ereporter/Cargo.toml index 09b7bb4a91d..04f3376dc63 100644 --- a/ereporter/Cargo.toml +++ b/ereporter/Cargo.toml @@ -15,7 +15,9 @@ omicron-workspace-hack.workspace = true uuid.workspace = true serde.workspace = true slog.workspace = true +slog-error-chain.workspace = true slog-dtrace.workspace = true +thiserror.workspace = true tokio = { workspace = true, features = ["sync"] } [lints] diff --git a/ereporter/src/buffer.rs b/ereporter/src/buffer.rs index 28d17113b09..217f85fba36 100644 --- a/ereporter/src/buffer.rs +++ b/ereporter/src/buffer.rs @@ -2,11 +2,12 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. +use crate::server; use crate::EreportData; use omicron_common::api::external::Error; use omicron_common::api::external::Generation; use std::collections::VecDeque; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, watch}; use uuid::Uuid; pub(crate) enum ServerReq { @@ -21,16 +22,58 @@ pub(crate) enum ServerReq { }, } -pub(crate) struct Buffer { - pub(crate) seq: Generation, - pub(crate) buf: VecDeque, - pub(crate) log: slog::Logger, - pub(crate) id: Uuid, - pub(crate) ereports: mpsc::Receiver, - pub(crate) requests: mpsc::Receiver, +pub(crate) struct BufferWorker { + seq: Generation, + buf: VecDeque, + log: slog::Logger, + id: Uuid, + ereports: mpsc::Receiver, + requests: mpsc::Receiver, } -impl Buffer { +#[derive(Debug)] +pub(crate) struct Handle { + pub(crate) requests: mpsc::Sender, + pub(crate) ereports: mpsc::Sender, + pub(crate) task: tokio::task::JoinHandle<()>, +} + +impl BufferWorker { + pub(crate) fn spawn( + id: Uuid, + log: &slog::Logger, + buffer_capacity: usize, + mut server: watch::Receiver>, + ) -> Handle { + let (requests_tx, requests) = mpsc::channel(128); + let (ereports_tx, ereports) = mpsc::channel(buffer_capacity); + let log = log.new(slog::o!("reporter_id" => id.to_string())); + let task = tokio::task::spawn(async move { + // Wait for the server to come up, and then register the reporter. + let seq = loop { + let state = server.borrow_and_update().as_ref().cloned(); + if let Some(server) = state { + break server.register_reporter(&log, id).await; + } + if server.changed().await.is_err() { + slog::warn!(log, "server disappeared surprisingly before we could register the reporter!"); + return; + } + }; + // Start running the buffer worker. + let worker = Self { + seq, + buf: VecDeque::with_capacity(buffer_capacity), + log, + id, + ereports, + requests, + }; + worker.run().await + }); + Handle { ereports: ereports_tx, requests: requests_tx, task } + } + pub(crate) async fn run(mut self) { while let Some(req) = self.requests.recv().await { match req { @@ -120,7 +163,7 @@ impl Buffer { time_created, }), }); - self.seq = self.seq.next(); + self.seq = seq.next(); slog::trace!( self.log, "recorded ereport"; diff --git a/ereporter/src/lib.rs b/ereporter/src/lib.rs index e9f544a64f5..b43547ae1b5 100644 --- a/ereporter/src/lib.rs +++ b/ereporter/src/lib.rs @@ -8,6 +8,8 @@ use std::collections::HashMap; use tokio::sync::mpsc; mod buffer; +pub mod proxy; +pub mod registry; pub mod server; #[must_use = "an `EreportBuilder` does nothing unless `submit()` is called"] @@ -54,7 +56,7 @@ impl EreportBuilder { /// A reporter handle used to generate ereports. #[derive(Clone, Debug)] -pub struct Reporter(pub(crate) tokio::sync::mpsc::Sender); +pub struct Reporter(pub(crate) mpsc::Sender); impl Reporter { /// Begin constructing a new ereport, returning an [`EreportBuilder`]. diff --git a/ereporter/src/proxy.rs b/ereporter/src/proxy.rs new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ereporter/src/registry.rs b/ereporter/src/registry.rs new file mode 100644 index 00000000000..5b5d874f230 --- /dev/null +++ b/ereporter/src/registry.rs @@ -0,0 +1,135 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use crate::buffer; +use crate::server; +use crate::Reporter; +use omicron_common::FileKv; +use slog::debug; +use slog::error; +use slog::Drain; +use slog::Logger; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use tokio::sync::mpsc; +use tokio::sync::watch; +use uuid::Uuid; + +// Our public interface depends directly or indirectly on these types; we +// export them so that consumers need not depend on dropshot themselves and +// to simplify how we stage incompatible upgrades. +pub use dropshot::ConfigLogging; +pub use dropshot::ConfigLoggingIfExists; +pub use dropshot::ConfigLoggingLevel; + +#[derive(Clone, Debug)] +pub struct ReporterRegistry(pub(crate) Arc); + +/// Either configuration for building a logger, or an actual logger already +/// instantiated. +/// +/// This can be used to start a [`Server`] with a new logger or a child of a +/// parent logger if desired. +#[derive(Debug, Clone)] +pub enum LogConfig { + /// Configuration for building a new logger. + Config(ConfigLogging), + /// An explicit logger to use. + Logger(slog::Logger), +} + +#[derive(Debug)] +pub(crate) struct RegistryInner { + reporters: RwLock>, + buffer_capacity: usize, + // Used for registering reporters as they are inserted. + pub(crate) log: slog::Logger, + pub(crate) server_tx: watch::Sender>, + server_state: watch::Receiver>, +} + +impl ReporterRegistry { + pub fn new(log: LogConfig, buffer_capacity: usize) -> anyhow::Result { + let log = { + let base_logger = match log { + LogConfig::Config(conf) => conf.to_logger("ereporter")?, + LogConfig::Logger(log) => log.clone(), + }; + let (drain, registration) = slog_dtrace::with_drain(base_logger); + let log = Logger::root(drain.fuse(), slog::o!(FileKv)); + if let slog_dtrace::ProbeRegistration::Failed(e) = registration { + error!(log, "failed to register DTrace probes: {e}",); + } else { + debug!(log, "registered DTrace probes"); + } + log + }; + + anyhow::ensure!( + buffer_capacity > 0, + "a 0-capacity ereport buffer is nonsensical" + ); + + let (server_tx, server_state) = tokio::sync::watch::channel(None); + Ok(Self(Arc::new(RegistryInner { + reporters: RwLock::new(HashMap::new()), + buffer_capacity, + log, + server_tx, + server_state, + }))) + } + + pub fn register_reporter(&self, id: Uuid) -> Reporter { + let mut reporters = self.0.reporters.write().unwrap(); + let ereports = reporters + .entry(id) + .or_insert_with(|| { + buffer::BufferWorker::spawn( + id, + &self.0.log, + self.0.buffer_capacity, + self.0.server_state.clone(), + ) + }) + .ereports + .clone(); + Reporter(ereports) + } + + pub fn register_proxy(&self, _id: Uuid, _proxy: ()) { + unimplemented!( + "TODO(eliza): this will eventually take a trait representing how \ + to proxy requests to a non-local reporter, for use by e.g. MGS", + ); + } + + pub(crate) fn get_worker( + &self, + id: &Uuid, + ) -> Option> { + self.0 + .reporters + .read() + .unwrap() + .get(id) + .map(|handle| &handle.requests) + .cloned() + } +} + +impl Drop for RegistryInner { + // Abbort all spawned buffer tasks when the reporter registry is dropped. + fn drop(&mut self) { + let reporters = self + .reporters + .get_mut() + // if the lock is poisoned, don't panic, because we are in a `Drop` + // impl and that would cause a double-panic. + .unwrap_or_else(|e| e.into_inner()); + for (_, reporter) in reporters.drain() { + reporter.task.abort(); + } + } +} diff --git a/ereporter/src/server.rs b/ereporter/src/server.rs index 25213f530e6..c545fcae48e 100644 --- a/ereporter/src/server.rs +++ b/ereporter/src/server.rs @@ -1,138 +1,77 @@ use crate::buffer; -use crate::EreportData; +use crate::registry::ReporterRegistry; use dropshot::{ - EmptyScanParams, HttpError, HttpResponseDeleted, HttpResponseOk, - PaginationParams, Query, RequestContext, ResultsPage, WhichPage, + ConfigDropshot, EmptyScanParams, HttpError, HttpResponseDeleted, + HttpResponseOk, PaginationParams, Path, Query, RequestContext, ResultsPage, + WhichPage, }; +use ereporter_api::ListPathParams; use internal_dns::resolver::Resolver; use internal_dns::ServiceName; use omicron_common::api::external::Error; use omicron_common::api::external::Generation; use omicron_common::backoff; use omicron_common::backoff::BackoffError; -use omicron_common::FileKv; use slog::debug; -use slog::error; use slog::warn; -use slog::Drain; -use slog::Logger; -use std::collections::VecDeque; +use slog_error_chain::SlogInlineError; use std::net::IpAddr; use std::net::SocketAddr; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::oneshot; use uuid::Uuid; -// Our public interface depends directly or indirectly on these types; we -// export them so that consumers need not depend on dropshot themselves and -// to simplify how we stage incompatible upgrades. -pub use dropshot::ConfigLogging; -pub use dropshot::ConfigLoggingIfExists; -pub use dropshot::ConfigLoggingLevel; - -pub struct ServerStarter { - config: Config, - ctx: ServerContext, - ereports: mpsc::Receiver, - requests: mpsc::Receiver, - dns_resolver: Option, -} - -pub struct RunningServer { - // Handle to the buffer task. - buffer_task: tokio::task::JoinHandle<()>, - // TODO(eliza): hang onto the running dropshot stuff. -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] -pub struct ReporterIdentity { - /// UUID of the reporter. - pub id: Uuid, - /// The address to listen for ereport collection requests on. - pub address: SocketAddr, -} - +#[derive(Clone, serde::Serialize, serde::Deserialize)] pub struct Config { - pub reporter: ReporterIdentity, + pub server_address: SocketAddr, /// How to discover the Nexus API to register the reporter. pub registration_address: Option, - /// The maximum size of Dropshot requests. + // /// The maximum size of Dropshot requests. pub request_body_max_bytes: usize, - /// The maximum number of ereports to buffer before exerting backpressure on producers. - pub buffer_capacity: usize, - pub request_channel_capacity: usize, - /// The logging configuration or actual logger used to emit logs. - pub log: LogConfig, } -/// Either configuration for building a logger, or an actual logger already -/// instantiated. -/// -/// This can be used to start a [`Server`] with a new logger or a child of a -/// parent logger if desired. -#[derive(Debug, Clone)] -pub enum LogConfig { - /// Configuration for building a new logger. - Config(ConfigLogging), - /// An explicit logger to use. - Logger(Logger), -} - -/// How to discover Nexus' IP for registration. -#[derive(Clone)] -enum NexusDiscovery { - /// Use the provided socket address for the Nexus API. - Addr(SocketAddr), - /// Discover Nexus from internal DNS - Dns(Resolver), +pub struct RunningServer { + _server: dropshot::HttpServer, } -#[derive(Clone)] -struct ServerContext { - tx: mpsc::Sender, +#[derive(Clone, Debug)] +pub(crate) struct State { + server_address: SocketAddr, + nexus: NexusDiscovery, } -struct EreporterApiImpl; - -impl ServerStarter { - pub fn new(config: Config) -> (crate::Reporter, Self) { - let (ereport_tx, ereports) = mpsc::channel(config.buffer_capacity); - let (tx, requests) = mpsc::channel(128); - let this = Self { - config, - ereports, - ctx: ServerContext { tx }, - requests, - dns_resolver: None, - }; - (crate::Reporter(ereport_tx), this) - } - - /// Use the provided internal DNS resolver rather than creating a new one. - pub fn with_resolver(self, resolver: Resolver) -> Self { - Self { dns_resolver: Some(resolver), ..self } - } +impl ReporterRegistry { + pub fn start_server( + &self, + config: Config, + dns_resolver: Option, + ) -> anyhow::Result { + let log = &self.0.log; - pub async fn start(self) -> anyhow::Result { - let Self { config, ctx, ereports, requests, dns_resolver } = self; - let log = { - let base_logger = match config.log { - LogConfig::Config(conf) => conf.to_logger("ereporter")?, - LogConfig::Logger(log) => log.clone(), + let _server = { + let dropshot_cfg = ConfigDropshot { + bind_address: config.server_address, + request_body_max_bytes: config.request_body_max_bytes, + default_handler_task_mode: dropshot::HandlerTaskMode::Detached, + log_headers: vec![], }; - let (drain, registration) = slog_dtrace::with_drain(base_logger); - let log = Logger::root(drain.fuse(), slog::o!(FileKv)); - if let slog_dtrace::ProbeRegistration::Failed(e) = registration { - error!(log, "failed to register DTrace probes: {e}",); - } else { - debug!(log, "registered DTrace probes"); - } - log + let log = log.new(slog::o!("component" => "dropshot")); + let api = ereporter_api::ereporter_api_mod::api_description::< + EreporterApiImpl, + >()?; + dropshot::HttpServerStarter::new( + &dropshot_cfg, + api, + self.clone(), + &log, + ) + .map_err(|e| { + anyhow::anyhow!("could not start dropshot server: {e}") + })? + .start() }; - // 1. discover nexus - // Create a resolver if needed, or use Nexus's address directly. - let discovery = match (config.registration_address, dns_resolver) { + let nexus = match (config.registration_address, dns_resolver) { (Some(addr), _) => { if addr.port() == 0 { anyhow::bail!( @@ -140,7 +79,7 @@ impl ServerStarter { ); } debug!( - log, + self.0.log, "Nexus IP provided explicitly, registering with it"; "addr" => %addr, ); @@ -150,140 +89,54 @@ impl ServerStarter { // Ensure that we've been provided with an IPv6 address if we're // using DNS to resolve Nexus. That's required because we need // to use the /48 to find our DNS server itself. - let IpAddr::V6(our_addr) = config.reporter.address.ip() else { + let IpAddr::V6(our_addr) = config.server_address.ip() else { anyhow::bail!("server address must be IPv6 in order to resolve Nexus from DNS") }; debug!( - log, + self.0.log, "Nexus IP not provided, will create an internal \ DNS resolver to resolve it" ); let resolver = Resolver::new_from_ip( - log.new(slog::o!("component" => "internal-dns-resolver")), + self.0 + .log + .new(slog::o!("component" => "internal-dns-resolver")), our_addr, )?; NexusDiscovery::Dns(resolver) } (None, Some(resolver)) => { debug!( - log, + self.0.log, "Nexus IP not provided, will use DNS to resolve it" ); NexusDiscovery::Dns(resolver) } }; - let nexus_addr = discovery.nexus_addr(&log).await; - let nexus_client = nexus_client::Client::new( - &format!("http://{nexus_addr}"), - log.clone(), - ); - - // 2. register server and recover sequence number - // TODO(eliza): perhaps registrations should be periodically refreshed? - let nexus_client::types::EreporterRegistered { seq } = - config.reporter.register(&log, &nexus_client).await; - - // 3. spawn buffer task - let buffer_task = tokio::spawn( - crate::buffer::Buffer { - seq, - buf: VecDeque::with_capacity(config.buffer_capacity), - log, - id: config.reporter.id, - ereports, - requests, - } - .run(), - ); - - // 4. spawn dropshot server - // TODO(eliza): actually do that - - Ok(RunningServer { buffer_task }) - } -} - -impl NexusDiscovery { - async fn nexus_addr(&self, log: &Logger) -> SocketAddr { - match self { - Self::Addr(addr) => *addr, - Self::Dns(resolver) => { - let log_failure = |error, delay| { - warn!( - log, - "failed to lookup Nexus IP, will retry"; - "delay" => ?delay, - "error" => ?error, - ); - }; - let do_lookup = || async { - resolver - .lookup_socket_v6(ServiceName::Nexus) - .await - .map_err(|e| BackoffError::transient(e.to_string())) - .map(Into::into) - }; - backoff::retry_notify( - backoff::retry_policy_internal_service(), - do_lookup, - log_failure, - ) - .await - .expect("Expected infinite retry loop resolving Nexus address") - } - } - } -} - -impl ReporterIdentity { - async fn register( - &self, - log: &Logger, - client: &nexus_client::Client, - ) -> nexus_client::types::EreporterRegistered { - let log_failure = |error, delay| { - warn!( - log, - "failed to register ereporter with Nexus, will retry"; - "delay" => ?delay, - "error" => ?error, - ); - }; - let info = nexus_client::types::EreporterInfo { - address: self.address.to_string(), - reporter_id: self.id, - }; - let do_register = || async { - client - .cpapi_ereporters_post(&info) - .await - .map(|response| response.into_inner()) - .map_err(|e| BackoffError::transient(e)) - }; - backoff::retry_notify( - backoff::retry_policy_internal_service(), - do_register, - log_failure, - ) - .await - .expect("Expected infinite retry loop registering ereporter") + self.0 + .server_tx + .send(Some(State { server_address: config.server_address, nexus })) + .expect("receivers should never be dropped"); + Ok(RunningServer { _server }) } } +struct EreporterApiImpl; impl ereporter_api::EreporterApi for EreporterApiImpl { - type Context = ServerContext; + type Context = ReporterRegistry; async fn ereports_list( reqctx: RequestContext, + path: Path, query: Query>, - ) -> Result>, HttpError> + ) -> Result>, HttpError> { - let ctx = reqctx.context(); - + let registry = reqctx.context(); let pagination = query.into_inner(); let limit = reqctx.page_limit(&pagination)?.get() as usize; + let ereporter_api::ListPathParams { reporter_id } = path.into_inner(); let start_seq = match pagination.page { WhichPage::First(..) => None, @@ -293,11 +146,18 @@ impl ereporter_api::EreporterApi for EreporterApiImpl { slog::debug!( reqctx.log, "received ereport list request"; + "reporter_id" => %reporter_id, "start_seq" => ?start_seq, "limit" => limit, ); + let worker = registry.get_worker(&reporter_id).ok_or( + HttpError::for_not_found( + Some("NO_REPORTER".to_string()), + format!("no reporter with ID {reporter_id}"), + ), + )?; let (tx, rx) = oneshot::channel(); - ctx.tx + worker .send(buffer::ServerReq::List { start_seq, limit, tx }) .await .map_err(|_| Error::internal_error("server shutting down"))?; @@ -308,20 +168,33 @@ impl ereporter_api::EreporterApi for EreporterApiImpl { let page = ResultsPage::new( list, &EmptyScanParams {}, - |ereport: &ereporter_api::Ereport, _| ereport.seq, + |entry: &ereporter_api::Entry, _| entry.seq, )?; Ok(HttpResponseOk(page)) } - async fn ereports_truncate( + async fn ereports_acknowledge( reqctx: RequestContext, - path: dropshot::Path, + path: Path, ) -> Result { - let ctx = reqctx.context(); - let ereporter_api::SeqPathParam { seq } = path.into_inner(); - slog::debug!(reqctx.log, "received ereport truncate request"; "seq" => ?seq); + let registry = reqctx.context(); + let ereporter_api::AcknowledgePathParams { reporter_id, seq } = + path.into_inner(); + slog::debug!( + reqctx.log, + "received ereport acknowledge request"; + "reporter_id" => %reporter_id, + "seq" => ?seq, + ); + + let worker = registry.get_worker(&reporter_id).ok_or( + HttpError::for_not_found( + Some("NO_REPORTER".to_string()), + format!("no reporter with ID {reporter_id}"), + ), + )?; let (tx, rx) = oneshot::channel(); - ctx.tx + worker .send(buffer::ServerReq::TruncateTo { seq, tx }) .await .map_err(|_| Error::internal_error("server shutting down"))?; @@ -332,3 +205,88 @@ impl ereporter_api::EreporterApi for EreporterApiImpl { Ok(HttpResponseDeleted()) } } + +/// How to discover Nexus' IP for registration. +#[derive(Clone)] +enum NexusDiscovery { + Addr(SocketAddr), + Dns(Resolver), +} + +impl State { + pub(crate) async fn register_reporter( + &self, + log: &slog::Logger, + reporter_id: Uuid, + ) -> Generation { + #[derive(Debug, thiserror::Error, SlogInlineError)] + enum RegistrationError { + #[error(transparent)] + Dns(#[from] internal_dns::resolver::ResolveError), + #[error(transparent)] + Client(#[from] nexus_client::Error), + } + + let info = nexus_client::types::EreporterInfo { + reporter_id, + address: self.server_address.to_string(), + }; + let do_register = || async { + let nexus_addr = self.nexus.nexus_addr().await.map_err(|e| { + BackoffError::transient(RegistrationError::from(e)) + })?; + let nexus_client = nexus_client::Client::new( + &format!("http://{nexus_addr}"), + log.clone(), + ); + let nexus_client::types::EreporterRegistered { seq } = nexus_client + .cpapi_ereporters_post(&info) + .await + .map_err(|e| { + BackoffError::transient(RegistrationError::from(e)) + })? + .into_inner(); + Ok(seq) + }; + let log_failure = |error: RegistrationError, delay| { + warn!( + log, + "failed to register ereporter with Nexus; retrying..."; + "reporter_id" => %reporter_id, + "reporter_address" => %self.server_address, + "error" => error, + "delay" => ?delay, + ); + }; + backoff::retry_notify( + backoff::retry_policy_internal_service(), + do_register, + log_failure, + ) + .await + .expect("Expected infinite retry loop registering ereporter") + } +} + +impl NexusDiscovery { + async fn nexus_addr( + &self, + ) -> Result { + match self { + NexusDiscovery::Addr(addr) => Ok(*addr), + NexusDiscovery::Dns(resolver) => resolver + .lookup_socket_v6(ServiceName::Nexus) + .await + .map(Into::into), + } + } +} + +impl std::fmt::Debug for NexusDiscovery { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Addr(addr) => f.debug_tuple("Addr").field(&addr).finish(), + Self::Dns(_) => f.debug_tuple("Dns").finish(), + } + } +} From c17475367cba51b2d68d19494635a8af563e8119 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 16 Oct 2024 14:03:01 -0700 Subject: [PATCH 08/17] [ereport] start sketching proxy --- ereporter/src/proxy.rs | 56 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/ereporter/src/proxy.rs b/ereporter/src/proxy.rs index e69de29bb2d..16b1dd8421d 100644 --- a/ereporter/src/proxy.rs +++ b/ereporter/src/proxy.rs @@ -0,0 +1,56 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use ereporter_api::Entry; +use omicron_common::api::external::Error; +use omicron_common::api::external::Generation; +use std::future::Future; +use std::sync::Arc; +use tokio::sync::{mpsc, watch}; +use uuid::Uuid; + +pub trait ReporterProxy { + fn list( + &self, + starting_seq: Option, + limit: Option, + ) -> impl Future, Error>> + Send; + + fn acknowledge( + &self, + seq: Generation, + ) -> impl Future> + Send; + + fn recover_seq( + &self, + seq: Generation, + ) -> impl Future> + Send; +} + +pub(crate) struct ProxyWorker

{ + proxy: P, + log: slog::Logger, + id: Uuid, + server: watch::Receiver>, + server_reqs: mpsc::Receiver, + seqs: mpsc::Receiver, +} + +impl ProxyWorker

{ + async fn run(mut self) { + loop { + tokio::select! { + biased; + seq = self.seqs.recv() => { + if let Some(seq) = seq { + if let Err(_e) = self.proxy.recover_seq(seq).await { + // TODO(eliza): waht do about error + } + } + + }, + } + } + } +} From 3ef098bd2d3f74e173363c84ed2aaf55c27d5207 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 17 Oct 2024 11:32:09 -0700 Subject: [PATCH 09/17] [ereport] quick standalone ingester --- Cargo.lock | 37 +++ Cargo.toml | 3 + ereporter/tester-ingester/Cargo.toml | 25 ++ ereporter/tester-ingester/src/lib.rs | 328 ++++++++++++++++++++++++++ ereporter/tester-ingester/src/main.rs | 33 +++ 5 files changed, 426 insertions(+) create mode 100644 ereporter/tester-ingester/Cargo.toml create mode 100644 ereporter/tester-ingester/src/lib.rs create mode 100644 ereporter/tester-ingester/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index c15f9faf0a4..28db53b8d6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2717,6 +2717,28 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "ereport-tester-ingester" +version = "0.1.0" +dependencies = [ + "anyhow", + "camino", + "clap", + "dropshot", + "ereporter-client", + "nexus-types", + "omicron-common", + "omicron-workspace-hack", + "schemars", + "serde", + "serde_json", + "slog", + "slog-async", + "slog-term", + "tokio", + "uuid", +] + [[package]] name = "ereporter" version = "0.1.0" @@ -2751,6 +2773,21 @@ dependencies = [ "uuid", ] +[[package]] +name = "ereporter-client" +version = "0.1.0" +dependencies = [ + "chrono", + "futures", + "omicron-common", + "omicron-workspace-hack", + "progenitor", + "reqwest 0.12.7", + "serde", + "slog", + "uuid", +] + [[package]] name = "errno" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 292b31d296a..05631cab537 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ members = [ "end-to-end-tests", "ereporter", "ereporter/api", + "ereporter/tester-ingester", "gateway", "gateway-api", "gateway-cli", @@ -168,6 +169,7 @@ default-members = [ "end-to-end-tests", "ereporter", "ereporter/api", + "ereporter/tester-ingester", "gateway", "gateway-api", "gateway-cli", @@ -351,6 +353,7 @@ dropshot = { version = "0.12.0", features = [ "usdt-probes" ] } dyn-clone = "1.0.17" either = "1.13.0" ereporter-api = { path = "ereporter/api" } +ereporter-client = { path = "clients/ereporter-client" } ereporter = { path = "ereporter" } expectorate = "1.1.0" fatfs = "0.3.6" diff --git a/ereporter/tester-ingester/Cargo.toml b/ereporter/tester-ingester/Cargo.toml new file mode 100644 index 00000000000..4a58317d1e8 --- /dev/null +++ b/ereporter/tester-ingester/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "ereport-tester-ingester" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow.workspace = true +camino.workspace = true +clap.workspace = true +dropshot.workspace = true +ereporter-client.workspace = true +nexus-types.workspace = true +omicron-common.workspace = true +omicron-workspace-hack.workspace = true +uuid.workspace = true +schemars.workspace = true +serde.workspace = true +serde_json.workspace = true +slog.workspace = true +slog-async.workspace = true +slog-term.workspace = true +tokio = { workspace = true, features = ["sync"] } + +[lints] +workspace = true diff --git a/ereporter/tester-ingester/src/lib.rs b/ereporter/tester-ingester/src/lib.rs new file mode 100644 index 00000000000..b2ff35759cc --- /dev/null +++ b/ereporter/tester-ingester/src/lib.rs @@ -0,0 +1,328 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +use anyhow::Context; +use camino::{Utf8Path, Utf8PathBuf}; +use clap::Parser; +use dropshot::{ + endpoint, HttpError, HttpResponseOk, RequestContext, TypedBody, +}; +use nexus_types::internal_api::params::EreporterInfo; +use omicron_common::api::external::Generation; +use schemars::JsonSchema; +use serde::Serialize; +use std::collections::hash_map::{Entry, HashMap}; +use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6}; +use std::time::Duration; +use tokio::io::AsyncWriteExt; +use tokio::sync::{mpsc, oneshot}; +use tokio::time; +use uuid::Uuid; + +/// Configuration for a standalone ereport ingester. +#[derive(Clone, Debug, Parser)] +pub struct IngesterConfig { + /// Directory in which to store ingested ereports. + pub data_dir: camino::Utf8PathBuf, + + /// The address for the mock Nexus server used to register. + /// + /// This program starts a mock version of Nexus, which is used only to + /// register the producers and collectors. This allows them to operate + /// as they usually would, registering each other with Nexus so that an + /// assignment between them can be made. + #[arg( + long, + default_value_t = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 12345, 0, 0) + )] + pub nexus: SocketAddrV6, + + /// Interval at which ereports are ingested. + #[arg(long, default_value_t = 5)] + pub interval_secs: usize, +} + +impl IngesterConfig { + pub async fn run(self, log: slog::Logger) -> anyhow::Result<()> { + let (registration_tx, registrations) = mpsc::channel(128); + let ingester = Ingester { + reporters: HashMap::new(), + log: log.new(slog::o!("component" => "ingester")), + path: self.data_dir, + registrations, + interval: Duration::from_secs(self.interval_secs as u64), + }; + let ingester = tokio::spawn(ingester.run()); + + let _server = { + let apictx = ServerContext { registration_tx }; + let mut api = dropshot::ApiDescription::new(); + api.register(cpapi_ereporters_post)?; + dropshot::HttpServerStarter::new( + &dropshot::ConfigDropshot { + default_handler_task_mode: + dropshot::HandlerTaskMode::Detached, + bind_address: self.nexus.into(), + ..Default::default() + }, + api, + apictx, + &log.new(slog::o!("component" => "standalone-nexus")), + ) + .map_err(|e| { + anyhow::anyhow!("failed to start standalone nexus server: {e}") + })? + .start() + }; + slog::info!( + log, + "created standalone nexus server for ereporter registration"; + "address" => %self.nexus, + ); + + ingester.await.context("ingester task panicked")??; + Ok(()) + } +} + +struct Ingester { + reporters: HashMap, + registrations: mpsc::Receiver, + log: slog::Logger, + path: Utf8PathBuf, + interval: Duration, +} + +struct Reporter { + path: Utf8PathBuf, + clients: HashMap, +} + +struct Registration { + uuid: Uuid, + addr: SocketAddr, + tx: oneshot::Sender, +} + +impl Ingester { + async fn run(mut self) -> anyhow::Result<()> { + if !self.path.exists() { + std::fs::create_dir_all(&self.path).with_context(|| { + format!("couldn't create directory {}", self.path) + })?; + slog::info!(self.log, "created data dir"; "path" => %self.path); + } + let mut interval = time::interval(self.interval); + slog::info!(self.log, "ingesting ereports every {:?}", self.interval); + loop { + tokio::select! { + biased; + req = self.registrations.recv() => { + let Some(reg) = req else { + slog::warn!(self.log, "the registration-request sender has gone away?"); + anyhow::bail!("the registration request sender went away unexpectedly"); + }; + self.register_client(reg).await?; + }, + _ = interval.tick() => { + self.collect().await?; + } + } + } + } + + async fn collect(&mut self) -> anyhow::Result<()> { + slog::debug!(self.log, "collecting ereports..."); + for (id, reporter) in &self.reporters { + slog::debug!(self.log, "collecting ereports from {id}"); + for (addr, client) in &reporter.clients { + let reports = match client.ereports_list(id, None, None).await { + Ok(e) => e.into_inner(), + Err(e) => { + slog::error!(self.log, + "error collecting ereports"; + "reporter_id" => %id, + "reporter_addr" => %addr, + "error" => %e, + ); + continue; + } + }; + for ereporter_client::types::Entry { + seq, + value, + reporter_id, + } in reports.items + { + match value { + ereporter_client::types::EntryKind::Ereport(report) => { + slog::info!( + &self.log, + "ereport from {reporter_id}"; + "reporter_id" => %reporter_id, + "seq" => %seq, + "report" => ?report, + ); + match tokio::fs::File::create_new( + reporter.path.join(&format!("{seq}.json")), + ) + .await + { + Err(error) => { + slog::error!(self.log, + "couldn't create new file for ereport, may already have been ingested!"; + "reporter_id" => %reporter_id, + "seq" => %seq, + "error" => %error, + ); + continue; + } + Ok(mut f) => { + let bytes = serde_json::to_vec_pretty(&report) + .with_context(|| format!("failed to serialize ereport {seq} from {reporter_id}"))?; + + f + .write_all(&bytes + ) + .await + .with_context(|| format!("failed to write ereport {seq} from {reporter_id}"))?; + } + } + } + + ereporter_client::types::EntryKind::DataLoss { + dropped, + } => { + slog::warn!(self.log, + "reporter {reporter_id} reports data loss at seq {seq}"; + "reporter_id" => %reporter_id, + "seq" => %seq, + "dropped" => ?dropped, + ); + } + } + } + } + } + + Ok(()) + } + + async fn register_client( + &mut self, + Registration { uuid, addr, tx }: Registration, + ) -> anyhow::Result<()> { + let reporter = self.reporters.entry(uuid).or_insert_with(|| { + let path = self.path.join(uuid.to_string()); + Reporter { path, clients: HashMap::new() } + }); + std::fs::create_dir_all(&reporter.path).with_context(|| { + format!( + "couldn't create reporter ereport directory {}", + reporter.path + ) + })?; + let seq = recover_seq(&reporter.path).await?; + match reporter.clients.entry(addr) { + Entry::Occupied(_) => { + slog::info!( + self.log, + "recovering sequence for reporter"; + "reporter_id" => %uuid, + "address" => %addr, + "seq" => %seq, + ); + } + Entry::Vacant(e) => { + slog::info!( + self.log, + "registered new endpoint for reporter"; + "reporter_id" => %uuid, + "address" => %addr, + "seq" => %seq, + ); + let log = self.log.new(slog::o!( + "reporter_id" => uuid.to_string(), + "reporter_addr" => addr.to_string(), + )); + let client = ereporter_client::Client::new( + &format!("http://{addr}"), + log, + ); + e.insert(client); + } + } + + if tx.send(seq).is_err() { + slog::warn!( + self.log, + "reporter gave up on registration attempt unexpectedly" + ); + } + Ok(()) + } +} + +async fn recover_seq(path: &Utf8PathBuf) -> anyhow::Result { + let mut dir = tokio::fs::read_dir(path) + .await + .with_context(|| format!("failed to read {path}"))?; + let mut max = 0; + while let Some(entry) = dir + .next_entry() + .await + .with_context(|| format!("failed to get next entry in {path}"))? + { + let path = entry.path(); + let path = Utf8Path::from_path(path.as_ref()) + .with_context(|| format!("path {} was not utf8", path.display()))?; + if path.is_dir() { + continue; + } + if let Some(file) = path.file_stem() { + match file.parse::() { + Ok(seq) => max = std::cmp::max(seq, max), + Err(_) => { + continue; + } + } + } + } + + Ok(Generation::from_u32(max)) +} + +#[derive(Clone)] +struct ServerContext { + registration_tx: mpsc::Sender, +} + +/// Register an error reporter with Nexus, returning the next sequence +/// number for an error report from that reporter. +#[endpoint { + method = POST, + path = "/ereport/reporters", +}] +async fn cpapi_ereporters_post( + rqctx: RequestContext, + identity: TypedBody, +) -> Result, HttpError> { + let ctx = rqctx.context(); + let EreporterInfo { reporter_id, address } = identity.into_inner(); + let (tx, rx) = oneshot::channel(); + ctx.registration_tx + .send(Registration { addr: address, uuid: reporter_id, tx }) + .await + .expect("the main task should not have gone away"); + let seq = rx.await.expect("the main task should not have given up on us"); + Ok(HttpResponseOk(EreporterRegistered { seq })) +} + +/// Response to error reporter registration requests. +#[derive(Clone, Debug, Serialize, JsonSchema)] +pub struct EreporterRegistered { + /// The starting sequence number of the next error report from this + /// reporter. If the reporter has not been seen by Nexus previously, this + /// may be 0. + pub seq: Generation, +} diff --git a/ereporter/tester-ingester/src/main.rs b/ereporter/tester-ingester/src/main.rs new file mode 100644 index 00000000000..9640b74222b --- /dev/null +++ b/ereporter/tester-ingester/src/main.rs @@ -0,0 +1,33 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use clap::Parser; +#[derive(Clone, Debug, Parser)] +struct Args { + /// The log level. + #[arg(long, default_value_t = slog::Level::Info, value_parser = parse_log_level)] + log_level: slog::Level, + + #[clap(flatten)] + ingester: ereport_tester_ingester::IngesterConfig, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let Args { log_level, ingester } = Args::parse(); + let log = { + use omicron_common::FileKv; + use slog::Drain; + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + let drain = slog::LevelFilter::new(drain, log_level).fuse(); + slog::Logger::root(drain.fuse(), slog::o!(FileKv)) + }; + ingester.run(log).await +} + +fn parse_log_level(s: &str) -> Result { + s.parse().map_err(|_| format!("invalid log level {s:?}")) +} From 33593d47b2ab4d5eeb6a546fe70341033740b3ed Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 17 Oct 2024 12:30:18 -0700 Subject: [PATCH 10/17] [ereport] add demo reporter, fix test stuff --- Cargo.lock | 6 +- Cargo.toml | 4 +- clients/ereporter-client/src/lib.rs | 4 + ereporter/Cargo.toml | 7 ++ ereporter/src/lib.rs | 3 +- .../Cargo.toml | 5 +- ereporter/test-utils/src/bin/ereport-demo.rs | 77 +++++++++++++ .../src/bin/ingester-tester.rs} | 2 +- .../src/lib.rs | 105 ++++++++++++------ 9 files changed, 175 insertions(+), 38 deletions(-) rename ereporter/{tester-ingester => test-utils}/Cargo.toml (80%) create mode 100644 ereporter/test-utils/src/bin/ereport-demo.rs rename ereporter/{tester-ingester/src/main.rs => test-utils/src/bin/ingester-tester.rs} (95%) rename ereporter/{tester-ingester => test-utils}/src/lib.rs (74%) diff --git a/Cargo.lock b/Cargo.lock index 28db53b8d6d..9c87ed750f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2718,13 +2718,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] -name = "ereport-tester-ingester" +name = "ereport-test-utils" version = "0.1.0" dependencies = [ "anyhow", "camino", "clap", "dropshot", + "ereporter", "ereporter-client", "nexus-types", "omicron-common", @@ -2745,6 +2746,7 @@ version = "0.1.0" dependencies = [ "anyhow", "chrono", + "clap", "dropshot", "ereporter-api", "internal-dns", @@ -2753,8 +2755,10 @@ dependencies = [ "omicron-workspace-hack", "serde", "slog", + "slog-async", "slog-dtrace", "slog-error-chain", + "slog-term", "thiserror", "tokio", "uuid", diff --git a/Cargo.toml b/Cargo.toml index 05631cab537..75d3a8cb4b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,7 @@ members = [ "end-to-end-tests", "ereporter", "ereporter/api", - "ereporter/tester-ingester", + "ereporter/test-utils", "gateway", "gateway-api", "gateway-cli", @@ -169,7 +169,7 @@ default-members = [ "end-to-end-tests", "ereporter", "ereporter/api", - "ereporter/tester-ingester", + "ereporter/test-utils", "gateway", "gateway-api", "gateway-cli", diff --git a/clients/ereporter-client/src/lib.rs b/clients/ereporter-client/src/lib.rs index 35da6012b6f..2f55b4e316b 100644 --- a/clients/ereporter-client/src/lib.rs +++ b/clients/ereporter-client/src/lib.rs @@ -19,6 +19,10 @@ progenitor::generate_api!( post_hook = (|log: &slog::Logger, result: &Result<_, _>| { slog::debug!(log, "client response"; "result" => ?result); }), + + replace = { + Generation = omicron_common::api::external::Generation, + }, ); impl omicron_common::api::external::ClientError for types::Error { diff --git a/ereporter/Cargo.toml b/ereporter/Cargo.toml index 04f3376dc63..d92509c9ead 100644 --- a/ereporter/Cargo.toml +++ b/ereporter/Cargo.toml @@ -20,5 +20,12 @@ slog-dtrace.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["sync"] } +[dev-dependencies] +clap.workspace = true +slog.workspace = true +slog-async.workspace = true +slog-term.workspace = true +tokio = { workspace = true, features = ["sync", "macros", "full"] } + [lints] workspace = true diff --git a/ereporter/src/lib.rs b/ereporter/src/lib.rs index b43547ae1b5..6ba58ca5fdc 100644 --- a/ereporter/src/lib.rs +++ b/ereporter/src/lib.rs @@ -11,6 +11,7 @@ mod buffer; pub mod proxy; pub mod registry; pub mod server; +pub use self::registry::ReporterRegistry; #[must_use = "an `EreportBuilder` does nothing unless `submit()` is called"] pub struct EreportBuilder { @@ -61,7 +62,7 @@ pub struct Reporter(pub(crate) mpsc::Sender); impl Reporter { /// Begin constructing a new ereport, returning an [`EreportBuilder`]. pub async fn report( - &mut self, + &self, class: impl ToString, ) -> Result { let time_created = Utc::now(); diff --git a/ereporter/tester-ingester/Cargo.toml b/ereporter/test-utils/Cargo.toml similarity index 80% rename from ereporter/tester-ingester/Cargo.toml rename to ereporter/test-utils/Cargo.toml index 4a58317d1e8..65a4b1e292d 100644 --- a/ereporter/tester-ingester/Cargo.toml +++ b/ereporter/test-utils/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "ereport-tester-ingester" +name = "ereport-test-utils" version = "0.1.0" edition = "2021" @@ -8,6 +8,7 @@ anyhow.workspace = true camino.workspace = true clap.workspace = true dropshot.workspace = true +ereporter.workspace = true ereporter-client.workspace = true nexus-types.workspace = true omicron-common.workspace = true @@ -19,7 +20,7 @@ serde_json.workspace = true slog.workspace = true slog-async.workspace = true slog-term.workspace = true -tokio = { workspace = true, features = ["sync"] } +tokio = { workspace = true, features = ["sync", "macros", "full"] } [lints] workspace = true diff --git a/ereporter/test-utils/src/bin/ereport-demo.rs b/ereporter/test-utils/src/bin/ereport-demo.rs new file mode 100644 index 00000000000..a526270a401 --- /dev/null +++ b/ereporter/test-utils/src/bin/ereport-demo.rs @@ -0,0 +1,77 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +use clap::Parser; +use std::net::{IpAddr, Ipv6Addr, SocketAddr}; +use uuid::Uuid; + +/// A demo binary that generates a single ereport and then waits for it to be collected. +#[derive(Clone, Parser)] +struct Args { + #[arg(long, short)] + uuid: Uuid, + + /// The address for the mock Nexus server used to register. + #[arg( + long, + default_value_t = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 12345) + )] + nexus_address: SocketAddr, + + /// The address for the mock Nexus server used to register. + #[arg( + long, + default_value_t = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 0), + )] + server_address: SocketAddr, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let Args { uuid, nexus_address, server_address } = Args::parse(); + let log = { + use omicron_common::FileKv; + use slog::Drain; + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + // let drain = slog::LevelFilter::new(drain, log_level).fuse(); + slog::Logger::root(drain.fuse(), slog::o!(FileKv)) + }; + let registry = ereporter::ReporterRegistry::new( + ereporter::registry::LogConfig::Logger(log.clone()), + 128, + )?; + let reporter = registry.register_reporter(uuid); + let _server = registry.start_server( + ereporter::server::Config { + server_address, + registration_address: Some(nexus_address), + request_body_max_bytes: 1024 * 8, + }, + None, + )?; + + let mut report = reporter + .report("list.suspect") + .await + .map_err(|_| anyhow::anyhow!("couldnt get ye report"))?; + // https://www.youtube.com/watch?v=TSOtPQ-K3Ds + report.fact("de.version", "0"); + report.fact("de.mod-name", "hal-9000-diagnosis"); + report.fact("de.mod-version", "1.0"); + report.fact("de.authority.product-id", "HAL 9000-series computer"); + report.fact("de.authority.server-id", "HAL 9000"); + report.fact("fault-list.0.unit", "AE-35"); + report.fact("fault-list.0.class", "defect.discovery-one.ae-32-fault"); + report.fact("fault-list.0.certainty", "0x64"); + report.fact("fault-list.0.100-failure-in", "72h"); + report.fact( + "fault-list.0.nosub_class", + "ereport.discovery-one.communications-dish.ae-35-unit", + ); + report.submit(); + + tokio::signal::ctrl_c().await?; + Ok(()) +} diff --git a/ereporter/tester-ingester/src/main.rs b/ereporter/test-utils/src/bin/ingester-tester.rs similarity index 95% rename from ereporter/tester-ingester/src/main.rs rename to ereporter/test-utils/src/bin/ingester-tester.rs index 9640b74222b..9cbb45e2288 100644 --- a/ereporter/tester-ingester/src/main.rs +++ b/ereporter/test-utils/src/bin/ingester-tester.rs @@ -10,7 +10,7 @@ struct Args { log_level: slog::Level, #[clap(flatten)] - ingester: ereport_tester_ingester::IngesterConfig, + ingester: ereport_test_utils::IngesterConfig, } #[tokio::main] diff --git a/ereporter/tester-ingester/src/lib.rs b/ereporter/test-utils/src/lib.rs similarity index 74% rename from ereporter/tester-ingester/src/lib.rs rename to ereporter/test-utils/src/lib.rs index b2ff35759cc..d59b33ec45a 100644 --- a/ereporter/tester-ingester/src/lib.rs +++ b/ereporter/test-utils/src/lib.rs @@ -133,8 +133,11 @@ impl Ingester { async fn collect(&mut self) -> anyhow::Result<()> { slog::debug!(self.log, "collecting ereports..."); + // TODO(eliza): perhaps we should try to do every reporter in parallel + // at some point? for (id, reporter) in &self.reporters { slog::debug!(self.log, "collecting ereports from {id}"); + let mut saw_reports = false; for (addr, client) in &reporter.clients { let reports = match client.ereports_list(id, None, None).await { Ok(e) => e.into_inner(), @@ -154,40 +157,41 @@ impl Ingester { reporter_id, } in reports.items { + saw_reports = true; match value { ereporter_client::types::EntryKind::Ereport(report) => { + let path = + reporter.path.join(&format!("{seq}.json")); + if path.exists() { + slog::info!( + self.log, + "we are already familiar with ereport \ + {seq} from {reporter_id}, ignoring it"; + "reporter_id" => %reporter_id, + "seq" => %seq, + ); + continue; + } + slog::info!( &self.log, - "ereport from {reporter_id}"; + "ereport {seq} from {reporter_id}: {report:#?}"; "reporter_id" => %reporter_id, "seq" => %seq, - "report" => ?report, ); - match tokio::fs::File::create_new( - reporter.path.join(&format!("{seq}.json")), - ) - .await - { - Err(error) => { - slog::error!(self.log, - "couldn't create new file for ereport, may already have been ingested!"; - "reporter_id" => %reporter_id, - "seq" => %seq, - "error" => %error, - ); - continue; - } - Ok(mut f) => { - let bytes = serde_json::to_vec_pretty(&report) - .with_context(|| format!("failed to serialize ereport {seq} from {reporter_id}"))?; + let mut f = tokio::fs::File::create_new(&path) + .await + .with_context(|| { + format!("failed to create file {path}") + })?; - f - .write_all(&bytes - ) - .await - .with_context(|| format!("failed to write ereport {seq} from {reporter_id}"))?; - } - } + let bytes = serde_json::to_vec_pretty(&report) + .with_context(|| format!("failed to serialize ereport {seq} from {reporter_id}"))?; + + f + .write_all(&bytes) + .await + .with_context(|| format!("failed to write ereport {seq} from {reporter_id}"))?; } ereporter_client::types::EntryKind::DataLoss { @@ -203,6 +207,40 @@ impl Ingester { } } } + + if saw_reports { + // All ereports ingested for this reporter ID. Now, ack them up to the + // latest seq. + let seq = latest_seq(&reporter.path) + .await + .with_context(|| { + format!("couldn't determine latest seq for {id}") + })? + .unwrap_or_else(|| Generation::new()); + for (addr, client) in &reporter.clients { + match client.ereports_acknowledge(id, &seq).await { + Ok(_) => { + slog::info!( + &self.log, + "acked reports"; + "reporter_id" => %id, + "reporter_addr" => ?addr, + "seq" => %seq, + ); + } + Err(e) => { + slog::warn!( + &self.log, + "failed to ack reports"; + "reporter_id" => %id, + "reporter_addr" => ?addr, + "seq" => %seq, + "error" => %e, + ); + } + } + } + } } Ok(()) @@ -222,7 +260,12 @@ impl Ingester { reporter.path ) })?; - let seq = recover_seq(&reporter.path).await?; + let seq = latest_seq(&reporter.path) + .await? + // If there is an existing sequence number for this reporter, return + // the next one; otherwise, start at sequence 0. + .map(|seq| seq.next()) + .unwrap_or_else(|| Generation::new()); match reporter.clients.entry(addr) { Entry::Occupied(_) => { slog::info!( @@ -263,11 +306,11 @@ impl Ingester { } } -async fn recover_seq(path: &Utf8PathBuf) -> anyhow::Result { +async fn latest_seq(path: &Utf8PathBuf) -> anyhow::Result> { let mut dir = tokio::fs::read_dir(path) .await .with_context(|| format!("failed to read {path}"))?; - let mut max = 0; + let mut max = None; while let Some(entry) = dir .next_entry() .await @@ -281,7 +324,7 @@ async fn recover_seq(path: &Utf8PathBuf) -> anyhow::Result { } if let Some(file) = path.file_stem() { match file.parse::() { - Ok(seq) => max = std::cmp::max(seq, max), + Ok(seq) => max = std::cmp::max(Some(seq), max), Err(_) => { continue; } @@ -289,7 +332,7 @@ async fn recover_seq(path: &Utf8PathBuf) -> anyhow::Result { } } - Ok(Generation::from_u32(max)) + Ok(max.map(Generation::from_u32)) } #[derive(Clone)] From c0bb7c6a7f66fb4bc053cf8adb3104be6e8c9e38 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 17 Oct 2024 12:30:42 -0700 Subject: [PATCH 11/17] [ereport] fix backwards buffer retain, remove todo --- ereporter/src/buffer.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ereporter/src/buffer.rs b/ereporter/src/buffer.rs index 217f85fba36..d0e2c8a221e 100644 --- a/ereporter/src/buffer.rs +++ b/ereporter/src/buffer.rs @@ -125,7 +125,7 @@ impl BufferWorker { } ServerReq::TruncateTo { seq, tx } => { let prev_len = self.buf.len(); - self.buf.retain(|ereport| ereport.seq <= seq); + self.buf.retain(|ereport| ereport.seq > seq); slog::info!( self.log, @@ -143,7 +143,6 @@ impl BufferWorker { "client canceled truncate request" ); } - todo!() } } } From 84bd6b4e8366479c57acee9ccbd8e137b28807ec Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 17 Oct 2024 12:30:54 -0700 Subject: [PATCH 12/17] [ereport] register bound server address --- ereporter/src/server.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/ereporter/src/server.rs b/ereporter/src/server.rs index c545fcae48e..9af4cdcd698 100644 --- a/ereporter/src/server.rs +++ b/ereporter/src/server.rs @@ -47,7 +47,7 @@ impl ReporterRegistry { ) -> anyhow::Result { let log = &self.0.log; - let _server = { + let server = { let dropshot_cfg = ConfigDropshot { bind_address: config.server_address, request_body_max_bytes: config.request_body_max_bytes, @@ -117,9 +117,15 @@ impl ReporterRegistry { self.0 .server_tx - .send(Some(State { server_address: config.server_address, nexus })) + .send(Some(State { + // Use the server's actual address once it has been bound, + // rather than the configured address, since the configured port + // may be :0. + server_address: server.local_addr(), + nexus, + })) .expect("receivers should never be dropped"); - Ok(RunningServer { _server }) + Ok(RunningServer { _server: server }) } } From 2979fa81e3e359b3f73a2fdf4a31c4fd3e6e8cc2 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 17 Oct 2024 12:31:47 -0700 Subject: [PATCH 13/17] [ereport] temporarily allow unfinished proxy reporter --- ereporter/src/proxy.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ereporter/src/proxy.rs b/ereporter/src/proxy.rs index 16b1dd8421d..0d2971644eb 100644 --- a/ereporter/src/proxy.rs +++ b/ereporter/src/proxy.rs @@ -1,12 +1,11 @@ // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. - +#![allow(dead_code)] // TODO(eliza): i'm still working on this bit use ereporter_api::Entry; use omicron_common::api::external::Error; use omicron_common::api::external::Generation; use std::future::Future; -use std::sync::Arc; use tokio::sync::{mpsc, watch}; use uuid::Uuid; From c7818b806bcde72e333b9d3765e3f374d6813818 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 17 Oct 2024 13:27:12 -0700 Subject: [PATCH 14/17] [ereport] put client in workspace members --- Cargo.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 75d3a8cb4b6..de4bd9a4d6c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "clients/ddm-admin-client", "clients/dns-service-client", "clients/dpd-client", + "clients/ereporter-client", "clients/gateway-client", "clients/installinator-client", "clients/nexus-client", @@ -135,6 +136,7 @@ default-members = [ "clients/ddm-admin-client", "clients/dns-service-client", "clients/dpd-client", + "clients/ereporter-client", "clients/gateway-client", "clients/installinator-client", "clients/nexus-client", From ce615a023623582264539e220d9bd550c389c319 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 17 Oct 2024 13:27:21 -0700 Subject: [PATCH 15/17] [ereport] use prettier slog format for demo stuff --- ereporter/test-utils/src/bin/ereport-demo.rs | 2 +- ereporter/test-utils/src/bin/ingester-tester.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ereporter/test-utils/src/bin/ereport-demo.rs b/ereporter/test-utils/src/bin/ereport-demo.rs index a526270a401..5c1ff02460e 100644 --- a/ereporter/test-utils/src/bin/ereport-demo.rs +++ b/ereporter/test-utils/src/bin/ereport-demo.rs @@ -33,7 +33,7 @@ async fn main() -> anyhow::Result<()> { use omicron_common::FileKv; use slog::Drain; let decorator = slog_term::TermDecorator::new().build(); - let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_term::CompactFormat::new(decorator).build().fuse(); let drain = slog_async::Async::new(drain).build().fuse(); // let drain = slog::LevelFilter::new(drain, log_level).fuse(); slog::Logger::root(drain.fuse(), slog::o!(FileKv)) diff --git a/ereporter/test-utils/src/bin/ingester-tester.rs b/ereporter/test-utils/src/bin/ingester-tester.rs index 9cbb45e2288..dd93b17a8f5 100644 --- a/ereporter/test-utils/src/bin/ingester-tester.rs +++ b/ereporter/test-utils/src/bin/ingester-tester.rs @@ -20,7 +20,7 @@ async fn main() -> anyhow::Result<()> { use omicron_common::FileKv; use slog::Drain; let decorator = slog_term::TermDecorator::new().build(); - let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_term::CompactFormat::new(decorator).build().fuse(); let drain = slog_async::Async::new(drain).build().fuse(); let drain = slog::LevelFilter::new(drain, log_level).fuse(); slog::Logger::root(drain.fuse(), slog::o!(FileKv)) From 8ea3c96544b310f0e5d8e15c4e9dd2104cfcce08 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 17 Oct 2024 13:40:48 -0700 Subject: [PATCH 16/17] [ereporter] fix doc comment link --- ereporter/src/registry.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ereporter/src/registry.rs b/ereporter/src/registry.rs index 5b5d874f230..00ed682217c 100644 --- a/ereporter/src/registry.rs +++ b/ereporter/src/registry.rs @@ -29,7 +29,7 @@ pub struct ReporterRegistry(pub(crate) Arc); /// Either configuration for building a logger, or an actual logger already /// instantiated. /// -/// This can be used to start a [`Server`] with a new logger or a child of a +/// This can be used to start a [`ReporterRegistry`] with a new logger or a child of a /// parent logger if desired. #[derive(Debug, Clone)] pub enum LogConfig { From 8d767558382738113f3a467a88cc4429f7104992 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 17 Oct 2024 13:50:42 -0700 Subject: [PATCH 17/17] [ereport] another docs whoopsie --- ereporter/api/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ereporter/api/src/lib.rs b/ereporter/api/src/lib.rs index 1ff5c1c43e2..fb434a8cbb3 100644 --- a/ereporter/api/src/lib.rs +++ b/ereporter/api/src/lib.rs @@ -44,7 +44,7 @@ pub trait EreporterApi { ) -> Result; } -/// Path parameters to the [`EreporterAPi::ereports_list`] endpoint. +/// Path parameters to the [`EreporterApi::ereports_list`] endpoint. #[derive(Clone, Copy, Debug, Deserialize, JsonSchema, Serialize)] pub struct ListPathParams { pub reporter_id: Uuid,