diff --git a/Cargo.toml b/Cargo.toml index 2a7ab8d..967967b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,12 +30,14 @@ serde_json = { version = "1", optional = true } csv = { version = "1", optional = true } chrono = { version = "0.4", optional = true } uuid = { version = "0.7", features = ["serde"], optional = true } +graphql-parser = { version = "0.2.2", optional = true } [features] real-time = [] set-semantics = [] csv-source = ["csv", "chrono"] json-source = ["serde_json", "chrono"] +graphql = ["graphql-parser", "serde_json"] [profile.release] opt-level = 3 diff --git a/src/plan/graphql.rs b/src/plan/graphql.rs new file mode 100644 index 0000000..da621bc --- /dev/null +++ b/src/plan/graphql.rs @@ -0,0 +1,229 @@ +//! GraphQL expression plan. + +use timely::dataflow::scopes::child::Iterative; +use timely::dataflow::Scope; +use timely::progress::Timestamp; + +use differential_dataflow::lattice::Lattice; + +use graphql_parser::parse_query; +use graphql_parser::query::{Definition, Document, OperationDefinition, Selection, SelectionSet}; + +use crate::plan::{Dependencies, ImplContext, Implementable}; +use crate::plan::{Plan, Pull, PullLevel}; +use crate::{Implemented, ShutdownHandle, VariableMap}; + +/// A plan for GraphQL queries, e.g. `{ Heroes { name age weight } }` +#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct GraphQl { + /// String representation of GraphQL query + pub query: String, + /// Cached paths + pub paths: Vec>, +} + +impl GraphQl { + /// Creates a new GraphQL instance by parsing the ast obtained from the provided query + pub fn new(query: String) -> Self { + let q = query.clone(); + GraphQl { + query, + paths: ast_to_paths(parse_query(&q).expect("graphQL ast parsing failed")), + } + } +} + +fn selection_set_to_paths( + selection_set: &SelectionSet, + parent_path: &Vec, + at_root: bool, +) -> Vec> { + let mut result = vec![]; + let mut pull_attributes = vec![]; + let variables = vec![]; + + for item in &selection_set.items { + match item { + Selection::Field(field) => { + if field.selection_set.items.is_empty() { + pull_attributes.push(field.name.to_string()); + } + + let mut new_parent_path = parent_path.to_vec(); + new_parent_path.push(field.name.to_string()); + + result.extend(selection_set_to_paths( + &field.selection_set, + &new_parent_path, + parent_path.is_empty(), + )); + } + _ => unimplemented!(), + } + } + + // parent_path handles root path case + if !pull_attributes.is_empty() && !parent_path.is_empty() { + // for root, we expect a NameExpr that puts the pulled IDs in the v position + let plan = if at_root { + Plan::NameExpr(vec![0, 1], parent_path.last().unwrap().to_string()) + } else { + Plan::MatchA(0, parent_path.last().unwrap().to_string(), 1) + }; + + let pull_level = PullLevel { + pull_attributes, + path_attributes: parent_path.to_vec(), + pull_variable: 1, + variables, + plan: Box::new(plan), + }; + result.push(pull_level); + } + + result +} + +/// converts an ast to paths +/// The structure of a typical parsed ast looks like this: +/// ``` +/// Document { +/// definitions: [ +/// Operation(SelectionSet(SelectionSet { +/// items: [ +/// Field(Field { +/// name: ..., +/// selection_set: SelectionSet(...} +/// }), +/// ... +/// ] +/// })) +/// ] +/// } +/// ``` +fn ast_to_paths(ast: Document) -> Vec> { + let mut result = vec![]; + for definition in &ast.definitions { + match definition { + Definition::Operation(operation_definition) => match operation_definition { + OperationDefinition::Query(_) => unimplemented!(), + OperationDefinition::SelectionSet(selection_set) => { + result.extend(selection_set_to_paths(selection_set, &vec![], true)) + } + _ => unimplemented!(), + }, + Definition::Fragment(_) => unimplemented!(), + }; + } + + result +} + +/// Converts a vector of paths to a GraphQL-like nested value. +pub fn paths_to_nested(paths: Vec>) -> serde_json::Value { + use crate::Value::{Aid, Eid}; + use serde_json::map::Map; + + let mut acc = Map::new(); + for mut path in paths { + let mut current_map = &mut acc; + let last_val = path.pop().unwrap(); + + if let Aid(last_key) = path.pop().unwrap() { + for attribute in path { + let attr = match attribute { + Aid(x) => x, + Eid(x) => x.to_string(), + _ => unreachable!(), + }; + + let entry = current_map + .entry(attr) + .or_insert_with(|| serde_json::Value::Object(Map::new())); + + *entry = match entry { + serde_json::Value::Object(m) => { + serde_json::Value::Object(std::mem::replace(m, Map::new())) + } + serde_json::Value::Array(_) => unreachable!(), + _ => serde_json::Value::Object(Map::new()), + }; + + match entry { + serde_json::Value::Object(m) => current_map = m, + _ => unreachable!(), + }; + } + + match current_map.get(&last_key) { + Some(serde_json::Value::Object(_)) => (), + _ => { + current_map.insert(last_key, serde_json::json!(last_val)); + } + }; + } else { + unreachable!(); + } + } + + serde_json::Value::Object(acc) +} + +impl Implementable for GraphQl { + fn dependencies(&self) -> Dependencies { + let mut dependencies = Dependencies::none(); + + for path in self.paths.iter() { + dependencies = Dependencies::merge(dependencies, path.dependencies()); + } + + dependencies + } + + fn implement<'b, T, I, S>( + &self, + nested: &mut Iterative<'b, S, u64>, + local_arrangements: &VariableMap>, + context: &mut I, + ) -> (Implemented<'b, S>, ShutdownHandle) + where + T: Timestamp + Lattice, + I: ImplContext, + S: Scope, + { + let ast = parse_query(&self.query).expect("graphQL ast parsing failed"); + let parsed = Pull { + variables: vec![], + paths: ast_to_paths(ast), + }; + + parsed.implement(nested, local_arrangements, context) + } +} + +// relation +// .inner +// .map(|x| ((), x)) +// .inspect(|x| { println!("{:?}", x); }) +// .aggregate::<_,Vec<_>,_,_,_>( +// |_key, (path, _time, _diff), acc| { acc.push(path); }, +// |_key, paths| { +// paths_to_nested(paths) +// // squash_nested(nested) +// }, +// |_key| 1) + +// /// Register a GraphQL query +// pub fn register_graph_ql(&mut self, query: String, name: &str) { +// use crate::plan::{GraphQl, Plan}; + +// let req = Register { +// rules: vec![Rule { +// name: name.to_string(), +// plan: Plan::GraphQl(GraphQl::new(query)), +// }], +// publish: vec![name.to_string()], +// }; + +// self.register(req).unwrap(); +// } diff --git a/src/plan/mod.rs b/src/plan/mod.rs index 87ae3cd..526396d 100644 --- a/src/plan/mod.rs +++ b/src/plan/mod.rs @@ -26,6 +26,8 @@ pub mod aggregate; pub mod aggregate_neu; pub mod antijoin; pub mod filter; +#[cfg(feature = "graphql")] +pub mod graphql; pub mod hector; pub mod join; pub mod project; @@ -39,6 +41,8 @@ pub use self::aggregate::{Aggregate, AggregationFn}; pub use self::aggregate_neu::{Aggregate, AggregationFn}; pub use self::antijoin::Antijoin; pub use self::filter::{Filter, Predicate}; +#[cfg(feature = "graphql")] +pub use self::graphql::GraphQl; pub use self::hector::Hector; pub use self::join::Join; pub use self::project::Project; @@ -205,6 +209,9 @@ pub enum Plan { Pull(Pull), /// Single-level pull expression PullLevel(PullLevel), + /// GraphQl pull expression + #[cfg(feature = "graphql")] + GraphQl(GraphQl), } impl Plan { @@ -226,6 +233,8 @@ impl Plan { Plan::NameExpr(ref variables, ref _name) => variables.clone(), Plan::Pull(ref pull) => pull.variables.clone(), Plan::PullLevel(ref path) => path.variables.clone(), + #[cfg(feature = "graphql")] + Plan::GraphQl(_) => unimplemented!(), } } } @@ -249,6 +258,8 @@ impl Implementable for Plan { Plan::NameExpr(_, ref name) => Dependencies::name(name), Plan::Pull(ref pull) => pull.dependencies(), Plan::PullLevel(ref path) => path.dependencies(), + #[cfg(feature = "graphql")] + Plan::GraphQl(ref q) => q.dependencies(), } } @@ -282,6 +293,8 @@ impl Implementable for Plan { Plan::NameExpr(_, ref _name) => unimplemented!(), // @TODO hmm... Plan::Pull(ref pull) => pull.into_bindings(), Plan::PullLevel(ref path) => path.into_bindings(), + #[cfg(feature = "graphql")] + Plan::GraphQl(ref q) => q.into_bindings(), } } @@ -321,6 +334,8 @@ impl Implementable for Plan { Plan::NameExpr(_, ref _name) => Vec::new(), Plan::Pull(ref pull) => pull.datafy(), Plan::PullLevel(ref path) => path.datafy(), + #[cfg(feature = "graphql")] + Plan::GraphQl(ref q) => q.datafy(), } } @@ -487,6 +502,8 @@ impl Implementable for Plan { } Plan::Pull(ref pull) => pull.implement(nested, local_arrangements, context), Plan::PullLevel(ref path) => path.implement(nested, local_arrangements, context), + #[cfg(feature = "graphql")] + Plan::GraphQl(ref query) => query.implement(nested, local_arrangements, context), } } } diff --git a/src/plan/pull.rs b/src/plan/pull.rs index be13cae..edf5976 100644 --- a/src/plan/pull.rs +++ b/src/plan/pull.rs @@ -75,7 +75,14 @@ fn interleave(values: &[Value], constants: &[Aid]) -> Vec { impl Implementable for PullLevel

