Skip to content

Commit

Permalink
chore: refactor cmd system to be async non blocking of TUI main loop (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
wolf4ood authored Sep 25, 2024
1 parent 9f0f69d commit b6d9530
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 80 deletions.
17 changes: 10 additions & 7 deletions edc-connector-tui/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod msg;

use crossterm::event::{self, Event, KeyCode};
use edc_connector_client::{Auth, EdcConnectorClient};
use futures::FutureExt;
use keyring::Entry;
use ratatui::{
layout::{Constraint, Direction, Layout, Rect},
Expand All @@ -18,8 +19,8 @@ use crate::{
contract_definitions::ContractDefinitionsComponent,
contract_negotiations::ContractNegotiationsComponent, footer::Footer,
header::HeaderComponent, launch_bar::LaunchBar, policies::PolicyDefinitionsComponent,
transfer_processes::TransferProcessesComponent, Action, Component, ComponentEvent,
ComponentMsg, ComponentReturn, Notification, NotificationMsg,
transfer_processes::TransferProcessesComponent, Component, ComponentEvent, ComponentMsg,
ComponentReturn, Notification, NotificationMsg,
},
config::{AuthKind, Config, ConnectorConfig},
types::{
Expand Down Expand Up @@ -126,11 +127,13 @@ impl App {
let timeout = noty.timeout();
self.footer.show_notification(noty);

let action = Action::spawn(async move {
tokio::time::sleep(Duration::from_secs(timeout)).await;
Ok(Action::ClearNotification)
});
Ok(ComponentReturn::action(action))
Ok(ComponentReturn::cmd(
async move {
tokio::time::sleep(Duration::from_secs(timeout)).await;
Ok(vec![AppMsg::NontificationMsg(NotificationMsg::Clear).into()])
}
.boxed(),
))
}

pub fn clear_notification(&mut self) -> anyhow::Result<ComponentReturn<AppMsg>> {
Expand Down
4 changes: 0 additions & 4 deletions edc-connector-tui/src/app/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ impl ActionHandler for App {
crate::components::NotificationMsg::Show(noty),
)
.into()]),
(_, Action::ClearNotification) => Ok(vec![AppMsg::NontificationMsg(
crate::components::NotificationMsg::Clear,
)
.into()]),
_ => Ok(vec![]),
}
}
Expand Down
61 changes: 21 additions & 40 deletions edc-connector-tui/src/components.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt::Debug, future::Future, sync::Arc};
use std::{fmt::Debug, sync::Arc};

