Skip to content

Commit

Permalink
chore(pegboard): get connection working e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Sep 25, 2024
1 parent 8fbfb47 commit aefc06d
Show file tree
Hide file tree
Showing 10 changed files with 320 additions and 174 deletions.
1 change: 0 additions & 1 deletion lib/pegboard/manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ indoc = "2.0"
lazy_static = "1.4"
nix = { version = "0.27", default-features = false, features = ["user", "signal"] }
notify = { version = "6.1.1", default-features = false, features = [ "serde" ] }
pnet_datalink = "0.35.0"
prometheus = "0.13"
rand = "0.8"
reqwest = { version = "0.11", features = ["stream"] }
Expand Down
29 changes: 2 additions & 27 deletions lib/pegboard/manager/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use std::{
net::{IpAddr, Ipv4Addr},
path::Path,
};
use std::{net::Ipv4Addr, path::Path};

use anyhow::*;
use futures_util::StreamExt;
Expand Down Expand Up @@ -33,7 +30,7 @@ async fn main() -> Result<()> {
tokio::spawn(metrics::run_standalone());

let client_id = Uuid::parse_str(&utils::var("CLIENT_ID")?)?;
let network_ip = get_network_ip(&utils::var("NETWORK_INTERFACE")?)?;
let network_ip = utils::var("NETWORK_IP")?.parse::<Ipv4Addr>()?;

let system = System::new_with_specifics(
RefreshKind::new()
Expand Down Expand Up @@ -76,28 +73,6 @@ async fn main() -> Result<()> {
ctx.start(rx).await
}

fn get_network_ip(network_interface_name: &str) -> Result<Ipv4Addr> {
let network_interface = pnet_datalink::interfaces()
.into_iter()
.find(|iface| iface.name == network_interface_name)
.context(format!(
"network interface not found: {network_interface_name}"
))?;
let network_ip = network_interface
.ips
.iter()
.find_map(|net| {
if let IpAddr::V4(ip) = net.ip() {
Some(ip)
} else {
None
}
})
.context("no ipv4 network on interface")?;

Ok(network_ip)
}

fn init_tracing() {
tracing_subscriber::registry()
.with(
Expand Down
8 changes: 6 additions & 2 deletions lib/pegboard/manager/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,10 @@ pub async fn init_sqlite_schema(pool: &SqlitePool) -> Result<()> {
"
CREATE TABLE IF NOT EXISTS state (
last_event_idx INTEGER NOT NULL,
last_command_idx INTEGER NOT NULL
last_command_idx INTEGER NOT NULL,
-- Keeps this table having one row
_persistence BOOLEAN UNIQUE NOT NULL DEFAULT TRUE
)
",
))
Expand All @@ -129,8 +132,9 @@ pub async fn init_sqlite_schema(pool: &SqlitePool) -> Result<()> {

sqlx::query(indoc!(
"
INSERT INTO state
INSERT INTO state (last_event_idx, last_command_idx)
VALUES (0, 0)
ON CONFLICT DO NOTHING
",
))
.execute(&mut *conn)
Expand Down
9 changes: 8 additions & 1 deletion svc/pkg/cluster/src/util/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,14 @@ lazy_static::lazy_static! {
).unwrap();
pub static ref NOMAD_JOIN_DURATION: HistogramVec = register_histogram_vec_with_registry!(
"provision_nomad_join_duration",
"Time from installed to nomad joined.",
"Time from installed to Nomad joined.",
&["cluster_id", "datacenter_id", "provider_datacenter_id", "datacenter_name_id"],
PROVISION_BUCKETS.to_vec(),
*REGISTRY,
).unwrap();
pub static ref PEGBOARD_JOIN_DURATION: HistogramVec = register_histogram_vec_with_registry!(
"provision_pegboard_join_duration",
"Time from installed to Pegboard joined.",
&["cluster_id", "datacenter_id", "provider_datacenter_id", "datacenter_name_id"],
PROVISION_BUCKETS.to_vec(),
*REGISTRY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ ConditionPathExists=/etc/pegboard/
[Service]
Environment="CLIENT_ID=___SERVER_ID___"
Environment="NETWORK_INTERFACE=__VLAN_IFACE__"
Environment="NETWORK_IP=___VLAN_IP___"
ExecStart=/usr/bin/pegboard
Restart=always
RestartSec=2
Expand Down
190 changes: 128 additions & 62 deletions svc/pkg/cluster/src/workflows/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{

use chirp_workflow::prelude::*;
use rand::Rng;
use serde_json::json;

pub(crate) mod dns_create;
pub(crate) mod dns_delete;
Expand Down Expand Up @@ -185,13 +186,24 @@ pub(crate) async fn cluster_server(ctx: &mut WorkflowCtx, input: &Input) -> Glob
.send()
.await?;

// Create DNS record because the server is already installed
if let PoolType::Gg = input.pool_type {
ctx.workflow(dns_create::Input {
server_id: input.server_id,
})
.output()
.await?;
match input.pool_type {
// Create DNS record because the server is already installed
PoolType::Gg => {
ctx.workflow(dns_create::Input {
server_id: input.server_id,
})
.output()
.await?;
}
// Update tags to include pegboard client_id (currently the same as the server_id)
PoolType::Pegboard => {
ctx.activity(UpdateTagsInput {
server_id: input.server_id,
client_id: input.server_id,
})
.await?;
}
_ => {}
}

provider_server_workflow_id
Expand Down Expand Up @@ -252,6 +264,23 @@ pub(crate) async fn cluster_server(ctx: &mut WorkflowCtx, input: &Input) -> Glob
.send()
.await?;
}
Main::PegboardRegistered(_) => {
ctx.activity(SetPegboardClientIdInput {
server_id: input.server_id,
cluster_id: dc.cluster_id,
datacenter_id: dc.datacenter_id,
provider_datacenter_id: dc.provider_datacenter_id.clone(),
datacenter_name_id: dc.name_id.clone(),
client_id: input.server_id,
})
.await?;

// Scale to get rid of tainted servers
ctx.signal(crate::workflows::datacenter::Scale {})
.tag("datacenter_id", input.datacenter_id)
.send()
.await?;
}
Main::Drain(_) => {
ctx.workflow(drain::Input {
datacenter_id: input.datacenter_id,
Expand Down Expand Up @@ -495,40 +524,37 @@ async fn update_db(ctx: &ActivityCtx, input: &UpdateDbInput) -> GlobalResult<()>
)
.await?;

insert_metrics(
input.cluster_id,
input.datacenter_id,
&input.provider_datacenter_id,
&input.datacenter_name_id,
&input.pool_type,
provision_complete_ts,
create_ts,
)
.await;

Ok(())
}

async fn insert_metrics(
cluster_id: Uuid,
datacenter_id: Uuid,
provider_datacenter_id: &str,
datacenter_name_id: &str,
pool_type: &PoolType,
provision_complete_ts: i64,
create_ts: i64,
) {
// Insert metrics
let dt = (provision_complete_ts - create_ts) as f64 / 1000.0;

metrics::PROVISION_DURATION
.with_label_values(&[
&cluster_id.to_string(),
&datacenter_id.to_string(),
provider_datacenter_id,
datacenter_name_id,
&pool_type.to_string(),
&input.cluster_id.to_string(),
&input.datacenter_id.to_string(),
&input.provider_datacenter_id,
&input.datacenter_name_id,
&input.pool_type.to_string(),
])
.observe(dt);

Ok(())
}

#[derive(Debug, Serialize, Deserialize, Hash)]
struct UpdateTagsInput {
server_id: Uuid,
client_id: Uuid,
}

#[activity(UpdateTags)]
async fn update_tags(ctx: &ActivityCtx, input: &UpdateTagsInput) -> GlobalResult<()> {
ctx.update_workflow_tags(&json!({
"server_id": input.server_id,
"client_id": input.client_id,
}))
.await?;

Ok(())
}

#[derive(Debug, Serialize, Deserialize, Hash)]
Expand Down Expand Up @@ -575,8 +601,7 @@ async fn set_nomad_node_id(ctx: &ActivityCtx, input: &SetNomadNodeIdInput) -> Gl
SET
nomad_node_id = $2,
nomad_join_ts = $3
WHERE
server_id = $1
WHERE server_id = $1
RETURNING nomad_node_id, install_complete_ts
",
input.server_id,
Expand All @@ -591,39 +616,75 @@ async fn set_nomad_node_id(ctx: &ActivityCtx, input: &SetNomadNodeIdInput) -> Gl

// Insert metrics
if let Some(install_complete_ts) = install_complete_ts {
insert_nomad_metrics(
input.cluster_id,
input.datacenter_id,
&input.provider_datacenter_id,
&input.datacenter_name_id,
nomad_join_ts,
install_complete_ts,
);
let dt = (nomad_join_ts - install_complete_ts) as f64 / 1000.0;

metrics::NOMAD_JOIN_DURATION
.with_label_values(&[
&input.cluster_id.to_string(),
&input.datacenter_id.to_string(),
&input.provider_datacenter_id,
&input.datacenter_name_id,
])
.observe(dt);
} else {
tracing::warn!("missing install_complete_ts");
}

Ok(())
}

fn insert_nomad_metrics(
#[derive(Debug, Serialize, Deserialize, Hash)]
struct SetPegboardClientIdInput {
server_id: Uuid,
cluster_id: Uuid,
datacenter_id: Uuid,
provider_datacenter_id: &str,
datacenter_name_id: &str,
nomad_join_ts: i64,
install_complete_ts: i64,
) {
let dt = (nomad_join_ts - install_complete_ts) as f64 / 1000.0;

metrics::NOMAD_JOIN_DURATION
.with_label_values(&[
&cluster_id.to_string(),
&datacenter_id.to_string(),
provider_datacenter_id,
datacenter_name_id,
])
.observe(dt);
provider_datacenter_id: String,
datacenter_name_id: String,
client_id: Uuid,
}

#[activity(SetPegboardClientId)]
async fn set_pegboard_client_id(
ctx: &ActivityCtx,
input: &SetPegboardClientIdInput,
) -> GlobalResult<()> {
let pegboard_join_ts = util::timestamp::now();

let (old_pegboard_client_id, install_complete_ts) = sql_fetch_one!(
[ctx, (Option<Uuid>, Option<i64>)]
"
UPDATE db_cluster.servers
SET
pegboard_client_id = $2
WHERE server_id = $1
RETURNING pegboard_client_id, install_complete_ts
",
input.server_id,
&input.client_id,
)
.await?;

if let Some(old_pegboard_client_id) = old_pegboard_client_id {
tracing::warn!(%old_pegboard_client_id, "pegboard client id was already set");
}

// Insert metrics
if let Some(install_complete_ts) = install_complete_ts {
let dt = (pegboard_join_ts - install_complete_ts) as f64 / 1000.0;

metrics::PEGBOARD_JOIN_DURATION
.with_label_values(&[
&input.cluster_id.to_string(),
&input.datacenter_id.to_string(),
&input.provider_datacenter_id,
&input.datacenter_name_id,
])
.observe(dt);
} else {
tracing::warn!("missing install_complete_ts");
}

Ok(())
}

#[derive(Debug, Serialize, Deserialize, Hash)]
Expand Down Expand Up @@ -745,7 +806,11 @@ impl CustomListener for State {
*/
async fn listen(&self, ctx: &ListenCtx) -> WorkflowResult<Self::Output> {
// Determine which signals to listen to
let mut signals = vec![Destroy::NAME, NomadRegistered::NAME];
let mut signals = vec![
Destroy::NAME,
NomadRegistered::NAME,
pegboard::workflows::client::Registered::NAME,
];

if !self.draining {
signals.push(Drain::NAME);
Expand Down Expand Up @@ -823,4 +888,5 @@ join_signal!(Main {
DnsDelete,
Destroy,
NomadRegistered,
PegboardRegistered(pegboard::workflows::client::Registered),
});
1 change: 0 additions & 1 deletion svc/pkg/pegboard/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use chirp_workflow::prelude::*;
pub mod ops;
pub mod protocol;
pub mod types;
pub mod utils;
pub mod workflows;

pub fn registry() -> WorkflowResult<Registry> {
Expand Down
1 change: 0 additions & 1 deletion svc/pkg/pegboard/src/utils.rs

This file was deleted.

Loading

0 comments on commit aefc06d

Please sign in to comment.