-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support for adding services dynamically. (#1408)
* Support for adding services dynamically. * Fix typo in examples/Cargo.toml Co-authored-by: Andrew Hickman <andrew.hickman1@sky.com> * PR feedback: add test for RoutesBuilder * Remove let else from the test to fix the failing build. * Fix test after removal of futures-util crate. * run cargo fmt --------- Co-authored-by: tomek.sroka <tomek.sroka@cloudkitchens.com> Co-authored-by: Andrew Hickman <andrew.hickman1@sky.com> Co-authored-by: Lucio Franco <luciofranco14@gmail.com>
- Loading branch information
1 parent
44aa46d
commit 5f5ae24
Showing
6 changed files
with
249 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
use std::env; | ||
use tonic::{transport::server::RoutesBuilder, transport::Server, Request, Response, Status}; | ||
|
||
use hello_world::greeter_server::{Greeter, GreeterServer}; | ||
use hello_world::{HelloReply, HelloRequest}; | ||
|
||
use echo::echo_server::{Echo, EchoServer}; | ||
use echo::{EchoRequest, EchoResponse}; | ||
|
||
pub mod hello_world { | ||
tonic::include_proto!("helloworld"); | ||
} | ||
|
||
pub mod echo { | ||
tonic::include_proto!("grpc.examples.unaryecho"); | ||
} | ||
|
||
type EchoResult<T> = Result<Response<T>, Status>; | ||
|
||
#[derive(Default)] | ||
pub struct MyEcho {} | ||
|
||
#[tonic::async_trait] | ||
impl Echo for MyEcho { | ||
async fn unary_echo(&self, request: Request<EchoRequest>) -> EchoResult<EchoResponse> { | ||
println!("Got an echo request from {:?}", request.remote_addr()); | ||
|
||
let message = format!("you said: {}", request.into_inner().message); | ||
|
||
Ok(Response::new(EchoResponse { message })) | ||
} | ||
} | ||
|
||
fn init_echo(args: &[String], builder: &mut RoutesBuilder) { | ||
let enabled = args | ||
.into_iter() | ||
.find(|arg| arg.as_str() == "echo") | ||
.is_some(); | ||
if enabled { | ||
println!("Adding Echo service..."); | ||
let svc = EchoServer::new(MyEcho::default()); | ||
builder.add_service(svc); | ||
} | ||
} | ||
|
||
#[derive(Default)] | ||
pub struct MyGreeter {} | ||
|
||
#[tonic::async_trait] | ||
impl Greeter for MyGreeter { | ||
async fn say_hello( | ||
&self, | ||
request: Request<HelloRequest>, | ||
) -> Result<Response<HelloReply>, Status> { | ||
println!("Got a greet request from {:?}", request.remote_addr()); | ||
|
||
let reply = hello_world::HelloReply { | ||
message: format!("Hello {}!", request.into_inner().name), | ||
}; | ||
Ok(Response::new(reply)) | ||
} | ||
} | ||
|
||
fn init_greeter(args: &[String], builder: &mut RoutesBuilder) { | ||
let enabled = args | ||
.into_iter() | ||
.find(|arg| arg.as_str() == "greeter") | ||
.is_some(); | ||
|
||
if enabled { | ||
println!("Adding Greeter service..."); | ||
let svc = GreeterServer::new(MyGreeter::default()); | ||
builder.add_service(svc); | ||
} | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
let args: Vec<String> = env::args().collect(); | ||
let mut routes_builder = RoutesBuilder::default(); | ||
init_greeter(&args, &mut routes_builder); | ||
init_echo(&args, &mut routes_builder); | ||
|
||
let addr = "[::1]:50051".parse().unwrap(); | ||
|
||
println!("Grpc server listening on {}", addr); | ||
|
||
Server::builder() | ||
.add_routes(routes_builder.routes()) | ||
.serve(addr) | ||
.await?; | ||
|
||
Ok(()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
use std::time::Duration; | ||
|
||
use tokio::sync::oneshot; | ||
use tokio_stream::StreamExt; | ||
|
||
use integration_tests::pb::{ | ||
test1_client, test1_server, test_client, test_server, Input, Input1, Output, Output1, | ||
}; | ||
use tonic::codegen::BoxStream; | ||
use tonic::transport::server::RoutesBuilder; | ||
use tonic::{ | ||
transport::{Endpoint, Server}, | ||
Request, Response, Status, | ||
}; | ||
|
||
#[tokio::test] | ||
async fn multiple_service_using_routes_builder() { | ||
struct Svc1; | ||
|
||
#[tonic::async_trait] | ||
impl test_server::Test for Svc1 { | ||
async fn unary_call(&self, _req: Request<Input>) -> Result<Response<Output>, Status> { | ||
Ok(Response::new(Output {})) | ||
} | ||
} | ||
|
||
struct Svc2; | ||
|
||
#[tonic::async_trait] | ||
impl test1_server::Test1 for Svc2 { | ||
async fn unary_call(&self, request: Request<Input1>) -> Result<Response<Output1>, Status> { | ||
Ok(Response::new(Output1 { | ||
buf: request.into_inner().buf, | ||
})) | ||
} | ||
|
||
type StreamCallStream = BoxStream<Output1>; | ||
|
||
async fn stream_call( | ||
&self, | ||
request: Request<Input1>, | ||
) -> Result<Response<Self::StreamCallStream>, Status> { | ||
let output = Output1 { | ||
buf: request.into_inner().buf, | ||
}; | ||
let stream = tokio_stream::iter(vec![Ok(output)]); | ||
|
||
Ok(Response::new(Box::pin(stream))) | ||
} | ||
} | ||
|
||
let svc1 = test_server::TestServer::new(Svc1); | ||
let svc2 = test1_server::Test1Server::new(Svc2); | ||
|
||
let (tx, rx) = oneshot::channel::<()>(); | ||
let mut routes_builder = RoutesBuilder::default(); | ||
routes_builder.add_service(svc1).add_service(svc2); | ||
|
||
let jh = tokio::spawn(async move { | ||
Server::builder() | ||
.add_routes(routes_builder.routes()) | ||
.serve_with_shutdown("127.0.0.1:1400".parse().unwrap(), async { drop(rx.await) }) | ||
.await | ||
.unwrap(); | ||
}); | ||
|
||
tokio::time::sleep(Duration::from_millis(100)).await; | ||
|
||
let channel = Endpoint::from_static("http://127.0.0.1:1400") | ||
.connect() | ||
.await | ||
.unwrap(); | ||
|
||
let mut client1 = test_client::TestClient::new(channel.clone()); | ||
let mut client2 = test1_client::Test1Client::new(channel); | ||
|
||
client1.unary_call(Input {}).await.unwrap(); | ||
|
||
let resp2 = client2 | ||
.unary_call(Input1 { | ||
buf: b"hello".to_vec(), | ||
}) | ||
.await | ||
.unwrap() | ||
.into_inner(); | ||
assert_eq!(&resp2.buf, b"hello"); | ||
let mut stream_response = client2 | ||
.stream_call(Input1 { | ||
buf: b"world".to_vec(), | ||
}) | ||
.await | ||
.unwrap() | ||
.into_inner(); | ||
let first = match stream_response.next().await { | ||
Some(Ok(first)) => first, | ||
_ => panic!("expected one non-error item in the stream call response"), | ||
}; | ||
|
||
assert_eq!(&first.buf, b"world"); | ||
assert!(stream_response.next().await.is_none()); | ||
|
||
tx.send(()).unwrap(); | ||
|
||
jh.await.unwrap(); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters