Skip to content

Commit

Permalink
feat(workflows): add observe workflows fn (#901)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Jun 18, 2024
1 parent 918038a commit 22a1ebd
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 43 deletions.
3 changes: 2 additions & 1 deletion lib/bolt/config/src/ns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,8 @@ pub struct ImageResizing {}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct RivetBilling {
pub dynamic_servers_capacity_price_id: String,
pub indie_price_id: String,
pub studio_price_id: String,
}

#[derive(Serialize, Deserialize, Clone, Debug, Default)]
Expand Down
2 changes: 2 additions & 0 deletions lib/bolt/core/src/context/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,13 +522,15 @@ impl ServiceContextData {
| ServiceKind::Cache { .. } | ServiceKind::Operation { .. }
| ServiceKind::Operations { .. }
| ServiceKind::ApiRoutes { .. }
| ServiceKind::Consumer { .. }
)
} else {
matches!(
dep.config().kind,
ServiceKind::Database { .. }
| ServiceKind::Cache { .. } | ServiceKind::Operation { .. }
| ServiceKind::Operations { .. }
| ServiceKind::Consumer { .. }
)
};

Expand Down
48 changes: 48 additions & 0 deletions lib/chirp-workflow/core/src/ctx/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,21 @@ impl TestCtx {
Ok(signal_id)
}

/// Waits for a workflow to be triggered with a superset of given input. Strictly for tests.
pub fn observe<W: Workflow>(&self, input: serde_json::Value) -> GlobalResult<ObserveHandle> {
// Serialize input
let input_val = serde_json::to_value(input)
.map_err(WorkflowError::SerializeWorkflowOutput)
.map_err(GlobalError::raw)?;

Ok(ObserveHandle {
db: self.db.clone(),
name: W::NAME,
input: input_val,
ts: rivet_util::timestamp::now(),
})
}

pub async fn op<I>(
&self,
input: I,
Expand Down Expand Up @@ -270,3 +285,36 @@ impl TestCtx {
&self.op_ctx
}
}

pub struct ObserveHandle {
db: DatabaseHandle,
name: &'static str,
input: serde_json::Value,
ts: i64,
}

impl ObserveHandle {
pub async fn next(&mut self) -> GlobalResult<Uuid> {
tracing::info!(name=%self.name, input=?self.input, "observing workflow");
tracing::info!(ts=%self.ts);

let (workflow_id, create_ts) = loop {
if let Some((workflow_id, create_ts)) = self
.db
.poll_workflow(self.name, &self.input, self.ts)
.await
.map_err(GlobalError::raw)?
{
break (workflow_id, create_ts);
}

tokio::time::sleep(Duration::from_millis(200)).await;
};

tracing::info!(name=%self.name, id=?workflow_id, "workflow found");

self.ts = create_ts + 1;

Ok(workflow_id)
}
}
11 changes: 9 additions & 2 deletions lib/chirp-workflow/core/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ pub trait Database: Send {
async fn dispatch_workflow(
&self,
ray_id: Uuid,
id: Uuid,
name: &str,
workflow_id: Uuid,
workflow_name: &str,
input: serde_json::Value,
) -> WorkflowResult<()>;
async fn get_workflow(&self, id: Uuid) -> WorkflowResult<Option<WorkflowRow>>;
Expand Down Expand Up @@ -75,6 +75,13 @@ pub trait Database: Send {
sub_workflow_name: &str,
input: serde_json::Value,
) -> WorkflowResult<()>;

async fn poll_workflow(
&self,
name: &str,
input: &serde_json::Value,
after_ts: i64,
) -> WorkflowResult<Option<(Uuid, i64)>>;
}

#[derive(sqlx::FromRow)]
Expand Down
25 changes: 25 additions & 0 deletions lib/chirp-workflow/core/src/db/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,4 +505,29 @@ impl Database for DatabasePostgres {

Ok(())
}

async fn poll_workflow(
&self,
workflow_name: &str,
input: &serde_json::Value,
after_ts: i64,
) -> WorkflowResult<Option<(Uuid, i64)>> {
sqlx::query_as::<_, (Uuid, i64)>(indoc!(
"
SELECT workflow_id, create_ts
FROM db_workflow.workflows
WHERE
workflow_name = $1 AND
-- Subset
input @> $2 AND
create_ts >= $3
",
))
.bind(workflow_name)
.bind(input)
.bind(after_ts)
.fetch_optional(&mut *self.conn().await?)
.await
.map_err(WorkflowError::Sqlx)
}
}
16 changes: 0 additions & 16 deletions proto/backend/billing.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ syntax = "proto3";
package rivet.backend.billing;

import "proto/common.proto";
import "proto/backend/billing/game_plan.proto";

message GameLobbyMetrics {
rivet.common.Uuid game_id = 1;
Expand All @@ -17,18 +16,3 @@ message RegionTierMetrics {
string lobby_group_name_id = 5;
int64 uptime = 4; // in seconds
}

message Team {
rivet.common.Uuid team_id = 1;
string stripe_customer_id = 2;
optional int64 payment_method_attached_ts = 3;
optional int64 payment_method_valid_ts = 4;
optional int64 payment_failed_ts = 5;
optional int64 payment_succeeded_ts = 6;
}

message Game {
rivet.common.Uuid game_id = 1;
rivet.backend.billing.game_plan.GamePlan plan = 2;
}

15 changes: 0 additions & 15 deletions proto/backend/billing/game_plan.proto

This file was deleted.

7 changes: 0 additions & 7 deletions svc/pkg/foo/worker/src/workflows/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,5 @@ async fn foo(ctx: &ActivityCtx, input: &FooInput) -> GlobalResult<FooOutput> {
.map(|(id,)| id)
.collect();

let user_id = util::uuid::parse("000b3124-91d9-472e-8104-3dcc41e1a74d")?;
let user_get_res = op!([ctx] user_get {
user_ids: vec![user_id.into()],
})
.await?;
let user = unwrap!(user_get_res.users.first());

Ok(FooOutput { ids })
}
8 changes: 6 additions & 2 deletions svc/pkg/mm/ops/lobby-find-lobby-query-list/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ async fn handle(
.collect::<Vec<common::Uuid>>();

if query_ids.len() as isize == MAX_COUNT {
tracing::warn!("too many find queries, short circuiting to prevent bad things from happening");
return Ok(mm::lobby_find_lobby_query_list::Response { query_ids: Vec::new() })
tracing::warn!(
"too many find queries, short circuiting to prevent bad things from happening"
);
return Ok(mm::lobby_find_lobby_query_list::Response {
query_ids: Vec::new(),
});
}

Ok(mm::lobby_find_lobby_query_list::Response { query_ids })
Expand Down

0 comments on commit 22a1ebd

Please sign in to comment.