Skip to content

Commit

Permalink
refactor(hydroflow): cleanup kvs example with lattice properties (#924)
Browse files Browse the repository at this point in the history
Open graphs when `--graph` is specified
  • Loading branch information
MingweiSamuel committed Oct 4, 2023
1 parent 21140f0 commit db9f270
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 20 deletions.
4 changes: 2 additions & 2 deletions hydroflow/examples/kvs/README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
Simple single-node key-value store example based on a join of PUTs and GETs.
Simple single-node key-value store example based on a join of PUTs and GETs.
Current semantics are purely monotone:
- PUTs are appended: we remember them all forever
- GETs are also remembered forever, akin to SUBSCRIBE: once a client issues a GET for key k they will receive a response on the current values of key k (if non-empty) and every future PUT for key k.
- GETs for empty keys get no acknowledgement, but will receive responses when a subsequent PUT arrives for that key

Clients accept commands on stdin. Command syntax is as follows:
- `PUT <key>, <value>`
- `GET <key>'
- `GET <key>`
Commands are case-insensitive. All keys and values are treated as `String`s.


Expand Down
5 changes: 2 additions & 3 deletions hydroflow/examples/kvs/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,13 @@ pub(crate) async fn run_client(
.expect("No graph found, maybe failed to parse.");
match graph {
GraphType::Mermaid => {
println!("{}", serde_graph.to_mermaid());
serde_graph.open_mermaid().unwrap();
}
GraphType::Dot => {
println!("{}", serde_graph.to_dot())
serde_graph.open_dot().unwrap();
}
GraphType::Json => {
unimplemented!();
// println!("{}", serde_graph.to_json())
}
}
}
Expand Down
31 changes: 16 additions & 15 deletions hydroflow/examples/kvs/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,30 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, graph: Opt
println!("Server live!");

let mut df: Hydroflow = hydroflow_syntax! {
// NW channels
outbound_chan = union() -> dest_sink_serde(outbound);
inbound_chan = source_stream_serde(inbound)
// network channels
network_send = union() -> dest_sink_serde(outbound);
network_recv = source_stream_serde(inbound)
-> _upcast(Some(Delta))
-> map(Result::unwrap)
-> inspect(|(msg, addr)| println!("Message received {:?} from {:?}", msg, addr))
-> map(|(msg, addr)| KvsMessageWithAddr::from_message(msg, addr))
-> demux_enum::<KvsMessageWithAddr>();
puts = inbound_chan[Put] -> tee();
gets = inbound_chan[Get] -> tee();

puts -> for_each(|(key, value, addr)| println!("Got a Put {:?}->{:?} from {:?}", key, value, addr));
gets -> for_each(|(key, addr)| println!("Got a Get {:?} from {:?}", key, addr));
puts = network_recv[Put] -> tee();
gets = network_recv[Get];

// ack puts
puts -> map(|(key, value, client_addr)| (KvsResponse { key, value }, client_addr)) -> [0]outbound_chan;
puts -> map(|(key, value, client_addr)| (KvsResponse { key, value }, client_addr)) -> [0]network_send;

// join PUTs and GETs by key
puts -> map(|(key, value, _addr)| (key, value)) -> [0]lookup;
gets -> [1]lookup;
lookup = join::<'static>() -> tee();
lookup[0] -> for_each(|t| println!("Found a match: {:?}", t));
lookup = join::<'static, 'tick>();

// send lookup responses back to the client address from the GET
lookup[1] -> map(|(key, (value, client_addr))| (KvsResponse { key, value }, client_addr)) -> [1]outbound_chan;
// network_send lookup responses back to the client address from the GET
lookup[1]
-> inspect(|tup| println!("Found a match: {:?}", tup))
-> map(|(key, (value, client_addr))| (KvsResponse { key, value }, client_addr))
-> [1]network_send;
};

if let Some(graph) = graph {
Expand All @@ -40,10 +41,10 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, graph: Opt
.expect("No graph found, maybe failed to parse.");
match graph {
GraphType::Mermaid => {
println!("{}", serde_graph.to_mermaid());
serde_graph.open_mermaid().unwrap();
}
GraphType::Dot => {
println!("{}", serde_graph.to_dot())
serde_graph.open_dot().unwrap();
}
GraphType::Json => {
unimplemented!();
Expand Down

0 comments on commit db9f270

Please sign in to comment.