use crossterm::event::Event;
use futures::{future::BoxFuture, FutureExt};
Expand Down Expand Up @@ -26,7 +26,7 @@ pub trait StatelessComponent {

#[async_trait::async_trait]
pub trait Component {
type Msg: Send;
type Msg: Send + 'static;
type Props: Send;

async fn init(&mut self, _props: Self::Props) -> anyhow::Result<ComponentReturn<Self::Msg>> {
Expand All @@ -49,14 +49,14 @@ pub trait Component {
Ok(vec![])
}

async fn forward_update<'a, F, C>(
other: &'a mut C,
async fn forward_update<F, C>(
other: &mut C,
msg: ComponentMsg<C::Msg>,
mapper: F,
) -> anyhow::Result<ComponentReturn<Self::Msg>>
where
F: Fn(C::Msg) -> Self::Msg + Send + Sync + 'a,
C: Component + Sync + Send + 'a,
F: Fn(C::Msg) -> Self::Msg + Send + Sync + 'static,
C: Component + Sync + Send + 'static,
{
Ok(other.update(msg).await?.map(mapper))
}
Expand All @@ -67,8 +67,8 @@ pub trait Component {
mapper: F,
) -> anyhow::Result<ComponentReturn<Self::Msg>>
where
F: Fn(C::Msg) -> Self::Msg + Send + Sync + 'a,
C: Component + Sync + Send + 'a,
F: Fn(C::Msg) -> Self::Msg + Send + Sync + 'static,
C: Component + Sync + Send + 'static,
{
Ok(other.init(props).await?.map(mapper))
}
Expand All @@ -94,13 +94,13 @@ pub trait Component {
pub struct ComponentMsg<T>(T);

#[derive(Default)]
pub struct ComponentReturn<'a, T> {
pub struct ComponentReturn<T> {
pub(crate) msgs: Vec<ComponentMsg<T>>,
pub(crate) cmds: Vec<BoxFuture<'a, anyhow::Result<Vec<ComponentMsg<T>>>>>,
pub(crate) cmds: Vec<BoxFuture<'static, anyhow::Result<Vec<ComponentMsg<T>>>>>,
pub(crate) actions: Vec<Action>,
}

impl<'a, T: Debug> Debug for ComponentReturn<'a, T> {
impl<T: Debug> Debug for ComponentReturn<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ComponentReturn")
.field("msgs", &self.msgs)
Expand All @@ -109,34 +109,13 @@ impl<'a, T: Debug> Debug for ComponentReturn<'a, T> {
}
}

#[derive(Debug)]
pub enum Action {
Quit,
Esc,
NavTo(Nav),
ChangeSheet,
Notification(Notification),
ClearNotification,
Spawn(BoxFuture<'static, anyhow::Result<Action>>),
}

impl Action {
pub fn spawn(fut: impl Future<Output = anyhow::Result<Action>> + Send + 'static) -> Action {
Action::Spawn(fut.boxed())
}
}

impl Debug for Action {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Quit => write!(f, "Quit"),
Self::Esc => write!(f, "Esc"),
Self::NavTo(arg0) => f.debug_tuple("NavTo").field(arg0).finish(),
Self::ChangeSheet => write!(f, "ChangeSheet"),
Self::Notification(arg0) => f.debug_tuple("Notification").field(arg0).finish(),
Self::Spawn(_arg0) => f.debug_tuple("Spawn").finish(),
Self::ClearNotification => f.debug_tuple("ClearNotification").finish(),
}
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -202,34 +181,36 @@ impl<T> ComponentMsg<T> {
}
}

impl<'a, T: 'a> ComponentReturn<'a, T> {
pub fn cmd(cmd: BoxFuture<'a, anyhow::Result<Vec<ComponentMsg<T>>>>) -> ComponentReturn<'a, T> {
impl<T: 'static> ComponentReturn<T> {
pub fn cmd(
cmd: BoxFuture<'static, anyhow::Result<Vec<ComponentMsg<T>>>>,
) -> ComponentReturn<T> {
ComponentReturn {
msgs: vec![],
cmds: vec![cmd],
actions: vec![],
}
}

pub fn empty() -> ComponentReturn<'a, T> {
pub fn empty() -> ComponentReturn<T> {
ComponentReturn {
msgs: vec![],
cmds: vec![],
actions: vec![],
}
}

pub fn action(action: Action) -> ComponentReturn<'a, T> {
pub fn action(action: Action) -> ComponentReturn<T> {
ComponentReturn {
msgs: vec![],
cmds: vec![],
actions: vec![action],
}
}

pub fn map<M, F>(self, mapper: F) -> ComponentReturn<'a, M>
pub fn map<M, F>(self, mapper: F) -> ComponentReturn<M>
where
F: Fn(T) -> M + Sync + Send + 'a,
F: Fn(T) -> M + Sync + Send + 'static,
{
let msgs = self.msgs.into_iter().map(|msg| msg.map(&mapper)).collect();

Expand Down Expand Up @@ -265,7 +246,7 @@ impl<T> From<T> for ComponentMsg<T> {
}
}

impl<T> From<ComponentMsg<T>> for ComponentReturn<'_, T> {
impl<T> From<ComponentMsg<T>> for ComponentReturn<T> {
fn from(value: ComponentMsg<T>) -> Self {
ComponentReturn {
msgs: vec![value],
Expand Down
9 changes: 6 additions & 3 deletions edc-connector-tui/src/components/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub struct ResourcesComponent<T: TableEntry> {
page_size: u32,
}

impl<T: DrawableResource + TableEntry + Send + Sync> ResourcesComponent<T> {
impl<T: DrawableResource + TableEntry + Send + Sync + 'static> ResourcesComponent<T> {
pub fn on_fetch<F, Fut>(mut self, on_fetch: F) -> Self
where
F: Fn(Connector, Query) -> Fut + Send + Sync + 'static,
Expand Down Expand Up @@ -77,9 +77,12 @@ impl<T: DrawableResource + TableEntry + Send + Sync> ResourcesComponent<T> {
if let (Some(connector), Some(on_fetch)) = (self.connector.as_ref(), self.on_fetch.as_ref())
{
let query = self.query.clone();

let connector = connector.clone();
let on_fetch = on_fetch.clone();
Ok(ComponentReturn::cmd(
async move {
match on_fetch(connector, query).await {
match on_fetch(&connector, query).await {
Ok(elements) => Ok(vec![ResourcesMsg::ResourcesFetched(elements).into()]),
Err(err) => Ok(vec![
ResourcesMsg::ResourcesFetchFailed(err.to_string()).into()
Expand Down Expand Up @@ -160,7 +163,7 @@ impl<T: DrawableResource + TableEntry + Clone> Default for ResourcesComponent<T>
}

#[async_trait::async_trait]
impl<T: DrawableResource + TableEntry + Send + Sync> Component for ResourcesComponent<T> {
impl<T: DrawableResource + TableEntry + Send + Sync + 'static> Component for ResourcesComponent<T> {
type Msg = ResourcesMsg<T>;
type Props = Connector;

Expand Down
2 changes: 1 addition & 1 deletion edc-connector-tui/src/components/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub trait TableEntry {
}

#[async_trait::async_trait]
impl<T: TableEntry + Send, M: Send> Component for UiTable<T, M> {
impl<T: TableEntry + Send, M: Send + 'static> Component for UiTable<T, M> {
type Msg = TableMsg<M>;
type Props = ();

Expand Down
33 changes: 8 additions & 25 deletions edc-connector-tui/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ impl<C: Component + ActionHandler<Msg = <C as Component>::Msg> + Send> Runner<C>
terminal.clear()?;

let mut should_quit = false;
let action_queue = Arc::new(Mutex::new(VecDeque::<Action>::new()));
let async_msgs = Arc::new(Mutex::new(
VecDeque::<ComponentMsg<<C as Component>::Msg>>::new(),
));
Expand Down Expand Up @@ -63,38 +62,22 @@ impl<C: Component + ActionHandler<Msg = <C as Component>::Msg> + Send> Runner<C>
}

for c in ret.cmds {
for m in c.await.unwrap() {
msgs.push_back(m);
}
let inner_async_msg = async_msgs.clone();
tokio::task::spawn(async move {
for m in c.await.unwrap() {
let mut msg_guard = inner_async_msg.lock().await;
msg_guard.push_back(m);
}
});
}

ret.actions
};

for a in actions {
if let Action::Spawn(handler) = a {
let inner_action_queue = action_queue.clone();
tokio::task::spawn(async move {
if let Ok(action) = handler.await {
inner_action_queue.lock().await.push_back(action)
}
});
} else {
should_quit = should_quit || matches!(a, Action::Quit);
for m in self.component.handle_action(a)? {
msgs.push_back(m)
}
}
}
}
let mut guard = action_queue.lock().await;
let mut msg_guard = async_msgs.lock().await;
while let Some(a) = guard.pop_front() {
if let Action::Spawn(_) = a {
} else {
should_quit = should_quit || matches!(a, Action::Quit);
for m in self.component.handle_action(a)? {
msg_guard.push_back(m)
msgs.push_back(m)
}
}
}
Expand Down

0 comments on commit b6d9530

Please sign in to comment.