Skip to content

Commit

Permalink
Basic GraphQL support (#46)
Browse files Browse the repository at this point in the history
* initial graphql support
* add dependencies for pull and PullLevel
  • Loading branch information
li1 authored and comnik committed Apr 24, 2019
1 parent 9cc21eb commit 04ab2fc
Show file tree
Hide file tree
Showing 5 changed files with 596 additions and 302 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ serde_json = { version = "1", optional = true }
csv = { version = "1", optional = true }
chrono = { version = "0.4", optional = true }
uuid = { version = "0.7", features = ["serde"], optional = true }
graphql-parser = { version = "0.2.2", optional = true }

[features]
real-time = []
set-semantics = []
csv-source = ["csv", "chrono"]
json-source = ["serde_json", "chrono"]
graphql = ["graphql-parser", "serde_json"]

[profile.release]
opt-level = 3
Expand Down
229 changes: 229 additions & 0 deletions src/plan/graphql.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
//! GraphQL expression plan.
use timely::dataflow::scopes::child::Iterative;
use timely::dataflow::Scope;
use timely::progress::Timestamp;

use differential_dataflow::lattice::Lattice;

use graphql_parser::parse_query;
use graphql_parser::query::{Definition, Document, OperationDefinition, Selection, SelectionSet};

use crate::plan::{Dependencies, ImplContext, Implementable};
use crate::plan::{Plan, Pull, PullLevel};
use crate::{Implemented, ShutdownHandle, VariableMap};

/// A plan for GraphQL queries, e.g. `{ Heroes { name age weight } }`
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct GraphQl {
/// String representation of GraphQL query
pub query: String,
/// Cached paths
pub paths: Vec<PullLevel<Plan>>,
}

impl GraphQl {
/// Creates a new GraphQL instance by parsing the ast obtained from the provided query
pub fn new(query: String) -> Self {
let q = query.clone();
GraphQl {
query,
paths: ast_to_paths(parse_query(&q).expect("graphQL ast parsing failed")),
}
}
}

fn selection_set_to_paths(
selection_set: &SelectionSet,
parent_path: &Vec<String>,
at_root: bool,
) -> Vec<PullLevel<Plan>> {
let mut result = vec![];
let mut pull_attributes = vec![];
let variables = vec![];

for item in &selection_set.items {
match item {
Selection::Field(field) => {
if field.selection_set.items.is_empty() {
pull_attributes.push(field.name.to_string());
}

let mut new_parent_path = parent_path.to_vec();
new_parent_path.push(field.name.to_string());

result.extend(selection_set_to_paths(
&field.selection_set,
&new_parent_path,
parent_path.is_empty(),
));
}
_ => unimplemented!(),
}
}

// parent_path handles root path case
if !pull_attributes.is_empty() && !parent_path.is_empty() {
// for root, we expect a NameExpr that puts the pulled IDs in the v position
let plan = if at_root {
Plan::NameExpr(vec![0, 1], parent_path.last().unwrap().to_string())
} else {
Plan::MatchA(0, parent_path.last().unwrap().to_string(), 1)
};

let pull_level = PullLevel {
pull_attributes,
path_attributes: parent_path.to_vec(),
pull_variable: 1,
variables,
plan: Box::new(plan),
};
result.push(pull_level);
}

result
}

/// converts an ast to paths
/// The structure of a typical parsed ast looks like this:
/// ```
/// Document {
/// definitions: [
/// Operation(SelectionSet(SelectionSet {
/// items: [
/// Field(Field {
/// name: ...,
/// selection_set: SelectionSet(...}
/// }),
/// ...
/// ]
/// }))
/// ]
/// }
/// ```
fn ast_to_paths(ast: Document) -> Vec<PullLevel<Plan>> {
let mut result = vec![];
for definition in &ast.definitions {
match definition {
Definition::Operation(operation_definition) => match operation_definition {
OperationDefinition::Query(_) => unimplemented!(),
OperationDefinition::SelectionSet(selection_set) => {
result.extend(selection_set_to_paths(selection_set, &vec![], true))
}
_ => unimplemented!(),
},
Definition::Fragment(_) => unimplemented!(),
};
}

result
}

/// Converts a vector of paths to a GraphQL-like nested value.
pub fn paths_to_nested(paths: Vec<Vec<crate::Value>>) -> serde_json::Value {
use crate::Value::{Aid, Eid};
use serde_json::map::Map;

let mut acc = Map::new();
for mut path in paths {
let mut current_map = &mut acc;
let last_val = path.pop().unwrap();

if let Aid(last_key) = path.pop().unwrap() {
for attribute in path {
let attr = match attribute {
Aid(x) => x,
Eid(x) => x.to_string(),
_ => unreachable!(),
};

let entry = current_map
.entry(attr)
.or_insert_with(|| serde_json::Value::Object(Map::new()));

*entry = match entry {
serde_json::Value::Object(m) => {
serde_json::Value::Object(std::mem::replace(m, Map::new()))
}
serde_json::Value::Array(_) => unreachable!(),
_ => serde_json::Value::Object(Map::new()),
};

match entry {
serde_json::Value::Object(m) => current_map = m,
_ => unreachable!(),
};
}

match current_map.get(&last_key) {
Some(serde_json::Value::Object(_)) => (),
_ => {
current_map.insert(last_key, serde_json::json!(last_val));
}
};
} else {
unreachable!();
}
}

serde_json::Value::Object(acc)
}

impl Implementable for GraphQl {
fn dependencies(&self) -> Dependencies {
let mut dependencies = Dependencies::none();

for path in self.paths.iter() {
dependencies = Dependencies::merge(dependencies, path.dependencies());
}

dependencies
}

fn implement<'b, T, I, S>(
&self,
nested: &mut Iterative<'b, S, u64>,
local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
context: &mut I,
) -> (Implemented<'b, S>, ShutdownHandle)
where
T: Timestamp + Lattice,
I: ImplContext<T>,
S: Scope<Timestamp = T>,
{
let ast = parse_query(&self.query).expect("graphQL ast parsing failed");
let parsed = Pull {
variables: vec![],
paths: ast_to_paths(ast),
};

parsed.implement(nested, local_arrangements, context)
}
}

// relation
// .inner
// .map(|x| ((), x))
// .inspect(|x| { println!("{:?}", x); })
// .aggregate::<_,Vec<_>,_,_,_>(
// |_key, (path, _time, _diff), acc| { acc.push(path); },
// |_key, paths| {
// paths_to_nested(paths)
// // squash_nested(nested)
// },
// |_key| 1)

// /// Register a GraphQL query
// pub fn register_graph_ql(&mut self, query: String, name: &str) {
// use crate::plan::{GraphQl, Plan};

// let req = Register {
// rules: vec![Rule {
// name: name.to_string(),
// plan: Plan::GraphQl(GraphQl::new(query)),
// }],
// publish: vec![name.to_string()],
// };

// self.register(req).unwrap();
// }
17 changes: 17 additions & 0 deletions src/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub mod aggregate;
pub mod aggregate_neu;
pub mod antijoin;
pub mod filter;
#[cfg(feature = "graphql")]
pub mod graphql;
pub mod hector;
pub mod join;
pub mod project;
Expand All @@ -39,6 +41,8 @@ pub use self::aggregate::{Aggregate, AggregationFn};
pub use self::aggregate_neu::{Aggregate, AggregationFn};
pub use self::antijoin::Antijoin;
pub use self::filter::{Filter, Predicate};
#[cfg(feature = "graphql")]
pub use self::graphql::GraphQl;
pub use self::hector::Hector;
pub use self::join::Join;
pub use self::project::Project;
Expand Down Expand Up @@ -205,6 +209,9 @@ pub enum Plan {
Pull(Pull<Plan>),
/// Single-level pull expression
PullLevel(PullLevel<Plan>),
/// GraphQl pull expression
#[cfg(feature = "graphql")]
GraphQl(GraphQl),
}

impl Plan {
Expand All @@ -226,6 +233,8 @@ impl Plan {
Plan::NameExpr(ref variables, ref _name) => variables.clone(),
Plan::Pull(ref pull) => pull.variables.clone(),
Plan::PullLevel(ref path) => path.variables.clone(),
#[cfg(feature = "graphql")]
Plan::GraphQl(_) => unimplemented!(),
}
}
}
Expand All @@ -249,6 +258,8 @@ impl Implementable for Plan {
Plan::NameExpr(_, ref name) => Dependencies::name(name),
Plan::Pull(ref pull) => pull.dependencies(),
Plan::PullLevel(ref path) => path.dependencies(),
#[cfg(feature = "graphql")]
Plan::GraphQl(ref q) => q.dependencies(),
}
}

