Skip to content

Commit

Permalink
watcher-rs: drop tokio in lib, gate serde as a feature
Browse files Browse the repository at this point in the history
  • Loading branch information
Will committed Oct 20, 2024
1 parent c9d781f commit ab3fbfb
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 38 deletions.
11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@ name = "show-events"
path = "watcher-rs/examples/show-events.rs"

[features]
cli = ["serde_json", "tokio/rt", "tokio/macros", "tokio/signal"]
default = ["cli"]
serde = ["dep:serde"]
cli = ["dep:serde", "serde_json", "tokio", "tokio/rt", "tokio/macros", "tokio/signal"]
default = ["cli", "serde"]

[dependencies]
futures = { version = "0.3.31", default-features = false }
serde = { version = "1.0.205", features = ["derive"] }
futures = { version = "0.3.31", features = ["alloc", "std"], default-features = false }
serde = { version = "1.0.205", features = ["derive"], optional = true }
serde_json = { version = "1.0.122", optional = true }
tokio = { version = "1.0.1", features = ["sync"], default-features = false }
tokio = { version = "1.0.1", default-features = false, optional = true }

[build-dependencies]
cc = "1"
Expand Down
62 changes: 29 additions & 33 deletions watcher-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ use core::pin::Pin;
use core::str;
use core::task::Context;
use core::task::Poll;
use futures::channel::mpsc::unbounded as async_channel;
use futures::channel::mpsc::UnboundedReceiver as Rx;
use futures::channel::mpsc::UnboundedSender as Tx;
use futures::Stream;
use serde::Deserialize;
use serde::Serialize;
use std::ffi::CString;
use tokio::sync::mpsc::channel as async_channel;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;

#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
#[serde(rename_all = "snake_case")]
#[derive(Debug, Clone, Copy)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "snake_case"))]
pub enum EffectType {
Rename,
Modify,
Expand All @@ -29,8 +29,9 @@ pub enum EffectType {
Other,
}

#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
#[serde(rename_all = "snake_case")]
#[derive(Debug, Clone, Copy)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "snake_case"))]
pub enum PathType {
Dir,
File,
Expand All @@ -40,8 +41,9 @@ pub enum PathType {
Other,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename = "event")]
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "serde", serde(rename = "event"))]
pub struct Event {
pub effect_time: i64,
pub path_name: String,
Expand Down Expand Up @@ -97,40 +99,34 @@ fn ev_from_c<'a>(ev: wtr_watcher_event) -> Event {
}
}

unsafe extern "C" fn callback_bridge(ev: wtr_watcher_event, cx: *mut c_void) {
unsafe extern "C" fn fwd_ev(ev: wtr_watcher_event, cx: *mut c_void) {
let ev = ev_from_c(ev);
let tx = transmute::<*mut c_void, &Sender<Event>>(cx);
let _ = tx.blocking_send(ev);
let tx = transmute::<*mut c_void, &Tx<Event>>(cx);
let _ = tx.unbounded_send(ev);
}

#[allow(dead_code)]
#[allow(dead_code)] // tx is used in fwd_ev
pub struct Watch {
watcher: *mut c_void,
ev_rx: Receiver<Event>,
ev_tx: Box<Sender<Event>>,
w: *mut c_void,
tx: Box<Tx<Event>>,
rx: Pin<Box<Rx<Event>>>,
}

impl Watch {
pub fn try_new(path: &str) -> Result<Watch, &'static str> {
let path = CString::new(path).unwrap();
let (ev_tx, ev_rx) = async_channel(1);
let ev_tx = Box::new(ev_tx);
let ev_tx_opaque = unsafe { transmute::<&Sender<Event>, *mut c_void>(&ev_tx) };
let watcher =
unsafe { wtr_watcher_open(path.as_ptr(), Some(callback_bridge), ev_tx_opaque) };
if watcher.is_null() {
Err("wtr_watcher_open")
} else {
Ok(Watch {
watcher,
ev_rx,
ev_tx,
})
let p = path.as_ptr() as *const c_char;
let (tx, rx) = async_channel();
let tx = Box::new(tx);
let rx = Box::pin(rx);
let cx = unsafe { transmute::<&Tx<Event>, *mut c_void>(&tx) };
match unsafe { wtr_watcher_open(p, Some(fwd_ev), cx) } {
w if w.is_null() => Err("wtr_watcher_open"),
w => Ok(Watch { w, tx, rx }),
}
}

pub fn close(&self) -> Result<(), &'static str> {
match unsafe { wtr_watcher_close(self.watcher) } {
match unsafe { wtr_watcher_close(self.w) } {
false => Err("wtr_watcher_close"),
true => Ok(()),
}
Expand All @@ -141,7 +137,7 @@ impl Stream for Watch {
type Item = Event;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Event>> {
self.ev_rx.poll_recv(cx)
self.rx.as_mut().poll_next(cx)
}
}

Expand Down

0 comments on commit ab3fbfb

Please sign in to comment.