diff --git a/Cargo.lock b/Cargo.lock index 0caf970f..52f165b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -225,9 +225,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.4" +version = "1.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9157bbaa6b165880c27a4293a474c91cdcf265cc68cc829bf10be0964a391caf" +checksum = "c31a0499c1dc64f458ad13872de75c0eb7e3fdb0e67964610c914b034fc5956e" dependencies = [ "shlex", ] @@ -1021,9 +1021,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.168" +version = "0.2.169" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aaeb2981e0606ca11d79718f8bb01164f1d6ed75080182d3abf017e6d244b6d" +checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" [[package]] name = "linux-raw-sys" @@ -1082,9 +1082,9 @@ checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" [[package]] name = "miniz_oxide" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2ef2593ffb6958c941575cee70c8e257438749971869c4ae5acf6f91a168a61" +checksum = "4ffbe83022cedc1d264172192511ae958937694cd57ce297164951b8b3568394" dependencies = [ "adler2", ] @@ -1685,18 +1685,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.7" +version = "2.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93605438cbd668185516ab499d589afb7ee1859ea3d5fc8f6b0755e1c7443767" +checksum = "08f5383f3e0071702bf93ab5ee99b52d26936be9dedd9413067cbdcddcb6141a" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "2.0.7" +version = "2.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1d8749b4531af2117677a5fcd12b1348a3fe2b81e36e61ffeac5c4aa3273e36" +checksum = "f2f357fcec90b3caef6623a099691be676d033b40a058ac95d2a6ade6fa0c943" dependencies = [ "proc-macro2", "quote", diff --git a/crates/api/src/kitsune.rs b/crates/api/src/kitsune.rs index 6ee66bc5..5ac098c7 100644 --- a/crates/api/src/kitsune.rs +++ b/crates/api/src/kitsune.rs @@ -5,6 +5,18 @@ use std::sync::Arc; /// Handler for events coming out of Kitsune2. pub trait KitsuneHandler: 'static + Send + Sync + std::fmt::Debug { + /// A notification that a new listening address has been bound. + /// Peers should now go to this new address to reach this node. + fn new_listening_address(&self, this_url: Url) { + drop(this_url); + } + + /// A peer has disconnected from us. If they did so gracefully + /// the reason will be is_some(). + fn peer_disconnect(&self, peer: Url, reason: Option) { + drop((peer, reason)); + } + /// Gather preflight data to send to a new opening connection. /// Returning an Err result will close this connection. /// diff --git a/crates/api/src/space.rs b/crates/api/src/space.rs index e908fd9a..897cb243 100644 --- a/crates/api/src/space.rs +++ b/crates/api/src/space.rs @@ -5,20 +5,24 @@ use std::sync::Arc; /// Handler for events coming out of Kitsune2 such as messages from peers. pub trait SpaceHandler: 'static + Send + Sync + std::fmt::Debug { - /// We have received an incomming message from a remote peer. - /// - /// If this callback handler returns an `Err` response, the connection - /// will be closed immediately. + /// The sync handler for receiving notifications sent by a remote + /// peer in reference to a particular space. If this callback returns + /// an error, then the connection which sent the message will be closed. // // Note: this is the minimal low-level messaging unit. We can decide // later if we want to handle request/response tracking in // kitsune itself as a convenience or if users of this lib // should have to implement that if they want it. - fn incoming_message( + fn recv_notify( &self, - peer: AgentId, + to_agent: AgentId, + from_agent: AgentId, + space: SpaceId, data: bytes::Bytes, - ) -> K2Result<()>; + ) -> K2Result<()> { + drop((to_agent, from_agent, space, data)); + Ok(()) + } } /// Trait-object [SpaceHandler]. @@ -56,8 +60,12 @@ pub trait Space: 'static + Send + Sync + std::fmt::Debug { // later if we want to handle request/response tracking in // kitsune itself as a convenience or if users of this lib // should have to implement that if they want it. - fn send_message(&self, peer: AgentId, data: bytes::Bytes) - -> BoxFut<'_, ()>; + fn send_notify( + &self, + to_agent: AgentId, + from_agent: AgentId, + data: bytes::Bytes, + ) -> BoxFut<'_, K2Result<()>>; } /// Trait-object [Space]. @@ -75,6 +83,7 @@ pub trait SpaceFactory: 'static + Send + Sync + std::fmt::Debug { builder: Arc, handler: DynSpaceHandler, space: SpaceId, + tx: transport::DynTransport, ) -> BoxFut<'static, K2Result>; } diff --git a/crates/api/src/transport.rs b/crates/api/src/transport.rs index baf9a1a5..8d8c3682 100644 --- a/crates/api/src/transport.rs +++ b/crates/api/src/transport.rs @@ -90,7 +90,7 @@ impl TxImpHnd { let space = SpaceId::from(space); if let Some(h) = self.space_map.lock().unwrap().get(&space) { - h.recv_space_notify(peer, space, data); + h.recv_space_notify(peer, space, data)?; } } Ok(()) @@ -104,7 +104,7 @@ impl TxImpHnd { .unwrap() .get(&(space.clone(), module.clone())) { - h.recv_module_msg(peer, space, module, data); + h.recv_module_msg(peer, space, module, data)?; } } Ok(()) @@ -139,7 +139,7 @@ pub trait TxImp: 'static + Send + Sync + std::fmt::Debug { pub type DynTxImp = Arc; /// A high-level wrapper around a low-level [DynTxImp] transport implementation. -pub trait Transport: Send + Sync { +pub trait Transport: 'static + Send + Sync + std::fmt::Debug { /// Register a space handler for receiving incoming notifications. /// /// Panics if you attempt to register a duplicate handler for @@ -371,9 +371,16 @@ pub type DynTxHandler = Arc; /// Handler for space-related events. pub trait TxSpaceHandler: TxBaseHandler { /// The sync handler for receiving notifications sent by a remote - /// peer in reference to a particular space. - fn recv_space_notify(&self, peer: Url, space: SpaceId, data: bytes::Bytes) { + /// peer in reference to a particular space. If this callback returns + /// an error, then the connection which sent the message will be closed. + fn recv_space_notify( + &self, + peer: Url, + space: SpaceId, + data: bytes::Bytes, + ) -> K2Result<()> { drop((peer, space, data)); + Ok(()) } } @@ -383,15 +390,17 @@ pub type DynTxSpaceHandler = Arc; /// Handler for module-related events. pub trait TxModuleHandler: TxBaseHandler { /// The sync handler for receiving module messages sent by a remote - /// peer in reference to a particular space. + /// peer in reference to a particular space. If this callback returns + /// an error, then the connection which sent the message will be closed. fn recv_module_msg( &self, peer: Url, space: SpaceId, module: String, data: bytes::Bytes, - ) { + ) -> K2Result<()> { drop((peer, space, module, data)); + Ok(()) } } @@ -408,7 +417,7 @@ pub trait TransportFactory: 'static + Send + Sync + std::fmt::Debug { fn create( &self, builder: Arc, - handler: Arc, + handler: DynTxHandler, ) -> BoxFut<'static, K2Result>; } diff --git a/crates/core/proto/gen/kitsune2.space.rs b/crates/core/proto/gen/kitsune2.space.rs new file mode 100644 index 00000000..64506d8d --- /dev/null +++ b/crates/core/proto/gen/kitsune2.space.rs @@ -0,0 +1,17 @@ +// This file is @generated by prost-build. +/// A Kitsune2 space protocol message. +/// +/// There is only a single space-level message type. That is a notify +/// between two agents at that space level. Making this a very simple message. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct K2SpaceProto { + /// The destination agent. + #[prost(bytes = "bytes", tag = "1")] + pub to_agent: ::prost::bytes::Bytes, + /// The source agent. + #[prost(bytes = "bytes", tag = "2")] + pub from_agent: ::prost::bytes::Bytes, + /// The payload or content of this message. + #[prost(bytes = "bytes", tag = "3")] + pub data: ::prost::bytes::Bytes, +} diff --git a/crates/core/proto/space.proto b/crates/core/proto/space.proto new file mode 100644 index 00000000..e9cffb4e --- /dev/null +++ b/crates/core/proto/space.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; + +package kitsune2.space; + +// A Kitsune2 space protocol message. +// +// There is only a single space-level message type. That is a notify +// between two agents at that space level. Making this a very simple message. +message K2SpaceProto { + // The destination agent. + bytes to_agent = 1; + + // The source agent. + bytes from_agent = 2; + + // The payload or content of this message. + bytes data = 3; +} diff --git a/crates/core/src/factories/core_kitsune.rs b/crates/core/src/factories/core_kitsune.rs index 8f90adf4..885cf4bc 100644 --- a/crates/core/src/factories/core_kitsune.rs +++ b/crates/core/src/factories/core_kitsune.rs @@ -29,13 +29,50 @@ impl KitsuneFactory for CoreKitsuneFactory { handler: DynKitsuneHandler, ) -> BoxFut<'static, K2Result> { Box::pin(async move { + let tx = builder + .transport + .create( + builder.clone(), + Arc::new(TxHandlerTranslator(handler.clone())), + ) + .await?; let out: DynKitsune = - Arc::new(CoreKitsune::new(builder.clone(), handler)); + Arc::new(CoreKitsune::new(builder.clone(), handler, tx)); Ok(out) }) } } +#[derive(Debug)] +struct TxHandlerTranslator(DynKitsuneHandler); + +impl transport::TxBaseHandler for TxHandlerTranslator { + fn new_listening_address(&self, this_url: Url) { + self.0.new_listening_address(this_url); + } + + fn peer_disconnect(&self, peer: Url, reason: Option) { + self.0.peer_disconnect(peer, reason); + } +} + +impl transport::TxHandler for TxHandlerTranslator { + fn preflight_gather_outgoing( + &self, + peer_url: Url, + ) -> K2Result { + self.0.preflight_gather_outgoing(peer_url) + } + + fn preflight_validate_incoming( + &self, + peer_url: Url, + data: bytes::Bytes, + ) -> K2Result<()> { + self.0.preflight_validate_incoming(peer_url, data) + } +} + type SpaceFut = futures::future::Shared>>; type Map = HashMap; @@ -45,17 +82,20 @@ struct CoreKitsune { builder: Arc, handler: DynKitsuneHandler, map: std::sync::Mutex, + tx: transport::DynTransport, } impl CoreKitsune { pub fn new( builder: Arc, handler: DynKitsuneHandler, + tx: transport::DynTransport, ) -> Self { Self { builder, handler, map: std::sync::Mutex::new(HashMap::new()), + tx, } } } @@ -73,13 +113,14 @@ impl Kitsune for CoreKitsune { Entry::Vacant(e) => { let builder = self.builder.clone(); let handler = self.handler.clone(); + let tx = self.tx.clone(); e.insert(futures::future::FutureExt::shared(Box::pin( async move { let sh = handler.create_space(space.clone()).await?; let s = builder .space - .create(builder.clone(), sh, space) + .create(builder.clone(), sh, space, tx) .await?; Ok(s) }, @@ -104,9 +145,11 @@ mod test { struct S; impl SpaceHandler for S { - fn incoming_message( + fn recv_notify( &self, - _peer: AgentId, + _to_agent: AgentId, + _from_agent: AgentId, + _space: SpaceId, _data: bytes::Bytes, ) -> K2Result<()> { // this test is a bit of a stub for now until we have the diff --git a/crates/core/src/factories/core_space.rs b/crates/core/src/factories/core_space.rs index 9c342f81..c6cc3cb7 100644 --- a/crates/core/src/factories/core_space.rs +++ b/crates/core/src/factories/core_space.rs @@ -3,6 +3,8 @@ use kitsune2_api::{config::*, space::*, *}; use std::sync::Arc; +mod protocol; + /// The core space implementation provided by Kitsune2. /// You probably will have no reason to use something other than this. /// This abstraction is mainly here for testing purposes. @@ -25,25 +27,61 @@ impl SpaceFactory for CoreSpaceFactory { fn create( &self, builder: Arc, - _handler: DynSpaceHandler, - _space: SpaceId, + handler: DynSpaceHandler, + space: SpaceId, + tx: transport::DynTransport, ) -> BoxFut<'static, K2Result> { Box::pin(async move { let peer_store = builder.peer_store.create(builder.clone()).await?; - let out: DynSpace = Arc::new(CoreSpace::new(peer_store)); + tx.register_space_handler( + space.clone(), + Arc::new(TxHandlerTranslator(handler)), + ); + let out: DynSpace = Arc::new(CoreSpace::new(space, tx, peer_store)); Ok(out) }) } } +#[derive(Debug)] +struct TxHandlerTranslator(DynSpaceHandler); + +impl transport::TxBaseHandler for TxHandlerTranslator {} +impl transport::TxSpaceHandler for TxHandlerTranslator { + fn recv_space_notify( + &self, + _peer: Url, + space: SpaceId, + data: bytes::Bytes, + ) -> K2Result<()> { + let dec = protocol::K2SpaceProto::decode(&data)?; + self.0.recv_notify( + dec.to_agent.into(), + dec.from_agent.into(), + space, + dec.data, + ) + } +} + #[derive(Debug)] struct CoreSpace { + space: SpaceId, + tx: transport::DynTransport, peer_store: peer_store::DynPeerStore, } impl CoreSpace { - pub fn new(peer_store: peer_store::DynPeerStore) -> Self { - Self { peer_store } + pub fn new( + space: SpaceId, + tx: transport::DynTransport, + peer_store: peer_store::DynPeerStore, + ) -> Self { + Self { + space, + tx, + peer_store, + } } } @@ -63,11 +101,45 @@ impl Space for CoreSpace { Box::pin(async move { todo!() }) } - fn send_message( + fn send_notify( &self, - _peer: AgentId, - _data: bytes::Bytes, - ) -> BoxFut<'_, ()> { - Box::pin(async move { todo!() }) + to_agent: AgentId, + from_agent: AgentId, + data: bytes::Bytes, + ) -> BoxFut<'_, K2Result<()>> { + Box::pin(async move { + let info = match self.peer_store.get(to_agent.clone()).await? { + Some(info) => info, + None => { + // TODO - once discovery is implemented try to + // look up the peer from the network. + return Err(K2Error::other(format!( + "to_agent {to_agent} not found" + ))); + } + }; + let url = match &info.url { + Some(url) => url.clone(), + None => { + return Err(K2Error::other(format!( + "to_agent {to_agent} is offline" + ))); + } + }; + + let enc = protocol::K2SpaceProto { + to_agent: to_agent.into(), + from_agent: from_agent.into(), + data, + } + .encode()?; + + self.tx + .send_space_notify(url, self.space.clone(), enc) + .await + }) } } + +#[cfg(test)] +mod test; diff --git a/crates/core/src/factories/core_space/protocol.rs b/crates/core/src/factories/core_space/protocol.rs new file mode 100644 index 00000000..cc9a26c8 --- /dev/null +++ b/crates/core/src/factories/core_space/protocol.rs @@ -0,0 +1,45 @@ +//! Kitsune2 wire protocol types. + +use crate::*; + +include!("../../../proto/gen/kitsune2.space.rs"); + +impl K2SpaceProto { + /// Decode this message from a byte array. + pub fn decode(bytes: &[u8]) -> K2Result { + prost::Message::decode(std::io::Cursor::new(bytes)).map_err(|err| { + K2Error::other_src("Failed to decode K2Proto Message", err) + }) + } + + /// Encode this message as a bytes::Bytes buffer. + pub fn encode(&self) -> K2Result { + let mut out = bytes::BytesMut::new(); + + prost::Message::encode(self, &mut out).map_err(|err| { + K2Error::other_src("Failed to encode K2Proto Message", err) + })?; + + Ok(out.freeze()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn happy_encode_decode() { + let t = K2SpaceProto { + to_agent: bytes::Bytes::from_static(b"a1"), + from_agent: bytes::Bytes::from_static(b"a2"), + data: bytes::Bytes::from_static(b"d"), + }; + + let e = t.encode().unwrap(); + + let d = K2SpaceProto::decode(&e).unwrap(); + + assert_eq!(t, d); + } +} diff --git a/crates/core/src/factories/core_space/test.rs b/crates/core/src/factories/core_space/test.rs new file mode 100644 index 00000000..53908292 --- /dev/null +++ b/crates/core/src/factories/core_space/test.rs @@ -0,0 +1,165 @@ +use kitsune2_api::*; +use std::sync::{Arc, Mutex}; + +const SIG: &[u8] = b"fake-signature"; + +#[derive(Debug)] +struct TestCrypto; + +impl agent::Signer for TestCrypto { + fn sign( + &self, + _agent_info: &agent::AgentInfo, + _encoded: &[u8], + ) -> BoxFut<'_, K2Result> { + Box::pin(async move { Ok(bytes::Bytes::from_static(SIG)) }) + } +} + +impl agent::Verifier for TestCrypto { + fn verify( + &self, + _agent_info: &agent::AgentInfo, + _message: &[u8], + signature: &[u8], + ) -> bool { + signature == SIG + } +} + +const S1: SpaceId = SpaceId(id::Id(bytes::Bytes::from_static(b"space1"))); + +fn make_agent_info(id: AgentId, url: Url) -> Arc { + let created_at = Timestamp::now(); + let expires_at = created_at + std::time::Duration::from_secs(60 * 20); + futures::executor::block_on(agent::AgentInfoSigned::sign( + &TestCrypto, + agent::AgentInfo { + agent: id, + space: S1.clone(), + created_at, + expires_at, + is_tombstone: false, + url: Some(url), + storage_arc: DhtArc::FULL, + }, + )) + .unwrap() +} + +#[tokio::test(flavor = "multi_thread")] +async fn space_notify_send_recv() { + use kitsune2_api::{kitsune::*, space::*, *}; + + type Item = (AgentId, AgentId, SpaceId, bytes::Bytes); + type Recv = Arc>>; + let recv = Arc::new(Mutex::new(Vec::new())); + + #[derive(Debug)] + struct S(Recv); + + impl SpaceHandler for S { + fn recv_notify( + &self, + to_agent: AgentId, + from_agent: AgentId, + space: SpaceId, + data: bytes::Bytes, + ) -> K2Result<()> { + self.0 + .lock() + .unwrap() + .push((to_agent, from_agent, space, data)); + Ok(()) + } + } + + let (u_s, mut u_r) = tokio::sync::mpsc::unbounded_channel(); + + #[derive(Debug)] + struct K(Recv, tokio::sync::mpsc::UnboundedSender); + + impl KitsuneHandler for K { + fn new_listening_address(&self, this_url: Url) { + let _ = self.1.send(this_url); + } + + fn create_space( + &self, + _space: SpaceId, + ) -> BoxFut<'_, K2Result> { + Box::pin(async move { + let s: DynSpaceHandler = Arc::new(S(self.0.clone())); + Ok(s) + }) + } + } + + let k: DynKitsuneHandler = Arc::new(K(recv.clone(), u_s.clone())); + let k1 = builder::Builder { + verifier: Arc::new(TestCrypto), + ..crate::default_builder() + } + .with_default_config() + .unwrap() + .build(k) + .await + .unwrap(); + let s1 = k1.space(S1.clone()).await.unwrap(); + let u1 = u_r.recv().await.unwrap(); + + let k: DynKitsuneHandler = Arc::new(K(recv.clone(), u_s.clone())); + let k2 = builder::Builder { + verifier: Arc::new(TestCrypto), + ..crate::default_builder() + } + .with_default_config() + .unwrap() + .build(k) + .await + .unwrap(); + let s2 = k2.space(S1.clone()).await.unwrap(); + let u2 = u_r.recv().await.unwrap(); + + println!("url: {u1}, {u2}"); + + let bob = AgentId::from(bytes::Bytes::from_static(b"bob")); + let ned = AgentId::from(bytes::Bytes::from_static(b"ned")); + + s1.peer_store() + .insert(vec![make_agent_info(bob.clone(), u2)]) + .await + .unwrap(); + s2.peer_store() + .insert(vec![make_agent_info(ned.clone(), u1)]) + .await + .unwrap(); + + s1.send_notify( + bob.clone(), + ned.clone(), + bytes::Bytes::from_static(b"hello"), + ) + .await + .unwrap(); + + let (t, f, s, d) = recv.lock().unwrap().remove(0); + assert_eq!(&bob, &t); + assert_eq!(&ned, &f); + assert_eq!(S1, s); + assert_eq!("hello", String::from_utf8_lossy(&d)); + + s2.send_notify( + ned.clone(), + bob.clone(), + bytes::Bytes::from_static(b"world"), + ) + .await + .unwrap(); + + let (t, f, s, d) = recv.lock().unwrap().remove(0); + assert_eq!(&ned, &t); + assert_eq!(&bob, &f); + assert_eq!(S1, s); + assert_eq!("world", String::from_utf8_lossy(&d)); +} diff --git a/crates/core/src/factories/mem_transport.rs b/crates/core/src/factories/mem_transport.rs index 07ba9c2f..7029e519 100644 --- a/crates/core/src/factories/mem_transport.rs +++ b/crates/core/src/factories/mem_transport.rs @@ -26,9 +26,10 @@ impl TransportFactory for MemTransportFactory { fn create( &self, _builder: Arc, - handler: Arc, + handler: DynTxHandler, ) -> BoxFut<'static, K2Result> { Box::pin(async move { + let handler = TxImpHnd::new(handler); let imp = MemTransport::create(handler.clone()).await; Ok(DefaultTransport::create(&handler, imp)) }) diff --git a/crates/core/src/factories/mem_transport/test.rs b/crates/core/src/factories/mem_transport/test.rs index d7051bc0..090190ef 100644 --- a/crates/core/src/factories/mem_transport/test.rs +++ b/crates/core/src/factories/mem_transport/test.rs @@ -63,11 +63,17 @@ impl TxHandler for TrackHnd { } impl TxSpaceHandler for TrackHnd { - fn recv_space_notify(&self, peer: Url, space: SpaceId, data: bytes::Bytes) { + fn recv_space_notify( + &self, + peer: Url, + space: SpaceId, + data: bytes::Bytes, + ) -> K2Result<()> { self.track .lock() .unwrap() .push(Track::SpaceRecv(peer, space, data)); + Ok(()) } } @@ -78,11 +84,12 @@ impl TxModuleHandler for TrackHnd { space: SpaceId, module: String, data: bytes::Bytes, - ) { + ) -> K2Result<()> { self.track .lock() .unwrap() .push(Track::ModRecv(peer, space, module, data)); + Ok(()) } } @@ -194,7 +201,6 @@ impl TrackHnd { async fn gen_tx(hnd: DynTxHandler) -> DynTransport { let builder = Arc::new(crate::default_builder()); - let hnd = TxImpHnd::new(hnd); builder .transport .create(builder.clone(), hnd) diff --git a/crates/tool_proto_build/src/main.rs b/crates/tool_proto_build/src/main.rs index bbf0ac81..c892962c 100644 --- a/crates/tool_proto_build/src/main.rs +++ b/crates/tool_proto_build/src/main.rs @@ -7,4 +7,9 @@ fn main() { &["../api/proto/"], ) .expect("Failed to compile protobuf protocol files"); + std::env::set_var("OUT_DIR", "../core/proto/gen"); + prost_build::Config::new() + .bytes(["."]) + .compile_protos(&["../core/proto/space.proto"], &["../core/proto/"]) + .expect("Failed to compile protobuf protocol files"); }