Skip to content

Commit

Permalink
rust: emit stream events in a single place
Browse files Browse the repository at this point in the history
  • Loading branch information
imobachgs committed Mar 2, 2024
1 parent 0cbb436 commit 35ffb0e
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 21 deletions.
2 changes: 1 addition & 1 deletion rust/agama-server/src/software.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
pub mod web;
pub use web::{software_monitor, software_service};
pub use web::{software_service, software_stream};
46 changes: 28 additions & 18 deletions rust/agama-server/src/software/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use axum::{
use serde::{Deserialize, Serialize};
use serde_json::json;
use thiserror::Error;
use tokio_stream::StreamExt;
use tokio_stream::{Stream, StreamExt};

#[derive(Clone)]
struct SoftwareState<'a> {
Expand Down Expand Up @@ -51,29 +51,39 @@ impl IntoResponse for SoftwareError {
}
}

pub async fn software_monitor(connection: zbus::Connection, events: EventsSender) {
tokio::spawn(monitor_product_changed(connection.clone(), events.clone()));
tokio::spawn(monitor_patterns_changed(connection.clone(), events.clone()));
pub async fn software_stream(connection: zbus::Connection) -> impl Stream<Item = Event> {
StreamExt::merge(
product_changed_stream(connection.clone()).await,
patterns_changed_stream(connection.clone()).await,
)
}

async fn monitor_product_changed(connection: zbus::Connection, events: EventsSender) {
async fn product_changed_stream(connection: zbus::Connection) -> impl Stream<Item = Event> {
let proxy = SoftwareProductProxy::new(&connection).await.unwrap();
let mut stream = proxy.receive_selected_product_changed().await;
while let Some(change) = stream.next().await {
if let Ok(id) = change.get().await {
_ = events.send(Event::ProductChanged { id });
}
}
proxy
.receive_selected_product_changed()
.await
.then(|change| async move {
if let Ok(id) = change.get().await {
return Some(Event::ProductChanged { id });
}
None
})
.filter_map(|e| e)
}

async fn monitor_patterns_changed(connection: zbus::Connection, events: EventsSender) {
async fn patterns_changed_stream(connection: zbus::Connection) -> impl Stream<Item = Event> {
let proxy = Software1Proxy::new(&connection).await.unwrap();
let mut stream = proxy.receive_selected_patterns_changed().await;
while let Some(change) = stream.next().await {
if let Ok(patterns) = change.get().await {
_ = events.send(Event::PatternsChanged);
}
}
proxy
.receive_selected_patterns_changed()
.await
.then(|change| async move {
if let Ok(_pattens) = change.get().await {
return Some(Event::PatternsChanged);
}
None
})
.filter_map(|e| e)
}

/// Sets up and returns the axum service for the software module.
Expand Down
19 changes: 17 additions & 2 deletions rust/agama-server/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use self::progress::EventsProgressPresenter;
use crate::l10n::web::l10n_service;
use crate::software::web::{software_monitor, software_service};
use crate::software::web::{software_service, software_stream};
use axum::Router;

mod auth;
Expand All @@ -25,6 +25,7 @@ pub use config::ServiceConfig;
pub use docs::ApiDoc;
pub use event::{Event, EventsReceiver, EventsSender};
pub use service::MainServiceBuilder;
use tokio_stream::StreamExt;

/// Returns a service that implements the web-based Agama API.
///
Expand Down Expand Up @@ -52,6 +53,20 @@ pub async fn run_monitor(events: EventsSender) -> Result<(), ServiceError> {
eprintln!("Could not monitor the D-Bus server: {}", error);
}
});
software_monitor(connection, events.clone()).await;
tokio::spawn(run_events_monitor(connection, events.clone()));

Ok(())
}

/// Emits the events from the system streams through the events channel.
///
/// * `connection`: D-Bus connection.
/// * `events`: channel to send the events to.
pub async fn run_events_monitor(connection: zbus::Connection, events: EventsSender) {
let stream = software_stream(connection).await;
tokio::pin!(stream);
let e = events.clone();
while let Some(event) = stream.next().await {
_ = e.send(event);
}
}

0 comments on commit 35ffb0e

Please sign in to comment.