diff --git a/lib/pegboard/manager/src/main.rs b/lib/pegboard/manager/src/main.rs index 04dc525aeb..8dcb047a1a 100644 --- a/lib/pegboard/manager/src/main.rs +++ b/lib/pegboard/manager/src/main.rs @@ -30,6 +30,7 @@ async fn main() -> Result<()> { tokio::spawn(metrics::run_standalone()); let client_id = Uuid::parse_str(&utils::var("CLIENT_ID")?)?; + let datacenter_id = Uuid::parse_str(&utils::var("DATACENTER_ID")?)?; let network_ip = utils::var("NETWORK_IP")?.parse::()?; let system = System::new_with_specifics( @@ -56,7 +57,8 @@ async fn main() -> Result<()> { let mut url = Url::parse("ws://127.0.0.1:5030")?; url.set_path(&format!("/v{PROTOCOL_VERSION}")); url.query_pairs_mut() - .append_pair("client_id", &client_id.to_string()); + .append_pair("client_id", &client_id.to_string()) + .append_pair("datacenter_id", &datacenter_id.to_string()); tracing::info!("connecting to ws: {url}"); diff --git a/svc/pkg/cluster/src/workflows/server/install/install_scripts/files/pegboard_configure.sh b/svc/pkg/cluster/src/workflows/server/install/install_scripts/files/pegboard_configure.sh index daa51129dd..2f205e0259 100644 --- a/svc/pkg/cluster/src/workflows/server/install/install_scripts/files/pegboard_configure.sh +++ b/svc/pkg/cluster/src/workflows/server/install/install_scripts/files/pegboard_configure.sh @@ -295,6 +295,7 @@ ConditionPathExists=/etc/pegboard/ [Service] Environment="CLIENT_ID=___SERVER_ID___" +Environment="DATACENTER_ID=___DATACENTER_ID___" Environment="NETWORK_IP=___VLAN_IP___" ExecStart=/usr/bin/pegboard Restart=always diff --git a/svc/pkg/pegboard/db/pegboard/migrations/20240913005543_init.up.sql b/svc/pkg/pegboard/db/pegboard/migrations/20240913005543_init.up.sql index b539cc010c..db341d96da 100644 --- a/svc/pkg/pegboard/db/pegboard/migrations/20240913005543_init.up.sql +++ b/svc/pkg/pegboard/db/pegboard/migrations/20240913005543_init.up.sql @@ -1,5 +1,6 @@ CREATE TABLE clients ( client_id UUID PRIMARY KEY, + datacenter_id UUID NOT NULL, create_ts INT NOT NULL, last_ping_ts INT NOT NULL, last_event_idx INT NOT NULL DEFAULT 0, diff --git a/svc/pkg/pegboard/src/workflows/datacenter.rs b/svc/pkg/pegboard/src/workflows/datacenter.rs index 5c04133736..af86bdc37d 100644 --- a/svc/pkg/pegboard/src/workflows/datacenter.rs +++ b/svc/pkg/pegboard/src/workflows/datacenter.rs @@ -108,17 +108,21 @@ async fn allocate_container( " INSERT INTO db_pegboard.containers (container_id, client_id, config, create_ts) SELECT $3, client_id, $4, $5 - FROM db_pegboard.clients + FROM db_pegboard.clients AS c WHERE datacenter_id = $1 AND last_ping_ts > $2 AND + drain_ts IS NULL AND + delete_ts IS NULL AND ( SELECT SUM(((config->'resources'->'cpu')::INT)) - FROM db_pegboard.containers + FROM db_pegboard.containers AS co + WHERE co.client_id = c.client_id ) + $6 <= cpu AND ( SELECT SUM(((config->'resources'->'memory')::INT)) - FROM db_pegboard.containers + FROM db_pegboard.containers AS co + WHERE co.client_id = c.client_id ) + $7 <= memory ORDER BY cpu, memory DESC LIMIT 1 diff --git a/svc/pkg/pegboard/standalone/ws/src/lib.rs b/svc/pkg/pegboard/standalone/ws/src/lib.rs index ac1a100d10..2efaf4d11c 100644 --- a/svc/pkg/pegboard/standalone/ws/src/lib.rs +++ b/svc/pkg/pegboard/standalone/ws/src/lib.rs @@ -77,24 +77,20 @@ async fn handle_connection( let ctx = ctx.clone(); tokio::spawn(async move { - let (ws_stream, protocol_version, client_id) = - match setup_connection(raw_stream, addr).await { - Ok(x) => x, - Err(err) => { - tracing::error!(?addr, "{err}"); - return; - } - }; + let (ws_stream, url_data) = match setup_connection(raw_stream, addr).await { + Ok(x) => x, + Err(err) => { + tracing::error!(?addr, "{err}"); + return; + } + }; - if let Err(err) = - handle_connection_inner(&ctx, conns.clone(), ws_stream, protocol_version, client_id) - .await - { + if let Err(err) = handle_connection_inner(&ctx, conns.clone(), ws_stream, url_data).await { tracing::error!(?addr, "{err}"); } // Clean up - let conn = conns.write().await.remove(&client_id); + let conn = conns.write().await.remove(&url_data.client_id); if let Some(conn) = conn { if let Err(err) = conn.tx.lock().await.send(Message::Close(None)).await { tracing::error!(?addr, "failed closing socket: {err}"); @@ -106,7 +102,7 @@ async fn handle_connection( async fn setup_connection( raw_stream: TcpStream, addr: SocketAddr, -) -> GlobalResult<(WebSocketStream, u16, Uuid)> { +) -> GlobalResult<(WebSocketStream, UrlData)> { let mut uri = None; let ws_stream = tokio_tungstenite::accept_hdr_async( raw_stream, @@ -123,17 +119,20 @@ async fn setup_connection( // Parse URL let uri = unwrap!(uri, "socket has no associated request"); - let (protocol_version, client_id) = parse_url(addr, uri)?; + let url_data = parse_url(addr, uri)?; - Ok((ws_stream, protocol_version, client_id)) + Ok((ws_stream, url_data)) } async fn handle_connection_inner( ctx: &StandaloneCtx, conns: Arc>, ws_stream: WebSocketStream, - protocol_version: u16, - client_id: Uuid, + UrlData { + protocol_version, + client_id, + datacenter_id, + }: UrlData, ) -> GlobalResult<()> { let (tx, mut rx) = ws_stream.split(); @@ -157,7 +156,7 @@ async fn handle_connection_inner( } // Insert into db and spawn workflow (if not exists) - upsert_client(ctx, client_id).await?; + upsert_client(ctx, client_id, datacenter_id).await?; // Receive messages from socket while let Some(msg) = rx.next().await { @@ -191,7 +190,11 @@ async fn handle_connection_inner( GlobalResult::Ok(()) } -async fn upsert_client(ctx: &StandaloneCtx, client_id: Uuid) -> GlobalResult<()> { +async fn upsert_client( + ctx: &StandaloneCtx, + client_id: Uuid, + datacenter_id: Uuid, +) -> GlobalResult<()> { // Inserting before creating the workflow prevents a race condition with using select + insert instead let (exists, deleted) = sql_fetch_one!( [ctx, (bool, bool)] @@ -210,8 +213,10 @@ async fn upsert_client(ctx: &StandaloneCtx, client_id: Uuid) -> GlobalResult<()> delete_ts IS NOT NULL ), insert_client AS ( - INSERT INTO db_pegboard.clients (client_id, create_ts, last_ping_ts) - VALUES ($1, $2, $2) + INSERT INTO db_pegboard.clients ( + client_id, datacenter_id, create_ts, last_ping_ts + ) + VALUES ($1, $2, $3, $3) ON CONFLICT (client_id) DO UPDATE SET delete_ts = NULL @@ -222,6 +227,7 @@ async fn upsert_client(ctx: &StandaloneCtx, client_id: Uuid) -> GlobalResult<()> EXISTS(SELECT 1 FROM select_deleted) AS deleted ", client_id, + datacenter_id, util::timestamp::now(), ) .await?; @@ -232,7 +238,7 @@ async fn upsert_client(ctx: &StandaloneCtx, client_id: Uuid) -> GlobalResult<()> if exists == deleted { tracing::info!(?client_id, "new client"); - + // Spawn a new client workflow ctx.workflow(pegboard::workflows::client::Input { client_id }) .tag("client_id", client_id) @@ -334,7 +340,14 @@ async fn msg_thread(ctx: &StandaloneCtx, conns: Arc>) -> Glo } } -fn parse_url(addr: SocketAddr, uri: hyper::Uri) -> GlobalResult<(u16, Uuid)> { +#[derive(Clone, Copy)] +struct UrlData { + protocol_version: u16, + client_id: Uuid, + datacenter_id: Uuid, +} + +fn parse_url(addr: SocketAddr, uri: hyper::Uri) -> GlobalResult { let url = url::Url::parse(&format!("ws://{addr}{uri}"))?; // Get protocol version from last path segment @@ -345,7 +358,7 @@ fn parse_url(addr: SocketAddr, uri: hyper::Uri) -> GlobalResult<(u16, Uuid)> { ensure!(last_segment.starts_with('v'), "invalid protocol version"); let protocol_version = last_segment[1..].parse::()?; - // Read client_id from query parameters + // Read client_id and datacenter_id from query parameters let client_id = unwrap!( url.query_pairs() .find_map(|(n, v)| (n == "client_id").then_some(v)), @@ -353,5 +366,16 @@ fn parse_url(addr: SocketAddr, uri: hyper::Uri) -> GlobalResult<(u16, Uuid)> { ); let client_id = util::uuid::parse(client_id.as_ref())?; - Ok((protocol_version, client_id)) + let datacenter_id = unwrap!( + url.query_pairs() + .find_map(|(n, v)| (n == "datacenter_id").then_some(v)), + "missing `datacenter_id` query parameter" + ); + let datacenter_id = util::uuid::parse(datacenter_id.as_ref())?; + + Ok(UrlData { + protocol_version, + client_id, + datacenter_id, + }) }