diff --git a/engineioxide/src/socket.rs b/engineioxide/src/socket.rs index bf765bfc..4b2ca6cc 100644 --- a/engineioxide/src/socket.rs +++ b/engineioxide/src/socket.rs @@ -399,12 +399,15 @@ impl Socket where D: Default + Send + Sync + 'static, { - pub fn new_dummy(close_fn: Box) -> Socket { + pub fn new_dummy( + sid: Sid, + close_fn: Box, + ) -> Socket { let (internal_tx, internal_rx) = mpsc::channel(200); let (heartbeat_tx, heartbeat_rx) = mpsc::channel(1); Self { - id: Sid::new(), + id: sid, protocol: ProtocolVersion::V4, transport: AtomicU8::new(TransportType::Websocket as u8), diff --git a/socketioxide/src/client.rs b/socketioxide/src/client.rs index 153b59e8..39e5550a 100644 --- a/socketioxide/src/client.rs +++ b/socketioxide/src/client.rs @@ -60,13 +60,23 @@ impl Client { auth: Option, ns_path: String, esocket: &Arc>, - ) -> Result<(), Error> { + ) -> Result<(), serde_json::Error> { debug!("auth: {:?}", auth); let sid = esocket.id; if let Some(ns) = self.get_ns(&ns_path) { - ns.connect(sid, esocket.clone(), auth, self.config.clone())?; + let protocol: ProtocolVersion = esocket.protocol.into(); // cancel the connect timeout task for v5 + #[cfg(feature = "v5")] + if let Some(tx) = esocket.data.connect_recv_tx.lock().unwrap().take() { + tx.send(()).unwrap(); + } + + let connect_packet = Packet::connect(ns_path, sid, protocol); + if let Err(err) = esocket.emit(connect_packet.try_into()?) { + debug!("sending error during socket connection: {err:?}"); + } + ns.connect(sid, esocket.clone(), auth, self.config.clone())?; if let Some(tx) = esocket.data.connect_recv_tx.lock().unwrap().take() { tx.send(()).unwrap(); } @@ -98,9 +108,12 @@ impl Client { /// Propagate a packet to a its target namespace fn sock_propagate_packet(&self, packet: Packet, sid: Sid) -> Result<(), Error> { - self.get_ns(&packet.ns) - .ok_or(Error::InvalidNamespace(packet.ns))? - .recv(sid, packet.inner) + if let Some(ns) = self.get_ns(&packet.ns) { + ns.recv(sid, packet.inner) + } else { + debug!("invalid namespace requested: {}", packet.ns); + Ok(()) + } } /// Spawn a task that will close the socket if it is not connected to a namespace diff --git a/socketioxide/src/errors.rs b/socketioxide/src/errors.rs index 389b97d0..333e1eb8 100644 --- a/socketioxide/src/errors.rs +++ b/socketioxide/src/errors.rs @@ -19,14 +19,11 @@ pub enum Error { InvalidEventName, #[error("invalid namespace")] - InvalidNamespace(String), + InvalidNamespace, #[error("cannot find socketio socket")] SocketGone(Sid), - #[error("send error: {0}")] - SendError(#[from] SendError), - /// An engineio error #[error("engineio error: {0}")] EngineIoError(#[from] engineioxide::errors::Error), @@ -47,7 +44,7 @@ impl From<&Error> for Option { Error::SerializeError(_) | Error::InvalidPacketType | Error::InvalidEventName => { Some(PacketParsingError) } - Error::Adapter(_) | Error::InvalidNamespace(_) | Error::SendError(_) => None, + Error::Adapter(_) | Error::InvalidNamespace => None, } } } diff --git a/socketioxide/src/ns.rs b/socketioxide/src/ns.rs index 9ccc2264..6d06b453 100644 --- a/socketioxide/src/ns.rs +++ b/socketioxide/src/ns.rs @@ -7,9 +7,9 @@ use crate::{ adapter::Adapter, errors::Error, handler::{BoxedNamespaceHandler, CallbackHandler}, - packet::{Packet, PacketData}, + packet::PacketData, socket::Socket, - ProtocolVersion, SocketIoConfig, + SocketIoConfig, }; use crate::{client::SocketData, errors::AdapterError}; use engineioxide::sid::Sid; @@ -46,15 +46,10 @@ impl Namespace { esocket: Arc>, auth: Option, config: Arc, - ) -> Result<(), Error> { - let protocol: ProtocolVersion = esocket.protocol.into(); - let socket: Arc> = Socket::new(self.clone(), esocket, config).into(); + ) -> Result<(), serde_json::Error> { + let socket: Arc> = Socket::new(sid, self.clone(), esocket, config).into(); self.sockets.write().unwrap().insert(sid, socket.clone()); - - socket.send(Packet::connect(self.path.clone(), socket.id, protocol))?; - - self.handler.call(socket, auth)?; - Ok(()) + self.handler.call(socket, auth) } /// Remove a socket from a namespace and propagate the event to the adapter diff --git a/socketioxide/src/socket.rs b/socketioxide/src/socket.rs index de5db1a8..2f8d2585 100644 --- a/socketioxide/src/socket.rs +++ b/socketioxide/src/socket.rs @@ -9,7 +9,7 @@ use std::{ time::Duration, }; -use engineioxide::{sid::Sid, socket::DisconnectReason as EIoDisconnectReason, ProtocolVersion}; +use engineioxide::{sid::Sid, socket::DisconnectReason as EIoDisconnectReason}; use futures::{future::BoxFuture, Future}; use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; @@ -110,22 +110,18 @@ pub struct Socket { impl Socket { pub(crate) fn new( + sid: Sid, ns: Arc>, esocket: Arc>, config: Arc, ) -> Self { - let id = if esocket.protocol == ProtocolVersion::V3 { - esocket.id - } else { - Sid::new() - }; Self { ns, message_handlers: RwLock::new(HashMap::new()), disconnect_handler: Mutex::new(None), ack_message: Mutex::new(HashMap::new()), ack_counter: AtomicI64::new(0), - id, + id: sid, extensions: Extensions::new(), config, esocket, @@ -587,16 +583,11 @@ impl Debug for Socket { impl Socket { pub fn new_dummy(sid: Sid, ns: Arc>) -> Socket { let close_fn = Box::new(move |_, _| ()); - Socket { - id: sid, + Socket::new( + sid, ns, - ack_counter: AtomicI64::new(0), - ack_message: Mutex::new(HashMap::new()), - message_handlers: RwLock::new(HashMap::new()), - disconnect_handler: Mutex::new(None), - config: Arc::new(SocketIoConfig::default()), - extensions: Extensions::new(), - esocket: engineioxide::Socket::new_dummy(close_fn).into(), - } + engineioxide::Socket::new_dummy(sid, close_fn).into(), + Arc::new(SocketIoConfig::default()), + ) } }