Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add logs #132

Merged
merged 10 commits into from
Nov 11, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ members = [
"compio-dispatcher",
"compio-io",
"compio-tls",
"compio-log",
]
resolver = "2"

Expand All @@ -31,6 +32,7 @@ compio-io = { path = "./compio-io", version = "0.1.0" }
compio-net = { path = "./compio-net", version = "0.2.0" }
compio-signal = { path = "./compio-signal", version = "0.1.1" }
compio-dispatcher = { path = "./compio-dispatcher", version = "0.1.0" }
compio-log = { path = "./compio-log", version = "0.1.0" }

cfg-if = "1.0.0"
crossbeam-channel = "0.5.8"
Expand Down
1 change: 1 addition & 0 deletions compio-driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ targets = [
[dependencies]
# Workspace dependencies
compio-buf = { workspace = true }
compio-log = { workspace = true }

# Utils
cfg-if = { workspace = true }
Expand Down
19 changes: 18 additions & 1 deletion compio-driver/src/iocp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::{
};

use compio_buf::arrayvec::ArrayVec;
use compio_log::{instrument, trace};
use slab::Slab;
use windows_sys::Win32::{
Foundation::{
Expand Down Expand Up @@ -155,10 +156,12 @@ impl Driver {
const DEFAULT_CAPACITY: usize = 1024;

pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
instrument!(compio_log::Level::TRACE, "new", ?builder);
let mut data: WSADATA = unsafe { std::mem::zeroed() };
syscall!(SOCKET, WSAStartup(0x202, &mut data))?;

let port = syscall!(BOOL, CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0))?;
trace!("new iocp driver at port: {port}");
let port = unsafe { OwnedHandle::from_raw_handle(port as _) };
Ok(Self {
port,
Expand All @@ -173,6 +176,7 @@ impl Driver {
timeout: Option<Duration>,
iocp_entries: &mut ArrayVec<OVERLAPPED_ENTRY, N>,
) -> io::Result<()> {
instrument!(compio_log::Level::TRACE, "poll_impl", ?timeout);
let mut recv_count = 0;
let timeout = match timeout {
Some(timeout) => timeout.as_millis() as u32,
Expand All @@ -189,6 +193,7 @@ impl Driver {
0,
)
)?;
trace!("recv_count: {recv_count}");
unsafe {
iocp_entries.set_len(recv_count as _);
}
Expand All @@ -199,6 +204,7 @@ impl Driver {
if iocp_entry.lpOverlapped.is_null() {
// This entry is posted by `post_driver_nop`.
let user_data = iocp_entry.lpCompletionKey;
trace!("entry {user_data} is posted by post_driver_nop");
let result = if self.cancelled.remove(&user_data) {
Err(io::Error::from_raw_os_error(ERROR_OPERATION_ABORTED as _))
} else {
Expand All @@ -208,6 +214,7 @@ impl Driver {
} else {
let transferred = iocp_entry.dwNumberOfBytesTransferred;
// Any thin pointer is OK because we don't use the type of opcode.
trace!("entry transferred: {transferred}");
let overlapped_ptr: *mut Overlapped<()> = iocp_entry.lpOverlapped.cast();
let overlapped = unsafe { &*overlapped_ptr };
let res = if matches!(
Expand Down Expand Up @@ -242,21 +249,27 @@ impl Driver {
}

pub fn cancel(&mut self, user_data: usize, registry: &mut Slab<RawOp>) {
instrument!(compio_log::Level::TRACE, "cancel", user_data);
trace!("cancel RawOp");
self.cancelled.insert(user_data);
if let Some(op) = registry.get_mut(user_data) {
let overlapped_ptr = op.as_mut_ptr();
let op = op.as_op_pin();
// It's OK to fail to cancel.
trace!("call OpCode::cancel");
unsafe { op.cancel(overlapped_ptr.cast()) }.ok();
}
}

pub fn push(&mut self, user_data: usize, op: &mut RawOp) -> Poll<io::Result<usize>> {
instrument!(compio_log::Level::TRACE, "push", user_data);
if self.cancelled.remove(&user_data) {
trace!("pushed RawOp already cancelled");
Poll::Ready(Err(io::Error::from_raw_os_error(
ERROR_OPERATION_ABORTED as _,
)))
} else {
trace!("push RawOp");
let optr = op.as_mut_ptr();
let op_pin = op.as_op_pin();
if op_pin.is_overlapped() {
Expand Down Expand Up @@ -313,6 +326,7 @@ impl Driver {
entries: &mut impl Extend<Entry>,
_registry: &mut Slab<RawOp>,
) -> io::Result<()> {
instrument!(compio_log::Level::TRACE, "poll", ?timeout);
// Prevent stack growth.
let mut iocp_entries = ArrayVec::<OVERLAPPED_ENTRY, { Self::DEFAULT_CAPACITY }>::new();
self.poll_impl(timeout, &mut iocp_entries)?;
Expand All @@ -325,7 +339,10 @@ impl Driver {
entries.extend(iocp_entries.drain(..).filter_map(|e| self.create_entry(e)));
}
Err(e) => match e.kind() {
io::ErrorKind::TimedOut => break,
io::ErrorKind::TimedOut => {
trace!("poll timeout");
break;
}
_ => return Err(e),
},
}
Expand Down
17 changes: 17 additions & 0 deletions compio-driver/src/iour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::{collections::VecDeque, io, pin::Pin, task::Poll, time::Duration};

use compio_log::{instrument, trace};
use io_uring::{
cqueue,
opcode::AsyncCancel,
Expand Down Expand Up @@ -34,6 +35,8 @@ impl Driver {
const CANCEL: u64 = u64::MAX;

pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
instrument!(compio_log::Level::TRACE, "new", ?builder);
trace!("new iour driver");
Ok(Self {
inner: IoUring::new(builder.capacity)?,
squeue: VecDeque::with_capacity(builder.capacity as usize),
Expand All @@ -42,6 +45,7 @@ impl Driver {

// Auto means that it choose to wait or not automatically.
fn submit_auto(&mut self, timeout: Option<Duration>, wait: bool) -> io::Result<()> {
instrument!(compio_log::Level::TRACE, "submit_auto", ?timeout, wait);
let res = if wait {
// Last part of submission queue, wait till timeout.
if let Some(duration) = timeout {
Expand All @@ -54,6 +58,7 @@ impl Driver {
} else {
self.inner.submit()
};
trace!("submit result: {res:?}");
match res {
Ok(_) => Ok(()),
Err(e) => match e.raw_os_error() {
Expand All @@ -65,12 +70,15 @@ impl Driver {
}

fn flush_submissions(&mut self) -> bool {
instrument!(compio_log::Level::TRACE, "flush_submissions");

let mut ended_ops = false;

let mut inner_squeue = self.inner.submission();

while !inner_squeue.is_full() {
if self.squeue.len() <= inner_squeue.capacity() - inner_squeue.len() {
trace!("inner_squeue have enough space, flush all entries");
let (s1, s2) = self.squeue.as_slices();
unsafe {
inner_squeue
Expand All @@ -84,8 +92,10 @@ impl Driver {
ended_ops = true;
break;
} else if let Some(entry) = self.squeue.pop_front() {
trace!("inner_squeue have not enough space, flush an entry");
unsafe { inner_squeue.push(&entry) }.expect("queue has enough space");
} else {
trace!("self.squeue is empty, skip");
ended_ops = true;
break;
}
Expand All @@ -112,6 +122,8 @@ impl Driver {
}

pub fn cancel(&mut self, user_data: usize, _registry: &mut Slab<RawOp>) {
instrument!(compio_log::Level::TRACE, "cancel", user_data);
trace!("cancel RawOp");
self.squeue.push_back(
AsyncCancel::new(user_data as _)
.build()
Expand All @@ -120,7 +132,9 @@ impl Driver {
}

pub fn push(&mut self, user_data: usize, op: &mut RawOp) -> Poll<io::Result<usize>> {
instrument!(compio_log::Level::TRACE, "push", user_data);
let op = op.as_pin();
trace!("push RawOp");
self.squeue
.push_back(op.create_entry().user_data(user_data as _));
Poll::Pending
Expand All @@ -132,7 +146,9 @@ impl Driver {
entries: &mut impl Extend<Entry>,
_registry: &mut Slab<RawOp>,
) -> io::Result<()> {
instrument!(compio_log::Level::TRACE, "poll", ?timeout);
// Anyway we need to submit once, no matter there are entries in squeue.
trace!("start polling");
loop {
let ended = self.flush_submissions();

Expand All @@ -141,6 +157,7 @@ impl Driver {
self.poll_entries(entries);

if ended {
trace!("polling ended");
break;
}
}
Expand Down
19 changes: 19 additions & 0 deletions compio-log/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "compio-log"
version = "0.1.0"
description = "log of compio"
categories = ["asynchronous"]
edition = { workspace = true }
authors = { workspace = true }
readme = { workspace = true }
license = { workspace = true }
repository = { workspace = true }

[dependencies]
tracing = { version = "0.1", default-features = false }

[dev-dependencies]
tracing-subscriber = "0.3"

[features]
enable_log = []
71 changes: 71 additions & 0 deletions compio-log/src/dummy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#[macro_export]
macro_rules! debug {
($($args:tt)*) => {};
}

#[macro_export]
macro_rules! debug_span {
($($args:tt)*) => {
$crate::Span::none()
};
}

#[macro_export]
macro_rules! error {
($($args:tt)*) => {};
}

#[macro_export]
macro_rules! error_span {
($($args:tt)*) => {
$crate::Span::none()
};
}

#[macro_export]
macro_rules! event {
($($args:tt)*) => {};
}

#[macro_export]
macro_rules! info {
($($args:tt)*) => {};
}

#[macro_export]
macro_rules! info_span {
($($args:tt)*) => {
$crate::Span::none()
};
}

#[macro_export]
macro_rules! span {
($($args:tt)*) => {
$crate::Span::none()
};
}

#[macro_export]
macro_rules! trace {
($($args:tt)*) => {};
}

#[macro_export]
macro_rules! trace_span {
($($args:tt)*) => {
$crate::Span::none()
};
}

#[macro_export]
macro_rules! warn {
($($args:tt)*) => {};
}

#[macro_export]
macro_rules! warn_span {
($($args:tt)*) => {
$crate::Span::none()
};
}
23 changes: 23 additions & 0 deletions compio-log/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#[cfg_attr(not(feature = "enable_log"), doc(hidden))]
pub use tracing::*;
ho-229 marked this conversation as resolved.
Show resolved Hide resolved

#[cfg(not(feature = "enable_log"))]
pub mod dummy;

#[cfg(feature = "enable_log")]
#[macro_export]
macro_rules! instrument {
ho-229 marked this conversation as resolved.
Show resolved Hide resolved
($lvl:expr, $name:expr, $($fields:tt)*) => {
let _guard = $crate::span!(target:module_path!(), $lvl, $name, $($fields)*).entered();
};
($lvl:expr, $name:expr) => {
let _guard = $crate::span!(target:module_path!(), $lvl, $name).entered();
};
}

#[cfg(not(feature = "enable_log"))]
#[macro_export]
macro_rules! instrument {
($lvl:expr, $name:expr, $($fields:tt)*) => {};
($lvl:expr, $name:expr) => {};
}
15 changes: 15 additions & 0 deletions compio-log/tests/test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use compio_log::Level;

#[test]
fn test_log() {
tracing_subscriber::fmt()
.with_max_level(Level::TRACE)
.init();

compio_log::debug!("debug");
compio_log::error!("error");
compio_log::event!(Level::DEBUG, "event");
compio_log::info!("info");
compio_log::warn!("warn");
compio_log::trace!("trace");
}
1 change: 1 addition & 0 deletions compio-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ targets = [
# Workspace dependencies
compio-driver = { workspace = true }
compio-buf = { workspace = true }
compio-log = { workspace = true }

async-task = "4.5.0"
cfg-if = { workspace = true, optional = true }
Expand Down
8 changes: 7 additions & 1 deletion compio-runtime/src/key.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/// A typed wrapper for key of Ops submitted into driver
#[derive(Debug, PartialEq, Eq, Hash)]
#[derive(PartialEq, Eq, Hash)]
pub struct Key<T> {
user_data: usize,
_p: std::marker::PhantomData<fn(T)>,
Expand Down Expand Up @@ -38,3 +38,9 @@ impl<T> std::ops::Deref for Key<T> {
&self.user_data
}
}

impl<T> std::fmt::Debug for Key<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Key({})", self.user_data)
}
}
Loading