Skip to content

Commit

Permalink
feat(async): add extra async versions of APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
zkat committed Oct 13, 2019
1 parent 642e40b commit a491840
Show file tree
Hide file tree
Showing 13 changed files with 1,496 additions and 211 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
rust: [stable, beta, nightly]
rust: [nightly]
os: [ubuntu-latest, windows-latest]

steps:
Expand All @@ -21,6 +21,7 @@ jobs:
- name: Build
run: cargo build --verbose
- name: Clippy
run: cargo clippy -- -D warnings
# TODO - add -D warnings back in after async-std publishes task::blocking
run: cargo clippy # -- -D warnings
- name: Run tests
run: cargo test --verbose
794 changes: 593 additions & 201 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ failure = "0.1.5"
walkdir = "2.2.7"
either = "1.5.2"
mkdirp = "1.0.0"
futures-preview = "0.3.0-alpha.18"
async-std = { version = "0.99.9", features = ["unstable"]}

[target.'cfg(unix)'.dependencies]
chownr = "2.0.0"
Expand Down
68 changes: 65 additions & 3 deletions benches/benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,81 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.

use async_std::task;
use cacache;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use tempfile;

fn get(c: &mut Criterion) {
fn read_hash(c: &mut Criterion) {
let tmp = tempfile::tempdir().unwrap();
let cache = tmp.path().to_owned();
let data = b"hello world".to_vec();
let sri = cacache::put::data(&cache, "hello", data).unwrap();
c.bench_function("read_hash", move |b| {
b.iter(|| cacache::get::read_hash(black_box(&cache), black_box(&sri)))
b.iter(|| cacache::get::read_hash(black_box(&cache), black_box(&sri)).unwrap())
});
}

/// Benchmarks a synchronous read that looks the entry up by key.
///
/// A small entry is written once under the key `"hello"`; the timed closure
/// then calls `cacache::get::read` with that key.
fn read(c: &mut Criterion) {
    let tmp = tempfile::tempdir().unwrap();
    let cache = tmp.path().to_owned();
    let data = b"hello world".to_vec();
    cacache::put::data(&cache, "hello", data).unwrap();
    // Read once before timing so a broken setup panics here rather than
    // inside the measured loop.
    cacache::get::read(&cache, "hello").unwrap();
    c.bench_function("read", move |b| {
        // Pass the key as a string slice: building a `String` per iteration
        // would add an allocation to every measured call.
        b.iter(|| cacache::get::read(black_box(&cache), black_box("hello")).unwrap())
    });
}

/// Benchmarks a synchronous read-by-hash of a 5 MiB entry.
///
/// The payload is written once into a fresh temporary cache; the timed
/// closure then calls `cacache::get::read_hash` against it.
fn read_hash_big_data(c: &mut Criterion) {
    let tmp = tempfile::tempdir().unwrap();
    let cache = tmp.path().to_owned();
    let payload = vec![1; 1024 * 1024 * 5];
    let sri = cacache::put::data(&cache, "hello", payload).unwrap();
    c.bench_function("read_hash_big_data", move |bencher| {
        bencher.iter(|| {
            let outcome = cacache::get::read_hash(black_box(&cache), black_box(&sri));
            outcome.unwrap()
        })
    });
}

/// Benchmarks the asynchronous read-by-hash API.
///
/// Each iteration builds the future returned by
/// `cacache::async_get::read_hash` and drives it to completion with
/// `task::block_on`.
fn async_read_hash(c: &mut Criterion) {
    let tmp = tempfile::tempdir().unwrap();
    let cache = tmp.path().to_owned();
    let sri = cacache::put::data(&cache, "hello", b"hello world".to_vec()).unwrap();
    c.bench_function("async_read_hash", move |bencher| {
        bencher.iter(|| {
            let pending = cacache::async_get::read_hash(black_box(&cache), black_box(&sri));
            task::block_on(pending).unwrap()
        })
    });
}

/// Benchmarks the asynchronous read-by-key API.
///
/// Each iteration builds the future returned by `cacache::async_get::read`
/// for the key `"hello"` and drives it to completion with `task::block_on`.
fn async_read(c: &mut Criterion) {
    let tmp = tempfile::tempdir().unwrap();
    let cache = tmp.path().to_owned();
    cacache::put::data(&cache, "hello", b"hello world".to_vec()).unwrap();
    c.bench_function("async_read", move |bencher| {
        bencher.iter(|| {
            let pending = cacache::async_get::read(black_box(&cache), black_box("hello"));
            task::block_on(pending).unwrap()
        })
    });
}

// Groups every benchmark function defined in this file under the name
// `benches`, which is then handed to Criterion's entry-point macro below.
criterion_group!(
    benches,
    read_hash,
    read,
    read_hash_big_data,
    async_read_hash,
    async_read
);
// NOTE(review): leftover narrower group (sync vs. async hash reads only);
// delete if it is no longer needed for local comparisons.
// criterion_group!(benches, read_hash, async_read_hash);
criterion_main!(benches);
133 changes: 133 additions & 0 deletions src/async_get.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.

//! Functions for reading asynchronously from cache.
//!
//! Asynchronous operations are able to trade off some linear performance in
//! exchange for potentially much higher performance on heavily-concurrent
//! loads.
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::prelude::*;
use ssri::{Algorithm, Integrity};

use crate::content::read::{self, AsyncReader};
use crate::errors::Error;
use crate::index::{self, Entry};

/// File handle for asynchronously reading from a content entry.
///
/// Make sure to call `.check()` when done reading to verify that the
/// extracted data passes integrity verification.
pub struct AsyncGet {
    // Underlying content reader; `poll_read` and `check` both delegate to it.
    reader: AsyncReader,
}

impl AsyncRead for AsyncGet {
    /// Forwards the read request to the wrapped content reader and returns
    /// whatever it reports.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<std::io::Result<usize>> {
        let this = self.get_mut();
        let inner = Pin::new(&mut this.reader);
        inner.poll_read(cx, buf)
    }
}

