Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace async_std with tokio #848

Merged
merged 8 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
416 changes: 153 additions & 263 deletions rust/Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion rust/agama-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ serde = { version = "1.0.152" }
serde_json = "1.0.91"
serde_yaml = "0.9.17"
indicatif= "0.17.3"
async-std = { version ="1.12.0", features = ["attributes"] }
thiserror = "1.0.39"
convert_case = "0.6.0"
console = "0.15.7"
Expand All @@ -23,6 +22,8 @@ log = "0.4"
tempdir = "0.3"
fs_extra = "1.3.0"
nix = { version = "0.27.1", features = ["user"] }
zbus = { version = "3", default-features = false, features = ["tokio"] }
tokio = { version = "1.33.0", features = ["macros", "rt-multi-thread"] }

[[bin]]
name = "agama"
Expand Down
26 changes: 14 additions & 12 deletions rust/agama-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::error::CliError;
use agama_lib::error::ServiceError;
use agama_lib::manager::ManagerClient;
use agama_lib::progress::ProgressMonitor;
use async_std::task::{self, block_on};
use commands::Commands;
use config::run as run_config_cmd;
use logs::run as run_logs_cmd;
Expand All @@ -26,6 +25,7 @@ use std::{
thread::sleep,
time::Duration,
};
use tokio;

