Skip to content

Commit

Permalink
feat(ds, pb): move to edge
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Jan 25, 2025
1 parent 003f3c7 commit 1023d01
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 15 deletions.
5 changes: 5 additions & 0 deletions packages/common/chirp-workflow/core/src/ctx/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ impl ActivityCtx {
self.conn.clickhouse().await
}

/// Access the SQLite database for this workflow. This cannot access any other database.
pub async fn sqlite(&self) -> Result<SqlitePool, rivet_pools::Error> {
self.conn.sqlite(format!("{}-data", self.workflow_id)).await
}

// Backwards compatibility
pub fn op_ctx(&self) -> &rivet_operation::OperationContext<()> {
&self.op_ctx
Expand Down
4 changes: 4 additions & 0 deletions packages/common/connection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ impl Connection {
self.pools.redis("ephemeral")
}

pub async fn sqlite(&self, key: impl AsRef<str>) -> Result<SqlitePool, rivet_pools::Error> {
self.pools.sqlite(key).await
}

pub fn perf(&self) -> &chirp_perf::PerfCtx {
self.client.perf()
}
Expand Down
1 change: 1 addition & 0 deletions packages/common/operation/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ where
self.conn.cache_handle()
}

/// Used by compat layer for chirp workflow.
pub fn pools(&self) -> &rivet_pools::Pools {
self.conn.pools()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use chirp_workflow::prelude::*;

use super::Input;

pub fn run(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
pub async fn run(ctx: &mut WorkflowCtx) -> GlobalResult<()> {
ctx.activity(MigrateInitInput {}).await?;

Ok(())
Expand All @@ -12,15 +10,18 @@ pub fn run(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
struct MigrateInitInput {}

#[activity(MigrateInit)]
async fn migrate_init(ctx: &ActivityCtx, &MigrateInitInput) -> GlobalResult<()> {
async fn migrate_init(ctx: &ActivityCtx, _input: &MigrateInitInput) -> GlobalResult<()> {
let pool = ctx.sqlite().await?;

sql_execute!(
[ctx]
[ctx, pool]
"
CREATE TABLE test (
)
",
)
.await
.map_err(Into::into)
.await?;

Ok(())
}
4 changes: 4 additions & 0 deletions packages/services/edge/pegboard/src/workflows/actor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use chirp_workflow::prelude::*;

mod migrations;

#[derive(Debug, Serialize, Deserialize)]
pub struct Input {
pub actor_id: Uuid,
}

#[workflow]
pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
migrations::run(ctx).await?;

Ok(())
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use chirp_workflow::prelude::*;

use super::Input;

pub fn run(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
pub async fn run(ctx: &mut WorkflowCtx) -> GlobalResult<()> {
ctx.activity(MigrateInitInput {}).await?;

Ok(())
Expand All @@ -12,15 +10,18 @@ pub fn run(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
struct MigrateInitInput {}

#[activity(MigrateInit)]
async fn migrate_init(ctx: &ActivityCtx, &MigrateInitInput) -> GlobalResult<()> {
async fn migrate_init(ctx: &ActivityCtx, _input: &MigrateInitInput) -> GlobalResult<()> {
let pool = ctx.sqlite().await?;

sql_execute!(
[ctx]
[ctx, pool]
"
CREATE TABLE test (
)
",
)
.await
.map_err(Into::into)
.await?;

Ok(())
}
28 changes: 27 additions & 1 deletion packages/services/edge/pegboard/src/workflows/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ pub struct Input {

#[workflow]
pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
migrations::run(ctx, input).await?;
migrations::run(ctx).await?;

// Whatever started this client should be listening for this
ctx.signal(Registered {})
.tag("client_id", input.client_id)
.send()
.await?;

ctx.repeat(|ctx| {
let client_id = input.client_id;
Expand Down Expand Up @@ -85,6 +91,17 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu
Main::Command(command) => {
handle_commands(ctx, client_id, vec![command]).await?;
}
Main::PrewarmImage(sig) => {
ctx.msg(ToWs {
client_id,
inner: protocol::ToClient::PrewarmImage {
image_id: sig.image_id,
image_artifact_url_stub: sig.image_artifact_url_stub,
},
})
.send()
.await?;
}
Main::Drain(_) => {
ctx.activity(SetDrainInput {
client_id,
Expand Down Expand Up @@ -577,12 +594,20 @@ async fn fetch_all_actors(
Ok(actor_ids)
}

#[signal("pegboard_client_registered")]
pub struct Registered {}

#[message("pegboard_client_to_ws")]
pub struct ToWs {
pub client_id: Uuid,
pub inner: protocol::ToClient,
}

#[signal("pegboard_prewarm_image")]
pub struct PrewarmImage {
pub image_id: Uuid,
pub image_artifact_url_stub: String,
}
#[message("pegboard_client_close_ws")]
pub struct CloseWs {
pub client_id: Uuid,
Expand All @@ -606,6 +631,7 @@ join_signal!(Main {
Command(protocol::Command),
// Forwarded from the ws to this workflow
Forward(protocol::ToServer),
PrewarmImage,
Drain,
Undrain,
Destroy,
Expand Down

0 comments on commit 1023d01

Please sign in to comment.