Skip to content

Commit

Permalink
feat(anna): Added a working CLI (#1258)
Browse files Browse the repository at this point in the history
With this change, the water flows through the pipes, with Gets, Sets &
Deletes primarily working. Some minor issues remain, but I'll do those
separately.

Next steps:
#1253
#1257
#1256
#1255
#1254
  • Loading branch information
rohitkulshreshtha committed Jun 5, 2024
1 parent c2c71ea commit 01c7a3f
Show file tree
Hide file tree
Showing 7 changed files with 402 additions and 6 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions datastores/gossip_kv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,16 @@ clap = { version = "4.5.4", features = ["derive"] }
serde = { version = "1.0.198", features = ["derive"] }
hydroflow = { path="../../hydroflow" }
hostname = "0.4.0"
shlex = "1.3.0"

[[bin]]
name = "gossip_server"
path = "server/main.rs"

[[bin]]
name = "gossip_cli"
path = "cli/main.rs"

[lib]
name = "gossip_protocol"
path = "protocol/lib.rs"
115 changes: 115 additions & 0 deletions datastores/gossip_kv/cli/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use std::net::SocketAddr;

use clap::{CommandFactory, Parser, Subcommand};
use gossip_protocol::{ClientRequest, ClientResponse, Key};
use hydroflow::util::{bind_udp_bytes, ipv4_resolve};
use hydroflow::{hydroflow_syntax, tokio, DemuxEnum};

/// CLI program to interact with Layer 0 gossip store.
#[derive(Debug, Parser)]
struct Opts {
#[clap(short, long, help = "Server address to connect to.")]
server_address: Option<SocketAddr>,
}

/// Dummy app for using clap to process commands for interactive CLI.
#[derive(Debug, Parser)]
#[command(multicall = true)]
struct InteractiveApp {
#[clap(subcommand)]
commands: InteractiveCommands,
}

#[derive(Debug, Subcommand, DemuxEnum)]
enum InteractiveCommands {
/// Get a value from the store.
Get {
#[arg(value_parser = parse_key, required = true, help = "Key to get")]
key: Key,
},
/// Upsert a value in the store.
Set {
#[arg(value_parser = parse_key, required = true, help = "Key to set")]
key: Key,
value: String,
},
/// Delete a value from the store.
Delete {
#[arg(value_parser = parse_key, required = true, help = "Key to delete")]
key: Key,
},
/// Exit the application.
Exit,
}

/// Allows clap to parse Keys from user input.
fn parse_key(s: &str) -> Result<Key, String> {
s.parse::<Key>().map_err(|e| e.to_string())
}

/// Parse a command from a line of input.
fn parse_command(line: String) -> Option<InteractiveCommands> {
// Override how help is handled.
if line.trim() == "help" {
InteractiveApp::command()
.help_template("\nAvailable Commands: \n{subcommands}")
.print_help()
.unwrap();
return None;
}

// Split quoted string into parts.
let line_parts = shlex::split(&line);

if line_parts.is_none() {
eprintln!("\nUnable to parse command.");
return None;
}

// Provide split parts to clap to process.
let maybe_parsed = InteractiveApp::try_parse_from(line_parts.unwrap());

match maybe_parsed {
Err(e) => {
// Problem with the parsed result. This displays some help.
eprintln!("\n{}", e);
None
}
Ok(cli) => Some(cli.commands),
}
}

#[hydroflow::main]
async fn main() {
let opts = Opts::parse();

// Bind to OS-assigned port on localhost.
let address = ipv4_resolve("localhost:0").unwrap();

// Default to localhost:3000 if not provided.
let server_address = opts
.server_address
.unwrap_or_else(|| ipv4_resolve("localhost:3000").unwrap());

// Setup UDP sockets for communication.
let (outbound, inbound, _) = bind_udp_bytes(address).await;

let mut cli = hydroflow_syntax! {
inbound_messages = source_stream_serde(inbound) -> map(Result::unwrap) -> for_each(|(response, _addr): (ClientResponse, SocketAddr)| println!("{:?}", response));

outbound_messages = union() -> dest_sink_serde(outbound);

// Parse commands from stdin.
commands = source_stdin()
-> filter_map(|line| parse_command(line.unwrap()))
-> demux_enum::<InteractiveCommands>();

commands[Get] -> map(|(key,)| (ClientRequest::Get {key}, server_address)) -> outbound_messages;
commands[Set] -> map(|(key, value)| (ClientRequest::Set {key, value}, server_address)) -> outbound_messages;
commands[Delete] -> map(|(key,)| (ClientRequest::Delete {key}, server_address)) -> outbound_messages;
commands[Exit] -> for_each(|()| std::process::exit(0)); // TODO: Graceful shutdown https://github.com/hydro-project/hydroflow/issues/1253

};

cli.run_async().await;
}
159 changes: 159 additions & 0 deletions datastores/gossip_kv/protocol/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
use std::fmt::Display;
use std::str::FromStr;

use serde::{Deserialize, Serialize};

use crate::KeyParseError::InvalidNamespace;

/// The namespace of the key of an entry in the key-value store.
#[derive(Debug, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Hash)]
pub enum Namespace {
/// User namespace is for use by the user of the key-value store.
User,

/// System namespace is reserved for use by the key-value store itself.
System,
}

/// Error that can occur when parsing a key from a string.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub enum KeyParseError {
/// The namespace in the key is invalid. Namespaces must be either `usr` or `sys`.
InvalidNamespace,

/// The key is in an invalid format. Keys must be of the form `/namespace/table/row`.
InvalidFormat,
}