#[derive(Parser)]
#[command(name = "agama", version, about, long_about = None)]
Expand All @@ -40,7 +40,9 @@ struct Cli {

async fn probe() -> anyhow::Result<()> {
let another_manager = build_manager().await?;
let probe = task::spawn(async move { another_manager.probe().await });
let probe = tokio::spawn(async move {
let _ = another_manager.probe().await;
});
show_progress().await?;

Ok(probe.await?)
Expand All @@ -63,7 +65,7 @@ async fn install(manager: &ManagerClient<'_>, max_attempts: u8) -> anyhow::Resul
return Err(CliError::ValidationError)?;
}

let progress = task::spawn(async { show_progress().await });
let progress = tokio::spawn(async { show_progress().await });
// Try to start the installation up to max_attempts times.
let mut attempts = 1;
loop {
Expand All @@ -89,7 +91,7 @@ async fn install(manager: &ManagerClient<'_>, max_attempts: u8) -> anyhow::Resul

async fn show_progress() -> Result<(), ServiceError> {
// wait 1 second to give other task chance to start, so progress can display something
task::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(1)).await;
let conn = agama_lib::connection().await?;
let mut monitor = ProgressMonitor::new(conn).await.unwrap();
let presenter = InstallerProgress::new();
Expand Down Expand Up @@ -119,21 +121,21 @@ async fn run_command(cli: Cli) -> anyhow::Result<()> {
match cli.command {
Commands::Config(subcommand) => {
let manager = build_manager().await?;
block_on(wait_for_services(&manager))?;
block_on(run_config_cmd(subcommand, cli.format))
wait_for_services(&manager).await?;
run_config_cmd(subcommand, cli.format).await
}
Commands::Probe => {
let manager = build_manager().await?;
block_on(wait_for_services(&manager))?;
block_on(probe())
wait_for_services(&manager).await?;
probe().await
}
Commands::Profile(subcommand) => Ok(run_profile_cmd(subcommand)?),
Commands::Install => {
let manager = build_manager().await?;
block_on(install(&manager, 3))
install(&manager, 3).await
}
Commands::Questions(subcommand) => block_on(run_questions_cmd(subcommand)),
Commands::Logs(subcommand) => block_on(run_logs_cmd(subcommand)),
Commands::Questions(subcommand) => run_questions_cmd(subcommand).await,
Commands::Logs(subcommand) => run_logs_cmd(subcommand).await,
_ => unimplemented!(),
}
}
Expand All @@ -152,7 +154,7 @@ impl Termination for CliResult {
}
}

#[async_std::main]
#[tokio::main]
async fn main() -> CliResult {
let cli = Cli::parse();

Expand Down
8 changes: 4 additions & 4 deletions rust/agama-dbus-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ agama-lib = { path="../agama-lib" }
log = "0.4"
simplelog = "0.12.1"
systemd-journal-logger = "1.0"
zbus = "3.7.0"
zbus_macros = "3.7.0"
async-std = { version = "1.12.0", features = ["attributes"]}
zbus = { version = "3", default-features = false, features = ["tokio"] }
zbus_macros = "3"
uuid = { version = "1.3.4", features = ["v4"] }
thiserror = "1.0.40"
serde = { version = "1.0.152", features = ["derive"] }
serde_yaml = "0.9.24"
futures = "0.3.28"
cidr = { version = "0.2.2", features = ["serde"] }
tokio = { version = "1.33.0", features = ["macros", "rt-multi-thread"] }
tokio-stream = "0.1.14"
3 changes: 2 additions & 1 deletion rust/agama-dbus-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ use agama_lib::connection_to;
use anyhow::Context;
use log::LevelFilter;
use std::future::pending;
use tokio;

const ADDRESS: &str = "unix:path=/run/agama/bus";
const SERVICE_NAME: &str = "org.opensuse.Agama1";

#[async_std::main]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// be smart with logging and log directly to journal if connected to it
if systemd_journal_logger::connected_to_journal() {
Expand Down
45 changes: 24 additions & 21 deletions rust/agama-dbus-server/src/network/dbus/interfaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use crate::network::{
};

use agama_lib::network::types::SSID;
use async_std::{channel::Sender, sync::Arc};
use futures::lock::{MappedMutexGuard, Mutex, MutexGuard};
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard};
use zbus::{
dbus_interface,
zvariant::{ObjectPath, OwnedObjectPath},
Expand Down Expand Up @@ -91,15 +92,15 @@ impl Device {
///
/// It offers an API to query the connections collection.
pub struct Connections {
actions: Arc<Mutex<Sender<Action>>>,
actions: Arc<Mutex<UnboundedSender<Action>>>,
objects: Arc<Mutex<ObjectsRegistry>>,
}

impl Connections {
/// Creates a Connections interface object.
///
/// * `objects`: Objects paths registry.
pub fn new(objects: Arc<Mutex<ObjectsRegistry>>, actions: Sender<Action>) -> Self {
pub fn new(objects: Arc<Mutex<ObjectsRegistry>>, actions: UnboundedSender<Action>) -> Self {
Self {
objects,
actions: Arc::new(Mutex::new(actions)),
Expand Down Expand Up @@ -127,7 +128,6 @@ impl Connections {
let actions = self.actions.lock().await;
actions
.send(Action::AddConnection(id.clone(), ty.try_into()?))
.await
.unwrap();
Ok(())
}
Expand All @@ -150,7 +150,6 @@ impl Connections {
let actions = self.actions.lock().await;
actions
.send(Action::RemoveConnection(id.to_string()))
.await
.unwrap();
Ok(())
}
Expand All @@ -160,7 +159,7 @@ impl Connections {
/// It includes adding, updating and removing connections as needed.
pub async fn apply(&self) -> zbus::fdo::Result<()> {
let actions = self.actions.lock().await;
actions.send(Action::Apply).await.unwrap();
actions.send(Action::Apply).unwrap();
Ok(())
}

Expand All @@ -177,7 +176,7 @@ impl Connections {
///
/// It offers an API to query a connection.
pub struct Connection {
actions: Arc<Mutex<Sender<Action>>>,
actions: Arc<Mutex<UnboundedSender<Action>>>,
connection: Arc<Mutex<NetworkConnection>>,
}

Expand All @@ -186,7 +185,10 @@ impl Connection {
///
/// * `actions`: sending-half of a channel to send actions.
/// * `connection`: connection to expose over D-Bus.
pub fn new(actions: Sender<Action>, connection: Arc<Mutex<NetworkConnection>>) -> Self {
pub fn new(
actions: UnboundedSender<Action>,
connection: Arc<Mutex<NetworkConnection>>,
) -> Self {
Self {
actions: Arc::new(Mutex::new(actions)),
connection,
Expand All @@ -208,7 +210,6 @@ impl Connection {
let actions = self.actions.lock().await;
actions
.send(Action::UpdateConnection(connection.clone()))
.await
.unwrap();
Ok(())
}
Expand Down Expand Up @@ -241,7 +242,7 @@ impl Connection {

/// D-Bus interface for Match settings
pub struct Match {
actions: Arc<Mutex<Sender<Action>>>,
actions: Arc<Mutex<UnboundedSender<Action>>>,
connection: Arc<Mutex<NetworkConnection>>,
}

Expand All @@ -250,7 +251,10 @@ impl Match {
///
/// * `actions`: sending-half of a channel to send actions.
/// * `connection`: connection to expose over D-Bus.
pub fn new(actions: Sender<Action>, connection: Arc<Mutex<NetworkConnection>>) -> Self {
pub fn new(
actions: UnboundedSender<Action>,
connection: Arc<Mutex<NetworkConnection>>,
) -> Self {
Self {
actions: Arc::new(Mutex::new(actions)),
connection,
Expand All @@ -272,7 +276,6 @@ impl Match {
let actions = self.actions.lock().await;
actions
.send(Action::UpdateConnection(connection.clone()))
.await
.unwrap();
Ok(())
}
Expand Down Expand Up @@ -342,7 +345,7 @@ impl Match {

/// D-Bus interface for wireless settings
pub struct Wireless {
actions: Arc<Mutex<Sender<Action>>>,
actions: Arc<Mutex<UnboundedSender<Action>>>,
connection: Arc<Mutex<NetworkConnection>>,
}

Expand All @@ -351,7 +354,10 @@ impl Wireless {
///
/// * `actions`: sending-half of a channel to send actions.
/// * `connection`: connection to expose over D-Bus.
pub fn new(actions: Sender<Action>, connection: Arc<Mutex<NetworkConnection>>) -> Self {
pub fn new(
actions: UnboundedSender<Action>,
connection: Arc<Mutex<NetworkConnection>>,
) -> Self {
Self {
actions: Arc::new(Mutex::new(actions)),
connection,
Expand All @@ -361,7 +367,7 @@ impl Wireless {
/// Gets the wireless connection.
///
/// Beware that it crashes when it is not a wireless connection.
async fn get_wireless(&self) -> MappedMutexGuard<NetworkConnection, WirelessConnection> {
async fn get_wireless(&self) -> MappedMutexGuard<WirelessConnection> {
MutexGuard::map(self.connection.lock().await, |c| match c {
NetworkConnection::Wireless(config) => config,
_ => panic!("Not a wireless network. This is most probably a bug."),
Expand All @@ -373,14 +379,11 @@ impl Wireless {
/// * `connection`: Updated connection.
async fn update_connection<'a>(
&self,
connection: MappedMutexGuard<'a, NetworkConnection, WirelessConnection>,
connection: MappedMutexGuard<'a, WirelessConnection>,
) -> zbus::fdo::Result<()> {
let actions = self.actions.lock().await;
let connection = NetworkConnection::Wireless(connection.clone());
actions
.send(Action::UpdateConnection(connection))
.await
.unwrap();
actions.send(Action::UpdateConnection(connection)).unwrap();
Ok(())
}
}
Expand Down
16 changes: 9 additions & 7 deletions rust/agama-dbus-server/src/network/dbus/interfaces/ip_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ use crate::network::{
action::Action,
model::{Connection as NetworkConnection, IpConfig, Ipv4Method, Ipv6Method},
};
use async_std::{channel::Sender, sync::Arc};
use cidr::IpInet;
use futures::lock::{MappedMutexGuard, Mutex, MutexGuard};
use std::net::IpAddr;
use std::{net::IpAddr, sync::Arc};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard};
use zbus::dbus_interface;

/// D-Bus interface for IPv4 and IPv6 settings
pub struct Ip {
actions: Arc<Mutex<Sender<Action>>>,
actions: Arc<Mutex<UnboundedSender<Action>>>,
connection: Arc<Mutex<NetworkConnection>>,
}

Expand All @@ -25,7 +25,10 @@ impl Ip {
///
/// * `actions`: sending-half of a channel to send actions.
/// * `connection`: connection to expose over D-Bus.
pub fn new(actions: Sender<Action>, connection: Arc<Mutex<NetworkConnection>>) -> Self {
pub fn new(
actions: UnboundedSender<Action>,
connection: Arc<Mutex<NetworkConnection>>,
) -> Self {
Self {
actions: Arc::new(Mutex::new(actions)),
connection,
Expand All @@ -47,15 +50,14 @@ impl Ip {
let actions = self.actions.lock().await;
actions
.send(Action::UpdateConnection(connection.clone()))
.await
.unwrap();
Ok(())
}
}

impl Ip {
/// Returns the IpConfig struct.
async fn get_ip_config(&self) -> MappedMutexGuard<NetworkConnection, IpConfig> {
async fn get_ip_config(&self) -> MappedMutexGuard<IpConfig> {
MutexGuard::map(self.get_connection().await, |c| c.ip_config_mut())
}

Expand Down
3 changes: 2 additions & 1 deletion rust/agama-dbus-server/src/network/dbus/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! This module defines a D-Bus service which exposes Agama's network configuration.
use crate::network::{Adapter, NetworkSystem};
use std::error::Error;
use tokio;
use zbus::Connection;

/// Represents the Agama networking D-Bus service.
Expand All @@ -19,7 +20,7 @@ impl NetworkService {
let connection = connection.clone();
let mut network = NetworkSystem::new(connection.clone(), adapter);

async_std::task::spawn(async move {
tokio::spawn(async move {
network
.setup()
.await
Expand Down
10 changes: 5 additions & 5 deletions rust/agama-dbus-server/src/network/dbus/tree.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use agama_lib::error::ServiceError;
use futures::lock::Mutex;
use tokio::sync::Mutex;
use zbus::zvariant::{ObjectPath, OwnedObjectPath};

use crate::network::{action::Action, dbus::interfaces, model::*};
use async_std::{channel::Sender, sync::Arc};
use log;
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::mpsc::UnboundedSender;

const CONNECTIONS_PATH: &str = "/org/opensuse/Agama1/Network/connections";
const DEVICES_PATH: &str = "/org/opensuse/Agama1/Network/devices";

/// Handle the objects in the D-Bus tree for the network state
pub struct Tree {
connection: zbus::Connection,
actions: Sender<Action>,
actions: UnboundedSender<Action>,
objects: Arc<Mutex<ObjectsRegistry>>,
}

Expand All @@ -22,7 +22,7 @@ impl Tree {
///
/// * `connection`: D-Bus connection to use.
/// * `actions`: sending-half of a channel to send actions.
pub fn new(connection: zbus::Connection, actions: Sender<Action>) -> Self {
pub fn new(connection: zbus::Connection, actions: UnboundedSender<Action>) -> Self {
Self {
connection,
actions,
Expand Down
Loading
Loading