impl AsyncGet {
    /// Checks that data read from disk passes integrity checks. Returns the
    /// algorithm that was used to verify the data. Should be called only
    /// after all data has been read from disk.
    ///
    /// Consumes the handle and delegates to the wrapped reader's `check`.
    pub fn check(self) -> Result<Algorithm, Error> {
        self.reader.check()
    }
}

/// Opens a new file handle into the cache, looking it up in the index using
/// `key`.
pub async fn open<P, K>(cache: P, key: K) -> Result<AsyncGet, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? {
open_hash(cache, entry.integrity).await
} else {
Err(Error::NotFound)
}
}

/// Opens a new file handle into the cache, addressed directly by its
/// integrity value.
///
/// Any error from opening the underlying content reader is propagated.
pub async fn open_hash<P>(cache: P, sri: Integrity) -> Result<AsyncGet, Error>
where
    P: AsRef<Path>,
{
    let reader = read::open_async(cache.as_ref(), sri).await?;
    Ok(AsyncGet { reader })
}

/// Reads the entire contents of a cache file into a bytes vector, looking the
/// data up by key.
pub async fn read<P, K>(cache: P, key: K) -> Result<Vec<u8>, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? {
read_hash(cache, &entry.integrity).await
} else {
Err(Error::NotFound)
}
}

/// Reads the entire contents of a cache file into a bytes vector, looking the
/// data up by its content address.
///
/// Delegates to the content layer's `read_async`; any error it reports is
/// propagated to the caller.
// NOTE(review): the allow below presumably silences a clippy false positive
// on async fns that take references — confirm it is still required.
#[allow(clippy::needless_lifetimes)]
pub async fn read_hash<P>(cache: P, sri: &Integrity) -> Result<Vec<u8>, Error>
where
    P: AsRef<Path>,
{
    Ok(read::read_async(cache.as_ref(), sri).await?)
}

/// Copies a cache entry by key to a specified location.
pub async fn copy<P, K, Q>(cache: P, key: K, to: Q) -> Result<u64, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
Q: AsRef<Path>,
{
if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? {
copy_hash(cache, &entry.integrity, to).await
} else {
Err(Error::NotFound)
}
}

/// Copies a cache entry by integrity address to a specified location.
///
/// Delegates to the content layer's `copy_async` and returns its result
/// unchanged.
// NOTE(review): the allow below presumably silences a clippy false positive
// on async fns that take references — confirm it is still required.
#[allow(clippy::needless_lifetimes)]
pub async fn copy_hash<P, Q>(cache: P, sri: &Integrity, to: Q) -> Result<u64, Error>
where
    P: AsRef<Path>,
    Q: AsRef<Path>,
{
    read::copy_async(cache.as_ref(), sri, to.as_ref()).await
}

/// Gets entry information and metadata for a certain key.
///
/// Returns the index lookup result as-is: `Ok(None)` when no entry exists
/// for `key`, or the lookup's error if it fails.
pub async fn info<P, K>(cache: P, key: K) -> Result<Option<Entry>, Error>
where
    P: AsRef<Path>,
    K: AsRef<str>,
{
    index::find_async(cache.as_ref(), key.as_ref()).await
}

/// Returns true if the given hash exists in the cache.
///
/// Asks the content layer whether it has content for `sri` and reports
/// whether it returned a value.
pub async fn hash_exists<P: AsRef<Path>>(cache: P, sri: &Integrity) -> bool {
    // `sri` is already a reference; pass it through instead of re-borrowing.
    read::has_content_async(cache.as_ref(), sri).await.is_some()
}

131 changes: 131 additions & 0 deletions src/async_put.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.

