Skip to content

Commit

Permalink
[oximeter] Rely on dynamically set arguments within Oximeter (#1237)
Browse files Browse the repository at this point in the history
Part of #1216

- Provide an ID and address as a deployment, not package-time arguments
- Optionally resolve Nexus and Clickhouse's IP addresses by DNS
  • Loading branch information
smklein authored Jun 26, 2022
1 parent 3acf9e8 commit f2bed81
Show file tree
Hide file tree
Showing 12 changed files with 177 additions and 81 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion common/src/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ pub const SLED_AGENT_PORT: u16 = 12345;

/// The port propolis-server listens on inside the propolis zone.
pub const PROPOLIS_PORT: u16 = 12400;

pub const CLICKHOUSE_PORT: u16 = 8123;
pub const OXIMETER_PORT: u16 = 12223;

pub const NEXUS_INTERNAL_PORT: u16 = 12221;

// Anycast is a mechanism in which a single IP address is shared by multiple
// devices, and the destination is located based on routing distance.
//
Expand Down
17 changes: 8 additions & 9 deletions nexus/test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use oximeter_collector::Oximeter;
use oximeter_producer::Server as ProducerServer;
use slog::o;
use slog::Logger;
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use std::net::{IpAddr, Ipv6Addr, SocketAddr, SocketAddrV6};
use std::path::Path;
use std::time::Duration;
use uuid::Uuid;
Expand Down Expand Up @@ -199,21 +199,20 @@ pub async fn start_oximeter(
id: Uuid,
) -> Result<Oximeter, String> {
let db = oximeter_collector::DbConfig {
address: SocketAddr::new(Ipv6Addr::LOCALHOST.into(), db_port),
address: Some(SocketAddr::new(Ipv6Addr::LOCALHOST.into(), db_port)),
batch_size: 10,
batch_interval: 1,
};
let config = oximeter_collector::Config {
id,
nexus_address,
nexus_address: Some(nexus_address),
db,
dropshot: ConfigDropshot {
bind_address: SocketAddr::new(Ipv6Addr::LOCALHOST.into(), 0),
..Default::default()
},
log: ConfigLogging::StderrTerminal { level: ConfigLoggingLevel::Error },
};
Oximeter::with_logger(&config, log).await.map_err(|e| e.to_string())
let args = oximeter_collector::OximeterArguments {
id,
address: SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0),
};
Oximeter::with_logger(&config, &args, log).await.map_err(|e| e.to_string())
}

#[derive(Debug, Clone, oximeter::Target)]
Expand Down
1 change: 1 addition & 0 deletions oximeter/collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ license = "MPL-2.0"
[dependencies]
clap = { version = "3.2", features = ["derive"] }
dropshot = { git = "https://github.com/oxidecomputer/dropshot", branch = "main", features = [ "usdt-probes" ] }
internal-dns-client = { path = "../../internal-dns-client" }
nexus-client = { path = "../../nexus-client" }
omicron-common = { path = "../../common" }
oximeter = { path = "../oximeter" }
Expand Down
7 changes: 0 additions & 7 deletions oximeter/collector/config.toml
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
# Example configuration file for running an oximeter collector server

id = "1da65e5b-210c-4859-a7d7-200c1e659972"
nexus_address = "127.0.0.1:12221"

[db]
address = "[::1]:8123"
batch_size = 1000
batch_interval = 5 # In seconds

[log]
level = "debug"
mode = "stderr-terminal"

[dropshot]
bind_address = "[::1]:12223"
54 changes: 31 additions & 23 deletions oximeter/collector/src/bin/oximeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
use clap::Parser;
use omicron_common::cmd::fatal;
use omicron_common::cmd::CmdError;
use oximeter_collector::{oximeter_api, Config, Oximeter};
use oximeter_collector::{oximeter_api, Config, Oximeter, OximeterArguments};
use std::net::SocketAddrV6;
use std::path::PathBuf;
use uuid::Uuid;

