Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

draft: anna v2 #1039

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions hydroflow/examples/kvs_replicated_v2/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use hydroflow::hydroflow_syntax;
use hydroflow::util::{UdpSink, UdpStream};

use crate::helpers::parse_command;
use crate::protocol::ServerResp;
use crate::Opts;

pub(crate) async fn run_client(outbound: UdpSink, inbound: UdpStream, opts: Opts) {
println!("Client live!");

let server_addr = opts.server_addr.unwrap();
let mut hf = hydroflow_syntax! {
// set up channels
outbound_chan = dest_sink_serde(outbound);
inbound_chan = source_stream_serde(inbound) -> map(Result::unwrap);

// read in commands from stdin and forward to server
source_stdin()
-> filter_map(|line| parse_command(line.unwrap()))
-> map(|msg| { (msg, server_addr) })
-> outbound_chan;

// print inbound msgs
inbound_chan -> for_each(|(response, addr): (ServerResp, _)| println!("Got a Response: {:?} from: {:?}", response, addr));
};

if let Some(graph) = opts.graph {
let serde_graph = hf
.meta_graph()
.expect("No graph found, maybe failed to parse.");
serde_graph.open_graph(graph, opts.write_config).unwrap();
}

hf.run_async().await.unwrap();
}
25 changes: 25 additions & 0 deletions hydroflow/examples/kvs_replicated_v2/helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use regex::Regex;

use crate::protocol::ServerReq;

pub fn parse_command(line: String) -> Option<ServerReq> {
let re = Regex::new(r"([A-z]+)\s+(.+)").unwrap();
let caps = re.captures(line.as_str())?;

let binding = caps.get(1).unwrap().as_str().to_uppercase();
let cmdstr = binding.as_str();
let args = caps.get(2).unwrap().as_str();
match cmdstr {
"PUT" => {
let kv = args.split_once(',')?;
Some(ServerReq::ClientPut {
key: kv.0.trim().to_string(),
value: kv.1.trim().to_string(),
})
}
"GET" => Some(ServerReq::ClientGet {
key: args.trim().to_string(),
}),
_ => None,
}
}
16 changes: 16 additions & 0 deletions hydroflow/examples/kvs_replicated_v2/left_outer_join.hf
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
lhs = mod[0] -> tee();
rhs = mod[1] -> tee();

lhs -> [0]joined;
rhs -> [1]joined;

joined = join_multiset() -> map(|(k, (lhs, rhs))| (k, (lhs, Some(rhs)))) -> combined;

lhs -> [pos]missed;
rhs -> map(|(k, _v)| k) -> [neg]missed;

missed = anti_join()
-> map(|(k, v)| (k, (v, None)))
-> combined;

combined = union() -> mod;
223 changes: 223 additions & 0 deletions hydroflow/examples/kvs_replicated_v2/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
use std::net::SocketAddr;
use std::pin::Pin;

Check failure on line 2 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

unused import: `std::pin::Pin`

Check warning on line 2 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, pinned-nightly)

unused import: `std::pin::Pin`

Check warning on line 2 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, pinned-nightly)

unused import: `std::pin::Pin`

Check warning on line 2 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Test Suite (ubuntu-latest, --examples, pinned-nightly)

unused import: `std::pin::Pin`

use bytes::{Bytes, BytesMut};

Check failure on line 4 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

unused import: `BytesMut`

Check warning on line 4 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, pinned-nightly)

unused import: `BytesMut`

Check warning on line 4 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, pinned-nightly)

unused import: `BytesMut`

Check warning on line 4 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Test Suite (ubuntu-latest, --examples, pinned-nightly)

unused import: `BytesMut`
use clap::{Parser, ValueEnum};
use client::run_client;
use futures::stream::{SplitSink, SplitStream};
use futures::task::noop_waker;

Check failure on line 8 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

unused import: `futures::task::noop_waker`

Check warning on line 8 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, pinned-nightly)

unused import: `futures::task::noop_waker`

Check warning on line 8 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, pinned-nightly)

unused import: `futures::task::noop_waker`

Check warning on line 8 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Test Suite (ubuntu-latest, --examples, pinned-nightly)

unused import: `futures::task::noop_waker`
use futures::SinkExt;
use hydroflow::lang::graph::{WriteConfig, WriteGraphType};
use hydroflow::tokio;
use hydroflow::util::{bind_udp_bytes, ipv4_resolve, TcpFramedStream};

Check failure on line 12 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

unused import: `TcpFramedStream`

Check warning on line 12 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, pinned-nightly)

unused import: `TcpFramedStream`

Check warning on line 12 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, pinned-nightly)

unused import: `TcpFramedStream`

Check warning on line 12 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Test Suite (ubuntu-latest, --examples, pinned-nightly)

unused import: `TcpFramedStream`
use multiplatform_test::multiplatform_test;

Check failure on line 13 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

unused import: `multiplatform_test::multiplatform_test`

Check warning on line 13 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, pinned-nightly)

unused import: `multiplatform_test::multiplatform_test`

Check warning on line 13 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, pinned-nightly)

unused import: `multiplatform_test::multiplatform_test`

Check warning on line 13 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Test Suite (ubuntu-latest, --examples, pinned-nightly)

unused import: `multiplatform_test::multiplatform_test`
use server::run_server;
use tokio_util::codec::LengthDelimitedCodec;
use tokio_util::udp::UdpFramed;

use crate::protocol::{ServerReq, ServerResp};

mod client;
mod helpers;
mod protocol;
mod server;

