diff --git a/Cargo.lock b/Cargo.lock
index 2c050ef..7a79723 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -495,6 +495,19 @@ dependencies = [
  "bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
+[[package]]
+name = "commons"
+version = "0.1.0"
+dependencies = [
+ "actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "chrono 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
+ "failure 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "maplit 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "prometheus 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "serde 1.0.114 (registry+https://github.com/rust-lang/crates.io-index)",
+ "serde_derive 1.0.114 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "copyless"
 version = "0.1.5"
@@ -644,6 +657,54 @@ dependencies = [
  "synstructure 0.12.4 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
+[[package]]
+name = "fcos-graph-builder"
+version = "0.1.0"
+dependencies = [
+ "actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "cbloom 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "chrono 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
+ "commons 0.1.0",
+ "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "envsubst 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "failure 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "maplit 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "prometheus 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "reqwest 0.10.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "serde 1.0.114 (registry+https://github.com/rust-lang/crates.io-index)",
+ "serde_derive 1.0.114 (registry+https://github.com/rust-lang/crates.io-index)",
+ "serde_json 1.0.56 (registry+https://github.com/rust-lang/crates.io-index)",
+ "structopt 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "fcos-policy-engine"
+version = "0.1.0"
+dependencies = [
+ "actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "cbloom 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "chrono 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
+ "commons 0.1.0",
+ "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "envsubst 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "failure 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "maplit 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "prometheus 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "reqwest 0.10.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "serde 1.0.114 (registry+https://github.com/rust-lang/crates.io-index)",
+ "serde_derive 1.0.114 (registry+https://github.com/rust-lang/crates.io-index)",
+ "serde_json 1.0.56 (registry+https://github.com/rust-lang/crates.io-index)",
+ "structopt 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "flate2"
 version = "1.0.16"
diff --git a/Cargo.toml b/Cargo.toml
index 6e3f0fc..8322195 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,7 +1,8 @@
 [workspace]
 members = [
+  "commons",
   "dumnati",
-#  "graph-builder",
-#  "policy-engine",
+  "fcos-graph-builder",
+  "fcos-policy-engine",
 ]
diff --git a/commons/.gitignore b/commons/.gitignore
new file mode 100644
index 0000000..53eaa21
--- /dev/null
+++ b/commons/.gitignore
@@ -0,0 +1,2 @@
+/target
+**/*.rs.bk
diff --git a/commons/Cargo.toml b/commons/Cargo.toml
new file mode 100644
index 0000000..8caccd4
--- /dev/null
+++ b/commons/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "commons"
+version = "0.1.0"
+authors = ["Allen Bai <abai@redhat.com>"]
+edition = "2018"
+publish = false
+
+[dependencies]
+actix-web = "^2.0.0"
+chrono = "^0.4.7"
+failure = "^0.1.1"
+maplit = "^1.0"
+prometheus = "^0.9"
+serde = "^1.0.70"
+serde_derive = "^1.0.70"
diff --git a/commons/src/graph.rs b/commons/src/graph.rs
new file mode 100644
index 0000000..ee8d089
--- /dev/null
+++ b/commons/src/graph.rs
@@ -0,0 +1,203 @@
+use crate::metadata;
+use failure::Fallible;
+use serde_derive::{Deserialize, Serialize};
+use std::collections::HashMap;
+
+/// Single release entry in the Cincinnati update-graph.
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct CincinnatiPayload {
+    pub version: String,
+    pub metadata: HashMap<String, String>,
+    pub payload: String,
+}
+
+/// Cincinnati update-graph, a DAG with releases (nodes) and update paths (edges).
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct Graph {
+    pub nodes: Vec<CincinnatiPayload>,
+    pub edges: Vec<(u64, u64)>,
+}
+
+impl Default for Graph {
+    fn default() -> Self {
+        Self {
+            nodes: vec![],
+            edges: vec![],
+        }
+    }
+}
+
+impl Graph {
+    /// Assemble a graph from release-index and updates metadata.
+    pub fn from_metadata(
+        releases: Vec<metadata::Release>,
+        updates: metadata::UpdatesJSON,
+    ) -> Fallible<Graph> {
+        let nodes: Vec<CincinnatiPayload> = releases
+            .into_iter()
+            .enumerate()
+            .map(|(age_index, entry)| {
+                let mut current = CincinnatiPayload {
+                    version: entry.version,
+                    payload: "".to_string(),
+                    metadata: maplit::hashmap! {
+                        metadata::AGE_INDEX.to_string() => age_index.to_string(),
+                    },
+                };
+                for commit in entry.commits {
+                    if commit.architecture.is_empty() || commit.checksum.is_empty() {
+                        continue;
+                    }
+                    let key = format!("{}.{}", metadata::ARCH_PREFIX, commit.architecture);
+                    let value = commit.checksum;
+                    current.metadata.insert(key, value);
+                }
+
+                // Augment with dead-ends metadata.
+                Self::inject_deadend_reason(&updates, &mut current);
+
+                // Augment with barriers metadata.
+                Self::inject_barrier_reason(&updates, &mut current);
+
+                // Augment with rollouts metadata.
+                Self::inject_throttling_params(&updates, &mut current);
+
+                current
+            })
+            .collect();
+
+        // Compute the update graph.
+        let edges = Self::compute_edges(&nodes)?;
+
+        let graph = Graph { nodes, edges };
+        Ok(graph)
+    }
+
+    /// Compute edges based on graph metadata.
+    fn compute_edges(nodes: &[CincinnatiPayload]) -> Fallible<Vec<(u64, u64)>> {
+        use std::collections::BTreeSet;
+        use std::ops::Bound;
+
+        // Collect all rollouts and barriers.
+        let mut rollouts = BTreeSet::<u64>::new();
+        let mut barriers = BTreeSet::<u64>::new();
+        for (index, release) in nodes.iter().enumerate() {
+            if release.metadata.contains_key(metadata::ROLLOUT) {
+                rollouts.insert(index as u64);
+            }
+            if release.metadata.contains_key(metadata::BARRIER) {
+                barriers.insert(index as u64);
+            }
+        }
+
+        // Add edges targeting rollouts, back till the previous barrier.
+        let mut edges = vec![];
+        for (index, _release) in nodes.iter().enumerate().rev() {
+            let age = index as u64;
+            if !rollouts.contains(&age) {
+                continue;
+            }
+
+            let previous_barrier = barriers
+                .range((Bound::Unbounded, Bound::Excluded(age)))
+                .next_back()
+                .cloned()
+                .unwrap_or(0);
+
+            for i in previous_barrier..age {
+                edges.push((i, age))
+            }
+        }
+
+        // Add edges targeting barriers, back till the previous barrier.
+        let mut start = 0;
+        for target in barriers {
+            if rollouts.contains(&target) {
+                // This is an in-progress barrier. Rollout logic already took care
+                // of it, nothing to do here.
+            } else {
+                for i in start..target {
+                    edges.push((i, target))
+                }
+            }
+            start = target;
+        }
+
+        Ok(edges)
+    }
+
+    fn inject_barrier_reason(updates: &metadata::UpdatesJSON, release: &mut CincinnatiPayload) {
+        for entry in &updates.releases {
+            if entry.version != release.version {
+                continue;
+            }
+
+            if let Some(barrier) = &entry.metadata.barrier {
+                let reason = if barrier.reason.is_empty() {
+                    "generic"
+                } else {
+                    &barrier.reason
+                };
+
+                release
+                    .metadata
+                    .insert(metadata::BARRIER.to_string(), true.to_string());
+                release
+                    .metadata
+                    .insert(metadata::BARRIER_REASON.to_string(), reason.to_string());
+            }
+        }
+    }
+
+    fn inject_deadend_reason(updates: &metadata::UpdatesJSON, release: &mut CincinnatiPayload) {
+        for entry in &updates.releases {
+            if entry.version != release.version {
+                continue;
+            }
+
+            if let Some(deadend) = &entry.metadata.deadend {
+                let reason = if deadend.reason.is_empty() {
+                    "generic"
+                } else {
+                    &deadend.reason
+                };
+
+                release
+                    .metadata
+                    .insert(metadata::DEADEND.to_string(), true.to_string());
+                release
+                    .metadata
+                    .insert(metadata::DEADEND_REASON.to_string(), reason.to_string());
+            }
+        }
+    }
+
+    fn inject_throttling_params(updates: &metadata::UpdatesJSON, release: &mut CincinnatiPayload) {
+        for entry in &updates.releases {
+            if entry.version != release.version {
+                continue;
+            }
+
+            if let Some(rollout) = &entry.metadata.rollout {
+                release
+                    .metadata
+                    .insert(metadata::ROLLOUT.to_string(), true.to_string());
+                if let Some(val) = rollout.start_epoch {
+                    release
+                        .metadata
+                        .insert(metadata::START_EPOCH.to_string(), val.to_string());
+                }
+                if let Some(val) = rollout.start_percentage {
+                    release
+                        .metadata
+                        .insert(metadata::START_VALUE.to_string(), val.to_string());
+                }
+                if let Some(minutes) = &rollout.duration_minutes {
+                    release
+                        .metadata
+                        .insert(metadata::DURATION.to_string(), minutes.to_string());
+                }
+            }
+        }
+    }
+}
diff --git a/commons/src/lib.rs b/commons/src/lib.rs
new file mode 100644
index 0000000..9b5aeeb
--- /dev/null
+++ b/commons/src/lib.rs
@@ -0,0 +1,4 @@
+pub mod graph;
+pub mod metadata;
+pub mod metrics;
+pub mod policy;
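Worth noting about `compute_edges` above: edges are only ever created pointing at barriers and at releases under rollout, so a release with neither annotation receives no incoming edges at all. A minimal sketch of the resulting topology, using only the public types from this patch (version strings, checksums, and the reason text are invented; assumes `commons` and `failure` as dependencies):

    use commons::graph::Graph;
    use commons::metadata::{
        Release, ReleaseCommit, ReleaseUpdate, UpdateBarrier, UpdateMetadata, UpdateRollout,
        UpdatesJSON,
    };

    // Helper to build a one-commit release entry with dummy payload data.
    fn release(version: &str) -> Release {
        Release {
            version: version.to_string(),
            metadata: String::new(),
            commits: vec![ReleaseCommit {
                architecture: "x86_64".to_string(),
                checksum: "sha256-dummy".to_string(),
            }],
        }
    }

    fn main() -> failure::Fallible<()> {
        // Three releases: node 1 is a barrier, node 2 is under rollout.
        let releases = vec![release("31.1.0"), release("31.2.0"), release("31.3.0")];
        let updates = UpdatesJSON {
            stream: "testing".to_string(),
            releases: vec![
                ReleaseUpdate {
                    version: "31.2.0".to_string(),
                    metadata: UpdateMetadata {
                        barrier: Some(UpdateBarrier {
                            reason: "new signing key".to_string(),
                        }),
                        deadend: None,
                        rollout: None,
                    },
                },
                ReleaseUpdate {
                    version: "31.3.0".to_string(),
                    metadata: UpdateMetadata {
                        barrier: None,
                        deadend: None,
                        rollout: Some(UpdateRollout {
                            start_epoch: Some(0),
                            start_percentage: Some(0.0),
                            duration_minutes: Some(60),
                        }),
                    },
                },
            ],
        };

        let graph = Graph::from_metadata(releases, updates)?;
        let mut edges = graph.edges;
        edges.sort();
        // Only the barrier (node 1) and the rollout (node 2) receive edges:
        // 0 -> 1 (everything up to the barrier), 1 -> 2 (barrier to rollout).
        assert_eq!(edges, vec![(0, 1), (1, 2)]);
        Ok(())
    }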
diff --git a/commons/src/metadata.rs b/commons/src/metadata.rs
new file mode 100644
index 0000000..02e1036
--- /dev/null
+++ b/commons/src/metadata.rs
@@ -0,0 +1,80 @@
+//! Fedora CoreOS metadata.
+
+use serde_derive::Deserialize;
+
+/// Templated URL for release index.
+pub static RELEASES_JSON: &str =
+    "https://builds.coreos.fedoraproject.org/prod/streams/${stream}/releases.json";
+
+/// Templated URL for stream metadata.
+pub static STREAM_JSON: &str = "https://builds.coreos.fedoraproject.org/updates/${stream}.json";
+
+pub static SCHEME: &str = "org.fedoraproject.coreos.scheme";
+
+pub static AGE_INDEX: &str = "org.fedoraproject.coreos.releases.age_index";
+pub static ARCH_PREFIX: &str = "org.fedoraproject.coreos.releases.arch";
+
+pub static BARRIER: &str = "org.fedoraproject.coreos.updates.barrier";
+pub static BARRIER_REASON: &str = "org.fedoraproject.coreos.updates.barrier_reason";
+pub static DEADEND: &str = "org.fedoraproject.coreos.updates.deadend";
+pub static DEADEND_REASON: &str = "org.fedoraproject.coreos.updates.deadend_reason";
+pub static ROLLOUT: &str = "org.fedoraproject.coreos.updates.rollout";
+pub static DURATION: &str = "org.fedoraproject.coreos.updates.duration_minutes";
+pub static START_EPOCH: &str = "org.fedoraproject.coreos.updates.start_epoch";
+pub static START_VALUE: &str = "org.fedoraproject.coreos.updates.start_value";
+
+/// Fedora CoreOS release index.
+#[derive(Debug, Deserialize)]
+pub struct ReleasesJSON {
+    pub releases: Vec<Release>,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct Release {
+    pub commits: Vec<ReleaseCommit>,
+    pub version: String,
+    pub metadata: String,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct ReleaseCommit {
+    pub architecture: String,
+    pub checksum: String,
+}
+
+/// Fedora CoreOS updates metadata.
+#[derive(Debug, Deserialize)]
+pub struct UpdatesJSON {
+    pub stream: String,
+    pub releases: Vec<ReleaseUpdate>,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct ReleaseUpdate {
+    pub version: String,
+    pub metadata: UpdateMetadata,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct UpdateMetadata {
+    pub barrier: Option<UpdateBarrier>,
+    pub deadend: Option<UpdateDeadend>,
+    pub rollout: Option<UpdateRollout>,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct UpdateBarrier {
+    pub reason: String,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct UpdateDeadend {
+    pub reason: String,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct UpdateRollout {
+    pub start_epoch: Option<i64>,
+    pub start_percentage: Option<f64>,
+    pub duration_minutes: Option<u64>,
+}
diff --git a/commons/src/metrics.rs b/commons/src/metrics.rs
new file mode 100644
index 0000000..eba3fa8
--- /dev/null
+++ b/commons/src/metrics.rs
@@ -0,0 +1,18 @@
+//! Metrics endpoint.
+
+use actix_web::HttpResponse;
+
+/// Serve metrics requests (Prometheus textual format).
+pub async fn serve_metrics() -> Result<HttpResponse, failure::Error> {
+    use prometheus::Encoder;
+
+    let content = {
+        let metrics = prometheus::default_registry().gather();
+        let txt_enc = prometheus::TextEncoder::new();
+        let mut buf = vec![];
+        txt_enc.encode(&metrics, &mut buf)?;
+        buf
+    };
+
+    Ok(HttpResponse::Ok().body(content))
+}
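For reference, a sketch of an updates document that deserializes into the `UpdatesJSON` shape above. The values are invented, and serde leaves the absent `deadend` field as `None`; this assumes `serde_json` is available, as it is for both binaries below:

    use commons::metadata::UpdatesJSON;

    fn main() -> failure::Fallible<()> {
        // Hypothetical sample document matching the structs above.
        let doc = r#"{
            "stream": "testing",
            "releases": [{
                "version": "31.2.0",
                "metadata": {
                    "barrier": { "reason": "new signing key" },
                    "rollout": {
                        "start_epoch": 1594000000,
                        "start_percentage": 0.0,
                        "duration_minutes": 2880
                    }
                }
            }]
        }"#;

        let updates: UpdatesJSON = serde_json::from_str(doc)?;
        assert_eq!(updates.releases.len(), 1);
        // "deadend" is absent, so it comes back as None.
        assert!(updates.releases[0].metadata.deadend.is_none());
        Ok(())
    }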
diff --git a/commons/src/policy.rs b/commons/src/policy.rs
new file mode 100644
index 0000000..541c31c
--- /dev/null
+++ b/commons/src/policy.rs
@@ -0,0 +1,117 @@
+use crate::graph::Graph;
+use crate::metadata;
+use failure::{bail, Fallible};
+
+/// Prune outgoing edges from "deadend" nodes.
+pub fn filter_deadends(input: Graph) -> Graph {
+    use std::collections::HashSet;
+
+    let mut graph = input;
+    let mut deadends = HashSet::new();
+
+    for (index, release) in graph.nodes.iter().enumerate() {
+        if release.metadata.get(metadata::DEADEND) == Some(&"true".into()) {
+            deadends.insert(index);
+        }
+    }
+
+    graph.edges.retain(|(from, _to)| {
+        let index = *from as usize;
+        !deadends.contains(&index)
+    });
+    graph.edges.shrink_to_fit();
+
+    graph
+}
+
+/// Pick relevant payload for requested basearch.
+pub fn pick_basearch(input: Graph, basearch: String) -> Fallible<Graph> {
+    let mut graph = input;
+    let key = format!("{}.{}", metadata::ARCH_PREFIX, &basearch);
+
+    if basearch != "x86_64" {
+        bail!("unexpected basearch '{}'", basearch);
+    }
+
+    for mut release in &mut graph.nodes {
+        if let Some(payload) = release.metadata.remove(&key) {
+            release.payload = payload;
+            release
+                .metadata
+                .insert(metadata::SCHEME.to_string(), "checksum".to_string());
+        }
+        release
+            .metadata
+            .retain(|k, _| !k.starts_with(metadata::ARCH_PREFIX));
+    }
+
+    Ok(graph)
+}
+
+/// Conditionally prune incoming edges towards throttled rollouts.
+pub fn throttle_rollouts(input: Graph, client_wariness: f64) -> Graph {
+    use std::collections::HashSet;
+
+    let mut graph = input;
+    let mut hidden = HashSet::new();
+    let now = chrono::Utc::now().timestamp();
+
+    for (index, release) in graph.nodes.iter().enumerate() {
+        // Skip if this release is not being rolled out.
+        if release.metadata.get(metadata::ROLLOUT).is_none() {
+            continue;
+        };
+
+        // Start epoch defaults to 0.
+        let start_epoch = match release.metadata.get(metadata::START_EPOCH) {
+            Some(epoch) => epoch.parse::<i64>().unwrap_or(0),
+            None => 0i64,
+        };
+
+        // Start value defaults to 0.0.
+        let start_value = match release.metadata.get(metadata::START_VALUE) {
+            Some(val) => val.parse::<f64>().unwrap_or(0f64),
+            None => 0f64,
+        };
+
+        // Duration has no default (i.e. no progress).
+        let mut minutes: Option<u64> = None;
+        if let Some(mins) = release.metadata.get(metadata::DURATION) {
+            if let Ok(m) = mins.parse::<u64>() {
+                minutes = Some(m.max(1));
+            }
+        }
+
+        let throttling: f64;
+        if let Some(mins) = minutes {
+            let end = start_epoch + (mins.saturating_mul(60)) as i64;
+            let rate = (1.0 - start_value) / (end.saturating_sub(start_epoch)) as f64;
+            if now < start_epoch {
+                throttling = 0.0;
+            } else if now > end {
+                throttling = 1.0;
+            } else {
+                throttling = start_value + rate * (now - start_epoch) as f64;
+            }
+        } else {
+            // Without duration, rollout does not progress past initial value.
+            if now < start_epoch {
+                throttling = 0.0;
+            } else {
+                throttling = start_value
+            }
+        }
+
+        if client_wariness > throttling {
+            hidden.insert(index);
+        }
+    }
+
+    graph.edges.retain(|(_from, to)| {
+        let index = *to as usize;
+        !hidden.contains(&index)
+    });
+    graph.edges.shrink_to_fit();
+
+    graph
+}
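The throttling computation is a linear ramp from `start_value` at `start_epoch` up to 1.0 at `start_epoch + duration_minutes`; a client keeps its edges into the release only once its wariness falls at or below the current ramp value. A self-contained sketch of that arithmetic, with made-up numbers:

    // Worked example of the linear ramp in `throttle_rollouts`:
    // start_value 0.2, a 100-minute duration, and "now" halfway through.
    fn main() {
        let start_epoch: i64 = 1_000_000;
        let start_value: f64 = 0.2;
        let mins: u64 = 100;

        let end = start_epoch + (mins * 60) as i64; // 1_006_000
        let rate = (1.0 - start_value) / (end - start_epoch) as f64; // 0.8 / 6000

        let now = start_epoch + 3_000; // halfway through the window
        let throttling = start_value + rate * (now - start_epoch) as f64;
        assert!((throttling - 0.6).abs() < 1e-9);

        // Clients with wariness <= 0.6 see edges into this release now;
        // warier clients have them pruned until the ramp catches up.
    }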
diff --git a/fcos-graph-builder/.gitignore b/fcos-graph-builder/.gitignore
new file mode 100644
index 0000000..53eaa21
--- /dev/null
+++ b/fcos-graph-builder/.gitignore
@@ -0,0 +1,2 @@
+/target
+**/*.rs.bk
diff --git a/fcos-graph-builder/Cargo.toml b/fcos-graph-builder/Cargo.toml
new file mode 100644
index 0000000..665cdc1
--- /dev/null
+++ b/fcos-graph-builder/Cargo.toml
@@ -0,0 +1,26 @@
+[package]
+name = "fcos-graph-builder"
+version = "0.1.0"
+authors = ["Allen Bai <abai@redhat.com>"]
+edition = "2018"
+publish = false
+
+[dependencies]
+actix = "^0.9.0"
+actix-web = "^2.0.0"
+cbloom = "^0.1.3"
+chrono = "^0.4.7"
+commons = { path = "../commons" }
+env_logger = "^0.7.1"
+envsubst = "^0.2"
+failure = "^0.1.1"
+futures = "^0.3.1"
+lazy_static = "^1.3.0"
+log = "^0.4.3"
+maplit = "^1.0"
+prometheus = "^0.9"
+reqwest = { version = "^0.10.1", features = ["json"] }
+serde = "^1.0.70"
+serde_derive = "^1.0.70"
+serde_json = "^1.0.22"
+structopt = "^0.3.7"
diff --git a/fcos-graph-builder/src/main.rs b/fcos-graph-builder/src/main.rs
new file mode 100644
index 0000000..76c7580
--- /dev/null
+++ b/fcos-graph-builder/src/main.rs
@@ -0,0 +1,138 @@
+#[macro_use]
+extern crate log;
+#[macro_use]
+extern crate prometheus;
+
+mod scraper;
+
+use actix::prelude::*;
+use actix_web::{web, App, HttpResponse};
+use commons::{metrics, policy};
+use failure::Fallible;
+use prometheus::{IntCounterVec, IntGauge, IntGaugeVec};
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::net::{IpAddr, Ipv4Addr};
+use structopt::StructOpt;
+
+lazy_static::lazy_static! {
+    static ref GRAPH_FINAL_EDGES: IntGaugeVec = register_int_gauge_vec!(
+        "fcos_cincinnati_gb_scraper_graph_final_edges",
+        "Number of edges in the cached graph, after processing",
+        &["stream"]
+    ).unwrap();
+    static ref GRAPH_FINAL_RELEASES: IntGaugeVec = register_int_gauge_vec!(
+        "fcos_cincinnati_gb_scraper_graph_final_releases",
+        "Number of releases in the cached graph, after processing",
+        &["stream"]
+    ).unwrap();
+    static ref LAST_REFRESH: IntGaugeVec = register_int_gauge_vec!(
+        "fcos_cincinnati_gb_scraper_graph_last_refresh_timestamp",
+        "UTC timestamp of last graph refresh",
+        &["stream"]
+    ).unwrap();
+    static ref UPSTREAM_SCRAPES: IntCounterVec = register_int_counter_vec!(
+        "fcos_cincinnati_gb_scraper_upstream_scrapes_total",
+        "Total number of upstream scrapes",
+        &["stream"]
+    ).unwrap();
+    // NOTE(lucab): alternatively this could come from the runtime library, see
+    // https://prometheus.io/docs/instrumenting/writing_clientlibs/#process-metrics
+    static ref PROCESS_START_TIME: IntGauge = register_int_gauge!(opts!(
+        "process_start_time_seconds",
+        "Start time of the process since unix epoch in seconds."
+    )).unwrap();
+
+}
+
+fn main() -> Fallible<()> {
+    env_logger::Builder::from_default_env().try_init()?;
+
+    let opts = CliOptions::from_args();
+    trace!("started with CLI options: {:#?}", opts);
+
+    let sys = actix::System::new("fcos_cincinnati_gb");
+
+    // TODO(lucab): figure out all configuration params.
+    let streams_cfg = maplit::btreeset!["next", "stable", "testing"];
+    let mut scrapers = HashMap::with_capacity(streams_cfg.len());
+    for stream in streams_cfg {
+        let addr = scraper::Scraper::new(stream)?.start();
+        scrapers.insert(stream.to_string(), addr);
+    }
+
+    let service_state = AppState { scrapers };
+
+    let start_timestamp = chrono::Utc::now();
+    PROCESS_START_TIME.set(start_timestamp.timestamp());
+
+    // Graph-builder service.
+    let gb_service = service_state.clone();
+    actix_web::HttpServer::new(move || {
+        App::new()
+            .data(gb_service.clone())
+            .route("/v1/graph", web::get().to(gb_serve_graph))
+    })
+    .bind((IpAddr::from(Ipv4Addr::UNSPECIFIED), 8080))?
+    .run();
+
+    // Graph-builder status service.
+    let gb_status = service_state;
+    actix_web::HttpServer::new(move || {
+        App::new()
+            .data(gb_status.clone())
+            .route("/metrics", web::get().to(metrics::serve_metrics))
+    })
+    .bind((IpAddr::from(Ipv4Addr::UNSPECIFIED), 9080))?
+    .run();
+
+    sys.run()?;
+    Ok(())
+}
+
+#[derive(Clone, Debug)]
+pub(crate) struct AppState {
+    scrapers: HashMap<String, Addr<scraper::Scraper>>,
+}
+
+#[derive(Deserialize)]
+pub struct GraphQuery {
+    basearch: Option<String>,
+    stream: Option<String>,
+}
+
+pub(crate) async fn gb_serve_graph(
+    data: actix_web::web::Data<AppState>,
+    query: actix_web::web::Query<GraphQuery>,
+) -> Result<HttpResponse, failure::Error> {
+    let basearch = query
+        .basearch
+        .as_ref()
+        .map(String::from)
+        .unwrap_or_default();
+    let stream = query.stream.as_ref().map(String::from).unwrap_or_default();
+
+    let addr = match data.scrapers.get(&stream) {
+        None => return Ok(HttpResponse::NotFound().finish()),
+        Some(addr) => addr,
+    };
+
+    let cached_graph = addr.send(scraper::GetCachedGraph { stream }).await??;
+
+    let arch_graph = policy::pick_basearch(cached_graph, basearch)?;
+    let final_graph = policy::filter_deadends(arch_graph);
+
+    let json =
+        serde_json::to_string_pretty(&final_graph).map_err(|e| failure::format_err!("{}", e))?;
+    let resp = HttpResponse::Ok()
+        .content_type("application/json")
+        .body(json);
+    Ok(resp)
+}
+
+#[derive(Debug, StructOpt)]
+pub(crate) struct CliOptions {
+    /// Path to configuration file.
+    #[structopt(short = "c")]
+    pub config_path: Option<String>,
+}
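Once both HTTP servers are up, the graph endpoint can be exercised directly. A hypothetical client-side smoke test, using the bind addresses from `main()` above (8080 for the graph, 9080 for metrics); the URL and the tokio 0.2 runtime (matching the reqwest 0.10 dependency) are assumptions, not part of this patch:

    use failure::Fallible;

    #[tokio::main]
    async fn main() -> Fallible<()> {
        // Query the testing stream for the x86_64 payload graph.
        let url = "http://127.0.0.1:8080/v1/graph?stream=testing&basearch=x86_64";
        let resp = reqwest::get(url).await?.error_for_status()?;
        let graph: serde_json::Value = resp.json().await?;
        let releases = graph["nodes"].as_array().map_or(0, |nodes| nodes.len());
        println!("fetched {} releases from the testing stream", releases);
        Ok(())
    }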
{ "stream".to_string() => stream.clone() }; + let releases_json = envsubst::substitute(metadata::RELEASES_JSON, &vars)?; + let stream_json = envsubst::substitute(metadata::STREAM_JSON, &vars)?; + let hclient = reqwest::ClientBuilder::new() + .timeout(DEFAULT_HTTP_REQ_TIMEOUT) + .build()?; + + let scraper = Self { + graph: graph::Graph::default(), + hclient, + pause_secs: NonZeroU64::new(30).expect("non-zero pause"), + stream, + release_index_url: reqwest::Url::parse(&releases_json)?, + stream_metadata_url: reqwest::Url::parse(&stream_json)?, + }; + Ok(scraper) + } + + /// Return a request builder with base URL and parameters set. + fn new_request( + &self, + method: reqwest::Method, + url: reqwest::Url, + ) -> Fallible { + let builder = self.hclient.request(method, url); + Ok(builder) + } + + /// Fetch releases from release-index. + fn fetch_releases(&self) -> impl Future, Error>> { + let target = self.release_index_url.clone(); + let req = self.new_request(Method::GET, target); + + async { + let resp = req?.send().await?; + let content = resp.error_for_status()?; + let json = content.json::().await?; + Ok(json.releases) + } + } + + /// Fetch updates metadata. + fn fetch_updates(&self) -> impl Future> { + let target = self.stream_metadata_url.clone(); + let req = self.new_request(Method::GET, target); + + async { + let resp = req?.send().await?; + let content = resp.error_for_status()?; + let json = content.json::().await?; + Ok(json) + } + } + + /// Combine release-index and updates metadata. + fn assemble_graph(&self) -> impl Future> { + let stream_releases = self.fetch_releases(); + let stream_updates = self.fetch_updates(); + + // NOTE(lucab): this inner scope is in order to get a 'static lifetime on + // the future for actix compatibility. + async { + let (graph, updates) = + futures::future::try_join(stream_releases, stream_updates).await?; + graph::Graph::from_metadata(graph, updates) + } + } + + /// Update cached graph. + fn update_cached_graph(&mut self, graph: graph::Graph) { + self.graph = graph; + + let refresh_timestamp = chrono::Utc::now(); + crate::LAST_REFRESH + .with_label_values(&[&self.stream]) + .set(refresh_timestamp.timestamp()); + crate::GRAPH_FINAL_EDGES + .with_label_values(&[&self.stream]) + .set(self.graph.edges.len() as i64); + crate::GRAPH_FINAL_RELEASES + .with_label_values(&[&self.stream]) + .set(self.graph.nodes.len() as i64); + } +} + +impl Actor for Scraper { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + // Kick-start the state machine. 
+ Self::tick_now(ctx); + } +} + +pub(crate) struct RefreshTick {} + +impl Message for RefreshTick { + type Result = Result<(), failure::Error>; +} + +impl Handler for Scraper { + type Result = ResponseActFuture>; + + fn handle(&mut self, _msg: RefreshTick, _ctx: &mut Self::Context) -> Self::Result { + crate::UPSTREAM_SCRAPES + .with_label_values(&[&self.stream]) + .inc(); + + let latest_graph = self.assemble_graph(); + let update_graph = actix::fut::wrap_future::<_, Self>(latest_graph) + .map(|graph, actor, _ctx| { + match graph { + Ok(graph) => actor.update_cached_graph(graph), + Err(e) => log::error!("transient scraping failure: {}", e), + }; + }) + .then(|_r, actor, ctx| { + let pause = Duration::from_secs(actor.pause_secs.get()); + Self::tick_later(ctx, pause); + actix::fut::ok(()) + }); + + Box::new(update_graph) + } +} + +pub(crate) struct GetCachedGraph { + pub(crate) stream: String, +} + +impl Default for GetCachedGraph { + fn default() -> Self { + Self { + stream: "testing".to_string(), + } + } +} + +impl Message for GetCachedGraph { + type Result = Result; +} + +impl Handler for Scraper { + type Result = ResponseActFuture>; + + fn handle(&mut self, msg: GetCachedGraph, _ctx: &mut Self::Context) -> Self::Result { + use failure::format_err; + if msg.stream != self.stream { + return Box::new(actix::fut::err(format_err!( + "unexpected stream '{}'", + msg.stream + ))); + } + Box::new(actix::fut::ok(self.graph.clone())) + } +} + +impl Scraper { + /// Schedule an immediate refresh of the state machine. + pub fn tick_now(ctx: &mut Context) { + ctx.notify(RefreshTick {}) + } + + /// Schedule a delayed refresh of the state machine. + pub fn tick_later(ctx: &mut Context, after: std::time::Duration) -> actix::SpawnHandle { + ctx.notify_later(RefreshTick {}, after) + } +} diff --git a/fcos-policy-engine/.gitignore b/fcos-policy-engine/.gitignore new file mode 100644 index 0000000..53eaa21 --- /dev/null +++ b/fcos-policy-engine/.gitignore @@ -0,0 +1,2 @@ +/target +**/*.rs.bk diff --git a/fcos-policy-engine/Cargo.toml b/fcos-policy-engine/Cargo.toml new file mode 100644 index 0000000..bf28630 --- /dev/null +++ b/fcos-policy-engine/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "fcos-policy-engine" +version = "0.1.0" +authors = ["Allen Bai "] +edition = "2018" +publish = false + +[dependencies] +actix = "^0.9.0" +actix-web = "^2.0.0" +cbloom = "^0.1.3" +chrono = "^0.4.7" +commons = { path = "../commons" } +env_logger = "^0.7.1" +envsubst = "^0.2" +failure = "^0.1.1" +futures = "^0.3.1" +lazy_static = "^1.3.0" +log = "^0.4.3" +maplit = "^1.0" +prometheus = "^0.9" +reqwest = { version = "^0.10.1", features = ["json"] } +serde = "^1.0.70" +serde_derive = "^1.0.70" +serde_json = "^1.0.22" +structopt = "^0.3.7" diff --git a/fcos-policy-engine/src/main.rs b/fcos-policy-engine/src/main.rs new file mode 100644 index 0000000..cca7692 --- /dev/null +++ b/fcos-policy-engine/src/main.rs @@ -0,0 +1,185 @@ +#[macro_use] +extern crate log; +#[macro_use] +extern crate prometheus; + +use actix_web::{web, App, HttpResponse}; +use commons::{graph, metrics, policy}; +use failure::{Error, Fallible}; +use prometheus::{Histogram, IntCounter, IntGauge}; +use serde::Deserialize; +use std::net::{IpAddr, Ipv4Addr}; +use std::sync::Arc; +use structopt::StructOpt; + +lazy_static::lazy_static! 
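Callers consume the scraper actor through `GetCachedGraph`, as `gb_serve_graph` does in main.rs above. A condensed sketch (not part of the patch) of that call pattern from an async context:

    // `addr` would come from Scraper::new("testing")?.start().
    async fn fetch_stream_graph(
        addr: actix::Addr<scraper::Scraper>,
    ) -> Result<commons::graph::Graph, failure::Error> {
        // `send` resolves to Result<Result<Graph, Error>, MailboxError>,
        // hence the double `?`.
        let graph = addr
            .send(scraper::GetCachedGraph {
                stream: "testing".to_string(),
            })
            .await??;
        Ok(graph)
    }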
diff --git a/fcos-policy-engine/.gitignore b/fcos-policy-engine/.gitignore
new file mode 100644
index 0000000..53eaa21
--- /dev/null
+++ b/fcos-policy-engine/.gitignore
@@ -0,0 +1,2 @@
+/target
+**/*.rs.bk
diff --git a/fcos-policy-engine/Cargo.toml b/fcos-policy-engine/Cargo.toml
new file mode 100644
index 0000000..bf28630
--- /dev/null
+++ b/fcos-policy-engine/Cargo.toml
@@ -0,0 +1,26 @@
+[package]
+name = "fcos-policy-engine"
+version = "0.1.0"
+authors = ["Allen Bai <abai@redhat.com>"]
+edition = "2018"
+publish = false
+
+[dependencies]
+actix = "^0.9.0"
+actix-web = "^2.0.0"
+cbloom = "^0.1.3"
+chrono = "^0.4.7"
+commons = { path = "../commons" }
+env_logger = "^0.7.1"
+envsubst = "^0.2"
+failure = "^0.1.1"
+futures = "^0.3.1"
+lazy_static = "^1.3.0"
+log = "^0.4.3"
+maplit = "^1.0"
+prometheus = "^0.9"
+reqwest = { version = "^0.10.1", features = ["json"] }
+serde = "^1.0.70"
+serde_derive = "^1.0.70"
+serde_json = "^1.0.22"
+structopt = "^0.3.7"
diff --git a/fcos-policy-engine/src/main.rs b/fcos-policy-engine/src/main.rs
new file mode 100644
index 0000000..cca7692
--- /dev/null
+++ b/fcos-policy-engine/src/main.rs
@@ -0,0 +1,185 @@
+#[macro_use]
+extern crate log;
+#[macro_use]
+extern crate prometheus;
+
+use actix_web::{web, App, HttpResponse};
+use commons::{graph, metrics, policy};
+use failure::{Error, Fallible};
+use prometheus::{Histogram, IntCounter, IntGauge};
+use serde::Deserialize;
+use std::net::{IpAddr, Ipv4Addr};
+use std::sync::Arc;
+use structopt::StructOpt;
+
+lazy_static::lazy_static! {
+    static ref V1_GRAPH_INCOMING_REQS: IntCounter = register_int_counter!(opts!(
+        "fcos_cincinnati_pe_v1_graph_incoming_requests_total",
+        "Total number of incoming HTTP client requests to /v1/graph"
+    ))
+    .unwrap();
+    static ref UNIQUE_IDS: IntCounter = register_int_counter!(opts!(
+        "fcos_cincinnati_pe_v1_graph_unique_uuids_total",
+        "Total number of unique node UUIDs (per-instance Bloom filter)."
+    ))
+    .unwrap();
+    static ref ROLLOUT_WARINESS: Histogram = register_histogram!(
+        "fcos_cincinnati_pe_v1_graph_rollout_wariness",
+        "Per-request rollout wariness.",
+        prometheus::linear_buckets(0.0, 0.1, 11).unwrap()
+    )
+    .unwrap();
+    // NOTE(lucab): alternatively this could come from the runtime library, see
+    // https://prometheus.io/docs/instrumenting/writing_clientlibs/#process-metrics
+    static ref PROCESS_START_TIME: IntGauge = register_int_gauge!(opts!(
+        "process_start_time_seconds",
+        "Start time of the process since unix epoch in seconds."
+    )).unwrap();
+
+}
+
+fn main() -> Fallible<()> {
+    env_logger::Builder::from_default_env().try_init()?;
+
+    let opts = CliOptions::from_args();
+    trace!("started with CLI options: {:#?}", opts);
+
+    let sys = actix::System::new("fcos_cincinnati_pe");
+
+    let node_population = Arc::new(cbloom::Filter::new(10 * 1024 * 1024, 1_000_000));
+    let service_state = AppState {
+        population: Arc::clone(&node_population),
+    };
+
+    let start_timestamp = chrono::Utc::now();
+    PROCESS_START_TIME.set(start_timestamp.timestamp());
+
+    // Policy-engine service.
+    let pe_service = service_state.clone();
+    actix_web::HttpServer::new(move || {
+        App::new()
+            .data(pe_service.clone())
+            .route("/v1/graph", web::get().to(pe_serve_graph))
+    })
+    .bind((IpAddr::from(Ipv4Addr::UNSPECIFIED), 8081))?
+    .run();
+
+    // Policy-engine status service.
+    let pe_status = service_state;
+    actix_web::HttpServer::new(move || {
+        App::new()
+            .data(pe_status.clone())
+            .route("/metrics", web::get().to(metrics::serve_metrics))
+    })
+    .bind((IpAddr::from(Ipv4Addr::UNSPECIFIED), 9081))?
+    .run();
+
+    sys.run()?;
+    Ok(())
+}
+
+#[derive(Clone, Debug)]
+pub(crate) struct AppState {
+    population: Arc<cbloom::Filter>,
+}
+
+#[derive(Deserialize)]
+pub struct GraphQuery {
+    basearch: Option<String>,
+    stream: Option<String>,
+    rollout_wariness: Option<String>,
+    node_uuid: Option<String>,
+}
+
+pub(crate) async fn pe_serve_graph(
+    data: actix_web::web::Data<AppState>,
+    actix_web::web::Query(query): actix_web::web::Query<GraphQuery>,
+) -> Result<HttpResponse, Error> {
+    pe_record_metrics(&data, &query);
+
+    let basearch = query
+        .basearch
+        .as_ref()
+        .map(String::from)
+        .unwrap_or_default();
+    let stream = query.stream.as_ref().map(String::from).unwrap_or_default();
+    trace!("graph query stream: {:#?}", stream);
+
+    let wariness = compute_wariness(&query);
+    ROLLOUT_WARINESS.observe(wariness);
+
+    // TODO(zonggen): remove hard-coded empty graph and use the graph fetched from fcos-graph-builder.
+    let cached_graph = graph::Graph::default();
+
+    let arch_graph = policy::pick_basearch(cached_graph, basearch)?;
+    let throttled_graph = policy::throttle_rollouts(arch_graph, wariness);
+    let final_graph = policy::filter_deadends(throttled_graph);
+
+    let json =
+        serde_json::to_string_pretty(&final_graph).map_err(|e| failure::format_err!("{}", e))?;
+    let resp = HttpResponse::Ok()
+        .content_type("application/json")
+        .body(json);
+    Ok(resp)
+}
+
+#[allow(clippy::let_and_return)]
+fn compute_wariness(params: &GraphQuery) -> f64 {
+    use std::collections::hash_map::DefaultHasher;
+    use std::hash::{Hash, Hasher};
+
+    if let Ok(input) = params
+        .rollout_wariness
+        .as_ref()
+        .map(String::from)
+        .unwrap_or_default()
+        .parse::<f64>()
+    {
+        let wariness = input.max(0.0).min(1.0);
+        return wariness;
+    }
+
+    let uuid = params
+        .node_uuid
+        .as_ref()
+        .map(String::from)
+        .unwrap_or_default();
+    let wariness = {
+        // Left limit not included in range.
+        const COMPUTED_MIN: f64 = 0.0 + 0.000_001;
+        const COMPUTED_MAX: f64 = 1.0;
+        let mut hasher = DefaultHasher::new();
+        uuid.hash(&mut hasher);
+        let digest = hasher.finish();
+        // Scale down.
+        let scaled = (digest as f64) / (std::u64::MAX as f64);
+        // Clamp within limits.
+        scaled.max(COMPUTED_MIN).min(COMPUTED_MAX)
+    };
+
+    wariness
+}
+
+pub(crate) fn pe_record_metrics(data: &AppState, query: &GraphQuery) {
+    use std::collections::hash_map::DefaultHasher;
+    use std::hash::{Hash, Hasher};
+
+    V1_GRAPH_INCOMING_REQS.inc();
+
+    if let Some(uuid) = &query.node_uuid {
+        let mut hasher = DefaultHasher::default();
+        uuid.hash(&mut hasher);
+        let client_uuid = hasher.finish();
+        if !data.population.maybe_contains(client_uuid) {
+            data.population.insert(client_uuid);
+            UNIQUE_IDS.inc();
+        }
+    }
+}
+
+#[derive(Debug, StructOpt)]
+pub(crate) struct CliOptions {
+    /// Path to configuration file.
+    #[structopt(short = "c")]
+    pub config_path: Option<String>,
+}
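Finally, a standalone sketch of the UUID fallback in `compute_wariness`: the hash is scaled into (0.0, 1.0], so a given node always lands at the same point in every rollout ordering. The UUID below is arbitrary:

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    // Mirrors the fallback branch of compute_wariness above.
    fn wariness_from_uuid(uuid: &str) -> f64 {
        const COMPUTED_MIN: f64 = 0.000_001;
        const COMPUTED_MAX: f64 = 1.0;

        let mut hasher = DefaultHasher::new();
        uuid.hash(&mut hasher);
        let digest = hasher.finish();
        // Scale the u64 digest down to [0.0, 1.0], then clamp away from zero.
        let scaled = (digest as f64) / (std::u64::MAX as f64);
        scaled.max(COMPUTED_MIN).min(COMPUTED_MAX)
    }

    fn main() {
        let uuid = "b53917ad-16fa-44e0-99b6-2b549e1b1d02";
        // Deterministic: the node keeps its position across requests.
        assert_eq!(wariness_from_uuid(uuid), wariness_from_uuid(uuid));
        assert!(wariness_from_uuid(uuid) > 0.0 && wariness_from_uuid(uuid) <= 1.0);
    }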