Skip to content

Commit

Permalink
feat(hyper-error-handling): adds handling for hyper HTTP errors
Browse files Browse the repository at this point in the history
Adds default implementation of CacheableResponse for Result.
  • Loading branch information
singulared committed Aug 26, 2024
1 parent 40a376a commit 0ec39df
Show file tree
Hide file tree
Showing 12 changed files with 218 additions and 65 deletions.
4 changes: 2 additions & 2 deletions examples/examples/axum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use hitbox_tower::{
use hitbox_redis::RedisBackend;
use hitbox_tower::{Cache, EndpointConfig};

async fn handler_result(Path(_name): Path<String>) -> Result<String, String> {
async fn handler_result(Path(_name): Path<String>) -> Result<String, StatusCode> {
//dbg!("axum::handler_result");
// Ok(format!("Hello, {name}"))
Err("error".to_owned())
Err(StatusCode::INTERNAL_SERVER_ERROR)
}

async fn handler() -> String {
Expand Down
10 changes: 5 additions & 5 deletions examples/examples/tower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use hitbox_tower::Cache;
use hyper::{Body, Server};
use std::net::SocketAddr;

use http::{Request, Response};
use http::{Method, Request, Response};
use tower::make::Shared;

async fn handle(_: Request<Body>) -> Result<Response<Body>, String> {
Ok(Response::new("Hello, World!".into()))
// Err("handler error".to_owned())
async fn handle(_: Request<Body>) -> http::Result<Response<Body>> {
// Ok(Response::new("Hello, World!".into()))
Err(http::Error::from(Method::from_bytes(&[0x01]).unwrap_err()))
}

#[tokio::main]
Expand All @@ -24,7 +24,7 @@ async fn main() {
let redis = RedisBackend::builder().build().unwrap();

let service = tower::ServiceBuilder::new()
.layer(Cache::builder().backend(inmemory).build())
// .layer(Cache::builder().backend(inmemory).build())
.layer(Cache::builder().backend(redis).build())
.service_fn(handle);

Expand Down
22 changes: 19 additions & 3 deletions hitbox-backend/src/serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl Serializer for BinSerializer<Vec<u8>> {
#[cfg(test)]
mod test {
use async_trait::async_trait;
use hitbox_core::CacheableResponse;
use hitbox_core::{CachePolicy, CacheableResponse, PredicateResult};

use super::*;

Expand All @@ -142,9 +142,25 @@ mod test {
#[async_trait]
impl CacheableResponse for Test {
type Cached = Self;
type Subject = Self;

async fn cache_policy<P>(self, predicates: P) -> hitbox_core::ResponseCachePolicy<Self>
where
P: hitbox_core::Predicate<Subject = Self::Subject> + Send + Sync,
{
match predicates.check(self).await {
PredicateResult::Cacheable(cacheable) => match cacheable.into_cached().await {
CachePolicy::Cacheable(res) => {
CachePolicy::Cacheable(CachedValue::new(res, Utc::now()))
}
CachePolicy::NonCacheable(res) => CachePolicy::NonCacheable(res),
},
PredicateResult::NonCacheable(res) => CachePolicy::NonCacheable(res),
}
}

async fn into_cached(self) -> Self::Cached {
self
async fn into_cached(self) -> CachePolicy<Self::Cached, Self> {
CachePolicy::Cacheable(self)
}
async fn from_cached(cached: Self::Cached) -> Self {
cached
Expand Down
6 changes: 6 additions & 0 deletions hitbox-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,9 @@ keywords = ["cache", "async", "cache-backend", "hitbox", "tokio"]
[dependencies]
async-trait = "0.1.73"
chrono = { version = "0.4.26", default-features = false, features = ["clock"] }

[dev-dependencies]
tokio = { version = "1", default-features = false, features = [
"macros",
"rt-multi-thread",
] }
46 changes: 32 additions & 14 deletions hitbox-core/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,13 @@ where
Self::Cached: Clone,
{
type Cached;
type Subject: CacheableResponse;

async fn cache_policy<P>(self, predicates: P) -> ResponseCachePolicy<Self>
where
P: Predicate<Subject = Self> + Send + Sync,
{
match predicates.check(self).await {
PredicateResult::Cacheable(res) => {
CachePolicy::Cacheable(CachedValue::new(res.into_cached().await, Utc::now()))
}
PredicateResult::NonCacheable(res) => CachePolicy::NonCacheable(res),
}
}
P: Predicate<Subject = Self::Subject> + Send + Sync;

async fn into_cached(self) -> Self::Cached;
async fn into_cached(self) -> CachePolicy<Self::Cached, Self>;

async fn from_cached(cached: Self::Cached) -> Self;
}
Expand All @@ -60,12 +53,37 @@ where
T::Cached: Send,
{
type Cached = <T as CacheableResponse>::Cached;
type Subject = T;

async fn cache_policy<P>(self, predicates: P) -> ResponseCachePolicy<Self>
where
P: Predicate<Subject = Self::Subject> + Send + Sync,
{
match self {
Ok(response) => match predicates.check(response).await {
PredicateResult::Cacheable(cacheable) => match cacheable.into_cached().await {
CachePolicy::Cacheable(res) => {
CachePolicy::Cacheable(CachedValue::new(res, Utc::now()))
}
CachePolicy::NonCacheable(res) => CachePolicy::NonCacheable(Ok(res)),
},
PredicateResult::NonCacheable(res) => CachePolicy::NonCacheable(Ok(res)),
},
Err(error) => ResponseCachePolicy::NonCacheable(Err(error)),
}
}

async fn into_cached(self) -> Self::Cached {
unimplemented!()
async fn into_cached(self) -> CachePolicy<Self::Cached, Self> {
match self {
Ok(response) => match response.into_cached().await {
CachePolicy::Cacheable(res) => CachePolicy::Cacheable(res),
CachePolicy::NonCacheable(res) => CachePolicy::NonCacheable(Ok(res)),
},
Err(error) => CachePolicy::NonCacheable(Err(error)),
}
}

async fn from_cached(_cached: Self::Cached) -> Self {
unimplemented!()
async fn from_cached(cached: Self::Cached) -> Self {
Ok(T::from_cached(cached).await)
}
}
74 changes: 74 additions & 0 deletions hitbox-core/tests/response.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use async_trait::async_trait;
use chrono::Utc;
use hitbox_core::{CachePolicy, CacheableResponse, CachedValue, Predicate, PredicateResult};

#[derive(Clone, Debug)]
struct TestResponse {
field1: String,
field2: u8,
}

impl TestResponse {
pub fn new() -> Self {
Self {
field1: "nope".to_owned(),
field2: 42,
}
}
}

#[async_trait]
impl CacheableResponse for TestResponse {
type Cached = Self;
type Subject = Self;

async fn cache_policy<P>(self, predicates: P) -> hitbox_core::ResponseCachePolicy<Self>
where
P: hitbox_core::Predicate<Subject = Self::Subject> + Send + Sync,
{
match predicates.check(self).await {
PredicateResult::Cacheable(cacheable) => match cacheable.into_cached().await {
CachePolicy::Cacheable(res) => {
CachePolicy::Cacheable(CachedValue::new(res, Utc::now()))
}
CachePolicy::NonCacheable(res) => CachePolicy::NonCacheable(res),
},
PredicateResult::NonCacheable(res) => CachePolicy::NonCacheable(res),
}
}

async fn into_cached(self) -> CachePolicy<Self::Cached, Self> {
CachePolicy::Cacheable(self)
}
async fn from_cached(cached: Self::Cached) -> Self {
cached
}
}

struct NeuralPredicate {}

impl NeuralPredicate {
fn new() -> Self {
NeuralPredicate {}
}
}

#[async_trait]
impl Predicate for NeuralPredicate {
type Subject = TestResponse;

async fn check(&self, subject: Self::Subject) -> PredicateResult<Self::Subject> {
PredicateResult::Cacheable(subject)
}
}

#[tokio::test]
async fn test_cacheable_result() {
let response: Result<TestResponse, ()> = Ok(TestResponse::new());
let policy = response.cache_policy(NeuralPredicate::new()).await;
dbg!(&policy);

let response: Result<TestResponse, ()> = Err(());
let policy = response.cache_policy(NeuralPredicate::new()).await;
dbg!(&policy);
}
25 changes: 21 additions & 4 deletions hitbox-http/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::{collections::HashMap, fmt::Debug};

use async_trait::async_trait;
use bytes::Bytes;
use hitbox::CacheableResponse;
use chrono::Utc;
use hitbox::{predicate::PredicateResult, CachePolicy, CacheableResponse, CachedValue};
use http::{response::Parts, Response};
use hyper::body::{to_bytes, HttpBody};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -71,9 +72,25 @@ where
ResBody::Data: Send,
{
type Cached = SerializableHttpResponse;
type Subject = Self;

async fn into_cached(self) -> Self::Cached {
SerializableHttpResponse {
async fn cache_policy<P>(self, predicates: P) -> hitbox::ResponseCachePolicy<Self>
where
P: hitbox::Predicate<Subject = Self::Subject> + Send + Sync,
{
match predicates.check(self).await {
PredicateResult::Cacheable(cacheable) => match cacheable.into_cached().await {
CachePolicy::Cacheable(res) => {
CachePolicy::Cacheable(CachedValue::new(res, Utc::now()))
}
CachePolicy::NonCacheable(res) => CachePolicy::NonCacheable(res),
},
PredicateResult::NonCacheable(res) => CachePolicy::NonCacheable(res),
}
}

async fn into_cached(self) -> CachePolicy<Self::Cached, Self> {
CachePolicy::Cacheable(SerializableHttpResponse {
status: 200,
version: "HTTP/1.1".to_owned(),
body: to_bytes(self.body.into_inner_body())
Expand All @@ -86,7 +103,7 @@ where
.into_iter()
.map(|(h, v)| (h.unwrap().to_string(), v.to_str().unwrap().to_string()))
.collect(),
}
})
}

async fn from_cached(cached: Self::Cached) -> Self {
Expand Down
46 changes: 32 additions & 14 deletions hitbox-tower/src/future.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::{
any::type_name,
fmt::Debug,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
Expand All @@ -25,52 +27,68 @@ impl<S, ReqBody> Transformer<S, ReqBody> {
}
}

impl<S, ReqBody, ResBody> Transform<CacheableHttpRequest<ReqBody>, CacheableHttpResponse<ResBody>>
impl<S, ReqBody, ResBody>
Transform<CacheableHttpRequest<ReqBody>, Result<CacheableHttpResponse<ResBody>, S::Error>>
for Transformer<S, ReqBody>
where
S: Service<Request<ReqBody>, Response = Response<ResBody>> + Clone + Send + 'static,
S::Future: Send,
ReqBody: Send + 'static,
ResBody: FromBytes,

// debug bounds
S::Error: Debug,
{
type Future = UpstreamFuture<ResBody>;
type Future = UpstreamFuture<ResBody, S::Error>;
type Response = Result<Response<ResBody>, S::Error>;

fn upstream_transform(&self, req: CacheableHttpRequest<ReqBody>) -> Self::Future {
UpstreamFuture::new(self.inner.clone(), req)
}

fn response_transform(&self, res: CacheableHttpResponse<ResBody>) -> Self::Response {
Ok(res.into_response())
fn response_transform(
&self,
res: Result<CacheableHttpResponse<ResBody>, S::Error>,
) -> Self::Response {
res.map(CacheableHttpResponse::into_response)
}
}

#[pin_project]
pub struct UpstreamFuture<ResBody> {
inner_future: BoxFuture<'static, CacheableHttpResponse<ResBody>>,
pub struct UpstreamFuture<ResBody, E> {
inner_future: BoxFuture<'static, Result<CacheableHttpResponse<ResBody>, E>>,
}

impl<ResBody> UpstreamFuture<ResBody> {
impl<ResBody, E> UpstreamFuture<ResBody, E> {
pub fn new<S, ReqBody>(mut inner_service: S, req: CacheableHttpRequest<ReqBody>) -> Self
where
S: Service<Request<ReqBody>, Response = Response<ResBody>> + Send + 'static,
S: Service<Request<ReqBody>, Response = Response<ResBody>, Error = E> + Send + 'static,
S::Future: Send,
ReqBody: Send + 'static,
ResBody: FromBytes,

// debug bounds
S::Error: Debug,
{
let inner_future = Box::pin(async move {
let res = inner_service.call(req.into_request()).await;
match res {
Ok(res) => CacheableHttpResponse::from_response(res),
_ => unimplemented!(),
}
// CacheableHttpResponse::from_response(res.unwrap())
match &res {
Ok(res) => {
dbg!(res.status());
}
Err(err) => {
dbg!(err);
}
};
res.map(CacheableHttpResponse::from_response)
});
UpstreamFuture { inner_future }
}
}

impl<ResBody> Future for UpstreamFuture<ResBody> {
type Output = CacheableHttpResponse<ResBody>;
impl<ResBody, E> Future for UpstreamFuture<ResBody, E> {
type Output = Result<CacheableHttpResponse<ResBody>, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
this.inner_future.as_mut().poll(cx)
Expand Down
3 changes: 2 additions & 1 deletion hitbox-tower/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,14 @@ where
ResBody: FromBytes + HttpBody + Send + 'static,
ResBody::Error: Debug,
ResBody::Data: Send,
S::Error: Debug + Send,
{
type Response = Response<ResBody>;
type Error = S::Error;
type Future = CacheFuture<
B,
CacheableHttpRequest<ReqBody>,
CacheableHttpResponse<ResBody>,
Result<CacheableHttpResponse<ResBody>, S::Error>,
Transformer<S, ReqBody>,
>;

Expand Down
Loading

0 comments on commit 0ec39df

Please sign in to comment.