Skip to content

Commit 4afcb31

Browse files
committed
Fix: changed debug lines scope to fit debug macro and fixed blocking mutex lock issue
Signed-off-by: Aditya <aditya.salunkh919@gmail.com>
1 parent feaa304 commit 4afcb31

File tree

7 files changed

+263
-96
lines changed

7 files changed

+263
-96
lines changed

sqlx-core/src/pool/inner.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::database::Database;
55
use crate::error::Error;
66
use crate::pool::{deadline_as_timeout, CloseEvent, Pool, PoolOptions};
77
use crossbeam_queue::ArrayQueue;
8+
use log::debug;
89

910
use crate::sync::{AsyncSemaphore, AsyncSemaphoreReleaser};
1011

@@ -347,19 +348,14 @@ impl<DB: Database> PoolInner<DB> {
347348

348349
// result here is `Result<Result<C, Error>, TimeoutError>`
349350
// if this block does not return, sleep for the backoff timeout and try again
350-
eprintln!(
351-
"pool: attempting connect (deadline in {}ms, current size={})",
352-
timeout.as_millis(),
353-
self.size()
354-
);
355351

356352
let res = crate::rt::timeout(timeout, connect_options.connect()).await;
357353
if let Ok(Ok(_)) = &res {
358-
eprintln!("pool: connect attempt succeeded");
354+
debug!("pool: connect attempt succeeded");
359355
} else if let Ok(Err(e)) = &res {
360-
eprintln!("pool: connect attempt returned error: {:?}", e);
356+
debug!("pool: connect attempt returned error: {:?}", e);
361357
} else if res.is_err() {
362-
eprintln!(
358+
debug!(
363359
"pool: connect attempt timed out after {}ms",
364360
timeout.as_millis()
365361
);

sqlx-core/src/rt/rt_wasip3/mod.rs

Lines changed: 68 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,11 @@ pub async fn connect_tcp<Ws: WithSocket>(
138138
// })
139139
// }
140140
// };
141-
eprintln!("wasip3: creating tcp socket for port {}", port);
141+
debug!("wasip3: creating tcp socket for port {}", port);
142142
let sock =
143143
wasip3::sockets::types::TcpSocket::create(wasip3::sockets::types::IpAddressFamily::Ipv4)
144144
.expect("failed to create TCP socket");
145-
eprintln!("wasip3: created tcp socket for port {}", port);
145+
debug!("wasip3: created tcp socket for port {}", port);
146146
sock.connect(wasip3::sockets::types::IpSocketAddress::Ipv4(
147147
wasip3::sockets::types::Ipv4SocketAddress {
148148
address: (127, 0, 0, 1),
@@ -151,7 +151,7 @@ pub async fn connect_tcp<Ws: WithSocket>(
151151
))
152152
.await
153153
.map_err(|e| {
154-
eprintln!("wasip3: connect failed: {:?}", e);
154+
debug!("wasip3: connect failed: {:?}", e);
155155
e
156156
})
157157
.expect(&format!("failed to connect to 127.0.0.1:{port}"));
@@ -160,57 +160,110 @@ pub async fn connect_tcp<Ws: WithSocket>(
160160
let (rx_tx, rx_rx) = mpsc::channel::<Vec<u8>>(1);
161161
let (tx_tx, mut tx_rx) = mpsc::channel::<Vec<u8>>(1);
162162
let (mut send_tx, send_rx) = wasip3::wit_stream::new();
163-
eprintln!("wasip3: created wit_stream for send/recv");
163+
debug!("wasip3: created wit_stream for send/recv");
164164
let (mut recv_rx, recv_fut) = sock.receive();
165165

166166
// Spawn a background task using the wasip3 async runtime and make it abortable.
167167
let (abort_handle, abort_registration) = AbortHandle::new_pair();
168-
168+
// Give the wasip3 scheduler a quick yield before spawning the background
169+
// task. Use the host-aware `yield_async` so spawned tasks are eligible to
170+
// be polled promptly by the local runtime.
171+
async_support::yield_async().await;
169172
let background = Abortable::new(
170173
async move {
171174
let sock = Arc::new(sock);
172-
eprintln!("wasip3: background task starting; sock arc cloned");
175+
debug!("wasip3: background task starting; sock arc cloned");
173176

174177
let (ready_tx, ready_rx) = oneshot::channel();
178+
let spawn_ts = std::time::SystemTime::now()
179+
.duration_since(std::time::UNIX_EPOCH)
180+
.map(|d| d.as_millis())
181+
.unwrap_or_default();
182+
debug!("wasip3: spawning sock.send task at {}ms", spawn_ts);
183+
175184
async_support::spawn({
176185
let sock = Arc::clone(&sock);
177186
async move {
178-
eprintln!("wasip3: starting sock.send task");
187+
let start_ts = std::time::SystemTime::now()
188+
.duration_since(std::time::UNIX_EPOCH)
189+
.map(|d| d.as_millis())
190+
.unwrap_or_default();
191+
debug!("wasip3: sock.send task started at {}ms", start_ts);
179192
let fut = sock.send(send_rx);
193+
let sig_ts = std::time::SystemTime::now()
194+
.duration_since(std::time::UNIX_EPOCH)
195+
.map(|d| d.as_millis())
196+
.unwrap_or_default();
180197
_ = ready_tx.send(());
198+
debug!("wasip3: sock.send signalled ready at {}ms", sig_ts);
181199
match fut.await {
182-
Ok(_) => eprintln!("wasip3: sock.send completed"),
183-
Err(e) => eprintln!("wasip3: sock.send error: {:?}", e),
200+
Ok(_) => {
201+
let done_ts = std::time::SystemTime::now()
202+
.duration_since(std::time::UNIX_EPOCH)
203+
.map(|d| d.as_millis())
204+
.unwrap_or_default();
205+
debug!("wasip3: sock.send completed at {}ms", done_ts);
206+
}
207+
Err(e) => {
208+
let err_ts = std::time::SystemTime::now()
209+
.duration_since(std::time::UNIX_EPOCH)
210+
.map(|d| d.as_millis())
211+
.unwrap_or_default();
212+
debug!("wasip3: sock.send error at {}ms: {:?}", err_ts, e);
213+
}
184214
}
185215
drop(sock);
186216
}
187217
});
218+
// Yield after spawning the send task so the runtime can poll it.
219+
async_support::yield_async().await;
188220
async_support::spawn({
189221
let sock = Arc::clone(&sock);
190222
async move {
191-
eprintln!("wasip3: starting recv_fut task");
223+
let start_ts = std::time::SystemTime::now()
224+
.duration_since(std::time::UNIX_EPOCH)
225+
.map(|d| d.as_millis())
226+
.unwrap_or_default();
227+
debug!("wasip3: recv_fut task started at {}ms", start_ts);
192228
match recv_fut.await {
193-
Ok(_) => eprintln!("wasip3: recv_fut completed"),
194-
Err(e) => eprintln!("wasip3: recv_fut error: {:?}", e),
229+
Ok(_) => {
230+
let done_ts = std::time::SystemTime::now()
231+
.duration_since(std::time::UNIX_EPOCH)
232+
.map(|d| d.as_millis())
233+
.unwrap_or_default();
234+
debug!("wasip3: recv_fut completed at {}ms", done_ts);
235+
}
236+
Err(e) => {
237+
let err_ts = std::time::SystemTime::now()
238+
.duration_since(std::time::UNIX_EPOCH)
239+
.map(|d| d.as_millis())
240+
.unwrap_or_default();
241+
debug!("wasip3: recv_fut error at {}ms: {:?}", err_ts, e);
242+
}
195243
}
196244
drop(sock);
197245
}
198246
});
247+
// Yield to the wasip3 scheduler to give the spawned tasks a chance
248+
// to be polled immediately. Without this yield the local runtime
249+
// may not poll newly spawned tasks until the current task yields,
250+
// which can cause head-of-line blocking observed during handshakes.
251+
async_support::yield_async().await;
199252
futures_util::join!(
200253
async {
201254
while let Some(result) = recv_rx.next().await {
202255
// `recv_rx` yields single bytes from the wasip3 receive stream.
203-
eprintln!("wasip3: recv_rx.next yielded byte: {:#x}", result);
256+
debug!("wasip3: recv_rx.next yielded byte: {:#x}", result);
204257
_ = rx_tx.send(vec![result]).await;
205258
}
206259
drop(recv_rx);
207260
drop(rx_tx);
208261
},
209262
async {
210263
_ = ready_rx.await;
211-
eprintln!("wasip3: send task ready, draining tx_rx -> send_tx");
264+
debug!("wasip3: send task ready, draining tx_rx -> send_tx");
212265
while let Some(buf) = tx_rx.recv().await {
213-
eprintln!("wasip3: writing {} bytes to send_tx", buf.len());
266+
debug!("wasip3: writing {} bytes to send_tx", buf.len());
214267
let _ = send_tx.write(buf).await;
215268
}
216269
drop(tx_rx);

sqlx-core/src/rt/rt_wasip3/socket.rs

Lines changed: 44 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -10,75 +10,84 @@ use crate::net::Socket;
1010
impl Socket for super::TcpSocket {
1111
fn try_read(&mut self, buf: &mut dyn ReadBuf) -> io::Result<usize> {
1212
let n = buf.remaining_mut();
13+
14+
// First, drain any buffered data
1315
if !self.buf.is_empty() {
14-
if self.buf.len() >= n {
15-
buf.put_slice(&self.buf.split_to(n));
16-
} else {
17-
buf.put_slice(&self.buf);
18-
self.buf.clear();
19-
}
20-
return Ok(n);
16+
let to_copy = n.min(self.buf.len());
17+
buf.put_slice(&self.buf.split_to(to_copy));
18+
return Ok(to_copy);
2119
}
20+
21+
// Try to receive new data
2222
match self.rx.try_recv() {
2323
Ok(rx_vec) => {
24-
eprintln!("wasip3 socket: try_read got {} bytes from rx", rx_vec.len());
25-
// make the item type explicit so methods like `len` and `split_off` are known
26-
let mut rx: Vec<u8> = rx_vec;
27-
if rx.len() < n {
28-
buf.put_slice(&rx);
29-
Ok(rx.len())
24+
if rx_vec.is_empty() {
25+
return Err(io::ErrorKind::WouldBlock.into());
26+
}
27+
28+
if rx_vec.len() <= n {
29+
// All data fits in the buffer
30+
buf.put_slice(&rx_vec);
31+
Ok(rx_vec.len())
3032
} else {
31-
let tail = rx.split_off(n);
32-
buf.put_slice(&rx);
33-
self.buf.extend_from_slice(&tail);
33+
// Data is larger than buffer, store remainder
34+
buf.put_slice(&rx_vec[..n]);
35+
self.buf.extend_from_slice(&rx_vec[n..]);
3436
Ok(n)
3537
}
3638
}
37-
Err(TryRecvError::Empty) => {
38-
eprintln!("wasip3 socket: try_read would block (Empty)");
39-
Err(io::ErrorKind::WouldBlock.into())
40-
}
39+
Err(TryRecvError::Empty) => Err(io::ErrorKind::WouldBlock.into()),
4140
Err(TryRecvError::Disconnected) => Ok(0),
4241
}
4342
}
4443

4544
fn try_write(&mut self, buf: &[u8]) -> io::Result<usize> {
46-
let Some(tx) = self.tx.get_ref() else {
47-
return Err(io::ErrorKind::ConnectionReset.into());
48-
};
45+
if buf.is_empty() {
46+
return Ok(0);
47+
}
48+
4949
let n = buf.len();
50-
match tx.try_send(buf.to_vec()) {
51-
Ok(()) => {
52-
eprintln!("wasip3 socket: try_write sent {} bytes", n);
53-
Ok(n)
54-
}
55-
Err(e) => {
56-
eprintln!("wasip3 socket: try_write failed: {:?}", e);
57-
Err(io::ErrorKind::WouldBlock.into())
58-
}
50+
match self.tx.try_send(buf.to_vec()) {
51+
Ok(()) => Ok(n),
52+
Err(_) => Err(io::ErrorKind::WouldBlock.into()),
5953
}
6054
}
6155

6256
fn poll_read_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
57+
// If we have buffered data, we're ready to read
58+
if !self.buf.is_empty() {
59+
return Poll::Ready(Ok(()));
60+
}
61+
6362
match self.rx.poll_recv(cx) {
6463
Poll::Ready(Some(v)) => {
65-
self.buf.extend(v);
64+
if !v.is_empty() {
65+
self.buf.extend(v);
66+
Poll::Ready(Ok(()))
67+
} else {
68+
// Empty vec received, wait for more
69+
Poll::Pending
70+
}
71+
}
72+
Poll::Ready(None) => {
73+
// Channel closed
6674
Poll::Ready(Ok(()))
6775
}
68-
Poll::Ready(None) => Poll::Ready(Ok(())),
6976
Poll::Pending => Poll::Pending,
7077
}
7178
}
7279

7380
fn poll_write_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
7481
match self.tx.poll_reserve(cx) {
7582
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
76-
Poll::Ready(Err(..)) => Poll::Ready(Err(io::ErrorKind::ConnectionReset.into())),
83+
Poll::Ready(Err(())) => Poll::Ready(Err(io::ErrorKind::ConnectionReset.into())),
7784
Poll::Pending => Poll::Pending,
7885
}
7986
}
8087

8188
fn poll_shutdown(&mut self, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
89+
// Drop the sender to signal shutdown
90+
// The abort_handle will be dropped when TcpSocket is dropped
8291
Poll::Ready(Ok(()))
8392
}
8493
}

0 commit comments

Comments
 (0)