Skip to content

Commit

Permalink
refactor(ds): wait for ds poll events before marking actor as ready
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanFlurry committed Dec 17, 2024
1 parent 3e224b1 commit 9d2f8ad
Show file tree
Hide file tree
Showing 15 changed files with 334 additions and 109 deletions.
1 change: 1 addition & 0 deletions docker/dev-full/rivet-guard/traefik.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1115,6 +1115,7 @@ log:
level: INFO
accessLog: {}
providers:
providersThrottleDuration: 0.025s
http:
endpoint: >-
http://rivet-server:8081/traefik-provider/config/game-guard?datacenter=f288913c-735d-4188-bf9b-2fcf6eac7b9c
Expand Down
1 change: 1 addition & 0 deletions docker/monolith/rivet-guard/traefik.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1115,6 +1115,7 @@ log:
level: INFO
accessLog: {}
providers:
providersThrottleDuration: 0.025s
http:
endpoint: >-
http://rivet-server:8081/traefik-provider/config/game-guard?datacenter=f288913c-735d-4188-bf9b-2fcf6eac7b9c
Expand Down
171 changes: 100 additions & 71 deletions examples/system-test/ws_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,88 +12,117 @@ const client = new RivetClient({
token: RIVET_SERVICE_TOKEN,
});

let actorId: string | undefined;
try {

console.log("Creating actor");
const { actor } = await client.actor.create({
project: RIVET_PROJECT,
environment: RIVET_ENVIRONMENT,
body: {
tags: {
name: "ws",
},
buildTags: { name: "ws", current: "true" },
network: {
ports: {
http: {
protocol: "https",
// internalPort: 80,
routing: {
guard: {},
async function run() {
let actorId: string | undefined;
try {
console.log("Creating actor");
const { actor } = await client.actor.create({
project: RIVET_PROJECT,
environment: RIVET_ENVIRONMENT,
body: {
tags: {
name: "ws",
},
buildTags: { name: "ws", current: "true" },
network: {
ports: {
http: {
protocol: "https",
// internalPort: 80,
routing: {
guard: {},
},
},
},
},
lifecycle: {
durable: false,
},
},
lifecycle: {
durable: false,
},
},
});
actorId = actor.id;
});
actorId = actor.id;

const port = actor.network.ports.http;
assertExists(port, "missing port http");
const actorOrigin =
`${port.protocol}://${port.hostname}:${port.port}${port.path ?? ""}`;
console.log("Created actor at", actorOrigin);
const port = actor.network.ports.http;
assertExists(port, "missing port http");
const actorOrigin = `${port.protocol}://${port.hostname}:${port.port}${port.path ?? ""}`;
console.log("Created actor at", actorOrigin);

// Check HTTP health of service
const response = await fetch(`${actorOrigin}/health`);
if (!response.ok) {
throw new Error(`Health check failed with status: ${response.status}`);
}
console.log("Health check passed");
//await new Promise((resolve) => setTimeout(resolve, 300));

await new Promise((resolve, reject) => {
// Open a WebSocket to that endpoint
const ws = new WebSocket(`${actorOrigin}/ws`);
//console.time('ready');
//while (true) {
// // Check HTTP health of service
// const response = await fetch(`${actorOrigin}/health`);
// if (!response.ok) {
// await new Promise(resolve => setTimeout(resolve, 100));
// continue;
// }
// console.timeEnd('ready');
// break;
//}

ws.onmessage = (evt) => {
const [type, body] = JSON.parse(evt.data);
if (type === "init") {
console.log("Init event data:", body);
} else if (type === "pong") {
console.log("Pong");
ws.close();
resolve(undefined);
} else {
console.warn("unknown message type", type);
}
};
// Check HTTP health of service
const response = await fetch(`${actorOrigin}/health`);
if (!response.ok) {
//throw new Error(`Health check failed with status: ${response.status}`);
console.error(`Health check failed with status: ${response.status}`);
Deno.exit(1);
}
console.log("Health check passed");

ws.onopen = () => {
console.log("Ping");
ws.send(JSON.stringify(["ping", 123]));
};
await new Promise((resolve, reject) => {
// Open a WebSocket to that endpoint
const ws = new WebSocket(`${actorOrigin}/ws`);

ws.onclose = () => {
console.log("WebSocket connection closed");
};
ws.onmessage = (evt) => {
const [type, body] = JSON.parse(evt.data);
if (type === "init") {
console.log("Init event data:", body);
} else if (type === "pong") {
console.log("Pong");
ws.close();
resolve(undefined);
} else {
console.warn("unknown message type", type);
}
};

ws.onerror = (err) => {
console.error("WS error", err);
reject("ws error");
};
});
} catch (error) {
console.error("Error:", error);
} finally {
if (actorId) {
console.log("Destroying", actorId);
await client.actor.destroy(actorId, {
project: RIVET_PROJECT,
environment: RIVET_ENVIRONMENT,
ws.onopen = () => {
console.log("Ping");
ws.send(JSON.stringify(["ping", 123]));
};

ws.onclose = () => {
console.log("WebSocket connection closed");
};

ws.onerror = (err) => {
console.error("WS error", err);
reject("ws error");
};
});
} catch (error) {
console.error("Error:", error);
} finally {
if (actorId) {
console.log("Destroying", actorId);
await client.actor.destroy(actorId, {
project: RIVET_PROJECT,
environment: RIVET_ENVIRONMENT,
});
}
}
}

async function runLoop() {
while (true) {
await run()
await new Promise((resolve) => setTimeout(resolve, Math.random() * 250));
}
}

for (let i = 0; i < 2; i++) {
await new Promise((resolve) => setTimeout(resolve, 100));
runLoop();
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{auth::Auth, types};
struct DynamicServerProxiedPort {
server_id: Uuid,
datacenter_id: Uuid,
create_ts: i64,
guard_public_hostname_dns_parent: Option<String>,
guard_public_hostname_static: Option<String>,

Expand All @@ -37,18 +38,20 @@ struct DynamicServerProxiedPort {
pub async fn build_ds(
ctx: &Ctx<Auth>,
dc_id: Uuid,
server_id: Option<Uuid>,
config: &mut types::TraefikConfigResponse,
) -> GlobalResult<()> {
) -> GlobalResult<Option<i64>> {
let proxied_ports = ctx
.cache()
.ttl(60_000)
.fetch_one_json("ds_proxied_ports", dc_id, |mut cache, dc_id| async move {
.fetch_one_json("ds_proxied_ports2", dc_id, |mut cache, dc_id| async move {
let rows = sql_fetch_all!(
[ctx, DynamicServerProxiedPort]
"
SELECT
s.server_id,
s.datacenter_id,
s.create_ts,
dc.guard_public_hostname_dns_parent,
dc.guard_public_hostname_static,
pp.label,
Expand Down Expand Up @@ -87,6 +90,8 @@ pub async fn build_ds(
.await?
.unwrap_or_default();

let latest_ds_create_ts = proxied_ports.iter().map(|pp| pp.create_ts).max();

config.http.middlewares.insert(
"ds-rate-limit".to_owned(),
types::TraefikMiddlewareHttp::RateLimit {
Expand Down Expand Up @@ -140,7 +145,7 @@ pub async fn build_ds(
"dynamic servers traefik config"
);

Ok(())
Ok(latest_ds_create_ts)
}

#[tracing::instrument(skip(config))]
Expand Down
48 changes: 31 additions & 17 deletions packages/api/traefik-provider/src/route/game_guard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,49 @@ pub mod job;
pub struct ConfigQuery {
token: Option<String>,
datacenter: Uuid,
server: Option<Uuid>,
}

#[tracing::instrument(skip(ctx))]
pub async fn config(
ctx: Ctx<Auth>,
_watch_index: WatchIndexQuery,
ConfigQuery { token, datacenter }: ConfigQuery,
ConfigQuery {
token,
datacenter,
server,
}: ConfigQuery,
) -> GlobalResult<types::TraefikConfigResponseNullified> {
ctx.auth().token(&token).await?;

let mut config = types::TraefikConfigResponse::default();

// Fetch configs and catch any errors

build_ds(&ctx, datacenter, &mut config).await?;
let mut config = types::TraefikConfigResponse::default();
build_job(&ctx, datacenter, &mut config).await?;
let latest_ds_create_ts = build_ds(&ctx, datacenter, server, &mut config).await?;

// Publish message when the request is complete
if let Some(latest_ds_create_ts) = latest_ds_create_ts {
ctx.msg(ds::workflows::server::pegboard::TraefikPoll {
server_id: server,
latest_ds_create_ts,
})
.tag("datacenter_id", datacenter)
.send()
.await?;
}

// tracing::info!(
// http_services = ?config.http.services.len(),
// http_routers = config.http.routers.len(),
// http_middlewares = ?config.http.middlewares.len(),
// tcp_services = ?config.tcp.services.len(),
// tcp_routers = config.tcp.routers.len(),
// tcp_middlewares = ?config.tcp.middlewares.len(),
// udp_services = ?config.udp.services.len(),
// udp_routers = config.udp.routers.len(),
// udp_middlewares = ?config.udp.middlewares.len(),
// "traefik config"
// );
tracing::debug!(
http_services = ?config.http.services.len(),
http_routers = config.http.routers.len(),
http_middlewares = ?config.http.middlewares.len(),
tcp_services = ?config.tcp.services.len(),
tcp_routers = config.tcp.routers.len(),
tcp_middlewares = ?config.tcp.middlewares.len(),
udp_services = ?config.udp.services.len(),
udp_routers = config.udp.routers.len(),
udp_middlewares = ?config.udp.middlewares.len(),
"traefik config"
);

Ok(types::TraefikConfigResponseNullified {
http: config.http.nullified(),
Expand Down
Loading

0 comments on commit 9d2f8ad

Please sign in to comment.