Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: a race condition in DDS static messages sent as internal events #868

Merged
merged 1 commit into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions yazi-dds/src/body/body.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use anyhow::Result;
use mlua::{ExternalResult, IntoLua, Lua, Value};
use serde::{Deserialize, Serialize};
use serde::Serialize;

use super::{BodyBulk, BodyCd, BodyCustom, BodyHey, BodyHi, BodyHover, BodyRename, BodyYank};
use crate::Payload;

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum Body<'a> {
Hi(BodyHi<'a>),
Expand Down
16 changes: 9 additions & 7 deletions yazi-dds/src/body/custom.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
use mlua::{IntoLua, Lua, Value};
use serde::{Deserialize, Serialize};
use serde::Serialize;

use super::Body;
use crate::ValueSendable;

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug)]
pub struct BodyCustom {
#[serde(skip)]
pub kind: String,
#[serde(flatten)]
pub value: ValueSendable,
}

impl BodyCustom {
#[inline]
pub fn from_str(kind: &str, value: &str) -> anyhow::Result<Body<'static>> {
let mut me = serde_json::from_str::<Self>(value)?;
kind.clone_into(&mut me.kind);
Ok(me.into())
Ok(Self { kind: kind.to_owned(), value: serde_json::from_str(value)? }.into())
}

#[inline]
Expand All @@ -33,3 +29,9 @@ impl From<BodyCustom> for Body<'_> {
impl IntoLua<'_> for BodyCustom {
fn into_lua(self, lua: &Lua) -> mlua::Result<Value> { self.value.into_lua(lua) }
}

impl Serialize for BodyCustom {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
serde::Serialize::serialize(&self.value, serializer)
}
}
6 changes: 3 additions & 3 deletions yazi-dds/src/body/yank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ impl<'a> BodyYank<'a> {

impl BodyYank<'static> {
#[inline]
pub fn dummy(cut: bool) -> Body<'static> {
Self { cut, urls: Default::default(), dummy: true }.into()
pub fn dummy() -> Body<'static> {
Self { cut: false, urls: Default::default(), dummy: true }.into()
}
}

