Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tonic example #72

Merged
merged 5 commits into from
Mar 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
members = [
"tower-http",
"examples/warp-key-value-store",
"examples/tonic-key-value-store",
]
1 change: 1 addition & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
This folder contains various examples of how to use tower-http.

- `warp-key-value-store`: A key/value store with an HTTP API built with warp.
- `tonic-key-value-store`: A key/value store with a gRPC API and client built with tonic.
22 changes: 22 additions & 0 deletions examples/tonic-key-value-store/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "tonic-key-value-store"
version = "0.1.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
edition = "2018"
publish = false
license = "MIT"

[dependencies]
bytes = "1"
hyper = { version = "0.14.4", features = ["full"] }
prost = "0.7"
structopt = "0.3.21"
tokio = { version = "1.2.0", features = ["full"] }
tonic = "0.4"
tower = { version = "0.4.5", features = ["full"] }
tower-http = { path = "../../tower-http", features = ["full"] }
tracing = "0.1"
tracing-subscriber = "0.2"

[build-dependencies]
tonic-build = "0.4"
23 changes: 23 additions & 0 deletions examples/tonic-key-value-store/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# tonic-key-value-store

This examples contains a simple key/value store with a gRPC API and client built with tonic.

## Running the example

Running a server:

```
cargo run --bin tonic-key-value-store -- -p 3000 server
```

Setting values:

```
echo "Hello, World" | cargo run --bin tonic-key-value-store -- -p 3000 set -k foo
```

Getting values:

```
cargo run --bin tonic-key-value-store -- -p 3000 get -k foo
```
3 changes: 3 additions & 0 deletions examples/tonic-key-value-store/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
fn main() {
tonic_build::compile_protos("proto/key_value_store.proto").unwrap();
}
23 changes: 23 additions & 0 deletions examples/tonic-key-value-store/proto/key_value_store.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
syntax = "proto3";

package key_value_store;

service KeyValueStore {
rpc Get (GetRequest) returns (GetReply);
rpc Set (SetRequest) returns (SetReply);
}

message GetRequest {
string key = 1;
}

message GetReply {
bytes value = 1;
}

message SetRequest {
string key = 1;
bytes value = 2;
}

message SetReply {}
282 changes: 282 additions & 0 deletions examples/tonic-key-value-store/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
use bytes::Bytes;
use hyper::body::HttpBody;
use hyper::{
header::{self, HeaderValue},
Server,
};
use proto::{
key_value_store_client::KeyValueStoreClient, key_value_store_server, GetReply, GetRequest,
SetReply, SetRequest,
};
use std::{
collections::HashMap,
net::SocketAddr,
net::TcpListener,
sync::{Arc, RwLock},
time::Duration,
};
use structopt::StructOpt;
use tokio::io::AsyncReadExt;
use tonic::{async_trait, body::BoxBody, transport::Channel, Code, Request, Response, Status};
use tower::{make::Shared, ServiceBuilder};
use tower::{BoxError, Service};
use tower_http::{
compression::CompressionLayer, decompression::DecompressionLayer,
sensitive_header::SetSensitiveHeaderLayer, set_header::SetRequestHeaderLayer,
};

mod proto {
tonic::include_proto!("key_value_store");
}

/// Simple key/value store with an HTTP API
#[derive(Debug, StructOpt)]
struct Config {
/// The port to listen on
#[structopt(long, short = "p", default_value = "3000")]
port: u16,

#[structopt(subcommand)]
command: Command,
}

#[derive(Debug, StructOpt)]
enum Command {
/// Run the gRPC server
Server,
/// Get the value at some key
Get {
#[structopt(long, short = "k")]
key: String,
},
/// Set a value at some key.
///
/// The value will be read from stdin.
Set {
#[structopt(long, short = "k")]
key: String,
},
}

#[tokio::main]
async fn main() {
// Setup tracing
tracing_subscriber::fmt::init();

// Parse command line arguments
let config = Config::from_args();

// The server address
let addr = SocketAddr::from(([0, 0, 0, 0], config.port));

match config.command {
Command::Server => {
// Create a `TcpListener`
let listener = TcpListener::bind(addr).unwrap();

// Run our service
serve_forever(listener).await.expect("server error");
}
Command::Get { key } => {
// Create a client for our server
let mut client = make_client(addr).await.unwrap();

// Issue a `GetRequest`
let result = client.get(GetRequest { key }).await;

match result {
// If it succeeds print the value
Ok(response) => {
let value_bytes = response.into_inner().value;
let value = String::from_utf8_lossy(&value_bytes[..]);
print!("{}", value);
}
// If not found we shouldn't panic
Err(status) if status.code() == Code::NotFound => {
eprintln!("not found");
std::process::exit(1);
}
// Panic on other errors
Err(status) => {
panic!("{:?}", status);
}
}
}
Command::Set { key } => {
// Create a client for our server
let mut client = make_client(addr).await.unwrap();

// Read the value from stdin
let mut stdin = tokio::io::stdin();
let mut value = Vec::new();
stdin.read_to_end(&mut value).await.unwrap();

// Issue a `SetRequest`
client.set(SetRequest { key, value }).await.unwrap();

// All good :+1:
println!("OK");
}
}
}

