Skip to content

Commit

Permalink
call filters in service
Browse files Browse the repository at this point in the history
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
  • Loading branch information
zaharidichev committed Nov 22, 2024
1 parent 2baf3f6 commit 9c3dc5a
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 36 deletions.
4 changes: 2 additions & 2 deletions linkerd/app/outbound/src/opaq/logical/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ where
.push_map_target(|t| t)
.push_map_target(|b: Backend<T>| b.concrete)
// apply backend filters
.push_filter(filters::apply)
.push(filters::NewApplyFilters::layer())
.lift_new()
.push(NewDistribute::layer())
// The router does not take the backend's availability into
// consideration, so we must eagerly fail requests to prevent
// leaking tasks onto the runtime.
.push_on_service(svc::LoadShed::layer())
// apply route level filters
.push_filter(filters::apply)
.push(filters::NewApplyFilters::layer())
.push(svc::NewMapErr::layer_with(|rt: &Self| {
let route = rt.params.route_ref.clone();
move |source| RouteError {
Expand Down
100 changes: 84 additions & 16 deletions linkerd/app/outbound/src/opaq/logical/route/filters.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,98 @@
use linkerd_app_core::{svc, Error};
use linkerd_app_core::{io, svc, Error};
use linkerd_proxy_client_policy::opaq;
use std::{fmt::Debug, sync::Arc};
use std::{
fmt::Debug,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};

pub(crate) fn apply<T>(t: T) -> Result<T, Error>
#[derive(Clone, Debug)]
pub struct NewApplyFilters<N> {
inner: N,
}

#[derive(Clone, Debug)]
pub struct ApplyFilters<S> {
inner: S,
filters: Arc<[opaq::Filter]>,
}

// === impl NewApplyFilters ===

impl<N> NewApplyFilters<N> {
pub fn layer() -> impl svc::layer::Layer<N, Service = Self> + Clone {
svc::layer::mk(move |inner| Self { inner })
}
}

impl<T, N, S> svc::NewService<T> for NewApplyFilters<N>
where
T: Clone,
N: svc::NewService<T, Service = S>,
T: svc::Param<Arc<[opaq::Filter]>>,
{
let filters: &[opaq::Filter] = &t.param();
if let Some(filter) = filters.iter().next() {
match filter {
opaq::Filter::Forbidden => {
return Err(errors::TCPForbiddenRoute.into());
}
type Service = ApplyFilters<S>;

opaq::Filter::Invalid(message) => {
return Err(errors::TCPInvalidBackend(message.clone()).into());
}
fn new_service(&self, target: T) -> Self::Service {
let filters: Arc<[opaq::Filter]> = target.param();
let svc = self.inner.new_service(target);
ApplyFilters {
inner: svc,
filters,
}
}
}

opaq::Filter::InternalError(message) => {
return Err(errors::TCPInvalidPolicy(message).into());
// === impl ApplyFilters ===

impl<S> ApplyFilters<S> {
fn apply_filters(&self) -> Result<(), Error> {
if let Some(filter) = self.filters.iter().next() {
match filter {
opaq::Filter::Forbidden => {
return Err(errors::TCPForbiddenRoute.into());
}

opaq::Filter::Invalid(message) => {
return Err(errors::TCPInvalidBackend(message.clone()).into());
}

opaq::Filter::InternalError(message) => {
return Err(errors::TCPInvalidPolicy(message).into());
}
}
}

Ok(())
}
}

impl<I, S> svc::Service<I> for ApplyFilters<S>
where
I: io::AsyncRead + io::AsyncWrite + Send + 'static,
S: svc::Service<I> + Send + Clone + 'static,
S::Error: Into<Error>,
S::Future: Send,
{
type Response = S::Response;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<S::Response, Error>> + Send + 'static>>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(Into::into)
}

Ok(t)
fn call(&mut self, io: I) -> Self::Future {
let call = self.inner.call(io);
let apply = self.apply_filters();

Box::pin(async move {
apply?;
call.await.map_err(Into::into)
})
}
}

pub mod errors {
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/outbound/src/tls/logical/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ where
.push_map_target(|t| t)
.push_map_target(|b: Backend<T>| b.concrete)
// apply backend filters
.push_filter(filters::apply)
.push(filters::NewApplyFilters::layer())
.lift_new()
.push(NewDistribute::layer())
// The router does not take the backend's availability into
// consideration, so we must eagerly fail requests to prevent
// leaking tasks onto the runtime.
.push_on_service(svc::LoadShed::layer())
// apply route level filters
.push_filter(filters::apply)
.push(filters::NewApplyFilters::layer())
.push(svc::NewMapErr::layer_with(|rt: &Self| {
let route = rt.params.route_ref.clone();
move |source| RouteError {
Expand Down
100 changes: 84 additions & 16 deletions linkerd/app/outbound/src/tls/logical/route/filters.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,98 @@
use linkerd_app_core::{svc, Error};
use linkerd_app_core::{io, svc, Error};
use linkerd_proxy_client_policy::tls;
use std::{fmt::Debug, sync::Arc};
use std::{
fmt::Debug,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};

pub(crate) fn apply<T>(t: T) -> Result<T, Error>
#[derive(Clone, Debug)]
pub struct NewApplyFilters<N> {
inner: N,
}

#[derive(Clone, Debug)]
pub struct ApplyFilters<S> {
inner: S,
filters: Arc<[tls::Filter]>,
}

// === impl NewApplyFilters ===

impl<N> NewApplyFilters<N> {
pub fn layer() -> impl svc::layer::Layer<N, Service = Self> + Clone {
svc::layer::mk(move |inner| Self { inner })
}
}

impl<T, N, S> svc::NewService<T> for NewApplyFilters<N>
where
T: Clone,
N: svc::NewService<T, Service = S>,
T: svc::Param<Arc<[tls::Filter]>>,
{
let filters: &[tls::Filter] = &t.param();
if let Some(filter) = filters.iter().next() {
match filter {
tls::Filter::Forbidden => {
return Err(errors::TLSForbiddenRoute.into());
}
type Service = ApplyFilters<S>;

tls::Filter::Invalid(message) => {
return Err(errors::TLSInvalidBackend(message.clone()).into());
}
fn new_service(&self, target: T) -> Self::Service {
let filters: Arc<[tls::Filter]> = target.param();
let svc = self.inner.new_service(target);
ApplyFilters {
inner: svc,
filters,
}
}
}

tls::Filter::InternalError(message) => {
return Err(errors::TLSInvalidPolicy(message).into());
// === impl ApplyFilters ===

impl<S> ApplyFilters<S> {
fn apply_filters(&self) -> Result<(), Error> {
if let Some(filter) = self.filters.iter().next() {
match filter {
tls::Filter::Forbidden => {
return Err(errors::TLSForbiddenRoute.into());
}

tls::Filter::Invalid(message) => {
return Err(errors::TLSInvalidBackend(message.clone()).into());
}

tls::Filter::InternalError(message) => {
return Err(errors::TLSInvalidPolicy(message).into());
}
}
}

Ok(())
}
}

impl<I, S> svc::Service<I> for ApplyFilters<S>
where
I: io::AsyncRead + io::AsyncWrite + Send + 'static,
S: svc::Service<I> + Send + Clone + 'static,
S::Error: Into<Error>,
S::Future: Send,
{
type Response = S::Response;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<S::Response, Error>> + Send + 'static>>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(Into::into)
}

Ok(t)
fn call(&mut self, io: I) -> Self::Future {
let call = self.inner.call(io);
let apply = self.apply_filters();

Box::pin(async move {
apply?;
call.await.map_err(Into::into)
})
}
}

pub mod errors {
Expand Down

0 comments on commit 9c3dc5a

Please sign in to comment.