-
-
Notifications
You must be signed in to change notification settings - Fork 36
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(async): add extra async versions of APIs
- Loading branch information
Showing
12 changed files
with
1,155 additions
and
198 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
//! Functions for reading from cache. | ||
use std::path::Path; | ||
use std::pin::Pin; | ||
use std::task::{Context, Poll}; | ||
|
||
use futures::prelude::*; | ||
use ssri::{Algorithm, Integrity}; | ||
|
||
use crate::content::read::{self, AsyncReader}; | ||
use crate::errors::Error; | ||
use crate::index::{self, Entry}; | ||
|
||
/// File handle for asynchronously reading from a content entry. | ||
/// | ||
/// Make sure to call `.check()` when done reading to verify that the | ||
/// extracted data passes integrity verification. | ||
pub struct AsyncGet { | ||
reader: AsyncReader, | ||
} | ||
|
||
impl AsyncRead for AsyncGet { | ||
fn poll_read( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
buf: &mut [u8], | ||
) -> Poll<std::io::Result<usize>> { | ||
Pin::new(&mut self.reader).poll_read(cx, buf) | ||
} | ||
} | ||
|
||
impl AsyncGet { | ||
/// Checks that data read from disk passes integrity checks. Returns the | ||
/// algorithm that was used verified the data. Should be called only after | ||
/// all data has been read from disk. | ||
pub fn check(self) -> Result<Algorithm, Error> { | ||
self.reader.check() | ||
} | ||
} | ||
|
||
/// Opens a new file handle into the cache, looking it up in the index using | ||
/// `key`. | ||
pub async fn open<P, K>(cache: P, key: K) -> Result<AsyncGet, Error> | ||
where | ||
P: AsRef<Path>, | ||
K: AsRef<str>, | ||
{ | ||
if let Some(entry) = index::find(cache.as_ref(), key.as_ref())? { | ||
open_hash(cache, entry.integrity).await | ||
} else { | ||
Err(Error::NotFound) | ||
} | ||
} | ||
|
||
/// Opens a new file handle into the cache, based on its integrity address. | ||
pub async fn open_hash<P>(cache: P, sri: Integrity) -> Result<AsyncGet, Error> | ||
where | ||
P: AsRef<Path>, | ||
{ | ||
Ok(AsyncGet { | ||
reader: read::open_async(cache.as_ref(), sri).await?, | ||
}) | ||
} | ||
|
||
/// Reads the entire contents of a cache file into a bytes vector, looking the | ||
/// data up by key. | ||
pub async fn read<P, K>(cache: P, key: K) -> Result<Vec<u8>, Error> | ||
where | ||
P: AsRef<Path>, | ||
K: AsRef<str>, | ||
{ | ||
if let Some(entry) = index::find(cache.as_ref(), key.as_ref())? { | ||
read_hash(cache, &entry.integrity).await | ||
} else { | ||
Err(Error::NotFound) | ||
} | ||
} | ||
|
||
/// Reads the entire contents of a cache file into a bytes vector, looking the | ||
/// data up by its content address. | ||
#[allow(clippy::needless_lifetimes)] | ||
pub async fn read_hash<P>(cache: P, sri: &Integrity) -> Result<Vec<u8>, Error> | ||
where | ||
P: AsRef<Path>, | ||
{ | ||
Ok(read::read_async(cache.as_ref(), sri).await?) | ||
} | ||
|
||
/// Copies a cache entry by key to a specified location. | ||
pub async fn copy<P, K, Q>(cache: P, key: K, to: Q) -> Result<u64, Error> | ||
where | ||
P: AsRef<Path>, | ||
K: AsRef<str>, | ||
Q: AsRef<Path>, | ||
{ | ||
if let Some(entry) = index::find(cache.as_ref(), key.as_ref())? { | ||
copy_hash(cache, &entry.integrity, to).await | ||
} else { | ||
Err(Error::NotFound) | ||
} | ||
} | ||
|
||
/// Copies a cache entry by integrity address to a specified location. | ||
#[allow(clippy::needless_lifetimes)] | ||
pub async fn copy_hash<P, Q>(cache: P, sri: &Integrity, to: Q) -> Result<u64, Error> | ||
where | ||
P: AsRef<Path>, | ||
Q: AsRef<Path>, | ||
{ | ||
read::copy_async(cache.as_ref(), sri, to.as_ref()).await | ||
} | ||
|
||
/// Gets entry information and metadata for a certain key. | ||
pub fn info<P, K>(cache: P, key: K) -> Result<Option<Entry>, Error> | ||
where | ||
P: AsRef<Path>, | ||
K: AsRef<str>, | ||
{ | ||
index::find(cache.as_ref(), key.as_ref()) | ||
} | ||
|
||
/// Returns true if the given hash exists in the cache. | ||
pub fn hash_exists<P: AsRef<Path>>(cache: P, sri: &Integrity) -> bool { | ||
read::has_content(cache.as_ref(), &sri).is_some() | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
//! Functions for asynchronously writing to cache. | ||
use std::path::{Path, PathBuf}; | ||
use std::pin::Pin; | ||
use std::task::{Context, Poll}; | ||
|
||
use futures::prelude::*; | ||
use ssri::{Algorithm, Integrity}; | ||
|
||
pub use crate::put::PutOpts; | ||
use crate::content::write; | ||
use crate::errors::Error; | ||
use crate::index; | ||
|
||
/// Writes `data` to the `cache`, indexing it under `key`. | ||
pub async fn data<P, D, K>(cache: P, key: K, data: D) -> Result<Integrity, Error> | ||
where | ||
P: AsRef<Path>, | ||
D: AsRef<[u8]>, | ||
K: AsRef<str>, | ||
{ | ||
let mut writer = PutOpts::new() | ||
.algorithm(Algorithm::Sha256) | ||
.open_async(cache.as_ref(), key.as_ref()).await?; | ||
writer.write_all(data.as_ref()).await?; | ||
writer.commit().await | ||
} | ||
|
||
impl PutOpts { | ||
/// Opens the file handle for writing, returning a Put instance. | ||
pub async fn open_async<P, K>(self, cache: P, key: K) -> Result<AsyncPut, Error> | ||
where | ||
P: AsRef<Path>, | ||
K: AsRef<str>, | ||
{ | ||
Ok(AsyncPut { | ||
cache: cache.as_ref().to_path_buf(), | ||
key: String::from(key.as_ref()), | ||
written: 0, | ||
writer: write::AsyncWriter::new( | ||
cache.as_ref(), | ||
*self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256), | ||
).await?, | ||
opts: self, | ||
}) | ||
} | ||
} | ||
|
||
/// A reference to an open file writing to the cache. | ||
pub struct AsyncPut { | ||
cache: PathBuf, | ||
key: String, | ||
written: usize, | ||
pub(crate) writer: write::AsyncWriter, | ||
opts: PutOpts, | ||
} | ||
|
||
impl AsyncWrite for AsyncPut { | ||
fn poll_write( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
buf: &[u8], | ||
) -> Poll<std::io::Result<usize>> { | ||
Pin::new(&mut self.writer).poll_write(cx, buf) | ||
} | ||
|
||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> { | ||
Pin::new(&mut self.writer).poll_flush(cx) | ||
} | ||
|
||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> { | ||
Pin::new(&mut self.writer).poll_close(cx) | ||
} | ||
} | ||
|
||
impl AsyncPut { | ||
/// Closes the Put handle and writes content and index entries. Also | ||
/// verifies data against `size` and `integrity` options, if provided. | ||
/// Must be called manually in order to complete the writing process, | ||
/// otherwise everything will be thrown out. | ||
pub async fn commit(self) -> Result<Integrity, Error> { | ||
let writer_sri = self.writer.close().await?; | ||
if let Some(sri) = &self.opts.sri { | ||
// TODO - ssri should have a .matches method | ||
let algo = sri.pick_algorithm(); | ||
let matched = sri | ||
.hashes | ||
.iter() | ||
.take_while(|h| h.algorithm == algo) | ||
.find(|&h| *h == writer_sri.hashes[0]); | ||
if matched.is_none() { | ||
return Err(Error::IntegrityError); | ||
} | ||
} | ||
if let Some(size) = self.opts.size { | ||
if size != self.written { | ||
return Err(Error::SizeError); | ||
} | ||
} | ||
index::insert_async(&self.cache, &self.key, self.opts).await?; | ||
Ok(writer_sri) | ||
} | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
//! Functions for removing things from the cache. | ||
use std::path::Path; | ||
|
||
use async_std::fs as afs; | ||
use ssri::Integrity; | ||
|
||
use crate::content::rm; | ||
use crate::errors::Error; | ||
use crate::index; | ||
|
||
/// Removes an individual index entry. The associated content will be left | ||
/// intact. | ||
pub async fn entry<P: AsRef<Path>>(cache: P, key: &str) -> Result<(), Error> { | ||
index::delete_async(cache.as_ref(), &key).await | ||
} | ||
|
||
/// Removes an individual content entry. Any index entries pointing to this | ||
/// content will become invalidated. | ||
pub async fn content<P: AsRef<Path>>(cache: P, sri: &Integrity) -> Result<(), Error> { | ||
rm::rm_async(cache.as_ref(), &sri).await | ||
} | ||
|
||
/// Removes entire contents of the cache, including temporary files, the entry | ||
/// index, and all content data. | ||
pub async fn all<P: AsRef<Path>>(cache: P) -> Result<(), Error> { | ||
for entry in cache.as_ref().read_dir()? { | ||
if let Ok(entry) = entry { | ||
afs::remove_dir_all(entry.path()).await?; | ||
} | ||
} | ||
Ok(()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.