From 9555c296a7d74d2f6f63e08c8b8844ea4a86024a Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Sun, 12 Jun 2022 20:54:05 +0800 Subject: [PATCH 1/4] storage: update tokio to fix a bug in mpmc channel Update tokio to fix the bug: https://github.com/tokio-rs/tokio/issues/4745 Signed-off-by: Jiang Liu --- Cargo.lock | 50 ++++++++++++++----------------------- storage/src/cache/worker.rs | 3 --- 2 files changed, 19 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1c2fcabd641..7d505d1ee9f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -168,9 +168,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.9.1" +version = "3.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a45a46ab1f2412e53d3a0ade76ffad2025804294569aae387231a0cd6e0899" +checksum = "37ccbd214614c6783386c1af30caf03192f17891059cecc394b4fb119e363de3" [[package]] name = "bytes" @@ -741,9 +741,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.7" +version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff8670570af52249509a86f5e3e18a08c60b177071826898fde8997cf5f6bfbb" +checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" dependencies = [ "bytes", "fnv", @@ -1324,9 +1324,9 @@ dependencies = [ [[package]] name = "openssl-sys" -version = "0.9.73" +version = "0.9.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5fd19fb3e0a8191c1e34935718976a3e70c112ab9a24af6d7cadccd9d90bc0" +checksum = "835363342df5fba8354c5b453325b110ffd54044e588c539cf2f20a8014e4cb1" dependencies = [ "autocfg", "cc", @@ -1794,9 +1794,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.95" +version = "1.0.96" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbaf6116ab8924f39d52792136fb74fd60a80194cf1b1c6ffa6453eef1c3f942" +checksum = "0748dd251e24453cb8717f0354206b91557e4ec8703673a4b30208f2abaf1ebf" dependencies = [ "proc-macro2", "quote", @@ -1894,9 +1894,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.19.0" +version = "1.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f392c8f16bda3456c0b00c6de39cb100449b98de55ac41c6cdd2bfcf53a1245" +checksum = "c51a52ed6686dd62c320f9b89299e9dfb46f730c7a48e635c19f21d116cb1439" dependencies = [ "bytes", "libc", @@ -1912,9 +1912,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7" +checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" dependencies = [ "proc-macro2", "quote", @@ -1933,9 +1933,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f988a1a1adc2fb21f9c12aa96441da33a1728193ae0b95d2be22dbd17fcb4e5c" +checksum = "cc463cd8deddc3770d20f9852143d50bf6094e640b485cb2e189a2099085ff45" dependencies = [ "bytes", "futures-core", @@ -1962,34 +1962,22 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.34" +version = "0.1.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09" +checksum = "a400e31aa60b9d44a52a8ee0343b5b18566b03a8321e0d321f695cf56e940160" dependencies = [ "cfg-if", "pin-project-lite", - "tracing-attributes", "tracing-core", ] -[[package]] -name = "tracing-attributes" -version = "0.1.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc6b8ad3567499f98a1db7a752b07a7c8c7c7c34c332ec00effb2b0027974b7c" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "tracing-core" -version = "0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f" +checksum = "7709595b8878a4965ce5e87ebf880a7d39c9afc6837721b21a5a816a8117d921" dependencies = [ - "lazy_static", + "once_cell", ] [[package]] diff --git a/storage/src/cache/worker.rs b/storage/src/cache/worker.rs index bb7380ff4f4..380f266e3eb 100644 --- a/storage/src/cache/worker.rs +++ b/storage/src/cache/worker.rs @@ -121,11 +121,8 @@ impl Channel { tokio::pin!(future); loop { - /* - // TODO: enable this after https://github.com/tokio-rs/tokio/issues/4745 has been fixed // Make sure that no wakeup is lost if we get `None` from `try_recv`. future.as_mut().enable(); - */ if let Some(msg) = self.try_recv() { return Ok(msg); From fcea786b632fc34a53e01e3678632c7e4b90f597 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Sun, 12 Jun 2022 21:01:16 +0800 Subject: [PATCH 2/4] storage: move API related unit test into nydus-api Move API related unit test into nydus-api. Signed-off-by: Jiang Liu --- api/src/http.rs | 13 +++++++++++++ storage/src/backend/mod.rs | 18 +----------------- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/api/src/http.rs b/api/src/http.rs index d2a18d8aa9c..0b68be7f6cf 100644 --- a/api/src/http.rs +++ b/api/src/http.rs @@ -1021,4 +1021,17 @@ mod tests { assert!(msg.is_none()); let _ = thread.join().unwrap(); } + + #[test] + fn test_common_config() { + let config = RegistryOssConfig::default(); + + assert_eq!(config.timeout, 5); + assert_eq!(config.connect_timeout, 5); + assert_eq!(config.retry_limit, 0); + assert_eq!(config.proxy.check_interval, 5); + assert!(config.proxy.fallback); + assert_eq!(config.proxy.ping_url, ""); + assert_eq!(config.proxy.url, ""); + } } diff --git a/storage/src/backend/mod.rs b/storage/src/backend/mod.rs index cde2b4827e6..c9de7f4f9fb 100644 --- a/storage/src/backend/mod.rs +++ b/storage/src/backend/mod.rs @@ -182,25 +182,9 @@ fn default_http_scheme() -> String { #[cfg(test)] mod tests { - use super::*; - use nydus_api::http::RegistryOssConfig; - #[cfg(any(feature = "backend-oss", feature = "backend-registry"))] #[test] fn test_default_http_scheme() { - assert_eq!(default_http_scheme(), "https"); - } - - #[test] - fn test_common_config() { - let config = RegistryOssConfig::default(); - - assert_eq!(config.timeout, 5); - assert_eq!(config.connect_timeout, 5); - assert_eq!(config.retry_limit, 0); - assert_eq!(config.proxy.check_interval, 5); - assert!(config.proxy.fallback); - assert_eq!(config.proxy.ping_url, ""); - assert_eq!(config.proxy.url, ""); + assert_eq!(super::default_http_scheme(), "https"); } } From 42f7d24b80ec2c8ff32608e0d77921a899ac04f9 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Sun, 12 Jun 2022 21:04:34 +0800 Subject: [PATCH 3/4] uitl: move MPMC channenl implementation into nydus-util Move MPMC channenl implementation into nydus-util, so it could be reused later. Signed-off-by: Jiang Liu --- Cargo.lock | 1 + storage/Cargo.toml | 3 +- storage/src/cache/worker.rs | 87 +++----------------- utils/Cargo.toml | 1 + utils/src/lib.rs | 1 + utils/src/mpmc.rs | 154 ++++++++++++++++++++++++++++++++++++ 6 files changed, 167 insertions(+), 80 deletions(-) create mode 100644 utils/src/mpmc.rs diff --git a/Cargo.lock b/Cargo.lock index 7d505d1ee9f..936190e0a01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1262,6 +1262,7 @@ dependencies = [ "serde", "serde_json", "sha2", + "tokio", "vmm-sys-util", "zstd", ] diff --git a/storage/Cargo.toml b/storage/Cargo.toml index 3677723eac4..6415db0aad7 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -14,6 +14,7 @@ anyhow = "1.0.35" arc-swap = "=0.4" base64 = { version = "0.13.0", optional = true } bitflags = "1.2.1" +dbs-uhttp = { version = "0.3.0" } futures = "0.3" # pin governor to avoid multi versions of hashbrown governor = "=0.4.1" @@ -39,8 +40,6 @@ nydus-api = { version = "0.1.0", path = "../api" } nydus-utils = { version = "0.2.0", path = "../utils" } nydus-error = { version = "0.2.0", path = "../error" } -dbs-uhttp = { version = "0.3.0" } - [dev-dependencies] [features] diff --git a/storage/src/cache/worker.rs b/storage/src/cache/worker.rs index 380f266e3eb..05df2324631 100644 --- a/storage/src/cache/worker.rs +++ b/storage/src/cache/worker.rs @@ -3,11 +3,10 @@ // // SPDX-License-Identifier: Apache-2.0 -use std::collections::VecDeque; -use std::io::{Error, ErrorKind, Result}; +use std::io::Result; use std::num::NonZeroU32; -use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; +use std::sync::Arc; use std::thread; use std::time::Duration; @@ -17,9 +16,9 @@ use governor::state::{InMemoryState, NotKeyed}; use governor::{Quota, RateLimiter}; use nydus_utils::metrics::{BlobcacheMetrics, Metric}; use tokio::runtime::Runtime; -use tokio::sync::Notify; use nydus_api::http::BlobPrefetchConfig; +use nydus_utils::mpmc::Channel; use crate::cache::{BlobCache, BlobIoRange}; use crate::RAFS_MAX_CHUNK_SIZE; @@ -81,74 +80,6 @@ impl AsyncPrefetchMessage { } } -// Async implementation of Multi-Producer-Multi-Consumer channel. -struct Channel { - closed: AtomicBool, - notifier: Notify, - requests: Mutex>, -} - -impl Channel { - fn new() -> Self { - Channel { - closed: AtomicBool::new(false), - notifier: Notify::new(), - requests: Mutex::new(VecDeque::new()), - } - } - - fn close(&self) { - self.closed.store(true, Ordering::Release); - self.notifier.notify_waiters(); - } - - fn send(&self, msg: T) -> std::result::Result<(), T> { - if self.closed.load(Ordering::Acquire) { - Err(msg) - } else { - self.requests.lock().unwrap().push_back(msg); - self.notifier.notify_one(); - Ok(()) - } - } - - fn try_recv(&self) -> Option { - self.requests.lock().unwrap().pop_front() - } - - async fn recv(&self) -> Result { - let future = self.notifier.notified(); - tokio::pin!(future); - - loop { - // Make sure that no wakeup is lost if we get `None` from `try_recv`. - future.as_mut().enable(); - - if let Some(msg) = self.try_recv() { - return Ok(msg); - } else if self.closed.load(Ordering::Acquire) { - return Err(Error::new(ErrorKind::BrokenPipe, "channel has been closed")); - } - - // Wait for a call to `notify_one`. - // - // This uses `.as_mut()` to avoid consuming the future, - // which lets us call `Pin::set` below. - future.as_mut().await; - - // Reset the future in case another call to `try_recv` got the message before us. - future.set(self.notifier.notified()); - } - } - - fn flush_pending_prefetch_requests(&self, f: F) - where - F: FnMut(&T) -> bool, - { - self.requests.lock().unwrap().retain(f); - } -} - pub(crate) struct AsyncWorkerMgr { metrics: Arc, ping_requests: AtomicU32, @@ -210,7 +141,7 @@ impl AsyncWorkerMgr { self.prefetch_channel.close(); while self.workers.load(Ordering::Relaxed) > 0 { - self.prefetch_channel.notifier.notify_waiters(); + self.prefetch_channel.notify_waiters(); thread::sleep(Duration::from_millis(10)); } } @@ -233,12 +164,12 @@ impl AsyncWorkerMgr { self.prefetch_channel .flush_pending_prefetch_requests(|t| match t { AsyncPrefetchMessage::BlobPrefetch(state, blob, _, _) => { - blob_id != blob.blob_id() || state.load(Ordering::Acquire) > 0 + blob_id == blob.blob_id() && state.load(Ordering::Acquire) == 0 } AsyncPrefetchMessage::FsPrefetch(state, blob, _) => { - blob_id != blob.blob_id() || state.load(Ordering::Acquire) > 0 + blob_id == blob.blob_id() && state.load(Ordering::Acquire) == 0 } - _ => true, + _ => false, }); } @@ -257,7 +188,7 @@ impl AsyncWorkerMgr { fn start_prefetch_workers(mgr: Arc) -> Result<()> { // Hold the request queue to barrier all working threads. - let guard = mgr.prefetch_channel.requests.lock().unwrap(); + let guard = mgr.prefetch_channel.lock_channel(); for num in 0..mgr.prefetch_config.threads_count { let mgr2 = mgr.clone(); let res = thread::Builder::new() diff --git a/utils/Cargo.toml b/utils/Cargo.toml index 9a1bf9b2c51..7437a91d5c7 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -18,6 +18,7 @@ lz4-sys = "1.9.2" serde = { version = ">=1.0.27", features = ["serde_derive", "rc"] } serde_json = ">=1.0.9" sha2 = "0.10.0" +tokio = { version = "1.19.0", features = ["rt", "sync"] } zstd = "0.11" nydus-error = { version = "0.2", path = "../error" } diff --git a/utils/src/lib.rs b/utils/src/lib.rs index 7c30328840f..d7bc65b3fb0 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -22,6 +22,7 @@ pub mod digest; pub mod exec; pub mod inode_bitmap; pub mod metrics; +pub mod mpmc; pub mod types; /// Round up and divide the value `n` by `d`. diff --git a/utils/src/mpmc.rs b/utils/src/mpmc.rs new file mode 100644 index 00000000000..4a36b5f3bf7 --- /dev/null +++ b/utils/src/mpmc.rs @@ -0,0 +1,154 @@ +// Copyright (C) 2022 Alibaba Cloud. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// Async implementation of Multi-Producer-Multi-Consumer channel. + +//! Asynchronous Multi-Producer Multi-Consumer channel. +//! +//! This module provides an asynchronous multi-producer multi-consumer channel based on [tokio::sync::Notify]. + +use std::collections::VecDeque; +use std::io::{Error, ErrorKind, Result}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Mutex, MutexGuard}; +use tokio::sync::Notify; + +/// An asynchronous multi-producer multi-consumer channel based on [tokio::sync::Notify]. +pub struct Channel { + closed: AtomicBool, + notifier: Notify, + requests: Mutex>, +} + +impl Channel { + /// Create a new instance of [`Channel`]. + pub fn new() -> Self { + Channel { + closed: AtomicBool::new(false), + notifier: Notify::new(), + requests: Mutex::new(VecDeque::new()), + } + } + + /// Close the channel. + pub fn close(&self) { + self.closed.store(true, Ordering::Release); + self.notifier.notify_waiters(); + } + + /// Send a message to the channel. + /// + /// The message object will be returned on error, to ease the lifecycle management. + pub fn send(&self, msg: T) -> std::result::Result<(), T> { + if self.closed.load(Ordering::Acquire) { + Err(msg) + } else { + self.requests.lock().unwrap().push_back(msg); + self.notifier.notify_one(); + Ok(()) + } + } + + /// Try to receive a message from the channel. + pub fn try_recv(&self) -> Option { + self.requests.lock().unwrap().pop_front() + } + + /// Receive message from the channel in asynchronous mode. + pub async fn recv(&self) -> Result { + let future = self.notifier.notified(); + tokio::pin!(future); + + loop { + // Make sure that no wakeup is lost if we get `None` from `try_recv`. + future.as_mut().enable(); + + if let Some(msg) = self.try_recv() { + return Ok(msg); + } else if self.closed.load(Ordering::Acquire) { + return Err(Error::new(ErrorKind::BrokenPipe, "channel has been closed")); + } + + // Wait for a call to `notify_one`. + // + // This uses `.as_mut()` to avoid consuming the future, + // which lets us call `Pin::set` below. + future.as_mut().await; + + // Reset the future in case another call to `try_recv` got the message before us. + future.set(self.notifier.notified()); + } + } + + /// Flush all pending requests specified by the predicator. + /// + pub fn flush_pending_prefetch_requests(&self, mut f: F) + where + F: FnMut(&T) -> bool, + { + self.requests.lock().unwrap().retain(|t| !f(t)); + } + + /// Lock the channel to block all queue operations. + pub fn lock_channel(&self) -> MutexGuard> { + self.requests.lock().unwrap() + } + + /// Notify all waiters. + pub fn notify_waiters(&self) { + self.notifier.notify_waiters(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + + #[test] + fn test_new_channel() { + let channel = Channel::new(); + + channel.send(1u32).unwrap(); + channel.send(2u32).unwrap(); + assert_eq!(channel.try_recv().unwrap(), 1); + assert_eq!(channel.try_recv().unwrap(), 2); + + channel.close(); + channel.send(2u32).unwrap_err(); + } + + #[test] + fn test_flush_channel() { + let channel = Channel::new(); + + channel.send(1u32).unwrap(); + channel.send(2u32).unwrap(); + channel.flush_pending_prefetch_requests(|_| true); + assert!(channel.try_recv().is_none()); + + channel.notify_waiters(); + let _guard = channel.lock_channel(); + } + + #[test] + fn test_async_recv() { + let channel = Arc::new(Channel::new()); + let channel2 = channel.clone(); + + let t = std::thread::spawn(move || { + channel2.send(1u32).unwrap(); + }); + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async { + let msg = channel.recv().await.unwrap(); + assert_eq!(msg, 1); + }); + + t.join().unwrap(); + } +} From 334ac61e788e369151eaf196112f60d7c7b6f291 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Sun, 12 Jun 2022 23:19:29 +0800 Subject: [PATCH 4/4] util: prepare for releasing v0.3 Prepare for release nydus-utils v0.3. Signed-off-by: Jiang Liu --- Cargo.lock | 2 +- Cargo.toml | 2 +- api/Cargo.toml | 2 +- rafs/Cargo.toml | 2 +- storage/Cargo.toml | 2 +- utils/CHANGELOG.md | 19 +++++++++++++++++++ utils/Cargo.toml | 2 +- utils/src/mpmc.rs | 6 ++++++ 8 files changed, 31 insertions(+), 6 deletions(-) create mode 100644 utils/CHANGELOG.md diff --git a/Cargo.lock b/Cargo.lock index 936190e0a01..fd315990a15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1250,7 +1250,7 @@ dependencies = [ [[package]] name = "nydus-utils" -version = "0.2.1" +version = "0.3.0" dependencies = [ "blake3", "flate2", diff --git a/Cargo.toml b/Cargo.toml index 5f8468921c0..ce7f017bbc0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,7 +59,7 @@ nydus-app = { path = "app" } nydus-error = { path = "error" } nydus-rafs = { version = "0.1.0", path = "rafs", features = ["backend-registry", "backend-oss"] } nydus-storage = { version = "0.5.0", path = "storage" } -nydus-utils = { version = "0.2.0", path = "utils" } +nydus-utils = { version = "0.3.0", path = "utils" } blobfs = { path = "blobfs", features = ["virtiofs"], optional = true } [dev-dependencies] diff --git a/api/Cargo.toml b/api/Cargo.toml index bc2bb3223c2..496a0d7f0f4 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -22,4 +22,4 @@ url = "2.1.1" vmm-sys-util = "0.9.0" nydus-error = { version = "0.2.0", path = "../error" } -nydus-utils = { version = "0.2.0", path = "../utils" } +nydus-utils = { version = "0.3.0", path = "../utils" } diff --git a/rafs/Cargo.toml b/rafs/Cargo.toml index 71bb94a1118..b8b7ebb011d 100644 --- a/rafs/Cargo.toml +++ b/rafs/Cargo.toml @@ -36,7 +36,7 @@ fuse-backend-rs = { version = "0.9.0" } nydus-api = { version = "0.1.0", path = "../api" } nydus-error = { version = "0.2.0", path = "../error" } nydus-storage = { version = "0.5.0", path = "../storage", features = ["backend-localfs"] } -nydus-utils = { version = "0.2.0", path = "../utils" } +nydus-utils = { version = "0.3.0", path = "../utils" } [dev-dependencies] vmm-sys-util = "0.9.0" diff --git a/storage/Cargo.toml b/storage/Cargo.toml index 6415db0aad7..9375c3fe3cc 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -37,7 +37,7 @@ vmm-sys-util = "0.9.0" fuse-backend-rs = { version = "0.9.0" } nydus-api = { version = "0.1.0", path = "../api" } -nydus-utils = { version = "0.2.0", path = "../utils" } +nydus-utils = { version = "0.3.0", path = "../utils" } nydus-error = { version = "0.2.0", path = "../error" } [dev-dependencies] diff --git a/utils/CHANGELOG.md b/utils/CHANGELOG.md new file mode 100644 index 00000000000..83fb2ca005f --- /dev/null +++ b/utils/CHANGELOG.md @@ -0,0 +1,19 @@ +# Changelog +## [Unreleased] + +## [v0.3] + +### Added +- Asynchronous multi-producer multi-consumer channel + +### Fixed +- Refine metrics APIs + +### Deprecated +- Remove dependency on fuse-backend-rs crate + +## [v0.1.0] + +### Added + +- Initial release diff --git a/utils/Cargo.toml b/utils/Cargo.toml index 7437a91d5c7..f66668edb23 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "nydus-utils" -version = "0.2.1" +version = "0.3.0" description = "Compression/encryption/digest utilities for Nydus Image Service" authors = ["The Nydus Developers"] license = "Apache-2.0 OR BSD-3-Clause" diff --git a/utils/src/mpmc.rs b/utils/src/mpmc.rs index 4a36b5f3bf7..1e19acb1ad9 100644 --- a/utils/src/mpmc.rs +++ b/utils/src/mpmc.rs @@ -20,6 +20,12 @@ pub struct Channel { requests: Mutex>, } +impl Default for Channel { + fn default() -> Self { + Self::new() + } +} + impl Channel { /// Create a new instance of [`Channel`]. pub fn new() -> Self {