Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(net): rewrite net bench and optimize #254

Merged
merged 8 commits into from
May 25, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 46 additions & 12 deletions compio-buf/src/io_buf.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[cfg(feature = "allocator_api")]
use std::alloc::Allocator;
use std::mem::MaybeUninit;
use std::{mem::MaybeUninit, rc::Rc, sync::Arc};

use crate::*;

Expand Down Expand Up @@ -126,7 +126,7 @@ unsafe impl<B: IoBuf + ?Sized> IoBuf for &'static mut B {
}

unsafe impl<B: IoBuf + ?Sized, #[cfg(feature = "allocator_api")] A: Allocator + 'static> IoBuf
for box_alloc!(B, A)
for t_alloc!(Box, B, A)
{
fn as_buf_ptr(&self) -> *const u8 {
(**self).as_buf_ptr()
Expand All @@ -141,7 +141,41 @@ unsafe impl<B: IoBuf + ?Sized, #[cfg(feature = "allocator_api")] A: Allocator +
}
}

unsafe impl<#[cfg(feature = "allocator_api")] A: Allocator + 'static> IoBuf for vec_alloc!(u8, A) {
unsafe impl<B: IoBuf + ?Sized, #[cfg(feature = "allocator_api")] A: Allocator + 'static> IoBuf
for t_alloc!(Rc, B, A)
{
fn as_buf_ptr(&self) -> *const u8 {
(**self).as_buf_ptr()
}

fn buf_len(&self) -> usize {
(**self).buf_len()
}

fn buf_capacity(&self) -> usize {
(**self).buf_capacity()
}
}

unsafe impl<B: IoBuf + ?Sized, #[cfg(feature = "allocator_api")] A: Allocator + 'static> IoBuf
for t_alloc!(Arc, B, A)
{
fn as_buf_ptr(&self) -> *const u8 {
(**self).as_buf_ptr()
}

fn buf_len(&self) -> usize {
(**self).buf_len()
}

fn buf_capacity(&self) -> usize {
(**self).buf_capacity()
}
}

unsafe impl<#[cfg(feature = "allocator_api")] A: Allocator + 'static> IoBuf
for t_alloc!(Vec, u8, A)
{
fn as_buf_ptr(&self) -> *const u8 {
self.as_ptr()
}
Expand Down Expand Up @@ -314,15 +348,15 @@ unsafe impl<B: IoBufMut + ?Sized> IoBufMut for &'static mut B {
}

unsafe impl<B: IoBufMut + ?Sized, #[cfg(feature = "allocator_api")] A: Allocator + 'static> IoBufMut
for box_alloc!(B, A)
for t_alloc!(Box, B, A)
{
fn as_buf_mut_ptr(&mut self) -> *mut u8 {
(**self).as_buf_mut_ptr()
}
}

