Skip to content

Commit

Permalink
chore: restructure server binaries
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Jan 31, 2025
1 parent 724da15 commit 9b0bc5b
Show file tree
Hide file tree
Showing 37 changed files with 365 additions and 185 deletions.
85 changes: 74 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 14 additions & 2 deletions Cargo.toml

Large diffs are not rendered by default.

30 changes: 17 additions & 13 deletions packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ impl Database for DatabaseFdbSqliteNats {
// Set up sqlite
let pulled_workflows = futures_util::stream::iter(partial_workflows)
.map(|partial| async move {
let pool = self.pools.sqlite(partial.workflow_id).await?;
let pool = self.pools.sqlite(db_name(partial.workflow_id)).await?;
sqlite::init(partial.workflow_id, &pool).await?;

// Fetch all events
Expand Down Expand Up @@ -1170,7 +1170,7 @@ impl Database for DatabaseFdbSqliteNats {
version: usize,
loop_location: Option<&Location>,
) -> WorkflowResult<Option<SignalData>> {
let pool = self.pools.sqlite(workflow_id).await?;
let pool = self.pools.sqlite(db_name(workflow_id)).await?;

let owned_filter = filter
.into_iter()
Expand Down Expand Up @@ -1840,7 +1840,7 @@ impl Database for DatabaseFdbSqliteNats {
body: &serde_json::value::RawValue,
loop_location: Option<&Location>,
) -> WorkflowResult<()> {
let pool = self.pools.sqlite(from_workflow_id).await?;
let pool = self.pools.sqlite(db_name(from_workflow_id)).await?;

// Insert history event
self.query(|| async {
Expand Down Expand Up @@ -1901,7 +1901,7 @@ impl Database for DatabaseFdbSqliteNats {
loop_location: Option<&Location>,
) -> WorkflowResult<()> {
if TAGGED_SIGNALS_ENABLED {
let pool = self.pools.sqlite(from_workflow_id).await?;
let pool = self.pools.sqlite(db_name(from_workflow_id)).await?;

// Insert history event
self.query(|| async {
Expand Down Expand Up @@ -1965,7 +1965,7 @@ impl Database for DatabaseFdbSqliteNats {
loop_location: Option<&Location>,
unique: bool,
) -> WorkflowResult<Uuid> {
let pool = self.pools.sqlite(workflow_id).await?;
let pool = self.pools.sqlite(db_name(workflow_id)).await?;

// Insert history event
self.query(|| async {
Expand Down Expand Up @@ -2127,7 +2127,7 @@ impl Database for DatabaseFdbSqliteNats {
res: Result<&serde_json::value::RawValue, &str>,
loop_location: Option<&Location>,
) -> WorkflowResult<()> {
let pool = self.pools.sqlite(workflow_id).await?;
let pool = self.pools.sqlite(db_name(workflow_id)).await?;
let input_hash = event_id.input_hash.to_be_bytes();

match res {
Expand Down Expand Up @@ -2233,7 +2233,7 @@ impl Database for DatabaseFdbSqliteNats {
body: &serde_json::value::RawValue,
loop_location: Option<&Location>,
) -> WorkflowResult<()> {
let pool = self.pools.sqlite(from_workflow_id).await?;
let pool = self.pools.sqlite(db_name(from_workflow_id)).await?;

self.query(|| async {
sql_execute!(
Expand Down Expand Up @@ -2269,7 +2269,7 @@ impl Database for DatabaseFdbSqliteNats {
output: Option<&serde_json::value::RawValue>,
loop_location: Option<&Location>,
) -> WorkflowResult<()> {
let pool = self.pools.sqlite(workflow_id).await?;
let pool = self.pools.sqlite(db_name(workflow_id)).await?;

self.query(|| async {
// Attempt to use an existing connection
Expand Down Expand Up @@ -2443,7 +2443,7 @@ impl Database for DatabaseFdbSqliteNats {
deadline_ts: i64,
loop_location: Option<&Location>,
) -> WorkflowResult<()> {
let pool = self.pools.sqlite(from_workflow_id).await?;
let pool = self.pools.sqlite(db_name(from_workflow_id)).await?;

self.query(|| async {
sql_execute!(
Expand Down Expand Up @@ -2474,7 +2474,7 @@ impl Database for DatabaseFdbSqliteNats {
location: &Location,
state: SleepState,
) -> WorkflowResult<()> {
let pool = self.pools.sqlite(from_workflow_id).await?;
let pool = self.pools.sqlite(db_name(from_workflow_id)).await?;

self.query(|| async {
sql_execute!(
Expand All @@ -2501,7 +2501,7 @@ impl Database for DatabaseFdbSqliteNats {
version: usize,
loop_location: Option<&Location>,
) -> WorkflowResult<()> {
let pool = self.pools.sqlite(from_workflow_id).await?;
let pool = self.pools.sqlite(db_name(from_workflow_id)).await?;

self.query(|| async {
sql_execute!(
Expand Down Expand Up @@ -2532,7 +2532,7 @@ impl Database for DatabaseFdbSqliteNats {
event_name: Option<&str>,
loop_location: Option<&Location>,
) -> WorkflowResult<()> {
let pool = self.pools.sqlite(from_workflow_id).await?;
let pool = self.pools.sqlite(db_name(from_workflow_id)).await?;

self.query(|| async {
sql_execute!(
Expand Down Expand Up @@ -2563,7 +2563,7 @@ impl Database for DatabaseFdbSqliteNats {
version: usize,
loop_location: Option<&Location>,
) -> WorkflowResult<()> {
let pool = self.pools.sqlite(from_workflow_id).await?;
let pool = self.pools.sqlite(db_name(from_workflow_id)).await?;

self.query(|| async {
sql_execute!(
Expand Down Expand Up @@ -2594,3 +2594,7 @@ struct PartialWorkflow {
pub ray_id: Uuid,
pub input: Box<serde_json::value::RawValue>,
}

fn db_name(workflow_id: Uuid) -> String {
format!("{workflow_id}-internal")
}
2 changes: 1 addition & 1 deletion packages/common/chirp-workflow/core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub use crate::{
activity::Activity as ActivityTrait,
ctx::workflow::Loop,
ctx::*,
db,
db::{self, Database},
error::{WorkflowError, WorkflowResult},
executable::Executable,
history::removed::*,
Expand Down
1 change: 0 additions & 1 deletion packages/common/chirp-workflow/core/tests/integration.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use chirp_workflow::db::Database;
use chirp_workflow::prelude::*;
use serde_json::json;
use uuid::Uuid;
Expand Down
4 changes: 2 additions & 2 deletions packages/common/migrate/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ pub async fn up(config: rivet_config::Config, services: &[SqlService]) -> Result
tracing::info!(sql_services = ?services.len(), "running sql migrations");

let server_config = config.server.as_ref().context("missing server")?;
let is_development = server_config.rivet.auth.access_kind
== rivet_config::config::rivet::AccessKind::Development;
// let is_development = server_config.rivet.auth.access_kind
// == rivet_config::config::rivet::AccessKind::Development;

let crdb = rivet_pools::db::crdb::setup(config.clone())
.await
Expand Down
9 changes: 4 additions & 5 deletions packages/common/pools/src/db/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use sqlx::{
Sqlite,
};
use tokio::sync::Mutex;
use uuid::Uuid;

use crate::Error;

Expand All @@ -16,7 +15,7 @@ pub type SqlitePool = sqlx::SqlitePool;
#[derive(Clone)]
pub struct SqlitePoolManager {
// TODO: Somehow remove old pools
pools: Arc<Mutex<HashMap<Uuid, SqlitePool>>>,
pools: Arc<Mutex<HashMap<String, SqlitePool>>>,
}

impl SqlitePoolManager {
Expand All @@ -27,10 +26,10 @@ impl SqlitePoolManager {
}

/// Get or creates an sqlite pool for the given key
pub async fn get(&self, key: Uuid) -> Result<SqlitePool, Error> {
pub async fn get(&self, key: &str) -> Result<SqlitePool, Error> {
let mut pools_guard = self.pools.lock().await;

let pool = if let Some(pool) = pools_guard.get(&key) {
let pool = if let Some(pool) = pools_guard.get(key) {
pool.clone()
} else {
// TODO: Hardcoded for testing
Expand Down Expand Up @@ -61,7 +60,7 @@ impl SqlitePoolManager {
// Run at the start of every connection
setup_pragma(&pool).await.map_err(Error::BuildSqlx)?;

pools_guard.insert(key, pool.clone());
pools_guard.insert(key.to_string(), pool.clone());

tracing::debug!(?key, "sqlite connected");

Expand Down
5 changes: 2 additions & 3 deletions packages/common/pools/src/pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
use global_error::{ensure_with, prelude::*, GlobalResult};
use rivet_config::Config;
use tokio_util::sync::{CancellationToken, DropGuard};
use uuid::Uuid;

use crate::{
db::sqlite::SqlitePoolManager, ClickHousePool, CrdbPool, Error, FdbPool, NatsPool, RedisPool,
Expand Down Expand Up @@ -168,8 +167,8 @@ impl Pools {
self.0.fdb.clone().ok_or(Error::MissingFdbPool)
}

pub async fn sqlite(&self, key: Uuid) -> Result<SqlitePool, Error> {
self.0.sqlite.get(key).await
pub async fn sqlite(&self, key: impl AsRef<str>) -> Result<SqlitePool, Error> {
self.0.sqlite.get(key.as_ref()).await
}

#[tracing::instrument(skip_all)]
Expand Down
Loading

0 comments on commit 9b0bc5b

Please sign in to comment.