Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: relax the lifetime constraints on the actions #36

Merged
merged 5 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ Cargo.lock

# IDE
.idea
.vscode
/.vscode/*
!/.vscode/settings.json

# Exclude execute log
/*.log
/*.log
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"rust-analyzer.cargo.features": "all"
}
61 changes: 33 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,21 @@ Users need to program to implement the `Action` trait to define the specific log
- `predecessor_tasks`: the predecessor tasks of this task
- `action`: is a dynamic type that implements the Action trait in user programming, and it is the specific logic to be executed by the task


Here is the `examples/impl_action.rs` example:

```rust
//! Implement the Action trait to define the task logic.

use std::sync::Arc;
use dagrs::{
Action,
Dag, DefaultTask, EnvVar,log, Input, Output, RunningError,LogLevel
};
use dagrs::{log, Action, Dag, DefaultTask, EnvVar, Input, LogLevel, Output, RunningError};

struct SimpleAction(usize);

/// Implement the `Action` trait for `SimpleAction`, defining the logic of the `run` function.
/// The logic here is simply to get the output value (`usize`) of all predecessor tasks and then accumulate.
impl Action for SimpleAction{
fn run(&self, input: Input,env:Arc<EnvVar>) -> Result<Output,RunningError> {
/// The logic here is simply to get the output value (usize) of all predecessor tasks and then accumulate.
impl Action for SimpleAction {
fn run(&self, input: Input, env: Arc<EnvVar>) -> Result<Output, RunningError> {
let base = env.get::<usize>("base").unwrap();
let mut sum = self.0;
input
Expand All @@ -76,27 +79,29 @@ impl Action for SimpleAction{
}
}

// Initialize the global logger
let _initialized = log::init_logger(LogLevel::Info,None);
// Generate four tasks.
let a= DefaultTask::new(SimpleAction(10),"Task a");
let mut b=DefaultTask::new(SimpleAction(20),"Task b");
let mut c=DefaultTask::new(SimpleAction(30),"Task c");
let mut d=DefaultTask::new(SimpleAction(40),"Task d");
// Set the precursor for each task.
b.set_predecessors(&[&a]);
c.set_predecessors(&[&a]);
d.set_predecessors(&[&b,&c]);
// Take these four tasks as a Dag.
let mut dag=Dag::with_tasks(vec![a,b,c,d]);
// Set a global environment variable for this dag.
let mut env = EnvVar::new();
env.set("base", 2usize);
dag.set_env(env);
// Begin execution.
assert!(dag.start().unwrap());
// Get execution result
assert_eq!(dag.get_result::<usize>().unwrap(),220);
fn main() {
// Initialize the global logger
let _initialized = log::init_logger(LogLevel::Info, None);
// Generate four tasks.
let a = DefaultTask::new(SimpleAction(10), "Task a");
let mut b = DefaultTask::new(SimpleAction(20), "Task b");
let mut c = DefaultTask::new(SimpleAction(30), "Task c");
let mut d = DefaultTask::new(SimpleAction(40), "Task d");
// Set the precursor for each task.
b.set_predecessors(&[&a]);
c.set_predecessors(&[&a]);
d.set_predecessors(&[&b, &c]);
// Take these four tasks as a Dag.
let mut dag = Dag::with_tasks(vec![a, b, c, d]);
// Set a global environment variable for this dag.
let mut env = EnvVar::new();
env.set("base", 2usize);
dag.set_env(env);
// Begin execution.
assert!(dag.start().unwrap());
// Get execution result
assert_eq!(dag.get_result::<usize>().unwrap(), 220);
}
```

**explain:**
Expand Down
3 changes: 2 additions & 1 deletion dagrs_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
name = "dagrs_core"
version = "0.3.0"
edition = "2021"
license = "MIT OR Apache-2.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand All @@ -21,4 +22,4 @@ yaml = []

[[bin]]
name = "dagrs"
required-features = ["yaml"]
required-features = ["yaml"]
8 changes: 4 additions & 4 deletions dagrs_core/src/engine/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl Dag {
}

/// Create a dag by adding a series of tasks.
pub fn with_tasks(tasks: Vec<impl Task + 'static>) -> Dag {
pub fn with_tasks(tasks: Vec<impl Task + 'static>) -> Self {
let mut dag = Dag::new();
tasks.into_iter().for_each(|task| {
let task = Box::new(task) as Box<dyn Task>;
Expand All @@ -103,7 +103,7 @@ impl Dag {
#[cfg(feature = "yaml")]
pub fn with_yaml(
file: &str,
specific_actions: HashMap<String, Arc<dyn Action + Send + Sync + 'static>>,
specific_actions: HashMap<String, Arc<dyn Action + Send + Sync>>,
) -> Result<Dag, DagError> {
use crate::YamlParser;
let parser = Box::new(YamlParser);
Expand All @@ -114,7 +114,7 @@ impl Dag {
pub fn with_config_file_and_parser(
file: &str,
parser: Box<dyn Parser>,
specific_actions: HashMap<String, Arc<dyn Action + Send + Sync + 'static>>,
specific_actions: HashMap<String, Arc<dyn Action + Send + Sync>>,
) -> Result<Dag, DagError> {
Dag::read_tasks(file, parser, specific_actions)
}
Expand All @@ -124,7 +124,7 @@ impl Dag {
fn read_tasks(
file: &str,
parser: Box<dyn Parser>,
specific_actions: HashMap<String, Arc<dyn Action + Send + Sync + 'static>>,
specific_actions: HashMap<String, Arc<dyn Action + Send + Sync>>,
) -> Result<Dag, DagError> {
let mut dag = Dag::new();
let tasks = parser.parse_tasks(file, specific_actions)?;
Expand Down
2 changes: 1 addition & 1 deletion dagrs_core/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,6 @@ pub trait Parser {
fn parse_tasks(
&self,
file: &str,
specific_actions: HashMap<String, Arc<dyn Action + Send + Sync + 'static>>,
specific_actions: HashMap<String, Arc<dyn Action + Send + Sync>>,
) -> Result<Vec<Box<dyn Task>>, ParserError>;
}
6 changes: 3 additions & 3 deletions dagrs_core/src/parser/yaml_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl YamlParser {
&self,
id: &str,
item: &Yaml,
specific_action: Option<Arc<dyn Action + Send + Sync + 'static>>,
specific_action: Option<Arc<dyn Action + Send + Sync>>,
) -> Result<YamlTask, YamlTaskError> {
// Get name first
let name = item["name"]
Expand All @@ -63,7 +63,7 @@ impl YamlParser {
id,
precursors,
name,
Arc::new(CommandAction::new(cmd)) as Arc<dyn Action + Send + Sync + 'static>,
Arc::new(CommandAction::new(cmd)) as Arc<dyn Action + Send + Sync >,
))
}
}
Expand All @@ -73,7 +73,7 @@ impl Parser for YamlParser {
fn parse_tasks(
&self,
file: &str,
mut specific_actions: HashMap<String, Arc<dyn Action + Send + Sync + 'static>>,
mut specific_actions: HashMap<String, Arc<dyn Action + Send + Sync>>,
) -> Result<Vec<Box<dyn Task>>, ParserError> {
let content = self.load_file(file)?;
// Parse Yaml
Expand Down
33 changes: 33 additions & 0 deletions dagrs_core/src/task/default_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,39 @@ impl DefaultTask {
}
}

/// Allocate a new [`DefaultTask`] from any action and name.
/// The specific task behavior is a structure that implements [`Action`].
///
/// # Example
///
/// ```rust
/// use dagrs::{DefaultTask, Output,Input, Action,EnvVar,RunningError};
/// use std::sync::Arc;
/// struct SimpleAction(usize);
///
/// impl Action for SimpleAction {
/// fn run(&self, input: Input, env: Arc<EnvVar>) -> Result<Output,RunningError> {
/// Ok(Output::new(self.0 + 10))
/// }
/// }
///
/// let action = Arc::new(SimpleAction(10));
/// let task = DefaultTask::from(action, String::from("Increment action"));
/// ```
///
/// `SimpleAction` is a struct that impl [`Action`]. Since task will be
/// executed in separated threads, [`Send`] and [`Sync`] is needed.
///
/// **Note:** This method will take the ownership of struct that impl [`Action`].
pub fn from(action: Arc<dyn Action + Send + Sync>, name: String) -> Self {
DefaultTask {
id: ID_ALLOCATOR.alloc(),
action,
name,
precursors: Vec::new(),
}
}

/// Tasks that shall be executed before this one.
///
/// # Example
Expand Down
2 changes: 1 addition & 1 deletion dagrs_core/src/task/yaml_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl YamlTask {
yaml_id: &str,
precursors: Vec<String>,
name: String,
action: Arc<dyn Action + Send + Sync + 'static>,
action: Arc<dyn Action + Send + Sync>,
) -> Self {
Self {
yid: yaml_id.to_owned(),
Expand Down
1 change: 1 addition & 0 deletions dagrs_derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
name = "dagrs_derive"
version = "0.3.0"
edition = "2021"
license = "MIT OR Apache-2.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand Down
12 changes: 6 additions & 6 deletions examples/custom_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ impl MyTask {
txt_id: &str,
precursors: Vec<String>,
name: String,
action: impl Action + Send + Sync + 'static,
action: Arc<dyn Action + Send + Sync>,
) -> Self {
Self {
tid: (txt_id.to_owned(), dagrs::alloc_id()),
name,
precursors,
precursors_id: Vec::new(),
action: Arc::new(action),
action,
}
}

Expand Down Expand Up @@ -92,19 +92,19 @@ impl ConfigParser {
}

fn parse_one(&self, item: String) -> MyTask {
let attr: Vec<&str> = item.split(",").collect();
let attr: Vec<&str> = item.split(',').collect();

let pres_item = *attr.get(2).unwrap();
let pres = if pres_item.eq("") {
Vec::new()
} else {
pres_item.split(" ").map(|pre| pre.to_string()).collect()
pres_item.split(' ').map(|pre| pre.to_string()).collect()
};

let id = *attr.get(0).unwrap();
let id = *attr.first().unwrap();
let name = attr.get(1).unwrap().to_string();
let cmd = *attr.get(3).unwrap();
MyTask::new(id, pres, name, CommandAction::new(cmd))
MyTask::new(id, pres, name, Arc::new(CommandAction::new(cmd)))
}
}

Expand Down
6 changes: 3 additions & 3 deletions examples/custom_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ struct MyTask {
}

impl MyTask {
pub fn new(action: impl Action + 'static + Send + Sync, name: &str) -> Self {
pub fn new(action: Arc<dyn Action + Send + Sync>, name: &str) -> Self {
MyTask {
id: alloc_id(),
action: Arc::new(action),
action,
name: name.to_owned(),
predecessor_tasks: Vec::new(),
}
Expand Down Expand Up @@ -59,7 +59,7 @@ macro_rules! generate_task {
Ok(Output::new(sum))
}
}
MyTask::new($action($val), $name)
MyTask::new(Arc::new($action($val)), $name)
}};
}

Expand Down
2 changes: 1 addition & 1 deletion examples/dependencies.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use dagrs_core::{
log, Action, Dag, DefaultTask, EnvVar, Input, LogLevel, Output, RunningError, Task,
log, DefaultTask, EnvVar, LogLevel,
};
use dagrs_derive::dependencies;

Expand Down
4 changes: 2 additions & 2 deletions examples/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

extern crate dagrs;

use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap};

use dagrs::{
gen_task, log, Action, Dag, DefaultTask, Engine, EnvVar, Input, LogLevel, Output, RunningError,
gen_task, log, Dag, DefaultTask, Engine, LogLevel,
};
fn main() {
// initialization log.
Expand Down
3 changes: 2 additions & 1 deletion tests/dag_job_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Some tests of the dag engine.

use std::{collections::HashMap, sync::Arc};

///! Some tests of the dag engine.
use dagrs::{
log, Action, Dag, DagError, DefaultTask, EnvVar, Input, LogLevel, Output, RunningError,
};
Expand Down