impl Display for KeyParseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
InvalidNamespace => write!(f, "Invalid namespace"),
KeyParseError::InvalidFormat => write!(f, "Invalid key format"),
}
}
}

impl FromStr for Namespace {
type Err = KeyParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"usr" => Ok(Namespace::User),
"sys" => Ok(Namespace::System),
_ => Err(InvalidNamespace),
}
}
}

/// The name of a table in the key-value store.
pub type TableName = String;

/// The key of a row in a table in the key-value store.
pub type RowKey = String;

/// A key of an entry in the key-value store.
///
/// Data in the key-value store is organized into namespaces, tables, and rows. Namespaces are
/// either `usr` for user data or `sys` for system data. Namespaces contain tables, which contain
/// rows. Each row has a row key and a row value.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize, Hash)]
pub struct Key {
/// The namespace of the key.
pub namespace: Namespace,
/// The name of the table in the key.
pub table: TableName,
/// The key of the row in the table.
pub row_key: RowKey,
}

impl FromStr for Key {
type Err = KeyParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
// TODO: Support escaping '/' in table and row keys. https://github.com/hydro-project/hydroflow/issues/1254
let parts: Vec<&str> = s.split('/').collect();
if parts.len() != 4 {
return Err(KeyParseError::InvalidFormat);
}
if !parts[0].is_empty() {
return Err(KeyParseError::InvalidFormat);
}
let namespace = parts[1].parse()?;
Ok(Key {
namespace,
table: parts[2].to_string(),
row_key: parts[3].to_string(),
})
}
}

/// A request from a client to the key-value store.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum ClientRequest {
/// A request to get the value of a key.
Get { key: Key },
/// A request to set the value of a key.
Set { key: Key, value: String },
/// A request to delete the value of a key.
Delete { key: Key },
}

#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum ClientResponse {
/// A response for a get request. The key is echoed back along with the value, if it exists.
Get { key: Key, value: Option<String> },
/// A response for a set request. The success field is true if the set was successful.
Set { success: bool },
/// A response for a delete request. The success field is true if delete was successful.
Delete { success: bool },
}

#[cfg(test)]
mod tests {
use super::{Key, Namespace};

#[test]
fn test_key_parsing_sys_namespace() {
// Sys namespace
let first = "/sys/sys_table/sys_row".parse::<Key>().unwrap();
assert_eq!(first.namespace, Namespace::System);
assert_eq!(first.table, "sys_table");
assert_eq!(first.row_key, "sys_row");
}
#[test]
fn test_key_parsing_user_namespace() {
// User namespace
let second = "/usr/usr_table/usr_row".parse::<Key>().unwrap();
assert_eq!(second.namespace, Namespace::User);
assert_eq!(second.table, "usr_table");
assert_eq!(second.row_key, "usr_row");
}

#[test]
fn test_key_parsing_invalid_namespace() {
// Invalid namespace
let non_existent_namespace = "/ne_namespace/ne_table/ne_row".parse::<Key>();
assert!(non_existent_namespace.is_err());
assert_eq!(
non_existent_namespace.unwrap_err(),
super::KeyParseError::InvalidNamespace
);
}

#[test]
fn test_key_parsing_invalid_format() {
// Invalid format
let invalid_format = "/not_even_a_key".parse::<Key>();
assert!(invalid_format.is_err());
assert_eq!(
invalid_format.unwrap_err(),
super::KeyParseError::InvalidFormat
);

let invalid_format = "abcd/sys/sys_table/sys_row".parse::<Key>();
assert!(invalid_format.is_err());
assert_eq!(
invalid_format.unwrap_err(),
super::KeyParseError::InvalidFormat
);
}
}
Loading

0 comments on commit 01c7a3f

Please sign in to comment.