Skip to content

Commit

Permalink
Merge pull request #488 from jiangliu/utils-v0.3
Browse files Browse the repository at this point in the history
Prepare for releasing nydus-utils-v0.3
  • Loading branch information
bergwolf authored Jun 13, 2022
2 parents d9f6247 + 334ac61 commit f563ec1
Show file tree
Hide file tree
Showing 12 changed files with 231 additions and 137 deletions.
53 changes: 21 additions & 32 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ nydus-app = { path = "app" }
nydus-error = { path = "error" }
nydus-rafs = { version = "0.1.0", path = "rafs", features = ["backend-registry", "backend-oss"] }
nydus-storage = { version = "0.5.0", path = "storage" }
nydus-utils = { version = "0.2.0", path = "utils" }
nydus-utils = { version = "0.3.0", path = "utils" }
blobfs = { path = "blobfs", features = ["virtiofs"], optional = true }

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ url = "2.1.1"
vmm-sys-util = "0.9.0"

nydus-error = { version = "0.2.0", path = "../error" }
nydus-utils = { version = "0.2.0", path = "../utils" }
nydus-utils = { version = "0.3.0", path = "../utils" }
13 changes: 13 additions & 0 deletions api/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1021,4 +1021,17 @@ mod tests {
assert!(msg.is_none());
let _ = thread.join().unwrap();
}

#[test]
fn test_common_config() {
let config = RegistryOssConfig::default();

assert_eq!(config.timeout, 5);
assert_eq!(config.connect_timeout, 5);
assert_eq!(config.retry_limit, 0);
assert_eq!(config.proxy.check_interval, 5);
assert!(config.proxy.fallback);
assert_eq!(config.proxy.ping_url, "");
assert_eq!(config.proxy.url, "");
}
}
2 changes: 1 addition & 1 deletion rafs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ fuse-backend-rs = { version = "0.9.0" }
nydus-api = { version = "0.1.0", path = "../api" }
nydus-error = { version = "0.2.0", path = "../error" }
nydus-storage = { version = "0.5.0", path = "../storage", features = ["backend-localfs"] }
nydus-utils = { version = "0.2.0", path = "../utils" }
nydus-utils = { version = "0.3.0", path = "../utils" }

[dev-dependencies]
vmm-sys-util = "0.9.0"
Expand Down
5 changes: 2 additions & 3 deletions storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ anyhow = "1.0.35"
arc-swap = "=0.4"
base64 = { version = "0.13.0", optional = true }
bitflags = "1.2.1"
dbs-uhttp = { version = "0.3.0" }
futures = "0.3"
# pin governor to avoid multi versions of hashbrown
governor = "=0.4.1"
Expand All @@ -36,11 +37,9 @@ vmm-sys-util = "0.9.0"
fuse-backend-rs = { version = "0.9.0" }

nydus-api = { version = "0.1.0", path = "../api" }
nydus-utils = { version = "0.2.0", path = "../utils" }
nydus-utils = { version = "0.3.0", path = "../utils" }
nydus-error = { version = "0.2.0", path = "../error" }

dbs-uhttp = { version = "0.3.0" }

[dev-dependencies]

[features]
Expand Down
18 changes: 1 addition & 17 deletions storage/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,25 +182,9 @@ fn default_http_scheme() -> String {

#[cfg(test)]
mod tests {
use super::*;
use nydus_api::http::RegistryOssConfig;

#[cfg(any(feature = "backend-oss", feature = "backend-registry"))]
#[test]
fn test_default_http_scheme() {
assert_eq!(default_http_scheme(), "https");
}

#[test]
fn test_common_config() {
let config = RegistryOssConfig::default();

assert_eq!(config.timeout, 5);
assert_eq!(config.connect_timeout, 5);
assert_eq!(config.retry_limit, 0);
assert_eq!(config.proxy.check_interval, 5);
assert!(config.proxy.fallback);
assert_eq!(config.proxy.ping_url, "");
assert_eq!(config.proxy.url, "");
assert_eq!(super::default_http_scheme(), "https");
}
}
90 changes: 9 additions & 81 deletions storage/src/cache/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
//
// SPDX-License-Identifier: Apache-2.0

use std::collections::VecDeque;
use std::io::{Error, ErrorKind, Result};
use std::io::Result;
use std::num::NonZeroU32;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

Expand All @@ -17,9 +16,9 @@ use governor::state::{InMemoryState, NotKeyed};
use governor::{Quota, RateLimiter};
use nydus_utils::metrics::{BlobcacheMetrics, Metric};
use tokio::runtime::Runtime;
use tokio::sync::Notify;

use nydus_api::http::BlobPrefetchConfig;
use nydus_utils::mpmc::Channel;

use crate::cache::{BlobCache, BlobIoRange};
use crate::RAFS_MAX_CHUNK_SIZE;
Expand Down Expand Up @@ -81,77 +80,6 @@ impl AsyncPrefetchMessage {
}
}

