fix(ds): cache traefik routes
MasterPtato committed Aug 23, 2024
1 parent eb0223c commit 2849f6f
Showing 6 changed files with 68 additions and 55 deletions.
78 changes: 36 additions & 42 deletions svc/api/traefik-provider/src/route/game_guard/dynamic_servers.rs
@@ -42,49 +42,43 @@ pub async fn build_ds(
	dc_id: Uuid,
	config: &mut types::TraefikConfigResponse,
) -> GlobalResult<()> {
-	// TODO put in function, clean up
-	// TODO: remove cache for now
+	let dynamic_servers = ctx
+		.cache()
+		.ttl(60)
+		.fetch_one_json("servers_ports", dc_id, |mut cache, dc_id| async move {
+			let rows = sql_fetch_all!(
+				[ctx, DynamicServer]
+				"
+				SELECT
+					s.server_id,
+					s.datacenter_id,
+					ip.nomad_label AS label,
+					ip.nomad_ip,
+					ip.nomad_source,
+					gg.port_number,
+					gg.gg_port,
+					gg.port_name,
+					gg.protocol
+				FROM db_ds.internal_ports AS ip
+				JOIN db_ds.servers AS s
+					ON ip.server_id = s.server_id
+				JOIN db_ds.docker_ports_protocol_game_guard AS gg
+					ON
+						ip.server_id = gg.server_id AND
+						ip.nomad_label = CONCAT('ds_', gg.port_name)
+				WHERE
+					s.datacenter_id = $1 AND
+					s.destroy_ts IS NULL
+				",
+				dc_id
+			)
+			.await?;
+			cache.resolve(&dc_id, rows);

-	// let dynamic_servers: Option<Vec<DynamicServer>> = ctx
-	// 	.cache()
-	// 	// TODO: Set this for longer, this should mean that no caching happens
-	// 	.ttl(1)
-	// 	.fetch_one_json("servers_ports", dc_id, |mut cache, dc_id| {
-	// 		let ctx = ctx.clone();
-	// 		async move {
-	let dynamic_servers = sql_fetch_all!(
-		[ctx, DynamicServer]
-		"
-		SELECT
-			s.server_id,
-			s.datacenter_id,
-			ip.nomad_label AS label,
-			ip.nomad_ip,
-			ip.nomad_source,
-			gg.port_number,
-			gg.gg_port,
-			gg.port_name,
-			gg.protocol
-		FROM db_ds.internal_ports AS ip
-		JOIN db_ds.servers AS s
-			ON ip.server_id = s.server_id
-		JOIN db_ds.docker_ports_protocol_game_guard AS gg
-			ON
-				ip.server_id = gg.server_id AND
-				ip.nomad_label = CONCAT('ds_', gg.port_name)
-		WHERE
-			s.datacenter_id = $1 AND
-			s.stop_ts IS NULL
-		",
-		dc_id
-	)
-	.await?;
-	// cache.resolve(&dc_id, rows);
-
-	// 		Ok(cache)
-	// 	}
-	// })
-	// .await?;
+			Ok(cache)
+		})
+		.await?
+		.unwrap_or_default();

// Process proxied ports
for dynamic_server in &dynamic_servers {
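The route builder previously ran this SQL query on every Traefik poll; the change wraps it in a read-through cache with a 60-second TTL, so `fetch_one_json("servers_ports", dc_id, ...)` should only execute the query on a miss, and `cache.resolve(&dc_id, rows)` stores the rows under the datacenter's key. On the real call, `.unwrap_or_default()` turns a missing value into an empty route list so route building still proceeds. A minimal sketch of that read-through pattern, using a hypothetical in-memory `TtlCache` in place of Rivet's Redis-backed `ctx.cache()` (all names in the sketch are illustrative, not the real API):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Hypothetical stand-in for a Redis-backed cache: values expire after a TTL.
struct TtlCache<K, V> {
    ttl: Duration,
    entries: HashMap<K, (Instant, V)>,
}

impl<K: std::hash::Hash + Eq, V: Clone> TtlCache<K, V> {
    fn new(ttl_secs: u64) -> Self {
        Self {
            ttl: Duration::from_secs(ttl_secs),
            entries: HashMap::new(),
        }
    }

    /// Read-through fetch: return the cached value if it is still fresh,
    /// otherwise run `load` (the SQL query in the real code) and store it.
    fn fetch_one<E>(&mut self, key: K, load: impl FnOnce() -> Result<V, E>) -> Result<V, E> {
        if let Some((stored_at, value)) = self.entries.get(&key) {
            if stored_at.elapsed() < self.ttl {
                return Ok(value.clone()); // hit: skip the expensive query
            }
        }
        let value = load()?; // miss or expired: reload from the source
        self.entries.insert(key, (Instant::now(), value.clone()));
        Ok(value)
    }

    /// Drop a key so the next fetch reloads, mirroring `ctx.cache().purge(..)`.
    fn purge(&mut self, key: &K) {
        self.entries.remove(key);
    }
}

fn main() {
    let mut cache: TtlCache<&str, Vec<String>> = TtlCache::new(60); // 60s, as in the diff
    let routes = cache
        .fetch_one("dc-123", || Ok::<_, ()>(vec!["ds_http-port".into()]))
        .unwrap();
    println!("{routes:?}"); // second call within 60s would not re-run the loader
    cache.purge(&"dc-123"); // a write path invalidates the datacenter's key
}
```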
3 changes: 3 additions & 0 deletions
@@ -16,3 +16,6 @@ STORING (
	nomad_node_vlan_ipv4
);
DROP INDEX server_nomad@server_nomad_nomad_dispatched_job_id_idx;
+
+CREATE INDEX ON servers (datacenter_id, stop_ts)
+STORING (datacenter_id, kill_timeout_ms);
18 changes: 9 additions & 9 deletions svc/pkg/ds/src/workers/nomad_monitor_alloc_plan.rs
@@ -18,19 +18,11 @@ struct PlanResult {
#[derive(Debug, sqlx::FromRow)]
struct RunRow {
	server_id: Uuid,
+	datacenter_id: Uuid,
	connectable_ts: Option<i64>,
	nomad_alloc_plan_ts: Option<i64>, // this was nomad_plan_ts
}

-// #[derive(Debug, sqlx::FromRow)]
-// struct ProxiedPort {
-// 	target_nomad_port_label: Option<String>,
-// 	ingress_port: i64,
-// 	ingress_hostnames: Vec<String>,
-// 	proxy_protocol: i64,
-// 	ssl_domain_mode: i64,
-// }
-
#[derive(Clone)]
struct RunData {
	job_id: String,
@@ -160,6 +152,7 @@ async fn update_db(
		"
		SELECT
			s.server_id,
+			s.datacenter_id,
			s.connectable_ts,
			s.stop_ts,
			sn.nomad_alloc_plan_ts
@@ -248,6 +241,13 @@
			)
			.await?;
		}
+
+		// Invalidate cache when ports are updated
+		if !ports.is_empty() {
+			ctx.cache()
+				.purge("servers_ports", [run_row.datacenter_id])
+				.await?;
+		}
	}

	Ok(Some(DbOutput { server_id }))
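Because cached routes can now be up to 60 seconds stale, the alloc-plan worker purges the datacenter's `servers_ports` key whenever a plan event actually wrote port rows (the `!ports.is_empty()` guard), so the provider's next poll re-queries immediately instead of waiting out the TTL. A small sketch of that guard-then-purge shape, with a plain `HashMap` and a `DcId` alias standing in for the real Redis cache and `Uuid` (illustrative names only):

```rust
use std::collections::HashMap;

type DcId = u64; // hypothetical stand-in for the datacenter Uuid

// Only purge the datacenter's cached routes when port rows actually changed;
// mirrors `if !ports.is_empty() { ctx.cache().purge(...) }` in the diff.
fn handle_alloc_plan(
    cache: &mut HashMap<DcId, Vec<String>>, // cached "servers_ports" per datacenter
    datacenter_id: DcId,
    updated_ports: &[String],
) {
    if !updated_ports.is_empty() {
        cache.remove(&datacenter_id); // plan events with no port changes leave the cache alone
    }
}

fn main() {
    let mut cache = HashMap::from([(1, vec!["ds_http".to_string()])]);
    handle_alloc_plan(&mut cache, 1, &["ds_http".to_string()]);
    assert!(!cache.contains_key(&1)); // next read-through fetch re-runs the SQL query
}
```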
2 changes: 2 additions & 0 deletions svc/pkg/ds/src/workers/nomad_monitor_alloc_update.rs
@@ -86,6 +86,8 @@ async fn worker(
		}
	};

+	tracing::info!("run pending");
+
	crate::workers::webhook::call(ctx, alloc_id.to_string()).await?;

	Ok(())
15 changes: 11 additions & 4 deletions svc/pkg/ds/src/workflows/server/destroy.rs
@@ -74,7 +74,6 @@ struct UpdateDbInput {

#[derive(Debug, Serialize, Deserialize, Hash, sqlx::FromRow)]
struct UpdateDbOutput {
-	server_id: Uuid,
	datacenter_id: Uuid,
	kill_timeout_ms: i64,
	dispatched_job_id: Option<String>,
@@ -84,7 +83,7 @@ struct UpdateDbOutput {
#[activity(UpdateDb)]
async fn update_db(ctx: &ActivityCtx, input: &UpdateDbInput) -> GlobalResult<UpdateDbOutput> {
	// Run in transaction for internal retryability
-	rivet_pools::utils::crdb::tx(&ctx.crdb().await?, |tx| {
+	let db_output = rivet_pools::utils::crdb::tx(&ctx.crdb().await?, |tx| {
		let ctx = ctx.clone();
		let server_id = input.server_id;

@@ -102,7 +101,6 @@ async fn update_db(ctx: &ActivityCtx, input: &UpdateDbInput) -> GlobalResult<Upd
				s1.server_id = s2.server_id AND
				s2.destroy_ts IS NULL
			RETURNING
-				s1.server_id,
				s1.datacenter_id,
				s1.kill_timeout_ms,
				sn.nomad_dispatched_job_id AS dispatched_job_id,
@@ -115,7 +113,16 @@ async fn update_db(ctx: &ActivityCtx, input: &UpdateDbInput) -> GlobalResult<Upd
		}
		.boxed()
	})
-	.await
+	.await?;
+
+	// NOTE: This call is infallible because redis is infallible. If it was not, it would be put in its own
+	// workflow step
+	// Invalidate cache when server is destroyed
+	ctx.cache()
+		.purge("servers_ports", [db_output.datacenter_id])
+		.await?;
+
+	Ok(db_output)
}

#[derive(Debug, Serialize, Deserialize, Hash)]
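A note on the shape of this change: binding the transaction result to `db_output` lets the activity purge the `servers_ports` key only after the `UPDATE ... RETURNING` commits, using the returned `datacenter_id`. The next read-through fetch therefore re-queries and sees `destroy_ts` set, and anything cached just before the purge still expires within the 60-second TTL.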
7 changes: 7 additions & 0 deletions svc/pkg/ds/src/workflows/server/mod.rs
@@ -306,6 +306,13 @@ async fn insert_db(ctx: &ActivityCtx, input: &InsertDbInput) -> GlobalResult<()>
	})
	.await?;

+	// NOTE: This call is infallible because redis is infallible. If it was not, it would be put in its own
+	// workflow step
+	// Invalidate cache when new server is created
+	ctx.cache()
+		.purge("servers_ports", [input.datacenter_id])
+		.await?;
+
	Ok(())
}

Expand Down

0 comments on commit 2849f6f

Please sign in to comment.