From 8e715b2c3eb6db1106bb3673d17a01b8493540fb Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Tue, 9 May 2023 17:00:11 +0800 Subject: [PATCH 1/5] read all bytes --- src/types.rs | 3 +++ src/web/warp.rs | 9 +++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/types.rs b/src/types.rs index 486fa95..c092067 100644 --- a/src/types.rs +++ b/src/types.rs @@ -17,6 +17,7 @@ pub use prometheus::*; pub enum Error { SnappyEncode(snap::Error), SnappyDecode(snap::Error), + ReadRequest(std::io::Error), ProtoDecode(prost::DecodeError), } @@ -27,6 +28,7 @@ impl Display for Error { match self { Self::SnappyEncode(_) => f.write_str("SnappyEncode"), Self::SnappyDecode(_) => f.write_str("SnappyDecode"), + Self::ReadRequest(_) => f.write_str("ReadRequest"), Self::ProtoDecode(_) => f.write_str("ProtoDecode"), } } @@ -37,6 +39,7 @@ impl std::error::Error for Error { match self { Self::SnappyEncode(e) => Some(e), Self::SnappyDecode(e) => Some(e), + Self::ReadRequest(e) => Some(e), Self::ProtoDecode(e) => Some(e), } } diff --git a/src/web/warp.rs b/src/web/warp.rs index 2232fcb..ed8dfe0 100755 --- a/src/web/warp.rs +++ b/src/web/warp.rs @@ -1,6 +1,6 @@ //! Remote storage adapter for warp web framework -use std::convert::Infallible; +use std::{convert::Infallible, io::Read}; use prost::Message; use warp::{ @@ -64,7 +64,12 @@ impl Reject for Error {} pub fn protobuf_body( ) -> impl Filter + Copy { async fn from_reader(buf: impl Buf) -> Result { - util::decode_snappy(buf.chunk()) + let mut body = Vec::new(); + buf.reader() + .read_to_end(&mut body) + .map_err(Error::ReadRequest)?; + + util::decode_snappy(&body) .map_err(reject::custom) .and_then(|decoded_buf| { T::decode(decoded_buf.as_slice()) From 6984334b2b4faadb9ca02911400bf54f58d1dda5 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Tue, 9 May 2023 17:07:16 +0800 Subject: [PATCH 2/5] add bytes dep --- Cargo.toml | 1 + src/types.rs | 3 --- src/web/warp.rs | 12 ++++-------- 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5cd342d..98153c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ readme = "README.md" [dependencies] async-trait = "0.1.61" +bytes = "1.4.0" futures = "0.3.25" prost = "0.11.6" snap = "1.1.0" diff --git a/src/types.rs b/src/types.rs index c092067..486fa95 100644 --- a/src/types.rs +++ b/src/types.rs @@ -17,7 +17,6 @@ pub use prometheus::*; pub enum Error { SnappyEncode(snap::Error), SnappyDecode(snap::Error), - ReadRequest(std::io::Error), ProtoDecode(prost::DecodeError), } @@ -28,7 +27,6 @@ impl Display for Error { match self { Self::SnappyEncode(_) => f.write_str("SnappyEncode"), Self::SnappyDecode(_) => f.write_str("SnappyDecode"), - Self::ReadRequest(_) => f.write_str("ReadRequest"), Self::ProtoDecode(_) => f.write_str("ProtoDecode"), } } @@ -39,7 +37,6 @@ impl std::error::Error for Error { match self { Self::SnappyEncode(e) => Some(e), Self::SnappyDecode(e) => Some(e), - Self::ReadRequest(e) => Some(e), Self::ProtoDecode(e) => Some(e), } } diff --git a/src/web/warp.rs b/src/web/warp.rs index ed8dfe0..a0704ec 100755 --- a/src/web/warp.rs +++ b/src/web/warp.rs @@ -2,6 +2,7 @@ use std::{convert::Infallible, io::Read}; +use bytes::Bytes; use prost::Message; use warp::{ body, @@ -63,13 +64,8 @@ impl Reject for Error {} // https://github.com/ParkMyCar/warp-protobuf/blob/master/src/lib.rs#L102 pub fn protobuf_body( ) -> impl Filter + Copy { - async fn from_reader(buf: impl Buf) -> Result { - let mut body = Vec::new(); - buf.reader() - .read_to_end(&mut body) - .map_err(Error::ReadRequest)?; - - util::decode_snappy(&body) + async fn from_reader(bytes: Bytes) -> Result { + util::decode_snappy(&bytes) .map_err(reject::custom) .and_then(|decoded_buf| { T::decode(decoded_buf.as_slice()) @@ -77,7 +73,7 @@ pub fn protobuf_body( }) } - body::aggregate().and_then(from_reader) + body::bytes().and_then(from_reader) } impl warp::Reply for ReadResponse { From 7a13eed0d6a638d1049009047a8144efe63dbaaa Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Tue, 9 May 2023 17:10:29 +0800 Subject: [PATCH 3/5] bump version --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 98153c3..dd88e72 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "prom-remote-api" -version = "0.2.1" +version = "0.2.2" edition = "2021" license = "Apache-2.0" description = "Prometheus remote storage API for Rust" From d27becd8b355cc23d806d41d653bc4a3a2dea72d Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Tue, 9 May 2023 17:16:59 +0800 Subject: [PATCH 4/5] fix clippy --- src/web/warp.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/web/warp.rs b/src/web/warp.rs index a0704ec..6e4c5a8 100755 --- a/src/web/warp.rs +++ b/src/web/warp.rs @@ -1,6 +1,6 @@ //! Remote storage adapter for warp web framework -use std::{convert::Infallible, io::Read}; +use std::convert::Infallible; use bytes::Bytes; use prost::Message; @@ -12,7 +12,7 @@ use warp::{ StatusCode, }, reject::{self, Reject}, - reply, Buf, Filter, Rejection, Reply, + reply, Filter, Rejection, Reply, }; use crate::{ From 5858683ba99925c04420239115c06bd889cd597b Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Tue, 9 May 2023 18:32:06 +0800 Subject: [PATCH 5/5] fix clippy --- examples/simple.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/simple.rs b/examples/simple.rs index cde5a3a..624bde1 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -33,7 +33,6 @@ fn generate_samples(start_ms: i64, end_ms: i64, step_ms: i64) -> Vec { // range query (start_ms..end_ms) - .into_iter() .step_by(step_ms as usize) .enumerate() .map(|(i, timestamp)| Sample {