Skip to content

Commit 803f6f8

Browse files
committed
network-requester: replace websocket with mixnet client
1 parent 0baa8b2 commit 803f6f8

File tree

16 files changed

+779
-347
lines changed

16 files changed

+779
-347
lines changed

Cargo.lock

+8
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

clients/client-core/src/client/real_messages_control/real_traffic_stream.rs

-2
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,6 @@ where
342342
if let Poll::Ready(Some(id)) = Pin::new(&mut self.client_connection_rx).poll_next(cx) {
343343
match id {
344344
ConnectionCommand::Close(id) => self.on_close_connection(id),
345-
ConnectionCommand::ActiveConnections(_) => panic!(),
346345
}
347346
}
348347

@@ -421,7 +420,6 @@ where
421420
if let Poll::Ready(Some(id)) = Pin::new(&mut self.client_connection_rx).poll_next(cx) {
422421
match id {
423422
ConnectionCommand::Close(id) => self.on_close_connection(id),
424-
ConnectionCommand::ActiveConnections(_) => panic!(),
425423
}
426424
}
427425

clients/socks5/src/socks/server.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use client_core::client::{
1010
};
1111
use log::*;
1212
use nymsphinx::addressing::clients::Recipient;
13-
use proxy_helpers::connection_controller::{BroadcastActiveConnections, Controller};
13+
use proxy_helpers::connection_controller::Controller;
1414
use std::net::SocketAddr;
1515
use tap::TapFallible;
1616
use task::TaskClient;
@@ -69,7 +69,7 @@ impl SphinxSocksServer {
6969
// controller for managing all active connections
7070
let (mut active_streams_controller, controller_sender) = Controller::new(
7171
client_connection_tx,
72-
BroadcastActiveConnections::Off,
72+
//BroadcastActiveConnections::Off,
7373
self.shutdown.clone(),
7474
);
7575
tokio::spawn(async move {

common/client-connections/src/lib.rs

-6
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,6 @@ pub enum ConnectionCommand {
2525
// Announce that at a connection was closed. E.g the `OutQueueControl` uses this to discard
2626
// transmission lanes.
2727
Close(ConnectionId),
28-
29-
// In the network requester for example, we usually want to broadcast active connections
30-
// regularly, so we know what connections we need to request lane queue lengths for from the
31-
// client.
32-
// In the socks5-client, this is not needed since have direct access to the lane queue lengths.
33-
ActiveConnections(Vec<ConnectionId>),
3428
}
3529

3630
// The `OutQueueControl` publishes the backlog per lane, primarily so that upstream can slow down

common/socks5/proxy-helpers/src/connection_controller.rs

+1-27
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,8 @@ use futures::StreamExt;
77
use log::*;
88
use ordered_buffer::{OrderedMessage, OrderedMessageBuffer, ReadContiguousData};
99
use socks5_requests::{ConnectionId, NetworkData, SendRequest};
10-
use std::{
11-
collections::{HashMap, HashSet},
12-
time::Duration,
13-
};
10+
use std::collections::{HashMap, HashSet};
1411
use task::TaskClient;
15-
use tokio::time;
1612

1713
/// A generic message produced after reading from a socket/connection. It includes data that was
1814
/// actually read alongside boolean indicating whether the connection got closed so that
@@ -116,10 +112,6 @@ pub struct Controller {
116112
// Broadcast closed connections
117113
client_connection_tx: ConnectionCommandSender,
118114

119-
// The controller can broadcast active connections. This is useful in the network-requester
120-
// where its used to query the client for lane queue lengths
121-
broadcast_connections: BroadcastActiveConnections,
122-
123115
// TODO: this can potentially be abused to ddos and kill provider. Not sure at this point
124116
// how to handle it more gracefully
125117

@@ -133,7 +125,6 @@ pub struct Controller {
133125
impl Controller {
134126
pub fn new(
135127
client_connection_tx: ConnectionCommandSender,
136-
broadcast_connections: BroadcastActiveConnections,
137128
shutdown: TaskClient,
138129
) -> (Self, ControllerSender) {
139130
let (sender, receiver) = mpsc::unbounded();
@@ -143,7 +134,6 @@ impl Controller {
143134
receiver,
144135
recently_closed: HashSet::new(),
145136
client_connection_tx,
146-
broadcast_connections,
147137
pending_messages: HashMap::new(),
148138
shutdown,
149139
},
@@ -194,15 +184,6 @@ impl Controller {
194184
}
195185
}
196186

197-
fn broadcast_active_connections(&mut self) {
198-
// What about the recently closed ones? Hopefully we can ignore them ...
199-
let conn_ids = self.active_connections.keys().copied().collect();
200-
201-
self.client_connection_tx
202-
.unbounded_send(ConnectionCommand::ActiveConnections(conn_ids))
203-
.unwrap();
204-
}
205-
206187
fn send_to_connection(&mut self, conn_id: ConnectionId, payload: Vec<u8>, is_closed: bool) {
207188
if let Some(active_connection) = self.active_connections.get_mut(&conn_id) {
208189
if !payload.is_empty() {
@@ -259,8 +240,6 @@ impl Controller {
259240
}
260241

261242
pub async fn run(&mut self) {
262-
let mut interval = time::interval(Duration::from_millis(500));
263-
264243
loop {
265244
tokio::select! {
266245
command = self.receiver.next() => match command {
@@ -276,11 +255,6 @@ impl Controller {
276255
break;
277256
}
278257
},
279-
_ = interval.tick() => {
280-
if self.broadcast_connections == BroadcastActiveConnections::On {
281-
self.broadcast_active_connections();
282-
}
283-
},
284258
}
285259
}
286260
self.shutdown.recv_timeout().await;

service-providers/network-requester/Cargo.toml

+12-5
Original file line numberDiff line numberDiff line change
@@ -4,41 +4,48 @@
44
[package]
55
name = "nym-network-requester"
66
version = "1.1.8"
7-
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jędrzej Stuczyński <andrew@nymtech.net>"]
8-
edition = "2021"
7+
authors.workspace = true
8+
edition.workspace = true
99
rust-version = "1.65"
1010

1111
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
1212

1313
[dependencies]
1414
async-trait = { version = "0.1.51" }
15-
clap = {version = "4.0", features = ["derive"]}
15+
clap = {version = "4.0", features = ["cargo", "derive"]}
1616
dirs = "4.0"
1717
futures = "0.3.24"
1818
ipnetwork = "0.20.0"
19+
lazy_static = { workspace = true }
1920
log = { workspace = true }
2021
pretty_env_logger = "0.4.0"
2122
publicsuffix = "1.5" # Can't update this until bip updates to support newer idna version
2223
rand = "0.7.3"
2324
reqwest = { version = "0.11.11", features = ["json"] }
2425
serde = { version = "1.0", features = ["derive"] }
2526
sqlx = { version = "0.6.1", features = ["runtime-tokio-rustls", "chrono"]}
27+
tap = { workspace = true }
2628
thiserror = "1.0"
2729
tokio = { version = "1.24.1", features = [ "net", "rt-multi-thread", "macros" ] }
2830
tokio-tungstenite = "0.17.2"
29-
31+
url = { workspace = true }
3032

3133
# internal
3234
build-information = { path = "../../common/build-information" }
3335
client-connections = { path = "../../common/client-connections" }
36+
client-core = { path = "../../clients/client-core" }
3437
completions = { path = "../../common/completions" }
38+
config = { path = "../../common/config" }
39+
crypto = { path = "../../common/crypto" }
40+
logging = { path = "../../common/logging"}
3541
network-defaults = { path = "../../common/network-defaults" }
42+
nym-sdk = { path = "../../sdk/rust/nym-sdk" }
3643
nymsphinx = { path = "../../common/nymsphinx" }
37-
logging = { path = "../../common/logging"}
3844
ordered-buffer = {path = "../../common/socks5/ordered-buffer"}
3945
proxy-helpers = { path = "../../common/socks5/proxy-helpers" }
4046
service-providers-common = { path = "../common" }
4147
socks5-requests = { path = "../../common/socks5/requests" }
4248
statistics-common = { path = "../../common/statistics" }
4349
task = { path = "../../common/task" }
50+
version-checker = { path = "../../common/version-checker" }
4451
websocket-requests = { path = "../../clients/native/websocket-requests" }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use crate::{
5+
cli::{override_config, OverrideConfig},
6+
config::Config,
7+
error::NetworkRequesterError,
8+
};
9+
use clap::Args;
10+
use config::NymConfig;
11+
use crypto::asymmetric::identity;
12+
use nymsphinx::addressing::clients::Recipient;
13+
use serde::Serialize;
14+
use std::fmt::Display;
15+
use tap::TapFallible;
16+
17+
#[derive(Args, Clone)]
18+
pub(crate) struct Init {
19+
/// Id of the nym-mixnet-client we want to create config for.
20+
#[clap(long)]
21+
id: String,
22+
23+
/// Id of the gateway we are going to connect to.
24+
#[clap(long)]
25+
gateway: Option<identity::PublicKey>,
26+
27+
/// Force register gateway. WARNING: this will overwrite any existing keys for the given id,
28+
/// potentially causing loss of access.
29+
#[clap(long)]
30+
force_register_gateway: bool,
31+
32+
/// Comma separated list of rest endpoints of the nyxd validators
33+
#[clap(long, alias = "nymd_validators", value_delimiter = ',')]
34+
nyxd_urls: Option<Vec<url::Url>>,
35+
36+
/// Comma separated list of rest endpoints of the API validators
37+
#[clap(long, alias = "api_validators", value_delimiter = ',')]
38+
// the alias here is included for backwards compatibility (1.1.4 and before)
39+
nym_apis: Option<Vec<url::Url>>,
40+
41+
/// Set this client to work in a enabled credentials mode that would attempt to use gateway
42+
/// with bandwidth credential requirement.
43+
#[clap(long)]
44+
enabled_credentials_mode: Option<bool>,
45+
46+
/// Save a summary of the initialization to a json file
47+
#[clap(long)]
48+
output_json: bool,
49+
}
50+
51+
impl From<Init> for OverrideConfig {
52+
fn from(init_config: Init) -> Self {
53+
OverrideConfig {
54+
nym_apis: init_config.nym_apis,
55+
fastmode: false,
56+
no_cover: false,
57+
58+
nyxd_urls: init_config.nyxd_urls,
59+
enabled_credentials_mode: init_config.enabled_credentials_mode,
60+
}
61+
}
62+
}
63+
64+
#[derive(Debug, Serialize)]
65+
pub struct InitResults {
66+
#[serde(flatten)]
67+
client_core: client_core::init::InitResults,
68+
}
69+
70+
impl InitResults {
71+
fn new(config: &Config, address: &Recipient) -> Self {
72+
Self {
73+
client_core: client_core::init::InitResults::new(config.get_base(), address),
74+
}
75+
}
76+
}
77+
78+
impl Display for InitResults {
79+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80+
write!(f, "{}", self.client_core)
81+
}
82+
}
83+
84+
pub(crate) async fn execute(args: &Init) -> Result<(), NetworkRequesterError> {
85+
println!("Initialising client...");
86+
87+
let id = &args.id;
88+
89+
let already_init = Config::default_config_file_path(id).exists();
90+
if already_init {
91+
println!("Client \"{id}\" was already initialised before");
92+
}
93+
94+
// Usually you only register with the gateway on the first init, however you can force
95+
// re-registering if wanted.
96+
let user_wants_force_register = args.force_register_gateway;
97+
if user_wants_force_register {
98+
println!("Instructed to force registering gateway. This might overwrite keys!");
99+
}
100+
101+
// If the client was already initialized, don't generate new keys and don't re-register with
102+
// the gateway (because this would create a new shared key).
103+
// Unless the user really wants to.
104+
let register_gateway = !already_init || user_wants_force_register;
105+
106+
// Attempt to use a user-provided gateway, if possible
107+
let user_chosen_gateway_id = args.gateway;
108+
109+
// Load and potentially override config
110+
let mut config = override_config(Config::new(id), OverrideConfig::from(args.clone()));
111+
112+
// Setup gateway by either registering a new one, or creating a new config from the selected
113+
// one but with keys kept, or reusing the gateway configuration.
114+
let gateway = client_core::init::setup_gateway_from_config::<Config, _>(
115+
register_gateway,
116+
user_chosen_gateway_id,
117+
config.get_base(),
118+
)
119+
.await
120+
.map_err(|source| {
121+
eprintln!("Failed to setup gateway\nError: {source}");
122+
NetworkRequesterError::FailedToSetupGateway { source }
123+
})?;
124+
125+
config.get_base_mut().set_gateway_endpoint(gateway);
126+
127+
config.save_to_file(None).tap_err(|_| {
128+
log::error!("Failed to save the config file");
129+
})?;
130+
131+
print_saved_config(&config);
132+
133+
let address = client_core::init::get_client_address_from_stored_keys(config.get_base())?;
134+
let init_results = InitResults::new(&config, &address);
135+
println!("{init_results}");
136+
137+
// Output summary to a json file, if specified
138+
if args.output_json {
139+
client_core::init::output_to_json(&init_results, "client_init_results.json");
140+
}
141+
142+
println!("\nThe address of this client is: {address}\n");
143+
Ok(())
144+
}
145+
146+
fn print_saved_config(config: &Config) {
147+
let config_save_location = config.get_config_file_save_location();
148+
println!("Saved configuration file to {config_save_location:?}");
149+
println!("Using gateway: {}", config.get_base().get_gateway_id());
150+
log::debug!("Gateway id: {}", config.get_base().get_gateway_id());
151+
log::debug!("Gateway owner: {}", config.get_base().get_gateway_owner());
152+
log::debug!(
153+
"Gateway listener: {}",
154+
config.get_base().get_gateway_listener()
155+
);
156+
println!("Client configuration completed.\n");
157+
}

0 commit comments

Comments
 (0)