Skip to content

Commit

Permalink
chore(msg): UUID pending msg tracking (#1909)
Browse files Browse the repository at this point in the history
  • Loading branch information
Flemmli97 authored Mar 15, 2024
1 parent d668299 commit 9b687fd
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 274 deletions.
50 changes: 22 additions & 28 deletions common/src/state/chats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,42 +141,36 @@ impl Chat {
pub fn append_pending_msg(
&mut self,
chat_id: Uuid,
message_id: Uuid,
did: DID,
msg: Vec<String>,
attachments: &[Location],
) -> Uuid {
let new = PendingMessage::new(chat_id, did, msg, attachments);
let uuid = new.message.inner.id();
self.pending_outgoing_messages.push(new);
uuid
) -> bool {
if self
.pending_outgoing_messages
.iter()
.any(|m| m.id().eq(&message_id))
{
return false;
}
self.pending_outgoing_messages
.push(PendingMessage::new(chat_id, did, message_id, msg));
true
}

pub fn update_pending_msg(&mut self, msg: PendingMessage, progress: FileProgression) {
pub fn update_pending_msg(&mut self, message_id: Uuid, progress: FileProgression) {
let file = progress_file(&progress);
for m in &mut self.pending_outgoing_messages {
if msg.eq(m) {
m.attachments_progress.insert(file, progress);
break;
}
if let Some(m) = &mut self
.pending_outgoing_messages
.iter_mut()
.find(|m| m.id().eq(&message_id))
{
m.attachments_progress.insert(file, progress);
}
}

pub fn remove_pending_msg(
&mut self,
msg: Vec<String>,
attachments: Vec<String>,
uuid: Option<Uuid>,
) {
let opt = self.pending_outgoing_messages.iter().position(|e| {
e.message.inner.lines().eq(&msg)
&& e.attachments_progress
.keys()
.all(|a| attachments.contains(a))
&& uuid.map(|id| id.eq(&e.id())).unwrap_or(true)
});
if let Some(pending) = opt {
self.pending_outgoing_messages.remove(pending);
}
pub fn remove_pending_msg(&mut self, message_id: Uuid) {
self.pending_outgoing_messages
.retain(|m| !m.id().eq(&message_id))
}

pub fn unreads(&self) -> u32 {
Expand Down
59 changes: 15 additions & 44 deletions common/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,17 +523,7 @@ impl State {
chat.messages.push_back(message);
}
self.send_chat_to_top_of_sidebar(conversation_id);
self.decrement_outgoing_messages(
conversation_id,
message_clone.inner.lines(),
message_clone
.inner
.attachments()
.iter()
.map(|f| f.name())
.collect(),
None,
);
self.decrement_outgoing_messages(conversation_id, message_clone.inner.id());
}
MessageEvent::Edited {
conversation_id,
Expand Down Expand Up @@ -1232,35 +1222,31 @@ impl State {
}

// indicates that a conversation has a pending outgoing message
// can only send messages to the active chat
pub fn increment_outgoing_messages(
&mut self,
msg: Vec<String>,
attachments: &[Location],
) -> Option<Uuid> {
// sends it to the active chat
pub fn increment_outgoing_messages(&mut self, message_id: Uuid, msg: Vec<String>) {
if let Some(id) = self.chats.active {
return self.increment_outgoing_messages_for(msg, attachments, id);
self.increment_outgoing_messages_for(id, message_id, msg);
}
None
}

pub fn increment_outgoing_messages_for(
&mut self,
chat_id: Uuid,
message_id: Uuid,
msg: Vec<String>,
attachments: &[Location],
id: Uuid,
) -> Option<Uuid> {
) {
let did = self.get_own_identity().did_key();
if let Some(chat) = self.chats.all.get_mut(&id) {
return Some(chat.append_pending_msg(id, did, msg, attachments));
if let Some(chat) = self.chats.all.get_mut(&chat_id) {
if !chat.append_pending_msg(chat_id, message_id, did, msg) {
log::debug!("attempted to add an already existing pending message");
}
}
None
}

pub fn update_outgoing_messages(
&mut self,
conv_id: Uuid,
msg: PendingMessage,
message_id: Uuid,
progress: FileProgression,
) -> bool {
let mut update = false;
Expand All @@ -1280,29 +1266,14 @@ impl State {
update = true;
}
if let Some(chat) = self.chats.all.get_mut(&conv_id) {
chat.update_pending_msg(msg, progress);
chat.update_pending_msg(message_id, progress);
}
update
}

pub fn decrement_outgoing_messagess(
&mut self,
conv_id: Uuid,
msg: Vec<String>,
uuid: Option<Uuid>,
) {
self.decrement_outgoing_messages(conv_id, msg, vec![], uuid);
}

pub fn decrement_outgoing_messages(
&mut self,
conv_id: Uuid,
msg: Vec<String>,
attachments: Vec<String>,
uuid: Option<Uuid>,
) {
pub fn decrement_outgoing_messages(&mut self, conv_id: Uuid, message_id: Uuid) {
if let Some(chat) = self.chats.all.get_mut(&conv_id) {
chat.remove_pending_msg(msg, attachments, uuid);
chat.remove_pending_msg(message_id);
}
}

Expand Down
64 changes: 5 additions & 59 deletions common/src/state/pending_message.rs
Original file line number Diff line number Diff line change
@@ -1,94 +1,40 @@
use std::{collections::HashMap, ffi::OsStr, path::PathBuf};
use std::collections::HashMap;

use uuid::Uuid;
use warp::{constellation::Progression, crypto::DID, raygun::Location};
use warp::{constellation::Progression, crypto::DID};

use crate::warp_runner::ui_adapter::Message;
// We can improve message equality detection if warp e.g. can send us their assigned uuid.
// Else it is just a guesswork
#[derive(Clone, Debug)]
pub struct PendingMessage {
attachments: Vec<String>,
pub attachments_progress: HashMap<String, FileProgression>,
pub message: Message,
}

impl PendingMessage {
// Use this for comparison cases
pub fn for_compare(text: Vec<String>, attachments: &[Location], id: Option<Uuid>) -> Self {
let mut inner = warp::raygun::Message::default();
if let Some(m_id) = id {
inner.set_id(m_id);
}
inner.set_lines(text);
let message = Message::new(inner, None, Uuid::new_v4().to_string());
PendingMessage {
attachments: attachments
.iter()
.filter_map(|p| {
let path = match p {
Location::Disk { path } => path.clone(),
Location::Constellation { path } => PathBuf::from(path),
};

path.file_name().and_then(OsStr::to_str).map(str::to_string)
})
.collect::<Vec<_>>(),
attachments_progress: HashMap::new(),
message,
}
}

pub fn new(chat_id: Uuid, did: DID, text: Vec<String>, attachments: &[Location]) -> Self {
pub fn new(chat_id: Uuid, did: DID, message_id: Uuid, text: Vec<String>) -> Self {
// Create a dummy message
let mut inner = warp::raygun::Message::default();
inner.set_id(Uuid::new_v4());
inner.set_id(message_id);
inner.set_sender(did);
inner.set_conversation_id(chat_id);
inner.set_lines(text);
let attachments = attachments
.iter()
.filter(|location| match location {
Location::Disk { path } => path.is_file(),
Location::Constellation { .. } => true,
})
.cloned()
.collect::<Vec<_>>();

let message = Message::new(inner, None, Uuid::new_v4().to_string());
PendingMessage {
attachments: attachments
.iter()
.map(|p| {
let pathbuf = match p {
Location::Disk { path } => path.clone(),
Location::Constellation { path } => PathBuf::from(path),
};
pathbuf
.file_name()
.map_or_else(String::new, |ostr| ostr.to_string_lossy().to_string())
})
.collect(),
attachments_progress: HashMap::new(),
message,
}
}

// UI side id. Messages arriving at warp have a different id!
// This is only for messages that have not been sent to warp yet
pub fn id(&self) -> Uuid {
self.message.inner.id()
}
}

impl PartialEq for PendingMessage {
fn eq(&self, other: &Self) -> bool {
self.message.inner.lines().eq(&other.message.inner.lines())
&& self
.attachments
.iter()
.all(|k| other.attachments.contains(k))
&& self.id().eq(&other.id())
self.id().eq(&other.id())
}
}

Expand Down
34 changes: 20 additions & 14 deletions common/src/warp_runner/manager/commands/raygun_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use crate::{
},
};

pub type MultiChatResult = Vec<(Uuid, (Uuid, Option<AttachmentEventStream>))>;

#[allow(clippy::large_enum_variant)]
#[derive(Display)]
pub enum RayGunCmd {
Expand Down Expand Up @@ -113,14 +115,14 @@ pub enum RayGunCmd {
conv_id: Uuid,
msg: Vec<String>,
attachments: Vec<Location>,
rsp: oneshot::Sender<Result<Option<AttachmentEventStream>, warp::error::Error>>,
rsp: oneshot::Sender<Result<(Uuid, Option<AttachmentEventStream>), warp::error::Error>>,
},
#[display(fmt = "SendMessageForSeveralChats")]
SendMessageForSeveralChats {
convs_id: Vec<Uuid>,
msg: Vec<String>,
attachments: Vec<Location>,
rsp: oneshot::Sender<Result<Vec<(Uuid, AttachmentEventStream)>, warp::error::Error>>,
rsp: oneshot::Sender<Result<MultiChatResult, warp::error::Error>>,
},
#[display(fmt = "EditMessage")]
EditMessage {
Expand Down Expand Up @@ -149,7 +151,7 @@ pub enum RayGunCmd {
reply_to: Uuid,
msg: Vec<String>,
attachments: Vec<Location>,
rsp: oneshot::Sender<Result<Option<AttachmentEventStream>, warp::error::Error>>,
rsp: oneshot::Sender<Result<(Uuid, Option<AttachmentEventStream>), warp::error::Error>>,
},
// removes all direct conversations involving the recipient
#[display(fmt = "RemoveDirectConvs")]
Expand Down Expand Up @@ -295,14 +297,14 @@ pub async fn handle_raygun_cmd(
rsp,
} => {
let r = if attachments.is_empty() {
messaging.send(conv_id, msg).await.map(|_| None)
messaging.send(conv_id, msg).await.map(|id| (id, None))
} else {
//TODO: Pass stream off to attachment events
match messaging
.attach(conv_id, None, attachments.clone(), msg.clone())
.await
{
Ok((_, stream)) => Result::Ok(Some(stream)),
Ok((id, stream)) => Result::Ok((id, Some(stream))),
Err(e) => Err(e),
}
};
Expand All @@ -315,25 +317,26 @@ pub async fn handle_raygun_cmd(
attachments,
rsp,
} => {
let mut streams = vec![];
let mut results = vec![];
for chat_id in convs_id {
if attachments.is_empty() {
let _ = messaging.send(chat_id, msg.clone()).await;
match messaging.send(chat_id, msg.clone()).await {
Ok(id) => results.push((chat_id, (id, None))),
Err(e) => log::error!("Raygun: Send files to several chats: {}", e),
}
} else {
//TODO: Pass stream off to attachment events
match messaging
.attach(chat_id, None, attachments.clone(), msg.clone())
.await
{
Ok((_, stream)) => streams.push((chat_id, stream)),
Err(e) => {
log::error!("Raygun: Send files to several chats: {}", e);
}
Ok((id, stream)) => results.push((chat_id, (id, Some(stream)))),
Err(e) => log::error!("Raygun: Send files to several chats: {}", e),
}
};
}

let _ = rsp.send(Ok(streams));
let _ = rsp.send(Ok(results));
}
RayGunCmd::EditMessage {
conv_id,
Expand Down Expand Up @@ -372,13 +375,16 @@ pub async fn handle_raygun_cmd(
rsp,
} => {
let r = if attachments.is_empty() {
messaging.reply(conv_id, reply_to, msg).await.map(|_| None)
messaging
.reply(conv_id, reply_to, msg)
.await
.map(|id| (id, None))
} else {
match messaging
.attach(conv_id, Some(reply_to), attachments, msg)
.await
{
Ok((_, stream)) => Result::Ok(Some(stream)),
Ok((id, stream)) => Result::Ok((id, Some(stream))),
Err(e) => Err(e),
}
};
Expand Down
7 changes: 2 additions & 5 deletions common/src/warp_runner/ui_adapter/message_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ use warp::{

use super::Message;
use crate::{
state::{
self,
pending_message::{FileProgression, PendingMessage},
},
state::{self, pending_message::FileProgression},
warp_runner::{
ui_adapter::{convert_raygun_message, did_to_identity},
Messaging,
Expand Down Expand Up @@ -75,7 +72,7 @@ pub enum MessageEvent {
AttachmentProgress {
progress: FileProgression,
conversation_id: Uuid,
msg: PendingMessage,
msg: Uuid,
},
}

Expand Down
1 change: 0 additions & 1 deletion ui/src/layouts/chats/data/chatbar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,5 @@ use uuid::Uuid;
pub struct MsgChInput {
pub msg: Vec<String>,
pub conv_id: Uuid,
pub appended_msg_id: Option<Uuid>,
pub replying_to: Option<Uuid>,
}
Loading

0 comments on commit 9b687fd

Please sign in to comment.