Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transport Integration - Space Notify Send & Receive #54

Merged
merged 5 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions crates/api/src/kitsune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@ use std::sync::Arc;

/// Handler for events coming out of Kitsune2.
pub trait KitsuneHandler: 'static + Send + Sync + std::fmt::Debug {
/// A notification that a new listening address has been bound.
/// Peers should now go to this new address to reach this node.
fn new_listening_address(&self, this_url: Url) {
drop(this_url);
jost-s marked this conversation as resolved.
Show resolved Hide resolved
}

/// A peer has disconnected from us. If they did so gracefully
/// the reason will be is_some().
fn peer_disconnect(&self, peer: Url, reason: Option<String>) {
drop((peer, reason));
}

/// Gather preflight data to send to a new opening connection.
/// Returning an Err result will close this connection.
///
Expand Down
27 changes: 18 additions & 9 deletions crates/api/src/space.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,24 @@ use std::sync::Arc;

/// Handler for events coming out of Kitsune2 such as messages from peers.
pub trait SpaceHandler: 'static + Send + Sync + std::fmt::Debug {
/// We have received an incomming message from a remote peer.
///
/// If this callback handler returns an `Err` response, the connection
/// will be closed immediately.
/// The sync handler for receiving notifications sent by a remote
/// peer in reference to a particular space. If this callback returns
/// an error, then the connection which sent the message will be closed.
//
// Note: this is the minimal low-level messaging unit. We can decide
// later if we want to handle request/response tracking in
// kitsune itself as a convenience or if users of this lib
// should have to implement that if they want it.
fn incoming_message(
fn recv_notify(
&self,
peer: AgentId,
to_agent: AgentId,
from_agent: AgentId,
space: SpaceId,
data: bytes::Bytes,
) -> K2Result<()>;
) -> K2Result<()> {
drop((to_agent, from_agent, space, data));
Ok(())
}
}

/// Trait-object [SpaceHandler].
Expand Down Expand Up @@ -56,8 +60,12 @@ pub trait Space: 'static + Send + Sync + std::fmt::Debug {
// later if we want to handle request/response tracking in
// kitsune itself as a convenience or if users of this lib
// should have to implement that if they want it.
fn send_message(&self, peer: AgentId, data: bytes::Bytes)
-> BoxFut<'_, ()>;
fn send_notify(
&self,
to_agent: AgentId,
from_agent: AgentId,
data: bytes::Bytes,
) -> BoxFut<'_, K2Result<()>>;
}

/// Trait-object [Space].
Expand All @@ -75,6 +83,7 @@ pub trait SpaceFactory: 'static + Send + Sync + std::fmt::Debug {
builder: Arc<builder::Builder>,
handler: DynSpaceHandler,
space: SpaceId,
tx: transport::DynTransport,
) -> BoxFut<'static, K2Result<DynSpace>>;
}

Expand Down
25 changes: 17 additions & 8 deletions crates/api/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl TxImpHnd {
let space = SpaceId::from(space);
if let Some(h) = self.space_map.lock().unwrap().get(&space)
{
h.recv_space_notify(peer, space, data);
h.recv_space_notify(peer, space, data)?;
}
}
Ok(())
Expand All @@ -104,7 +104,7 @@ impl TxImpHnd {
.unwrap()
.get(&(space.clone(), module.clone()))
{
h.recv_module_msg(peer, space, module, data);
h.recv_module_msg(peer, space, module, data)?;
}
}
Ok(())
Expand Down Expand Up @@ -139,7 +139,7 @@ pub trait TxImp: 'static + Send + Sync + std::fmt::Debug {
pub type DynTxImp = Arc<dyn TxImp>;