#[derive(Clone, ValueEnum, Debug)]
enum Role {
Client,
Server,
}

#[derive(Parser, Debug)]
struct Opts {
#[clap(value_enum, long)]
role: Role,
#[clap(long, value_parser = ipv4_resolve)]
addr: SocketAddr,
#[clap(long, value_parser = ipv4_resolve)]
server_addr: Option<SocketAddr>,
#[clap(long)]
graph: Option<WriteGraphType>,
#[clap(flatten)]
write_config: Option<WriteConfig>,
}

#[hydroflow::main]
async fn main() {
let opts = Opts::parse();
let addr = opts.addr;

match opts.role {
Role::Client => {
let (outbound, inbound, _) = bind_udp_bytes(addr).await;
println!("Client is bound to {:?}", addr);
println!("Attempting to connect to server at {:?}", opts.server_addr);
run_client(outbound, inbound, opts).await;
}
Role::Server => {
run_server(opts.addr).await;
}
}
}

async fn send(
outbound: &mut SplitSink<UdpFramed<LengthDelimitedCodec>, (Bytes, SocketAddr)>,
x: ServerReq,
addr: SocketAddr,
) {
outbound
.send((hydroflow::util::serialize_to_bytes(x), addr))
.await
.unwrap();
}

async fn read(inbound: &mut SplitStream<UdpFramed<LengthDelimitedCodec>>) -> ServerResp {
use futures::StreamExt;

let Some(Ok((bytes, src))) = inbound.next().await else {
panic!()
};

hydroflow::util::deserialize_from_bytes(bytes).unwrap()
}

// #[multiplatform_test(hydroflow, env_tracing)]
#[hydroflow::test]
async fn test_server() {
let server_addr_1 = "127.0.0.1:2098".parse().unwrap();
let server_addr_2: SocketAddr = "127.0.0.1:2096".parse().unwrap();

tokio::task::spawn_local(run_server(server_addr_1));
// tokio::task::spawn_local(run_server(server_addr_2));
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

let (mut outbound, mut inbound, _) = bind_udp_bytes("127.0.0.1:0".parse().unwrap()).await;

send(
&mut outbound,
ServerReq::AddNode {
node_id: server_addr_1,
},
server_addr_1,
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

send(
&mut outbound,
ServerReq::AddNode {
node_id: server_addr_2,
},
server_addr_1,
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

// send(
// &mut outbound,
// ServerReq::RemoveNode {
// node_id: server_addr_2,
// },
// server_addr_1,
// )
// .await;
// tokio::time::sleep(std::time::Duration::from_millis(100)).await;

send(
&mut outbound,
ServerReq::ClientPut {
key: "mykey".to_owned(),
value: "myval".to_owned(),
},
server_addr_1,
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

// send(
// &mut outbound,
// ServerReq::ClientPut {
// key: "mykey".to_owned(),
// value: "myval2".to_owned(),
// },
// server_addr_1,
// )
// .await;
// tokio::time::sleep(std::time::Duration::from_millis(100)).await;

send(
&mut outbound,
ServerReq::ClientGet {
key: "mykey".to_owned(),
},
server_addr_1,
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

println!("received from srv: {:?}", read(&mut inbound).await);

// send(
// &mut outbound,
// ServerReq::ClientGet {
// key: "mykey2".to_owned(),
// },
// server_addr_1,
// )
// .await;
// tokio::time::sleep(std::time::Duration::from_millis(100)).await;

// Need this sleep otherwise the last sent messages won't get processed before the whole process terminates.
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
}

#[test]
fn test() {
use std::io::Write;

use hydroflow::util::{run_cargo_example, wait_for_process_output};

let (_server_1, _, mut server_1_stdout) =
run_cargo_example("kvs_replicated_v2", "--role server --addr 127.0.0.1:2051");

let mut server_1_output = String::new();
wait_for_process_output(&mut server_1_output, &mut server_1_stdout, "Server live!");

let (_client_1, mut client_1_stdin, mut client_1_stdout) = run_cargo_example(
"kvs_replicated_v2",
"--role client --addr 127.0.0.1:2052 --server-addr 127.0.0.1:2051",
);

let mut client_1_output = String::new();
wait_for_process_output(&mut client_1_output, &mut client_1_stdout, "Client live!");

client_1_stdin.write_all(b"PUT a,7\n").unwrap();

// let (_server_2, _, mut server_2_stdout) = run_cargo_example(
// "kvs_replicated_v2",
// "--role server --addr 127.0.0.1:2053 --server-addr 127.0.0.1:2051",
// );

// let (_client_2, mut client_2_stdin, mut client_2_stdout) = run_cargo_example(
// "kvs_replicated_v2",
// "--role client --addr 127.0.0.1:2054 --server-addr 127.0.0.1:2053",
// );

// let mut server_2_output = String::new();
// wait_for_process_output(&mut server_2_output, &mut server_2_stdout, "Server live!");
// wait_for_process_output(
// &mut server_2_output,
// &mut server_2_stdout,
// r#"Message received PeerGossip \{ key: "a", value: "7" \} from 127\.0\.0\.1:2051"#,
// );

// let mut client_2_output = String::new();
// wait_for_process_output(&mut client_2_output, &mut client_2_stdout, "Client live!");

// client_2_stdin.write_all(b"GET a\n").unwrap();
// wait_for_process_output(
// &mut client_2_output,
// &mut client_2_stdout,
// r#"Got a Response: ServerResponse \{ key: "a", value: "7" \}"#,
// );
}
Loading
Loading