Expand All @@ -37,7 +37,7 @@ impl IntoLua<'_> for BodyYank<'static> {
if let Some(Cow::Owned(urls)) = Some(self.urls).filter(|_| !self.dummy) {
BodyYankIter { cut: self.cut, urls: urls.into_iter().collect() }.into_lua(lua)
} else {
lua.create_table_from([("cut", self.cut)])?.into_lua(lua)
lua.create_table()?.into_lua(lua)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion yazi-dds/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl Client {
server.take().map(|h| h.abort());
*server = Server::make().await.ok();
if server.is_some() {
super::STATE.load().await.ok();
super::STATE.load_or_create().await;
}

if mem::replace(&mut first, false) && server.is_some() {
Expand Down
2 changes: 1 addition & 1 deletion yazi-dds/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl Pubsub {

pub fn pub_from_yank(cut: bool, urls: &HashSet<Url>) {
if LOCAL.read().contains_key("yank") {
Self::pub_(BodyYank::dummy(cut));
Self::pub_(BodyYank::dummy());
}
if PEERS.read().values().any(|p| p.able("yank")) {
Client::push(BodyYank::borrowed(cut, urls).with_severity(30));
Expand Down
11 changes: 4 additions & 7 deletions yazi-dds/src/sendable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub enum ValueSendable {
Boolean(bool),
Integer(i64),
Number(f64),
String(Vec<u8>),
String(String),
Table(HashMap<ValueSendableKey, ValueSendable>),
}

Expand All @@ -31,10 +31,7 @@ impl ValueSendable {

let mut map = HashMap::with_capacity(table.len());
for pair in table {
let (ValueSendableKey::String(k), Self::String(v)) = pair else {
continue;
};
if let (Ok(k), Ok(v)) = (String::from_utf8(k), String::from_utf8(v)) {
if let (ValueSendableKey::String(k), Self::String(v)) = pair {
map.insert(k, v);
}
}
Expand All @@ -52,7 +49,7 @@ impl<'a> TryFrom<Value<'a>> for ValueSendable {
Value::LightUserData(_) => Err("light userdata is not supported".into_lua_err())?,
Value::Integer(n) => Self::Integer(n),
Value::Number(n) => Self::Number(n),
Value::String(s) => Self::String(s.as_bytes().to_vec()),
Value::String(s) => Self::String(s.to_str()?.to_owned()),
Value::Table(t) => {
let mut map = HashMap::with_capacity(t.len().map(|l| l as usize)?);
for result in t.pairs::<Value, Value>() {
Expand Down Expand Up @@ -96,7 +93,7 @@ pub enum ValueSendableKey {
Boolean(bool),
Integer(i64),
Number(OrderedFloat),
String(Vec<u8>),
String(String),
}

impl ValueSendableKey {
Expand Down
12 changes: 7 additions & 5 deletions yazi-dds/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,20 @@ impl Server {
let Ok(payload) = Payload::from_str(&s) else { return };
let Body::Hi(hi) = payload.body else { return };

let mut clients = CLIENTS.write();
id.replace(payload.sender).and_then(|id| clients.remove(&id));

if let Some(ref state) = *STATE.read() {
state.values().for_each(|s| _ = tx.send(format!("{s}\n")));
if id.is_none() {
if let Some(ref state) = *STATE.read() {
state.values().for_each(|s| _ = tx.send(s.clone()));
}
}

let mut clients = CLIENTS.write();
id.replace(payload.sender).and_then(|id| clients.remove(&id));
clients.insert(payload.sender, Client {
id: payload.sender,
tx,
abilities: hi.abilities.into_iter().map(|s| s.into_owned()).collect(),
});

Self::handle_hey(&clients);
}

Expand Down
52 changes: 30 additions & 22 deletions yazi-dds/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,11 @@ impl State {
}
}

pub async fn load(&self) -> Result<()> {
let mut buf = BufReader::new(File::open(BOOT.state_dir.join(".dds")).await?);
let mut line = String::new();

let mut inner = HashMap::new();
while buf.read_line(&mut line).await? > 0 {
let mut parts = line.splitn(5, ',');
let Some(kind) = parts.next() else { continue };
let Some(_) = parts.next() else { continue };
let Some(severity) = parts.next().and_then(|s| s.parse::<u8>().ok()) else { continue };
let Some(_) = parts.next() else { continue };
let Some(body) = parts.next() else { continue };
inner.insert(format!("{}_{severity}_{kind}", Body::tab(kind, body)), mem::take(&mut line));
}

let clients = CLIENTS.read();
for payload in inner.values() {
clients.values().for_each(|c| _ = c.tx.send(format!("{payload}\n")));
pub async fn load_or_create(&self) {
if self.load().await.is_err() {
self.inner.write().replace(Default::default());
self.last.store(timestamp_us(), Ordering::Relaxed);
}

self.inner.write().replace(inner);
self.last.store(timestamp_us(), Ordering::Relaxed);
Ok(())
}

pub async fn drain(&self) -> Result<()> {
Expand All @@ -77,6 +59,32 @@ impl State {
buf.write_u8(b'\n').await?;
}

buf.flush().await?;
Ok(())
}

async fn load(&self) -> Result<()> {
let mut buf = BufReader::new(File::open(BOOT.state_dir.join(".dds")).await?);
let mut line = String::new();

let mut inner = HashMap::new();
while buf.read_line(&mut line).await? > 0 {
let mut parts = line.splitn(5, ',');
let Some(kind) = parts.next() else { continue };
let Some(_) = parts.next() else { continue };
let Some(severity) = parts.next().and_then(|s| s.parse::<u8>().ok()) else { continue };
let Some(_) = parts.next() else { continue };
let Some(body) = parts.next() else { continue };
inner.insert(format!("{}_{severity}_{kind}", Body::tab(kind, body)), mem::take(&mut line));
}

let clients = CLIENTS.read();
for payload in inner.values() {
clients.values().for_each(|c| _ = c.tx.send(payload.clone()));
}

self.inner.write().replace(inner);
self.last.store(timestamp_us(), Ordering::Relaxed);
Ok(())
}

Expand Down
6 changes: 3 additions & 3 deletions yazi-fm/src/app/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ pub(crate) struct App {
impl App {
pub(crate) async fn serve() -> Result<()> {
let term = Term::start()?;
let signals = Signals::start()?;
let (mut rx, signals) = (Event::take(), Signals::start()?);

Lives::register()?;
let mut app = Self { cx: Ctx::make(), term: Some(term), signals };
app.render();

let mut times = 0;
let mut events = Vec::with_capacity(200);
while app.signals.rx.recv_many(&mut events, 50).await > 0 {
while rx.recv_many(&mut events, 50).await > 0 {
for event in events.drain(..) {
times += 1;
app.dispatch(event)?;
Expand All @@ -38,7 +38,7 @@ impl App {
if times >= 50 {
times = 0;
app.render();
} else if let Ok(event) = app.signals.rx.try_recv() {
} else if let Ok(event) = rx.try_recv() {
events.push(event);
emit!(Render);
} else {
Expand Down
2 changes: 2 additions & 0 deletions yazi-fm/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ async fn main() -> anyhow::Result<()> {

_ = fdlimit::raise_fd_limit();

yazi_shared::init();

yazi_config::init()?;

yazi_adaptor::init();
Expand Down
31 changes: 10 additions & 21 deletions yazi-fm/src/signals.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
use anyhow::Result;
use crossterm::event::{Event as CrosstermEvent, EventStream, KeyEvent, KeyEventKind};
use futures::StreamExt;
use tokio::{select, sync::mpsc, task::JoinHandle};
use tokio::{select, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use yazi_shared::event::Event;

pub(super) struct Signals {
tx: mpsc::UnboundedSender<Event>,
pub rx: mpsc::UnboundedReceiver<Event>,
ct: CancellationToken,
ct: CancellationToken,
}

impl Signals {
pub(super) fn start() -> Result<Self> {
let (tx, rx) = mpsc::unbounded_channel();
let ct = CancellationToken::new();
let mut signals = Self { tx: tx.clone(), rx, ct };
let mut signals = Self { ct: CancellationToken::new() };

Event::init(tx);
signals.spawn_system_task()?;
signals.spawn_crossterm_task();

Expand Down Expand Up @@ -52,12 +47,11 @@ impl Signals {
SIGCONT,
])?;

let tx = self.tx.clone();
Ok(tokio::spawn(async move {
while let Some(signal) = signals.next().await {
match signal {
SIGHUP | SIGTERM | SIGQUIT | SIGINT => {
tx.send(Event::Quit(Default::default())).ok();
Event::Quit(Default::default()).emit();
}
SIGCONT if HIDER.try_acquire().is_ok() => AppProxy::resume(),
_ => {}
Expand All @@ -68,23 +62,18 @@ impl Signals {

fn spawn_crossterm_task(&mut self) -> JoinHandle<()> {
let mut reader = EventStream::new();
let (tx, ct) = (self.tx.clone(), self.ct.clone());
let ct = self.ct.clone();

tokio::spawn(async move {
loop {
select! {
_ = ct.cancelled() => break,
Some(Ok(event)) = reader.next() => {
let event = match event {
// We need to check key event kind;
// otherwise event will be dispatched twice.
CrosstermEvent::Key(key @ KeyEvent { kind: KeyEventKind::Press, .. }) => Event::Key(key),
CrosstermEvent::Paste(str) => Event::Paste(str),
CrosstermEvent::Resize(..) => Event::Resize,
_ => continue,
};
if tx.send(event).is_err() {
break;
match event {
CrosstermEvent::Key(key @ KeyEvent { kind: KeyEventKind::Press, .. }) => Event::Key(key).emit(),
CrosstermEvent::Paste(str) => Event::Paste(str).emit(),
CrosstermEvent::Resize(..) => Event::Resize.emit(),
_ => {},
}
}
}
Expand Down
5 changes: 1 addition & 4 deletions yazi-plugin/src/opt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ impl TryFrom<Cmd> for Opt {
let mut data: OptData = c.take_data().unwrap_or_default();

if let Some(args) = c.named.get("args") {
data.args = shell_words::split(args)?
.into_iter()
.map(|s| ValueSendable::String(s.into_bytes()))
.collect();
data.args = shell_words::split(args)?.into_iter().map(ValueSendable::String).collect();
}

Ok(Self { name, sync: c.named.contains_key("sync"), data })
Expand Down
10 changes: 9 additions & 1 deletion yazi-shared/src/event/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use super::Cmd;
use crate::{Layer, RoCell};

static TX: RoCell<mpsc::UnboundedSender<Event>> = RoCell::new();
static RX: RoCell<mpsc::UnboundedReceiver<Event>> = RoCell::new();

#[derive(Debug)]
pub enum Event {
Expand All @@ -27,7 +28,14 @@ pub struct EventQuit {

impl Event {
#[inline]
pub fn init(tx: mpsc::UnboundedSender<Event>) { TX.init(tx); }
pub fn init() {
let (tx, rx) = mpsc::unbounded_channel();
TX.init(tx);
RX.init(rx);
}

#[inline]
pub fn take() -> mpsc::UnboundedReceiver<Event> { RX.drop() }

#[inline]
pub fn emit(self) { TX.send(self).ok(); }
Expand Down
2 changes: 2 additions & 0 deletions yazi-shared/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,5 @@ pub use ro_cell::*;
pub use throttle::*;
pub use time::*;
pub use xdg::*;

pub fn init() { event::Event::init(); }
7 changes: 3 additions & 4 deletions yazi-shared/src/ro_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ impl<T> RoCell<T> {
}

#[inline]
pub fn drop(&self) {
unsafe {
*self.0.get() = None;
}
pub fn drop(&self) -> T {
debug_assert!(self.is_initialized());
unsafe { mem::take(&mut *self.0.get()).unwrap_unchecked() }
}

#[inline]
Expand Down