Expand Down Expand Up @@ -282,6 +293,8 @@ impl Implementable for Plan {
Plan::NameExpr(_, ref _name) => unimplemented!(), // @TODO hmm...
Plan::Pull(ref pull) => pull.into_bindings(),
Plan::PullLevel(ref path) => path.into_bindings(),
#[cfg(feature = "graphql")]
Plan::GraphQl(ref q) => q.into_bindings(),
}
}

Expand Down Expand Up @@ -321,6 +334,8 @@ impl Implementable for Plan {
Plan::NameExpr(_, ref _name) => Vec::new(),
Plan::Pull(ref pull) => pull.datafy(),
Plan::PullLevel(ref path) => path.datafy(),
#[cfg(feature = "graphql")]
Plan::GraphQl(ref q) => q.datafy(),
}
}

Expand Down Expand Up @@ -487,6 +502,8 @@ impl Implementable for Plan {
}
Plan::Pull(ref pull) => pull.implement(nested, local_arrangements, context),
Plan::PullLevel(ref path) => path.implement(nested, local_arrangements, context),
#[cfg(feature = "graphql")]
Plan::GraphQl(ref query) => query.implement(nested, local_arrangements, context),
}
}
}
16 changes: 14 additions & 2 deletions src/plan/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,14 @@ fn interleave(values: &[Value], constants: &[Aid]) -> Vec<Value> {

impl<P: Implementable> Implementable for PullLevel<P> {
fn dependencies(&self) -> Dependencies {
Dependencies::none()
let mut dependencies = self.plan.dependencies();

for attribute in &self.pull_attributes {
let attribute_dependencies = Dependencies::attribute(&attribute);
dependencies = Dependencies::merge(dependencies, attribute_dependencies);
}

dependencies
}

fn implement<'b, T, I, S>(
Expand Down Expand Up @@ -186,7 +193,12 @@ impl<P: Implementable> Implementable for PullLevel<P> {

impl<P: Implementable> Implementable for Pull<P> {
fn dependencies(&self) -> Dependencies {
Dependencies::none()
let mut dependencies = Dependencies::none();
for path in self.paths.iter() {
dependencies = Dependencies::merge(dependencies, path.dependencies());
}

dependencies
}

fn implement<'b, T, I, S>(
Expand Down
Loading

0 comments on commit 04ab2fc

Please sign in to comment.