Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for choosing between async-std and Tokio #29

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ license = "Apache-2.0"
repository = "https://github.com/zkat/cacache-rs"
homepage = "https://github.com/zkat/cacache-rs"
readme = "README.md"
categories = [
"caching",
"filesystem"
]
categories = ["caching", "filesystem"]

[dependencies]
ssri = "7.0.0"
Expand All @@ -25,15 +22,28 @@ serde = "1.0.130"
serde_derive = "1.0.130"
walkdir = "2.3.2"
either = "1.6.1"
async-std = { version = "1.10.0", features = ["unstable"] }
async-std = { version = "1.10.0", features = ["unstable"], optional = true }
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Combined with

[features]
default = ["async-std"]

it doesn't really change anything for your existing users unless they've put default-features = false on the dependency already, which would be a really odd thing to do on a crate which had no features.

tokio = { version = "1.12.0", features = [
"fs",
"io-util",
"macros",
"rt",
"rt-multi-thread",
], optional = true }
tokio-stream = { version = "0.1.7", features = ["io-util"], optional = true }
thiserror = "1.0.29"
futures = "0.3.17"
memmap = "0.7.0"

[dev-dependencies]
async-std = { version = "1.10.0", features = ["unstable"] }
async-attributes = "1.1.2"
criterion = "0.3.5"

[[bench]]
name = "benchmarks"
harness = false

[features]
default = ["async-std"]
tokio-runtime = ["tokio", "tokio-stream"]
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Minimum supported Rust version is `1.43.0`.

## Features

