From 0ba8dce92aa35a0ac090291c40b80b003aa74c24 Mon Sep 17 00:00:00 2001 From: Utkarsh Gupta <32920299+utkarshg6@users.noreply.github.com> Date: Fri, 31 Mar 2023 15:56:08 +0530 Subject: [PATCH] GH-678: Review 1 Changes (#253) * feat: migrate MessageScheduler to the sub_lib/utils.rs * feat: rename to scheduled_msg * GH-678: rename duration to delay * GH-678: rename the name of the sub * GH-678: add a todo for the procedural macro card * GH-678: rename to scheduled_node_query_response_sub * GH-678: migrate the MessageScheduler handler for Recorder after the recorder_message_handler implementaions --- masq_lib/src/messages.rs | 8 -------- node/src/node_test_utils.rs | 7 +++++-- node/src/stream_handler_pool.rs | 22 +++++++++++----------- node/src/sub_lib/utils.rs | 6 ++++++ node/src/test_utils/recorder.rs | 24 ++++++++++++------------ 5 files changed, 34 insertions(+), 33 deletions(-) diff --git a/masq_lib/src/messages.rs b/masq_lib/src/messages.rs index c25c6eabf..412076e0d 100644 --- a/masq_lib/src/messages.rs +++ b/masq_lib/src/messages.rs @@ -4,7 +4,6 @@ use crate::messages::UiMessageError::{DeserializationError, PayloadError, Unexpe use crate::shared_schema::ConfiguratorError; use crate::ui_gateway::MessageBody; use crate::ui_gateway::MessagePath::{Conversation, FireAndForget}; -use actix::Message; use itertools::Itertools; use serde::de::DeserializeOwned; use serde_derive::{Deserialize, Serialize}; @@ -12,7 +11,6 @@ use std::collections::HashMap; use std::fmt; use std::fmt::Debug; use std::str::FromStr; -use std::time::Duration; pub const NODE_UI_PROTOCOL: &str = "MASQNode-UIv2"; @@ -845,12 +843,6 @@ pub struct UiWalletAddressesResponse { } conversation_message!(UiWalletAddressesResponse, "walletAddresses"); -#[derive(Message, Clone, PartialEq, Eq)] -pub struct MessageScheduler { - pub schedule_msg: M, - pub duration: Duration, -} - #[cfg(test)] mod tests { use super::*; diff --git a/node/src/node_test_utils.rs b/node/src/node_test_utils.rs index 99adbcd8a..b82c663a3 100644 --- a/node/src/node_test_utils.rs +++ b/node/src/node_test_utils.rs @@ -15,10 +15,10 @@ use crate::sub_lib::framer::FramedChunk; use crate::sub_lib::framer::Framer; use crate::sub_lib::stream_handler_pool::DispatcherNodeQueryResponse; use crate::sub_lib::stream_handler_pool::TransmitDataMsg; +use crate::sub_lib::utils::MessageScheduler; use crate::test_utils::recorder::Recorder; use actix::Actor; use actix::Addr; -use masq_lib::messages::MessageScheduler; use masq_lib::test_utils::logging::TestLog; use masq_lib::ui_gateway::NodeFromUiMessage; use std::cell::RefCell; @@ -308,7 +308,10 @@ pub fn make_stream_handler_pool_subs_from_recorder(addr: &Addr) -> Str bind: recipient!(addr, PoolBindMessage), node_query_response: recipient!(addr, DispatcherNodeQueryResponse), node_from_ui_sub: recipient!(addr, NodeFromUiMessage), - schedule_message_sub: recipient!(addr, MessageScheduler), + scheduled_node_query_response_sub: recipient!( + addr, + MessageScheduler + ), } } diff --git a/node/src/stream_handler_pool.rs b/node/src/stream_handler_pool.rs index 91c1bcbb1..a2bac3507 100644 --- a/node/src/stream_handler_pool.rs +++ b/node/src/stream_handler_pool.rs @@ -30,14 +30,13 @@ use crate::sub_lib::stream_handler_pool::DispatcherNodeQueryResponse; use crate::sub_lib::stream_handler_pool::TransmitDataMsg; use crate::sub_lib::tokio_wrappers::ReadHalfWrapper; use crate::sub_lib::tokio_wrappers::WriteHalfWrapper; -use crate::sub_lib::utils::{handle_ui_crash_request, NODE_MAILBOX_CAPACITY}; +use crate::sub_lib::utils::{handle_ui_crash_request, MessageScheduler, NODE_MAILBOX_CAPACITY}; use actix::Addr; use actix::Context; use actix::Handler; use actix::Recipient; use actix::{Actor, AsyncContext}; use masq_lib::logger::Logger; -use masq_lib::messages::MessageScheduler; use masq_lib::ui_gateway::NodeFromUiMessage; use masq_lib::utils::localhost; use std::collections::HashMap; @@ -62,7 +61,7 @@ pub struct StreamHandlerPoolSubs { pub bind: Recipient, pub node_query_response: Recipient, pub node_from_ui_sub: Recipient, - pub schedule_message_sub: Recipient>, + pub scheduled_node_query_response_sub: Recipient>, } impl Clone for StreamHandlerPoolSubs { @@ -74,7 +73,7 @@ impl Clone for StreamHandlerPoolSubs { bind: self.bind.clone(), node_query_response: self.node_query_response.clone(), node_from_ui_sub: self.node_from_ui_sub.clone(), - schedule_message_sub: self.schedule_message_sub.clone(), + scheduled_node_query_response_sub: self.scheduled_node_query_response_sub.clone(), } } } @@ -157,6 +156,7 @@ impl Handler for StreamHandlerPool { } } +// TODO: GH-686 - This handler can be implemented using a Procedural Macro impl Handler> for StreamHandlerPool where StreamHandlerPool: Handler, @@ -164,7 +164,7 @@ where type Result = (); fn handle(&mut self, msg: MessageScheduler, ctx: &mut Self::Context) -> Self::Result { - ctx.notify_later(msg.schedule_msg, msg.duration); + ctx.notify_later(msg.scheduled_msg, msg.delay); } } @@ -218,7 +218,7 @@ impl StreamHandlerPool { bind: recipient!(pool_addr, PoolBindMessage), node_query_response: recipient!(pool_addr, DispatcherNodeQueryResponse), node_from_ui_sub: recipient!(pool_addr, NodeFromUiMessage), - schedule_message_sub: recipient!( + scheduled_node_query_response_sub: recipient!( pool_addr, MessageScheduler ), @@ -550,17 +550,17 @@ impl StreamHandlerPool { peer_addr, msg.context.data.len() ); - let schedule_message_sub = self + let scheduled_node_query_response_sub = self .self_subs_opt .as_ref() .expect("StreamHandlerPool is unbound") - .schedule_message_sub + .scheduled_node_query_response_sub .clone(); - schedule_message_sub + scheduled_node_query_response_sub .try_send(MessageScheduler { - schedule_msg: msg, - duration: Duration::from_millis(100), + scheduled_msg: msg, + delay: Duration::from_millis(100), }) .expect("StreamHandlerPool is dead"); } diff --git a/node/src/sub_lib/utils.rs b/node/src/sub_lib/utils.rs index 74758bd0b..9b9480821 100644 --- a/node/src/sub_lib/utils.rs +++ b/node/src/sub_lib/utils.rs @@ -237,6 +237,12 @@ where implement_as_any!(); } +#[derive(Message, Clone, PartialEq, Eq)] +pub struct MessageScheduler { + pub scheduled_msg: M, + pub delay: Duration, +} + #[cfg(test)] mod tests { use super::*; diff --git a/node/src/test_utils/recorder.rs b/node/src/test_utils/recorder.rs index dba8f1f61..f68fc3b9e 100644 --- a/node/src/test_utils/recorder.rs +++ b/node/src/test_utils/recorder.rs @@ -43,6 +43,7 @@ use crate::sub_lib::set_consuming_wallet_message::SetConsumingWalletMessage; use crate::sub_lib::stream_handler_pool::DispatcherNodeQueryResponse; use crate::sub_lib::stream_handler_pool::TransmitDataMsg; use crate::sub_lib::ui_gateway::UiGatewaySubs; +use crate::sub_lib::utils::MessageScheduler; use crate::test_utils::recorder_stop_conditions::StopConditions; use crate::test_utils::to_millis; use crate::test_utils::unshared_test_utils::system_killer_actor::SystemKillerActor; @@ -52,7 +53,6 @@ use actix::Handler; use actix::MessageResult; use actix::System; use actix::{Actor, Message}; -use masq_lib::messages::MessageScheduler; use masq_lib::ui_gateway::{NodeFromUiMessage, NodeToUiMessage}; use std::any::Any; use std::sync::{Arc, Mutex}; @@ -93,17 +93,6 @@ macro_rules! recorder_message_handler { }; } -impl Handler> for Recorder -where - M: Message + PartialEq + Send + 'static, -{ - type Result = (); - - fn handle(&mut self, msg: MessageScheduler, _ctx: &mut Self::Context) { - self.handle_msg(msg) - } -} - recorder_message_handler!(AddReturnRouteMessage); recorder_message_handler!(AddRouteMessage); recorder_message_handler!(AddStreamMsg); @@ -151,6 +140,17 @@ recorder_message_handler!(ScanForPayables); recorder_message_handler!(ConnectionProgressMessage); recorder_message_handler!(ScanForPendingPayables); +impl Handler> for Recorder +where + M: Message + PartialEq + Send + 'static, +{ + type Result = (); + + fn handle(&mut self, msg: MessageScheduler, _ctx: &mut Self::Context) { + self.handle_msg(msg) + } +} + impl Handler for Recorder { type Result = MessageResult;