/// A high-level wrapper around a low-level [DynTxImp] transport implementation.
pub trait Transport: Send + Sync {
pub trait Transport: 'static + Send + Sync + std::fmt::Debug {
/// Register a space handler for receiving incoming notifications.
///
/// Panics if you attempt to register a duplicate handler for
Expand Down Expand Up @@ -371,9 +371,16 @@ pub type DynTxHandler = Arc<dyn TxHandler>;
/// Handler for space-related events.
pub trait TxSpaceHandler: TxBaseHandler {
/// The sync handler for receiving notifications sent by a remote
/// peer in reference to a particular space.
fn recv_space_notify(&self, peer: Url, space: SpaceId, data: bytes::Bytes) {
/// peer in reference to a particular space. If this callback returns
/// an error, then the connection which sent the message will be closed.
fn recv_space_notify(
&self,
peer: Url,
space: SpaceId,
data: bytes::Bytes,
) -> K2Result<()> {
drop((peer, space, data));
Ok(())
}
}

Expand All @@ -383,15 +390,17 @@ pub type DynTxSpaceHandler = Arc<dyn TxSpaceHandler>;
/// Handler for module-related events.
pub trait TxModuleHandler: TxBaseHandler {
/// The sync handler for receiving module messages sent by a remote
/// peer in reference to a particular space.
/// peer in reference to a particular space. If this callback returns
/// an error, then the connection which sent the message will be closed.
fn recv_module_msg(
&self,
peer: Url,
space: SpaceId,
module: String,
data: bytes::Bytes,
) {
) -> K2Result<()> {
drop((peer, space, module, data));
Ok(())
}
}

Expand All @@ -408,7 +417,7 @@ pub trait TransportFactory: 'static + Send + Sync + std::fmt::Debug {
fn create(
&self,
builder: Arc<builder::Builder>,
handler: Arc<TxImpHnd>,
handler: DynTxHandler,
) -> BoxFut<'static, K2Result<DynTransport>>;
}

Expand Down
17 changes: 17 additions & 0 deletions crates/core/proto/gen/kitsune2.space.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// This file is @generated by prost-build.
/// A Kitsune2 space protocol message.
///
/// There is only a single space-level message type. That is a notify
/// between two agents at that space level. Making this a very simple message.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct K2SpaceProto {
/// The destination agent.
#[prost(bytes = "bytes", tag = "1")]
pub to_agent: ::prost::bytes::Bytes,
/// The source agent.
#[prost(bytes = "bytes", tag = "2")]
pub from_agent: ::prost::bytes::Bytes,
/// The payload or content of this message.
#[prost(bytes = "bytes", tag = "3")]
pub data: ::prost::bytes::Bytes,
}
18 changes: 18 additions & 0 deletions crates/core/proto/space.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
syntax = "proto3";

package kitsune2.space;

// A Kitsune2 space protocol message.
//
// There is only a single space-level message type. That is a notify
// between two agents at that space level. Making this a very simple message.
message K2SpaceProto {
// The destination agent.
bytes to_agent = 1;

// The source agent.
bytes from_agent = 2;

// The payload or content of this message.
bytes data = 3;
}
51 changes: 47 additions & 4 deletions crates/core/src/factories/core_kitsune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,50 @@ impl KitsuneFactory for CoreKitsuneFactory {
handler: DynKitsuneHandler,
) -> BoxFut<'static, K2Result<DynKitsune>> {
Box::pin(async move {
let tx = builder
.transport
.create(
builder.clone(),
Arc::new(TxHandlerTranslator(handler.clone())),
)
.await?;
let out: DynKitsune =
Arc::new(CoreKitsune::new(builder.clone(), handler));
Arc::new(CoreKitsune::new(builder.clone(), handler, tx));
Ok(out)
})
}
}

#[derive(Debug)]
struct TxHandlerTranslator(DynKitsuneHandler);

impl transport::TxBaseHandler for TxHandlerTranslator {
fn new_listening_address(&self, this_url: Url) {
self.0.new_listening_address(this_url);
}

fn peer_disconnect(&self, peer: Url, reason: Option<String>) {
self.0.peer_disconnect(peer, reason);
}
}

impl transport::TxHandler for TxHandlerTranslator {
fn preflight_gather_outgoing(
&self,
peer_url: Url,
) -> K2Result<bytes::Bytes> {
self.0.preflight_gather_outgoing(peer_url)
}

fn preflight_validate_incoming(
&self,
peer_url: Url,
data: bytes::Bytes,
) -> K2Result<()> {
self.0.preflight_validate_incoming(peer_url, data)
}
}

type SpaceFut =
futures::future::Shared<BoxFut<'static, K2Result<space::DynSpace>>>;
type Map = HashMap<SpaceId, SpaceFut>;
Expand All @@ -45,17 +82,20 @@ struct CoreKitsune {
builder: Arc<builder::Builder>,
handler: DynKitsuneHandler,
map: std::sync::Mutex<Map>,
tx: transport::DynTransport,
}

impl CoreKitsune {
pub fn new(
builder: Arc<builder::Builder>,
handler: DynKitsuneHandler,
tx: transport::DynTransport,
) -> Self {
Self {
builder,
handler,
map: std::sync::Mutex::new(HashMap::new()),
tx,
}
}
}
Expand All @@ -73,13 +113,14 @@ impl Kitsune for CoreKitsune {
Entry::Vacant(e) => {
let builder = self.builder.clone();
let handler = self.handler.clone();
let tx = self.tx.clone();
e.insert(futures::future::FutureExt::shared(Box::pin(
async move {
let sh =
handler.create_space(space.clone()).await?;
let s = builder
.space
.create(builder.clone(), sh, space)
.create(builder.clone(), sh, space, tx)
.await?;
Ok(s)
},
Expand All @@ -104,9 +145,11 @@ mod test {
struct S;

impl SpaceHandler for S {
fn incoming_message(
fn recv_notify(
&self,
_peer: AgentId,
_to_agent: AgentId,
_from_agent: AgentId,
_space: SpaceId,
_data: bytes::Bytes,
) -> K2Result<()> {
// this test is a bit of a stub for now until we have the
Expand Down
Loading
Loading