diff --git a/.idea/hydroflow.iml b/.idea/hydroflow.iml
index fc353ac331a9..03944299ddf0 100644
--- a/.idea/hydroflow.iml
+++ b/.idea/hydroflow.iml
@@ -36,6 +36,9 @@
+
+
+
diff --git a/Cargo.lock b/Cargo.lock
index a53092a9c76c..d4d39f542cae 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -306,6 +306,12 @@ dependencies = [
"rustc-demangle",
]
+[[package]]
+name = "base64"
+version = "0.21.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
+
[[package]]
name = "benches"
version = "0.0.0"
@@ -342,6 +348,9 @@ name = "bitflags"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de"
+dependencies = [
+ "serde",
+]
[[package]]
name = "block-buffer"
@@ -628,6 +637,26 @@ dependencies = [
"crossbeam-utils",
]
+[[package]]
+name = "config"
+version = "0.14.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7328b20597b53c2454f0b1919720c25c7339051c02b72b7e05409e00b14132be"
+dependencies = [
+ "async-trait",
+ "convert_case",
+ "json5",
+ "lazy_static",
+ "nom 7.1.3",
+ "pathdiff",
+ "ron",
+ "rust-ini",
+ "serde",
+ "serde_json",
+ "toml",
+ "yaml-rust",
+]
+
[[package]]
name = "console"
version = "0.15.8"
@@ -651,6 +680,35 @@ dependencies = [
"wasm-bindgen",
]
+[[package]]
+name = "const-random"
+version = "0.1.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359"
+dependencies = [
+ "const-random-macro",
+]
+
+[[package]]
+name = "const-random-macro"
+version = "0.1.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e"
+dependencies = [
+ "getrandom",
+ "once_cell",
+ "tiny-keccak",
+]
+
+[[package]]
+name = "convert_case"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca"
+dependencies = [
+ "unicode-segmentation",
+]
+
[[package]]
name = "core-foundation"
version = "0.9.4"
@@ -776,9 +834,9 @@ dependencies = [
[[package]]
name = "dashmap"
-version = "6.0.1"
+version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28"
+checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
dependencies = [
"cfg-if",
"crossbeam-utils",
@@ -836,6 +894,15 @@ dependencies = [
"crypto-common",
]
+[[package]]
+name = "dlv-list"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f"
+dependencies = [
+ "const-random",
+]
+
[[package]]
name = "dunce"
version = "1.0.5"
@@ -860,6 +927,15 @@ version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"
+[[package]]
+name = "encoding_rs"
+version = "0.8.34"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59"
+dependencies = [
+ "cfg-if",
+]
+
[[package]]
name = "env_filter"
version = "0.1.2"
@@ -926,6 +1002,18 @@ version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6"
+[[package]]
+name = "filetime"
+version = "0.2.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bf401df4a4e3872c4fe8151134cf483738e74b67fc934d6532c882b3d24a4550"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "libredox",
+ "windows-sys 0.59.0",
+]
+
[[package]]
name = "fnv"
version = "1.0.7"
@@ -958,9 +1046,9 @@ dependencies = [
[[package]]
name = "futures-channel"
-version = "0.3.30"
+version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
+checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
dependencies = [
"futures-core",
"futures-sink",
@@ -968,9 +1056,9 @@ dependencies = [
[[package]]
name = "futures-core"
-version = "0.3.30"
+version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
+checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
@@ -985,9 +1073,9 @@ dependencies = [
[[package]]
name = "futures-io"
-version = "0.3.30"
+version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
+checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-lite"
@@ -1004,9 +1092,9 @@ dependencies = [
[[package]]
name = "futures-macro"
-version = "0.3.30"
+version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
+checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
@@ -1015,21 +1103,27 @@ dependencies = [
[[package]]
name = "futures-sink"
-version = "0.3.30"
+version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
+checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
[[package]]
name = "futures-task"
-version = "0.3.30"
+version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
+checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
+
+[[package]]
+name = "futures-timer"
+version = "3.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
[[package]]
name = "futures-util"
-version = "0.3.30"
+version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
+checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-channel",
"futures-core",
@@ -1087,6 +1181,70 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
+[[package]]
+name = "gossip_kv"
+version = "0.1.0"
+dependencies = [
+ "clap",
+ "config",
+ "governor",
+ "hostname",
+ "hydroflow",
+ "lattices",
+ "lazy_static",
+ "notify",
+ "prometheus",
+ "rand",
+ "serde",
+ "serde_json",
+ "shlex",
+ "tokio",
+ "tracing",
+ "tracing-subscriber",
+ "uuid",
+ "warp",
+]
+
+[[package]]
+name = "governor"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0746aa765db78b521451ef74221663b57ba595bf83f75d0ce23cc09447c8139f"
+dependencies = [
+ "cfg-if",
+ "dashmap",
+ "futures-sink",
+ "futures-timer",
+ "futures-util",
+ "no-std-compat",
+ "nonzero_ext",
+ "parking_lot 0.12.3",
+ "portable-atomic",
+ "quanta",
+ "rand",
+ "smallvec",
+ "spinning_top",
+]
+
+[[package]]
+name = "h2"
+version = "0.3.26"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8"
+dependencies = [
+ "bytes",
+ "fnv",
+ "futures-core",
+ "futures-sink",
+ "futures-util",
+ "http 0.2.12",
+ "indexmap",
+ "slab",
+ "tokio",
+ "tokio-util",
+ "tracing",
+]
+
[[package]]
name = "half"
version = "2.4.1"
@@ -1097,6 +1255,12 @@ dependencies = [
"crunchy",
]
+[[package]]
+name = "hashbrown"
+version = "0.13.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e"
+
[[package]]
name = "hashbrown"
version = "0.14.5"
@@ -1113,6 +1277,30 @@ version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb"
+[[package]]
+name = "headers"
+version = "0.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270"
+dependencies = [
+ "base64",
+ "bytes",
+ "headers-core",
+ "http 0.2.12",
+ "httpdate",
+ "mime",
+ "sha1",
+]
+
+[[package]]
+name = "headers-core"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429"
+dependencies = [
+ "http 0.2.12",
+]
+
[[package]]
name = "heck"
version = "0.4.1"
@@ -1152,6 +1340,17 @@ dependencies = [
"windows-sys 0.52.0",
]
+[[package]]
+name = "hostname"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f9c7c7c8ac16c798734b8a24560c1362120597c40d5e1459f09498f8f6c8f2ba"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "windows",
+]
+
[[package]]
name = "http"
version = "0.2.12"
@@ -1163,12 +1362,40 @@ dependencies = [
"itoa",
]
+[[package]]
+name = "http"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
+dependencies = [
+ "bytes",
+ "fnv",
+ "itoa",
+]
+
+[[package]]
+name = "http-body"
+version = "0.4.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2"
+dependencies = [
+ "bytes",
+ "http 0.2.12",
+ "pin-project-lite",
+]
+
[[package]]
name = "httparse"
version = "1.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fcc0b4a115bf80b728eb8ea024ad5bd707b615bfed49e0665b6e0f86fd082d9"
+[[package]]
+name = "httpdate"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
+
[[package]]
name = "humantime"
version = "2.1.0"
@@ -1203,7 +1430,7 @@ dependencies = [
"serde",
"serde_json",
"tokio",
- "tokio-tungstenite",
+ "tokio-tungstenite 0.20.1",
]
[[package]]
@@ -1424,6 +1651,30 @@ dependencies = [
"stageleft_tool",
]
+[[package]]
+name = "hyper"
+version = "0.14.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8c08302e8fa335b151b788c775ff56e7a03ae64ff85c548ee820fecb70356e85"
+dependencies = [
+ "bytes",
+ "futures-channel",
+ "futures-core",
+ "futures-util",
+ "h2",
+ "http 0.2.12",
+ "http-body",
+ "httparse",
+ "httpdate",
+ "itoa",
+ "pin-project-lite",
+ "socket2",
+ "tokio",
+ "tower-service",
+ "tracing",
+ "want",
+]
+
[[package]]
name = "iana-time-zone"
version = "0.1.60"
@@ -1509,6 +1760,26 @@ dependencies = [
"str_stack",
]
+[[package]]
+name = "inotify"
+version = "0.9.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
+dependencies = [
+ "bitflags 1.3.2",
+ "inotify-sys",
+ "libc",
+]
+
+[[package]]
+name = "inotify-sys"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
+dependencies = [
+ "libc",
+]
+
[[package]]
name = "insta"
version = "1.39.0"
@@ -1593,6 +1864,37 @@ dependencies = [
"wasm-bindgen",
]
+[[package]]
+name = "json5"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1"
+dependencies = [
+ "pest",
+ "pest_derive",
+ "serde",
+]
+
+[[package]]
+name = "kqueue"
+version = "1.0.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c"
+dependencies = [
+ "kqueue-sys",
+ "libc",
+]
+
+[[package]]
+name = "kqueue-sys"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b"
+dependencies = [
+ "bitflags 1.3.2",
+ "libc",
+]
+
[[package]]
name = "lattices"
version = "0.5.7"
@@ -1637,6 +1939,17 @@ version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058"
+[[package]]
+name = "libredox"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d"
+dependencies = [
+ "bitflags 2.6.0",
+ "libc",
+ "redox_syscall 0.5.3",
+]
+
[[package]]
name = "libssh2-sys"
version = "0.3.0"
@@ -1721,6 +2034,22 @@ dependencies = [
"autocfg",
]
+[[package]]
+name = "mime"
+version = "0.3.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
+
+[[package]]
+name = "mime_guess"
+version = "2.0.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
+dependencies = [
+ "mime",
+ "unicase",
+]
+
[[package]]
name = "minicov"
version = "0.3.5"
@@ -1731,6 +2060,12 @@ dependencies = [
"walkdir",
]
+[[package]]
+name = "minimal-lexical"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
+
[[package]]
name = "miniz_oxide"
version = "0.7.4"
@@ -1740,6 +2075,18 @@ dependencies = [
"adler",
]
+[[package]]
+name = "mio"
+version = "0.8.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
+dependencies = [
+ "libc",
+ "log",
+ "wasi",
+ "windows-sys 0.48.0",
+]
+
[[package]]
name = "mio"
version = "1.0.2"
@@ -1752,6 +2099,24 @@ dependencies = [
"windows-sys 0.52.0",
]
+[[package]]
+name = "multer"
+version = "2.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2"
+dependencies = [
+ "bytes",
+ "encoding_rs",
+ "futures-util",
+ "http 0.2.12",
+ "httparse",
+ "log",
+ "memchr",
+ "mime",
+ "spin",
+ "version_check",
+]
+
[[package]]
name = "multiplatform_test"
version = "0.2.0"
@@ -1798,12 +2163,51 @@ dependencies = [
"libc",
]
+[[package]]
+name = "no-std-compat"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
+
[[package]]
name = "nom"
version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf51a729ecf40266a2368ad335a5fdde43471f545a967109cd62146ecf8b66ff"
+[[package]]
+name = "nom"
+version = "7.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
+dependencies = [
+ "memchr",
+ "minimal-lexical",
+]
+
+[[package]]
+name = "nonzero_ext"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
+
+[[package]]
+name = "notify"
+version = "6.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d"
+dependencies = [
+ "bitflags 2.6.0",
+ "filetime",
+ "inotify",
+ "kqueue",
+ "libc",
+ "log",
+ "mio 0.8.11",
+ "walkdir",
+ "windows-sys 0.48.0",
+]
+
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@@ -1923,6 +2327,16 @@ dependencies = [
"vcpkg",
]
+[[package]]
+name = "ordered-multimap"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4ed8acf08e98e744e5384c8bc63ceb0364e68a6854187221c18df61c4797690e"
+dependencies = [
+ "dlv-list",
+ "hashbrown 0.13.2",
+]
+
[[package]]
name = "overload"
version = "0.1.1"
@@ -1989,12 +2403,63 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
+[[package]]
+name = "pathdiff"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8835116a5c179084a830efb3adc117ab007512b535bc1a21c991d3b32a6b44dd"
+
[[package]]
name = "percent-encoding"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
+[[package]]
+name = "pest"
+version = "2.7.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fdbef9d1d47087a895abd220ed25eb4ad973a5e26f6a4367b038c25e28dfc2d9"
+dependencies = [
+ "memchr",
+ "thiserror",
+ "ucd-trie",
+]
+
+[[package]]
+name = "pest_derive"
+version = "2.7.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4d3a6e3394ec80feb3b6393c725571754c6188490265c61aaf260810d6b95aa0"
+dependencies = [
+ "pest",
+ "pest_generator",
+]
+
+[[package]]
+name = "pest_generator"
+version = "2.7.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "94429506bde1ca69d1b5601962c73f4172ab4726571a59ea95931218cb0e930e"
+dependencies = [
+ "pest",
+ "pest_meta",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.75",
+]
+
+[[package]]
+name = "pest_meta"
+version = "2.7.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ac8a071862e93690b6e34e9a5fb8e33ff3734473ac0245b27232222c4906a33f"
+dependencies = [
+ "once_cell",
+ "pest",
+ "sha2",
+]
+
[[package]]
name = "pin-project"
version = "1.1.5"
@@ -2145,10 +2610,31 @@ checksum = "6ab1427f3d2635891f842892dda177883dca0639e05fe66796a62c9d2f23b49c"
dependencies = [
"byteorder",
"libc",
- "nom",
+ "nom 2.2.1",
"rustc_version",
]
+[[package]]
+name = "prometheus"
+version = "0.13.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1"
+dependencies = [
+ "cfg-if",
+ "fnv",
+ "lazy_static",
+ "memchr",
+ "parking_lot 0.12.3",
+ "protobuf",
+ "thiserror",
+]
+
+[[package]]
+name = "protobuf"
+version = "2.28.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
+
[[package]]
name = "pusherator"
version = "0.0.8"
@@ -2255,6 +2741,21 @@ dependencies = [
"serde",
]
+[[package]]
+name = "quanta"
+version = "0.12.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5"
+dependencies = [
+ "crossbeam-utils",
+ "libc",
+ "once_cell",
+ "raw-cpuid",
+ "wasi",
+ "web-sys",
+ "winapi",
+]
+
[[package]]
name = "quick-xml"
version = "0.26.0"
@@ -2313,6 +2814,15 @@ dependencies = [
"rand",
]
+[[package]]
+name = "raw-cpuid"
+version = "11.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0"
+dependencies = [
+ "bitflags 2.6.0",
+]
+
[[package]]
name = "rayon"
version = "1.10.0"
@@ -2437,6 +2947,28 @@ dependencies = [
"bytemuck",
]
+[[package]]
+name = "ron"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b91f7eff05f748767f183df4320a63d6936e9c6107d97c9e6bdd9784f4289c94"
+dependencies = [
+ "base64",
+ "bitflags 2.6.0",
+ "serde",
+ "serde_derive",
+]
+
+[[package]]
+name = "rust-ini"
+version = "0.19.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7e2a3bcec1f113553ef1c88aae6c020a369d03d55b58de9869a0908930385091"
+dependencies = [
+ "cfg-if",
+ "ordered-multimap",
+]
+
[[package]]
name = "rust-sitter"
version = "0.4.3"
@@ -2648,6 +3180,18 @@ dependencies = [
"serde",
]
+[[package]]
+name = "serde_urlencoded"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
+dependencies = [
+ "form_urlencoded",
+ "itoa",
+ "ryu",
+ "serde",
+]
+
[[package]]
name = "sha1"
version = "0.10.6"
@@ -2760,6 +3304,21 @@ dependencies = [
"windows-sys 0.52.0",
]
+[[package]]
+name = "spin"
+version = "0.9.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
+
+[[package]]
+name = "spinning_top"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300"
+dependencies = [
+ "lock_api",
+]
+
[[package]]
name = "ssh2"
version = "0.9.4"
@@ -3036,6 +3595,15 @@ dependencies = [
"timely-logging-master",
]
+[[package]]
+name = "tiny-keccak"
+version = "2.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237"
+dependencies = [
+ "crunchy",
+]
+
[[package]]
name = "tinytemplate"
version = "1.2.1"
@@ -3070,7 +3638,7 @@ dependencies = [
"backtrace",
"bytes",
"libc",
- "mio",
+ "mio 1.0.2",
"parking_lot 0.12.3",
"pin-project-lite",
"signal-hook-registry",
@@ -3111,7 +3679,19 @@ dependencies = [
"futures-util",
"log",
"tokio",
- "tungstenite",
+ "tungstenite 0.20.1",
+]
+
+[[package]]
+name = "tokio-tungstenite"
+version = "0.21.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38"
+dependencies = [
+ "futures-util",
+ "log",
+ "tokio",
+ "tungstenite 0.21.0",
]
[[package]]
@@ -3187,12 +3767,19 @@ dependencies = [
"tokio",
]
+[[package]]
+name = "tower-service"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
+
[[package]]
name = "tracing"
version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
+ "log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@@ -3304,6 +3891,12 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8ddffe35a0e5eeeadf13ff7350af564c6e73993a24db62caee1822b185c2600"
+[[package]]
+name = "try-lock"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
+
[[package]]
name = "try_match"
version = "0.4.2"
@@ -3361,7 +3954,26 @@ dependencies = [
"byteorder",
"bytes",
"data-encoding",
- "http",
+ "http 0.2.12",
+ "httparse",
+ "log",
+ "rand",
+ "sha1",
+ "thiserror",
+ "url",
+ "utf-8",
+]
+
+[[package]]
+name = "tungstenite"
+version = "0.21.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1"
+dependencies = [
+ "byteorder",
+ "bytes",
+ "data-encoding",
+ "http 1.1.0",
"httparse",
"log",
"rand",
@@ -3377,6 +3989,21 @@ version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
+[[package]]
+name = "ucd-trie"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9"
+
+[[package]]
+name = "unicase"
+version = "2.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89"
+dependencies = [
+ "version_check",
+]
+
[[package]]
name = "unicode-bidi"
version = "0.3.15"
@@ -3398,6 +4025,12 @@ dependencies = [
"tinyvec",
]
+[[package]]
+name = "unicode-segmentation"
+version = "1.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493"
+
[[package]]
name = "unicode-width"
version = "0.1.13"
@@ -3440,6 +4073,15 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
+[[package]]
+name = "uuid"
+version = "1.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314"
+dependencies = [
+ "getrandom",
+]
+
[[package]]
name = "valuable"
version = "0.1.0"
@@ -3490,6 +4132,44 @@ dependencies = [
"winapi-util",
]
+[[package]]
+name = "want"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e"
+dependencies = [
+ "try-lock",
+]
+
+[[package]]
+name = "warp"
+version = "0.3.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4378d202ff965b011c64817db11d5829506d3404edeadb61f190d111da3f231c"
+dependencies = [
+ "bytes",
+ "futures-channel",
+ "futures-util",
+ "headers",
+ "http 0.2.12",
+ "hyper",
+ "log",
+ "mime",
+ "mime_guess",
+ "multer",
+ "percent-encoding",
+ "pin-project",
+ "scoped-tls",
+ "serde",
+ "serde_json",
+ "serde_urlencoded",
+ "tokio",
+ "tokio-tungstenite 0.21.0",
+ "tokio-util",
+ "tower-service",
+ "tracing",
+]
+
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
@@ -3679,6 +4359,16 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
+[[package]]
+name = "windows"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be"
+dependencies = [
+ "windows-core",
+ "windows-targets 0.52.6",
+]
+
[[package]]
name = "windows-core"
version = "0.52.0"
@@ -3920,6 +4610,15 @@ dependencies = [
"memchr",
]
+[[package]]
+name = "yaml-rust"
+version = "0.4.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85"
+dependencies = [
+ "linked-hash-map",
+]
+
[[package]]
name = "zerocopy"
version = "0.7.35"
diff --git a/Cargo.toml b/Cargo.toml
index 8b9deaa71ad9..ab8fef76bc85 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,6 +3,7 @@
# See "Adding new crates" and "Moving crates" addendum sections in `RELEASING.md`
members = [
"benches",
+ "datastores/gossip_kv",
"hydro_deploy/core",
"hydro_deploy/hydro_cli",
"hydro_deploy/hydro_cli_examples",
diff --git a/datastores/gossip_kv/Cargo.toml b/datastores/gossip_kv/Cargo.toml
new file mode 100644
index 000000000000..b59765578698
--- /dev/null
+++ b/datastores/gossip_kv/Cargo.toml
@@ -0,0 +1,45 @@
+[package]
+description = "Gossip KV Library"
+name = "gossip_kv"
+version = "0.1.0"
+edition = "2021"
+license = "Apache-2.0"
+publish = false
+
+[dependencies]
+clap = { version = "4.5.4", features = ["derive"] }
+config = "0.14.0"
+governor = "0.7.0"
+hostname = "0.4.0"
+hydroflow = { path="../../hydroflow" }
+lattices = { path = '../../lattices'}
+lazy_static = "1.5.0"
+# The specific set of features for Notify are picked to disable the default cross-beam channels (cause problems with
+# tokio) and use std channels. See docs for more information: https://docs.rs/notify/6.1.1/notify/
+notify = { version = "6.1.1", default-features = false, features = ["macos_kqueue"] }
+prometheus = "0.13.4"
+rand = "0.8.5"
+serde = "1.0.203"
+serde_json = "1.0.117"
+shlex = "1.3.0"
+tokio = { version = "1.0.0", features = ["rt", "rt-multi-thread", "macros"] }
+tracing = "0.1.40"
+tracing-subscriber = {version = "0.3.18", features = ["env-filter"]}
+uuid = { version = "1.9.1", features = ["v4"] }
+warp = "0.3.7"
+
+[[bin]]
+name = "gossip_server"
+path = "server/main.rs"
+
+[[bin]]
+name = "load_test_server"
+path = "load_test_server/server.rs"
+
+[[bin]]
+name = "gossip_cli"
+path = "cli/main.rs"
+
+[lib]
+name = "gossip_kv"
+path = "kv/lib.rs"
\ No newline at end of file
diff --git a/datastores/gossip_kv/Makefile b/datastores/gossip_kv/Makefile
new file mode 100644
index 000000000000..dbfe144f0d78
--- /dev/null
+++ b/datastores/gossip_kv/Makefile
@@ -0,0 +1,83 @@
+# Makefile
+
+## Minikube Options.
+MINIKUBE_DISK_SIZE:=100g
+MINIKUBE_CPUS:=16
+MINIKUBE_MEMORY:=32768
+
+BASE_IMAGE_VERSION:=latest
+SERVER_IMAGE_VERSION:=latest
+CLI_IMAGE_VERSION:=latest
+LOAD_TEST_IMAGE_VERSION:=latest
+
+# Docker Image Tags
+BASE_IMAGE_TAG:=hydroflow-gossip-kv-base-image:$(BASE_IMAGE_VERSION)
+SERVER_IMAGE_TAG:=hydroflow-gossip-kv-server:$(SERVER_IMAGE_VERSION)
+CLI_IMAGE_TAG:=hydroflow-gossip-kv-cli:$(CLI_IMAGE_VERSION)
+LOAD_TEST_IMAGE_TAG:=hydroflow-gossip-kv-load-test:$(LOAD_TEST_IMAGE_VERSION)
+
+AWS_TERRAFORM_PATH=../../datastores/gossip_kv/deployment/aws/terraform
+
+# Target to start Minikube with specific options
+start_minikube:
+ minikube start --disk-size=$(MINIKUBE_DISK_SIZE) --cpus=$(MINIKUBE_CPUS) --memory=$(MINIKUBE_MEMORY)
+ @echo "Please run 'eval \$$(minikube docker-env)' to use the Minikube Docker daemon"
+
+# Target to build the Docker images
+build_docker_images: build_base_image build_server_image build_cli_image build_load_test_image
+
+build_base_image:
+ docker build -t "$(BASE_IMAGE_TAG)" -f ../../datastores/gossip_kv/server/baseimage.Dockerfile ../..
+
+build_server_image:
+ docker build -t "$(SERVER_IMAGE_TAG)" -f ../../datastores/gossip_kv/server/Dockerfile ../..
+
+build_cli_image:
+ docker build -t "$(CLI_IMAGE_TAG)" -f ../../datastores/gossip_kv/cli/Dockerfile ../..
+
+build_load_test_image:
+ docker build -t "$(LOAD_TEST_IMAGE_TAG)" -f ../../datastores/gossip_kv/load_test_server/Dockerfile ../..
+
+# Target to clean up the Minikube cluster
+clean_local:
+ minikube delete
+
+# Target to deploy the Gossip KV Server to the Minikube cluster
+deploy_local:
+ kubectl apply -f ../../datastores/gossip_kv/server/local
+
+# Target to delete the Minikube cluster and build again
+rebuild_local: clean_local start_minikube build_docker_images
+
+aws_terraform_init:
+ terraform -chdir="$(AWS_TERRAFORM_PATH)" init
+
+aws_terraform_apply:
+ terraform -chdir="$(AWS_TERRAFORM_PATH)" apply
+
+aws_setup_kubectl:
+ @echo "Setting up kubectl to work with AWS EKS Cluster"
+ aws eks update-kubeconfig --region $$(terraform -chdir=$(AWS_TERRAFORM_PATH) output -raw region) --name $$(terraform -chdir=$(AWS_TERRAFORM_PATH) output -raw cluster_name)
+
+aws_upload_docker_images: build_docker_images
+ $(eval SERVER_REPO_URL := $(shell terraform -chdir=$(AWS_TERRAFORM_PATH) output -json repository_urls | jq -r '.["gossip_kv_server"]'))
+ $(eval CLI_REPO_URL := $(shell terraform -chdir=$(AWS_TERRAFORM_PATH) output -json repository_urls | jq -r '.["gossip_kv_cli"]'))
+ $(eval LOAD_TEST_REPO_URL := $(shell terraform -chdir=$(AWS_TERRAFORM_PATH) output -json repository_urls | jq -r '.["gossip_kv_load_test"]'))
+ $(eval REGION := $(shell terraform -chdir=$(AWS_TERRAFORM_PATH) output -raw region))
+ docker tag $(SERVER_IMAGE_TAG) $(SERVER_REPO_URL):$(SERVER_IMAGE_VERSION)
+ docker tag $(CLI_IMAGE_TAG) $(CLI_REPO_URL):$(CLI_IMAGE_VERSION)
+ docker tag $(LOAD_TEST_IMAGE_TAG) $(LOAD_TEST_REPO_URL):$(LOAD_TEST_IMAGE_VERSION)
+ aws ecr get-login-password --region $(REGION) | docker login --username AWS --password-stdin $(SERVER_REPO_URL)
+ docker push $(SERVER_REPO_URL):$(SERVER_IMAGE_VERSION)
+ aws ecr get-login-password --region $(REGION) | docker login --username AWS --password-stdin $(CLI_REPO_URL)
+ docker push $(CLI_REPO_URL):$(CLI_IMAGE_VERSION)
+ aws ecr get-login-password --region $(REGION) | docker login --username AWS --password-stdin $(LOAD_TEST_REPO_URL)
+ docker push $(LOAD_TEST_REPO_URL):$(LOAD_TEST_IMAGE_VERSION)
+
+aws_tunnel_grafana:
+ $(eval GRAFANA_PORT := $(shell terraform -chdir=$(AWS_TERRAFORM_PATH) output -raw grafana_port))
+ kubectl port-forward svc/grafana $(GRAFANA_PORT):$(GRAFANA_PORT)
+
+aws_tunnel_prometheus:
+ $(eval PROMETHEUS_PORT := $(shell terraform -chdir=$(AWS_TERRAFORM_PATH) output -raw prometheus_port))
+ kubectl port-forward svc/prometheus $(PROMETHEUS_PORT):$(PROMETHEUS_PORT)
\ No newline at end of file
diff --git a/datastores/gossip_kv/README.md b/datastores/gossip_kv/README.md
new file mode 100644
index 000000000000..b282dafe0f1b
--- /dev/null
+++ b/datastores/gossip_kv/README.md
@@ -0,0 +1,83 @@
+# Gossip Key-Value Store
+
+# Architecture
+A gossip-based key-value store library.
+
+```
+┌─────────────────────────┐ ┌─────────────────────────┐ ┌─────────────────────────┐ ┌─────────────────────────┐
+│ Process │ │ Process │ │ Process │ │ Process │
+│ ┌─────────────────────┐ │ │ ┌─────────────────────┐ │ │ ┌─────────────────────┐ │ │ ┌─────────────────────┐ │
+│ │ User Application │ │ │ │ User Application │ │ │ │ User Application │ │ │ │ User Application │ │
+│ └────────▲──┬─────────┘ │ │ └─────────▲─┬─────────┘ │ │ └─────────▲─┬─────────┘ │ │ └─────────▲─┬─────────┘ │
+│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │
+│ ┌────────┴──▼─────────┐ │ │ ┌─────────┴─▼─────────┐ │ │ ┌─────────┴─▼─────────┐ │ │ ┌─────────┴─▼─────────┐ │
+│ │ Gossip KV Store │ │◄──►│ │ Gossip KV Store │ │◄──►│ │ Gossip KV Store │ │◄──►│ │ Gossip KV Store │ │
+│ └─────────────────────┘ │ │ └─────────────────────┘ │ │ └─────────────────────┘ │ │ └─────────────────────┘ │
+└─────────────────────────┘ └─────────────────────────┘ └─────────────────────────┘ └─────────────────────────┘
+```
+
+## Data Model
+TODO: Elaborate
+* User Application manipulate data using client library
+* Replicated to all members of the gossip cluster
+* Eventually consistent
+
+```json
+{
+ "sys": {
+ "members": {
+ "member_id": {
+ "port": 1234,
+ "protocol": "v1"
+ }
+ }
+ },
+ "usr": {
+ "key 1": "value 1",
+ "key 2": "value 2"
+ }
+}
+```
+Data is divided into two sections: a `sys` section that contains system data used by the key-value store itself, and a
+`usr` section that contains user-defined data.
+
+### `sys` Data
+The `sys` data section contains system data / state that is required by the key-value store to do its work.
+
+#### Fields
+
+### `usr` Data
+
+## Protocol
+
+## Checkpoints
+
+# Running Locally Using Minikube
+## Install Docker Desktop
+```shell
+brew install --cask docker
+```
+### Run docker (macOS)
+```
+open -a Docker
+```
+
+## Install Minikube
+Read more [here](https://minikube.sigs.k8s.io/docs/start/)
+```shell
+brew install minikube
+```
+
+## Start Minikube
+```shell
+minikube start
+```
+
+## Install `kubectl`
+```shell
+brew install kubectl
+```
+## Configure Minikube to use your Docker Environment
+```shell
+eval $(minikube -p minikube docker-env)
+```
\ No newline at end of file
diff --git a/datastores/gossip_kv/cli/Dockerfile b/datastores/gossip_kv/cli/Dockerfile
new file mode 100644
index 000000000000..208948c50766
--- /dev/null
+++ b/datastores/gossip_kv/cli/Dockerfile
@@ -0,0 +1,12 @@
+FROM "hydroflow-gossip-kv-base-image:latest" AS builder
+WORKDIR /usr/src/gossip-kv-server
+COPY . .
+RUN find .
+RUN cargo build --release --workspace -p gossip_kv
+
+FROM rustlang/rust:nightly-slim
+COPY --from=builder /usr/src/gossip-kv-server/target/release/gossip_cli /usr/local/bin/gossip_cli
+
+RUN apt-get update && apt-get install -y \
+ dnsutils \
+ && rm -rf /var/lib/apt/lists/*
diff --git a/datastores/gossip_kv/cli/main.rs b/datastores/gossip_kv/cli/main.rs
new file mode 100644
index 000000000000..8834da9ce459
--- /dev/null
+++ b/datastores/gossip_kv/cli/main.rs
@@ -0,0 +1,119 @@
+use std::net::SocketAddr;
+
+use clap::{CommandFactory, Parser, Subcommand};
+use gossip_kv::{ClientRequest, ClientResponse, Key};
+use hydroflow::util::{bind_udp_bytes, ipv4_resolve};
+use hydroflow::{hydroflow_syntax, tokio, DemuxEnum};
+use tracing::error;
+
+/// CLI program to interact with Layer 0 gossip store.
+#[derive(Debug, Parser)]
+struct Opts {
+ #[clap(short, long, help = "Server address to connect to.")]
+ server_address: Option<String>,
+}
+
+/// Dummy app for using clap to process commands for interactive CLI.
+#[derive(Debug, Parser)]
+#[command(multicall = true)]
+struct InteractiveApp {
+ #[clap(subcommand)]
+ commands: InteractiveCommands,
+}
+
+#[derive(Debug, Subcommand, DemuxEnum)]
+enum InteractiveCommands {
+ /// Get a value from the store.
+ Get {
+ #[arg(value_parser = parse_key, required = true, help = "Key to get")]
+ key: Key,
+ },
+ /// Upsert a value in the store.
+ Set {
+ #[arg(value_parser = parse_key, required = true, help = "Key to set")]
+ key: Key,
+ value: String,
+ },
+ /// Delete a value from the store.
+ Delete {
+ #[arg(value_parser = parse_key, required = true, help = "Key to delete")]
+ key: Key,
+ },
+ /// Exit the application.
+ Exit,
+}
+
+/// Allows clap to parse Keys from user input.
+fn parse_key(s: &str) -> Result<Key, String> {
+ s.parse::<Key>().map_err(|e| e.to_string())
+}
+
+/// Parse a command from a line of input.
+fn parse_command(line: String) -> Option<InteractiveCommands> {
+ // Override how help is handled.
+ if line.trim() == "help" {
+ InteractiveApp::command()
+ .help_template("\nAvailable Commands: \n{subcommands}")
+ .print_help()
+ .unwrap();
+ return None;
+ }
+
+ // Split quoted string into parts.
+ let line_parts = shlex::split(&line);
+
+ if line_parts.is_none() {
+ error!("\nUnable to parse command.");
+ return None;
+ }
+
+ // Provide split parts to clap to process.
+ let maybe_parsed = InteractiveApp::try_parse_from(line_parts.unwrap());
+
+ match maybe_parsed {
+ Err(e) => {
+ // Problem with the parsed result. This displays some help.
+ error!("\n{}", e);
+ None
+ }
+ Ok(cli) => Some(cli.commands),
+ }
+}
+
+#[hydroflow::main]
+async fn main() {
+ tracing_subscriber::fmt::init();
+
+ let opts = Opts::parse();
+
+ // Bind to OS-assigned port on localhost.
+ let address = ipv4_resolve("0.0.0.0:0").unwrap();
+
+ // Default to localhost:3001 if not provided.
+ let server_address = opts.server_address.map_or_else(
+ || ipv4_resolve("localhost:3001").unwrap(),
+ |s| ipv4_resolve(&s).unwrap(),
+ );
+
+ // Setup UDP sockets for communication.
+ let (outbound, inbound, _) = bind_udp_bytes(address).await;
+
+ let mut cli = hydroflow_syntax! {
+ inbound_messages = source_stream_serde(inbound) -> map(Result::unwrap) -> for_each(|(response, _addr): (ClientResponse, SocketAddr)| println!("{:?}", response));
+
+ outbound_messages = union() -> dest_sink_serde(outbound);
+
+ // Parse commands from stdin.
+ commands = source_stdin()
+ -> filter_map(|line| parse_command(line.unwrap()))
+ -> demux_enum::<InteractiveCommands>();
+
+ commands[Get] -> map(|(key,)| (ClientRequest::Get {key}, server_address)) -> outbound_messages;
+ commands[Set] -> map(|(key, value)| (ClientRequest::Set {key, value}, server_address)) -> outbound_messages;
+ commands[Delete] -> map(|(key,)| (ClientRequest::Delete {key}, server_address)) -> outbound_messages;
+ commands[Exit] -> for_each(|()| std::process::exit(0)); // TODO: Graceful shutdown https://github.com/hydro-project/hydroflow/issues/1253
+
+ };
+
+ cli.run_async().await;
+}
diff --git a/datastores/gossip_kv/deployment/aws/terraform/.gitignore b/datastores/gossip_kv/deployment/aws/terraform/.gitignore
new file mode 100644
index 000000000000..3fa8c86b7b04
--- /dev/null
+++ b/datastores/gossip_kv/deployment/aws/terraform/.gitignore
@@ -0,0 +1 @@
+.terraform
diff --git a/datastores/gossip_kv/deployment/aws/terraform/main.tf b/datastores/gossip_kv/deployment/aws/terraform/main.tf
new file mode 100644
index 000000000000..47799f259c45
--- /dev/null
+++ b/datastores/gossip_kv/deployment/aws/terraform/main.tf
@@ -0,0 +1,561 @@
+provider "aws" {
+ region = var.region
+}
+
+# Filter out local zones, which are not currently supported
+# with managed node groups
+data "aws_availability_zones" "available" {
+ filter {
+ name = "opt-in-status"
+ values = ["opt-in-not-required"]
+ }
+}
+
+data "aws_caller_identity" "current" {}
+
+locals {
+ cluster_name = "anna-load-test-${random_string.suffix.result}"
+ account_id = data.aws_caller_identity.current.account_id
+}
+
+resource "random_string" "suffix" {
+ length = 8
+ special = false
+}
+
+module "vpc" {
+ source = "terraform-aws-modules/vpc/aws"
+ version = "5.8.1"
+
+ name = "anna-load-test-vpc"
+
+ cidr = "10.0.0.0/16"
+ azs = slice(data.aws_availability_zones.available.names, 0, 3)
+
+ map_public_ip_on_launch = true
+ public_subnets = ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24"]
+
+ enable_dns_hostnames = true
+
+ public_subnet_tags = {
+ "kubernetes.io/role/elb" = 1
+ }
+}
+
+module "eks_cluster" {
+ source = "terraform-aws-modules/eks/aws"
+ version = "20.24.3"
+
+ cluster_name = local.cluster_name
+ cluster_version = "1.31"
+
+ cluster_endpoint_public_access = true
+ enable_cluster_creator_admin_permissions = true
+
+ cluster_addons = {
+ aws-ebs-csi-driver = {
+ service_account_role_arn = module.irsa-ebs-csi.iam_role_arn
+ }
+ }
+
+ vpc_id = module.vpc.vpc_id
+ subnet_ids = module.vpc.public_subnets
+
+ eks_managed_node_group_defaults = {
+ ami_type = "AL2_x86_64"
+ }
+
+ eks_managed_node_groups = {
+ one = {
+ name = "servers"
+
+ instance_types = [var.instance_type]
+
+ min_size = 1
+ max_size = 3
+ desired_size = 2
+ }
+ }
+}
+
+# https://aws.amazon.com/blogs/containers/amazon-ebs-csi-driver-is-now-generally-available-in-amazon-eks-add-ons/
+data "aws_iam_policy" "ebs_csi_policy" {
+ arn = "arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy"
+}
+
+module "irsa-ebs-csi" {
+ source = "terraform-aws-modules/iam/aws//modules/iam-assumable-role-with-oidc"
+ version = "5.39.0"
+
+ create_role = true
+ role_name = "AmazonEKSTFEBSCSIRole-${module.eks_cluster.cluster_name}"
+ provider_url = module.eks_cluster.oidc_provider
+ role_policy_arns = [data.aws_iam_policy.ebs_csi_policy.arn]
+ oidc_fully_qualified_subjects = ["system:serviceaccount:kube-system:ebs-csi-controller-sa"]
+}
+
+variable "ecr_repositories" {
+ description = "List of ECR repository names"
+ type = list(string)
+ default = ["gossip_kv_server", "gossip_kv_cli", "gossip_kv_load_test"]
+}
+
+module "ecr" {
+ source = "terraform-aws-modules/ecr/aws"
+ version = "2.3.0"
+
+ for_each = {for repo in var.ecr_repositories : repo => repo}
+ repository_name = each.value
+
+ repository_read_write_access_arns = [data.aws_caller_identity.current.arn]
+ repository_lifecycle_policy = jsonencode({
+ rules = [
+ {
+ rulePriority = 1,
+ description = "Keep last 30 images",
+ selection = {
+ tagStatus = "tagged",
+ tagPrefixList = ["v"],
+ countType = "imageCountMoreThan",
+ countNumber = 30
+ },
+ action = {
+ type = "expire"
+ }
+ }
+ ]
+ })
+
+ repository_image_tag_mutability = "MUTABLE"
+ tags = {
+ Terraform = "true"
+ Environment = "dev"
+ }
+}
+
+provider "kubernetes" {
+ host = module.eks_cluster.cluster_endpoint
+ cluster_ca_certificate = base64decode(module.eks_cluster.cluster_certificate_authority_data)
+ exec {
+ api_version = "client.authentication.k8s.io/v1beta1"
+ command = "aws"
+ args = [
+ "eks",
+ "get-token",
+ "--cluster-name",
+ module.eks_cluster.cluster_name,
+ ]
+ }
+}
+
+resource "kubernetes_stateful_set" "gossip_kv_seed_nodes" {
+ metadata {
+ name = "gossip-kv-seed-nodes"
+ labels = {
+ app = "gossip-kv-seed-nodes"
+ }
+ }
+
+ spec {
+ service_name = "gossip-kv-seed-nodes"
+ replicas = 1
+
+ selector {
+ match_labels = {
+ app = "gossip-kv-seed-nodes"
+ }
+ }
+
+ template {
+ metadata {
+ labels = {
+ app = "gossip-kv-seed-nodes"
+ }
+ annotations = {
+ "prometheus.io/scrape" : "true"
+ "prometheus.io/port" : var.pod_monitoring_port
+ }
+ }
+
+ spec {
+ termination_grace_period_seconds = 5
+
+ container {
+ name = "gossip-kv-server"
+ image = "${module.ecr.gossip_kv_load_test.repository_url}:latest"
+ image_pull_policy = "Always"
+
+ env {
+ name = "RUST_LOG"
+ value = "trace"
+ }
+
+ env {
+ name = "RUST_BACKTRACE"
+ value = "full"
+ }
+
+ port {
+ container_port = 3001
+ protocol = "UDP"
+ }
+
+ port {
+ container_port = var.pod_monitoring_port
+ protocol = "TCP"
+ }
+
+ volume_mount {
+ name = "gossip-kv-dynamic-config"
+ mount_path = "/config/dynamic"
+ }
+ }
+
+ volume {
+ name = "gossip-kv-dynamic-config"
+
+ config_map {
+ name = "gossip-kv-dynamic-config"
+ }
+ }
+ }
+ }
+ }
+}
+
+resource "kubernetes_deployment" "gossip_kv_cli" {
+ metadata {
+ name = "gossip-kv-cli"
+ labels = {
+ app = "gossip-kv-cli"
+ }
+ }
+
+ spec {
+ replicas = 1
+
+ selector {
+ match_labels = {
+ app = "gossip-kv-cli"
+ }
+ }
+
+ template {
+ metadata {
+ labels = {
+ app = "gossip-kv-cli"
+ }
+ }
+
+ spec {
+ termination_grace_period_seconds = 5
+
+ container {
+ name = "gossip-kv-cli"
+ image = "${module.ecr.gossip_kv_cli.repository_url}:latest"
+ image_pull_policy = "Always"
+ command = ["/bin/sh"]
+ args = ["-c", "while true; do sleep 3600; done"]
+ tty = true
+
+ env {
+ name = "RUST_LOG"
+ value = "info"
+ }
+ }
+ }
+ }
+ }
+}
+
+resource "kubernetes_service" "gossip_kv_seed_nodes" {
+ metadata {
+ name = "gossip-kv-seed-nodes"
+ labels = {
+ app = "gossip-kv-seed-nodes"
+ }
+ }
+
+ spec {
+ cluster_ip = "None"
+ selector = {
+ app = "gossip-kv-seed-nodes"
+ }
+
+ port {
+ port = 3001
+ target_port = 3001
+ protocol = "UDP"
+ }
+ }
+}
+
+resource "kubernetes_config_map" "gossip_kv_dynamic_config" {
+ metadata {
+ name = "gossip-kv-dynamic-config"
+ }
+
+ data = {
+ "dynamic.toml" = < details.repository_url }
+}
+
+output "grafana_port" {
+ description = "Port for Grafana UI"
+ value = var.grafana_port
+}
+
+output "prometheus_port" {
+ description = "Port for Prometheus UI"
+ value = var.prometheus_port
+}
\ No newline at end of file
diff --git a/datastores/gossip_kv/deployment/aws/terraform/terraform.tf b/datastores/gossip_kv/deployment/aws/terraform/terraform.tf
new file mode 100644
index 000000000000..6b54c550a221
--- /dev/null
+++ b/datastores/gossip_kv/deployment/aws/terraform/terraform.tf
@@ -0,0 +1,30 @@
+terraform {
+ required_providers {
+ aws = {
+ source = "hashicorp/aws"
+ version = "~> 5.70.0"
+ }
+
+ random = {
+ source = "hashicorp/random"
+ version = "~> 3.6.1"
+ }
+
+ tls = {
+ source = "hashicorp/tls"
+ version = "~> 4.0.5"
+ }
+
+ cloudinit = {
+ source = "hashicorp/cloudinit"
+ version = "~> 2.3.4"
+ }
+
+ kubernetes = {
+ source = "hashicorp/kubernetes"
+ version = ">= 2.32.0"
+ }
+ }
+
+ required_version = "~> 1.3"
+}
\ No newline at end of file
diff --git a/datastores/gossip_kv/deployment/aws/terraform/variables.tf b/datastores/gossip_kv/deployment/aws/terraform/variables.tf
new file mode 100644
index 000000000000..fb8ff7995be5
--- /dev/null
+++ b/datastores/gossip_kv/deployment/aws/terraform/variables.tf
@@ -0,0 +1,29 @@
+variable "region" {
+ description = "AWS region where the resources will be created"
+ type = string
+ default = "us-east-2"
+}
+
+variable "instance_type" {
+ description = "Instance type for the EC2 instances"
+ type = string
+ default = "t3.small"
+}
+
+variable "grafana_port" {
+ description = "Port for Grafana UI"
+ type = number
+ default = 4001
+}
+
+variable "prometheus_port" {
+ description = "Port for Prometheus UI"
+ type = number
+ default = 4002
+}
+
+variable "pod_monitoring_port" {
+ description = "Port for monitoring pods using prometheus. Every pod runs a prometheus exporter on this port."
+ type = number
+ default = 4003
+}
\ No newline at end of file
diff --git a/datastores/gossip_kv/deployment/local/objects.yaml b/datastores/gossip_kv/deployment/local/objects.yaml
new file mode 100644
index 000000000000..7421df268e1e
--- /dev/null
+++ b/datastores/gossip_kv/deployment/local/objects.yaml
@@ -0,0 +1,90 @@
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+ name: gossip-kv-seed-nodes
+ labels:
+ app: gossip-kv-seed-nodes
+spec:
+ replicas: 3
+ serviceName: gossip-kv-seed-nodes
+ selector:
+ matchLabels:
+ app: gossip-kv-seed-nodes
+ template:
+ metadata:
+ labels:
+ app: gossip-kv-seed-nodes
+ spec:
+ terminationGracePeriodSeconds: 5 # Really aggressive, but makes teardown faster. Not recommended beyond benchmarking.
+ containers:
+ - name: gossip-kv-server
+ image: docker.io/hydroflow/gossip-kv-server:latest
+ imagePullPolicy: IfNotPresent
+# Uncomment the following for debugging
+# command: [ "/bin/sh" ]
+# args: [ "-c", "while true; do sleep 3600; done" ]
+ env:
+ - name: RUST_LOG
+ value: "trace"
+ - name: RUST_BACKTRACE
+ value: "full"
+ ports:
+ - containerPort: 3001
+ protocol: UDP
+ volumeMounts:
+ - name: gossip-kv-dynamic-config
+ mountPath: /config/dynamic
+ volumes:
+ - name: gossip-kv-dynamic-config
+ configMap:
+ name: gossip-kv-dynamic-config
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: gossip-kv-cli
+ labels:
+ app: gossip-kv-cli
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: gossip-kv-cli
+ template:
+ metadata:
+ labels:
+ app: gossip-kv-cli
+ spec:
+ terminationGracePeriodSeconds: 5 # Really aggressive, but makes teardown faster. Not recommended beyond benchmarking.
+ containers:
+ - name: gossip-kv-cli
+ image: docker.io/hydroflow/gossip-kv-cli:latest
+ imagePullPolicy: IfNotPresent
+ command: ["/bin/sh"]
+ args: ["-c", "while true; do sleep 3600; done"]
+ tty: true
+ env:
+ - name: RUST_LOG
+ value: "info"
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: gossip-kv-seed-nodes
+ labels:
+ app: gossip-kv-seed-nodes
+spec:
+ ports:
+ - port: 3001
+ targetPort: 3001
+ protocol: UDP
+ clusterIP: None
+ selector:
+ app: gossip-kv-seed-nodes
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: gossip-kv-dynamic-config
+data:
+ dynamic.toml: |
\ No newline at end of file
diff --git a/datastores/gossip_kv/deployment/local/updated_seed_node_config.yaml b/datastores/gossip_kv/deployment/local/updated_seed_node_config.yaml
new file mode 100644
index 000000000000..bebeda8cb5e4
--- /dev/null
+++ b/datastores/gossip_kv/deployment/local/updated_seed_node_config.yaml
@@ -0,0 +1,17 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: gossip-kv-dynamic-config
+data:
+ dynamic.toml: |
+ [[seed_nodes]]
+ id = "gossip-kv-seed-nodes-0"
+ address = "gossip-kv-seed-nodes-0.gossip-kv-seed-nodes.default.svc.cluster.local:3000"
+
+ [[seed_nodes]]
+ id = "gossip-kv-seed-nodes-1"
+ address = "gossip-kv-seed-nodes-1.gossip-kv-seed-nodes.default.svc.cluster.local:3000"
+
+ [[seed_nodes]]
+ id = "gossip-kv-seed-nodes-2"
+ address = "gossip-kv-seed-nodes-2.gossip-kv-seed-nodes.default.svc.cluster.local:3000"
\ No newline at end of file
diff --git a/datastores/gossip_kv/kv/lattices/mod.rs b/datastores/gossip_kv/kv/lattices/mod.rs
new file mode 100644
index 000000000000..1aa5883b4bda
--- /dev/null
+++ b/datastores/gossip_kv/kv/lattices/mod.rs
@@ -0,0 +1,248 @@
+use std::cmp::Ordering;
+use std::collections::HashSet;
+use std::hash::Hash;
+
+use hydroflow::lattices::{IsBot, IsTop, LatticeFrom, LatticeOrd, Merge};
+use serde::{Deserialize, Serialize};
+
+/// A bounded set union lattice with a fixed size N.
+///
+/// Once the set reaches size N, it becomes top. The items in the set are no longer tracked to
+/// reclaim associated memory.
+#[derive(Debug, Clone, Eq, Serialize, Deserialize)]
+
+pub struct BoundedSetLattice<T, const N: usize>
+where
+ T: Eq + Hash,
+{
+ // The set of items in the lattice with invariant:
+ // is_top => items.is_empty() ... i.e. the items are dropped when the lattice reaches top.
+ items: HashSet<T>,
+ is_top: bool,
+}
+
+impl<T, const N: usize> LatticeFrom<BoundedSetLattice<T, N>> for BoundedSetLattice<T, N>
+where
+ T: Eq + Hash,
+{
+ fn lattice_from(other: BoundedSetLattice<T, N>) -> Self {
+ other
+ }
+}
+
+impl<T, const N: usize> Default for BoundedSetLattice<T, N>
+where
+ T: Eq + Hash,
+{
+ fn default() -> Self {
+ Self {
+ items: HashSet::new(),
+ is_top: N == 0, // This lattice is effectively a unit lattice `()`, if N == 0
+ }
+ }
+}
+
+impl<T, const N: usize> From<()> for BoundedSetLattice<T, N>
+where
+ T: Eq + Hash,
+{
+ fn from(_: ()) -> Self {
+ Default::default()
+ }
+}
+
+impl<T, const N: usize> From<BoundedSetLattice<T, N>> for ()
+where
+ T: Eq + Hash,
+{
+ fn from(_: BoundedSetLattice<T, N>) -> Self {}
+}
+
+impl<T, const N: usize> BoundedSetLattice<T, N>
+where
+ T: Eq + Hash,
+{
+ pub fn new() -> Self {
+ Default::default()
+ }
+
+ pub fn new_from<U>(items: U) -> Self
+ where
+ U: IntoIterator<Item = T>,
+ {
+ let mut lattice = Self::new();
+ lattice.merge(items);
+ lattice
+ }
+}
+
+impl<T, const N: usize> IsBot for BoundedSetLattice<T, N>
+where
+ T: Eq + Hash,
+{
+ fn is_bot(&self) -> bool {
+ match N {
+ 0 => true,
+ _ => self.items.is_empty() && !self.is_top,
+ }
+ }
+}
+
+impl<T, const N: usize> IsTop for BoundedSetLattice<T, N>
+where
+ T: Eq + Hash,
+{
+ fn is_top(&self) -> bool {
+ self.is_top
+ }
+}
+
+impl<T, U, const N: usize> Merge<U> for BoundedSetLattice<T, N>
+where
+ U: IntoIterator<Item = T>,
+ T: Eq + Hash,
+{
+ fn merge(&mut self, other: U) -> bool {
+ if self.is_top {
+ return false;
+ }
+
+ let old_len = self.items.len();
+ self.items.extend(other);
+ let new_len = self.items.len();
+
+ if new_len >= N {
+ self.is_top = true;
+ self.items.clear();
+ }
+
+ new_len != old_len
+ }
+}
+
+impl<T, const N: usize> PartialOrd for BoundedSetLattice<T, N>
+where
+ T: Eq + Hash,
+{
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ match (self.is_top, other.is_top) {
+ (true, true) => Some(Ordering::Equal),
+ (true, false) => Some(Ordering::Greater),
+ (false, true) => Some(Ordering::Less),
+ (false, false) => match self.items.len().cmp(&other.items.len()) {
+ Ordering::Greater => {
+ if other.items.iter().all(|key| self.items.contains(key)) {
+ Some(Ordering::Greater)
+ } else {
+ None
+ }
+ }
+ Ordering::Less => {
+ if self.items.iter().all(|key| other.items.contains(key)) {
+ Some(Ordering::Less)
+ } else {
+ None
+ }
+ }
+ Ordering::Equal => {
+ if self.items.iter().all(|key| other.items.contains(key)) {
+ Some(Ordering::Equal)
+ } else {
+ None
+ }
+ }
+ },
+ }
+ }
+}
+
+impl<T, const N: usize> PartialEq for BoundedSetLattice<T, N>
+where
+ T: Eq + Hash,
+{
+ fn eq(&self, other: &Self) -> bool {
+ match (self.is_top, other.is_top) {
+ (true, true) => true,
+ (true, false) => false,
+ (false, true) => false,
+ (false, false) => self.items == other.items,
+ }
+ }
+}
+
+impl<T, const N: usize> LatticeOrd for BoundedSetLattice<T, N> where T: Eq + Hash {}
+
+impl<T, const N: usize> Merge<BoundedSetLattice<T, N>> for BoundedSetLattice<T, N>
+where
+ T: Eq + Hash,
+{
+ fn merge(&mut self, other: BoundedSetLattice<T, N>) -> bool {
+ match (self.is_top, other.is_top) {
+ (true, _) => false,
+ (false, true) => {
+ self.is_top = true;
+ self.items.clear();
+ true
+ }
+ (false, false) => self.merge(other.items),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use hydroflow::lattices::test::check_all;
+
+ use super::*;
+
+ #[test]
+ fn test_0_bounded_set_lattice() {
+ let mut lat: BoundedSetLattice<i32, 0> = ().into();
+ assert!(lat.is_bot() && lat.is_top());
+
+ // Merges should always return false.
+ assert!(!lat.merge([1]));
+
+ // No changes to top/bot status.
+ assert!(lat.is_bot() && lat.is_top());
+ }
+
+ #[test]
+ fn test_1_bounded_set_lattice() {
+ // The bounded lattice with N = 1 is effectively a WithBottom lattice.
+ let mut lat = BoundedSetLattice::<i32, 1>::new();
+ assert!(lat.is_bot() && !lat.is_top());
+ assert!(lat.items.is_empty());
+
+ assert!(lat.merge([1]));
+ assert!(!lat.is_bot() && lat.is_top());
+ assert!(lat.items.is_empty()); // Check that the items were dropped.
+
+ assert!(!lat.merge([2]));
+ }
+
+ #[test]
+ fn test_2_bounded_set_lattice() {
+ let mut a = BoundedSetLattice::<i32, 2>::new();
+ let b: BoundedSetLattice<i32, 2> = BoundedSetLattice::new_from([1, 2]);
+
+ assert!(a.is_bot() && !a.is_top());
+ assert!(!b.is_bot() && b.is_top());
+
+ assert!(a.merge(b));
+ assert!(!a.is_bot() && a.is_top());
+
+ assert!(!a.merge([3]));
+ }
+
+ #[test]
+ fn test_lattice_properties() {
+ check_all(&[
+ Default::default(),
+ // NOTE(review): the const generic N was lost in extraction; 5 keeps all samples
+ // below top so ordering properties are exercised — confirm against the original.
+ BoundedSetLattice::<i32, 5>::new_from([1]),
+ BoundedSetLattice::<i32, 5>::new_from([1, 2]),
+ BoundedSetLattice::<i32, 5>::new_from([1, 2, 3]),
+ BoundedSetLattice::<i32, 5>::new_from([1, 2, 3, 4]),
+ ]);
+ }
+}
diff --git a/datastores/gossip_kv/kv/lib.rs b/datastores/gossip_kv/kv/lib.rs
new file mode 100644
index 000000000000..15badc703be3
--- /dev/null
+++ b/datastores/gossip_kv/kv/lib.rs
@@ -0,0 +1,273 @@
+pub mod membership;
+pub mod model;
+
+pub mod server;
+
+pub mod lattices;
+
+pub mod util;
+
+use std::collections::HashSet;
+use std::fmt::Display;
+use std::str::FromStr;
+
+use serde::{Deserialize, Serialize};
+
+use crate::model::{Clock, Namespaces};
+use crate::KeyParseError::InvalidNamespace;
+
+/// The namespace of the key of an entry in the key-value store.
+#[derive(Debug, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Hash)]
+pub enum Namespace {
+ /// User namespace is for use by the user of the key-value store.
+ User,
+
+ /// System namespace is reserved for use by the key-value store itself.
+ System,
+}
+
+/// Error that can occur when parsing a key from a string.
+#[derive(Debug, Eq, PartialEq, Clone, Copy)]
+pub enum KeyParseError {
+ /// The namespace in the key is invalid. Namespaces must be either `usr` or `sys`.
+ InvalidNamespace,
+
+ /// The key is in an invalid format. Keys must be of the form `/namespace/table/row`.
+ InvalidFormat,
+}
+
+impl Display for KeyParseError {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ InvalidNamespace => write!(f, "Invalid namespace"),
+ KeyParseError::InvalidFormat => write!(f, "Invalid key format"),
+ }
+ }
+}
+
+impl FromStr for Namespace {
+ type Err = KeyParseError;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s {
+ "usr" => Ok(Namespace::User),
+ "sys" => Ok(Namespace::System),
+ _ => Err(InvalidNamespace),
+ }
+ }
+}
+
+/// The name of a table in the key-value store.
+pub type TableName = String;
+
+/// The key of a row in a table in the key-value store.
+pub type RowKey = String;
+
+/// A key of an entry in the key-value store.
+///
+/// Data in the key-value store is organized into namespaces, tables, and rows. Namespaces are
+/// either `usr` for user data or `sys` for system data. Namespaces contain tables, which contain
+/// rows. Each row has a row key and a row value.
+#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize, Hash)]
+pub struct Key {
+ /// The namespace of the key.
+ pub namespace: Namespace,
+ /// The name of the table in the key.
+ pub table: TableName,
+ /// The key of the row in the table.
+ pub row_key: RowKey,
+}
+
+impl FromStr for Key {
+ type Err = KeyParseError;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ let mut parts: Vec<String> = vec![];
+ let mut current_part = String::new();
+ let mut escaping = false;
+
+ for c in s.chars() {
+ match (escaping, c) {
+ (true, '\\') | (true, '/') => {
+ current_part.push(c);
+ escaping = false;
+ }
+ (true, _) => return Err(KeyParseError::InvalidFormat),
+ (false, '\\') => {
+ escaping = true;
+ }
+ (false, '/') => {
+ parts.push(current_part);
+ current_part = String::new();
+ }
+ (false, _) => {
+ current_part.push(c);
+ }
+ }
+ }
+
+ if escaping {
+ return Err(KeyParseError::InvalidFormat);
+ }
+
+ if !current_part.is_empty() {
+ parts.push(current_part);
+ }
+
+ if parts.len() != 4 {
+ return Err(KeyParseError::InvalidFormat);
+ }
+
+ if !parts[0].is_empty() {
+ return Err(KeyParseError::InvalidFormat);
+ }
+
+ if parts[2].is_empty() || parts[3].is_empty() {
+ return Err(KeyParseError::InvalidFormat);
+ }
+
+ let namespace = parts[1].parse()?;
+ Ok(Key {
+ namespace,
+ table: parts[2].to_string(),
+ row_key: parts[3].to_string(),
+ })
+ }
+}
+
+/// A request from a client to the key-value store.
+#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
+pub enum ClientRequest {
+ /// A request to get the value of a key.
+ Get { key: Key },
+ /// A request to set the value of a key.
+ Set { key: Key, value: String },
+ /// A request to delete the value of a key.
+ Delete { key: Key },
+}
+
+#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
+pub enum ClientResponse {
+ /// A response for a get request. The key is echoed back along with the value, if it exists.
+ /// Multiple values are returned if there were concurrent writes to the key.
+ Get { key: Key, value: HashSet },
+ /// A response for a set request. The success field is true if the set was successful.
+ Set { success: bool },
+ /// A response for a delete request. The success field is true if delete was successful.
+ Delete { success: bool },
+}
+
+#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
+pub enum GossipMessage {
+ /// An "infecting message" to share updates with a peer.
+ Gossip {
+ message_id: String,
+ member_id: String,
+ writes: Namespaces,
+ },
+ /// An acknowledgement message sent by a peer in response to a Gossip message, to indicate
+ /// that it hasn't seen some of the writes in the Gossip message before.
+ Ack {
+ message_id: String,
+ member_id: String,
+ },
+ /// A negative acknowledgement sent by a peer in response to a Gossip message, to indicate
+ /// that it has seen all of the writes in the Gossip message before.
+ Nack {
+ message_id: String,
+ member_id: String,
+ },
+}
+
+#[cfg(test)]
+mod tests {
+ use super::{Key, Namespace};
+
+ #[test]
+ fn test_key_parsing_sys_namespace() {
+ // Sys namespace
+ let first = "/sys/sys_table/sys_row".parse::().unwrap();
+ assert_eq!(first.namespace, Namespace::System);
+ assert_eq!(first.table, "sys_table");
+ assert_eq!(first.row_key, "sys_row");
+ }
+ #[test]
+ fn test_key_parsing_user_namespace() {
+ // User namespace
+ let second = "/usr/usr_table/usr_row".parse::().unwrap();
+ assert_eq!(second.namespace, Namespace::User);
+ assert_eq!(second.table, "usr_table");
+ assert_eq!(second.row_key, "usr_row");
+ }
+
+ #[test]
+ fn test_key_empty_table() {
+ // Empty table
+ let empty_table = "/usr//usr_row".parse::<Key>();
+ assert!(empty_table.is_err());
+ assert_eq!(
+ empty_table.unwrap_err(),
+ super::KeyParseError::InvalidFormat
+ );
+ }
+
+ #[test]
+ fn test_key_empty_row() {
+ // Empty row
+ let empty_row = "/usr/usr_table/".parse::<Key>();
+ assert!(empty_row.is_err());
+ assert_eq!(empty_row.unwrap_err(), super::KeyParseError::InvalidFormat);
+ }
+
+ #[test]
+ fn test_key_parsing_invalid_namespace() {
+ // Invalid namespace
+ let non_existent_namespace = "/ne_namespace/ne_table/ne_row".parse::<Key>();
+ assert!(non_existent_namespace.is_err());
+ assert_eq!(
+ non_existent_namespace.unwrap_err(),
+ super::KeyParseError::InvalidNamespace
+ );
+ }
+
+ #[test]
+ fn test_key_parsing_invalid_format() {
+ // Invalid format
+ let invalid_format = "/not_even_a_key".parse::<Key>();
+ assert!(invalid_format.is_err());
+ assert_eq!(
+ invalid_format.unwrap_err(),
+ super::KeyParseError::InvalidFormat
+ );
+
+ let invalid_format = "abcd/sys/sys_table/sys_row".parse::<Key>();
+ assert!(invalid_format.is_err());
+ assert_eq!(
+ invalid_format.unwrap_err(),
+ super::KeyParseError::InvalidFormat
+ );
+ }
+
+ #[test]
+ fn test_key_parsing_escaping() {
+ // Escape \
+ let key = r"/usr/usr\/table/usr\/row".parse::().unwrap();
+ assert_eq!(key.namespace, Namespace::User);
+ assert_eq!(key.table, r"usr/table");
+ assert_eq!(key.row_key, r"usr/row");
+
+ // Escaping /
+ let key = r"/usr/usr\\table/usr\\row".parse::().unwrap();
+ assert_eq!(key.namespace, Namespace::User);
+ assert_eq!(key.table, r"usr\table");
+ assert_eq!(key.row_key, r"usr\row");
+
+ // Escaping any character
+ let key = r"/usr/usr\table/usr\row".parse::();
+ assert!(key.is_err());
+ assert_eq!(key.unwrap_err(), super::KeyParseError::InvalidFormat);
+
+ // Dangling escape
+ let key = r"/usr/usr_table/usr_row\".parse::();
+ assert!(key.is_err());
+ assert_eq!(key.unwrap_err(), super::KeyParseError::InvalidFormat);
+ }
+}
diff --git a/datastores/gossip_kv/kv/membership.rs b/datastores/gossip_kv/kv/membership.rs
new file mode 100644
index 000000000000..3657c87d3cd2
--- /dev/null
+++ b/datastores/gossip_kv/kv/membership.rs
@@ -0,0 +1,85 @@
+use std::fmt::Debug;
+use std::hash::Hash;
+
+use serde::{Deserialize, Serialize};
+
+pub type MemberId = String;
+
+/// Information about a member in the cluster.
+///
+/// A member is a transducer that is part of the cluster. Leaving or failing is a terminal
+/// state for a member. When a transducer restarts and rejoins the cluster, it is considered a
+/// new member.
+///
+/// # Generic Parameters
+/// -- `A`: The transport of the endpoint on which the protocol is running. In production, this will
+/// likely be a `SocketAddr`.
+#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
+pub struct MemberData
+where
+ A: Debug + Clone + Eq + Hash + Serialize,
+{
+ /// The name of the member. Usually, this is a randomly generated identifier, based on the
+ /// hostname on which the member is running.
+ pub id: MemberId,
+
+ /// The protocols that the member supports.
+ pub protocols: Vec>,
+}
+
+/// A builder for `MemberData`.
+pub struct MemberDataBuilder
+where
+ A: Debug + Clone + Eq + Hash + Serialize,
+{
+ id: MemberId,
+ protocols: Vec>,
+}
+
+impl MemberDataBuilder
+where
+ A: Debug + Clone + Eq + Hash + Serialize,
+{
+ /// Creates a new `MemberDataBuilder`.
+ pub fn new(id: MemberId) -> Self {
+ MemberDataBuilder {
+ id,
+ protocols: Vec::new(),
+ }
+ }
+
+ /// Adds a protocol to the member.
+ pub fn add_protocol(mut self, protocol: Protocol) -> Self {
+ self.protocols.push(protocol);
+ self
+ }
+
+ /// Builds the `MemberData`.
+ pub fn build(self) -> MemberData {
+ MemberData {
+ id: self.id,
+ protocols: self.protocols,
+ }
+ }
+}
+
+/// A protocol supported by a member.
+///
+/// # Generic Parameters
+/// -- `A`: The transport of the endpoint on which the protocol is running. In production, this will
+/// likely be a `SocketAddr`.
+#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
+pub struct Protocol {
+ /// The name of the protocol.
+ pub name: String,
+
+ /// The endpoint on which the protocol is running.
+ pub endpoint: A,
+}
+
+impl Protocol {
+ /// Creates a new `Protocol`.
+ pub fn new(name: String, endpoint: A) -> Self {
+ Protocol { name, endpoint }
+ }
+}
diff --git a/datastores/gossip_kv/kv/model.rs b/datastores/gossip_kv/kv/model.rs
new file mode 100644
index 000000000000..3d17d78d3b68
--- /dev/null
+++ b/datastores/gossip_kv/kv/model.rs
@@ -0,0 +1,166 @@
+use hydroflow::lattices::map_union::MapUnionHashMap;
+use hydroflow::lattices::set_union::SetUnionHashSet;
+use hydroflow::lattices::{DomPair, Max};
+
+use crate::Namespace;
+
+/// Primary key for entries in a table.
+pub type RowKey = String;
+
+/// Value stored in a table. Modelled as a timestamped set of strings.
+///
+/// Each value is timestamped with the time at which it was last updated. Concurrent updates at
+/// the same timestamp are stored as a set.
+pub type RowValue = DomPair>;
+
+/// A map from row keys to values in a table.
+pub type Table = MapUnionHashMap;
+
+/// Name of a table in the data store.
+pub type TableName = String;
+
+/// A map from table names to tables.
+pub type TableMap = MapUnionHashMap>;
+
+pub type NamespaceMap = MapUnionHashMap>;
+
+pub type Namespaces = NamespaceMap>;
+
+/// Timestamps used in the model.
+// TODO: This will be updated to use a more sophisticated clock type with https://github.com/hydro-project/hydroflow/issues/1207.
+pub type Clock = Max;
+
+/// TableMap element to upsert a row in an existing TableMap.
+///
+/// Merge this into an existing TableMap to upsert a row in a table. If the table does not exist,
+/// it gets created. There's no explicit "create table" operation.
+///
+/// Parameters:
+/// - `row_ts`: New timestamp of the row being upserted.
+/// - `table_name`: Name of the table.
+/// - `key`: Primary key of the row.
+/// - `val`: Row value.
+pub fn upsert_row(
+ row_ts: C,
+ ns: Namespace,
+ table_name: TableName,
+ key: RowKey,
+ val: String,
+) -> Namespaces {
+ let value: RowValue = RowValue::new_from(row_ts, SetUnionHashSet::new_from([val]));
+ let row: Table> = Table::new_from([(key, value)]);
+ let table: TableMap> = TableMap::new_from([(table_name, row)]);
+ Namespaces::new_from([(ns, table)])
+}
+
+/// TableMap element to delete a row from an existing TableMap.
+///
+/// Merge this into an existing TableMap to delete a row from a table.
+///
+/// Parameters:
+/// - `row_ts`: New timestamp of the row being deleted.
+/// - `table_name`: Name of the table.
+/// - `key`: Primary key of the row.
+pub fn delete_row(
+ row_ts: C,
+ ns: Namespace,
+ table_name: TableName,
+ key: RowKey,
+) -> Namespaces {
+ let value: RowValue = RowValue::new_from(row_ts, SetUnionHashSet::new_from([]));
+ let row: Table> = Table::new_from([(key, value)]);
+ let table = TableMap::new_from([(table_name, row)]);
+ Namespaces::new_from([(ns, table)])
+}
+
+#[cfg(test)]
+mod tests {
+ use std::collections::HashSet;
+
+ use hydroflow::lattices::Merge;
+
+ use crate::model::{delete_row, upsert_row, Clock, Namespaces, RowKey, TableName};
+ use crate::Namespace::System;
+
+ #[test]
+ fn test_table_map() {
+ let mut namespaces: Namespaces = Namespaces::default();
+
+ let first_tick: Clock = Clock::new(0);
+ let second_tick: Clock = Clock::new(1);
+
+ let members_table = TableName::from("members");
+ let key_1 = RowKey::from("key1");
+ let value_1: String = "value1".to_string();
+
+ // Table starts out empty.
+ assert_eq!(
+ namespaces.as_reveal_ref().len(),
+ 0,
+ "Expected no namespaces."
+ );
+
+ let insert = upsert_row(
+ first_tick,
+ System,
+ members_table.clone(),
+ key_1.clone(),
+ value_1.clone(),
+ );
+ Merge::merge(&mut namespaces, insert);
+ {
+ let table = namespaces
+ .as_reveal_ref()
+ .get(&System)
+ .unwrap()
+ .as_reveal_ref()
+ .get(&members_table)
+ .unwrap();
+
+ let row = table.as_reveal_ref().get(&key_1);
+ assert!(row.is_some(), "Row should exist");
+ assert_eq!(
+ *row.unwrap().as_reveal_ref().0,
+ first_tick,
+ "Unexpected row timestamp"
+ );
+
+ let value = row.unwrap().as_reveal_ref().1.as_reveal_ref();
+ assert_eq!(
+ value,
+ &HashSet::from([value_1.to_string()]),
+ "Unexpected row value"
+ );
+ }
+
+ let delete_row = delete_row(
+ second_tick,
+ System,
+ members_table.clone(),
+ key_1.to_string(),
+ );
+ Merge::merge(&mut namespaces, delete_row);
+ {
+ let table = namespaces
+ .as_reveal_ref()
+ .get(&System)
+ .unwrap()
+ .as_reveal_ref()
+ .get(&members_table)
+ .unwrap();
+
+ // Deletion in this case leaves a "tombstone"
+ let row = table.as_reveal_ref().get(&key_1);
+
+ assert!(row.is_some(), "Row should exist");
+ assert_eq!(
+ *row.unwrap().as_reveal_ref().0,
+ second_tick,
+ "Unexpected row timestamp"
+ );
+
+ let value = row.unwrap().as_reveal_ref().1.as_reveal_ref();
+ assert_eq!(value, &HashSet::from([]), "Row should be empty");
+ }
+ }
+}
diff --git a/datastores/gossip_kv/kv/server.rs b/datastores/gossip_kv/kv/server.rs
new file mode 100644
index 000000000000..a36d89013c40
--- /dev/null
+++ b/datastores/gossip_kv/kv/server.rs
@@ -0,0 +1,763 @@
+use std::collections::{HashMap, HashSet};
+use std::fmt::Debug;
+use std::hash::Hash;
+
+use hydroflow::futures::{Sink, Stream};
+use hydroflow::hydroflow_syntax;
+use hydroflow::itertools::Itertools;
+use hydroflow::lattices::map_union::{KeyedBimorphism, MapUnionHashMap, MapUnionSingletonMap};
+use hydroflow::lattices::set_union::SetUnionHashSet;
+use hydroflow::lattices::{Lattice, PairBimorphism};
+use hydroflow::scheduled::graph::Hydroflow;
+use lattices::set_union::SetUnion;
+use lattices::{IsTop, Max, Pair};
+use lazy_static::lazy_static;
+use prometheus::{register_int_counter, IntCounter};
+use rand::seq::IteratorRandom;
+use rand::thread_rng;
+use serde::de::DeserializeOwned;
+use serde::{Deserialize, Serialize};
+use tracing::{info, trace};
+
+use crate::lattices::BoundedSetLattice;
+use crate::membership::{MemberData, MemberId};
+use crate::model::{
+ delete_row, upsert_row, Clock, NamespaceMap, Namespaces, RowKey, RowValue, TableMap, TableName,
+};
+use crate::util::{ClientRequestWithAddress, GossipRequestWithAddress};
+use crate::GossipMessage::{Ack, Nack};
+use crate::{ClientRequest, ClientResponse, GossipMessage, Key, Namespace};
+
+/// A trait that represents an abstract network address. In production, this will typically be
+/// SocketAddr.
+pub trait Address: Hash + Debug + Clone + Eq + Serialize {}
+impl Address for A where A: Hash + Debug + Clone + Eq + Serialize {}
+
+#[derive(Debug, Eq, PartialEq, Hash, Clone, Serialize, Deserialize)]
+pub struct SeedNode
+where
+ A: Address,
+{
+ pub id: MemberId,
+ pub address: A,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Lattice)]
+pub struct InfectingWrite {
+ write: Namespaces,
+ members: BoundedSetLattice,
+}
+
+pub type MessageId = String;
+
+lazy_static! {
+ pub static ref SETS_COUNTER: IntCounter =
+ register_int_counter!("sets", "Counts the number of SET requests processed.").unwrap();
+}
+
+/// Creates a L0 key-value store server using Hydroflow.
+///
+/// # Arguments
+/// -- `client_inputs`: The input stream of client requests for the client protocol.
+/// -- `client_outputs`: The output sink of client responses for the client protocol.
+/// -- `member_info`: The membership information of the server.
+/// -- `seed_nodes`: A list of seed nodes that can be used to bootstrap the gossip cluster.
+#[expect(clippy::too_many_arguments)]
+pub fn server<
+ ClientInput,
+ ClientOutput,
+ ClientOutputError,
+ GossipInput,
+ GossipOutput,
+ GossipOutputError,
+ GossipTrigger,
+ SeedNodeStream,
+ Addr,
+>(
+ client_inputs: ClientInput,
+ client_outputs: ClientOutput,
+ gossip_inputs: GossipInput,
+ gossip_outputs: GossipOutput,
+ gossip_trigger: GossipTrigger,
+ member_info: MemberData,
+ seed_nodes: Vec>,
+ seed_node_stream: SeedNodeStream,
+) -> Hydroflow<'static>
+where
+ ClientInput: Stream
- + Unpin + 'static,
+ ClientOutput: Sink<(ClientResponse, Addr), Error = ClientOutputError> + Unpin + 'static,
+ GossipInput: Stream
- + Unpin + 'static,
+ GossipOutput: Sink<(GossipMessage, Addr), Error = GossipOutputError> + Unpin + 'static,
+ GossipTrigger: Stream
- + Unpin + 'static,
+ SeedNodeStream: Stream
- >> + Unpin + 'static,
+ Addr: Address + DeserializeOwned + 'static,
+ ClientOutputError: Debug + 'static,
+ GossipOutputError: Debug + 'static,
+{
+ let my_member_id = member_info.id.clone();
+ // TODO: This is ugly, but the only way this works at the moment.
+ let member_id_2 = my_member_id.clone();
+ let member_id_3 = my_member_id.clone();
+ let member_id_4 = my_member_id.clone();
+ let member_id_5 = my_member_id.clone();
+ let member_id_6 = my_member_id.clone();
+
+ hydroflow_syntax! {
+
+ on_start = initialize() -> tee();
+ on_start -> for_each(|_| info!("{:?}: Transducer {} started.", context.current_tick(), member_id_6));
+
+ seed_nodes = source_stream(seed_node_stream)
+ -> fold::<'static>(|| Box::new(seed_nodes), |last_seed_nodes, new_seed_nodes: Vec>| {
+ **last_seed_nodes = new_seed_nodes;
+ info!("Updated seed nodes: {:?}", **last_seed_nodes);
+ });
+
+ // Setup member metadata for this process.
+ on_start -> map(|_| upsert_row(Clock::new(0), Namespace::System, "members".to_string(), my_member_id.clone(), serde_json::to_string(&member_info).unwrap()))
+ -> writes;
+
+ client_out =
+ inspect(|(resp, addr)| trace!("{:?}: Sending response: {:?} to {:?}.", context.current_tick(), resp, addr))
+ -> dest_sink(client_outputs);
+
+ client_in = source_stream(client_inputs)
+ -> map(|(msg, addr)| ClientRequestWithAddress::from_request_and_address(msg, addr))
+ -> demux_enum::>();
+
+ client_in[Get]
+ -> inspect(|req| trace!("{:?}: Received Get request: {:?}.", context.current_tick(), req))
+ -> map(|(key, addr) : (Key, Addr)| {
+ let row = MapUnionHashMap::new_from([
+ (
+ key.row_key,
+ SetUnionHashSet::new_from([addr /* to respond with the result later*/])
+ ),
+ ]);
+ let table = MapUnionHashMap::new_from([(key.table, row)]);
+ MapUnionHashMap::new_from([(key.namespace, table)])
+ })
+ -> reads;
+
+ client_in[Set]
+ -> inspect(|request| trace!("{:?}: Received Set request: {:?}.", context.current_tick(), request))
+ -> map(|(key, value, _addr) : (Key, String, Addr)| upsert_row(Clock::new(context.current_tick().0), key.namespace, key.table, key.row_key, value))
+ -> inspect(|_| {
+ SETS_COUNTER.inc(); // Bump SET metrics
+ })
+ -> writes;
+
+ client_in[Delete]
+ -> inspect(|req| trace!("{:?}: Received Delete request: {:?}.", context.current_tick(), req))
+ -> map(|(key, _addr) : (Key, Addr)| delete_row(Clock::new(context.current_tick().0), key.namespace, key.table, key.row_key))
+ -> writes;
+
+ gossip_in = source_stream(gossip_inputs)
+ -> map(|(msg, addr)| GossipRequestWithAddress::from_request_and_address(msg, addr))
+ -> demux_enum::>();
+
+ incoming_gossip_messages = gossip_in[Gossip]
+ -> inspect(|request| trace!("{:?}: Received gossip request: {:?}.", context.current_tick(), request))
+ -> tee();
+
+ gossip_in[Ack]
+ -> inspect(|request| trace!("{:?}: Received gossip ack: {:?}.", context.current_tick(), request))
+ -> null();
+
+ gossip_in[Nack]
+ -> inspect(|request| trace!("{:?}: Received gossip nack: {:?}.", context.current_tick(), request))
+ -> map( |(message_id, member_id, _addr)| {
+ MapUnionSingletonMap::new_from((message_id, InfectingWrite { write: Default::default(), members: BoundedSetLattice::new_from([member_id]) }))
+ })
+ -> infecting_writes;
+
+ gossip_out = union() -> dest_sink(gossip_outputs);
+
+ incoming_gossip_messages
+ -> map(|(_msg_id, _member_id, writes, _addr)| writes )
+ -> writes;
+
+ gossip_processing_pipeline = incoming_gossip_messages
+ -> map(|(msg_id, _member_id, writes, sender_address) : (String, MemberId, Namespaces>, Addr)| {
+ let namespaces = namespaces;
+ let all_data: &HashMap>> = namespaces.as_reveal_ref();
+ let possible_new_data: &HashMap>>>= writes.as_reveal_ref();
+
+ // Check if any of the data is new
+ /* TODO: This logic is duplicated in MapUnion::Merge and ideally should be accessed
+ from the pass-through streaming output from `state`. See
+ https://www.notion.so/hydro-project/Proposal-for-State-API-10a2a586262f8080b981d1a2948a69ac
+ for more. */
+ let gossip_has_new_data = possible_new_data.iter()
+ .flat_map(|(namespace, tables)| {
+ tables.as_reveal_ref().iter().flat_map(move |(table, rows)|{
+ rows.as_reveal_ref().iter().map(move |(row_key, row_value)| (namespace, table, row_key, row_value.as_reveal_ref().0.as_reveal_ref()))
+ })
+ })
+ .any(|(ns,table, row_key, new_ts)| {
+ let existing_tables = all_data.get(ns);
+ let existing_rows = existing_tables.and_then(|tables| tables.as_reveal_ref().get(table));
+ let existing_row = existing_rows.and_then(|rows| rows.as_reveal_ref().get(row_key));
+ let existing_ts = existing_row.map(|row| row.as_reveal_ref().0.as_reveal_ref());
+
+ if let Some(existing_ts) = existing_ts {
+ trace!("Comparing timestamps: {:?} vs {:?}", new_ts, existing_ts);
+ new_ts > existing_ts
+ } else {
+ true
+ }
+ });
+
+ if gossip_has_new_data {
+ (Ack { message_id: msg_id, member_id: member_id_2.clone()}, sender_address, Some(writes))
+ } else {
+ (Nack { message_id: msg_id, member_id: member_id_3.clone()}, sender_address, None)
+ }
+ })
+ -> tee();
+
+ gossip_processing_pipeline
+ -> map(|(response, address, _writes)| (response, address))
+ -> inspect( |(msg, addr)| trace!("{:?}: Sending gossip response: {:?} to {:?}.", context.current_tick(), msg, addr))
+ -> gossip_out;
+
+ gossip_processing_pipeline
+ -> filter(|(_, _, writes)| writes.is_some())
+ -> map(|(_, _, writes)| writes.unwrap())
+ -> writes;
+
+ writes = union();
+
+ writes -> namespaces;
+
+ namespaces = state::<'static, Namespaces::>();
+ new_writes = namespaces -> tee(); // TODO: Use the output from here to generate NACKs / ACKs
+
+ reads = state::<'tick, MapUnionHashMap>>>>();
+
+ new_writes -> [0]process_system_table_reads;
+ reads -> [1]process_system_table_reads;
+
+ process_system_table_reads = lattice_bimorphism(KeyedBimorphism::, _>::new(KeyedBimorphism::, _>::new(KeyedBimorphism::, _>::new(PairBimorphism))), #namespaces, #reads)
+ -> lattice_reduce::<'tick>() // TODO: This can be removed if we fix https://github.com/hydro-project/hydroflow/issues/1401. Otherwise the result can be returned twice if get & gossip arrive in the same tick.
+ -> flat_map(|result: NamespaceMap, SetUnion>>>| {
+
+ let mut response: Vec<(ClientResponse, Addr)> = vec![];
+
+ let result = result.as_reveal_ref();
+
+ for (namespace, tables) in result.iter() {
+ for (table_name, table) in tables.as_reveal_ref().iter() {
+ for (row_key, join_results) in table.as_reveal_ref().iter() {
+ let key = Key {
+ namespace: *namespace,
+ table: table_name.clone(),
+ row_key: row_key.clone(),
+ };
+
+ let timestamped_values = join_results.as_reveal_ref().0;
+ let all_values = timestamped_values.as_reveal_ref().1.as_reveal_ref();
+
+ let all_addresses = join_results.as_reveal_ref().1.as_reveal_ref();
+ let socket_addr = all_addresses.iter().find_or_first(|_| true).unwrap();
+
+ response.push((
+ ClientResponse::Get {key, value: all_values.clone()},
+ socket_addr.clone(),
+ ));
+ }
+ }
+ }
+ response
+ }) -> client_out;
+
+ new_writes -> for_each(|x| trace!("NEW WRITE: {:?}", x));
+
+ // Step 1: Put the new writes in a map, with the write as the key and a SetBoundedLattice as the value.
+ infecting_writes = union() -> state::<'static, MapUnionHashMap>();
+
+ new_writes -> map(|write| {
+ // Ideally, the write itself is the key, but writes are a hashmap and hashmaps don't
+ // have a hash implementation. So we just generate a GUID identifier for the write
+ // for now.
+ let id = uuid::Uuid::new_v4().to_string();
+ MapUnionSingletonMap::new_from((id, InfectingWrite { write, members: BoundedSetLattice::new() }))
+ }) -> infecting_writes;
+
+ gossip_trigger = source_stream(gossip_trigger);
+
+ gossip_messages = gossip_trigger
+ -> flat_map( |_|
+ {
+ let infecting_writes = #infecting_writes.as_reveal_ref().clone();
+ trace!("{:?}: Currently gossipping {} infecting writes.", context.current_tick(), infecting_writes.iter().filter(|(_, write)| !write.members.is_top()).count());
+ infecting_writes
+ }
+ )
+ -> filter(|(_id, infecting_write)| !infecting_write.members.is_top())
+ -> map(|(id, infecting_write)| {
+ trace!("{:?}: Choosing a peer to gossip to. {:?}:{:?}", context.current_tick(), id, infecting_write);
+ let peers = #namespaces.as_reveal_ref().get(&Namespace::System).unwrap().as_reveal_ref().get("members").unwrap().as_reveal_ref().clone();
+
+ let mut peer_names = HashSet::new();
+ peers.iter().for_each(|(row_key, _)| {
+ peer_names.insert(row_key.clone());
+ });
+
+ let seed_nodes = seed_nodes;
+ seed_nodes.iter().for_each(|seed_node| {
+ peer_names.insert(seed_node.id.clone());
+ });
+
+ // Exclude self from the list of peers.
+ peer_names.remove(&member_id_5);
+
+ trace!("{:?}: Peers: {:?}", context.current_tick(), peer_names);
+
+ let chosen_peer_name = peer_names.iter().choose(&mut thread_rng());
+
+ if chosen_peer_name.is_none() {
+ trace!("{:?}: No peers to gossip to.", context.current_tick());
+ return None;
+ }
+
+ let chosen_peer_name = chosen_peer_name.unwrap();
+ let gossip_address = if peers.contains_key(chosen_peer_name) {
+ let peer_info_value = peers.get(chosen_peer_name).unwrap().as_reveal_ref().1.as_reveal_ref().iter().next().unwrap().clone();
+ let peer_info_deserialized = serde_json::from_str::>(&peer_info_value).unwrap();
+ peer_info_deserialized.protocols.iter().find(|protocol| protocol.name == "gossip").unwrap().clone().endpoint
+ } else {
+ seed_nodes.iter().find(|seed_node| seed_node.id == *chosen_peer_name).unwrap().address.clone()
+ };
+
+ trace!("Chosen peer: {:?}:{:?}", chosen_peer_name, gossip_address);
+ Some((id, infecting_write, gossip_address))
+ })
+ -> flatten()
+ -> inspect(|(message_id, infecting_write, peer_gossip_address)| trace!("{:?}: Sending write:\nMessageId:{:?}\nWrite:{:?}\nPeer Address:{:?}", context.current_tick(), message_id, infecting_write, peer_gossip_address))
+ -> map(|(message_id, infecting_write, peer_gossip_address): (String, InfectingWrite, Addr)| {
+ let gossip_request = GossipMessage::Gossip {
+ message_id: message_id.clone(),
+ member_id: member_id_4.clone(),
+ writes: infecting_write.write.clone(),
+ };
+ (gossip_request, peer_gossip_address)
+ })
+ -> gossip_out;
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::collections::HashSet;
+
+ use hydroflow::tokio_stream::empty;
+ use hydroflow::util::simulation::{Address, Fleet, Hostname};
+
+ use super::*;
+ use crate::membership::{MemberDataBuilder, Protocol};
+
+ #[hydroflow::test]
+ async fn test_member_init() {
+ let mut fleet = Fleet::new();
+
+ let server_name: Hostname = "server".to_string();
+
+ let server_client_address = Address::new(server_name.clone(), "client".to_string());
+ let server_gossip_address = Address::new(server_name.clone(), "gossip".to_string());
+
+ let (_, gossip_trigger_rx) = hydroflow::util::unbounded_channel::<()>();
+
+ // Create the kv server
+ fleet.add_host(server_name.clone(), |ctx| {
+ let client_input = ctx.new_inbox::("client".to_string());
+ let client_output = ctx.new_outbox::("client".to_string());
+
+ let gossip_input = ctx.new_inbox::("gossip".to_string());
+ let gossip_output = ctx.new_outbox::("gossip".to_string());
+
+ let member_data = MemberDataBuilder::new(server_name.clone())
+ .add_protocol(Protocol::new(
+ "client".into(),
+ server_client_address.clone(),
+ ))
+ .add_protocol(Protocol::new(
+ "gossip".into(),
+ server_gossip_address.clone(),
+ ))
+ .build();
+
+ server(
+ client_input,
+ client_output,
+ gossip_input,
+ gossip_output,
+ gossip_trigger_rx,
+ member_data,
+ vec![],
+ empty(),
+ )
+ });
+
+ let client_name: Hostname = "client".to_string();
+
+ let key = "/sys/members/server".parse::().unwrap();
+
+ let (trigger_tx, trigger_rx) = hydroflow::util::unbounded_channel::<()>();
+ let (response_tx, mut response_rx) = hydroflow::util::unbounded_channel::();
+
+ let key_clone = key.clone();
+ let server_client_address_clone = server_client_address.clone();
+
+ fleet.add_host(client_name.clone(), |ctx| {
+ let client_tx = ctx.new_outbox::("client".to_string());
+ let client_rx = ctx.new_inbox::("client".to_string());
+
+ hydroflow_syntax! {
+
+ client_output = dest_sink(client_tx);
+
+ source_stream(trigger_rx)
+ -> map(|_| (ClientRequest::Get { key: key_clone.clone() }, server_client_address_clone.clone()) )
+ -> client_output;
+
+ client_input = source_stream(client_rx)
+ -> for_each(|(resp, _addr)| response_tx.send(resp).unwrap());
+
+ }
+ });
+
+ // Send a trigger to the client to send a get request.
+ trigger_tx.send(()).unwrap();
+
+ let expected_member_data = MemberDataBuilder::new(server_name.clone())
+ .add_protocol(Protocol::new(
+ "client".to_string(),
+ server_client_address.clone(),
+ ))
+ .add_protocol(Protocol::new(
+ "gossip".to_string(),
+ server_gossip_address.clone(),
+ ))
+ .build();
+
+ loop {
+ fleet.run_single_tick_all_hosts().await;
+
+ let responses =
+ hydroflow::util::collect_ready_async::, _>(&mut response_rx).await;
+
+ if !responses.is_empty() {
+ assert_eq!(
+ responses,
+ &[(ClientResponse::Get {
+ key: key.clone(),
+ value: HashSet::from([
+ serde_json::to_string(&expected_member_data).unwrap()
+ ])
+ })]
+ );
+ break;
+ }
+ }
+ }
+
+ #[hydroflow::test]
+ async fn test_multiple_values_same_tick() {
+ let mut fleet = Fleet::new();
+
+ let server_name: Hostname = "server".to_string();
+
+ let server_client_address = Address::new(server_name.clone(), "client".to_string());
+
+ let (_, gossip_trigger_rx) = hydroflow::util::unbounded_channel::<()>();
+
+ // Create the kv server
+ fleet.add_host(server_name.clone(), |ctx| {
+ let client_input = ctx.new_inbox::("client".to_string());
+ let client_output = ctx.new_outbox::("client".to_string());
+
+ let gossip_input = ctx.new_inbox::("gossip".to_string());
+ let gossip_output = ctx.new_outbox::("gossip".to_string());
+ let server_gossip_address = Address::new(server_name.clone(), "gossip".to_string());
+
+ let member_data = MemberDataBuilder::new(server_name.clone())
+ .add_protocol(Protocol::new(
+ "client".into(),
+ server_client_address.clone(),
+ ))
+ .add_protocol(Protocol::new(
+ "gossip".into(),
+ server_gossip_address.clone(),
+ ))
+ .build();
+
+ server(
+ client_input,
+ client_output,
+ gossip_input,
+ gossip_output,
+ gossip_trigger_rx,
+ member_data,
+ vec![],
+ empty(),
+ )
+ });
+
+ let key = Key {
+ namespace: Namespace::System,
+ table: "table".to_string(),
+ row_key: "row".to_string(),
+ };
+ let val_a = "A".to_string();
+ let val_b = "B".to_string();
+
+ let writer_name: Hostname = "writer".to_string();
+
+ let (writer_trigger_tx, writer_trigger_rx) = hydroflow::util::unbounded_channel::();
+ let key_clone = key.clone();
+ let server_client_address_clone = server_client_address.clone();
+
+ fleet.add_host(writer_name.clone(), |ctx| {
+ let client_tx = ctx.new_outbox::("client".to_string());
+ hydroflow_syntax! {
+ client_output = dest_sink(client_tx);
+
+ source_stream(writer_trigger_rx)
+ -> map(|value| (ClientRequest::Set { key: key_clone.clone(), value: value.clone()}, server_client_address_clone.clone()) )
+ -> client_output;
+ }
+ });
+
+ // Send two messages from the writer.
+ let writer = fleet.get_host_mut(&writer_name).unwrap();
+ writer_trigger_tx.send(val_a.clone()).unwrap();
+ writer.run_tick();
+
+ writer_trigger_tx.send(val_b.clone()).unwrap();
+ writer.run_tick();
+
+ // Transmit messages across the network.
+ fleet.process_network().await;
+
+ // Run the server.
+ let server = fleet.get_host_mut(&server_name).unwrap();
+ server.run_tick();
+
+ // Read the value back.
+ let reader_name: Hostname = "reader".to_string();
+
+ let (reader_trigger_tx, reader_trigger_rx) = hydroflow::util::unbounded_channel::<()>();
+ let (response_tx, mut response_rx) = hydroflow::util::unbounded_channel::();
+
+ let key_clone = key.clone();
+ let server_client_address_clone = server_client_address.clone();
+
+ fleet.add_host(reader_name.clone(), |ctx| {
+ let client_tx = ctx.new_outbox::("client".to_string());
+ let client_rx = ctx.new_inbox::("client".to_string());
+
+ hydroflow_syntax! {
+ client_output = dest_sink(client_tx);
+
+ source_stream(reader_trigger_rx)
+ -> map(|_| (ClientRequest::Get { key: key_clone.clone() }, server_client_address_clone.clone()) )
+ -> client_output;
+
+ client_input = source_stream(client_rx)
+ -> for_each(|(resp, _addr)| response_tx.send(resp).unwrap());
+
+ }
+ });
+
+ reader_trigger_tx.send(()).unwrap();
+
+ loop {
+ fleet.run_single_tick_all_hosts().await;
+
+ let responses =
+ hydroflow::util::collect_ready_async::, _>(&mut response_rx).await;
+
+ if !responses.is_empty() {
+ assert_eq!(
+ responses,
+ &[ClientResponse::Get {
+ key,
+ value: HashSet::from([val_a, val_b])
+ }]
+ );
+ break;
+ }
+ }
+ }
+
+ #[hydroflow::test]
+ async fn test_gossip() {
+ let subscriber = tracing_subscriber::FmtSubscriber::builder()
+ .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+ .with_test_writer()
+ .finish();
+
+ let _ = tracing::subscriber::set_global_default(subscriber);
+
+ let mut fleet = Fleet::new();
+
+ let server_a: Hostname = "server_a".to_string();
+ let server_b: Hostname = "server_b".to_string();
+
+ let server_a_client_address = Address::new(server_a.clone(), "client".to_string());
+ let server_b_client_address = Address::new(server_b.clone(), "client".to_string());
+
+ let server_a_gossip_address = Address::new(server_a.clone(), "gossip".to_string());
+ let server_b_gossip_address = Address::new(server_b.clone(), "gossip".to_string());
+
+ let seed_nodes = vec![
+ SeedNode {
+ id: server_a.clone(),
+ address: server_a_gossip_address.clone(),
+ },
+ SeedNode {
+ id: server_b.clone(),
+ address: server_b_gossip_address.clone(),
+ },
+ ];
+
+ let (gossip_trigger_tx_a, gossip_trigger_rx_a) = hydroflow::util::unbounded_channel::<()>();
+
+ let seed_nodes_clone = seed_nodes.clone();
+ fleet.add_host(server_a.clone(), |ctx| {
+ let client_input = ctx.new_inbox::("client".to_string());
+ let client_output = ctx.new_outbox::("client".to_string());
+
+ let gossip_input = ctx.new_inbox::("gossip".to_string());
+ let gossip_output = ctx.new_outbox::("gossip".to_string());
+
+ let member_data = MemberDataBuilder::new(server_a.clone())
+ .add_protocol(Protocol::new(
+ "client".into(),
+ server_a_client_address.clone(),
+ ))
+ .add_protocol(Protocol::new(
+ "gossip".into(),
+ server_a_gossip_address.clone(),
+ ))
+ .build();
+
+ server(
+ client_input,
+ client_output,
+ gossip_input,
+ gossip_output,
+ gossip_trigger_rx_a,
+ member_data,
+ seed_nodes_clone,
+ empty(),
+ )
+ });
+
+ let (_, gossip_trigger_rx_b) = hydroflow::util::unbounded_channel::<()>();
+
+ let seed_nodes_clone = seed_nodes.clone();
+ fleet.add_host(server_b.clone(), |ctx| {
+ let client_input = ctx.new_inbox::("client".to_string());
+ let client_output = ctx.new_outbox::("client".to_string());
+
+ let gossip_input = ctx.new_inbox::("gossip".to_string());
+ let gossip_output = ctx.new_outbox::("gossip".to_string());
+
+ let member_data = MemberDataBuilder::new(server_b.clone())
+ .add_protocol(Protocol::new(
+ "client".into(),
+ server_b_client_address.clone(),
+ ))
+ .add_protocol(Protocol::new(
+ "gossip".into(),
+ server_b_gossip_address.clone(),
+ ))
+ .build();
+
+ server(
+ client_input,
+ client_output,
+ gossip_input,
+ gossip_output,
+ gossip_trigger_rx_b,
+ member_data,
+ seed_nodes_clone,
+ empty(),
+ )
+ });
+
+ let key = Key {
+ namespace: Namespace::User,
+ table: "table".to_string(),
+ row_key: "row".to_string(),
+ };
+
+ let writer_name: Hostname = "writer".to_string();
+
+ let (writer_trigger_tx, writer_trigger_rx) = hydroflow::util::unbounded_channel::();
+
+ let key_clone = key.clone();
+ let server_a_client_address_clone = server_a_client_address.clone();
+
+ fleet.add_host(writer_name.clone(), |ctx| {
+ let client_tx = ctx.new_outbox::("client".to_string());
+ hydroflow_syntax! {
+ client_output = dest_sink(client_tx);
+
+ source_stream(writer_trigger_rx)
+ -> map(|value| (ClientRequest::Set { key: key_clone.clone(), value: value.clone()}, server_a_client_address_clone.clone()) )
+ -> client_output;
+ }
+ });
+
+ let reader_name: Hostname = "reader".to_string();
+
+ let (reader_trigger_tx, reader_trigger_rx) = hydroflow::util::unbounded_channel::<()>();
+ let (response_tx, mut response_rx) = hydroflow::util::unbounded_channel::();
+
+ let key_clone = key.clone();
+ let server_b_client_address_clone = server_b_client_address.clone();
+
+ fleet.add_host(reader_name.clone(), |ctx| {
+ let client_tx = ctx.new_outbox::("client".to_string());
+ let client_rx = ctx.new_inbox::("client".to_string());
+
+ hydroflow_syntax! {
+ client_output = dest_sink(client_tx);
+
+ source_stream(reader_trigger_rx)
+ -> map(|_| (ClientRequest::Get { key: key_clone.clone() }, server_b_client_address_clone.clone()) )
+ -> client_output;
+
+ client_input = source_stream(client_rx)
+ -> for_each(|(resp, _addr)| response_tx.send(resp).unwrap());
+
+ }
+ });
+
+ let value = "VALUE".to_string();
+ writer_trigger_tx.send(value.clone()).unwrap();
+
+ loop {
+ reader_trigger_tx.send(()).unwrap();
+ fleet.run_single_tick_all_hosts().await;
+ let responses =
+ hydroflow::util::collect_ready_async::, _>(&mut response_rx).await;
+
+ if !responses.is_empty() {
+ assert_eq!(
+ responses,
+ &[ClientResponse::Get {
+ key,
+ value: HashSet::from([value.clone()])
+ }]
+ );
+ break;
+ }
+
+ gossip_trigger_tx_a.send(()).unwrap();
+ }
+ }
+}
diff --git a/datastores/gossip_kv/kv/util.rs b/datastores/gossip_kv/kv/util.rs
new file mode 100644
index 000000000000..4cc391da6e7a
--- /dev/null
+++ b/datastores/gossip_kv/kv/util.rs
@@ -0,0 +1,87 @@
+use hydroflow::DemuxEnum;
+
+use crate::model::{Clock, Namespaces};
+use crate::{ClientRequest, GossipMessage, Key};
+
+/// Convenience enum to represent a client request with the address of the client. Makes it
+/// possible to use `demux_enum` in the surface syntax.
+#[derive(Debug, DemuxEnum)]
+pub enum ClientRequestWithAddress<A> {
+ /// A get request with the key and the address of the client.
+ Get { key: Key, addr: A },
+ /// A set request with the key, value and the address of the client.
+ Set { key: Key, value: String, addr: A },
+ /// A delete request with the key and the address of the client.
+ Delete { key: Key, addr: A },
+}
+
+impl<A> ClientRequestWithAddress<A> {
+ /// Create a `ClientRequestWithAddress` from a `ClientRequest` and an address.
+ pub fn from_request_and_address(request: ClientRequest, addr: A) -> Self {
+ match request {
+ ClientRequest::Get { key } => Self::Get { key, addr },
+ ClientRequest::Set { key, value } => Self::Set { key, value, addr },
+ ClientRequest::Delete { key } => Self::Delete { key, addr },
+ }
+ }
+}
+
+/// Convenience enum to represent a gossip request with the address of the client. Makes it
+/// possible to use `demux_enum` in the surface syntax.
+#[derive(Debug, DemuxEnum)]
+pub enum GossipRequestWithAddress<A> {
+ /// A gossip request with the message id, writes and the address of the client.
+ Gossip {
+ message_id: String,
+ member_id: String,
+ writes: Namespaces,
+ addr: A,
+ },
+ /// An ack request with the message id and the address of the client.
+ Ack {
+ message_id: String,
+ member_id: String,
+ addr: A,
+ },
+ /// A nack request with the message id and the address of the client.
+ Nack {
+ message_id: String,
+ member_id: String,
+ addr: A,
+ },
+}
+
+impl<A> GossipRequestWithAddress<A> {
+ /// Create a `GossipRequestWithAddress` from a `GossipMessage` and an address.
+ pub fn from_request_and_address(request: GossipMessage, addr: A) -> Self {
+ match request {
+ GossipMessage::Gossip {
+ message_id,
+ member_id,
+ writes,
+ } => Self::Gossip {
+ message_id,
+ member_id,
+ writes,
+ addr,
+ },
+
+ GossipMessage::Ack {
+ message_id,
+ member_id,
+ } => Self::Ack {
+ message_id,
+ addr,
+ member_id,
+ },
+ GossipMessage::Nack {
+ message_id,
+ member_id,
+ } => Self::Nack {
+ message_id,
+ addr,
+ member_id,
+ },
+ }
+ }
+}
diff --git a/datastores/gossip_kv/load_test_server/Dockerfile b/datastores/gossip_kv/load_test_server/Dockerfile
new file mode 100644
index 000000000000..91f27fdd22d5
--- /dev/null
+++ b/datastores/gossip_kv/load_test_server/Dockerfile
@@ -0,0 +1,11 @@
+FROM "hydroflow-gossip-kv-base-image:latest" AS builder
+WORKDIR /usr/src/gossip-kv-server
+COPY . .
+RUN find .
+RUN cargo build --release --workspace -p gossip_kv
+
+FROM rustlang/rust:nightly-slim
+COPY --from=builder /usr/src/gossip-kv-server/target/release/load_test_server /usr/local/bin/load_test_server
+
+# Don't skip the trailing slash in the destination directory
+CMD ["load_test_server"]
diff --git a/datastores/gossip_kv/load_test_server/server.rs b/datastores/gossip_kv/load_test_server/server.rs
new file mode 100644
index 000000000000..88bd40e3fb32
--- /dev/null
+++ b/datastores/gossip_kv/load_test_server/server.rs
@@ -0,0 +1,226 @@
+use std::convert::Infallible;
+use std::num::{NonZeroU32, ParseFloatError};
+use std::thread::sleep;
+use std::time::Duration;
+
+use clap::Parser;
+use gossip_kv::membership::{MemberDataBuilder, Protocol};
+use gossip_kv::{ClientRequest, GossipMessage};
+use governor::{Quota, RateLimiter};
+use hydroflow::util::{unbounded_channel, unsync_channel};
+use prometheus::{gather, Encoder, TextEncoder};
+use tokio::sync::mpsc::UnboundedSender;
+use tokio::task;
+use tracing::{error, info, trace};
+use warp::Filter;
+
+type LoadTestAddress = u64;
+
+use gossip_kv::server::{server, SeedNode};
+use hydroflow::futures::sink::drain;
+use hydroflow::futures::stream;
+use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream;
+use hydroflow::tokio_stream::StreamExt;
+use lattices::cc_traits::Iter;
+
+const UNKNOWN_ADDRESS: LoadTestAddress = 9999999999;
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Parser)]
+struct Opts {
+ /// Number of threads to run. Each thread will run an instance of the gossip-kv server transducer.
+ #[clap(short, long, default_value = "5")]
+ thread_count: usize,
+
+ /// Frequency (in seconds) at which to send gossip messages.
+ #[clap(short, long, default_value = "10", value_parser = clap_duration_from_secs)]
+ gossip_frequency: Duration,
+
+ /// Maximum number of SET requests to send per second.
+ #[clap(short, long, default_value = "1")]
+ max_set_throughput: u32,
+}
+
+/// Parse duration from float string for clap args.
+fn clap_duration_from_secs(arg: &str) -> Result<Duration, ParseFloatError> {
+ arg.parse().map(Duration::from_secs_f32)
+}
+
+fn run_server(
+ server_name: String,
+ gossip_address: LoadTestAddress,
+ gossip_input_rx: UnboundedReceiverStream<(GossipMessage, LoadTestAddress)>,
+ switchboard: Switchboard,
+ seed_nodes: Vec<SeedNode<LoadTestAddress>>,
+ opts: Opts,
+) {
+ std::thread::spawn(move || {
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap();
+
+ let (gossip_output_tx, mut gossip_output_rx) = unsync_channel(None);
+
+ let (gossip_trigger_tx, gossip_trigger_rx) = unbounded_channel();
+
+ let member_data = MemberDataBuilder::new(server_name.clone())
+ .add_protocol(Protocol::new("gossip".into(), gossip_address))
+ .build();
+
+ rt.block_on(async {
+ let local = task::LocalSet::new();
+
+ let (client_input_tx, client_input_rx) = unbounded_channel();
+
+ let put_throughput = opts.max_set_throughput;
+ local.spawn_local(async move {
+ let rate_limiter = RateLimiter::direct(Quota::per_second(
+ NonZeroU32::new(put_throughput).unwrap(),
+ ));
+ loop {
+ rate_limiter.until_ready().await;
+ let key = "/usr/table/key".parse().unwrap();
+ let request = ClientRequest::Set {
+ key,
+ value: "FOOBAR".to_string(),
+ };
+ client_input_tx.send((request, UNKNOWN_ADDRESS)).unwrap();
+ }
+ });
+
+ let gossip_frequency = opts.gossip_frequency;
+ local.spawn_local(async move {
+ loop {
+ tokio::time::sleep(gossip_frequency).await;
+ gossip_trigger_tx.send(()).unwrap();
+ }
+ });
+
+ // Networking
+ local.spawn_local(async move {
+ while let Some((msg, addr)) = gossip_output_rx.next().await {
+ trace!("Sending gossip message: {:?} to {}", msg, addr);
+ let outbox = switchboard.gossip_outboxes.get(addr as usize).unwrap();
+ if let Err(e) = outbox.send((msg, gossip_address)) {
+ error!("Failed to send gossip message: {:?}", e);
+ }
+ }
+ });
+
+ local.spawn_local(async {
+ let mut server = server(
+ client_input_rx,
+ drain(), // Ignoring client responses for now.
+ gossip_input_rx,
+ gossip_output_tx,
+ gossip_trigger_rx,
+ member_data,
+ seed_nodes,
+ stream::empty(),
+ );
+
+ server.run_async().await
+ });
+
+ local.await
+ });
+ });
+}
+
+struct Switchboard {
+ gossip_outboxes: Vec<UnboundedSender<(GossipMessage, LoadTestAddress)>>,
+}
+
+impl Clone for Switchboard {
+ fn clone(&self) -> Self {
+ Self {
+ gossip_outboxes: self.gossip_outboxes.clone(),
+ }
+ }
+}
+
+impl Switchboard {
+ fn new() -> Self {
+ Self {
+ gossip_outboxes: Vec::new(),
+ }
+ }
+ fn new_outbox(
+ &mut self,
+ ) -> (
+ LoadTestAddress,
+ UnboundedReceiverStream<(GossipMessage, LoadTestAddress)>,
+ ) {
+ let addr: LoadTestAddress = self.gossip_outboxes.len() as LoadTestAddress;
+ let (tx, rx) = unbounded_channel();
+ self.gossip_outboxes.push(tx);
+ (addr, rx)
+ }
+}
+
+async fn metrics_handler() -> Result<impl warp::Reply, Infallible> {
+ let encoder = TextEncoder::new();
+ let metric_families = gather();
+ let mut buffer = Vec::new();
+ encoder.encode(&metric_families, &mut buffer).unwrap();
+
+ Ok(warp::reply::with_header(
+ buffer,
+ "Content-Type",
+ encoder.format_type(),
+ ))
+}
+
+fn main() {
+ tracing_subscriber::fmt::init();
+
+ let opts: Opts = Opts::parse();
+
+ std::thread::spawn(move || {
+ let metrics_route = warp::path("metrics").and_then(metrics_handler);
+
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap();
+
+ rt.block_on(async move {
+ info!("Starting metrics server on port 4003");
+ warp::serve(metrics_route).run(([0, 0, 0, 0], 4003)).await;
+ });
+ });
+
+ info!("Starting load test with with {} threads", opts.thread_count);
+
+ let mut switchboard = Switchboard::new();
+
+ let outboxes: Vec<_> = (0..opts.thread_count)
+ .map(|_| {
+ let (addr, rx) = switchboard.new_outbox();
+ (format!("SERVER-{}", addr), addr, rx)
+ })
+ .collect();
+
+ let seed_nodes: Vec<_> = outboxes
+ .iter()
+ .map(|(name, addr, _)| SeedNode {
+ id: name.clone(),
+ address: *addr,
+ })
+ .collect();
+
+ outboxes.into_iter().for_each(|(name, addr, outbox)| {
+ run_server(
+ name,
+ addr,
+ outbox,
+ switchboard.clone(),
+ seed_nodes.clone(),
+ opts,
+ );
+ });
+
+ loop {
+ sleep(Duration::from_secs(1));
+ }
+}
diff --git a/datastores/gossip_kv/server/.gitignore b/datastores/gossip_kv/server/.gitignore
new file mode 100644
index 000000000000..9d1173a06139
--- /dev/null
+++ b/datastores/gossip_kv/server/.gitignore
@@ -0,0 +1 @@
+config/local.toml
\ No newline at end of file
diff --git a/datastores/gossip_kv/server/Dockerfile b/datastores/gossip_kv/server/Dockerfile
new file mode 100644
index 000000000000..4032b4056fbe
--- /dev/null
+++ b/datastores/gossip_kv/server/Dockerfile
@@ -0,0 +1,13 @@
+FROM "hydroflow-gossip-kv-base-image:latest" AS builder
+WORKDIR /usr/src/gossip-kv-server
+COPY . .
+RUN find .
+RUN cargo build --release --workspace -p gossip_kv
+
+FROM rustlang/rust:nightly-slim
+COPY --from=builder /usr/src/gossip-kv-server/target/release/gossip_server /usr/local/bin/gossip_server
+
+RUN mkdir -p /config/static
+# Don't skip the trailing slash in the destination directory
+COPY datastores/gossip_kv/server/config/static/*.toml /config/static/
+CMD ["gossip_server"]
diff --git a/datastores/gossip_kv/server/README.md b/datastores/gossip_kv/server/README.md
new file mode 100644
index 000000000000..434315e29f4f
--- /dev/null
+++ b/datastores/gossip_kv/server/README.md
@@ -0,0 +1,44 @@
+From the `hydroflow` directory, run
+
+## Minikube
+
+### Start Minikube
+Disk allocation is done by the driver used to create the VM. Setting this to a high value will do nothing if the
+driver isn't correctly configured. You'll only notice that hydroflow runs out of disk space while compiling.
+For Docker, the disk size is set in the Docker Desktop settings. Also, provide as many CPUs here as possible, since
+building the code is CPU-intensive.
+```shell
+minikube start --disk-size=100g --cpus=16 --memory=32768
+```
+
+### Use the Docker daemon from minikube
+```shell
+eval $(minikube docker-env)
+```
+
+## Build Docker Base Image
+Speeds up code changes by caching build dependencies.
+```shell
+docker build -t "hydroflow/gossip-kv-server-base-image:latest" -f datastores/gossip_kv/server/baseimage.Dockerfile .
+```
+
+## Build Docker Image for Gossip Server
+```shell
+docker build -t "hydroflow/gossip-kv-server:latest" -f datastores/gossip_kv/server/Dockerfile .
+```
+
+## Build Docker Image for Gossip CLI
+```shell
+docker build -t "hydroflow/gossip-kv-cli:latest" -f datastores/gossip_kv/cli/Dockerfile .
+```
+
+## Check if minikube has the image
+You should see "hydroflow/gossip-kv"
+```shell
+minikube image ls --format table
+```
+
+## Deploy to Minikube
+```shell
+kubectl apply -f datastores/gossip_kv/server/deployment/local/objects.yaml
+```
\ No newline at end of file
diff --git a/datastores/gossip_kv/server/baseimage.Dockerfile b/datastores/gossip_kv/server/baseimage.Dockerfile
new file mode 100644
index 000000000000..911de64a7f75
--- /dev/null
+++ b/datastores/gossip_kv/server/baseimage.Dockerfile
@@ -0,0 +1,23 @@
+FROM rustlang/rust:nightly AS builder
+WORKDIR /usr/src/gossip-kv-server-base-image
+COPY . .
+
+RUN apt-get update && apt-get install -y \
+ python3 \
+ python3.11-dev \
+ libpython3.11 \
+ build-essential \
+ && rm -rf /var/lib/apt/lists/*
+
+## Build everything, including dependencies. The built dependencies will be cached, so only changing the server
+## code requires lesser build time.
+RUN cargo build --release --workspace -p gossip_kv
+
+## Copy the built file to where the actual app will be built subsequently.
+RUN mkdir -p /usr/src/gossip-kv-server/target/release
+RUN mv /usr/src/gossip-kv-server-base-image/target/release/build/ /usr/src/gossip-kv-server/target/release/build/
+RUN mv /usr/src/gossip-kv-server-base-image/target/release/deps/ /usr/src/gossip-kv-server/target/release/deps/
+RUN mv /usr/src/gossip-kv-server-base-image/target/release/.fingerprint/ /usr/src/gossip-kv-server/target/release/.fingerprint/
+
+## Delete all the source code
+RUN rm -rf /usr/src/gossip-kv-server-base-image
\ No newline at end of file
diff --git a/datastores/gossip_kv/server/config/mod.rs b/datastores/gossip_kv/server/config/mod.rs
new file mode 100644
index 000000000000..13ff54873ad2
--- /dev/null
+++ b/datastores/gossip_kv/server/config/mod.rs
@@ -0,0 +1,130 @@
+use std::path::PathBuf;
+
+use config::{Config, ConfigError, File};
+use hydroflow::futures::future::ready;
+use hydroflow::futures::{Stream, StreamExt};
+use notify::{Event, EventHandler, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
+use serde::{Deserialize, Serialize};
+use tokio::sync::mpsc::UnboundedSender;
+use tracing::trace;
+
+/// L0 Data Store settings.
+#[derive(Debug, Deserialize, Serialize)]
+pub struct ServerSettings {
+ /// An initial set of "seed nodes" that can be used to bootstrap the gossip cluster. These
+ /// won't be the only nodes in the cluster, but they can be used to discover other nodes.
+ pub seed_nodes: Vec<SeedNodeSettings>,
+}
+
+const CONFIG_ROOT: &str = "config";
+const STATIC_CONFIG_PATH: &str = "static";
+const DYNAMIC_CONFIG_PATH: &str = "dynamic";
+
+fn static_config_path(subpath: &str) -> PathBuf {
+ PathBuf::from(CONFIG_ROOT)
+ .join(STATIC_CONFIG_PATH)
+ .join(subpath)
+}
+
+fn dynamic_config_path(subpath: &str) -> PathBuf {
+ PathBuf::from(CONFIG_ROOT)
+ .join(DYNAMIC_CONFIG_PATH)
+ .join(subpath)
+}
+
+impl ServerSettings {
+ /// Load the settings from the configuration files.
+ pub fn new() -> Result<Self, ConfigError> {
+ let run_mode = std::env::var("RUN_MODE").unwrap_or_else(|_| "development".into());
+
+ let settings = Config::builder()
+ /* Load the default settings from the `config/default.toml` file. */
+ .add_source(File::from(static_config_path("default.toml")).required(false))
+
+ /* Load additional overrides based on context (alpha, beta, production, etc.), if they exist. */
+ .add_source(File::from(static_config_path(&run_mode)).required(false))
+
+ /* Load the local settings, if they exist. These are .gitignored to prevent accidental
+ check-in. */
+ .add_source(File::from(static_config_path("local")).required(false))
+
+ /* Load the dynamic settings, if they exist. These always override any static configuration*/
+ .add_source(File::from(dynamic_config_path("dynamic.toml")).required(false))
+ .build()?;
+
+ settings.try_deserialize()
+ }
+}
+
+/// A seed node that can be used to bootstrap the gossip cluster.
+#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq, Hash)]
+pub struct SeedNodeSettings {
+ /// The ID of the seed node.
+ pub id: String,
+
+ /// The address on which the seed node is listening for gossip messages.
+ pub address: String,
+}
+
+/// Setup a watcher for the settings files and return a stream of settings changes.
+///
+/// Returns the watcher, the initial settings, and a stream of settings changes. The watcher is
+/// returned so that it can be kept alive for the lifetime of the application. Also returns a
+/// snapshot of the current settings.
+pub fn setup_settings_watch() -> (
+ RecommendedWatcher,
+ ServerSettings,
+ impl Stream<Item = ServerSettings>,
+) {
+ let (tx, rx) = hydroflow::util::unbounded_channel();
+
+ // Setup the watcher
+ let mut watcher = notify::RecommendedWatcher::new(
+ UnboundedSenderEventHandler::new(tx),
+ notify::Config::default(),
+ )
+ .unwrap();
+ watcher
+ .watch(&PathBuf::from(CONFIG_ROOT), RecursiveMode::Recursive)
+ .unwrap();
+
+ // Read initial settings
+ let initial_settings = ServerSettings::new().unwrap();
+
+ let change_stream = rx
+ .map(Result::unwrap)
+ .map(|event| {
+ trace!("Event: {:?}", event);
+ match event.kind {
+ EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) => {
+ Some(ServerSettings::new().unwrap())
+ }
+ _ => {
+ trace!("Unhandled event: {:?}", event);
+ None
+ }
+ }
+ })
+ .filter_map(ready);
+
+ // If the watcher is dropped, the stream will stop producing events. So, returning the watcher.
+ (watcher, initial_settings, change_stream)
+}
+
+/// Wraps an UnboundedSender to implement the notify::EventHandler trait. This allows sending
+/// file notification events to UnboundedSender instances.
+struct UnboundedSenderEventHandler {
+ tx: UnboundedSender<notify::Result<Event>>,
+}
+
+impl UnboundedSenderEventHandler {
+ fn new(tx: UnboundedSender<notify::Result<Event>>) -> Self {
+ Self { tx }
+ }
+}
+
+impl EventHandler for UnboundedSenderEventHandler {
+ fn handle_event(&mut self, event: notify::Result<Event>) {
+ self.tx.send(event).unwrap();
+ }
+}
diff --git a/datastores/gossip_kv/server/config/static/default.toml b/datastores/gossip_kv/server/config/static/default.toml
new file mode 100644
index 000000000000..b651079e8aa8
--- /dev/null
+++ b/datastores/gossip_kv/server/config/static/default.toml
@@ -0,0 +1,5 @@
+seed_nodes = []
+
+#[[seed_nodes]]
+#id = "gossip-kv-seed-nodes-0"
+#address = "gossip-kv-seed-nodes-0.gossip-kv-seed-nodes.default.svc.cluster.local:3000"
\ No newline at end of file
diff --git a/datastores/gossip_kv/server/config/static/development.toml b/datastores/gossip_kv/server/config/static/development.toml
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/datastores/gossip_kv/server/main.rs b/datastores/gossip_kv/server/main.rs
new file mode 100644
index 000000000000..cb7dac2c8b39
--- /dev/null
+++ b/datastores/gossip_kv/server/main.rs
@@ -0,0 +1,179 @@
+use std::convert::Infallible;
+use std::fmt::Debug;
+use std::future::ready;
+use std::hash::Hash;
+use std::io::Error;
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+
+use clap::Parser;
+use gossip_kv::membership::{MemberDataBuilder, Protocol};
+use gossip_kv::server::{server, SeedNode};
+use gossip_kv::{ClientRequest, GossipMessage};
+use hydroflow::futures::{SinkExt, StreamExt};
+use hydroflow::tokio_stream::wrappers::IntervalStream;
+use hydroflow::util::{bind_udp_bytes, ipv4_resolve};
+use hydroflow::{bincode, tokio};
+use prometheus::{gather, Encoder, TextEncoder};
+use tracing::{error, info, trace};
+use warp::Filter;
+
+use crate::config::{setup_settings_watch, SeedNodeSettings};
+use crate::membership::member_name;
+
+mod config;
+
+mod membership;
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Parser)]
+struct Opts {
+ /// Port to listen for gossip messages.
+ #[clap(short, long, default_value = "3000")]
+ gossip_port: u16,
+
+ /// Port to listen for client requests.
+ #[clap(short, long, default_value = "3001")]
+ client_port: u16,
+}
+
+fn make_seed_node(settings: &SeedNodeSettings) -> SeedNode<SocketAddr> {
+ SeedNode {
+ id: settings.id.clone(),
+ address: ipv4_resolve(&settings.address).unwrap(),
+ }
+}
+
+async fn metrics_handler() -> Result<impl warp::Reply, Infallible> {
+ let encoder = TextEncoder::new();
+ let metric_families = gather();
+ let mut buffer = Vec::new();
+ encoder.encode(&metric_families, &mut buffer).unwrap();
+
+ Ok(warp::reply::with_header(
+ buffer,
+ "Content-Type",
+ encoder.format_type(),
+ ))
+}
+
+#[hydroflow::main]
+async fn main() {
+ tracing_subscriber::fmt::init();
+
+ let opts: Opts = Opts::parse();
+
+ let metrics_route = warp::path("metrics").and_then(metrics_handler);
+ tokio::spawn(async move {
+ info!("Starting metrics server on port 4003");
+ warp::serve(metrics_route).run(([0, 0, 0, 0], 4003)).await;
+ });
+
+ // Setup protocol information in the member metadata.
+ let client_protocol_address =
+ SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), opts.client_port);
+ let gossip_protocol_address =
+ SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), opts.gossip_port);
+
+ let member_data = MemberDataBuilder::new(member_name().clone())
+ .add_protocol(Protocol::new("gossip".into(), gossip_protocol_address))
+ .add_protocol(Protocol::new("client".into(), client_protocol_address))
+ .build();
+
+ let (client_outbound, client_inbound, _) = bind_udp_bytes(client_protocol_address).await;
+ let (gossip_outbound, gossip_inbound, _) = bind_udp_bytes(gossip_protocol_address).await;
+
+ info!(
+ "Server {:?} listening for client requests on: {:?}",
+ member_data.id, client_protocol_address
+ );
+
+ // TODO: Remove code duplication here.
+ // Setup message serialization for outbound client responses.
+ let client_ob = client_outbound.with(|(msg, addr)| {
+ ready(Ok::<(hydroflow::bytes::Bytes, SocketAddr), Error>((
+ hydroflow::util::serialize_to_bytes(msg),
+ addr,
+ )))
+ });
+
+ // Setup message deserialization for inbound client requests.
+ let client_ib = client_inbound.filter_map(|input| {
+ let mapped = match input {
+ Ok((bytes, addr)) => {
+ let msg: bincode::Result<ClientRequest> =
+ hydroflow::util::deserialize_from_bytes(&bytes);
+ match msg {
+ Ok(msg) => Some((msg, addr)),
+ Err(e) => {
+ error!("Error deserializing message: {:?}", e);
+ None
+ }
+ }
+ }
+ Err(e) => {
+ error!("Error receiving message: {:?}", e);
+ None
+ }
+ };
+ ready(mapped)
+ });
+
+ // Setup message serialization for outbound client responses.
+ let gossip_ob = gossip_outbound.with(|(msg, addr)| {
+ ready(Ok::<(hydroflow::bytes::Bytes, SocketAddr), Error>((
+ hydroflow::util::serialize_to_bytes(msg),
+ addr,
+ )))
+ });
+
+ // Setup message deserialization for inbound client requests.
+ let gossip_ib = gossip_inbound.filter_map(|input| {
+ let mapped = match input {
+ Ok((bytes, addr)) => {
+ let msg: bincode::Result<GossipMessage> =
+ hydroflow::util::deserialize_from_bytes(&bytes);
+ match msg {
+ Ok(msg) => Some((msg, addr)),
+ Err(e) => {
+ error!("Error deserializing message: {:?}", e);
+ None
+ }
+ }
+ }
+ Err(e) => {
+ error!("Error receiving message: {:?}", e);
+ None
+ }
+ };
+ ready(mapped)
+ });
+
+ let gossip_rx =
+ IntervalStream::new(tokio::time::interval(tokio::time::Duration::from_secs(5))).map(|_| ());
+
+ let (_watcher, server_settings, settings_stream) = setup_settings_watch();
+
+ let seed_nodes = server_settings
+ .seed_nodes
+ .iter()
+ .map(make_seed_node)
+ .collect::<Vec<_>>();
+
+ let seed_node_stream = settings_stream.map(|settings| {
+ trace!("Settings updated. Reloading seed nodes");
+ settings.seed_nodes.iter().map(make_seed_node).collect()
+ });
+
+ // Create and run the server
+ let mut server = server(
+ client_ib,
+ client_ob,
+ gossip_ib,
+ gossip_ob,
+ gossip_rx,
+ member_data,
+ seed_nodes,
+ seed_node_stream,
+ );
+
+ server.run_async().await;
+}
diff --git a/datastores/gossip_kv/server/membership.rs b/datastores/gossip_kv/server/membership.rs
new file mode 100644
index 000000000000..2e20ab534bf0
--- /dev/null
+++ b/datastores/gossip_kv/server/membership.rs
@@ -0,0 +1,37 @@
+use std::sync::OnceLock;
+
+use gossip_kv::membership::MemberId;
+// use rand::distributions::Distribution;
+// use rand::{Rng};
+
+/// This is a simple distribution that generates a random lower-case alphanumeric
+// struct LowercaseAlphanumeric;
+//
+// impl Distribution for LowercaseAlphanumeric {
+// fn sample(&self, rng: &mut R) -> char {
+// let choices = b"abcdefghijklmnopqrstuvwxyz0123456789";
+// choices[rng.gen_range(0..choices.len())] as char
+// }
+// }
+
+/// Gets a name for the current process.
+pub fn member_name() -> &'static MemberId {
+ static MEMBER_NAME: OnceLock<MemberId> = OnceLock::new();
+ MEMBER_NAME.get_or_init(|| {
+ // Generate a lower-case alphanumeric suffix of length 4
+
+ // TODO: Random suffixes are good, but make benchmarking a pain. For now, we'll just use
+ // the hostname as-is.
+
+ // let suffix: String = thread_rng()
+ // .sample_iter(&LowercaseAlphanumeric)
+ // .take(4)
+ // .map(char::from)
+ // .collect();
+
+ // Retrieve hostname
+ hostname::get().unwrap().to_str().unwrap().to_string()
+
+ // format!("{}-{}", hostname, suffix)
+ })
+}