From e5f990aa2bc6c22521b24b92eb63fe43324bb7bf Mon Sep 17 00:00:00 2001 From: Arthur Chern Date: Wed, 9 Nov 2022 23:37:17 +0800 Subject: [PATCH 01/12] refactor: replace the grpcio crate with tonic --- Cargo.lock | 765 +++++++++++++++++++++--------- Cargo.toml | 5 +- src/db_client/builder.rs | 12 +- src/db_client/cluster.rs | 78 +-- src/db_client/direct.rs | 74 +++ src/db_client/mod.rs | 1 + src/db_client/standalone.rs | 65 +-- src/errors.rs | 21 +- src/model/request.rs | 4 +- src/model/value.rs | 30 +- src/model/write/request.rs | 36 +- src/options.rs | 14 +- src/router.rs | 20 +- src/rpc_client/mock_rpc_client.rs | 16 +- src/rpc_client/mod.rs | 17 +- src/rpc_client/rpc_client_impl.rs | 220 ++++----- src/util.rs | 1 + 17 files changed, 885 insertions(+), 494 deletions(-) create mode 100644 src/db_client/direct.rs diff --git a/Cargo.lock b/Cargo.lock index 0753e1d..7e01e04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,6 +23,33 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" +[[package]] +name = "anyhow" +version = "1.0.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" + +[[package]] +name = "async-stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.57" @@ -51,7 +78,7 @@ dependencies = [ "lazy_static", "libflate", "num-bigint", - "rand", + "rand 0.7.3", "serde", "serde_json", "strum", @@ -62,6 +89,51 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "axum" +version = "0.5.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acee9fd5073ab6b045a275b3e709c163dd36c90685219cb21804a147b58dba43" +dependencies = [ + "async-trait", + "axum-core", + "bitflags", + "bytes 1.1.0", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde", + "sync_wrapper", + "tokio", + "tower", + "tower-http", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37e5939e02c56fecd5c017c37df4238c0a839fa76b7f97acdd7efb804fd181cc" +dependencies = [ + "async-trait", + "bytes 1.1.0", + "futures-util", + "http", + "http-body", + "mime", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.64" @@ -78,23 +150,10 @@ dependencies = [ ] [[package]] -name = "bindgen" -version = "0.57.0" +name = "base64" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd4865004a46a0aafb2a0a5eb19d3c9fc46ee5f063a6cfc605c69ac9ecf5263d" -dependencies = [ - "bitflags", - "cexpr", - "clang-sys", - "lazy_static", - "lazycell", - "peeking_take_while", - "proc-macro2", - "quote", - "regex", - "rustc-hash", - "shlex", -] +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" [[package]] name = "bitflags" @@ -102,15 +161,6 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" -[[package]] -name = "boringssl-src" -version = "0.3.0+688fc5c" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f901accdf830d2ea2f4e27f923a5e1125cd8b1a39ab578b9db1a42d578a6922b" -dependencies = [ - "cmake", -] - [[package]] name = "byteorder" version = "1.4.3" @@ -149,28 +199,18 @@ dependencies = [ "common_types", "dashmap", "futures", - "grpcio", "tokio", + "tonic", ] [[package]] name = "ceresdbproto" version = "0.1.0" -source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=2c10152d021cd5a26b9c870cdede6a0317adca3d#2c10152d021cd5a26b9c870cdede6a0317adca3d" +source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=29cb0c6fba76401fd9a4ae5b8cacc9002ad78650#29cb0c6fba76401fd9a4ae5b8cacc9002ad78650" dependencies = [ - "futures", - "grpcio", - "protobuf", - "protobuf-builder", -] - -[[package]] -name = "cexpr" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4aedb84272dbe89af497cf81375129abda4fc0a9e7c5d317498c15cc30c0d27" -dependencies = [ - "nom", + "prost", + "tonic", + "tonic-build", ] [[package]] @@ -192,26 +232,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "clang-sys" -version = "1.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cc00842eed744b858222c4c9faf7243aafc6d33f92f96935263ef4d8a41ce21" -dependencies = [ - "glob", - "libc", - "libloading", -] - -[[package]] -name = "cmake" -version = "0.1.48" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8ad8cef104ac57b68b89df3208164d228503abbdce70f6880ffa3d970e7443a" -dependencies = [ - "cc", -] - [[package]] name = "common_types" version = "0.1.0" @@ -247,7 +267,7 @@ dependencies = [ "cfg-if", "hashbrown", "lock_api", - "parking_lot_core 0.9.3", + "parking_lot_core", ] [[package]] @@ -302,6 +322,18 @@ dependencies = [ "instant", ] +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "futures" version = "0.3.21" @@ -429,26 +461,6 @@ version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4" -[[package]] -name = "glob" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" - -[[package]] -name = "grpcio" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24d99e00eed7e0a04ee2705112e7cfdbe1a3cc771147f22f016a8cd2d002187b" -dependencies = [ - "futures", - "grpcio-sys", - "libc", - "log", - "parking_lot 0.11.2", - "protobuf", -] - [[package]] name = "grpcio-compiler" version = "0.7.0" @@ -459,19 +471,22 @@ dependencies = [ ] [[package]] -name = "grpcio-sys" -version = "0.9.1+1.38.0" +name = "h2" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9447d1a926beeef466606cc45717f80897998b548e7dc622873d453e1ecb4be4" +checksum = "5f9f29bc9dda355256b2916cf526ab02ce0aeaaaf2bad60d65ef3f12f11dd0f4" dependencies = [ - "bindgen", - "boringssl-src", - "cc", - "cmake", - "libc", - "libz-sys", - "pkg-config", - "walkdir", + "bytes 1.1.0", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", ] [[package]] @@ -489,6 +504,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heck" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -498,6 +519,92 @@ dependencies = [ "libc", ] +[[package]] +name = "http" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" +dependencies = [ + "bytes 1.1.0", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +dependencies = [ + "bytes 1.1.0", + "http", + "pin-project-lite", +] + +[[package]] +name = "http-range-header" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" + +[[package]] +name = "httparse" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" + +[[package]] +name = "httpdate" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" + +[[package]] +name = "hyper" +version = "0.14.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abfba89e19b959ca163c7752ba59d737c1ceea53a5d31a149c805446fc958064" +dependencies = [ + "bytes 1.1.0", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + +[[package]] +name = "indexmap" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" +dependencies = [ + "autocfg", + "hashbrown", +] + [[package]] name = "instant" version = "0.1.12" @@ -507,6 +614,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.1" @@ -519,12 +635,6 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" -[[package]] -name = "lazycell" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" - [[package]] name = "libc" version = "0.2.123" @@ -551,28 +661,6 @@ dependencies = [ "rle-decode-fast", ] -[[package]] -name = "libloading" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efbc0f03f9a775e9f6aed295c6a1ba2253c5757a9e03d55c6caa46a681abcddd" -dependencies = [ - "cfg-if", - "winapi", -] - -[[package]] -name = "libz-sys" -version = "1.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f35facd4a5673cb5a48822be2be1d4236c1c99cb4113cab7061ac720d5bf859" -dependencies = [ - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "lock_api" version = "0.4.7" @@ -592,12 +680,24 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "matchit" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" + [[package]] name = "memchr" version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + [[package]] name = "miniz_oxide" version = "0.4.4" @@ -620,6 +720,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + [[package]] name = "murmur3" version = "0.4.1" @@ -629,16 +735,6 @@ dependencies = [ "byteorder", ] -[[package]] -name = "nom" -version = "5.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffb4262d26ed83a1c0a33a38fe2bb15797329c85770da05e6b828ddb782627af" -dependencies = [ - "memchr", - "version_check", -] - [[package]] name = "num-bigint" version = "0.2.6" @@ -694,17 +790,6 @@ version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" -[[package]] -name = "parking_lot" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" -dependencies = [ - "instant", - "lock_api", - "parking_lot_core 0.8.5", -] - [[package]] name = "parking_lot" version = "0.12.1" @@ -712,21 +797,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core 0.9.3", -] - -[[package]] -name = "parking_lot_core" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" -dependencies = [ - "cfg-if", - "instant", - "libc", - "redox_syscall", - "smallvec", - "winapi", + "parking_lot_core", ] [[package]] @@ -749,10 +820,40 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c520e05135d6e763148b6426a837e239041653ba7becd2e538c076c738025fc" [[package]] -name = "peeking_take_while" -version = "0.1.2" +name = "percent-encoding" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" + +[[package]] +name = "petgraph" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5014253a1331579ce62aa67443b4a658c5e7dd03d4bc6d302b94474888143" +dependencies = [ + "fixedbitset", + "indexmap", +] + +[[package]] +name = "pin-project" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" +checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] [[package]] name = "pin-project-lite" @@ -766,18 +867,22 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" -[[package]] -name = "pkg-config" -version = "0.3.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" - [[package]] name = "ppv-lite86" version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" +[[package]] +name = "prettyplease" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c142c0e46b57171fe0c528bee8c5b7569e80f0c17e377cd0e30ea57dbc11bb51" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "proc-macro2" version = "1.0.43" @@ -787,6 +892,59 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "399c3c31cdec40583bb68f0b18403400d01ec4289c383aa047560439952c4dd7" +dependencies = [ + "bytes 1.1.0", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f835c582e6bd972ba8347313300219fed5bfa52caf175298d860b61ff6069bb" +dependencies = [ + "bytes 1.1.0", + "heck 0.4.0", + "itertools", + "lazy_static", + "log", + "multimap", + "petgraph", + "prost", + "prost-types", + "regex", + "tempfile", + "which", +] + +[[package]] +name = "prost-derive" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7345d5f0e08c0536d7ac7229952590239e77abf0a0100a1b1d890add6ea96364" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dfaa718ad76a44b3415e6c4d53b17c8f99160dcb3a99b10470fce8ad43f6e3e" +dependencies = [ + "bytes 1.1.0", + "prost", +] + [[package]] name = "proto" version = "0.1.0" @@ -868,11 +1026,22 @@ checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" dependencies = [ "getrandom 0.1.16", "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.2.2", + "rand_core 0.5.1", "rand_hc", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + [[package]] name = "rand_chacha" version = "0.2.2" @@ -880,7 +1049,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.5.1", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.4", ] [[package]] @@ -892,13 +1071,22 @@ dependencies = [ "getrandom 0.1.16", ] +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.6", +] + [[package]] name = "rand_hc" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" dependencies = [ - "rand_core", + "rand_core 0.5.1", ] [[package]] @@ -946,27 +1134,12 @@ version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" -[[package]] -name = "rustc-hash" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" - [[package]] name = "ryu" version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f" -[[package]] -name = "same-file" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" -dependencies = [ - "winapi-util", -] - [[package]] name = "scopeguard" version = "1.1.0" @@ -1004,12 +1177,6 @@ dependencies = [ "serde", ] -[[package]] -name = "shlex" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fdf1b9db47230893d76faad238fd6097fd6d6a9245cd7a4d90dbd639536bbd2" - [[package]] name = "signal-hook-registry" version = "1.4.0" @@ -1084,7 +1251,7 @@ version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87c85aa3f8ea653bfd3ddf25f7ee357ee4d204731f6aa9ad04002306f6e2774c" dependencies = [ - "heck", + "heck 0.3.3", "proc-macro2", "quote", "syn", @@ -1101,6 +1268,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" + [[package]] name = "synstructure" version = "0.12.6" @@ -1169,7 +1342,7 @@ dependencies = [ "mio", "num_cpus", "once_cell", - "parking_lot 0.12.1", + "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", @@ -1177,6 +1350,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "1.7.0" @@ -1188,6 +1371,176 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-stream" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f988a1a1adc2fb21f9c12aa96441da33a1728193ae0b95d2be22dbd17fcb4e5c" +dependencies = [ + "bytes 1.1.0", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", + "tracing", +] + +[[package]] +name = "tonic" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55b9af819e54b8f33d453655bef9b9acc171568fb49523078d0cc4e7484200ec" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes 1.1.0", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "prost-derive", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tower-layer", + "tower-service", + "tracing", + "tracing-futures", +] + +[[package]] +name = "tonic-build" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c6fd7c2581e36d63388a9e04c350c21beb7a8b059580b2e93993c526899ddc" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c530c8675c1dbf98facee631536fa116b5fb6382d7dd6dc1b118d970eafe3ba" +dependencies = [ + "bitflags", + "bytes 1.1.0", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + +[[package]] +name = "tracing" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09" +dependencies = [ + "cfg-if", + "log", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "pin-project", + "tracing", +] + +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + [[package]] name = "typed-builder" version = "0.5.1" @@ -1233,12 +1586,6 @@ dependencies = [ "serde", ] -[[package]] -name = "vcpkg" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" - [[package]] name = "version_check" version = "0.9.4" @@ -1246,14 +1593,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] -name = "walkdir" -version = "2.3.2" +name = "want" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" dependencies = [ - "same-file", - "winapi", - "winapi-util", + "log", + "try-lock", ] [[package]] @@ -1301,15 +1647,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" -[[package]] -name = "winapi-util" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" -dependencies = [ - "winapi", -] - [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index e9beaf0..dcdb74b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,14 +6,15 @@ edition = "2021" [dependencies] async-trait = "0.1.57" -grpcio = "0.9.1" avro-rs = "0.13.0" dashmap = "5.3.4" futures = "0.3" +tonic = "0.8.1" +tokio = "1.15" [dependencies.ceresdbproto] git = "https://github.com/CeresDB/ceresdbproto.git" -rev = "2c10152d021cd5a26b9c870cdede6a0317adca3d" +rev = "29cb0c6fba76401fd9a4ae5b8cacc9002ad78650" [dependencies.common_types] git = "https://github.com/CeresDB/ceresdb.git" diff --git a/src/db_client/builder.rs b/src/db_client/builder.rs index a9c9c4e..a446de7 100644 --- a/src/db_client/builder.rs +++ b/src/db_client/builder.rs @@ -1,11 +1,10 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -use std::sync::Arc; +use std::{sync::Arc}; use crate::{ db_client::{cluster::ClusterImpl, standalone::StandaloneImpl, DbClient}, - router::RouterImpl, - rpc_client::RpcClientImplBuilder, + rpc_client::{RpcClientImplFactory}, RpcConfig, RpcOptions, }; @@ -52,16 +51,15 @@ impl Builder { } pub fn build(self) -> Arc { - let rpc_client_builder = RpcClientImplBuilder::new(self.grpc_config, self.rpc_opts); + let rpc_client_factory = Arc::new(RpcClientImplFactory::new(self.grpc_config, self.rpc_opts)); match self.mode { Mode::Standalone => { - Arc::new(StandaloneImpl::new(rpc_client_builder.build(self.endpoint))) + Arc::new(StandaloneImpl::new(rpc_client_factory, self.endpoint)) } Mode::Cluster => { - let router = RouterImpl::new(rpc_client_builder.build(self.endpoint)); - Arc::new(ClusterImpl::new(router, rpc_client_builder)) + Arc::new(ClusterImpl::new(rpc_client_factory,self.endpoint)) } } } diff --git a/src/db_client/cluster.rs b/src/db_client/cluster.rs index 2090ce5..f6ae162 100644 --- a/src/db_client/cluster.rs +++ b/src/db_client/cluster.rs @@ -5,8 +5,9 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; use dashmap::DashMap; use futures::future::join_all; +use tokio::sync::OnceCell; -use super::{standalone::StandaloneImpl, DbClient}; +use super::{DbClient, direct::DirectInnerClient}; use crate::{ errors::ClusterWriteError, model::{ @@ -15,29 +16,48 @@ use crate::{ write::{WriteRequest, WriteResponse}, QueryResponse, }, - router::Router, - rpc_client::{RpcClientImpl, RpcClientImplBuilder, RpcContext}, + router::{Router, RouterImpl}, + rpc_client::{RpcClientFactory, RpcContext}, util::should_refresh, Error, Result, }; /// Client for ceresdb of cluster mode. -pub struct ClusterImpl { - router: R, - // Server connection handler pool. - standalone_pool: StandalonePool, +pub struct ClusterImpl { + factory: Arc, + router_endpoint: String, + router: OnceCell>, + standalone_pool: DirectClientPool, +} + +impl ClusterImpl { + pub fn new(factory: Arc, router_endpoint: String) -> Self { + Self { + factory: factory.clone(), + router_endpoint, + router: OnceCell::new(), + standalone_pool: DirectClientPool::new(factory), + } + } + + #[inline] + async fn init_router(&self) -> Result> { + let router_client = self.factory.build(self.router_endpoint.clone()).await?; + Ok(Box::new(RouterImpl::new(router_client))) + } } #[async_trait] -impl DbClient for ClusterImpl { +impl DbClient for ClusterImpl { async fn query(&self, ctx: &RpcContext, req: &QueryRequest) -> Result { if req.metrics.is_empty() { return Err(Error::Unknown( "Metrics in query request can't be empty in cluster mode".to_string(), )); } + let router_handle = self.router.get_or_try_init(|| self.init_router()).await?; - let endpoint = match self.router.route(&req.metrics, ctx).await { + let endpoint = match router_handle.route(&req.metrics, ctx).await { Ok(mut eps) => { if let Some(ep) = eps[0].take() { ep @@ -54,8 +74,8 @@ impl DbClient for ClusterImpl { let client = self.standalone_pool.get_or_create(&endpoint).clone(); - client.query_internal(ctx, req.clone()).await.map_err(|e| { - self.router.evict(&req.metrics); + client.query_internal(ctx, req).await.map_err(|e| { + router_handle.evict(&req.metrics); e }) } @@ -63,7 +83,8 @@ impl DbClient for ClusterImpl { async fn write(&self, ctx: &RpcContext, req: &WriteRequest) -> Result { // Get metrics' related endpoints(some may not exist). let should_routes: Vec<_> = req.write_entries.iter().map(|(m, _)| m.clone()).collect(); - let endpoints = self.router.route(&should_routes, ctx).await?; + let router_handle = self.router.get_or_try_init(|| self.init_router()).await?; + let endpoints = router_handle.route(&should_routes, ctx).await?; // Partition write entries in request according to related endpoints. let mut no_corresponding_endpoints = Vec::new(); @@ -99,7 +120,7 @@ impl DbClient for ClusterImpl { .collect(); let mut futures = Vec::with_capacity(client_req_paris.len()); for (client, req) in client_req_paris { - futures.push(async move { client.write_internal(ctx, req).await }) + futures.push(async move { client.write_internal(ctx, &req).await }) } // Await rpc results and collect results. @@ -134,7 +155,7 @@ impl DbClient for ClusterImpl { }) .flatten() .collect(); - self.router.evict(&evicts); + router_handle.evict(&evicts); let cluster_error: ClusterWriteError = metrics_result_pairs.into(); if cluster_error.all_ok() { @@ -145,30 +166,20 @@ impl DbClient for ClusterImpl { } } -impl ClusterImpl { - pub fn new(route_client: R, standalone_builder: RpcClientImplBuilder) -> Self { - Self { - router: route_client, - standalone_pool: StandalonePool::new(standalone_builder), - } - } -} - -struct StandalonePool { - pool: DashMap>>, - standalone_builder: RpcClientImplBuilder, +struct DirectClientPool { + pool: DashMap>>, + factory: Arc, } -// TODO better to add gc. -impl StandalonePool { - fn new(standalone_builder: RpcClientImplBuilder) -> Self { +impl DirectClientPool { + fn new(factory: Arc) -> Self { Self { pool: DashMap::new(), - standalone_builder, + factory, } } - fn get_or_create(&self, endpoint: &Endpoint) -> Arc> { + fn get_or_create(&self, endpoint: &Endpoint) -> Arc> { if let Some(c) = self.pool.get(endpoint) { // If exist in cache, return. c.value().clone() @@ -176,8 +187,9 @@ impl StandalonePool { // If not exist, build --> insert --> return. self.pool .entry(endpoint.clone()) - .or_insert(Arc::new(StandaloneImpl::new( - self.standalone_builder.build(endpoint.to_string()), + .or_insert(Arc::new(DirectInnerClient::new( + self.factory.clone(), + endpoint.to_string(), ))) .clone() } diff --git a/src/db_client/direct.rs b/src/db_client/direct.rs new file mode 100644 index 0000000..63cea04 --- /dev/null +++ b/src/db_client/direct.rs @@ -0,0 +1,74 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +use std::sync::Arc; + +use tokio::sync::OnceCell; + +use crate::{ + model::{ + convert, + request::QueryRequest, + write::{WriteRequest, WriteResponse}, + QueryResponse, Schema, + }, + rpc_client::{RpcClient, RpcClientFactory, RpcContext}, + Error, Result, +}; + +/// Inner client for both standalone and cluster modes. +/// +/// Now, [`DirectInnerClient`] just wraps [`RpcClient`] simply. +pub(crate) struct DirectInnerClient { + factory: Arc, + endpoint: String, + inner_client: OnceCell>, +} + +impl DirectInnerClient { + pub fn new(factory: Arc, endpoint: String) -> Self { + DirectInnerClient { + factory, + endpoint, + inner_client: OnceCell::new(), + } + } + + #[inline] + async fn init(&self) -> Result> { + self.factory.build(self.endpoint.clone()).await + } + + pub async fn query_internal( + &self, + ctx: &RpcContext, + req: &QueryRequest, + ) -> Result { + let client_handle = self.inner_client.get_or_try_init(|| self.init()).await?; + let result_pb = client_handle.as_ref().query(ctx, req.clone().into()).await; + + result_pb.and_then(|resp_pb| { + if !resp_pb.schema_content.is_empty() { + convert::parse_queried_rows(&resp_pb.schema_content, &resp_pb.rows) + .map_err(Error::Client) + } else { + Ok(QueryResponse { + schema: Schema::default(), + rows: Vec::new(), + affected_rows: resp_pb.affected_rows, + }) + } + }) + } + + pub async fn write_internal( + &self, + ctx: &RpcContext, + req: &WriteRequest, + ) -> Result { + let client_handle = self.inner_client.get_or_try_init(|| self.init()).await?; + client_handle + .write(ctx, req.clone().into()) + .await + .map(|resp_pb| resp_pb.into()) + } +} diff --git a/src/db_client/mod.rs b/src/db_client/mod.rs index 07c041c..fb78fe6 100644 --- a/src/db_client/mod.rs +++ b/src/db_client/mod.rs @@ -3,6 +3,7 @@ mod builder; mod cluster; mod standalone; +mod direct; use async_trait::async_trait; pub use builder::{Builder, Mode}; diff --git a/src/db_client/standalone.rs b/src/db_client/standalone.rs index 3c54d52..0cd881a 100644 --- a/src/db_client/standalone.rs +++ b/src/db_client/standalone.rs @@ -1,70 +1,43 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +use std::sync::Arc; + use async_trait::async_trait; +use super::direct::DirectInnerClient; use crate::{ db_client::DbClient, model::{ - convert, request::QueryRequest, write::{WriteRequest, WriteResponse}, - QueryResponse, Schema, + QueryResponse, }, - rpc_client::{RpcClient, RpcContext}, - Error, Result, + rpc_client::{RpcClientFactory, RpcContext}, + Result, }; /// Client for ceresdb of standalone mode. /// /// Now, [`StandaloneImpl`] just wraps [`RpcClient`] simply. -pub struct StandaloneImpl { - pub rpc_client: R, +pub struct StandaloneImpl { + inner_client: DirectInnerClient, } -#[async_trait] -impl DbClient for StandaloneImpl { - async fn query(&self, ctx: &RpcContext, req: &QueryRequest) -> Result { - self.query_internal(ctx, req.clone()).await - } - - async fn write(&self, ctx: &RpcContext, req: &WriteRequest) -> Result { - self.write_internal(ctx, req.clone()).await +impl StandaloneImpl { + pub fn new(factory: Arc, endpoint: String) -> Self { + Self { + inner_client: DirectInnerClient::new(factory, endpoint), + } } } -impl StandaloneImpl { - pub fn new(rpc_client: R) -> Self { - Self { rpc_client } - } - - pub async fn query_internal( - &self, - ctx: &RpcContext, - req: QueryRequest, - ) -> Result { - let result_pb = self.rpc_client.query(ctx, &req.into()).await; - result_pb.and_then(|resp_pb| { - if !resp_pb.schema_content.is_empty() { - convert::parse_queried_rows(&resp_pb.schema_content, &resp_pb.rows) - .map_err(Error::Client) - } else { - Ok(QueryResponse { - schema: Schema::default(), - rows: Vec::new(), - affected_rows: resp_pb.affected_rows, - }) - } - }) +#[async_trait] +impl DbClient for StandaloneImpl { + async fn query(&self, ctx: &RpcContext, req: &QueryRequest) -> Result { + self.inner_client.query_internal(ctx, req).await } - pub async fn write_internal( - &self, - ctx: &RpcContext, - req: WriteRequest, - ) -> Result { - self.rpc_client - .write(ctx, &req.into()) - .await - .map(|resp_pb| resp_pb.into()) + async fn write(&self, ctx: &RpcContext, req: &WriteRequest) -> Result { + self.inner_client.write_internal(ctx, req).await } } diff --git a/src/errors.rs b/src/errors.rs index 6af17c6..1d98ebf 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,6 +1,6 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -use crate::model::write::WriteResponse; +use crate::{model::write::WriteResponse, RpcContext}; #[derive(Debug)] pub enum Error { @@ -9,15 +9,20 @@ pub enum Error { /// Error from the rpc. /// Note that any error caused by a running server wont be wrapped in the /// grpc errors. - Rpc(grpcio::Error), + Rpc(tonic::Status), /// Error about rpc. /// It will be throw while connection between client and server is broken /// and try for reconnecting is failed(timeout). - Connect(String), + Connect { + addr: String, + source: Box, + }, /// Error from the client and basically the rpc request has not been called /// yet or the rpc request has already been finished successfully. Client(String), - /// + /// Error about rpc contex, invalid format + AuthFailInvalid(RpcContext), + /// ClusterWriteError(ClusterWriteError), /// Error unknown Unknown(String), @@ -67,10 +72,4 @@ pub struct ServerError { pub msg: String, } -pub type Result = std::result::Result; - -impl From for Error { - fn from(grpc_err: grpcio::Error) -> Self { - Error::Rpc(grpc_err) - } -} +pub type Result = std::result::Result; \ No newline at end of file diff --git a/src/model/request.rs b/src/model/request.rs index 9ecafbd..2ba9586 100644 --- a/src/model/request.rs +++ b/src/model/request.rs @@ -13,8 +13,8 @@ pub struct QueryRequest { impl From for QueryRequestPb { fn from(req: QueryRequest) -> Self { let mut pb_req = QueryRequestPb::default(); - pb_req.set_metrics(req.metrics.into()); - pb_req.set_ql(req.ql); + pb_req.metrics = req.metrics.into(); + pb_req.ql = req.ql; pb_req } diff --git a/src/model/value.rs b/src/model/value.rs index 43080ef..ff45aa2 100644 --- a/src/model/value.rs +++ b/src/model/value.rs @@ -2,7 +2,7 @@ //! 'Value' used in local. -use ceresdbproto::storage::Value as ValuePb; +use ceresdbproto::storage::{Value as ValuePb, value}; pub type TimestampMs = i64; @@ -49,20 +49,20 @@ impl From for ValuePb { fn from(val: Value) -> Self { let mut val_pb = ValuePb::default(); match val { - Value::Timestamp(v) => val_pb.set_timestamp_value(v), - Value::Double(v) => val_pb.set_float64_value(v), - Value::Float(v) => val_pb.set_float32_value(v), - Value::Varbinary(v) => val_pb.set_varbinary_value(v), - Value::String(v) => val_pb.set_string_value(v), - Value::UInt64(v) => val_pb.set_uint64_value(v), - Value::UInt32(v) => val_pb.set_uint32_value(v), - Value::UInt16(v) => val_pb.set_uint16_value(v as u32), - Value::UInt8(v) => val_pb.set_uint8_value(v as u32), - Value::Int64(v) => val_pb.set_int64_value(v), - Value::Int32(v) => val_pb.set_int32_value(v), - Value::Int16(v) => val_pb.set_int16_value(v as i32), - Value::Int8(v) => val_pb.set_int8_value(v as i32), - Value::Boolean(v) => val_pb.set_bool_value(v), + Value::Timestamp(v) => val_pb.value = Some(value::Value::TimestampValue(v)), + Value::Double(v) => val_pb.value = Some(value::Value::Float64Value(v)), + Value::Float(v) => val_pb.value = Some(value::Value::Float32Value(v)), + Value::Varbinary(v) => val_pb.value = Some(value::Value::VarbinaryValue(v)), + Value::String(v) => val_pb.value = Some(value::Value::StringValue(v)), + Value::UInt64(v) => val_pb.value = Some(value::Value::Uint64Value(v)), + Value::UInt32(v) => val_pb.value = Some(value::Value::Uint32Value(v)), + Value::UInt16(v) => val_pb.value = Some(value::Value::Uint16Value(v.into())), + Value::UInt8(v) => val_pb.value = Some(value::Value::Uint8Value(v.into())), + Value::Int64(v) => val_pb.value = Some(value::Value::Int64Value(v)), + Value::Int32(v) => val_pb.value = Some(value::Value::Int32Value(v)), + Value::Int16(v) => val_pb.value = Some(value::Value::Int16Value(v.into())), + Value::Int8(v) => val_pb.value = Some(value::Value::Int8Value(v.into())), + Value::Boolean(v) => val_pb.value = Some(value::Value::BoolValue(v)), }; val_pb diff --git a/src/model/write/request.rs b/src/model/write/request.rs index 8ff4a1c..fff02e1 100644 --- a/src/model/write/request.rs +++ b/src/model/write/request.rs @@ -224,7 +224,7 @@ impl From for WriteRequestPb { for (metric, entries) in req.write_entries { write_metrics_pb.push(convert_one_write_metric(metric, entries)); } - req_pb.set_metrics(write_metrics_pb.into()); + req_pb.metrics = write_metrics_pb.into(); req_pb } @@ -240,10 +240,10 @@ fn convert_one_write_metric(metric: String, entries: Vec) -> WriteMe wirte_entries_pb.push(convert_entry(&mut tags_dict, &mut fields_dict, entry)); } - write_metric_pb.set_metric(metric); - write_metric_pb.set_tag_names(tags_dict.convert_ordered().into()); - write_metric_pb.set_field_names(fields_dict.convert_ordered().into()); - write_metric_pb.set_entries(wirte_entries_pb.into()); + write_metric_pb.metric = metric; + write_metric_pb.tag_names = tags_dict.convert_ordered().into(); + write_metric_pb.field_names = fields_dict.convert_ordered().into(); + write_metric_pb.entries = wirte_entries_pb.into(); write_metric_pb } @@ -254,8 +254,8 @@ fn convert_entry( entry: WriteEntry, ) -> WriteEntryPb { let mut entry_pb = WriteEntryPb::default(); - entry_pb.set_tags(convert_tags(tags_dict, entry.series.tags).into()); - entry_pb.set_field_groups(convert_ts_fields(fields_dict, entry.ts_fields).into()); + entry_pb.tags = convert_tags(tags_dict, entry.series.tags).into(); + entry_pb.field_groups = convert_ts_fields(fields_dict, entry.ts_fields).into(); entry_pb } @@ -268,8 +268,8 @@ fn convert_tags(tags_dict: &mut NameDict, tags: BTreeMap) -> Vec< let mut tag_pbs = Vec::with_capacity(tags.len()); for (name, val) in tags { let mut tag_pb = TagPb::default(); - tag_pb.set_name_index(tags_dict.insert(name)); - tag_pb.set_value(val.into()); + tag_pb.name_index = tags_dict.insert(name); + tag_pb.value = Some(val.into()); tag_pbs.push(tag_pb); } @@ -288,16 +288,16 @@ fn convert_ts_fields( for (ts, fields) in ts_fields { // ts + fields will be converted to field group in pb let mut field_group_pb = FieldGroupPb::default(); - field_group_pb.set_timestamp(ts); + field_group_pb.timestamp = ts; let mut field_pbs = Vec::with_capacity(fields.len()); for (name, val) in fields { let mut field_pb = Field::default(); - field_pb.set_name_index(fields_dict.insert(name)); - field_pb.set_value(val.into()); + field_pb.name_index = fields_dict.insert(name); + field_pb.value = Some(val.into()); field_pbs.push(field_pb); } - field_group_pb.set_fields(field_pbs.into()); + field_group_pb.fields = field_pbs.into(); // collect field group field_group_pbs.push(field_group_pb); @@ -503,9 +503,9 @@ mod test { let tag_names = tags_dict.convert_ordered(); for tag_pb in tags_pb { - let name_idx = tag_pb.get_name_index() as usize; + let name_idx = tag_pb.name_index as usize; let value_in_map: ValuePb = test_tags.get(&tag_names[name_idx]).unwrap().clone().into(); - assert_eq!(value_in_map, *tag_pb.get_value()); + assert_eq!(value_in_map, tag_pb.value.unwrap()); } } @@ -557,12 +557,12 @@ mod test { let field_names = fields_dict.convert_ordered(); for f_group in field_groups_pb { - let fields_map = test_ts_fields.get(&f_group.get_timestamp()).unwrap(); + let fields_map = test_ts_fields.get(&f_group.timestamp).unwrap(); for field_pb in f_group.fields { - let key_in_map = field_names[field_pb.get_name_index() as usize].as_str(); + let key_in_map = field_names[field_pb.name_index as usize].as_str(); let val_in_map: ValuePb = fields_map.get(key_in_map).unwrap().clone().into(); - assert_eq!(val_in_map, *field_pb.get_value()); + assert_eq!(val_in_map, field_pb.value.unwrap()); } } } diff --git a/src/options.rs b/src/options.rs index 9533684..adc07f6 100644 --- a/src/options.rs +++ b/src/options.rs @@ -10,8 +10,12 @@ pub struct RpcConfig { pub max_send_msg_len: i32, /// -1 means unlimited pub max_recv_msg_len: i32, - pub keepalive_time: Duration, + // an interval for htt2 ping frames + pub keepalive_interval: Duration, + // timeout for http2 ping frame acknowledement pub keepalive_timeout: Duration, + // enables http2_keep_alive or not + pub keep_alive_while_idle: bool, } impl Default for RpcConfig { @@ -22,9 +26,13 @@ impl Default for RpcConfig { max_send_msg_len: 20 * (1 << 20), // 1GB max_recv_msg_len: 1 << 30, - // 1day - keepalive_time: Duration::from_secs(3600 * 30), + // Sets an interval for HTTP2 Ping frames should be sent to keep a connection alive + keepalive_interval: Duration::from_secs(60 * 10), + // A timeout for receiving an acknowledgement of the keep-alive ping + // If the ping is not acknowledged within the timeout, the connection will be closed keepalive_timeout: Duration::from_secs(3), + // default keep http2 connections alive while idle + keep_alive_while_idle: true, } } } diff --git a/src/router.rs b/src/router.rs index e9a370b..21e19d5 100644 --- a/src/router.rs +++ b/src/router.rs @@ -1,6 +1,6 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; use ceresdbproto::storage::RouteRequest; @@ -29,13 +29,13 @@ pub trait Router: Send + Sync { /// /// [`route`]: RouterImpl::route /// [`evict`]: RouterImpl::evict -pub struct RouterImpl { +pub struct RouterImpl { cache: DashMap, - rpc_client: R, + rpc_client: Arc, } -impl RouterImpl { - pub fn new(rpc_client: R) -> Self { +impl RouterImpl { + pub fn new(rpc_client: Arc) -> Self { Self { cache: DashMap::new(), rpc_client, @@ -44,7 +44,7 @@ impl RouterImpl { } #[async_trait] -impl Router for RouterImpl { +impl Router for RouterImpl { async fn route(&self, metrics: &[String], ctx: &RpcContext) -> Result>> { let mut target_endpoints = vec![None; metrics.len()]; @@ -68,8 +68,8 @@ impl Router for RouterImpl { // Get endpoints of misses from remote. let mut req = RouteRequest::default(); let miss_metrics = misses.iter().map(|(m, _)| m.clone()).collect(); - req.set_metrics(miss_metrics); - let resp = self.rpc_client.route(ctx, &req).await?; + req.metrics = miss_metrics; + let resp = self.rpc_client.route(ctx, req).await?; // Fill miss endpoint and update cache. for route in resp.routes { @@ -135,7 +135,7 @@ mod test { // route --> change route_table --> route again. let ctx = RpcContext::new("test".to_string(), "".to_string()); let metrics = vec![metric1.clone(), metric2.clone()]; - let route_client = RouterImpl::new(mock_rpc_client); + let route_client = RouterImpl::new(Arc::new(mock_rpc_client)); let route_res1 = route_client.route(&metrics, &ctx).await.unwrap(); assert_eq!(&endpoint1, route_res1.get(0).unwrap().as_ref().unwrap()); assert_eq!(&endpoint2, route_res1.get(1).unwrap().as_ref().unwrap()); @@ -153,4 +153,4 @@ mod test { assert_eq!(&endpoint3, route_res3.get(0).unwrap().as_ref().unwrap()); assert_eq!(&endpoint4, route_res3.get(1).unwrap().as_ref().unwrap()); } -} +} \ No newline at end of file diff --git a/src/rpc_client/mock_rpc_client.rs b/src/rpc_client/mock_rpc_client.rs index 98d1c0d..ebd94e3 100644 --- a/src/rpc_client/mock_rpc_client.rs +++ b/src/rpc_client/mock_rpc_client.rs @@ -23,15 +23,15 @@ pub struct MockRpcClient { #[async_trait] impl RpcClient for MockRpcClient { - async fn query(&self, _ctx: &RpcContext, _req: &QueryRequestPb) -> Result { + async fn query(&self, _ctx: &RpcContext, _req: QueryRequestPb) -> Result { todo!() } - async fn write(&self, _ctx: &RpcContext, _req: &WriteRequestPb) -> Result { + async fn write(&self, _ctx: &RpcContext, _req: WriteRequestPb) -> Result { todo!() } - async fn route(&self, _ctx: &RpcContext, req: &RouteRequestPb) -> Result { + async fn route(&self, _ctx: &RpcContext, req: RouteRequestPb) -> Result { let route_tables = self.route_table.clone(); let routes: Vec<_> = req .metrics @@ -40,15 +40,15 @@ impl RpcClient for MockRpcClient { let endpoint = route_tables.get(m.as_str()).unwrap().value().clone(); let mut route_pb = RoutePb::default(); let mut endpoint_pb = EndpointPb::default(); - endpoint_pb.set_ip(endpoint.ip); - endpoint_pb.set_port(endpoint.port); - route_pb.set_metric(m.clone()); - route_pb.set_endpoint(endpoint_pb); + endpoint_pb.ip = endpoint.ip; + endpoint_pb.port = endpoint.port; + route_pb.metric = m.clone(); + route_pb.endpoint = Some(endpoint_pb); route_pb }) .collect(); let mut route_resp = RouteResponsePb::default(); - route_resp.set_routes(routes.into()); + route_resp.routes = routes.into(); Ok(route_resp) } } diff --git a/src/rpc_client/mod.rs b/src/rpc_client/mod.rs index 50fef4d..e02634d 100644 --- a/src/rpc_client/mod.rs +++ b/src/rpc_client/mod.rs @@ -3,6 +3,8 @@ mod mock_rpc_client; mod rpc_client_impl; +use std::sync::Arc; + use async_trait::async_trait; use ceresdbproto::storage::{ QueryRequest as QueryRequestPb, QueryResponse as QueryResponsePb, @@ -10,7 +12,7 @@ use ceresdbproto::storage::{ WriteRequest as WriteRequestPb, WriteResponse as WriteResponsePb, }; pub use mock_rpc_client::MockRpcClient; -pub use rpc_client_impl::{RpcClientImpl, RpcClientImplBuilder}; +pub use rpc_client_impl::{RpcClientImpl, RpcClientImplFactory}; use crate::errors::Result; @@ -29,7 +31,14 @@ impl RpcContext { #[async_trait] pub trait RpcClient: Send + Sync { - async fn query(&self, ctx: &RpcContext, req: &QueryRequestPb) -> Result; - async fn write(&self, ctx: &RpcContext, req: &WriteRequestPb) -> Result; - async fn route(&self, ctx: &RpcContext, req: &RouteRequestPb) -> Result; + async fn query(&self, ctx: &RpcContext, req: QueryRequestPb) -> Result; + async fn write(&self, ctx: &RpcContext, req: WriteRequestPb) -> Result; + async fn route(&self, ctx: &RpcContext, req: RouteRequestPb) -> Result; } + +#[async_trait] +pub trait RpcClientFactory: Send + Sync{ + // The Build method may fail because of invalid endpoint, so it returns a Result. + // Any caller calls this method should handle the potencial error + async fn build(&self, endpoint: String) -> Result>; +} \ No newline at end of file diff --git a/src/rpc_client/rpc_client_impl.rs b/src/rpc_client/rpc_client_impl.rs index 9c1b97b..aa6c6f4 100644 --- a/src/rpc_client/rpc_client_impl.rs +++ b/src/rpc_client/rpc_client_impl.rs @@ -1,162 +1,140 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. - use std::sync::Arc; use async_trait::async_trait; -use ceresdbproto::{ - storage::{ - QueryRequest as QueryRequestPb, QueryResponse as QueryResponsePb, - RouteRequest as RouteRequestPb, RouteResponse as RouteResponsePb, - WriteRequest as WriteRequestPb, WriteResponse as WriteResponsePb, - }, - storage_grpc::StorageServiceClient, +use ceresdbproto::storage::{ + storage_service_client::StorageServiceClient, QueryRequest as QueryRequestPb, + QueryResponse as QueryResponsePb, RouteRequest as RouteRequestPb, + RouteResponse as RouteResponsePb, WriteRequest as WriteRequestPb, + WriteResponse as WriteResponsePb, +}; +use tonic::{ + metadata::{errors::InvalidMetadataValue, Ascii, MetadataValue}, + service::Interceptor, + transport::{Channel, Endpoint}, + Request, Status, }; -use grpcio::{CallOption, Channel, ChannelBuilder, EnvBuilder, Environment, MetadataBuilder}; use crate::{ - errors::{Error, Result, ServerError}, + errors::{Error, Result}, options::{RpcConfig, RpcOptions}, - rpc_client::{RpcClient, RpcContext}, - util::is_ok, + rpc_client::{RpcClient, RpcClientFactory, RpcContext}, }; -const RPC_HEADER_TENANT_KEY: &str = "x-ceresdb-access-tenant"; - -/// The implementation for DbClient is based on grpc protocol. -#[derive(Clone)] pub struct RpcClientImpl { - raw_client: Arc, - rpc_opts: RpcOptions, channel: Channel, } +impl RpcClientImpl { + fn new(channel: Channel) -> Self { + Self { channel } + } +} + #[async_trait] impl RpcClient for RpcClientImpl { - async fn query(&self, ctx: &RpcContext, req: &QueryRequestPb) -> Result { - self.check_connectivity().await?; - - let call_opt = self.make_call_option(ctx)?; - let mut resp = self.raw_client.query_async_opt(req, call_opt)?.await?; - - if !is_ok(resp.get_header().code) { - let header = resp.take_header(); - return Err(Error::Server(ServerError { - code: header.code, - msg: header.error, - })); - } - - if resp.schema_content.is_empty() { - let mut r = QueryResponsePb::default(); - r.set_affected_rows(resp.affected_rows); - return Ok(r); - } - - Ok(resp) + async fn query(&self, ctx: &RpcContext, req: QueryRequestPb) -> Result { + let interceptor = + AuthInterceptor::new(ctx).map_err(|_e| Error::AuthFailInvalid(ctx.clone()))?; + let mut client = + StorageServiceClient::::with_interceptor(self.channel.clone(), interceptor); + let response = client + .query(Request::new(req)) + .await + .map_err(|status: Status| Error::Rpc(status))?; + Ok(response.into_inner()) } - async fn write(&self, ctx: &RpcContext, req: &WriteRequestPb) -> Result { - self.check_connectivity().await?; - - let call_opt = self.make_call_option(ctx)?; - let mut resp = self.raw_client.write_async_opt(req, call_opt)?.await?; - if !is_ok(resp.get_header().code) { - let header = resp.take_header(); - return Err(Error::Server(ServerError { - code: header.code, - msg: header.error, - })); - } - - Ok(resp) + async fn write(&self, ctx: &RpcContext, req: WriteRequestPb) -> Result { + let interceptor = + AuthInterceptor::new(ctx).map_err(|_e| Error::AuthFailInvalid(ctx.clone()))?; + let mut client = + StorageServiceClient::::with_interceptor(self.channel.clone(), interceptor); + let response = client + .write(Request::new(req)) + .await + .map_err(|status: Status| Error::Rpc(status))?; + Ok(response.into_inner()) } - async fn route(&self, ctx: &RpcContext, req: &RouteRequestPb) -> Result { - self.check_connectivity().await?; - - let call_opt = self.make_call_option(ctx)?; - let mut resp = self.raw_client.route_async_opt(req, call_opt)?.await?; - if !is_ok(resp.get_header().code) { - let header = resp.take_header(); - return Err(Error::Server(ServerError { - code: header.code, - msg: header.error, - })); - } - - Ok(resp) + async fn route(&self, ctx: &RpcContext, req: RouteRequestPb) -> Result { + let interceptor = + AuthInterceptor::new(ctx).map_err(|_e| Error::AuthFailInvalid(ctx.clone()))?; + let mut client = + StorageServiceClient::::with_interceptor(self.channel.clone(), interceptor); + let response = client + .route(Request::new(req)) + .await + .map_err(|status: Status| Error::Rpc(status))?; + Ok(response.into_inner()) } } -impl RpcClientImpl { - /// Make the `CallOption` for grpc request. - fn make_call_option(&self, ctx: &RpcContext) -> Result { - let mut builder = MetadataBuilder::with_capacity(1); - builder - .add_str(RPC_HEADER_TENANT_KEY, &ctx.tenant) - .map_err(|e| Error::Client(format!("invalid tenant:{}, err:{}", ctx.tenant, e)))?; - let headers = builder.build(); +const RPC_HEADER_TENANT_KEY: &str = "x-ceresdb-access-tenant"; - Ok(CallOption::default() - .timeout(self.rpc_opts.read_timeout) - .headers(headers)) - } +pub struct AuthInterceptor { + tenant: MetadataValue, + _token: MetadataValue, +} - async fn check_connectivity(&self) -> Result<()> { - if !self - .channel - .wait_for_connected(self.rpc_opts.connect_timeout) - .await - { - return Err(Error::Connect( - "Connection broken and try for reconnecting failed".to_string(), - )); - } +impl AuthInterceptor { + fn new(ctx: &RpcContext) -> std::result::Result { + let tenant: MetadataValue<_> = ctx.tenant.parse()?; + let _token: MetadataValue<_> = ctx.token.parse()?; + Ok(AuthInterceptor {tenant, _token }) + } +} - Ok(()) +impl<'a> Interceptor for AuthInterceptor { + fn call( + &mut self, + mut request: tonic::Request<()>, + ) -> std::result::Result, Status> { + request + .metadata_mut() + .insert(RPC_HEADER_TENANT_KEY, self.tenant.clone()); + Ok(request) } } -/// Builder for building an [`Client`]. -#[derive(Clone)] -pub struct RpcClientImplBuilder { +pub struct RpcClientImplFactory { rpc_opts: RpcOptions, grpc_config: RpcConfig, - env: Arc, } -#[allow(clippy::return_self_not_must_use)] -impl RpcClientImplBuilder { +impl RpcClientImplFactory { pub fn new(grpc_config: RpcConfig, rpc_opts: RpcOptions) -> Self { - let env = { - let mut env_builder = EnvBuilder::new(); - if let Some(thread_num) = grpc_config.thread_num { - env_builder = env_builder.cq_count(thread_num); - } - - Arc::new(env_builder.build()) - }; - Self { rpc_opts, grpc_config, - env, } } +} - pub fn build(&self, endpoint: String) -> RpcClientImpl { - let channel = ChannelBuilder::new(self.env.clone()) - .max_send_message_len(self.grpc_config.max_send_msg_len) - .max_receive_message_len(self.grpc_config.max_recv_msg_len) - .keepalive_time(self.grpc_config.keepalive_time) - .keepalive_timeout(self.grpc_config.keepalive_timeout) - .connect(&endpoint); - let channel_clone = channel.clone(); - let raw_client = Arc::new(StorageServiceClient::new(channel)); - RpcClientImpl { - raw_client, - rpc_opts: self.rpc_opts.clone(), - channel: channel_clone, - } +#[async_trait] +impl RpcClientFactory for RpcClientImplFactory { + async fn build(&self, endpoint: String) -> Result> { + let configured_endpoint = + Endpoint::from_shared(endpoint.clone()).map_err(|e| Error::Connect { + addr: endpoint.clone(), + source: Box::new(e), + })?; + let configured_endpoint = match self.grpc_config.keep_alive_while_idle { + true => configured_endpoint + .connect_timeout(self.rpc_opts.connect_timeout) + .keep_alive_timeout(self.grpc_config.keepalive_timeout) + .keep_alive_while_idle(true) + .http2_keep_alive_interval(self.grpc_config.keepalive_interval), + false => configured_endpoint + .connect_timeout(self.rpc_opts.connect_timeout) + .keep_alive_while_idle(false), + }; + let chan = configured_endpoint + .connect() + .await + .map_err(|e| Error::Connect { + addr: endpoint, + source: Box::new(e), + })?; + Ok(Arc::new(RpcClientImpl::new(chan))) } } diff --git a/src/util.rs b/src/util.rs index cd7e59c..28c3f09 100644 --- a/src/util.rs +++ b/src/util.rs @@ -16,6 +16,7 @@ impl StatusCode { } #[inline] +#[allow(dead_code)] pub fn is_ok(code: u32) -> bool { code == StatusCode::Ok.as_u32() } From 8e7fef83bb2a02918623279f5501e398c3ebe4a6 Mon Sep 17 00:00:00 2001 From: Arthur Chern Date: Wed, 9 Nov 2022 23:51:58 +0800 Subject: [PATCH 02/12] refactor: fmt code with --- src/db_client/builder.rs | 15 ++++++--------- src/db_client/cluster.rs | 2 +- src/db_client/mod.rs | 2 +- src/errors.rs | 4 ++-- src/model/value.rs | 2 +- src/router.rs | 2 +- src/rpc_client/mod.rs | 8 ++++---- src/rpc_client/rpc_client_impl.rs | 2 +- 8 files changed, 17 insertions(+), 20 deletions(-) diff --git a/src/db_client/builder.rs b/src/db_client/builder.rs index a446de7..b309270 100644 --- a/src/db_client/builder.rs +++ b/src/db_client/builder.rs @@ -1,10 +1,10 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -use std::{sync::Arc}; +use std::sync::Arc; use crate::{ db_client::{cluster::ClusterImpl, standalone::StandaloneImpl, DbClient}, - rpc_client::{RpcClientImplFactory}, + rpc_client::RpcClientImplFactory, RpcConfig, RpcOptions, }; @@ -51,16 +51,13 @@ impl Builder { } pub fn build(self) -> Arc { - let rpc_client_factory = Arc::new(RpcClientImplFactory::new(self.grpc_config, self.rpc_opts)); + let rpc_client_factory = + Arc::new(RpcClientImplFactory::new(self.grpc_config, self.rpc_opts)); match self.mode { - Mode::Standalone => { - Arc::new(StandaloneImpl::new(rpc_client_factory, self.endpoint)) - } + Mode::Standalone => Arc::new(StandaloneImpl::new(rpc_client_factory, self.endpoint)), - Mode::Cluster => { - Arc::new(ClusterImpl::new(rpc_client_factory,self.endpoint)) - } + Mode::Cluster => Arc::new(ClusterImpl::new(rpc_client_factory, self.endpoint)), } } } diff --git a/src/db_client/cluster.rs b/src/db_client/cluster.rs index f6ae162..cd141e7 100644 --- a/src/db_client/cluster.rs +++ b/src/db_client/cluster.rs @@ -7,7 +7,7 @@ use dashmap::DashMap; use futures::future::join_all; use tokio::sync::OnceCell; -use super::{DbClient, direct::DirectInnerClient}; +use super::{direct::DirectInnerClient, DbClient}; use crate::{ errors::ClusterWriteError, model::{ diff --git a/src/db_client/mod.rs b/src/db_client/mod.rs index fb78fe6..79cca88 100644 --- a/src/db_client/mod.rs +++ b/src/db_client/mod.rs @@ -2,8 +2,8 @@ mod builder; mod cluster; -mod standalone; mod direct; +mod standalone; use async_trait::async_trait; pub use builder::{Builder, Mode}; diff --git a/src/errors.rs b/src/errors.rs index 1d98ebf..7cfb370 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -22,7 +22,7 @@ pub enum Error { Client(String), /// Error about rpc contex, invalid format AuthFailInvalid(RpcContext), - /// + /// ClusterWriteError(ClusterWriteError), /// Error unknown Unknown(String), @@ -72,4 +72,4 @@ pub struct ServerError { pub msg: String, } -pub type Result = std::result::Result; \ No newline at end of file +pub type Result = std::result::Result; diff --git a/src/model/value.rs b/src/model/value.rs index ff45aa2..bd09bd2 100644 --- a/src/model/value.rs +++ b/src/model/value.rs @@ -2,7 +2,7 @@ //! 'Value' used in local. -use ceresdbproto::storage::{Value as ValuePb, value}; +use ceresdbproto::storage::{value, Value as ValuePb}; pub type TimestampMs = i64; diff --git a/src/router.rs b/src/router.rs index 21e19d5..81082ed 100644 --- a/src/router.rs +++ b/src/router.rs @@ -153,4 +153,4 @@ mod test { assert_eq!(&endpoint3, route_res3.get(0).unwrap().as_ref().unwrap()); assert_eq!(&endpoint4, route_res3.get(1).unwrap().as_ref().unwrap()); } -} \ No newline at end of file +} diff --git a/src/rpc_client/mod.rs b/src/rpc_client/mod.rs index e02634d..307db55 100644 --- a/src/rpc_client/mod.rs +++ b/src/rpc_client/mod.rs @@ -37,8 +37,8 @@ pub trait RpcClient: Send + Sync { } #[async_trait] -pub trait RpcClientFactory: Send + Sync{ - // The Build method may fail because of invalid endpoint, so it returns a Result. - // Any caller calls this method should handle the potencial error +pub trait RpcClientFactory: Send + Sync { + // The Build method may fail because of invalid endpoint, so it returns a + // Result. Any caller calls this method should handle the potencial error async fn build(&self, endpoint: String) -> Result>; -} \ No newline at end of file +} diff --git a/src/rpc_client/rpc_client_impl.rs b/src/rpc_client/rpc_client_impl.rs index aa6c6f4..5f36274 100644 --- a/src/rpc_client/rpc_client_impl.rs +++ b/src/rpc_client/rpc_client_impl.rs @@ -80,7 +80,7 @@ impl AuthInterceptor { fn new(ctx: &RpcContext) -> std::result::Result { let tenant: MetadataValue<_> = ctx.tenant.parse()?; let _token: MetadataValue<_> = ctx.token.parse()?; - Ok(AuthInterceptor {tenant, _token }) + Ok(AuthInterceptor { tenant, _token }) } } From b787c66a958342e7beefa074de56e6ce1e9c2ad7 Mon Sep 17 00:00:00 2001 From: Arthur Chern Date: Thu, 10 Nov 2022 00:04:13 +0800 Subject: [PATCH 03/12] refactor: refact some code according to clippy's advice --- src/rpc_client/rpc_client_impl.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/rpc_client/rpc_client_impl.rs b/src/rpc_client/rpc_client_impl.rs index 5f36274..8895dd0 100644 --- a/src/rpc_client/rpc_client_impl.rs +++ b/src/rpc_client/rpc_client_impl.rs @@ -40,7 +40,7 @@ impl RpcClient for RpcClientImpl { let response = client .query(Request::new(req)) .await - .map_err(|status: Status| Error::Rpc(status))?; + .map_err(Error::Rpc)?; Ok(response.into_inner()) } @@ -52,7 +52,7 @@ impl RpcClient for RpcClientImpl { let response = client .write(Request::new(req)) .await - .map_err(|status: Status| Error::Rpc(status))?; + .map_err(Error::Rpc)?; Ok(response.into_inner()) } @@ -64,7 +64,7 @@ impl RpcClient for RpcClientImpl { let response = client .route(Request::new(req)) .await - .map_err(|status: Status| Error::Rpc(status))?; + .map_err(Error::Rpc)?; Ok(response.into_inner()) } } @@ -84,7 +84,7 @@ impl AuthInterceptor { } } -impl<'a> Interceptor for AuthInterceptor { +impl Interceptor for AuthInterceptor { fn call( &mut self, mut request: tonic::Request<()>, From 2c64f6e3f0dbc3b8217022d758e9140b6b2d5b9d Mon Sep 17 00:00:00 2001 From: Arthur Chern Date: Thu, 10 Nov 2022 14:27:02 +0800 Subject: [PATCH 04/12] refactor: add some modifications --- src/db_client/cluster.rs | 3 ++- src/rpc_client/mod.rs | 2 +- src/rpc_client/rpc_client_impl.rs | 19 ++++++------------- 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/src/db_client/cluster.rs b/src/db_client/cluster.rs index cd141e7..75163ca 100644 --- a/src/db_client/cluster.rs +++ b/src/db_client/cluster.rs @@ -22,7 +22,7 @@ use crate::{ Error, Result, }; -/// Client for ceresdb of cluster mode. +/// Client implementation for ceresdb while using cluster mode. pub struct ClusterImpl { factory: Arc, router_endpoint: String, @@ -166,6 +166,7 @@ impl DbClient for ClusterImpl { } } +/// DirectClientPool is the pool actually holding connections to data nodes. struct DirectClientPool { pool: DashMap>>, factory: Arc, diff --git a/src/rpc_client/mod.rs b/src/rpc_client/mod.rs index 307db55..d72bb05 100644 --- a/src/rpc_client/mod.rs +++ b/src/rpc_client/mod.rs @@ -12,7 +12,7 @@ use ceresdbproto::storage::{ WriteRequest as WriteRequestPb, WriteResponse as WriteResponsePb, }; pub use mock_rpc_client::MockRpcClient; -pub use rpc_client_impl::{RpcClientImpl, RpcClientImplFactory}; +pub use rpc_client_impl::RpcClientImplFactory; use crate::errors::Result; diff --git a/src/rpc_client/rpc_client_impl.rs b/src/rpc_client/rpc_client_impl.rs index 8895dd0..b5848f1 100644 --- a/src/rpc_client/rpc_client_impl.rs +++ b/src/rpc_client/rpc_client_impl.rs @@ -20,7 +20,7 @@ use crate::{ rpc_client::{RpcClient, RpcClientFactory, RpcContext}, }; -pub struct RpcClientImpl { +struct RpcClientImpl { channel: Channel, } @@ -37,10 +37,7 @@ impl RpcClient for RpcClientImpl { AuthInterceptor::new(ctx).map_err(|_e| Error::AuthFailInvalid(ctx.clone()))?; let mut client = StorageServiceClient::::with_interceptor(self.channel.clone(), interceptor); - let response = client - .query(Request::new(req)) - .await - .map_err(Error::Rpc)?; + let response = client.query(Request::new(req)).await.map_err(Error::Rpc)?; Ok(response.into_inner()) } @@ -49,10 +46,7 @@ impl RpcClient for RpcClientImpl { AuthInterceptor::new(ctx).map_err(|_e| Error::AuthFailInvalid(ctx.clone()))?; let mut client = StorageServiceClient::::with_interceptor(self.channel.clone(), interceptor); - let response = client - .write(Request::new(req)) - .await - .map_err(Error::Rpc)?; + let response = client.write(Request::new(req)).await.map_err(Error::Rpc)?; Ok(response.into_inner()) } @@ -61,16 +55,15 @@ impl RpcClient for RpcClientImpl { AuthInterceptor::new(ctx).map_err(|_e| Error::AuthFailInvalid(ctx.clone()))?; let mut client = StorageServiceClient::::with_interceptor(self.channel.clone(), interceptor); - let response = client - .route(Request::new(req)) - .await - .map_err(Error::Rpc)?; + let response = client.route(Request::new(req)).await.map_err(Error::Rpc)?; Ok(response.into_inner()) } } const RPC_HEADER_TENANT_KEY: &str = "x-ceresdb-access-tenant"; +/// AuthInterceptor is implemented as an interceptor for tonic. +/// Its duty is to check user authentication. pub struct AuthInterceptor { tenant: MetadataValue, _token: MetadataValue, From fa92fcd17c927e7bdf31c5290389ad6877774944 Mon Sep 17 00:00:00 2001 From: Arthur Chern Date: Thu, 10 Nov 2022 16:10:59 +0800 Subject: [PATCH 05/12] refactor: modify CI workflow in order to support tonic --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a10723e..2213edc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -79,7 +79,7 @@ jobs: - name: Setup Build Environment run: | apt update - apt install --yes gcc g++ libssl-dev pkg-config cmake + apt install --yes gcc g++ libssl-dev pkg-config cmake protobuf-compiler rm -rf /var/lib/apt/lists/* - name: Install Clippy run: | @@ -122,7 +122,7 @@ jobs: - name: Setup Build Environment run: | apt update - apt install --yes gcc g++ libssl-dev pkg-config cmake + apt install --yes gcc g++ libssl-dev pkg-config cmake protobuf-compiler rm -rf /var/lib/apt/lists/* - name: Run Test run: | From e402a9769885bdc4281df4345e941a18563d4609 Mon Sep 17 00:00:00 2001 From: Arthur Chern Date: Thu, 10 Nov 2022 17:11:44 +0800 Subject: [PATCH 06/12] fix: fix some issues that commes from clippy and CR --- src/db_client/cluster.rs | 8 +++--- src/db_client/{direct.rs => inner.rs} | 8 +++--- src/db_client/mod.rs | 2 +- src/db_client/standalone.rs | 8 +++--- src/model/request.rs | 9 +++---- src/model/write/request.rs | 39 ++++++++++++++------------- src/rpc_client/mock_rpc_client.rs | 13 +++++---- src/rpc_client/rpc_client_impl.rs | 4 +-- 8 files changed, 47 insertions(+), 44 deletions(-) rename src/db_client/{direct.rs => inner.rs} (90%) diff --git a/src/db_client/cluster.rs b/src/db_client/cluster.rs index 75163ca..4c9f498 100644 --- a/src/db_client/cluster.rs +++ b/src/db_client/cluster.rs @@ -7,7 +7,7 @@ use dashmap::DashMap; use futures::future::join_all; use tokio::sync::OnceCell; -use super::{direct::DirectInnerClient, DbClient}; +use super::{inner::InnerClient, DbClient}; use crate::{ errors::ClusterWriteError, model::{ @@ -168,7 +168,7 @@ impl DbClient for ClusterImpl { /// DirectClientPool is the pool actually holding connections to data nodes. struct DirectClientPool { - pool: DashMap>>, + pool: DashMap>>, factory: Arc, } @@ -180,7 +180,7 @@ impl DirectClientPool { } } - fn get_or_create(&self, endpoint: &Endpoint) -> Arc> { + fn get_or_create(&self, endpoint: &Endpoint) -> Arc> { if let Some(c) = self.pool.get(endpoint) { // If exist in cache, return. c.value().clone() @@ -188,7 +188,7 @@ impl DirectClientPool { // If not exist, build --> insert --> return. self.pool .entry(endpoint.clone()) - .or_insert(Arc::new(DirectInnerClient::new( + .or_insert(Arc::new(InnerClient::new( self.factory.clone(), endpoint.to_string(), ))) diff --git a/src/db_client/direct.rs b/src/db_client/inner.rs similarity index 90% rename from src/db_client/direct.rs rename to src/db_client/inner.rs index 63cea04..0f7bac7 100644 --- a/src/db_client/direct.rs +++ b/src/db_client/inner.rs @@ -17,16 +17,16 @@ use crate::{ /// Inner client for both standalone and cluster modes. /// -/// Now, [`DirectInnerClient`] just wraps [`RpcClient`] simply. -pub(crate) struct DirectInnerClient { +/// Now, [`InnerClient`] just wraps [`RpcClient`] simply. +pub(crate) struct InnerClient { factory: Arc, endpoint: String, inner_client: OnceCell>, } -impl DirectInnerClient { +impl InnerClient { pub fn new(factory: Arc, endpoint: String) -> Self { - DirectInnerClient { + InnerClient { factory, endpoint, inner_client: OnceCell::new(), diff --git a/src/db_client/mod.rs b/src/db_client/mod.rs index 79cca88..a530e5f 100644 --- a/src/db_client/mod.rs +++ b/src/db_client/mod.rs @@ -2,7 +2,7 @@ mod builder; mod cluster; -mod direct; +mod inner; mod standalone; use async_trait::async_trait; diff --git a/src/db_client/standalone.rs b/src/db_client/standalone.rs index 0cd881a..221a056 100644 --- a/src/db_client/standalone.rs +++ b/src/db_client/standalone.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use async_trait::async_trait; -use super::direct::DirectInnerClient; +use super::inner::InnerClient; use crate::{ db_client::DbClient, model::{ @@ -18,15 +18,15 @@ use crate::{ /// Client for ceresdb of standalone mode. /// -/// Now, [`StandaloneImpl`] just wraps [`RpcClient`] simply. +/// Now, [`StandaloneImpl`] just wraps [`InnerClient`] simply. pub struct StandaloneImpl { - inner_client: DirectInnerClient, + inner_client: InnerClient, } impl StandaloneImpl { pub fn new(factory: Arc, endpoint: String) -> Self { Self { - inner_client: DirectInnerClient::new(factory, endpoint), + inner_client: InnerClient::new(factory, endpoint), } } } diff --git a/src/model/request.rs b/src/model/request.rs index 2ba9586..7549ad0 100644 --- a/src/model/request.rs +++ b/src/model/request.rs @@ -12,10 +12,9 @@ pub struct QueryRequest { impl From for QueryRequestPb { fn from(req: QueryRequest) -> Self { - let mut pb_req = QueryRequestPb::default(); - pb_req.metrics = req.metrics.into(); - pb_req.ql = req.ql; - - pb_req + QueryRequestPb { + metrics: req.metrics, + ql: req.ql, + } } } diff --git a/src/model/write/request.rs b/src/model/write/request.rs index fff02e1..0a98304 100644 --- a/src/model/write/request.rs +++ b/src/model/write/request.rs @@ -224,7 +224,7 @@ impl From for WriteRequestPb { for (metric, entries) in req.write_entries { write_metrics_pb.push(convert_one_write_metric(metric, entries)); } - req_pb.metrics = write_metrics_pb.into(); + req_pb.metrics = write_metrics_pb; req_pb } @@ -241,9 +241,9 @@ fn convert_one_write_metric(metric: String, entries: Vec) -> WriteMe } write_metric_pb.metric = metric; - write_metric_pb.tag_names = tags_dict.convert_ordered().into(); - write_metric_pb.field_names = fields_dict.convert_ordered().into(); - write_metric_pb.entries = wirte_entries_pb.into(); + write_metric_pb.tag_names = tags_dict.convert_ordered(); + write_metric_pb.field_names = fields_dict.convert_ordered(); + write_metric_pb.entries = wirte_entries_pb; write_metric_pb } @@ -253,11 +253,10 @@ fn convert_entry( fields_dict: &mut NameDict, entry: WriteEntry, ) -> WriteEntryPb { - let mut entry_pb = WriteEntryPb::default(); - entry_pb.tags = convert_tags(tags_dict, entry.series.tags).into(); - entry_pb.field_groups = convert_ts_fields(fields_dict, entry.ts_fields).into(); - - entry_pb + WriteEntryPb { + tags: convert_tags(tags_dict, entry.series.tags), + field_groups: convert_ts_fields(fields_dict, entry.ts_fields), + } } fn convert_tags(tags_dict: &mut NameDict, tags: BTreeMap) -> Vec { @@ -267,9 +266,10 @@ fn convert_tags(tags_dict: &mut NameDict, tags: BTreeMap) -> Vec< let mut tag_pbs = Vec::with_capacity(tags.len()); for (name, val) in tags { - let mut tag_pb = TagPb::default(); - tag_pb.name_index = tags_dict.insert(name); - tag_pb.value = Some(val.into()); + let tag_pb = TagPb { + name_index: tags_dict.insert(name), + value: Some(val.into()), + }; tag_pbs.push(tag_pb); } @@ -287,17 +287,18 @@ fn convert_ts_fields( let mut field_group_pbs = Vec::with_capacity(ts_fields.len()); for (ts, fields) in ts_fields { // ts + fields will be converted to field group in pb - let mut field_group_pb = FieldGroupPb::default(); - field_group_pb.timestamp = ts; - let mut field_pbs = Vec::with_capacity(fields.len()); for (name, val) in fields { - let mut field_pb = Field::default(); - field_pb.name_index = fields_dict.insert(name); - field_pb.value = Some(val.into()); + let field_pb = Field { + name_index: fields_dict.insert(name), + value: Some(val.into()), + }; field_pbs.push(field_pb); } - field_group_pb.fields = field_pbs.into(); + let field_group_pb = FieldGroupPb { + timestamp: ts, + fields: field_pbs, + }; // collect field group field_group_pbs.push(field_group_pb); diff --git a/src/rpc_client/mock_rpc_client.rs b/src/rpc_client/mock_rpc_client.rs index ebd94e3..cd7222e 100644 --- a/src/rpc_client/mock_rpc_client.rs +++ b/src/rpc_client/mock_rpc_client.rs @@ -39,16 +39,19 @@ impl RpcClient for MockRpcClient { .map(|m| { let endpoint = route_tables.get(m.as_str()).unwrap().value().clone(); let mut route_pb = RoutePb::default(); - let mut endpoint_pb = EndpointPb::default(); - endpoint_pb.ip = endpoint.ip; - endpoint_pb.port = endpoint.port; + let endpoint_pb = EndpointPb { + ip: endpoint.ip, + port: endpoint.port, + }; route_pb.metric = m.clone(); route_pb.endpoint = Some(endpoint_pb); route_pb }) .collect(); - let mut route_resp = RouteResponsePb::default(); - route_resp.routes = routes.into(); + let route_resp = RouteResponsePb { + header: None, + routes, + }; Ok(route_resp) } } diff --git a/src/rpc_client/rpc_client_impl.rs b/src/rpc_client/rpc_client_impl.rs index b5848f1..2d9f643 100644 --- a/src/rpc_client/rpc_client_impl.rs +++ b/src/rpc_client/rpc_client_impl.rs @@ -121,13 +121,13 @@ impl RpcClientFactory for RpcClientImplFactory { .connect_timeout(self.rpc_opts.connect_timeout) .keep_alive_while_idle(false), }; - let chan = configured_endpoint + let channel = configured_endpoint .connect() .await .map_err(|e| Error::Connect { addr: endpoint, source: Box::new(e), })?; - Ok(Arc::new(RpcClientImpl::new(chan))) + Ok(Arc::new(RpcClientImpl::new(channel))) } } From ec08ba0f992117f2f36e7a8561de8df58b337474 Mon Sep 17 00:00:00 2001 From: Arthur Chern Date: Thu, 10 Nov 2022 20:14:33 +0800 Subject: [PATCH 07/12] refactor: adopt suggestion for src/model/write/request.rs Co-authored-by: kamille <34352236+Rachelint@users.noreply.github.com> --- src/model/write/request.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/model/write/request.rs b/src/model/write/request.rs index 0a98304..32800f4 100644 --- a/src/model/write/request.rs +++ b/src/model/write/request.rs @@ -240,10 +240,12 @@ fn convert_one_write_metric(metric: String, entries: Vec) -> WriteMe wirte_entries_pb.push(convert_entry(&mut tags_dict, &mut fields_dict, entry)); } - write_metric_pb.metric = metric; - write_metric_pb.tag_names = tags_dict.convert_ordered(); - write_metric_pb.field_names = fields_dict.convert_ordered(); - write_metric_pb.entries = wirte_entries_pb; + WriteMetricPb { + metric, + tag_names: tags_dict.convert_ordered(), + field_names: fields_dict.convert_ordered(), + entries: write_entries_pb, + } write_metric_pb } From b16ec573ffc2dcb7ef0d61d8cb432c8db8cbe4b2 Mon Sep 17 00:00:00 2001 From: Arthur Chern Date: Thu, 10 Nov 2022 20:16:00 +0800 Subject: [PATCH 08/12] refactor: adopt suggestion for src/rpc_client/rpc_client_impl.rs Co-authored-by: kamille <34352236+Rachelint@users.noreply.github.com> --- src/rpc_client/rpc_client_impl.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/rpc_client/rpc_client_impl.rs b/src/rpc_client/rpc_client_impl.rs index 2d9f643..61108f1 100644 --- a/src/rpc_client/rpc_client_impl.rs +++ b/src/rpc_client/rpc_client_impl.rs @@ -71,9 +71,10 @@ pub struct AuthInterceptor { impl AuthInterceptor { fn new(ctx: &RpcContext) -> std::result::Result { - let tenant: MetadataValue<_> = ctx.tenant.parse()?; - let _token: MetadataValue<_> = ctx.token.parse()?; - Ok(AuthInterceptor { tenant, _token }) + Ok(AuthInterceptor { + tenant: ctx.tenant.parse()?, + _token: ctx.token.parse()? + }) } } From 9c975e7b4c840e851f9aff6ccdcc2d5d27777670 Mon Sep 17 00:00:00 2001 From: Arthur Chern Date: Thu, 10 Nov 2022 20:38:33 +0800 Subject: [PATCH 09/12] refactor: adopt some CR advice, and make some improvements --- src/model/write/request.rs | 6 +----- src/rpc_client/mod.rs | 6 ++++-- src/rpc_client/rpc_client_impl.rs | 4 ++-- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/model/write/request.rs b/src/model/write/request.rs index 32800f4..7fb2860 100644 --- a/src/model/write/request.rs +++ b/src/model/write/request.rs @@ -231,8 +231,6 @@ impl From for WriteRequestPb { } fn convert_one_write_metric(metric: String, entries: Vec) -> WriteMetricPb { - let mut write_metric_pb = WriteMetricPb::default(); - let mut tags_dict = NameDict::new(); let mut fields_dict = NameDict::new(); let mut wirte_entries_pb = Vec::with_capacity(entries.len()); @@ -244,10 +242,8 @@ fn convert_one_write_metric(metric: String, entries: Vec) -> WriteMe metric, tag_names: tags_dict.convert_ordered(), field_names: fields_dict.convert_ordered(), - entries: write_entries_pb, + entries: wirte_entries_pb, } - - write_metric_pb } fn convert_entry( diff --git a/src/rpc_client/mod.rs b/src/rpc_client/mod.rs index d72bb05..d943dc8 100644 --- a/src/rpc_client/mod.rs +++ b/src/rpc_client/mod.rs @@ -38,7 +38,9 @@ pub trait RpcClient: Send + Sync { #[async_trait] pub trait RpcClientFactory: Send + Sync { - // The Build method may fail because of invalid endpoint, so it returns a - // Result. Any caller calls this method should handle the potencial error + /// Build `RpcClient`. + /// + /// It may fail because of invalid endpoint. Any caller calls this method + /// should handle the potencial error. async fn build(&self, endpoint: String) -> Result>; } diff --git a/src/rpc_client/rpc_client_impl.rs b/src/rpc_client/rpc_client_impl.rs index 61108f1..27c1979 100644 --- a/src/rpc_client/rpc_client_impl.rs +++ b/src/rpc_client/rpc_client_impl.rs @@ -71,9 +71,9 @@ pub struct AuthInterceptor { impl AuthInterceptor { fn new(ctx: &RpcContext) -> std::result::Result { - Ok(AuthInterceptor { + Ok(AuthInterceptor { tenant: ctx.tenant.parse()?, - _token: ctx.token.parse()? + _token: ctx.token.parse()?, }) } } From d7eb846e04620534b31031cf12e6138cfea5458b Mon Sep 17 00:00:00 2001 From: Arthur Chern Date: Thu, 10 Nov 2022 21:00:09 +0800 Subject: [PATCH 10/12] refactor: transmit more detailed auth error --- src/errors.rs | 25 ++++++++++++++++++++-- src/rpc_client/rpc_client_impl.rs | 35 +++++++++++++++++++++---------- 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/src/errors.rs b/src/errors.rs index 7cfb370..5408012 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -6,10 +6,12 @@ use crate::{model::write::WriteResponse, RpcContext}; pub enum Error { /// Error from the running server. Server(ServerError), + /// Error from the rpc. /// Note that any error caused by a running server wont be wrapped in the /// grpc errors. Rpc(tonic::Status), + /// Error about rpc. /// It will be throw while connection between client and server is broken /// and try for reconnecting is failed(timeout). @@ -17,13 +19,17 @@ pub enum Error { addr: String, source: Box, }, + /// Error from the client and basically the rpc request has not been called /// yet or the rpc request has already been finished successfully. Client(String), - /// Error about rpc contex, invalid format - AuthFailInvalid(RpcContext), + + /// Error about authentication + AuthFail(AuthFailStatus), + /// ClusterWriteError(ClusterWriteError), + /// Error unknown Unknown(String), } @@ -72,4 +78,19 @@ pub struct ServerError { pub msg: String, } +#[derive(Debug, Clone)] +pub struct AuthFailStatus { + pub code: AuthCode, + pub msg: String, +} + +#[derive(Debug, Clone)] +pub enum AuthCode { + Ok = 0, + + InvalidTenantMeta = 1, + + InvalidTokenMeta = 2, +} + pub type Result = std::result::Result; diff --git a/src/rpc_client/rpc_client_impl.rs b/src/rpc_client/rpc_client_impl.rs index 27c1979..155b9ff 100644 --- a/src/rpc_client/rpc_client_impl.rs +++ b/src/rpc_client/rpc_client_impl.rs @@ -8,14 +8,14 @@ use ceresdbproto::storage::{ WriteResponse as WriteResponsePb, }; use tonic::{ - metadata::{errors::InvalidMetadataValue, Ascii, MetadataValue}, + metadata::{Ascii, MetadataValue}, service::Interceptor, transport::{Channel, Endpoint}, Request, Status, }; use crate::{ - errors::{Error, Result}, + errors::{AuthCode, AuthFailStatus, Error, Result}, options::{RpcConfig, RpcOptions}, rpc_client::{RpcClient, RpcClientFactory, RpcContext}, }; @@ -33,8 +33,7 @@ impl RpcClientImpl { #[async_trait] impl RpcClient for RpcClientImpl { async fn query(&self, ctx: &RpcContext, req: QueryRequestPb) -> Result { - let interceptor = - AuthInterceptor::new(ctx).map_err(|_e| Error::AuthFailInvalid(ctx.clone()))?; + let interceptor = AuthInterceptor::new(ctx)?; let mut client = StorageServiceClient::::with_interceptor(self.channel.clone(), interceptor); let response = client.query(Request::new(req)).await.map_err(Error::Rpc)?; @@ -42,8 +41,7 @@ impl RpcClient for RpcClientImpl { } async fn write(&self, ctx: &RpcContext, req: WriteRequestPb) -> Result { - let interceptor = - AuthInterceptor::new(ctx).map_err(|_e| Error::AuthFailInvalid(ctx.clone()))?; + let interceptor = AuthInterceptor::new(ctx)?; let mut client = StorageServiceClient::::with_interceptor(self.channel.clone(), interceptor); let response = client.write(Request::new(req)).await.map_err(Error::Rpc)?; @@ -51,8 +49,7 @@ impl RpcClient for RpcClientImpl { } async fn route(&self, ctx: &RpcContext, req: RouteRequestPb) -> Result { - let interceptor = - AuthInterceptor::new(ctx).map_err(|_e| Error::AuthFailInvalid(ctx.clone()))?; + let interceptor = AuthInterceptor::new(ctx)?; let mut client = StorageServiceClient::::with_interceptor(self.channel.clone(), interceptor); let response = client.route(Request::new(req)).await.map_err(Error::Rpc)?; @@ -70,10 +67,26 @@ pub struct AuthInterceptor { } impl AuthInterceptor { - fn new(ctx: &RpcContext) -> std::result::Result { + fn new(ctx: &RpcContext) -> std::result::Result { Ok(AuthInterceptor { - tenant: ctx.tenant.parse()?, - _token: ctx.token.parse()?, + tenant: ctx.tenant.parse().map_err(|_e| { + Error::AuthFail(AuthFailStatus { + code: AuthCode::InvalidTenantMeta, + msg: format!( + "invalid tenant: {}, can not be converted to grpc metadata", + ctx.tenant + ), + }) + })?, + _token: ctx.token.parse().map_err(|_e| { + Error::AuthFail(AuthFailStatus { + code: AuthCode::InvalidTokenMeta, + msg: format!( + "invalid token: {}, can not be converted to grpc metadata", + ctx.token + ), + }) + })?, }) } } From 9ca236352bd28eb6ebc8f65ffdbc8a4db5c86bb9 Mon Sep 17 00:00:00 2001 From: Arthur Chern Date: Thu, 10 Nov 2022 21:01:43 +0800 Subject: [PATCH 11/12] refactor: remove unused import --- src/errors.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/errors.rs b/src/errors.rs index 5408012..bcc7be9 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,6 +1,6 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -use crate::{model::write::WriteResponse, RpcContext}; +use crate::{model::write::WriteResponse}; #[derive(Debug)] pub enum Error { From cab2518c99011d309b2c2077b450f3d225f02cae Mon Sep 17 00:00:00 2001 From: Arthur Chern Date: Thu, 10 Nov 2022 21:05:31 +0800 Subject: [PATCH 12/12] refactor: fix code format issue --- src/errors.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/errors.rs b/src/errors.rs index bcc7be9..7eb63ca 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,6 +1,6 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -use crate::{model::write::WriteResponse}; +use crate::model::write::WriteResponse; #[derive(Debug)] pub enum Error {