pub fn run_openapi() -> Result<(), String> {
oximeter_api()
Expand All @@ -24,18 +26,22 @@ pub fn run_openapi() -> Result<(), String> {
/// Run an oximeter metric collection server in the Oxide Control Plane.
#[derive(Parser)]
#[clap(name = "oximeter", about = "See README.adoc for more information")]
struct Args {
#[clap(
short = 'O',
long = "openapi",
help = "Print the external OpenAPI Spec document and exit",
action
)]
openapi: bool,

/// Path to TOML file with configuration for the server
#[clap(name = "CONFIG_FILE", action)]
config_file: PathBuf,
enum Args {
/// Print the external OpenAPI Spec document and exit
Openapi,

/// Start an Oximeter server
Run {
/// Path to TOML file with configuration for the server
#[clap(name = "CONFIG_FILE", action)]
config_file: PathBuf,

#[clap(short, long, action)]
id: Uuid,

#[clap(short, long, action)]
address: SocketAddrV6,
},
}

#[tokio::main]
Expand All @@ -47,15 +53,17 @@ async fn main() {

async fn do_run() -> Result<(), CmdError> {
let args = Args::parse();
let config = Config::from_file(args.config_file).unwrap();
if args.openapi {
run_openapi().map_err(CmdError::Failure)
} else {
Oximeter::new(&config)
.await
.unwrap()
.serve_forever()
.await
.map_err(|e| CmdError::Failure(e.to_string()))
match args {
Args::Openapi => run_openapi().map_err(CmdError::Failure),
Args::Run { config_file, id, address } => {
let config = Config::from_file(config_file).unwrap();
let args = OximeterArguments { id, address };
Oximeter::new(&config, &args)
.await
.unwrap()
.serve_forever()
.await
.map_err(|e| CmdError::Failure(e.to_string()))
}
}
}
89 changes: 68 additions & 21 deletions oximeter/collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,19 @@ use dropshot::{
HttpResponseUpdatedNoContent, HttpServer, HttpServerStarter,
RequestContext, TypedBody,
};
use internal_dns_client::{
multiclient::{ResolveError, Resolver},
names::{ServiceName, SRV},
};
use omicron_common::address::{CLICKHOUSE_PORT, NEXUS_INTERNAL_PORT};
use omicron_common::api::internal::nexus::ProducerEndpoint;
use omicron_common::backoff;
use oximeter::types::{ProducerResults, ProducerResultsItem};
use oximeter_db::{Client, DbWrite};
use serde::{Deserialize, Serialize};
use slog::{debug, error, info, o, trace, warn, Drain, Logger};
use std::collections::{btree_map::Entry, BTreeMap};
use std::net::SocketAddr;
use std::net::{SocketAddr, SocketAddrV6};
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -37,6 +42,9 @@ pub enum Error {

#[error(transparent)]
Database(#[from] oximeter_db::Error),

#[error(transparent)]
ResolveError(#[from] ResolveError),
}

// Messages for controlling a collection task
Expand Down Expand Up @@ -231,8 +239,11 @@ async fn results_sink(
/// Configuration for interacting with the metric database.
#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
pub struct DbConfig {
/// Address of the ClickHouse server
pub address: SocketAddr,
/// Optional address of the ClickHouse server.
///
/// If "None", will be inferred from DNS.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub address: Option<SocketAddr>,

/// Batch size of samples at which to insert
pub batch_size: usize,
Expand All @@ -259,6 +270,7 @@ impl OximeterAgent {
pub async fn with_id(
id: Uuid,
db_config: DbConfig,
resolver: &Resolver,
log: &Logger,
) -> Result<Self, Error> {
let (result_sender, result_receiver) = mpsc::channel(8);
Expand All @@ -267,7 +279,17 @@ impl OximeterAgent {

// Construct the ClickHouse client first, propagate an error if we can't reach the
// database.
let client = Client::new(db_config.address, &log);
let db_address = if let Some(address) = db_config.address {
address
} else {
SocketAddr::new(
resolver
.lookup_ip(SRV::Service(ServiceName::Clickhouse))
.await?,
CLICKHOUSE_PORT,
)
};
let client = Client::new(db_address, &log);
client.init_db().await?;

// Spawn the task for aggregating and inserting all metrics
Expand Down Expand Up @@ -334,18 +356,15 @@ impl OximeterAgent {
/// Configuration used to initialize an oximeter server
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Config {
/// An unique ID for this oximeter server
pub id: Uuid,

/// The address used to connect to Nexus.
pub nexus_address: SocketAddr,
///
/// If "None", will be inferred from DNS.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub nexus_address: Option<SocketAddr>,

/// Configuration for working with ClickHouse
pub db: DbConfig,

/// The internal Dropshot HTTP server configuration
pub dropshot: ConfigDropshot,

/// Logging configuration
pub log: ConfigLogging,
}
Expand All @@ -360,6 +379,11 @@ impl Config {
}
}

pub struct OximeterArguments {
pub id: Uuid,
pub address: SocketAddrV6,
}

/// A server used to collect metrics from components in the control plane.
pub struct Oximeter {
_agent: Arc<OximeterAgent>,
Expand All @@ -371,12 +395,15 @@ impl Oximeter {
///
/// This starts an HTTP server used to communicate with other agents in Omicron, especially
/// Nexus. It also registers itself as a new `oximeter` instance with Nexus.
pub async fn new(config: &Config) -> Result<Self, Error> {
pub async fn new(
config: &Config,
args: &OximeterArguments,
) -> Result<Self, Error> {
let log = config
.log
.to_logger("oximeter")
.map_err(|msg| Error::Server(msg.to_string()))?;
Self::with_logger(config, log).await
Self::with_logger(config, args, log).await
}

/// Create a new `Oximeter`, specifying an alternative logger to use.
Expand All @@ -385,6 +412,7 @@ impl Oximeter {
/// `config`, using `log` instead.
pub async fn with_logger(
config: &Config,
args: &OximeterArguments,
log: Logger,
) -> Result<Self, Error> {
let (drain, registration) = slog_dtrace::with_drain(log);
Expand All @@ -398,10 +426,13 @@ impl Oximeter {
}
info!(log, "starting oximeter server");

let resolver = Resolver::new_from_ip(*args.address.ip())?;

let make_agent = || async {
debug!(log, "creating ClickHouse client");
Ok(Arc::new(
OximeterAgent::with_id(config.id, config.db, &log).await?,
OximeterAgent::with_id(args.id, config.db, &resolver, &log)
.await?,
))
};
let log_client_failure = |error, delay| {
Expand All @@ -421,7 +452,10 @@ impl Oximeter {

let dropshot_log = log.new(o!("component" => "dropshot"));
let server = HttpServerStarter::new(
&config.dropshot,
&ConfigDropshot {
bind_address: SocketAddr::V6(args.address),
..Default::default()
},
oximeter_api(),
Arc::clone(&agent),
&dropshot_log,
Expand All @@ -433,20 +467,33 @@ impl Oximeter {
let client = reqwest::Client::new();
let notify_nexus = || async {
debug!(log, "contacting nexus");
client
.post(format!(
"http://{}/metrics/collectors",
config.nexus_address
let nexus_address = if let Some(address) = config.nexus_address {
address
} else {
SocketAddr::V6(SocketAddrV6::new(
resolver
.lookup_ipv6(SRV::Service(ServiceName::Nexus))
.await
.map_err(|e| {
backoff::BackoffError::transient(e.to_string())
})?,
NEXUS_INTERNAL_PORT,
0,
0,
))
};

client
.post(format!("http://{}/metrics/collectors", nexus_address,))
.json(&nexus_client::types::OximeterInfo {
address: server.local_addr().to_string(),
collector_id: agent.id,
})
.send()
.await
.map_err(backoff::BackoffError::transient)?
.map_err(|e| backoff::BackoffError::transient(e.to_string()))?
.error_for_status()
.map_err(backoff::BackoffError::transient)
.map_err(|e| backoff::BackoffError::transient(e.to_string()))
};
let log_notification_failure = |error, delay| {
warn!(
Expand Down
14 changes: 10 additions & 4 deletions oximeter/collector/tests/output/cmd-oximeter-noargs-stderr
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
error: The following required arguments were not provided:
<CONFIG_FILE>
oximeter
See README.adoc for more information

USAGE:
oximeter [OPTIONS] <CONFIG_FILE>
oximeter <SUBCOMMAND>

For more information try --help
OPTIONS:
-h, --help Print help information

SUBCOMMANDS:
help Print this message or the help of the given subcommand(s)
openapi Print the external OpenAPI Spec document and exit
run Start an Oximeter server
2 changes: 1 addition & 1 deletion oximeter/collector/tests/test_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ fn test_oximeter_openapi() {
// But we do know where it is at compile time, so we load it then.
let config = include_str!("../../collector/config.toml");
let config_path = write_config(config);
let exec = Exec::cmd(path_to_oximeter()).arg(&config_path).arg("--openapi");
let exec = Exec::cmd(path_to_oximeter()).arg("openapi");
let (exit_status, stdout_text, stderr_text) = run_command(exec);
fs::remove_file(&config_path).expect("failed to remove temporary file");
assert_exit_code(exit_status, EXIT_SUCCESS, &stderr_text);
Expand Down
Loading

0 comments on commit f2bed81

Please sign in to comment.