Skip to content

Commit

Permalink
Avoid a deadlock updating the network D-Bus tree (#966)
Browse files Browse the repository at this point in the history
## Problem

If the network service receives an `Apply` at the same time as any other
request, it may get blocked.

## Solution

Updating the tree must be done in a separate task. Otherwise, it could
fight with an incoming request for the ObjectServer mutex, blocking the
actions dispatching.

## Using tokio-console

I decided to use
[tokio-console](https://docs.rs/tokio-console/latest/tokio_console/) to
make it easier to find the problem. This tool collects and displays
information about the asynchronous tasks in your program. In the
screenshot below you can see that the D-Bus `Set` dispatchers are
blocked.

![Captura desde 2023-12-29
10-52-45](https://github.com/openSUSE/agama/assets/15836/648e5f1e-eb6b-431b-be40-45cdeaf9e980)

Once you find the problem, it is kind of "obvious", but it may take some
time until you figure out what is happening.

To use `tokio-console` you need to add some instrumentation to your
program and, at this point, it implies adding some unstable Tokio APIs.
So if you want to give it a try, you can try the code in the
[tokio-console
branch](https://github.com/openSUSE/agama/compare/tokio-console).
  • Loading branch information
imobachgs authored Dec 29, 2023
2 parents e4ae7ed + c40e251 commit d9b0131
Showing 1 changed file with 35 additions and 17 deletions.
52 changes: 35 additions & 17 deletions rust/agama-dbus-server/src/network/system.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use super::error::NetworkStateError;
use crate::network::{dbus::Tree, model::Connection, Action, Adapter, NetworkState};
use agama_lib::network::types::DeviceType;
use std::error::Error;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use std::{error::Error, sync::Arc};
use tokio::sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
Mutex,
};
use uuid::Uuid;
use zbus::zvariant::OwnedObjectPath;

Expand All @@ -13,7 +16,7 @@ pub struct NetworkSystem<T: Adapter> {
/// Side of the channel to send actions.
actions_tx: UnboundedSender<Action>,
actions_rx: UnboundedReceiver<Action>,
tree: Tree,
tree: Arc<Mutex<Tree>>,
/// Adapter to read/write the network state.
adapter: T,
}
Expand All @@ -26,7 +29,7 @@ impl<T: Adapter> NetworkSystem<T> {
state: NetworkState::default(),
actions_tx,
actions_rx,
tree,
tree: Arc::new(Mutex::new(tree)),
adapter,
}
}
Expand All @@ -48,10 +51,9 @@ impl<T: Adapter> NetworkSystem<T> {
/// Populates the D-Bus tree with the known devices and connections.
pub async fn setup(&mut self) -> Result<(), Box<dyn Error>> {
self.state = self.adapter.read().await?;
self.tree
.set_connections(&mut self.state.connections)
.await?;
self.tree.set_devices(&self.state.devices).await?;
let mut tree = self.tree.lock().await;
tree.set_connections(&mut self.state.connections).await?;
tree.set_devices(&self.state.devices).await?;
Ok(())
}

Expand All @@ -78,15 +80,22 @@ impl<T: Adapter> NetworkSystem<T> {
tx.send(conn.cloned()).unwrap();
}
Action::GetConnectionPath(id, tx) => {
let path = self.tree.connection_path(&id);
let tree = self.tree.lock().await;
let path = tree.connection_path(&id);
tx.send(path).unwrap();
}
Action::GetController(uuid, tx) => {
let result = self.get_controller_action(uuid);
tx.send(result).unwrap()
}
Action::GetDevicesPaths(tx) => tx.send(self.tree.devices_paths()).unwrap(),
Action::GetConnectionsPaths(tx) => tx.send(self.tree.connections_paths()).unwrap(),
Action::GetDevicesPaths(tx) => {
let tree = self.tree.lock().await;
tx.send(tree.devices_paths()).unwrap();
}
Action::GetConnectionsPaths(tx) => {
let tree = self.tree.lock().await;
tx.send(tree.connections_paths()).unwrap();
}
Action::SetPorts(uuid, ports, rx) => {
let result = self.set_ports_action(uuid, *ports);
rx.send(result).unwrap();
Expand All @@ -95,16 +104,25 @@ impl<T: Adapter> NetworkSystem<T> {
self.state.update_connection(*conn)?;
}
Action::RemoveConnection(id) => {
self.tree.remove_connection(&id).await?;
let mut tree = self.tree.lock().await;
tree.remove_connection(&id).await?;
self.state.remove_connection(&id)?;
}
Action::Apply => {
self.write().await?;
// TODO: re-creating the tree is kind of brute-force and it sends signals about
// adding/removing interfaces. We should add/update/delete objects as needed.
self.tree
.set_connections(&mut self.state.connections)
.await?;
// NOTE updating the tree at the same time than dispatching actions can cause a
// deadlock. We might consider using message passing too but at this point
// is enough to use a separate task.
let mut connections = self.state.connections.clone();
let tree = Arc::clone(&self.tree);
tokio::spawn(async move {
let mut tree = tree.lock().await;
if let Err(e) = tree.set_connections(&mut connections).await {
log::error!("Could not update the D-Bus tree: {}", e);
}
});
}
}

Expand All @@ -118,8 +136,8 @@ impl<T: Adapter> NetworkSystem<T> {
) -> Result<OwnedObjectPath, NetworkStateError> {
let mut conn = Connection::new(name, ty);
// TODO: handle tree handling problems
let path = self
.tree
let mut tree = self.tree.lock().await;
let path = tree
.add_connection(&mut conn)
.await
.expect("Could not update the D-Bus tree");
Expand Down

0 comments on commit d9b0131

Please sign in to comment.