Skip to content

Commit a77881d

Browse files
authored
Intermediate gateway-heart surgery checkpoint (#199)
* Initial draft for ClientsHandler * Created listener struct * typo * Stateful websocket connection handler * Exposing modules * Depdendencies updates * Moved listener to correct file + made start consume listener * Main starting new listener * Catching sigint * Copied client storage from provider into gateway * Exposed websocket listener type for nicer import path * Defined websocket message receiver concrete type * Client ledger struct without implementation * ClientsHandler using more concrete types * Mixnet sender + receiver and exposed listener type * Handling mix packets * Ability to forward mix packets * "starting" both listeners at main * Depedencies updates * Initial type definitions for client messages
1 parent 8baa236 commit a77881d

19 files changed

+1114
-72
lines changed

Cargo.lock

+5-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gateway/Cargo.toml

+7-3
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,19 @@ edition = "2018"
99
[dependencies]
1010
dotenv = "0.15.0"
1111
futures = "0.3"
12-
futures-channel = "0.3"
13-
futures-util = { version = "0.3", default-features = false, features = ["async-await", "sink", "std"] }
1412
log = "0.4"
15-
multi-tcp-client = { path = "../common/client-libs/multi-tcp-client" }
1613
pretty_env_logger = "0.3"
14+
rand = "0.7.2"
15+
serde = { version = "1.0.104", features = ["derive"] }
16+
#serde_json = "1.0.44"
17+
sled = "0.31"
1718
tokio = { version = "0.2", features = ["full"] }
1819
tokio-tungstenite = "0.10.1"
1920

2021
# internal
22+
crypto = { path = "../common/crypto" }
23+
mix-client = { path = "../common/client-libs/mix-client" } # to be removed very soon
24+
multi-tcp-client = { path = "../common/client-libs/multi-tcp-client" }
2125
nymsphinx = { path = "../common/nymsphinx" }
2226

2327

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
use crate::client_handling::ledger::ClientLedger;
2+
use crate::client_handling::websocket::message_receiver::MixMessageSender;
3+
use futures::channel::{mpsc, oneshot};
4+
use futures::StreamExt;
5+
use log::*;
6+
use nymsphinx::DestinationAddressBytes;
7+
use std::collections::HashMap;
8+
use tokio::task::JoinHandle;
9+
10+
type temp_AuthToken = String;
11+
12+
pub(crate) type ClientsHandlerRequestSender = mpsc::UnboundedSender<ClientsHandlerRequest>;
13+
pub(crate) type ClientsHandlerRequestReceiver = mpsc::UnboundedReceiver<ClientsHandlerRequest>;
14+
15+
pub(crate) type ClientsHandlerResponseSender = oneshot::Sender<ClientsHandlerResponse>;
16+
pub(crate) type ClientsHandlerResponseReceiver = oneshot::Receiver<ClientsHandlerResponse>;
17+
18+
pub(crate) enum ClientsHandlerRequest {
19+
// client
20+
Register(
21+
DestinationAddressBytes,
22+
MixMessageSender,
23+
ClientsHandlerResponseSender,
24+
),
25+
Authenticate(
26+
temp_AuthToken,
27+
MixMessageSender,
28+
ClientsHandlerResponseSender,
29+
),
30+
31+
// mix
32+
IsOnline(DestinationAddressBytes, ClientsHandlerResponseSender),
33+
}
34+
35+
pub(crate) enum ClientsHandlerResponse {
36+
Register(Option<temp_AuthToken>),
37+
Authenticate(bool),
38+
IsOnline(Option<MixMessageSender>),
39+
}
40+
41+
pub(crate) struct ClientsHandler {
42+
open_connections: HashMap<DestinationAddressBytes, MixMessageSender>, // clients_ledger: unimplemented!(),
43+
clients_ledger: ClientLedger,
44+
request_receiver_channel: ClientsHandlerRequestReceiver,
45+
}
46+
47+
impl ClientsHandler {
48+
pub(crate) fn new(request_receiver_channel: ClientsHandlerRequestReceiver) -> Self {
49+
ClientsHandler {
50+
open_connections: HashMap::new(),
51+
request_receiver_channel,
52+
clients_ledger: unimplemented!(),
53+
}
54+
}
55+
56+
pub(crate) async fn run(&mut self) {
57+
while let Some(request) = self.request_receiver_channel.next().await {
58+
// handle request
59+
}
60+
}
61+
62+
pub(crate) fn start(mut self) -> JoinHandle<()> {
63+
tokio::spawn(async move { self.run().await })
64+
}
65+
}

gateway/src/client_handling/ledger.rs

+162
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
// Copyright 2020 Nym Technologies SA
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//use directory_client::presence::providers::MixProviderClient;
16+
//use log::*;
17+
//use nymsphinx::{DestinationAddressBytes, DESTINATION_ADDRESS_LENGTH};
18+
//use sfw_provider_requests::auth_token::{AuthToken, AUTH_TOKEN_SIZE};
19+
//use std::path::PathBuf;
20+
21+
#[derive(Debug)]
22+
pub(crate) enum ClientLedgerError {
23+
DbReadError(sled::Error),
24+
DbWriteError(sled::Error),
25+
DbOpenError(sled::Error),
26+
}
27+
28+
#[derive(Debug, Clone)]
29+
// Note: you should NEVER create more than a single instance of this using 'new()'.
30+
// You should always use .clone() to create additional instances
31+
pub(crate) struct ClientLedger {
32+
db: sled::Db,
33+
}
34+
//
35+
//impl ClientLedger {
36+
// pub(crate) fn load(file: PathBuf) -> Result<Self, ClientLedgerError> {
37+
// let db = match sled::open(file) {
38+
// Err(e) => return Err(ClientLedgerError::DbOpenError(e)),
39+
// Ok(db) => db,
40+
// };
41+
//
42+
// let ledger = ClientLedger { db };
43+
//
44+
// ledger.db.iter().keys().for_each(|key| {
45+
// println!(
46+
// "key: {:?}",
47+
// ledger
48+
// .read_destination_address_bytes(key.unwrap())
49+
// .to_base58_string()
50+
// );
51+
// });
52+
//
53+
// Ok(ledger)
54+
// }
55+
//
56+
// fn read_auth_token(&self, raw_token: sled::IVec) -> AuthToken {
57+
// let token_bytes_ref = raw_token.as_ref();
58+
// // if this fails it means we have some database corruption and we
59+
// // absolutely can't continue
60+
// if token_bytes_ref.len() != AUTH_TOKEN_SIZE {
61+
// error!("CLIENT LEDGER DATA CORRUPTION - TOKEN HAS INVALID LENGTH");
62+
// panic!("CLIENT LEDGER DATA CORRUPTION - TOKEN HAS INVALID LENGTH");
63+
// }
64+
//
65+
// let mut token_bytes = [0u8; AUTH_TOKEN_SIZE];
66+
// token_bytes.copy_from_slice(token_bytes_ref);
67+
// AuthToken::from_bytes(token_bytes)
68+
// }
69+
//
70+
// fn read_destination_address_bytes(
71+
// &self,
72+
// raw_destination: sled::IVec,
73+
// ) -> DestinationAddressBytes {
74+
// let destination_ref = raw_destination.as_ref();
75+
// // if this fails it means we have some database corruption and we
76+
// // absolutely can't continue
77+
// if destination_ref.len() != DESTINATION_ADDRESS_LENGTH {
78+
// error!("CLIENT LEDGER DATA CORRUPTION - CLIENT ADDRESS HAS INVALID LENGTH");
79+
// panic!("CLIENT LEDGER DATA CORRUPTION - CLIENT ADDRESS HAS INVALID LENGTH");
80+
// }
81+
//
82+
// let mut destination_bytes = [0u8; DESTINATION_ADDRESS_LENGTH];
83+
// destination_bytes.copy_from_slice(destination_ref);
84+
// DestinationAddressBytes::from_bytes(destination_bytes)
85+
// }
86+
//
87+
// pub(crate) fn verify_token(
88+
// &self,
89+
// auth_token: &AuthToken,
90+
// client_address: &DestinationAddressBytes,
91+
// ) -> Result<bool, ClientLedgerError> {
92+
// match self.db.get(&client_address.to_bytes()) {
93+
// Err(e) => Err(ClientLedgerError::DbReadError(e)),
94+
// Ok(token) => match token {
95+
// Some(token_ivec) => Ok(&self.read_auth_token(token_ivec) == auth_token),
96+
// None => Ok(false),
97+
// },
98+
// }
99+
// }
100+
//
101+
// pub(crate) fn insert_token(
102+
// &mut self,
103+
// auth_token: AuthToken,
104+
// client_address: DestinationAddressBytes,
105+
// ) -> Result<Option<AuthToken>, ClientLedgerError> {
106+
// let insertion_result = match self
107+
// .db
108+
// .insert(&client_address.to_bytes(), &auth_token.to_bytes())
109+
// {
110+
// Err(e) => Err(ClientLedgerError::DbWriteError(e)),
111+
// Ok(existing_token) => {
112+
// Ok(existing_token.map(|existing_token| self.read_auth_token(existing_token)))
113+
// }
114+
// };
115+
//
116+
// // registration doesn't happen that often so might as well flush it to the disk to be sure
117+
// self.db.flush().unwrap();
118+
// insertion_result
119+
// }
120+
//
121+
// pub(crate) fn remove_token(
122+
// &mut self,
123+
// client_address: &DestinationAddressBytes,
124+
// ) -> Result<Option<AuthToken>, ClientLedgerError> {
125+
// let removal_result = match self.db.remove(&client_address.to_bytes()) {
126+
// Err(e) => Err(ClientLedgerError::DbWriteError(e)),
127+
// Ok(existing_token) => {
128+
// Ok(existing_token.map(|existing_token| self.read_auth_token(existing_token)))
129+
// }
130+
// };
131+
//
132+
// // removing of tokens happens extremely rarely, so flush is also fine here
133+
// self.db.flush().unwrap();
134+
// removal_result
135+
// }
136+
//
137+
// pub(crate) fn current_clients(&self) -> Result<Vec<MixProviderClient>, ClientLedgerError> {
138+
// let clients = self.db.iter().keys();
139+
//
140+
// let mut client_vec = Vec::new();
141+
// for client in clients {
142+
// match client {
143+
// Err(e) => return Err(ClientLedgerError::DbWriteError(e)),
144+
// Ok(client_entry) => client_vec.push(MixProviderClient {
145+
// pub_key: self
146+
// .read_destination_address_bytes(client_entry)
147+
// .to_base58_string(),
148+
// }),
149+
// }
150+
// }
151+
//
152+
// Ok(client_vec)
153+
// }
154+
//
155+
// #[cfg(test)]
156+
// pub(crate) fn create_temporary() -> Self {
157+
// let cfg = sled::Config::new().temporary(true);
158+
// ClientLedger {
159+
// db: cfg.open().unwrap(),
160+
// }
161+
// }
162+
//}

gateway/src/client_handling/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pub(crate) mod clients_handler;
2+
pub(crate) mod ledger;
3+
pub(crate) mod websocket;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
use crate::client_handling::clients_handler::ClientsHandlerRequestSender;
2+
use log::*;
3+
use tokio::{prelude::*, stream::StreamExt};
4+
use tokio_tungstenite::{
5+
tungstenite::{protocol::Message, Error as WsError},
6+
WebSocketStream,
7+
};
8+
9+
enum SocketStream<S: AsyncRead + AsyncWrite + Unpin> {
10+
RawTCP(S),
11+
UpgradedWebSocket(WebSocketStream<S>),
12+
Invalid,
13+
}
14+
15+
pub(crate) struct Handle<S: AsyncRead + AsyncWrite + Unpin> {
16+
socket_connection: SocketStream<S>,
17+
clients_handler_sender: ClientsHandlerRequestSender,
18+
}
19+
20+
impl<S> Handle<S>
21+
where
22+
S: AsyncRead + AsyncWrite + Unpin,
23+
{
24+
// for time being we assume handle is always constructed from raw socket.
25+
// if we decide we want to change it, that's not too difficult
26+
pub(crate) fn new(conn: S, clients_handler_sender: ClientsHandlerRequestSender) -> Self {
27+
Handle {
28+
socket_connection: SocketStream::RawTCP(conn),
29+
clients_handler_sender,
30+
}
31+
}
32+
33+
async fn perform_websocket_handshake(&mut self) {
34+
self.socket_connection =
35+
match std::mem::replace(&mut self.socket_connection, SocketStream::Invalid) {
36+
SocketStream::RawTCP(conn) => {
37+
// TODO: perhaps in the future, rather than panic here (and uncleanly shut tcp stream)
38+
// return a result with an error?
39+
let ws_stream = tokio_tungstenite::accept_async(conn)
40+
.await
41+
.expect("Failed to perform websocket handshake");
42+
SocketStream::UpgradedWebSocket(ws_stream)
43+
}
44+
other => other,
45+
}
46+
}
47+
48+
async fn next_websocket_request(&mut self) -> Option<Result<Message, WsError>> {
49+
match self.socket_connection {
50+
SocketStream::UpgradedWebSocket(ref mut ws_stream) => ws_stream.next().await,
51+
_ => panic!("impossible state - websocket handshake was somehow reverted"),
52+
}
53+
}
54+
55+
async fn listen_for_requests(&mut self) {
56+
trace!("Started listening for incoming requests...");
57+
58+
while let Some(msg) = self.next_websocket_request().await {
59+
// start handling here
60+
// let msg = msg?;
61+
// if msg.is_binary() {
62+
// mixnet_client::forward_to_mixnode(msg.into_data(), Arc::clone(&client_ref)).await;
63+
// }
64+
}
65+
66+
trace!("The stream was closed!");
67+
}
68+
69+
pub(crate) async fn start_handling(&mut self) {
70+
self.perform_websocket_handshake().await;
71+
trace!("Managed to perform websocket handshake!");
72+
self.listen_for_requests().await;
73+
}
74+
}

0 commit comments

Comments
 (0)