diff --git a/.cargo/config.toml b/.cargo/config.toml index be883d3e2fd4..ee79d40d252b 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -15,5 +15,8 @@ linker = "rust-lld" [target.x86_64-unknown-linux-musl] linker = "rust-lld" +[target.aarch64-unknown-linux-musl] +linker = "rust-lld" + [target.x86_64-pc-windows-msvc] linker = "rust-lld.exe" diff --git a/Cargo.lock b/Cargo.lock index bc00ec71c754..4975daca3665 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -291,6 +291,17 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "getrandom", + "instant", + "rand", +] + [[package]] name = "backtrace" version = "0.3.73" @@ -306,6 +317,18 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "benches" version = "0.0.0" @@ -774,6 +797,41 @@ dependencies = [ "syn 2.0.75", ] +[[package]] +name = "darling" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.75", +] + +[[package]] +name = "darling_macro" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" +dependencies = [ + "darling_core", + "quote", + "syn 2.0.75", +] + [[package]] name = "dashmap" version = "6.0.1" @@ -813,6 +871,23 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "derivative" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "difference" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198" + [[package]] name = "differential-dataflow-master" version = "0.13.0-dev.1" @@ -1163,6 +1238,40 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.9.4" @@ -1203,7 +1312,7 @@ dependencies = [ "serde", "serde_json", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.20.1", ] [[package]] @@ -1221,10 +1330,13 @@ dependencies = [ "dunce", "dyn-clone", "futures", + "futures-core", "hydroflow_deploy_integration", "indicatif", "inferno", "itertools", + "k8s-openapi", + "kube", "memo-map", "nameof", "nanoid", @@ -1232,6 +1344,7 @@ dependencies = [ "serde", "serde_json", "shell-escape", + "tar", "tempfile", "tokio", "tokio-stream", @@ -1424,6 +1537,77 @@ dependencies = [ "stageleft_tool", ] +[[package]] +name = "hyper" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body", + "httparse", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + +[[package]] +name = "hyper-rustls" +version = "0.27.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155" +dependencies = [ + "futures-util", + "http 1.1.0", + "hyper", + "hyper-util", + "log", + "rustls", + "rustls-native-certs", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower-service", +] + +[[package]] +name = "hyper-timeout" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" +dependencies = [ + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + +[[package]] +name = "hyper-util" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body", + "hyper", + "pin-project-lite", + "socket2", + "tokio", + "tower", + "tower-service", + "tracing", +] + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -1447,6 +1631,12 @@ dependencies = [ "cc", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.5.0" @@ -1593,6 +1783,153 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json-patch" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec9ad60d674508f3ca8f380a928cfe7b096bc729c4e2dbfe3852bc45da3ab30b" +dependencies = [ + "serde", + "serde_json", + "thiserror", +] + +[[package]] +name = "jsonpath-rust" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d8fe85bd70ff715f31ce8c739194b423d79811a19602115d611a3ec85d6200" +dependencies = [ + "lazy_static", + "once_cell", + "pest", + "pest_derive", + "regex", + "serde_json", + "thiserror", +] + +[[package]] +name = "k8s-openapi" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "550f99d93aa4c2b25de527bce492d772caf5e21d7ac9bd4b508ba781c8d91e30" +dependencies = [ + "base64 0.21.7", + "chrono", + "serde", + "serde-value", + "serde_json", +] + +[[package]] +name = "kube" +version = "0.90.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bfada4e00dac93a7b94e454ae4cde04ff8786645ac1b98f31352272e2682b5" +dependencies = [ + "k8s-openapi", + "kube-client", + "kube-core", + "kube-derive", + "kube-runtime", +] + +[[package]] +name = "kube-client" +version = "0.90.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0708306b5c0085f249f5e3d2d56a9bbfe0cbbf4fd4eb9ed4bbba542ba7649a7" +dependencies = [ + "base64 0.22.1", + "bytes", + "chrono", + "either", + "futures", + "home", + "http 1.1.0", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-timeout", + "hyper-util", + "jsonpath-rust", + "k8s-openapi", + "kube-core", + "pem", + "rand", + "rustls", + "rustls-pemfile", + "secrecy", + "serde", + "serde_json", + "serde_yaml", + "thiserror", + "tokio", + "tokio-tungstenite 0.21.0", + "tokio-util", + "tower", + "tower-http", + "tracing", +] + +[[package]] +name = "kube-core" +version = "0.90.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7845bcc3e0f422df4d9049570baedd9bc1942f0504594e393e72fe24092559cf" +dependencies = [ + "chrono", + "form_urlencoded", + "http 1.1.0", + "json-patch", + "k8s-openapi", + "schemars", + "serde", + "serde_json", + "thiserror", +] + +[[package]] +name = "kube-derive" +version = "0.90.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0d2527a6ff7adf00b34d558c4c5de9404abe28808cb0a4c64b57e2c1b0716a" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "serde_json", + "syn 2.0.75", +] + +[[package]] +name = "kube-runtime" +version = "0.90.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4560e2c5c71366f6dceb6500ce33cf72299aede92381bb875dc2d4ba4f102c21" +dependencies = [ + "ahash", + "async-trait", + "backoff", + "derivative", + "futures", + "hashbrown", + "json-patch", + "k8s-openapi", + "kube-client", + "parking_lot 0.12.3", + "pin-project", + "serde", + "serde_json", + "smallvec", + "thiserror", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "lattices" version = "0.5.7" @@ -1718,6 +2055,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "minicov" version = "0.3.5" @@ -1898,6 +2241,12 @@ version = "11.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9" +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + [[package]] name = "openssl-src" version = "300.3.1+3.3.1" @@ -1920,6 +2269,21 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + [[package]] name = "overload" version = "0.1.1" @@ -1986,12 +2350,67 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "pem" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e459365e590736a54c3fa561947c84837534b8e9af6fc5bf781307e82658fae" +dependencies = [ + "base64 0.22.1", + "serde", +] + [[package]] name = "percent-encoding" version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pest" +version = "2.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd53dff83f26735fdc1ca837098ccf133605d794cdae66acfc2bfac3ec809d95" +dependencies = [ + "memchr", + "thiserror", + "ucd-trie", +] + +[[package]] +name = "pest_derive" +version = "2.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a548d2beca6773b1c244554d36fcf8548a8a58e74156968211567250e48e49a" +dependencies = [ + "pest", + "pest_generator", +] + +[[package]] +name = "pest_generator" +version = "2.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c93a82e8d145725dcbaf44e5ea887c8a869efdcc28706df2d08c69e17077183" +dependencies = [ + "pest", + "pest_meta", + "proc-macro2", + "quote", + "syn 2.0.75", +] + +[[package]] +name = "pest_meta" +version = "2.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a941429fea7e08bedec25e4f6785b6ffaacc6b755da98df5ef3e7dcf4a124c4f" +dependencies = [ + "once_cell", + "pest", + "sha2", +] + [[package]] name = "pin-project" version = "1.1.5" @@ -2434,6 +2853,21 @@ dependencies = [ "bytemuck", ] +[[package]] +name = "ring" +version = "0.17.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" +dependencies = [ + "cc", + "cfg-if", + "getrandom", + "libc", + "spin", + "untrusted", + "windows-sys 0.52.0", +] + [[package]] name = "rust-sitter" version = "0.4.3" @@ -2523,6 +2957,61 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustls" +version = "0.23.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044" +dependencies = [ + "log", + "once_cell", + "ring", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-native-certs" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "196fe16b00e106300d3e45ecfcb764fa292a535d7326a29a5875c579c7417425" +dependencies = [ + "base64 0.22.1", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0" + +[[package]] +name = "rustls-webpki" +version = "0.102.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84678086bd54edf2b415183ed7a94d0efb049f1b646a33e22a36f3794be6ae56" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + [[package]] name = "ryu" version = "1.0.18" @@ -2538,6 +3027,39 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schannel" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "schemars" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09c024468a378b7e36765cd36702b7a90cc3cba11654f6685c8f233408e89e92" +dependencies = [ + "dyn-clone", + "schemars_derive", + "serde", + "serde_json", +] + +[[package]] +name = "schemars_derive" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1eee588578aff73f856ab961cd2f79e36bc45d7ded33a7562adba4667aecc0e" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.75", +] + [[package]] name = "scoped-tls" version = "1.0.1" @@ -2562,6 +3084,39 @@ dependencies = [ "syn 2.0.75", ] +[[package]] +name = "secrecy" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bd1c54ea06cfd2f6b63219704de0b9b4f72dcc2b8fdef820be6cd799780e91e" +dependencies = [ + "serde", + "zeroize", +] + +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags 2.6.0", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75da29fe9b9b08fe9d6b22b5b4bcbc75d8db3aa31e639aa56bb62e9d46bfceaf" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "0.9.0" @@ -2601,6 +3156,16 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-value" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" +dependencies = [ + "ordered-float", + "serde", +] + [[package]] name = "serde-wasm-bindgen" version = "0.4.5" @@ -2623,6 +3188,17 @@ dependencies = [ "syn 2.0.75", ] +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.75", +] + [[package]] name = "serde_json" version = "1.0.128" @@ -2645,6 +3221,19 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_yaml" +version = "0.9.34+deprecated" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" +dependencies = [ + "indexmap", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "sha1" version = "0.10.6" @@ -2757,6 +3346,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + [[package]] name = "ssh2" version = "0.9.4" @@ -2855,6 +3450,12 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "subtle" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" + [[package]] name = "syn" version = "1.0.109" @@ -2899,6 +3500,17 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "tar" +version = "0.4.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb797dad5fb5b76fcf519e702f4a589483b5ef06567f160c392832c1f5e44909" +dependencies = [ + "filetime", + "libc", + "xattr", +] + [[package]] name = "target-lexicon" version = "0.12.16" @@ -3087,6 +3699,17 @@ dependencies = [ "syn 2.0.75", ] +[[package]] +name = "tokio-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +dependencies = [ + "rustls", + "rustls-pki-types", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.15" @@ -3108,7 +3731,19 @@ dependencies = [ "futures-util", "log", "tokio", - "tungstenite", + "tungstenite 0.20.1", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.21.0", ] [[package]] @@ -3122,6 +3757,7 @@ dependencies = [ "futures-io", "futures-sink", "pin-project-lite", + "slab", "tokio", ] @@ -3184,12 +3820,61 @@ dependencies = [ "tokio", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" +dependencies = [ + "base64 0.21.7", + "bitflags 2.6.0", + "bytes", + "http 1.1.0", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + +[[package]] +name = "tower-service" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" + [[package]] name = "tracing" version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -3301,6 +3986,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8ddffe35a0e5eeeadf13ff7350af564c6e73993a24db62caee1822b185c2600" +[[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + [[package]] name = "try_match" version = "0.4.2" @@ -3358,7 +4049,26 @@ dependencies = [ "byteorder", "bytes", "data-encoding", - "http", + "http 0.2.12", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", "httparse", "log", "rand", @@ -3374,6 +4084,12 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "ucd-trie" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" + [[package]] name = "unicode-bidi" version = "0.3.15" @@ -3413,6 +4129,18 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7de7d73e1754487cb58364ee906a499937a0dfabd86bcb980fa99ec8c8fa2ce" +[[package]] +name = "unsafe-libyaml" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" + +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.5.2" @@ -3474,6 +4202,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -3904,6 +4641,17 @@ dependencies = [ "memchr", ] +[[package]] +name = "xattr" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8da84f1a25939b27f6820d92aed108f83ff920fdf11a7b19366c27c4cda81d4f" +dependencies = [ + "libc", + "linux-raw-sys", + "rustix", +] + [[package]] name = "zerocopy" version = "0.7.35" @@ -3925,6 +4673,12 @@ dependencies = [ "syn 2.0.75", ] +[[package]] +name = "zeroize" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" + [[package]] name = "zipf" version = "7.0.1" diff --git a/hydro_deploy/core/Cargo.toml b/hydro_deploy/core/Cargo.toml index f74fc73fa066..022441f3f7c2 100644 --- a/hydro_deploy/core/Cargo.toml +++ b/hydro_deploy/core/Cargo.toml @@ -21,11 +21,14 @@ bytes = "1.1.0" cargo_metadata = "0.18.0" dunce = "1.0.0" dyn-clone = "1.0.0" -futures = "0.3.0" +futures = "0.3.26" +futures-core = "0.3.26" hydroflow_deploy_integration = { path = "../hydroflow_deploy_integration", version = "^0.9.0" } indicatif = "0.17.0" inferno = "0.11.0" itertools = "0.10.0" # TODO(mingwei): remove when `iter_intersperse` is stabilized. +kube = { version = "0.90.0", features = ["derive", "runtime", "ws"] } +k8s-openapi = { version = "0.21.1", features = ["latest"] } memo-map = "0.3.0" nameof = "1.0.0" nanoid = "0.4.0" @@ -37,3 +40,4 @@ tempfile = "3.0.0" tokio = { version = "1.29.0", features = [ "full" ] } tokio-stream = { version = "0.1.3", default-features = false } tokio-util = { version = "0.7.5", features = [ "compat", "io-util" ] } +tar = "0.4.40" diff --git a/hydro_deploy/core/src/azure.rs b/hydro_deploy/core/src/azure.rs index b2b4f5f2a81a..cf0a77559142 100644 --- a/hydro_deploy/core/src/azure.rs +++ b/hydro_deploy/core/src/azure.rs @@ -50,6 +50,7 @@ pub struct AzureHost { project: String, os_type: String, // linux or windows machine_size: String, + architecture: Option, // default to x86_64 image: Option>, region: String, user: Option, @@ -63,6 +64,7 @@ impl AzureHost { project: String, os_type: String, // linux or windows machine_size: String, + architecture: Option, // default to x86_64 image: Option>, region: String, user: Option, @@ -73,6 +75,7 @@ impl AzureHost { os_type, machine_size, image, + architecture, region, user, launched: OnceLock::new(), @@ -84,7 +87,10 @@ impl AzureHost { #[async_trait] impl Host for AzureHost { fn target_type(&self) -> HostTargetType { - HostTargetType::Linux + match self.architecture.as_deref() { + Some("aarch64") => HostTargetType::Linux(crate::LinuxArchitecture::AARCH64), + _ => HostTargetType::Linux(crate::LinuxArchitecture::X86_64), + } } fn request_port(&self, bind_type: &ServerStrategy) { @@ -401,7 +407,7 @@ impl Host for AzureHost { .map(|a| a.clone() as Arc) } - fn provision(&self, resource_result: &Arc) -> Arc { + async fn provision(&self, resource_result: &Arc) -> Arc { self.launched .get_or_init(|| { let id = self.id; diff --git a/hydro_deploy/core/src/custom_service.rs b/hydro_deploy/core/src/custom_service.rs index 947b2e8410fc..297baf550ca3 100644 --- a/hydro_deploy/core/src/custom_service.rs +++ b/hydro_deploy/core/src/custom_service.rs @@ -59,7 +59,7 @@ impl Service for CustomService { } let host = &self.on; - let launched = host.provision(resource_result); + let launched = host.provision(resource_result).await; self.launched_host = Some(launched); Ok(()) } diff --git a/hydro_deploy/core/src/deployment.rs b/hydro_deploy/core/src/deployment.rs index faec27e64bf5..d43164d03bcc 100644 --- a/hydro_deploy/core/src/deployment.rs +++ b/hydro_deploy/core/src/deployment.rs @@ -9,7 +9,7 @@ use tokio::sync::RwLock; use super::gcp::GcpNetwork; use super::{ progress, CustomService, GcpComputeEngineHost, Host, LocalhostHost, ResourcePool, - ResourceResult, Service, + ResourceResult, Service, PodHost }; use crate::{AzureHost, ServiceBuilder}; @@ -117,7 +117,7 @@ impl Deployment { self.last_resource_result = Some(resource_result.clone()); for host in self.hosts.iter().filter_map(Weak::upgrade) { - host.provision(&resource_result); + host.provision(&resource_result).await; } let upgraded_services = self @@ -224,6 +224,7 @@ impl Deployment { &mut self, project: String, machine_type: String, + architecture: Option, image: String, region: String, network: Arc>, @@ -235,6 +236,7 @@ impl Deployment { id, project, machine_type, + architecture, image, region, network, @@ -250,10 +252,19 @@ impl Deployment { project: String, os_type: String, // linux or windows machine_size: String, + architecture: Option, image: Option>, region: String, user: Option, ) -> Arc { - self.add_host(|id| AzureHost::new(id, project, os_type, machine_size, image, region, user)) + self.add_host(|id| AzureHost::new(id, project, os_type, machine_size, architecture, image, region, user)) + } + + #[allow(clippy::too_many_arguments)] + #[builder(entry = "PodHost", exit = "add")] + pub fn add_pod_host( + &mut self, + ) -> Arc { + self.add_host(|id| PodHost::new(id)) } } diff --git a/hydro_deploy/core/src/gcp.rs b/hydro_deploy/core/src/gcp.rs index a60a14280d46..3bd684390c65 100644 --- a/hydro_deploy/core/src/gcp.rs +++ b/hydro_deploy/core/src/gcp.rs @@ -174,6 +174,7 @@ pub struct GcpComputeEngineHost { project: String, machine_type: String, + architecture: Option, image: String, region: String, network: Arc>, @@ -192,6 +193,7 @@ impl GcpComputeEngineHost { id: usize, project: impl Into, machine_type: impl Into, + architecture: impl Into>, image: impl Into, region: impl Into, network: Arc>, @@ -202,6 +204,7 @@ impl GcpComputeEngineHost { id, project: project.into(), machine_type: machine_type.into(), + architecture: architecture.into(), image: image.into(), region: region.into(), network, @@ -216,7 +219,10 @@ impl GcpComputeEngineHost { #[async_trait] impl Host for GcpComputeEngineHost { fn target_type(&self) -> HostTargetType { - HostTargetType::Linux + match self.architecture.as_deref() { + Some("aarch64") => HostTargetType::Linux(crate::LinuxArchitecture::AARCH64), + _ => HostTargetType::Linux(crate::LinuxArchitecture::X86_64), + } } fn request_port(&self, bind_type: &ServerStrategy) { @@ -442,7 +448,7 @@ impl Host for GcpComputeEngineHost { .map(|a| a.clone() as Arc) } - fn provision(&self, resource_result: &Arc) -> Arc { + async fn provision(&self, resource_result: &Arc) -> Arc { self.launched .get_or_init(|| { let id = self.id; diff --git a/hydro_deploy/core/src/hydroflow_crate/build.rs b/hydro_deploy/core/src/hydroflow_crate/build.rs index 61efa475a1a7..e17328995e16 100644 --- a/hydro_deploy/core/src/hydroflow_crate/build.rs +++ b/hydro_deploy/core/src/hydroflow_crate/build.rs @@ -11,7 +11,7 @@ use nanoid::nanoid; use tokio::sync::OnceCell; use crate::progress::ProgressTracker; -use crate::HostTargetType; +use crate::{HostTargetType, LinuxArchitecture}; /// Build parameters for [`build_crate_memoized`]. #[derive(PartialEq, Eq, Hash, Clone)] @@ -90,11 +90,20 @@ pub async fn build_crate_memoized(params: BuildParams) -> Result<&'static BuildO ProgressTracker::rich_leaf("build", move |set_msg| async move { tokio::task::spawn_blocking(move || { let mut command = Command::new("cargo"); - command.args(["build"]); - - if let Some(profile) = params.profile.as_ref() { - command.args(["--profile", profile]); - } + // command.args([ + // "build".to_string(), + // "--profile".to_string(), + // params.profile.unwrap_or("release".to_string()), + // ]); + command.args([ + "zigbuild".to_string(), + "--profile".to_string(), + params.profile.unwrap_or("release".to_string()), + ]); + + // if let Some(profile) = params.profile.as_ref() { + // command.args(["--profile", profile]); + // } if let Some(bin) = params.bin.as_ref() { command.args(["--bin", bin]); @@ -106,9 +115,12 @@ pub async fn build_crate_memoized(params: BuildParams) -> Result<&'static BuildO match params.target_type { HostTargetType::Local => {} - HostTargetType::Linux => { + HostTargetType::Linux(LinuxArchitecture::X86_64) => { command.args(["--target", "x86_64-unknown-linux-musl"]); } + HostTargetType::Linux(LinuxArchitecture::AARCH64) => { + command.args(["--target", "aarch64-unknown-linux-musl"]); + } } if params.no_default_features { @@ -129,6 +141,8 @@ pub async fn build_crate_memoized(params: BuildParams) -> Result<&'static BuildO command.env("CARGO_TARGET_DIR", target_dir); } + ProgressTracker::println(&format!("Command to be executed: {:?}", command)); + let mut spawned = command .current_dir(¶ms.src) .stdout(Stdio::piped()) diff --git a/hydro_deploy/core/src/hydroflow_crate/service.rs b/hydro_deploy/core/src/hydroflow_crate/service.rs index 6089e0c8d333..a674b8f7cff2 100644 --- a/hydro_deploy/core/src/hydroflow_crate/service.rs +++ b/hydro_deploy/core/src/hydroflow_crate/service.rs @@ -205,7 +205,7 @@ impl Service for HydroflowCrateService { let built = ProgressTracker::leaf("build", self.build()).await?; let host = &self.on; - let launched = host.provision(resource_result); + let launched = host.provision(resource_result).await; launched.copy_binary(built).await?; @@ -267,6 +267,7 @@ impl Service for HydroflowCrateService { *self.server_defns.try_write().unwrap() = serde_json::from_str(ready_line.trim_start_matches("ready: ")).unwrap(); } else { + ProgressTracker::println(format!("Did not find ready. Instead found: {:?}", ready_line).as_str()); bail!("expected ready"); } diff --git a/hydro_deploy/core/src/kubernetes/launched_binary.rs b/hydro_deploy/core/src/kubernetes/launched_binary.rs new file mode 100644 index 000000000000..6d983c257cae --- /dev/null +++ b/hydro_deploy/core/src/kubernetes/launched_binary.rs @@ -0,0 +1,117 @@ +#[cfg(unix)] +use std::sync::Arc; + +use async_trait::async_trait; +use futures::TryStreamExt; +use kube::api::AttachedProcess; +use tokio::sync::{mpsc, oneshot}; +use std::sync::Mutex; +use anyhow::Error; + +use tokio::io::AsyncWriteExt; + +use crate::util::prioritized_broadcast; +use crate::LaunchedBinary; + +// pub struct LaunchedPodBinary { +// stdin_sender: Sender, +// stdout_cli_receivers: Arc>>>, +// stdout_receivers: Arc>>>, +// stderr_receivers: Arc>>>, +// } + +pub struct LaunchedPodBinary { + stdin_sender: mpsc::UnboundedSender, + stdout_deploy_receivers: Arc>>>, + stdout_receivers: Arc>>>, + stderr_receivers: Arc>>>, +} + +impl LaunchedPodBinary { + pub fn new(launched_pod_binary: &mut AttachedProcess, id: String) -> Self { + // Create streams for stdout and stdin for the running binary in the pod + + let launch_binary_out = tokio_util::io::ReaderStream::new(launched_pod_binary.stdout().unwrap()); + let launch_binary_err = tokio_util::io::ReaderStream::new(launched_pod_binary.stderr().unwrap()); + let mut stdin = launched_pod_binary.stdin().unwrap(); + + + let (stdin_sender, mut stdin_receiver) = mpsc::unbounded_channel::(); + tokio::spawn(async move { + while let Some(line) = stdin_receiver.recv().await { + if stdin.write_all(line.as_bytes()).await.is_err() { + break; + } + + stdin.flush().await.ok(); + } + }); + + let id_clone = id.clone(); + let (stdout_deploy_receivers, stdout_receivers) = prioritized_broadcast( + launch_binary_out.map_ok(|bytes| String::from_utf8_lossy(&bytes).to_string()), + move |s| println!("[{id_clone}] {s}"), + ); + let (_, stderr_receivers) = prioritized_broadcast( + launch_binary_err.map_ok(|bytes| String::from_utf8_lossy(&bytes).to_string()), + move |s| eprintln!("[{id}] {s}"), + ); + + Self { + stdin_sender, + stdout_deploy_receivers, + stdout_receivers, + stderr_receivers, + } + } +} + +#[async_trait] +impl LaunchedBinary for LaunchedPodBinary { + fn stdin(&self) -> mpsc::UnboundedSender { + self.stdin_sender.clone() + } + + fn deploy_stdout(&self) -> oneshot::Receiver { + let mut receivers = self.stdout_deploy_receivers.lock().unwrap(); + + if receivers.is_some() { + panic!("Only one deploy stdout receiver is allowed at a time"); + } + + let (sender, receiver) = oneshot::channel::(); + *receivers = Some(sender); + receiver + } + + fn stdout(&self) -> mpsc::UnboundedReceiver { + let mut receivers = self.stdout_receivers.lock().unwrap(); + let (sender, receiver) = mpsc::unbounded_channel::(); + receivers.push(sender); + receiver + } + + fn stderr(&self) -> mpsc::UnboundedReceiver { + let mut receivers = self.stderr_receivers.lock().unwrap(); + let (sender, receiver) = mpsc::unbounded_channel::(); + receivers.push(sender); + receiver + } + + // returns exit code when the hydroflow program finishes + fn exit_code(&self) -> Option { + None + } + + // waits for the hydroflow program to finish + async fn wait(&mut self) -> Result { + Ok(1) + } + + // waits for the hydroflow program to finish + async fn stop(&mut self) -> Result<(), Error> { + // Implement the logic to stop the hydroflow program + // For now, we will return Ok(()) to indicate success + Ok(()) + } +} diff --git a/hydro_deploy/core/src/kubernetes/mod.rs b/hydro_deploy/core/src/kubernetes/mod.rs new file mode 100644 index 000000000000..e4f93af6703a --- /dev/null +++ b/hydro_deploy/core/src/kubernetes/mod.rs @@ -0,0 +1,347 @@ +use std::{collections::HashMap, sync::OnceLock}; +use std::net::SocketAddr; +use std::sync::Arc; + +use anyhow::Result; +use async_trait::async_trait; +use hydroflow_deploy_integration::ServerBindConfig; + +use crate::hydroflow_crate::{build::BuildOutput, tracing_options::TracingOptions}; + +use super::{ + ClientStrategy, Host, HostTargetType, LaunchedBinary, LaunchedHost, ResourceBatch, + ResourceResult, ServerStrategy, +}; + +use futures::{StreamExt, TryStreamExt}; +use k8s_openapi::api::core::v1::Pod; +use kube::{ + api::{Api, AttachParams, ListParams, PostParams, ResourceExt, WatchEvent, WatchParams}, + Client, +}; + +use tokio::io::AsyncWriteExt; +use nanoid::nanoid; + +use super::progress::ProgressTracker; + +pub mod launched_binary; +pub use launched_binary::*; + +const ALPHABET: [char; 36] = [ + 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', + 't', 'u', 'v', 'w', 'x', 'y', 'z', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', +]; + +#[derive(Debug)] +pub struct PodHost { + pub id: usize, + client_only: bool, + pub launched: OnceLock>, +} + +impl PodHost { + pub fn new(id: usize) -> PodHost { + PodHost { + id, + client_only: false, + launched: OnceLock::new(), + } + } + + pub fn client_only(&self) -> PodHost { + PodHost { + id: self.id, + client_only: true, + launched: OnceLock::new(), + } + } +} + +#[async_trait] +impl Host for PodHost { + fn target_type(&self) -> HostTargetType { + HostTargetType::Linux(crate::LinuxArchitecture::AARCH64) + } + + fn request_port(&self, _bind_type: &ServerStrategy) {} + fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {} + fn request_custom_binary(&self) {} + + fn id(&self) -> usize { + self.id + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn launched(&self) -> Option> { + self.launched + .get() + .map(|a| a.clone() as Arc) + } + + async fn provision(&self, _resource_result: &Arc) -> Arc { + if self.launched.get().is_none() { + let client = Client::try_default().await.unwrap(); + let pod_id = nanoid!(10, &ALPHABET); // pod names can only contain alphanumeric characters + let pod_name = format!("hydro-{}", pod_id); + + // Blank template for a new pod + let p: Pod = serde_json::from_value(serde_json::json!({ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { "name": pod_name }, + "spec": { + "containers": [{ + "name": pod_name, + "image": "ubuntu:20.04", + // Do nothing + "command": ["tail", "-f", "/dev/null"], + }], + } + })).unwrap(); + + let pods: Api = Api::default_namespaced(client); + + // Check if pod_name has already been created. If not, create it. + let lp = ListParams::default().fields(&format!("metadata.name={}", pod_name)); // only want results for our pod + let mut found_existing_pod = false; + match pods.list(&lp).await { + Ok(pod_list) => { + for p in pod_list { + if p.name_any() == pod_name { + found_existing_pod = true; + } + } + } + Err(e) => { + ProgressTracker::println(format!("Error listing pods: {:?}. Maybe your kubernetes cluster is not up?", e).as_str()); + } + } + if !found_existing_pod { + ProgressTracker::println(format!("Creating new pod {:?}", pod_name).as_str()); + let res = pods.create(&PostParams::default(), &p).await; + match res { + Err(e) => ProgressTracker::println(format!("{:?}", e).as_str()), + Ok(_) => (), + } + } + ProgressTracker::println("Check 1"); + + // Wait until the pod is running, otherwise we get 500 error. + let wp = WatchParams::default().fields(format!("metadata.name={}", pod_name).as_str()).timeout(10); + let mut stream = pods.watch(&wp, "0").await.unwrap().boxed(); + loop { + let status_result = stream.try_next().await; + match status_result { + Ok(Some(status)) => match status { + WatchEvent::Added(o) => { + ProgressTracker::println(&format!("Found pod {}", o.name_any())); + } + WatchEvent::Modified(o) => { + let s = o.status.as_ref().expect("status exists on pod"); + if s.phase.clone().unwrap_or_default() == "Running" { + ProgressTracker::println(&format!("Ready to attach to {}", o.name_any())); + break; + } + } + _ => {} + }, + Ok(None) => { + // Pod still being created, likely -- restart the watch stream + stream = pods.watch(&wp, "0").await.unwrap().boxed(); + } + Err(e) => { + ProgressTracker::println(&format!("Error watching pod events: {:?}", e)); + break; + } + } + } + + let internal_ip = pods.get_status(pod_name.as_str()).await.unwrap().status.unwrap().pod_ip.unwrap(); + + self.launched.set(Arc::new(LaunchedPod { + internal_ip, + pod_name, + })).unwrap(); + } + + self.launched.get().unwrap().clone() + } + + fn strategy_as_server<'a>( + &'a self, + connection_from: &dyn Host, + ) -> Result<( + ClientStrategy<'a>, + Box ServerStrategy>, + )> { + if self.client_only { + anyhow::bail!("Pod cannot be a server if it is client only") + } + + if connection_from.can_connect_to(ClientStrategy::UnixSocket(self.id)) { + Ok(( + ClientStrategy::UnixSocket(self.id), + Box::new(|_| ServerStrategy::UnixSocket), + )) + } else if connection_from.can_connect_to(ClientStrategy::InternalTcpPort(self)) { + Ok(( + ClientStrategy::InternalTcpPort(self), + Box::new(|_| ServerStrategy::InternalTcpPort), + )) + } else { + anyhow::bail!("Could not find a strategy to connect to Pod") + } + } + + fn can_connect_to(&self, typ: ClientStrategy) -> bool { + match typ { + ClientStrategy::UnixSocket(id) => { + #[cfg(unix)] + { + self.id == id + } + + #[cfg(not(unix))] + { + let _ = id; + false + } + } + // target_host.as_any().downcast_ref::() + ClientStrategy::InternalTcpPort(_target_host) => true, // TODO: if I'm on the same cluster, can just return true first + ClientStrategy::ForwardedTcpPort(_) => true, + } + } +} + +#[derive(Debug)] +pub struct LaunchedPod { + pub internal_ip: String, // internal IP address for inter-pod communication in a node + pub pod_name: String, +} + +#[async_trait] +impl LaunchedHost for LaunchedPod { + fn server_config(&self, bind_type: &ServerStrategy) -> ServerBindConfig { + match bind_type { + ServerStrategy::UnixSocket => ServerBindConfig::UnixSocket, + ServerStrategy::InternalTcpPort => ServerBindConfig::TcpPort(self.internal_ip.clone()), // TODO: change to pod's internal port + ServerStrategy::ExternalTcpPort(_) => panic!("Cannot bind to external port"), + ServerStrategy::Demux(demux) => { + let mut config_map = HashMap::new(); + for (key, underlying) in demux { + config_map.insert(*key, self.server_config(underlying)); + } + + ServerBindConfig::Demux(config_map) + } + ServerStrategy::Merge(merge) => { + let mut configs = vec![]; + for underlying in merge { + configs.push(self.server_config(underlying)); + } + + ServerBindConfig::Merge(configs) + } + ServerStrategy::Tagged(underlying, id) => { + ServerBindConfig::Tagged(Box::new(self.server_config(underlying)), *id) + } + ServerStrategy::Null => ServerBindConfig::Null, + } + } + + async fn copy_binary(&self, binary: &BuildOutput) -> Result<()> { + // Create a new pod in the running kubernetes cluster (we assume the user already has one up) + ProgressTracker::println("Copying binary to pod"); + let client = Client::try_default().await?; + let pods: Api = Api::default_namespaced(client); + + let file_name = format!("hydro-{}-binary", binary.unique_id); + // let metadata = std::fs::metadata("/Users/nickjiang/Nick/Hydro/hydroflow/hydro_deploy/core/src/kubernetes/hello_world_aarch64_musl")?; // importantly, this file has executable permissions + { + let binary_data = binary.bin_data.clone(); + let mut header = tar::Header::new_gnu(); + header.set_path(file_name).unwrap(); + header.set_size(binary_data.len() as u64); + // header.set_metadata(&metadata); + header.set_mode(0o755); // give the binary executable permissions + header.set_cksum(); + + let mut ar = tar::Builder::new(Vec::new()); + ar.append(&header, &mut binary_data.as_slice()).unwrap(); + let data = ar.into_inner().unwrap(); + + let ap = AttachParams::default().stdin(true).stderr(false); + let mut tar = pods + .exec(self.pod_name.as_str(), vec!["tar", "xf", "-", "-C", "/"], &ap) + .await?; + let mut tar_stdin = tar.stdin().unwrap(); + // tar_stdin.write_all(&data).await?; + if let Err(e) = tar_stdin.write_all(&data).await { + ProgressTracker::println(&format!("Error writing to stdin: {:?}", e)); + return Err(e.into()); + } + ProgressTracker::println("Wrote all the stdin"); + + // Flush the stdin to finish sending the file through + if let Err(e) = tar_stdin.flush().await { + ProgressTracker::println(&format!("Error flushing stdin: {:?}", e)); + return Err(e.into()); + } + + ProgressTracker::println("Flushed!"); + drop(tar_stdin); // Ensure stdin is closed before joining + let result = tar.join().await; + // tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + match result { + Ok(_) => ProgressTracker::println("Successfully copied binary to pod"), + Err(e) => ProgressTracker::println(&format!("Failed to copy binary to pod: {:?}", e)), + } + } + ProgressTracker::println("Finishd copying binary to pod"); + + Ok(()) + } + + async fn launch_binary( + &self, + id: String, + binary: &BuildOutput, + args: &[String], + _perf: Option, + ) -> Result> { + ProgressTracker::println("Launching binary in Pod"); + + let client = Client::try_default().await?; + let pods: Api = Api::default_namespaced(client); + let pod_name = &self.pod_name; + let file_name = format!("hydro-{}-binary", binary.unique_id); + + // Construct arguments + let mut args_list = vec![format!("./{file_name}")]; + args_list.extend(args.iter().cloned()); + + // Execute binary inside the new pod + let ap = AttachParams::default().stdin(true).stdout(true).stderr(true); + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + let mut launch_binary = match pods.exec(pod_name, args_list, &ap).await { + Ok(exec) => exec, + Err(e) => { + ProgressTracker::println(&format!("Failed to launch binary in Pod: {:?}", e)); + return Err(e.into()); + } + }; + + Ok(Box::new(LaunchedPodBinary::new( + &mut launch_binary, id, + ))) + } + + async fn forward_port(&self, addr: &SocketAddr) -> Result { + Ok(*addr) + } +} diff --git a/hydro_deploy/core/src/lib.rs b/hydro_deploy/core/src/lib.rs index f118b25e472d..0ebfa7d63d28 100644 --- a/hydro_deploy/core/src/lib.rs +++ b/hydro_deploy/core/src/lib.rs @@ -15,6 +15,9 @@ pub mod progress; pub mod localhost; pub use localhost::LocalhostHost; +pub mod kubernetes; +pub use kubernetes::PodHost; + pub mod ssh; pub mod gcp; @@ -140,14 +143,22 @@ pub enum ClientStrategy<'a> { ), } +// Architecture for binary +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum LinuxArchitecture { + X86_64, + AARCH64, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum HostTargetType { Local, - Linux, + Linux(LinuxArchitecture), } pub type HostStrategyGetter = Box ServerStrategy>; +#[async_trait] pub trait Host: Send + Sync { fn target_type(&self) -> HostTargetType; @@ -156,6 +167,9 @@ pub trait Host: Send + Sync { /// An identifier for this host, which is unique within a deployment. fn id(&self) -> usize; + /// Returns a reference to the host as a trait object. + fn as_any(&self) -> &dyn std::any::Any; + /// Configures the host to support copying and running a custom binary. fn request_custom_binary(&self); @@ -167,7 +181,7 @@ pub trait Host: Send + Sync { /// Connects to the acquired resources and prepares the host to run services. /// /// This should be called after `collect_resources` is called. - fn provision(&self, resource_result: &Arc) -> Arc; + async fn provision(&self, resource_result: &Arc) -> Arc; fn launched(&self) -> Option>; @@ -180,9 +194,6 @@ pub trait Host: Send + Sync { /// Determines whether this host can connect to another host using the given strategy. fn can_connect_to(&self, typ: ClientStrategy) -> bool; - - /// Returns a reference to the host as a trait object. - fn as_any(&self) -> &dyn std::any::Any; } #[async_trait] diff --git a/hydro_deploy/core/src/localhost/mod.rs b/hydro_deploy/core/src/localhost/mod.rs index a15632af030e..ce3ec944c5fc 100644 --- a/hydro_deploy/core/src/localhost/mod.rs +++ b/hydro_deploy/core/src/localhost/mod.rs @@ -66,7 +66,7 @@ impl Host for LocalhostHost { Some(Arc::new(LaunchedLocalhost)) } - fn provision(&self, _resource_result: &Arc) -> Arc { + async fn provision(&self, _resource_result: &Arc) -> Arc { Arc::new(LaunchedLocalhost) } diff --git a/hydro_deploy/core/src/ssh.rs b/hydro_deploy/core/src/ssh.rs index def888a52b12..e80b690ed9f8 100644 --- a/hydro_deploy/core/src/ssh.rs +++ b/hydro_deploy/core/src/ssh.rs @@ -22,6 +22,16 @@ use tokio::sync::{mpsc, oneshot}; use tokio_stream::StreamExt; use tokio_util::io::SyncIoBridge; +// use k8s_openapi::api::core::v1::Pod; +// use kube::{ +// api::{Api, AttachParams, AttachedProcess, DeleteParams, PostParams, ResourceExt, WatchEvent, WatchParams}, +// Client, +// }; + +// use tokio::io::AsyncWriteExt; + +// use kube::core::subresource::AttachParams; + use super::progress::ProgressTracker; use super::util::async_retry; use super::{LaunchedBinary, LaunchedHost, ResourceResult, ServerStrategy}; @@ -354,6 +364,7 @@ impl LaunchedHost for T { let user = self.ssh_user(); let binary_path = PathBuf::from(format!("/home/{user}/hydro-{unique_name}")); + // gets the ssh session for launching the binary let channel = ProgressTracker::leaf( format!("launching binary {}", binary_path.display()), async { @@ -397,11 +408,14 @@ impl LaunchedHost for T { break; } + // flush the entire buffer stdin.flush().await.unwrap(); } }); let id_clone = id.clone(); + // Pull away the first stdout stream into a different "prioritized" channel, + // and send everything else to stdout let (stdout_deploy_receivers, stdout_receivers) = prioritized_broadcast(FuturesBufReader::new(channel.stream(0)).lines(), move |s| { ProgressTracker::println(format!("[{id_clone}] {s}")); diff --git a/hydro_deploy/core/src/util.rs b/hydro_deploy/core/src/util.rs index 3a3c243bafb4..c1035186514e 100644 --- a/hydro_deploy/core/src/util.rs +++ b/hydro_deploy/core/src/util.rs @@ -28,6 +28,8 @@ type PriorityBroadcacst = ( Arc>>>, ); +// Divides up a single stream into two channels (one prioritized and the other not) +// The first data packet that comes in will get sent to the first channel, and all others will get sent to the second channel pub fn prioritized_broadcast> + Send + Unpin + 'static>( mut lines: T, default: impl Fn(String) + Send + 'static, diff --git a/hydro_deploy/hydro_cli/src/lib.rs b/hydro_deploy/hydro_cli/src/lib.rs index 2c1de84faff7..60bbd0a98db3 100644 --- a/hydro_deploy/hydro_cli/src/lib.rs +++ b/hydro_deploy/hydro_cli/src/lib.rs @@ -162,6 +162,24 @@ impl Deployment { } } + #[expect(non_snake_case, reason = "pymethods")] + fn PodHost(&self, py: Python<'_>) -> PyResult> { + let arc = self.underlying.blocking_write().add_host(|id| { + core::PodHost::new( + id + ) + }); + + Ok(Py::new( + py, + PyClassInitializer::from(Host { + underlying: arc.clone(), + }) + .add_subclass(KubernetesPodHost { underlying: arc }), + )? + .into_py(py)) + } + #[expect(non_snake_case, reason = "pymethods")] fn Localhost(&self, py: Python<'_>) -> PyResult> { let arc = self.underlying.blocking_read().Localhost(); @@ -185,6 +203,7 @@ impl Deployment { image: String, region: String, network: GcpNetwork, + architecture: Option, user: Option, startup_script: Option, ) -> PyResult> { @@ -193,6 +212,7 @@ impl Deployment { id, project, machine_type, + architecture, image, region, network.underlying, @@ -219,11 +239,12 @@ impl Deployment { os_type: String, // linux or windows machine_size: String, region: String, + architecture: Option, image: Option>, user: Option, ) -> PyResult> { let arc = self.underlying.blocking_write().add_host(|id| { - core::AzureHost::new(id, project, os_type, machine_size, image, region, user) + core::AzureHost::new(id, project, os_type, machine_size, architecture, image, region, user) }); Ok(Py::new( @@ -358,6 +379,28 @@ impl LocalhostHost { } } +#[pyclass(extends=Host, subclass)] +struct KubernetesPodHost { + underlying: Arc, +} + +#[pymethods] +impl KubernetesPodHost { + fn client_only(&self, py: Python<'_>) -> PyResult> { + let arc = Arc::new(self.underlying.client_only()); + + Ok(Py::new( + py, + PyClassInitializer::from(Host { + underlying: arc.clone(), + }) + .add_subclass(KubernetesPodHost { underlying: arc }), + )? + .into_py(py)) + } +} + + #[pyclass] #[derive(Clone)] struct GcpNetwork { @@ -839,6 +882,7 @@ async def coroutine_to_safely_cancellable(c, cancel_token): module.add_class::()?; module.add_class::()?; + module.add_class::()?; module.add_class::()?; module.add_class::()?; diff --git a/hydro_deploy/hydro_cli_examples/kubernetes_send_receive.py b/hydro_deploy/hydro_cli_examples/kubernetes_send_receive.py new file mode 100644 index 000000000000..8f9a689a3b6c --- /dev/null +++ b/hydro_deploy/hydro_cli_examples/kubernetes_send_receive.py @@ -0,0 +1,66 @@ +import hydro +import json +from pathlib import Path +from aiostream import stream + +async def main(args): + + deployment = hydro.Deployment() + + machine1 = deployment.PodHost() + + machine2 = deployment.PodHost() + + sender_count = 2 + senders = [deployment.HydroflowCrate( + src=str(Path(__file__).parent.absolute()), + example="dedalus_sender", + args=[json.dumps(([0, 1], i))], + on=machine1 + ) for i in range(sender_count)] + + receiver1 = deployment.HydroflowCrate( + src=str(Path(__file__).parent.absolute()), + example="dedalus_receiver", + on=machine2 + ) + + receiver2 = deployment.HydroflowCrate( + src=str(Path(__file__).parent.absolute()), + example="dedalus_receiver", + on=machine2 + ) + + for sender in senders: + sender.ports.broadcast.send_to(hydro.demux({ + 0: receiver1.ports.broadcast.merge(), + 1: receiver2.ports.broadcast.merge() + })) + + await deployment.deploy() + + print("deployed!") + + # create this as separate variable to indicate to Hydro that we want to capture all stdout, even after the loop + receiver_1_out = await receiver1.stdout() + receiver_2_out = await receiver2.stdout() + + await deployment.start() + print("started!") + + counter = 0 + async with stream.merge(stream.map(receiver_1_out, lambda x: f"RECEIVER 1: {x}"), stream.map(receiver_2_out, lambda x: f"RECEIVER 2: {x}")).stream() as merged: + async for log in merged: + print(log) + counter += 1 + if counter == 10: + break + + for sender in senders: + await sender.stop() + print(await sender.exit_code()) + +if __name__ == "__main__": + import sys + import hydro.async_wrapper + hydro.async_wrapper.run(main, sys.argv[1:]) diff --git a/hydroflow/src/util/deploy.rs b/hydroflow/src/util/deploy.rs index 64d436ea440b..e7815d08c773 100644 --- a/hydroflow/src/util/deploy.rs +++ b/hydroflow/src/util/deploy.rs @@ -79,6 +79,8 @@ pub async fn init_no_ack_start() -> DeployPorts