-
Notifications
You must be signed in to change notification settings - Fork 136
/
Copy pathrecvmsg.rs
97 lines (85 loc) · 2.96 KB
/
recvmsg.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use crate::runtime::driver::op::{Completable, CqeResult, Op};
use crate::runtime::CONTEXT;
use crate::{buf::BoundedBufMut, io::SharedFd, BufResult};
use socket2::SockAddr;
use std::{
io::IoSliceMut,
{boxed::Box, io, net::SocketAddr},
};
pub(crate) struct RecvMsg<T> {
#[allow(dead_code)]
fd: SharedFd,
pub(crate) buf: Vec<T>,
#[allow(dead_code)]
io_slices: Vec<IoSliceMut<'static>>,
pub(crate) socket_addr: Box<SockAddr>,
pub(crate) msghdr: Box<libc::msghdr>,
}
impl<T: BoundedBufMut> Op<RecvMsg<T>> {
pub(crate) fn recvmsg(fd: &SharedFd, mut bufs: Vec<T>) -> io::Result<Op<RecvMsg<T>>> {
use io_uring::{opcode, types};
let mut io_slices = Vec::with_capacity(bufs.len());
for buf in &mut bufs {
io_slices.push(IoSliceMut::new(unsafe {
std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total())
}));
}
let socket_addr = Box::new(unsafe { SockAddr::init(|_, _| Ok(()))?.1 });
let mut msghdr: Box<libc::msghdr> = Box::new(unsafe { std::mem::zeroed() });
msghdr.msg_iov = io_slices.as_mut_ptr().cast();
msghdr.msg_iovlen = io_slices.len() as _;
msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void;
msghdr.msg_namelen = socket_addr.len();
CONTEXT.with(|x| {
x.handle().expect("Not in a runtime context").submit_op(
RecvMsg {
fd: fd.clone(),
buf: bufs,
io_slices,
socket_addr,
msghdr,
},
|recv_from| {
opcode::RecvMsg::new(
types::Fd(recv_from.fd.raw_fd()),
recv_from.msghdr.as_mut() as *mut _,
)
.build()
},
)
})
}
}
impl<T> Completable for RecvMsg<T>
where
T: BoundedBufMut,
{
type Output = BufResult<(usize, SocketAddr), Vec<T>>;
fn complete(self, cqe: CqeResult) -> Self::Output {
// Convert the operation result to `usize`
let res = cqe.result.map(|v| v as usize);
// Recover the buffers
let mut bufs = self.buf;
let socket_addr = (*self.socket_addr).as_socket();
let res = res.map(|n| {
let socket_addr: SocketAddr = socket_addr.unwrap();
let mut bytes = n;
for buf in &mut bufs {
// Safety: the kernel wrote `n` bytes to the buffer.
unsafe {
buf.set_init(bytes);
}
let total = buf.bytes_total();
if bytes > total {
bytes -= total;
} else {
// In the current API bytes_init is a watermark,
// so remaining don't need zeroing.
break;
}
}
(n, socket_addr)
});
(res, bufs)
}
}