Get PN-counter benchmarks to run again
Commit 18ecdf1 · shadaj committed Sep 17, 2023 · 1 parent 8e95d13
Showing 5 changed files with 388 additions and 58 deletions.
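For context on the commit title: a PN-counter is a state-based CRDT that represents a counter as two grow-only tallies, one for increments and one for decrements; its value is their difference, and replicas merge by taking pointwise maxima. The sketch below illustrates the idea only and is not code from this repository.

```rust
use std::collections::HashMap;

/// Minimal PN-counter sketch (illustrative, not from this repo).
#[derive(Clone, Default)]
struct PnCounter {
    pos: HashMap<u64, u64>, // replica id -> increments observed at that replica
    neg: HashMap<u64, u64>, // replica id -> decrements observed at that replica
}

impl PnCounter {
    fn increment(&mut self, replica: u64) {
        *self.pos.entry(replica).or_default() += 1;
    }

    fn decrement(&mut self, replica: u64) {
        *self.neg.entry(replica).or_default() += 1;
    }

    /// Value is total increments minus total decrements.
    fn value(&self) -> i64 {
        self.pos.values().sum::<u64>() as i64 - self.neg.values().sum::<u64>() as i64
    }

    /// Merge takes the pointwise max of each replica's tallies. This makes merging
    /// commutative, associative, and idempotent, so replicas converge regardless
    /// of message ordering or duplication.
    fn merge(&mut self, other: &PnCounter) {
        for (replica, count) in &other.pos {
            let entry = self.pos.entry(*replica).or_default();
            *entry = (*entry).max(*count);
        }
        for (replica, count) in &other.neg {
            let entry = self.neg.entry(*replica).or_default();
            *entry = (*entry).max(*count);
        }
    }
}
```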
1 change: 0 additions & 1 deletion hydro_cli/src/core/hydroflow_crate/ports.rs
@@ -544,7 +544,6 @@ impl ServerConfig {

ServerConfig::TaggedUnwrap(underlying) => {
let loaded = underlying.load_instantiated(select).await;
- dbg!(&loaded);
if let ServerPort::Tagged(underlying, _) = loaded {
*underlying
} else {
@@ -16,7 +16,7 @@ struct IncrementRequest {
likes: i32,
}

- #[hydroflow::main]
+ #[tokio::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let mut start_node = ports
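The attribute swap above changes which macro generates the async entry point for the benchmark binary. Below is a minimal sketch of an entry point in this style; it assumes the `hydroflow::util::cli` APIs that appear elsewhere in this diff, and the port name is hypothetical.

```rust
use hydroflow::util::cli::ConnectedDirect;

#[tokio::main]
async fn main() {
    // Handshake with the Hydro deployment and receive this process's ports.
    let mut ports = hydroflow::util::cli::init().await;

    // Port name is hypothetical; real names are declared in the .hydro.py
    // deployment script and must match what the script wires up.
    let start_node = ports
        .port("start_node")
        .connect::<ConnectedDirect>()
        .await
        .into_source();

    // ... build the Hydroflow graph over `start_node` and run it ...
    drop(start_node);
}
```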
8 changes: 6 additions & 2 deletions hydro_cli_examples/topolotree_latency.hydro.py
@@ -70,9 +70,13 @@ def create_tree(depth, deployment, create_machine) -> Optional[Tree]:
right
)

- async def run_experiment(deployment, machine_pool, experiment_id, summaries_file, tree_arg, depth_arg, clients_arg, is_gcp, gcp_vpc):
+ async def run_experiment(
+     deployment: hydro.Deployment,
+     machine_pool,
+     experiment_id, summaries_file, tree_arg, depth_arg, clients_arg, is_gcp, gcp_vpc):
+     assert tree_arg == "pn" or tree_arg == "pn_delta" or tree_arg == "once"
tree_depth = int(depth_arg)
- is_tree = tree_arg == "topolo" # or "pn"
+ is_tree = tree_arg == "once"

num_replicas = 2 ** tree_depth - 1

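Since `create_tree` builds a complete binary tree of replicas, a tree of depth d has 2^d - 1 nodes, which is exactly the `num_replicas = 2 ** tree_depth - 1` computed above; for example, a `depth_arg` of 3 yields 2**3 - 1 = 7 replicas.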
56 changes: 2 additions & 54 deletions topolotree/src/main.rs
@@ -124,52 +124,6 @@ fn run_topolotree(
let serialized = BytesMut::from(serde_json::to_string(&output).unwrap().as_str()).freeze();
output_send.send((target_neighbor, serialized)).unwrap();
});

- // src
- // -> map(|(from, _data)| from)
- // -> enumerate()
- // -> [0]cj1;
-
- // source_iter(NEIGHBORS)
- // -> persist()
- // -> [1]cj1;
-
- // cj1 = cross_join::<HalfMultisetJoinState>()
- // -> filter(|((_req_id, from), to)| to != from)
- // -> map(|((req_id, _from), to)| (to, req_id))
- // -> [0]cj2;
-
- // source_iter(NEIGHBORS)
- // -> persist()
- // -> [1]cj2;
-
- // cj2 = cross_join::<HalfMultisetJoinState>()
- // -> filter(|((to, _req_id), node_id)| node_id != to)
- // -> map(|((to, req_id), node_id)| (node_id, (req_id, to)))
- // -> [0]j;
-
-
-
- // all_neighbor_data -> neighbors_and_myself;
- // operations_input -> fold::<'static>(0, |agg: &mut i64, op: i64| *agg += op) -> map(|total| (my_id, total)) -> neighbors_and_myself;
- // neighbors_and_myself = union();
-
- // // Cross Join
- // neighbors = source_iter(neighbors) -> persist();
- // neighbors_and_myself -> [0]aggregated_data;
- // neighbors -> [1]aggregated_data;
-
- // // (dest, Payload) where Payload has timestamp and accumulated data as specified by merge function
- // aggregated_data = cross_join_multiset()
- // -> filter(|((src, (payload, tick)), dst)| src != dst)
- // -> map(|((src, (payload, tick)), dst)| (dst, payload));
-
- // aggregated_data
- // -> map(|(dst, payload)| (dst, BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()).freeze()))
- // -> for_each(|x| {
- // output_send.send(x).unwrap();
- // });
-
}
}

@@ -195,19 +149,13 @@ async fn main() {
.into_sink();

let operations_send = ports
.port("input")
.port("operations")
// connect to the port with a single recipient
.connect::<ConnectedDirect>()
.await
.into_source();

- let _increment_requests = ports
- .port("increment_requests")
- .connect::<ConnectedDirect>()
- .await
- .into_source();
-
- let _query_responses = ports
+ let query_responses = ports
.port("query_responses")
.connect::<ConnectedDirect>()
.await
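The send path retained at the top of topolotree/src/main.rs JSON-encodes each outgoing message and freezes it into an immutable `Bytes` buffer before handing it to the output sink. The sketch below isolates that encoding step; the `Payload` type is a hypothetical stand-in for the real topolotree message type.

```rust
use bytes::{Bytes, BytesMut};
use serde::Serialize;

// Hypothetical stand-in for the actual topolotree payload.
#[derive(Serialize)]
struct Payload {
    timestamp: u64,
    data: i64,
}

/// Mirrors the pattern in the diff: serialize to JSON, copy into a BytesMut,
/// then freeze into a cheaply cloneable, immutable Bytes buffer.
fn encode(payload: &Payload) -> Bytes {
    BytesMut::from(serde_json::to_string(payload).unwrap().as_str()).freeze()
}

fn main() {
    let bytes = encode(&Payload { timestamp: 1, data: 42 });
    assert_eq!(&bytes[..], br#"{"timestamp":1,"data":42}"#);
}
```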
