Skip to content

Commit

Permalink
GH-678: Review 1 Changes (#253)
Browse files Browse the repository at this point in the history
* feat: migrate MessageScheduler to the sub_lib/utils.rs

* feat: rename to scheduled_msg

* GH-678: rename duration to delay

* GH-678: rename the name of the sub

* GH-678: add a todo for the procedural macro card

* GH-678: rename to scheduled_node_query_response_sub

* GH-678: migrate the MessageScheduler handler for Recorder after the recorder_message_handler implementaions
  • Loading branch information
utkarshg6 authored Mar 31, 2023
1 parent f0ec8f1 commit 0ba8dce
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 33 deletions.
8 changes: 0 additions & 8 deletions masq_lib/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@ use crate::messages::UiMessageError::{DeserializationError, PayloadError, Unexpe
use crate::shared_schema::ConfiguratorError;
use crate::ui_gateway::MessageBody;
use crate::ui_gateway::MessagePath::{Conversation, FireAndForget};
use actix::Message;
use itertools::Itertools;
use serde::de::DeserializeOwned;
use serde_derive::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::fmt::Debug;
use std::str::FromStr;
use std::time::Duration;

pub const NODE_UI_PROTOCOL: &str = "MASQNode-UIv2";

Expand Down Expand Up @@ -845,12 +843,6 @@ pub struct UiWalletAddressesResponse {
}
conversation_message!(UiWalletAddressesResponse, "walletAddresses");

#[derive(Message, Clone, PartialEq, Eq)]
pub struct MessageScheduler<M: Message> {
pub schedule_msg: M,
pub duration: Duration,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
7 changes: 5 additions & 2 deletions node/src/node_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ use crate::sub_lib::framer::FramedChunk;
use crate::sub_lib::framer::Framer;
use crate::sub_lib::stream_handler_pool::DispatcherNodeQueryResponse;
use crate::sub_lib::stream_handler_pool::TransmitDataMsg;
use crate::sub_lib::utils::MessageScheduler;
use crate::test_utils::recorder::Recorder;
use actix::Actor;
use actix::Addr;
use masq_lib::messages::MessageScheduler;
use masq_lib::test_utils::logging::TestLog;
use masq_lib::ui_gateway::NodeFromUiMessage;
use std::cell::RefCell;
Expand Down Expand Up @@ -308,7 +308,10 @@ pub fn make_stream_handler_pool_subs_from_recorder(addr: &Addr<Recorder>) -> Str
bind: recipient!(addr, PoolBindMessage),
node_query_response: recipient!(addr, DispatcherNodeQueryResponse),
node_from_ui_sub: recipient!(addr, NodeFromUiMessage),
schedule_message_sub: recipient!(addr, MessageScheduler<DispatcherNodeQueryResponse>),
scheduled_node_query_response_sub: recipient!(
addr,
MessageScheduler<DispatcherNodeQueryResponse>
),
}
}

Expand Down
22 changes: 11 additions & 11 deletions node/src/stream_handler_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,13 @@ use crate::sub_lib::stream_handler_pool::DispatcherNodeQueryResponse;
use crate::sub_lib::stream_handler_pool::TransmitDataMsg;
use crate::sub_lib::tokio_wrappers::ReadHalfWrapper;
use crate::sub_lib::tokio_wrappers::WriteHalfWrapper;
use crate::sub_lib::utils::{handle_ui_crash_request, NODE_MAILBOX_CAPACITY};
use crate::sub_lib::utils::{handle_ui_crash_request, MessageScheduler, NODE_MAILBOX_CAPACITY};
use actix::Addr;
use actix::Context;
use actix::Handler;
use actix::Recipient;
use actix::{Actor, AsyncContext};
use masq_lib::logger::Logger;
use masq_lib::messages::MessageScheduler;
use masq_lib::ui_gateway::NodeFromUiMessage;
use masq_lib::utils::localhost;
use std::collections::HashMap;
Expand All @@ -62,7 +61,7 @@ pub struct StreamHandlerPoolSubs {
pub bind: Recipient<PoolBindMessage>,
pub node_query_response: Recipient<DispatcherNodeQueryResponse>,
pub node_from_ui_sub: Recipient<NodeFromUiMessage>,
pub schedule_message_sub: Recipient<MessageScheduler<DispatcherNodeQueryResponse>>,
pub scheduled_node_query_response_sub: Recipient<MessageScheduler<DispatcherNodeQueryResponse>>,
}

impl Clone for StreamHandlerPoolSubs {
Expand All @@ -74,7 +73,7 @@ impl Clone for StreamHandlerPoolSubs {
bind: self.bind.clone(),
node_query_response: self.node_query_response.clone(),
node_from_ui_sub: self.node_from_ui_sub.clone(),
schedule_message_sub: self.schedule_message_sub.clone(),
scheduled_node_query_response_sub: self.scheduled_node_query_response_sub.clone(),
}
}
}
Expand Down Expand Up @@ -157,14 +156,15 @@ impl Handler<DispatcherNodeQueryResponse> for StreamHandlerPool {
}
}

