From 01c7a3fa5545b4a857bea890d523860872cbe764 Mon Sep 17 00:00:00 2001 From: Rohit Kulshreshtha Date: Mon, 3 Jun 2024 11:47:04 -0700 Subject: [PATCH] feat(anna): Added a working CLI (#1258) With this change, the water flows through the pipes, with Gets, Sets & Deletes primarily working. Some minor issues remain, but I'll do those separately. Next steps: https://github.com/hydro-project/hydroflow/issues/1253 https://github.com/hydro-project/hydroflow/issues/1257 https://github.com/hydro-project/hydroflow/issues/1256 https://github.com/hydro-project/hydroflow/issues/1255 https://github.com/hydro-project/hydroflow/issues/1254 --- Cargo.lock | 7 ++ datastores/gossip_kv/Cargo.toml | 9 ++ datastores/gossip_kv/cli/main.rs | 115 +++++++++++++++++++ datastores/gossip_kv/protocol/lib.rs | 159 ++++++++++++++++++++++++++ datastores/gossip_kv/server/main.rs | 114 +++++++++++++++++- datastores/gossip_kv/server/model.rs | 2 +- hydroflow_lang/src/graph/ops/state.rs | 2 +- 7 files changed, 402 insertions(+), 6 deletions(-) create mode 100644 datastores/gossip_kv/cli/main.rs create mode 100644 datastores/gossip_kv/protocol/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 4ad54719554..3ce74228606 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1263,6 +1263,7 @@ dependencies = [ "hostname", "hydroflow", "serde", + "shlex", ] [[package]] @@ -2996,6 +2997,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "45bb67a18fa91266cc7807181f62f9178a6873bfad7dc788c42e6430db40184f" +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.1" diff --git a/datastores/gossip_kv/Cargo.toml b/datastores/gossip_kv/Cargo.toml index 298b1ce9016..d9efe81b253 100644 --- a/datastores/gossip_kv/Cargo.toml +++ b/datastores/gossip_kv/Cargo.toml @@ -11,7 +11,16 @@ clap = { version = "4.5.4", features = ["derive"] } serde = { version = "1.0.198", features = ["derive"] } hydroflow = { path="../../hydroflow" } hostname = "0.4.0" +shlex = "1.3.0" [[bin]] name = "gossip_server" path = "server/main.rs" + +[[bin]] +name = "gossip_cli" +path = "cli/main.rs" + +[lib] +name = "gossip_protocol" +path = "protocol/lib.rs" \ No newline at end of file diff --git a/datastores/gossip_kv/cli/main.rs b/datastores/gossip_kv/cli/main.rs new file mode 100644 index 00000000000..155db30d15e --- /dev/null +++ b/datastores/gossip_kv/cli/main.rs @@ -0,0 +1,115 @@ +use std::net::SocketAddr; + +use clap::{CommandFactory, Parser, Subcommand}; +use gossip_protocol::{ClientRequest, ClientResponse, Key}; +use hydroflow::util::{bind_udp_bytes, ipv4_resolve}; +use hydroflow::{hydroflow_syntax, tokio, DemuxEnum}; + +/// CLI program to interact with Layer 0 gossip store. +#[derive(Debug, Parser)] +struct Opts { + #[clap(short, long, help = "Server address to connect to.")] + server_address: Option, +} + +/// Dummy app for using clap to process commands for interactive CLI. +#[derive(Debug, Parser)] +#[command(multicall = true)] +struct InteractiveApp { + #[clap(subcommand)] + commands: InteractiveCommands, +} + +#[derive(Debug, Subcommand, DemuxEnum)] +enum InteractiveCommands { + /// Get a value from the store. + Get { + #[arg(value_parser = parse_key, required = true, help = "Key to get")] + key: Key, + }, + /// Upsert a value in the store. + Set { + #[arg(value_parser = parse_key, required = true, help = "Key to set")] + key: Key, + value: String, + }, + /// Delete a value from the store. + Delete { + #[arg(value_parser = parse_key, required = true, help = "Key to delete")] + key: Key, + }, + /// Exit the application. + Exit, +} + +/// Allows clap to parse Keys from user input. +fn parse_key(s: &str) -> Result { + s.parse::().map_err(|e| e.to_string()) +} + +/// Parse a command from a line of input. +fn parse_command(line: String) -> Option { + // Override how help is handled. + if line.trim() == "help" { + InteractiveApp::command() + .help_template("\nAvailable Commands: \n{subcommands}") + .print_help() + .unwrap(); + return None; + } + + // Split quoted string into parts. + let line_parts = shlex::split(&line); + + if line_parts.is_none() { + eprintln!("\nUnable to parse command."); + return None; + } + + // Provide split parts to clap to process. + let maybe_parsed = InteractiveApp::try_parse_from(line_parts.unwrap()); + + match maybe_parsed { + Err(e) => { + // Problem with the parsed result. This displays some help. + eprintln!("\n{}", e); + None + } + Ok(cli) => Some(cli.commands), + } +} + +#[hydroflow::main] +async fn main() { + let opts = Opts::parse(); + + // Bind to OS-assigned port on localhost. + let address = ipv4_resolve("localhost:0").unwrap(); + + // Default to localhost:3000 if not provided. + let server_address = opts + .server_address + .unwrap_or_else(|| ipv4_resolve("localhost:3000").unwrap()); + + // Setup UDP sockets for communication. + let (outbound, inbound, _) = bind_udp_bytes(address).await; + + let mut cli = hydroflow_syntax! { + inbound_messages = source_stream_serde(inbound) -> map(Result::unwrap) -> for_each(|(response, _addr): (ClientResponse, SocketAddr)| println!("{:?}", response)); + + outbound_messages = union() -> dest_sink_serde(outbound); + + // Parse commands from stdin. + commands = source_stdin() + -> filter_map(|line| parse_command(line.unwrap())) + -> demux_enum::(); + + commands[Get] -> map(|(key,)| (ClientRequest::Get {key}, server_address)) -> outbound_messages; + commands[Set] -> map(|(key, value)| (ClientRequest::Set {key, value}, server_address)) -> outbound_messages; + commands[Delete] -> map(|(key,)| (ClientRequest::Delete {key}, server_address)) -> outbound_messages; + commands[Exit] -> for_each(|()| std::process::exit(0)); // TODO: Graceful shutdown https://github.com/hydro-project/hydroflow/issues/1253 + + }; + + cli.run_async().await; +} diff --git a/datastores/gossip_kv/protocol/lib.rs b/datastores/gossip_kv/protocol/lib.rs new file mode 100644 index 00000000000..0a294775507 --- /dev/null +++ b/datastores/gossip_kv/protocol/lib.rs @@ -0,0 +1,159 @@ +use std::fmt::Display; +use std::str::FromStr; + +use serde::{Deserialize, Serialize}; + +use crate::KeyParseError::InvalidNamespace; + +/// The namespace of the key of an entry in the key-value store. +#[derive(Debug, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Hash)] +pub enum Namespace { + /// User namespace is for use by the user of the key-value store. + User, + + /// System namespace is reserved for use by the key-value store itself. + System, +} + +/// Error that can occur when parsing a key from a string. +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +pub enum KeyParseError { + /// The namespace in the key is invalid. Namespaces must be either `usr` or `sys`. + InvalidNamespace, + + /// The key is in an invalid format. Keys must be of the form `/namespace/table/row`. + InvalidFormat, +} + +impl Display for KeyParseError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + InvalidNamespace => write!(f, "Invalid namespace"), + KeyParseError::InvalidFormat => write!(f, "Invalid key format"), + } + } +} + +impl FromStr for Namespace { + type Err = KeyParseError; + fn from_str(s: &str) -> Result { + match s { + "usr" => Ok(Namespace::User), + "sys" => Ok(Namespace::System), + _ => Err(InvalidNamespace), + } + } +} + +/// The name of a table in the key-value store. +pub type TableName = String; + +/// The key of a row in a table in the key-value store. +pub type RowKey = String; + +/// A key of an entry in the key-value store. +/// +/// Data in the key-value store is organized into namespaces, tables, and rows. Namespaces are +/// either `usr` for user data or `sys` for system data. Namespaces contain tables, which contain +/// rows. Each row has a row key and a row value. +#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize, Hash)] +pub struct Key { + /// The namespace of the key. + pub namespace: Namespace, + /// The name of the table in the key. + pub table: TableName, + /// The key of the row in the table. + pub row_key: RowKey, +} + +impl FromStr for Key { + type Err = KeyParseError; + fn from_str(s: &str) -> Result { + // TODO: Support escaping '/' in table and row keys. https://github.com/hydro-project/hydroflow/issues/1254 + let parts: Vec<&str> = s.split('/').collect(); + if parts.len() != 4 { + return Err(KeyParseError::InvalidFormat); + } + if !parts[0].is_empty() { + return Err(KeyParseError::InvalidFormat); + } + let namespace = parts[1].parse()?; + Ok(Key { + namespace, + table: parts[2].to_string(), + row_key: parts[3].to_string(), + }) + } +} + +/// A request from a client to the key-value store. +#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] +pub enum ClientRequest { + /// A request to get the value of a key. + Get { key: Key }, + /// A request to set the value of a key. + Set { key: Key, value: String }, + /// A request to delete the value of a key. + Delete { key: Key }, +} + +#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] +pub enum ClientResponse { + /// A response for a get request. The key is echoed back along with the value, if it exists. + Get { key: Key, value: Option }, + /// A response for a set request. The success field is true if the set was successful. + Set { success: bool }, + /// A response for a delete request. The success field is true if delete was successful. + Delete { success: bool }, +} + +#[cfg(test)] +mod tests { + use super::{Key, Namespace}; + + #[test] + fn test_key_parsing_sys_namespace() { + // Sys namespace + let first = "/sys/sys_table/sys_row".parse::().unwrap(); + assert_eq!(first.namespace, Namespace::System); + assert_eq!(first.table, "sys_table"); + assert_eq!(first.row_key, "sys_row"); + } + #[test] + fn test_key_parsing_user_namespace() { + // User namespace + let second = "/usr/usr_table/usr_row".parse::().unwrap(); + assert_eq!(second.namespace, Namespace::User); + assert_eq!(second.table, "usr_table"); + assert_eq!(second.row_key, "usr_row"); + } + + #[test] + fn test_key_parsing_invalid_namespace() { + // Invalid namespace + let non_existent_namespace = "/ne_namespace/ne_table/ne_row".parse::(); + assert!(non_existent_namespace.is_err()); + assert_eq!( + non_existent_namespace.unwrap_err(), + super::KeyParseError::InvalidNamespace + ); + } + + #[test] + fn test_key_parsing_invalid_format() { + // Invalid format + let invalid_format = "/not_even_a_key".parse::(); + assert!(invalid_format.is_err()); + assert_eq!( + invalid_format.unwrap_err(), + super::KeyParseError::InvalidFormat + ); + + let invalid_format = "abcd/sys/sys_table/sys_row".parse::(); + assert!(invalid_format.is_err()); + assert_eq!( + invalid_format.unwrap_err(), + super::KeyParseError::InvalidFormat + ); + } +} diff --git a/datastores/gossip_kv/server/main.rs b/datastores/gossip_kv/server/main.rs index 80c045d8320..4c3adaec2d4 100644 --- a/datastores/gossip_kv/server/main.rs +++ b/datastores/gossip_kv/server/main.rs @@ -1,16 +1,122 @@ -use std::time::Duration; +use std::collections::HashMap; +use std::net::SocketAddr; -use hydroflow::{hydroflow_syntax, tokio}; +use gossip_protocol::{ClientRequest, ClientResponse, Key, Namespace}; +use hydroflow::itertools::Itertools; +use hydroflow::lattices::map_union::{KeyedBimorphism, MapUnionHashMap}; +use hydroflow::lattices::set_union::SetUnionHashSet; +use hydroflow::lattices::PairBimorphism; +use hydroflow::util::{bind_udp_bytes, ipv4_resolve}; +use hydroflow::{hydroflow_syntax, tokio, DemuxEnum}; + +use crate::model::{delete_row, upsert_row, Clock, RowKey, TableMap, TableName}; -#[allow(dead_code)] mod model; +#[derive(Debug, DemuxEnum)] +enum ClientRequestWithAddress { + Get { + key: Key, + addr: SocketAddr, + }, + Set { + key: Key, + value: String, + addr: SocketAddr, + }, + Delete { + key: Key, + addr: SocketAddr, + }, +} + +impl ClientRequestWithAddress { + fn from_request_and_address(request: ClientRequest, addr: SocketAddr) -> Self { + match request { + ClientRequest::Get { key } => Self::Get { key, addr }, + ClientRequest::Set { key, value } => Self::Set { key, value, addr }, + ClientRequest::Delete { key } => Self::Delete { key, addr }, + } + } +} + #[hydroflow::main] async fn main() { + let address = ipv4_resolve("localhost:3000").unwrap(); + let (outbound, inbound, _) = bind_udp_bytes(address).await; + + // let foobar: MapUnion>>> = MapUnion::new_from(HashMap::new());; + let mut server = hydroflow_syntax! { - source_interval(Duration::from_secs(5)) -> for_each(|_| println!("Coming soon!")); + outbound_messages = dest_sink_serde(outbound); + + inbound_messages = source_stream_serde(inbound) + -> map(Result::unwrap) + -> map(|(msg, addr)| ClientRequestWithAddress::from_request_and_address(msg, addr)) + -> demux_enum::(); + + inbound_messages[Get] + -> inspect(|req| println!("Received Get request: {:?}", req)) + -> filter(|(key, _)| { + match key.namespace { + Namespace::System => true, + _ => { + // TODO: Support user namespace https://github.com/hydro-project/hydroflow/issues/1255 + eprintln!("Unsupported namespace: {:?}", key.namespace); + false + }, + } + }) + -> map(|(key, addr) : (Key, SocketAddr)| MapUnionHashMap::new_from([(key.table, MapUnionHashMap::new_from([(key.row_key, SetUnionHashSet::new_from([addr]))]))])) + -> system_table_gets; + + inbound_messages[Set] + -> inspect(|request| println!("Received Set request: {:?}", request)) + -> map(|(key, value, _addr) : (Key, String, SocketAddr)| upsert_row(Clock::new(context.current_tick().0), key.table, key.row_key, value)) + -> system_table; + + inbound_messages[Delete] + -> inspect(|req| println!("Received Delete request: {:?}", req)) + -> map(|(key, _addr) : (Key, SocketAddr)| delete_row(Clock::new(context.current_tick().0), key.table, key.row_key)) + -> system_table; + + system_table = union() + -> state::<'static, TableMap::>(); + + system_table_gets = union() + -> state::<'tick, MapUnionHashMap>>>(); + + system_table -> [0]process_system_table_gets; + system_table_gets -> [1]process_system_table_gets; + + process_system_table_gets = lattice_bimorphism(KeyedBimorphism::, _>::new(KeyedBimorphism::, _>::new(PairBimorphism)), #system_table, #system_table_gets) + -> flat_map(|result| { + + let mut response: Vec<(ClientResponse, SocketAddr)> = vec![]; + + // TODO: Support multiple results. https://github.com/hydro-project/hydroflow/issues/1256 + let result = result.as_reveal_ref(); + for (table_name, table) in result.iter() { + for (row_key, row_val) in table.as_reveal_ref().iter() { + let key = Key { + namespace: Namespace::System, + table: table_name.clone(), + row_key: row_key.clone(), + }; + + let value = row_val.as_reveal_ref().0.as_reveal_ref().1.as_reveal_ref().iter().find_or_first(|_| true).unwrap(); + let socket_addr = row_val.as_reveal_ref().1.as_reveal_ref().iter().find_or_first(|_| true).unwrap(); + response.push((ClientResponse::Get { key, value: Some(value.clone()) }, *socket_addr)); + } + } + + response + }) -> outbound_messages; + // Uncomment to aid with debugging. + // source_interval(Duration::from_secs(3)) + // -> for_each(|_| println!("State: {:?}", #system_table)); }; server.run_async().await; diff --git a/datastores/gossip_kv/server/model.rs b/datastores/gossip_kv/server/model.rs index 758ebd3937f..d254c74f47c 100644 --- a/datastores/gossip_kv/server/model.rs +++ b/datastores/gossip_kv/server/model.rs @@ -22,7 +22,7 @@ pub type TableMap = MapUnionHashMap>; /// Timestamps used in the model. // TODO: This will be updated to use a more sophisticated clock type with https://github.com/hydro-project/hydroflow/issues/1207. -pub type Clock = Max; +pub type Clock = Max; /// TableMap element to upsert a row in an existing TableMap. /// diff --git a/hydroflow_lang/src/graph/ops/state.rs b/hydroflow_lang/src/graph/ops/state.rs index d3ea35b3cf7..36d85300b0d 100644 --- a/hydroflow_lang/src/graph/ops/state.rs +++ b/hydroflow_lang/src/graph/ops/state.rs @@ -135,7 +135,7 @@ pub const STATE: OperatorConstraints = OperatorConstraints { ) -> impl 'a + #root::pusherator::Pusherator where Item: 'a + ::std::clone::Clone, - Push: #root::pusherator::Pusherator, + Push: 'a + #root::pusherator::Pusherator, Lat: 'static + #root::lattices::Merge, { #root::pusherator::inspect::Inspect::new(move |item| {