From 3143bf5010600c0825db5f9348c03883cb49fdc0 Mon Sep 17 00:00:00 2001 From: Rohit Kulshreshtha Date: Tue, 5 Nov 2024 11:11:09 -0800 Subject: [PATCH] feat(Gossip KV): Merge Gossip KV into `main`. (#1535) There are some TODOs in the code for which I have created issues for 0.10 milestone. [P0 - Must Have for 0.10](https://github.com/hydro-project/hydroflow/issues?q=is:open+label:Datastores/Gossip-KV+milestone:0.10+label:P0) [P1 - Good to have / stretch goals](https://github.com/hydro-project/hydroflow/issues?q=is:open+label:Datastores/Gossip-KV+milestone:0.10+label:P1) [P2 - Low Priority](https://github.com/hydro-project/hydroflow/issues?q=is:open+label:Datastores/Gossip-KV+milestone:0.10+label:P2) Closes #1530. --------- Co-authored-by: Mingwei Samuel --- .idea/hydroflow.iml | 3 + Cargo.lock | 741 ++++++++++++++++- Cargo.toml | 1 + datastores/gossip_kv/Cargo.toml | 45 ++ datastores/gossip_kv/Makefile | 83 ++ datastores/gossip_kv/README.md | 83 ++ datastores/gossip_kv/cli/Dockerfile | 12 + datastores/gossip_kv/cli/main.rs | 119 +++ .../deployment/aws/terraform/.gitignore | 1 + .../deployment/aws/terraform/main.tf | 561 +++++++++++++ .../deployment/aws/terraform/outputs.tf | 34 + .../deployment/aws/terraform/terraform.tf | 30 + .../deployment/aws/terraform/variables.tf | 29 + .../gossip_kv/deployment/local/objects.yaml | 90 +++ .../local/updated_seed_node_config.yaml | 17 + datastores/gossip_kv/kv/lattices/mod.rs | 248 ++++++ datastores/gossip_kv/kv/lib.rs | 273 +++++++ datastores/gossip_kv/kv/membership.rs | 85 ++ datastores/gossip_kv/kv/model.rs | 166 ++++ datastores/gossip_kv/kv/server.rs | 763 ++++++++++++++++++ datastores/gossip_kv/kv/util.rs | 87 ++ .../gossip_kv/load_test_server/Dockerfile | 11 + .../gossip_kv/load_test_server/server.rs | 226 ++++++ datastores/gossip_kv/server/.gitignore | 1 + datastores/gossip_kv/server/Dockerfile | 13 + datastores/gossip_kv/server/README.md | 44 + .../gossip_kv/server/baseimage.Dockerfile | 23 + 
datastores/gossip_kv/server/config/mod.rs | 130 +++ .../server/config/static/default.toml | 5 + .../server/config/static/development.toml | 0 datastores/gossip_kv/server/main.rs | 179 ++++ datastores/gossip_kv/server/membership.rs | 37 + 32 files changed, 4119 insertions(+), 21 deletions(-) create mode 100644 datastores/gossip_kv/Cargo.toml create mode 100644 datastores/gossip_kv/Makefile create mode 100644 datastores/gossip_kv/README.md create mode 100644 datastores/gossip_kv/cli/Dockerfile create mode 100644 datastores/gossip_kv/cli/main.rs create mode 100644 datastores/gossip_kv/deployment/aws/terraform/.gitignore create mode 100644 datastores/gossip_kv/deployment/aws/terraform/main.tf create mode 100644 datastores/gossip_kv/deployment/aws/terraform/outputs.tf create mode 100644 datastores/gossip_kv/deployment/aws/terraform/terraform.tf create mode 100644 datastores/gossip_kv/deployment/aws/terraform/variables.tf create mode 100644 datastores/gossip_kv/deployment/local/objects.yaml create mode 100644 datastores/gossip_kv/deployment/local/updated_seed_node_config.yaml create mode 100644 datastores/gossip_kv/kv/lattices/mod.rs create mode 100644 datastores/gossip_kv/kv/lib.rs create mode 100644 datastores/gossip_kv/kv/membership.rs create mode 100644 datastores/gossip_kv/kv/model.rs create mode 100644 datastores/gossip_kv/kv/server.rs create mode 100644 datastores/gossip_kv/kv/util.rs create mode 100644 datastores/gossip_kv/load_test_server/Dockerfile create mode 100644 datastores/gossip_kv/load_test_server/server.rs create mode 100644 datastores/gossip_kv/server/.gitignore create mode 100644 datastores/gossip_kv/server/Dockerfile create mode 100644 datastores/gossip_kv/server/README.md create mode 100644 datastores/gossip_kv/server/baseimage.Dockerfile create mode 100644 datastores/gossip_kv/server/config/mod.rs create mode 100644 datastores/gossip_kv/server/config/static/default.toml create mode 100644 datastores/gossip_kv/server/config/static/development.toml 
create mode 100644 datastores/gossip_kv/server/main.rs create mode 100644 datastores/gossip_kv/server/membership.rs diff --git a/.idea/hydroflow.iml b/.idea/hydroflow.iml index fc353ac331a..03944299ddf 100644 --- a/.idea/hydroflow.iml +++ b/.idea/hydroflow.iml @@ -36,6 +36,9 @@ + + + diff --git a/Cargo.lock b/Cargo.lock index a53092a9c76..d4d39f542ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -306,6 +306,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "benches" version = "0.0.0" @@ -342,6 +348,9 @@ name = "bitflags" version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +dependencies = [ + "serde", +] [[package]] name = "block-buffer" @@ -628,6 +637,26 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "config" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7328b20597b53c2454f0b1919720c25c7339051c02b72b7e05409e00b14132be" +dependencies = [ + "async-trait", + "convert_case", + "json5", + "lazy_static", + "nom 7.1.3", + "pathdiff", + "ron", + "rust-ini", + "serde", + "serde_json", + "toml", + "yaml-rust", +] + [[package]] name = "console" version = "0.15.8" @@ -651,6 +680,35 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom", + 
"once_cell", + "tiny-keccak", +] + +[[package]] +name = "convert_case" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -776,9 +834,9 @@ dependencies = [ [[package]] name = "dashmap" -version = "6.0.1" +version = "6.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" dependencies = [ "cfg-if", "crossbeam-utils", @@ -836,6 +894,15 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "dlv-list" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" +dependencies = [ + "const-random", +] + [[package]] name = "dunce" version = "1.0.5" @@ -860,6 +927,15 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" +[[package]] +name = "encoding_rs" +version = "0.8.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" +dependencies = [ + "cfg-if", +] + [[package]] name = "env_filter" version = "0.1.2" @@ -926,6 +1002,18 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" +[[package]] +name = "filetime" +version = "0.2.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf401df4a4e3872c4fe8151134cf483738e74b67fc934d6532c882b3d24a4550" +dependencies = [ + "cfg-if", + "libc", + "libredox", + "windows-sys 0.59.0", +] + [[package]] name = 
"fnv" version = "1.0.7" @@ -958,9 +1046,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -968,9 +1056,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" @@ -985,9 +1073,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-lite" @@ -1004,9 +1092,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -1015,21 +1103,27 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -1087,6 +1181,70 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "gossip_kv" +version = "0.1.0" +dependencies = [ + "clap", + "config", + "governor", + "hostname", + "hydroflow", + "lattices", + "lazy_static", + "notify", + "prometheus", + "rand", + "serde", + "serde_json", + "shlex", + "tokio", + "tracing", + "tracing-subscriber", + "uuid", + "warp", +] + +[[package]] +name = "governor" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0746aa765db78b521451ef74221663b57ba595bf83f75d0ce23cc09447c8139f" +dependencies = [ + "cfg-if", + "dashmap", + "futures-sink", + "futures-timer", + "futures-util", + "no-std-compat", + "nonzero_ext", + "parking_lot 0.12.3", + "portable-atomic", + "quanta", + "rand", + "smallvec", + "spinning_top", +] + +[[package]] +name = "h2" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.12", 
+ "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "2.4.1" @@ -1097,6 +1255,12 @@ dependencies = [ "crunchy", ] +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" + [[package]] name = "hashbrown" version = "0.14.5" @@ -1113,6 +1277,30 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +[[package]] +name = "headers" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" +dependencies = [ + "base64", + "bytes", + "headers-core", + "http 0.2.12", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http 0.2.12", +] + [[package]] name = "heck" version = "0.4.1" @@ -1152,6 +1340,17 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "hostname" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9c7c7c8ac16c798734b8a24560c1362120597c40d5e1459f09498f8f6c8f2ba" +dependencies = [ + "cfg-if", + "libc", + "windows", +] + [[package]] name = "http" version = "0.2.12" @@ -1163,12 +1362,40 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fcc0b4a115bf80b728eb8ea024ad5bd707b615bfed49e0665b6e0f86fd082d9" +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + [[package]] name = "humantime" version = "2.1.0" @@ -1203,7 +1430,7 @@ dependencies = [ "serde", "serde_json", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.20.1", ] [[package]] @@ -1424,6 +1651,30 @@ dependencies = [ "stageleft_tool", ] +[[package]] +name = "hyper" +version = "0.14.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c08302e8fa335b151b788c775ff56e7a03ae64ff85c548ee820fecb70356e85" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http 0.2.12", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -1509,6 +1760,26 @@ dependencies = [ "str_stack", ] +[[package]] +name = "inotify" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +dependencies = [ + "bitflags 1.3.2", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "insta" version = "1.39.0" @@ -1593,6 +1864,37 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json5" +version = "0.4.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1" +dependencies = [ + "pest", + "pest_derive", + "serde", +] + +[[package]] +name = "kqueue" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "lattices" version = "0.5.7" @@ -1637,6 +1939,17 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "libredox" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" +dependencies = [ + "bitflags 2.6.0", + "libc", + "redox_syscall 0.5.3", +] + [[package]] name = "libssh2-sys" version = "0.3.0" @@ -1721,6 +2034,22 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + +[[package]] +name = "mime_guess" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "minicov" version = "0.3.5" @@ -1731,6 +2060,12 @@ dependencies = [ "walkdir", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.7.4" @@ -1740,6 +2075,18 @@ dependencies = [ "adler", ] +[[package]] +name = "mio" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys 0.48.0", +] + [[package]] name = "mio" version = "1.0.2" @@ -1752,6 +2099,24 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "multer" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 0.2.12", + "httparse", + "log", + "memchr", + "mime", + "spin", + "version_check", +] + [[package]] name = "multiplatform_test" version = "0.2.0" @@ -1798,12 +2163,51 @@ dependencies = [ "libc", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf51a729ecf40266a2368ad335a5fdde43471f545a967109cd62146ecf8b66ff" +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + +[[package]] +name = "notify" +version = "6.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" +dependencies = [ + "bitflags 2.6.0", + "filetime", + "inotify", + "kqueue", + "libc", + "log", + "mio 0.8.11", + "walkdir", + "windows-sys 0.48.0", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1923,6 +2327,16 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "ordered-multimap" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ed8acf08e98e744e5384c8bc63ceb0364e68a6854187221c18df61c4797690e" +dependencies = [ + "dlv-list", + "hashbrown 0.13.2", +] + [[package]] name = "overload" version = "0.1.1" @@ -1989,12 +2403,63 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "pathdiff" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8835116a5c179084a830efb3adc117ab007512b535bc1a21c991d3b32a6b44dd" + [[package]] name = "percent-encoding" version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pest" +version = "2.7.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdbef9d1d47087a895abd220ed25eb4ad973a5e26f6a4367b038c25e28dfc2d9" +dependencies = [ + "memchr", + "thiserror", + "ucd-trie", +] + +[[package]] +name = "pest_derive" +version = "2.7.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d3a6e3394ec80feb3b6393c725571754c6188490265c61aaf260810d6b95aa0" +dependencies = [ + "pest", + "pest_generator", +] + +[[package]] +name = "pest_generator" +version = "2.7.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94429506bde1ca69d1b5601962c73f4172ab4726571a59ea95931218cb0e930e" +dependencies = [ + "pest", + "pest_meta", + "proc-macro2", + 
"quote", + "syn 2.0.75", +] + +[[package]] +name = "pest_meta" +version = "2.7.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac8a071862e93690b6e34e9a5fb8e33ff3734473ac0245b27232222c4906a33f" +dependencies = [ + "once_cell", + "pest", + "sha2", +] + [[package]] name = "pin-project" version = "1.1.5" @@ -2145,10 +2610,31 @@ checksum = "6ab1427f3d2635891f842892dda177883dca0639e05fe66796a62c9d2f23b49c" dependencies = [ "byteorder", "libc", - "nom", + "nom 2.2.1", "rustc_version", ] +[[package]] +name = "prometheus" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot 0.12.3", + "protobuf", + "thiserror", +] + +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "pusherator" version = "0.0.8" @@ -2255,6 +2741,21 @@ dependencies = [ "serde", ] +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quick-xml" version = "0.26.0" @@ -2313,6 +2814,15 @@ dependencies = [ "rand", ] +[[package]] +name = "raw-cpuid" +version = "11.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "rayon" version = "1.10.0" @@ -2437,6 +2947,28 @@ dependencies = [ "bytemuck", ] +[[package]] +name = "ron" +version = "0.8.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "b91f7eff05f748767f183df4320a63d6936e9c6107d97c9e6bdd9784f4289c94" +dependencies = [ + "base64", + "bitflags 2.6.0", + "serde", + "serde_derive", +] + +[[package]] +name = "rust-ini" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e2a3bcec1f113553ef1c88aae6c020a369d03d55b58de9869a0908930385091" +dependencies = [ + "cfg-if", + "ordered-multimap", +] + [[package]] name = "rust-sitter" version = "0.4.3" @@ -2648,6 +3180,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha1" version = "0.10.6" @@ -2760,6 +3304,21 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + +[[package]] +name = "spinning_top" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" +dependencies = [ + "lock_api", +] + [[package]] name = "ssh2" version = "0.9.4" @@ -3036,6 +3595,15 @@ dependencies = [ "timely-logging-master", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinytemplate" version = "1.2.1" @@ -3070,7 +3638,7 @@ dependencies = [ "backtrace", "bytes", "libc", - "mio", + "mio 1.0.2", "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", @@ -3111,7 +3679,19 @@ dependencies = [ 
"futures-util", "log", "tokio", - "tungstenite", + "tungstenite 0.20.1", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.21.0", ] [[package]] @@ -3187,12 +3767,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "tower-service" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" + [[package]] name = "tracing" version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -3304,6 +3891,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8ddffe35a0e5eeeadf13ff7350af564c6e73993a24db62caee1822b185c2600" +[[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + [[package]] name = "try_match" version = "0.4.2" @@ -3361,7 +3954,26 @@ dependencies = [ "byteorder", "bytes", "data-encoding", - "http", + "http 0.2.12", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", "httparse", "log", "rand", @@ -3377,6 +3989,21 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "ucd-trie" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" + +[[package]] +name = "unicase" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.15" @@ -3398,6 +4025,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-segmentation" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" + [[package]] name = "unicode-width" version = "0.1.13" @@ -3440,6 +4073,15 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" +dependencies = [ + "getrandom", +] + [[package]] name = "valuable" version = "0.1.0" @@ -3490,6 +4132,44 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + +[[package]] +name = "warp" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4378d202ff965b011c64817db11d5829506d3404edeadb61f190d111da3f231c" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "headers", + "http 0.2.12", + "hyper", + "log", + "mime", + "mime_guess", + "multer", + "percent-encoding", + 
"pin-project", + "scoped-tls", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-tungstenite 0.21.0", + "tokio-util", + "tower-service", + "tracing", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -3679,6 +4359,16 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +dependencies = [ + "windows-core", + "windows-targets 0.52.6", +] + [[package]] name = "windows-core" version = "0.52.0" @@ -3920,6 +4610,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "yaml-rust" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "zerocopy" version = "0.7.35" diff --git a/Cargo.toml b/Cargo.toml index 8b9deaa71ad..ab8fef76bc8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ # See "Adding new crates" and "Moving crates" addendum sections in `RELEASING.md` members = [ "benches", + "datastores/gossip_kv", "hydro_deploy/core", "hydro_deploy/hydro_cli", "hydro_deploy/hydro_cli_examples", diff --git a/datastores/gossip_kv/Cargo.toml b/datastores/gossip_kv/Cargo.toml new file mode 100644 index 00000000000..b5976557869 --- /dev/null +++ b/datastores/gossip_kv/Cargo.toml @@ -0,0 +1,45 @@ +[package] +description = "Gossip KV Library" +name = "gossip_kv" +version = "0.1.0" +edition = "2021" +license = "Apache-2.0" +publish = false + +[dependencies] +clap = { version = "4.5.4", features = ["derive"] } +config = "0.14.0" +governor = "0.7.0" +hostname = "0.4.0" +hydroflow = { path="../../hydroflow" } +lattices = { path = '../../lattices'} +lazy_static = "1.5.0" 
+# The specific set of features for Notify are picked to disable the default cross-beam channels (cause problems with +# tokio) and use std channels. See docs for more information: https://docs.rs/notify/6.1.1/notify/ +notify = { version = "6.1.1", default-features = false, features = ["macos_kqueue"] } +prometheus = "0.13.4" +rand = "0.8.5" +serde = "1.0.203" +serde_json = "1.0.117" +shlex = "1.3.0" +tokio = { version = "1.0.0", features = ["rt", "rt-multi-thread", "macros"] } +tracing = "0.1.40" +tracing-subscriber = {version = "0.3.18", features = ["env-filter"]} +uuid = { version = "1.9.1", features = ["v4"] } +warp = "0.3.7" + +[[bin]] +name = "gossip_server" +path = "server/main.rs" + +[[bin]] +name = "load_test_server" +path = "load_test_server/server.rs" + +[[bin]] +name = "gossip_cli" +path = "cli/main.rs" + +[lib] +name = "gossip_kv" +path = "kv/lib.rs" \ No newline at end of file diff --git a/datastores/gossip_kv/Makefile b/datastores/gossip_kv/Makefile new file mode 100644 index 00000000000..dbfe144f0d7 --- /dev/null +++ b/datastores/gossip_kv/Makefile @@ -0,0 +1,83 @@ +# Makefile + +## Minikube Options. 
+MINIKUBE_DISK_SIZE:=100g +MINIKUBE_CPUS:=16 +MINIKUBE_MEMORY:=32768 + +BASE_IMAGE_VERSION:=latest +SERVER_IMAGE_VERSION:=latest +CLI_IMAGE_VERSION:=latest +LOAD_TEST_IMAGE_VERSION:=latest + +# Docker Image Tags +BASE_IMAGE_TAG:=hydroflow-gossip-kv-base-image:$(BASE_IMAGE_VERSION) +SERVER_IMAGE_TAG:=hydroflow-gossip-kv-server:$(SERVER_IMAGE_VERSION) +CLI_IMAGE_TAG:=hydroflow-gossip-kv-cli:$(CLI_IMAGE_VERSION) +LOAD_TEST_IMAGE_TAG:=hydroflow-gossip-kv-load-test:$(LOAD_TEST_IMAGE_VERSION) + +AWS_TERRAFORM_PATH=../../datastores/gossip_kv/deployment/aws/terraform + +# Target to start Minikube with specific options +start_minikube: + minikube start --disk-size=$(MINIKUBE_DISK_SIZE) --cpus=$(MINIKUBE_CPUS) --memory=$(MINIKUBE_MEMORY) + @echo "Please run 'eval \$$(minikube docker-env)' to use the Minikube Docker daemon" + +# Target to build the Docker images +build_docker_images: build_base_image build_server_image build_cli_image build_load_test_image + +build_base_image: + docker build -t "$(BASE_IMAGE_TAG)" -f ../../datastores/gossip_kv/server/baseimage.Dockerfile ../.. + +build_server_image: + docker build -t "$(SERVER_IMAGE_TAG)" -f ../../datastores/gossip_kv/server/Dockerfile ../.. + +build_cli_image: + docker build -t "$(CLI_IMAGE_TAG)" -f ../../datastores/gossip_kv/cli/Dockerfile ../.. + +build_load_test_image: + docker build -t "$(LOAD_TEST_IMAGE_TAG)" -f ../../datastores/gossip_kv/load_test_server/Dockerfile ../.. 
+ +# Target to clean up the Minikube cluster +clean_local: + minikube delete + +# Target to deploy the Gossip KV Server to the Minikube cluster +deploy_local: + kubectl apply -f ../../datastores/gossip_kv/server/local + +# Target to delete the Minikube cluster and build again +rebuild_local: clean_local start_minikube build_docker_images + +aws_terraform_init: + terraform -chdir="$(AWS_TERRAFORM_PATH)" init + +aws_terraform_apply: + terraform -chdir="$(AWS_TERRAFORM_PATH)" apply + +aws_setup_kubectl: + @echo "Setting up kubectl to work with AWS EKS Cluster" + aws eks update-kubeconfig --region $$(terraform -chdir=$(AWS_TERRAFORM_PATH) output -raw region) --name $$(terraform -chdir=$(AWS_TERRAFORM_PATH) output -raw cluster_name) + +aws_upload_docker_images: build_docker_images + $(eval SERVER_REPO_URL := $(shell terraform -chdir=$(AWS_TERRAFORM_PATH) output -json repository_urls | jq -r '.["gossip_kv_server"]')) + $(eval CLI_REPO_URL := $(shell terraform -chdir=$(AWS_TERRAFORM_PATH) output -json repository_urls | jq -r '.["gossip_kv_cli"]')) + $(eval LOAD_TEST_REPO_URL := $(shell terraform -chdir=$(AWS_TERRAFORM_PATH) output -json repository_urls | jq -r '.["gossip_kv_load_test"]')) + $(eval REGION := $(shell terraform -chdir=$(AWS_TERRAFORM_PATH) output -raw region)) + docker tag $(SERVER_IMAGE_TAG) $(SERVER_REPO_URL):$(SERVER_IMAGE_VERSION) + docker tag $(CLI_IMAGE_TAG) $(CLI_REPO_URL):$(CLI_IMAGE_VERSION) + docker tag $(LOAD_TEST_IMAGE_TAG) $(LOAD_TEST_REPO_URL):$(LOAD_TEST_IMAGE_VERSION) + aws ecr get-login-password --region $(REGION) | docker login --username AWS --password-stdin $(SERVER_REPO_URL) + docker push $(SERVER_REPO_URL):$(SERVER_IMAGE_VERSION) + aws ecr get-login-password --region $(REGION) | docker login --username AWS --password-stdin $(CLI_REPO_URL) + docker push $(CLI_REPO_URL):$(CLI_IMAGE_VERSION) + aws ecr get-login-password --region $(REGION) | docker login --username AWS --password-stdin $(LOAD_TEST_REPO_URL) + docker push 
$(LOAD_TEST_REPO_URL):$(LOAD_TEST_IMAGE_VERSION) + +aws_tunnel_grafana: + $(eval GRAFANA_PORT := $(shell terraform -chdir=$(AWS_TERRAFORM_PATH) output -raw grafana_port)) + kubectl port-forward svc/grafana $(GRAFANA_PORT):$(GRAFANA_PORT) + +aws_tunnel_prometheus: + $(eval PROMETHEUS_PORT := $(shell terraform -chdir=$(AWS_TERRAFORM_PATH) output -raw prometheus_port)) + kubectl port-forward svc/prometheus $(PROMETHEUS_PORT):$(PROMETHEUS_PORT) \ No newline at end of file diff --git a/datastores/gossip_kv/README.md b/datastores/gossip_kv/README.md new file mode 100644 index 00000000000..b282dafe0f1 --- /dev/null +++ b/datastores/gossip_kv/README.md @@ -0,0 +1,83 @@ +# Gossip Key-Value Store + +# Architecture +A gossip-based key-value store library. + +``` +┌─────────────────────────┐ ┌─────────────────────────┐ ┌─────────────────────────┐ ┌─────────────────────────┐ +│ Process │ │ Process │ │ Process │ │ Process │ +│ ┌─────────────────────┐ │ │ ┌─────────────────────┐ │ │ ┌─────────────────────┐ │ │ ┌─────────────────────┐ │ +│ │ User Application │ │ │ │ User Application │ │ │ │ User Application │ │ │ │ User Application │ │ +│ └────────▲──┬─────────┘ │ │ └─────────▲─┬─────────┘ │ │ └─────────▲─┬─────────┘ │ │ └─────────▲─┬─────────┘ │ +│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ +│ ┌────────┴──▼─────────┐ │ │ ┌─────────┴─▼─────────┐ │ │ ┌─────────┴─▼─────────┐ │ │ ┌─────────┴─▼─────────┐ │ +│ │ Gossip KV Store │ │◄──►│ │ Gossip KV Store │ │◄──►│ │ Gossip KV Store │ │◄──►│ │ Gossip KV Store │ │ +│ └─────────────────────┘ │ │ └─────────────────────┘ │ │ └─────────────────────┘ │ │ └─────────────────────┘ │ +└─────────────────────────┘ └─────────────────────────┘ └─────────────────────────┘ └─────────────────────────┘ +``` + +## Data Model +TODO: Elaborate +* User Application manipulate data using client library +* Replicated to all members of the gossip cluster +* Eventually consistent + +```json +{ + "sys": { + "members": { + "member_id": { + "port": 1234, + "protocol": "v1" + } + 
} + }, + "usr": { + "key 1": "value 1", + "key 2": "value 2" + } +} +``` +Data is divided into two sections: A `sys` section that contains system data used by the key-value store itself. The +`usr` section contains user-defined data. + +### `sys` Data +The `sys` data section contains system data / state that is required by the key-value store to do its work. + +#### Fields + +### `usr` Data + +## Protocol + +## Checkpoints + +# Running Locally Using Minikube +## Install Docker Desktop +```shell +brew install --cask docker +``` +### Run docker (macOS) +``` +open -a Docker +``` + +## Install Minikube +Read more [here](https://minikube.sigs.k8s.io/docs/start/) +```shell +brew install minikube +``` + +## Start Minikube +```shell +minikube start +``` + +## Install `kubectl` +```shell +brew install kubectl +``` +## Configure Minikube to use your Docker Environment +```shell +eval $(minikube -p minikube docker-env) +``` \ No newline at end of file diff --git a/datastores/gossip_kv/cli/Dockerfile b/datastores/gossip_kv/cli/Dockerfile new file mode 100644 index 00000000000..208948c5076 --- /dev/null +++ b/datastores/gossip_kv/cli/Dockerfile @@ -0,0 +1,12 @@ +FROM "hydroflow-gossip-kv-base-image:latest" AS builder +WORKDIR /usr/src/gossip-kv-server +COPY . . +RUN find . 
+RUN cargo build --release --workspace -p gossip_kv + +FROM rustlang/rust:nightly-slim +COPY --from=builder /usr/src/gossip-kv-server/target/release/gossip_cli /usr/local/bin/gossip_cli + +RUN apt-get update && apt-get install -y \ + dnsutils \ + && rm -rf /var/lib/apt/lists/* diff --git a/datastores/gossip_kv/cli/main.rs b/datastores/gossip_kv/cli/main.rs new file mode 100644 index 00000000000..8834da9ce45 --- /dev/null +++ b/datastores/gossip_kv/cli/main.rs @@ -0,0 +1,119 @@ +use std::net::SocketAddr; + +use clap::{CommandFactory, Parser, Subcommand}; +use gossip_kv::{ClientRequest, ClientResponse, Key}; +use hydroflow::util::{bind_udp_bytes, ipv4_resolve}; +use hydroflow::{hydroflow_syntax, tokio, DemuxEnum}; +use tracing::error; + +/// CLI program to interact with Layer 0 gossip store. +#[derive(Debug, Parser)] +struct Opts { + #[clap(short, long, help = "Server address to connect to.")] + server_address: Option, +} + +/// Dummy app for using clap to process commands for interactive CLI. +#[derive(Debug, Parser)] +#[command(multicall = true)] +struct InteractiveApp { + #[clap(subcommand)] + commands: InteractiveCommands, +} + +#[derive(Debug, Subcommand, DemuxEnum)] +enum InteractiveCommands { + /// Get a value from the store. + Get { + #[arg(value_parser = parse_key, required = true, help = "Key to get")] + key: Key, + }, + /// Upsert a value in the store. + Set { + #[arg(value_parser = parse_key, required = true, help = "Key to set")] + key: Key, + value: String, + }, + /// Delete a value from the store. + Delete { + #[arg(value_parser = parse_key, required = true, help = "Key to delete")] + key: Key, + }, + /// Exit the application. + Exit, +} + +/// Allows clap to parse Keys from user input. +fn parse_key(s: &str) -> Result { + s.parse::().map_err(|e| e.to_string()) +} + +/// Parse a command from a line of input. +fn parse_command(line: String) -> Option { + // Override how help is handled. 
+ if line.trim() == "help" { + InteractiveApp::command() + .help_template("\nAvailable Commands: \n{subcommands}") + .print_help() + .unwrap(); + return None; + } + + // Split quoted string into parts. + let line_parts = shlex::split(&line); + + if line_parts.is_none() { + error!("\nUnable to parse command."); + return None; + } + + // Provide split parts to clap to process. + let maybe_parsed = InteractiveApp::try_parse_from(line_parts.unwrap()); + + match maybe_parsed { + Err(e) => { + // Problem with the parsed result. This displays some help. + error!("\n{}", e); + None + } + Ok(cli) => Some(cli.commands), + } +} + +#[hydroflow::main] +async fn main() { + tracing_subscriber::fmt::init(); + + let opts = Opts::parse(); + + // Bind to OS-assigned port on localhost. + let address = ipv4_resolve("0.0.0.0:0").unwrap(); + + // Default to localhost:3000 if not provided. + let server_address = opts.server_address.map_or_else( + || ipv4_resolve("localhost:3001").unwrap(), + |s| ipv4_resolve(&s).unwrap(), + ); + + // Setup UDP sockets for communication. + let (outbound, inbound, _) = bind_udp_bytes(address).await; + + let mut cli = hydroflow_syntax! { + inbound_messages = source_stream_serde(inbound) -> map(Result::unwrap) -> for_each(|(response, _addr): (ClientResponse, SocketAddr)| println!("{:?}", response)); + + outbound_messages = union() -> dest_sink_serde(outbound); + + // Parse commands from stdin. 
+ commands = source_stdin() + -> filter_map(|line| parse_command(line.unwrap())) + -> demux_enum::(); + + commands[Get] -> map(|(key,)| (ClientRequest::Get {key}, server_address)) -> outbound_messages; + commands[Set] -> map(|(key, value)| (ClientRequest::Set {key, value}, server_address)) -> outbound_messages; + commands[Delete] -> map(|(key,)| (ClientRequest::Delete {key}, server_address)) -> outbound_messages; + commands[Exit] -> for_each(|()| std::process::exit(0)); // TODO: Graceful shutdown https://github.com/hydro-project/hydroflow/issues/1253 + + }; + + cli.run_async().await; +} diff --git a/datastores/gossip_kv/deployment/aws/terraform/.gitignore b/datastores/gossip_kv/deployment/aws/terraform/.gitignore new file mode 100644 index 00000000000..3fa8c86b7b0 --- /dev/null +++ b/datastores/gossip_kv/deployment/aws/terraform/.gitignore @@ -0,0 +1 @@ +.terraform diff --git a/datastores/gossip_kv/deployment/aws/terraform/main.tf b/datastores/gossip_kv/deployment/aws/terraform/main.tf new file mode 100644 index 00000000000..47799f259c4 --- /dev/null +++ b/datastores/gossip_kv/deployment/aws/terraform/main.tf @@ -0,0 +1,561 @@ +provider "aws" { + region = var.region +} + +# Filter out local zones, which are not currently supported +# with managed node groups +data "aws_availability_zones" "available" { + filter { + name = "opt-in-status" + values = ["opt-in-not-required"] + } +} + +data "aws_caller_identity" "current" {} + +locals { + cluster_name = "anna-load-test-${random_string.suffix.result}" + account_id = data.aws_caller_identity.current.account_id +} + +resource "random_string" "suffix" { + length = 8 + special = false +} + +module "vpc" { + source = "terraform-aws-modules/vpc/aws" + version = "5.8.1" + + name = "anna-load-test-vpc" + + cidr = "10.0.0.0/16" + azs = slice(data.aws_availability_zones.available.names, 0, 3) + + map_public_ip_on_launch = true + public_subnets = ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24"] + + enable_dns_hostnames = true + + 
public_subnet_tags = { + "kubernetes.io/role/elb" = 1 + } +} + +module "eks_cluster" { + source = "terraform-aws-modules/eks/aws" + version = "20.24.3" + + cluster_name = local.cluster_name + cluster_version = "1.31" + + cluster_endpoint_public_access = true + enable_cluster_creator_admin_permissions = true + + cluster_addons = { + aws-ebs-csi-driver = { + service_account_role_arn = module.irsa-ebs-csi.iam_role_arn + } + } + + vpc_id = module.vpc.vpc_id + subnet_ids = module.vpc.public_subnets + + eks_managed_node_group_defaults = { + ami_type = "AL2_x86_64" + } + + eks_managed_node_groups = { + one = { + name = "servers" + + instance_types = [var.instance_type] + + min_size = 1 + max_size = 3 + desired_size = 2 + } + } +} + +# https://aws.amazon.com/blogs/containers/amazon-ebs-csi-driver-is-now-generally-available-in-amazon-eks-add-ons/ +data "aws_iam_policy" "ebs_csi_policy" { + arn = "arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy" +} + +module "irsa-ebs-csi" { + source = "terraform-aws-modules/iam/aws//modules/iam-assumable-role-with-oidc" + version = "5.39.0" + + create_role = true + role_name = "AmazonEKSTFEBSCSIRole-${module.eks_cluster.cluster_name}" + provider_url = module.eks_cluster.oidc_provider + role_policy_arns = [data.aws_iam_policy.ebs_csi_policy.arn] + oidc_fully_qualified_subjects = ["system:serviceaccount:kube-system:ebs-csi-controller-sa"] +} + +variable "ecr_repositories" { + description = "List of ECR repository names" + type = list(string) + default = ["gossip_kv_server", "gossip_kv_cli", "gossip_kv_load_test"] +} + +module "ecr" { + source = "terraform-aws-modules/ecr/aws" + version = "2.3.0" + + for_each = {for repo in var.ecr_repositories : repo => repo} + repository_name = each.value + + repository_read_write_access_arns = [data.aws_caller_identity.current.arn] + repository_lifecycle_policy = jsonencode({ + rules = [ + { + rulePriority = 1, + description = "Keep last 30 images", + selection = { + tagStatus = "tagged", + 
tagPrefixList = ["v"], + countType = "imageCountMoreThan", + countNumber = 30 + }, + action = { + type = "expire" + } + } + ] + }) + + repository_image_tag_mutability = "MUTABLE" + tags = { + Terraform = "true" + Environment = "dev" + } +} + +provider "kubernetes" { + host = module.eks_cluster.cluster_endpoint + cluster_ca_certificate = base64decode(module.eks_cluster.cluster_certificate_authority_data) + exec { + api_version = "client.authentication.k8s.io/v1beta1" + command = "aws" + args = [ + "eks", + "get-token", + "--cluster-name", + module.eks_cluster.cluster_name, + ] + } +} + +resource "kubernetes_stateful_set" "gossip_kv_seed_nodes" { + metadata { + name = "gossip-kv-seed-nodes" + labels = { + app = "gossip-kv-seed-nodes" + } + } + + spec { + service_name = "gossip-kv-seed-nodes" + replicas = 1 + + selector { + match_labels = { + app = "gossip-kv-seed-nodes" + } + } + + template { + metadata { + labels = { + app = "gossip-kv-seed-nodes" + } + annotations = { + "prometheus.io/scrape" : "true" + "prometheus.io/port" : var.pod_monitoring_port + } + } + + spec { + termination_grace_period_seconds = 5 + + container { + name = "gossip-kv-server" + image = "${module.ecr.gossip_kv_load_test.repository_url}:latest" + image_pull_policy = "Always" + + env { + name = "RUST_LOG" + value = "trace" + } + + env { + name = "RUST_BACKTRACE" + value = "full" + } + + port { + container_port = 3001 + protocol = "UDP" + } + + port { + container_port = var.pod_monitoring_port + protocol = "TCP" + } + + volume_mount { + name = "gossip-kv-dynamic-config" + mount_path = "/config/dynamic" + } + } + + volume { + name = "gossip-kv-dynamic-config" + + config_map { + name = "gossip-kv-dynamic-config" + } + } + } + } + } +} + +resource "kubernetes_deployment" "gossip_kv_cli" { + metadata { + name = "gossip-kv-cli" + labels = { + app = "gossip-kv-cli" + } + } + + spec { + replicas = 1 + + selector { + match_labels = { + app = "gossip-kv-cli" + } + } + + template { + metadata { + labels = 
{ + app = "gossip-kv-cli" + } + } + + spec { + termination_grace_period_seconds = 5 + + container { + name = "gossip-kv-cli" + image = "${module.ecr.gossip_kv_cli.repository_url}:latest" + image_pull_policy = "Always" + command = ["/bin/sh"] + args = ["-c", "while true; do sleep 3600; done"] + tty = true + + env { + name = "RUST_LOG" + value = "info" + } + } + } + } + } +} + +resource "kubernetes_service" "gossip_kv_seed_nodes" { + metadata { + name = "gossip-kv-seed-nodes" + labels = { + app = "gossip-kv-seed-nodes" + } + } + + spec { + cluster_ip = "None" + selector = { + app = "gossip-kv-seed-nodes" + } + + port { + port = 3001 + target_port = 3001 + protocol = "UDP" + } + } +} + +resource "kubernetes_config_map" "gossip_kv_dynamic_config" { + metadata { + name = "gossip-kv-dynamic-config" + } + + data = { + "dynamic.toml" = < details.repository_url } +} + +output "grafana_port" { + description = "Port for Grafana UI" + value = var.grafana_port +} + +output "prometheus_port" { + description = "Port for Prometheus UI" + value = var.prometheus_port +} \ No newline at end of file diff --git a/datastores/gossip_kv/deployment/aws/terraform/terraform.tf b/datastores/gossip_kv/deployment/aws/terraform/terraform.tf new file mode 100644 index 00000000000..6b54c550a22 --- /dev/null +++ b/datastores/gossip_kv/deployment/aws/terraform/terraform.tf @@ -0,0 +1,30 @@ +terraform { + required_providers { + aws = { + source = "hashicorp/aws" + version = "~> 5.70.0" + } + + random = { + source = "hashicorp/random" + version = "~> 3.6.1" + } + + tls = { + source = "hashicorp/tls" + version = "~> 4.0.5" + } + + cloudinit = { + source = "hashicorp/cloudinit" + version = "~> 2.3.4" + } + + kubernetes = { + source = "hashicorp/kubernetes" + version = ">= 2.32.0" + } + } + + required_version = "~> 1.3" +} \ No newline at end of file diff --git a/datastores/gossip_kv/deployment/aws/terraform/variables.tf b/datastores/gossip_kv/deployment/aws/terraform/variables.tf new file mode 100644 
index 00000000000..fb8ff7995be --- /dev/null +++ b/datastores/gossip_kv/deployment/aws/terraform/variables.tf @@ -0,0 +1,29 @@ +variable "region" { + description = "AWS region where the resources will be created" + type = string + default = "us-east-2" +} + +variable "instance_type" { + description = "Instance type for the EC2 instances" + type = string + default = "t3.small" +} + +variable "grafana_port" { + description = "Port for Grafana UI" + type = number + default = 4001 +} + +variable "prometheus_port" { + description = "Port for Prometheus UI" + type = number + default = 4002 +} + +variable "pod_monitoring_port" { + description = "Port for monitoring pods using prometheus. Every pod runs a prometheus exporter on this port." + type = number + default = 4003 +} \ No newline at end of file diff --git a/datastores/gossip_kv/deployment/local/objects.yaml b/datastores/gossip_kv/deployment/local/objects.yaml new file mode 100644 index 00000000000..7421df268e1 --- /dev/null +++ b/datastores/gossip_kv/deployment/local/objects.yaml @@ -0,0 +1,90 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: gossip-kv-seed-nodes + labels: + app: gossip-kv-seed-nodes +spec: + replicas: 3 + serviceName: gossip-kv-seed-nodes + selector: + matchLabels: + app: gossip-kv-seed-nodes + template: + metadata: + labels: + app: gossip-kv-seed-nodes + spec: + terminationGracePeriodSeconds: 5 # Really aggressive, but makes teardown faster. Not recommended beyond benchmarking. 
+ containers: + - name: gossip-kv-server + image: docker.io/hydroflow/gossip-kv-server:latest + imagePullPolicy: IfNotPresent +# Uncomment the following for debugging +# command: [ "/bin/sh" ] +# args: [ "-c", "while true; do sleep 3600; done" ] + env: + - name: RUST_LOG + value: "trace" + - name: RUST_BACKTRACE + value: "full" + ports: + - containerPort: 3001 + protocol: UDP + volumeMounts: + - name: gossip-kv-dynamic-config + mountPath: /config/dynamic + volumes: + - name: gossip-kv-dynamic-config + configMap: + name: gossip-kv-dynamic-config +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: gossip-kv-cli + labels: + app: gossip-kv-cli +spec: + replicas: 1 + selector: + matchLabels: + app: gossip-kv-cli + template: + metadata: + labels: + app: gossip-kv-cli + spec: + terminationGracePeriodSeconds: 5 # Really aggressive, but makes teardown faster. Not recommended beyond benchmarking. + containers: + - name: gossip-kv-cli + image: docker.io/hydroflow/gossip-kv-cli:latest + imagePullPolicy: IfNotPresent + command: ["/bin/sh"] + args: ["-c", "while true; do sleep 3600; done"] + tty: true + env: + - name: RUST_LOG + value: "info" +--- +apiVersion: v1 +kind: Service +metadata: + name: gossip-kv-seed-nodes + labels: + app: gossip-kv-seed-nodes +spec: + ports: + - port: 3001 + targetPort: 3001 + protocol: UDP + clusterIP: None + selector: + app: gossip-kv-seed-nodes +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: gossip-kv-dynamic-config +data: + dynamic.toml: | \ No newline at end of file diff --git a/datastores/gossip_kv/deployment/local/updated_seed_node_config.yaml b/datastores/gossip_kv/deployment/local/updated_seed_node_config.yaml new file mode 100644 index 00000000000..bebeda8cb5e --- /dev/null +++ b/datastores/gossip_kv/deployment/local/updated_seed_node_config.yaml @@ -0,0 +1,17 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: gossip-kv-dynamic-config +data: + dynamic.toml: | + [[seed_nodes]] + id = "gossip-kv-seed-nodes-0" + address 
= "gossip-kv-seed-nodes-0.gossip-kv-seed-nodes.default.svc.cluster.local:3000" + + [[seed_nodes]] + id = "gossip-kv-seed-nodes-1" + address = "gossip-kv-seed-nodes-1.gossip-kv-seed-nodes.default.svc.cluster.local:3000" + + [[seed_nodes]] + id = "gossip-kv-seed-nodes-2" + address = "gossip-kv-seed-nodes-2.gossip-kv-seed-nodes.default.svc.cluster.local:3000" \ No newline at end of file diff --git a/datastores/gossip_kv/kv/lattices/mod.rs b/datastores/gossip_kv/kv/lattices/mod.rs new file mode 100644 index 00000000000..1aa5883b4bd --- /dev/null +++ b/datastores/gossip_kv/kv/lattices/mod.rs @@ -0,0 +1,248 @@ +use std::cmp::Ordering; +use std::collections::HashSet; +use std::hash::Hash; + +use hydroflow::lattices::{IsBot, IsTop, LatticeFrom, LatticeOrd, Merge}; +use serde::{Deserialize, Serialize}; + +/// A bounded set union lattice with a fixed size N. +/// +/// Once the set reaches size N, it becomes top. The items in the set are no longer tracked to +/// reclaim associated memory. +#[derive(Debug, Clone, Eq, Serialize, Deserialize)] + +pub struct BoundedSetLattice +where + T: Eq + Hash, +{ + // The set of items in the lattice with invariant: + // is_top => items.is_empty() ... i.e. the items are dropped when the lattice reaches top. 
+ items: HashSet, + is_top: bool, +} + +impl LatticeFrom> for BoundedSetLattice +where + T: Eq + Hash, +{ + fn lattice_from(other: BoundedSetLattice) -> Self { + other + } +} + +impl Default for BoundedSetLattice +where + T: Eq + Hash, +{ + fn default() -> Self { + Self { + items: HashSet::new(), + is_top: N == 0, // This lattice is effectively a unit lattice `()`, if N == 0 + } + } +} + +impl From<()> for BoundedSetLattice +where + T: Eq + Hash, +{ + fn from(_: ()) -> Self { + Default::default() + } +} + +impl From> for () +where + T: Eq + Hash, +{ + fn from(_: BoundedSetLattice) -> Self {} +} + +impl BoundedSetLattice +where + T: Eq + Hash, +{ + pub fn new() -> Self { + Default::default() + } + + pub fn new_from(items: U) -> Self + where + U: IntoIterator, + { + let mut lattice = Self::new(); + lattice.merge(items); + lattice + } +} + +impl IsBot for BoundedSetLattice +where + T: Eq + Hash, +{ + fn is_bot(&self) -> bool { + match N { + 0 => true, + _ => self.items.is_empty() && !self.is_top, + } + } +} + +impl IsTop for BoundedSetLattice +where + T: Eq + Hash, +{ + fn is_top(&self) -> bool { + self.is_top + } +} + +impl Merge for BoundedSetLattice +where + U: IntoIterator, + T: Eq + Hash, +{ + fn merge(&mut self, other: U) -> bool { + if self.is_top { + return false; + } + + let old_len = self.items.len(); + self.items.extend(other); + let new_len = self.items.len(); + + if new_len >= N { + self.is_top = true; + self.items.clear(); + } + + new_len != old_len + } +} + +impl PartialOrd for BoundedSetLattice +where + T: Eq + Hash, +{ + fn partial_cmp(&self, other: &Self) -> Option { + match (self.is_top, other.is_top) { + (true, true) => Some(Ordering::Equal), + (true, false) => Some(Ordering::Greater), + (false, true) => Some(Ordering::Less), + (false, false) => match self.items.len().cmp(&other.items.len()) { + Ordering::Greater => { + if other.items.iter().all(|key| self.items.contains(key)) { + Some(Ordering::Greater) + } else { + None + } + } + Ordering::Less 
=> { + if self.items.iter().all(|key| other.items.contains(key)) { + Some(Ordering::Less) + } else { + None + } + } + Ordering::Equal => { + if self.items.iter().all(|key| other.items.contains(key)) { + Some(Ordering::Equal) + } else { + None + } + } + }, + } + } +} + +impl PartialEq for BoundedSetLattice +where + T: Eq + Hash, +{ + fn eq(&self, other: &Self) -> bool { + match (self.is_top, other.is_top) { + (true, true) => true, + (true, false) => false, + (false, true) => false, + (false, false) => self.items == other.items, + } + } +} + +impl LatticeOrd for BoundedSetLattice where T: Eq + Hash {} + +impl Merge> for BoundedSetLattice +where + T: Eq + Hash, +{ + fn merge(&mut self, other: BoundedSetLattice) -> bool { + match (self.is_top, other.is_top) { + (true, _) => false, + (false, true) => { + self.is_top = true; + self.items.clear(); + true + } + (false, false) => self.merge(other.items), + } + } +} + +#[cfg(test)] +mod tests { + use hydroflow::lattices::test::check_all; + + use super::*; + + #[test] + fn test_0_bounded_set_lattice() { + let mut lat: BoundedSetLattice = ().into(); + assert!(lat.is_bot() && lat.is_top()); + + // Merges should always return false. + assert!(!lat.merge([1])); + + // No changes to top/bot status. + assert!(lat.is_bot() && lat.is_top()); + } + + #[test] + fn test_1_bounded_set_lattice() { + // The bounded lattice with N = 1 is effectively a WithBottom lattice. + let mut lat = BoundedSetLattice::::new(); + assert!(lat.is_bot() && !lat.is_top()); + assert!(lat.items.is_empty()); + + assert!(lat.merge([1])); + assert!(!lat.is_bot() && lat.is_top()); + assert!(lat.items.is_empty()); // Check that the items were dropped. 
+ + assert!(!lat.merge([2])); + } + + #[test] + fn test_2_bounded_set_lattice() { + let mut a = BoundedSetLattice::::new(); + let b: BoundedSetLattice = BoundedSetLattice::new_from([1, 2]); + + assert!(a.is_bot() && !a.is_top()); + assert!(!b.is_bot() && b.is_top()); + + assert!(a.merge(b)); + assert!(!a.is_bot() && a.is_top()); + + assert!(!a.merge([3])); + } + + #[test] + fn test_lattice_properties() { + check_all(&[ + Default::default(), + BoundedSetLattice::::new_from([1]), + BoundedSetLattice::::new_from([1, 2]), + BoundedSetLattice::::new_from([1, 2, 3]), + BoundedSetLattice::::new_from([1, 2, 3, 4]), + ]); + } +} diff --git a/datastores/gossip_kv/kv/lib.rs b/datastores/gossip_kv/kv/lib.rs new file mode 100644 index 00000000000..15badc703be --- /dev/null +++ b/datastores/gossip_kv/kv/lib.rs @@ -0,0 +1,273 @@ +pub mod membership; +pub mod model; + +pub mod server; + +pub mod lattices; + +pub mod util; + +use std::collections::HashSet; +use std::fmt::Display; +use std::str::FromStr; + +use serde::{Deserialize, Serialize}; + +use crate::model::{Clock, Namespaces}; +use crate::KeyParseError::InvalidNamespace; + +/// The namespace of the key of an entry in the key-value store. +#[derive(Debug, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Hash)] +pub enum Namespace { + /// User namespace is for use by the user of the key-value store. + User, + + /// System namespace is reserved for use by the key-value store itself. + System, +} + +/// Error that can occur when parsing a key from a string. +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +pub enum KeyParseError { + /// The namespace in the key is invalid. Namespaces must be either `usr` or `sys`. + InvalidNamespace, + + /// The key is in an invalid format. Keys must be of the form `/namespace/table/row`. 
+ InvalidFormat, +} + +impl Display for KeyParseError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + InvalidNamespace => write!(f, "Invalid namespace"), + KeyParseError::InvalidFormat => write!(f, "Invalid key format"), + } + } +} + +impl FromStr for Namespace { + type Err = KeyParseError; + fn from_str(s: &str) -> Result { + match s { + "usr" => Ok(Namespace::User), + "sys" => Ok(Namespace::System), + _ => Err(InvalidNamespace), + } + } +} + +/// The name of a table in the key-value store. +pub type TableName = String; + +/// The key of a row in a table in the key-value store. +pub type RowKey = String; + +/// A key of an entry in the key-value store. +/// +/// Data in the key-value store is organized into namespaces, tables, and rows. Namespaces are +/// either `usr` for user data or `sys` for system data. Namespaces contain tables, which contain +/// rows. Each row has a row key and a row value. +#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize, Hash)] +pub struct Key { + /// The namespace of the key. + pub namespace: Namespace, + /// The name of the table in the key. + pub table: TableName, + /// The key of the row in the table. 
+ pub row_key: RowKey, +} + +impl FromStr for Key { + type Err = KeyParseError; + fn from_str(s: &str) -> Result { + let mut parts: Vec = vec![]; + let mut current_part = String::new(); + let mut escaping = false; + + for c in s.chars() { + match (escaping, c) { + (true, '\\') | (true, '/') => { + current_part.push(c); + escaping = false; + } + (true, _) => return Err(KeyParseError::InvalidFormat), + (false, '\\') => { + escaping = true; + } + (false, '/') => { + parts.push(current_part); + current_part = String::new(); + } + (false, _) => { + current_part.push(c); + } + } + } + + if escaping { + return Err(KeyParseError::InvalidFormat); + } + + if !current_part.is_empty() { + parts.push(current_part); + } + + if parts.len() != 4 { + return Err(KeyParseError::InvalidFormat); + } + + if !parts[0].is_empty() { + return Err(KeyParseError::InvalidFormat); + } + + if parts[2].is_empty() || parts[3].is_empty() { + return Err(KeyParseError::InvalidFormat); + } + + let namespace = parts[1].parse()?; + Ok(Key { + namespace, + table: parts[2].to_string(), + row_key: parts[3].to_string(), + }) + } +} + +/// A request from a client to the key-value store. +#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] +pub enum ClientRequest { + /// A request to get the value of a key. + Get { key: Key }, + /// A request to set the value of a key. + Set { key: Key, value: String }, + /// A request to delete the value of a key. + Delete { key: Key }, +} + +#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] +pub enum ClientResponse { + /// A response for a get request. The key is echoed back along with the value, if it exists. + /// Multiple values are returned if there were concurrent writes to the key. + Get { key: Key, value: HashSet }, + /// A response for a set request. The success field is true if the set was successful. + Set { success: bool }, + /// A response for a delete request. The success field is true if delete was successful. 
+ Delete { success: bool }, +} + +#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] +pub enum GossipMessage { + /// An "infecting message" to share updates with a peer. + Gossip { + message_id: String, + member_id: String, + writes: Namespaces, + }, + /// An acknowledgement message sent by a peer in response to a Gossip message, to indicate + /// that it hasn't seen some of the writes in the Gossip message before. + Ack { + message_id: String, + member_id: String, + }, + /// A negative acknowledgement sent by a peer in response to a Gossip message, to indicate + /// that it has seen all of the writes in the Gossip message before. + Nack { + message_id: String, + member_id: String, + }, +} + +#[cfg(test)] +mod tests { + use super::{Key, Namespace}; + + #[test] + fn test_key_parsing_sys_namespace() { + // Sys namespace + let first = "/sys/sys_table/sys_row".parse::().unwrap(); + assert_eq!(first.namespace, Namespace::System); + assert_eq!(first.table, "sys_table"); + assert_eq!(first.row_key, "sys_row"); + } + #[test] + fn test_key_parsing_user_namespace() { + // User namespace + let second = "/usr/usr_table/usr_row".parse::().unwrap(); + assert_eq!(second.namespace, Namespace::User); + assert_eq!(second.table, "usr_table"); + assert_eq!(second.row_key, "usr_row"); + } + + #[test] + fn test_key_empty_table() { + // Empty table + let empty_table = "/usr//usr_row".parse::(); + assert!(empty_table.is_err()); + assert_eq!( + empty_table.unwrap_err(), + super::KeyParseError::InvalidFormat + ); + } + + #[test] + fn test_key_empty_row() { + // Empty row + let empty_row = "/usr/usr_table/".parse::(); + assert!(empty_row.is_err()); + assert_eq!(empty_row.unwrap_err(), super::KeyParseError::InvalidFormat); + } + + #[test] + fn test_key_parsing_invalid_namespace() { + // Invalid namespace + let non_existent_namespace = "/ne_namespace/ne_table/ne_row".parse::(); + assert!(non_existent_namespace.is_err()); + assert_eq!( + non_existent_namespace.unwrap_err(), + 
super::KeyParseError::InvalidNamespace + ); + } + + #[test] + fn test_key_parsing_invalid_format() { + // Invalid format + let invalid_format = "/not_even_a_key".parse::(); + assert!(invalid_format.is_err()); + assert_eq!( + invalid_format.unwrap_err(), + super::KeyParseError::InvalidFormat + ); + + let invalid_format = "abcd/sys/sys_table/sys_row".parse::(); + assert!(invalid_format.is_err()); + assert_eq!( + invalid_format.unwrap_err(), + super::KeyParseError::InvalidFormat + ); + } + + #[test] + fn test_key_parsing_escaping() { + // Escape \ + let key = r"/usr/usr\/table/usr\/row".parse::().unwrap(); + assert_eq!(key.namespace, Namespace::User); + assert_eq!(key.table, r"usr/table"); + assert_eq!(key.row_key, r"usr/row"); + + // Escaping / + let key = r"/usr/usr\\table/usr\\row".parse::().unwrap(); + assert_eq!(key.namespace, Namespace::User); + assert_eq!(key.table, r"usr\table"); + assert_eq!(key.row_key, r"usr\row"); + + // Escaping any character + let key = r"/usr/usr\table/usr\row".parse::(); + assert!(key.is_err()); + assert_eq!(key.unwrap_err(), super::KeyParseError::InvalidFormat); + + // Dangling escape + let key = r"/usr/usr_table/usr_row\".parse::(); + assert!(key.is_err()); + assert_eq!(key.unwrap_err(), super::KeyParseError::InvalidFormat); + } +} diff --git a/datastores/gossip_kv/kv/membership.rs b/datastores/gossip_kv/kv/membership.rs new file mode 100644 index 00000000000..3657c87d3cd --- /dev/null +++ b/datastores/gossip_kv/kv/membership.rs @@ -0,0 +1,85 @@ +use std::fmt::Debug; +use std::hash::Hash; + +use serde::{Deserialize, Serialize}; + +pub type MemberId = String; + +/// Information about a member in the cluster. +/// +/// A member is a transducer that is part of the cluster. Leaving or failing is a terminal +/// state for a member. When a transducer restarts and rejoins the cluster, it is considered a +/// new member. +/// +/// # Generic Parameters +/// -- `A`: The transport of the endpoint on which the protocol is running. 
In production, this will +/// likely be a `SocketAddr`. +#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] +pub struct MemberData +where + A: Debug + Clone + Eq + Hash + Serialize, +{ + /// The name of the member. Usually, this is a randomly generated identifier, based on the + /// hostname on which the member is running. + pub id: MemberId, + + /// The protocols that the member supports. + pub protocols: Vec>, +} + +/// A builder for `MemberData`. +pub struct MemberDataBuilder +where + A: Debug + Clone + Eq + Hash + Serialize, +{ + id: MemberId, + protocols: Vec>, +} + +impl MemberDataBuilder +where + A: Debug + Clone + Eq + Hash + Serialize, +{ + /// Creates a new `MemberDataBuilder`. + pub fn new(id: MemberId) -> Self { + MemberDataBuilder { + id, + protocols: Vec::new(), + } + } + + /// Adds a protocol to the member. + pub fn add_protocol(mut self, protocol: Protocol) -> Self { + self.protocols.push(protocol); + self + } + + /// Builds the `MemberData`. + pub fn build(self) -> MemberData { + MemberData { + id: self.id, + protocols: self.protocols, + } + } +} + +/// A protocol supported by a member. +/// +/// # Generic Parameters +/// -- `A`: The transport of the endpoint on which the protocol is running. In production, this will +/// likely be a `SocketAddr`. +#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] +pub struct Protocol { + /// The name of the protocol. + pub name: String, + + /// The endpoint on which the protocol is running. + pub endpoint: A, +} + +impl Protocol { + /// Creates a new `Protocol`. 
+ pub fn new(name: String, endpoint: A) -> Self { + Protocol { name, endpoint } + } +} diff --git a/datastores/gossip_kv/kv/model.rs b/datastores/gossip_kv/kv/model.rs new file mode 100644 index 00000000000..3d17d78d3b6 --- /dev/null +++ b/datastores/gossip_kv/kv/model.rs @@ -0,0 +1,166 @@ +use hydroflow::lattices::map_union::MapUnionHashMap; +use hydroflow::lattices::set_union::SetUnionHashSet; +use hydroflow::lattices::{DomPair, Max}; + +use crate::Namespace; + +/// Primary key for entries in a table. +pub type RowKey = String; + +/// Value stored in a table. Modelled as a timestamped set of strings. +/// +/// Each value is timestamped with the time at which it was last updated. Concurrent updates at +/// the same timestamp are stored as a set. +pub type RowValue = DomPair>; + +/// A map from row keys to values in a table. +pub type Table = MapUnionHashMap; + +/// Name of a table in the data store. +pub type TableName = String; + +/// A map from table names to tables. +pub type TableMap = MapUnionHashMap>; + +pub type NamespaceMap = MapUnionHashMap>; + +pub type Namespaces = NamespaceMap>; + +/// Timestamps used in the model. +// TODO: This will be updated to use a more sophisticated clock type with https://github.com/hydro-project/hydroflow/issues/1207. +pub type Clock = Max; + +/// TableMap element to upsert a row in an existing TableMap. +/// +/// Merge this into an existing TableMap to upsert a row in a table. If the table does not exist, +/// it gets created. There's no explicit "create table" operation. +/// +/// Parameters: +/// - `row_ts`: New timestamp of the row being upserted. +/// - `table_name`: Name of the table. +/// - `key`: Primary key of the row. +/// - `val`: Row value. 
+pub fn upsert_row( + row_ts: C, + ns: Namespace, + table_name: TableName, + key: RowKey, + val: String, +) -> Namespaces { + let value: RowValue = RowValue::new_from(row_ts, SetUnionHashSet::new_from([val])); + let row: Table> = Table::new_from([(key, value)]); + let table: TableMap> = TableMap::new_from([(table_name, row)]); + Namespaces::new_from([(ns, table)]) +} + +/// TableMap element to delete a row from an existing TableMap. +/// +/// Merge this into an existing TableMap to delete a row from a table. +/// +/// Parameters: +/// - `row_ts`: New timestamp of the row being deleted. +/// - `table_name`: Name of the table. +/// - `key`: Primary key of the row. +pub fn delete_row( + row_ts: C, + ns: Namespace, + table_name: TableName, + key: RowKey, +) -> Namespaces { + let value: RowValue = RowValue::new_from(row_ts, SetUnionHashSet::new_from([])); + let row: Table> = Table::new_from([(key, value)]); + let table = TableMap::new_from([(table_name, row)]); + Namespaces::new_from([(ns, table)]) +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use hydroflow::lattices::Merge; + + use crate::model::{delete_row, upsert_row, Clock, Namespaces, RowKey, TableName}; + use crate::Namespace::System; + + #[test] + fn test_table_map() { + let mut namespaces: Namespaces = Namespaces::default(); + + let first_tick: Clock = Clock::new(0); + let second_tick: Clock = Clock::new(1); + + let members_table = TableName::from("members"); + let key_1 = RowKey::from("key1"); + let value_1: String = "value1".to_string(); + + // Table starts out empty. + assert_eq!( + namespaces.as_reveal_ref().len(), + 0, + "Expected no namespaces." 
+ ); + + let insert = upsert_row( + first_tick, + System, + members_table.clone(), + key_1.clone(), + value_1.clone(), + ); + Merge::merge(&mut namespaces, insert); + { + let table = namespaces + .as_reveal_ref() + .get(&System) + .unwrap() + .as_reveal_ref() + .get(&members_table) + .unwrap(); + + let row = table.as_reveal_ref().get(&key_1); + assert!(row.is_some(), "Row should exist"); + assert_eq!( + *row.unwrap().as_reveal_ref().0, + first_tick, + "Unexpected row timestamp" + ); + + let value = row.unwrap().as_reveal_ref().1.as_reveal_ref(); + assert_eq!( + value, + &HashSet::from([value_1.to_string()]), + "Unexpected row value" + ); + } + + let delete_row = delete_row( + second_tick, + System, + members_table.clone(), + key_1.to_string(), + ); + Merge::merge(&mut namespaces, delete_row); + { + let table = namespaces + .as_reveal_ref() + .get(&System) + .unwrap() + .as_reveal_ref() + .get(&members_table) + .unwrap(); + + // Deletion in this case leaves a "tombstone" + let row = table.as_reveal_ref().get(&key_1); + + assert!(row.is_some(), "Row should exist"); + assert_eq!( + *row.unwrap().as_reveal_ref().0, + second_tick, + "Unexpected row timestamp" + ); + + let value = row.unwrap().as_reveal_ref().1.as_reveal_ref(); + assert_eq!(value, &HashSet::from([]), "Row should be empty"); + } + } +} diff --git a/datastores/gossip_kv/kv/server.rs b/datastores/gossip_kv/kv/server.rs new file mode 100644 index 00000000000..a36d89013c4 --- /dev/null +++ b/datastores/gossip_kv/kv/server.rs @@ -0,0 +1,763 @@ +use std::collections::{HashMap, HashSet}; +use std::fmt::Debug; +use std::hash::Hash; + +use hydroflow::futures::{Sink, Stream}; +use hydroflow::hydroflow_syntax; +use hydroflow::itertools::Itertools; +use hydroflow::lattices::map_union::{KeyedBimorphism, MapUnionHashMap, MapUnionSingletonMap}; +use hydroflow::lattices::set_union::SetUnionHashSet; +use hydroflow::lattices::{Lattice, PairBimorphism}; +use hydroflow::scheduled::graph::Hydroflow; +use 
lattices::set_union::SetUnion; +use lattices::{IsTop, Max, Pair}; +use lazy_static::lazy_static; +use prometheus::{register_int_counter, IntCounter}; +use rand::seq::IteratorRandom; +use rand::thread_rng; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; +use tracing::{info, trace}; + +use crate::lattices::BoundedSetLattice; +use crate::membership::{MemberData, MemberId}; +use crate::model::{ + delete_row, upsert_row, Clock, NamespaceMap, Namespaces, RowKey, RowValue, TableMap, TableName, +}; +use crate::util::{ClientRequestWithAddress, GossipRequestWithAddress}; +use crate::GossipMessage::{Ack, Nack}; +use crate::{ClientRequest, ClientResponse, GossipMessage, Key, Namespace}; + +/// A trait that represents an abstract network address. In production, this will typically be +/// SocketAddr. +pub trait Address: Hash + Debug + Clone + Eq + Serialize {} +impl Address for A where A: Hash + Debug + Clone + Eq + Serialize {} + +#[derive(Debug, Eq, PartialEq, Hash, Clone, Serialize, Deserialize)] +pub struct SeedNode +where + A: Address, +{ + pub id: MemberId, + pub address: A, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Lattice)] +pub struct InfectingWrite { + write: Namespaces, + members: BoundedSetLattice, +} + +pub type MessageId = String; + +lazy_static! { + pub static ref SETS_COUNTER: IntCounter = + register_int_counter!("sets", "Counts the number of SET requests processed.").unwrap(); +} + +/// Creates a L0 key-value store server using Hydroflow. +/// +/// # Arguments +/// -- `client_inputs`: The input stream of client requests for the client protocol. +/// -- `client_outputs`: The output sink of client responses for the client protocol. +/// -- `member_info`: The membership information of the server. +/// -- `seed_nodes`: A list of seed nodes that can be used to bootstrap the gossip cluster. 
+#[expect(clippy::too_many_arguments)] +pub fn server< + ClientInput, + ClientOutput, + ClientOutputError, + GossipInput, + GossipOutput, + GossipOutputError, + GossipTrigger, + SeedNodeStream, + Addr, +>( + client_inputs: ClientInput, + client_outputs: ClientOutput, + gossip_inputs: GossipInput, + gossip_outputs: GossipOutput, + gossip_trigger: GossipTrigger, + member_info: MemberData, + seed_nodes: Vec>, + seed_node_stream: SeedNodeStream, +) -> Hydroflow<'static> +where + ClientInput: Stream + Unpin + 'static, + ClientOutput: Sink<(ClientResponse, Addr), Error = ClientOutputError> + Unpin + 'static, + GossipInput: Stream + Unpin + 'static, + GossipOutput: Sink<(GossipMessage, Addr), Error = GossipOutputError> + Unpin + 'static, + GossipTrigger: Stream + Unpin + 'static, + SeedNodeStream: Stream>> + Unpin + 'static, + Addr: Address + DeserializeOwned + 'static, + ClientOutputError: Debug + 'static, + GossipOutputError: Debug + 'static, +{ + let my_member_id = member_info.id.clone(); + // TODO: This is ugly, but the only way this works at the moment. + let member_id_2 = my_member_id.clone(); + let member_id_3 = my_member_id.clone(); + let member_id_4 = my_member_id.clone(); + let member_id_5 = my_member_id.clone(); + let member_id_6 = my_member_id.clone(); + + hydroflow_syntax! { + + on_start = initialize() -> tee(); + on_start -> for_each(|_| info!("{:?}: Transducer {} started.", context.current_tick(), member_id_6)); + + seed_nodes = source_stream(seed_node_stream) + -> fold::<'static>(|| Box::new(seed_nodes), |last_seed_nodes, new_seed_nodes: Vec>| { + **last_seed_nodes = new_seed_nodes; + info!("Updated seed nodes: {:?}", **last_seed_nodes); + }); + + // Setup member metadata for this process. 
+ on_start -> map(|_| upsert_row(Clock::new(0), Namespace::System, "members".to_string(), my_member_id.clone(), serde_json::to_string(&member_info).unwrap())) + -> writes; + + client_out = + inspect(|(resp, addr)| trace!("{:?}: Sending response: {:?} to {:?}.", context.current_tick(), resp, addr)) + -> dest_sink(client_outputs); + + client_in = source_stream(client_inputs) + -> map(|(msg, addr)| ClientRequestWithAddress::from_request_and_address(msg, addr)) + -> demux_enum::>(); + + client_in[Get] + -> inspect(|req| trace!("{:?}: Received Get request: {:?}.", context.current_tick(), req)) + -> map(|(key, addr) : (Key, Addr)| { + let row = MapUnionHashMap::new_from([ + ( + key.row_key, + SetUnionHashSet::new_from([addr /* to respond with the result later*/]) + ), + ]); + let table = MapUnionHashMap::new_from([(key.table, row)]); + MapUnionHashMap::new_from([(key.namespace, table)]) + }) + -> reads; + + client_in[Set] + -> inspect(|request| trace!("{:?}: Received Set request: {:?}.", context.current_tick(), request)) + -> map(|(key, value, _addr) : (Key, String, Addr)| upsert_row(Clock::new(context.current_tick().0), key.namespace, key.table, key.row_key, value)) + -> inspect(|_| { + SETS_COUNTER.inc(); // Bump SET metrics + }) + -> writes; + + client_in[Delete] + -> inspect(|req| trace!("{:?}: Received Delete request: {:?}.", context.current_tick(), req)) + -> map(|(key, _addr) : (Key, Addr)| delete_row(Clock::new(context.current_tick().0), key.namespace, key.table, key.row_key)) + -> writes; + + gossip_in = source_stream(gossip_inputs) + -> map(|(msg, addr)| GossipRequestWithAddress::from_request_and_address(msg, addr)) + -> demux_enum::>(); + + incoming_gossip_messages = gossip_in[Gossip] + -> inspect(|request| trace!("{:?}: Received gossip request: {:?}.", context.current_tick(), request)) + -> tee(); + + gossip_in[Ack] + -> inspect(|request| trace!("{:?}: Received gossip ack: {:?}.", context.current_tick(), request)) + -> null(); + + gossip_in[Nack] + -> 
inspect(|request| trace!("{:?}: Received gossip nack: {:?}.", context.current_tick(), request)) + -> map( |(message_id, member_id, _addr)| { + MapUnionSingletonMap::new_from((message_id, InfectingWrite { write: Default::default(), members: BoundedSetLattice::new_from([member_id]) })) + }) + -> infecting_writes; + + gossip_out = union() -> dest_sink(gossip_outputs); + + incoming_gossip_messages + -> map(|(_msg_id, _member_id, writes, _addr)| writes ) + -> writes; + + gossip_processing_pipeline = incoming_gossip_messages + -> map(|(msg_id, _member_id, writes, sender_address) : (String, MemberId, Namespaces>, Addr)| { + let namespaces = &#namespaces; + let all_data: &HashMap>> = namespaces.as_reveal_ref(); + let possible_new_data: &HashMap>>>= writes.as_reveal_ref(); + + // Check if any of the data is new + /* TODO: This logic is duplicated in MapUnion::Merge and ideally should be accessed + from the pass-through streaming output from `state`. See + https://www.notion.so/hydro-project/Proposal-for-State-API-10a2a586262f8080b981d1a2948a69ac + for more. 
*/ + let gossip_has_new_data = possible_new_data.iter() + .flat_map(|(namespace, tables)| { + tables.as_reveal_ref().iter().flat_map(move |(table, rows)|{ + rows.as_reveal_ref().iter().map(move |(row_key, row_value)| (namespace, table, row_key, row_value.as_reveal_ref().0.as_reveal_ref())) + }) + }) + .any(|(ns,table, row_key, new_ts)| { + let existing_tables = all_data.get(ns); + let existing_rows = existing_tables.and_then(|tables| tables.as_reveal_ref().get(table)); + let existing_row = existing_rows.and_then(|rows| rows.as_reveal_ref().get(row_key)); + let existing_ts = existing_row.map(|row| row.as_reveal_ref().0.as_reveal_ref()); + + if let Some(existing_ts) = existing_ts { + trace!("Comparing timestamps: {:?} vs {:?}", new_ts, existing_ts); + new_ts > existing_ts + } else { + true + } + }); + + if gossip_has_new_data { + (Ack { message_id: msg_id, member_id: member_id_2.clone()}, sender_address, Some(writes)) + } else { + (Nack { message_id: msg_id, member_id: member_id_3.clone()}, sender_address, None) + } + }) + -> tee(); + + gossip_processing_pipeline + -> map(|(response, address, _writes)| (response, address)) + -> inspect( |(msg, addr)| trace!("{:?}: Sending gossip response: {:?} to {:?}.", context.current_tick(), msg, addr)) + -> gossip_out; + + gossip_processing_pipeline + -> filter(|(_, _, writes)| writes.is_some()) + -> map(|(_, _, writes)| writes.unwrap()) + -> writes; + + writes = union(); + + writes -> namespaces; + + namespaces = state::<'static, Namespaces::>(); + new_writes = namespaces -> tee(); // TODO: Use the output from here to generate NACKs / ACKs + + reads = state::<'tick, MapUnionHashMap>>>>(); + + new_writes -> [0]process_system_table_reads; + reads -> [1]process_system_table_reads; + + process_system_table_reads = lattice_bimorphism(KeyedBimorphism::, _>::new(KeyedBimorphism::, _>::new(KeyedBimorphism::, _>::new(PairBimorphism))), #namespaces, #reads) + -> lattice_reduce::<'tick>() // TODO: This can be removed if we fix 
https://github.com/hydro-project/hydroflow/issues/1401. Otherwise the result can be returned twice if get & gossip arrive in the same tick. + -> flat_map(|result: NamespaceMap, SetUnion>>>| { + + let mut response: Vec<(ClientResponse, Addr)> = vec![]; + + let result = result.as_reveal_ref(); + + for (namespace, tables) in result.iter() { + for (table_name, table) in tables.as_reveal_ref().iter() { + for (row_key, join_results) in table.as_reveal_ref().iter() { + let key = Key { + namespace: *namespace, + table: table_name.clone(), + row_key: row_key.clone(), + }; + + let timestamped_values = join_results.as_reveal_ref().0; + let all_values = timestamped_values.as_reveal_ref().1.as_reveal_ref(); + + let all_addresses = join_results.as_reveal_ref().1.as_reveal_ref(); + let socket_addr = all_addresses.iter().find_or_first(|_| true).unwrap(); + + response.push(( + ClientResponse::Get {key, value: all_values.clone()}, + socket_addr.clone(), + )); + } + } + } + response + }) -> client_out; + + new_writes -> for_each(|x| trace!("NEW WRITE: {:?}", x)); + + // Step 1: Put the new writes in a map, with the write as the key and a SetBoundedLattice as the value. + infecting_writes = union() -> state::<'static, MapUnionHashMap>(); + + new_writes -> map(|write| { + // Ideally, the write itself is the key, but writes are a hashmap and hashmaps don't + // have a hash implementation. So we just generate a GUID identifier for the write + // for now. 
+ let id = uuid::Uuid::new_v4().to_string(); + MapUnionSingletonMap::new_from((id, InfectingWrite { write, members: BoundedSetLattice::new() })) + }) -> infecting_writes; + + gossip_trigger = source_stream(gossip_trigger); + + gossip_messages = gossip_trigger + -> flat_map( |_| + { + let infecting_writes = #infecting_writes.as_reveal_ref().clone(); + trace!("{:?}: Currently gossipping {} infecting writes.", context.current_tick(), infecting_writes.iter().filter(|(_, write)| !write.members.is_top()).count()); + infecting_writes + } + ) + -> filter(|(_id, infecting_write)| !infecting_write.members.is_top()) + -> map(|(id, infecting_write)| { + trace!("{:?}: Choosing a peer to gossip to. {:?}:{:?}", context.current_tick(), id, infecting_write); + let peers = #namespaces.as_reveal_ref().get(&Namespace::System).unwrap().as_reveal_ref().get("members").unwrap().as_reveal_ref().clone(); + + let mut peer_names = HashSet::new(); + peers.iter().for_each(|(row_key, _)| { + peer_names.insert(row_key.clone()); + }); + + let seed_nodes = &#seed_nodes; + seed_nodes.iter().for_each(|seed_node| { + peer_names.insert(seed_node.id.clone()); + }); + + // Exclude self from the list of peers. 
+ peer_names.remove(&member_id_5); + + trace!("{:?}: Peers: {:?}", context.current_tick(), peer_names); + + let chosen_peer_name = peer_names.iter().choose(&mut thread_rng()); + + if chosen_peer_name.is_none() { + trace!("{:?}: No peers to gossip to.", context.current_tick()); + return None; + } + + let chosen_peer_name = chosen_peer_name.unwrap(); + let gossip_address = if peers.contains_key(chosen_peer_name) { + let peer_info_value = peers.get(chosen_peer_name).unwrap().as_reveal_ref().1.as_reveal_ref().iter().next().unwrap().clone(); + let peer_info_deserialized = serde_json::from_str::>(&peer_info_value).unwrap(); + peer_info_deserialized.protocols.iter().find(|protocol| protocol.name == "gossip").unwrap().clone().endpoint + } else { + seed_nodes.iter().find(|seed_node| seed_node.id == *chosen_peer_name).unwrap().address.clone() + }; + + trace!("Chosen peer: {:?}:{:?}", chosen_peer_name, gossip_address); + Some((id, infecting_write, gossip_address)) + }) + -> flatten() + -> inspect(|(message_id, infecting_write, peer_gossip_address)| trace!("{:?}: Sending write:\nMessageId:{:?}\nWrite:{:?}\nPeer Address:{:?}", context.current_tick(), message_id, infecting_write, peer_gossip_address)) + -> map(|(message_id, infecting_write, peer_gossip_address): (String, InfectingWrite, Addr)| { + let gossip_request = GossipMessage::Gossip { + message_id: message_id.clone(), + member_id: member_id_4.clone(), + writes: infecting_write.write.clone(), + }; + (gossip_request, peer_gossip_address) + }) + -> gossip_out; + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use hydroflow::tokio_stream::empty; + use hydroflow::util::simulation::{Address, Fleet, Hostname}; + + use super::*; + use crate::membership::{MemberDataBuilder, Protocol}; + + #[hydroflow::test] + async fn test_member_init() { + let mut fleet = Fleet::new(); + + let server_name: Hostname = "server".to_string(); + + let server_client_address = Address::new(server_name.clone(), 
"client".to_string()); + let server_gossip_address = Address::new(server_name.clone(), "gossip".to_string()); + + let (_, gossip_trigger_rx) = hydroflow::util::unbounded_channel::<()>(); + + // Create the kv server + fleet.add_host(server_name.clone(), |ctx| { + let client_input = ctx.new_inbox::("client".to_string()); + let client_output = ctx.new_outbox::("client".to_string()); + + let gossip_input = ctx.new_inbox::("gossip".to_string()); + let gossip_output = ctx.new_outbox::("gossip".to_string()); + + let member_data = MemberDataBuilder::new(server_name.clone()) + .add_protocol(Protocol::new( + "client".into(), + server_client_address.clone(), + )) + .add_protocol(Protocol::new( + "gossip".into(), + server_gossip_address.clone(), + )) + .build(); + + server( + client_input, + client_output, + gossip_input, + gossip_output, + gossip_trigger_rx, + member_data, + vec![], + empty(), + ) + }); + + let client_name: Hostname = "client".to_string(); + + let key = "/sys/members/server".parse::().unwrap(); + + let (trigger_tx, trigger_rx) = hydroflow::util::unbounded_channel::<()>(); + let (response_tx, mut response_rx) = hydroflow::util::unbounded_channel::(); + + let key_clone = key.clone(); + let server_client_address_clone = server_client_address.clone(); + + fleet.add_host(client_name.clone(), |ctx| { + let client_tx = ctx.new_outbox::("client".to_string()); + let client_rx = ctx.new_inbox::("client".to_string()); + + hydroflow_syntax! { + + client_output = dest_sink(client_tx); + + source_stream(trigger_rx) + -> map(|_| (ClientRequest::Get { key: key_clone.clone() }, server_client_address_clone.clone()) ) + -> client_output; + + client_input = source_stream(client_rx) + -> for_each(|(resp, _addr)| response_tx.send(resp).unwrap()); + + } + }); + + // Send a trigger to the client to send a get request. 
+ trigger_tx.send(()).unwrap(); + + let expected_member_data = MemberDataBuilder::new(server_name.clone()) + .add_protocol(Protocol::new( + "client".to_string(), + server_client_address.clone(), + )) + .add_protocol(Protocol::new( + "gossip".to_string(), + server_gossip_address.clone(), + )) + .build(); + + loop { + fleet.run_single_tick_all_hosts().await; + + let responses = + hydroflow::util::collect_ready_async::, _>(&mut response_rx).await; + + if !responses.is_empty() { + assert_eq!( + responses, + &[(ClientResponse::Get { + key: key.clone(), + value: HashSet::from([ + serde_json::to_string(&expected_member_data).unwrap() + ]) + })] + ); + break; + } + } + } + + #[hydroflow::test] + async fn test_multiple_values_same_tick() { + let mut fleet = Fleet::new(); + + let server_name: Hostname = "server".to_string(); + + let server_client_address = Address::new(server_name.clone(), "client".to_string()); + + let (_, gossip_trigger_rx) = hydroflow::util::unbounded_channel::<()>(); + + // Create the kv server + fleet.add_host(server_name.clone(), |ctx| { + let client_input = ctx.new_inbox::("client".to_string()); + let client_output = ctx.new_outbox::("client".to_string()); + + let gossip_input = ctx.new_inbox::("gossip".to_string()); + let gossip_output = ctx.new_outbox::("gossip".to_string()); + let server_gossip_address = Address::new(server_name.clone(), "gossip".to_string()); + + let member_data = MemberDataBuilder::new(server_name.clone()) + .add_protocol(Protocol::new( + "client".into(), + server_client_address.clone(), + )) + .add_protocol(Protocol::new( + "gossip".into(), + server_gossip_address.clone(), + )) + .build(); + + server( + client_input, + client_output, + gossip_input, + gossip_output, + gossip_trigger_rx, + member_data, + vec![], + empty(), + ) + }); + + let key = Key { + namespace: Namespace::System, + table: "table".to_string(), + row_key: "row".to_string(), + }; + let val_a = "A".to_string(); + let val_b = "B".to_string(); + + let writer_name: 
Hostname = "writer".to_string(); + + let (writer_trigger_tx, writer_trigger_rx) = hydroflow::util::unbounded_channel::(); + let key_clone = key.clone(); + let server_client_address_clone = server_client_address.clone(); + + fleet.add_host(writer_name.clone(), |ctx| { + let client_tx = ctx.new_outbox::("client".to_string()); + hydroflow_syntax! { + client_output = dest_sink(client_tx); + + source_stream(writer_trigger_rx) + -> map(|value| (ClientRequest::Set { key: key_clone.clone(), value: value.clone()}, server_client_address_clone.clone()) ) + -> client_output; + } + }); + + // Send two messages from the writer. + let writer = fleet.get_host_mut(&writer_name).unwrap(); + writer_trigger_tx.send(val_a.clone()).unwrap(); + writer.run_tick(); + + writer_trigger_tx.send(val_b.clone()).unwrap(); + writer.run_tick(); + + // Transmit messages across the network. + fleet.process_network().await; + + // Run the server. + let server = fleet.get_host_mut(&server_name).unwrap(); + server.run_tick(); + + // Read the value back. + let reader_name: Hostname = "reader".to_string(); + + let (reader_trigger_tx, reader_trigger_rx) = hydroflow::util::unbounded_channel::<()>(); + let (response_tx, mut response_rx) = hydroflow::util::unbounded_channel::(); + + let key_clone = key.clone(); + let server_client_address_clone = server_client_address.clone(); + + fleet.add_host(reader_name.clone(), |ctx| { + let client_tx = ctx.new_outbox::("client".to_string()); + let client_rx = ctx.new_inbox::("client".to_string()); + + hydroflow_syntax! 
{ + client_output = dest_sink(client_tx); + + source_stream(reader_trigger_rx) + -> map(|_| (ClientRequest::Get { key: key_clone.clone() }, server_client_address_clone.clone()) ) + -> client_output; + + client_input = source_stream(client_rx) + -> for_each(|(resp, _addr)| response_tx.send(resp).unwrap()); + + } + }); + + reader_trigger_tx.send(()).unwrap(); + + loop { + fleet.run_single_tick_all_hosts().await; + + let responses = + hydroflow::util::collect_ready_async::, _>(&mut response_rx).await; + + if !responses.is_empty() { + assert_eq!( + responses, + &[ClientResponse::Get { + key, + value: HashSet::from([val_a, val_b]) + }] + ); + break; + } + } + } + + #[hydroflow::test] + async fn test_gossip() { + let subscriber = tracing_subscriber::FmtSubscriber::builder() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_test_writer() + .finish(); + + let _ = tracing::subscriber::set_global_default(subscriber); + + let mut fleet = Fleet::new(); + + let server_a: Hostname = "server_a".to_string(); + let server_b: Hostname = "server_b".to_string(); + + let server_a_client_address = Address::new(server_a.clone(), "client".to_string()); + let server_b_client_address = Address::new(server_b.clone(), "client".to_string()); + + let server_a_gossip_address = Address::new(server_a.clone(), "gossip".to_string()); + let server_b_gossip_address = Address::new(server_b.clone(), "gossip".to_string()); + + let seed_nodes = vec![ + SeedNode { + id: server_a.clone(), + address: server_a_gossip_address.clone(), + }, + SeedNode { + id: server_b.clone(), + address: server_b_gossip_address.clone(), + }, + ]; + + let (gossip_trigger_tx_a, gossip_trigger_rx_a) = hydroflow::util::unbounded_channel::<()>(); + + let seed_nodes_clone = seed_nodes.clone(); + fleet.add_host(server_a.clone(), |ctx| { + let client_input = ctx.new_inbox::("client".to_string()); + let client_output = ctx.new_outbox::("client".to_string()); + + let gossip_input = 
ctx.new_inbox::("gossip".to_string()); + let gossip_output = ctx.new_outbox::("gossip".to_string()); + + let member_data = MemberDataBuilder::new(server_a.clone()) + .add_protocol(Protocol::new( + "client".into(), + server_a_client_address.clone(), + )) + .add_protocol(Protocol::new( + "gossip".into(), + server_a_gossip_address.clone(), + )) + .build(); + + server( + client_input, + client_output, + gossip_input, + gossip_output, + gossip_trigger_rx_a, + member_data, + seed_nodes_clone, + empty(), + ) + }); + + let (_, gossip_trigger_rx_b) = hydroflow::util::unbounded_channel::<()>(); + + let seed_nodes_clone = seed_nodes.clone(); + fleet.add_host(server_b.clone(), |ctx| { + let client_input = ctx.new_inbox::("client".to_string()); + let client_output = ctx.new_outbox::("client".to_string()); + + let gossip_input = ctx.new_inbox::("gossip".to_string()); + let gossip_output = ctx.new_outbox::("gossip".to_string()); + + let member_data = MemberDataBuilder::new(server_b.clone()) + .add_protocol(Protocol::new( + "client".into(), + server_b_client_address.clone(), + )) + .add_protocol(Protocol::new( + "gossip".into(), + server_b_gossip_address.clone(), + )) + .build(); + + server( + client_input, + client_output, + gossip_input, + gossip_output, + gossip_trigger_rx_b, + member_data, + seed_nodes_clone, + empty(), + ) + }); + + let key = Key { + namespace: Namespace::User, + table: "table".to_string(), + row_key: "row".to_string(), + }; + + let writer_name: Hostname = "writer".to_string(); + + let (writer_trigger_tx, writer_trigger_rx) = hydroflow::util::unbounded_channel::(); + + let key_clone = key.clone(); + let server_a_client_address_clone = server_a_client_address.clone(); + + fleet.add_host(writer_name.clone(), |ctx| { + let client_tx = ctx.new_outbox::("client".to_string()); + hydroflow_syntax! 
{ + client_output = dest_sink(client_tx); + + source_stream(writer_trigger_rx) + -> map(|value| (ClientRequest::Set { key: key_clone.clone(), value: value.clone()}, server_a_client_address_clone.clone()) ) + -> client_output; + } + }); + + let reader_name: Hostname = "reader".to_string(); + + let (reader_trigger_tx, reader_trigger_rx) = hydroflow::util::unbounded_channel::<()>(); + let (response_tx, mut response_rx) = hydroflow::util::unbounded_channel::(); + + let key_clone = key.clone(); + let server_b_client_address_clone = server_b_client_address.clone(); + + fleet.add_host(reader_name.clone(), |ctx| { + let client_tx = ctx.new_outbox::("client".to_string()); + let client_rx = ctx.new_inbox::("client".to_string()); + + hydroflow_syntax! { + client_output = dest_sink(client_tx); + + source_stream(reader_trigger_rx) + -> map(|_| (ClientRequest::Get { key: key_clone.clone() }, server_b_client_address_clone.clone()) ) + -> client_output; + + client_input = source_stream(client_rx) + -> for_each(|(resp, _addr)| response_tx.send(resp).unwrap()); + + } + }); + + let value = "VALUE".to_string(); + writer_trigger_tx.send(value.clone()).unwrap(); + + loop { + reader_trigger_tx.send(()).unwrap(); + fleet.run_single_tick_all_hosts().await; + let responses = + hydroflow::util::collect_ready_async::, _>(&mut response_rx).await; + + if !responses.is_empty() { + assert_eq!( + responses, + &[ClientResponse::Get { + key, + value: HashSet::from([value.clone()]) + }] + ); + break; + } + + gossip_trigger_tx_a.send(()).unwrap(); + } + } +} diff --git a/datastores/gossip_kv/kv/util.rs b/datastores/gossip_kv/kv/util.rs new file mode 100644 index 00000000000..4cc391da6e7 --- /dev/null +++ b/datastores/gossip_kv/kv/util.rs @@ -0,0 +1,87 @@ +use hydroflow::DemuxEnum; + +use crate::model::{Clock, Namespaces}; +use crate::{ClientRequest, GossipMessage, Key}; + +/// Convenience enum to represent a client request with the address of the client. 
Makes it +/// possible to use `demux_enum` in the surface syntax. +#[derive(Debug, DemuxEnum)] +pub enum ClientRequestWithAddress { + /// A get request with the key and the address of the client. + Get { key: Key, addr: A }, + /// A set request with the key, value and the address of the client. + Set { key: Key, value: String, addr: A }, + /// A delete request with the key and the address of the client. + Delete { key: Key, addr: A }, +} + +impl ClientRequestWithAddress { + /// Create a `ClientRequestWithAddress` from a `ClientRequest` and an address. + pub fn from_request_and_address(request: ClientRequest, addr: A) -> Self { + match request { + ClientRequest::Get { key } => Self::Get { key, addr }, + ClientRequest::Set { key, value } => Self::Set { key, value, addr }, + ClientRequest::Delete { key } => Self::Delete { key, addr }, + } + } +} + +/// Convenience enum to represent a gossip request with the address of the client. Makes it +/// possible to use `demux_enum` in the surface syntax. +#[derive(Debug, DemuxEnum)] +pub enum GossipRequestWithAddress { + /// A gossip request with the message id, writes and the address of the client. + Gossip { + message_id: String, + member_id: String, + writes: Namespaces, + addr: A, + }, + /// An ack request with the message id and the address of the client. + Ack { + message_id: String, + member_id: String, + addr: A, + }, + /// A nack request with the message id and the address of the client. + Nack { + message_id: String, + member_id: String, + addr: A, + }, +} + +impl GossipRequestWithAddress { + /// Create a `GossipRequestWithAddress` from a `GossipMessage` and an address. 
+ pub fn from_request_and_address(request: GossipMessage, addr: A) -> Self { + match request { + GossipMessage::Gossip { + message_id, + member_id, + writes, + } => Self::Gossip { + message_id, + member_id, + writes, + addr, + }, + + GossipMessage::Ack { + message_id, + member_id, + } => Self::Ack { + message_id, + addr, + member_id, + }, + GossipMessage::Nack { + message_id, + member_id, + } => Self::Nack { + message_id, + addr, + member_id, + }, + } + } +} diff --git a/datastores/gossip_kv/load_test_server/Dockerfile b/datastores/gossip_kv/load_test_server/Dockerfile new file mode 100644 index 00000000000..91f27fdd22d --- /dev/null +++ b/datastores/gossip_kv/load_test_server/Dockerfile @@ -0,0 +1,11 @@ +FROM "hydroflow-gossip-kv-base-image:latest" AS builder +WORKDIR /usr/src/gossip-kv-server +COPY . . +RUN find . +RUN cargo build --release --workspace -p gossip_kv + +FROM rustlang/rust:nightly-slim +COPY --from=builder /usr/src/gossip-kv-server/target/release/load_test_server /usr/local/bin/load_test_server + +# Don't skip the trailing slash in the destination directory +CMD ["load_test_server"] diff --git a/datastores/gossip_kv/load_test_server/server.rs b/datastores/gossip_kv/load_test_server/server.rs new file mode 100644 index 00000000000..88bd40e3fb3 --- /dev/null +++ b/datastores/gossip_kv/load_test_server/server.rs @@ -0,0 +1,226 @@ +use std::convert::Infallible; +use std::num::{NonZeroU32, ParseFloatError}; +use std::thread::sleep; +use std::time::Duration; + +use clap::Parser; +use gossip_kv::membership::{MemberDataBuilder, Protocol}; +use gossip_kv::{ClientRequest, GossipMessage}; +use governor::{Quota, RateLimiter}; +use hydroflow::util::{unbounded_channel, unsync_channel}; +use prometheus::{gather, Encoder, TextEncoder}; +use tokio::sync::mpsc::UnboundedSender; +use tokio::task; +use tracing::{error, info, trace}; +use warp::Filter; + +type LoadTestAddress = u64; + +use gossip_kv::server::{server, SeedNode}; +use hydroflow::futures::sink::drain; +use 
hydroflow::futures::stream; +use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream; +use hydroflow::tokio_stream::StreamExt; +use lattices::cc_traits::Iter; + +const UNKNOWN_ADDRESS: LoadTestAddress = 9999999999; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Parser)] +struct Opts { + /// Number of threads to run. Each thread will run an instance of the gossip-kv server transducer. + #[clap(short, long, default_value = "5")] + thread_count: usize, + + /// Frequency (in seconds) at which to send gossip messages. + #[clap(short, long, default_value = "10", value_parser = clap_duration_from_secs)] + gossip_frequency: Duration, + + /// Maximum number of SET requests to send per second. + #[clap(short, long, default_value = "1")] + max_set_throughput: u32, +} + +/// Parse duration from float string for clap args. +fn clap_duration_from_secs(arg: &str) -> Result { + arg.parse().map(Duration::from_secs_f32) +} + +fn run_server( + server_name: String, + gossip_address: LoadTestAddress, + gossip_input_rx: UnboundedReceiverStream<(GossipMessage, LoadTestAddress)>, + switchboard: Switchboard, + seed_nodes: Vec>, + opts: Opts, +) { + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + let (gossip_output_tx, mut gossip_output_rx) = unsync_channel(None); + + let (gossip_trigger_tx, gossip_trigger_rx) = unbounded_channel(); + + let member_data = MemberDataBuilder::new(server_name.clone()) + .add_protocol(Protocol::new("gossip".into(), gossip_address)) + .build(); + + rt.block_on(async { + let local = task::LocalSet::new(); + + let (client_input_tx, client_input_rx) = unbounded_channel(); + + let put_throughput = opts.max_set_throughput; + local.spawn_local(async move { + let rate_limiter = RateLimiter::direct(Quota::per_second( + NonZeroU32::new(put_throughput).unwrap(), + )); + loop { + rate_limiter.until_ready().await; + let key = "/usr/table/key".parse().unwrap(); + let request = 
ClientRequest::Set { + key, + value: "FOOBAR".to_string(), + }; + client_input_tx.send((request, UNKNOWN_ADDRESS)).unwrap(); + } + }); + + let gossip_frequency = opts.gossip_frequency; + local.spawn_local(async move { + loop { + tokio::time::sleep(gossip_frequency).await; + gossip_trigger_tx.send(()).unwrap(); + } + }); + + // Networking + local.spawn_local(async move { + while let Some((msg, addr)) = gossip_output_rx.next().await { + trace!("Sending gossip message: {:?} to {}", msg, addr); + let outbox = switchboard.gossip_outboxes.get(addr as usize).unwrap(); + if let Err(e) = outbox.send((msg, gossip_address)) { + error!("Failed to send gossip message: {:?}", e); + } + } + }); + + local.spawn_local(async { + let mut server = server( + client_input_rx, + drain(), // Ignoring client responses for now. + gossip_input_rx, + gossip_output_tx, + gossip_trigger_rx, + member_data, + seed_nodes, + stream::empty(), + ); + + server.run_async().await + }); + + local.await + }); + }); +} + +struct Switchboard { + gossip_outboxes: Vec>, +} + +impl Clone for Switchboard { + fn clone(&self) -> Self { + Self { + gossip_outboxes: self.gossip_outboxes.clone(), + } + } +} + +impl Switchboard { + fn new() -> Self { + Self { + gossip_outboxes: Vec::new(), + } + } + fn new_outbox( + &mut self, + ) -> ( + LoadTestAddress, + UnboundedReceiverStream<(GossipMessage, LoadTestAddress)>, + ) { + let addr: LoadTestAddress = self.gossip_outboxes.len() as LoadTestAddress; + let (tx, rx) = unbounded_channel(); + self.gossip_outboxes.push(tx); + (addr, rx) + } +} + +async fn metrics_handler() -> Result { + let encoder = TextEncoder::new(); + let metric_families = gather(); + let mut buffer = Vec::new(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + + Ok(warp::reply::with_header( + buffer, + "Content-Type", + encoder.format_type(), + )) +} + +fn main() { + tracing_subscriber::fmt::init(); + + let opts: Opts = Opts::parse(); + + std::thread::spawn(move || { + let metrics_route = 
warp::path("metrics").and_then(metrics_handler); + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + rt.block_on(async move { + info!("Starting metrics server on port 4003"); + warp::serve(metrics_route).run(([0, 0, 0, 0], 4003)).await; + }); + }); + + info!("Starting load test with {} threads", opts.thread_count); + + let mut switchboard = Switchboard::new(); + + let outboxes: Vec<_> = (0..opts.thread_count) + .map(|_| { + let (addr, rx) = switchboard.new_outbox(); + (format!("SERVER-{}", addr), addr, rx) + }) + .collect(); + + let seed_nodes: Vec<_> = outboxes + .iter() + .map(|(name, addr, _)| SeedNode { + id: name.clone(), + address: *addr, + }) + .collect(); + + outboxes.into_iter().for_each(|(name, addr, outbox)| { + run_server( + name, + addr, + outbox, + switchboard.clone(), + seed_nodes.clone(), + opts, + ); + }); + + loop { + sleep(Duration::from_secs(1)); + } +} diff --git a/datastores/gossip_kv/server/.gitignore b/datastores/gossip_kv/server/.gitignore new file mode 100644 index 00000000000..9d1173a0613 --- /dev/null +++ b/datastores/gossip_kv/server/.gitignore @@ -0,0 +1 @@ +config/local.toml \ No newline at end of file diff --git a/datastores/gossip_kv/server/Dockerfile b/datastores/gossip_kv/server/Dockerfile new file mode 100644 index 00000000000..4032b4056fb --- /dev/null +++ b/datastores/gossip_kv/server/Dockerfile @@ -0,0 +1,13 @@ +FROM "hydroflow-gossip-kv-base-image:latest" AS builder +WORKDIR /usr/src/gossip-kv-server +COPY . . +RUN find .
+RUN cargo build --release --workspace -p gossip_kv + +FROM rustlang/rust:nightly-slim +COPY --from=builder /usr/src/gossip-kv-server/target/release/gossip_server /usr/local/bin/gossip_server + +RUN mkdir -p /config/static +# Don't skip the trailing slash in the destination directory +COPY datastores/gossip_kv/server/config/static/*.toml /config/static/ +CMD ["gossip_server"] diff --git a/datastores/gossip_kv/server/README.md b/datastores/gossip_kv/server/README.md new file mode 100644 index 00000000000..434315e29f4 --- /dev/null +++ b/datastores/gossip_kv/server/README.md @@ -0,0 +1,44 @@ +From the `hydroflow` directory, run + +## Minikube + +### Start Minikube +Disk allocation is done by the driver used to create the VM. Setting this to a high value will do nothing if the +driver isn't correctly configured. You'll only notice that hydroflow runs out of disk space while compiling. +For Docker, the disk size is set in the Docker Desktop settings. Also, provide as many CPUs here as possible, since +building the code is CPU-intensive. +```shell +minikube start --disk-size=100g --cpus=16 --memory=32768 +``` + +### Use the Docker daemon from minikube +```shell +eval $(minikube docker-env) +``` + +## Build Docker Base Image +Speeds up code changes by caching build dependencies. +```shell +docker build -t "hydroflow/gossip-kv-server-base-image:latest" -f datastores/gossip_kv/server/baseimage.Dockerfile . +``` + +## Build Docker Image for Gossip Server +```shell +docker build -t "hydroflow/gossip-kv-server:latest" -f datastores/gossip_kv/server/Dockerfile . +``` + +## Build Docker Image for Gossip CLI +```shell +docker build -t "hydroflow/gossip-kv-cli:latest" -f datastores/gossip_kv/cli/Dockerfile . 
+``` + +## Check if minikube has the image +You should see "hydroflow/gossip-kv" +```shell +minikube image ls --format table +``` + +## Deploy to Minikube +```shell +kubectl apply -f datastores/gossip_kv/server/deployment/local/objects.yaml +``` \ No newline at end of file diff --git a/datastores/gossip_kv/server/baseimage.Dockerfile b/datastores/gossip_kv/server/baseimage.Dockerfile new file mode 100644 index 00000000000..911de64a7f7 --- /dev/null +++ b/datastores/gossip_kv/server/baseimage.Dockerfile @@ -0,0 +1,23 @@ +FROM rustlang/rust:nightly AS builder +WORKDIR /usr/src/gossip-kv-server-base-image +COPY . . + +RUN apt-get update && apt-get install -y \ + python3 \ + python3.11-dev \ + libpython3.11 \ + build-essential \ + && rm -rf /var/lib/apt/lists/* + +## Build everything, including dependencies. The built dependencies will be cached, so only changing the server +## code requires less build time. +RUN cargo build --release --workspace -p gossip_kv + +## Copy the built file to where the actual app will be built subsequently.
+RUN mkdir -p /usr/src/gossip-kv-server/target/release +RUN mv /usr/src/gossip-kv-server-base-image/target/release/build/ /usr/src/gossip-kv-server/target/release/build/ +RUN mv /usr/src/gossip-kv-server-base-image/target/release/deps/ /usr/src/gossip-kv-server/target/release/deps/ +RUN mv /usr/src/gossip-kv-server-base-image/target/release/.fingerprint/ /usr/src/gossip-kv-server/target/release/.fingerprint/ + +## Delete all the source code +RUN rm -rf /usr/src/gossip-kv-server-base-image \ No newline at end of file diff --git a/datastores/gossip_kv/server/config/mod.rs b/datastores/gossip_kv/server/config/mod.rs new file mode 100644 index 00000000000..13ff54873ad --- /dev/null +++ b/datastores/gossip_kv/server/config/mod.rs @@ -0,0 +1,130 @@ +use std::path::PathBuf; + +use config::{Config, ConfigError, File}; +use hydroflow::futures::future::ready; +use hydroflow::futures::{Stream, StreamExt}; +use notify::{Event, EventHandler, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; +use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc::UnboundedSender; +use tracing::trace; + +/// L0 Data Store settings. +#[derive(Debug, Deserialize, Serialize)] +pub struct ServerSettings { + /// An initial set of "seed nodes" that can be used to bootstrap the gossip cluster. These + /// won't be the only nodes in the cluster, but they can be used to discover other nodes. + pub seed_nodes: Vec, +} + +const CONFIG_ROOT: &str = "config"; +const STATIC_CONFIG_PATH: &str = "static"; +const DYNAMIC_CONFIG_PATH: &str = "dynamic"; + +fn static_config_path(subpath: &str) -> PathBuf { + PathBuf::from(CONFIG_ROOT) + .join(STATIC_CONFIG_PATH) + .join(subpath) +} + +fn dynamic_config_path(subpath: &str) -> PathBuf { + PathBuf::from(CONFIG_ROOT) + .join(DYNAMIC_CONFIG_PATH) + .join(subpath) +} + +impl ServerSettings { + /// Load the settings from the configuration files. 
+ pub fn new() -> Result { + let run_mode = std::env::var("RUN_MODE").unwrap_or_else(|_| "development".into()); + + let settings = Config::builder() + /* Load the default settings from the `config/default.toml` file. */ + .add_source(File::from(static_config_path("default.toml")).required(false)) + + /* Load additional overrides based on context (alpha, beta, production, etc.), if they exist. */ + .add_source(File::from(static_config_path(&run_mode)).required(false)) + + /* Load the local settings, if they exist. These are .gitignored to prevent accidental + check-in. */ + .add_source(File::from(static_config_path("local")).required(false)) + + /* Load the dynamic settings, if they exist. These always override any static configuration. */ + .add_source(File::from(dynamic_config_path("dynamic.toml")).required(false)) + .build()?; + + settings.try_deserialize() + } +} + +/// A seed node that can be used to bootstrap the gossip cluster. +#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq, Hash)] +pub struct SeedNodeSettings { + /// The ID of the seed node. + pub id: String, + + /// The address on which the seed node is listening for gossip messages. + pub address: String, +} + +/// Setup a watcher for the settings files and return a stream of settings changes. +/// +/// Returns the watcher, the initial settings, and a stream of settings changes. The watcher is +/// returned so that it can be kept alive for the lifetime of the application. Also returns a +/// snapshot of the current settings.
+pub fn setup_settings_watch() -> ( + RecommendedWatcher, + ServerSettings, + impl Stream, +) { + let (tx, rx) = hydroflow::util::unbounded_channel(); + + // Setup the watcher + let mut watcher = notify::RecommendedWatcher::new( + UnboundedSenderEventHandler::new(tx), + notify::Config::default(), + ) + .unwrap(); + watcher + .watch(&PathBuf::from(CONFIG_ROOT), RecursiveMode::Recursive) + .unwrap(); + + // Read initial settings + let initial_settings = ServerSettings::new().unwrap(); + + let change_stream = rx + .map(Result::unwrap) + .map(|event| { + trace!("Event: {:?}", event); + match event.kind { + EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) => { + Some(ServerSettings::new().unwrap()) + } + _ => { + trace!("Unhandled event: {:?}", event); + None + } + } + }) + .filter_map(ready); + + // If the watcher is dropped, the stream will stop producing events. So, returning the watcher. + (watcher, initial_settings, change_stream) +} + +/// Wraps an UnboundedSender to implement the notify::EventHandler trait. This allows sending +/// file notification events to UnboundedSender instances.
+struct UnboundedSenderEventHandler { + tx: UnboundedSender>, +} + +impl UnboundedSenderEventHandler { + fn new(tx: UnboundedSender>) -> Self { + Self { tx } + } +} + +impl EventHandler for UnboundedSenderEventHandler { + fn handle_event(&mut self, event: notify::Result) { + self.tx.send(event).unwrap(); + } +} diff --git a/datastores/gossip_kv/server/config/static/default.toml b/datastores/gossip_kv/server/config/static/default.toml new file mode 100644 index 00000000000..b651079e8aa --- /dev/null +++ b/datastores/gossip_kv/server/config/static/default.toml @@ -0,0 +1,5 @@ +seed_nodes = [] + +#[[seed_nodes]] +#id = "gossip-kv-seed-nodes-0" +#address = "gossip-kv-seed-nodes-0.gossip-kv-seed-nodes.default.svc.cluster.local:3000" \ No newline at end of file diff --git a/datastores/gossip_kv/server/config/static/development.toml b/datastores/gossip_kv/server/config/static/development.toml new file mode 100644 index 00000000000..e69de29bb2d diff --git a/datastores/gossip_kv/server/main.rs b/datastores/gossip_kv/server/main.rs new file mode 100644 index 00000000000..cb7dac2c8b3 --- /dev/null +++ b/datastores/gossip_kv/server/main.rs @@ -0,0 +1,179 @@ +use std::convert::Infallible; +use std::fmt::Debug; +use std::future::ready; +use std::hash::Hash; +use std::io::Error; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + +use clap::Parser; +use gossip_kv::membership::{MemberDataBuilder, Protocol}; +use gossip_kv::server::{server, SeedNode}; +use gossip_kv::{ClientRequest, GossipMessage}; +use hydroflow::futures::{SinkExt, StreamExt}; +use hydroflow::tokio_stream::wrappers::IntervalStream; +use hydroflow::util::{bind_udp_bytes, ipv4_resolve}; +use hydroflow::{bincode, tokio}; +use prometheus::{gather, Encoder, TextEncoder}; +use tracing::{error, info, trace}; +use warp::Filter; + +use crate::config::{setup_settings_watch, SeedNodeSettings}; +use crate::membership::member_name; + +mod config; + +mod membership; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Parser)] 
+struct Opts { + /// Port to listen for gossip messages. + #[clap(short, long, default_value = "3000")] + gossip_port: u16, + + /// Port to listen for client requests. + #[clap(short, long, default_value = "3001")] + client_port: u16, +} + +fn make_seed_node(settings: &SeedNodeSettings) -> SeedNode { + SeedNode { + id: settings.id.clone(), + address: ipv4_resolve(&settings.address).unwrap(), + } +} + +async fn metrics_handler() -> Result { + let encoder = TextEncoder::new(); + let metric_families = gather(); + let mut buffer = Vec::new(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + + Ok(warp::reply::with_header( + buffer, + "Content-Type", + encoder.format_type(), + )) +} + +#[hydroflow::main] +async fn main() { + tracing_subscriber::fmt::init(); + + let opts: Opts = Opts::parse(); + + let metrics_route = warp::path("metrics").and_then(metrics_handler); + tokio::spawn(async move { + info!("Starting metrics server on port 4003"); + warp::serve(metrics_route).run(([0, 0, 0, 0], 4003)).await; + }); + + // Setup protocol information in the member metadata. + let client_protocol_address = + SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), opts.client_port); + let gossip_protocol_address = + SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), opts.gossip_port); + + let member_data = MemberDataBuilder::new(member_name().clone()) + .add_protocol(Protocol::new("gossip".into(), gossip_protocol_address)) + .add_protocol(Protocol::new("client".into(), client_protocol_address)) + .build(); + + let (client_outbound, client_inbound, _) = bind_udp_bytes(client_protocol_address).await; + let (gossip_outbound, gossip_inbound, _) = bind_udp_bytes(gossip_protocol_address).await; + + info!( + "Server {:?} listening for client requests on: {:?}", + member_data.id, client_protocol_address + ); + + // TODO: Remove code duplication here. + // Setup message serialization for outbound client responses. 
+ let client_ob = client_outbound.with(|(msg, addr)| { + ready(Ok::<(hydroflow::bytes::Bytes, SocketAddr), Error>(( + hydroflow::util::serialize_to_bytes(msg), + addr, + ))) + }); + + // Setup message deserialization for inbound client requests. + let client_ib = client_inbound.filter_map(|input| { + let mapped = match input { + Ok((bytes, addr)) => { + let msg: bincode::Result = + hydroflow::util::deserialize_from_bytes(&bytes); + match msg { + Ok(msg) => Some((msg, addr)), + Err(e) => { + error!("Error deserializing message: {:?}", e); + None + } + } + } + Err(e) => { + error!("Error receiving message: {:?}", e); + None + } + }; + ready(mapped) + }); + + // Setup message serialization for outbound gossip messages. + let gossip_ob = gossip_outbound.with(|(msg, addr)| { + ready(Ok::<(hydroflow::bytes::Bytes, SocketAddr), Error>(( + hydroflow::util::serialize_to_bytes(msg), + addr, + ))) + }); + + // Setup message deserialization for inbound gossip messages. + let gossip_ib = gossip_inbound.filter_map(|input| { + let mapped = match input { + Ok((bytes, addr)) => { + let msg: bincode::Result = + hydroflow::util::deserialize_from_bytes(&bytes); + match msg { + Ok(msg) => Some((msg, addr)), + Err(e) => { + error!("Error deserializing message: {:?}", e); + None + } + } + } + Err(e) => { + error!("Error receiving message: {:?}", e); + None + } + }; + ready(mapped) + }); + + let gossip_rx = + IntervalStream::new(tokio::time::interval(tokio::time::Duration::from_secs(5))).map(|_| ()); + + let (_watcher, server_settings, settings_stream) = setup_settings_watch(); + + let seed_nodes = server_settings + .seed_nodes + .iter() + .map(make_seed_node) + .collect::>(); + + let seed_node_stream = settings_stream.map(|settings| { + trace!("Settings updated.
Reloading seed nodes"); + settings.seed_nodes.iter().map(make_seed_node).collect() + }); + + // Create and run the server + let mut server = server( + client_ib, + client_ob, + gossip_ib, + gossip_ob, + gossip_rx, + member_data, + seed_nodes, + seed_node_stream, + ); + + server.run_async().await; +} diff --git a/datastores/gossip_kv/server/membership.rs b/datastores/gossip_kv/server/membership.rs new file mode 100644 index 00000000000..2e20ab534bf --- /dev/null +++ b/datastores/gossip_kv/server/membership.rs @@ -0,0 +1,37 @@ +use std::sync::OnceLock; + +use gossip_kv::membership::MemberId; +// use rand::distributions::Distribution; +// use rand::{Rng}; + +/// This is a simple distribution that generates a random lower-case alphanumeric +// struct LowercaseAlphanumeric; +// +// impl Distribution for LowercaseAlphanumeric { +// fn sample(&self, rng: &mut R) -> char { +// let choices = b"abcdefghijklmnopqrstuvwxyz0123456789"; +// choices[rng.gen_range(0..choices.len())] as char +// } +// } + +/// Gets a name for the current process. +pub fn member_name() -> &'static MemberId { + static MEMBER_NAME: OnceLock = OnceLock::new(); + MEMBER_NAME.get_or_init(|| { + // Generate a lower-case alphanumeric suffix of length 4 + + // TODO: Random suffixes are good, but make benchmarking a pain. For now, we'll just use + // the hostname as-is. + + // let suffix: String = thread_rng() + // .sample_iter(&LowercaseAlphanumeric) + // .take(4) + // .map(char::from) + // .collect(); + + // Retrieve hostname + hostname::get().unwrap().to_str().unwrap().to_string() + + // format!("{}-{}", hostname, suffix) + }) +}