diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a10723e..2213edc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -79,7 +79,7 @@ jobs: - name: Setup Build Environment run: | apt update - apt install --yes gcc g++ libssl-dev pkg-config cmake + apt install --yes gcc g++ libssl-dev pkg-config cmake protobuf-compiler rm -rf /var/lib/apt/lists/* - name: Install Clippy run: | @@ -122,7 +122,7 @@ jobs: - name: Setup Build Environment run: | apt update - apt install --yes gcc g++ libssl-dev pkg-config cmake + apt install --yes gcc g++ libssl-dev pkg-config cmake protobuf-compiler rm -rf /var/lib/apt/lists/* - name: Run Test run: | diff --git a/Cargo.lock b/Cargo.lock index 0753e1d..7e01e04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,6 +23,33 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" +[[package]] +name = "anyhow" +version = "1.0.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" + +[[package]] +name = "async-stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.57" @@ -51,7 +78,7 @@ dependencies = [ "lazy_static", "libflate", "num-bigint", - "rand", + "rand 0.7.3", "serde", "serde_json", "strum", @@ -62,6 +89,51 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "axum" +version = "0.5.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acee9fd5073ab6b045a275b3e709c163dd36c90685219cb21804a147b58dba43" +dependencies = [ + "async-trait", + "axum-core", + "bitflags", + "bytes 1.1.0", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde", + "sync_wrapper", + "tokio", + "tower", + "tower-http", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37e5939e02c56fecd5c017c37df4238c0a839fa76b7f97acdd7efb804fd181cc" +dependencies = [ + "async-trait", + "bytes 1.1.0", + "futures-util", + "http", + "http-body", + "mime", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.64" @@ -78,23 +150,10 @@ dependencies = [ ] [[package]] -name = "bindgen" -version = "0.57.0" +name = "base64" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd4865004a46a0aafb2a0a5eb19d3c9fc46ee5f063a6cfc605c69ac9ecf5263d" -dependencies = [ - "bitflags", - "cexpr", - "clang-sys", - "lazy_static", - "lazycell", - "peeking_take_while", - "proc-macro2", - "quote", - "regex", - "rustc-hash", - "shlex", -] +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" [[package]] name = "bitflags" @@ -102,15 +161,6 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" -[[package]] -name = "boringssl-src" -version = "0.3.0+688fc5c" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f901accdf830d2ea2f4e27f923a5e1125cd8b1a39ab578b9db1a42d578a6922b" -dependencies = [ - "cmake", -] - [[package]] name = "byteorder" version = "1.4.3" @@ -149,28 +199,18 @@ dependencies = [ "common_types", "dashmap", "futures", - "grpcio", "tokio", + "tonic", ] [[package]] name = "ceresdbproto" version = "0.1.0" -source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=2c10152d021cd5a26b9c870cdede6a0317adca3d#2c10152d021cd5a26b9c870cdede6a0317adca3d" +source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=29cb0c6fba76401fd9a4ae5b8cacc9002ad78650#29cb0c6fba76401fd9a4ae5b8cacc9002ad78650" dependencies = [ - "futures", - "grpcio", - "protobuf", - "protobuf-builder", -] - -[[package]] -name = "cexpr" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4aedb84272dbe89af497cf81375129abda4fc0a9e7c5d317498c15cc30c0d27" -dependencies = [ - "nom", + "prost", + "tonic", + "tonic-build", ] [[package]] @@ -192,26 +232,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "clang-sys" -version = "1.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cc00842eed744b858222c4c9faf7243aafc6d33f92f96935263ef4d8a41ce21" -dependencies = [ - "glob", - "libc", - "libloading", -] - -[[package]] -name = "cmake" -version = "0.1.48" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8ad8cef104ac57b68b89df3208164d228503abbdce70f6880ffa3d970e7443a" -dependencies = [ - "cc", -] - [[package]] name = "common_types" version = "0.1.0" @@ -247,7 +267,7 @@ dependencies = [ "cfg-if", "hashbrown", "lock_api", - "parking_lot_core 0.9.3", + "parking_lot_core", ] [[package]] @@ -302,6 +322,18 @@ dependencies = [ "instant", ] +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "futures" version = "0.3.21" @@ -429,26 +461,6 @@ version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4" -[[package]] -name = "glob" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" - -[[package]] -name = "grpcio" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24d99e00eed7e0a04ee2705112e7cfdbe1a3cc771147f22f016a8cd2d002187b" -dependencies = [ - "futures", - "grpcio-sys", - "libc", - "log", - "parking_lot 0.11.2", - "protobuf", -] - [[package]] name = "grpcio-compiler" version = "0.7.0" @@ -459,19 +471,22 @@ dependencies = [ ] [[package]] -name = "grpcio-sys" -version = "0.9.1+1.38.0" +name = "h2" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9447d1a926beeef466606cc45717f80897998b548e7dc622873d453e1ecb4be4" +checksum = "5f9f29bc9dda355256b2916cf526ab02ce0aeaaaf2bad60d65ef3f12f11dd0f4" dependencies = [ - "bindgen", - "boringssl-src", - "cc", - "cmake", - "libc", - "libz-sys", - "pkg-config", - "walkdir", + "bytes 1.1.0", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", ] [[package]] @@ -489,6 +504,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heck" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -498,6 +519,92 @@ dependencies = [ "libc", ] +[[package]] +name = "http" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" +dependencies = [ + "bytes 1.1.0", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +dependencies = [ + "bytes 1.1.0", + "http", + "pin-project-lite", +] + +[[package]] +name = "http-range-header" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" + +[[package]] +name = "httparse" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" + +[[package]] +name = "httpdate" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" + +[[package]] +name = "hyper" +version = "0.14.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abfba89e19b959ca163c7752ba59d737c1ceea53a5d31a149c805446fc958064" +dependencies = [ + "bytes 1.1.0", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + +[[package]] +name = "indexmap" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" +dependencies = [ + "autocfg", + "hashbrown", +] + [[package]] name = "instant" version = "0.1.12" @@ -507,6 +614,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.1" @@ -519,12 +635,6 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" -[[package]] -name = "lazycell" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" - [[package]] name = "libc" version = "0.2.123" @@ -551,28 +661,6 @@ dependencies = [ "rle-decode-fast", ] -[[package]] -name = "libloading" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efbc0f03f9a775e9f6aed295c6a1ba2253c5757a9e03d55c6caa46a681abcddd" -dependencies = [ - "cfg-if", - "winapi", -] - -[[package]] -name = "libz-sys" -version = "1.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f35facd4a5673cb5a48822be2be1d4236c1c99cb4113cab7061ac720d5bf859" -dependencies = [ - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "lock_api" version = "0.4.7" @@ -592,12 +680,24 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "matchit" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" + [[package]] name = "memchr" version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + [[package]] name = "miniz_oxide" version = "0.4.4" @@ -620,6 +720,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + [[package]] name = "murmur3" version = "0.4.1" @@ -629,16 +735,6 @@ dependencies = [ "byteorder", ] -[[package]] -name = "nom" -version = "5.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffb4262d26ed83a1c0a33a38fe2bb15797329c85770da05e6b828ddb782627af" -dependencies = [ - "memchr", - "version_check", -] - [[package]] name = "num-bigint" version = "0.2.6" @@ -694,17 +790,6 @@ version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" -[[package]] -name = "parking_lot" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" -dependencies = [ - "instant", - "lock_api", - "parking_lot_core 0.8.5", -] - [[package]] name = "parking_lot" version = "0.12.1" @@ -712,21 +797,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core 0.9.3", -] - -[[package]] -name = "parking_lot_core" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" -dependencies = [ - "cfg-if", - "instant", - "libc", - "redox_syscall", - "smallvec", - "winapi", + "parking_lot_core", ] [[package]] @@ -749,10 +820,40 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c520e05135d6e763148b6426a837e239041653ba7becd2e538c076c738025fc" [[package]] -name = "peeking_take_while" -version = "0.1.2" +name = "percent-encoding" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" + +[[package]] +name = "petgraph" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5014253a1331579ce62aa67443b4a658c5e7dd03d4bc6d302b94474888143" +dependencies = [ + "fixedbitset", + "indexmap", +] + +[[package]] +name = "pin-project" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" +checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] [[package]] name = "pin-project-lite" @@ -766,18 +867,22 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" -[[package]] -name = "pkg-config" -version = "0.3.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" - [[package]] name = "ppv-lite86" version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" +[[package]] +name = "prettyplease" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c142c0e46b57171fe0c528bee8c5b7569e80f0c17e377cd0e30ea57dbc11bb51" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "proc-macro2" version = "1.0.43" @@ -787,6 +892,59 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "399c3c31cdec40583bb68f0b18403400d01ec4289c383aa047560439952c4dd7" +dependencies = [ + "bytes 1.1.0", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f835c582e6bd972ba8347313300219fed5bfa52caf175298d860b61ff6069bb" +dependencies = [ + "bytes 1.1.0", + "heck 0.4.0", + "itertools", + "lazy_static", + "log", + "multimap", + "petgraph", + "prost", + "prost-types", + "regex", + "tempfile", + "which", +] + +[[package]] +name = "prost-derive" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7345d5f0e08c0536d7ac7229952590239e77abf0a0100a1b1d890add6ea96364" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dfaa718ad76a44b3415e6c4d53b17c8f99160dcb3a99b10470fce8ad43f6e3e" +dependencies = [ + "bytes 1.1.0", + "prost", +] + [[package]] name = "proto" version = "0.1.0" @@ -868,11 +1026,22 @@ checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" dependencies = [ "getrandom 0.1.16", "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.2.2", + "rand_core 0.5.1", "rand_hc", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + [[package]] name = "rand_chacha" version = "0.2.2" @@ -880,7 +1049,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.5.1", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.4", ] [[package]] @@ -892,13 +1071,22 @@ dependencies = [ "getrandom 0.1.16", ] +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.6", +] + [[package]] name = "rand_hc" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" dependencies = [ - "rand_core", + "rand_core 0.5.1", ] [[package]] @@ -946,27 +1134,12 @@ version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" -[[package]] -name = "rustc-hash" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" - [[package]] name = "ryu" version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f" -[[package]] -name = "same-file" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" -dependencies = [ - "winapi-util", -] - [[package]] name = "scopeguard" version = "1.1.0" @@ -1004,12 +1177,6 @@ dependencies = [ "serde", ] -[[package]] -name = "shlex" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fdf1b9db47230893d76faad238fd6097fd6d6a9245cd7a4d90dbd639536bbd2" - [[package]] name = "signal-hook-registry" version = "1.4.0" @@ -1084,7 +1251,7 @@ version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87c85aa3f8ea653bfd3ddf25f7ee357ee4d204731f6aa9ad04002306f6e2774c" dependencies = [ - "heck", + "heck 0.3.3", "proc-macro2", "quote", "syn", @@ -1101,6 +1268,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" + [[package]] name = "synstructure" version = "0.12.6" @@ -1169,7 +1342,7 @@ dependencies = [ "mio", "num_cpus", "once_cell", - "parking_lot 0.12.1", + "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", @@ -1177,6 +1350,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "1.7.0" @@ -1188,6 +1371,176 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-stream" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f988a1a1adc2fb21f9c12aa96441da33a1728193ae0b95d2be22dbd17fcb4e5c" +dependencies = [ + "bytes 1.1.0", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", + "tracing", +] + +[[package]] +name = "tonic" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55b9af819e54b8f33d453655bef9b9acc171568fb49523078d0cc4e7484200ec" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes 1.1.0", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "prost-derive", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tower-layer", + "tower-service", + "tracing", + "tracing-futures", +] + +[[package]] +name = "tonic-build" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c6fd7c2581e36d63388a9e04c350c21beb7a8b059580b2e93993c526899ddc" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c530c8675c1dbf98facee631536fa116b5fb6382d7dd6dc1b118d970eafe3ba" +dependencies = [ + "bitflags", + "bytes 1.1.0", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + +[[package]] +name = "tracing" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09" +dependencies = [ + "cfg-if", + "log", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "pin-project", + "tracing", +] + +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + [[package]] name = "typed-builder" version = "0.5.1" @@ -1233,12 +1586,6 @@ dependencies = [ "serde", ] -[[package]] -name = "vcpkg" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" - [[package]] name = "version_check" version = "0.9.4" @@ -1246,14 +1593,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] -name = "walkdir" -version = "2.3.2" +name = "want" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" dependencies = [ - "same-file", - "winapi", - "winapi-util", + "log", + "try-lock", ] [[package]] @@ -1301,15 +1647,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" -[[package]] -name = "winapi-util" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" -dependencies = [ - "winapi", -] - [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index e9beaf0..dcdb74b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,14 +6,15 @@ edition = "2021" [dependencies] async-trait = "0.1.57" -grpcio = "0.9.1" avro-rs = "0.13.0" dashmap = "5.3.4" futures = "0.3" +tonic = "0.8.1" +tokio = "1.15" [dependencies.ceresdbproto] git = "https://github.com/CeresDB/ceresdbproto.git" -rev = "2c10152d021cd5a26b9c870cdede6a0317adca3d" +rev = "29cb0c6fba76401fd9a4ae5b8cacc9002ad78650" [dependencies.common_types] git = "https://github.com/CeresDB/ceresdb.git" diff --git a/src/db_client/builder.rs b/src/db_client/builder.rs index a9c9c4e..b309270 100644 --- a/src/db_client/builder.rs +++ b/src/db_client/builder.rs @@ -4,8 +4,7 @@ use std::sync::Arc; use crate::{ db_client::{cluster::ClusterImpl, standalone::StandaloneImpl, DbClient}, - router::RouterImpl, - rpc_client::RpcClientImplBuilder, + rpc_client::RpcClientImplFactory, RpcConfig, RpcOptions, }; @@ -52,17 +51,13 @@ impl Builder { } pub fn build(self) -> Arc { - let rpc_client_builder = RpcClientImplBuilder::new(self.grpc_config, self.rpc_opts); + let rpc_client_factory = + Arc::new(RpcClientImplFactory::new(self.grpc_config, self.rpc_opts)); match self.mode { - Mode::Standalone => { - Arc::new(StandaloneImpl::new(rpc_client_builder.build(self.endpoint))) - } + Mode::Standalone => Arc::new(StandaloneImpl::new(rpc_client_factory, self.endpoint)), - Mode::Cluster => { - let router = RouterImpl::new(rpc_client_builder.build(self.endpoint)); - Arc::new(ClusterImpl::new(router, rpc_client_builder)) - } + Mode::Cluster => Arc::new(ClusterImpl::new(rpc_client_factory, self.endpoint)), } } } diff --git a/src/db_client/cluster.rs b/src/db_client/cluster.rs index 2090ce5..4c9f498 100644 --- a/src/db_client/cluster.rs +++ b/src/db_client/cluster.rs @@ -5,8 +5,9 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; use dashmap::DashMap; use futures::future::join_all; +use tokio::sync::OnceCell; -use super::{standalone::StandaloneImpl, DbClient}; +use super::{inner::InnerClient, DbClient}; use crate::{ errors::ClusterWriteError, model::{ @@ -15,29 +16,48 @@ use crate::{ write::{WriteRequest, WriteResponse}, QueryResponse, }, - router::Router, - rpc_client::{RpcClientImpl, RpcClientImplBuilder, RpcContext}, + router::{Router, RouterImpl}, + rpc_client::{RpcClientFactory, RpcContext}, util::should_refresh, Error, Result, }; -/// Client for ceresdb of cluster mode. -pub struct ClusterImpl { - router: R, - // Server connection handler pool. - standalone_pool: StandalonePool, +/// Client implementation for ceresdb while using cluster mode. +pub struct ClusterImpl { + factory: Arc, + router_endpoint: String, + router: OnceCell>, + standalone_pool: DirectClientPool, +} + +impl ClusterImpl { + pub fn new(factory: Arc, router_endpoint: String) -> Self { + Self { + factory: factory.clone(), + router_endpoint, + router: OnceCell::new(), + standalone_pool: DirectClientPool::new(factory), + } + } + + #[inline] + async fn init_router(&self) -> Result> { + let router_client = self.factory.build(self.router_endpoint.clone()).await?; + Ok(Box::new(RouterImpl::new(router_client))) + } } #[async_trait] -impl DbClient for ClusterImpl { +impl DbClient for ClusterImpl { async fn query(&self, ctx: &RpcContext, req: &QueryRequest) -> Result { if req.metrics.is_empty() { return Err(Error::Unknown( "Metrics in query request can't be empty in cluster mode".to_string(), )); } + let router_handle = self.router.get_or_try_init(|| self.init_router()).await?; - let endpoint = match self.router.route(&req.metrics, ctx).await { + let endpoint = match router_handle.route(&req.metrics, ctx).await { Ok(mut eps) => { if let Some(ep) = eps[0].take() { ep @@ -54,8 +74,8 @@ impl DbClient for ClusterImpl { let client = self.standalone_pool.get_or_create(&endpoint).clone(); - client.query_internal(ctx, req.clone()).await.map_err(|e| { - self.router.evict(&req.metrics); + client.query_internal(ctx, req).await.map_err(|e| { + router_handle.evict(&req.metrics); e }) } @@ -63,7 +83,8 @@ impl DbClient for ClusterImpl { async fn write(&self, ctx: &RpcContext, req: &WriteRequest) -> Result { // Get metrics' related endpoints(some may not exist). let should_routes: Vec<_> = req.write_entries.iter().map(|(m, _)| m.clone()).collect(); - let endpoints = self.router.route(&should_routes, ctx).await?; + let router_handle = self.router.get_or_try_init(|| self.init_router()).await?; + let endpoints = router_handle.route(&should_routes, ctx).await?; // Partition write entries in request according to related endpoints. let mut no_corresponding_endpoints = Vec::new(); @@ -99,7 +120,7 @@ impl DbClient for ClusterImpl { .collect(); let mut futures = Vec::with_capacity(client_req_paris.len()); for (client, req) in client_req_paris { - futures.push(async move { client.write_internal(ctx, req).await }) + futures.push(async move { client.write_internal(ctx, &req).await }) } // Await rpc results and collect results. @@ -134,7 +155,7 @@ impl DbClient for ClusterImpl { }) .flatten() .collect(); - self.router.evict(&evicts); + router_handle.evict(&evicts); let cluster_error: ClusterWriteError = metrics_result_pairs.into(); if cluster_error.all_ok() { @@ -145,30 +166,21 @@ impl DbClient for ClusterImpl { } } -impl ClusterImpl { - pub fn new(route_client: R, standalone_builder: RpcClientImplBuilder) -> Self { - Self { - router: route_client, - standalone_pool: StandalonePool::new(standalone_builder), - } - } -} - -struct StandalonePool { - pool: DashMap>>, - standalone_builder: RpcClientImplBuilder, +/// DirectClientPool is the pool actually holding connections to data nodes. +struct DirectClientPool { + pool: DashMap>>, + factory: Arc, } -// TODO better to add gc. -impl StandalonePool { - fn new(standalone_builder: RpcClientImplBuilder) -> Self { +impl DirectClientPool { + fn new(factory: Arc) -> Self { Self { pool: DashMap::new(), - standalone_builder, + factory, } } - fn get_or_create(&self, endpoint: &Endpoint) -> Arc> { + fn get_or_create(&self, endpoint: &Endpoint) -> Arc> { if let Some(c) = self.pool.get(endpoint) { // If exist in cache, return. c.value().clone() @@ -176,8 +188,9 @@ impl StandalonePool { // If not exist, build --> insert --> return. self.pool .entry(endpoint.clone()) - .or_insert(Arc::new(StandaloneImpl::new( - self.standalone_builder.build(endpoint.to_string()), + .or_insert(Arc::new(InnerClient::new( + self.factory.clone(), + endpoint.to_string(), ))) .clone() } diff --git a/src/db_client/inner.rs b/src/db_client/inner.rs new file mode 100644 index 0000000..0f7bac7 --- /dev/null +++ b/src/db_client/inner.rs @@ -0,0 +1,74 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +use std::sync::Arc; + +use tokio::sync::OnceCell; + +use crate::{ + model::{ + convert, + request::QueryRequest, + write::{WriteRequest, WriteResponse}, + QueryResponse, Schema, + }, + rpc_client::{RpcClient, RpcClientFactory, RpcContext}, + Error, Result, +}; + +/// Inner client for both standalone and cluster modes. +/// +/// Now, [`InnerClient`] just wraps [`RpcClient`] simply. +pub(crate) struct InnerClient { + factory: Arc, + endpoint: String, + inner_client: OnceCell>, +} + +impl InnerClient { + pub fn new(factory: Arc, endpoint: String) -> Self { + InnerClient { + factory, + endpoint, + inner_client: OnceCell::new(), + } + } + + #[inline] + async fn init(&self) -> Result> { + self.factory.build(self.endpoint.clone()).await + } + + pub async fn query_internal( + &self, + ctx: &RpcContext, + req: &QueryRequest, + ) -> Result { + let client_handle = self.inner_client.get_or_try_init(|| self.init()).await?; + let result_pb = client_handle.as_ref().query(ctx, req.clone().into()).await; + + result_pb.and_then(|resp_pb| { + if !resp_pb.schema_content.is_empty() { + convert::parse_queried_rows(&resp_pb.schema_content, &resp_pb.rows) + .map_err(Error::Client) + } else { + Ok(QueryResponse { + schema: Schema::default(), + rows: Vec::new(), + affected_rows: resp_pb.affected_rows, + }) + } + }) + } + + pub async fn write_internal( + &self, + ctx: &RpcContext, + req: &WriteRequest, + ) -> Result { + let client_handle = self.inner_client.get_or_try_init(|| self.init()).await?; + client_handle + .write(ctx, req.clone().into()) + .await + .map(|resp_pb| resp_pb.into()) + } +} diff --git a/src/db_client/mod.rs b/src/db_client/mod.rs index 07c041c..a530e5f 100644 --- a/src/db_client/mod.rs +++ b/src/db_client/mod.rs @@ -2,6 +2,7 @@ mod builder; mod cluster; +mod inner; mod standalone; use async_trait::async_trait; diff --git a/src/db_client/standalone.rs b/src/db_client/standalone.rs index 3c54d52..221a056 100644 --- a/src/db_client/standalone.rs +++ b/src/db_client/standalone.rs @@ -1,70 +1,43 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +use std::sync::Arc; + use async_trait::async_trait; +use super::inner::InnerClient; use crate::{ db_client::DbClient, model::{ - convert, request::QueryRequest, write::{WriteRequest, WriteResponse}, - QueryResponse, Schema, + QueryResponse, }, - rpc_client::{RpcClient, RpcContext}, - Error, Result, + rpc_client::{RpcClientFactory, RpcContext}, + Result, }; /// Client for ceresdb of standalone mode. /// -/// Now, [`StandaloneImpl`] just wraps [`RpcClient`] simply. -pub struct StandaloneImpl { - pub rpc_client: R, +/// Now, [`StandaloneImpl`] just wraps [`InnerClient`] simply. +pub struct StandaloneImpl { + inner_client: InnerClient, } -#[async_trait] -impl DbClient for StandaloneImpl { - async fn query(&self, ctx: &RpcContext, req: &QueryRequest) -> Result { - self.query_internal(ctx, req.clone()).await - } - - async fn write(&self, ctx: &RpcContext, req: &WriteRequest) -> Result { - self.write_internal(ctx, req.clone()).await +impl StandaloneImpl { + pub fn new(factory: Arc, endpoint: String) -> Self { + Self { + inner_client: InnerClient::new(factory, endpoint), + } } } -impl StandaloneImpl { - pub fn new(rpc_client: R) -> Self { - Self { rpc_client } - } - - pub async fn query_internal( - &self, - ctx: &RpcContext, - req: QueryRequest, - ) -> Result { - let result_pb = self.rpc_client.query(ctx, &req.into()).await; - result_pb.and_then(|resp_pb| { - if !resp_pb.schema_content.is_empty() { - convert::parse_queried_rows(&resp_pb.schema_content, &resp_pb.rows) - .map_err(Error::Client) - } else { - Ok(QueryResponse { - schema: Schema::default(), - rows: Vec::new(), - affected_rows: resp_pb.affected_rows, - }) - } - }) +#[async_trait] +impl DbClient for StandaloneImpl { + async fn query(&self, ctx: &RpcContext, req: &QueryRequest) -> Result { + self.inner_client.query_internal(ctx, req).await } - pub async fn write_internal( - &self, - ctx: &RpcContext, - req: WriteRequest, - ) -> Result { - self.rpc_client - .write(ctx, &req.into()) - .await - .map(|resp_pb| resp_pb.into()) + async fn write(&self, ctx: &RpcContext, req: &WriteRequest) -> Result { + self.inner_client.write_internal(ctx, req).await } } diff --git a/src/errors.rs b/src/errors.rs index 6af17c6..7eb63ca 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -6,19 +6,30 @@ use crate::model::write::WriteResponse; pub enum Error { /// Error from the running server. Server(ServerError), + /// Error from the rpc. /// Note that any error caused by a running server wont be wrapped in the /// grpc errors. - Rpc(grpcio::Error), + Rpc(tonic::Status), + /// Error about rpc. /// It will be throw while connection between client and server is broken /// and try for reconnecting is failed(timeout). - Connect(String), + Connect { + addr: String, + source: Box, + }, + /// Error from the client and basically the rpc request has not been called /// yet or the rpc request has already been finished successfully. Client(String), + + /// Error about authentication + AuthFail(AuthFailStatus), + /// ClusterWriteError(ClusterWriteError), + /// Error unknown Unknown(String), } @@ -67,10 +78,19 @@ pub struct ServerError { pub msg: String, } -pub type Result = std::result::Result; +#[derive(Debug, Clone)] +pub struct AuthFailStatus { + pub code: AuthCode, + pub msg: String, +} -impl From for Error { - fn from(grpc_err: grpcio::Error) -> Self { - Error::Rpc(grpc_err) - } +#[derive(Debug, Clone)] +pub enum AuthCode { + Ok = 0, + + InvalidTenantMeta = 1, + + InvalidTokenMeta = 2, } + +pub type Result = std::result::Result; diff --git a/src/model/request.rs b/src/model/request.rs index 9ecafbd..7549ad0 100644 --- a/src/model/request.rs +++ b/src/model/request.rs @@ -12,10 +12,9 @@ pub struct QueryRequest { impl From for QueryRequestPb { fn from(req: QueryRequest) -> Self { - let mut pb_req = QueryRequestPb::default(); - pb_req.set_metrics(req.metrics.into()); - pb_req.set_ql(req.ql); - - pb_req + QueryRequestPb { + metrics: req.metrics, + ql: req.ql, + } } } diff --git a/src/model/value.rs b/src/model/value.rs index 43080ef..bd09bd2 100644 --- a/src/model/value.rs +++ b/src/model/value.rs @@ -2,7 +2,7 @@ //! 'Value' used in local. -use ceresdbproto::storage::Value as ValuePb; +use ceresdbproto::storage::{value, Value as ValuePb}; pub type TimestampMs = i64; @@ -49,20 +49,20 @@ impl From for ValuePb { fn from(val: Value) -> Self { let mut val_pb = ValuePb::default(); match val { - Value::Timestamp(v) => val_pb.set_timestamp_value(v), - Value::Double(v) => val_pb.set_float64_value(v), - Value::Float(v) => val_pb.set_float32_value(v), - Value::Varbinary(v) => val_pb.set_varbinary_value(v), - Value::String(v) => val_pb.set_string_value(v), - Value::UInt64(v) => val_pb.set_uint64_value(v), - Value::UInt32(v) => val_pb.set_uint32_value(v), - Value::UInt16(v) => val_pb.set_uint16_value(v as u32), - Value::UInt8(v) => val_pb.set_uint8_value(v as u32), - Value::Int64(v) => val_pb.set_int64_value(v), - Value::Int32(v) => val_pb.set_int32_value(v), - Value::Int16(v) => val_pb.set_int16_value(v as i32), - Value::Int8(v) => val_pb.set_int8_value(v as i32), - Value::Boolean(v) => val_pb.set_bool_value(v), + Value::Timestamp(v) => val_pb.value = Some(value::Value::TimestampValue(v)), + Value::Double(v) => val_pb.value = Some(value::Value::Float64Value(v)), + Value::Float(v) => val_pb.value = Some(value::Value::Float32Value(v)), + Value::Varbinary(v) => val_pb.value = Some(value::Value::VarbinaryValue(v)), + Value::String(v) => val_pb.value = Some(value::Value::StringValue(v)), + Value::UInt64(v) => val_pb.value = Some(value::Value::Uint64Value(v)), + Value::UInt32(v) => val_pb.value = Some(value::Value::Uint32Value(v)), + Value::UInt16(v) => val_pb.value = Some(value::Value::Uint16Value(v.into())), + Value::UInt8(v) => val_pb.value = Some(value::Value::Uint8Value(v.into())), + Value::Int64(v) => val_pb.value = Some(value::Value::Int64Value(v)), + Value::Int32(v) => val_pb.value = Some(value::Value::Int32Value(v)), + Value::Int16(v) => val_pb.value = Some(value::Value::Int16Value(v.into())), + Value::Int8(v) => val_pb.value = Some(value::Value::Int8Value(v.into())), + Value::Boolean(v) => val_pb.value = Some(value::Value::BoolValue(v)), }; val_pb diff --git a/src/model/write/request.rs b/src/model/write/request.rs index 8ff4a1c..7fb2860 100644 --- a/src/model/write/request.rs +++ b/src/model/write/request.rs @@ -224,15 +224,13 @@ impl From for WriteRequestPb { for (metric, entries) in req.write_entries { write_metrics_pb.push(convert_one_write_metric(metric, entries)); } - req_pb.set_metrics(write_metrics_pb.into()); + req_pb.metrics = write_metrics_pb; req_pb } } fn convert_one_write_metric(metric: String, entries: Vec) -> WriteMetricPb { - let mut write_metric_pb = WriteMetricPb::default(); - let mut tags_dict = NameDict::new(); let mut fields_dict = NameDict::new(); let mut wirte_entries_pb = Vec::with_capacity(entries.len()); @@ -240,12 +238,12 @@ fn convert_one_write_metric(metric: String, entries: Vec) -> WriteMe wirte_entries_pb.push(convert_entry(&mut tags_dict, &mut fields_dict, entry)); } - write_metric_pb.set_metric(metric); - write_metric_pb.set_tag_names(tags_dict.convert_ordered().into()); - write_metric_pb.set_field_names(fields_dict.convert_ordered().into()); - write_metric_pb.set_entries(wirte_entries_pb.into()); - - write_metric_pb + WriteMetricPb { + metric, + tag_names: tags_dict.convert_ordered(), + field_names: fields_dict.convert_ordered(), + entries: wirte_entries_pb, + } } fn convert_entry( @@ -253,11 +251,10 @@ fn convert_entry( fields_dict: &mut NameDict, entry: WriteEntry, ) -> WriteEntryPb { - let mut entry_pb = WriteEntryPb::default(); - entry_pb.set_tags(convert_tags(tags_dict, entry.series.tags).into()); - entry_pb.set_field_groups(convert_ts_fields(fields_dict, entry.ts_fields).into()); - - entry_pb + WriteEntryPb { + tags: convert_tags(tags_dict, entry.series.tags), + field_groups: convert_ts_fields(fields_dict, entry.ts_fields), + } } fn convert_tags(tags_dict: &mut NameDict, tags: BTreeMap) -> Vec { @@ -267,9 +264,10 @@ fn convert_tags(tags_dict: &mut NameDict, tags: BTreeMap) -> Vec< let mut tag_pbs = Vec::with_capacity(tags.len()); for (name, val) in tags { - let mut tag_pb = TagPb::default(); - tag_pb.set_name_index(tags_dict.insert(name)); - tag_pb.set_value(val.into()); + let tag_pb = TagPb { + name_index: tags_dict.insert(name), + value: Some(val.into()), + }; tag_pbs.push(tag_pb); } @@ -287,17 +285,18 @@ fn convert_ts_fields( let mut field_group_pbs = Vec::with_capacity(ts_fields.len()); for (ts, fields) in ts_fields { // ts + fields will be converted to field group in pb - let mut field_group_pb = FieldGroupPb::default(); - field_group_pb.set_timestamp(ts); - let mut field_pbs = Vec::with_capacity(fields.len()); for (name, val) in fields { - let mut field_pb = Field::default(); - field_pb.set_name_index(fields_dict.insert(name)); - field_pb.set_value(val.into()); + let field_pb = Field { + name_index: fields_dict.insert(name), + value: Some(val.into()), + }; field_pbs.push(field_pb); } - field_group_pb.set_fields(field_pbs.into()); + let field_group_pb = FieldGroupPb { + timestamp: ts, + fields: field_pbs, + }; // collect field group field_group_pbs.push(field_group_pb); @@ -503,9 +502,9 @@ mod test { let tag_names = tags_dict.convert_ordered(); for tag_pb in tags_pb { - let name_idx = tag_pb.get_name_index() as usize; + let name_idx = tag_pb.name_index as usize; let value_in_map: ValuePb = test_tags.get(&tag_names[name_idx]).unwrap().clone().into(); - assert_eq!(value_in_map, *tag_pb.get_value()); + assert_eq!(value_in_map, tag_pb.value.unwrap()); } } @@ -557,12 +556,12 @@ mod test { let field_names = fields_dict.convert_ordered(); for f_group in field_groups_pb { - let fields_map = test_ts_fields.get(&f_group.get_timestamp()).unwrap(); + let fields_map = test_ts_fields.get(&f_group.timestamp).unwrap(); for field_pb in f_group.fields { - let key_in_map = field_names[field_pb.get_name_index() as usize].as_str(); + let key_in_map = field_names[field_pb.name_index as usize].as_str(); let val_in_map: ValuePb = fields_map.get(key_in_map).unwrap().clone().into(); - assert_eq!(val_in_map, *field_pb.get_value()); + assert_eq!(val_in_map, field_pb.value.unwrap()); } } } diff --git a/src/options.rs b/src/options.rs index 9533684..adc07f6 100644 --- a/src/options.rs +++ b/src/options.rs @@ -10,8 +10,12 @@ pub struct RpcConfig { pub max_send_msg_len: i32, /// -1 means unlimited pub max_recv_msg_len: i32, - pub keepalive_time: Duration, + // an interval for htt2 ping frames + pub keepalive_interval: Duration, + // timeout for http2 ping frame acknowledement pub keepalive_timeout: Duration, + // enables http2_keep_alive or not + pub keep_alive_while_idle: bool, } impl Default for RpcConfig { @@ -22,9 +26,13 @@ impl Default for RpcConfig { max_send_msg_len: 20 * (1 << 20), // 1GB max_recv_msg_len: 1 << 30, - // 1day - keepalive_time: Duration::from_secs(3600 * 30), + // Sets an interval for HTTP2 Ping frames should be sent to keep a connection alive + keepalive_interval: Duration::from_secs(60 * 10), + // A timeout for receiving an acknowledgement of the keep-alive ping + // If the ping is not acknowledged within the timeout, the connection will be closed keepalive_timeout: Duration::from_secs(3), + // default keep http2 connections alive while idle + keep_alive_while_idle: true, } } } diff --git a/src/router.rs b/src/router.rs index e9a370b..81082ed 100644 --- a/src/router.rs +++ b/src/router.rs @@ -1,6 +1,6 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; use ceresdbproto::storage::RouteRequest; @@ -29,13 +29,13 @@ pub trait Router: Send + Sync { /// /// [`route`]: RouterImpl::route /// [`evict`]: RouterImpl::evict -pub struct RouterImpl { +pub struct RouterImpl { cache: DashMap, - rpc_client: R, + rpc_client: Arc, } -impl RouterImpl { - pub fn new(rpc_client: R) -> Self { +impl RouterImpl { + pub fn new(rpc_client: Arc) -> Self { Self { cache: DashMap::new(), rpc_client, @@ -44,7 +44,7 @@ impl RouterImpl { } #[async_trait] -impl Router for RouterImpl { +impl Router for RouterImpl { async fn route(&self, metrics: &[String], ctx: &RpcContext) -> Result>> { let mut target_endpoints = vec![None; metrics.len()]; @@ -68,8 +68,8 @@ impl Router for RouterImpl { // Get endpoints of misses from remote. let mut req = RouteRequest::default(); let miss_metrics = misses.iter().map(|(m, _)| m.clone()).collect(); - req.set_metrics(miss_metrics); - let resp = self.rpc_client.route(ctx, &req).await?; + req.metrics = miss_metrics; + let resp = self.rpc_client.route(ctx, req).await?; // Fill miss endpoint and update cache. for route in resp.routes { @@ -135,7 +135,7 @@ mod test { // route --> change route_table --> route again. let ctx = RpcContext::new("test".to_string(), "".to_string()); let metrics = vec![metric1.clone(), metric2.clone()]; - let route_client = RouterImpl::new(mock_rpc_client); + let route_client = RouterImpl::new(Arc::new(mock_rpc_client)); let route_res1 = route_client.route(&metrics, &ctx).await.unwrap(); assert_eq!(&endpoint1, route_res1.get(0).unwrap().as_ref().unwrap()); assert_eq!(&endpoint2, route_res1.get(1).unwrap().as_ref().unwrap()); diff --git a/src/rpc_client/mock_rpc_client.rs b/src/rpc_client/mock_rpc_client.rs index 98d1c0d..cd7222e 100644 --- a/src/rpc_client/mock_rpc_client.rs +++ b/src/rpc_client/mock_rpc_client.rs @@ -23,15 +23,15 @@ pub struct MockRpcClient { #[async_trait] impl RpcClient for MockRpcClient { - async fn query(&self, _ctx: &RpcContext, _req: &QueryRequestPb) -> Result { + async fn query(&self, _ctx: &RpcContext, _req: QueryRequestPb) -> Result { todo!() } - async fn write(&self, _ctx: &RpcContext, _req: &WriteRequestPb) -> Result { + async fn write(&self, _ctx: &RpcContext, _req: WriteRequestPb) -> Result { todo!() } - async fn route(&self, _ctx: &RpcContext, req: &RouteRequestPb) -> Result { + async fn route(&self, _ctx: &RpcContext, req: RouteRequestPb) -> Result { let route_tables = self.route_table.clone(); let routes: Vec<_> = req .metrics @@ -39,16 +39,19 @@ impl RpcClient for MockRpcClient { .map(|m| { let endpoint = route_tables.get(m.as_str()).unwrap().value().clone(); let mut route_pb = RoutePb::default(); - let mut endpoint_pb = EndpointPb::default(); - endpoint_pb.set_ip(endpoint.ip); - endpoint_pb.set_port(endpoint.port); - route_pb.set_metric(m.clone()); - route_pb.set_endpoint(endpoint_pb); + let endpoint_pb = EndpointPb { + ip: endpoint.ip, + port: endpoint.port, + }; + route_pb.metric = m.clone(); + route_pb.endpoint = Some(endpoint_pb); route_pb }) .collect(); - let mut route_resp = RouteResponsePb::default(); - route_resp.set_routes(routes.into()); + let route_resp = RouteResponsePb { + header: None, + routes, + }; Ok(route_resp) } } diff --git a/src/rpc_client/mod.rs b/src/rpc_client/mod.rs index 50fef4d..d943dc8 100644 --- a/src/rpc_client/mod.rs +++ b/src/rpc_client/mod.rs @@ -3,6 +3,8 @@ mod mock_rpc_client; mod rpc_client_impl; +use std::sync::Arc; + use async_trait::async_trait; use ceresdbproto::storage::{ QueryRequest as QueryRequestPb, QueryResponse as QueryResponsePb, @@ -10,7 +12,7 @@ use ceresdbproto::storage::{ WriteRequest as WriteRequestPb, WriteResponse as WriteResponsePb, }; pub use mock_rpc_client::MockRpcClient; -pub use rpc_client_impl::{RpcClientImpl, RpcClientImplBuilder}; +pub use rpc_client_impl::RpcClientImplFactory; use crate::errors::Result; @@ -29,7 +31,16 @@ impl RpcContext { #[async_trait] pub trait RpcClient: Send + Sync { - async fn query(&self, ctx: &RpcContext, req: &QueryRequestPb) -> Result; - async fn write(&self, ctx: &RpcContext, req: &WriteRequestPb) -> Result; - async fn route(&self, ctx: &RpcContext, req: &RouteRequestPb) -> Result; + async fn query(&self, ctx: &RpcContext, req: QueryRequestPb) -> Result; + async fn write(&self, ctx: &RpcContext, req: WriteRequestPb) -> Result; + async fn route(&self, ctx: &RpcContext, req: RouteRequestPb) -> Result; +} + +#[async_trait] +pub trait RpcClientFactory: Send + Sync { + /// Build `RpcClient`. + /// + /// It may fail because of invalid endpoint. Any caller calls this method + /// should handle the potencial error. + async fn build(&self, endpoint: String) -> Result>; } diff --git a/src/rpc_client/rpc_client_impl.rs b/src/rpc_client/rpc_client_impl.rs index 9c1b97b..155b9ff 100644 --- a/src/rpc_client/rpc_client_impl.rs +++ b/src/rpc_client/rpc_client_impl.rs @@ -1,162 +1,147 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. - use std::sync::Arc; use async_trait::async_trait; -use ceresdbproto::{ - storage::{ - QueryRequest as QueryRequestPb, QueryResponse as QueryResponsePb, - RouteRequest as RouteRequestPb, RouteResponse as RouteResponsePb, - WriteRequest as WriteRequestPb, WriteResponse as WriteResponsePb, - }, - storage_grpc::StorageServiceClient, +use ceresdbproto::storage::{ + storage_service_client::StorageServiceClient, QueryRequest as QueryRequestPb, + QueryResponse as QueryResponsePb, RouteRequest as RouteRequestPb, + RouteResponse as RouteResponsePb, WriteRequest as WriteRequestPb, + WriteResponse as WriteResponsePb, +}; +use tonic::{ + metadata::{Ascii, MetadataValue}, + service::Interceptor, + transport::{Channel, Endpoint}, + Request, Status, }; -use grpcio::{CallOption, Channel, ChannelBuilder, EnvBuilder, Environment, MetadataBuilder}; use crate::{ - errors::{Error, Result, ServerError}, + errors::{AuthCode, AuthFailStatus, Error, Result}, options::{RpcConfig, RpcOptions}, - rpc_client::{RpcClient, RpcContext}, - util::is_ok, + rpc_client::{RpcClient, RpcClientFactory, RpcContext}, }; -const RPC_HEADER_TENANT_KEY: &str = "x-ceresdb-access-tenant"; - -/// The implementation for DbClient is based on grpc protocol. -#[derive(Clone)] -pub struct RpcClientImpl { - raw_client: Arc, - rpc_opts: RpcOptions, +struct RpcClientImpl { channel: Channel, } +impl RpcClientImpl { + fn new(channel: Channel) -> Self { + Self { channel } + } +} + #[async_trait] impl RpcClient for RpcClientImpl { - async fn query(&self, ctx: &RpcContext, req: &QueryRequestPb) -> Result { - self.check_connectivity().await?; - - let call_opt = self.make_call_option(ctx)?; - let mut resp = self.raw_client.query_async_opt(req, call_opt)?.await?; - - if !is_ok(resp.get_header().code) { - let header = resp.take_header(); - return Err(Error::Server(ServerError { - code: header.code, - msg: header.error, - })); - } - - if resp.schema_content.is_empty() { - let mut r = QueryResponsePb::default(); - r.set_affected_rows(resp.affected_rows); - return Ok(r); - } - - Ok(resp) + async fn query(&self, ctx: &RpcContext, req: QueryRequestPb) -> Result { + let interceptor = AuthInterceptor::new(ctx)?; + let mut client = + StorageServiceClient::::with_interceptor(self.channel.clone(), interceptor); + let response = client.query(Request::new(req)).await.map_err(Error::Rpc)?; + Ok(response.into_inner()) } - async fn write(&self, ctx: &RpcContext, req: &WriteRequestPb) -> Result { - self.check_connectivity().await?; - - let call_opt = self.make_call_option(ctx)?; - let mut resp = self.raw_client.write_async_opt(req, call_opt)?.await?; - if !is_ok(resp.get_header().code) { - let header = resp.take_header(); - return Err(Error::Server(ServerError { - code: header.code, - msg: header.error, - })); - } - - Ok(resp) + async fn write(&self, ctx: &RpcContext, req: WriteRequestPb) -> Result { + let interceptor = AuthInterceptor::new(ctx)?; + let mut client = + StorageServiceClient::::with_interceptor(self.channel.clone(), interceptor); + let response = client.write(Request::new(req)).await.map_err(Error::Rpc)?; + Ok(response.into_inner()) } - async fn route(&self, ctx: &RpcContext, req: &RouteRequestPb) -> Result { - self.check_connectivity().await?; - - let call_opt = self.make_call_option(ctx)?; - let mut resp = self.raw_client.route_async_opt(req, call_opt)?.await?; - if !is_ok(resp.get_header().code) { - let header = resp.take_header(); - return Err(Error::Server(ServerError { - code: header.code, - msg: header.error, - })); - } - - Ok(resp) + async fn route(&self, ctx: &RpcContext, req: RouteRequestPb) -> Result { + let interceptor = AuthInterceptor::new(ctx)?; + let mut client = + StorageServiceClient::::with_interceptor(self.channel.clone(), interceptor); + let response = client.route(Request::new(req)).await.map_err(Error::Rpc)?; + Ok(response.into_inner()) } } -impl RpcClientImpl { - /// Make the `CallOption` for grpc request. - fn make_call_option(&self, ctx: &RpcContext) -> Result { - let mut builder = MetadataBuilder::with_capacity(1); - builder - .add_str(RPC_HEADER_TENANT_KEY, &ctx.tenant) - .map_err(|e| Error::Client(format!("invalid tenant:{}, err:{}", ctx.tenant, e)))?; - let headers = builder.build(); +const RPC_HEADER_TENANT_KEY: &str = "x-ceresdb-access-tenant"; - Ok(CallOption::default() - .timeout(self.rpc_opts.read_timeout) - .headers(headers)) - } +/// AuthInterceptor is implemented as an interceptor for tonic. +/// Its duty is to check user authentication. +pub struct AuthInterceptor { + tenant: MetadataValue, + _token: MetadataValue, +} - async fn check_connectivity(&self) -> Result<()> { - if !self - .channel - .wait_for_connected(self.rpc_opts.connect_timeout) - .await - { - return Err(Error::Connect( - "Connection broken and try for reconnecting failed".to_string(), - )); - } +impl AuthInterceptor { + fn new(ctx: &RpcContext) -> std::result::Result { + Ok(AuthInterceptor { + tenant: ctx.tenant.parse().map_err(|_e| { + Error::AuthFail(AuthFailStatus { + code: AuthCode::InvalidTenantMeta, + msg: format!( + "invalid tenant: {}, can not be converted to grpc metadata", + ctx.tenant + ), + }) + })?, + _token: ctx.token.parse().map_err(|_e| { + Error::AuthFail(AuthFailStatus { + code: AuthCode::InvalidTokenMeta, + msg: format!( + "invalid token: {}, can not be converted to grpc metadata", + ctx.token + ), + }) + })?, + }) + } +} - Ok(()) +impl Interceptor for AuthInterceptor { + fn call( + &mut self, + mut request: tonic::Request<()>, + ) -> std::result::Result, Status> { + request + .metadata_mut() + .insert(RPC_HEADER_TENANT_KEY, self.tenant.clone()); + Ok(request) } } -/// Builder for building an [`Client`]. -#[derive(Clone)] -pub struct RpcClientImplBuilder { +pub struct RpcClientImplFactory { rpc_opts: RpcOptions, grpc_config: RpcConfig, - env: Arc, } -#[allow(clippy::return_self_not_must_use)] -impl RpcClientImplBuilder { +impl RpcClientImplFactory { pub fn new(grpc_config: RpcConfig, rpc_opts: RpcOptions) -> Self { - let env = { - let mut env_builder = EnvBuilder::new(); - if let Some(thread_num) = grpc_config.thread_num { - env_builder = env_builder.cq_count(thread_num); - } - - Arc::new(env_builder.build()) - }; - Self { rpc_opts, grpc_config, - env, } } +} - pub fn build(&self, endpoint: String) -> RpcClientImpl { - let channel = ChannelBuilder::new(self.env.clone()) - .max_send_message_len(self.grpc_config.max_send_msg_len) - .max_receive_message_len(self.grpc_config.max_recv_msg_len) - .keepalive_time(self.grpc_config.keepalive_time) - .keepalive_timeout(self.grpc_config.keepalive_timeout) - .connect(&endpoint); - let channel_clone = channel.clone(); - let raw_client = Arc::new(StorageServiceClient::new(channel)); - RpcClientImpl { - raw_client, - rpc_opts: self.rpc_opts.clone(), - channel: channel_clone, - } +#[async_trait] +impl RpcClientFactory for RpcClientImplFactory { + async fn build(&self, endpoint: String) -> Result> { + let configured_endpoint = + Endpoint::from_shared(endpoint.clone()).map_err(|e| Error::Connect { + addr: endpoint.clone(), + source: Box::new(e), + })?; + let configured_endpoint = match self.grpc_config.keep_alive_while_idle { + true => configured_endpoint + .connect_timeout(self.rpc_opts.connect_timeout) + .keep_alive_timeout(self.grpc_config.keepalive_timeout) + .keep_alive_while_idle(true) + .http2_keep_alive_interval(self.grpc_config.keepalive_interval), + false => configured_endpoint + .connect_timeout(self.rpc_opts.connect_timeout) + .keep_alive_while_idle(false), + }; + let channel = configured_endpoint + .connect() + .await + .map_err(|e| Error::Connect { + addr: endpoint, + source: Box::new(e), + })?; + Ok(Arc::new(RpcClientImpl::new(channel))) } } diff --git a/src/util.rs b/src/util.rs index cd7e59c..28c3f09 100644 --- a/src/util.rs +++ b/src/util.rs @@ -16,6 +16,7 @@ impl StatusCode { } #[inline] +#[allow(dead_code)] pub fn is_ok(code: u32) -> bool { code == StatusCode::Ok.as_u32() }