Commit: Make protocol keyed

shadaj committed Sep 17, 2023
1 parent aef0255 commit 3896168
Showing 10 changed files with 91 additions and 109 deletions.
8 changes: 0 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion Cargo.toml
@@ -23,7 +23,6 @@ members = [
"pusherator",
"relalg",
"topolotree",
"topolotree_datatypes",
"variadics",
"website_playground",
]
1 change: 0 additions & 1 deletion topolotree/Cargo.toml
@@ -23,7 +23,6 @@ path = "src/latency_measure.rs"
[dependencies]
hydroflow = { path = "../hydroflow", features = [ "cli_integration" ] }
hydroflow_datalog = { path = "../hydroflow_datalog" }
topolotree_datatypes = { path = "../topolotree_datatypes" }

tokio = { version = "1.16", features = [ "full" ] }
serde = { version = "1", features = ["rc"] }
13 changes: 5 additions & 8 deletions topolotree/src/latency_measure.rs
@@ -10,11 +10,8 @@ use hydroflow::tokio;
use hydroflow::util::cli::{ConnectedDirect, ConnectedSink, ConnectedSource};
use hydroflow::util::{deserialize_from_bytes, serialize_to_bytes};

#[derive(Serialize, Deserialize, Clone, Debug)]
struct IncrementRequest {
tweet_id: u64,
likes: i32,
}
mod protocol;
use protocol::*;

#[tokio::main]
async fn main() {
@@ -78,9 +75,9 @@ async fn main() {
let increment = rand::random::<bool>();
let start = Instant::now();
inc_sender
.send(serialize_to_bytes(IncrementRequest {
tweet_id: id,
likes: if increment { 1 } else { -1 },
.send(serialize_to_bytes(OperationPayload {
key: id,
change: if increment { 1 } else { -1 },
}))
.unwrap();

49 changes: 28 additions & 21 deletions topolotree/src/main.rs
@@ -2,7 +2,8 @@
mod tests;

use std::cell::RefCell;
use std::fmt::Display;
use std::collections::HashMap;
use std::fmt::{Display, Debug};
use std::io;
use std::rc::Rc;

@@ -13,14 +14,16 @@ use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::util::cli::{
ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged,
};
use topolotree_datatypes::{OperationPayload, Payload};

mod protocol;
use protocol::*;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct NodeID(pub u32);

impl Display for NodeID {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
Display::fmt(&self.0, f)
}
}

@@ -38,7 +41,7 @@ fn run_topolotree(
// Timestamp stuff is a bit complicated, there is a proper data-flowy way to do it
// but it would require at least one more join and one more cross join just specifically for the local timestamps
// Until we need it to be proper then we can take a shortcut and use rc refcell
let self_timestamp = Rc::new(RefCell::new(0));
let self_timestamp = Rc::new(RefCell::new(HashMap::<u64, isize>::new()));

let self_timestamp1 = Rc::clone(&self_timestamp);
let self_timestamp2 = Rc::clone(&self_timestamp);
@@ -47,31 +50,32 @@
hydroflow_syntax! {
from_neighbors = source_stream(input_recv)
-> map(Result::unwrap)
-> map(|(src, payload)| (NodeID(src), serde_json::from_slice(&payload[..]).unwrap()))
-> map(|(src, payload)| (NodeID(src), serde_json::from_slice::<Payload<i64>>(&payload[..]).unwrap()))
-> inspect(|(src, payload): &(NodeID, Payload<i64>)| println!("received from: {src}: payload: {payload:?}"));

from_neighbors
-> fold_keyed::<'static>(|| Payload { timestamp: -1, data: Default::default() }, |acc: &mut Payload<i64>, val: Payload<i64>| {
-> map(|(src, payload)| ((payload.key, src), (payload.key, payload.contents)))
-> fold_keyed::<'static>(|| Timestamped { timestamp: -1, data: Default::default() }, |acc: &mut Timestamped<i64>, (key, val): (u64, Timestamped<i64>)| {
if val.timestamp > acc.timestamp {
*acc = val;
*self_timestamp1.borrow_mut() += 1;
*self_timestamp1.borrow_mut().entry(key).or_insert(0) += 1;
}
})
-> inspect(|(src, data)| println!("data from stream: {src}: data: {data:?}"))
-> map(|(src, payload)| (Some(src), payload.data))
-> inspect(|(src, data)| println!("data from stream+key: {src:?}: data: {data:?}"))
-> map(|((key, src), payload)| ((key, Some(src)), payload.data))
-> from_neighbors_or_local;

local_value = source_stream(increment_requests)
-> map(Result::unwrap)
-> map(|change_payload: BytesMut| (serde_json::from_slice(&change_payload[..]).unwrap()))
-> inspect(|change_payload: &OperationPayload| println!("change: {change_payload:?}"))
-> inspect(|_| {
*self_timestamp2.borrow_mut() += 1;
-> inspect(|change| {
*self_timestamp2.borrow_mut().entry(change.key).or_insert(0) += 1;
})
-> map(|change_payload: OperationPayload| change_payload.change)
-> reduce::<'static>(|agg: &mut i64, change: i64| *agg += change);
-> map(|change_payload: OperationPayload| (change_payload.key, change_payload.change))
-> reduce_keyed::<'static>(|agg: &mut i64, change: i64| *agg += change);

local_value -> map(|data| (None, data)) -> from_neighbors_or_local;
local_value -> map(|(key, data)| ((key, None), data)) -> from_neighbors_or_local;

from_neighbors_or_local = union();
from_neighbors_or_local -> [0]all_neighbor_data;
@@ -83,20 +87,23 @@
neighbors -> [1]all_neighbor_data;

all_neighbor_data = cross_join_multiset()
-> filter(|((aggregate_from_this_guy, _), target_neighbor)| {
-> filter(|(((key, aggregate_from_this_guy), _), target_neighbor)| {
aggregate_from_this_guy.iter().all(|source| source != target_neighbor)
})
-> map(|((_, payload), target_neighbor)| {
(target_neighbor, payload)
-> map(|(((key, _), payload), target_neighbor)| {
((key, target_neighbor), payload)
})
-> fold_keyed(|| 0, |acc: &mut i64, data: i64| {
merge(acc, data);
})
-> inspect(|(target_neighbor, data)| println!("data from neighbors: {target_neighbor:?}: data: {data:?}"))
-> map(|(target_neighbor, data)| {
-> inspect(|((key, target_neighbor), data)| println!("data from key: {key:?}, neighbors: {target_neighbor:?}: data: {data:?}"))
-> map(|((key, target_neighbor), data)| {
(target_neighbor, Payload {
timestamp: *self_timestamp3.borrow(),
data
key: key,
contents: Timestamped {
timestamp: *self_timestamp3.borrow_mut().entry(key).or_insert(0),
data
}
})
})
-> for_each(|(target_neighbor, output): (NodeID, Payload<i64>)| {
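Note: the timestamp shortcut in run_topolotree keeps a per-key logical clock in an Rc<RefCell<HashMap<u64, isize>>> that is bumped outside the dataflow whenever a key's aggregate changes, and read when emitting a payload for that key. Below is a minimal standalone sketch of that pattern under those assumptions; the names bump and current are illustrative and not from the commit.

use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

fn main() {
    // Shared per-key logical clock, cloned into each closure that needs it
    // (mirrors self_timestamp1/self_timestamp2/self_timestamp3 in main.rs).
    let self_timestamp: Rc<RefCell<HashMap<u64, isize>>> =
        Rc::new(RefCell::new(HashMap::new()));

    let writer = Rc::clone(&self_timestamp);
    let reader = Rc::clone(&self_timestamp);

    // Bump the clock for a key when its aggregate changes...
    let bump = move |key: u64| {
        *writer.borrow_mut().entry(key).or_insert(0) += 1;
    };

    // ...and read the current value when emitting a payload for that key.
    let current = move |key: u64| -> isize {
        *reader.borrow_mut().entry(key).or_insert(0)
    };

    bump(42);
    bump(42);
    assert_eq!(current(42), 2);
    assert_eq!(current(7), 0); // unseen keys start at 0
}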
25 changes: 11 additions & 14 deletions topolotree/src/pn.rs
@@ -8,18 +8,15 @@ use hydroflow::util::cli::{ConnectedDemux, ConnectedDirect, ConnectedSink, Conne
use hydroflow::util::{deserialize_from_bytes, serialize_to_bytes};
use hydroflow::{hydroflow_syntax, tokio};

#[derive(Serialize, Deserialize, Clone, Debug)]
struct IncrementRequest {
tweet_id: u64,
likes: i32,
}
mod protocol;
use protocol::*;

type NextStateType = (u64, Rc<RefCell<(Vec<u32>, Vec<u32>)>>);
type NextStateType = (u64, Rc<RefCell<(Vec<u64>, Vec<u64>)>>);

#[derive(Serialize, Deserialize, Clone, Debug)]
enum GossipOrIncrement {
Gossip(Vec<NextStateType>),
Increment(u64, i32),
Increment(u64, i64),
}

#[hydroflow::main]
@@ -67,7 +64,7 @@ async fn main() {

let df = hydroflow_syntax! {
next_state = union()
-> fold::<'static>((HashMap::<u64, Rc<RefCell<(Vec<u32>, Vec<u32>)>>>::new(), HashSet::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashSet<_>, _), goi| {
-> fold::<'static>((HashMap::<u64, Rc<RefCell<(Vec<u64>, Vec<u64>)>>>::new(), HashSet::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashSet<_>, _), goi| {
if context.current_tick() != *last_tick {
modified_tweets.clear();
}
@@ -102,9 +99,9 @@ async fn main() {
let mut cur_value = cur_value.as_ref().borrow_mut();

if delta > 0 {
cur_value.0[my_id] += delta as u32;
cur_value.0[my_id] += delta as u64;
} else {
cur_value.1[my_id] += (-delta) as u32;
cur_value.1[my_id] += (-delta) as u64;
}

modified_tweets.insert(counter_id);
@@ -123,8 +120,8 @@ async fn main() {
-> next_state;

source_stream(increment_requests)
-> map(|x| deserialize_from_bytes::<IncrementRequest>(&x.unwrap()).unwrap())
-> map(|t| GossipOrIncrement::Increment(t.tweet_id, t.likes))
-> map(|x| deserialize_from_bytes::<OperationPayload>(&x.unwrap()).unwrap())
-> map(|t| GossipOrIncrement::Increment(t.key, t.change))
-> next_state;

all_peers = source_iter(0..num_replicas)
@@ -143,10 +140,10 @@ async fn main() {
a.into_iter().map(|(k, rc_array)| {
let rc_borrowed = rc_array.as_ref().borrow();
let (pos, neg) = rc_borrowed.deref();
(k, pos.iter().sum::<u32>() as i32 - neg.iter().sum::<u32>() as i32)
(k, pos.iter().sum::<u64>() as i64 - neg.iter().sum::<u64>() as i64)
}).collect::<Vec<_>>()
})
-> map(serialize_to_bytes::<(u64, i32)>)
-> map(serialize_to_bytes::<(u64, i64)>)
-> dest_sink(query_responses);
};

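Note: the query path in pn.rs reads each counter as the sum of its positive slots minus the sum of its negative slots, now widened from u32/i32 to u64/i64. A simplified, self-contained sketch of that read-out follows; in the real code the state lives behind Rc<RefCell<...>> inside the fold, and read_counters is an illustrative name.

use std::collections::HashMap;

// One (positive, negative) slot pair per replica, as in pn.rs after the change.
type PnState = HashMap<u64, (Vec<u64>, Vec<u64>)>;

fn read_counters(state: &PnState) -> Vec<(u64, i64)> {
    state
        .iter()
        .map(|(k, (pos, neg))| {
            // Counter value = total increments - total decrements across replicas.
            (*k, pos.iter().sum::<u64>() as i64 - neg.iter().sum::<u64>() as i64)
        })
        .collect()
}

fn main() {
    let mut state = PnState::new();
    // key 1: replica 0 added 3, replica 1 removed 1 => value 2
    state.insert(1, (vec![3, 0], vec![0, 1]));
    assert_eq!(read_counters(&state), vec![(1, 2)]);
}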
27 changes: 12 additions & 15 deletions topolotree/src/pn_delta.rs
@@ -8,19 +8,16 @@ use hydroflow::util::cli::{ConnectedDemux, ConnectedDirect, ConnectedSink, Conne
use hydroflow::util::{deserialize_from_bytes, serialize_to_bytes};
use hydroflow::{hydroflow_syntax, tokio};

#[derive(Serialize, Deserialize, Clone, Debug)]
struct IncrementRequest {
tweet_id: u64,
likes: i32,
}
mod protocol;
use protocol::*;

#[derive(Serialize, Deserialize, Clone, Debug)]
enum GossipOrIncrement {
Gossip(Vec<(u64, (usize, u32, u32))>),
Increment(u64, i32),
Gossip(Vec<(u64, (usize, u64, u64))>),
Increment(u64, i64),
}

type NextStateType = (u64, bool, Rc<RefCell<(Vec<u32>, Vec<u32>)>>);
type NextStateType = (u64, bool, Rc<RefCell<(Vec<u64>, Vec<u64>)>>);

#[hydroflow::main]
async fn main() {
@@ -67,7 +64,7 @@ async fn main() {

let df = hydroflow_syntax! {
next_state = union()
-> fold::<'static>((HashMap::<u64, Rc<RefCell<(Vec<u32>, Vec<u32>)>>>::new(), HashMap::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashMap<_, _>, _), goi| {
-> fold::<'static>((HashMap::<u64, Rc<RefCell<(Vec<u64>, Vec<u64>)>>>::new(), HashMap::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashMap<_, _>, _), goi| {
if context.current_tick() != *last_tick {
modified_tweets.clear();
}
@@ -99,9 +96,9 @@ async fn main() {
let mut cur_value = cur_value.as_ref().borrow_mut();

if delta > 0 {
cur_value.0[my_id] += delta as u32;
cur_value.0[my_id] += delta as u64;
} else {
cur_value.1[my_id] += (-delta) as u32;
cur_value.1[my_id] += (-delta) as u64;
}

*modified_tweets.entry(counter_id).or_insert(false) |= true;
@@ -120,8 +117,8 @@ async fn main() {
-> next_state;

source_stream(increment_requests)
-> map(|x| deserialize_from_bytes::<IncrementRequest>(&x.unwrap()).unwrap())
-> map(|t| GossipOrIncrement::Increment(t.tweet_id, t.likes))
-> map(|x| deserialize_from_bytes::<OperationPayload>(&x.unwrap()).unwrap())
-> map(|t| GossipOrIncrement::Increment(t.key, t.change))
-> next_state;

all_peers = source_iter(0..num_replicas)
@@ -144,10 +141,10 @@ async fn main() {
a.into_iter().map(|(k, _, rc_array)| {
let rc_borrowed = rc_array.as_ref().borrow();
let (pos, neg) = rc_borrowed.deref();
(k, pos.iter().sum::<u32>() as i32 - neg.iter().sum::<u32>() as i32)
(k, pos.iter().sum::<u64>() as i64 - neg.iter().sum::<u64>() as i64)
}).collect::<Vec<_>>()
})
-> map(serialize_to_bytes::<(u64, i32)>)
-> map(serialize_to_bytes::<(u64, i64)>)
-> dest_sink(query_responses);
};

15 changes: 8 additions & 7 deletions topolotree_datatypes/src/lib.rs → topolotree/src/protocol.rs
@@ -3,18 +3,19 @@ use std::fmt::Debug;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
pub struct Payload<T: Debug> {
pub struct Timestamped<T: Debug> {
pub timestamp: isize,
pub data: T,
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
pub struct Payload<T: Debug> {
pub key: u64,
pub contents: Timestamped<T>
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct OperationPayload {
pub key: u64,
pub change: i64,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct IncrementRequest {
tweet_id: u64,
likes: i32,
}
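Note: as a usage sketch for the keyed protocol, the snippet below copies the Timestamped and Payload definitions from protocol.rs so it is self-contained, and round-trips one value through serde_json, which main.rs already uses for these messages. It assumes serde's derive feature and serde_json are available in the build; the example values are illustrative.

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
pub struct Timestamped<T: std::fmt::Debug> {
    pub timestamp: isize,
    pub data: T,
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
pub struct Payload<T: std::fmt::Debug> {
    pub key: u64,
    pub contents: Timestamped<T>,
}

fn main() {
    // A keyed payload now nests the timestamped value under `contents`.
    let payload = Payload {
        key: 42,
        contents: Timestamped { timestamp: 3, data: 7i64 },
    };
    let bytes = serde_json::to_vec(&payload).unwrap();
    let back: Payload<i64> = serde_json::from_slice(&bytes).unwrap();
    assert_eq!(back, payload);
    // JSON form: {"key":42,"contents":{"timestamp":3,"data":7}}
}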