From 0d457d3d374b363f7b39e1c12e2bf3b491d4a4ea Mon Sep 17 00:00:00 2001 From: Julien Roncaglia Date: Sun, 11 Oct 2020 23:54:44 +0200 Subject: [PATCH 01/17] Handle gzip compressed messages --- tonic/Cargo.toml | 3 + tonic/benches/decode.rs | 90 ++++++++++++++++++- tonic/src/client/grpc.rs | 5 +- tonic/src/codec/compression/bufwriter.rs | 26 ++++++ tonic/src/codec/compression/compressors.rs | 81 +++++++++++++++++ tonic/src/codec/compression/decompression.rs | 94 ++++++++++++++++++++ tonic/src/codec/compression/errors.rs | 39 ++++++++ tonic/src/codec/compression/gzip.rs | 30 +++++++ tonic/src/codec/compression/mod.rs | 12 +++ tonic/src/codec/decode.rs | 74 +++++++++++---- tonic/src/codec/mod.rs | 3 + tonic/src/server/grpc.rs | 10 ++- 12 files changed, 440 insertions(+), 27 deletions(-) create mode 100644 tonic/src/codec/compression/bufwriter.rs create mode 100644 tonic/src/codec/compression/compressors.rs create mode 100644 tonic/src/codec/compression/decompression.rs create mode 100644 tonic/src/codec/compression/errors.rs create mode 100644 tonic/src/codec/compression/gzip.rs create mode 100644 tonic/src/codec/compression/mod.rs diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index 39b0276cc..1ec052818 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -36,6 +36,7 @@ transport = [ tls = ["transport", "tokio-rustls"] tls-roots = ["tls", "rustls-native-certs"] prost = ["prost1", "prost-derive"] +gzip = ["flate2"] # [[bench]] # name = "bench_main" @@ -48,6 +49,8 @@ futures-util = { version = "0.3", default-features = false } tracing = "0.1" http = "0.2" base64 = "0.12" +flate2 = { version = "1.0", optional = true } +once_cell = "1.0" percent-encoding = "2.0" tower-service = "0.3" diff --git a/tonic/benches/decode.rs b/tonic/benches/decode.rs index 529e69377..c1e4e472b 100644 --- a/tonic/benches/decode.rs +++ b/tonic/benches/decode.rs @@ -6,10 +6,13 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use tonic::{codec::DecodeBuf, codec::Decoder, Status, Streaming}; +use tonic::{codec::DecodeBuf, codec::Decoder, codec::Decompression, Status, Streaming}; macro_rules! bench { ($name:ident, $message_size:expr, $chunk_size:expr, $message_count:expr) => { + bench!($name, $message_size, $chunk_size, $message_count, None); + }; + ($name:ident, $message_size:expr, $chunk_size:expr, $message_count:expr, $encoding:expr) => { fn $name(b: &mut Bencher) { let mut rt = tokio::runtime::Builder::new() .basic_scheduler() @@ -23,7 +26,9 @@ macro_rules! bench { b.iter(|| { rt.block_on(async { let decoder = MockDecoder::new($message_size); - let mut stream = Streaming::new_request(decoder, body.clone()); + + let decompression = Decompression::new($encoding); + let mut stream = Streaming::new_request(decoder, body.clone(), decompression); let mut count = 0; while let Some(msg) = stream.message().await.unwrap() { @@ -137,6 +142,57 @@ bench!(message_count_1, 500, 505, 1); bench!(message_count_10, 500, 505, 10); bench!(message_count_20, 500, 505, 20); +// gzip change body chunk size only +bench!(chunk_size_100_gzip, 1_000, 100, 1, Some("gzip".to_string())); +bench!(chunk_size_500_gzip, 1_000, 500, 1, Some("gzip".to_string())); +bench!( + chunk_size_1005_gzip, + 1_000, + 1_005, + 1, + Some("gzip".to_string()) +); + +// gzip change message size only +bench!( + message_size_1k_gzip, + 1_000, + 1_005, + 2, + Some("gzip".to_string()) +); +bench!( + message_size_5k_gzip, + 5_000, + 1_005, + 2, + Some("gzip".to_string()) +); +bench!( + message_size_10k_gzip, + 10_000, + 1_005, + 2, + Some("gzip".to_string()) +); + +// gzip change message count only +bench!(message_count_1_gzip, 500, 505, 1, Some("gzip".to_string())); +bench!( + message_count_10_gzip, + 500, + 505, + 10, + Some("gzip".to_string()) +); +bench!( + message_count_20_gzip, + 500, + 505, + 20, + Some("gzip".to_string()) +); + benchmark_group!(chunk_size, chunk_size_100, chunk_size_500, chunk_size_1005); benchmark_group!( @@ -153,4 +209,32 @@ benchmark_group!( message_count_20 ); -benchmark_main!(chunk_size, message_size, message_count); +benchmark_group!( + chunk_size_gzip, + chunk_size_100_gzip, + chunk_size_500_gzip, + chunk_size_1005_gzip +); + +benchmark_group!( + message_size_gzip, + message_size_1k_gzip, + message_size_5k_gzip, + message_size_10k_gzip +); + +benchmark_group!( + message_count_gzip, + message_count_1_gzip, + message_count_10_gzip, + message_count_20_gzip +); + +benchmark_main!( + chunk_size, + message_size, + message_count, + chunk_size_gzip, + message_size_gzip, + message_count_gzip +); diff --git a/tonic/src/client/grpc.rs b/tonic/src/client/grpc.rs index 6a42b4022..a40a5e677 100644 --- a/tonic/src/client/grpc.rs +++ b/tonic/src/client/grpc.rs @@ -1,7 +1,7 @@ use crate::{ body::{Body, BoxBody}, client::GrpcService, - codec::{encode_client, Codec, Streaming}, + codec::{encode_client, Codec, Decompression, Streaming}, interceptor::Interceptor, Code, Request, Response, Status, }; @@ -196,9 +196,10 @@ impl Grpc { true }; + let decompression = Decompression::from_headers(response.headers()); let response = response.map(|body| { if expect_additional_trailers { - Streaming::new_response(codec.decoder(), body, status_code) + Streaming::new_response(codec.decoder(), body, status_code, decompression) } else { Streaming::new_empty(codec.decoder(), body) } diff --git a/tonic/src/codec/compression/bufwriter.rs b/tonic/src/codec/compression/bufwriter.rs new file mode 100644 index 000000000..9aad3d39a --- /dev/null +++ b/tonic/src/codec/compression/bufwriter.rs @@ -0,0 +1,26 @@ +use bytes::BufMut; + +use std::{cmp, io}; + +/// A `BufMut` adapter which implements `io::Write` for the inner value. +#[derive(Debug)] +pub(crate) struct Writer<'a, B> { + buf: &'a mut B, +} + +pub(crate) fn new<'a, B>(buf: &'a mut B) -> Writer<'a, B> { + Writer { buf } +} + +impl<'a, B: BufMut + Sized> io::Write for Writer<'a, B> { + fn write(&mut self, src: &[u8]) -> io::Result { + let n = cmp::min(self.buf.remaining_mut(), src.len()); + + self.buf.put(&src[0..n]); + Ok(n) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} diff --git a/tonic/src/codec/compression/compressors.rs b/tonic/src/codec/compression/compressors.rs new file mode 100644 index 000000000..655a19ab1 --- /dev/null +++ b/tonic/src/codec/compression/compressors.rs @@ -0,0 +1,81 @@ +use std::{collections::HashMap, io}; + +use super::bufwriter; +use bytes::{Buf, BytesMut}; +use once_cell::sync::Lazy; + +pub(crate) const IDENTITY: &str = "identity"; + +/// List of known compressors +static COMPRESSORS: Lazy>> = Lazy::new(|| { + let mut m = HashMap::new(); + + let mut add = |compressor: Box| { + m.insert(compressor.name().to_string(), compressor); + }; + + add(Box::new(IdentityCompressor {})); + + #[cfg(feature = "gzip")] + add(Box::new(super::gzip::GZipCompressor {})); + + m +}); + +/// Get a compressor from it's name +pub(crate) fn get(name: impl AsRef) -> Option<&'static Box> { + COMPRESSORS.get(name.as_ref()) +} + +/// Get all the known compressors +pub(crate) fn names() -> Vec { + COMPRESSORS.keys().map(|n| n.clone()).collect() +} + +/// A compressor implement compression and decompression of GRPC frames +pub(crate) trait Compressor: Sync + Send { + /// Get the name of this compressor as present in http headers + fn name(&self) -> &'static str; + + /// Decompress `len` bytes from `in_buffer` into `out_buffer` + fn decompress( + &self, + in_buffer: &mut BytesMut, + out_buffer: &mut BytesMut, + len: usize, + ) -> io::Result<()>; + + /// Estimate the space necessary to decompress `compressed_len` bytes of compressed data + fn estimate_decompressed_len(&self, compressed_len: usize) -> usize { + compressed_len * 2 + } +} + +/// The identity compressor doesn't compress +#[derive(Debug)] +struct IdentityCompressor {} + +impl Compressor for IdentityCompressor { + fn name(&self) -> &'static str { + IDENTITY + } + + fn decompress( + &self, + in_buffer: &mut BytesMut, + out_buffer: &mut BytesMut, + len: usize, + ) -> io::Result<()> { + let mut in_reader = &in_buffer[0..len]; + let mut out_writer = bufwriter::new(out_buffer); + + std::io::copy(&mut in_reader, &mut out_writer)?; + in_buffer.advance(len); + + Ok(()) + } + + fn estimate_decompressed_len(&self, compressed_len: usize) -> usize { + compressed_len + } +} diff --git a/tonic/src/codec/compression/decompression.rs b/tonic/src/codec/compression/decompression.rs new file mode 100644 index 000000000..872b23976 --- /dev/null +++ b/tonic/src/codec/compression/decompression.rs @@ -0,0 +1,94 @@ +use bytes::BytesMut; +use tracing::debug; + +use super::{compressors, Compressor, DecompressionError}; + +const BUFFER_SIZE: usize = 8 * 1024; +const ENCODING_HEADER: &str = "grpc-encoding"; + +/// Information related to the decompression of a request or response +#[derive(Debug)] +pub struct Decompression { + encoding: Option, +} + +impl Decompression { + /// Create a `Decompression` structure + pub fn new(encoding: Option) -> Decompression { + Decompression { encoding } + } + + /// Create a `Decompression` structure from http headers + pub fn from_headers(metadata: &http::HeaderMap) -> Decompression { + let encoding = metadata + .get(ENCODING_HEADER) + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); + + Decompression::new(encoding) + } + + /// Get if the current encoding is the no-op "identity" one + pub fn is_identity(&self) -> bool { + match &self.encoding { + Some(encoding) => encoding == compressors::IDENTITY, + None => false, + } + } + + /// Clear the buffer and reserve an estimated number of bytes + fn prepare_decompress_buf(buffer: &mut BytesMut, estimated_len: usize) { + buffer.clear(); + + if buffer.capacity() < estimated_len { + let capacity = ((estimated_len / BUFFER_SIZE) + 1) * BUFFER_SIZE; + buffer.reserve(capacity) + } + } + + /// Find a compressor in the registry for the current encoding + fn get_compressor(&self) -> Result<&Box, DecompressionError> { + match &self.encoding { + None => { + Ok(compressors::get(compressors::IDENTITY).expect("Identity is always present")) + } + Some(encoding) => match compressors::get(encoding) { + Some(compressor) => Ok(compressor), + None => Err(DecompressionError::NotFound { + requested: encoding.clone(), + known: compressors::names(), + }), + }, + } + } + + /// Decompress `len` bytes from `in_buffer` into `out_buffer` + pub fn decompress( + &self, + in_buffer: &mut BytesMut, + out_buffer: &mut BytesMut, + len: usize, + ) -> Result<(), DecompressionError> { + let compressor = self.get_compressor()?; + + Decompression::prepare_decompress_buf( + out_buffer, + compressor.estimate_decompressed_len(len), + ); + compressor.decompress(in_buffer, out_buffer, len)?; + + debug!( + "Decompressed {} bytes into {} bytes using {:?}", + len, + out_buffer.len(), + self.encoding + ); + Ok(()) + } +} + +impl Default for Decompression { + fn default() -> Self { + Decompression { encoding: None } + } +} diff --git a/tonic/src/codec/compression/errors.rs b/tonic/src/codec/compression/errors.rs new file mode 100644 index 000000000..cac39bd98 --- /dev/null +++ b/tonic/src/codec/compression/errors.rs @@ -0,0 +1,39 @@ +#[derive(Debug)] +pub enum DecompressionError { + NotFound { + requested: String, + known: Vec, + }, + Failed(std::io::Error), +} + +impl std::fmt::Display for DecompressionError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self { + DecompressionError::NotFound { requested, known } => { + let known_joined = known.join(", "); + write!( + f, + "Compressor for '{}' not found. Known compressors: {}", + requested, known_joined + ) + } + DecompressionError::Failed(error) => write!(f, "Decompression failed: {}", error), + } + } +} + +impl std::error::Error for DecompressionError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match &self { + DecompressionError::NotFound { .. } => None, + DecompressionError::Failed(err) => Some(err), + } + } +} + +impl From for DecompressionError { + fn from(error: std::io::Error) -> Self { + DecompressionError::Failed(error) + } +} diff --git a/tonic/src/codec/compression/gzip.rs b/tonic/src/codec/compression/gzip.rs new file mode 100644 index 000000000..31e864f1b --- /dev/null +++ b/tonic/src/codec/compression/gzip.rs @@ -0,0 +1,30 @@ +use std::io; + +use super::{Compressor, bufwriter}; +use bytes::{Buf, BytesMut}; +use flate2::read::GzDecoder; + +/// Compress using GZIP +#[derive(Debug)] +pub(crate) struct GZipCompressor {} + +impl Compressor for GZipCompressor { + fn name(&self) -> &'static str { + "gzip" + } + + fn decompress( + &self, + in_buffer: &mut BytesMut, + out_buffer: &mut BytesMut, + len: usize, + ) -> io::Result<()> { + let mut gzip_decoder = GzDecoder::new(&in_buffer[0..len]); + let mut out_writer = bufwriter::new(out_buffer); + + std::io::copy(&mut gzip_decoder, &mut out_writer)?; + in_buffer.advance(len); + + Ok(()) + } +} diff --git a/tonic/src/codec/compression/mod.rs b/tonic/src/codec/compression/mod.rs new file mode 100644 index 000000000..f642541dd --- /dev/null +++ b/tonic/src/codec/compression/mod.rs @@ -0,0 +1,12 @@ +mod bufwriter; +mod compressors; +mod decompression; +mod errors; + +#[cfg(feature = "gzip")] +mod gzip; + +pub(crate) use self::compressors::Compressor; +#[doc(hidden)] +pub use self::decompression::Decompression; +pub(crate) use self::errors::DecompressionError; diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index 157a27093..09f34fe82 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -1,4 +1,4 @@ -use super::{DecodeBuf, Decoder}; +use super::{DecodeBuf, Decoder, Decompression}; use crate::{body::BoxBody, metadata::MetadataMap, Code, Status}; use bytes::{Buf, BufMut, BytesMut}; use futures_core::Stream; @@ -24,6 +24,8 @@ pub struct Streaming { state: State, direction: Direction, buf: BytesMut, + decompression: Decompression, + decompress_buf: BytesMut, trailers: Option, } @@ -43,13 +45,23 @@ enum Direction { } impl Streaming { - pub(crate) fn new_response(decoder: D, body: B, status_code: StatusCode) -> Self + pub(crate) fn new_response( + decoder: D, + body: B, + status_code: StatusCode, + decompression: Decompression, + ) -> Self where B: Body + Send + Sync + 'static, B::Error: Into, D: Decoder + Send + Sync + 'static, { - Self::new(decoder, body, Direction::Response(status_code)) + Self::new( + decoder, + body, + Direction::Response(status_code), + decompression, + ) } pub(crate) fn new_empty(decoder: D, body: B) -> Self @@ -58,20 +70,25 @@ impl Streaming { B::Error: Into, D: Decoder + Send + Sync + 'static, { - Self::new(decoder, body, Direction::EmptyResponse) + Self::new( + decoder, + body, + Direction::EmptyResponse, + Decompression::default(), + ) } #[doc(hidden)] - pub fn new_request(decoder: D, body: B) -> Self + pub fn new_request(decoder: D, body: B, decompression: Decompression) -> Self where B: Body + Send + Sync + 'static, B::Error: Into, D: Decoder + Send + Sync + 'static, { - Self::new(decoder, body, Direction::Request) + Self::new(decoder, body, Direction::Request, decompression) } - fn new(decoder: D, body: B, direction: Direction) -> Self + fn new(decoder: D, body: B, direction: Direction, decompression: Decompression) -> Self where B: Body + Send + Sync + 'static, B::Error: Into, @@ -83,6 +100,8 @@ impl Streaming { state: State::ReadHeader, direction, buf: BytesMut::with_capacity(BUFFER_SIZE), + decompress_buf: BytesMut::new(), + decompression, trailers: None, } } @@ -159,13 +178,7 @@ impl Streaming { let is_compressed = match self.buf.get_u8() { 0 => false, - 1 => { - trace!("message compressed, compression not supported yet"); - return Err(Status::new( - Code::Unimplemented, - "Message compressed, compression not supported yet.".to_string(), - )); - } + 1 => true, f => { trace!("unexpected compression flag"); let message = if let Direction::Response(status) = self.direction { @@ -187,17 +200,40 @@ impl Streaming { } } - if let State::ReadBody { len, .. } = &self.state { + if let State::ReadBody { len, compression } = &self.state { // if we haven't read enough of the message then return and keep // reading if self.buf.remaining() < *len || self.buf.len() < *len { return Ok(None); } - return match self - .decoder - .decode(&mut DecodeBuf::new(&mut self.buf, *len)) - { + let decode_result = if *compression && !self.decompression.is_identity() { + if let Err(err) = + self.decompression + .decompress(&mut self.buf, &mut self.decompress_buf, *len) + { + trace!(error = ?err, "Error decompressing: {}", err); + let message = if let Direction::Response(status) = self.direction { + format!( + "Error decompressing: {}, while receiving response with status: {}", + err, status + ) + } else { + format!("Error decompressing: {}, while sending request", err) + }; + return Err(Status::new(Code::Internal, message)); + } + let uncompressed_len = self.decompress_buf.len(); + self.decoder.decode(&mut DecodeBuf::new( + &mut self.decompress_buf, + uncompressed_len, + )) + } else { + self.decoder + .decode(&mut DecodeBuf::new(&mut self.buf, *len)) + }; + + return match decode_result { Ok(Some(msg)) => { self.state = State::ReadHeader; Ok(Some(msg)) diff --git a/tonic/src/codec/mod.rs b/tonic/src/codec/mod.rs index e100556c3..bb88dc860 100644 --- a/tonic/src/codec/mod.rs +++ b/tonic/src/codec/mod.rs @@ -4,6 +4,7 @@ //! and a protobuf codec based on prost. mod buffer; +mod compression; mod decode; mod encode; #[cfg(feature = "prost")] @@ -11,6 +12,8 @@ mod prost; use std::io; +#[doc(hidden)] +pub use self::compression::Decompression; pub use self::decode::Streaming; pub(crate) use self::encode::{encode_client, encode_server}; #[cfg(feature = "prost")] diff --git a/tonic/src/server/grpc.rs b/tonic/src/server/grpc.rs index 7852d7fe7..d09a5c7ac 100644 --- a/tonic/src/server/grpc.rs +++ b/tonic/src/server/grpc.rs @@ -1,6 +1,6 @@ use crate::{ body::BoxBody, - codec::{encode_server, Codec, Streaming}, + codec::{encode_server, Codec, Decompression, Streaming}, interceptor::Interceptor, server::{ClientStreamingService, ServerStreamingService, StreamingService, UnaryService}, Code, Request, Status, @@ -160,7 +160,8 @@ where B::Error: Into + Send, { let (parts, body) = request.into_parts(); - let stream = Streaming::new_request(self.codec.decoder(), body); + let decompression = Decompression::from_headers(&parts.headers); + let stream = Streaming::new_request(self.codec.decoder(), body, decompression); futures_util::pin_mut!(stream); @@ -186,7 +187,10 @@ where B: Body + Send + Sync + 'static, B::Error: Into + Send, { - Request::from_http(request.map(|body| Streaming::new_request(self.codec.decoder(), body))) + let decompression = Decompression::from_headers(request.headers()); + Request::from_http( + request.map(|body| Streaming::new_request(self.codec.decoder(), body, decompression)), + ) } fn map_response( From 3f26526276de700e24a2541adb991e52acb7b066 Mon Sep 17 00:00:00 2001 From: Julien Roncaglia Date: Thu, 15 Oct 2020 17:42:47 +0200 Subject: [PATCH 02/17] fmt --- tonic/src/codec/compression/gzip.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tonic/src/codec/compression/gzip.rs b/tonic/src/codec/compression/gzip.rs index 31e864f1b..ac6636aab 100644 --- a/tonic/src/codec/compression/gzip.rs +++ b/tonic/src/codec/compression/gzip.rs @@ -1,6 +1,6 @@ use std::io; -use super::{Compressor, bufwriter}; +use super::{bufwriter, Compressor}; use bytes::{Buf, BytesMut}; use flate2::read::GzDecoder; From 7168646ce50949cd6b19ec7d60de4438179c0144 Mon Sep 17 00:00:00 2001 From: Julien Roncaglia Date: Thu, 15 Oct 2020 17:45:40 +0200 Subject: [PATCH 03/17] Prepare for really gziping in benches --- tonic/benches/decode.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tonic/benches/decode.rs b/tonic/benches/decode.rs index c1e4e472b..2b7bddf22 100644 --- a/tonic/benches/decode.rs +++ b/tonic/benches/decode.rs @@ -19,7 +19,7 @@ macro_rules! bench { .build() .expect("runtime"); - let payload = make_payload($message_size, $message_count); + let payload = make_payload($message_size, $message_count, $encoding); let body = MockBody::new(payload, $chunk_size); b.bytes = body.len() as u64; @@ -113,13 +113,16 @@ impl Decoder for MockDecoder { } } -fn make_payload(message_length: usize, message_count: usize) -> Bytes { +fn make_payload(message_length: usize, message_count: usize, encoding: Option) -> Bytes { let mut buf = BytesMut::new(); for _ in 0..message_count { let msg = vec![97u8; message_length]; buf.reserve(msg.len() + 5); - buf.put_u8(0); + buf.put_u8(match encoding { + Some(_) => 1, + None => 0, + }); buf.put_u32(msg.len() as u32); buf.put(&msg[..]); } From 1500dd53975fdd57d9a02d1773d2bf220e0dd5c9 Mon Sep 17 00:00:00 2001 From: Julien Roncaglia Date: Fri, 16 Oct 2020 14:57:06 +0200 Subject: [PATCH 04/17] Bench decode with gzip --- tonic/Cargo.toml | 1 - tonic/benches/decode.rs | 46 ++++++++++++++++++++++++++++++++--------- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index 1ec052818..75166e55e 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -92,4 +92,3 @@ rustdoc-args = ["--cfg", "docsrs"] [[bench]] name = "decode" harness = false - diff --git a/tonic/benches/decode.rs b/tonic/benches/decode.rs index 2b7bddf22..3ab0640cd 100644 --- a/tonic/benches/decode.rs +++ b/tonic/benches/decode.rs @@ -19,7 +19,7 @@ macro_rules! bench { .build() .expect("runtime"); - let payload = make_payload($message_size, $message_count, $encoding); + let payload = make_payload($message_size, $message_count, &$encoding); let body = MockBody::new(payload, $chunk_size); b.bytes = body.len() as u64; @@ -113,18 +113,36 @@ impl Decoder for MockDecoder { } } -fn make_payload(message_length: usize, message_count: usize, encoding: Option) -> Bytes { +fn make_payload(message_length: usize, message_count: usize, encoding: &Option) -> Bytes { let mut buf = BytesMut::new(); + let raw_msg = vec![97u8; message_length]; + + let msg_buf = match encoding { + #[cfg(feature = "gzip")] + Some(encoding) if encoding == "gzip" => { + use bytes::buf::BufMutExt; + let mut reader = flate2::read::GzEncoder::new(&raw_msg[..], flate2::Compression::best()); + let mut writer = BytesMut::new().writer(); + + std::io::copy(&mut reader, &mut writer).expect("copy"); + writer.into_inner() + } + None => { + let mut msg_buf = BytesMut::new(); + msg_buf.put(&raw_msg[..]); + msg_buf + } + Some(encoding) => { + panic!("Encoding {} isn't supported", encoding) + } + }; + for _ in 0..message_count { - let msg = vec![97u8; message_length]; - buf.reserve(msg.len() + 5); - buf.put_u8(match encoding { - Some(_) => 1, - None => 0, - }); - buf.put_u32(msg.len() as u32); - buf.put(&msg[..]); + buf.reserve(msg_buf.len() + 5); + buf.put_u8(match encoding { Some(_) => 1, None => 0}); + buf.put_u32(msg_buf.len() as u32); + buf.put(&msg_buf[..]); } buf.freeze() @@ -233,6 +251,7 @@ benchmark_group!( message_count_20_gzip ); +#[cfg(feature = "gzip")] benchmark_main!( chunk_size, message_size, @@ -241,3 +260,10 @@ benchmark_main!( message_size_gzip, message_count_gzip ); + +#[cfg(not(feature = "gzip"))] +benchmark_main!( + chunk_size, + message_size, + message_count +); From 97afb0a0dfd6b5c7a897f6334fae175b7531cde9 Mon Sep 17 00:00:00 2001 From: Julien Roncaglia Date: Fri, 16 Oct 2020 15:14:26 +0200 Subject: [PATCH 05/17] Add compress in Compressor interface --- tonic/src/codec/compression/compressors.rs | 33 ++++++++++++++++++++-- tonic/src/codec/compression/gzip.rs | 33 ++++++++++++++++++++-- 2 files changed, 62 insertions(+), 4 deletions(-) diff --git a/tonic/src/codec/compression/compressors.rs b/tonic/src/codec/compression/compressors.rs index 655a19ab1..823955a2e 100644 --- a/tonic/src/codec/compression/compressors.rs +++ b/tonic/src/codec/compression/compressors.rs @@ -14,10 +14,10 @@ static COMPRESSORS: Lazy>> = Lazy::new(|| { m.insert(compressor.name().to_string(), compressor); }; - add(Box::new(IdentityCompressor {})); + add(Box::new(IdentityCompressor::default())); #[cfg(feature = "gzip")] - add(Box::new(super::gzip::GZipCompressor {})); + add(Box::new(super::gzip::GZipCompressor::default())); m }); @@ -45,6 +45,14 @@ pub(crate) trait Compressor: Sync + Send { len: usize, ) -> io::Result<()>; + /// Compress `len` bytes from `in_buffer` into `out_buffer` + fn compress( + &self, + in_buffer: &mut BytesMut, + out_buffer: &mut BytesMut, + len: usize, + ) -> io::Result<()>; + /// Estimate the space necessary to decompress `compressed_len` bytes of compressed data fn estimate_decompressed_len(&self, compressed_len: usize) -> usize { compressed_len * 2 @@ -55,6 +63,12 @@ pub(crate) trait Compressor: Sync + Send { #[derive(Debug)] struct IdentityCompressor {} +impl Default for IdentityCompressor { + fn default() -> Self { + Self {} + } +} + impl Compressor for IdentityCompressor { fn name(&self) -> &'static str { IDENTITY @@ -75,6 +89,21 @@ impl Compressor for IdentityCompressor { Ok(()) } + fn compress( + &self, + in_buffer: &mut BytesMut, + out_buffer: &mut BytesMut, + len: usize, + ) -> io::Result<()> { + let mut in_reader = &in_buffer[0..len]; + let mut out_writer = bufwriter::new(out_buffer); + + std::io::copy(&mut in_reader, &mut out_writer)?; + in_buffer.advance(len); + + Ok(()) + } + fn estimate_decompressed_len(&self, compressed_len: usize) -> usize { compressed_len } diff --git a/tonic/src/codec/compression/gzip.rs b/tonic/src/codec/compression/gzip.rs index ac6636aab..38657ac44 100644 --- a/tonic/src/codec/compression/gzip.rs +++ b/tonic/src/codec/compression/gzip.rs @@ -2,11 +2,25 @@ use std::io; use super::{bufwriter, Compressor}; use bytes::{Buf, BytesMut}; -use flate2::read::GzDecoder; +use flate2::read::{GzDecoder, GzEncoder}; /// Compress using GZIP #[derive(Debug)] -pub(crate) struct GZipCompressor {} +pub(crate) struct GZipCompressor { + compression_level: flate2::Compression +} + +impl GZipCompressor { + fn new(compression_level: flate2::Compression) -> GZipCompressor { + GZipCompressor { compression_level } + } +} + +impl Default for GZipCompressor { + fn default() -> Self { + Self::new(flate2::Compression::new(6)) + } +} impl Compressor for GZipCompressor { fn name(&self) -> &'static str { @@ -27,4 +41,19 @@ impl Compressor for GZipCompressor { Ok(()) } + + fn compress( + &self, + in_buffer: &mut BytesMut, + out_buffer: &mut BytesMut, + len: usize, + ) -> io::Result<()> { + let mut gzip_decoder = GzEncoder::new(&in_buffer[0..len], self.compression_level); + let mut out_writer = bufwriter::new(out_buffer); + + std::io::copy(&mut gzip_decoder, &mut out_writer)?; + in_buffer.advance(len); + + Ok(()) + } } From 16777d2d9c707075e65201f9f710ca4a009d7e38 Mon Sep 17 00:00:00 2001 From: Julien Roncaglia Date: Fri, 16 Oct 2020 15:20:44 +0200 Subject: [PATCH 06/17] Don't force all compressors to advance the in buffer --- tonic/src/codec/compression/compressors.rs | 12 +++++------- tonic/src/codec/compression/decompression.rs | 3 ++- tonic/src/codec/compression/gzip.rs | 8 +++----- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/tonic/src/codec/compression/compressors.rs b/tonic/src/codec/compression/compressors.rs index 823955a2e..dcf337c7f 100644 --- a/tonic/src/codec/compression/compressors.rs +++ b/tonic/src/codec/compression/compressors.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, io}; use super::bufwriter; -use bytes::{Buf, BytesMut}; +use bytes::BytesMut; use once_cell::sync::Lazy; pub(crate) const IDENTITY: &str = "identity"; @@ -40,7 +40,7 @@ pub(crate) trait Compressor: Sync + Send { /// Decompress `len` bytes from `in_buffer` into `out_buffer` fn decompress( &self, - in_buffer: &mut BytesMut, + in_buffer: &BytesMut, out_buffer: &mut BytesMut, len: usize, ) -> io::Result<()>; @@ -48,7 +48,7 @@ pub(crate) trait Compressor: Sync + Send { /// Compress `len` bytes from `in_buffer` into `out_buffer` fn compress( &self, - in_buffer: &mut BytesMut, + in_buffer: &BytesMut, out_buffer: &mut BytesMut, len: usize, ) -> io::Result<()>; @@ -76,7 +76,7 @@ impl Compressor for IdentityCompressor { fn decompress( &self, - in_buffer: &mut BytesMut, + in_buffer: &BytesMut, out_buffer: &mut BytesMut, len: usize, ) -> io::Result<()> { @@ -84,14 +84,13 @@ impl Compressor for IdentityCompressor { let mut out_writer = bufwriter::new(out_buffer); std::io::copy(&mut in_reader, &mut out_writer)?; - in_buffer.advance(len); Ok(()) } fn compress( &self, - in_buffer: &mut BytesMut, + in_buffer: &BytesMut, out_buffer: &mut BytesMut, len: usize, ) -> io::Result<()> { @@ -99,7 +98,6 @@ impl Compressor for IdentityCompressor { let mut out_writer = bufwriter::new(out_buffer); std::io::copy(&mut in_reader, &mut out_writer)?; - in_buffer.advance(len); Ok(()) } diff --git a/tonic/src/codec/compression/decompression.rs b/tonic/src/codec/compression/decompression.rs index 872b23976..ae02d12f3 100644 --- a/tonic/src/codec/compression/decompression.rs +++ b/tonic/src/codec/compression/decompression.rs @@ -1,4 +1,4 @@ -use bytes::BytesMut; +use bytes::{Buf, BytesMut}; use tracing::debug; use super::{compressors, Compressor, DecompressionError}; @@ -76,6 +76,7 @@ impl Decompression { compressor.estimate_decompressed_len(len), ); compressor.decompress(in_buffer, out_buffer, len)?; + in_buffer.advance(len); debug!( "Decompressed {} bytes into {} bytes using {:?}", diff --git a/tonic/src/codec/compression/gzip.rs b/tonic/src/codec/compression/gzip.rs index 38657ac44..bbb0cfdd9 100644 --- a/tonic/src/codec/compression/gzip.rs +++ b/tonic/src/codec/compression/gzip.rs @@ -1,7 +1,7 @@ use std::io; use super::{bufwriter, Compressor}; -use bytes::{Buf, BytesMut}; +use bytes::BytesMut; use flate2::read::{GzDecoder, GzEncoder}; /// Compress using GZIP @@ -29,7 +29,7 @@ impl Compressor for GZipCompressor { fn decompress( &self, - in_buffer: &mut BytesMut, + in_buffer: &BytesMut, out_buffer: &mut BytesMut, len: usize, ) -> io::Result<()> { @@ -37,14 +37,13 @@ impl Compressor for GZipCompressor { let mut out_writer = bufwriter::new(out_buffer); std::io::copy(&mut gzip_decoder, &mut out_writer)?; - in_buffer.advance(len); Ok(()) } fn compress( &self, - in_buffer: &mut BytesMut, + in_buffer: &BytesMut, out_buffer: &mut BytesMut, len: usize, ) -> io::Result<()> { @@ -52,7 +51,6 @@ impl Compressor for GZipCompressor { let mut out_writer = bufwriter::new(out_buffer); std::io::copy(&mut gzip_decoder, &mut out_writer)?; - in_buffer.advance(len); Ok(()) } From f8dd33fc4a19be3e8ae52f1d2db47bf56e973c09 Mon Sep 17 00:00:00 2001 From: Julien Roncaglia Date: Fri, 16 Oct 2020 17:37:58 +0200 Subject: [PATCH 07/17] Add compression (on responses) --- tonic/src/client/grpc.rs | 4 +- tonic/src/codec/compression/compression.rs | 88 ++++++++++++++++++++ tonic/src/codec/compression/compressors.rs | 6 ++ tonic/src/codec/compression/decompression.rs | 5 +- tonic/src/codec/compression/mod.rs | 6 ++ tonic/src/codec/encode.rs | 25 ++++-- tonic/src/codec/mod.rs | 1 + tonic/src/server/grpc.rs | 31 +++---- 8 files changed, 139 insertions(+), 27 deletions(-) create mode 100644 tonic/src/codec/compression/compression.rs diff --git a/tonic/src/client/grpc.rs b/tonic/src/client/grpc.rs index a40a5e677..36ee7c1b7 100644 --- a/tonic/src/client/grpc.rs +++ b/tonic/src/client/grpc.rs @@ -1,7 +1,7 @@ use crate::{ body::{Body, BoxBody}, client::GrpcService, - codec::{encode_client, Codec, Decompression, Streaming}, + codec::{encode_client, Codec, Compression, Decompression, Streaming}, interceptor::Interceptor, Code, Request, Response, Status, }; @@ -160,7 +160,7 @@ impl Grpc { let uri = Uri::from_parts(parts).expect("path_and_query only is valid Uri"); let request = request - .map(|s| encode_client(codec.encoder(), s)) + .map(|s| encode_client(codec.encoder(), s, Compression::new_request())) .map(BoxBody::new); let mut request = request.into_http(uri); diff --git a/tonic/src/codec/compression/compression.rs b/tonic/src/codec/compression/compression.rs new file mode 100644 index 000000000..db9b562da --- /dev/null +++ b/tonic/src/codec/compression/compression.rs @@ -0,0 +1,88 @@ +use std::{fmt::Debug, io}; + +use bytes::{Buf, BytesMut}; +use tracing::debug; + +use crate::metadata::MetadataMap; + +use super::{Compressor, compressors::{self, IDENTITY}}; + +pub(crate) const BUFFER_SIZE: usize = 8 * 1024; +pub(crate) const ACCEPT_ENCODING_HEADER: &str = "grpc-accept-encoding"; + +pub(crate) struct Compression { + compressor: Option<&'static Box>, +} + +impl Debug for Compression { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Compression") + .field("compressor", &self.compressor.map(|c| c.name()).unwrap_or(IDENTITY)) + .finish() + } +} + +fn parse_accept_encoding_header(value: &str) -> Vec<&str> { + value.split(",").map(|v| v.trim()).filter(|v| !v.is_empty()).collect::>() +} + +fn first_supported_compressor(accepted: &Vec<&str>) -> Option<&'static Box> { + accepted.iter() + .filter(|name| **name != IDENTITY) + .filter_map(|name|compressors::get(name)) + .next() +} + +impl Compression { + pub(crate) fn new_request() -> Compression { + Compression { compressor: None } + } + + pub(crate) fn response_from_metadata(request_metadata: &MetadataMap) -> Compression { + let accept_encoding_header = request_metadata + .get(ACCEPT_ENCODING_HEADER) + .and_then(|v| v.to_str().ok()) + .unwrap_or(""); + + let parsed = parse_accept_encoding_header(accept_encoding_header); + let compressor = first_supported_compressor(&parsed); + Compression { compressor } + } + + pub(crate) fn response_from_headers(request_headers: &http::HeaderMap) -> Compression { + let accept_encoding_header = request_headers + .get(ACCEPT_ENCODING_HEADER) + .and_then(|v| v.to_str().ok()) + .unwrap_or(""); + + let parsed = parse_accept_encoding_header(accept_encoding_header); + let compressor = first_supported_compressor(&parsed); + Compression { compressor } + } + + pub(crate) fn is_enabled(&self) -> bool { + self.compressor.is_some() + } + + /// Decompress `len` bytes from `in_buffer` into `out_buffer` + pub(crate) fn compress( + &self, + in_buffer: &mut BytesMut, + out_buffer: &mut BytesMut, + len: usize, + ) -> Result<(), io::Error> { + out_buffer.reserve(((len / BUFFER_SIZE) + 1) * BUFFER_SIZE); + + let compressor = self.compressor.unwrap_or_else(compressors::identity); + compressor.compress(in_buffer, out_buffer, len)?; + in_buffer.advance(len); + + debug!( + "Decompressed {} bytes into {} bytes using {:?}", + len, + out_buffer.len(), + compressor.name() + ); + Ok(()) + } +} diff --git a/tonic/src/codec/compression/compressors.rs b/tonic/src/codec/compression/compressors.rs index dcf337c7f..ba1902078 100644 --- a/tonic/src/codec/compression/compressors.rs +++ b/tonic/src/codec/compression/compressors.rs @@ -106,3 +106,9 @@ impl Compressor for IdentityCompressor { compressed_len } } + +static BOXED_IDENTITY_COMPRESSOR: Lazy> = Lazy::new(|| Box::new(IdentityCompressor::default())); + +pub(crate) fn identity() -> &'static Box { + &BOXED_IDENTITY_COMPRESSOR +} \ No newline at end of file diff --git a/tonic/src/codec/compression/decompression.rs b/tonic/src/codec/compression/decompression.rs index ae02d12f3..0b67d5c69 100644 --- a/tonic/src/codec/compression/decompression.rs +++ b/tonic/src/codec/compression/decompression.rs @@ -71,10 +71,7 @@ impl Decompression { ) -> Result<(), DecompressionError> { let compressor = self.get_compressor()?; - Decompression::prepare_decompress_buf( - out_buffer, - compressor.estimate_decompressed_len(len), - ); + out_buffer.reserve(((compressor.estimate_decompressed_len(len) / BUFFER_SIZE) + 1) * BUFFER_SIZE); compressor.decompress(in_buffer, out_buffer, len)?; in_buffer.advance(len); diff --git a/tonic/src/codec/compression/mod.rs b/tonic/src/codec/compression/mod.rs index f642541dd..acf0e503b 100644 --- a/tonic/src/codec/compression/mod.rs +++ b/tonic/src/codec/compression/mod.rs @@ -1,12 +1,18 @@ mod bufwriter; mod compressors; +mod compression; mod decompression; mod errors; #[cfg(feature = "gzip")] mod gzip; +use bytes::BytesMut; + pub(crate) use self::compressors::Compressor; + #[doc(hidden)] pub use self::decompression::Decompression; pub(crate) use self::errors::DecompressionError; + +pub(crate) use self::compression::Compression; diff --git a/tonic/src/codec/encode.rs b/tonic/src/codec/encode.rs index 58f4f99da..873110b38 100644 --- a/tonic/src/codec/encode.rs +++ b/tonic/src/codec/encode.rs @@ -1,4 +1,4 @@ -use super::{EncodeBuf, Encoder}; +use super::{EncodeBuf, Encoder, compression::Compression}; use crate::{Code, Status}; use bytes::{BufMut, Bytes, BytesMut}; use futures_core::{Stream, TryStream}; @@ -16,36 +16,42 @@ const BUFFER_SIZE: usize = 8 * 1024; pub(crate) fn encode_server( encoder: T, source: U, + compression: Compression, ) -> EncodeBody>> where T: Encoder + Send + Sync + 'static, T::Item: Send + Sync, U: Stream> + Send + Sync + 'static, { - let stream = encode(encoder, source).into_stream(); + let stream = encode(encoder, source, compression).into_stream(); EncodeBody::new_server(stream) } pub(crate) fn encode_client( encoder: T, source: U, + compression: Compression, ) -> EncodeBody>> where T: Encoder + Send + Sync + 'static, T::Item: Send + Sync, U: Stream + Send + Sync + 'static, { - let stream = encode(encoder, source.map(Ok)).into_stream(); + let stream = encode(encoder, source.map(Ok), compression).into_stream(); EncodeBody::new_client(stream) } -fn encode(mut encoder: T, source: U) -> impl TryStream +fn encode(mut encoder: T, source: U, compression: Compression) -> impl TryStream where T: Encoder, U: Stream>, { + let compression_enabled = compression.is_enabled(); + let compressed_u8 = if compression_enabled { 1 } else { 0 }; + async_stream::stream! { let mut buf = BytesMut::with_capacity(BUFFER_SIZE); + let mut compress_buf = if compression_enabled { BytesMut::with_capacity(BUFFER_SIZE) } else { BytesMut::new() }; futures_util::pin_mut!(source); loop { @@ -55,14 +61,21 @@ where unsafe { buf.advance_mut(5); } - encoder.encode(item, &mut EncodeBuf::new(&mut buf)).map_err(drop).unwrap(); + if compression_enabled { + compress_buf.clear(); + encoder.encode(item, &mut EncodeBuf::new(&mut compress_buf)).map_err(drop).unwrap(); + let compressed_len = compress_buf.len(); + compression.compress(&mut compress_buf, &mut buf, compressed_len).map_err(drop).unwrap(); + } else { + encoder.encode(item, &mut EncodeBuf::new(&mut buf)).map_err(drop).unwrap(); + } // now that we know length, we can write the header let len = buf.len() - 5; assert!(len <= std::u32::MAX as usize); { let mut buf = &mut buf[..5]; - buf.put_u8(0); // byte must be 0, reserve doesn't auto-zero + buf.put_u8(compressed_u8); buf.put_u32(len as u32); } diff --git a/tonic/src/codec/mod.rs b/tonic/src/codec/mod.rs index bb88dc860..896c04512 100644 --- a/tonic/src/codec/mod.rs +++ b/tonic/src/codec/mod.rs @@ -14,6 +14,7 @@ use std::io; #[doc(hidden)] pub use self::compression::Decompression; +pub(crate) use self::compression::Compression; pub use self::decode::Streaming; pub(crate) use self::encode::{encode_client, encode_server}; #[cfg(feature = "prost")] diff --git a/tonic/src/server/grpc.rs b/tonic/src/server/grpc.rs index d09a5c7ac..1bc91d73d 100644 --- a/tonic/src/server/grpc.rs +++ b/tonic/src/server/grpc.rs @@ -1,10 +1,4 @@ -use crate::{ - body::BoxBody, - codec::{encode_server, Codec, Decompression, Streaming}, - interceptor::Interceptor, - server::{ClientStreamingService, ServerStreamingService, StreamingService, UnaryService}, - Code, Request, Status, -}; +use crate::{Code, Request, Status, body::BoxBody, codec::{Codec, Compression, Decompression, Streaming, encode_server}, interceptor::Interceptor, server::{ClientStreamingService, ServerStreamingService, StreamingService, UnaryService}}; use futures_core::TryStream; use futures_util::{future, stream, TryStreamExt}; use http_body::Body; @@ -67,24 +61,26 @@ where B: Body + Send + Sync + 'static, B::Error: Into + Send, { + let compression = Compression::response_from_headers(req.headers()); let request = match self.map_request_unary(req).await { Ok(r) => r, Err(status) => { return self .map_response::>>>(Err( status, - )); + ), compression); } }; let request = t!(self.intercept_request(request)); + let compression = Compression::response_from_metadata(request.metadata()); let response = service .call(request) .await .map(|r| r.map(|m| stream::once(future::ok(m)))); - self.map_response(response) + self.map_response(response, compression) } /// Handle a server side streaming request. @@ -99,18 +95,20 @@ where B: Body + Send + Sync + 'static, B::Error: Into + Send, { + let compression = Compression::response_from_headers(req.headers()); let request = match self.map_request_unary(req).await { Ok(r) => r, Err(status) => { - return self.map_response::(Err(status)); + + return self.map_response::(Err(status), compression); } }; let request = t!(self.intercept_request(request)); - + let compression = Compression::response_from_metadata(request.metadata()); let response = service.call(request).await; - self.map_response(response) + self.map_response(response, compression) } /// Handle a client side streaming gRPC request. @@ -126,11 +124,12 @@ where { let request = self.map_request_streaming(req); let request = t!(self.intercept_request(request)); + let compression = Compression::response_from_metadata(request.metadata()); let response = service .call(request) .await .map(|r| r.map(|m| stream::once(future::ok(m)))); - self.map_response(response) + self.map_response(response, compression) } /// Handle a bi-directional streaming gRPC request. @@ -147,8 +146,9 @@ where { let request = self.map_request_streaming(req); let request = t!(self.intercept_request(request)); + let compression = Compression::response_from_metadata(request.metadata()); let response = service.call(request).await; - self.map_response(response) + self.map_response(response, compression) } async fn map_request_unary( @@ -196,6 +196,7 @@ where fn map_response( &mut self, response: Result, Status>, + compression: Compression, ) -> http::Response where B: TryStream + Send + Sync + 'static, @@ -210,7 +211,7 @@ where http::header::HeaderValue::from_static("application/grpc"), ); - let body = encode_server(self.codec.encoder(), body.into_stream()); + let body = encode_server(self.codec.encoder(), body.into_stream(), compression); http::Response::from_parts(parts, BoxBody::new(body)) } From 17609bcb002d134451c789e9e802654fc379e2cf Mon Sep 17 00:00:00 2001 From: Julien Roncaglia Date: Fri, 16 Oct 2020 17:38:44 +0200 Subject: [PATCH 08/17] fmt --- tonic/benches/decode.rs | 18 +++++++-------- tonic/src/codec/compression/compression.rs | 23 +++++++++++++++----- tonic/src/codec/compression/compressors.rs | 5 +++-- tonic/src/codec/compression/decompression.rs | 13 ++--------- tonic/src/codec/compression/gzip.rs | 2 +- tonic/src/codec/compression/mod.rs | 4 +--- tonic/src/codec/encode.rs | 8 +++++-- tonic/src/codec/mod.rs | 2 +- tonic/src/server/grpc.rs | 16 +++++++++----- 9 files changed, 50 insertions(+), 41 deletions(-) diff --git a/tonic/benches/decode.rs b/tonic/benches/decode.rs index 3ab0640cd..8f760630c 100644 --- a/tonic/benches/decode.rs +++ b/tonic/benches/decode.rs @@ -122,7 +122,8 @@ fn make_payload(message_length: usize, message_count: usize, encoding: &Option { use bytes::buf::BufMutExt; - let mut reader = flate2::read::GzEncoder::new(&raw_msg[..], flate2::Compression::best()); + let mut reader = + flate2::read::GzEncoder::new(&raw_msg[..], flate2::Compression::best()); let mut writer = BytesMut::new().writer(); std::io::copy(&mut reader, &mut writer).expect("copy"); @@ -133,14 +134,15 @@ fn make_payload(message_length: usize, message_count: usize, encoding: &Option { - panic!("Encoding {} isn't supported", encoding) - } + Some(encoding) => panic!("Encoding {} isn't supported", encoding), }; for _ in 0..message_count { buf.reserve(msg_buf.len() + 5); - buf.put_u8(match encoding { Some(_) => 1, None => 0}); + buf.put_u8(match encoding { + Some(_) => 1, + None => 0, + }); buf.put_u32(msg_buf.len() as u32); buf.put(&msg_buf[..]); } @@ -262,8 +264,4 @@ benchmark_main!( ); #[cfg(not(feature = "gzip"))] -benchmark_main!( - chunk_size, - message_size, - message_count -); +benchmark_main!(chunk_size, message_size, message_count); diff --git a/tonic/src/codec/compression/compression.rs b/tonic/src/codec/compression/compression.rs index db9b562da..43db0c6a5 100644 --- a/tonic/src/codec/compression/compression.rs +++ b/tonic/src/codec/compression/compression.rs @@ -5,7 +5,10 @@ use tracing::debug; use crate::metadata::MetadataMap; -use super::{Compressor, compressors::{self, IDENTITY}}; +use super::{ + compressors::{self, IDENTITY}, + Compressor, +}; pub(crate) const BUFFER_SIZE: usize = 8 * 1024; pub(crate) const ACCEPT_ENCODING_HEADER: &str = "grpc-accept-encoding"; @@ -17,19 +20,27 @@ pub(crate) struct Compression { impl Debug for Compression { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Compression") - .field("compressor", &self.compressor.map(|c| c.name()).unwrap_or(IDENTITY)) - .finish() + .field( + "compressor", + &self.compressor.map(|c| c.name()).unwrap_or(IDENTITY), + ) + .finish() } } fn parse_accept_encoding_header(value: &str) -> Vec<&str> { - value.split(",").map(|v| v.trim()).filter(|v| !v.is_empty()).collect::>() + value + .split(",") + .map(|v| v.trim()) + .filter(|v| !v.is_empty()) + .collect::>() } fn first_supported_compressor(accepted: &Vec<&str>) -> Option<&'static Box> { - accepted.iter() + accepted + .iter() .filter(|name| **name != IDENTITY) - .filter_map(|name|compressors::get(name)) + .filter_map(|name| compressors::get(name)) .next() } diff --git a/tonic/src/codec/compression/compressors.rs b/tonic/src/codec/compression/compressors.rs index ba1902078..e5b7f4b33 100644 --- a/tonic/src/codec/compression/compressors.rs +++ b/tonic/src/codec/compression/compressors.rs @@ -107,8 +107,9 @@ impl Compressor for IdentityCompressor { } } -static BOXED_IDENTITY_COMPRESSOR: Lazy> = Lazy::new(|| Box::new(IdentityCompressor::default())); +static BOXED_IDENTITY_COMPRESSOR: Lazy> = + Lazy::new(|| Box::new(IdentityCompressor::default())); pub(crate) fn identity() -> &'static Box { &BOXED_IDENTITY_COMPRESSOR -} \ No newline at end of file +} diff --git a/tonic/src/codec/compression/decompression.rs b/tonic/src/codec/compression/decompression.rs index 0b67d5c69..cada9e1b8 100644 --- a/tonic/src/codec/compression/decompression.rs +++ b/tonic/src/codec/compression/decompression.rs @@ -36,16 +36,6 @@ impl Decompression { } } - /// Clear the buffer and reserve an estimated number of bytes - fn prepare_decompress_buf(buffer: &mut BytesMut, estimated_len: usize) { - buffer.clear(); - - if buffer.capacity() < estimated_len { - let capacity = ((estimated_len / BUFFER_SIZE) + 1) * BUFFER_SIZE; - buffer.reserve(capacity) - } - } - /// Find a compressor in the registry for the current encoding fn get_compressor(&self) -> Result<&Box, DecompressionError> { match &self.encoding { @@ -71,7 +61,8 @@ impl Decompression { ) -> Result<(), DecompressionError> { let compressor = self.get_compressor()?; - out_buffer.reserve(((compressor.estimate_decompressed_len(len) / BUFFER_SIZE) + 1) * BUFFER_SIZE); + out_buffer + .reserve(((compressor.estimate_decompressed_len(len) / BUFFER_SIZE) + 1) * BUFFER_SIZE); compressor.decompress(in_buffer, out_buffer, len)?; in_buffer.advance(len); diff --git a/tonic/src/codec/compression/gzip.rs b/tonic/src/codec/compression/gzip.rs index bbb0cfdd9..fa671c1de 100644 --- a/tonic/src/codec/compression/gzip.rs +++ b/tonic/src/codec/compression/gzip.rs @@ -7,7 +7,7 @@ use flate2::read::{GzDecoder, GzEncoder}; /// Compress using GZIP #[derive(Debug)] pub(crate) struct GZipCompressor { - compression_level: flate2::Compression + compression_level: flate2::Compression, } impl GZipCompressor { diff --git a/tonic/src/codec/compression/mod.rs b/tonic/src/codec/compression/mod.rs index acf0e503b..b89e08aa7 100644 --- a/tonic/src/codec/compression/mod.rs +++ b/tonic/src/codec/compression/mod.rs @@ -1,14 +1,12 @@ mod bufwriter; -mod compressors; mod compression; +mod compressors; mod decompression; mod errors; #[cfg(feature = "gzip")] mod gzip; -use bytes::BytesMut; - pub(crate) use self::compressors::Compressor; #[doc(hidden)] diff --git a/tonic/src/codec/encode.rs b/tonic/src/codec/encode.rs index 873110b38..b698adcb4 100644 --- a/tonic/src/codec/encode.rs +++ b/tonic/src/codec/encode.rs @@ -1,4 +1,4 @@ -use super::{EncodeBuf, Encoder, compression::Compression}; +use super::{compression::Compression, EncodeBuf, Encoder}; use crate::{Code, Status}; use bytes::{BufMut, Bytes, BytesMut}; use futures_core::{Stream, TryStream}; @@ -41,7 +41,11 @@ where EncodeBody::new_client(stream) } -fn encode(mut encoder: T, source: U, compression: Compression) -> impl TryStream +fn encode( + mut encoder: T, + source: U, + compression: Compression, +) -> impl TryStream where T: Encoder, U: Stream>, diff --git a/tonic/src/codec/mod.rs b/tonic/src/codec/mod.rs index 896c04512..3daa59c3a 100644 --- a/tonic/src/codec/mod.rs +++ b/tonic/src/codec/mod.rs @@ -12,9 +12,9 @@ mod prost; use std::io; +pub(crate) use self::compression::Compression; #[doc(hidden)] pub use self::compression::Decompression; -pub(crate) use self::compression::Compression; pub use self::decode::Streaming; pub(crate) use self::encode::{encode_client, encode_server}; #[cfg(feature = "prost")] diff --git a/tonic/src/server/grpc.rs b/tonic/src/server/grpc.rs index 1bc91d73d..3699e07a7 100644 --- a/tonic/src/server/grpc.rs +++ b/tonic/src/server/grpc.rs @@ -1,4 +1,10 @@ -use crate::{Code, Request, Status, body::BoxBody, codec::{Codec, Compression, Decompression, Streaming, encode_server}, interceptor::Interceptor, server::{ClientStreamingService, ServerStreamingService, StreamingService, UnaryService}}; +use crate::{ + body::BoxBody, + codec::{encode_server, Codec, Compression, Decompression, Streaming}, + interceptor::Interceptor, + server::{ClientStreamingService, ServerStreamingService, StreamingService, UnaryService}, + Code, Request, Status, +}; use futures_core::TryStream; use futures_util::{future, stream, TryStreamExt}; use http_body::Body; @@ -66,9 +72,10 @@ where Ok(r) => r, Err(status) => { return self - .map_response::>>>(Err( - status, - ), compression); + .map_response::>>>( + Err(status), + compression, + ); } }; @@ -99,7 +106,6 @@ where let request = match self.map_request_unary(req).await { Ok(r) => r, Err(status) => { - return self.map_response::(Err(status), compression); } }; From a14b4c43657999ce07c5edaa22792bc1b53f1557 Mon Sep 17 00:00:00 2001 From: Julien Roncaglia Date: Fri, 16 Oct 2020 18:20:10 +0200 Subject: [PATCH 09/17] Set the compression header when answering compressed --- examples/Cargo.toml | 2 +- tonic/src/client/grpc.rs | 4 +++- tonic/src/codec/compression/compression.rs | 24 ++++++++++++++++---- tonic/src/codec/compression/decompression.rs | 3 +-- tonic/src/codec/compression/mod.rs | 2 ++ tonic/src/codec/prost.rs | 2 +- tonic/src/server/grpc.rs | 1 + 7 files changed, 28 insertions(+), 10 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 6edecbed6..531ca9109 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -139,7 +139,7 @@ name = "hyper-warp-multiplex-server" path = "src/hyper_warp_multiplex/server.rs" [dependencies] -tonic = { path = "../tonic", features = ["tls"] } +tonic = { path = "../tonic", features = ["tls", "gzip"] } prost = "0.6" tokio = { version = "0.2", features = ["rt-threaded", "time", "stream", "fs", "macros", "uds"] } futures = { version = "0.3", default-features = false, features = ["alloc"] } diff --git a/tonic/src/client/grpc.rs b/tonic/src/client/grpc.rs index 36ee7c1b7..70ccb2159 100644 --- a/tonic/src/client/grpc.rs +++ b/tonic/src/client/grpc.rs @@ -159,11 +159,13 @@ impl Grpc { let uri = Uri::from_parts(parts).expect("path_and_query only is valid Uri"); + let compression = Compression::disabled(); let request = request - .map(|s| encode_client(codec.encoder(), s, Compression::new_request())) + .map(|s| encode_client(codec.encoder(), s, compression.clone())) .map(BoxBody::new); let mut request = request.into_http(uri); + compression.set_headers(request.headers_mut()); // Add the gRPC related HTTP headers request diff --git a/tonic/src/codec/compression/compression.rs b/tonic/src/codec/compression/compression.rs index 43db0c6a5..5703560d0 100644 --- a/tonic/src/codec/compression/compression.rs +++ b/tonic/src/codec/compression/compression.rs @@ -1,18 +1,17 @@ use std::{fmt::Debug, io}; use bytes::{Buf, BytesMut}; +use http::HeaderValue; use tracing::debug; use crate::metadata::MetadataMap; -use super::{ - compressors::{self, IDENTITY}, - Compressor, -}; +use super::{Compressor, ENCODING_HEADER, compressors::{self, IDENTITY}}; pub(crate) const BUFFER_SIZE: usize = 8 * 1024; pub(crate) const ACCEPT_ENCODING_HEADER: &str = "grpc-accept-encoding"; +#[derive(Clone)] pub(crate) struct Compression { compressor: Option<&'static Box>, } @@ -45,10 +44,12 @@ fn first_supported_compressor(accepted: &Vec<&str>) -> Option<&'static Box Compression { + /// Create an instance of compression that doesn't compress anything + pub(crate) fn disabled() -> Compression { Compression { compressor: None } } + /// Create an instance of compression from GRPC metadata pub(crate) fn response_from_metadata(request_metadata: &MetadataMap) -> Compression { let accept_encoding_header = request_metadata .get(ACCEPT_ENCODING_HEADER) @@ -60,6 +61,7 @@ impl Compression { Compression { compressor } } + /// Create an instance of compression from HTTP headers pub(crate) fn response_from_headers(request_headers: &http::HeaderMap) -> Compression { let accept_encoding_header = request_headers .get(ACCEPT_ENCODING_HEADER) @@ -71,6 +73,7 @@ impl Compression { Compression { compressor } } + /// Get if compression is enabled pub(crate) fn is_enabled(&self) -> bool { self.compressor.is_some() } @@ -94,6 +97,17 @@ impl Compression { out_buffer.len(), compressor.name() ); + Ok(()) } + + /// Set the `grpc-encoding` header with the compressor name + pub(crate) fn set_headers(&self, headers: &mut http::HeaderMap) { + match self.compressor { + None => {}, + Some(compressor) => { + headers.insert(ENCODING_HEADER, HeaderValue::from_static(compressor.name())); + } + } + } } diff --git a/tonic/src/codec/compression/decompression.rs b/tonic/src/codec/compression/decompression.rs index cada9e1b8..e7388629a 100644 --- a/tonic/src/codec/compression/decompression.rs +++ b/tonic/src/codec/compression/decompression.rs @@ -1,10 +1,9 @@ use bytes::{Buf, BytesMut}; use tracing::debug; -use super::{compressors, Compressor, DecompressionError}; +use super::{Compressor, DecompressionError, ENCODING_HEADER, compressors}; const BUFFER_SIZE: usize = 8 * 1024; -const ENCODING_HEADER: &str = "grpc-encoding"; /// Information related to the decompression of a request or response #[derive(Debug)] diff --git a/tonic/src/codec/compression/mod.rs b/tonic/src/codec/compression/mod.rs index b89e08aa7..a6105540e 100644 --- a/tonic/src/codec/compression/mod.rs +++ b/tonic/src/codec/compression/mod.rs @@ -4,6 +4,8 @@ mod compressors; mod decompression; mod errors; +pub(crate) const ENCODING_HEADER: &str = "grpc-encoding"; + #[cfg(feature = "gzip")] mod gzip; diff --git a/tonic/src/codec/prost.rs b/tonic/src/codec/prost.rs index 2135b8211..0c7aad835 100644 --- a/tonic/src/codec/prost.rs +++ b/tonic/src/codec/prost.rs @@ -121,7 +121,7 @@ mod tests { let messages = std::iter::repeat(Ok::<_, Status>(msg)).take(10000); let source = futures_util::stream::iter(messages); - let body = encode_server(encoder, source); + let body = encode_server(encoder, source, Compression::disabled()); futures_util::pin_mut!(body); diff --git a/tonic/src/server/grpc.rs b/tonic/src/server/grpc.rs index 3699e07a7..5a64fc41c 100644 --- a/tonic/src/server/grpc.rs +++ b/tonic/src/server/grpc.rs @@ -216,6 +216,7 @@ where http::header::CONTENT_TYPE, http::header::HeaderValue::from_static("application/grpc"), ); + compression.set_headers(&mut parts.headers); let body = encode_server(self.codec.encoder(), body.into_stream(), compression); From c32f8106e23318d05f98001689d4d4eaf99b94e0 Mon Sep 17 00:00:00 2001 From: Julien Roncaglia Date: Fri, 16 Oct 2020 18:35:23 +0200 Subject: [PATCH 10/17] Set accept-encoding headers on client --- tonic/src/client/grpc.rs | 2 +- tonic/src/codec/compression/compression.rs | 9 ++++++--- tonic/src/codec/compression/compressors.rs | 4 ++++ tonic/src/codec/compression/mod.rs | 1 + tonic/src/server/grpc.rs | 2 +- 5 files changed, 13 insertions(+), 5 deletions(-) diff --git a/tonic/src/client/grpc.rs b/tonic/src/client/grpc.rs index 70ccb2159..d1554abae 100644 --- a/tonic/src/client/grpc.rs +++ b/tonic/src/client/grpc.rs @@ -165,7 +165,7 @@ impl Grpc { .map(BoxBody::new); let mut request = request.into_http(uri); - compression.set_headers(request.headers_mut()); + compression.set_headers(request.headers_mut(), true); // Add the gRPC related HTTP headers request diff --git a/tonic/src/codec/compression/compression.rs b/tonic/src/codec/compression/compression.rs index 5703560d0..28dbbcf12 100644 --- a/tonic/src/codec/compression/compression.rs +++ b/tonic/src/codec/compression/compression.rs @@ -6,10 +6,9 @@ use tracing::debug; use crate::metadata::MetadataMap; -use super::{Compressor, ENCODING_HEADER, compressors::{self, IDENTITY}}; +use super::{ACCEPT_ENCODING_HEADER, Compressor, ENCODING_HEADER, compressors::{self, IDENTITY}}; pub(crate) const BUFFER_SIZE: usize = 8 * 1024; -pub(crate) const ACCEPT_ENCODING_HEADER: &str = "grpc-accept-encoding"; #[derive(Clone)] pub(crate) struct Compression { @@ -102,7 +101,11 @@ impl Compression { } /// Set the `grpc-encoding` header with the compressor name - pub(crate) fn set_headers(&self, headers: &mut http::HeaderMap) { + pub(crate) fn set_headers(&self, headers: &mut http::HeaderMap, set_accept_encoding: bool) { + if set_accept_encoding { + headers.insert(ACCEPT_ENCODING_HEADER, HeaderValue::from_str(&compressors::get_accept_encoding_header()).unwrap()); + } + match self.compressor { None => {}, Some(compressor) => { diff --git a/tonic/src/codec/compression/compressors.rs b/tonic/src/codec/compression/compressors.rs index e5b7f4b33..e5bc69776 100644 --- a/tonic/src/codec/compression/compressors.rs +++ b/tonic/src/codec/compression/compressors.rs @@ -59,6 +59,10 @@ pub(crate) trait Compressor: Sync + Send { } } +pub(crate) fn get_accept_encoding_header() -> String { + COMPRESSORS.keys().map(|s| &**s).filter(|name| *name != IDENTITY).collect::>().join(",") +} + /// The identity compressor doesn't compress #[derive(Debug)] struct IdentityCompressor {} diff --git a/tonic/src/codec/compression/mod.rs b/tonic/src/codec/compression/mod.rs index a6105540e..dc41f7402 100644 --- a/tonic/src/codec/compression/mod.rs +++ b/tonic/src/codec/compression/mod.rs @@ -5,6 +5,7 @@ mod decompression; mod errors; pub(crate) const ENCODING_HEADER: &str = "grpc-encoding"; +pub(crate) const ACCEPT_ENCODING_HEADER: &str = "grpc-accept-encoding"; #[cfg(feature = "gzip")] mod gzip; diff --git a/tonic/src/server/grpc.rs b/tonic/src/server/grpc.rs index 5a64fc41c..4d510ad0c 100644 --- a/tonic/src/server/grpc.rs +++ b/tonic/src/server/grpc.rs @@ -216,7 +216,7 @@ where http::header::CONTENT_TYPE, http::header::HeaderValue::from_static("application/grpc"), ); - compression.set_headers(&mut parts.headers); + compression.set_headers(&mut parts.headers, false); let body = encode_server(self.codec.encoder(), body.into_stream(), compression); From 63573037a90bdaeebbf55da34c179f035417d546 Mon Sep 17 00:00:00 2001 From: Julien Roncaglia Date: Fri, 16 Oct 2020 18:35:47 +0200 Subject: [PATCH 11/17] fmt --- tonic/src/codec/compression/compression.rs | 12 +++++++++--- tonic/src/codec/compression/compressors.rs | 7 ++++++- tonic/src/codec/compression/decompression.rs | 2 +- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/tonic/src/codec/compression/compression.rs b/tonic/src/codec/compression/compression.rs index 28dbbcf12..b054178f4 100644 --- a/tonic/src/codec/compression/compression.rs +++ b/tonic/src/codec/compression/compression.rs @@ -6,7 +6,10 @@ use tracing::debug; use crate::metadata::MetadataMap; -use super::{ACCEPT_ENCODING_HEADER, Compressor, ENCODING_HEADER, compressors::{self, IDENTITY}}; +use super::{ + compressors::{self, IDENTITY}, + Compressor, ACCEPT_ENCODING_HEADER, ENCODING_HEADER, +}; pub(crate) const BUFFER_SIZE: usize = 8 * 1024; @@ -103,11 +106,14 @@ impl Compression { /// Set the `grpc-encoding` header with the compressor name pub(crate) fn set_headers(&self, headers: &mut http::HeaderMap, set_accept_encoding: bool) { if set_accept_encoding { - headers.insert(ACCEPT_ENCODING_HEADER, HeaderValue::from_str(&compressors::get_accept_encoding_header()).unwrap()); + headers.insert( + ACCEPT_ENCODING_HEADER, + HeaderValue::from_str(&compressors::get_accept_encoding_header()).unwrap(), + ); } match self.compressor { - None => {}, + None => {} Some(compressor) => { headers.insert(ENCODING_HEADER, HeaderValue::from_static(compressor.name())); } diff --git a/tonic/src/codec/compression/compressors.rs b/tonic/src/codec/compression/compressors.rs index e5bc69776..c67f19918 100644 --- a/tonic/src/codec/compression/compressors.rs +++ b/tonic/src/codec/compression/compressors.rs @@ -60,7 +60,12 @@ pub(crate) trait Compressor: Sync + Send { } pub(crate) fn get_accept_encoding_header() -> String { - COMPRESSORS.keys().map(|s| &**s).filter(|name| *name != IDENTITY).collect::>().join(",") + COMPRESSORS + .keys() + .map(|s| &**s) + .filter(|name| *name != IDENTITY) + .collect::>() + .join(",") } /// The identity compressor doesn't compress diff --git a/tonic/src/codec/compression/decompression.rs b/tonic/src/codec/compression/decompression.rs index e7388629a..3f2c3f293 100644 --- a/tonic/src/codec/compression/decompression.rs +++ b/tonic/src/codec/compression/decompression.rs @@ -1,7 +1,7 @@ use bytes::{Buf, BytesMut}; use tracing::debug; -use super::{Compressor, DecompressionError, ENCODING_HEADER, compressors}; +use super::{compressors, Compressor, DecompressionError, ENCODING_HEADER}; const BUFFER_SIZE: usize = 8 * 1024; From c2e315c98b667e5e3e3304053da3eb61573c138f Mon Sep 17 00:00:00 2001 From: Julien Roncaglia Date: Fri, 16 Oct 2020 18:43:30 +0200 Subject: [PATCH 12/17] Match go decompression error --- tonic/src/codec/compression/decompression.rs | 6 +++--- tonic/src/codec/decode.rs | 9 ++++++++- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/tonic/src/codec/compression/decompression.rs b/tonic/src/codec/compression/decompression.rs index 3f2c3f293..03d5cbb6e 100644 --- a/tonic/src/codec/compression/decompression.rs +++ b/tonic/src/codec/compression/decompression.rs @@ -27,11 +27,11 @@ impl Decompression { Decompression::new(encoding) } - /// Get if the current encoding is the no-op "identity" one - pub fn is_identity(&self) -> bool { + /// Get if the current encoding is the no-op "identity" one or no decompression is configured + pub fn is_identity_or_none(&self) -> bool { match &self.encoding { Some(encoding) => encoding == compressors::IDENTITY, - None => false, + None => true, } } diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index 09f34fe82..ef5c45f30 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -207,7 +207,14 @@ impl Streaming { return Ok(None); } - let decode_result = if *compression && !self.decompression.is_identity() { + let decode_result = if *compression { + if self.decompression.is_identity_or_none() { + return Err(Status::new( + Code::Internal, + "compressed flag set with identity or empty encoding", + )); + } + if let Err(err) = self.decompression .decompress(&mut self.buf, &mut self.decompress_buf, *len) From f1a63f0581212c84c276bc8a499050b830fd1b98 Mon Sep 17 00:00:00 2001 From: Julien Roncaglia Date: Mon, 19 Oct 2020 09:39:55 +0200 Subject: [PATCH 13/17] Resolve compressor at start --- tonic/src/codec/compression/bufwriter.rs | 1 + tonic/src/codec/compression/compression.rs | 23 +++--- tonic/src/codec/compression/compressors.rs | 79 +++----------------- tonic/src/codec/compression/decompression.rs | 78 ++++++++++--------- tonic/src/codec/compression/errors.rs | 37 +++++++++ tonic/src/codec/decode.rs | 7 -- 6 files changed, 103 insertions(+), 122 deletions(-) diff --git a/tonic/src/codec/compression/bufwriter.rs b/tonic/src/codec/compression/bufwriter.rs index 9aad3d39a..4c4e0cd67 100644 --- a/tonic/src/codec/compression/bufwriter.rs +++ b/tonic/src/codec/compression/bufwriter.rs @@ -8,6 +8,7 @@ pub(crate) struct Writer<'a, B> { buf: &'a mut B, } +#[cfg(feature = "gzip")] pub(crate) fn new<'a, B>(buf: &'a mut B) -> Writer<'a, B> { Writer { buf } } diff --git a/tonic/src/codec/compression/compression.rs b/tonic/src/codec/compression/compression.rs index b054178f4..c625a6573 100644 --- a/tonic/src/codec/compression/compression.rs +++ b/tonic/src/codec/compression/compression.rs @@ -1,15 +1,13 @@ -use std::{fmt::Debug, io}; - -use bytes::{Buf, BytesMut}; -use http::HeaderValue; -use tracing::debug; - -use crate::metadata::MetadataMap; - use super::{ compressors::{self, IDENTITY}, + errors::CompressionError, Compressor, ACCEPT_ENCODING_HEADER, ENCODING_HEADER, }; +use crate::metadata::MetadataMap; +use bytes::{Buf, BytesMut}; +use http::HeaderValue; +use std::fmt::Debug; +use tracing::debug; pub(crate) const BUFFER_SIZE: usize = 8 * 1024; @@ -86,10 +84,11 @@ impl Compression { in_buffer: &mut BytesMut, out_buffer: &mut BytesMut, len: usize, - ) -> Result<(), io::Error> { - out_buffer.reserve(((len / BUFFER_SIZE) + 1) * BUFFER_SIZE); + ) -> Result<(), CompressionError> { + let capacity = ((len / BUFFER_SIZE) + 1) * BUFFER_SIZE; + out_buffer.reserve(capacity); - let compressor = self.compressor.unwrap_or_else(compressors::identity); + let compressor = self.compressor.ok_or(CompressionError::NoCompression)?; compressor.compress(in_buffer, out_buffer, len)?; in_buffer.advance(len); @@ -108,7 +107,7 @@ impl Compression { if set_accept_encoding { headers.insert( ACCEPT_ENCODING_HEADER, - HeaderValue::from_str(&compressors::get_accept_encoding_header()).unwrap(), + HeaderValue::from_str(&compressors::get_accept_encoding_header()).expect("All encoding names should be ASCII"), ); } diff --git a/tonic/src/codec/compression/compressors.rs b/tonic/src/codec/compression/compressors.rs index c67f19918..8da32861b 100644 --- a/tonic/src/codec/compression/compressors.rs +++ b/tonic/src/codec/compression/compressors.rs @@ -1,25 +1,26 @@ -use std::{collections::HashMap, io}; - -use super::bufwriter; use bytes::BytesMut; use once_cell::sync::Lazy; +use std::{collections::HashMap, io}; pub(crate) const IDENTITY: &str = "identity"; /// List of known compressors static COMPRESSORS: Lazy>> = Lazy::new(|| { - let mut m = HashMap::new(); + #[cfg(feature = "gzip")] + { + let mut m = HashMap::new(); - let mut add = |compressor: Box| { - m.insert(compressor.name().to_string(), compressor); - }; + let mut add = |compressor: Box| { + m.insert(compressor.name().to_string(), compressor); + }; - add(Box::new(IdentityCompressor::default())); + add(Box::new(super::gzip::GZipCompressor::default())); - #[cfg(feature = "gzip")] - add(Box::new(super::gzip::GZipCompressor::default())); + m + } - m + #[cfg(not(feature = "gzip"))] + HashMap::new() }); /// Get a compressor from it's name @@ -63,62 +64,6 @@ pub(crate) fn get_accept_encoding_header() -> String { COMPRESSORS .keys() .map(|s| &**s) - .filter(|name| *name != IDENTITY) .collect::>() .join(",") } - -/// The identity compressor doesn't compress -#[derive(Debug)] -struct IdentityCompressor {} - -impl Default for IdentityCompressor { - fn default() -> Self { - Self {} - } -} - -impl Compressor for IdentityCompressor { - fn name(&self) -> &'static str { - IDENTITY - } - - fn decompress( - &self, - in_buffer: &BytesMut, - out_buffer: &mut BytesMut, - len: usize, - ) -> io::Result<()> { - let mut in_reader = &in_buffer[0..len]; - let mut out_writer = bufwriter::new(out_buffer); - - std::io::copy(&mut in_reader, &mut out_writer)?; - - Ok(()) - } - - fn compress( - &self, - in_buffer: &BytesMut, - out_buffer: &mut BytesMut, - len: usize, - ) -> io::Result<()> { - let mut in_reader = &in_buffer[0..len]; - let mut out_writer = bufwriter::new(out_buffer); - - std::io::copy(&mut in_reader, &mut out_writer)?; - - Ok(()) - } - - fn estimate_decompressed_len(&self, compressed_len: usize) -> usize { - compressed_len - } -} - -static BOXED_IDENTITY_COMPRESSOR: Lazy> = - Lazy::new(|| Box::new(IdentityCompressor::default())); - -pub(crate) fn identity() -> &'static Box { - &BOXED_IDENTITY_COMPRESSOR -} diff --git a/tonic/src/codec/compression/decompression.rs b/tonic/src/codec/compression/decompression.rs index 03d5cbb6e..74e2f1c76 100644 --- a/tonic/src/codec/compression/decompression.rs +++ b/tonic/src/codec/compression/decompression.rs @@ -1,53 +1,45 @@ use bytes::{Buf, BytesMut}; +use std::fmt::Debug; use tracing::debug; -use super::{compressors, Compressor, DecompressionError, ENCODING_HEADER}; +use super::{ + compressors::{self, IDENTITY}, + Compressor, DecompressionError, ENCODING_HEADER, +}; const BUFFER_SIZE: usize = 8 * 1024; /// Information related to the decompression of a request or response -#[derive(Debug)] pub struct Decompression { encoding: Option, + compressor: Option<&'static Box>, } -impl Decompression { - /// Create a `Decompression` structure - pub fn new(encoding: Option) -> Decompression { - Decompression { encoding } +impl Debug for Decompression { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let encoding = self.encoding.as_ref().map(|e| &e[..]).unwrap_or(""); + f.debug_struct("Compression") + .field("encoding", &encoding) + .field( + "compressor", + &self.compressor.map(|c| c.name()).unwrap_or(""), + ) + .finish() } +} +impl Decompression { /// Create a `Decompression` structure from http headers pub fn from_headers(metadata: &http::HeaderMap) -> Decompression { let encoding = metadata .get(ENCODING_HEADER) .and_then(|v| v.to_str().ok()) - .map(|s| s.to_string()); - - Decompression::new(encoding) - } + .and_then(|v| if v == IDENTITY { None } else { Some(v) }); + let compressor = encoding.and_then(compressors::get); - /// Get if the current encoding is the no-op "identity" one or no decompression is configured - pub fn is_identity_or_none(&self) -> bool { - match &self.encoding { - Some(encoding) => encoding == compressors::IDENTITY, - None => true, - } - } - - /// Find a compressor in the registry for the current encoding - fn get_compressor(&self) -> Result<&Box, DecompressionError> { - match &self.encoding { - None => { - Ok(compressors::get(compressors::IDENTITY).expect("Identity is always present")) - } - Some(encoding) => match compressors::get(encoding) { - Some(compressor) => Ok(compressor), - None => Err(DecompressionError::NotFound { - requested: encoding.clone(), - known: compressors::names(), - }), - }, + Decompression { + encoding: encoding.map(|v| v.to_string()), + compressor, } } @@ -58,10 +50,21 @@ impl Decompression { out_buffer: &mut BytesMut, len: usize, ) -> Result<(), DecompressionError> { - let compressor = self.get_compressor()?; + let compressor = self.compressor.ok_or_else(|| { + match &self.encoding { + // Asked to decompress but not compression was specified + None => DecompressionError::NoCompression, + // Asked to decompress but the decompressor wasn't found + Some(encoding) => DecompressionError::NotFound { + requested: encoding.clone(), + known: compressors::names(), + }, + } + })?; - out_buffer - .reserve(((compressor.estimate_decompressed_len(len) / BUFFER_SIZE) + 1) * BUFFER_SIZE); + let capacity = + ((compressor.estimate_decompressed_len(len) / BUFFER_SIZE) + 1) * BUFFER_SIZE; + out_buffer.reserve(capacity); compressor.decompress(in_buffer, out_buffer, len)?; in_buffer.advance(len); @@ -69,7 +72,7 @@ impl Decompression { "Decompressed {} bytes into {} bytes using {:?}", len, out_buffer.len(), - self.encoding + compressor.name() ); Ok(()) } @@ -77,6 +80,9 @@ impl Decompression { impl Default for Decompression { fn default() -> Self { - Decompression { encoding: None } + Decompression { + encoding: None, + compressor: None, + } } } diff --git a/tonic/src/codec/compression/errors.rs b/tonic/src/codec/compression/errors.rs index cac39bd98..93203fdd1 100644 --- a/tonic/src/codec/compression/errors.rs +++ b/tonic/src/codec/compression/errors.rs @@ -4,6 +4,7 @@ pub enum DecompressionError { requested: String, known: Vec, }, + NoCompression, Failed(std::io::Error), } @@ -19,6 +20,9 @@ impl std::fmt::Display for DecompressionError { ) } DecompressionError::Failed(error) => write!(f, "Decompression failed: {}", error), + DecompressionError::NoCompression => { + write!(f, "Compressed flag set with identity or empty encoding") + } } } } @@ -26,6 +30,7 @@ impl std::fmt::Display for DecompressionError { impl std::error::Error for DecompressionError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match &self { + DecompressionError::NoCompression => None, DecompressionError::NotFound { .. } => None, DecompressionError::Failed(err) => Some(err), } @@ -37,3 +42,35 @@ impl From for DecompressionError { DecompressionError::Failed(error) } } + +#[derive(Debug)] +pub(crate) enum CompressionError { + NoCompression, + Failed(std::io::Error), +} + +impl std::fmt::Display for CompressionError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self { + CompressionError::Failed(error) => write!(f, "Compression failed: {}", error), + CompressionError::NoCompression => { + write!(f, "Compression attempted without being configured") + } + } + } +} + +impl std::error::Error for CompressionError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match &self { + CompressionError::NoCompression { .. } => None, + CompressionError::Failed(err) => Some(err), + } + } +} + +impl From for CompressionError { + fn from(error: std::io::Error) -> Self { + CompressionError::Failed(error) + } +} diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index ef5c45f30..80d4eb645 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -208,13 +208,6 @@ impl Streaming { } let decode_result = if *compression { - if self.decompression.is_identity_or_none() { - return Err(Status::new( - Code::Internal, - "compressed flag set with identity or empty encoding", - )); - } - if let Err(err) = self.decompression .decompress(&mut self.buf, &mut self.decompress_buf, *len) From 89a61a83c47cbc63187172f2b2d3accfe55bc262 Mon Sep 17 00:00:00 2001 From: Julien Roncaglia Date: Mon, 19 Oct 2020 09:41:14 +0200 Subject: [PATCH 14/17] fmt --- tonic/src/codec/compression/compression.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tonic/src/codec/compression/compression.rs b/tonic/src/codec/compression/compression.rs index c625a6573..2419f5989 100644 --- a/tonic/src/codec/compression/compression.rs +++ b/tonic/src/codec/compression/compression.rs @@ -107,7 +107,8 @@ impl Compression { if set_accept_encoding { headers.insert( ACCEPT_ENCODING_HEADER, - HeaderValue::from_str(&compressors::get_accept_encoding_header()).expect("All encoding names should be ASCII"), + HeaderValue::from_str(&compressors::get_accept_encoding_header()) + .expect("All encoding names should be ASCII"), ); } From d92ce879f71d8bfcbde070e50acd2aadcdd783c3 Mon Sep 17 00:00:00 2001 From: Julien Roncaglia Date: Mon, 19 Oct 2020 09:59:16 +0200 Subject: [PATCH 15/17] Fix bench build --- tonic/benches/decode.rs | 24 ++++++++++---------- tonic/src/codec/compression/decompression.rs | 16 +++++++++---- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/tonic/benches/decode.rs b/tonic/benches/decode.rs index 8f760630c..3c4c218d0 100644 --- a/tonic/benches/decode.rs +++ b/tonic/benches/decode.rs @@ -19,7 +19,7 @@ macro_rules! bench { .build() .expect("runtime"); - let payload = make_payload($message_size, $message_count, &$encoding); + let payload = make_payload($message_size, $message_count, $encoding); let body = MockBody::new(payload, $chunk_size); b.bytes = body.len() as u64; @@ -27,7 +27,7 @@ macro_rules! bench { rt.block_on(async { let decoder = MockDecoder::new($message_size); - let decompression = Decompression::new($encoding); + let decompression = Decompression::from_encoding($encoding); let mut stream = Streaming::new_request(decoder, body.clone(), decompression); let mut count = 0; @@ -113,7 +113,7 @@ impl Decoder for MockDecoder { } } -fn make_payload(message_length: usize, message_count: usize, encoding: &Option) -> Bytes { +fn make_payload(message_length: usize, message_count: usize, encoding: Option<&str>) -> Bytes { let mut buf = BytesMut::new(); let raw_msg = vec![97u8; message_length]; @@ -166,14 +166,14 @@ bench!(message_count_10, 500, 505, 10); bench!(message_count_20, 500, 505, 20); // gzip change body chunk size only -bench!(chunk_size_100_gzip, 1_000, 100, 1, Some("gzip".to_string())); -bench!(chunk_size_500_gzip, 1_000, 500, 1, Some("gzip".to_string())); +bench!(chunk_size_100_gzip, 1_000, 100, 1, Some("gzip")); +bench!(chunk_size_500_gzip, 1_000, 500, 1, Some("gzip")); bench!( chunk_size_1005_gzip, 1_000, 1_005, 1, - Some("gzip".to_string()) + Some("gzip") ); // gzip change message size only @@ -182,38 +182,38 @@ bench!( 1_000, 1_005, 2, - Some("gzip".to_string()) + Some("gzip") ); bench!( message_size_5k_gzip, 5_000, 1_005, 2, - Some("gzip".to_string()) + Some("gzip") ); bench!( message_size_10k_gzip, 10_000, 1_005, 2, - Some("gzip".to_string()) + Some("gzip") ); // gzip change message count only -bench!(message_count_1_gzip, 500, 505, 1, Some("gzip".to_string())); +bench!(message_count_1_gzip, 500, 505, 1, Some("gzip")); bench!( message_count_10_gzip, 500, 505, 10, - Some("gzip".to_string()) + Some("gzip") ); bench!( message_count_20_gzip, 500, 505, 20, - Some("gzip".to_string()) + Some("gzip") ); benchmark_group!(chunk_size, chunk_size_100, chunk_size_500, chunk_size_1005); diff --git a/tonic/src/codec/compression/decompression.rs b/tonic/src/codec/compression/decompression.rs index 74e2f1c76..3cb2d64a9 100644 --- a/tonic/src/codec/compression/decompression.rs +++ b/tonic/src/codec/compression/decompression.rs @@ -29,18 +29,24 @@ impl Debug for Decompression { } impl Decompression { + /// Create a `Decompression` structure from an encoding name + pub fn from_encoding(encoding: Option<&str>) -> Decompression { + let compressor = encoding.and_then(compressors::get); + + Decompression { + encoding: encoding.map(|v| v.to_string()), + compressor, + } + } + /// Create a `Decompression` structure from http headers pub fn from_headers(metadata: &http::HeaderMap) -> Decompression { let encoding = metadata .get(ENCODING_HEADER) .and_then(|v| v.to_str().ok()) .and_then(|v| if v == IDENTITY { None } else { Some(v) }); - let compressor = encoding.and_then(compressors::get); - Decompression { - encoding: encoding.map(|v| v.to_string()), - compressor, - } + Decompression::from_encoding(encoding) } /// Decompress `len` bytes from `in_buffer` into `out_buffer` From 166666487a41f77227d987b079e7bc0f11dbcdef Mon Sep 17 00:00:00 2001 From: Julien Roncaglia Date: Tue, 20 Oct 2020 11:53:52 +0200 Subject: [PATCH 16/17] Don't compress on errors --- tonic/src/codec/compression/compression.rs | 12 ------------ tonic/src/server/grpc.rs | 6 ++---- 2 files changed, 2 insertions(+), 16 deletions(-) diff --git a/tonic/src/codec/compression/compression.rs b/tonic/src/codec/compression/compression.rs index 2419f5989..444b76d17 100644 --- a/tonic/src/codec/compression/compression.rs +++ b/tonic/src/codec/compression/compression.rs @@ -61,18 +61,6 @@ impl Compression { Compression { compressor } } - /// Create an instance of compression from HTTP headers - pub(crate) fn response_from_headers(request_headers: &http::HeaderMap) -> Compression { - let accept_encoding_header = request_headers - .get(ACCEPT_ENCODING_HEADER) - .and_then(|v| v.to_str().ok()) - .unwrap_or(""); - - let parsed = parse_accept_encoding_header(accept_encoding_header); - let compressor = first_supported_compressor(&parsed); - Compression { compressor } - } - /// Get if compression is enabled pub(crate) fn is_enabled(&self) -> bool { self.compressor.is_some() diff --git a/tonic/src/server/grpc.rs b/tonic/src/server/grpc.rs index 4d510ad0c..ab995c159 100644 --- a/tonic/src/server/grpc.rs +++ b/tonic/src/server/grpc.rs @@ -67,14 +67,13 @@ where B: Body + Send + Sync + 'static, B::Error: Into + Send, { - let compression = Compression::response_from_headers(req.headers()); let request = match self.map_request_unary(req).await { Ok(r) => r, Err(status) => { return self .map_response::>>>( Err(status), - compression, + Compression::disabled(), ); } }; @@ -102,11 +101,10 @@ where B: Body + Send + Sync + 'static, B::Error: Into + Send, { - let compression = Compression::response_from_headers(req.headers()); let request = match self.map_request_unary(req).await { Ok(r) => r, Err(status) => { - return self.map_response::(Err(status), compression); + return self.map_response::(Err(status), Compression::disabled()); } }; From 7555d3ebc0147b176d10d11e9a76996274b0717f Mon Sep 17 00:00:00 2001 From: Julien Roncaglia Date: Tue, 20 Oct 2020 17:57:30 +0200 Subject: [PATCH 17/17] Use a more conservative approach to response compression Similar to what grpc-go does, as otherwise it breaks grpc-web --- tonic/benches/decode.rs | 48 +++------------------- tonic/src/codec/compression/compression.rs | 31 ++++---------- tonic/src/server/grpc.rs | 3 +- 3 files changed, 17 insertions(+), 65 deletions(-) diff --git a/tonic/benches/decode.rs b/tonic/benches/decode.rs index 3c4c218d0..a4e84ff0d 100644 --- a/tonic/benches/decode.rs +++ b/tonic/benches/decode.rs @@ -168,53 +168,17 @@ bench!(message_count_20, 500, 505, 20); // gzip change body chunk size only bench!(chunk_size_100_gzip, 1_000, 100, 1, Some("gzip")); bench!(chunk_size_500_gzip, 1_000, 500, 1, Some("gzip")); -bench!( - chunk_size_1005_gzip, - 1_000, - 1_005, - 1, - Some("gzip") -); +bench!(chunk_size_1005_gzip, 1_000, 1_005, 1, Some("gzip")); // gzip change message size only -bench!( - message_size_1k_gzip, - 1_000, - 1_005, - 2, - Some("gzip") -); -bench!( - message_size_5k_gzip, - 5_000, - 1_005, - 2, - Some("gzip") -); -bench!( - message_size_10k_gzip, - 10_000, - 1_005, - 2, - Some("gzip") -); +bench!(message_size_1k_gzip, 1_000, 1_005, 2, Some("gzip")); +bench!(message_size_5k_gzip, 5_000, 1_005, 2, Some("gzip")); +bench!(message_size_10k_gzip, 10_000, 1_005, 2, Some("gzip")); // gzip change message count only bench!(message_count_1_gzip, 500, 505, 1, Some("gzip")); -bench!( - message_count_10_gzip, - 500, - 505, - 10, - Some("gzip") -); -bench!( - message_count_20_gzip, - 500, - 505, - 20, - Some("gzip") -); +bench!(message_count_10_gzip, 500, 505, 10, Some("gzip")); +bench!(message_count_20_gzip, 500, 505, 20, Some("gzip")); benchmark_group!(chunk_size, chunk_size_100, chunk_size_500, chunk_size_1005); diff --git a/tonic/src/codec/compression/compression.rs b/tonic/src/codec/compression/compression.rs index 444b76d17..cca8a5149 100644 --- a/tonic/src/codec/compression/compression.rs +++ b/tonic/src/codec/compression/compression.rs @@ -27,22 +27,6 @@ impl Debug for Compression { } } -fn parse_accept_encoding_header(value: &str) -> Vec<&str> { - value - .split(",") - .map(|v| v.trim()) - .filter(|v| !v.is_empty()) - .collect::>() -} - -fn first_supported_compressor(accepted: &Vec<&str>) -> Option<&'static Box> { - accepted - .iter() - .filter(|name| **name != IDENTITY) - .filter_map(|name| compressors::get(name)) - .next() -} - impl Compression { /// Create an instance of compression that doesn't compress anything pub(crate) fn disabled() -> Compression { @@ -51,14 +35,17 @@ impl Compression { /// Create an instance of compression from GRPC metadata pub(crate) fn response_from_metadata(request_metadata: &MetadataMap) -> Compression { - let accept_encoding_header = request_metadata - .get(ACCEPT_ENCODING_HEADER) + // The following implementation is very conservative, and similar to the Golang GRPC implementation. + // Instead of looking at 'grpc-accept-encoding' and potentially compressing the response with a different + // compressor than the one used by the request it uses the same compressor + let request_compressor = request_metadata + .get(ENCODING_HEADER) .and_then(|v| v.to_str().ok()) - .unwrap_or(""); + .and_then(compressors::get); - let parsed = parse_accept_encoding_header(accept_encoding_header); - let compressor = first_supported_compressor(&parsed); - Compression { compressor } + Compression { + compressor: request_compressor, + } } /// Get if compression is enabled diff --git a/tonic/src/server/grpc.rs b/tonic/src/server/grpc.rs index ab995c159..df2294ed6 100644 --- a/tonic/src/server/grpc.rs +++ b/tonic/src/server/grpc.rs @@ -104,7 +104,8 @@ where let request = match self.map_request_unary(req).await { Ok(r) => r, Err(status) => { - return self.map_response::(Err(status), Compression::disabled()); + return self + .map_response::(Err(status), Compression::disabled()); } };