Skip to content

Commit

Permalink
Merge pull request #2 from jiacai2050/fix-chunk
Browse files Browse the repository at this point in the history
fix: read partial body
  • Loading branch information
jiacai2050 authored May 9, 2023
2 parents bb03715 + 5858683 commit d87bf83
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 6 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "prom-remote-api"
version = "0.2.1"
version = "0.2.2"
edition = "2021"
license = "Apache-2.0"
description = "Prometheus remote storage API for Rust"
Expand All @@ -12,6 +12,7 @@ readme = "README.md"

[dependencies]
async-trait = "0.1.61"
bytes = "1.4.0"
futures = "0.3.25"
prost = "0.11.6"
snap = "1.1.0"
Expand Down
1 change: 0 additions & 1 deletion examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ fn generate_samples(start_ms: i64, end_ms: i64, step_ms: i64) -> Vec<Sample> {

// range query
(start_ms..end_ms)
.into_iter()
.step_by(step_ms as usize)
.enumerate()
.map(|(i, timestamp)| Sample {
Expand Down
9 changes: 5 additions & 4 deletions src/web/warp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::convert::Infallible;

use bytes::Bytes;
use prost::Message;
use warp::{
body,
Expand All @@ -11,7 +12,7 @@ use warp::{
StatusCode,
},
reject::{self, Reject},
reply, Buf, Filter, Rejection, Reply,
reply, Filter, Rejection, Reply,
};

use crate::{
Expand Down Expand Up @@ -63,16 +64,16 @@ impl Reject for Error {}
// https://github.com/ParkMyCar/warp-protobuf/blob/master/src/lib.rs#L102
pub fn protobuf_body<T: Message + Send + Default>(
) -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
async fn from_reader<T: Message + Send + Default>(buf: impl Buf) -> Result<T, Rejection> {
util::decode_snappy(buf.chunk())
async fn from_reader<T: Message + Send + Default>(bytes: Bytes) -> Result<T, Rejection> {
util::decode_snappy(&bytes)
.map_err(reject::custom)
.and_then(|decoded_buf| {
T::decode(decoded_buf.as_slice())
.map_err(|err| reject::custom(Error::ProtoDecode(err)))
})
}

body::aggregate().and_then(from_reader)
body::bytes().and_then(from_reader)
}

impl warp::Reply for ReadResponse {
Expand Down

0 comments on commit d87bf83

Please sign in to comment.