Skip to content

Commit

Permalink
fix(pegboard): add dc id to pb client
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Sep 29, 2024
1 parent d6dd68c commit 572ed09
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 30 deletions.
4 changes: 3 additions & 1 deletion lib/pegboard/manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ async fn main() -> Result<()> {
tokio::spawn(metrics::run_standalone());

let client_id = Uuid::parse_str(&utils::var("CLIENT_ID")?)?;
let datacenter_id = Uuid::parse_str(&utils::var("DATACENTER_ID")?)?;
let network_ip = utils::var("NETWORK_IP")?.parse::<Ipv4Addr>()?;

let system = System::new_with_specifics(
Expand All @@ -56,7 +57,8 @@ async fn main() -> Result<()> {
let mut url = Url::parse("ws://127.0.0.1:5030")?;
url.set_path(&format!("/v{PROTOCOL_VERSION}"));
url.query_pairs_mut()
.append_pair("client_id", &client_id.to_string());
.append_pair("client_id", &client_id.to_string())
.append_pair("datacenter_id", &datacenter_id.to_string());

tracing::info!("connecting to ws: {url}");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ ConditionPathExists=/etc/pegboard/
[Service]
Environment="CLIENT_ID=___SERVER_ID___"
Environment="DATACENTER_ID=___DATACENTER_ID___"
Environment="NETWORK_IP=___VLAN_IP___"
ExecStart=/usr/bin/pegboard
Restart=always
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
CREATE TABLE clients (
client_id UUID PRIMARY KEY,
datacenter_id UUID NOT NULL,
create_ts INT NOT NULL,
last_ping_ts INT NOT NULL,
last_event_idx INT NOT NULL DEFAULT 0,
Expand Down
10 changes: 7 additions & 3 deletions svc/pkg/pegboard/src/workflows/datacenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,21 @@ async fn allocate_container(
"
INSERT INTO db_pegboard.containers (container_id, client_id, config, create_ts)
SELECT $3, client_id, $4, $5
FROM db_pegboard.clients
FROM db_pegboard.clients AS c
WHERE
datacenter_id = $1 AND
last_ping_ts > $2 AND
drain_ts IS NULL AND
delete_ts IS NULL AND
(
SELECT SUM(((config->'resources'->'cpu')::INT))
FROM db_pegboard.containers
FROM db_pegboard.containers AS co
WHERE co.client_id = c.client_id
) + $6 <= cpu AND
(
SELECT SUM(((config->'resources'->'memory')::INT))
FROM db_pegboard.containers
FROM db_pegboard.containers AS co
WHERE co.client_id = c.client_id
) + $7 <= memory
ORDER BY cpu, memory DESC
LIMIT 1
Expand Down
76 changes: 50 additions & 26 deletions svc/pkg/pegboard/standalone/ws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,24 +77,20 @@ async fn handle_connection(
let ctx = ctx.clone();

tokio::spawn(async move {
let (ws_stream, protocol_version, client_id) =
match setup_connection(raw_stream, addr).await {
Ok(x) => x,
Err(err) => {
tracing::error!(?addr, "{err}");
return;
}
};
let (ws_stream, url_data) = match setup_connection(raw_stream, addr).await {
Ok(x) => x,
Err(err) => {
tracing::error!(?addr, "{err}");
return;
}
};

if let Err(err) =
handle_connection_inner(&ctx, conns.clone(), ws_stream, protocol_version, client_id)
.await
{
if let Err(err) = handle_connection_inner(&ctx, conns.clone(), ws_stream, url_data).await {
tracing::error!(?addr, "{err}");
}

// Clean up
let conn = conns.write().await.remove(&client_id);
let conn = conns.write().await.remove(&url_data.client_id);
if let Some(conn) = conn {
if let Err(err) = conn.tx.lock().await.send(Message::Close(None)).await {
tracing::error!(?addr, "failed closing socket: {err}");
Expand All @@ -106,7 +102,7 @@ async fn handle_connection(
async fn setup_connection(
raw_stream: TcpStream,
addr: SocketAddr,
) -> GlobalResult<(WebSocketStream<TcpStream>, u16, Uuid)> {
) -> GlobalResult<(WebSocketStream<TcpStream>, UrlData)> {
let mut uri = None;
let ws_stream = tokio_tungstenite::accept_hdr_async(
raw_stream,
Expand All @@ -123,17 +119,20 @@ async fn setup_connection(

// Parse URL
let uri = unwrap!(uri, "socket has no associated request");
let (protocol_version, client_id) = parse_url(addr, uri)?;
let url_data = parse_url(addr, uri)?;

Ok((ws_stream, protocol_version, client_id))
Ok((ws_stream, url_data))
}

async fn handle_connection_inner(
ctx: &StandaloneCtx,
conns: Arc<RwLock<Connections>>,
ws_stream: WebSocketStream<TcpStream>,
protocol_version: u16,
client_id: Uuid,
UrlData {
protocol_version,
client_id,
datacenter_id,
}: UrlData,
) -> GlobalResult<()> {
let (tx, mut rx) = ws_stream.split();

Expand All @@ -157,7 +156,7 @@ async fn handle_connection_inner(
}

// Insert into db and spawn workflow (if not exists)
upsert_client(ctx, client_id).await?;
upsert_client(ctx, client_id, datacenter_id).await?;

// Receive messages from socket
while let Some(msg) = rx.next().await {
Expand Down Expand Up @@ -191,7 +190,11 @@ async fn handle_connection_inner(
GlobalResult::Ok(())
}

async fn upsert_client(ctx: &StandaloneCtx, client_id: Uuid) -> GlobalResult<()> {
async fn upsert_client(
ctx: &StandaloneCtx,
client_id: Uuid,
datacenter_id: Uuid,
) -> GlobalResult<()> {
// Inserting before creating the workflow prevents a race condition with using select + insert instead
let (exists, deleted) = sql_fetch_one!(
[ctx, (bool, bool)]
Expand All @@ -210,8 +213,10 @@ async fn upsert_client(ctx: &StandaloneCtx, client_id: Uuid) -> GlobalResult<()>
delete_ts IS NOT NULL
),
insert_client AS (
INSERT INTO db_pegboard.clients (client_id, create_ts, last_ping_ts)
VALUES ($1, $2, $2)
INSERT INTO db_pegboard.clients (
client_id, datacenter_id, create_ts, last_ping_ts
)
VALUES ($1, $2, $3, $3)
ON CONFLICT (client_id)
DO UPDATE
SET delete_ts = NULL
Expand All @@ -222,6 +227,7 @@ async fn upsert_client(ctx: &StandaloneCtx, client_id: Uuid) -> GlobalResult<()>
EXISTS(SELECT 1 FROM select_deleted) AS deleted
",
client_id,
datacenter_id,
util::timestamp::now(),
)
.await?;
Expand All @@ -232,7 +238,7 @@ async fn upsert_client(ctx: &StandaloneCtx, client_id: Uuid) -> GlobalResult<()>

if exists == deleted {
tracing::info!(?client_id, "new client");

// Spawn a new client workflow
ctx.workflow(pegboard::workflows::client::Input { client_id })
.tag("client_id", client_id)
Expand Down Expand Up @@ -334,7 +340,14 @@ async fn msg_thread(ctx: &StandaloneCtx, conns: Arc<RwLock<Connections>>) -> Glo
}
}

fn parse_url(addr: SocketAddr, uri: hyper::Uri) -> GlobalResult<(u16, Uuid)> {
/// Connection parameters extracted from a client's WebSocket request URL by `parse_url`.
///
/// The type is `Copy` so it can be handed by value to the connection handler while the
/// caller still reads `client_id` afterwards to clean up the connection entry.
#[derive(Debug, Clone, Copy)]
struct UrlData {
    /// Protocol version parsed from the last path segment of the URL (the number after the `v` prefix).
    protocol_version: u16,
    /// Value of the `client_id` query parameter.
    client_id: Uuid,
    /// Value of the `datacenter_id` query parameter.
    datacenter_id: Uuid,
}

fn parse_url(addr: SocketAddr, uri: hyper::Uri) -> GlobalResult<UrlData> {
let url = url::Url::parse(&format!("ws://{addr}{uri}"))?;

// Get protocol version from last path segment
Expand All @@ -345,13 +358,24 @@ fn parse_url(addr: SocketAddr, uri: hyper::Uri) -> GlobalResult<(u16, Uuid)> {
ensure!(last_segment.starts_with('v'), "invalid protocol version");
let protocol_version = last_segment[1..].parse::<u16>()?;

// Read client_id from query parameters
// Read client_id and datacenter_id from query parameters
let client_id = unwrap!(
url.query_pairs()
.find_map(|(n, v)| (n == "client_id").then_some(v)),
"missing `client_id` query parameter"
);
let client_id = util::uuid::parse(client_id.as_ref())?;

Ok((protocol_version, client_id))
let datacenter_id = unwrap!(
url.query_pairs()
.find_map(|(n, v)| (n == "datacenter_id").then_some(v)),
"missing `datacenter_id` query parameter"
);
let datacenter_id = util::uuid::parse(datacenter_id.as_ref())?;

Ok(UrlData {
protocol_version,
client_id,
datacenter_id,
})
}

0 comments on commit 572ed09

Please sign in to comment.