Skip to content

Commit

Permalink
examples & fix of async graph run
Browse files Browse the repository at this point in the history
Signed-off-by: A-Mavericks <363136637@qq.com>

Fix auto_node

example hello_dagrs & compute_dag

example hello_dagrs & compute_dag
  • Loading branch information
191220029 committed Dec 23, 2024
1 parent 889b6de commit 51d9564
Show file tree
Hide file tree
Showing 16 changed files with 438 additions and 176 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ env_logger = "0.10.1"
async-trait = "0.1.83"
derive = { path = "derive", optional = true }
proc-macro2 = "1.0"
futures = "0.3.31"

[dev-dependencies]
simplelog = "0.12"
Expand Down
11 changes: 5 additions & 6 deletions derive/src/auto_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ fn auto_impl_node(
]);

quote::quote!(
#[async_trait::async_trait]
impl #generics dagrs::Node for #struct_ident #generics {
#impl_tokens
}
Expand Down Expand Up @@ -169,12 +170,10 @@ fn impl_run(
let in_channels_ident = &field_in_channels.ident;
let out_channels_ident = &field_out_channels.ident;
quote::quote!(
fn run(&mut self, env: std::sync::Arc<dagrs::EnvVar>) -> dagrs::Output {
tokio::runtime::Runtime::new().unwrap().block_on(async {
self.#ident
.run(&mut self.#in_channels_ident, &self.#out_channels_ident, env)
.await
})
async fn run(&mut self, env: std::sync::Arc<dagrs::EnvVar>) -> dagrs::Output {
self.#ident
.run(&mut self.#in_channels_ident, &self.#out_channels_ident, env)
.await
}
)
}
10 changes: 5 additions & 5 deletions derive/src/relay.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;

use proc_macro2::Ident;
use syn::{parse::Parse, Token};
Expand Down Expand Up @@ -77,16 +77,16 @@ pub(crate) fn add_relay(relaies: Relaies) -> proc_macro2::TokenStream {
}
for relay in relaies.0.iter() {
let task = relay.task.clone();
if (!cache.contains(&task)) {
if !cache.contains(&task) {
token.extend(quote::quote!(
graph.add_node(Box::new(#task));
graph.add_node(#task);
));
cache.insert(task);
}
for successor in relay.successors.iter() {
if (!cache.contains(successor)) {
if !cache.contains(successor) {
token.extend(quote::quote!(
graph.add_node(Box::new(#successor));
graph.add_node(#successor);
));
cache.insert(successor.clone());
}
Expand Down
9 changes: 8 additions & 1 deletion examples/auto_node.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
//! # Example: auto_node
//! The procedural macro `auto_node` simplifies the implementation of `Node` trait for custom types.
//! It works on structs except [tuple structs](https://doc.rust-lang.org/book/ch05-01-defining-structs.html#using-tuple-structs-without-named-fields-to-create-different-types).
use std::sync::Arc;

use dagrs::{auto_node, EmptyAction, EnvVar, InChannels, Node, NodeTable, OutChannels};
Expand All @@ -7,6 +11,7 @@ struct MyNode {/*Put customized fields here.*/}

#[auto_node]
struct _MyNodeGeneric<T, 'a> {
/*Put customized fields here.*/
my_field: Vec<T>,
my_name: &'a str,
}
Expand All @@ -30,7 +35,9 @@ fn main() {
assert_eq!(&s.id(), node_table.get(&node_name).unwrap());
assert_eq!(&s.name(), &node_name);

let output = s.run(Arc::new(EnvVar::new(NodeTable::default())));
let output = tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { s.run(Arc::new(EnvVar::new(NodeTable::default()))).await });
match output {
dagrs::Output::Out(content) => assert!(content.is_none()),
_ => panic!(),
Expand Down
60 changes: 26 additions & 34 deletions examples/auto_relay.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,37 @@
use std::sync::Arc;
//! # Example: auto_relay
//! The macro `dependencies!` simplifies the construction of a `Graph`,
//! including the addition of nodes and edges.
use dagrs::{
auto_node, dependencies,
graph::{self, graph::Graph},
EmptyAction, EnvVar, InChannels, Node, NodeTable, OutChannels,
};
use dagrs::{auto_node, dependencies, EmptyAction, InChannels, Node, NodeTable, OutChannels};

#[auto_node]
struct MyNode {/*Put customized fields here.*/}

impl MyNode {
fn new(name: &str, node_table: &mut NodeTable) -> Self {
Self {
id: node_table.alloc_id_for(name),
name: name.to_string(),
input_channels: InChannels::default(),
output_channels: OutChannels::default(),
action: Box::new(EmptyAction),
}
}
}

fn main() {
let mut node_table = NodeTable::default();

let node_name = "auto_node".to_string();

let s = MyNode {
id: node_table.alloc_id_for(&node_name),
name: node_name.clone(),
input_channels: InChannels::default(),
output_channels: OutChannels::default(),
action: Box::new(EmptyAction),
};

let a = MyNode {
id: node_table.alloc_id_for(&node_name),
name: node_name.clone(),
input_channels: InChannels::default(),
output_channels: OutChannels::default(),
action: Box::new(EmptyAction),
};

let b = MyNode {
id: node_table.alloc_id_for(&node_name),
name: node_name.clone(),
input_channels: InChannels::default(),
output_channels: OutChannels::default(),
action: Box::new(EmptyAction),
};
let mut g = dependencies!(s -> a b,
b -> a
let node_name = "auto_node";

let s = MyNode::new(node_name, &mut node_table);
let a = MyNode::new(node_name, &mut node_table);
let b = MyNode::new(node_name, &mut node_table);

let mut g = dependencies!(
s -> a b,
b -> a
);

g.run();
g.start();
}
106 changes: 106 additions & 0 deletions examples/compute_dag.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
//! Only use Dag, execute a job. The graph is as follows:
//!
//! ↱----------↴
//! B -→ E --→ G
//! ↗ ↗ ↗
//! A --→ C /
//! ↘ ↘ /
//! D -→ F
//!
//! The final execution result is 272.
use std::sync::Arc;

use async_trait::async_trait;
use dagrs::{
Action, Content, DefaultNode, EnvVar, Graph, InChannels, Node, NodeTable, OutChannels, Output,
};

const BASE: &str = "base";

struct Compute(usize);

#[async_trait]
impl Action for Compute {
async fn run(
&self,
in_channels: &mut InChannels,
out_channels: &OutChannels,
env: Arc<EnvVar>,
) -> Output {
let base = env.get::<usize>(BASE).unwrap();
let mut sum = self.0;

in_channels
.map(|content| content.unwrap().into_inner::<usize>().unwrap())
.await
.into_iter()
.for_each(|x| sum += *x * base);

out_channels.broadcast(Content::new(sum)).await;

Output::Out(Some(Content::new(sum)))
}
}

fn main() {
// Initialization log.
env_logger::init();

// Create a new `NodeTable`.
let mut node_table = NodeTable::default();

// Generate some tasks.
let a = DefaultNode::with_action("Compute A".to_string(), Compute(1), &mut node_table);
let a_id = a.id();

let b = DefaultNode::with_action("Compute B".to_string(), Compute(2), &mut node_table);
let b_id = b.id();

let mut c = DefaultNode::new("Compute C".to_string(), &mut node_table);
c.set_action(Compute(4));
let c_id = c.id();

let mut d = DefaultNode::new("Compute D".to_string(), &mut node_table);
d.set_action(Compute(8));
let d_id = d.id();

let e = DefaultNode::with_action("Compute E".to_string(), Compute(16), &mut node_table);
let e_id = e.id();
let f = DefaultNode::with_action("Compute F".to_string(), Compute(32), &mut node_table);
let f_id = f.id();

let g = DefaultNode::with_action("Compute G".to_string(), Compute(64), &mut node_table);
let g_id = g.id();

// Create a graph.
let mut graph = Graph::new();
vec![a, b, c, d, e, f, g]
.into_iter()
.for_each(|node| graph.add_node(node));

// Set up task dependencies.
graph.add_edge(a_id, vec![b_id, c_id, d_id]);
graph.add_edge(b_id, vec![e_id, g_id]);
graph.add_edge(c_id, vec![e_id, f_id]);
graph.add_edge(d_id, vec![f_id]);
graph.add_edge(e_id, vec![g_id]);
graph.add_edge(f_id, vec![g_id]);

// Set a global environment variable for this dag.
let mut env = EnvVar::new(node_table);
env.set("base", 2usize);
graph.set_env(env);

// Start executing this dag.
graph.start();

// Verify execution result.
let res = graph
.get_results::<usize>()
.get(&g_id)
.unwrap()
.clone()
.unwrap();
assert_eq!(*res, 272)
}
75 changes: 75 additions & 0 deletions examples/custom_node.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
//! # Example: custom_node
//! Creates a custom implementation of [`Node`] that returns a [`String`],
//! then create a new [`Graph`] with this node and run.
use std::sync::Arc;

use async_trait::async_trait;
use dagrs::{
Content, EnvVar, Graph, InChannels, Node, NodeId, NodeName, NodeTable, OutChannels, Output,
};

struct MessageNode {
id: NodeId,
name: NodeName,
in_channels: InChannels,
out_channels: OutChannels,
/*Put your custom fields here.*/
message: String,
}

#[async_trait]
impl Node for MessageNode {
fn id(&self) -> NodeId {
self.id
}

fn name(&self) -> NodeName {
self.name.clone()
}

fn input_channels(&mut self) -> &mut InChannels {
&mut self.in_channels
}

fn output_channels(&mut self) -> &mut OutChannels {
&mut self.out_channels
}

async fn run(&mut self, _: Arc<EnvVar>) -> Output {
Output::Out(Some(Content::new(self.message.clone())))
}
}

impl MessageNode {
fn new(name: String, node_table: &mut NodeTable) -> Self {
Self {
id: node_table.alloc_id_for(&name),
name,
in_channels: InChannels::default(),
out_channels: OutChannels::default(),
message: "hello dagrs".to_string(),
}
}
}

fn main() {
// create an empty `NodeTable`
let mut node_table = NodeTable::new();
// create a `MessageNode`
let node = MessageNode::new("message node".to_string(), &mut node_table);
let id: &dagrs::NodeId = &node.id();

// create a graph with this node and run
let mut graph = Graph::new();
graph.add_node(node);
graph.start();

// verify the output of this node
let outputs = graph.get_outputs();
assert_eq!(outputs.len(), 1);

let content = outputs.get(id).unwrap().get_out().unwrap();
let node_output = content.get::<String>().unwrap();
assert_eq!(node_output, "hello dagrs")
}
46 changes: 46 additions & 0 deletions examples/hello_dagrs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//! # Example: hello_dagrs
//! Creates a `DefaultNode` that returns with "Hello Dagrs",
//! then create a new `Graph` with this node and run.
use std::sync::Arc;

use async_trait::async_trait;
use dagrs::{
Action, Content, DefaultNode, EnvVar, Graph, InChannels, Node, NodeTable, OutChannels, Output,
};

/// An implementation of [`Action`] that returns [`Output::Out`] containing a String "Hello world".
#[derive(Default)]
pub struct HelloAction;
#[async_trait]
impl Action for HelloAction {
async fn run(&self, _: &mut InChannels, _: &OutChannels, _: Arc<EnvVar>) -> Output {
Output::Out(Some(Content::new("Hello Dagrs".to_string())))
}
}

fn main() {
// create an empty `NodeTable`
let mut node_table = NodeTable::new();
// create a `DefaultNode` with action `HelloAction`
let hello_node = DefaultNode::with_action(
"Hello Dagrs".to_string(),
HelloAction::default(),
&mut node_table,
);
let id: &dagrs::NodeId = &hello_node.id();

// create a graph with this node and run
let mut graph = Graph::new();
graph.add_node(hello_node);

graph.start();

// verify the output of this node
let outputs = graph.get_outputs();
assert_eq!(outputs.len(), 1);

let content = outputs.get(id).unwrap().get_out().unwrap();
let node_output = content.get::<String>().unwrap();
assert_eq!(node_output, "Hello Dagrs")
}
Loading

0 comments on commit 51d9564

Please sign in to comment.