// Async implementation of Multi-Producer-Multi-Consumer channel.
struct Channel<T> {
closed: AtomicBool,
notifier: Notify,
requests: Mutex<VecDeque<T>>,
}

impl<T> Channel<T> {
fn new() -> Self {
Channel {
closed: AtomicBool::new(false),
notifier: Notify::new(),
requests: Mutex::new(VecDeque::new()),
}
}

fn close(&self) {
self.closed.store(true, Ordering::Release);
self.notifier.notify_waiters();
}

fn send(&self, msg: T) -> std::result::Result<(), T> {
if self.closed.load(Ordering::Acquire) {
Err(msg)
} else {
self.requests.lock().unwrap().push_back(msg);
self.notifier.notify_one();
Ok(())
}
}

fn try_recv(&self) -> Option<T> {
self.requests.lock().unwrap().pop_front()
}

async fn recv(&self) -> Result<T> {
let future = self.notifier.notified();
tokio::pin!(future);

loop {
/*
// TODO: enable this after https://github.com/tokio-rs/tokio/issues/4745 has been fixed
// Make sure that no wakeup is lost if we get `None` from `try_recv`.
future.as_mut().enable();
*/

if let Some(msg) = self.try_recv() {
return Ok(msg);
} else if self.closed.load(Ordering::Acquire) {
return Err(Error::new(ErrorKind::BrokenPipe, "channel has been closed"));
}

// Wait for a call to `notify_one`.
//
// This uses `.as_mut()` to avoid consuming the future,
// which lets us call `Pin::set` below.
future.as_mut().await;

// Reset the future in case another call to `try_recv` got the message before us.
future.set(self.notifier.notified());
}
}

fn flush_pending_prefetch_requests<F>(&self, f: F)
where
F: FnMut(&T) -> bool,
{
self.requests.lock().unwrap().retain(f);
}
}

pub(crate) struct AsyncWorkerMgr {
metrics: Arc<BlobcacheMetrics>,
ping_requests: AtomicU32,
Expand Down Expand Up @@ -213,7 +141,7 @@ impl AsyncWorkerMgr {
self.prefetch_channel.close();

while self.workers.load(Ordering::Relaxed) > 0 {
self.prefetch_channel.notifier.notify_waiters();
self.prefetch_channel.notify_waiters();
thread::sleep(Duration::from_millis(10));
}
}
Expand All @@ -236,12 +164,12 @@ impl AsyncWorkerMgr {
self.prefetch_channel
.flush_pending_prefetch_requests(|t| match t {
AsyncPrefetchMessage::BlobPrefetch(state, blob, _, _) => {
blob_id != blob.blob_id() || state.load(Ordering::Acquire) > 0
blob_id == blob.blob_id() && state.load(Ordering::Acquire) == 0
}
AsyncPrefetchMessage::FsPrefetch(state, blob, _) => {
blob_id != blob.blob_id() || state.load(Ordering::Acquire) > 0
blob_id == blob.blob_id() && state.load(Ordering::Acquire) == 0
}
_ => true,
_ => false,
});
}

Expand All @@ -260,7 +188,7 @@ impl AsyncWorkerMgr {

fn start_prefetch_workers(mgr: Arc<AsyncWorkerMgr>) -> Result<()> {
// Hold the request queue to barrier all working threads.
let guard = mgr.prefetch_channel.requests.lock().unwrap();
let guard = mgr.prefetch_channel.lock_channel();
for num in 0..mgr.prefetch_config.threads_count {
let mgr2 = mgr.clone();
let res = thread::Builder::new()
Expand Down
19 changes: 19 additions & 0 deletions utils/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Changelog
## [Unreleased]

## [v0.3]

### Added
- Asynchronous multi-producer multi-consumer channel

### Fixed
- Refine metrics APIs

### Deprecated
- Remove dependency on fuse-backend-rs crate

## [v0.1.0]

### Added

- Initial release
Loading

0 comments on commit f563ec1

Please sign in to comment.