Skip to content

Commit

Permalink
lightclient: Separate wakes and check connectivity on poll_read
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
  • Loading branch information
lexnv committed Nov 24, 2023
1 parent 860f0e0 commit 8660940
Showing 1 changed file with 39 additions and 14 deletions.
53 changes: 39 additions & 14 deletions lightclient/src/platform/wasm_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,15 @@ struct InnerWasmSocket {
state: ConnectionState,
/// Data buffer for the socket.
data: VecDeque<u8>,
/// Waker from `poll_read` / `poll_write`.
waker: Option<Waker>,

/// Waker from `poll_read` when the socket is not connected yet.
open_waker: Option<Waker>,
/// Waker from `poll_read`.
read_waker: Option<Waker>,
/// Waker from `poll_write` and `poll_flush`.
write_waker: Option<Waker>,
/// Waker from `poll_close`.
close_waker: Option<Waker>,
}

/// Registered callbacks of the [`WasmSocket`].
Expand Down Expand Up @@ -86,7 +93,10 @@ impl WasmSocket {
let inner = Arc::new(Mutex::new(InnerWasmSocket {
state: ConnectionState::Connecting,
data: VecDeque::with_capacity(16384),
waker: None,
open_waker: None,
read_waker: None,
write_waker: None,
close_waker: None,
}));

let open_callback = Closure::once_into_js({
Expand All @@ -95,7 +105,7 @@ impl WasmSocket {
let mut inner = inner.lock().expect("Mutex is poised; qed");
inner.state = ConnectionState::Opened;

if let Some(waker) = inner.waker.take() {
if let Some(waker) = inner.open_waker.take() {
waker.wake();
}
}
Expand All @@ -113,7 +123,7 @@ impl WasmSocket {
let bytes = js_sys::Uint8Array::new(&buffer).to_vec();
inner.data.extend(bytes.into_iter());

if let Some(waker) = inner.waker.take() {
if let Some(waker) = inner.read_waker.take() {
waker.wake();
}
}
Expand All @@ -126,10 +136,6 @@ impl WasmSocket {
// Callback does not provide useful information, signal it back to the stream.
let mut inner = inner.lock().expect("Mutex is poised; qed");
inner.state = ConnectionState::Error;

if let Some(waker) = inner.waker.take() {
waker.wake();
}
}
});
socket.set_onerror(Some(error_callback.as_ref().unchecked_ref()));
Expand All @@ -140,7 +146,7 @@ impl WasmSocket {
let mut inner = inner.lock().expect("Mutex is poised; qed");
inner.state = ConnectionState::Closed;

if let Some(waker) = inner.waker.take() {
if let Some(waker) = inner.close_waker.take() {
waker.wake();
}
}
Expand Down Expand Up @@ -169,8 +175,17 @@ impl AsyncRead for WasmSocket {
buf: &mut [u8],
) -> Poll<Result<usize, io::Error>> {
let mut inner = self.inner.lock().expect("Mutex is poised; qed");
inner.waker = Some(cx.waker().clone());

// Check if the socket is ready for reading.
let state = self.socket.ready_state();
if state == web_sys::WebSocket::CLOSED || state == web_sys::WebSocket::CLOSING {
return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
} else if state == web_sys::WebSocket::CONNECTING {
inner.open_waker = Some(cx.waker().clone());
return Poll::Pending;
}

inner.read_waker = Some(cx.waker().clone());
match inner.state {
ConnectionState::Error => {
Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "Socket error")))
Expand Down Expand Up @@ -199,7 +214,7 @@ impl AsyncWrite for WasmSocket {
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
let mut inner = self.inner.lock().expect("Mutex is poised; qed");
inner.waker = Some(cx.waker().clone());
inner.write_waker = Some(cx.waker().clone());

match inner.state {
ConnectionState::Error => {
Expand All @@ -221,8 +236,18 @@ impl AsyncWrite for WasmSocket {
Poll::Ready(Ok(()))
}

fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
if self.socket.ready_state() == web_sys::WebSocket::CLOSED {
return Poll::Ready(Ok(()));
}

if self.socket.ready_state() != web_sys::WebSocket::CLOSING {
let _ = self.socket.close();
}

let mut inner = self.inner.lock().expect("Mutex is poised; qed");
inner.close_waker = Some(cx.waker().clone());
Poll::Pending
}
}

Expand Down

0 comments on commit 8660940

Please sign in to comment.