Skip to content

Commit

Permalink
Filter Store and forward messages if feature is not enabled
Browse files Browse the repository at this point in the history
- Store and forward messages are discarded when they are not supported
  by the node
- To implement this, tower-filter was used. However, it is not released
  yet for futures 0.3 so I've included it directly in this PR
  • Loading branch information
sdbondi committed Oct 11, 2019
1 parent 263d7b4 commit f38b937
Show file tree
Hide file tree
Showing 15 changed files with 477 additions and 12 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ members = [
"comms",
"comms/dht",
"comms/middleware",
# TODO: Remove this once tower filter (0.3.0-alpha.3) is released
"comms/middleware/tower-filter",
"digital_assets_layer/core",
"infrastructure/broadcast_channel",
"infrastructure/crypto",
Expand Down
1 change: 1 addition & 0 deletions comms/dht/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ serde_repr = "0.1.5"
tokio = "0.2.0-alpha.6"
tokio-executor = "0.2.0-alpha.6"
tower= "0.3.0-alpha.2"
tower-filter= {path="../middleware/tower-filter"}#version="=0.3.0-alpha.2", path="../"
ttl_cache = "0.5.1"

[dev-dependencies]
Expand Down
77 changes: 71 additions & 6 deletions comms/dht/src/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,24 @@ use self::outbound::OutboundMessageRequester;
use crate::{
envelope::{DhtMessageType, NodeDestination},
inbound,
inbound::{DecryptedDhtMessage, DiscoverMessage, JoinMessage},
inbound::{DecryptedDhtMessage, DhtInboundMessage, DiscoverMessage, JoinMessage},
outbound,
outbound::{BroadcastClosestRequest, BroadcastStrategy, DhtOutboundError, DhtOutboundRequest, OutboundEncryption},
store_forward,
DhtConfig,
};
use futures::{channel::mpsc, Future};
use log::debug;
use futures::{channel::mpsc, future, Future};
use log::*;
use std::sync::Arc;
use tari_comms::{
message::InboundMessage,
outbound_message_service::OutboundMessage,
peer_manager::{NodeId, NodeIdentity, PeerManager},
peer_manager::{NodeId, NodeIdentity, PeerFeature, PeerManager},
types::CommsPublicKey,
};
use tari_comms_middleware::MiddlewareError;
use tower::{layer::Layer, Service, ServiceBuilder};
use tower_filter::error::Error as FilterError;

const LOG_TARGET: &'static str = "comms::dht";

Expand Down Expand Up @@ -99,6 +100,7 @@ impl Dht {

ServiceBuilder::new()
.layer(inbound::DeserializeLayer::new())
.layer(tower_filter::FilterLayer::new(self.unsupported_saf_messages_filter()))
.layer(inbound::DecryptionLayer::new(Arc::clone(&self.node_identity)))
.layer(store_forward::ForwardLayer::new(
Arc::clone(&self.peer_manager),
Expand Down Expand Up @@ -185,6 +187,33 @@ impl Dht {
Ok(())
}

/// Produces a filter predicate which disallows store and forward messages if that feature is not
/// supported by the node.
fn unsupported_saf_messages_filter(
&self,
) -> impl tower_filter::Predicate<DhtInboundMessage, Future = future::Ready<Result<(), FilterError>>> + Clone + Send
{
let node_identity = Arc::clone(&self.node_identity);
move |msg: &DhtInboundMessage| {
if node_identity.has_peer_feature(&PeerFeature::DhtStoreForward) {
return future::ready(Ok(()));
}

match msg.dht_header.message_type {
DhtMessageType::SAFRequestMessages | DhtMessageType::SAFStoredMessages => {
// TODO: This is an indication of node misbehaviour
warn!(
"Received store and forward message from PublicKey={}. Store and forward feature is not \
supported by this node. Discarding message.",
msg.dht_header.origin_public_key
);
future::ready(Err(FilterError::rejected()))
},
_ => future::ready(Ok(())),
}
}
}

pub async fn send_discover(
&self,
dest_public_key: CommsPublicKey,
Expand Down Expand Up @@ -233,9 +262,15 @@ impl Dht {
#[cfg(test)]
mod test {
use crate::{
envelope::DhtMessageFlags,
envelope::{DhtMessageFlags, DhtMessageType},
outbound::DhtOutboundRequest,
test_utils::{make_comms_inbound_message, make_dht_envelope, make_node_identity, make_peer_manager},
test_utils::{
make_client_identity,
make_comms_inbound_message,
make_dht_envelope,
make_node_identity,
make_peer_manager,
},
DhtBuilder,
};
use futures::{channel::mpsc, StreamExt};
Expand Down Expand Up @@ -349,4 +384,34 @@ mod test {
// Check the next service was not called
assert!(rt.block_on(next_service_rx.next()).is_none());
}

#[test]
fn stack_filter_saf_message() {
let node_identity = make_client_identity();
let peer_manager = make_peer_manager();

let dht = DhtBuilder::new(Arc::clone(&node_identity), peer_manager).finish();

let rt = Runtime::new().unwrap();

let (next_service_tx, mut next_service_rx) = mpsc::channel(10);

let mut service = dht
.inbound_middleware_layer()
.layer(SinkMiddleware::new(next_service_tx));

let msg = Message::from_message_format((), "secret".to_string()).unwrap();
let mut dht_envelope = make_dht_envelope(&node_identity, msg.to_binary().unwrap(), DhtMessageFlags::empty());
dht_envelope.header.message_type = DhtMessageType::SAFStoredMessages;
let inbound_message =
make_comms_inbound_message(&node_identity, dht_envelope.to_binary().unwrap(), MessageFlags::empty());

let err = rt.block_on(service.call(inbound_message));
assert!(err.is_err());
// This seems like the best way to tell that an open channel is empty without the test blocking indefinitely
assert_eq!(
format!("{}", next_service_rx.try_next().unwrap_err()),
"receiver channel is empty"
);
}
}
6 changes: 0 additions & 6 deletions comms/dht/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,3 @@ macro_rules! acquire_write_lock {
acquire_lock!($e, write)
};
}

macro_rules! acquire_read_lock {
($e:expr) => {
acquire_lock!($e, read)
};
}
11 changes: 11 additions & 0 deletions comms/dht/src/test_utils/makers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ pub fn make_node_identity() -> Arc<NodeIdentity> {
)
}

pub fn make_client_identity() -> Arc<NodeIdentity> {
Arc::new(
NodeIdentity::random(
&mut OsRng::new().unwrap(),
"127.0.0.1:9000".parse().unwrap(),
PeerFeatures::communication_client_default(),
)
.unwrap(),
)
}

pub fn make_comms_inbound_message(
node_identity: &NodeIdentity,
message: Vec<u8>,
Expand Down
12 changes: 12 additions & 0 deletions comms/middleware/tower-filter/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# 0.3.0-alpha.2 (September 30, 2019)

- Move to `futures-*-preview 0.3.0-alpha.19`
- Move to `pin-project 0.4`

# 0.3.0-alpha.1

- Move to `std::future`

# 0.1.0 (unreleased)

- Initial release
35 changes: 35 additions & 0 deletions comms/middleware/tower-filter/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[package]
name = "tower-filter"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0-alpha.2"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-filter/0.3.0-alpha.2"
description = """
Conditionally allow requests to be dispatched to a service based on the result
of a predicate.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
publish = false

[dependencies]
tower= { version = "=0.3.0-alpha.2"}
pin-project = "0.4"
futures-core-preview = "=0.3.0-alpha.19"

[dev-dependencies]
tower-test = { version = "=0.3.0-alpha.2" }
tokio-test = "=0.2.0-alpha.6"
tokio = "=0.2.0-alpha.6"
futures-util-preview = "=0.3.0-alpha.19"
25 changes: 25 additions & 0 deletions comms/middleware/tower-filter/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
Copyright (c) 2019 Tower Contributors

Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:

The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
14 changes: 14 additions & 0 deletions comms/middleware/tower-filter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Tower Filter

Conditionally allow requests to be dispatched to a service based on the result
of a predicate.

## License

This project is licensed under the [MIT license](LICENSE).

### Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.
46 changes: 46 additions & 0 deletions comms/middleware/tower-filter/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//! Error types
use std::{error, fmt};

/// Error produced by `Filter`
#[derive(Debug)]
pub struct Error {
source: Option<Source>,
}

pub(crate) type Source = Box<dyn error::Error + Send + Sync>;

impl Error {
/// Create a new `Error` representing a rejected request.
pub fn rejected() -> Error {
Error { source: None }
}

/// Create a new `Error` representing an inner service error.
pub fn inner<E>(source: E) -> Error
where E: Into<Source> {
Error {
source: Some(source.into()),
}
}
}

impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
if self.source.is_some() {
write!(fmt, "inner service errored")
} else {
write!(fmt, "rejected")
}
}
}

impl error::Error for Error {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
if let Some(ref err) = self.source {
Some(&**err)
} else {
None
}
}
}
91 changes: 91 additions & 0 deletions comms/middleware/tower-filter/src/future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
//! Future types
use crate::error::{self, Error};
use futures_core::ready;
use pin_project::{pin_project, project};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tower::Service;

/// Filtered response future
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<T, S, Request>
where S: Service<Request>
{
#[pin]
/// Response future state
state: State<Request, S::Future>,

#[pin]
/// Predicate future
check: T,

/// Inner service
service: S,
}

#[pin_project]
#[derive(Debug)]
enum State<Request, U> {
Check(Option<Request>),
WaitResponse(#[pin] U),
}

impl<F, T, S, Request> ResponseFuture<F, S, Request>
where
F: Future<Output = Result<T, Error>>,
S: Service<Request>,
S::Error: Into<error::Source>,
{
pub(crate) fn new(request: Request, check: F, service: S) -> Self {
ResponseFuture {
state: State::Check(Some(request)),
check,
service,
}
}
}

impl<F, T, S, Request> Future for ResponseFuture<F, S, Request>
where
F: Future<Output = Result<T, Error>>,
S: Service<Request>,
S::Error: Into<error::Source>,
{
type Output = Result<S::Response, Error>;

#[project]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

loop {
#[project]
match this.state.as_mut().project() {
State::Check(request) => {
let request = request
.take()
.expect("we either give it back or leave State::Check once we take");

// Poll predicate
match this.check.as_mut().poll(cx)? {
Poll::Ready(_) => {
let response = this.service.call(request);
this.state.set(State::WaitResponse(response));
},
Poll::Pending => {
this.state.set(State::Check(Some(request)));
return Poll::Pending;
},
}
},
State::WaitResponse(response) => {
return Poll::Ready(ready!(response.poll(cx)).map_err(Error::inner));
},
}
}
}
}
Loading

0 comments on commit f38b937

Please sign in to comment.