Skip to content

Commit

Permalink
feat(hydroflow): prototype a functional surface syntax using staging
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Oct 8, 2023
1 parent fc7d27d commit be8b0c6
Show file tree
Hide file tree
Showing 43 changed files with 2,013 additions and 59 deletions.
5 changes: 4 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,8 @@
"INSTA_FORCE_PASS": "1"
}
}
]
],
"files.watcherExclude": {
"**/target": true
}
}
173 changes: 158 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,18 @@ members = [
"hydroflow_datalog_core",
"hydroflow_lang",
"hydroflow_macro",
"hydroflow_plus",
"hydroflow_plus_kvs_flow",
"hydroflow_plus_kvs_macro",
"hydroflow_plus_kvs",
"hydroflow_plus_kvs_server",
"lattices",
"multiplatform_test",
"pusherator",
"relalg",
"stagefright",
"stagefright_macro",
"stagefright_tool",
"variadics",
"website_playground",
]
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/rga/adjacency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub(crate) fn rga_adjacency(
input_recv: UnboundedReceiverStream<(Token, Timestamp)>,
rga_send: UnboundedSender<(Token, Timestamp)>,
list_send: UnboundedSender<(Timestamp, Timestamp)>,
) -> Hydroflow {
) -> Hydroflow<'static> {
hydroflow_syntax! {
insertAfter = source_stream(input_recv) -> tee();

Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/rga/datalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub(crate) fn rga_datalog(
input_recv: UnboundedReceiverStream<(Token, Timestamp)>,
rga_send: UnboundedSender<(Token, Timestamp)>,
list_send: UnboundedSender<(Timestamp, Timestamp)>,
) -> Hydroflow {
) -> Hydroflow<'static> {
hydroflow_syntax! {
edges = source_stream(input_recv) -> tee();
insertAfter = edges -> map(|(c, p): (Token, Timestamp) | (c.ts, p)) -> tee();
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/rga/datalog_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub(crate) fn rga_datalog_agg(
input_recv: UnboundedReceiverStream<(Token, Timestamp)>,
rga_send: UnboundedSender<(Token, Timestamp)>,
list_send: UnboundedSender<(Timestamp, Timestamp)>,
) -> Hydroflow {
) -> Hydroflow<'static> {
hydroflow_syntax! {
edges = source_stream(input_recv) -> tee();
insertAfter = edges -> map(|(c, p): (Token, Timestamp)| (c.ts, p)) -> tee();
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/rga/minimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub(crate) fn rga_minimal(
input_recv: UnboundedReceiverStream<(Token, Timestamp)>,
rga_send: UnboundedSender<(Token, Timestamp)>,
_list_send: UnboundedSender<(Timestamp, Timestamp)>,
) -> Hydroflow {
) -> Hydroflow<'static> {
hydroflow_syntax! {
insertAfter = source_stream(input_recv);

Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/shopping/flows/bp_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub(crate) async fn bp_flow(
shopping_bp: impl Iterator<Item = (usize, BoundedPrefix<Request>)> + 'static,
out_addr: SocketAddr,
out: SplitSink<UdpFramed<LengthDelimitedCodec>, (Bytes, SocketAddr)>,
) -> Hydroflow {
) -> Hydroflow<'static> {
let client_class = client_class_iter();

// First define some shorthand for the merge and bot of this lattice
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/shopping/flows/client_state_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub(crate) async fn client_state_flow(
out: SplitSink<UdpFramed<LengthDelimitedCodec>, (Bytes, SocketAddr)>,
local_addr: SocketAddr,
remote_addr: SocketAddr,
) -> Hydroflow {
) -> Hydroflow<'static> {
let client_class = client_class_iter();

// First define some shorthand for the merge and bot of this lattice
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/shopping/flows/listener_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub(crate) async fn listener_flow(
tuple_input: UdpStream,
bp_input: UdpStream,
ssiv_input: UdpStream,
) -> Hydroflow {
) -> Hydroflow<'static> {
// Simply print what we receive.
hydroflow_syntax! {
source_stream_serde(tuple_input)
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/shopping/flows/orig_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub(crate) async fn orig_flow(
shopping: impl Iterator<Item = (usize, LineItem)> + 'static,
out_addr: SocketAddr,
out: SplitSink<UdpFramed<LengthDelimitedCodec>, (Bytes, SocketAddr)>,
) -> Hydroflow {
) -> Hydroflow<'static> {
let client_class = client_class_iter();

// This is the straightforward single-transducer sequential case.
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/shopping/flows/push_group_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub(crate) async fn push_group_flow(
shopping_ssiv: impl Iterator<Item = (usize, SealedSetOfIndexedValues<Request>)> + 'static,
out_addr: SocketAddr,
out: SplitSink<UdpFramed<LengthDelimitedCodec>, (Bytes, SocketAddr)>,
) -> Hydroflow {
) -> Hydroflow<'static> {
let client_class = client_class_iter();

// First define some shorthand for the merge and bot of this lattice
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/shopping/flows/rep_server_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub(crate) async fn rep_server_flow(
remote_addr: SocketAddr,
gossip_addr: SocketAddr,
server_addrs: impl Iterator<Item = SocketAddr> + 'static,
) -> Hydroflow {
) -> Hydroflow<'static> {
let (broadcast_out, broadcast_in, _) = hydroflow::util::bind_udp_bytes(gossip_addr).await;
let client_class = client_class_iter();
let ssiv_merge =
Expand Down
Loading

0 comments on commit be8b0c6

Please sign in to comment.