//! Functions for asynchronously writing to cache.
//!
//! Asynchronous operations are able to trade off some linear performance in
//! exchange for potentially much higher performance on heavily-concurrent
//! loads.
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::prelude::*;
use ssri::{Algorithm, Integrity};

pub use crate::put::PutOpts;
use crate::content::write;
use crate::errors::Error;
use crate::index;

/// Writes `data` to the `cache`, indexing it under `key`.
///
/// Opens an async put handle configured for SHA-256, writes all of `data`
/// through it, and commits. Returns the integrity value produced by the
/// commit; any failure while opening, writing, or committing is propagated.
pub async fn data<P, D, K>(cache: P, key: K, data: D) -> Result<Integrity, Error>
where
    P: AsRef<Path>,
    D: AsRef<[u8]>,
    K: AsRef<str>,
{
    let opts = PutOpts::new().algorithm(Algorithm::Sha256);
    let mut handle = opts.open_async(cache.as_ref(), key.as_ref()).await?;
    handle.write_all(data.as_ref()).await?;
    handle.commit().await
}

impl PutOpts {
    /// Opens the file handle for writing, returning an `AsyncPut` instance.
    ///
    /// The content writer is created with the configured algorithm, falling
    /// back to SHA-256 when none was set. Any error from creating the writer
    /// is propagated.
    pub async fn open_async<P, K>(self, cache: P, key: K) -> Result<AsyncPut, Error>
    where
        P: AsRef<Path>,
        K: AsRef<str>,
    {
        let cache_dir = cache.as_ref();
        let algorithm = self.algorithm.unwrap_or(Algorithm::Sha256);
        let writer = write::AsyncWriter::new(cache_dir, algorithm).await?;
        Ok(AsyncPut {
            cache: cache_dir.to_path_buf(),
            key: key.as_ref().to_owned(),
            written: 0,
            writer,
            opts: self,
        })
    }
}

/// A reference to an open file writing to the cache.
pub struct AsyncPut {
    // Cache directory the index entry is inserted into on commit.
    cache: PathBuf,
    // Index key the content is recorded under on commit.
    key: String,
    // Byte count that `commit` compares against the `size` option.
    written: usize,
    // Underlying content writer; the `AsyncWrite` impl forwards to it.
    pub(crate) writer: write::AsyncWriter,
    // Options this handle was opened with; consumed by `commit`.
    opts: PutOpts,
}

impl AsyncWrite for AsyncPut {
    /// Forwards the write to the underlying content writer and records how
    /// many bytes it accepted.
    ///
    /// The running total in `written` is what `commit` compares against the
    /// `size` option, so it must be updated on every successful write.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        let outcome = Pin::new(&mut self.writer).poll_write(cx, buf);
        // Count only bytes the writer actually accepted; pending and failed
        // polls leave the total untouched.
        if let Poll::Ready(Ok(amt)) = &outcome {
            self.written += *amt;
        }
        outcome
    }

    /// Forwards the flush to the underlying content writer.
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Pin::new(&mut self.writer).poll_flush(cx)
    }

    /// Forwards the close to the underlying content writer.
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Pin::new(&mut self.writer).poll_close(cx)
    }
}

impl AsyncPut {
    /// Closes the Put handle and writes content and index entries. Also
    /// verifies data against `size` and `integrity` options, if provided.
    /// Must be called manually in order to complete the writing process,
    /// otherwise everything will be thrown out.
    ///
    /// Returns `Error::IntegrityError` when an expected integrity value was
    /// supplied and the written content does not match it, and
    /// `Error::SizeError` when an expected size was supplied and differs
    /// from the recorded byte count.
    pub async fn commit(mut self) -> Result<Integrity, Error> {
        let computed = self.writer.close().await?;
        if let Some(expected) = &self.opts.sri {
            // TODO - ssri should have a .matches method
            let algo = expected.pick_algorithm();
            let is_match = expected
                .hashes
                .iter()
                .take_while(|h| h.algorithm == algo)
                .any(|h| *h == computed.hashes[0]);
            if !is_match {
                return Err(Error::IntegrityError);
            }
        } else {
            // No expectation was supplied: record what was actually written.
            self.opts.sri = Some(computed);
        }
        let size_mismatch = self
            .opts
            .size
            .map_or(false, |expected| expected != self.written);
        if size_mismatch {
            return Err(Error::SizeError);
        }
        index::insert_async(&self.cache, &self.key, self.opts).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::async_get;
    use async_std::task;

    /// Writes a value through the async put API and reads it back through
    /// the async get API from the same temporary cache.
    #[test]
    fn round_trip() {
        let tmp = tempfile::tempdir().unwrap();
        let dir = tmp.path().to_owned();
        let contents = task::block_on(async {
            data(&dir, "hello", b"hello").await.unwrap();
            async_get::read(&dir, "hello").await.unwrap()
        });
        assert_eq!(contents, b"hello");
    }
}
Loading

0 comments on commit a491840

Please sign in to comment.