From bca0b58a03c9bc16b0aea8ae1069f8df75af58b7 Mon Sep 17 00:00:00 2001 From: sxyazi Date: Wed, 3 Apr 2024 00:30:32 +0800 Subject: [PATCH] fix: a race condition in DDS static messages sent as internal events --- yazi-dds/src/body/body.rs | 4 +-- yazi-dds/src/body/custom.rs | 16 ++++++----- yazi-dds/src/body/yank.rs | 6 ++-- yazi-dds/src/client.rs | 2 +- yazi-dds/src/pubsub.rs | 2 +- yazi-dds/src/sendable.rs | 11 +++---- yazi-dds/src/server.rs | 12 ++++---- yazi-dds/src/state.rs | 52 ++++++++++++++++++++-------------- yazi-fm/src/app/app.rs | 6 ++-- yazi-fm/src/main.rs | 2 ++ yazi-fm/src/signals.rs | 31 +++++++------------- yazi-plugin/src/opt.rs | 5 +--- yazi-shared/src/event/event.rs | 10 ++++++- yazi-shared/src/lib.rs | 2 ++ yazi-shared/src/ro_cell.rs | 7 ++--- 15 files changed, 87 insertions(+), 81 deletions(-) diff --git a/yazi-dds/src/body/body.rs b/yazi-dds/src/body/body.rs index 25fd87ca7..111a79c71 100644 --- a/yazi-dds/src/body/body.rs +++ b/yazi-dds/src/body/body.rs @@ -1,11 +1,11 @@ use anyhow::Result; use mlua::{ExternalResult, IntoLua, Lua, Value}; -use serde::{Deserialize, Serialize}; +use serde::Serialize; use super::{BodyBulk, BodyCd, BodyCustom, BodyHey, BodyHi, BodyHover, BodyRename, BodyYank}; use crate::Payload; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize)] #[serde(untagged)] pub enum Body<'a> { Hi(BodyHi<'a>), diff --git a/yazi-dds/src/body/custom.rs b/yazi-dds/src/body/custom.rs index bf7e0dff1..686c5d349 100644 --- a/yazi-dds/src/body/custom.rs +++ b/yazi-dds/src/body/custom.rs @@ -1,23 +1,19 @@ use mlua::{IntoLua, Lua, Value}; -use serde::{Deserialize, Serialize}; +use serde::Serialize; use super::Body; use crate::ValueSendable; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug)] pub struct BodyCustom { - #[serde(skip)] pub kind: String, - #[serde(flatten)] pub value: ValueSendable, } impl BodyCustom { #[inline] pub fn from_str(kind: &str, value: &str) -> anyhow::Result> { - let mut me = serde_json::from_str::(value)?; - kind.clone_into(&mut me.kind); - Ok(me.into()) + Ok(Self { kind: kind.to_owned(), value: serde_json::from_str(value)? }.into()) } #[inline] @@ -33,3 +29,9 @@ impl From for Body<'_> { impl IntoLua<'_> for BodyCustom { fn into_lua(self, lua: &Lua) -> mlua::Result { self.value.into_lua(lua) } } + +impl Serialize for BodyCustom { + fn serialize(&self, serializer: S) -> Result { + serde::Serialize::serialize(&self.value, serializer) + } +} diff --git a/yazi-dds/src/body/yank.rs b/yazi-dds/src/body/yank.rs index 6213f4a2b..708b4ff10 100644 --- a/yazi-dds/src/body/yank.rs +++ b/yazi-dds/src/body/yank.rs @@ -23,8 +23,8 @@ impl<'a> BodyYank<'a> { impl BodyYank<'static> { #[inline] - pub fn dummy(cut: bool) -> Body<'static> { - Self { cut, urls: Default::default(), dummy: true }.into() + pub fn dummy() -> Body<'static> { + Self { cut: false, urls: Default::default(), dummy: true }.into() } } @@ -37,7 +37,7 @@ impl IntoLua<'_> for BodyYank<'static> { if let Some(Cow::Owned(urls)) = Some(self.urls).filter(|_| !self.dummy) { BodyYankIter { cut: self.cut, urls: urls.into_iter().collect() }.into_lua(lua) } else { - lua.create_table_from([("cut", self.cut)])?.into_lua(lua) + lua.create_table()?.into_lua(lua) } } } diff --git a/yazi-dds/src/client.rs b/yazi-dds/src/client.rs index 649ff4977..5cf5c4d69 100644 --- a/yazi-dds/src/client.rs +++ b/yazi-dds/src/client.rs @@ -84,7 +84,7 @@ impl Client { server.take().map(|h| h.abort()); *server = Server::make().await.ok(); if server.is_some() { - super::STATE.load().await.ok(); + super::STATE.load_or_create().await; } if mem::replace(&mut first, false) && server.is_some() { diff --git a/yazi-dds/src/pubsub.rs b/yazi-dds/src/pubsub.rs index 023c47e7f..aa50a0211 100644 --- a/yazi-dds/src/pubsub.rs +++ b/yazi-dds/src/pubsub.rs @@ -133,7 +133,7 @@ impl Pubsub { pub fn pub_from_yank(cut: bool, urls: &HashSet) { if LOCAL.read().contains_key("yank") { - Self::pub_(BodyYank::dummy(cut)); + Self::pub_(BodyYank::dummy()); } if PEERS.read().values().any(|p| p.able("yank")) { Client::push(BodyYank::borrowed(cut, urls).with_severity(30)); diff --git a/yazi-dds/src/sendable.rs b/yazi-dds/src/sendable.rs index 17b257c0f..cc1125151 100644 --- a/yazi-dds/src/sendable.rs +++ b/yazi-dds/src/sendable.rs @@ -11,7 +11,7 @@ pub enum ValueSendable { Boolean(bool), Integer(i64), Number(f64), - String(Vec), + String(String), Table(HashMap), } @@ -31,10 +31,7 @@ impl ValueSendable { let mut map = HashMap::with_capacity(table.len()); for pair in table { - let (ValueSendableKey::String(k), Self::String(v)) = pair else { - continue; - }; - if let (Ok(k), Ok(v)) = (String::from_utf8(k), String::from_utf8(v)) { + if let (ValueSendableKey::String(k), Self::String(v)) = pair { map.insert(k, v); } } @@ -52,7 +49,7 @@ impl<'a> TryFrom> for ValueSendable { Value::LightUserData(_) => Err("light userdata is not supported".into_lua_err())?, Value::Integer(n) => Self::Integer(n), Value::Number(n) => Self::Number(n), - Value::String(s) => Self::String(s.as_bytes().to_vec()), + Value::String(s) => Self::String(s.to_str()?.to_owned()), Value::Table(t) => { let mut map = HashMap::with_capacity(t.len().map(|l| l as usize)?); for result in t.pairs::() { @@ -96,7 +93,7 @@ pub enum ValueSendableKey { Boolean(bool), Integer(i64), Number(OrderedFloat), - String(Vec), + String(String), } impl ValueSendableKey { diff --git a/yazi-dds/src/server.rs b/yazi-dds/src/server.rs index f7a243cd0..52b4b7520 100644 --- a/yazi-dds/src/server.rs +++ b/yazi-dds/src/server.rs @@ -95,18 +95,20 @@ impl Server { let Ok(payload) = Payload::from_str(&s) else { return }; let Body::Hi(hi) = payload.body else { return }; - let mut clients = CLIENTS.write(); - id.replace(payload.sender).and_then(|id| clients.remove(&id)); - - if let Some(ref state) = *STATE.read() { - state.values().for_each(|s| _ = tx.send(format!("{s}\n"))); + if id.is_none() { + if let Some(ref state) = *STATE.read() { + state.values().for_each(|s| _ = tx.send(s.clone())); + } } + let mut clients = CLIENTS.write(); + id.replace(payload.sender).and_then(|id| clients.remove(&id)); clients.insert(payload.sender, Client { id: payload.sender, tx, abilities: hi.abilities.into_iter().map(|s| s.into_owned()).collect(), }); + Self::handle_hey(&clients); } diff --git a/yazi-dds/src/state.rs b/yazi-dds/src/state.rs index 36d1ef8ef..acdc27dcb 100644 --- a/yazi-dds/src/state.rs +++ b/yazi-dds/src/state.rs @@ -30,29 +30,11 @@ impl State { } } - pub async fn load(&self) -> Result<()> { - let mut buf = BufReader::new(File::open(BOOT.state_dir.join(".dds")).await?); - let mut line = String::new(); - - let mut inner = HashMap::new(); - while buf.read_line(&mut line).await? > 0 { - let mut parts = line.splitn(5, ','); - let Some(kind) = parts.next() else { continue }; - let Some(_) = parts.next() else { continue }; - let Some(severity) = parts.next().and_then(|s| s.parse::().ok()) else { continue }; - let Some(_) = parts.next() else { continue }; - let Some(body) = parts.next() else { continue }; - inner.insert(format!("{}_{severity}_{kind}", Body::tab(kind, body)), mem::take(&mut line)); - } - - let clients = CLIENTS.read(); - for payload in inner.values() { - clients.values().for_each(|c| _ = c.tx.send(format!("{payload}\n"))); + pub async fn load_or_create(&self) { + if self.load().await.is_err() { + self.inner.write().replace(Default::default()); + self.last.store(timestamp_us(), Ordering::Relaxed); } - - self.inner.write().replace(inner); - self.last.store(timestamp_us(), Ordering::Relaxed); - Ok(()) } pub async fn drain(&self) -> Result<()> { @@ -77,6 +59,32 @@ impl State { buf.write_u8(b'\n').await?; } + buf.flush().await?; + Ok(()) + } + + async fn load(&self) -> Result<()> { + let mut buf = BufReader::new(File::open(BOOT.state_dir.join(".dds")).await?); + let mut line = String::new(); + + let mut inner = HashMap::new(); + while buf.read_line(&mut line).await? > 0 { + let mut parts = line.splitn(5, ','); + let Some(kind) = parts.next() else { continue }; + let Some(_) = parts.next() else { continue }; + let Some(severity) = parts.next().and_then(|s| s.parse::().ok()) else { continue }; + let Some(_) = parts.next() else { continue }; + let Some(body) = parts.next() else { continue }; + inner.insert(format!("{}_{severity}_{kind}", Body::tab(kind, body)), mem::take(&mut line)); + } + + let clients = CLIENTS.read(); + for payload in inner.values() { + clients.values().for_each(|c| _ = c.tx.send(payload.clone())); + } + + self.inner.write().replace(inner); + self.last.store(timestamp_us(), Ordering::Relaxed); Ok(()) } diff --git a/yazi-fm/src/app/app.rs b/yazi-fm/src/app/app.rs index 3d7e91d02..f69da7e34 100644 --- a/yazi-fm/src/app/app.rs +++ b/yazi-fm/src/app/app.rs @@ -17,7 +17,7 @@ pub(crate) struct App { impl App { pub(crate) async fn serve() -> Result<()> { let term = Term::start()?; - let signals = Signals::start()?; + let (mut rx, signals) = (Event::take(), Signals::start()?); Lives::register()?; let mut app = Self { cx: Ctx::make(), term: Some(term), signals }; @@ -25,7 +25,7 @@ impl App { let mut times = 0; let mut events = Vec::with_capacity(200); - while app.signals.rx.recv_many(&mut events, 50).await > 0 { + while rx.recv_many(&mut events, 50).await > 0 { for event in events.drain(..) { times += 1; app.dispatch(event)?; @@ -38,7 +38,7 @@ impl App { if times >= 50 { times = 0; app.render(); - } else if let Ok(event) = app.signals.rx.try_recv() { + } else if let Ok(event) = rx.try_recv() { events.push(event); emit!(Render); } else { diff --git a/yazi-fm/src/main.rs b/yazi-fm/src/main.rs index 92e6782c3..d8d92685d 100644 --- a/yazi-fm/src/main.rs +++ b/yazi-fm/src/main.rs @@ -39,6 +39,8 @@ async fn main() -> anyhow::Result<()> { _ = fdlimit::raise_fd_limit(); + yazi_shared::init(); + yazi_config::init()?; yazi_adaptor::init(); diff --git a/yazi-fm/src/signals.rs b/yazi-fm/src/signals.rs index 8a1a8b2ba..8d1e7193e 100644 --- a/yazi-fm/src/signals.rs +++ b/yazi-fm/src/signals.rs @@ -1,23 +1,18 @@ use anyhow::Result; use crossterm::event::{Event as CrosstermEvent, EventStream, KeyEvent, KeyEventKind}; use futures::StreamExt; -use tokio::{select, sync::mpsc, task::JoinHandle}; +use tokio::{select, task::JoinHandle}; use tokio_util::sync::CancellationToken; use yazi_shared::event::Event; pub(super) struct Signals { - tx: mpsc::UnboundedSender, - pub rx: mpsc::UnboundedReceiver, - ct: CancellationToken, + ct: CancellationToken, } impl Signals { pub(super) fn start() -> Result { - let (tx, rx) = mpsc::unbounded_channel(); - let ct = CancellationToken::new(); - let mut signals = Self { tx: tx.clone(), rx, ct }; + let mut signals = Self { ct: CancellationToken::new() }; - Event::init(tx); signals.spawn_system_task()?; signals.spawn_crossterm_task(); @@ -52,12 +47,11 @@ impl Signals { SIGCONT, ])?; - let tx = self.tx.clone(); Ok(tokio::spawn(async move { while let Some(signal) = signals.next().await { match signal { SIGHUP | SIGTERM | SIGQUIT | SIGINT => { - tx.send(Event::Quit(Default::default())).ok(); + Event::Quit(Default::default()).emit(); } SIGCONT if HIDER.try_acquire().is_ok() => AppProxy::resume(), _ => {} @@ -68,23 +62,18 @@ impl Signals { fn spawn_crossterm_task(&mut self) -> JoinHandle<()> { let mut reader = EventStream::new(); - let (tx, ct) = (self.tx.clone(), self.ct.clone()); + let ct = self.ct.clone(); tokio::spawn(async move { loop { select! { _ = ct.cancelled() => break, Some(Ok(event)) = reader.next() => { - let event = match event { - // We need to check key event kind; - // otherwise event will be dispatched twice. - CrosstermEvent::Key(key @ KeyEvent { kind: KeyEventKind::Press, .. }) => Event::Key(key), - CrosstermEvent::Paste(str) => Event::Paste(str), - CrosstermEvent::Resize(..) => Event::Resize, - _ => continue, - }; - if tx.send(event).is_err() { - break; + match event { + CrosstermEvent::Key(key @ KeyEvent { kind: KeyEventKind::Press, .. }) => Event::Key(key).emit(), + CrosstermEvent::Paste(str) => Event::Paste(str).emit(), + CrosstermEvent::Resize(..) => Event::Resize.emit(), + _ => {}, } } } diff --git a/yazi-plugin/src/opt.rs b/yazi-plugin/src/opt.rs index 54608a4ec..d97b5c70d 100644 --- a/yazi-plugin/src/opt.rs +++ b/yazi-plugin/src/opt.rs @@ -26,10 +26,7 @@ impl TryFrom for Opt { let mut data: OptData = c.take_data().unwrap_or_default(); if let Some(args) = c.named.get("args") { - data.args = shell_words::split(args)? - .into_iter() - .map(|s| ValueSendable::String(s.into_bytes())) - .collect(); + data.args = shell_words::split(args)?.into_iter().map(ValueSendable::String).collect(); } Ok(Self { name, sync: c.named.contains_key("sync"), data }) diff --git a/yazi-shared/src/event/event.rs b/yazi-shared/src/event/event.rs index 808d7a0d1..6acb504d2 100644 --- a/yazi-shared/src/event/event.rs +++ b/yazi-shared/src/event/event.rs @@ -7,6 +7,7 @@ use super::Cmd; use crate::{Layer, RoCell}; static TX: RoCell> = RoCell::new(); +static RX: RoCell> = RoCell::new(); #[derive(Debug)] pub enum Event { @@ -27,7 +28,14 @@ pub struct EventQuit { impl Event { #[inline] - pub fn init(tx: mpsc::UnboundedSender) { TX.init(tx); } + pub fn init() { + let (tx, rx) = mpsc::unbounded_channel(); + TX.init(tx); + RX.init(rx); + } + + #[inline] + pub fn take() -> mpsc::UnboundedReceiver { RX.drop() } #[inline] pub fn emit(self) { TX.send(self).ok(); } diff --git a/yazi-shared/src/lib.rs b/yazi-shared/src/lib.rs index 465e05faf..f38385847 100644 --- a/yazi-shared/src/lib.rs +++ b/yazi-shared/src/lib.rs @@ -32,3 +32,5 @@ pub use ro_cell::*; pub use throttle::*; pub use time::*; pub use xdg::*; + +pub fn init() { event::Event::init(); } diff --git a/yazi-shared/src/ro_cell.rs b/yazi-shared/src/ro_cell.rs index 04eea2ef6..3783f618a 100644 --- a/yazi-shared/src/ro_cell.rs +++ b/yazi-shared/src/ro_cell.rs @@ -34,10 +34,9 @@ impl RoCell { } #[inline] - pub fn drop(&self) { - unsafe { - *self.0.get() = None; - } + pub fn drop(&self) -> T { + debug_assert!(self.is_initialized()); + unsafe { mem::take(&mut *self.0.get()).unwrap_unchecked() } } #[inline]