diff --git a/iroh/examples/sync.rs b/iroh/examples/sync.rs index 147280bb82..5df2db1772 100644 --- a/iroh/examples/sync.rs +++ b/iroh/examples/sync.rs @@ -208,6 +208,20 @@ async fn run(args: Args) -> anyhow::Result<()> { // process commands in a loop println!("> ready to accept commands"); println!("> type `help` for a list of commands"); + + let current_watch: Arc>> = + Arc::new(std::sync::Mutex::new(None)); + let watch = current_watch.clone(); + doc.on_insert(Box::new(move |_origin, entry| { + let matcher = watch.lock().unwrap(); + if let Some(matcher) = &*matcher { + let key = entry.entry().id().key(); + if key.starts_with(matcher.as_bytes()) { + println!("change: {}", fmt_entry(&entry)); + } + } + })); + loop { // wait for a command from the input repl thread let Some((cmd, to_repl_tx)) = cmd_rx.recv().await else { @@ -225,7 +239,7 @@ async fn run(args: Args) -> anyhow::Result<()> { _ = tokio::signal::ctrl_c() => { println!("> aborted"); } - res = handle_command(cmd, &doc, &our_ticket, &log_filter) => if let Err(err) = res { + res = handle_command(cmd, &doc, &our_ticket, &log_filter, ¤t_watch) => if let Err(err) = res { println!("> error: {err}"); }, }; @@ -249,6 +263,7 @@ async fn handle_command( doc: &Doc, ticket: &Ticket, log_filter: &LogLevelReload, + current_watch: &Arc>>, ) -> anyhow::Result<()> { match cmd { Cmd::Set { key, value } => { @@ -263,6 +278,18 @@ async fn handle_command( } } } + Cmd::Watch { key } => { + println!("watching key: '{key}'"); + current_watch.lock().unwrap().replace(key); + } + Cmd::WatchCancel => match current_watch.lock().unwrap().take() { + Some(key) => { + println!("canceled watching key: '{key}'"); + } + None => { + println!("no watch active"); + } + }, Cmd::Ls { prefix } => { let entries = match prefix { None => doc.replica().all(), @@ -325,6 +352,13 @@ pub enum Cmd { #[clap(verbatim_doc_comment)] directive: String, }, + /// Watch for changes. + Watch { + /// The key to watch. + key: String, + }, + /// Cancels any running watch command. + WatchCancel, /// Quit Exit, } diff --git a/iroh/src/sync/content.rs b/iroh/src/sync/content.rs index 3f396e37c6..3d48424749 100644 --- a/iroh/src/sync/content.rs +++ b/iroh/src/sync/content.rs @@ -16,7 +16,8 @@ use iroh_gossip::net::util::Dialer; use iroh_io::{AsyncSliceReader, AsyncSliceReaderExt}; use iroh_net::{tls::PeerId, MagicEndpoint}; use iroh_sync::sync::{ - Author, InsertOrigin, Namespace, NamespaceId, Replica, ReplicaStore, SignedEntry, + Author, InsertOrigin, Namespace, NamespaceId, OnInsertCallback, Replica, ReplicaStore, + SignedEntry, }; use tokio::{ io::AsyncRead, @@ -127,6 +128,10 @@ impl Doc { doc } + pub fn on_insert(&self, callback: OnInsertCallback) { + self.replica.on_insert(callback); + } + pub fn replica(&self) -> &Replica { &self.replica }