// We make this a separate function so we're able to call it from tests.
async fn serve_forever(listener: TcpListener) -> Result<(), Box<dyn std::error::Error>> {
// Build our database for holding the key/value pairs
let db = Arc::new(RwLock::new(HashMap::new()));

// Build our tonic `Service`
let service = key_value_store_server::KeyValueStoreServer::new(ServerImpl { db });

// Apply middlewares to our service
let service = ServiceBuilder::new()
// Set a timeout
.timeout(Duration::from_secs(10))
// Compress responses
.layer(CompressionLayer::new())
// Mark the `Authorization` header as sensitive so it doesn't show in logs
.layer(SetSensitiveHeaderLayer::new(header::AUTHORIZATION))
// Build our final `Service`
.service(service);

// Run the service using hyper
let addr = listener.local_addr()?;

tracing::info!("Listening on {}", addr);

// We cannot use `tonic::transport::Server` directly as it requires services to implement
// `tonic::transport::NamedService` which tower-http middlewares don't
Server::from_tcp(listener)?
// Required for gRPC
.http2_only(true)
davidpdrsn marked this conversation as resolved.
Show resolved Hide resolved
.serve(Shared::new(service))
.await?;

Ok(())
}

// Implementation of the server trait generated by tonic
#[derive(Debug, Clone)]
struct ServerImpl {
db: Arc<RwLock<HashMap<String, Bytes>>>,
}

#[async_trait]
impl key_value_store_server::KeyValueStore for ServerImpl {
async fn get(&self, request: Request<GetRequest>) -> Result<Response<GetReply>, Status> {
let key = request.into_inner().key;

if let Some(value) = self.db.read().unwrap().get(&key).cloned() {
let reply = GetReply {
value: value.to_vec(),
};

Ok(Response::new(reply))
} else {
Err(Status::not_found("key not found"))
}
}

async fn set(&self, request: Request<SetRequest>) -> Result<Response<SetReply>, Status> {
let SetRequest { key, value } = request.into_inner();
let value = Bytes::from(value);

self.db.write().unwrap().insert(key, value);

Ok(Response::new(SetReply {}))
}
}

// Build a client with a few middlewares applied and connect to the server
async fn make_client(
addr: SocketAddr,
) -> Result<
KeyValueStoreClient<
impl Service<
hyper::Request<BoxBody>,
Response = hyper::Response<
impl HttpBody<Data = Bytes, Error = impl Into<BoxError>>,
>,
Error = impl Into<BoxError>,
> + Clone
+ Send
+ Sync
+ 'static,
>,
tonic::transport::Error,
> {
let uri = format!("http://{}", addr)
.parse::<tonic::transport::Uri>()
.unwrap();

// We have to use a `tonic::transport::Channel` as it implementes `Service` so we can apply
// middlewares to it
let channel = Channel::builder(uri).connect().await?;

// Apply middlewares to our client
let channel = ServiceBuilder::new()
// Decompress response bodies
.layer(DecompressionLayer::new())
// Set a `User-Agent` header
.layer(SetRequestHeaderLayer::<_, Request<BoxBody>>::overriding(
header::USER_AGENT,
HeaderValue::from_static("tonic-key-value-store"),
))
// Build our final `Service`
.service(channel);

// Construct our tonic client
Ok(KeyValueStoreClient::new(channel))
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn get_and_set_value() {
let addr = run_in_background();

let mut client = make_client(addr).await.unwrap();

let key = "foo".to_string();
let value = vec![1_u8, 3, 3, 7];

let status = client
.get(GetRequest { key: key.clone() })
.await
.unwrap_err();
assert_eq!(status.code(), Code::NotFound);

client
.set(SetRequest {
key: key.clone(),
value: value.clone(),
})
.await
.unwrap();

let server_value = client
.get(GetRequest { key: key.clone() })
.await
.unwrap()
.into_inner()
.value;
assert_eq!(value, server_value);
}

// Run our service in a background task.
fn run_in_background() -> SocketAddr {
let listener = TcpListener::bind("127.0.0.1:0").expect("Could not bind ephemeral socket");
let addr = listener.local_addr().unwrap();

// just for debugging
eprintln!("Listening on {}", addr);

tokio::spawn(async move {
serve_forever(listener).await.unwrap();
});

addr
}
}