unsafe impl<#[cfg(feature = "allocator_api")] A: Allocator + 'static> IoBufMut
for vec_alloc!(u8, A)
for t_alloc!(Vec, u8, A)
{
fn as_buf_mut_ptr(&mut self) -> *mut u8 {
self.as_mut_ptr()
Expand Down Expand Up @@ -449,7 +483,7 @@ impl<T: IoBuf, const N: usize> IoVectoredBuf for [T; N] {
}

impl<T: IoBuf, #[cfg(feature = "allocator_api")] A: Allocator + 'static> IoVectoredBuf
for vec_alloc!(T, A)
for t_alloc!(Vec, T, A)
{
fn as_dyn_bufs(&self) -> impl Iterator<Item = &dyn IoBuf> {
self.iter().map(|buf| buf as &dyn IoBuf)
Expand Down Expand Up @@ -540,7 +574,7 @@ impl<T: IoBufMut, const N: usize> IoVectoredBufMut for [T; N] {
}

impl<T: IoBufMut, #[cfg(feature = "allocator_api")] A: Allocator + 'static> IoVectoredBufMut
for vec_alloc!(T, A)
for t_alloc!(Vec, T, A)
{
fn as_dyn_mut_bufs(&mut self) -> impl Iterator<Item = &mut dyn IoBufMut> {
self.iter_mut().map(|buf| buf as &mut dyn IoBufMut)
Expand Down Expand Up @@ -599,7 +633,7 @@ impl<T: IoBuf, const N: usize> IoIndexedBuf for [T; N] {
}

impl<T: IoBuf, #[cfg(feature = "allocator_api")] A: Allocator + 'static> IoIndexedBuf
for vec_alloc!(T, A)
for t_alloc!(Vec, T, A)
{
fn buf_nth(&self, n: usize) -> Option<&dyn IoBuf> {
self.get(n).map(|b| b as _)
Expand Down Expand Up @@ -638,7 +672,7 @@ impl<T: IoBufMut, const N: usize> IoIndexedBufMut for [T; N] {
}

impl<T: IoBufMut, #[cfg(feature = "allocator_api")] A: Allocator + 'static> IoIndexedBufMut
for vec_alloc!(T, A)
for t_alloc!(Vec, T, A)
{
fn buf_nth_mut(&mut self, n: usize) -> Option<&mut dyn IoBufMut> {
self.get_mut(n).map(|b| b as _)
Expand Down Expand Up @@ -670,14 +704,14 @@ impl<B: SetBufInit + ?Sized> SetBufInit for &'static mut B {
}

impl<B: SetBufInit + ?Sized, #[cfg(feature = "allocator_api")] A: Allocator + 'static> SetBufInit
for box_alloc!(B, A)
for t_alloc!(Box, B, A)
{
unsafe fn set_buf_init(&mut self, len: usize) {
(**self).set_buf_init(len)
}
}

impl<#[cfg(feature = "allocator_api")] A: Allocator + 'static> SetBufInit for vec_alloc!(u8, A) {
impl<#[cfg(feature = "allocator_api")] A: Allocator + 'static> SetBufInit for t_alloc!(Vec, u8, A) {
unsafe fn set_buf_init(&mut self, len: usize) {
if (**self).buf_len() < len {
self.set_len(len);
Expand Down Expand Up @@ -738,7 +772,7 @@ impl<T: IoBufMut, const N: usize> SetBufInit for [T; N] {
}

impl<T: IoBufMut, #[cfg(feature = "allocator_api")] A: Allocator + 'static> SetBufInit
for vec_alloc!(T, A)
for t_alloc!(Vec, T, A)
{
unsafe fn set_buf_init(&mut self, len: usize) {
default_set_buf_init(self.iter_mut(), len)
Expand Down
30 changes: 6 additions & 24 deletions compio-buf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,35 +44,17 @@ pub trait IntoInner {
#[cfg(not(feature = "allocator_api"))]
#[macro_export]
#[doc(hidden)]
macro_rules! vec_alloc {
($t:ty, $a:ident) => {
Vec<$t>
macro_rules! t_alloc {
($b:tt, $t:ty, $a:ident) => {
$b<$t>
};
}

#[cfg(feature = "allocator_api")]
#[macro_export]
#[doc(hidden)]
macro_rules! vec_alloc {
($t:ty, $a:ident) => {
Vec<$t, $a>
};
}

#[cfg(feature = "allocator_api")]
#[macro_export]
#[doc(hidden)]
macro_rules! box_alloc {
($t:ty, $a:ident) => {
Box<$t, $a>
};
}

#[cfg(not(feature = "allocator_api"))]
#[macro_export]
#[doc(hidden)]
macro_rules! box_alloc {
($t:ty, $a:ident) => {
Box<$t>
macro_rules! t_alloc {
($b:tt, $t:ty, $a:ident) => {
$b<$t, $a>
};
}
3 changes: 1 addition & 2 deletions compio-dispatcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,10 @@ impl Dispatcher {
.build()
.expect("cannot create compio runtime")
.block_on(async move {
let rt = Runtime::current();
while let Ok(f) = receiver.recv_async().await {
let fut = (f)();
if builder.concurrent {
rt.spawn(fut).detach()
compio_runtime::spawn(fut).detach()
George-Miao marked this conversation as resolved.
Show resolved Hide resolved
} else {
fut.await
}
Expand Down
7 changes: 5 additions & 2 deletions compio-driver/src/iour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,11 @@ impl Driver {
}

fn poll_entries(&mut self, entries: &mut impl Extend<Entry>) -> bool {
while let Some(entry) = self.pool_completed.pop() {
entries.extend(Some(entry));
// Cheaper than pop.
if !self.pool_completed.is_empty() {
while let Some(entry) = self.pool_completed.pop() {
entries.extend(Some(entry));
}
}

let mut cqueue = self.inner.completion();
Expand Down
18 changes: 5 additions & 13 deletions compio-fs/src/async_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use compio_driver::{
AsRawFd, SharedFd, ToSharedFd,
};
use compio_io::{AsyncRead, AsyncWrite};
use compio_runtime::{Attacher, Runtime};
use compio_runtime::Attacher;
#[cfg(unix)]
use {
compio_buf::{IoVectoredBuf, IoVectoredBufMut},
Expand Down Expand Up @@ -63,22 +63,14 @@ impl<T: AsRawFd + 'static> AsyncRead for &AsyncFd<T> {
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
let fd = self.inner.to_shared_fd();
let op = Recv::new(fd, buf);
Runtime::current()
.submit(op)
.await
.into_inner()
.map_advanced()
compio_runtime::submit(op).await.into_inner().map_advanced()
}

#[cfg(unix)]
async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
let fd = self.inner.to_shared_fd();
let op = RecvVectored::new(fd, buf);
Runtime::current()
.submit(op)
.await
.into_inner()
.map_advanced()
compio_runtime::submit(op).await.into_inner().map_advanced()
}
}

Expand Down Expand Up @@ -109,14 +101,14 @@ impl<T: AsRawFd + 'static> AsyncWrite for &AsyncFd<T> {
async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
let fd = self.inner.to_shared_fd();
let op = Send::new(fd, buf);
Runtime::current().submit(op).await.into_inner()
compio_runtime::submit(op).await.into_inner()
}

#[cfg(unix)]
async fn write_vectored<V: IoVectoredBuf>(&mut self, buf: V) -> BufResult<usize, V> {
let fd = self.inner.to_shared_fd();
let op = SendVectored::new(fd, buf);
Runtime::current().submit(op).await.into_inner()
compio_runtime::submit(op).await.into_inner()
}

async fn flush(&mut self) -> io::Result<()> {
Expand Down
24 changes: 8 additions & 16 deletions compio-fs/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use compio_driver::{
ToSharedFd,
};
use compio_io::{AsyncReadAt, AsyncWriteAt};
use compio_runtime::{Attacher, Runtime};
use compio_runtime::Attacher;
#[cfg(unix)]
use {
compio_buf::{IoVectoredBuf, IoVectoredBufMut},
Expand Down Expand Up @@ -71,7 +71,7 @@ impl File {
.await;
if let Some(fd) = fd {
let op = CloseFile::new(fd.into());
Runtime::current().submit(op).await.0?;
compio_runtime::submit(op).await.0?;
}
Ok(())
}
Expand All @@ -90,7 +90,7 @@ impl File {
#[cfg(unix)]
pub async fn metadata(&self) -> io::Result<Metadata> {
let op = FileStat::new(self.to_shared_fd());
let BufResult(res, op) = Runtime::current().submit(op).await;
let BufResult(res, op) = compio_runtime::submit(op).await;
res.map(|_| Metadata::from_stat(op.into_inner()))
}

Expand Down Expand Up @@ -121,7 +121,7 @@ impl File {

async fn sync_impl(&self, datasync: bool) -> io::Result<()> {
let op = Sync::new(self.to_shared_fd(), datasync);
Runtime::current().submit(op).await.0?;
compio_runtime::submit(op).await.0?;
Ok(())
}

Expand Down Expand Up @@ -153,11 +153,7 @@ impl AsyncReadAt for File {
async fn read_at<T: IoBufMut>(&self, buffer: T, pos: u64) -> BufResult<usize, T> {
let fd = self.inner.to_shared_fd();
let op = ReadAt::new(fd, pos, buffer);
Runtime::current()
.submit(op)
.await
.into_inner()
.map_advanced()
compio_runtime::submit(op).await.into_inner().map_advanced()
}

#[cfg(unix)]
Expand All @@ -168,11 +164,7 @@ impl AsyncReadAt for File {
) -> BufResult<usize, T> {
let fd = self.inner.to_shared_fd();
let op = ReadVectoredAt::new(fd, pos, buffer);
Runtime::current()
.submit(op)
.await
.into_inner()
.map_advanced()
compio_runtime::submit(op).await.into_inner().map_advanced()
}
}

Expand All @@ -197,7 +189,7 @@ impl AsyncWriteAt for &File {
async fn write_at<T: IoBuf>(&mut self, buffer: T, pos: u64) -> BufResult<usize, T> {
let fd = self.inner.to_shared_fd();
let op = WriteAt::new(fd, pos, buffer);
Runtime::current().submit(op).await.into_inner()
compio_runtime::submit(op).await.into_inner()
}

#[cfg(unix)]
Expand All @@ -208,7 +200,7 @@ impl AsyncWriteAt for &File {
) -> BufResult<usize, T> {
let fd = self.inner.to_shared_fd();
let op = WriteVectoredAt::new(fd, pos, buffer);
Runtime::current().submit(op).await.into_inner()
compio_runtime::submit(op).await.into_inner()
}
}

Expand Down
3 changes: 1 addition & 2 deletions compio-fs/src/metadata/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ use std::{

use compio_buf::{BufResult, IntoInner};
use compio_driver::{op::PathStat, syscall};
use compio_runtime::Runtime;

use crate::path_string;

async fn metadata_impl(path: impl AsRef<Path>, follow_symlink: bool) -> io::Result<Metadata> {
let path = path_string(path)?;
let op = PathStat::new(path, follow_symlink);
let BufResult(res, op) = Runtime::current().submit(op).await;
let BufResult(res, op) = compio_runtime::submit(op).await;
res.map(|_| Metadata::from_stat(op.into_inner()))
}

Expand Down
3 changes: 1 addition & 2 deletions compio-fs/src/named_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::{ffi::OsStr, io, os::windows::io::FromRawHandle, ptr::null};
use compio_buf::{BufResult, IoBuf, IoBufMut};
use compio_driver::{impl_raw_fd, op::ConnectNamedPipe, syscall, AsRawFd, RawFd, ToSharedFd};
use compio_io::{AsyncRead, AsyncReadAt, AsyncWrite, AsyncWriteAt};
use compio_runtime::Runtime;
use widestring::U16CString;
use windows_sys::Win32::{
Storage::FileSystem::{
Expand Down Expand Up @@ -142,7 +141,7 @@ impl NamedPipeServer {
/// ```
pub async fn connect(&self) -> io::Result<()> {
let op = ConnectNamedPipe::new(self.handle.to_shared_fd());
Runtime::current().submit(op).await.0?;
compio_runtime::submit(op).await.0?;
Ok(())
}

Expand Down
3 changes: 1 addition & 2 deletions compio-fs/src/open_options/unix.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{io, os::fd::FromRawFd, path::Path};

use compio_driver::{op::OpenFile, RawFd};
use compio_runtime::Runtime;

use crate::{path_string, File};

Expand Down Expand Up @@ -87,7 +86,7 @@ impl OpenOptions {
| (self.custom_flags as libc::c_int & !libc::O_ACCMODE);
let p = path_string(p)?;
let op = OpenFile::new(p, flags, self.mode);
let fd = Runtime::current().submit(op).await.0? as RawFd;
let fd = compio_runtime::submit(op).await.0? as RawFd;
File::from_std(unsafe { std::fs::File::from_raw_fd(fd) })
}
}
Loading