From ab3fbfb3106af30e0c7d761f0418de63c686c961 Mon Sep 17 00:00:00 2001 From: Will Date: Sun, 20 Oct 2024 15:24:02 -0400 Subject: [PATCH] watcher-rs: drop tokio in lib, gate serde as a feature --- Cargo.toml | 11 ++++---- watcher-rs/src/lib.rs | 62 ++++++++++++++++++++----------------------- 2 files changed, 35 insertions(+), 38 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7dc3d771..6ba576dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,14 +39,15 @@ name = "show-events" path = "watcher-rs/examples/show-events.rs" [features] -cli = ["serde_json", "tokio/rt", "tokio/macros", "tokio/signal"] -default = ["cli"] +serde = ["dep:serde"] +cli = ["dep:serde", "serde_json", "tokio", "tokio/rt", "tokio/macros", "tokio/signal"] +default = ["cli", "serde"] [dependencies] -futures = { version = "0.3.31", default-features = false } -serde = { version = "1.0.205", features = ["derive"] } +futures = { version = "0.3.31", features = ["alloc", "std"], default-features = false } +serde = { version = "1.0.205", features = ["derive"], optional = true } serde_json = { version = "1.0.122", optional = true } -tokio = { version = "1.0.1", features = ["sync"], default-features = false } +tokio = { version = "1.0.1", default-features = false, optional = true } [build-dependencies] cc = "1" diff --git a/watcher-rs/src/lib.rs b/watcher-rs/src/lib.rs index 0c7c86ee..b897a4d5 100644 --- a/watcher-rs/src/lib.rs +++ b/watcher-rs/src/lib.rs @@ -10,16 +10,16 @@ use core::pin::Pin; use core::str; use core::task::Context; use core::task::Poll; +use futures::channel::mpsc::unbounded as async_channel; +use futures::channel::mpsc::UnboundedReceiver as Rx; +use futures::channel::mpsc::UnboundedSender as Tx; use futures::Stream; use serde::Deserialize; use serde::Serialize; -use std::ffi::CString; -use tokio::sync::mpsc::channel as async_channel; -use tokio::sync::mpsc::Receiver; -use tokio::sync::mpsc::Sender; -#[derive(Serialize, Deserialize, Debug, Clone, Copy)] -#[serde(rename_all = "snake_case")] +#[derive(Debug, Clone, Copy)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "serde", serde(rename_all = "snake_case"))] pub enum EffectType { Rename, Modify, @@ -29,8 +29,9 @@ pub enum EffectType { Other, } -#[derive(Serialize, Deserialize, Debug, Clone, Copy)] -#[serde(rename_all = "snake_case")] +#[derive(Debug, Clone, Copy)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "serde", serde(rename_all = "snake_case"))] pub enum PathType { Dir, File, @@ -40,8 +41,9 @@ pub enum PathType { Other, } -#[derive(Serialize, Deserialize, Debug, Clone)] -#[serde(rename = "event")] +#[derive(Debug, Clone)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "serde", serde(rename = "event"))] pub struct Event { pub effect_time: i64, pub path_name: String, @@ -97,40 +99,34 @@ fn ev_from_c<'a>(ev: wtr_watcher_event) -> Event { } } -unsafe extern "C" fn callback_bridge(ev: wtr_watcher_event, cx: *mut c_void) { +unsafe extern "C" fn fwd_ev(ev: wtr_watcher_event, cx: *mut c_void) { let ev = ev_from_c(ev); - let tx = transmute::<*mut c_void, &Sender>(cx); - let _ = tx.blocking_send(ev); + let tx = transmute::<*mut c_void, &Tx>(cx); + let _ = tx.unbounded_send(ev); } -#[allow(dead_code)] +#[allow(dead_code)] // tx is used in fwd_ev pub struct Watch { - watcher: *mut c_void, - ev_rx: Receiver, - ev_tx: Box>, + w: *mut c_void, + tx: Box>, + rx: Pin>>, } impl Watch { pub fn try_new(path: &str) -> Result { - let path = CString::new(path).unwrap(); - let (ev_tx, ev_rx) = async_channel(1); - let ev_tx = Box::new(ev_tx); - let ev_tx_opaque = unsafe { transmute::<&Sender, *mut c_void>(&ev_tx) }; - let watcher = - unsafe { wtr_watcher_open(path.as_ptr(), Some(callback_bridge), ev_tx_opaque) }; - if watcher.is_null() { - Err("wtr_watcher_open") - } else { - Ok(Watch { - watcher, - ev_rx, - ev_tx, - }) + let p = path.as_ptr() as *const c_char; + let (tx, rx) = async_channel(); + let tx = Box::new(tx); + let rx = Box::pin(rx); + let cx = unsafe { transmute::<&Tx, *mut c_void>(&tx) }; + match unsafe { wtr_watcher_open(p, Some(fwd_ev), cx) } { + w if w.is_null() => Err("wtr_watcher_open"), + w => Ok(Watch { w, tx, rx }), } } pub fn close(&self) -> Result<(), &'static str> { - match unsafe { wtr_watcher_close(self.watcher) } { + match unsafe { wtr_watcher_close(self.w) } { false => Err("wtr_watcher_close"), true => Ok(()), } @@ -141,7 +137,7 @@ impl Stream for Watch { type Item = Event; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.ev_rx.poll_recv(cx) + self.rx.as_mut().poll_next(cx) } }