Skip to content

Commit

Permalink
fix(hydroflow): fix example kvs/kvs_replicated tests, improve com…
Browse files Browse the repository at this point in the history
…ments (#932)
  • Loading branch information
MingweiSamuel committed Oct 10, 2023
1 parent 13fab15 commit 3ac9b16
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 38 deletions.
7 changes: 0 additions & 7 deletions hydroflow/examples/kvs/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,6 @@ fn test() {

client_stdin.write_all(b"PUT a,7\n").unwrap();

wait_for_process_output(
&mut client_output,
&mut client_stdout,
r#"Got a Response: KvsResponse \{ key: "a", value: "7" \}"#,
);

let (_client2, mut client2_stdin, mut client2_stdout) = run_cargo_example(
"kvs",
"--role client --addr 127.0.0.1:2053 --server-addr 127.0.0.1:2051",
Expand All @@ -89,7 +83,6 @@ fn test() {
wait_for_process_output(&mut client2_output, &mut client2_stdout, "Client live!");

client2_stdin.write_all(b"GET a\n").unwrap();

wait_for_process_output(
&mut client2_output,
&mut client2_stdout,
Expand Down
17 changes: 7 additions & 10 deletions hydroflow/examples/kvs/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,27 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: Opts
println!("Server live!");

let mut hf: Hydroflow = hydroflow_syntax! {
// network channels
network_send = union() -> dest_sink_serde(outbound);
// Setup network channels.
network_send = dest_sink_serde(outbound);
network_recv = source_stream_serde(inbound)
-> _upcast(Some(Delta))
-> map(Result::unwrap)
-> inspect(|(msg, addr)| println!("Message received {:?} from {:?}", msg, addr))
-> map(|(msg, addr)| KvsMessageWithAddr::from_message(msg, addr))
-> demux_enum::<KvsMessageWithAddr>();
puts = network_recv[Put] -> tee();
puts = network_recv[Put];
gets = network_recv[Get];

// ack puts
puts -> map(|(key, value, client_addr)| (KvsResponse { key, value }, client_addr)) -> [0]network_send;

// join PUTs and GETs by key
// Join PUTs and GETs by key, persisting the PUTs.
puts -> map(|(key, value, _addr)| (key, value)) -> [0]lookup;
gets -> [1]lookup;
lookup = join::<'static, 'tick>();

// network_send lookup responses back to the client address from the GET
lookup[1]
// Send GET responses back to the client address.
lookup
-> inspect(|tup| println!("Found a match: {:?}", tup))
-> map(|(key, (value, client_addr))| (KvsResponse { key, value }, client_addr))
-> [1]network_send;
-> network_send;
};

if let Some(graph) = opts.graph {
Expand Down
24 changes: 11 additions & 13 deletions hydroflow/examples/kvs_replicated/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ fn test() {
use hydroflow::util::{run_cargo_example, wait_for_process_output};

let (_server_1, _, mut server_1_stdout) =
run_cargo_example("kvs", "--role server --addr 127.0.0.1:2051");
run_cargo_example("kvs_replicated", "--role server --addr 127.0.0.1:2051");

let (_client_1, mut client_1_stdin, mut client_1_stdout) = run_cargo_example(
"kvs",
"kvs_replicated",
"--role client --addr 127.0.0.1:2052 --server-addr 127.0.0.1:2051",
);

Expand All @@ -74,33 +74,31 @@ fn test() {

client_1_stdin.write_all(b"PUT a,7\n").unwrap();

wait_for_process_output(
&mut client_1_output,
&mut client_1_stdout,
r#"Got a Response: KvsResponse \{ key: "a", value: "7" \}"#,
);

let (_server_2, _, mut server_2_stdout) = run_cargo_example(
"kvs",
"kvs_replicated",
"--role server --addr 127.0.0.1:2053 --server-addr 127.0.0.1:2051",
);

let (_client_2, mut client_2_stdin, mut client_2_stdout) = run_cargo_example(
"kvs",
"--role client --addr 127.0.0.1:2054 --server-addr 127.0.0.1:2051",
"kvs_replicated",
"--role client --addr 127.0.0.1:2054 --server-addr 127.0.0.1:2053",
);

let mut server_2_output = String::new();
wait_for_process_output(&mut server_2_output, &mut server_2_stdout, "Server live!");
wait_for_process_output(
&mut server_2_output,
&mut server_2_stdout,
r#"Message received PeerGossip \{ key: "a", value: "7" \} from 127\.0\.0\.1:2051"#,
);

let mut client_2_output = String::new();
wait_for_process_output(&mut client_2_output, &mut client_2_stdout, "Client live!");

client_2_stdin.write_all(b"GET a\n").unwrap();

wait_for_process_output(
&mut client_2_output,
&mut client_2_stdout,
r#"Got a Response: KvsResponse \{ key: "a", value: "7" \}"#,
r#"Got a Response: ServerResponse \{ key: "a", value: "7" \}"#,
);
}
18 changes: 10 additions & 8 deletions hydroflow/examples/kvs_replicated/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: Opts
let peer_server = opts.server_addr;

let mut hf: Hydroflow = hydroflow_syntax! {
// Network channels
// Setup network channels.
network_send = union() -> dest_sink_serde(outbound);
network_recv = source_stream_serde(inbound)
-> _upcast(Some(Delta))
Expand All @@ -21,40 +21,42 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: Opts
-> demux_enum::<KvsMessageWithAddr>();
network_recv[ServerResponse] -> for_each(|(key, value, addr)| eprintln!("Unexpected server response {:?}->{:?} from {:?}", key, value, addr));
peers = network_recv[PeerJoin] -> map(|(peer_addr,)| peer_addr) -> tee();
network_recv[PeerGossip] -> writes;
network_recv[ClientPut] -> writes;
network_recv[PeerGossip] -> writes;
writes = union() -> tee();
gets = network_recv[ClientGet];

// Join as a peer if peer_server is set.
source_iter_delta(peer_server) -> map(|peer_addr| (KvsMessage::PeerJoin, peer_addr)) -> network_send;

// join PUTs and GETs by key
// Join PUTs and GETs by key
writes -> map(|(key, value, _addr)| (key, value)) -> writes_store;
writes_store = persist() -> tee();
writes_store -> [0]lookup;
gets -> [1]lookup;
lookup = join();
// network_send lookup responses back to the client address from the GET

// Send GET responses back to the client address.
lookup[1]
-> inspect(|tup| println!("Found a match: {:?}", tup))
-> map(|(key, (value, client_addr))| (KvsMessage::ServerResponse { key, value }, client_addr))
-> network_send;

// Join as a peer if peer_server is set.
source_iter_delta(peer_server) -> map(|peer_addr| (KvsMessage::PeerJoin, peer_addr)) -> network_send;

// Peers: When a new peer joins, send them all data.
writes_store -> [0]peer_join;
peers -> [1]peer_join;
peer_join = cross_join()
-> map(|((key, value), peer_addr)| (KvsMessage::PeerGossip { key, value }, peer_addr))
-> network_send;

// Outbound gossip. Send received PUTs to peers.
// Outbound gossip. Send updates to peers.
peers -> peer_store;
source_iter_delta(peer_server) -> peer_store;
peer_store = union() -> persist();
writes -> [0]outbound_gossip;
peer_store -> [1]outbound_gossip;
outbound_gossip = cross_join()
// Don't send gossip back to the sender.
-> filter(|((_key, _value, writer_addr), peer_addr)| writer_addr != peer_addr)
-> map(|((key, value, _writer_addr), peer_addr)| (KvsMessage::PeerGossip { key, value }, peer_addr))
-> network_send;
Expand Down

0 comments on commit 3ac9b16

Please sign in to comment.