From c02d04d0b881896baf6a73625137fc6a02899a6d Mon Sep 17 00:00:00 2001 From: Millione Date: Tue, 22 Nov 2022 10:41:06 +0800 Subject: [PATCH] feat(grpc): multi-service support --- Cargo.lock | 35 ++-- Cargo.toml | 1 + examples/Cargo.toml | 8 + examples/proto/echo.proto | 15 ++ examples/proto/hello.proto | 6 +- examples/src/compression/grpc_client.rs | 10 +- examples/src/compression/grpc_server.rs | 37 ++-- examples/src/hello/grpc_client.rs | 10 +- examples/src/hello/grpc_server.rs | 13 +- examples/src/hello/thrift_client.rs | 4 +- examples/src/multiplex/grpc_client.rs | 41 +++++ examples/src/multiplex/grpc_server.rs | 50 +++++ examples/volo-gen/volo.yml | 16 +- volo-build/src/grpc_backend.rs | 18 +- volo-grpc/Cargo.toml | 4 +- volo-grpc/src/codec/compression.rs | 4 +- volo-grpc/src/codec/decode.rs | 11 +- volo-grpc/src/codec/encode.rs | 2 +- volo-grpc/src/lib.rs | 2 +- volo-grpc/src/metadata/encoding.rs | 6 +- volo-grpc/src/metadata/map.rs | 10 + volo-grpc/src/metadata/value.rs | 4 +- volo-grpc/src/server/meta.rs | 75 +++++--- volo-grpc/src/server/mod.rs | 233 +++++++----------------- volo-grpc/src/server/router.rs | 124 +++++++++++++ volo-grpc/src/server/service.rs | 180 ++++++++++++++++++ volo-grpc/src/status.rs | 4 +- 27 files changed, 656 insertions(+), 267 deletions(-) create mode 100644 examples/proto/echo.proto create mode 100644 examples/src/multiplex/grpc_client.rs create mode 100644 examples/src/multiplex/grpc_server.rs create mode 100644 volo-grpc/src/server/router.rs create mode 100644 volo-grpc/src/server/service.rs diff --git a/Cargo.lock b/Cargo.lock index 744e66ae..31b1bd67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -341,9 +341,9 @@ dependencies = [ [[package]] name = "faststr" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cd7a705f88e4665d7a72a763debf1a263d4143c5cefae8e8efceb98d49afe69" +checksum = "fa82a898ed06b12d83fcf7249465ea22dcde9f3ce08b3a876128c505de4e0bb0" dependencies = [ "bytes", "serde", @@ -737,9 +737,9 @@ checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" [[package]] name = "linkedbytes" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1eb697040b79d318dd14b4219e6764c72ac84b5b07836551ce35b6847427db09" +checksum = "4f1ebd8faf875631d19aa28beb26b585bd96b47a763d4cd2557fe835ff64186f" dependencies = [ "bytes", "faststr", @@ -780,6 +780,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" + [[package]] name = "memchr" version = "2.5.0" @@ -833,6 +839,7 @@ dependencies = [ "motore-macros", "pin-project", "tokio", + "tower", ] [[package]] @@ -1050,7 +1057,7 @@ dependencies = [ [[package]] name = "pilota" version = "0.3.0" -source = "git+https://github.com/cloudwego/pilota?branch=main#d2e3759138a10725960ad902bc5de3cc59c3b489" +source = "git+https://github.com/cloudwego/pilota?branch=main#136cfe830399e2e17f2d0a35c31934a3be229210" dependencies = [ "anyhow", "async-recursion", @@ -1067,7 +1074,7 @@ dependencies = [ [[package]] name = "pilota-build" version = "0.3.0" -source = "git+https://github.com/cloudwego/pilota?branch=main#d2e3759138a10725960ad902bc5de3cc59c3b489" +source = "git+https://github.com/cloudwego/pilota?branch=main#136cfe830399e2e17f2d0a35c31934a3be229210" dependencies = [ "faststr", "fxhash", @@ -1093,7 +1100,7 @@ dependencies = [ [[package]] name = "pilota-thrift-parser" version = "0.4.0" -source = "git+https://github.com/cloudwego/pilota?branch=main#d2e3759138a10725960ad902bc5de3cc59c3b489" +source = "git+https://github.com/cloudwego/pilota?branch=main#136cfe830399e2e17f2d0a35c31934a3be229210" dependencies = [ "nom", ] @@ -1479,18 +1486,18 @@ checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4" [[package]] name = "serde" -version = "1.0.149" +version = "1.0.150" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "256b9932320c590e707b94576e3cc1f7c9024d0ee6612dfbcf1cb106cbe8e055" +checksum = "e326c9ec8042f1b5da33252c8a37e9ffbd2c9bef0155215b6e6c80c790e05f91" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.149" +version = "1.0.150" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4eae9b04cbffdfd550eb462ed33bc6a1b68c935127d008b27444d08380f94e4" +checksum = "42a3df25b0713732468deadad63ab9da1f1fd75a48a15024b50363f128db627e" dependencies = [ "proc-macro2", "quote", @@ -2048,12 +2055,14 @@ dependencies = [ "flate2", "futures", "futures-util", + "fxhash", "h2", "hex", "http", "http-body", "hyper", "hyper-timeout", + "matchit", "metainfo", "motore", "percent-encoding", @@ -2200,9 +2209,9 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.22.5" +version = "0.22.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "368bfe657969fb01238bb756d351dcade285e0f6fcbd36dcb23359a5169975be" +checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87" dependencies = [ "webpki", ] diff --git a/Cargo.toml b/Cargo.toml index f4e049da..9b8f4035 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,6 +57,7 @@ lazy_static = "1" linkedbytes = "0.1" linked-hash-map = "0.5" log = "0.4" +matchit = "0.7" nom = "7" normpath = "0.3" num_enum = "0.5" diff --git a/examples/Cargo.toml b/examples/Cargo.toml index b17aac39..ee46ccd0 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -30,6 +30,14 @@ path = "src/compression/grpc_server.rs" name = "compression-grpc-client" path = "src/compression/grpc_client.rs" +# multiplex +[[bin]] +name = "multiplex-grpc-server" +path = "src/multiplex/grpc_server.rs" +[[bin]] +name = "multiplex-grpc-client" +path = "src/multiplex/grpc_client.rs" + [dependencies] anyhow.workspace = true async-trait.workspace = true diff --git a/examples/proto/echo.proto b/examples/proto/echo.proto new file mode 100644 index 00000000..391466ac --- /dev/null +++ b/examples/proto/echo.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package echo; + +message EchoRequest { + string message = 1; +} + +message EchoResponse { + string message = 1; +} + +service Echo { + rpc UnaryEcho(EchoRequest) returns (EchoResponse) {} +} \ No newline at end of file diff --git a/examples/proto/hello.proto b/examples/proto/hello.proto index 9bfc76d2..6d60ecba 100644 --- a/examples/proto/hello.proto +++ b/examples/proto/hello.proto @@ -2,14 +2,14 @@ syntax = "proto3"; package hello; -service HelloService { - rpc Hello (HelloRequest) returns (HelloResponse); +service Greeter { + rpc SayHello (HelloRequest) returns (HelloReply) {} } message HelloRequest { string name = 1; } -message HelloResponse { +message HelloReply { string message = 1; } diff --git a/examples/src/compression/grpc_client.rs b/examples/src/compression/grpc_client.rs index 8114550f..e5331b23 100644 --- a/examples/src/compression/grpc_client.rs +++ b/examples/src/compression/grpc_client.rs @@ -9,9 +9,9 @@ use volo_grpc::codec::compression::{ }; lazy_static! { - static ref CLIENT: volo_gen::proto_gen::hello::HelloServiceClient = { + static ref CLIENT: volo_gen::proto_gen::hello::GreeterClient = { let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); - volo_gen::proto_gen::hello::HelloServiceClientBuilder::new("hello") + volo_gen::proto_gen::hello::GreeterClientBuilder::new("hello") .send_compressions(vec![ Gzip(Some(GzipConfig::default())), Zlib(Some(ZlibConfig { @@ -29,10 +29,10 @@ async fn main() { let req = volo_gen::proto_gen::hello::HelloRequest { name: "Volo".to_string(), }; - let resp = CLIENT.clone().hello(req).await; + let resp = CLIENT.clone().say_hello(req).await; match resp { - Ok(info) => println!("{:?}", info), - Err(e) => eprintln!("{:?}", e), + Ok(info) => println!("{info:?}"), + Err(e) => eprintln!("{e:?}"), } } diff --git a/examples/src/compression/grpc_server.rs b/examples/src/compression/grpc_server.rs index 329f983c..bb90517a 100644 --- a/examples/src/compression/grpc_server.rs +++ b/examples/src/compression/grpc_server.rs @@ -2,21 +2,24 @@ use std::net::SocketAddr; -use volo_grpc::codec::compression::{ - CompressionEncoding::{Gzip, Identity, Zlib}, - GzipConfig, Level, ZlibConfig, +use volo_grpc::{ + codec::compression::{ + CompressionEncoding::{Gzip, Identity, Zlib}, + GzipConfig, Level, ZlibConfig, + }, + server::{Server, ServiceBuilder}, }; pub struct S; #[volo::async_trait] -impl volo_gen::proto_gen::hello::HelloService for S { - async fn hello( +impl volo_gen::proto_gen::hello::Greeter for S { + async fn say_hello( &self, req: volo_grpc::Request, - ) -> Result, volo_grpc::Status> + ) -> Result, volo_grpc::Status> { - let resp = volo_gen::proto_gen::hello::HelloResponse { + let resp = volo_gen::proto_gen::hello::HelloReply { message: format!("Hello, {}!", req.get_ref().name), }; Ok(volo_grpc::Response::new(resp)) @@ -28,14 +31,18 @@ async fn main() { let addr: SocketAddr = "[::]:8080".parse().unwrap(); let addr = volo::net::Address::from(addr); - volo_gen::proto_gen::hello::HelloServiceServer::new(S) - .send_compressions(vec![ - Zlib(Some(ZlibConfig { - level: Level::fast(), - })), - Gzip(Some(GzipConfig::default())), - ]) - .accept_compressions(vec![Gzip(None), Zlib(None), Identity]) + Server::new() + .add_service( + ServiceBuilder::new(volo_gen::proto_gen::hello::GreeterServer::new(S)) + .send_compressions(vec![ + Zlib(Some(ZlibConfig { + level: Level::fast(), + })), + Gzip(Some(GzipConfig::default())), + ]) + .accept_compressions(vec![Gzip(None), Zlib(None), Identity]) + .build(), + ) .run(addr) .await .unwrap(); diff --git a/examples/src/hello/grpc_client.rs b/examples/src/hello/grpc_client.rs index f73bb82f..7296d674 100644 --- a/examples/src/hello/grpc_client.rs +++ b/examples/src/hello/grpc_client.rs @@ -5,9 +5,9 @@ use std::net::SocketAddr; use lazy_static::lazy_static; lazy_static! { - static ref CLIENT: volo_gen::proto_gen::hello::HelloServiceClient = { + static ref CLIENT: volo_gen::proto_gen::hello::GreeterClient = { let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); - volo_gen::proto_gen::hello::HelloServiceClientBuilder::new("hello") + volo_gen::proto_gen::hello::GreeterClientBuilder::new("hello") .address(addr) .build() }; @@ -18,9 +18,9 @@ async fn main() { let req = volo_gen::proto_gen::hello::HelloRequest { name: "Volo".to_string(), }; - let resp = CLIENT.clone().hello(req).await; + let resp = CLIENT.clone().say_hello(req).await; match resp { - Ok(info) => println!("{:?}", info), - Err(e) => eprintln!("{:?}", e), + Ok(info) => println!("{info:?}"), + Err(e) => eprintln!("{e:?}"), } } diff --git a/examples/src/hello/grpc_server.rs b/examples/src/hello/grpc_server.rs index 8cf32def..42247078 100644 --- a/examples/src/hello/grpc_server.rs +++ b/examples/src/hello/grpc_server.rs @@ -2,16 +2,18 @@ use std::net::SocketAddr; +use volo_grpc::server::{Server, ServiceBuilder}; + pub struct S; #[volo::async_trait] -impl volo_gen::proto_gen::hello::HelloService for S { - async fn hello( +impl volo_gen::proto_gen::hello::Greeter for S { + async fn say_hello( &self, req: volo_grpc::Request, - ) -> Result, volo_grpc::Status> + ) -> Result, volo_grpc::Status> { - let resp = volo_gen::proto_gen::hello::HelloResponse { + let resp = volo_gen::proto_gen::hello::HelloReply { message: format!("Hello, {}!", req.get_ref().name), }; Ok(volo_grpc::Response::new(resp)) @@ -23,7 +25,8 @@ async fn main() { let addr: SocketAddr = "[::]:8080".parse().unwrap(); let addr = volo::net::Address::from(addr); - volo_gen::proto_gen::hello::HelloServiceServer::new(S) + Server::new() + .add_service(ServiceBuilder::new(volo_gen::proto_gen::hello::GreeterServer::new(S)).build()) .run(addr) .await .unwrap(); diff --git a/examples/src/hello/thrift_client.rs b/examples/src/hello/thrift_client.rs index 8519915c..ccec451f 100644 --- a/examples/src/hello/thrift_client.rs +++ b/examples/src/hello/thrift_client.rs @@ -25,7 +25,7 @@ async fn main() { .hello(req) .await; match resp { - Ok(info) => println!("{:?}", info), - Err(e) => eprintln!("{:?}", e), + Ok(info) => println!("{info:?}"), + Err(e) => eprintln!("{e:?}"), } } diff --git a/examples/src/multiplex/grpc_client.rs b/examples/src/multiplex/grpc_client.rs new file mode 100644 index 00000000..eb8d2652 --- /dev/null +++ b/examples/src/multiplex/grpc_client.rs @@ -0,0 +1,41 @@ +#![feature(type_alias_impl_trait)] + +use std::net::SocketAddr; + +use lazy_static::lazy_static; + +lazy_static! { + static ref GREETER_CLIENT: volo_gen::proto_gen::hello::GreeterClient = { + let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); + volo_gen::proto_gen::hello::GreeterClientBuilder::new("hello") + .address(addr) + .build() + }; + static ref ECHO_CLIENT: volo_gen::proto_gen::echo::EchoClient = { + let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); + volo_gen::proto_gen::echo::EchoClientBuilder::new("hello") + .address(addr) + .build() + }; +} + +#[volo::main] +async fn main() { + let req = volo_gen::proto_gen::hello::HelloRequest { + name: "Volo".to_string(), + }; + let resp = GREETER_CLIENT.clone().say_hello(req).await; + match resp { + Ok(info) => println!("GREETER: {info:?}"), + Err(e) => eprintln!("GREETER: {e:?}"), + } + + let req = volo_gen::proto_gen::echo::EchoRequest { + message: "Volo".to_string(), + }; + let resp = ECHO_CLIENT.clone().unary_echo(req).await; + match resp { + Ok(info) => println!("ECHO: {info:?}"), + Err(e) => eprintln!("ECHO: {e:?}"), + } +} diff --git a/examples/src/multiplex/grpc_server.rs b/examples/src/multiplex/grpc_server.rs new file mode 100644 index 00000000..62a5544d --- /dev/null +++ b/examples/src/multiplex/grpc_server.rs @@ -0,0 +1,50 @@ +#![feature(type_alias_impl_trait)] + +use std::net::SocketAddr; + +use volo_grpc::server::{Server, ServiceBuilder}; + +pub struct G; + +#[volo::async_trait] +impl volo_gen::proto_gen::hello::Greeter for G { + async fn say_hello( + &self, + req: volo_grpc::Request, + ) -> Result, volo_grpc::Status> + { + let resp = volo_gen::proto_gen::hello::HelloReply { + message: format!("Hello, {}!", req.get_ref().name), + }; + Ok(volo_grpc::Response::new(resp)) + } +} + +pub struct E; + +#[volo::async_trait] +impl volo_gen::proto_gen::echo::Echo for E { + async fn unary_echo( + &self, + req: volo_grpc::Request, + ) -> Result, volo_grpc::Status> + { + let resp = volo_gen::proto_gen::echo::EchoResponse { + message: req.get_ref().message.to_string(), + }; + Ok(volo_grpc::Response::new(resp)) + } +} + +#[volo::main] +async fn main() { + let addr: SocketAddr = "[::]:8080".parse().unwrap(); + let addr = volo::net::Address::from(addr); + + Server::new() + .add_service(ServiceBuilder::new(volo_gen::proto_gen::hello::GreeterServer::new(G)).build()) + .add_service(ServiceBuilder::new(volo_gen::proto_gen::echo::EchoServer::new(E)).build()) + .run(addr) + .await + .unwrap(); +} diff --git a/examples/volo-gen/volo.yml b/examples/volo-gen/volo.yml index dc40e633..0161bb13 100644 --- a/examples/volo-gen/volo.yml +++ b/examples/volo-gen/volo.yml @@ -3,13 +3,17 @@ entries: protocol: protobuf filename: proto_gen.rs idls: - - source: local - path: ../proto/hello.proto - includes: - - ../proto + - source: local + path: ../proto/hello.proto + includes: + - ../proto + - source: local + path: ../proto/echo.proto + includes: + - ../proto thrift: protocol: thrift filename: thrift_gen.rs idls: - - source: local - path: ../thrift_idl/hello.thrift + - source: local + path: ../thrift_idl/hello.thrift diff --git a/volo-build/src/grpc_backend.rs b/volo-build/src/grpc_backend.rs index bb44c740..8483ba86 100644 --- a/volo-build/src/grpc_backend.rs +++ b/volo-build/src/grpc_backend.rs @@ -187,6 +187,7 @@ impl CodegenBackend for VoloGrpcBackend { let file = self.cx.file(file_id).unwrap(); let package = file.package.iter().join("."); + let name = format!("{package}.{}", s.name); let req_enum_name_send = format_ident!("{}RequestSend", service_name); let resp_enum_name_send = format_ident!("{}ResponseSend", service_name); @@ -195,7 +196,7 @@ impl CodegenBackend for VoloGrpcBackend { let paths = s .methods .iter() - .map(|method| format!("/{}.{}/{}", package, s.name, method.name)) + .map(|method| format!("/{package}.{}/{}", s.name, method.name)) .collect::>(); let req_matches = s.methods.iter().map(|method| { @@ -204,7 +205,7 @@ impl CodegenBackend for VoloGrpcBackend { .rust_name(method.def_id) .upper_camel_ident() .as_syn_ident(); - let path = format!("/{}.{}/{}", package, s.name, method.name); + let path = format!("/{package}.{}/{}", s.name, method.name); let client_streaming = self.cx.node_contains_tag::(method.def_id); let input_ty = &method.args[0].ty; @@ -264,7 +265,7 @@ impl CodegenBackend for VoloGrpcBackend { s.methods.iter().for_each(|method| { let method_name = self.cx.rust_name(method.def_id).as_syn_ident(); - let path = format!("/{}.{}/{}", package, s.name, method.name); + let path = format!("/{package}.{}/{}", s.name, method.name); let input_ty = &method.args[0].ty; let client_streaming = self.cx.node_contains_tag::(method.def_id); let req_ty = self.client_input_ty(input_ty.clone(), client_streaming); @@ -434,11 +435,10 @@ impl CodegenBackend for VoloGrpcBackend { } impl #server_name { - pub fn new(inner: S) -> ::volo_grpc::server::Server { - let service = Self { + pub fn new(inner: S) -> Self { + Self { inner: ::std::sync::Arc::new(inner), - }; - ::volo_grpc::server::Server::new(service) + } } } @@ -466,6 +466,10 @@ impl CodegenBackend for VoloGrpcBackend { } } } + + impl ::volo_grpc::server::NamedService for #server_name { + const NAME: &'static str = #name; + } }); } diff --git a/volo-grpc/Cargo.toml b/volo-grpc/Cargo.toml index c4993958..135610bb 100644 --- a/volo-grpc/Cargo.toml +++ b/volo-grpc/Cargo.toml @@ -19,7 +19,7 @@ maintenance = { status = "actively-developed" } [dependencies] volo = { version = "0.2", path = "../volo" } -motore.workspace = true +motore = { workspace = true, features = ["tower"] } metainfo.workspace = true anyhow.workspace = true @@ -28,6 +28,7 @@ async-trait.workspace = true base64.workspace = true bytes.workspace = true faststr.workspace = true +fxhash.workspace = true futures-util.workspace = true futures.workspace = true flate2.workspace = true @@ -37,6 +38,7 @@ http-body.workspace = true http.workspace = true hyper = { workspace = true, features = ["full"] } hyper-timeout.workspace = true +matchit.workspace = true percent-encoding.workspace = true pin-project.workspace = true prost.workspace = true diff --git a/volo-grpc/src/codec/compression.rs b/volo-grpc/src/codec/compression.rs index b86f6505..ba2e706c 100644 --- a/volo-grpc/src/codec/compression.rs +++ b/volo-grpc/src/codec/compression.rs @@ -1,4 +1,5 @@ //! These codes are copied from `tonic/src/codec/compression.rs` and may be modified by us. + use std::io; use bytes::{Buf, BufMut, BytesMut}; @@ -153,8 +154,7 @@ impl CompressionEncoding { "identity" => Ok(None), other => { let status = Status::unimplemented(format!( - "Content is compressed with `{}` which isn't supported", - other + "Content is compressed with `{other}` which isn't supported" )); Err(status) } diff --git a/volo-grpc/src/codec/decode.rs b/volo-grpc/src/codec/decode.rs index db71f418..26baa39d 100644 --- a/volo-grpc/src/codec/decode.rs +++ b/volo-grpc/src/codec/decode.rs @@ -125,9 +125,8 @@ impl RecvStream { } flag => { let message = format!( - "protocol error: received message with invalid compression flag: {} \ - (valid flags are 0 and 1), while sending request", - flag + "protocol error: received message with invalid compression flag: {flag} \ + (valid flags are 0 and 1), while sending request" ); // https://grpc.github.io/grpc/core/md_doc_compression.html return Err(Status::new(Code::Internal, message)); @@ -149,11 +148,11 @@ impl RecvStream { if let Err(err) = decompress(*encoding, &mut self.buf, &mut self.decompress_buf) { let message = if let Kind::Response(status) = self.kind { format!( - "Error decompressing: {}, while receiving response with status: {}", - err, status + "Error decompressing: {err}, while receiving response with status: \ + {status}" ) } else { - format!("Error decompressing: {}, while sending request", err) + format!("Error decompressing: {err}, while sending request") }; return Err(Status::new(Code::Internal, message)); } diff --git a/volo-grpc/src/codec/encode.rs b/volo-grpc/src/codec/encode.rs index 2a8db537..0853c1e7 100644 --- a/volo-grpc/src/codec/encode.rs +++ b/volo-grpc/src/codec/encode.rs @@ -16,7 +16,7 @@ pub fn encode( compression_encoding: Option, ) -> BoxStream<'static, Result> where - S: Stream> + Send + 'static, + S: Stream> + Send + Sync + 'static, T: Message + 'static, { Box::pin(async_stream::stream! { diff --git a/volo-grpc/src/lib.rs b/volo-grpc/src/lib.rs index 93eb0a82..58261e61 100644 --- a/volo-grpc/src/lib.rs +++ b/volo-grpc/src/lib.rs @@ -20,7 +20,7 @@ pub mod status; pub mod transport; pub type BoxError = Box; -pub type BoxStream<'l, T> = std::pin::Pin + Send + 'l>>; +pub type BoxStream<'l, T> = std::pin::Pin + Send + Sync + 'l>>; pub use client::Client; pub use codec::decode::RecvStream; diff --git a/volo-grpc/src/metadata/encoding.rs b/volo-grpc/src/metadata/encoding.rs index 661234bb..4b01cfc4 100644 --- a/volo-grpc/src/metadata/encoding.rs +++ b/volo-grpc/src/metadata/encoding.rs @@ -118,7 +118,7 @@ impl value_encoding::Sealed for Binary { fn from_static(value: &'static str) -> HeaderValue { if base64::decode(value).is_err() { - panic!("Invalid base64 passed to from_static: {}", value); + panic!("Invalid base64 passed to from_static: {value}"); } // SAFETY: we have checked the bytes with base64 unsafe { HeaderValue::from_maybe_shared_unchecked(Bytes::from_static(value.as_ref())) } @@ -148,9 +148,9 @@ impl value_encoding::Sealed for Binary { fn fmt(value: &HeaderValue, f: &mut fmt::Formatter<'_>) -> fmt::Result { if let Ok(decoded) = Self::decode(value.as_bytes()) { - write!(f, "{:?}", decoded) + write!(f, "{decoded:?}") } else { - write!(f, "b[invalid]{:?}", value) + write!(f, "b[invalid]{value:?}") } } } diff --git a/volo-grpc/src/metadata/map.rs b/volo-grpc/src/metadata/map.rs index 6f158160..80c60829 100644 --- a/volo-grpc/src/metadata/map.rs +++ b/volo-grpc/src/metadata/map.rs @@ -237,6 +237,16 @@ impl MetadataMap { self.headers } + /// Get a reference to the underlying HTTP HeaderMap + pub fn headers(&self) -> &http::HeaderMap { + &self.headers + } + + /// Get a mutable reference to the underlying HTTP HeaderMap + pub fn headers_mut(&mut self) -> &mut http::HeaderMap { + &mut self.headers + } + /// Create an empty `MetadataMap` with the specified capacity. /// /// The returned map will allocate internal storage in order to hold about diff --git a/volo-grpc/src/metadata/value.rs b/volo-grpc/src/metadata/value.rs index 138303f5..413cdb29 100644 --- a/volo-grpc/src/metadata/value.rs +++ b/volo-grpc/src/metadata/value.rs @@ -738,13 +738,13 @@ fn test_debug() { for &(value, expected) in cases { let val = AsciiMetadataValue::try_from_bytes(value.as_bytes()).unwrap(); - let actual = format!("{:?}", val); + let actual = format!("{val:?}"); assert_eq!(expected, actual); } let mut sensitive = AsciiMetadataValue::from_static("password"); sensitive.set_sensitive(true); - assert_eq!("Sensitive", format!("{:?}", sensitive)); + assert_eq!("Sensitive", format!("{sensitive:?}")); } #[test] diff --git a/volo-grpc/src/server/meta.rs b/volo-grpc/src/server/meta.rs index 3f54bb61..5c25dfb8 100644 --- a/volo-grpc/src/server/meta.rs +++ b/volo-grpc/src/server/meta.rs @@ -1,4 +1,4 @@ -use std::{net::SocketAddr, str::FromStr, sync::Arc}; +use std::{cell::RefCell, net::SocketAddr, str::FromStr, sync::Arc}; use futures::Future; use metainfo::{Backward, Forward}; @@ -9,6 +9,7 @@ use volo::{ }; use crate::{ + body::Body, context::ServerContext, metadata::{ KeyAndValueRef, MetadataKey, DESTINATION_SERVICE, HEADER_TRANS_REMOTE_ADDR, SOURCE_SERVICE, @@ -16,7 +17,16 @@ use crate::{ Request, Response, Status, }; -#[derive(Clone)] +macro_rules! status_to_http { + ($result:expr) => { + match $result { + Ok(value) => value, + Err(status) => return Ok(status.to_http()), + } + }; +} + +#[derive(Clone, Debug)] pub struct MetaService { inner: S, peer_addr: Option
, @@ -28,33 +38,40 @@ impl MetaService { } } -impl Service> for MetaService +impl Service> for MetaService where - S: Service, Response = Response, Error = Status> + S: Service, Response = Response> + + Clone + Send - + 'static - + Sync, - T: Send + 'static, + + Sync + + 'static, + S::Error: Into, { - type Response = S::Response; + type Response = hyper::Response; - type Error = S::Error; + type Error = Status; - type Future<'cx> = impl Future> + 'cx; + type Future<'cx> = impl Future> + Send + 'cx; fn call<'cx, 's>( &'s self, - cx: &'cx mut ServerContext, - mut volo_req: Request, + _cx: &'cx mut ServerContext, + req: hyper::Request, ) -> Self::Future<'cx> where 's: 'cx, { let peer_addr = self.peer_addr.clone(); - async move { + metainfo::METAINFO.scope(RefCell::new(metainfo::MetaInfo::default()), async move { + let mut cx = ServerContext::default(); + cx.rpc_info.method = Some(req.uri().path().into()); + + let mut volo_req = Request::from_http(req); + let metadata = volo_req.metadata_mut(); - _ = metainfo::METAINFO.with(|metainfo| { + + status_to_http!(metainfo::METAINFO.with(|metainfo| { let mut metainfo = metainfo.borrow_mut(); // caller @@ -107,12 +124,18 @@ where } Ok::<(), Status>(()) - }); + })); + + let volo_resp = match self.inner.call(&mut cx, volo_req).await { + Ok(resp) => resp, + Err(err) => { + return Ok(err.into().to_http()); + } + }; - let mut volo_resp = self.inner.call(cx, volo_req).await?; + let (mut metadata, extensions, message) = volo_resp.into_parts(); - let metadata = volo_resp.metadata_mut(); - _ = metainfo::METAINFO.with(|metainfo| { + status_to_http!(metainfo::METAINFO.with(|metainfo| { let metainfo = metainfo.borrow_mut(); // backward @@ -124,9 +147,17 @@ where } Ok::<(), Status>(()) - }); - - Ok(volo_resp) - } + })); + + let mut resp = hyper::Response::new(message); + *resp.headers_mut() = metadata.into_headers(); + *resp.extensions_mut() = extensions; + resp.headers_mut().insert( + http::header::CONTENT_TYPE, + http::header::HeaderValue::from_static("application/grpc"), + ); + + Ok(resp) + }) } } diff --git a/volo-grpc/src/server/mod.rs b/volo-grpc/src/server/mod.rs index 38cdd155..ee930038 100644 --- a/volo-grpc/src/server/mod.rs +++ b/volo-grpc/src/server/mod.rs @@ -3,52 +3,60 @@ //! This module contains the low level component to build a gRPC server. mod meta; +mod router; +mod service; -use std::{cell::RefCell, marker::PhantomData, time::Duration}; +use std::{fmt, time::Duration}; -use futures::Future; use hyper::server::conn::Http; use motore::{ - builder::ServiceBuilder, layer::{Identity, Layer, Stack}, - service::Service, - BoxError, ServiceExt, + service::{Service, TowerAdapter}, + BoxError, }; +pub use service::ServiceBuilder; use volo::{net::incoming::Incoming, spawn}; +pub use self::router::Router; use crate::{ - body::Body, - codec::{ - compression::{CompressionEncoding, ENCODING_HEADER}, - decode::Kind, - }, - context::{Config, ServerContext}, - message::{RecvEntryMessage, SendEntryMessage}, - server::meta::MetaService, - Request, Response, Status, + body::Body, context::ServerContext, server::meta::MetaService, Request, Response, Status, }; +/// A trait to provide a static reference to the service's +/// name. This is used for routing service's within the router. +pub trait NamedService { + /// The `Service-Name` as described [here]. + /// + /// [here]: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests + const NAME: &'static str; +} + /// A server for a gRPC service. -pub struct Server { - service: S, +#[derive(Clone)] +pub struct Server { layer: L, http2_config: Http2Config, - rpc_config: Config, + router: Router, } -impl Server { +impl Default for Server { + fn default() -> Self { + Self::new() + } +} + +impl Server { /// Creates a new [`Server`]. - pub fn new(service: S) -> Self { + pub fn new() -> Self { Self { - service, layer: Identity::new(), http2_config: Http2Config::default(), - rpc_config: Config::default(), + router: Router::new(), } } } -impl Server { +impl Server { /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`] option for HTTP2 /// stream-level flow control. /// @@ -133,24 +141,6 @@ impl Server { self } - /// Sets the send compression encodings for the request, and will self-adaptive with config of - /// the client. - /// - /// Default is disable the send compression. - pub fn send_compressions(mut self, config: Vec) -> Self { - self.rpc_config.send_compressions = Some(config); - self - } - - /// Sets the accept compression encodings for the request, and will self-adaptive with config of - /// the server. - /// - /// Default is disable the accept decompression. - pub fn accept_compressions(mut self, config: Vec) -> Self { - self.rpc_config.accept_compressions = Some(config); - self - } - /// Adds a new inner layer to the server. /// /// The layer's `Service` should be `Send + Clone + 'static`. @@ -162,12 +152,11 @@ impl Server { /// The current order is: foo -> bar (the request will come to foo first, and then bar). /// /// After we call `.layer(baz)`, we will get: foo -> bar -> baz. - pub fn layer(self, layer: O) -> Server> { + pub fn layer(self, layer: O) -> Server> { Server { layer: Stack::new(layer, self.layer), - service: self.service, http2_config: self.http2_config, - rpc_config: self.rpc_config, + router: self.router, } } @@ -182,49 +171,56 @@ impl Server { /// The current order is: foo -> bar (the request will come to foo first, and then bar). /// /// After we call `.layer_front(baz)`, we will get: baz -> foo -> bar. - pub fn layer_front(self, layer: Front) -> Server> { + pub fn layer_front(self, layer: Front) -> Server> { Server { layer: Stack::new(self.layer, layer), - service: self.service, http2_config: self.http2_config, - rpc_config: self.rpc_config, + router: self.router, } } - /// The main entry point for the server. - pub async fn run(self, incoming: A) -> Result<(), BoxError> + /// Adds a new service to the router. + pub fn add_service(self, s: S) -> Self where - L: Layer, - L::Service: Service, Response = Response> + S: Service, Response = Response, Error = Status> + + NamedService + Clone + Send - + 'static - + Sync, - >>::Error: Into + Send, - for<'cx> >>::Future<'cx>: Send, - S: Service, Response = Response, Error = Status> - + Send + + Sync + + 'static, + { + Self { + layer: self.layer, + http2_config: self.http2_config, + router: self.router.add_service(s), + } + } + + /// The main entry point for the server. + pub async fn run(self, incoming: A) -> Result<(), BoxError> + where + L: Layer, + L::Service: Service, Response = Response> + Clone + + Send + + Sync + 'static, - T: Send + 'static + RecvEntryMessage, - U: Send + 'static + SendEntryMessage, + >>::Error: Into + Send, { let mut incoming = incoming.make_incoming().await?; tracing::info!("[VOLO] server start at: {:?}", incoming); - let service = ServiceBuilder::new() + let service = motore::builder::ServiceBuilder::new() .layer(self.layer) - .service(self.service); - let service = service.map_err(|err| err.into()); + .service(self.router); while let Some(conn) = incoming.accept().await? { tracing::trace!("[VOLO] recv a connection from: {:?}", conn.info.peer_addr); let peer_addr = conn.info.peer_addr.clone(); - let service = HyperAdaptorService::new( - MetaService::new(service.clone(), peer_addr), - self.rpc_config.clone(), - ); + let service = MetaService::new(service.clone(), peer_addr) + .tower(|req| (ServerContext::default(), req)); + // init server let server = Self::create_http_server(&self.http2_config); spawn(async move { @@ -252,107 +248,12 @@ impl Server { } } -macro_rules! status_to_http { - ($result:expr) => { - match $result { - Ok(value) => value, - Err(status) => return Ok(status.to_http()), - } - }; -} - -/// A service that implements `tower::Service` for service transition between hyper's -/// `tower::Service` and our's `motore::Service`. For more details, A incoming -/// request will first come to hyper's `tower::Service`, then `HyperAdaptorService`, -/// finally our's `motore::Service`. -#[derive(Clone)] -pub struct HyperAdaptorService { - inner: S, - rpc_config: Config, - _marker: PhantomData<(T, U)>, -} - -impl HyperAdaptorService { - pub fn new(inner: S, rpc_config: Config) -> Self { - Self { - inner, - rpc_config, - _marker: PhantomData, - } - } -} - -impl tower::Service> for HyperAdaptorService -where - S: Service, Response = Response> + Clone + Send + 'static, - S::Error: Into, - T: RecvEntryMessage, - U: SendEntryMessage, -{ - type Response = hyper::Response; - type Error = Status; - type Future = impl Future>; - - fn poll_ready( - &mut self, - _: &mut core::task::Context<'_>, - ) -> core::task::Poll> { - core::task::Poll::Ready(Ok(())) - } - - fn call(&mut self, req: hyper::Request) -> Self::Future { - let inner = self.inner.clone(); - let rpc_config = self.rpc_config.clone(); - - metainfo::METAINFO.scope(RefCell::new(metainfo::MetaInfo::default()), async move { - let mut cx = ServerContext::default(); - cx.rpc_info.method = Some(req.uri().path().into()); - let send_compression = CompressionEncoding::from_accept_encoding_header( - req.headers(), - &rpc_config.send_compressions, - ); - - let recv_compression = match CompressionEncoding::from_encoding_header( - req.headers(), - &rpc_config.accept_compressions, - ) { - Ok(encoding) => encoding, - Err(status) => return Ok(status.to_http()), - }; - - let (parts, body) = req.into_parts(); - - let message = status_to_http!(T::from_body( - cx.rpc_info.method.as_deref(), - body, - Kind::Request, - recv_compression - )); - - let volo_req = Request::from_http_parts(parts, message); - let volo_resp = match inner.call(&mut cx, volo_req).await { - Ok(resp) => resp, - Err(err) => { - return Ok(err.into().to_http()); - } - }; - - let (metadata, extensions, message) = volo_resp.into_parts(); - - let mut resp = hyper::Response::new(Body::new(message.into_body(send_compression))); - *resp.headers_mut() = metadata.into_headers(); - *resp.extensions_mut() = extensions; - resp.headers_mut().insert( - http::header::CONTENT_TYPE, - http::header::HeaderValue::from_static("application/grpc"), - ); - - if let Some(encoding) = send_compression { - resp.headers_mut() - .insert(ENCODING_HEADER, encoding.into_header_value()); - }; - Ok(resp) - }) +impl fmt::Debug for Server { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Server") + .field("http2_config", &self.http2_config) + .field("router", &self.router) + .finish() } } diff --git a/volo-grpc/src/server/router.rs b/volo-grpc/src/server/router.rs new file mode 100644 index 00000000..fb84041a --- /dev/null +++ b/volo-grpc/src/server/router.rs @@ -0,0 +1,124 @@ +use std::{ + fmt, + sync::atomic::{AtomicU32, Ordering}, +}; + +use futures::Future; +use fxhash::FxHashMap; +use http_body::Body as HttpBody; +use motore::{BoxCloneService, Service}; +use volo::Unwrap; + +use super::NamedService; +use crate::{body::Body, context::ServerContext, Request, Response, Status}; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +struct RouteId(u32); + +impl RouteId { + fn next() -> Self { + // `AtomicU64` isn't supported on all platforms + static ID: AtomicU32 = AtomicU32::new(0); + let id = ID.fetch_add(1, Ordering::Relaxed); + if id == u32::MAX { + panic!("Over `u32::MAX` routes created. If you need this, please file an issue."); + } + Self(id) + } +} + +#[derive(Default)] +pub struct Router { + routes: FxHashMap, Response, Status>>, + node: matchit::Router, +} + +impl Clone for Router { + fn clone(&self) -> Self { + Self { + routes: self.routes.clone(), + node: self.node.clone(), + } + } +} + +impl Router +where + B: HttpBody + 'static, +{ + pub fn new() -> Self { + Self { + routes: Default::default(), + node: Default::default(), + } + } + + pub fn add_service(mut self, service: S) -> Self + where + S: Service, Response = Response, Error = Status> + + NamedService + + Clone + + Send + + Sync + + 'static, + { + let path = format!("/{}/*rest", S::NAME); + + if path.is_empty() { + panic!("[VOLO] Paths must start with a `/`. Use \"/\" for root routes"); + } else if !path.starts_with('/') { + panic!("[VOLO] Paths must start with a `/`"); + } + + let id = RouteId::next(); + + self.set_node(path, id); + + self.routes.insert(id, BoxCloneService::new(service)); + + self + } + + #[track_caller] + fn set_node(&mut self, path: String, id: RouteId) { + if let Err(err) = self.node.insert(path, id) { + panic!("[VOLO] Invalid route: {err}"); + } + } +} + +impl Service> for Router +where + B: HttpBody + Send, +{ + type Response = Response; + type Error = Status; + type Future<'cx> = impl Future> + 'cx + where + Self: 'cx; + + fn call<'cx, 's>(&'s self, cx: &'cx mut ServerContext, req: Request) -> Self::Future<'cx> + where + 's: 'cx, + { + async move { + let path = cx.rpc_info.method.as_ref().unwrap(); + match self.node.at(path) { + Ok(match_) => { + let id = match_.value; + let route = self.routes.get(id).volo_unwrap().clone(); + route.call(cx, req).await + } + Err(err) => Err(Status::unimplemented(err.to_string())), + } + } + } +} + +impl fmt::Debug for Router { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Router") + .field("routes", &self.routes) + .finish() + } +} diff --git a/volo-grpc/src/server/service.rs b/volo-grpc/src/server/service.rs new file mode 100644 index 00000000..8d3dd212 --- /dev/null +++ b/volo-grpc/src/server/service.rs @@ -0,0 +1,180 @@ +use std::marker::PhantomData; + +use futures::Future; +use motore::{ + layer::{Identity, Layer, Stack}, + service::Service, +}; + +use super::NamedService; +use crate::{ + body::Body, + codec::{ + compression::{CompressionEncoding, ENCODING_HEADER}, + decode::Kind, + }, + context::{Config, ServerContext}, + message::{RecvEntryMessage, SendEntryMessage}, + metadata::MetadataValue, + Request, Response, Status, +}; + +#[derive(Clone)] +pub struct ServiceBuilder { + service: S, + layer: L, + rpc_config: Config, +} + +impl ServiceBuilder { + pub fn new(service: S) -> Self { + Self { + service, + layer: Identity::new(), + rpc_config: Config::default(), + } + } +} + +impl ServiceBuilder { + /// Sets the send compression encodings for the request, and will self-adaptive with config of + /// the client. + /// + /// Default is disable the send compression. + pub fn send_compressions(mut self, config: Vec) -> Self { + self.rpc_config.send_compressions = Some(config); + self + } + + /// Sets the accept compression encodings for the request, and will self-adaptive with config of + /// the server. + /// + /// Default is disable the accept decompression. + pub fn accept_compressions(mut self, config: Vec) -> Self { + self.rpc_config.accept_compressions = Some(config); + self + } + + pub fn layer(self, layer: O) -> ServiceBuilder> { + ServiceBuilder { + layer: Stack::new(layer, self.layer), + service: self.service, + rpc_config: self.rpc_config, + } + } + + pub fn layer_front(self, layer: Front) -> ServiceBuilder> { + ServiceBuilder { + layer: Stack::new(self.layer, layer), + service: self.service, + rpc_config: self.rpc_config, + } + } + + pub fn build(self) -> CodecService<>::Service, T, U> + where + L: Layer, + L::Service: Service, Response = Response>, + >>::Error: Into + Send, + S: Service, Response = Response, Error = Status>, + T: RecvEntryMessage, + U: SendEntryMessage, + { + let service = motore::builder::ServiceBuilder::new() + .layer(self.layer) + .service(self.service); + + CodecService::new(service, self.rpc_config) + } +} + +pub struct CodecService { + inner: S, + rpc_config: Config, + _marker: PhantomData<(T, U)>, +} + +impl Clone for CodecService +where + S: Clone, +{ + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + rpc_config: self.rpc_config.clone(), + _marker: PhantomData, + } + } +} + +impl CodecService { + pub fn new(inner: S, rpc_config: Config) -> Self { + Self { + inner, + rpc_config, + _marker: PhantomData, + } + } +} + +impl Service> for CodecService +where + S: Service, Response = Response> + Clone + Send + Sync + 'static, + S::Error: Into, + T: RecvEntryMessage + Send + Sync, + U: SendEntryMessage + Send + Sync, +{ + type Response = Response; + type Error = Status; + type Future<'cx> = impl Future> + Send + 'cx + where + Self: 'cx; + + fn call<'cx, 's>( + &'s self, + cx: &'cx mut ServerContext, + req: Request, + ) -> Self::Future<'cx> + where + 's: 'cx, + { + async move { + let (metadata, extensions, body) = req.into_parts(); + let send_compression = CompressionEncoding::from_accept_encoding_header( + metadata.headers(), + &self.rpc_config.send_compressions, + ); + + let recv_compression = CompressionEncoding::from_encoding_header( + metadata.headers(), + &self.rpc_config.accept_compressions, + )?; + + let message = T::from_body( + cx.rpc_info.method.as_deref(), + body, + Kind::Request, + recv_compression, + )?; + + let volo_req = Request::from_parts(metadata, extensions, message); + + let volo_resp = self.inner.call(cx, volo_req).await.map_err(Into::into)?; + + let mut resp = volo_resp.map(|message| Body::new(message.into_body(send_compression))); + + if let Some(encoding) = send_compression { + resp.metadata_mut().insert( + ENCODING_HEADER, + MetadataValue::unchecked_from_header_value(encoding.into_header_value()), + ); + }; + + Ok(resp) + } + } +} + +impl NamedService for CodecService { + const NAME: &'static str = S::NAME; +} diff --git a/volo-grpc/src/status.rs b/volo-grpc/src/status.rs index 2d7f5383..30796daa 100644 --- a/volo-grpc/src/status.rs +++ b/volo-grpc/src/status.rs @@ -378,7 +378,7 @@ impl Status { _ => Code::Unknown, }; - let mut status = Self::new(code, format!("h2 protocol error: {}", err)); + let mut status = Self::new(code, format!("h2 protocol error: {err}")); status.source = Some(err); status } @@ -468,7 +468,7 @@ impl Status { warn!("[VOLO] Error deserializing status message header: {}", err); Self { code: Code::Unknown, - message: format!("Error deserializing status message header: {}", err), + message: format!("Error deserializing status message header: {err}"), details, metadata: MetadataMap::from_headers(other_headers), source: None,