Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gzip decompression #476

Closed
wants to merge 17 commits into from
2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ name = "hyper-warp-multiplex-server"
path = "src/hyper_warp_multiplex/server.rs"

[dependencies]
tonic = { path = "../tonic", features = ["tls"] }
tonic = { path = "../tonic", features = ["tls", "gzip"] }
prost = "0.6"
tokio = { version = "0.2", features = ["rt-threaded", "time", "stream", "fs", "macros", "uds"] }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
Expand Down
4 changes: 3 additions & 1 deletion tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ transport = [
tls = ["transport", "tokio-rustls"]
tls-roots = ["tls", "rustls-native-certs"]
prost = ["prost1", "prost-derive"]
gzip = ["flate2"]

# [[bench]]
# name = "bench_main"
Expand All @@ -48,6 +49,8 @@ futures-util = { version = "0.3", default-features = false }
tracing = "0.1"
http = "0.2"
base64 = "0.12"
flate2 = { version = "1.0", optional = true }
once_cell = "1.0"

percent-encoding = "2.0"
tower-service = "0.3"
Expand Down Expand Up @@ -89,4 +92,3 @@ rustdoc-args = ["--cfg", "docsrs"]
[[bench]]
name = "decode"
harness = false

93 changes: 84 additions & 9 deletions tonic/benches/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,29 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use tonic::{codec::DecodeBuf, codec::Decoder, Status, Streaming};
use tonic::{codec::DecodeBuf, codec::Decoder, codec::Decompression, Status, Streaming};

macro_rules! bench {
($name:ident, $message_size:expr, $chunk_size:expr, $message_count:expr) => {
bench!($name, $message_size, $chunk_size, $message_count, None);
};
($name:ident, $message_size:expr, $chunk_size:expr, $message_count:expr, $encoding:expr) => {
fn $name(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.expect("runtime");

let payload = make_payload($message_size, $message_count);
let payload = make_payload($message_size, $message_count, $encoding);
let body = MockBody::new(payload, $chunk_size);
b.bytes = body.len() as u64;

b.iter(|| {
rt.block_on(async {
let decoder = MockDecoder::new($message_size);
let mut stream = Streaming::new_request(decoder, body.clone());

let decompression = Decompression::from_encoding($encoding);
let mut stream = Streaming::new_request(decoder, body.clone(), decompression);

let mut count = 0;
while let Some(msg) = stream.message().await.unwrap() {
Expand Down Expand Up @@ -108,15 +113,38 @@ impl Decoder for MockDecoder {
}
}

fn make_payload(message_length: usize, message_count: usize) -> Bytes {
fn make_payload(message_length: usize, message_count: usize, encoding: Option<&str>) -> Bytes {
let mut buf = BytesMut::new();

let raw_msg = vec![97u8; message_length];

let msg_buf = match encoding {
#[cfg(feature = "gzip")]
Some(encoding) if encoding == "gzip" => {
use bytes::buf::BufMutExt;
let mut reader =
flate2::read::GzEncoder::new(&raw_msg[..], flate2::Compression::best());
let mut writer = BytesMut::new().writer();

std::io::copy(&mut reader, &mut writer).expect("copy");
writer.into_inner()
}
None => {
let mut msg_buf = BytesMut::new();
msg_buf.put(&raw_msg[..]);
msg_buf
}
Some(encoding) => panic!("Encoding {} isn't supported", encoding),
};

for _ in 0..message_count {
let msg = vec![97u8; message_length];
buf.reserve(msg.len() + 5);
buf.put_u8(0);
buf.put_u32(msg.len() as u32);
buf.put(&msg[..]);
buf.reserve(msg_buf.len() + 5);
buf.put_u8(match encoding {
Some(_) => 1,
None => 0,
});
buf.put_u32(msg_buf.len() as u32);
buf.put(&msg_buf[..]);
}

buf.freeze()
Expand All @@ -137,6 +165,21 @@ bench!(message_count_1, 500, 505, 1);
bench!(message_count_10, 500, 505, 10);
bench!(message_count_20, 500, 505, 20);

// gzip change body chunk size only
bench!(chunk_size_100_gzip, 1_000, 100, 1, Some("gzip"));
bench!(chunk_size_500_gzip, 1_000, 500, 1, Some("gzip"));
bench!(chunk_size_1005_gzip, 1_000, 1_005, 1, Some("gzip"));

// gzip change message size only
bench!(message_size_1k_gzip, 1_000, 1_005, 2, Some("gzip"));
bench!(message_size_5k_gzip, 5_000, 1_005, 2, Some("gzip"));
bench!(message_size_10k_gzip, 10_000, 1_005, 2, Some("gzip"));

// gzip change message count only
bench!(message_count_1_gzip, 500, 505, 1, Some("gzip"));
bench!(message_count_10_gzip, 500, 505, 10, Some("gzip"));
bench!(message_count_20_gzip, 500, 505, 20, Some("gzip"));

benchmark_group!(chunk_size, chunk_size_100, chunk_size_500, chunk_size_1005);

benchmark_group!(
Expand All @@ -153,4 +196,36 @@ benchmark_group!(
message_count_20
);

// gzip counterparts of the plain benchmark groups above.
benchmark_group!(
    chunk_size_gzip,
    chunk_size_100_gzip,
    chunk_size_500_gzip,
    chunk_size_1005_gzip
);

benchmark_group!(
    message_size_gzip,
    message_size_1k_gzip,
    message_size_5k_gzip,
    message_size_10k_gzip
);

benchmark_group!(
    message_count_gzip,
    message_count_1_gzip,
    message_count_10_gzip,
    message_count_20_gzip
);

// With the "gzip" feature enabled, run both the plain and the gzip groups.
#[cfg(feature = "gzip")]
benchmark_main!(
    chunk_size,
    message_size,
    message_count,
    chunk_size_gzip,
    message_size_gzip,
    message_count_gzip
);

// Without the feature, only the uncompressed groups can run.
#[cfg(not(feature = "gzip"))]
benchmark_main!(chunk_size, message_size, message_count);
9 changes: 6 additions & 3 deletions tonic/src/client/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
body::{Body, BoxBody},
client::GrpcService,
codec::{encode_client, Codec, Streaming},
codec::{encode_client, Codec, Compression, Decompression, Streaming},
interceptor::Interceptor,
Code, Request, Response, Status,
};
Expand Down Expand Up @@ -159,11 +159,13 @@ impl<T> Grpc<T> {

let uri = Uri::from_parts(parts).expect("path_and_query only is valid Uri");

let compression = Compression::disabled();
let request = request
.map(|s| encode_client(codec.encoder(), s))
.map(|s| encode_client(codec.encoder(), s, compression.clone()))
.map(BoxBody::new);

let mut request = request.into_http(uri);
compression.set_headers(request.headers_mut(), true);

// Add the gRPC related HTTP headers
request
Expand Down Expand Up @@ -196,9 +198,10 @@ impl<T> Grpc<T> {
true
};

let decompression = Decompression::from_headers(response.headers());
let response = response.map(|body| {
if expect_additional_trailers {
Streaming::new_response(codec.decoder(), body, status_code)
Streaming::new_response(codec.decoder(), body, status_code, decompression)
} else {
Streaming::new_empty(codec.decoder(), body)
}
Expand Down
27 changes: 27 additions & 0 deletions tonic/src/codec/compression/bufwriter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use bytes::BufMut;

use std::{cmp, io};

/// A `BufMut` adapter which implements `io::Write` for the inner value.
/// A `BufMut` adapter which implements `io::Write` for the inner value.
#[derive(Debug)]
pub(crate) struct Writer<'a, B> {
    // Mutable borrow of the destination buffer; all writes are forwarded here.
    buf: &'a mut B,
}

/// Wraps a `BufMut` in a [`Writer`] so it can be used as an `io::Write` sink.
#[cfg(feature = "gzip")]
pub(crate) fn new<B>(buf: &mut B) -> Writer<'_, B> {
    Writer { buf }
}

impl<'a, B: BufMut + Sized> io::Write for Writer<'a, B> {
    /// Copies as many bytes from `src` as fit into the buffer's remaining
    /// writable capacity and reports how many were taken.
    fn write(&mut self, src: &[u8]) -> io::Result<usize> {
        let writable = cmp::min(src.len(), self.buf.remaining_mut());
        self.buf.put(&src[..writable]);
        Ok(writable)
    }

    /// No-op: nothing is buffered internally, bytes go straight into `buf`.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
97 changes: 97 additions & 0 deletions tonic/src/codec/compression/compression.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use super::{
compressors::{self, IDENTITY},
errors::CompressionError,
Compressor, ACCEPT_ENCODING_HEADER, ENCODING_HEADER,
};
use crate::metadata::MetadataMap;
use bytes::{Buf, BytesMut};
use http::HeaderValue;
use std::fmt::Debug;
use tracing::debug;

// Granularity (8 KiB) used when reserving output-buffer capacity for
// (de)compressed frames.
pub(crate) const BUFFER_SIZE: usize = 8 * 1024;

/// Per-call compression configuration: an optional, registry-owned compressor.
/// `None` means identity (no compression applied).
#[derive(Clone)]
pub(crate) struct Compression {
    // Borrowed from the static compressor registry, hence `'static`.
    compressor: Option<&'static Box<dyn Compressor>>,
}

impl Debug for Compression {
    /// Shows the active compressor's wire name, or "identity" when disabled.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self.compressor {
            Some(compressor) => compressor.name(),
            None => IDENTITY,
        };
        f.debug_struct("Compression")
            .field("compressor", &name)
            .finish()
    }
}

impl Compression {
    /// Create an instance of compression that doesn't compress anything.
    pub(crate) fn disabled() -> Compression {
        Compression { compressor: None }
    }

    /// Create an instance of compression from gRPC request metadata.
    ///
    /// Mirrors the request's own `grpc-encoding` back for the response.
    pub(crate) fn response_from_metadata(request_metadata: &MetadataMap) -> Compression {
        // The following implementation is very conservative, and similar to the Golang GRPC implementation.
        // Instead of looking at 'grpc-accept-encoding' and potentially compressing the response with a different
        // compressor than the one used by the request it uses the same compressor
        let request_compressor = request_metadata
            .get(ENCODING_HEADER)
            .and_then(|v| v.to_str().ok())
            .and_then(compressors::get);

        Compression {
            compressor: request_compressor,
        }
    }

    /// Get if compression is enabled.
    pub(crate) fn is_enabled(&self) -> bool {
        self.compressor.is_some()
    }

    /// Compress `len` bytes from `in_buffer` into `out_buffer`.
    ///
    /// # Errors
    /// Returns `CompressionError::NoCompression` when no compressor is
    /// configured, or the compressor's own error on failure. On success the
    /// `len` consumed bytes are advanced past in `in_buffer`.
    pub(crate) fn compress(
        &self,
        in_buffer: &mut BytesMut,
        out_buffer: &mut BytesMut,
        len: usize,
    ) -> Result<(), CompressionError> {
        // Reserve a whole number of BUFFER_SIZE chunks, always at least one.
        let capacity = ((len / BUFFER_SIZE) + 1) * BUFFER_SIZE;
        out_buffer.reserve(capacity);

        let compressor = self.compressor.ok_or(CompressionError::NoCompression)?;
        compressor.compress(in_buffer, out_buffer, len)?;
        in_buffer.advance(len);

        // Fixed: log previously said "Decompressed" — this is the compress path.
        debug!(
            "Compressed {} bytes into {} bytes using {:?}",
            len,
            out_buffer.len(),
            compressor.name()
        );

        Ok(())
    }

    /// Set the `grpc-encoding` header with the compressor name, and optionally
    /// advertise all supported encodings via `grpc-accept-encoding`.
    pub(crate) fn set_headers(&self, headers: &mut http::HeaderMap, set_accept_encoding: bool) {
        if set_accept_encoding {
            headers.insert(
                ACCEPT_ENCODING_HEADER,
                HeaderValue::from_str(&compressors::get_accept_encoding_header())
                    .expect("All encoding names should be ASCII"),
            );
        }

        // Only advertise an encoding when compression is actually enabled.
        if let Some(compressor) = self.compressor {
            headers.insert(ENCODING_HEADER, HeaderValue::from_static(compressor.name()));
        }
    }
}
69 changes: 69 additions & 0 deletions tonic/src/codec/compression/compressors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use bytes::BytesMut;
use once_cell::sync::Lazy;
use std::{collections::HashMap, io};

// Wire name of the no-op encoding, as it appears in `grpc-encoding` headers.
pub(crate) const IDENTITY: &str = "identity";

/// List of known compressors
// Built lazily on first access. The two `#[cfg]` blocks below are mutually
// exclusive: with the "gzip" feature the populated map is the closure's tail
// expression; without it, only the empty `HashMap::new()` remains.
static COMPRESSORS: Lazy<HashMap<String, Box<dyn Compressor>>> = Lazy::new(|| {
    #[cfg(feature = "gzip")]
    {
        let mut m = HashMap::new();

        // Registers a compressor under its wire name.
        let mut add = |compressor: Box<dyn Compressor>| {
            m.insert(compressor.name().to_string(), compressor);
        };

        add(Box::new(super::gzip::GZipCompressor::default()));

        m
    }

    #[cfg(not(feature = "gzip"))]
    HashMap::new()
});

/// Get a compressor from its name (as found in `grpc-encoding` headers),
/// or `None` when no compressor is registered under that name.
pub(crate) fn get(name: impl AsRef<str>) -> Option<&'static Box<dyn Compressor>> {
    COMPRESSORS.get(name.as_ref())
}

/// Get the names of all the known compressors.
pub(crate) fn names() -> Vec<String> {
    // `.cloned()` replaces `.map(|n| n.clone())` — identical behavior,
    // idiomatic form (clippy::map_clone).
    COMPRESSORS.keys().cloned().collect()
}

/// A compressor implements compression and decompression of gRPC frames.
///
/// Implementations must be `Sync + Send` because they are shared through the
/// static registry across request-handling threads.
pub(crate) trait Compressor: Sync + Send {
    /// Get the name of this compressor as present in http headers
    fn name(&self) -> &'static str;

    /// Decompress `len` bytes from `in_buffer` into `out_buffer`
    fn decompress(
        &self,
        in_buffer: &BytesMut,
        out_buffer: &mut BytesMut,
        len: usize,
    ) -> io::Result<()>;

    /// Compress `len` bytes from `in_buffer` into `out_buffer`
    fn compress(
        &self,
        in_buffer: &BytesMut,
        out_buffer: &mut BytesMut,
        len: usize,
    ) -> io::Result<()>;

    /// Estimate the space necessary to decompress `compressed_len` bytes of compressed data
    // Default heuristic: assume a 2x expansion ratio; implementations with
    // better knowledge of their format should override this.
    fn estimate_decompressed_len(&self, compressed_len: usize) -> usize {
        compressed_len * 2
    }
}

/// Build the comma-separated value for the `grpc-accept-encoding` header
/// from every registered compressor name.
pub(crate) fn get_accept_encoding_header() -> String {
    let names: Vec<&str> = COMPRESSORS.keys().map(String::as_str).collect();
    names.join(",")
}
Loading