diff --git a/common/src/state/chats.rs b/common/src/state/chats.rs index 4b9e855403e..da19a901047 100644 --- a/common/src/state/chats.rs +++ b/common/src/state/chats.rs @@ -141,42 +141,36 @@ impl Chat { pub fn append_pending_msg( &mut self, chat_id: Uuid, + message_id: Uuid, did: DID, msg: Vec, - attachments: &[Location], - ) -> Uuid { - let new = PendingMessage::new(chat_id, did, msg, attachments); - let uuid = new.message.inner.id(); - self.pending_outgoing_messages.push(new); - uuid + ) -> bool { + if self + .pending_outgoing_messages + .iter() + .any(|m| m.id().eq(&message_id)) + { + return false; + } + self.pending_outgoing_messages + .push(PendingMessage::new(chat_id, did, message_id, msg)); + true } - pub fn update_pending_msg(&mut self, msg: PendingMessage, progress: FileProgression) { + pub fn update_pending_msg(&mut self, message_id: Uuid, progress: FileProgression) { let file = progress_file(&progress); - for m in &mut self.pending_outgoing_messages { - if msg.eq(m) { - m.attachments_progress.insert(file, progress); - break; - } + if let Some(m) = &mut self + .pending_outgoing_messages + .iter_mut() + .find(|m| m.id().eq(&message_id)) + { + m.attachments_progress.insert(file, progress); } } - pub fn remove_pending_msg( - &mut self, - msg: Vec, - attachments: Vec, - uuid: Option, - ) { - let opt = self.pending_outgoing_messages.iter().position(|e| { - e.message.inner.lines().eq(&msg) - && e.attachments_progress - .keys() - .all(|a| attachments.contains(a)) - && uuid.map(|id| id.eq(&e.id())).unwrap_or(true) - }); - if let Some(pending) = opt { - self.pending_outgoing_messages.remove(pending); - } + pub fn remove_pending_msg(&mut self, message_id: Uuid) { + self.pending_outgoing_messages + .retain(|m| !m.id().eq(&message_id)) } pub fn unreads(&self) -> u32 { diff --git a/common/src/state/mod.rs b/common/src/state/mod.rs index c3bff415930..e6616781bc9 100644 --- a/common/src/state/mod.rs +++ b/common/src/state/mod.rs @@ -523,17 +523,7 @@ impl State { chat.messages.push_back(message); } self.send_chat_to_top_of_sidebar(conversation_id); - self.decrement_outgoing_messages( - conversation_id, - message_clone.inner.lines(), - message_clone - .inner - .attachments() - .iter() - .map(|f| f.name()) - .collect(), - None, - ); + self.decrement_outgoing_messages(conversation_id, message_clone.inner.id()); } MessageEvent::Edited { conversation_id, @@ -1232,35 +1222,31 @@ impl State { } // indicates that a conversation has a pending outgoing message - // can only send messages to the active chat - pub fn increment_outgoing_messages( - &mut self, - msg: Vec, - attachments: &[Location], - ) -> Option { + // sends it to the active chat + pub fn increment_outgoing_messages(&mut self, message_id: Uuid, msg: Vec) { if let Some(id) = self.chats.active { - return self.increment_outgoing_messages_for(msg, attachments, id); + self.increment_outgoing_messages_for(id, message_id, msg); } - None } pub fn increment_outgoing_messages_for( &mut self, + chat_id: Uuid, + message_id: Uuid, msg: Vec, - attachments: &[Location], - id: Uuid, - ) -> Option { + ) { let did = self.get_own_identity().did_key(); - if let Some(chat) = self.chats.all.get_mut(&id) { - return Some(chat.append_pending_msg(id, did, msg, attachments)); + if let Some(chat) = self.chats.all.get_mut(&chat_id) { + if !chat.append_pending_msg(chat_id, message_id, did, msg) { + log::debug!("attempted to add an already existing pending message"); + } } - None } pub fn update_outgoing_messages( &mut self, conv_id: Uuid, - msg: PendingMessage, + message_id: Uuid, progress: FileProgression, ) -> bool { let mut update = false; @@ -1280,29 +1266,14 @@ impl State { update = true; } if let Some(chat) = self.chats.all.get_mut(&conv_id) { - chat.update_pending_msg(msg, progress); + chat.update_pending_msg(message_id, progress); } update } - pub fn decrement_outgoing_messagess( - &mut self, - conv_id: Uuid, - msg: Vec, - uuid: Option, - ) { - self.decrement_outgoing_messages(conv_id, msg, vec![], uuid); - } - - pub fn decrement_outgoing_messages( - &mut self, - conv_id: Uuid, - msg: Vec, - attachments: Vec, - uuid: Option, - ) { + pub fn decrement_outgoing_messages(&mut self, conv_id: Uuid, message_id: Uuid) { if let Some(chat) = self.chats.all.get_mut(&conv_id) { - chat.remove_pending_msg(msg, attachments, uuid); + chat.remove_pending_msg(message_id); } } diff --git a/common/src/state/pending_message.rs b/common/src/state/pending_message.rs index a09a93ea7f3..cb6d2734926 100644 --- a/common/src/state/pending_message.rs +++ b/common/src/state/pending_message.rs @@ -1,81 +1,32 @@ -use std::{collections::HashMap, ffi::OsStr, path::PathBuf}; +use std::collections::HashMap; use uuid::Uuid; -use warp::{constellation::Progression, crypto::DID, raygun::Location}; +use warp::{constellation::Progression, crypto::DID}; use crate::warp_runner::ui_adapter::Message; // We can improve message equality detection if warp e.g. can send us their assigned uuid. // Else it is just a guesswork #[derive(Clone, Debug)] pub struct PendingMessage { - attachments: Vec, pub attachments_progress: HashMap, pub message: Message, } impl PendingMessage { - // Use this for comparison cases - pub fn for_compare(text: Vec, attachments: &[Location], id: Option) -> Self { - let mut inner = warp::raygun::Message::default(); - if let Some(m_id) = id { - inner.set_id(m_id); - } - inner.set_lines(text); - let message = Message::new(inner, None, Uuid::new_v4().to_string()); - PendingMessage { - attachments: attachments - .iter() - .filter_map(|p| { - let path = match p { - Location::Disk { path } => path.clone(), - Location::Constellation { path } => PathBuf::from(path), - }; - - path.file_name().and_then(OsStr::to_str).map(str::to_string) - }) - .collect::>(), - attachments_progress: HashMap::new(), - message, - } - } - - pub fn new(chat_id: Uuid, did: DID, text: Vec, attachments: &[Location]) -> Self { + pub fn new(chat_id: Uuid, did: DID, message_id: Uuid, text: Vec) -> Self { // Create a dummy message let mut inner = warp::raygun::Message::default(); - inner.set_id(Uuid::new_v4()); + inner.set_id(message_id); inner.set_sender(did); inner.set_conversation_id(chat_id); inner.set_lines(text); - let attachments = attachments - .iter() - .filter(|location| match location { - Location::Disk { path } => path.is_file(), - Location::Constellation { .. } => true, - }) - .cloned() - .collect::>(); - let message = Message::new(inner, None, Uuid::new_v4().to_string()); PendingMessage { - attachments: attachments - .iter() - .map(|p| { - let pathbuf = match p { - Location::Disk { path } => path.clone(), - Location::Constellation { path } => PathBuf::from(path), - }; - pathbuf - .file_name() - .map_or_else(String::new, |ostr| ostr.to_string_lossy().to_string()) - }) - .collect(), attachments_progress: HashMap::new(), message, } } - // UI side id. Messages arriving at warp have a different id! - // This is only for messages that have not been sent to warp yet pub fn id(&self) -> Uuid { self.message.inner.id() } @@ -83,12 +34,7 @@ impl PendingMessage { impl PartialEq for PendingMessage { fn eq(&self, other: &Self) -> bool { - self.message.inner.lines().eq(&other.message.inner.lines()) - && self - .attachments - .iter() - .all(|k| other.attachments.contains(k)) - && self.id().eq(&other.id()) + self.id().eq(&other.id()) } } diff --git a/common/src/warp_runner/manager/commands/raygun_commands.rs b/common/src/warp_runner/manager/commands/raygun_commands.rs index 5d3012dc338..9d6b6272bf8 100644 --- a/common/src/warp_runner/manager/commands/raygun_commands.rs +++ b/common/src/warp_runner/manager/commands/raygun_commands.rs @@ -31,6 +31,8 @@ use crate::{ }, }; +pub type MultiChatResult = Vec<(Uuid, (Uuid, Option))>; + #[allow(clippy::large_enum_variant)] #[derive(Display)] pub enum RayGunCmd { @@ -113,14 +115,14 @@ pub enum RayGunCmd { conv_id: Uuid, msg: Vec, attachments: Vec, - rsp: oneshot::Sender, warp::error::Error>>, + rsp: oneshot::Sender), warp::error::Error>>, }, #[display(fmt = "SendMessageForSeveralChats")] SendMessageForSeveralChats { convs_id: Vec, msg: Vec, attachments: Vec, - rsp: oneshot::Sender, warp::error::Error>>, + rsp: oneshot::Sender>, }, #[display(fmt = "EditMessage")] EditMessage { @@ -149,7 +151,7 @@ pub enum RayGunCmd { reply_to: Uuid, msg: Vec, attachments: Vec, - rsp: oneshot::Sender, warp::error::Error>>, + rsp: oneshot::Sender), warp::error::Error>>, }, // removes all direct conversations involving the recipient #[display(fmt = "RemoveDirectConvs")] @@ -295,14 +297,14 @@ pub async fn handle_raygun_cmd( rsp, } => { let r = if attachments.is_empty() { - messaging.send(conv_id, msg).await.map(|_| None) + messaging.send(conv_id, msg).await.map(|id| (id, None)) } else { //TODO: Pass stream off to attachment events match messaging .attach(conv_id, None, attachments.clone(), msg.clone()) .await { - Ok((_, stream)) => Result::Ok(Some(stream)), + Ok((id, stream)) => Result::Ok((id, Some(stream))), Err(e) => Err(e), } }; @@ -315,25 +317,26 @@ pub async fn handle_raygun_cmd( attachments, rsp, } => { - let mut streams = vec![]; + let mut results = vec![]; for chat_id in convs_id { if attachments.is_empty() { - let _ = messaging.send(chat_id, msg.clone()).await; + match messaging.send(chat_id, msg.clone()).await { + Ok(id) => results.push((chat_id, (id, None))), + Err(e) => log::error!("Raygun: Send files to several chats: {}", e), + } } else { //TODO: Pass stream off to attachment events match messaging .attach(chat_id, None, attachments.clone(), msg.clone()) .await { - Ok((_, stream)) => streams.push((chat_id, stream)), - Err(e) => { - log::error!("Raygun: Send files to several chats: {}", e); - } + Ok((id, stream)) => results.push((chat_id, (id, Some(stream)))), + Err(e) => log::error!("Raygun: Send files to several chats: {}", e), } }; } - let _ = rsp.send(Ok(streams)); + let _ = rsp.send(Ok(results)); } RayGunCmd::EditMessage { conv_id, @@ -372,13 +375,16 @@ pub async fn handle_raygun_cmd( rsp, } => { let r = if attachments.is_empty() { - messaging.reply(conv_id, reply_to, msg).await.map(|_| None) + messaging + .reply(conv_id, reply_to, msg) + .await + .map(|id| (id, None)) } else { match messaging .attach(conv_id, Some(reply_to), attachments, msg) .await { - Ok((_, stream)) => Result::Ok(Some(stream)), + Ok((id, stream)) => Result::Ok((id, Some(stream))), Err(e) => Err(e), } }; diff --git a/common/src/warp_runner/ui_adapter/message_event.rs b/common/src/warp_runner/ui_adapter/message_event.rs index 913e1648ee9..4aecf48b82a 100644 --- a/common/src/warp_runner/ui_adapter/message_event.rs +++ b/common/src/warp_runner/ui_adapter/message_event.rs @@ -9,10 +9,7 @@ use warp::{ use super::Message; use crate::{ - state::{ - self, - pending_message::{FileProgression, PendingMessage}, - }, + state::{self, pending_message::FileProgression}, warp_runner::{ ui_adapter::{convert_raygun_message, did_to_identity}, Messaging, @@ -75,7 +72,7 @@ pub enum MessageEvent { AttachmentProgress { progress: FileProgression, conversation_id: Uuid, - msg: PendingMessage, + msg: Uuid, }, } diff --git a/ui/src/layouts/chats/data/chatbar/mod.rs b/ui/src/layouts/chats/data/chatbar/mod.rs index 87d9ef134e5..1a130a1718a 100644 --- a/ui/src/layouts/chats/data/chatbar/mod.rs +++ b/ui/src/layouts/chats/data/chatbar/mod.rs @@ -8,6 +8,5 @@ use uuid::Uuid; pub struct MsgChInput { pub msg: Vec, pub conv_id: Uuid, - pub appended_msg_id: Option, pub replying_to: Option, } diff --git a/ui/src/layouts/chats/presentation/chatbar/coroutines.rs b/ui/src/layouts/chats/presentation/chatbar/coroutines.rs index 6f4da1c9c24..af7cf47a911 100644 --- a/ui/src/layouts/chats/presentation/chatbar/coroutines.rs +++ b/ui/src/layouts/chats/presentation/chatbar/coroutines.rs @@ -1,7 +1,4 @@ -use std::{ - path::PathBuf, - time::{Duration, Instant}, -}; +use std::time::{Duration, Instant}; use common::{ state::{Action, State}, @@ -11,7 +8,7 @@ use common::{ use dioxus::prelude::*; use futures::{channel::oneshot, StreamExt}; use uuid::Uuid; -use warp::raygun::{self, Location}; +use warp::raygun; use crate::{ layouts::chats::data::{self, ChatProps, MsgChInput, TypingInfo, DEFAULT_MESSAGES_TO_TAKE}, @@ -32,7 +29,6 @@ pub fn get_msg_ch( while let Some(MsgChInput { msg, conv_id, - appended_msg_id, replying_to, }) = rx.next().await { @@ -42,7 +38,6 @@ pub fn get_msg_ch( .get_active_chat() .map(|f| f.files_attached_to_send) .unwrap_or_default(); - let msg_clone = msg.clone(); let cmd = match replying_to { Some(reply_to) => RayGunCmd::Reply { conv_id, @@ -58,56 +53,25 @@ pub fn get_msg_ch( rsp: tx, }, }; - let attachments = state - .read() - .get_active_chat() - .map(|f| f.files_attached_to_send) - .unwrap_or_default(); state .write_silent() .mutate(Action::ClearChatAttachments(conv_id)); - let attachment_files: Vec = attachments - .iter() - .map(|p| { - let pathbuf = match p { - Location::Disk { path } => path.clone(), - Location::Constellation { path } => PathBuf::from(path), - }; - pathbuf - .file_name() - .map_or_else(String::new, |ostr| ostr.to_string_lossy().to_string()) - }) - .collect(); if let Err(e) = warp_cmd_tx.send(WarpCmd::RayGun(cmd)) { log::error!("failed to send warp command: {}", e); - state.write().decrement_outgoing_messages( - conv_id, - msg_clone, - attachment_files, - appended_msg_id, - ); continue; } let rsp = rx.await.expect("command canceled"); match rsp { - Ok(Some(attachment)) => upload_streams.write().append(( - conv_id, - msg, - attachments, - appended_msg_id, - attachment, - )), + Ok((id, stream)) => { + state.write().increment_outgoing_messages(id, msg.clone()); + if let Some(stream) = stream { + upload_streams.write().append((conv_id, id, stream)); + } + } Err(e) => { log::error!("failed to send message: {}", e); - state.write().decrement_outgoing_messages( - conv_id, - msg_clone, - attachment_files, - appended_msg_id, - ) } - _ => {} } } } diff --git a/ui/src/layouts/chats/presentation/chatbar/mod.rs b/ui/src/layouts/chats/presentation/chatbar/mod.rs index 430a0c5d9c2..f08ed0fdcbf 100644 --- a/ui/src/layouts/chats/presentation/chatbar/mod.rs +++ b/ui/src/layouts/chats/presentation/chatbar/mod.rs @@ -214,13 +214,6 @@ pub fn get_chatbar<'a>(cx: &'a Scoped<'a, ChatProps>) -> Element<'a> { local_typing_ch.send(TypingIndicator::NotTyping); let active_chat_id = chat_data.read().active_chat.id(); - let files_to_upload = state - .read() - .get_active_chat() - .as_ref() - .map(|d| d.files_attached_to_send.clone()) - .unwrap_or_default(); - let msg = state .read() .get_active_chat() @@ -257,13 +250,9 @@ pub fn get_chatbar<'a>(cx: &'a Scoped<'a, ChatProps>) -> Element<'a> { if replying_to.is_some() { state.write().mutate(Action::CancelReply(active_chat_id)); } - let appended_msg_id = state - .write() - .increment_outgoing_messages(msg.clone(), &files_to_upload); msg_ch.send(MsgChInput { msg, conv_id: active_chat_id, - appended_msg_id, replying_to, }); } diff --git a/ui/src/layouts/chats/presentation/quick_profile/mod.rs b/ui/src/layouts/chats/presentation/quick_profile/mod.rs index 38398d1b160..3d5beeff5aa 100644 --- a/ui/src/layouts/chats/presentation/quick_profile/mod.rs +++ b/ui/src/layouts/chats/presentation/quick_profile/mod.rs @@ -49,7 +49,7 @@ enum QuickProfileCmd { BlockFriend(DID), UnBlockFriend(DID), RemoveDirectConvs(DID), - Chat(Option, Vec, Option), + Chat(Option, Vec), AdjustVolume(DID, f32), SendFriendRequest(DID, Vec), } @@ -225,7 +225,7 @@ pub fn QuickProfileContext<'a>(cx: Scope<'a, QuickProfileProps<'a>>) -> Element< ); } } - QuickProfileCmd::Chat(chat, msg, uuid) => { + QuickProfileCmd::Chat(chat, msg) => { let c = match chat { Some(c) => c.id, None => return, @@ -240,18 +240,19 @@ pub fn QuickProfileContext<'a>(cx: Scope<'a, QuickProfileProps<'a>>) -> Element< }; if let Err(e) = warp_cmd_tx.send(WarpCmd::RayGun(cmd)) { log::error!("failed to send warp command: {}", e); - state - .write_silent() - .decrement_outgoing_messagess(c, msg_vec, uuid); continue; } let rsp = rx.await.expect("command canceled"); - if let Err(e) = rsp { - log::error!("failed to send message: {}", e); - state - .write_silent() - .decrement_outgoing_messagess(c, msg_vec, uuid); + match rsp { + Ok((id, _)) => { + state + .write_silent() + .increment_outgoing_messages_for(c, id, msg_vec); + } + Err(e) => { + log::error!("failed to send message: {}", e); + } } chat_with.set(Some(c)); } @@ -476,10 +477,7 @@ pub fn QuickProfileContext<'a>(cx: Scope<'a, QuickProfileProps<'a>>) -> Element< placeholder: get_local_text("quickprofile.chat-placeholder"), disable_onblur: true, onreturn: move |(val, _,_): (String,bool,Code)|{ - let ui_id = chat_send.as_ref().and_then(|chat|state - .write_silent() - .increment_outgoing_messages_for(vec![val.clone()], &[], chat.id)); - ch.send(QuickProfileCmd::Chat(chat_send.to_owned(), vec![val], ui_id)); + ch.send(QuickProfileCmd::Chat(chat_send.to_owned(), vec![val])); let script = format!(r#"document.getElementById("{id}-context-menu").classList.add("hidden")"#); let _ = eval(&script); } diff --git a/ui/src/layouts/storage/files_layout/mod.rs b/ui/src/layouts/storage/files_layout/mod.rs index 1233dc86e1b..90617bb2c85 100644 --- a/ui/src/layouts/storage/files_layout/mod.rs +++ b/ui/src/layouts/storage/files_layout/mod.rs @@ -111,18 +111,16 @@ pub fn FilesLayout(cx: Scope<'_>) -> Element<'_> { let send_ch = use_coroutine( cx, |mut rx: UnboundedReceiver<(Vec, Vec)>| { - to_owned![upload_streams, send_files_from_storage]; + to_owned![state, upload_streams, send_files_from_storage]; async move { let warp_cmd_tx = WARP_CMD_CH.tx.clone(); while let Some((files_location, convs_id)) = rx.next().await { let (tx, rx) = oneshot::channel(); - let msg = vec!["".to_owned()]; - let attachments = files_location; if let Err(e) = warp_cmd_tx.send(WarpCmd::RayGun(RayGunCmd::SendMessageForSeveralChats { convs_id, - msg, - attachments: attachments.clone(), + msg: vec!["".to_owned()], + attachments: files_location, rsp: tx, })) { @@ -131,14 +129,13 @@ pub fn FilesLayout(cx: Scope<'_>) -> Element<'_> { } if let Ok(Ok(streams)) = rx.await { let mut to_append = upload_streams.write(); - for (chat, stream) in streams { - to_append.append(( - chat, - vec!["".to_owned()], - attachments.clone(), - None, - stream, - )) + for (chat, (id, stream)) in streams { + state + .write() + .increment_outgoing_messages(id, vec!["".to_owned()]); + if let Some(stream) = stream { + to_append.append((chat, id, stream)) + } } } send_files_from_storage.set(false); diff --git a/ui/src/utils/async_task_queue.rs b/ui/src/utils/async_task_queue.rs index aaf7cf8a7a6..dc6ccb5b21b 100644 --- a/ui/src/utils/async_task_queue.rs +++ b/ui/src/utils/async_task_queue.rs @@ -3,7 +3,7 @@ use common::{ language::get_local_text_with_args, state::{ data_transfer::{TransferState, TransferStates}, - pending_message::{FileProgression, PendingMessage}, + pending_message::FileProgression, }, warp_runner::{ui_adapter::MessageEvent, WarpEvent}, WARP_EVENT_CH, @@ -22,7 +22,7 @@ use tokio::{ use once_cell::sync::Lazy; use std::{sync::Arc, time::Duration}; use uuid::Uuid; -use warp::raygun::{AttachmentEventStream, AttachmentKind, Location}; +use warp::raygun::{AttachmentEventStream, AttachmentKind}; use super::download::DownloadComplete; @@ -104,48 +104,28 @@ where pub fn chat_upload_stream_handler( cx: &ScopeState, -) -> &UseRef< - AsyncRef<( - Uuid, - Vec, - Vec, - Option, - AttachmentEventStream, - )>, -> { +) -> &UseRef> { async_queue( cx, - |(conv_id, msg, attachments, appended_msg_id, mut stream): ( - Uuid, - Vec, - Vec, - Option, - AttachmentEventStream, - )| { - async move { - while let Some(kind) = stream.next().await { - match kind { - AttachmentKind::Pending(res) => { - if let Err(e) = res { - log::debug!("Error uploading file {}", e); - } - return; + |(conv_id, message_id, mut stream): (Uuid, Uuid, AttachmentEventStream)| async move { + while let Some(kind) = stream.next().await { + match kind { + AttachmentKind::Pending(res) => { + if let Err(e) = res { + log::debug!("Error uploading file {}", e); } - AttachmentKind::AttachedProgress(progress) => { - let progress = progress.into(); - if let Err(e) = WARP_EVENT_CH.tx.send(WarpEvent::Message( - MessageEvent::AttachmentProgress { - progress, - conversation_id: conv_id, - msg: PendingMessage::for_compare( - msg.clone(), - &attachments, - appended_msg_id, - ), - }, - )) { - log::error!("failed to send warp_event: {e}"); - } + return; + } + AttachmentKind::AttachedProgress(progress) => { + let progress = progress.into(); + if let Err(e) = WARP_EVENT_CH.tx.send(WarpEvent::Message( + MessageEvent::AttachmentProgress { + progress, + conversation_id: conv_id, + msg: message_id, + }, + )) { + log::error!("failed to send warp_event: {e}"); } } }