Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(workflows): add limit to pulling workflows #1020

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion lib/chirp-workflow/core/src/db/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ use crate::{
event::combine_events,
};

/// Max amount of workflows pulled from the database with each call to `pull_workflows`.
const MAX_PULLED_WORKFLOWS: i64 = 10;
/// Maximum times a query ran bu this database adapter is retried.
const MAX_QUERY_RETRIES: usize = 16;

pub struct DatabasePostgres {
Expand Down Expand Up @@ -143,7 +146,6 @@ impl Database for DatabasePostgres {
worker_instance_id: Uuid,
filter: &[&str],
) -> WorkflowResult<Vec<PulledWorkflow>> {
// TODO(RVT-3753): include limit on query to allow better workflow spread between nodes?
// Select all workflows that haven't started or that have a wake condition
let workflow_rows = sqlx::query_as::<_, PulledWorkflowRow>(indoc!(
"
Expand Down Expand Up @@ -194,6 +196,7 @@ impl Database for DatabasePostgres {
output IS NOT NULL
)
)
LIMIT $4
RETURNING workflow_id, workflow_name, create_ts, ray_id, input, wake_deadline_ts
),
-- Update last ping
Expand All @@ -208,6 +211,7 @@ impl Database for DatabasePostgres {
.bind(worker_instance_id)
.bind(filter)
.bind(rivet_util::timestamp::now())
.bind(MAX_PULLED_WORKFLOWS)
.fetch_all(&mut *self.conn().await?)
.await
.map_err(WorkflowError::Sqlx)?;
Expand Down
Loading