- First-class async support, using [`async-std`](https://crates.io/crates/async-std) as its runtime. Sync APIs are available but secondary
- First-class async support, using either [`async-std`](https://crates.io/crates/async-std) or [`tokio`](https://crates.io/crates/tokio) as its runtime. Sync APIs are available but secondary
- `std::fs`-style API
- Extraction by key or by content address (shasum, etc)
- [Subresource Integrity](#integrity) web standard support
Expand Down
126 changes: 126 additions & 0 deletions src/async_lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
#[cfg(feature = "async-std")]
pub use async_std::fs::File;
#[cfg(feature = "tokio")]
pub use tokio::fs::File;

#[cfg(feature = "async-std")]
pub use futures::io::AsyncRead;
#[cfg(feature = "tokio")]
pub use tokio::io::AsyncRead;

#[cfg(feature = "async-std")]
pub use futures::io::AsyncReadExt;
#[cfg(feature = "tokio")]
pub use tokio::io::AsyncReadExt;

#[cfg(feature = "async-std")]
pub use futures::io::AsyncBufReadExt;
#[cfg(feature = "tokio")]
pub use tokio::io::AsyncBufReadExt;

#[cfg(feature = "async-std")]
pub use futures::io::AsyncWrite;
#[cfg(feature = "tokio")]
pub use tokio::io::AsyncWrite;

#[cfg(feature = "async-std")]
pub use futures::io::AsyncWriteExt;
#[cfg(feature = "tokio")]
pub use tokio::io::AsyncWriteExt;

#[cfg(feature = "async-std")]
pub use async_std::fs::read;
#[cfg(feature = "tokio")]
pub use tokio::fs::read;

#[cfg(feature = "async-std")]
pub use async_std::fs::copy;
#[cfg(feature = "tokio")]
pub use tokio::fs::copy;

#[cfg(feature = "async-std")]
pub use async_std::fs::metadata;
#[cfg(feature = "tokio")]
pub use tokio::fs::metadata;

#[cfg(feature = "async-std")]
pub use async_std::fs::remove_file;
#[cfg(feature = "tokio")]
pub use tokio::fs::remove_file;

#[cfg(feature = "async-std")]
pub use async_std::fs::create_dir_all;
#[cfg(feature = "tokio")]
pub use tokio::fs::create_dir_all;

#[cfg(feature = "async-std")]
pub use async_std::fs::remove_dir_all;
#[cfg(feature = "tokio")]
pub use tokio::fs::remove_dir_all;

#[cfg(feature = "async-std")]
pub use async_std::fs::DirBuilder;
#[cfg(feature = "tokio")]
pub use tokio::fs::DirBuilder;

#[cfg(feature = "async-std")]
pub use async_std::fs::OpenOptions;
#[cfg(feature = "tokio")]
pub use tokio::fs::OpenOptions;

#[cfg(feature = "async-std")]
pub use async_std::io::BufReader;
#[cfg(feature = "tokio")]
pub use tokio::io::BufReader;

#[cfg(feature = "async-std")]
#[inline]
pub fn lines_to_stream<R>(lines: futures::io::Lines<R>) -> futures::io::Lines<R> {
lines
}
#[cfg(feature = "tokio")]
#[inline]
pub fn lines_to_stream<R>(lines: tokio::io::Lines<R>) -> tokio_stream::wrappers::LinesStream<R> {
tokio_stream::wrappers::LinesStream::new(lines)
}

#[cfg(feature = "async-std")]
pub use async_std::task::spawn_blocking;
#[cfg(feature = "tokio")]
pub use tokio::task::spawn_blocking;

#[cfg(all(test, feature = "async-std"))]
pub use async_std::task::block_on;
#[cfg(all(test, feature = "tokio"))]
#[inline]
pub fn block_on<F, T>(future: F) -> T
where
F: std::future::Future<Output = T>,
{
tokio::runtime::Runtime::new().unwrap().block_on(future)
}

#[cfg(feature = "async-std")]
pub use async_std::task::JoinHandle;
#[cfg(feature = "async-std")]
#[inline]
pub async fn unwrap_joinhandle<R>(handle: async_std::task::JoinHandle<R>) -> R {
handle.await
}
#[cfg(feature = "async-std")]
#[inline]
pub fn unwrap_joinhandle_value<T>(value: T) -> T {
value
}
#[cfg(feature = "tokio")]
pub use tokio::task::JoinHandle;
#[cfg(feature = "tokio")]
#[inline]
pub async fn unwrap_joinhandle<R>(handle: tokio::task::JoinHandle<R>) -> R {
handle.await.unwrap()
}
#[cfg(feature = "tokio")]
#[inline]
pub fn unwrap_joinhandle_value<T>(value: Result<T, tokio::task::JoinError>) -> T {
value.unwrap()
}
32 changes: 24 additions & 8 deletions src/content/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};

use async_std;
use futures::prelude::*;
use ssri::{Algorithm, Integrity, IntegrityChecker};

use crate::async_lib::AsyncRead;
use crate::content::path;
use crate::errors::{Internal, Result};

Expand All @@ -30,11 +29,12 @@ impl Reader {
}

pub struct AsyncReader {
fd: async_std::fs::File,
fd: crate::async_lib::File,
checker: IntegrityChecker,
}

impl AsyncRead for AsyncReader {
#[cfg(feature = "async-std")]
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -44,6 +44,22 @@ impl AsyncRead for AsyncReader {
self.checker.input(&buf[..amt]);
Poll::Ready(Ok(amt))
}

#[cfg(feature = "tokio")]
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<tokio::io::Result<()>> {
let pre_len = buf.filled().len();
futures::ready!(Pin::new(&mut self.fd).poll_read(cx, buf))?;
let post_len = buf.filled().len();
if post_len - pre_len == 0 {
return Poll::Ready(Ok(()));
}
self.checker.input(&buf.filled()[pre_len..]);
Poll::Ready(Ok(()))
}
}

impl AsyncReader {
Expand All @@ -63,7 +79,7 @@ pub fn open(cache: &Path, sri: Integrity) -> Result<Reader> {
pub async fn open_async(cache: &Path, sri: Integrity) -> Result<AsyncReader> {
let cpath = path::content_path(cache, &sri);
Ok(AsyncReader {
fd: async_std::fs::File::open(cpath).await.to_internal()?,
fd: crate::async_lib::File::open(cpath).await.to_internal()?,
checker: IntegrityChecker::new(sri),
})
}
Expand All @@ -77,7 +93,7 @@ pub fn read(cache: &Path, sri: &Integrity) -> Result<Vec<u8>> {

pub async fn read_async<'a>(cache: &'a Path, sri: &'a Integrity) -> Result<Vec<u8>> {
let cpath = path::content_path(cache, sri);
let ret = async_std::fs::read(&cpath).await.to_internal()?;
let ret = crate::async_lib::read(&cpath).await.to_internal()?;
sri.check(&ret)?;
Ok(ret)
}
Expand All @@ -92,8 +108,8 @@ pub fn copy(cache: &Path, sri: &Integrity, to: &Path) -> Result<u64> {

pub async fn copy_async<'a>(cache: &'a Path, sri: &'a Integrity, to: &'a Path) -> Result<u64> {
let cpath = path::content_path(cache, sri);
let ret = async_std::fs::copy(&cpath, to).await.to_internal()?;
let data = async_std::fs::read(cpath).await.to_internal()?;
let ret = crate::async_lib::copy(&cpath, to).await.to_internal()?;
let data = crate::async_lib::read(cpath).await.to_internal()?;
sri.check(data)?;
Ok(ret)
}
Expand All @@ -107,7 +123,7 @@ pub fn has_content(cache: &Path, sri: &Integrity) -> Option<Integrity> {
}

pub async fn has_content_async(cache: &Path, sri: &Integrity) -> Option<Integrity> {
if async_std::fs::metadata(path::content_path(cache, sri))
if crate::async_lib::metadata(path::content_path(cache, sri))
.await
.is_ok()
{
Expand Down
3 changes: 1 addition & 2 deletions src/content/rm.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::fs;
use std::path::Path;

use async_std::fs as afs;
use ssri::Integrity;

use crate::content::path;
Expand All @@ -13,7 +12,7 @@ pub fn rm(cache: &Path, sri: &Integrity) -> Result<()> {
}

pub async fn rm_async(cache: &Path, sri: &Integrity) -> Result<()> {
afs::remove_file(path::content_path(cache, sri))
crate::async_lib::remove_file(path::content_path(cache, sri))
.await
.to_internal()?;
Ok(())
Expand Down
Loading