Skip to content

Commit

Permalink
fix: delegate more functionality to dc-scale
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Apr 12, 2024
1 parent 8e3e6cb commit dbb7921
Show file tree
Hide file tree
Showing 16 changed files with 652 additions and 692 deletions.
3 changes: 3 additions & 0 deletions lib/pools/src/utils/crdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub type AsyncResult<'a, T> = Pin<Box<dyn Future<Output = GlobalResult<T>> + Sen
///
/// See
/// https://www.cockroachlabs.com/docs/v22.2/advanced-client-side-transaction-retries
///
/// **NOTE** The transaction will be rolled back if the future is cancelled. See
/// https://docs.rs/sqlx/0.7.4/sqlx/struct.Transaction.html
#[tracing::instrument(skip_all)]
pub async fn tx<T, F>(crdb: &CrdbPool, f: F) -> GlobalResult<T>
where
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ CREATE TABLE servers (
nomad_node_id TEXT,

create_ts INT NOT NULL,
install_complete_ts INT,
nomad_join_ts INT,
-- Null if not draining
drain_ts INT,
drain_complete_ts INT,
-- When the server was marked to be deleted by rivet
cloud_destroy_ts INT,
taint_ts INT
Expand Down
6 changes: 4 additions & 2 deletions svc/pkg/cluster/standalone/datacenter-tls-renew/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use futures_util::FutureExt;
use proto::backend::{self, pkg::*};
use rivet_operation::prelude::*;

Expand All @@ -21,7 +22,7 @@ pub async fn run_from_env(pools: rivet_pools::Pools) -> GlobalResult<()> {
let updated_datacenter_ids = rivet_pools::utils::crdb::tx(&ctx.crdb().await?, |tx| {
let ctx = ctx.base();

Box::pin(async move {
async move {
// Check for expired rows
let datacenters = sql_fetch_all!(
[ctx, (Uuid,), @tx tx]
Expand Down Expand Up @@ -58,7 +59,8 @@ pub async fn run_from_env(pools: rivet_pools::Pools) -> GlobalResult<()> {
}

Ok(datacenters)
})
}
.boxed()
})
.await?;

Expand Down
97 changes: 23 additions & 74 deletions svc/pkg/cluster/standalone/gc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,6 @@ use std::collections::HashMap;
use proto::backend::{self, pkg::*};
use rivet_operation::prelude::*;

#[derive(sqlx::FromRow)]
struct Server {
datacenter_id: Uuid,
server_id: Uuid,
drain_ts: i64,
}

#[tracing::instrument(skip_all)]
pub async fn run_from_env(ts: i64, pools: rivet_pools::Pools) -> GlobalResult<()> {
let client = chirp_client::SharedClient::from_env(pools.clone())?.wrap_new("cluster-gc");
Expand All @@ -27,78 +20,34 @@ pub async fn run_from_env(ts: i64, pools: rivet_pools::Pools) -> GlobalResult<()
);
let crdb = ctx.crdb().await?;

// Select all draining gg servers
let gg_servers = sql_fetch_all!(
[ctx, Server, &crdb]
// Update all draining gg and ats servers that have completed draining
let datacenter_rows = sql_fetch_all!(
[ctx, (Uuid,), &crdb]
"
SELECT datacenter_id, server_id, drain_ts
FROM db_cluster.servers
WHERE
pool_type = $1 AND
cloud_destroy_ts IS NULL AND
drain_ts IS NOT NULL
WITH updated AS (
UPDATE db_cluster.servers AS s
SET drain_complete_ts = $2
FROM db_cluster.datacenters AS d
WHERE
s.datacenter_id = d.datacenter_id AND
pool_type = ANY($1) AND
cloud_destroy_ts IS NULL AND
drain_ts IS NOT NULL AND
drain_ts < $2 - d.drain_timeout
RETURNING s.datacenter_id
)
SELECT DISTINCT datacenter_id
FROM updated
",
backend::cluster::PoolType::Gg as i64
)
.await?;

if gg_servers.is_empty() {
return Ok(());
}

let datacenters_res = op!([ctx] cluster_datacenter_get {
datacenter_ids: gg_servers
.iter()
.map(|server| server.datacenter_id.into())
.collect::<Vec<_>>(),
})
.await?;

// Collect into hashmap for better reads
let datacenters = datacenters_res
.datacenters
.iter()
.map(|dc| Ok((unwrap_ref!(dc.datacenter_id).as_uuid(), dc)))
.collect::<GlobalResult<HashMap<_, _>>>()?;

// Filter all gg servers that are finished draining
let destroy_server_ids = gg_servers
.iter()
.map(|server| {
let datacenter_config = unwrap!(datacenters.get(&server.datacenter_id));
let drain_cutoff = ts - datacenter_config.drain_timeout as i64;

Ok((server, drain_cutoff))
})
.filter_map(|res| match res {
Ok((server, drain_cutoff)) => {
if server.drain_ts < drain_cutoff {
Some(Ok(server.server_id))
} else {
None
}
}
Err(err) => Some(Err(err)),
})
.collect::<GlobalResult<Vec<_>>>()?;

// Mark as destroyed
sql_execute!(
[ctx, &crdb]
"
UPDATE db_cluster.servers
SET cloud_destroy_ts = $2
WHERE server_id = ANY($1)
",
&destroy_server_ids,
util::timestamp::now(),
&[backend::cluster::PoolType::Gg as i64, backend::cluster::PoolType::Ats as i64],
ts,
)
.await?;

for server_id in destroy_server_ids {
msg!([ctx] cluster::msg::server_destroy(server_id) {
server_id: Some(server_id.into()),
force: false,
// Scale
for (datacenter_id,) in datacenter_rows {
msg!([ctx] cluster::msg::datacenter_scale(datacenter_id) {
datacenter_id: Some(datacenter_id.into()),
})
.await?;
}
Expand Down
6 changes: 4 additions & 2 deletions svc/pkg/cluster/worker/src/workers/datacenter_create.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use chirp_worker::prelude::*;
use futures_util::FutureExt;
use proto::backend::{self, pkg::*};

#[worker(name = "cluster-datacenter-create")]
Expand Down Expand Up @@ -27,7 +28,7 @@ async fn worker(
let ctx = ctx.clone();
let pools_buf = pools_buf.clone();

Box::pin(async move {
async move {
sql_execute!(
[ctx, @tx tx]
"
Expand Down Expand Up @@ -77,7 +78,8 @@ async fn worker(
.await?;

Ok(())
})
}
.boxed()
})
.await?;

Expand Down
Loading

0 comments on commit dbb7921

Please sign in to comment.