From 6e12a755c28284d37da06d0f7a2f3b531c308b49 Mon Sep 17 00:00:00 2001 From: niko Date: Fri, 26 Apr 2019 01:35:37 +0200 Subject: [PATCH 01/13] Unify server output under Output enum --- server/src/main.rs | 245 ++++++++++++++++++--------------------------- 1 file changed, 99 insertions(+), 146 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index 7fadb96..12f5efa 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -49,14 +49,12 @@ type T = Duration; const SERVER: Token = Token(usize::MAX - 1); const RESULTS: Token = Token(usize::MAX - 2); -const TENANT_RESULTS: Token = Token(usize::MAX - 3); -const ERRORS: Token = Token(usize::MAX - 4); -const SYSTEM: Token = Token(usize::MAX - 5); -const CLI: Token = Token(usize::MAX - 6); +const SYSTEM: Token = Token(usize::MAX - 3); +const CLI: Token = Token(usize::MAX - 4); /// A mutation of server state. #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize, Debug)] -pub struct Command { +struct Command { /// The worker that received this command from a client originally /// and is therefore the one that should receive all outputs. pub owner: usize, @@ -67,6 +65,20 @@ pub struct Command { pub requests: Vec, } +enum Output { + /// A batch of (tuple, time, diff) triples as returned by Datalog + /// queries. + QueryDiff(String, Vec>), + /// An output diff on a multi-tenant query. + TenantDiff(String, Token, Vec>), + // /// A hash-map as returned by GraphQL queries. + // Map(serde_json::map::Map, T, isize), + /// A message forwarded to a specific client. + Message(Token, serde_json::Value), + /// An error forwarded to a specific client. + Error(Token, Error, TxId), +} + fn main() { env_logger::init(); @@ -143,13 +155,7 @@ fn main() { let (send_cli, recv_cli) = mio_extras::channel::channel(); // setup results channel - let (send_results, recv_results) = mio_extras::channel::channel::<(String, Vec>)>(); - - // setup tenant results channel - let (send_tenant_results, recv_tenant_results) = mio_extras::channel::channel::<(String, Token, Vec>)>(); - - // setup errors channel - let (send_errors, recv_errors) = mio_extras::channel::channel::<(Token, Error, TxId)>(); + let (send_results, recv_results) = mio_extras::channel::channel::(); // setup server socket // let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), config.port); @@ -189,20 +195,6 @@ fn main() { PollOpt::edge() | PollOpt::oneshot(), ).unwrap(); - poll.register( - &recv_tenant_results, - TENANT_RESULTS, - Ready::readable(), - PollOpt::edge() | PollOpt::oneshot(), - ).unwrap(); - - poll.register( - &recv_errors, - ERRORS, - Ready::readable(), - PollOpt::edge() | PollOpt::oneshot(), - ).unwrap(); - poll.register(&server_socket, SERVER, Ready::readable(), PollOpt::level()) .unwrap(); @@ -337,130 +329,84 @@ fn main() { } } RESULTS => { - while let Ok((query_name, results)) = recv_results.try_recv() { - info!("[WORKER {}] {} {} results", worker.index(), query_name, results.len()); + while let Ok(out) = recv_results.try_recv() { + let (tokens, serialized): (Box>, _) = match out { + Output::QueryDiff(name, results) => { + info!("[WORKER {}] {} {} results", worker.index(), name, results.len()); + + match server.interests.get(&name) { + None => { + warn!("result on query {} w/o interested clients", name); + (Box::new(std::iter::empty()), None) + } + Some(tokens) => { + let serialized = + serde_json::to_string::<(String, Vec>)>(&(name, results)) + .expect("failed to serialize outputs"); - match server.interests.get(&query_name) { - None => { - 
warn!("result on query {} w/o interested clients", query_name); - } - Some(tokens) => { - let serialized = serde_json::to_string::<(String, Vec>)>( - &(query_name, results) - ).expect("failed to serialize outputs"); - let msg = ws::Message::text(serialized); - - for token in tokens { - match connections.get_mut((*token).into()) { - None => { - warn!("client {:?} has gone away undetected, notifying", token); - sequencer.push(Command { - owner: worker.index(), - client: (*token).into(), - requests: vec![Request::Disconnect], - }); - } - Some(conn) => { - conn.send_message(msg.clone()) - .expect("failed to send message"); - - poll.reregister( - conn.socket(), - conn.token(), - conn.events(), - PollOpt::edge() | PollOpt::oneshot(), - ).unwrap(); - } + (Box::new(tokens.iter().cloned()), Some(serialized)) } } } - } - } + Output::TenantDiff(name, token, results) => { + info!("[WORKER {}] {} results for tenant {:?} on query {}", worker.index(), results.len(), token, name); - poll.reregister( - &recv_results, - RESULTS, - Ready::readable(), - PollOpt::edge() | PollOpt::oneshot(), - ).unwrap(); - } - TENANT_RESULTS => { - while let Ok((query_name, token, results)) = recv_tenant_results.try_recv() { - info!("[WORKER {}] {} results for tenant {:?} on query {}", worker.index(), results.len(), token, query_name); + let serialized = + serde_json::to_string::<(String, Vec>)>(&(name, results)) + .expect("failed to serialize outputs"); - let serialized = serde_json::to_string::<(String, Vec>)>(&(query_name, results)).expect("failed to serialize outputs"); - let msg = ws::Message::text(serialized); - - match connections.get_mut(token.into()) { - None => { - warn!("sent results to tenant who has gone away undetected, notifying"); - sequencer.push(Command { - owner: worker.index(), - client: token.into(), - requests: vec![Request::Disconnect], - }); + (Box::new(std::iter::once(token)), Some(serialized)) } - Some(conn) => { - conn.send_message(msg.clone()) - .expect("failed to send message"); + Output::Message(token, msg) => { + info!("[WORKER {}] {:?}", worker.index(), msg); - poll.reregister( - conn.socket(), - conn.token(), - conn.events(), - PollOpt::edge() | PollOpt::oneshot(), - ).unwrap(); + (Box::new(std::iter::once(token)), Some(msg.to_string())) } - } - } + Output::Error(token, error, tx_id) => { + error!("[WORKER {}] {:?}", worker.index(), error); - poll.reregister( - &recv_tenant_results, - TENANT_RESULTS, - Ready::readable(), - PollOpt::edge() | PollOpt::oneshot(), - ).unwrap(); - } - ERRORS => { - while let Ok((token, error, tx_id)) = recv_errors.try_recv() { - error!("[WORKER {}] {:?}", worker.index(), error); - - let mut serializable = serde_json::Map::new(); - serializable.insert("df.error/category".to_string(), serde_json::Value::String(error.category.to_string())); - serializable.insert("df.error/message".to_string(), serde_json::Value::String(error.message.to_string())); + let mut serializable = serde_json::Map::new(); + serializable.insert("df.error/category".to_string(), serde_json::Value::String(error.category.to_string())); + serializable.insert("df.error/message".to_string(), serde_json::Value::String(error.message.to_string())); - let serialized = serde_json::to_string::<(&'static str, Vec<(serde_json::Map<_,_>, TxId)>)>( - &("df.error", vec![(serializable, tx_id)]) - ).expect("failed to serialize errors"); + let serialized = serde_json::to_string::<(&'static str, Vec<(serde_json::Map<_,_>, TxId)>)>( + &("df.error", vec![(serializable, tx_id)]) + ).expect("failed to serialize errors"); 
- let msg = ws::Message::text(serialized); - - match connections.get_mut(token.into()) { - None => { - warn!("sent error to client who has gone away undetected, notifying"); - sequencer.push(Command { - owner: worker.index(), - client: token.into(), - requests: vec![Request::Disconnect], - }); + (Box::new(std::iter::once(token)), Some(serialized)) } - Some(conn) => { - conn.send_message(msg.clone()) - .expect("failed to send message"); - - poll.reregister( - conn.socket(), - conn.token(), - conn.events(), - PollOpt::edge() | PollOpt::oneshot(), - ).unwrap(); + }; + + let msg = ws::Message::text(serialized.expect("nothing to send")); + + for token in tokens { + match connections.get_mut(token.into()) { + None => { + warn!("client {:?} has gone away undetected, notifying", token); + sequencer.push(Command { + owner: worker.index(), + client: token.into(), + requests: vec![Request::Disconnect], + }); + } + Some(conn) => { + conn.send_message(msg.clone()) + .expect("failed to send message"); + + poll.reregister( + conn.socket(), + conn.token(), + conn.events(), + PollOpt::edge() | PollOpt::oneshot(), + ).unwrap(); + } } } } poll.reregister( - &recv_errors, - ERRORS, + &recv_results, + RESULTS, Ready::readable(), PollOpt::edge() | PollOpt::oneshot(), ).unwrap(); @@ -497,7 +443,7 @@ fn main() { message: serde_error.to_string(), }; - send_errors.send((token, error, next_tx - 1)).unwrap(); + send_results.send(Output::Error(token, error, next_tx - 1)).unwrap(); } Ok(requests) => { sequencer.push( @@ -518,7 +464,7 @@ fn main() { message: rmp_error.to_string(), }; - send_errors.send((token, error, next_tx - 1)).unwrap(); + send_results.send(Output::Error(token, error, next_tx - 1)).unwrap(); } Ok(requests) => { sequencer.push( @@ -607,7 +553,7 @@ fn main() { match req { Request::Transact(req) => { if let Err(error) = server.transact(req, owner, worker.index()) { - send_errors.send((Token(client), error, last_tx)).unwrap(); + send_results.send(Output::Error(Token(client), error, last_tx)).unwrap(); } } Request::Interest(req) => { @@ -631,7 +577,6 @@ fn main() { if was_first { let send_results_handle = send_results.clone(); - let send_tenant_results_handle = send_tenant_results.clone(); let disable_logging = req.disable_logging.unwrap_or(false); let mut timely_logger = None; @@ -648,7 +593,7 @@ fn main() { match server.interest(&req.name, scope) { Err(error) => { - send_errors.send((Token(client), error, last_tx)).unwrap(); + send_results.send(Output::Error(Token(client), error, last_tx)).unwrap(); } Ok(relation) => { let delayed = match req.granularity { @@ -694,8 +639,8 @@ fn main() { }); for (tenant, batch) in &per_tenant { - send_tenant_results_handle - .send((name.clone(), Token(tenant), batch.collect())) + send_results_handle + .send(Output::TenantDiff(name.clone(), Token(tenant), batch.collect())) .unwrap(); } }); @@ -722,7 +667,7 @@ fn main() { input.for_each(|_time, data| { send_results_handle - .send((name.clone(), data.to_vec())) + .send(Output::QueryDiff(name.clone(), data.to_vec())) .unwrap(); }); } @@ -758,35 +703,43 @@ fn main() { Request::Uninterest(name) => server.uninterest(Token(command.client), &name), Request::Register(req) => { if let Err(error) = server.register(req) { - send_errors.send((Token(client), error, last_tx)).unwrap(); + send_results.send(Output::Error(Token(client), error, last_tx)).unwrap(); } } Request::RegisterSource(source) => { worker.dataflow::(|scope| { if let Err(error) = server.register_source(Box::new(source), scope) { - send_errors.send((Token(client), 
error, last_tx)).unwrap(); + send_results.send(Output::Error(Token(client), error, last_tx)).unwrap(); } }); } Request::CreateAttribute(CreateAttribute { name, config }) => { worker.dataflow::(|scope| { if let Err(error) = server.context.internal.create_transactable_attribute(&name, config, scope) { - send_errors.send((Token(client), error, last_tx)).unwrap(); + send_results.send(Output::Error(Token(client), error, last_tx)).unwrap(); } }); } Request::AdvanceDomain(name, next) => { if let Err(error) = server.advance_domain(name, next.into()) { - send_errors.send((Token(client), error, last_tx)).unwrap(); + send_results.send(Output::Error(Token(client), error, last_tx)).unwrap(); } } Request::CloseInput(name) => { if let Err(error) = server.context.internal.close_input(name) { - send_errors.send((Token(client), error, last_tx)).unwrap(); + send_results.send(Output::Error(Token(client), error, last_tx)).unwrap(); } } Request::Disconnect => server.disconnect_client(Token(command.client)), Request::Setup => unimplemented!(), + Request::Status => { + let status = serde_json::json!({ + "category": "df/status", + "message": "running", + }); + + send_results.send(Output::Message(Token(client), status)).unwrap(); + } Request::Shutdown => { shutdown = true } @@ -801,7 +754,7 @@ fn main() { let next = Instant::now().duration_since(worker.timer()); if let Err(error) = server.advance_domain(None, next) { - send_errors.send((Token(client), error, last_tx)).unwrap(); + send_results.send(Output::Error(Token(client), error, last_tx)).unwrap(); } } } From bb7772c6386f8e2806e69fee337959bc4d957886 Mon Sep 17 00:00:00 2001 From: niko Date: Fri, 26 Apr 2019 01:36:57 +0200 Subject: [PATCH 02/13] Add Request::Status --- src/server/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/server/mod.rs b/src/server/mod.rs index b61f918..66a6834 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -129,6 +129,8 @@ pub enum Request { /// Requests any setup logic that needs to be executed /// deterministically across all workers. Setup, + /// Requests a heartbeat containing status information. + Status, /// Requests orderly shutdown of the system. Shutdown, } From 521a11a1fafa3eeb48cd84ea71e84dfee5c70c68 Mon Sep 17 00:00:00 2001 From: niko Date: Fri, 26 Apr 2019 01:37:32 +0200 Subject: [PATCH 03/13] [WIP] CLI GraphQL command --- cli/Cargo.toml | 3 +- cli/src/cli.yml | 25 +++++++++++++++ cli/src/main.rs | 82 ++++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 105 insertions(+), 5 deletions(-) diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 1d497bd..2210c76 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -6,7 +6,7 @@ authors = ["Nikolas Göbel "] edition = "2018" [dependencies] -declarative-dataflow = { path = "../" } +declarative-dataflow = { path = "../", features = ["graphql"] } serde = "1" serde_json = "1" rmp-serde = "0.13.7" @@ -14,6 +14,7 @@ log = "0.4" env_logger = "0.5.6" clap = { version = "~2.33.0", features = ["yaml"] } ws = "*" +uuid = { version = "0.7", features = ["serde", "v4"] } [profile.release] opt-level = 3 diff --git a/cli/src/cli.yml b/cli/src/cli.yml index 51c0dbf..c639d4a 100644 --- a/cli/src/cli.yml +++ b/cli/src/cli.yml @@ -2,6 +2,31 @@ name: 3dfctl version: "0.1.0" author: Nikolas Göbel about: Consumer / producer CLI to 3DF. 
+args: + - host: + long: host + value_name: HOST + help: hostname of a peer + takes_value: true + - port: + long: port + value_name: PORT + help: port at which 3DF is listening + takes_value: true subcommands: - ping: about: attempts to retrieve a heartbeat from the cluster + - tx: + about: pushes transaction data to the cluster + args: + - TXDATA: + help: transaction data + required: false + index: 1 + - gql: + about: subscribes to a GraphQL query + args: + - QUERY: + help: a GraphQL query + required: false + index: 1 diff --git a/cli/src/main.rs b/cli/src/main.rs index c3a03d9..e8c0940 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -6,11 +6,15 @@ extern crate log; #[macro_use] extern crate clap; -use clap::App; +use std::io::Read; +use clap::App; +use uuid::Uuid; use ws::{connect, CloseCode}; -use declarative_dataflow::server::Request; +use declarative_dataflow::plan::{GraphQl, Plan}; +use declarative_dataflow::server::{Register, Request}; +use declarative_dataflow::{Rule, TxData}; fn main() { env_logger::init(); @@ -18,15 +22,85 @@ fn main() { let cli_config = load_yaml!("cli.yml"); let matches = App::from_yaml(cli_config).get_matches(); + let host = matches.value_of("host").unwrap_or("127.0.0.1"); + let port = matches.value_of("port").unwrap_or("6262"); + let addr = format!("ws://{}:{}", host, port); + if let Some(matches) = matches.subcommand_matches("ping") { - connect("ws://127.0.0.1:6262", |out| { + connect(addr.clone(), |out| { let req = serde_json::to_string::>(&vec![Request::Status]) .expect("failed to serialize request"); out.send(req).unwrap(); move |msg| { - println!("Got message: {}", msg); + info!("{:?}", msg); + out.close(CloseCode::Normal) + } + }) + .expect("failed to connect"); + } + + if let Some(matches) = matches.subcommand_matches("tx") { + connect(addr.clone(), |out| { + let tx_data: Vec = match matches.value_of("TXDATA") { + None => { + let mut buf = String::new(); + std::io::stdin() + .read_to_string(&mut buf) + .expect("failed to read from stdin"); + + serde_json::from_str(&buf).expect("failed to parse tx data") + } + Some(tx_in) => serde_json::from_str(tx_in).expect("failed to parse tx data"), + }; + + let req = serde_json::to_string::>(&vec![Request::Transact(tx_data)]) + .expect("failed to serialize request"); + + debug!("{:?}", req); + + out.send(req).unwrap(); + + move |msg| { + info!("{:?}", msg); + out.close(CloseCode::Normal) + } + }) + .expect("failed to connect"); + } + + if let Some(matches) = matches.subcommand_matches("gql") { + connect(addr.clone(), |out| { + let query: String = match matches.value_of("QUERY") { + None => { + let mut buf = String::new(); + std::io::stdin() + .read_to_string(&mut buf) + .expect("failed to read from stdin"); + + buf + } + Some(query) => query.to_string(), + }; + + let name = Uuid::new_v4(); + + let req = serde_json::to_string::>(&vec![Request::Register(Register { + rules: vec![Rule { + name: name.to_string(), + plan: Plan::GraphQl(GraphQl::new(query)), + }], + publish: vec![name.to_string()], + })]) + .expect("failed to serialize request"); + + debug!("{:?}", req); + + out.send(req).unwrap(); + + move |msg| { + info!("{:?}", msg); out.close(CloseCode::Normal) } }) From e8f486be7fcef429559a40a11e07b1b089843a18 Mon Sep 17 00:00:00 2001 From: niko Date: Fri, 26 Apr 2019 01:38:02 +0200 Subject: [PATCH 04/13] [WIP] Add AssocIn sink for GraphQL --- server/Cargo.toml | 1 + src/plan/graphql.rs | 94 ----------------------------- src/sinks/assoc_in.rs | 134 ++++++++++++++++++++++++++++++++++++++++++ src/sinks/mod.rs | 6 
+- 4 files changed, 140 insertions(+), 95 deletions(-) create mode 100644 src/sinks/assoc_in.rs diff --git a/server/Cargo.toml b/server/Cargo.toml index dcf0b3e..8e8c1a7 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -30,6 +30,7 @@ blocking = [] real-time = ["declarative-dataflow/real-time"] csv-source = ["declarative-dataflow/csv-source"] json-source = ["declarative-dataflow/json-source"] +graphql = ["declarative-dataflow/graphql"] [profile.release] opt-level = 3 diff --git a/src/plan/graphql.rs b/src/plan/graphql.rs index 0e85589..f7f63b7 100644 --- a/src/plan/graphql.rs +++ b/src/plan/graphql.rs @@ -136,73 +136,6 @@ fn ast_to_paths(ast: Document) -> Vec> { result } -/// Outbound direction: Transform the provided query result paths into a -/// GraphQL-like / JSONy nested value to be provided to the user. -pub fn paths_to_nested(paths: Vec>) -> serde_json::Value { - use crate::Value::{Aid, Eid}; - use serde_json::map::Map; - - let mut acc = Map::new(); - - // `paths` has the structure `[[aid/eid val aid/eid val last-aid last-val], ...]` - // Starting from the result map `acc`, for each path, we want to navigate down the - // map, optionally creating intermediate map levels if they don't exist yet, and finally - // insert `last-val` at the lowest level. - // - // In short: We're rebuilding Clojure's `assoc-in` here: `(assoc-in acc (pop path) (peek path))` - for mut path in paths { - let mut current_map = &mut acc; - let last_val = path.pop().unwrap(); - - if let Aid(last_key) = path.pop().unwrap() { - - // If there are already values for the looked at attribute, we obtain - // a reference to it. Otherwise, we create a new `Map` and obtain a - // reference to it, which will be used in the next iteration. - // We repeat this process of traversing down the map and optionally creating - // new map levels until we've run out of intermediate attributes. 
- for attribute in path { - // keys have to be `String`s and are either `Aid`s (such as "age") - // or `Eid`s (linking to a lower `PullPath`) - let attr = match attribute { - Aid(x) => x, - Eid(x) => x.to_string(), - _ => unreachable!(), - }; - - let entry = current_map - .entry(attr) - .or_insert_with(|| serde_json::Value::Object(Map::new())); - - *entry = match entry { - serde_json::Value::Object(m) => { - serde_json::Value::Object(std::mem::replace(m, Map::new())) - } - serde_json::Value::Array(_) => unreachable!(), - _ => serde_json::Value::Object(Map::new()), - }; - - match entry { - serde_json::Value::Object(m) => current_map = m, - _ => unreachable!(), - }; - } - - // At the lowest level, we finally insert the path's value - match current_map.get(&last_key) { - Some(serde_json::Value::Object(_)) => (), - _ => { - current_map.insert(last_key, serde_json::json!(last_val)); - } - }; - } else { - unreachable!(); - } - } - - serde_json::Value::Object(acc) -} - impl Implementable for GraphQl { fn dependencies(&self) -> Dependencies { let mut dependencies = Dependencies::none(); @@ -234,30 +167,3 @@ impl Implementable for GraphQl { parsed.implement(nested, local_arrangements, context) } } - -// relation -// .inner -// .map(|x| ((), x)) -// .inspect(|x| { println!("{:?}", x); }) -// .aggregate::<_,Vec<_>,_,_,_>( -// |_key, (path, _time, _diff), acc| { acc.push(path); }, -// |_key, paths| { -// paths_to_nested(paths) -// // squash_nested(nested) -// }, -// |_key| 1) - -// /// Register a GraphQL query -// pub fn register_graph_ql(&mut self, query: String, name: &str) { -// use crate::plan::{GraphQl, Plan}; - -// let req = Register { -// rules: vec![Rule { -// name: name.to_string(), -// plan: Plan::GraphQl(GraphQl::new(query)), -// }], -// publish: vec![name.to_string()], -// }; - -// self.register(req).unwrap(); -// } diff --git a/src/sinks/assoc_in.rs b/src/sinks/assoc_in.rs new file mode 100644 index 0000000..1d26977 --- /dev/null +++ b/src/sinks/assoc_in.rs @@ -0,0 +1,134 @@ +//! Operator and utilities to write output diffs into nested maps. + +use std::collections::HashMap; + +use timely::dataflow::channels::pact::ParallelizationContract; +use timely::dataflow::operators::generic::Operator; +use timely::dataflow::{Scope, Stream}; +use timely::progress::Timestamp; + +use differential_dataflow::lattice::Lattice; + +use crate::{Error, ResultDiff}; + +use super::Sinkable; + +/// A nested hash-map sink. +#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct AssocIn { } + +impl Sinkable for AssocIn +where + T: Timestamp + Lattice, +{ + fn sink( + &self, + stream: &Stream>, + pact: P, + ) -> Result>, Error> + where + S: Scope, + P: ParallelizationContract>, + { + let mut paths = HashMap::new(); + let mut vector = Vec::new(); + + let sunk = stream + .unary_notify(pact, "AssocIn", vec![], move |input, output, notificator| { + input.for_each(|cap, data| { + data.swap(&mut vector); + + let mut paths_at_time = paths + .entry(cap.time().clone()) + .or_insert_with(Vec::new); + + paths_at_time.extend(vector.drain(..).map(|(x,_t,diff)| (x, diff))); + + notificator.notify_at(cap.retain()); + }); + + // pop completed views + notificator.for_each(|cap,_,_| { + if let Some(paths_at_time) = paths.remove(cap.time()) { + output + .session(&cap) + .give(paths_to_nested(paths_at_time)); + } + }); + }); + + Ok(sunk) + } +} + +/// Outbound direction: Transform the provided query result paths into +/// a GraphQL-like / JSONy nested value to be provided to the user. 
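+///
+/// For example (an illustrative sketch; the entity id `100` and the
+/// attribute names are made up): the single path
+/// `[Aid("hero"), Eid(100), Aid("name"), String("Luke")]` becomes the
+/// nested value `{"hero": {"100": {"name": "Luke"}}}`.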
+fn paths_to_nested(paths: Vec<(Vec, isize)>) -> serde_json::Value { + + use serde_json::map::Map; + use serde_json::Value::Object; + + use crate::Value::{Aid, Eid}; + + let mut acc = Map::new(); + + // `paths` has the structure `[[aid/eid val aid/eid val last-aid + // last-val], ...]` Starting from the result map `acc`, for each + // path, we want to navigate down the map, optionally creating + // intermediate map levels if they don't exist yet, and finally + // insert `last-val` at the lowest level. + // + // In short: We're rebuilding Clojure's `assoc-in` here: + // `(assoc-in acc (pop path) (peek path))` + for (mut path, diff) in paths { + + // @TODO handle retractions + + let mut current_map = &mut acc; + let last_val = path.pop().unwrap(); + + if let Aid(last_key) = path.pop().unwrap() { + // If there are already values for the looked at attribute, we obtain + // a reference to it. Otherwise, we create a new `Map` and obtain a + // reference to it, which will be used in the next iteration. + // We repeat this process of traversing down the map and optionally creating + // new map levels until we've run out of intermediate attributes. + for attribute in path { + // Keys have to be `String`s and are either `Aid`s (such as "age") + // or `Eid`s (linking to a nested `PullPath`). + let k = match attribute { + Aid(x) => x, + Eid(x) => x.to_string(), + _ => unreachable!(), + }; + + let entry = current_map + .entry(k) + .or_insert_with(|| Object(Map::new())); + + *entry = match entry { + Object(m) => Object(std::mem::replace(m, Map::new())), + serde_json::Value::Array(_) => unreachable!(), + _ => Object(Map::new()), + }; + + match entry { + Object(m) => current_map = m, + _ => unreachable!(), + }; + } + + // At the lowest level, we finally insert the path's value + match current_map.get(&last_key) { + Some(Object(_)) => (), + _ => { + current_map.insert(last_key, serde_json::json!(last_val)); + } + }; + } else { + unreachable!(); + } + } + + serde_json::Value::Object(acc) +} diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index dbfe74b..04b7110 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -16,10 +16,14 @@ use crate::{Error, ResultDiff}; #[cfg(feature = "csv-source")] pub mod csv_file; - #[cfg(feature = "csv-source")] pub use self::csv_file::CsvFile; +// #[cfg(feature = "graphql")] +// pub mod assoc_in; +// #[cfg(feature = "graphql")] +// pub use self::assoc_in::AssocIn; + /// An external system that wants to receive result diffs. 
pub trait Sinkable where From b5bb56182d20373c1f31ddb6e1e7baba9105e2f4 Mon Sep 17 00:00:00 2001 From: niko Date: Sat, 27 Apr 2019 01:25:13 +0200 Subject: [PATCH 05/13] Add CLI req command --- cli/src/cli.yml | 7 ++++++ cli/src/main.rs | 59 ++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 56 insertions(+), 10 deletions(-) diff --git a/cli/src/cli.yml b/cli/src/cli.yml index c639d4a..70afc18 100644 --- a/cli/src/cli.yml +++ b/cli/src/cli.yml @@ -16,6 +16,13 @@ args: subcommands: - ping: about: attempts to retrieve a heartbeat from the cluster + - req: + about: pushes arbitrary requests to the cluster + args: + - REQUEST: + help: request description in json + required: false + index: 1 - tx: about: pushes transaction data to the cluster args: diff --git a/cli/src/main.rs b/cli/src/main.rs index e8c0940..01c6c77 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -13,7 +13,8 @@ use uuid::Uuid; use ws::{connect, CloseCode}; use declarative_dataflow::plan::{GraphQl, Plan}; -use declarative_dataflow::server::{Register, Request}; +use declarative_dataflow::server::{Interest, Register, Request}; +use declarative_dataflow::sinks::{AssocIn, Sink}; use declarative_dataflow::{Rule, TxData}; fn main() { @@ -29,7 +30,36 @@ fn main() { if let Some(matches) = matches.subcommand_matches("ping") { connect(addr.clone(), |out| { let req = serde_json::to_string::>(&vec![Request::Status]) - .expect("failed to serialize request"); + .expect("failed to serialize requests"); + + out.send(req).unwrap(); + + move |msg| { + info!("{:?}", msg); + out.close(CloseCode::Normal) + } + }) + .expect("failed to connect"); + } + + if let Some(matches) = matches.subcommand_matches("req") { + connect(addr.clone(), |out| { + let reqs: Vec = match matches.value_of("REQUEST") { + None => { + let mut buf = String::new(); + std::io::stdin() + .read_to_string(&mut buf) + .expect("failed to read from stdin"); + + serde_json::from_str(&buf).expect("failed to parse requests") + } + Some(arg) => serde_json::from_str(arg).expect("failed to parse requests"), + }; + + let req = + serde_json::to_string::>(&reqs).expect("failed to serialize requests"); + + debug!("{:?}", req); out.send(req).unwrap(); @@ -56,7 +86,7 @@ fn main() { }; let req = serde_json::to_string::>(&vec![Request::Transact(tx_data)]) - .expect("failed to serialize request"); + .expect("failed to serialize requests"); debug!("{:?}", req); @@ -86,14 +116,23 @@ fn main() { let name = Uuid::new_v4(); - let req = serde_json::to_string::>(&vec![Request::Register(Register { - rules: vec![Rule { + let req = serde_json::to_string::>(&vec![ + Request::Register(Register { + rules: vec![Rule { + name: name.to_string(), + plan: Plan::GraphQl(GraphQl::new(query)), + }], + publish: vec![name.to_string()], + }), + Request::Interest(Interest { name: name.to_string(), - plan: Plan::GraphQl(GraphQl::new(query)), - }], - publish: vec![name.to_string()], - })]) - .expect("failed to serialize request"); + tenant: None, + granularity: None, + sink: Some(Sink::AssocIn(AssocIn {})), + disable_logging: None, + }), + ]) + .expect("failed to serialize requests"); debug!("{:?}", req); From db53e06e3fd832a76bda70efc6e9cb7ee49780b0 Mon Sep 17 00:00:00 2001 From: niko Date: Sat, 27 Apr 2019 01:26:03 +0200 Subject: [PATCH 06/13] Don't return stream from sinks --- server/src/main.rs | 24 ++++++----- src/sinks/assoc_in.rs | 21 +++++----- src/sinks/mod.rs | 93 +++++++++++++++++++++++-------------------- 3 files changed, 75 insertions(+), 63 deletions(-) diff --git a/server/src/main.rs 
b/server/src/main.rs index 12f5efa..47e0480 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -621,8 +621,11 @@ fn main() { tenant_owner.borrow().get(&Token(tenant as usize)).unwrap().clone() }); - let sunk = match req.sink { - Some(sink) => sink.sink(&delayed.inner, pact).expect("sinking failed"), + match req.sink { + Some(sink) => { + sink.sink(&delayed.inner, pact, &mut server.probe) + .expect("sinking failed"); + } None => { delayed .inner @@ -646,15 +649,17 @@ fn main() { }); } }) + .probe_with(&mut server.probe); } - }; - - sunk.probe_with(&mut server.probe); + } } else { let pact = Exchange::new(move |_| owner as u64); - let sunk = match req.sink { - Some(sink) => sink.sink(&delayed.inner, pact).expect("sinking failed"), + match req.sink { + Some(sink) => { + sink.sink(&delayed.inner, pact, &mut server.probe) + .expect("sinking failed"); + } None => { delayed .inner @@ -672,10 +677,9 @@ fn main() { }); } }) + .probe_with(&mut server.probe); } - }; - - sunk.probe_with(&mut server.probe); + } } } } diff --git a/src/sinks/assoc_in.rs b/src/sinks/assoc_in.rs index 1d26977..9665a45 100644 --- a/src/sinks/assoc_in.rs +++ b/src/sinks/assoc_in.rs @@ -4,7 +4,8 @@ use std::collections::HashMap; use timely::dataflow::channels::pact::ParallelizationContract; use timely::dataflow::operators::generic::Operator; -use timely::dataflow::{Scope, Stream}; +use timely::dataflow::operators::{Probe, Inspect}; +use timely::dataflow::{Scope, Stream, ProbeHandle}; use timely::progress::Timestamp; use differential_dataflow::lattice::Lattice; @@ -25,7 +26,8 @@ where &self, stream: &Stream>, pact: P, - ) -> Result>, Error> + probe: &mut ProbeHandle, + ) -> Result<(), Error> where S: Scope, P: ParallelizationContract>, @@ -33,16 +35,15 @@ where let mut paths = HashMap::new(); let mut vector = Vec::new(); - let sunk = stream + stream .unary_notify(pact, "AssocIn", vec![], move |input, output, notificator| { input.for_each(|cap, data| { data.swap(&mut vector); - let mut paths_at_time = paths + paths .entry(cap.time().clone()) - .or_insert_with(Vec::new); - - paths_at_time.extend(vector.drain(..).map(|(x,_t,diff)| (x, diff))); + .or_insert_with(Vec::new) + .extend(vector.drain(..).map(|(x,_t,diff)| (x, diff))); notificator.notify_at(cap.retain()); }); @@ -55,9 +56,11 @@ where .give(paths_to_nested(paths_at_time)); } }); - }); + }) + .inspect(|x| { println!("{:?}", x); }) + .probe_with(probe); - Ok(sunk) + Ok(()) } } diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index 04b7110..c6a96bd 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -5,9 +5,9 @@ use std::io::{LineWriter, Write}; use std::time::{Duration, Instant}; use timely::dataflow::channels::pact::ParallelizationContract; -use timely::dataflow::operators::generic::Operator; -use timely::dataflow::operators::generic::OutputHandle; -use timely::dataflow::{Scope, Stream}; +use timely::dataflow::operators::generic::{Operator, OutputHandle}; +use timely::dataflow::operators::probe::Probe; +use timely::dataflow::{Scope, Stream, ProbeHandle}; use timely::progress::Timestamp; use differential_dataflow::lattice::Lattice; @@ -19,23 +19,24 @@ pub mod csv_file; #[cfg(feature = "csv-source")] pub use self::csv_file::CsvFile; -// #[cfg(feature = "graphql")] -// pub mod assoc_in; -// #[cfg(feature = "graphql")] -// pub use self::assoc_in::AssocIn; +#[cfg(feature = "graphql")] +pub mod assoc_in; +#[cfg(feature = "graphql")] +pub use self::assoc_in::AssocIn; /// An external system that wants to receive result diffs. 
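+///
+/// Sinks attach themselves to the provided probe handle internally,
+/// rather than returning a stream for the caller to probe.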
pub trait Sinkable where T: Timestamp + Lattice, { - /// Creates a timely operator reading from the source andn - /// producing inputs. + /// Creates a timely operator feeding dataflow outputs to a + /// specialized data sink. fn sink( &self, stream: &Stream>, pact: P, - ) -> Result>, Error> + probe: &mut ProbeHandle, + ) -> Result<(), Error> where S: Scope, P: ParallelizationContract>; @@ -49,6 +50,9 @@ pub enum Sink { /// CSV files #[cfg(feature = "csv-source")] CsvFile(CsvFile), + /// Nested Hash-Maps + #[cfg(feature = "graphql")] + AssocIn(AssocIn), } impl Sinkable for Sink { @@ -56,16 +60,13 @@ impl Sinkable for Sink { &self, stream: &Stream>, pact: P, - ) -> Result>, Error> + probe: &mut ProbeHandle, + ) -> Result<(), Error> where S: Scope, P: ParallelizationContract>, { - match *self { - #[cfg(feature = "csv-source")] - Sink::CsvFile(ref sink) => sink.sink(stream, pact), - _ => unimplemented!(), - } + unimplemented!(); } } @@ -74,7 +75,8 @@ impl Sinkable for Sink { &self, stream: &Stream>, pact: P, - ) -> Result>, Error> + probe: &mut ProbeHandle, + ) -> Result<(), Error> where S: Scope, P: ParallelizationContract>, @@ -93,37 +95,40 @@ impl Sinkable for Sink { let mut last = Duration::from_millis(0); let mut buffer = Vec::new(); - let sunk = stream.unary_frontier(pact, "TheVoid", move |_cap, _info| { - move |input, _output: &mut OutputHandle<_, ResultDiff, _>| { - let mut received_input = false; - input.for_each(|_time, data| { - data.swap(&mut buffer); - received_input = !buffer.is_empty(); - buffer.clear(); - }); - - if input.frontier.is_empty() { - println!("[{:?}] inputs to void sink ceased", t0.elapsed()); - - if let Some(ref mut writer) = &mut writer { - writeln!(writer, "{},{:?}", t0.elapsed().as_millis(), last) - .unwrap(); - } - } else if received_input && !input.frontier.frontier().less_equal(&last) { - if let Some(ref mut writer) = &mut writer { - writeln!(writer, "{},{:?}", t0.elapsed().as_millis(), last) - .unwrap(); + stream + .unary_frontier(pact, "TheVoid", move |_cap, _info| { + move |input, _output: &mut OutputHandle<_, ResultDiff, _>| { + let mut received_input = false; + input.for_each(|_time, data| { + data.swap(&mut buffer); + received_input = !buffer.is_empty(); + buffer.clear(); + }); + + if input.frontier.is_empty() { + println!("[{:?}] inputs to void sink ceased", t0.elapsed()); + + if let Some(ref mut writer) = &mut writer { + writeln!(writer, "{},{:?}", t0.elapsed().as_millis(), last) + .unwrap(); + } + } else if received_input && !input.frontier.frontier().less_equal(&last) { + if let Some(ref mut writer) = &mut writer { + writeln!(writer, "{},{:?}", t0.elapsed().as_millis(), last) + .unwrap(); + } + + last = input.frontier.frontier()[0].clone(); + t0 = Instant::now(); } - - last = input.frontier.frontier()[0].clone(); - t0 = Instant::now(); } - } - }); - - Ok(sunk) + }) + .probe_with(probe); } + Sink::AssocIn(ref sink) => { sink.sink(stream, pact, probe)?; } _ => unimplemented!(), } + + Ok(()) } } From 67a8b1ca008d6240e1f8f8edbcfceb939447c0a8 Mon Sep 17 00:00:00 2001 From: niko Date: Sat, 27 Apr 2019 02:37:32 +0200 Subject: [PATCH 07/13] Sinks can optionally return a stream of Output to be forwarded to clients --- cli/src/main.rs | 2 +- server/src/main.rs | 116 ++++++++++++++++--------------- src/domain/mod.rs | 29 +++----- src/lib.rs | 154 ++++++++++++++++++++++++++++-------------- src/server/mod.rs | 16 ++--- src/sinks/assoc_in.rs | 87 +++++++++++++----------- src/sinks/csv_file.rs | 5 +- src/sinks/mod.rs | 19 +++--- 8 files changed, 241 
insertions(+), 187 deletions(-) diff --git a/cli/src/main.rs b/cli/src/main.rs index 01c6c77..57153cb 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -128,7 +128,7 @@ fn main() { name: name.to_string(), tenant: None, granularity: None, - sink: Some(Sink::AssocIn(AssocIn {})), + sink: Some(Sink::AssocIn(AssocIn { name: name.to_string() })), disable_logging: None, }), ]) diff --git a/server/src/main.rs b/server/src/main.rs index 47e0480..44121ac 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -16,7 +16,7 @@ use getopts::Options; use itertools::Itertools; -use timely::dataflow::channels::pact::Exchange; +use timely::dataflow::channels::pact::{Exchange, Pipeline}; use timely::dataflow::operators::generic::OutputHandle; use timely::dataflow::operators::{Operator, Probe}; use timely::logging::{Logger, TimelyEvent}; @@ -34,7 +34,7 @@ use ws::connection::{ConnEvent, Connection}; use declarative_dataflow::server::{Config, CreateAttribute, Request, Server, TxId}; use declarative_dataflow::sinks::Sinkable; -use declarative_dataflow::{Eid, Error, ResultDiff}; +use declarative_dataflow::{Eid, Error, Output, ResultDiff}; /// Server timestamp type. #[cfg(not(feature = "real-time"))] @@ -65,20 +65,6 @@ struct Command { pub requests: Vec, } -enum Output { - /// A batch of (tuple, time, diff) triples as returned by Datalog - /// queries. - QueryDiff(String, Vec>), - /// An output diff on a multi-tenant query. - TenantDiff(String, Token, Vec>), - // /// A hash-map as returned by GraphQL queries. - // Map(serde_json::map::Map, T, isize), - /// A message forwarded to a specific client. - Message(Token, serde_json::Value), - /// An error forwarded to a specific client. - Error(Token, Error, TxId), -} - fn main() { env_logger::init(); @@ -155,7 +141,7 @@ fn main() { let (send_cli, recv_cli) = mio_extras::channel::channel(); // setup results channel - let (send_results, recv_results) = mio_extras::channel::channel::(); + let (send_results, recv_results) = mio_extras::channel::channel::>(); // setup server socket // let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), config.port); @@ -257,12 +243,7 @@ fn main() { while let Ok(cli_input) = recv_cli.try_recv() { match serde_json::from_str::>(&cli_input) { Err(serde_error) => { - let error = Error { - category: "df.error.category/incorrect", - message: serde_error.to_string(), - }; - - error!("{:?} @ {}", error, next_tx - 1); + error!("{:?} @ {}", Error::incorrect(serde_error), next_tx - 1); } Ok(requests) => { sequencer.push(Command { @@ -348,21 +329,38 @@ fn main() { } } } - Output::TenantDiff(name, token, results) => { - info!("[WORKER {}] {} results for tenant {:?} on query {}", worker.index(), results.len(), token, name); + Output::TenantDiff(name, client, results) => { + info!("[WORKER {}] {} results for tenant {:?} on query {}", worker.index(), results.len(), client, name); let serialized = serde_json::to_string::<(String, Vec>)>(&(name, results)) .expect("failed to serialize outputs"); - (Box::new(std::iter::once(token)), Some(serialized)) + (Box::new(std::iter::once(client.into())), Some(serialized)) } - Output::Message(token, msg) => { + Output::Json(name, value, t, diff) => { + info!("[WORKER {}] json on query {}", worker.index(), name); + + match server.interests.get(&name) { + None => { + warn!("result on query {} w/o interested clients", name); + (Box::new(std::iter::empty()), None) + } + Some(tokens) => { + let serialized = + serde_json::to_string::<(String, Vec<(serde_json::Value, T, isize)>)>(&(name, vec![(value, t, 
diff)])) + .expect("failed to serialize outputs"); + + (Box::new(tokens.iter().cloned()), Some(serialized)) + } + } + } + Output::Message(client, msg) => { info!("[WORKER {}] {:?}", worker.index(), msg); - (Box::new(std::iter::once(token)), Some(msg.to_string())) + (Box::new(std::iter::once(client.into())), Some(msg.to_string())) } - Output::Error(token, error, tx_id) => { + Output::Error(client, error, tx_id) => { error!("[WORKER {}] {:?}", worker.index(), error); let mut serializable = serde_json::Map::new(); @@ -373,7 +371,7 @@ fn main() { &("df.error", vec![(serializable, tx_id)]) ).expect("failed to serialize errors"); - (Box::new(std::iter::once(token)), Some(serialized)) + (Box::new(std::iter::once(client.into())), Some(serialized)) } }; @@ -438,12 +436,9 @@ fn main() { ws::Message::Text(string) => { match serde_json::from_str::>(&string) { Err(serde_error) => { - let error = Error { - category: "df.error.category/incorrect", - message: serde_error.to_string(), - }; - - send_results.send(Output::Error(token, error, next_tx - 1)).unwrap(); + send_results + .send(Output::Error(token.into(), Error::incorrect(serde_error), next_tx - 1)) + .unwrap(); } Ok(requests) => { sequencer.push( @@ -459,12 +454,9 @@ fn main() { ws::Message::Binary(bytes) => { match rmp_serde::decode::from_slice::>(&bytes) { Err(rmp_error) => { - let error = Error { - category: "df.error.category/incorrect", - message: rmp_error.to_string(), - }; - - send_results.send(Output::Error(token, error, next_tx - 1)).unwrap(); + send_results + .send(Output::Error(token.into(), Error::incorrect(rmp_error), next_tx - 1)) + .unwrap(); } Ok(requests) => { sequencer.push( @@ -553,7 +545,7 @@ fn main() { match req { Request::Transact(req) => { if let Err(error) = server.transact(req, owner, worker.index()) { - send_results.send(Output::Error(Token(client), error, last_tx)).unwrap(); + send_results.send(Output::Error(client, error, last_tx)).unwrap(); } } Request::Interest(req) => { @@ -593,7 +585,7 @@ fn main() { match server.interest(&req.name, scope) { Err(error) => { - send_results.send(Output::Error(Token(client), error, last_tx)).unwrap(); + send_results.send(Output::Error(client, error, last_tx)).unwrap(); } Ok(relation) => { let delayed = match req.granularity { @@ -643,7 +635,7 @@ fn main() { for (tenant, batch) in &per_tenant { send_results_handle - .send(Output::TenantDiff(name.clone(), Token(tenant), batch.collect())) + .send(Output::TenantDiff(name.clone(), tenant, batch.collect())) .unwrap(); } }); @@ -657,8 +649,26 @@ fn main() { match req.sink { Some(sink) => { - sink.sink(&delayed.inner, pact, &mut server.probe) + let sunk = sink.sink(&delayed.inner, pact, &mut server.probe) .expect("sinking failed"); + + if let Some(sunk) = sunk { + let mut vector = Vec::new(); + sunk + .unary(Pipeline, "SinkResults", move |_cap, _info| { + move |input, _output: &mut OutputHandle<_, ResultDiff, _>| { + input.for_each(|_time, data| { + data.swap(&mut vector); + + for out in vector.drain(..) 
{ + send_results_handle.send(out) + .unwrap(); + } + }); + } + }) + .probe_with(&mut server.probe); + } } None => { delayed @@ -707,31 +717,31 @@ fn main() { Request::Uninterest(name) => server.uninterest(Token(command.client), &name), Request::Register(req) => { if let Err(error) = server.register(req) { - send_results.send(Output::Error(Token(client), error, last_tx)).unwrap(); + send_results.send(Output::Error(client, error, last_tx)).unwrap(); } } Request::RegisterSource(source) => { worker.dataflow::(|scope| { if let Err(error) = server.register_source(Box::new(source), scope) { - send_results.send(Output::Error(Token(client), error, last_tx)).unwrap(); + send_results.send(Output::Error(client, error, last_tx)).unwrap(); } }); } Request::CreateAttribute(CreateAttribute { name, config }) => { worker.dataflow::(|scope| { if let Err(error) = server.context.internal.create_transactable_attribute(&name, config, scope) { - send_results.send(Output::Error(Token(client), error, last_tx)).unwrap(); + send_results.send(Output::Error(client, error, last_tx)).unwrap(); } }); } Request::AdvanceDomain(name, next) => { if let Err(error) = server.advance_domain(name, next.into()) { - send_results.send(Output::Error(Token(client), error, last_tx)).unwrap(); + send_results.send(Output::Error(client, error, last_tx)).unwrap(); } } Request::CloseInput(name) => { if let Err(error) = server.context.internal.close_input(name) { - send_results.send(Output::Error(Token(client), error, last_tx)).unwrap(); + send_results.send(Output::Error(client, error, last_tx)).unwrap(); } } Request::Disconnect => server.disconnect_client(Token(command.client)), @@ -742,7 +752,7 @@ fn main() { "message": "running", }); - send_results.send(Output::Message(Token(client), status)).unwrap(); + send_results.send(Output::Message(client, status)).unwrap(); } Request::Shutdown => { shutdown = true @@ -758,7 +768,7 @@ fn main() { let next = Instant::now().duration_since(worker.timer()); if let Err(error) = server.advance_domain(None, next) { - send_results.send(Output::Error(Token(client), error, last_tx)).unwrap(); + send_results.send(Output::Error(client, error, last_tx)).unwrap(); } } } diff --git a/src/domain/mod.rs b/src/domain/mod.rs index d6152d2..10a1f3f 100644 --- a/src/domain/mod.rs +++ b/src/domain/mod.rs @@ -69,10 +69,10 @@ where pairs: &Stream, ) -> Result<(), Error> { if self.forward.contains_key(name) { - Err(Error { - category: "df.error.category/conflict", - message: format!("An attribute of name {} already exists.", name), - }) + Err(Error::conflict(format!( + "An attribute of name {} already exists.", + name + ))) } else { let tuples = match config.input_semantics { InputSemantics::Raw => pairs.as_collection(), @@ -156,10 +156,7 @@ where for TxData(op, e, a, v) in tx_data { match self.input_sessions.get_mut(&a) { None => { - return Err(Error { - category: "df.error.category/not-found", - message: format!("Attribute {} does not exist.", a), - }); + return Err(Error::not_found(format!("Attribute {} does not exist.", a))); } Some(handle) => { handle.update((Value::Eid(e), v), op); @@ -173,10 +170,7 @@ where /// Closes and drops an existing input. 
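+    /// Transactions that subsequently target the closed input are
+    /// rejected with a `df.error.category/not-found` error.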
pub fn close_input(&mut self, name: String) -> Result<(), Error> { match self.input_sessions.remove(&name) { - None => Err(Error { - category: "df.error.category/not-found", - message: format!("Input {} does not exist.", name), - }), + None => Err(Error::not_found(format!("Input {} does not exist.", name))), Some(handle) => { handle.close(); Ok(()) @@ -188,13 +182,10 @@ where pub fn advance_to(&mut self, next: T) -> Result<(), Error> { if !self.now_at.less_equal(&next) { // We can't rewind time. - Err(Error { - category: "df.error.category/conflict", - message: format!( - "Domain is at {:?}, you attempted to rewind to {:?}.", - &self.now_at, &next - ), - }) + Err(Error::conflict(format!( + "Domain is at {:?}, you attempted to rewind to {:?}.", + &self.now_at, &next + ))) } else if !self.now_at.eq(&next) { trace!( "Advancing domain to {:?} ({} attributes, {} handles)", diff --git a/src/lib.rs b/src/lib.rs index ada6f03..c7c5518 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -117,14 +117,56 @@ impl std::convert::From for Eid { } /// A client-facing, non-exceptional error. -#[derive(Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Error { /// Error category. - pub category: &'static str, + pub category: String, /// Free-frorm description. pub message: String, } +impl Error { + /// Fix client bug. + pub fn incorrect(error: E) -> Error { + Error { + category: "df.error.category/incorrect".to_string(), + message: error.to_string(), + } + } + + /// Fix client noun. + pub fn not_found(error: E) -> Error { + Error { + category: "df.error.category/not-found".to_string(), + message: error.to_string(), + } + } + + /// Coordinate with worker. + pub fn conflict(error: E) -> Error { + Error { + category: "df.error.category/conflict".to_string(), + message: error.to_string(), + } + } + + /// Fix worker bug. + pub fn fault(error: E) -> Error { + Error { + category: "df.error.category/fault".to_string(), + message: error.to_string(), + } + } + + /// Fix client verb. + pub fn unsupported(error: E) -> Error { + Error { + category: "df.error.category/unsupported".to_string(), + message: error.to_string(), + } + } +} + /// Transaction data. Conceptually a pair (Datom, diff) but it's kept /// intentionally flat to be more directly compatible with Datomic. #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] @@ -133,6 +175,26 @@ pub struct TxData(pub isize, pub Eid, pub Aid, pub Value); /// A (tuple, time, diff) triple, as sent back to clients. pub type ResultDiff = (Vec, T, isize); +/// A worker-local client connection identifier. +pub type Client = usize; + +/// Anything that can be returned to clients. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum Output { + /// A batch of (tuple, time, diff) triples as returned by Datalog + /// queries. + QueryDiff(String, Vec>), + /// An output diff on a multi-tenant query. + TenantDiff(String, Client, Vec>), + /// A JSON object, e.g. as returned by GraphQL queries. + #[cfg(feature = "graphql")] + Json(String, serde_json::Value, T, isize), + /// A message forwarded to a specific client. + Message(Client, serde_json::Value), + /// An error forwarded to a specific client. + Error(Client, Error, server::TxId), +} + /// An entity, attribute, value triple. 
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct Datom(pub Eid, pub Aid, pub Value); @@ -974,10 +1036,7 @@ where for name in names { match context.rule(name) { None => { - return Err(Error { - category: "df.error.category/not-found", - message: format!("Unknown rule {}.", name), - }); + return Err(Error::not_found(format!("Unknown rule {}.", name))); } Some(rule) => { seen.insert(name.to_string()); @@ -992,10 +1051,7 @@ where if !seen.contains(dep_name) { match context.rule(dep_name) { None => { - return Err(Error { - category: "df.error.category/not-found", - message: format!("Unknown rule {}", dep_name), - }); + return Err(Error::not_found(format!("Unknown rule {}", dep_name))); } Some(rule) => { seen.insert(dep_name.to_string()); @@ -1008,10 +1064,10 @@ where // Ensure all required attributes exist. for aid in dependencies.attributes.iter() { if !context.has_attribute(aid) { - return Err(Error { - category: "df.error.category/not-found", - message: format!("Rule depends on unknown attribute {}", aid), - }); + return Err(Error::not_found(format!( + "Rule depends on unknown attribute {}", + aid + ))); } } @@ -1047,19 +1103,19 @@ where // Step 0: Canonicalize, check uniqueness of bindings. if rules.is_empty() { - return Err(Error { - category: "df.error.category/not-found", - message: format!("Couldn't find any rules for name {}.", name), - }); + return Err(Error::not_found(format!( + "Couldn't find any rules for name {}.", + name + ))); } rules.sort_by(|x, y| x.name.cmp(&y.name)); for index in 1..rules.len() - 1 { if rules[index].name == rules[index - 1].name { - return Err(Error { - category: "df.error.category/conflict", - message: format!("Duplicate rule definitions for rule {}", rules[index].name), - }); + return Err(Error::conflict(format!( + "Duplicate rule definitions for rule {}", + rules[index].name + ))); } } @@ -1078,10 +1134,10 @@ where if let Some(relation) = local_arrangements.get(name) { result_map.insert(name.to_string(), relation.leave()); } else { - return Err(Error { - category: "df.error.category/not-found", - message: format!("Attempted to publish undefined name {}.", name), - }); + return Err(Error::not_found(format!( + "Attempted to publish undefined name {}.", + name + ))); } } @@ -1100,13 +1156,10 @@ where for (rule, execution) in rules.iter().zip(executions.drain(..)) { match local_arrangements.remove(&rule.name) { None => { - return Err(Error { - category: "df.error.category/not-found", - message: format!( - "Rule {} should be in local arrangements, but isn't.", - &rule.name - ), - }); + return Err(Error::not_found(format!( + "Rule {} should be in local arrangements, but isn't.", + &rule.name + ))); } Some(variable) => { let (tuples, shutdown) = execution.tuples(nested, context); @@ -1151,19 +1204,19 @@ where // Step 0: Canonicalize, check uniqueness of bindings. 
if rules.is_empty() { - return Err(Error { - category: "df.error.category/not-found", - message: format!("Couldn't find any rules for name {}.", name), - }); + return Err(Error::not_found(format!( + "Couldn't find any rules for name {}.", + name + ))); } rules.sort_by(|x, y| x.name.cmp(&y.name)); for index in 1..rules.len() - 1 { if rules[index].name == rules[index - 1].name { - return Err(Error { - category: "df.error.category/conflict", - message: format!("Duplicate rule definitions for rule {}", rules[index].name), - }); + return Err(Error::conflict(format!( + "Duplicate rule definitions for rule {}", + rules[index].name + ))); } } @@ -1190,10 +1243,10 @@ where if let Some(relation) = local_arrangements.get(name) { result_map.insert(name.to_string(), relation.leave()); } else { - return Err(Error { - category: "df.error.category/not-found", - message: format!("Attempted to publish undefined name {}.", name), - }); + return Err(Error::not_found(format!( + "Attempted to publish undefined name {}.", + name + ))); } } @@ -1215,13 +1268,10 @@ where for (rule, execution) in rules.iter().zip(executions.drain(..)) { match local_arrangements.remove(&rule.name) { None => { - return Err(Error { - category: "df.error.category/not-found", - message: format!( - "Rule {} should be in local arrangements, but isn't.", - &rule.name - ), - }); + return Err(Error::not_found(format!( + "Rule {} should be in local arrangements, but isn't.", + &rule.name + ))); } Some(variable) => { let (tuples, shutdown) = execution.tuples(nested, context); diff --git a/src/server/mod.rs b/src/server/mod.rs index 66a6834..ef91a99 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -317,13 +317,10 @@ where // } match rel_map.remove(name) { - None => Err(Error { - category: "df.error.category/fault", - message: format!( - "Relation of interest ({}) wasn't actually implemented.", - name - ), - }), + None => Err(Error::fault(format!( + "Relation of interest ({}) wasn't actually implemented.", + name + ))), Some(relation) => { self.shutdown_handles .insert(name.to_string(), shutdown_handle); @@ -410,10 +407,7 @@ where pub fn advance_domain(&mut self, name: Option, next: T) -> Result<(), Error> { match name { None => self.context.internal.advance_to(next), - Some(_) => Err(Error { - category: "df.error.category/unsupported", - message: "Named domains are not yet supported.".to_string(), - }), + Some(_) => Err(Error::unsupported("Named domains are not yet supported.")), } } diff --git a/src/sinks/assoc_in.rs b/src/sinks/assoc_in.rs index 9665a45..e075866 100644 --- a/src/sinks/assoc_in.rs +++ b/src/sinks/assoc_in.rs @@ -4,21 +4,24 @@ use std::collections::HashMap; use timely::dataflow::channels::pact::ParallelizationContract; use timely::dataflow::operators::generic::Operator; -use timely::dataflow::operators::{Probe, Inspect}; -use timely::dataflow::{Scope, Stream, ProbeHandle}; +use timely::dataflow::operators::{Inspect, Probe}; +use timely::dataflow::{ProbeHandle, Scope, Stream}; use timely::progress::Timestamp; use differential_dataflow::lattice::Lattice; -use crate::{Error, ResultDiff}; +use crate::{Error, Output, ResultDiff}; use super::Sinkable; /// A nested hash-map sink. #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct AssocIn { } +pub struct AssocIn { + /// Query name. 
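+    /// Used to tag the `Output::Json` values this sink emits, so
+    /// that the server can route them to interested clients.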
+ pub name: String, +} -impl Sinkable for AssocIn +impl Sinkable for AssocIn where T: Timestamp + Lattice, { @@ -27,7 +30,7 @@ where stream: &Stream>, pact: P, probe: &mut ProbeHandle, - ) -> Result<(), Error> + ) -> Result>>, Error> where S: Scope, P: ParallelizationContract>, @@ -35,39 +38,50 @@ where let mut paths = HashMap::new(); let mut vector = Vec::new(); - stream - .unary_notify(pact, "AssocIn", vec![], move |input, output, notificator| { - input.for_each(|cap, data| { - data.swap(&mut vector); - - paths - .entry(cap.time().clone()) - .or_insert_with(Vec::new) - .extend(vector.drain(..).map(|(x,_t,diff)| (x, diff))); - - notificator.notify_at(cap.retain()); - }); - - // pop completed views - notificator.for_each(|cap,_,_| { - if let Some(paths_at_time) = paths.remove(cap.time()) { - output - .session(&cap) - .give(paths_to_nested(paths_at_time)); - } - }); - }) - .inspect(|x| { println!("{:?}", x); }) - .probe_with(probe); - - Ok(()) + let name = self.name.to_string(); + + let sunk = stream + .unary_notify( + pact, + "AssocIn", + vec![], + move |input, output, notificator| { + input.for_each(|cap, data| { + data.swap(&mut vector); + + paths + .entry(cap.time().clone()) + .or_insert_with(Vec::new) + .extend(vector.drain(..).map(|(x, _t, diff)| (x, diff))); + + notificator.notify_at(cap.retain()); + }); + + // pop completed views + notificator.for_each(|cap, _, _| { + if let Some(paths_at_time) = paths.remove(cap.time()) { + let map = paths_to_nested(paths_at_time); + output.session(&cap).give(Output::Json( + name.clone(), + map, + cap.time().clone(), + 1, + )); + } + }); + }, + ) + .inspect(|x| { + println!("{:?}", x); + }); + + Ok(Some(sunk)) } } /// Outbound direction: Transform the provided query result paths into /// a GraphQL-like / JSONy nested value to be provided to the user. 
fn paths_to_nested(paths: Vec<(Vec<Value>, isize)>) -> serde_json::Value {
-
     use serde_json::map::Map;
     use serde_json::Value::Object;
 
@@ -84,9 +98,8 @@ fn paths_to_nested(paths: Vec<(Vec<Value>, isize)>) -> serde_json::Value
     // In short: We're rebuilding Clojure's `assoc-in` here:
     // `(assoc-in acc (pop path) (peek path))`
     for (mut path, diff) in paths {
-        // @TODO handle retractions
-
+
         let mut current_map = &mut acc;
         let last_val = path.pop().unwrap();
 
@@ -105,9 +118,7 @@ fn paths_to_nested(paths: Vec<(Vec<Value>, isize)>) -> serde_json::Value
             _ => unreachable!(),
         };
 
-        let entry = current_map
-            .entry(k)
-            .or_insert_with(|| Object(Map::new()));
+        let entry = current_map.entry(k).or_insert_with(|| Object(Map::new()));
 
         *entry = match entry {
             Object(m) => Object(std::mem::replace(m, Map::new())),
diff --git a/src/sinks/csv_file.rs b/src/sinks/csv_file.rs
index 69e17a1..5dc5652 100644
--- a/src/sinks/csv_file.rs
+++ b/src/sinks/csv_file.rs
@@ -37,10 +37,7 @@ impl Sinkable for CsvFile {
             .from_path(&self.path);
 
         match writer_result {
-            Err(error) => Err(Error {
-                category: "df.error.category/fault",
-                message: format!("Failed to create writer: {}", error),
-            }),
+            Err(error) => Err(Error::fault(format!("Failed to create writer: {}", error))),
             Ok(mut writer) => {
                 let mut recvd = Vec::new();
                 let mut vector = Vec::new();
diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs
index c6a96bd..a8521d9 100644
--- a/src/sinks/mod.rs
+++ b/src/sinks/mod.rs
@@ -7,12 +7,12 @@ use std::time::{Duration, Instant};
 use timely::dataflow::channels::pact::ParallelizationContract;
 use timely::dataflow::operators::generic::{Operator, OutputHandle};
 use timely::dataflow::operators::probe::Probe;
-use timely::dataflow::{Scope, Stream, ProbeHandle};
+use timely::dataflow::{ProbeHandle, Scope, Stream};
 use timely::progress::Timestamp;
 
 use differential_dataflow::lattice::Lattice;
 
-use crate::{Error, ResultDiff};
+use crate::{Error, Output, ResultDiff};
 
 #[cfg(feature = "csv-source")]
 pub mod csv_file;
@@ -36,7 +36,7 @@ where
         stream: &Stream<S, ResultDiff<T>>,
         pact: P,
         probe: &mut ProbeHandle<T>,
-    ) -> Result<(), Error>
+    ) -> Result<Option<Stream<S, Output<T>>>, Error>
     where
         S: Scope<Timestamp = T>,
         P: ParallelizationContract<T, ResultDiff<T>>;
@@ -61,7 +61,7 @@ impl Sinkable for Sink {
         stream: &Stream<S, ResultDiff<T>>,
         pact: P,
         probe: &mut ProbeHandle<T>,
-    ) -> Result<(), Error>
+    ) -> Result<Option<Stream<S, Output<T>>>, Error>
     where
         S: Scope<Timestamp = T>,
         P: ParallelizationContract<T, ResultDiff<T>>,
@@ -76,7 +76,7 @@ impl Sinkable for Sink {
         stream: &Stream<S, ResultDiff<T>>,
         pact: P,
         probe: &mut ProbeHandle<T>,
-    ) -> Result<(), Error>
+    ) -> Result<Option<Stream<S, Output<T>>>, Error>
     where
         S: Scope<Timestamp = T>,
         P: ParallelizationContract<T, ResultDiff<T>>,
@@ -112,7 +112,8 @@ impl Sinkable for Sink {
                         writeln!(writer, "{},{:?}", t0.elapsed().as_millis(), last)
                             .unwrap();
                     }
-                } else if received_input && !input.frontier.frontier().less_equal(&last) {
+                } else if received_input && !input.frontier.frontier().less_equal(&last)
+                {
                     if let Some(ref mut writer) = &mut writer {
                         writeln!(writer, "{},{:?}", t0.elapsed().as_millis(), last)
                             .unwrap();
@@ -124,11 +125,11 @@ impl Sinkable for Sink {
                     }
                 })
                 .probe_with(probe);
+
+                Ok(None)
             }
-            Sink::AssocIn(ref sink) => { sink.sink(stream, pact, probe)?; }
+            Sink::AssocIn(ref sink) => sink.sink(stream, pact, probe),
             _ => unimplemented!(),
         }
-
-        Ok(())
     }
 }

From 458a1573362d86010185e493f04abbb8ec80cea1 Mon Sep 17 00:00:00 2001
From: niko
Date: Sat, 27 Apr 2019 11:55:25 +0200
Subject: [PATCH 08/13] [BREAKING] Revert back to tagged serialization

---
 src/lib.rs | 38 +++++++++++++++++++-------------------
 1 file changed, 19 insertions(+), 19 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index c7c5518..4c64b8c 100644
--- a/src/lib.rs
+++ b/src/lib.rs @@ -66,7 +66,7 @@ pub type Aid = String; // u32 /// /// This enum captures the currently supported data types, and is the /// least common denominator for the types of records moved around. -#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Deserialize)] +#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub enum Value { /// An attribute identifier Aid(Aid), @@ -87,24 +87,24 @@ pub enum Value { Uuid(Uuid), } -impl Serialize for Value { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - match self { - Value::Aid(aid) => serializer.serialize_newtype_variant("Value", 0, "Aid", &aid), - Value::String(s) => serializer.serialize_str(&s), - Value::Bool(b) => serializer.serialize_bool(*b), - Value::Number(n) => serializer.serialize_i64(*n), - Value::Rational32(r) => r.serialize(serializer), - Value::Eid(eid) => serializer.serialize_u64(*eid), - Value::Instant(i) => serializer.serialize_newtype_variant("Value", 6, "Instant", i), - #[cfg(feature = "uuid")] - Value::Uuid(uuid) => serializer.serialize_newtype_variant("Value", 7, "Uuid", &uuid), - } - } -} +// impl Serialize for Value { +// fn serialize(&self, serializer: S) -> Result +// where +// S: Serializer, +// { +// match self { +// Value::Aid(aid) => serializer.serialize_newtype_variant("Value", 0, "Aid", &aid), +// Value::String(s) => serializer.serialize_str(&s), +// Value::Bool(b) => serializer.serialize_bool(*b), +// Value::Number(n) => serializer.serialize_i64(*n), +// Value::Rational32(r) => r.serialize(serializer), +// Value::Eid(eid) => serializer.serialize_u64(*eid), +// Value::Instant(i) => serializer.serialize_newtype_variant("Value", 6, "Instant", i), +// #[cfg(feature = "uuid")] +// Value::Uuid(uuid) => serializer.serialize_newtype_variant("Value", 7, "Uuid", &uuid), +// } +// } +// } impl std::convert::From for Eid { fn from(v: Value) -> Eid { From 4227cca72b60e26927f76d76ef47f34e19a9fd31 Mon Sep 17 00:00:00 2001 From: niko Date: Sat, 27 Apr 2019 13:01:07 +0200 Subject: [PATCH 09/13] Unify CLI response handling --- cli/src/main.rs | 41 +++++++++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/cli/src/main.rs b/cli/src/main.rs index 57153cb..f498f52 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -7,6 +7,7 @@ extern crate log; extern crate clap; use std::io::Read; +use std::time::Duration; use clap::App; use uuid::Uuid; @@ -15,7 +16,7 @@ use ws::{connect, CloseCode}; use declarative_dataflow::plan::{GraphQl, Plan}; use declarative_dataflow::server::{Interest, Register, Request}; use declarative_dataflow::sinks::{AssocIn, Sink}; -use declarative_dataflow::{Rule, TxData}; +use declarative_dataflow::{Output, Rule, TxData}; fn main() { env_logger::init(); @@ -27,7 +28,7 @@ fn main() { let port = matches.value_of("port").unwrap_or("6262"); let addr = format!("ws://{}:{}", host, port); - if let Some(matches) = matches.subcommand_matches("ping") { + if let Some(_) = matches.subcommand_matches("ping") { connect(addr.clone(), |out| { let req = serde_json::to_string::>(&vec![Request::Status]) .expect("failed to serialize requests"); @@ -35,7 +36,7 @@ fn main() { out.send(req).unwrap(); move |msg| { - info!("{:?}", msg); + handle_message(msg)?; out.close(CloseCode::Normal) } }) @@ -64,7 +65,7 @@ fn main() { out.send(req).unwrap(); move |msg| { - info!("{:?}", msg); + handle_message(msg)?; out.close(CloseCode::Normal) } }) @@ -93,7 +94,7 @@ fn main() { out.send(req).unwrap(); move |msg| { - 
info!("{:?}", msg);
+                handle_message(msg)?;
                 out.close(CloseCode::Normal)
             }
         })
         .expect("failed to connect");
     }
@@ -128,7 +129,7 @@ fn main() {
                     name: name.to_string(),
                     tenant: None,
                     granularity: None,
-                    sink: Some(Sink::AssocIn(AssocIn { name: name.to_string() })),
+                    sink: Some(Sink::AssocIn(AssocIn {})),
                     disable_logging: None,
                 }),
             ])
@@ -138,11 +139,31 @@ fn main() {
 
             out.send(req).unwrap();
 
-            move |msg| {
-                info!("{:?}", msg);
-                out.close(CloseCode::Normal)
-            }
+            move |msg| handle_message(msg)
         })
         .expect("failed to connect");
     }
 }
+
+fn handle_message(msg: ws::Message) -> ws::Result<()> {
+    match msg {
+        ws::Message::Text(msg) => {
+            trace!("{:?}", msg);
+
+            match serde_json::from_str::<Output<Duration>>(&msg) {
+                Err(err) => error!("{:?}", err),
+                Ok(out) => match out {
+                    Output::Json(_, v, _, _) => {
+                        let pprinted = serde_json::to_string_pretty(&v).expect("failed to pprint");
+                        info!("{}", pprinted);
+                    }
+                    Output::Error(_, err, tx_id) => error!("{:?} @ {}", err, tx_id),
+                    _ => info!("{:?}", out),
+                },
+            }
+        }
+        ws::Message::Binary(_) => unimplemented!(),
+    }
+
+    Ok(())
+}

From b3b1c34362d43d65f71ee81d0a46c859ac623052 Mon Sep 17 00:00:00 2001
From: niko
Date: Sat, 27 Apr 2019 13:02:08 +0200
Subject: [PATCH 10/13] [BREAKING] Serialize Output directly

---
 server/src/main.rs | 62 ++++++++++++++--------------------------------
 src/lib.rs         |  5 ++--
 2 files changed, 22 insertions(+), 45 deletions(-)

diff --git a/server/src/main.rs b/server/src/main.rs
index 44121ac..5afb830 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -311,71 +311,47 @@ fn main() {
                 }
                 RESULTS => {
                     while let Ok(out) = recv_results.try_recv() {
-                        let (tokens, serialized): (Box<dyn Iterator<Item = Token>>, _) = match out {
-                            Output::QueryDiff(name, results) => {
+                        let tokens: Box<dyn Iterator<Item = Token>> = match &out {
+                            &Output::QueryDiff(ref name, ref results) => {
                                 info!("[WORKER {}] {} {} results", worker.index(), name, results.len());
 
-                                match server.interests.get(&name) {
+                                match server.interests.get(name) {
                                     None => {
                                         warn!("result on query {} w/o interested clients", name);
-                                        (Box::new(std::iter::empty()), None)
-                                    }
-                                    Some(tokens) => {
-                                        let serialized =
-                                            serde_json::to_string::<(String, Vec<ResultDiff<T>>)>(&(name, results))
-                                                .expect("failed to serialize outputs");
-
-                                        (Box::new(tokens.iter().cloned()), Some(serialized))
+                                        Box::new(std::iter::empty())
                                     }
+                                    Some(tokens) => Box::new(tokens.iter().cloned()),
                                 }
                             }
-                            Output::TenantDiff(name, client, results) => {
+                            &Output::TenantDiff(ref name, client, ref results) => {
                                 info!("[WORKER {}] {} results for tenant {:?} on query {}", worker.index(), results.len(), client, name);
-
-                                let serialized =
-                                    serde_json::to_string::<(String, Vec<ResultDiff<T>>)>(&(name, results))
-                                        .expect("failed to serialize outputs");
-
-                                (Box::new(std::iter::once(client.into())), Some(serialized))
+                                Box::new(std::iter::once(client.into()))
                             }
-                            Output::Json(name, value, t, diff) => {
+                            &Output::Json(ref name, _, _, _) => {
                                 info!("[WORKER {}] json on query {}", worker.index(), name);
 
-                                match server.interests.get(&name) {
+                                match server.interests.get(name) {
                                     None => {
                                         warn!("result on query {} w/o interested clients", name);
-                                        (Box::new(std::iter::empty()), None)
-                                    }
-                                    Some(tokens) => {
-                                        let serialized =
-                                            serde_json::to_string::<(String, Vec<(serde_json::Value, T, isize)>)>(&(name, vec![(value, t, diff)]))
-                                                .expect("failed to serialize outputs");
-
-                                        (Box::new(tokens.iter().cloned()), Some(serialized))
+                                        Box::new(std::iter::empty())
                                     }
+                                    Some(tokens) => Box::new(tokens.iter().cloned()),
                                 }
                             }
-                            Output::Message(client, msg) => {
+                            &Output::Message(client, ref msg) => {
                                 info!("[WORKER {}] {:?}", worker.index(), msg);
-
-
(Box::new(std::iter::once(client.into())), Some(msg.to_string())) + Box::new(std::iter::once(client.into())) } - Output::Error(client, error, tx_id) => { + &Output::Error(client, ref error, _) => { error!("[WORKER {}] {:?}", worker.index(), error); - - let mut serializable = serde_json::Map::new(); - serializable.insert("df.error/category".to_string(), serde_json::Value::String(error.category.to_string())); - serializable.insert("df.error/message".to_string(), serde_json::Value::String(error.message.to_string())); - - let serialized = serde_json::to_string::<(&'static str, Vec<(serde_json::Map<_,_>, TxId)>)>( - &("df.error", vec![(serializable, tx_id)]) - ).expect("failed to serialize errors"); - - (Box::new(std::iter::once(client.into())), Some(serialized)) + Box::new(std::iter::once(client.into())) } }; - let msg = ws::Message::text(serialized.expect("nothing to send")); + let serialized = serde_json::to_string::>(&out) + .expect("failed to serialize output"); + + let msg = ws::Message::text(serialized); for token in tokens { match connections.get_mut(token.into()) { diff --git a/src/lib.rs b/src/lib.rs index 4c64b8c..ae44c49 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -45,8 +45,6 @@ use differential_dataflow::trace::wrappers::enter_at::TraceEnter as TraceEnterAt use differential_dataflow::trace::{BatchReader, Cursor, TraceReader}; use differential_dataflow::{Collection, Data}; -use serde::{Serialize, Serializer}; - #[cfg(feature = "uuid")] pub use uuid::Uuid; @@ -87,6 +85,7 @@ pub enum Value { Uuid(Uuid), } +// use serde::{Serialize, Serializer}; // impl Serialize for Value { // fn serialize(&self, serializer: S) -> Result // where @@ -120,8 +119,10 @@ impl std::convert::From for Eid { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Error { /// Error category. + #[serde(rename = "df.error/category")] pub category: String, /// Free-frorm description. + #[serde(rename = "df.error/message")] pub message: String, } From 1f9d3c27b9bb190481e73ff3a98ae1c1c4958ab5 Mon Sep 17 00:00:00 2001 From: niko Date: Sat, 27 Apr 2019 13:02:33 +0200 Subject: [PATCH 11/13] Add SinkingContext --- server/src/main.rs | 12 +++---- src/server/mod.rs | 9 +++++ src/sinks/assoc_in.rs | 79 ++++++++++++++++++++----------------------- src/sinks/mod.rs | 14 +++++++- 4 files changed, 64 insertions(+), 50 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index 5afb830..1edf2ca 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -33,7 +33,7 @@ use slab::Slab; use ws::connection::{ConnEvent, Connection}; use declarative_dataflow::server::{Config, CreateAttribute, Request, Server, TxId}; -use declarative_dataflow::sinks::Sinkable; +use declarative_dataflow::sinks::{Sinkable, SinkingContext}; use declarative_dataflow::{Eid, Error, Output, ResultDiff}; /// Server timestamp type. 
@@ -557,7 +557,7 @@ fn main() { } worker.dataflow::(|scope| { - let name = req.name.clone(); + let sink_context: SinkingContext = (&req).into(); match server.interest(&req.name, scope) { Err(error) => { @@ -591,7 +591,7 @@ fn main() { match req.sink { Some(sink) => { - sink.sink(&delayed.inner, pact, &mut server.probe) + sink.sink(&delayed.inner, pact, &mut server.probe, sink_context) .expect("sinking failed"); } None => { @@ -611,7 +611,7 @@ fn main() { for (tenant, batch) in &per_tenant { send_results_handle - .send(Output::TenantDiff(name.clone(), tenant, batch.collect())) + .send(Output::TenantDiff(sink_context.name.clone(), tenant, batch.collect())) .unwrap(); } }); @@ -625,7 +625,7 @@ fn main() { match req.sink { Some(sink) => { - let sunk = sink.sink(&delayed.inner, pact, &mut server.probe) + let sunk = sink.sink(&delayed.inner, pact, &mut server.probe, sink_context) .expect("sinking failed"); if let Some(sunk) = sunk { @@ -658,7 +658,7 @@ fn main() { input.for_each(|_time, data| { send_results_handle - .send(Output::QueryDiff(name.clone(), data.to_vec())) + .send(Output::QueryDiff(sink_context.name.clone(), data.to_vec())) .unwrap(); }); } diff --git a/src/server/mod.rs b/src/server/mod.rs index ef91a99..4268672 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -81,6 +81,15 @@ pub struct Interest { pub disable_logging: Option, } +impl std::convert::From<&Interest> for crate::sinks::SinkingContext { + fn from(interest: &Interest) -> Self { + Self { + name: interest.name.clone(), + granularity: interest.granularity.clone(), + } + } +} + /// A request with the intent of synthesising one or more new rules /// and optionally publishing one or more of them. #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] diff --git a/src/sinks/assoc_in.rs b/src/sinks/assoc_in.rs index e075866..363d9ca 100644 --- a/src/sinks/assoc_in.rs +++ b/src/sinks/assoc_in.rs @@ -4,7 +4,6 @@ use std::collections::HashMap; use timely::dataflow::channels::pact::ParallelizationContract; use timely::dataflow::operators::generic::Operator; -use timely::dataflow::operators::{Inspect, Probe}; use timely::dataflow::{ProbeHandle, Scope, Stream}; use timely::progress::Timestamp; @@ -12,14 +11,11 @@ use differential_dataflow::lattice::Lattice; use crate::{Error, Output, ResultDiff}; -use super::Sinkable; +use super::{Sinkable, SinkingContext}; /// A nested hash-map sink. #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct AssocIn { - /// Query name. 
- pub name: String, -} +pub struct AssocIn {} impl Sinkable for AssocIn where @@ -29,7 +25,8 @@ where &self, stream: &Stream>, pact: P, - probe: &mut ProbeHandle, + _probe: &mut ProbeHandle, + context: SinkingContext, ) -> Result>>, Error> where S: Scope, @@ -38,42 +35,38 @@ where let mut paths = HashMap::new(); let mut vector = Vec::new(); - let name = self.name.to_string(); - - let sunk = stream - .unary_notify( - pact, - "AssocIn", - vec![], - move |input, output, notificator| { - input.for_each(|cap, data| { - data.swap(&mut vector); - - paths - .entry(cap.time().clone()) - .or_insert_with(Vec::new) - .extend(vector.drain(..).map(|(x, _t, diff)| (x, diff))); - - notificator.notify_at(cap.retain()); - }); - - // pop completed views - notificator.for_each(|cap, _, _| { - if let Some(paths_at_time) = paths.remove(cap.time()) { - let map = paths_to_nested(paths_at_time); - output.session(&cap).give(Output::Json( - name.clone(), - map, - cap.time().clone(), - 1, - )); - } - }); - }, - ) - .inspect(|x| { - println!("{:?}", x); - }); + let name = context.name; + + let sunk = stream.unary_notify( + pact, + "AssocIn", + vec![], + move |input, output, notificator| { + input.for_each(|cap, data| { + data.swap(&mut vector); + + paths + .entry(cap.time().clone()) + .or_insert_with(Vec::new) + .extend(vector.drain(..).map(|(x, _t, diff)| (x, diff))); + + notificator.notify_at(cap.retain()); + }); + + // pop completed views + notificator.for_each(|cap, _, _| { + if let Some(paths_at_time) = paths.remove(cap.time()) { + let map = paths_to_nested(paths_at_time); + output.session(&cap).give(Output::Json( + name.clone(), + map, + cap.time().clone(), + 1, + )); + } + }); + }, + ); Ok(Some(sunk)) } diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index a8521d9..6b44418 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -24,6 +24,15 @@ pub mod assoc_in; #[cfg(feature = "graphql")] pub use self::assoc_in::AssocIn; +/// A struct encapsulating any state required to create sinks. +pub struct SinkingContext { + /// The name of the dataflow feeding this sink. + pub name: String, + /// Granularity (in seconds or tx ids) at which to send + /// results. None indicates no delay. + pub granularity: Option, +} + /// An external system that wants to receive result diffs. 
pub trait Sinkable where @@ -36,6 +45,7 @@ where stream: &Stream>, pact: P, probe: &mut ProbeHandle, + context: SinkingContext, ) -> Result>>, Error> where S: Scope, @@ -61,6 +71,7 @@ impl Sinkable for Sink { stream: &Stream>, pact: P, probe: &mut ProbeHandle, + context: SinkingContext, ) -> Result>>, Error> where S: Scope, @@ -76,6 +87,7 @@ impl Sinkable for Sink { stream: &Stream>, pact: P, probe: &mut ProbeHandle, + context: SinkingContext, ) -> Result>>, Error> where S: Scope, @@ -128,7 +140,7 @@ impl Sinkable for Sink { Ok(None) } - Sink::AssocIn(ref sink) => sink.sink(stream, pact, probe), + Sink::AssocIn(ref sink) => sink.sink(stream, pact, probe, context), _ => unimplemented!(), } } From a7fbbc7637f169fb2d1bbd3c1aecd7c649e1b6e0 Mon Sep 17 00:00:00 2001 From: niko Date: Sat, 27 Apr 2019 13:24:34 +0200 Subject: [PATCH 12/13] Update toolchain --- rust-toolchain | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust-toolchain b/rust-toolchain index 2404d95..4b06b5b 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -1.34.0 \ No newline at end of file +1.34.1 \ No newline at end of file From 21040d8d97224539c1a1d461328467b420a18e5e Mon Sep 17 00:00:00 2001 From: niko Date: Sat, 27 Apr 2019 13:24:51 +0200 Subject: [PATCH 13/13] Cleanup, disable CSV sink --- src/lib.rs | 3 ++- src/sinks/mod.rs | 29 +++++++++++++++-------------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ae44c49..7a9a79b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -188,9 +188,10 @@ pub enum Output { /// An output diff on a multi-tenant query. TenantDiff(String, Client, Vec>), /// A JSON object, e.g. as returned by GraphQL queries. - #[cfg(feature = "graphql")] + #[cfg(feature = "serde_json")] Json(String, serde_json::Value, T, isize), /// A message forwarded to a specific client. + #[cfg(feature = "serde_json")] Message(Client, serde_json::Value), /// An error forwarded to a specific client. Error(Client, Error, server::TxId), diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index 6b44418..19b4e49 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -14,14 +14,14 @@ use differential_dataflow::lattice::Lattice; use crate::{Error, Output, ResultDiff}; -#[cfg(feature = "csv-source")] -pub mod csv_file; -#[cfg(feature = "csv-source")] -pub use self::csv_file::CsvFile; +// #[cfg(feature = "csv-source")] +// pub mod csv_file; +// #[cfg(feature = "csv-source")] +// pub use self::csv_file::CsvFile; -#[cfg(feature = "graphql")] +#[cfg(feature = "serde_json")] pub mod assoc_in; -#[cfg(feature = "graphql")] +#[cfg(feature = "serde_json")] pub use self::assoc_in::AssocIn; /// A struct encapsulating any state required to create sinks. @@ -57,21 +57,21 @@ where pub enum Sink { /// /dev/null, used for benchmarking TheVoid(Option), - /// CSV files - #[cfg(feature = "csv-source")] - CsvFile(CsvFile), + // /// CSV files + // #[cfg(feature = "csv-source")] + // CsvFile(CsvFile), /// Nested Hash-Maps - #[cfg(feature = "graphql")] + #[cfg(feature = "serde_json")] AssocIn(AssocIn), } impl Sinkable for Sink { fn sink( &self, - stream: &Stream>, - pact: P, - probe: &mut ProbeHandle, - context: SinkingContext, + _stream: &Stream>, + _pact: P, + _probe: &mut ProbeHandle, + _context: SinkingContext, ) -> Result>>, Error> where S: Scope, @@ -140,6 +140,7 @@ impl Sinkable for Sink { Ok(None) } + #[cfg(feature = "serde_json")] Sink::AssocIn(ref sink) => sink.sink(stream, pact, probe, context), _ => unimplemented!(), }
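
Note on the wire format implied by patches 08 and 10: with the hand-rolled
`Serialize` impl gone, `Output` goes over the socket in serde's derived,
externally tagged representation, and the two `#[serde(rename = ...)]`
attributes keep error payloads addressable under the `df.error/*` keys.
The following is a minimal, self-contained sketch of that round trip; it
is not part of the series, it only mirrors the `Output` and `Error` shapes
from the diffs above, with the `Client`/`TxId` aliases simplified to plain
integers, the variant set abbreviated, and the result-diff payload reduced
to strings. It assumes `serde` (with the `derive` feature) and `serde_json`
are available, as they are in this crate.

use serde::{Deserialize, Serialize};

type Client = usize;
type TxId = u64;

/// Mirrors the `Error` struct after patch 10, including the serde renames.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct Error {
    #[serde(rename = "df.error/category")]
    category: String,
    #[serde(rename = "df.error/message")]
    message: String,
}

/// A trimmed-down stand-in for the `Output<T>` enum from src/lib.rs.
#[derive(Clone, Debug, Serialize, Deserialize)]
enum Output<T> {
    QueryDiff(String, Vec<(Vec<String>, T, isize)>),
    Json(String, serde_json::Value, T, isize),
    Error(Client, Error, TxId),
}

fn main() {
    let out: Output<u64> = Output::Error(
        0,
        Error {
            category: "df.error.category/not-found".to_string(),
            message: "Couldn't find any rules for name q1.".to_string(),
        },
        17,
    );

    // Externally tagged (serde's default for enums): the variant name wraps
    // the tuple of fields, e.g.
    // {"Error":[0,{"df.error/category":"...","df.error/message":"..."},17]}
    let wire = serde_json::to_string(&out).unwrap();
    println!("{}", wire);

    // Clients can decode every server message with a single `from_str`,
    // which is what the CLI's `handle_message` relies on after patch 09.
    let decoded: Output<u64> = serde_json::from_str(&wire).unwrap();
    println!("{:?}", decoded);
}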