Skip to content

Commit

Permalink
perf(lock): add rwlock tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
j-mendez committed Dec 27, 2023
1 parent 4d6f279 commit a035280
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "async_job"
version = "0.1.3"
version = "0.1.4"
edition = "2021"
description = "Simple async cron job crate for Rust"
repository = "https://github.com/spider-rs/async_job"
Expand All @@ -17,7 +17,7 @@ chrono = "0.4.31"
cron = "0.12.0"
lazy_static = "1.4.0"
log = "0.4.20"
tokio = { version = "^1.35.0", features = [ "macros", "time", "parking_lot" ] }
tokio = { version = "^1.35.0", features = [ "macros", "time", "parking_lot", "sync" ] }

[features]
default = ["rt-multi-thread"]
Expand Down
45 changes: 33 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
//! use async_job::{Job, Runner, Schedule, async_trait};
//! use tokio::time::Duration;
//! use tokio;
//!
//!
//! struct ExampleJob;
//!
//! #[async_trait]
Expand Down Expand Up @@ -65,19 +65,19 @@ use chrono::{DateTime, Duration, Utc};
pub use cron::Schedule;
use lazy_static::lazy_static;
use log::{debug, error, info};
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{
atomic::{AtomicBool, Ordering},
mpsc, Arc, Mutex,
Arc, RwLock,
};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::task::JoinHandle;

lazy_static! {
/// Singleton instance of a tracker that won't allow
/// same job to run again while its already running
/// unless you specificly allow the job to run in
/// parallel with itself
pub static ref TRACKER: Mutex<Tracker> = Mutex::new(Tracker::new());
pub static ref TRACKER: RwLock<Tracker> = RwLock::new(Tracker::new());
}

#[async_trait]
Expand Down Expand Up @@ -187,7 +187,7 @@ pub struct Runner {
/// is the task running or not
pub running: bool,
/// channel sending message
pub tx: Option<mpsc::Sender<Result<(), ()>>>,
pub tx: Option<UnboundedSender<Result<(), ()>>>,
/// tracker to determine crons working
pub working: Arc<AtomicBool>,
}
Expand Down Expand Up @@ -275,8 +275,14 @@ impl Runner {
async fn spawn(
runner: Runner,
working: Arc<AtomicBool>,
) -> (Option<JoinHandle<()>>, Option<Sender<Result<(), ()>>>) {
let (tx, rx): (Sender<Result<(), ()>>, Receiver<Result<(), ()>>) = mpsc::channel();
) -> (
Option<JoinHandle<()>>,
Option<UnboundedSender<Result<(), ()>>>,
) {
let (tx, mut rx): (
UnboundedSender<Result<(), ()>>,
UnboundedReceiver<Result<(), ()>>,
) = unbounded_channel();

let handler = tokio::spawn(async move {
let mut jobs = runner.jobs;
Expand All @@ -288,25 +294,40 @@ async fn spawn(
}

for (id, job) in jobs.iter_mut().enumerate() {
let no = (id + 1).to_string();
let no: String = (id + 1).to_string();

if job.should_run()
&& (job.allow_parallel_runs() || !TRACKER.lock().unwrap().running(&id))
&& (job.allow_parallel_runs()
|| match TRACKER.read() {
Ok(s) => !s.running(&id),
_ => false,
})
{
TRACKER.lock().unwrap().start(&id);
match TRACKER.write() {
Ok(mut s) => {
s.start(&id);
}
_ => (),
}

let now = Utc::now();
debug!(
"START: {} --- {}",
format!("cron-job-thread-{}", no),
now.format("%H:%M:%S%.f")
);

working.store(true, Ordering::Relaxed);

// keep the work on the same task for now.
job.handle().await;

working.store(TRACKER.lock().unwrap().stop(&id) != 0, Ordering::Relaxed);
working.store(
match TRACKER.write() {
Ok(mut s) => s.stop(&id) != 0,
_ => false,
},
Ordering::Relaxed,
);

debug!(
"FINISH: {} --- {}",
Expand Down

0 comments on commit a035280

Please sign in to comment.