{ fn dependencies(&self) -> Dependencies { - Dependencies::none() + let mut dependencies = self.plan.dependencies(); + + for attribute in &self.pull_attributes { + let attribute_dependencies = Dependencies::attribute(&attribute); + dependencies = Dependencies::merge(dependencies, attribute_dependencies); + } + + dependencies } fn implement<'b, T, I, S>( @@ -186,7 +193,12 @@ impl Implementable for PullLevel

{ impl Implementable for Pull

{ fn dependencies(&self) -> Dependencies { - Dependencies::none() + let mut dependencies = Dependencies::none(); + for path in self.paths.iter() { + dependencies = Dependencies::merge(dependencies, path.dependencies()); + } + + dependencies } fn implement<'b, T, I, S>( diff --git a/tests/pull_test.rs b/tests/pull_test.rs index 9794755..7913ae1 100644 --- a/tests/pull_test.rs +++ b/tests/pull_test.rs @@ -1,236 +1,255 @@ use std::collections::HashSet; +use std::iter::FromIterator; use std::sync::mpsc::channel; use std::time::Duration; -use declarative_dataflow::plan::{Pull, PullLevel}; -use declarative_dataflow::server::Server; +use timely::dataflow::channels::pact::Pipeline; +use timely::dataflow::operators::Operator; + +use declarative_dataflow::plan::{Dependencies, Implementable, Pull, PullLevel}; +use declarative_dataflow::server::{Register, Server}; use declarative_dataflow::{AttributeConfig, InputSemantics, Plan, Rule, TxData, Value}; -use InputSemantics::Raw; use Value::{Aid, Bool, Eid, Number, String}; -#[test] -fn pull_level() { - timely::execute_directly(|worker| { - let mut server = Server::::new(Default::default()); - let (send_results, results) = channel(); +struct Case { + description: &'static str, + plan: Plan, + root_plan: Option, + transactions: Vec>, + expectations: Vec, u64, isize)>>, +} - let (e, a, v) = (1, 2, 3); - let plan = Plan::PullLevel(PullLevel { - variables: vec![], - pull_variable: e, - plan: Box::new(Plan::MatchAV(e, "admin?".to_string(), Bool(false))), - pull_attributes: vec!["name".to_string(), "age".to_string()], - path_attributes: vec![], - }); +fn run_cases(mut cases: Vec) { + for case in cases.drain(..) { + timely::execute_directly(move |worker| { + let mut server = Server::::new(Default::default()); + let (send_results, results) = channel(); - worker.dataflow::(|scope| { - server - .context - .internal - .create_transactable_attribute("admin?", AttributeConfig::tx_time(Raw), scope) - .unwrap(); - server - .context - .internal - .create_transactable_attribute("name", AttributeConfig::tx_time(Raw), scope) - .unwrap(); - server - .context - .internal - .create_transactable_attribute("age", AttributeConfig::tx_time(Raw), scope) - .unwrap(); + dbg!(case.description); - server - .test_single( - scope, - Rule { - name: "pull_level".to_string(), - plan, - }, - ) - .inspect(move |x| { - send_results.send((x.0.clone(), x.2)).unwrap(); - }); - }); + let mut deps = case.plan.dependencies(); + let plan = case.plan.clone(); + let root_plan = case.root_plan.clone(); - server - .transact( - vec![ - TxData(1, 100, "admin?".to_string(), Bool(true)), - TxData(1, 200, "admin?".to_string(), Bool(false)), - TxData(1, 300, "admin?".to_string(), Bool(false)), - TxData(1, 100, "name".to_string(), String("Mabel".to_string())), - TxData(1, 200, "name".to_string(), String("Dipper".to_string())), - TxData(1, 300, "name".to_string(), String("Soos".to_string())), - TxData(1, 100, "age".to_string(), Number(12)), - TxData(1, 200, "age".to_string(), Number(13)), - ], - 0, - 0, - ) - .unwrap(); + if let Some(ref root_plan) = root_plan { + deps = Dependencies::merge(deps, root_plan.dependencies()); + } + + for tx in case.transactions.iter() { + for datum in tx { + deps.attributes.insert(datum.2.clone()); + } + } - server.advance_domain(None, 1).unwrap(); + worker.dataflow::(|scope| { + for dep in deps.attributes.iter() { + server + .context + .internal + .create_transactable_attribute( + dep, + AttributeConfig::tx_time(InputSemantics::Raw), + scope, + ) + .unwrap(); + } - worker.step_while(|| server.is_any_outdated()); + if let Some(root_plan) = root_plan { + server + .register(Register { + rules: vec![Rule { + name: "root".to_string(), + plan: root_plan, + }], + publish: vec!["root".to_string()], + }) + .unwrap(); + } - let mut expected = HashSet::new(); - expected.insert((vec![Eid(200), Aid("age".to_string()), Number(13)], 1)); - expected.insert(( - vec![ - Eid(200), - Aid("name".to_string()), - String("Dipper".to_string()), - ], - 1, - )); - expected.insert(( - vec![ - Eid(300), - Aid("name".to_string()), - String("Soos".to_string()), - ], - 1, - )); + server + .test_single( + scope, + Rule { + name: "query".to_string(), + plan, + }, + ) + .inner + .sink(Pipeline, "Results", move |input| { + input.for_each(|_time, data| { + for datum in data.iter() { + send_results.send(datum.clone()).unwrap() + } + }); + }); + }); + + let mut transactions = case.transactions.clone(); + let mut next_tx = 0; + + for (tx_id, tx_data) in transactions.drain(..).enumerate() { + next_tx += 1; + + server.transact(tx_data, 0, 0).unwrap(); + server.advance_domain(None, next_tx).unwrap(); + + worker.step_while(|| server.is_any_outdated()); + + let mut expected: HashSet<(Vec, u64, isize)> = + HashSet::from_iter(case.expectations[tx_id].iter().cloned()); - for _i in 0..expected.len() { - let result = results.recv().unwrap(); - if !expected.remove(&result) { - panic!("unknown result {:?}", result); + for _i in 0..expected.len() { + match results.recv_timeout(Duration::from_millis(400)) { + Err(_err) => { + panic!("No result."); + } + Ok(result) => { + if !expected.remove(&result) { + panic!("Unknown result {:?}.", result); + } + } + } + } + + match results.recv_timeout(Duration::from_millis(400)) { + Err(_err) => {} + Ok(result) => { + panic!("Extraneous result {:?}", result); + } + } } - } + }); + } +} - assert!(results.recv_timeout(Duration::from_millis(400)).is_err()); - }); +#[test] +fn pull_level() { + run_cases(vec![Case { + description: "[:find (pull ?e [:name :age]) :where [?e :admin? false]]", + plan: Plan::PullLevel(PullLevel { + variables: vec![], + pull_variable: 0, + plan: Box::new(Plan::MatchAV(0, "admin?".to_string(), Bool(false))), + pull_attributes: vec!["name".to_string(), "age".to_string()], + path_attributes: vec![], + }), + root_plan: None, + transactions: vec![vec![ + TxData(1, 100, "admin?".to_string(), Bool(true)), + TxData(1, 200, "admin?".to_string(), Bool(false)), + TxData(1, 300, "admin?".to_string(), Bool(false)), + TxData(1, 100, "name".to_string(), String("Mabel".to_string())), + TxData(1, 200, "name".to_string(), String("Dipper".to_string())), + TxData(1, 300, "name".to_string(), String("Soos".to_string())), + TxData(1, 100, "age".to_string(), Number(12)), + TxData(1, 200, "age".to_string(), Number(13)), + ]], + expectations: vec![vec![ + (vec![Eid(200), Aid("age".to_string()), Number(13)], 0, 1), + ( + vec![ + Eid(200), + Aid("name".to_string()), + String("Dipper".to_string()), + ], + 0, + 1, + ), + ( + vec![ + Eid(300), + Aid("name".to_string()), + String("Soos".to_string()), + ], + 0, + 1, + ), + ]], + }]); } #[test] fn pull_children() { - timely::execute_directly(|worker| { - let mut server = Server::::new(Default::default()); - let (send_results, results) = channel(); + let (parent, child) = (1, 2); - let (parent, child, a, v) = (1, 2, 3, 4); - let plan = Plan::PullLevel(PullLevel { + run_cases(vec![Case { + description: "[:find (pull ?child [:name :age]) :where [_ :parent/child ?child]]", + plan: Plan::PullLevel(PullLevel { variables: vec![], pull_variable: child, plan: Box::new(Plan::MatchA(parent, "parent/child".to_string(), child)), pull_attributes: vec!["name".to_string(), "age".to_string()], path_attributes: vec!["parent/child".to_string()], - }); - - worker.dataflow::(|scope| { - server - .context - .internal - .create_transactable_attribute("parent/child", AttributeConfig::tx_time(Raw), scope) - .unwrap(); - server - .context - .internal - .create_transactable_attribute("name", AttributeConfig::tx_time(Raw), scope) - .unwrap(); - server - .context - .internal - .create_transactable_attribute("age", AttributeConfig::tx_time(Raw), scope) - .unwrap(); - - server - .test_single( - scope, - Rule { - name: "pull_children".to_string(), - plan, - }, - ) - .inspect(move |x| { - send_results.send((x.0.clone(), x.2)).unwrap(); - }); - }); - - server - .transact( + }), + root_plan: None, + transactions: vec![vec![ + TxData(1, 100, "name".to_string(), String("Alice".to_string())), + TxData(1, 100, "parent/child".to_string(), Eid(300)), + TxData(1, 200, "name".to_string(), String("Bob".to_string())), + TxData(1, 200, "parent/child".to_string(), Eid(400)), + TxData(1, 300, "name".to_string(), String("Mabel".to_string())), + TxData(1, 300, "age".to_string(), Number(13)), + TxData(1, 400, "name".to_string(), String("Dipper".to_string())), + TxData(1, 400, "age".to_string(), Number(12)), + ]], + expectations: vec![vec![ + ( vec![ - TxData(1, 100, "name".to_string(), String("Alice".to_string())), - TxData(1, 100, "parent/child".to_string(), Eid(300)), - TxData(1, 200, "name".to_string(), String("Bob".to_string())), - TxData(1, 200, "parent/child".to_string(), Eid(400)), - TxData(1, 300, "name".to_string(), String("Mabel".to_string())), - TxData(1, 300, "age".to_string(), Number(13)), - TxData(1, 400, "name".to_string(), String("Dipper".to_string())), - TxData(1, 400, "age".to_string(), Number(12)), + Eid(100), + Aid("parent/child".to_string()), + Eid(300), + Aid("age".to_string()), + Number(13), ], 0, + 1, + ), + ( + vec![ + Eid(100), + Aid("parent/child".to_string()), + Eid(300), + Aid("name".to_string()), + String("Mabel".to_string()), + ], 0, - ) - .unwrap(); - - server.advance_domain(None, 1).unwrap(); - - worker.step_while(|| server.is_any_outdated()); - - let mut expected = HashSet::new(); - expected.insert(( - vec![ - Eid(100), - Aid("parent/child".to_string()), - Eid(300), - Aid("age".to_string()), - Number(13), - ], - 1, - )); - expected.insert(( - vec![ - Eid(100), - Aid("parent/child".to_string()), - Eid(300), - Aid("name".to_string()), - String("Mabel".to_string()), - ], - 1, - )); - expected.insert(( - vec![ - Eid(200), - Aid("parent/child".to_string()), - Eid(400), - Aid("age".to_string()), - Number(12), - ], - 1, - )); - expected.insert(( - vec![ - Eid(200), - Aid("parent/child".to_string()), - Eid(400), - Aid("name".to_string()), - String("Dipper".to_string()), - ], - 1, - )); - - for _i in 0..expected.len() { - let result = results.recv().unwrap(); - if !expected.remove(&result) { - panic!("unknown result {:?}", result); - } - } - - assert!(results.recv_timeout(Duration::from_millis(400)).is_err()); - }); + 1, + ), + ( + vec![ + Eid(200), + Aid("parent/child".to_string()), + Eid(400), + Aid("age".to_string()), + Number(12), + ], + 0, + 1, + ), + ( + vec![ + Eid(200), + Aid("parent/child".to_string()), + Eid(400), + Aid("name".to_string()), + String("Dipper".to_string()), + ], + 0, + 1, + ), + ]], + }]); } #[test] fn pull() { - timely::execute_directly(|worker| { - let mut server = Server::::new(Default::default()); - let (send_results, results) = channel(); + let (a, b, c) = (1, 2, 3); - let (a, b, c) = (1, 2, 3); - let plan = Plan::Pull(Pull { + run_cases(vec![Case { + description: + "[:find (pull ?a [:name {:join/binding #:pattern[e a v]}]) :where [?a :join/binding]]", + root_plan: None, + plan: Plan::Pull(Pull { variables: vec![], paths: vec![ PullLevel { @@ -252,114 +271,129 @@ fn pull() { path_attributes: vec!["name".to_string()], }, ], - }); - - worker.dataflow::(|scope| { - server - .context - .internal - .create_transactable_attribute("name", AttributeConfig::tx_time(Raw), scope) - .unwrap(); - server - .context - .internal - .create_transactable_attribute("join/binding", AttributeConfig::tx_time(Raw), scope) - .unwrap(); - server - .context - .internal - .create_transactable_attribute("pattern/e", AttributeConfig::tx_time(Raw), scope) - .unwrap(); - server - .context - .internal - .create_transactable_attribute("pattern/a", AttributeConfig::tx_time(Raw), scope) - .unwrap(); - server - .context - .internal - .create_transactable_attribute("pattern/v", AttributeConfig::tx_time(Raw), scope) - .unwrap(); - - server - .test_single( - scope, - Rule { - name: "pull".to_string(), - plan, - }, - ) - .inspect(move |x| { - send_results.send((x.0.clone(), x.2)).unwrap(); - }); - }); - - server - .transact( + }), + transactions: vec![vec![ + TxData(1, 100, "name".to_string(), String("rule".to_string())), + TxData(1, 100, "join/binding".to_string(), Eid(200)), + TxData(1, 100, "join/binding".to_string(), Eid(300)), + TxData(1, 200, "pattern/a".to_string(), Aid("xyz".to_string())), + TxData(1, 300, "pattern/e".to_string(), Eid(12345)), + TxData(1, 300, "pattern/a".to_string(), Aid("asd".to_string())), + ]], + expectations: vec![vec![ + ( vec![ - TxData(1, 100, "name".to_string(), String("rule".to_string())), - TxData(1, 100, "join/binding".to_string(), Eid(200)), - TxData(1, 100, "join/binding".to_string(), Eid(300)), - TxData(1, 200, "pattern/a".to_string(), Aid("xyz".to_string())), - TxData(1, 300, "pattern/e".to_string(), Eid(12345)), - TxData(1, 300, "pattern/a".to_string(), Aid("asd".to_string())), + Eid(100), + Aid("name".to_string()), + String("rule".to_string()), ], 0, + 1, + ), + ( + vec![ + Eid(100), + Aid("join/binding".to_string()), + Eid(200), + Aid("pattern/a".to_string()), + Aid("xyz".to_string()), + ], 0, - ) - .unwrap(); - - server.advance_domain(None, 1).unwrap(); - - worker.step_while(|| server.is_any_outdated()); + 1, + ), + ( + vec![ + Eid(100), + Aid("join/binding".to_string()), + Eid(300), + Aid("pattern/e".to_string()), + Eid(12345), + ], + 0, + 1, + ), + ( + vec![ + Eid(100), + Aid("join/binding".to_string()), + Eid(300), + Aid("pattern/a".to_string()), + Aid("asd".to_string()), + ], + 0, + 1, + ), + ]], + }]); +} - let mut expected = HashSet::new(); - expected.insert(( - vec![ - Eid(100), - Aid("name".to_string()), - String("rule".to_string()), - ], - 1, - )); - expected.insert(( - vec![ - Eid(100), - Aid("join/binding".to_string()), - Eid(200), - Aid("pattern/a".to_string()), - Aid("xyz".to_string()), - ], - 1, - )); - expected.insert(( - vec![ - Eid(100), - Aid("join/binding".to_string()), - Eid(300), - Aid("pattern/e".to_string()), - Eid(12345), - ], - 1, - )); - expected.insert(( - vec![ - Eid(100), - Aid("join/binding".to_string()), - Eid(300), - Aid("pattern/a".to_string()), - Aid("asd".to_string()), - ], - 1, - )); +#[cfg(feature = "graphql")] +#[test] +fn graph_ql() { + use declarative_dataflow::plan::GraphQl; - for _i in 0..expected.len() { - let result = results.recv_timeout(Duration::from_millis(400)).unwrap(); - if !expected.remove(&result) { - panic!("unknown result {:?}", result); - } + run_cases(vec![{ + let q = "{root {name age height mass}}"; + Case { + description: q, + plan: Plan::GraphQl(GraphQl::new(q.to_string())), + root_plan: Some(Plan::MatchA(0, "hero".to_string(), 1)), + transactions: vec![vec![ + TxData(1, 100, "name".to_string(), String("Alice".to_string())), + TxData(1, 100, "hero".to_string(), Eid(300)), + TxData(1, 200, "name".to_string(), String("Bob".to_string())), + TxData(1, 200, "hero".to_string(), Eid(400)), + TxData(1, 300, "name".to_string(), String("Mabel".to_string())), + TxData(1, 300, "age".to_string(), Number(13)), + TxData(1, 400, "name".to_string(), String("Dipper".to_string())), + TxData(1, 400, "age".to_string(), Number(12)), + ]], + expectations: vec![vec![ + ( + vec![ + Eid(100), + Aid("root".to_string()), + Eid(300), + Aid("age".to_string()), + Number(13), + ], + 0, + 1, + ), + ( + vec![ + Eid(100), + Aid("root".to_string()), + Eid(300), + Aid("name".to_string()), + String("Mabel".to_string()), + ], + 0, + 1, + ), + ( + vec![ + Eid(200), + Aid("root".to_string()), + Eid(400), + Aid("age".to_string()), + Number(12), + ], + 0, + 1, + ), + ( + vec![ + Eid(200), + Aid("root".to_string()), + Eid(400), + Aid("name".to_string()), + String("Dipper".to_string()), + ], + 0, + 1, + ), + ]], } - - assert!(results.recv_timeout(Duration::from_millis(400)).is_err()); - }); + }]); }