diff --git a/Cargo.lock b/Cargo.lock index daa608e..fc36a23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "anyhow" +version = "1.0.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" + [[package]] name = "async-trait" version = "0.1.73" @@ -107,11 +113,14 @@ dependencies = [ name = "edgelink" version = "0.1.0" dependencies = [ + "anyhow", "async-trait", "ciborium", "edgelink_abstractions", "inventory", "log", + "serde", + "serde_json", "tokio", ] @@ -120,6 +129,7 @@ name = "edgelink_abstractions" version = "0.1.0" dependencies = [ "async-trait", + "thiserror", "tokio", ] @@ -160,6 +170,12 @@ version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1be380c410bf0595e94992a648ea89db4dd3f3354ba54af206fd2a68cf5ac8e" +[[package]] +name = "itoa" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" + [[package]] name = "libc" version = "0.2.149" @@ -320,6 +336,12 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "ryu" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" + [[package]] name = "scopeguard" version = "1.2.0" @@ -346,6 +368,17 @@ dependencies = [ "syn 2.0.38", ] +[[package]] +name = "serde_json" +version = "1.0.107" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" +dependencies = [ + "itoa", + "ryu", + "serde", +] + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -399,6 +432,26 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "thiserror" +version = "1.0.49" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1177e8c6d7ede7afde3585fd2513e611227efd6481bd78d2e82ba1ce16557ed4" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.49" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "tokio" version = "1.33.0" diff --git a/flows.json b/flows.json new file mode 100644 index 0000000..b0d0aed --- /dev/null +++ b/flows.json @@ -0,0 +1,55 @@ +[ + { + "id": "dee0d1b0cfd62a6c", + "type": "tab", + "label": "流程 1", + "disabled": false, + "info": "", + "env": [] + }, + { + "id": "bf843d35fe7cf583", + "type": "inject", + "z": "dee0d1b0cfd62a6c", + "name": "", + "props": [ + { + "p": "payload" + }, + { + "p": "topic", + "vt": "str" + } + ], + "repeat": "", + "crontab": "", + "once": false, + "onceDelay": 0.1, + "topic": "", + "payload": "", + "payloadType": "date", + "x": 350, + "y": 180, + "wires": [ + [ + "c75509302b4b8fc1" + ] + ] + }, + { + "id": "c75509302b4b8fc1", + "type": "debug", + "z": "dee0d1b0cfd62a6c", + "name": "debug 1", + "active": true, + "tosidebar": true, + "console": false, + "tostatus": false, + "complete": "false", + "statusVal": "", + "statusType": "auto", + "x": 540, + "y": 180, + "wires": [] + } +] \ No newline at end of file diff --git a/libs/abstractions/Cargo.toml b/libs/abstractions/Cargo.toml index 3b511ce..02f0c10 100644 --- a/libs/abstractions/Cargo.toml +++ b/libs/abstractions/Cargo.toml @@ -14,4 +14,5 @@ strip = true [dependencies] async-trait = "0.1.73" -tokio = "1.33.0" \ No newline at end of file +thiserror = "1.0.49" +tokio = "1.33.0" diff --git a/libs/abstractions/src/engine.rs b/libs/abstractions/src/engine.rs index e7faa9c..e26ffa4 100644 --- a/libs/abstractions/src/engine.rs +++ b/libs/abstractions/src/engine.rs @@ -1,31 +1,25 @@ -use std::{sync::Arc, cell::RefCell}; -use tokio::sync::{Mutex}; -use async_trait::async_trait; use crate::nodes::*; use crate::Variant; - +use async_trait::async_trait; +use std::cell::RefCell; +use std::sync::{Arc, Mutex}; #[async_trait] -pub trait FlowBehavior { +pub trait FlowBehavior : Send + Sync { fn id(&self) -> u64; - fn name(&self) -> &str; - async fn start(&self); - async fn stop(&self); -} + fn label(&self) -> &str; -#[async_trait] -pub trait FlowEngine { - async fn start(&self); - async fn stop(&self); + async fn start(&mut self); + async fn stop(&mut self); } -#[derive(Clone)] -pub struct Engine { - pub nodes: Arc>>>, -} +pub type Flows = Vec>>>; #[async_trait] -pub trait FlowEngineBehavior { - async fn start(&self); - async fn stop(&self); +pub trait FlowEngine: Send + Sync { + fn get_flows(&self) -> &Flows; + fn get_flows_mut(&mut self) -> &mut Flows; + + async fn start(&mut self); + async fn stop(&mut self); } diff --git a/libs/abstractions/src/lib.rs b/libs/abstractions/src/lib.rs index a3a4950..75325d2 100644 --- a/libs/abstractions/src/lib.rs +++ b/libs/abstractions/src/lib.rs @@ -1,5 +1,7 @@ -pub mod nodes; +use thiserror::Error; + pub mod engine; +pub mod nodes; pub mod variant; pub use crate::variant::Variant; @@ -20,3 +22,18 @@ pub trait Plugin { /// Callbacks can take arguments and return values fn callback2(&self, i: i32) -> i32; } + +#[derive(Error, Debug)] +pub enum EdgeLinkError { + + #[error("Invalid 'flows.json': {0}")] + BadFlowsJson(String), + + #[error("Missing attribute: {0}")] + MissingAttribute(String), + +} + +pub type Error = Box; + +pub type Result = std::result::Result; diff --git a/libs/abstractions/src/nodes.rs b/libs/abstractions/src/nodes.rs index f2046e4..b77ca7d 100644 --- a/libs/abstractions/src/nodes.rs +++ b/libs/abstractions/src/nodes.rs @@ -30,8 +30,8 @@ pub struct BaseNode { #[async_trait] pub trait NodeBehavior: Send { - async fn start(&self); - async fn stop(&self); + async fn start(&mut self); + async fn stop(&mut self); } pub struct FlowNode { diff --git a/libs/edgelink/Cargo.toml b/libs/edgelink/Cargo.toml index f1c85cd..026a968 100644 --- a/libs/edgelink/Cargo.toml +++ b/libs/edgelink/Cargo.toml @@ -19,3 +19,6 @@ async-trait = "0.1.73" ciborium = "0.2.1" log = "0.4.20" inventory = "0.3.12" +serde_json = "1.0.107" +serde = "1.0.188" +anyhow = "1.0.75" diff --git a/libs/edgelink/src/engine.rs b/libs/edgelink/src/engine.rs index 232a0f4..1fe601e 100644 --- a/libs/edgelink/src/engine.rs +++ b/libs/edgelink/src/engine.rs @@ -1,46 +1,67 @@ use async_trait::async_trait; use log; -use std::cell::{Cell, RefCell}; +use std::borrow::BorrowMut; +use std::cell::RefCell; use std::collections::BTreeMap; -use std::future::Future; -use std::sync::Arc; -use tokio::sync::{Mutex, MutexGuard}; +use std::fs::File; +use std::io::Read; +use std::sync::{Arc, Mutex}; use tokio::task::yield_now; use tokio::{spawn, task, time}; -use ciborium::Value; - use crate::nodes::*; use edgelink_abstractions::nodes::*; -use edgelink_abstractions::engine::*; use edgelink_abstractions::Variant; +use edgelink_abstractions::{engine::*, EdgeLinkError, Error, Result}; + +#[derive(Debug, serde::Deserialize)] +struct FlowConfig { + id: String, + label: String, + disabled: bool, + info: String, +} pub struct Flow { pub id: u64, - pub name: String, + pub label: String, + pub disabled: bool, pub nodes: Arc>>>, pub context: Mutex>, } impl Flow { - pub fn new(id: u64, name: String) -> Self { - log::info!("Loading flow (id={0}, name='{1}'):", id, name); + pub fn new( + flow_elem: &serde_json::Value, + elements: &Vec, + ) -> anyhow::Result { + let flow_config: FlowConfig = serde_json::from_value(flow_elem.clone())?; + + println!( + "-- Loading flow (id={0}, label='{1}'):", + flow_config.id, flow_config.label + ); + + let u64_id = u64::from_str_radix(&flow_config.id, 16)?; let mut ctx_map = BTreeMap::new(); - let hex_id = format!("{:016x}", id); - ctx_map.insert("id".to_string(), Variant::String(hex_id)); - ctx_map.insert("name".to_string(), Variant::String(name.clone())); + ctx_map.insert("id".to_string(), Variant::String(flow_config.id)); + ctx_map.insert( + "label".to_string(), + Variant::String(flow_config.label.clone()), + ); for bnd in inventory::iter:: { println!("-- kind={}, type-name={}", bnd.kind, bnd.type_name); } - Flow { - id, - name, + Ok(Flow { + id: u64_id, + label: flow_config.label.clone(), + disabled: flow_config.disabled, nodes: Arc::new(Mutex::new(Vec::new())), context: Mutex::new(RefCell::new(Variant::Object(ctx_map))), - } + }) } fn nodes(&self) -> Arc>>> { @@ -60,15 +81,73 @@ impl Flow { #[async_trait] impl FlowBehavior for Flow { - fn id(&self) -> u64 { self.id } - fn name(&self) -> &str { - &self.name + fn label(&self) -> &str { + &self.label + } + + async fn start(&mut self) { + println!("Starting Flow (id={0})...", self.id); + } + + async fn stop(&mut self) { + println!("Starting Flow (id={0})...", self.id); + } +} + +pub struct FlowEngineState { + pub flows: edgelink_abstractions::engine::Flows, +} + +impl FlowEngineState { + pub fn new(flows_json_path: &str) -> Result { + let mut file = File::open(flows_json_path)?; + let mut contents = String::new(); + file.read_to_string(&mut contents)?; + + let mut flows = Flows::new(); + let json_value: serde_json::Value = serde_json::from_str(&contents)?; + if let Some(elements) = json_value.as_array() { + for e in elements.iter() { + if let Some(item_type) = e["type"].as_str() { + if item_type == "tab" { + // let flow = &Flow::new(&e, &elements)? as &dyn FlowBehavior; + let flow = Flow::new(&e, &elements)?; + flows.push(Arc::new(Mutex::new(Box::new(flow)))); + } + } + } + } + + Ok(FlowEngineState { flows: flows }) + } +} + +#[async_trait] +impl FlowEngine for FlowEngineState { + fn get_flows(&self) -> &Flows { + &self.flows + } + + fn get_flows_mut(&mut self) -> &mut Flows { + &mut self.flows + } + + async fn start(&mut self) { + let mut flows = self.get_flows_mut(); + for flow in flows { + let flow_clone = flow.clone(); + let mut locked_flow = flow_clone.lock().unwrap(); + println!( + "Starting: Flow(id={0}, label='{1}')", + locked_flow.id(), + locked_flow.label() + ); + } } - async fn start(&self) {} - async fn stop(&self) {} + async fn stop(&mut self) {} } diff --git a/src/main.rs b/src/main.rs index a671c04..c6610d7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use edgelink::engine::Flow; +use edgelink::engine::{Flow, FlowEngine}; use libloading::Library; use std::cell::{Cell, RefCell}; use std::future::Future; @@ -53,7 +53,7 @@ async fn main() { // m.run().await; println!("EdgeLink 1.0"); - let flow = Flow::new(123, "test flow".to_string()); + let engine = FlowEngineState::new("./flows.json").unwrap(); loop { time::sleep(tokio::time::Duration::from_secs(1)).await;