Skip to content

Commit 0baa8b2

Browse files
authored
rust-sdk: improve send api (#3011)
* nym-sdk: remove unneeded function * rust-sdk: rework send api a bit * rust-sdk: add send_wait without impl * fix doc test failures * more doctest fixes
1 parent 2ab969b commit 0baa8b2

File tree

4 files changed

+152
-103
lines changed

4 files changed

+152
-103
lines changed

clients/client-core/src/client/inbound_messages.rs

+8
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,12 @@ impl InputMessage {
7777
lane,
7878
}
7979
}
80+
81+
pub fn lane(&self) -> &TransmissionLane {
82+
match self {
83+
InputMessage::Regular { lane, .. }
84+
| InputMessage::Anonymous { lane, .. }
85+
| InputMessage::Reply { lane, .. } => lane,
86+
}
87+
}
8088
}

sdk/rust/nym-sdk/src/mixnet.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ mod connection_state;
3636
mod keys;
3737
mod paths;
3838

39-
pub use client::{DisconnectedMixnetClient, MixnetClient, MixnetClientBuilder, MixnetClientSender};
39+
pub use client::{
40+
DisconnectedMixnetClient, IncludedSurbs, MixnetClient, MixnetClientBuilder, MixnetClientSender,
41+
};
4042
pub use client_core::{
4143
client::{
4244
inbound_messages::InputMessage,

sdk/rust/nym-sdk/src/mixnet/client.rs

+125-98
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ use crate::{Error, Result};
2727

2828
use super::{connection_state::BuilderState, Config, GatewayKeyMode, Keys, KeysArc, StoragePaths};
2929

30+
// The number of surbs to include in a message by default
31+
const DEFAULT_NUMBER_OF_SURBS: u32 = 5;
32+
3033
#[derive(Default)]
3134
pub struct MixnetClientBuilder {
3235
config: Option<Config>,
@@ -119,6 +122,12 @@ impl<B> DisconnectedMixnetClient<B>
119122
where
120123
B: ReplyStorageBackend + Sync + Send + 'static,
121124
{
125+
/// Create a new mixnet client in a disconnected state. If no config options are supplied,
126+
/// creates a new client with ephemeral keys stored in RAM, which will be discarded at
127+
/// application close.
128+
///
129+
/// Callers have the option of supplying futher parameters to store persistent identities at a
130+
/// location on-disk, if desired.
122131
async fn new(
123132
config: Option<Config>,
124133
paths: Option<StoragePaths>,
@@ -127,17 +136,58 @@ where
127136
<B as ReplyStorageBackend>::StorageError: Send + Sync,
128137
{
129138
let config = config.unwrap_or_default();
130-
131139
let reply_surb_database_path = paths.as_ref().map(|p| p.reply_surb_database_path.clone());
132140

133-
let reply_storage_backend =
134-
ReplyStorageBackend::new(&config.debug_config, reply_surb_database_path)
135-
.await
136-
.map_err(|err| Error::StorageError {
137-
source: Box::new(err),
138-
})?;
141+
// The reply storage backend is generic, and can be set by the caller/instantiator
142+
let reply_storage_backend = B::new(&config.debug_config, reply_surb_database_path)
143+
.await
144+
.map_err(|err| Error::StorageError {
145+
source: Box::new(err),
146+
})?;
147+
148+
// If we are provided paths to keys, use them if they are available. And if they are
149+
// not, write the generated keys back to storage.
150+
let key_manager = if let Some(ref paths) = paths {
151+
let path_finder = ClientKeyPathfinder::from(paths.clone());
152+
153+
// Try load keys
154+
match KeyManager::load_keys_but_gateway_is_optional(&path_finder) {
155+
Ok(key_manager) => {
156+
log::debug!("Keys loaded");
157+
key_manager
158+
}
159+
Err(err) => {
160+
log::debug!("Not loading keys: {err}");
161+
if let Some(path) = path_finder.any_file_exists_and_return() {
162+
if paths.operating_mode.is_keep() {
163+
return Err(Error::DontOverwrite(path));
164+
}
165+
}
166+
167+
// Double check using a function that has slightly different internal logic. I
168+
// know this is a bit defensive, but I don't want to overwrite
169+
assert!(!(path_finder.any_file_exists() && paths.operating_mode.is_keep()));
139170

140-
create_new_client_with_custom_storage(Some(config), paths, reply_storage_backend)
171+
// Create new keys and write to storage
172+
let key_manager = client_core::init::new_client_keys();
173+
// WARN: this will overwrite!
174+
key_manager.store_keys(&path_finder)?;
175+
key_manager
176+
}
177+
}
178+
} else {
179+
// Ephemeral keys that we only store in memory
180+
log::debug!("Creating new ephemeral keys");
181+
client_core::init::new_client_keys()
182+
};
183+
184+
Ok(DisconnectedMixnetClient {
185+
key_manager,
186+
config,
187+
storage_paths: paths,
188+
state: BuilderState::New,
189+
reply_storage_backend,
190+
})
141191
}
142192

143193
/// Client keys are generated at client creation if none were found. The gateway shared
@@ -353,68 +403,28 @@ where
353403
}
354404
}
355405

356-
/// Create a new mixnet client builder. If no config options are supplied, creates a new client with
357-
/// ephemeral keys stored in RAM, which will be discarded at application close.
358-
///
359-
/// Callers have the option of supplying futher parameters to store persistent identities at a
360-
/// location on-disk, if desired.
361-
///
362-
/// A custom storage backend can be passed in.
363-
///
364-
/// NOTE: the major reason for this being a free function is to allow convenient type deduction
365-
fn create_new_client_with_custom_storage<B>(
366-
config_option: Option<Config>,
367-
paths: Option<StoragePaths>,
368-
reply_storage_backend: B,
369-
) -> Result<DisconnectedMixnetClient<B>>
370-
where
371-
B: ReplyStorageBackend + Sync + Send + 'static,
372-
{
373-
let config = config_option.unwrap_or_default();
374-
375-
// If we are provided paths to keys, use them if they are available. And if they are
376-
// not, write the generated keys back to storage.
377-
let key_manager = if let Some(ref paths) = paths {
378-
let path_finder = ClientKeyPathfinder::from(paths.clone());
379-
380-
// Try load keys
381-
match KeyManager::load_keys_but_gateway_is_optional(&path_finder) {
382-
Ok(key_manager) => {
383-
log::debug!("Keys loaded");
384-
key_manager
385-
}
386-
Err(err) => {
387-
log::debug!("Not loading keys: {err}");
388-
if let Some(path) = path_finder.any_file_exists_and_return() {
389-
if paths.operating_mode.is_keep() {
390-
return Err(Error::DontOverwrite(path));
391-
}
392-
}
406+
pub enum IncludedSurbs {
407+
Amount(u32),
408+
ExposeSelfAddress,
409+
}
410+
impl Default for IncludedSurbs {
411+
fn default() -> Self {
412+
Self::Amount(DEFAULT_NUMBER_OF_SURBS)
413+
}
414+
}
393415

394-
// Double check using a function that has slightly different internal logic. I
395-
// know this is a bit defensive, but I don't want to overwrite
396-
assert!(!(path_finder.any_file_exists() && paths.operating_mode.is_keep()));
416+
impl IncludedSurbs {
417+
pub fn new(reply_surbs: u32) -> Self {
418+
Self::Amount(reply_surbs)
419+
}
397420

398-
// Create new keys and write to storage
399-
let key_manager = client_core::init::new_client_keys();
400-
// WARN: this will overwrite!
401-
key_manager.store_keys(&path_finder)?;
402-
key_manager
403-
}
404-
}
405-
} else {
406-
// Ephemeral keys that we only store in memory
407-
log::debug!("Creating new ephemeral keys");
408-
client_core::init::new_client_keys()
409-
};
410-
411-
Ok(DisconnectedMixnetClient {
412-
key_manager,
413-
config,
414-
storage_paths: paths,
415-
state: BuilderState::New,
416-
reply_storage_backend,
417-
})
421+
pub fn none() -> Self {
422+
Self::Amount(0)
423+
}
424+
425+
pub fn expose_self_address() -> Self {
426+
Self::ExposeSelfAddress
427+
}
418428
}
419429

420430
/// Client connected to the Nym mixnet.
@@ -479,36 +489,50 @@ impl MixnetClient {
479489
&self.nym_address
480490
}
481491

482-
/// Get a shallow clone of [`MixnetClientSender`]
492+
/// Get a shallow clone of [`MixnetClientSender`]. Useful if you want split the send and
493+
/// receive logic in different locations.
483494
pub fn sender(&self) -> MixnetClientSender {
484495
MixnetClientSender {
485496
client_input: self.client_input.clone(),
486497
}
487498
}
488499

489-
/// Get a shallow clone of [`ConnectionCommandSender`].
500+
/// Get a shallow clone of [`ConnectionCommandSender`]. This is useful if you want to e.g
501+
/// explictly close a transmission lane that is still sending data even though it should
502+
/// cancel.
490503
pub fn connection_command_sender(&self) -> client_connections::ConnectionCommandSender {
491504
self.client_input.connection_command_sender.clone()
492505
}
493506

494-
/// Get a shallow clone of [`LaneQueueLengths`].
507+
/// Get a shallow clone of [`LaneQueueLengths`]. This is useful to manually implement some form
508+
/// of backpressure logic.
495509
pub fn shared_lane_queue_lengths(&self) -> client_connections::LaneQueueLengths {
496510
self.client_state.shared_lane_queue_lengths.clone()
497511
}
498512

499513
/// Sends stringy data to the supplied Nym address
514+
///
515+
/// # Example
516+
///
517+
/// ```no_run
518+
/// use nym_sdk::mixnet;
519+
///
520+
/// #[tokio::main]
521+
/// async fn main() {
522+
/// let address = "foobar";
523+
/// let recipient = mixnet::Recipient::try_from_base58_string(address).unwrap();
524+
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
525+
/// client.send_str(recipient, "hi").await;
526+
/// }
527+
/// ```
500528
pub async fn send_str(&self, address: Recipient, message: &str) {
501529
let message_bytes = message.to_string().into_bytes();
502-
self.send_bytes(address, message_bytes).await;
530+
self.send_bytes(address, message_bytes, IncludedSurbs::default())
531+
.await;
503532
}
504533

505-
/// Sends stringy data to the supplied Nym address, and skip sending reply-SURBs
506-
pub async fn send_str_direct(&self, address: Recipient, message: &str) {
507-
let message_bytes = message.to_string().into_bytes();
508-
self.send_bytes_direct(address, message_bytes).await;
509-
}
510-
511-
/// Sends bytes to the supplied Nym address
534+
/// Sends bytes to the supplied Nym address. There is the option to specify the number of
535+
/// reply-SURBs to include.
512536
///
513537
/// # Example
514538
///
@@ -520,35 +544,38 @@ impl MixnetClient {
520544
/// let address = "foobar";
521545
/// let recipient = mixnet::Recipient::try_from_base58_string(address).unwrap();
522546
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
523-
/// client.send_bytes(recipient, "hi".to_owned().into_bytes()).await;
547+
/// let surbs = mixnet::IncludedSurbs::default();
548+
/// client.send_bytes(recipient, "hi".to_owned().into_bytes(), surbs).await;
524549
/// }
525550
/// ```
526-
pub async fn send_bytes(&self, address: Recipient, message: Vec<u8>) {
551+
pub async fn send_bytes(&self, address: Recipient, message: Vec<u8>, surbs: IncludedSurbs) {
527552
let lane = TransmissionLane::General;
528-
let input_msg = InputMessage::new_anonymous(address, message, 20, lane);
529-
self.send_input_message(input_msg).await
553+
let input_msg = match surbs {
554+
IncludedSurbs::Amount(surbs) => {
555+
InputMessage::new_anonymous(address, message, surbs, lane)
556+
}
557+
IncludedSurbs::ExposeSelfAddress => InputMessage::new_regular(address, message, lane),
558+
};
559+
self.send(input_msg).await
530560
}
531561

532-
/// Sends a [`InputMessage`] to the mixnet.
533-
async fn send_input_message(&self, message: InputMessage) {
562+
/// Sends a [`InputMessage`] to the mixnet. This is the most low-level sending function, for
563+
/// full customization.
564+
async fn send(&self, message: InputMessage) {
534565
if self.client_input.send(message).await.is_err() {
535566
log::error!("Failed to send message");
536567
}
537568
}
538569

539-
/// Sends bytes to the supplied Nym address, and skip sending reply-SURBs
540-
pub async fn send_bytes_direct(&self, address: Recipient, message: Vec<u8>) {
541-
let lane = TransmissionLane::General;
542-
let input_msg = InputMessage::new_regular(address, message, lane);
543-
if self
544-
.client_input
545-
.input_sender
546-
.send(input_msg)
547-
.await
548-
.is_err()
549-
{
550-
log::error!("Failed to send message");
551-
}
570+
/// Sends a [`InputMessage`] to the mixnet. This is the most low-level sending function, for
571+
/// full customization.
572+
///
573+
/// Waits until the message is actually sent, or close to being sent, until returning.
574+
///
575+
/// NOTE: this not yet implemented.
576+
#[allow(unused)]
577+
async fn send_wait(&self, _message: InputMessage) {
578+
todo!();
552579
}
553580

554581
/// Wait for messages from the mixnet

service-providers/common/examples/control_requests.rs

+16-4
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
// use nym_client::client::config::{BaseConfig, Config, GatewayEndpointConfig};
55
// use nym_client::client::{DirectClient, KeyManager, Recipient, ReconstructedMessage, SocketClient};
6-
use nym_sdk::mixnet::{MixnetClient, Recipient, ReconstructedMessage};
6+
use nym_sdk::mixnet::{IncludedSurbs, MixnetClient, Recipient, ReconstructedMessage};
77
use service_providers_common::interface::{
88
ControlRequest, ControlResponse, ProviderInterfaceVersion, Request, Response, ResponseContent,
99
};
@@ -51,21 +51,33 @@ async fn main() -> anyhow::Result<()> {
5151
// // TODO: currently we HAVE TO use surbs unfortunately
5252
println!("Sending 'Health' request...");
5353
client
54-
.send_bytes(provider, full_request_health.into_bytes())
54+
.send_bytes(
55+
provider,
56+
full_request_health.into_bytes(),
57+
IncludedSurbs::new(10),
58+
)
5559
.await;
5660
let response = wait_for_control_response(&mut client).await;
5761
println!("response to 'Health' request: {response:#?}");
5862

5963
println!("Sending 'BinaryInfo' request...");
6064
client
61-
.send_bytes(provider, full_request_binary_info.into_bytes())
65+
.send_bytes(
66+
provider,
67+
full_request_binary_info.into_bytes(),
68+
IncludedSurbs::none(),
69+
)
6270
.await;
6371
let response = wait_for_control_response(&mut client).await;
6472
println!("response to 'BinaryInfo' request: {response:#?}");
6573

6674
println!("Sending 'SupportedRequestVersions' request...");
6775
client
68-
.send_bytes(provider, full_request_versions.into_bytes())
76+
.send_bytes(
77+
provider,
78+
full_request_versions.into_bytes(),
79+
IncludedSurbs::none(),
80+
)
6981
.await;
7082
let response = wait_for_control_response(&mut client).await;
7183
println!("response to 'SupportedRequestVersions' request: {response:#?}");

0 commit comments

Comments
 (0)