From 550466bf957c284a939af662e08d03d312e495da Mon Sep 17 00:00:00 2001 From: Abhinandan Purkait Date: Sat, 30 Nov 2024 17:07:36 +0000 Subject: [PATCH] test: test Signed-off-by: Abhinandan Purkait --- .../csi-driver/src/bin/node/block_vol.rs | 6 +- .../csi-driver/src/bin/node/filesystem_ops.rs | 16 +- .../csi-driver/src/bin/node/filesystem_vol.rs | 26 +- control-plane/csi-driver/src/bin/node/main.rs | 1 + .../csi-driver/src/bin/node/main_.rs | 1 + .../csi-driver/src/bin/node/mount.rs | 278 ++++++++++-------- control-plane/csi-driver/src/bin/node/node.rs | 6 +- .../src/bin/node/nodeplugin_grpc.rs | 1 + .../csi-driver/src/bin/node/runtime.rs | 49 +++ 9 files changed, 242 insertions(+), 142 deletions(-) create mode 100644 control-plane/csi-driver/src/bin/node/runtime.rs diff --git a/control-plane/csi-driver/src/bin/node/block_vol.rs b/control-plane/csi-driver/src/bin/node/block_vol.rs index ce94e3b95..ca8d4491a 100644 --- a/control-plane/csi-driver/src/bin/node/block_vol.rs +++ b/control-plane/csi-driver/src/bin/node/block_vol.rs @@ -88,7 +88,7 @@ pub(crate) async fn publish_block_volume(msg: &NodePublishVolumeRequest) -> Resu } if let Err(error) = - mount::blockdevice_mount(&device_path, target_path.as_str(), msg.readonly) + mount::blockdevice_mount(&device_path, target_path.as_str(), msg.readonly).await { return Err(failure!( Code::Internal, @@ -108,14 +108,14 @@ pub(crate) async fn publish_block_volume(msg: &NodePublishVolumeRequest) -> Resu } } -pub(crate) fn unpublish_block_volume(msg: &NodeUnpublishVolumeRequest) -> Result<(), Status> { +pub(crate) async fn unpublish_block_volume(msg: &NodeUnpublishVolumeRequest) -> Result<(), Status> { let target_path = &msg.target_path; let volume_id = &msg.volume_id; // block volumes are mounted on block special file, which is not // a regular file. if mount::find_mount(None, Some(target_path)).is_some() { - match mount::blockdevice_unmount(target_path) { + match mount::blockdevice_unmount(target_path).await { Ok(_) => {} Err(err) => { return Err(Status::new( diff --git a/control-plane/csi-driver/src/bin/node/filesystem_ops.rs b/control-plane/csi-driver/src/bin/node/filesystem_ops.rs index 17faf23d2..7f6c54376 100644 --- a/control-plane/csi-driver/src/bin/node/filesystem_ops.rs +++ b/control-plane/csi-driver/src/bin/node/filesystem_ops.rs @@ -95,7 +95,7 @@ pub(crate) trait FileSystemOps: Send + Sync { /// Get the default mount options along with the user passed options for specific filesystems. fn mount_flags(&self, mount_flags: Vec) -> Vec; /// Unmount the filesystem if the filesystem uuid and the provided uuid differ. - fn unmount_on_fs_id_diff( + async fn unmount_on_fs_id_diff( &self, device_path: &str, fs_staging_path: &str, @@ -146,7 +146,7 @@ impl FileSystemOps for Ext4Fs { mount_flags } - fn unmount_on_fs_id_diff( + async fn unmount_on_fs_id_diff( &self, _device_path: &str, _fs_staging_path: &str, @@ -270,13 +270,13 @@ impl FileSystemOps for XFs { mount_flags } - fn unmount_on_fs_id_diff( + async fn unmount_on_fs_id_diff( &self, device_path: &str, fs_staging_path: &str, volume_uuid: &Uuid, ) -> Result<(), Error> { - mount::unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid) + mount::unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid).await } /// Xfs filesystem needs an unmount to clear the log, so that the parameters can be changed. @@ -288,12 +288,12 @@ impl FileSystemOps for XFs { options: &[String], volume_uuid: &Uuid, ) -> Result<(), Error> { - mount::filesystem_mount(device, staging_path, &FileSystem(Fs::Xfs), options).map_err(|error| { + mount::filesystem_mount(device, staging_path, &FileSystem(Fs::Xfs), options).await.map_err(|error| { format!( "(xfs repairing) Failed to mount device {device} onto {staging_path} for {volume_uuid} : {error}", ) })?; - mount::filesystem_unmount(staging_path).map_err(|error| { + mount::filesystem_unmount(staging_path).await.map_err(|error| { format!( "(xfs repairing) Failed to unmount device {device} from {staging_path} for {volume_uuid} : {error}", ) @@ -358,13 +358,13 @@ impl FileSystemOps for BtrFs { mount_flags } - fn unmount_on_fs_id_diff( + async fn unmount_on_fs_id_diff( &self, device_path: &str, fs_staging_path: &str, volume_uuid: &Uuid, ) -> Result<(), Error> { - mount::unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid) + mount::unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid).await } /// `btrfs check --readonly` is a not a `DANGEROUS OPTION` as it only exists to calm potential diff --git a/control-plane/csi-driver/src/bin/node/filesystem_vol.rs b/control-plane/csi-driver/src/bin/node/filesystem_vol.rs index dfe4d7649..c77e1845e 100644 --- a/control-plane/csi-driver/src/bin/node/filesystem_vol.rs +++ b/control-plane/csi-driver/src/bin/node/filesystem_vol.rs @@ -91,7 +91,7 @@ pub(crate) async fn stage_fs_volume( // If clone's fs id change was requested and we were not able to change it in first attempt // unmount and continue the stage again. let continue_stage = if fs_id.is_some() { - continue_after_unmount_on_fs_id_diff(fstype ,device_path, fs_staging_path, &volume_uuid) + continue_after_unmount_on_fs_id_diff(fstype ,device_path, fs_staging_path, &volume_uuid).await .map_err(|error| { failure!( Code::FailedPrecondition, @@ -107,7 +107,7 @@ pub(crate) async fn stage_fs_volume( if !continue_stage { // todo: validate other flags? if mnt.mount_flags.readonly() != existing.options.readonly() { - mount::remount(fs_staging_path, mnt.mount_flags.readonly())?; + mount::remount(fs_staging_path, mnt.mount_flags.readonly()).await?; } return Ok(()); @@ -161,7 +161,8 @@ pub(crate) async fn stage_fs_volume( debug!("Mounting device {} onto {}", device_path, fs_staging_path); - if let Err(error) = mount::filesystem_mount(device_path, fs_staging_path, fstype, &mount_flags) + if let Err(error) = + mount::filesystem_mount(device_path, fs_staging_path, fstype, &mount_flags).await { return Err(failure!( Code::Internal, @@ -209,7 +210,7 @@ pub(crate) async fn unstage_fs_volume(msg: &NodeUnstageVolumeRequest) -> Result< )); } - if let Err(error) = mount::filesystem_unmount(fs_staging_path) { + if let Err(error) = mount::filesystem_unmount(fs_staging_path).await { return Err(failure!( Code::Internal, "Failed to unstage volume {}: failed to unmount device {:?} from {}: {}", @@ -227,7 +228,7 @@ pub(crate) async fn unstage_fs_volume(msg: &NodeUnstageVolumeRequest) -> Result< } /// Publish a filesystem volume -pub(crate) fn publish_fs_volume( +pub(crate) async fn publish_fs_volume( msg: &NodePublishVolumeRequest, mnt: &MountVolume, filesystems: &[FileSystem], @@ -328,7 +329,7 @@ pub(crate) fn publish_fs_volume( debug!("Mounting {} to {}", fs_staging_path, target_path); - if let Err(error) = mount::bind_mount(fs_staging_path, target_path, false) { + if let Err(error) = mount::bind_mount(fs_staging_path, target_path, false).await { return Err(failure!( Code::Internal, "Failed to publish volume {}: failed to mount {} to {}: {}", @@ -345,7 +346,7 @@ pub(crate) fn publish_fs_volume( debug!("Remounting {} as readonly", target_path); - if let Err(error) = mount::bind_remount(target_path, &options) { + if let Err(error) = mount::bind_remount(target_path, &options).await { let message = format!( "Failed to publish volume {volume_id}: failed to mount {fs_staging_path} to {target_path} as readonly: {error}" ); @@ -354,7 +355,7 @@ pub(crate) fn publish_fs_volume( debug!("Unmounting {}", target_path); - if let Err(error) = mount::bind_unmount(target_path) { + if let Err(error) = mount::bind_unmount(target_path).await { error!("Failed to unmount {}: {}", target_path, error); } @@ -367,7 +368,7 @@ pub(crate) fn publish_fs_volume( Ok(()) } -pub(crate) fn unpublish_fs_volume(msg: &NodeUnpublishVolumeRequest) -> Result<(), Status> { +pub(crate) async fn unpublish_fs_volume(msg: &NodeUnpublishVolumeRequest) -> Result<(), Status> { // filesystem mount let target_path = &msg.target_path; let volume_id = &msg.volume_id; @@ -398,7 +399,7 @@ pub(crate) fn unpublish_fs_volume(msg: &NodeUnpublishVolumeRequest) -> Result<() debug!("Unmounting {}", target_path); - if let Err(error) = mount::bind_unmount(target_path) { + if let Err(error) = mount::bind_unmount(target_path).await { return Err(failure!( Code::Internal, "Failed to unpublish volume {}: failed to unmount {}: {}", @@ -427,7 +428,7 @@ pub(crate) fn unpublish_fs_volume(msg: &NodeUnpublishVolumeRequest) -> Result<() /// Check if we can continue the staging incase the change fs id failed mid way and we want to retry /// the flow. -fn continue_after_unmount_on_fs_id_diff( +async fn continue_after_unmount_on_fs_id_diff( fstype: &FileSystem, device_path: &str, fs_staging_path: &str, @@ -435,6 +436,7 @@ fn continue_after_unmount_on_fs_id_diff( ) -> Result { fstype .fs_ops()? - .unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid)?; + .unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid) + .await?; Ok(fstype == &Fs::Xfs.into()) } diff --git a/control-plane/csi-driver/src/bin/node/main.rs b/control-plane/csi-driver/src/bin/node/main.rs index 8dca319e6..3b41e7891 100644 --- a/control-plane/csi-driver/src/bin/node/main.rs +++ b/control-plane/csi-driver/src/bin/node/main.rs @@ -39,6 +39,7 @@ mod nodeplugin_nvme; #[cfg(target_os = "linux")] mod nodeplugin_svc; mod registration; +mod runtime; /// Shutdown event which lets the plugin know it needs to stop processing new events and /// complete any existing ones before shutting down. #[cfg(target_os = "linux")] diff --git a/control-plane/csi-driver/src/bin/node/main_.rs b/control-plane/csi-driver/src/bin/node/main_.rs index 4f0b4d6ae..010f15261 100644 --- a/control-plane/csi-driver/src/bin/node/main_.rs +++ b/control-plane/csi-driver/src/bin/node/main_.rs @@ -451,6 +451,7 @@ impl CsiServer { let node = Node::new(node_name.into(), node_selector, probe_filesystems()); Ok(async move { Server::builder() + .timeout(std::time::Duration::from_secs(10)) .add_service(NodeServer::new(node)) .add_service(IdentityServer::new(Identity {})) .add_service(NvmeOperationsServer::new(NvmeOperationsSvc {})) diff --git a/control-plane/csi-driver/src/bin/node/mount.rs b/control-plane/csi-driver/src/bin/node/mount.rs index bc339ab44..8d841ed5e 100644 --- a/control-plane/csi-driver/src/bin/node/mount.rs +++ b/control-plane/csi-driver/src/bin/node/mount.rs @@ -3,6 +3,7 @@ use crate::filesystem_ops::FileSystem; use csi_driver::filesystem::FileSystem as Fs; use devinfo::mountinfo::{MountInfo, SafeMountIter}; +use crate::runtime; use std::{collections::HashSet, io::Error}; use sys_mount::{unmount, FilesystemType, Mount, MountFlags, UnmountFlags}; use tracing::{debug, info}; @@ -130,184 +131,229 @@ fn show(options: &[String]) -> String { list.join(",") } -/// Mount a device to a directory (mountpoint) -pub(crate) fn filesystem_mount( +pub(crate) async fn filesystem_mount( device: &str, target: &str, fstype: &FileSystem, options: &[String], ) -> Result { - let mut flags = MountFlags::empty(); + let device = device.to_string(); + let target = target.to_string(); + let fstype = fstype.clone(); + let options: Vec = options.to_vec(); - let (readonly, value) = parse(options); + runtime::spawn_blocking(move || { + let mut flags = MountFlags::empty(); - if readonly { - flags.insert(MountFlags::RDONLY); - } + let (readonly, value) = parse(&options); - // I'm not certain if it's fine to pass "" so keep existing behaviour - let mount = if value.is_empty() { - Mount::builder() - } else { - Mount::builder().data(&value) - } - .fstype(FilesystemType::Manual(fstype.as_ref())) - .flags(flags) - .mount(device, target)?; - - debug!( - "Filesystem ({}) on device {} mounted onto target {} (options: {})", - fstype, - device, - target, - show(options) - ); - - Ok(mount) + if readonly { + flags.insert(MountFlags::RDONLY); + } + + // I'm not certain if it's fine to pass "" so keep existing behaviour + let mount_builder = if value.is_empty() { + Mount::builder() + } else { + Mount::builder().data(&value) + } + .fstype(FilesystemType::Manual(fstype.as_ref())) + .flags(flags); + + let mount = mount_builder.mount(&device, &target)?; + + debug!( + "Filesystem ({}) on device {} mounted onto target {} (options: {})", + fstype, + device, + target, + show(&options) + ); + + Ok(mount) + }) + .await? } -/// Unmount a device from a directory (mountpoint) +/// Unmount a device from a directory (mountpoint) asynchronously /// Should not be used for removing bind mounts. -pub(crate) fn filesystem_unmount(target: &str) -> Result<(), Error> { - let flags = UnmountFlags::empty(); - // read more about the umount system call and it's flags at `man 2 umount` - unmount(target, flags)?; +pub(crate) async fn filesystem_unmount(target: &str) -> Result<(), Error> { + let target = target.to_string(); - debug!("Target {} unmounted", target); + runtime::spawn_blocking(move || { + let flags = UnmountFlags::empty(); - Ok(()) + unmount(&target, flags)?; + + debug!("Target {} unmounted", target); + + Ok(()) + }) + .await? } /// Bind mount a source path to a target path. /// Supports both directories and files. -pub(crate) fn bind_mount(source: &str, target: &str, file: bool) -> Result { - let mut flags = MountFlags::empty(); +pub(crate) async fn bind_mount(source: &str, target: &str, file: bool) -> Result { + let source = source.to_string(); + let target = target.to_string(); - flags.insert(MountFlags::BIND); + runtime::spawn_blocking(move || { + let mut flags = MountFlags::empty(); + flags.insert(MountFlags::BIND); - if file { - flags.insert(MountFlags::RDONLY); - } + if file { + flags.insert(MountFlags::RDONLY); + } - let mount = Mount::builder() - .fstype(FilesystemType::Manual("none")) - .flags(flags) - .mount(source, target)?; + let mount = Mount::builder() + .fstype(FilesystemType::Manual("none")) + .flags(flags) + .mount(&source, &target)?; - debug!("Source {} bind mounted onto target {}", source, target); + debug!("Source {} bind mounted onto target {}", source, target); - Ok(mount) + Ok(mount) + }) + .await? } /// Bind remount a path to modify mount options. /// Assumes that target has already been bind mounted. -pub(crate) fn bind_remount(target: &str, options: &[String]) -> Result { - let mut flags = MountFlags::empty(); +pub(crate) async fn bind_remount(target: &str, options: &[String]) -> Result { + let target = target.to_string(); + let options = options.to_vec(); - let (readonly, value) = parse(options); + runtime::spawn_blocking(move || { + let mut flags = MountFlags::empty(); + let (readonly, value) = parse(&options); - flags.insert(MountFlags::BIND); + flags.insert(MountFlags::BIND); - if readonly { - flags.insert(MountFlags::RDONLY); - } + if readonly { + flags.insert(MountFlags::RDONLY); + } - flags.insert(MountFlags::REMOUNT); + flags.insert(MountFlags::REMOUNT); - let mount = if value.is_empty() { - Mount::builder() - } else { - Mount::builder().data(&value) - } - .fstype(FilesystemType::Manual("none")) - .flags(flags) - .mount("none", target)?; + let mount = if value.is_empty() { + Mount::builder() + } else { + Mount::builder().data(&value) + } + .fstype(FilesystemType::Manual("none")) + .flags(flags) + .mount("none", &target)?; - debug!( - "Target {} bind remounted (options: {})", - target, - show(options) - ); + debug!( + "Target {} bind remounted (options: {})", + target, + show(&options) + ); - Ok(mount) + Ok(mount) + }) + .await? } /// Unmounts a path that has previously been bind mounted. /// Should not be used for unmounting devices. -pub(crate) fn bind_unmount(target: &str) -> Result<(), Error> { - let flags = UnmountFlags::empty(); +pub(crate) async fn bind_unmount(target: &str) -> Result<(), Error> { + let target = target.to_string(); - unmount(target, flags)?; + runtime::spawn_blocking(move || { + let flags = UnmountFlags::empty(); + unmount(&target, flags)?; - debug!("Target {} bind unmounted", target); - - Ok(()) + debug!("Target {} bind unmounted", target); + Ok(()) + }) + .await? } /// Remount existing mount as read only or read write. -pub(crate) fn remount(target: &str, ro: bool) -> Result { - let mut flags = MountFlags::empty(); - flags.insert(MountFlags::REMOUNT); +pub(crate) async fn remount(target: &str, ro: bool) -> Result { + let target = target.to_string(); - if ro { - flags.insert(MountFlags::RDONLY); - } + runtime::spawn_blocking(move || { + let mut flags = MountFlags::empty(); + flags.insert(MountFlags::REMOUNT); - let mount = Mount::builder() - .fstype(FilesystemType::Manual("none")) - .flags(flags) - .mount("", target)?; + if ro { + flags.insert(MountFlags::RDONLY); + } + + let mount = Mount::builder() + .fstype(FilesystemType::Manual("none")) + .flags(flags) + .mount("", &target)?; - debug!("Target {} remounted with {}", target, flags.bits()); + debug!("Target {} remounted with {}", target, flags.bits()); - Ok(mount) + Ok(mount) + }) + .await? } /// Mount a block device -pub(crate) fn blockdevice_mount( +pub(crate) async fn blockdevice_mount( source: &str, target: &str, readonly: bool, ) -> Result { - debug!("Mounting {} ...", source); + let source = source.to_string(); + let target = target.to_string(); - let mut flags = MountFlags::empty(); - flags.insert(MountFlags::BIND); + runtime::spawn_blocking(move || { + debug!("Mounting {} ...", source); - let mount = Mount::builder() - .fstype(FilesystemType::Manual("none")) - .flags(flags) - .mount(source, target)?; - info!("Block device {} mounted to {}", source, target,); - - if readonly { - flags.insert(MountFlags::REMOUNT); - flags.insert(MountFlags::RDONLY); + let mut flags = MountFlags::empty(); + flags.insert(MountFlags::BIND); let mount = Mount::builder() - .fstype(FilesystemType::Manual("")) + .fstype(FilesystemType::Manual("none")) .flags(flags) - .mount("", target)?; - info!("Remounted block device {} (readonly) to {}", source, target); - return Ok(mount); - } + .mount(&source, &target)?; + + info!("Block device {} mounted to {}", source, target); - Ok(mount) + if readonly { + flags.insert(MountFlags::REMOUNT); + flags.insert(MountFlags::RDONLY); + + let mount = Mount::builder() + .fstype(FilesystemType::Manual("")) + .flags(flags) + .mount("", &target)?; + + info!("Remounted block device {} (readonly) to {}", source, target); + return Ok(mount); + } + + Ok(mount) + }) + .await? } /// Unmount a block device. -pub(crate) fn blockdevice_unmount(target: &str) -> Result<(), Error> { - let flags = UnmountFlags::empty(); +pub(crate) async fn blockdevice_unmount(target: &str) -> Result<(), Error> { + let target = target.to_string(); - debug!( - "Unmounting block device {} (flags={}) ...", - target, - flags.bits() - ); + runtime::spawn_blocking(move || { + let flags = UnmountFlags::empty(); - unmount(target, flags)?; - info!("block device at {} has been unmounted", target); - Ok(()) + debug!( + "Unmounting block device {} (flags={}) ...", + target, + flags.bits() + ); + + unmount(&target, flags)?; + info!("Block device at {} has been unmounted", target); + + Ok(()) + }) + .await? } /// Waits until a device's filesystem is shutdown. @@ -364,7 +410,7 @@ async fn wait_file_removal( } /// If the filesystem uuid doesn't match with the provided uuid, unmount the device. -pub(crate) fn unmount_on_fs_id_diff( +pub(crate) async fn unmount_on_fs_id_diff( device_path: &str, fs_staging_path: &str, volume_uuid: &Uuid, @@ -374,7 +420,7 @@ pub(crate) fn unmount_on_fs_id_diff( return Ok(()); } } - filesystem_unmount(fs_staging_path).map_err(|error| { + filesystem_unmount(fs_staging_path).await.map_err(|error| { format!( "Failed to unmount on fs id difference, device {device_path} from {fs_staging_path} for {volume_uuid}, {error}", ) diff --git a/control-plane/csi-driver/src/bin/node/node.rs b/control-plane/csi-driver/src/bin/node/node.rs index 0f5b25590..029cc11db 100644 --- a/control-plane/csi-driver/src/bin/node/node.rs +++ b/control-plane/csi-driver/src/bin/node/node.rs @@ -305,7 +305,7 @@ impl node_server::Node for Node { ) })? { AccessType::Mount(mnt) => { - publish_fs_volume(&msg, mnt, &self.filesystems)?; + publish_fs_volume(&msg, mnt, &self.filesystems).await?; } AccessType::Block(_) => { publish_block_volume(&msg).await?; @@ -356,7 +356,7 @@ impl node_server::Node for Node { let target_path = Path::new(&msg.target_path); if target_path.exists() { if target_path.is_dir() { - unpublish_fs_volume(&msg)?; + unpublish_fs_volume(&msg).await?; } else { if target_path.is_file() { return Err(Status::new( @@ -368,7 +368,7 @@ impl node_server::Node for Node { )); } - unpublish_block_volume(&msg)?; + unpublish_block_volume(&msg).await?; } } Ok(Response::new(NodeUnpublishVolumeResponse {})) diff --git a/control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs b/control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs index eeb894409..70fe84bc9 100644 --- a/control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs +++ b/control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs @@ -76,6 +76,7 @@ impl NodePluginGrpcServer { endpoint ); Server::builder() + .timeout(std::time::Duration::from_secs(5)) .add_service(NodePluginServer::new(NodePluginSvc {})) .serve_with_shutdown(endpoint, Shutdown::wait()) .await diff --git a/control-plane/csi-driver/src/bin/node/runtime.rs b/control-plane/csi-driver/src/bin/node/runtime.rs new file mode 100644 index 000000000..af5734011 --- /dev/null +++ b/control-plane/csi-driver/src/bin/node/runtime.rs @@ -0,0 +1,49 @@ +//! +//! This allows us to send futures from within mayastor to the tokio +//! runtime to do whatever it needs to do. + +use once_cell::sync::Lazy; +use tokio::task::JoinHandle; +use tracing::trace; + +/// spawn a future that might block on a separate worker thread the +/// number of threads available is determined by max_blocking_threads +pub(crate) fn spawn_blocking(f: F) -> JoinHandle +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + RUNTIME.spawn_blocking(f) +} + +pub(crate) struct Runtime { + rt: tokio::runtime::Runtime, +} + +static RUNTIME: Lazy = Lazy::new(|| { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(5) + .max_blocking_threads(50) + .build() + .unwrap(); + + Runtime::new(rt) +}); + +impl Runtime { + fn new(rt: tokio::runtime::Runtime) -> Self { + Self { rt } + } + fn spawn_blocking(&self, f: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let handle = self.rt.handle().clone(); + handle.spawn_blocking(|| { + trace!("Spawned a blocking thread"); + f() + }) + } +}