Skip to content

Commit

Permalink
feat: Add final run_once tests
Browse files Browse the repository at this point in the history
  • Loading branch information
leo91000 committed Feb 5, 2024
1 parent da81dfc commit 9cc3cc9
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 8 deletions.
39 changes: 31 additions & 8 deletions src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,38 @@ impl Worker {
);

job_stream
.for_each_concurrent(self.concurrency, |job| async move {
let result = run_and_release_job(&job, self, &StreamSource::RunOnce).await;
match result {
Ok(_) => {
info!(job_id = job.id(), "Job processed");
}
Err(e) => {
error!("Error while processing job : {:?}", e);
.for_each_concurrent(self.concurrency, |mut job| async move {
loop {
let result = run_and_release_job(&job, self, &StreamSource::RunOnce).await;

match result {
Ok(_) => {
info!(job_id = job.id(), "Job processed");
}
Err(e) => {
error!("Error while processing job : {:?}", e);
}
};

// If the job has a queue, we need to fetch another job because the job_signal will not trigger
// Is there a simpler way to do this ?
if job.job_queue_id().is_none() {
break;
}
info!(job_id = job.id(), "Job has queue, fetching another job");
let new_job = get_job(
self.pg_pool(),
self.task_details(),
self.escaped_schema(),
self.worker_id(),
self.forbidden_flags(),
)
.await
.unwrap_or(None);
let Some(new_job) = new_job else {
break;
};
job = new_job;
}
})
.await;
Expand Down
99 changes: 99 additions & 0 deletions tests/run_once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1189,3 +1189,102 @@ async fn single_worker_runs_jobs_in_series_purges_all_before_exit() {
})
.await;
}

#[tokio::test]
async fn jobs_added_to_the_same_queue_will_be_ran_serially_even_if_multiple_workers() {
static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();
static RXS: OnceCell<Mutex<VecDeque<oneshot::Receiver<()>>>> = OnceCell::const_new();
RXS.set(Mutex::new(VecDeque::new())).unwrap();
let mut txs = vec![];

#[derive(serde::Deserialize, serde::Serialize)]
struct Job3Args {
a: i32,
}

#[graphile_worker::task]
async fn job3(_args: Job3Args, _: WorkerContext) -> Result<(), ()> {
let rx = RXS.get().unwrap().lock().await.pop_front().unwrap(); // Obtain the receiver for the current job
rx.await.unwrap(); // Wait for the signal to complete the job
JOB3_CALL_COUNT.increment().await;
Ok(())
}

helpers::with_test_db(|test_db| async move {
let worker = test_db
.create_worker_options()
.define_job(job3)
.init()
.await
.expect("Failed to create worker");
let worker = Rc::new(worker);

// Schedule 5 jobs to the same queue
for _ in 1..=5 {
let (tx, rx) = oneshot::channel::<()>();
txs.push(tx);
RXS.get().unwrap().lock().await.push_back(rx);

worker
.create_utils()
.add_job::<job3>(
Job3Args { a: 1 },
Some(JobSpec {
queue_name: Some("serial".into()),
..Default::default()
}),
)
.await
.expect("Failed to add job");
}

// Start multiple worker instances to process the jobs
let mut handles = vec![];
for _ in 0..3 {
let worker = worker.clone();
handles.push(spawn_local(async move {
worker.run_once().await.expect("Failed to run worker");
}));
}

// Sequentially complete each job and verify progress
for i in 1..=5 {
// Give other workers a chance to interfere
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

// Complete the current job
txs.remove(0)
.send(())
.expect("Failed to send completion signal");

// Wait a brief moment to ensure the job is processed
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
assert_eq!(
JOB3_CALL_COUNT.get().await,
i,
"Jobs should be processed serially"
);
}

// Wait for all worker instances to finish
for handle in handles {
handle.await.expect("Worker task failed");
}

// Verify all jobs were completed
assert_eq!(
JOB3_CALL_COUNT.get().await,
5,
"All jobs should have completed"
);

// Check that all jobs are purged from the database
let jobs_after = test_db.get_jobs().await;
assert_eq!(
jobs_after.len(),
0,
"All jobs should be removed after completion"
);
})
.await;
}

0 comments on commit 9cc3cc9

Please sign in to comment.