Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix tests in iroh sync and some cleanups #1305

Merged
merged 2 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions iroh-gossip/src/net/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,26 @@ impl Dialer {
}

/// A [`TimerMap`] with an async method to wait for the next timer expiration.
#[derive(Debug)]
pub struct Timers<T> {
next: Option<(Instant, Pin<Box<Sleep>>)>,
map: TimerMap<T>,
}
impl<T> Timers<T> {
/// Create a new timer map
pub fn new() -> Self {

impl<T> Default for Timers<T> {
fn default() -> Self {
Self {
next: None,
map: TimerMap::default(),
}
}
}

impl<T> Timers<T> {
/// Create a new timer map
pub fn new() -> Self {
Self::default()
}

/// Insert a new entry at the specified instant
pub fn insert(&mut self, instant: Instant, item: T) {
Expand Down
1 change: 1 addition & 0 deletions iroh-gossip/src/proto/hyparview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub enum InEvent<PA> {
}

/// Output event for HyParView
#[derive(Debug)]
pub enum OutEvent<PA> {
SendMessage(PA, Message<PA>),
ScheduleTimer(Duration, Timer<PA>),
Expand Down
1 change: 1 addition & 0 deletions iroh-gossip/src/proto/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ impl<PA> From<InEvent<PA>> for InEventMapped<PA> {
/// This struct contains a map of [`topic::State`] for each topic that was joined. It mostly acts as
/// a forwarder of [`InEvent`]s to matching topic state. Each topic's state is completely
/// independent; thus the actual protocol logic lives with [`topic::State`].
#[derive(Debug)]
pub struct State<PA, R> {
me: PA,
me_data: PeerData,
Expand Down
2 changes: 1 addition & 1 deletion iroh-gossip/src/proto/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl<PA: Clone> IO<PA> for VecDeque<OutEvent<PA>> {
}
}
/// Protocol configuration
#[derive(Clone, Default)]
#[derive(Clone, Default, Debug)]
pub struct Config {
/// Configuration for the swarm membership layer
pub membership: hyparview::Config,
Expand Down
1 change: 1 addition & 0 deletions iroh-gossip/src/proto/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ where
}

/// A [`BTreeMap`] with [`Instant`] as key. Allows to process expired items.
#[derive(Debug)]
pub struct TimerMap<T>(BTreeMap<Instant, Vec<T>>);

impl<T> TimerMap<T> {
Expand Down
32 changes: 11 additions & 21 deletions iroh-sync/src/ranger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,7 @@ where
V: 'a;

/// Returns all items in the given range
fn get_range<'a>(&'a self, range: Range<K>, limit: Option<Range<K>>)
-> Self::RangeIterator<'a>;
fn get_range(&self, range: Range<K>, limit: Option<Range<K>>) -> Self::RangeIterator<'_>;
fn remove(&mut self, key: &K) -> Option<V>;

type AllIterator<'a>: Iterator<Item = (&'a K, &'a V)>
Expand Down Expand Up @@ -282,11 +281,7 @@ where
type RangeIterator<'a> = SimpleRangeIterator<'a, K, V>
where K: 'a, V: 'a;
/// Returns all items in the given range
fn get_range<'a>(
&'a self,
range: Range<K>,
limit: Option<Range<K>>,
) -> Self::RangeIterator<'a> {
fn get_range(&self, range: Range<K>, limit: Option<Range<K>>) -> Self::RangeIterator<'_> {
// TODO: this is not very efficient, optimize depending on data structure
let iter = self.data.iter();

Expand Down Expand Up @@ -401,12 +396,14 @@ where

/// Processes an incoming message and produces a response.
/// If terminated, returns `None`
pub fn process_message(&mut self, message: Message<K, V>) -> (Vec<K>, Option<Message<K, V>>) {
pub fn process_message<F>(&mut self, message: Message<K, V>, cb: F) -> Option<Message<K, V>>
where
F: Fn(K, V),
{
let mut out = Vec::new();

// TODO: can these allocs be avoided?
let mut items = Vec::new();
let mut inserted = Vec::new();
let mut fingerprints = Vec::new();
for part in message.parts {
match part {
Expand Down Expand Up @@ -440,7 +437,7 @@ where

// Store incoming values
for (k, v) in values {
inserted.push(k.clone());
cb(k.clone(), v.clone());
self.store.put(k, v);
}

Expand Down Expand Up @@ -547,9 +544,9 @@ where

// If we have any parts, return a message
if !out.is_empty() {
(inserted, Some(Message { parts: out }))
Some(Message { parts: out })
} else {
(inserted, None)
None
}
}

Expand Down Expand Up @@ -1101,9 +1098,9 @@ mod tests {
rounds += 1;
alice_to_bob.push(msg.clone());

if let Some(msg) = bob.process_message(msg) {
if let Some(msg) = bob.process_message(msg, |_, _| {}) {
bob_to_alice.push(msg.clone());
next_to_bob = alice.process_message(msg);
next_to_bob = alice.process_message(msg, |_, _| {});
}
}
let res = SyncResult {
Expand Down Expand Up @@ -1177,36 +1174,31 @@ mod tests {

let all: Vec<_> = store
.get_range(Range::new("", ""), None)
.into_iter()
.map(|(k, v)| (*k, *v))
.collect();
assert_eq!(&all, &set[..]);

let regular: Vec<_> = store
.get_range(("bee", "eel").into(), None)
.into_iter()
.map(|(k, v)| (*k, *v))
.collect();
assert_eq!(&regular, &set[..3]);

// empty start
let regular: Vec<_> = store
.get_range(("", "eel").into(), None)
.into_iter()
.map(|(k, v)| (*k, *v))
.collect();
assert_eq!(&regular, &set[..3]);

let regular: Vec<_> = store
.get_range(("cat", "hog").into(), None)
.into_iter()
.map(|(k, v)| (*k, *v))
.collect();
assert_eq!(&regular, &set[1..5]);

let excluded: Vec<_> = store
.get_range(("fox", "bee").into(), None)
.into_iter()
.map(|(k, v)| (*k, *v))
.collect();

Expand All @@ -1216,7 +1208,6 @@ mod tests {

let excluded: Vec<_> = store
.get_range(("fox", "doe").into(), None)
.into_iter()
.map(|(k, v)| (*k, *v))
.collect();

Expand All @@ -1229,7 +1220,6 @@ mod tests {
// Limit
let all: Vec<_> = store
.get_range(("", "").into(), Some(("bee", "doe").into()))
.into_iter()
.map(|(k, v)| (*k, *v))
.collect();
assert_eq!(&all, &set[..2]);
Expand Down
96 changes: 52 additions & 44 deletions iroh-sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,14 @@ pub enum InsertOrigin {
#[derive(derive_more::Debug, Clone)]
pub struct Replica {
inner: Arc<RwLock<InnerReplica>>,
#[debug("on_insert: [Box<dyn Fn>; {}]", "self.on_insert.len()")]
on_insert: Arc<RwLock<Vec<OnInsertCallback>>>,
}

#[derive(derive_more::Debug)]
struct InnerReplica {
namespace: Namespace,
peer: Peer<RecordIdentifier, SignedEntry, Store>,
#[debug("on_insert: [Box<dyn Fn>; {}]", "self.on_insert.len()")]
on_insert: Vec<OnInsertCallback>,
}

#[derive(Default, Debug, Clone)]
Expand Down Expand Up @@ -315,11 +315,11 @@ impl crate::ranger::Store<RecordIdentifier, SignedEntry> for Store {
}

type RangeIterator<'a> = RangeIterator<'a>;
fn get_range<'a>(
&'a self,
fn get_range(
&self,
range: Range<RecordIdentifier>,
limit: Option<Range<RecordIdentifier>>,
) -> Self::RangeIterator<'a> {
) -> Self::RangeIterator<'_> {
RangeIterator {
iter: self.records.iter(),
range: Some(range),
Expand Down Expand Up @@ -388,14 +388,14 @@ impl Replica {
inner: Arc::new(RwLock::new(InnerReplica {
namespace,
peer: Peer::default(),
on_insert: Default::default(),
})),
on_insert: Default::default(),
}
}

pub fn on_insert(&self, callback: OnInsertCallback) {
let mut inner = self.inner.write();
inner.on_insert.push(callback);
let mut on_insert = self.on_insert.write();
on_insert.push(callback);
}

// TODO: not horrible
Expand Down Expand Up @@ -456,11 +456,29 @@ impl Replica {
let entry = Entry::new(id.clone(), record);
let signed_entry = entry.sign(&inner.namespace, author);
inner.peer.put(id, signed_entry.clone());
for cb in &inner.on_insert {
drop(inner);
let on_insert = self.on_insert.read();
for cb in &*on_insert {
cb(InsertOrigin::Local, signed_entry.clone());
}
}

/// Hashes the given data and inserts it.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe make clear in the comment that the data is not stored/inserted anyhwere, only its hash. Also, maybe return the hash.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

/// This does not store the content, just the record of it.
///
/// Returns the calculated hash.
pub fn hash_and_insert(
&self,
key: impl AsRef<[u8]>,
author: &Author,
data: impl AsRef<[u8]>,
) -> Hash {
let len = data.as_ref().len() as u64;
let hash = Hash::new(data);
self.insert(key, author, hash, len);
hash
}

pub fn id(&self, key: impl AsRef<[u8]>, author: &Author) -> RecordIdentifier {
let inner = self.inner.read();
let id = RecordIdentifier::new(key, inner.namespace.id(), author.id());
Expand All @@ -470,9 +488,12 @@ impl Replica {
pub fn insert_remote_entry(&self, entry: SignedEntry) -> anyhow::Result<()> {
entry.verify()?;
let mut inner = self.inner.write();
inner.peer.put(entry.entry.id.clone(), entry.clone());
for cb in &inner.on_insert {
cb(InsertOrigin::Sync, entry.clone())
let id = entry.entry.id.clone();
inner.peer.put(id, entry.clone());
drop(inner);
let on_insert = self.on_insert.read();
for cb in &*on_insert {
cb(InsertOrigin::Sync, entry.clone());
}
Ok(())
}
Expand Down Expand Up @@ -511,14 +532,17 @@ impl Replica {
&self,
message: crate::ranger::Message<RecordIdentifier, SignedEntry>,
) -> Option<crate::ranger::Message<RecordIdentifier, SignedEntry>> {
let (inserted_keys, reply) = self.inner.write().peer.process_message(message);
let inner = self.inner.read();
for key in inserted_keys {
let entry = inner.peer.get(&key).unwrap();
for cb in &inner.on_insert {
cb(InsertOrigin::Sync, entry.clone())
}
}
let reply = self
.inner
.write()
.peer
.process_message(message, |_key, entry| {
let on_insert = self.on_insert.read();
for cb in &*on_insert {
cb(InsertOrigin::Sync, entry.clone());
}
});

reply
}

Expand Down Expand Up @@ -817,7 +841,7 @@ mod tests {

let my_replica = Replica::new(myspace);
for i in 0..10 {
my_replica.insert(format!("/{i}"), &alice, format!("{i}: hello from alice"));
my_replica.hash_and_insert(format!("/{i}"), &alice, format!("{i}: hello from alice"));
}

for i in 0..10 {
Expand All @@ -828,33 +852,16 @@ mod tests {
}

// Test multiple records for the same key
my_replica.insert("/cool/path", &alice, "round 1");
let entry = my_replica.get_latest("/cool/path", alice.id()).unwrap();
let content = my_replica
.get_content(entry.entry().record().content_hash())
.unwrap();
assert_eq!(&content[..], b"round 1");
my_replica.hash_and_insert("/cool/path", &alice, "round 1");
let _entry = my_replica.get_latest("/cool/path", alice.id()).unwrap();

// Second

my_replica.insert("/cool/path", &alice, "round 2");
let entry = my_replica.get_latest("/cool/path", alice.id()).unwrap();
let content = my_replica
.get_content(entry.entry().record().content_hash())
.unwrap();
assert_eq!(&content[..], b"round 2");
my_replica.hash_and_insert("/cool/path", &alice, "round 2");
let _entry = my_replica.get_latest("/cool/path", alice.id()).unwrap();

// Get All
let entries: Vec<_> = my_replica.get_all("/cool/path", alice.id()).collect();
assert_eq!(entries.len(), 2);
let content = my_replica
.get_content(entries[0].entry().record().content_hash())
.unwrap();
assert_eq!(&content[..], b"round 1");
let content = my_replica
.get_content(entries[1].entry().record().content_hash())
.unwrap();
assert_eq!(&content[..], b"round 2");
}

#[test]
Expand Down Expand Up @@ -928,12 +935,12 @@ mod tests {
let myspace = Namespace::new(&mut rng);
let mut alice = Replica::new(myspace.clone());
for el in &alice_set {
alice.insert(el, &author, el.as_bytes());
alice.hash_and_insert(el, &author, el.as_bytes());
}

let mut bob = Replica::new(myspace);
for el in &bob_set {
bob.insert(el, &author, el.as_bytes());
bob.hash_and_insert(el, &author, el.as_bytes());
}

sync(&author, &mut alice, &mut bob, &alice_set, &bob_set);
Expand All @@ -952,6 +959,7 @@ mod tests {
while let Some(msg) = next_to_bob.take() {
assert!(rounds < 100, "too many rounds");
rounds += 1;
println!("round {}", rounds);
if let Some(msg) = bob.sync_process_message(msg) {
next_to_bob = alice.sync_process_message(msg);
}
Expand Down
2 changes: 1 addition & 1 deletion iroh/examples/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ async fn handle_command(
for (_id, entry) in entries {
println!("{}", fmt_entry(&entry));
if print_content {
println!("{}", fmt_content(&doc, &entry).await);
println!("{}", fmt_content(doc, &entry).await);
}
}
}
Expand Down
Loading