// TODO: GH-686 - This handler can be implemented using a Procedural Macro
impl<M: actix::Message + 'static> Handler<MessageScheduler<M>> for StreamHandlerPool
where
StreamHandlerPool: Handler<M>,
{
type Result = ();

fn handle(&mut self, msg: MessageScheduler<M>, ctx: &mut Self::Context) -> Self::Result {
ctx.notify_later(msg.schedule_msg, msg.duration);
ctx.notify_later(msg.scheduled_msg, msg.delay);
}
}

Expand Down Expand Up @@ -218,7 +218,7 @@ impl StreamHandlerPool {
bind: recipient!(pool_addr, PoolBindMessage),
node_query_response: recipient!(pool_addr, DispatcherNodeQueryResponse),
node_from_ui_sub: recipient!(pool_addr, NodeFromUiMessage),
schedule_message_sub: recipient!(
scheduled_node_query_response_sub: recipient!(
pool_addr,
MessageScheduler<DispatcherNodeQueryResponse>
),
Expand Down Expand Up @@ -550,17 +550,17 @@ impl StreamHandlerPool {
peer_addr,
msg.context.data.len()
);
let schedule_message_sub = self
let scheduled_node_query_response_sub = self
.self_subs_opt
.as_ref()
.expect("StreamHandlerPool is unbound")
.schedule_message_sub
.scheduled_node_query_response_sub
.clone();

schedule_message_sub
scheduled_node_query_response_sub
.try_send(MessageScheduler {
schedule_msg: msg,
duration: Duration::from_millis(100),
scheduled_msg: msg,
delay: Duration::from_millis(100),
})
.expect("StreamHandlerPool is dead");
}
Expand Down
6 changes: 6 additions & 0 deletions node/src/sub_lib/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,12 @@ where
implement_as_any!();
}

#[derive(Message, Clone, PartialEq, Eq)]
pub struct MessageScheduler<M: Message> {
pub scheduled_msg: M,
pub delay: Duration,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
24 changes: 12 additions & 12 deletions node/src/test_utils/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::sub_lib::set_consuming_wallet_message::SetConsumingWalletMessage;
use crate::sub_lib::stream_handler_pool::DispatcherNodeQueryResponse;
use crate::sub_lib::stream_handler_pool::TransmitDataMsg;
use crate::sub_lib::ui_gateway::UiGatewaySubs;
use crate::sub_lib::utils::MessageScheduler;
use crate::test_utils::recorder_stop_conditions::StopConditions;
use crate::test_utils::to_millis;
use crate::test_utils::unshared_test_utils::system_killer_actor::SystemKillerActor;
Expand All @@ -52,7 +53,6 @@ use actix::Handler;
use actix::MessageResult;
use actix::System;
use actix::{Actor, Message};
use masq_lib::messages::MessageScheduler;
use masq_lib::ui_gateway::{NodeFromUiMessage, NodeToUiMessage};
use std::any::Any;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -93,17 +93,6 @@ macro_rules! recorder_message_handler {
};
}

impl<M> Handler<MessageScheduler<M>> for Recorder
where
M: Message + PartialEq + Send + 'static,
{
type Result = ();

fn handle(&mut self, msg: MessageScheduler<M>, _ctx: &mut Self::Context) {
self.handle_msg(msg)
}
}

recorder_message_handler!(AddReturnRouteMessage);
recorder_message_handler!(AddRouteMessage);
recorder_message_handler!(AddStreamMsg);
Expand Down Expand Up @@ -151,6 +140,17 @@ recorder_message_handler!(ScanForPayables);
recorder_message_handler!(ConnectionProgressMessage);
recorder_message_handler!(ScanForPendingPayables);

impl<M> Handler<MessageScheduler<M>> for Recorder
where
M: Message + PartialEq + Send + 'static,
{
type Result = ();

fn handle(&mut self, msg: MessageScheduler<M>, _ctx: &mut Self::Context) {
self.handle_msg(msg)
}
}

impl Handler<NodeQueryMessage> for Recorder {
type Result = MessageResult<NodeQueryMessage>;

Expand Down

0 comments on commit 0ba8dce

Please sign in to comment.