Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(ds): wait for ds poll events before marking actor as ready #1644

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker/dev-full/rivet-guard/traefik.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1115,6 +1115,7 @@ log:
level: INFO
accessLog: {}
providers:
providersThrottleDuration: 0.025s
http:
endpoint: >-
http://rivet-server:8081/traefik-provider/config/game-guard?datacenter=f288913c-735d-4188-bf9b-2fcf6eac7b9c
Expand Down
1 change: 1 addition & 0 deletions docker/monolith/rivet-guard/traefik.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1115,6 +1115,7 @@ log:
level: INFO
accessLog: {}
providers:
providersThrottleDuration: 0.025s
http:
endpoint: >-
http://rivet-server:8081/traefik-provider/config/game-guard?datacenter=f288913c-735d-4188-bf9b-2fcf6eac7b9c
Expand Down
171 changes: 100 additions & 71 deletions examples/system-test/ws_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,88 +12,117 @@ const client = new RivetClient({
token: RIVET_SERVICE_TOKEN,
});

let actorId: string | undefined;
try {

console.log("Creating actor");
const { actor } = await client.actor.create({
project: RIVET_PROJECT,
environment: RIVET_ENVIRONMENT,
body: {
tags: {
name: "ws",
},
buildTags: { name: "ws", current: "true" },
network: {
ports: {
http: {
protocol: "https",
// internalPort: 80,
routing: {
guard: {},
async function run() {
let actorId: string | undefined;
try {
console.log("Creating actor");
const { actor } = await client.actor.create({
project: RIVET_PROJECT,
environment: RIVET_ENVIRONMENT,
body: {
tags: {
name: "ws",
},
buildTags: { name: "ws", current: "true" },
network: {
ports: {
http: {
protocol: "https",
// internalPort: 80,
routing: {
guard: {},
},
},
},
},
lifecycle: {
durable: false,
},
},
lifecycle: {
durable: false,
},
},
});
actorId = actor.id;
});
actorId = actor.id;

const port = actor.network.ports.http;
assertExists(port, "missing port http");
const actorOrigin =
`${port.protocol}://${port.hostname}:${port.port}${port.path ?? ""}`;
console.log("Created actor at", actorOrigin);
const port = actor.network.ports.http;
assertExists(port, "missing port http");
const actorOrigin = `${port.protocol}://${port.hostname}:${port.port}${port.path ?? ""}`;
console.log("Created actor at", actorOrigin);

// Check HTTP health of service
const response = await fetch(`${actorOrigin}/health`);
if (!response.ok) {
throw new Error(`Health check failed with status: ${response.status}`);
}
console.log("Health check passed");
//await new Promise((resolve) => setTimeout(resolve, 300));

await new Promise((resolve, reject) => {
// Open a WebSocket to that endpoint
const ws = new WebSocket(`${actorOrigin}/ws`);
//console.time(`ready-${actorId}`);
//while (true) {
// // Check HTTP health of service
// const response = await fetch(`${actorOrigin}/health`);
// if (!response.ok) {
// await new Promise(resolve => setTimeout(resolve, 100));
// continue;
// }
// console.timeEnd(`ready-${actorId}`);
// break;
//}

ws.onmessage = (evt) => {
const [type, body] = JSON.parse(evt.data);
if (type === "init") {
console.log("Init event data:", body);
} else if (type === "pong") {
console.log("Pong");
ws.close();
resolve(undefined);
} else {
console.warn("unknown message type", type);
}
};
// Check HTTP health of service
const response = await fetch(`${actorOrigin}/health`);
if (!response.ok) {
//throw new Error(`Health check failed with status: ${response.status}`);
console.error(`Health check failed with status: ${response.status}`);
Deno.exit(1);
}
console.log("Health check passed");

ws.onopen = () => {
console.log("Ping");
ws.send(JSON.stringify(["ping", 123]));
};
await new Promise((resolve, reject) => {
// Open a WebSocket to that endpoint
const ws = new WebSocket(`${actorOrigin}/ws`);

ws.onclose = () => {
console.log("WebSocket connection closed");
};
ws.onmessage = (evt) => {
const [type, body] = JSON.parse(evt.data);
if (type === "init") {
console.log("Init event data:", body);
} else if (type === "pong") {
console.log("Pong");
ws.close();
resolve(undefined);
} else {
console.warn("unknown message type", type);
}
};

ws.onerror = (err) => {
console.error("WS error", err);
reject("ws error");
};
});
} catch (error) {
console.error("Error:", error);
} finally {
if (actorId) {
console.log("Destroying", actorId);
await client.actor.destroy(actorId, {
project: RIVET_PROJECT,
environment: RIVET_ENVIRONMENT,
ws.onopen = () => {
console.log("Ping");
ws.send(JSON.stringify(["ping", 123]));
};

ws.onclose = () => {
console.log("WebSocket connection closed");
};

ws.onerror = (err) => {
console.error("WS error", err);
reject("ws error");
};
});
} catch (error) {
console.error("Error:", error);
} finally {
if (actorId) {
console.log("Destroying", actorId);
await client.actor.destroy(actorId, {
project: RIVET_PROJECT,
environment: RIVET_ENVIRONMENT,
});
}
}
}

async function runLoop() {
while (true) {
await run()
await new Promise((resolve) => setTimeout(resolve, Math.random() * 250));
}
}

for (let i = 0; i < 2; i++) {
await new Promise((resolve) => setTimeout(resolve, 100));
runLoop();
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{auth::Auth, types};
struct DynamicServerProxiedPort {
server_id: Uuid,
datacenter_id: Uuid,
create_ts: i64,
guard_public_hostname_dns_parent: Option<String>,
guard_public_hostname_static: Option<String>,

Expand All @@ -37,18 +38,20 @@ struct DynamicServerProxiedPort {
pub async fn build_ds(
ctx: &Ctx<Auth>,
dc_id: Uuid,
server_id: Option<Uuid>,
config: &mut types::TraefikConfigResponse,
) -> GlobalResult<()> {
) -> GlobalResult<Option<i64>> {
let proxied_ports = ctx
.cache()
.ttl(60_000)
.fetch_one_json("ds_proxied_ports", dc_id, |mut cache, dc_id| async move {
.fetch_one_json("ds_proxied_ports2", dc_id, |mut cache, dc_id| async move {
let rows = sql_fetch_all!(
[ctx, DynamicServerProxiedPort]
"
SELECT
s.server_id,
s.datacenter_id,
s.create_ts,
dc.guard_public_hostname_dns_parent,
dc.guard_public_hostname_static,
pp.label,
Expand Down Expand Up @@ -87,6 +90,8 @@ pub async fn build_ds(
.await?
.unwrap_or_default();

let latest_ds_create_ts = proxied_ports.iter().map(|pp| pp.create_ts).max();

config.http.middlewares.insert(
"ds-rate-limit".to_owned(),
types::TraefikMiddlewareHttp::RateLimit {
Expand Down Expand Up @@ -140,7 +145,7 @@ pub async fn build_ds(
"dynamic servers traefik config"
);

Ok(())
Ok(latest_ds_create_ts)
}

#[tracing::instrument(skip(config))]
Expand Down
48 changes: 31 additions & 17 deletions packages/api/traefik-provider/src/route/game_guard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,49 @@ pub mod job;
pub struct ConfigQuery {
token: Option<String>,
datacenter: Uuid,
server: Option<Uuid>,
}

#[tracing::instrument(skip(ctx))]
pub async fn config(
ctx: Ctx<Auth>,
_watch_index: WatchIndexQuery,
ConfigQuery { token, datacenter }: ConfigQuery,
ConfigQuery {
token,
datacenter,
server,
}: ConfigQuery,
) -> GlobalResult<types::TraefikConfigResponseNullified> {
ctx.auth().token(&token).await?;

let mut config = types::TraefikConfigResponse::default();

// Fetch configs and catch any errors

build_ds(&ctx, datacenter, &mut config).await?;
let mut config = types::TraefikConfigResponse::default();
build_job(&ctx, datacenter, &mut config).await?;
let latest_ds_create_ts = build_ds(&ctx, datacenter, server, &mut config).await?;

// Publish message when the request is complete
if let Some(latest_ds_create_ts) = latest_ds_create_ts {
ctx.msg(ds::workflows::server::pegboard::TraefikPoll {
server_id: server,
latest_ds_create_ts,
})
.tag("datacenter_id", datacenter)
.send()
.await?;
}

// tracing::info!(
// http_services = ?config.http.services.len(),
// http_routers = config.http.routers.len(),
// http_middlewares = ?config.http.middlewares.len(),
// tcp_services = ?config.tcp.services.len(),
// tcp_routers = config.tcp.routers.len(),
// tcp_middlewares = ?config.tcp.middlewares.len(),
// udp_services = ?config.udp.services.len(),
// udp_routers = config.udp.routers.len(),
// udp_middlewares = ?config.udp.middlewares.len(),
// "traefik config"
// );
tracing::debug!(
http_services = ?config.http.services.len(),
http_routers = config.http.routers.len(),
http_middlewares = ?config.http.middlewares.len(),
tcp_services = ?config.tcp.services.len(),
tcp_routers = config.tcp.routers.len(),
tcp_middlewares = ?config.tcp.middlewares.len(),
udp_services = ?config.udp.services.len(),
udp_routers = config.udp.routers.len(),
udp_middlewares = ?config.udp.middlewares.len(),
"traefik config"
);

Ok(types::TraefikConfigResponseNullified {
http: config.http.nullified(),
Expand Down
Loading
Loading