Skip to content

Commit 2d3a901

Browse files
committed
Add some simple smoke tests for tags
1 parent 83998d7 commit 2d3a901

File tree

8 files changed

+219
-39
lines changed

8 files changed

+219
-39
lines changed

src/hash.rs

+6
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,12 @@ pub struct HashAndFormat {
256256
pub format: BlobFormat,
257257
}
258258

259+
impl From<Hash> for HashAndFormat {
260+
fn from(hash: Hash) -> Self {
261+
Self::raw(hash)
262+
}
263+
}
264+
259265
#[cfg(feature = "redb")]
260266
mod redb_support {
261267
use postcard::experimental::max_size::MaxSize;

src/rpc.rs

+1-11
Original file line numberDiff line numberDiff line change
@@ -313,20 +313,10 @@ impl<D: crate::store::Store> Handler<D> {
313313
tracing::info!("blob_list_tags");
314314
let blobs = self;
315315
Gen::new(|co| async move {
316-
let tags = blobs.store().tags().await.unwrap();
316+
let tags = blobs.store().tags(msg.from, msg.to).await.unwrap();
317317
#[allow(clippy::manual_flatten)]
318318
for item in tags {
319319
if let Ok((name, HashAndFormat { hash, format })) = item {
320-
if let Some(from) = msg.from.as_ref() {
321-
if &name < from {
322-
continue;
323-
}
324-
}
325-
if let Some(to) = msg.to.as_ref() {
326-
if &name >= to {
327-
break;
328-
}
329-
}
330320
if (format.is_raw() && msg.raw) || (format.is_hash_seq() && msg.hash_seq) {
331321
co.yield_(TagInfo { name, hash, format }).await;
332322
}

src/rpc/client/tags.rs

+42-5
Original file line numberDiff line numberDiff line change
@@ -144,12 +144,33 @@ pub struct DeleteOptions {
144144

145145
impl DeleteOptions {
146146
/// Delete a single tag
147-
pub fn single(name: Tag) -> Self {
147+
pub fn single(name: &[u8]) -> Self {
148+
let name = Tag::from(name);
148149
Self {
149150
to: Some(name.successor()),
150151
from: Some(name),
151152
}
152153
}
154+
155+
/// Delete a range of tags
156+
pub fn range<R, E>(range: R) -> Self
157+
where
158+
R: RangeBounds<E>,
159+
E: AsRef<[u8]>,
160+
{
161+
let (from, to) = tags_from_range(range);
162+
Self { from, to }
163+
}
164+
165+
/// Delete tags with a prefix
166+
pub fn prefix(prefix: &[u8]) -> Self {
167+
let from = Tag::from(prefix);
168+
let to = from.next_prefix();
169+
Self {
170+
from: Some(from),
171+
to,
172+
}
173+
}
153174
}
154175

155176
/// A client that uses the memory connector.
@@ -209,7 +230,7 @@ where
209230
self.list_with_opts(ListOptions::range(range)).await
210231
}
211232

212-
/// Lists all tags.
233+
/// Lists all tags with the given prefix.
213234
pub async fn list_prefix(
214235
&self,
215236
prefix: impl AsRef<[u8]>,
@@ -235,13 +256,29 @@ where
235256
}
236257

237258
/// Deletes a tag.
238-
pub async fn delete(&self, name: Tag) -> Result<()> {
239-
self.delete_with_opts(DeleteOptions::single(name)).await
259+
pub async fn delete(&self, name: impl AsRef<[u8]>) -> Result<()> {
260+
self.delete_with_opts(DeleteOptions::single(name.as_ref()))
261+
.await
262+
}
263+
264+
/// Deletes a range of tags.
265+
pub async fn delete_range<R, E>(&self, range: R) -> Result<()>
266+
where
267+
R: RangeBounds<E>,
268+
E: AsRef<[u8]>,
269+
{
270+
self.delete_with_opts(DeleteOptions::range(range)).await
271+
}
272+
273+
/// Lists all tags with the given prefix.
274+
pub async fn delete_prefix(&self, prefix: impl AsRef<[u8]>) -> Result<()> {
275+
self.delete_with_opts(DeleteOptions::prefix(prefix.as_ref()))
276+
.await
240277
}
241278
}
242279

243280
/// Information about a tag.
244-
#[derive(Debug, Serialize, Deserialize)]
281+
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
245282
pub struct TagInfo {
246283
/// Name of the tag
247284
pub name: Tag,

src/store/fs.rs

+22-18
Original file line numberDiff line numberDiff line change
@@ -601,8 +601,8 @@ pub(crate) enum ActorMessage {
601601
},
602602
/// Bulk query method: get the entire tags table
603603
Tags {
604-
#[debug(skip)]
605-
filter: FilterPredicate<Tag, HashAndFormat>,
604+
from: Option<Tag>,
605+
to: Option<Tag>,
606606
#[allow(clippy::type_complexity)]
607607
tx: oneshot::Sender<
608608
ActorResult<Vec<std::result::Result<(Tag, HashAndFormat), StorageError>>>,
@@ -863,11 +863,13 @@ impl StoreInner {
863863
Ok(res)
864864
}
865865

866-
async fn tags(&self) -> OuterResult<Vec<io::Result<(Tag, HashAndFormat)>>> {
866+
async fn tags(
867+
&self,
868+
from: Option<Tag>,
869+
to: Option<Tag>,
870+
) -> OuterResult<Vec<io::Result<(Tag, HashAndFormat)>>> {
867871
let (tx, rx) = oneshot::channel();
868-
let filter: FilterPredicate<Tag, HashAndFormat> =
869-
Box::new(|_i, k, v| Some((k.value(), v.value())));
870-
self.tx.send(ActorMessage::Tags { filter, tx }).await?;
872+
self.tx.send(ActorMessage::Tags { from, to, tx }).await?;
871873
let tags = rx.await?;
872874
// transform the internal error type into io::Error
873875
let tags = tags?
@@ -1299,8 +1301,12 @@ impl super::ReadableStore for Store {
12991301
Ok(Box::new(self.0.partial_blobs().await?.into_iter()))
13001302
}
13011303

1302-
async fn tags(&self) -> io::Result<super::DbIter<(Tag, HashAndFormat)>> {
1303-
Ok(Box::new(self.0.tags().await?.into_iter()))
1304+
async fn tags(
1305+
&self,
1306+
from: Option<Tag>,
1307+
to: Option<Tag>,
1308+
) -> io::Result<super::DbIter<(Tag, HashAndFormat)>> {
1309+
Ok(Box::new(self.0.tags(from, to).await?.into_iter()))
13041310
}
13051311

13061312
fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static> {
@@ -1985,23 +1991,21 @@ impl ActorState {
19851991
fn tags(
19861992
&mut self,
19871993
tables: &impl ReadableTables,
1988-
filter: FilterPredicate<Tag, HashAndFormat>,
1994+
from: Option<Tag>,
1995+
to: Option<Tag>,
19891996
) -> ActorResult<Vec<std::result::Result<(Tag, HashAndFormat), StorageError>>> {
19901997
let mut res = Vec::new();
1991-
let mut index = 0u64;
1992-
#[allow(clippy::explicit_counter_loop)]
1993-
for item in tables.tags().iter()? {
1998+
let from = from.map(Bound::Included).unwrap_or(Bound::Unbounded);
1999+
let to = to.map(Bound::Excluded).unwrap_or(Bound::Unbounded);
2000+
for item in tables.tags().range((from, to))? {
19942001
match item {
19952002
Ok((k, v)) => {
1996-
if let Some(item) = filter(index, k, v) {
1997-
res.push(Ok(item));
1998-
}
2003+
res.push(Ok((k.value(), v.value())));
19992004
}
20002005
Err(e) => {
20012006
res.push(Err(e));
20022007
}
20032008
}
2004-
index += 1;
20052009
}
20062010
Ok(res)
20072011
}
@@ -2342,8 +2346,8 @@ impl ActorState {
23422346
let res = self.blobs(tables, filter);
23432347
tx.send(res).ok();
23442348
}
2345-
ActorMessage::Tags { filter, tx } => {
2346-
let res = self.tags(tables, filter);
2349+
ActorMessage::Tags { from, to, tx } => {
2350+
let res = self.tags(tables, from, to);
23472351
tx.send(res).ok();
23482352
}
23492353
ActorMessage::GcStart { tx } => {

src/store/mem.rs

+22-2
Original file line numberDiff line numberDiff line change
@@ -446,10 +446,30 @@ impl ReadableStore for Store {
446446
))
447447
}
448448

449-
async fn tags(&self) -> io::Result<crate::store::DbIter<(crate::Tag, crate::HashAndFormat)>> {
449+
async fn tags(
450+
&self,
451+
from: Option<Tag>,
452+
to: Option<Tag>,
453+
) -> io::Result<crate::store::DbIter<(crate::Tag, crate::HashAndFormat)>> {
450454
#[allow(clippy::mutable_key_type)]
451455
let tags = self.read_lock().tags.clone();
452-
Ok(Box::new(tags.into_iter().map(Ok)))
456+
let tags = tags
457+
.into_iter()
458+
.filter(move |(tag, _)| {
459+
if let Some(from) = &from {
460+
if tag < from {
461+
return false;
462+
}
463+
}
464+
if let Some(to) = &to {
465+
if tag >= to {
466+
return false;
467+
}
468+
}
469+
true
470+
})
471+
.map(Ok);
472+
Ok(Box::new(tags))
453473
}
454474

455475
fn temp_tags(&self) -> Box<dyn Iterator<Item = crate::HashAndFormat> + Send + Sync + 'static> {

src/store/readonly_mem.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,11 @@ impl ReadableStore for Store {
232232
))
233233
}
234234

235-
async fn tags(&self) -> io::Result<DbIter<(Tag, HashAndFormat)>> {
235+
async fn tags(
236+
&self,
237+
_from: Option<Tag>,
238+
_to: Option<Tag>,
239+
) -> io::Result<DbIter<(Tag, HashAndFormat)>> {
236240
Ok(Box::new(std::iter::empty()))
237241
}
238242

src/store/traits.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,11 @@ pub trait ReadableStore: Map {
262262
/// been imported, and hash sequences that have been created internally.
263263
fn blobs(&self) -> impl Future<Output = io::Result<DbIter<Hash>>> + Send;
264264
/// list all tags (collections or other explicitly added things) in the database
265-
fn tags(&self) -> impl Future<Output = io::Result<DbIter<(Tag, HashAndFormat)>>> + Send;
265+
fn tags(
266+
&self,
267+
from: Option<Tag>,
268+
to: Option<Tag>,
269+
) -> impl Future<Output = io::Result<DbIter<(Tag, HashAndFormat)>>> + Send;
266270

267271
/// Temp tags
268272
fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static>;
@@ -635,7 +639,7 @@ pub(super) async fn gc_mark_task<'a>(
635639
}
636640
let mut roots = BTreeSet::new();
637641
debug!("traversing tags");
638-
for item in store.tags().await? {
642+
for item in store.tags(None, None).await? {
639643
let (name, haf) = item?;
640644
debug!("adding root {:?} {:?}", name, haf);
641645
roots.insert(haf);

tests/tags.rs

+115
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
#![cfg(all(feature = "net_protocol", feature = "rpc"))]
2+
use futures_lite::StreamExt;
3+
use futures_util::Stream;
4+
use iroh::Endpoint;
5+
use iroh_blobs::{
6+
net_protocol::Blobs,
7+
rpc::{
8+
client::tags::{self, TagInfo},
9+
proto::RpcService,
10+
},
11+
BlobFormat, Hash,
12+
};
13+
use testresult::TestResult;
14+
15+
async fn to_vec<T>(stream: impl Stream<Item = anyhow::Result<T>>) -> anyhow::Result<Vec<T>> {
16+
let res = stream.collect::<Vec<_>>().await;
17+
res.into_iter().collect::<anyhow::Result<Vec<_>>>()
18+
}
19+
20+
fn expected(tags: impl IntoIterator<Item = &'static str>) -> Vec<TagInfo> {
21+
tags.into_iter()
22+
.map(|tag| TagInfo {
23+
name: tag.into(),
24+
hash: Hash::new(tag),
25+
format: BlobFormat::Raw,
26+
})
27+
.collect()
28+
}
29+
30+
async fn tags_smoke<C: quic_rpc::Connector<RpcService>>(tags: tags::Client<C>) -> TestResult<()> {
31+
tags.set("a", Hash::new("a")).await?;
32+
tags.set("b", Hash::new("b")).await?;
33+
tags.set("c", Hash::new("c")).await?;
34+
tags.set("d", Hash::new("d")).await?;
35+
tags.set("e", Hash::new("e")).await?;
36+
let stream = tags.list().await?;
37+
let res = to_vec(stream).await?;
38+
assert_eq!(res, expected(["a", "b", "c", "d", "e"]));
39+
40+
let stream = tags.list_range("b".."d").await?;
41+
let res = to_vec(stream).await?;
42+
assert_eq!(res, expected(["b", "c"]));
43+
44+
let stream = tags.list_range("b"..).await?;
45+
let res = to_vec(stream).await?;
46+
assert_eq!(res, expected(["b", "c", "d", "e"]));
47+
48+
let stream = tags.list_range(.."d").await?;
49+
let res = to_vec(stream).await?;
50+
assert_eq!(res, expected(["a", "b", "c"]));
51+
52+
let stream = tags.list_range(..="d").await?;
53+
let res = to_vec(stream).await?;
54+
assert_eq!(res, expected(["a", "b", "c", "d"]));
55+
56+
tags.delete_range("b"..).await?;
57+
let stream = tags.list().await?;
58+
let res = to_vec(stream).await?;
59+
assert_eq!(res, expected(["a"]));
60+
61+
tags.delete_range(..="a").await?;
62+
let stream = tags.list().await?;
63+
let res = to_vec(stream).await?;
64+
assert_eq!(res, expected([]));
65+
66+
tags.set("a", Hash::new("a")).await?;
67+
tags.set("aa", Hash::new("aa")).await?;
68+
tags.set("aaa", Hash::new("aaa")).await?;
69+
tags.set("aab", Hash::new("aab")).await?;
70+
tags.set("b", Hash::new("b")).await?;
71+
72+
let stream = tags.list_prefix("aa").await?;
73+
let res = to_vec(stream).await?;
74+
assert_eq!(res, expected(["aa", "aaa", "aab"]));
75+
76+
tags.delete_prefix("aa").await?;
77+
let stream = tags.list().await?;
78+
let res = to_vec(stream).await?;
79+
assert_eq!(res, expected(["a", "b"]));
80+
81+
tags.delete_prefix("").await?;
82+
let stream = tags.list().await?;
83+
let res = to_vec(stream).await?;
84+
assert_eq!(res, expected([]));
85+
86+
tags.set("a", Hash::new("a")).await?;
87+
tags.set("b", Hash::new("b")).await?;
88+
tags.set("c", Hash::new("c")).await?;
89+
90+
tags.delete("b").await?;
91+
let stream = tags.list().await?;
92+
let res = to_vec(stream).await?;
93+
assert_eq!(res, expected(["a", "c"]));
94+
95+
Ok(())
96+
}
97+
98+
#[tokio::test]
99+
async fn tags_smoke_mem() -> TestResult<()> {
100+
let endpoint = Endpoint::builder().bind().await?;
101+
let blobs = Blobs::memory().build(&endpoint);
102+
let client = blobs.client();
103+
tags_smoke(client.tags()).await
104+
}
105+
106+
#[tokio::test]
107+
async fn tags_smoke_fs() -> TestResult<()> {
108+
let td = tempfile::tempdir()?;
109+
let endpoint = Endpoint::builder().bind().await?;
110+
let blobs = Blobs::persistent(td.path().join("blobs.db"))
111+
.await?
112+
.build(&endpoint);
113+
let client = blobs.client();
114+
tags_smoke(client.tags()).await
115+
}

0 commit comments

Comments
 (0)