Skip to content

Commit

Permalink
Identify connections to gRPC server with Uuids
Browse files Browse the repository at this point in the history
  • Loading branch information
tiram88 committed Sep 26, 2023
1 parent 405cc14 commit 6652208
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 13 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions rpc/grpc/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ parking_lot.workspace = true
log.workspace = true
rand.workspace = true
once_cell.workspace = true
uuid.workspace = true
async-trait.workspace = true
futures.workspace = true

async-trait = "0.1.57"
futures = { version = "0.3" }
tonic = { version = "0.9", features = ["gzip"] }
prost = { version = "0.11" }
h2 = "0.3"
Expand Down
19 changes: 15 additions & 4 deletions rpc/grpc/server/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@ use tokio::sync::{
oneshot::{channel as oneshot_channel, Sender as OneshotSender},
};
use tonic::Streaming;
use uuid::Uuid;

pub type GrpcSender = MpscSender<StatusResult<KaspadResponse>>;
pub type StatusResult<T> = Result<T, tonic::Status>;
pub type ConnectionId = Uuid;

#[derive(Debug)]
struct Inner {
connection_id: ConnectionId,

/// The socket address of this client
net_address: SocketAddr,

Expand All @@ -52,12 +56,13 @@ pub struct Connection {

impl Display for Connection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.inner.net_address)
write!(f, "{}@{}", self.inner.connection_id, self.inner.net_address)
}
}

impl Connection {
pub fn new(
connection_id: ConnectionId,
net_address: SocketAddr,
core_service: DynRpcService,
manager: Manager,
Expand All @@ -67,7 +72,13 @@ impl Connection {
) -> Self {
let (shutdown_sender, mut shutdown_receiver) = oneshot_channel();
let connection = Self {
inner: Arc::new(Inner { net_address, outgoing_route, manager, shutdown_signal: Mutex::new(Some(shutdown_sender)) }),
inner: Arc::new(Inner {
connection_id,
net_address,
outgoing_route,
manager,
shutdown_signal: Mutex::new(Some(shutdown_sender)),
}),
};
let connection_clone = connection.clone();
let outgoing_route = connection.inner.outgoing_route.clone();
Expand Down Expand Up @@ -150,8 +161,8 @@ impl Connection {
self.inner.net_address
}

pub fn identity(&self) -> SocketAddr {
self.inner.net_address
pub fn identity(&self) -> ConnectionId {
self.inner.connection_id
}

async fn handle_request(request: KaspadRequest, core_service: &DynRpcService) -> GrpcServerResult<KaspadResponse> {
Expand Down
2 changes: 2 additions & 0 deletions rpc/grpc/server/src/connection_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use tokio::sync::mpsc::channel as mpsc_channel;
use tokio::sync::oneshot::{channel as oneshot_channel, Sender as OneshotSender};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{codec::CompressionEncoding, transport::Server as TonicServer, Request, Response};
use uuid::Uuid;

/// A protowire gRPC connections handler.
pub struct ConnectionHandler {
Expand Down Expand Up @@ -151,6 +152,7 @@ impl Rpc for ConnectionHandler {

// Build the connection object & register it
let connection = Connection::new(
Uuid::new_v4(),
remote_address,
self.core_service.clone(),
self.manager.clone(),
Expand Down
9 changes: 2 additions & 7 deletions rpc/grpc/server/src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::connection::Connection;
use crate::connection::{Connection, ConnectionId};
use kaspa_core::{debug, info, warn};
use kaspa_notify::connection::Connection as ConnectionT;
use parking_lot::RwLock;
Expand All @@ -10,7 +10,7 @@ use std::{

#[derive(Clone, Debug)]
pub struct Manager {
connections: Arc<RwLock<HashMap<SocketAddr, Connection>>>,
connections: Arc<RwLock<HashMap<ConnectionId, Connection>>>,
max_connections: usize,
}

Expand Down Expand Up @@ -66,9 +66,4 @@ impl Manager {
pub fn has_connections(&self) -> bool {
!self.connections.read().is_empty()
}

/// Returns whether a connection matching `net_address` is registered
pub fn has_connection(&self, net_address: SocketAddr) -> bool {
self.connections.read().contains_key(&net_address)
}
}

0 comments on commit 6652208

Please sign in to comment.