Skip to content

experiment: move rpc authors to in memory #17

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,18 @@ use crate::{
const ACTION_CAP: usize = 1024;
pub(crate) const MAX_COMMIT_DELAY: Duration = Duration::from_millis(500);

/// Payload for the import-author action.
///
/// Carries the [`Author`] to be inserted into the store. Note that the
/// [`Author`] struct contains sensitive data (see the warnings on the
/// import APIs).
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct ImportAuthorAction {
    /// The author to import.
    pub author: Author,
}

#[derive(derive_more::Debug, derive_more::Display)]
enum Action {
#[display("NewAuthor")]
ImportAuthor {
author: Author,
action: ImportAuthorAction,
#[debug("reply")]
reply: oneshot::Sender<Result<AuthorId>>,
},
Expand Down Expand Up @@ -221,7 +228,7 @@ struct OpenReplica {
/// waiting for the actor to finish happens in an async context, and therefore that the final
/// [`SyncHandle::drop`] will not block.
#[derive(Debug, Clone)]
pub(crate) struct SyncHandle {
/// Channel used to send [`Action`]s to the actor loop.
tx: async_channel::Sender<Action>,
/// Join handle for the actor thread, shared between clones via `Arc`.
join_handle: Arc<Option<JoinHandle<()>>>,
}
Expand Down Expand Up @@ -500,9 +507,20 @@ impl SyncHandle {
self.send(Action::ListReplicas { reply }).await
}

/// Imports the given author.
///
/// Warning: The [`Author`] struct contains sensitive data.
pub async fn import_author(&self, author: Author) -> Result<AuthorId> {
    let action = ImportAuthorAction { author };
    self.import_author_action(action).await
}

/// Sends an [`Action::ImportAuthor`] to the actor and awaits the id of the
/// imported author.
pub(crate) async fn import_author_action(
    &self,
    action: ImportAuthorAction,
) -> Result<AuthorId> {
    let (reply, receiver) = oneshot::channel();
    self.send(Action::ImportAuthor { action, reply }).await?;
    receiver.await?
}

Expand Down Expand Up @@ -663,9 +681,9 @@ impl Actor {
Action::Shutdown { .. } => {
unreachable!("Shutdown is handled in run()")
}
Action::ImportAuthor { author, reply } => {
let id = author.id();
send_reply(reply, self.store.import_author(author).map(|_| id))
Action::ImportAuthor { action, reply } => {
let id = action.author.id();
send_reply(reply, self.store.import_author(action.author).map(|_| id))
}
Action::ExportAuthor { author, reply } => {
send_reply(reply, self.store.get_author(&author))
Expand Down
119 changes: 102 additions & 17 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,9 @@
//!
//! [`crate::Replica`] is also called documents here.

use std::{
io,
path::PathBuf,
str::FromStr,
sync::{Arc, RwLock},
};
use std::{io, path::PathBuf, str::FromStr, sync::Arc};

use anyhow::{bail, Context, Result};
use anyhow::{bail, ensure, Context, Result};
use futures_lite::{Stream, StreamExt};
use iroh::{key::PublicKey, Endpoint, NodeAddr};
use iroh_blobs::{
Expand All @@ -18,7 +13,7 @@ use iroh_blobs::{
};
use iroh_gossip::net::Gossip;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{mpsc, oneshot, RwLock};
use tokio_util::task::AbortOnDropHandle;
use tracing::{error, error_span, Instrument};

Expand All @@ -45,11 +40,11 @@ const SUBSCRIBE_CHANNEL_CAP: usize = 256;
#[derive(derive_more::Debug, Clone)]
pub struct Engine<D> {
/// [`Endpoint`] used by the engine.
pub endpoint: Endpoint,
pub(crate) endpoint: Endpoint,
/// Handle to the actor thread.
pub sync: SyncHandle,
pub(crate) sync: SyncHandle,
/// The persistent default author for this engine.
pub default_author: Arc<DefaultAuthor>,
default_author: Arc<DefaultAuthor>,
to_live_actor: mpsc::Sender<ToLiveActor>,
#[allow(dead_code)]
actor_handle: Arc<AbortOnDropHandle<()>>,
Expand Down Expand Up @@ -252,6 +247,93 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
pub fn local_pool_handle(&self) -> &LocalPoolHandle {
&self.local_pool_handle
}

/// Authors API.
///
/// Returns a cheaply cloneable [`Authors`] handle backed by this engine's
/// sync actor and default author.
pub fn authors(&self) -> Authors {
    let sync = self.sync.clone();
    let default_author = self.default_author.clone();
    Authors {
        sync,
        default_author,
    }
}
}

/// Authors client
///
/// Cheaply cloneable handle for author-related operations; obtained via
/// `Engine::authors`.
#[derive(Debug, Clone)]
pub struct Authors {
/// Handle to the sync actor that owns the author store.
sync: SyncHandle,
/// The node-wide default author, shared with the engine.
default_author: Arc<DefaultAuthor>,
}

impl Authors {
/// Creates a new document author.
///
/// You likely want to save the returned [`AuthorId`] somewhere so that you can use this author
/// again.
///
/// If you need only a single author, use [`Self::default`].
pub async fn create(&self) -> Result<AuthorId> {
let mut rng = rand::rngs::OsRng::default();
let author = Author::new(&mut rng);
self.sync.import_author(author).await
}

/// Returns the default document author of this node.
///
/// On persistent nodes, the author is created on first start and its public key is saved
/// in the data directory.
///
/// The default author can be set with [`Self::set_default`].
pub async fn default(&self) -> AuthorId {
self.default_author.get().await
}

/// Sets the node-wide default author.
///
/// If the author does not exist, an error is returned.
///
/// On a persistent node, the author id will be saved to a file in the data directory and
/// reloaded after a restart.
pub async fn set_default(&self, author_id: AuthorId) -> Result<()> {
self.default_author.set(author_id, &self.sync).await
}

/// Lists document authors for which we have a secret key.
///
/// It's only possible to create writes from authors that we have the secret key of.
pub async fn list(&self) -> Result<impl Stream<Item = Result<AuthorId>>> {
let (tx, rx) = async_channel::bounded(64);
self.sync.list_authors(tx).await?;

Ok(rx)
}

/// Exports the given author.
///
/// Warning: The [`Author`] struct contains sensitive data.
pub async fn export(&self, author: AuthorId) -> Result<Option<Author>> {
self.sync.export_author(author).await
}

/// Imports the given author.
///
/// Warning: The [`Author`] struct contains sensitive data.
pub async fn import(&self, author: Author) -> Result<AuthorId> {
self.sync.import_author(author).await
}

/// Deletes the given author by id.
///
/// Warning: This permanently removes this author.
///
/// Returns an error if attempting to delete the default author.
pub async fn delete(&self, author: AuthorId) -> Result<()> {
let default_author = self.default().await;
ensure!(
author != default_author,
"Deleting the default author is not supported"
);
self.sync.delete_author(author).await
}
}

/// Converts an [`EntryStatus`] into a [`ContentStatus`].
Expand Down Expand Up @@ -359,7 +441,7 @@ impl DefaultAuthorStorage {
///
/// Returns an error if the author can't be parsed or if the author does not exist in the docs
/// store.
pub async fn load(&self, docs_store: &SyncHandle) -> anyhow::Result<AuthorId> {
pub(crate) async fn load(&self, docs_store: &SyncHandle) -> anyhow::Result<AuthorId> {
match self {
Self::Mem => {
let author = Author::new(&mut rand::thread_rng());
Expand Down Expand Up @@ -432,7 +514,10 @@ impl DefaultAuthor {
/// Load the default author from storage.
///
/// If the storage is empty creates a new author and persists it.
pub async fn load(storage: DefaultAuthorStorage, docs_store: &SyncHandle) -> Result<Self> {
pub(crate) async fn load(
storage: DefaultAuthorStorage,
docs_store: &SyncHandle,
) -> Result<Self> {
let value = storage.load(docs_store).await?;
Ok(Self {
value: RwLock::new(value),
Expand All @@ -441,17 +526,17 @@ impl DefaultAuthor {
}

/// Get the current default author.
pub fn get(&self) -> AuthorId {
*self.value.read().unwrap()
pub async fn get(&self) -> AuthorId {
*self.value.read().await
}

/// Set the default author.
pub async fn set(&self, author_id: AuthorId, docs_store: &SyncHandle) -> Result<()> {
pub(crate) async fn set(&self, author_id: AuthorId, docs_store: &SyncHandle) -> Result<()> {
if docs_store.export_author(author_id).await?.is_none() {
bail!("The author does not exist");
}
self.storage.persist(author_id).await?;
*self.value.write().unwrap() = author_id;
*self.value.write().await = author_id;
Ok(())
}
}
4 changes: 2 additions & 2 deletions src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub const ALPN: &[u8] = b"/iroh-sync/1";
mod codec;

/// Connect to a peer and sync a replica
pub async fn connect_and_sync(
pub(crate) async fn connect_and_sync(
endpoint: &Endpoint,
sync: &SyncHandle,
namespace: NamespaceId,
Expand Down Expand Up @@ -104,7 +104,7 @@ pub enum AcceptOutcome {
}

/// Handle an iroh-docs connection and sync all shared documents in the replica store.
pub async fn handle_connection<F, Fut>(
pub(crate) async fn handle_connection<F, Fut>(
sync: SyncHandle,
connecting: iroh::endpoint::Connecting,
accept_cb: F,
Expand Down
8 changes: 0 additions & 8 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,6 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
SetDownloadPolicy(msg) => chan.rpc(msg, this, Self::doc_set_download_policy).await,
GetDownloadPolicy(msg) => chan.rpc(msg, this, Self::doc_get_download_policy).await,
GetSyncPeers(msg) => chan.rpc(msg, this, Self::doc_get_sync_peers).await,

AuthorList(msg) => chan.server_streaming(msg, this, Self::author_list).await,
AuthorCreate(msg) => chan.rpc(msg, this, Self::author_create).await,
AuthorImport(msg) => chan.rpc(msg, this, Self::author_import).await,
AuthorExport(msg) => chan.rpc(msg, this, Self::author_export).await,
AuthorDelete(msg) => chan.rpc(msg, this, Self::author_delete).await,
AuthorGetDefault(msg) => chan.rpc(msg, this, Self::author_default).await,
AuthorSetDefault(msg) => chan.rpc(msg, this, Self::author_set_default).await,
}
}
}
Expand Down
1 change: 0 additions & 1 deletion src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
use anyhow::Result;
use futures_util::{Stream, StreamExt};

pub mod authors;
pub mod docs;

fn flatten<T, E1, E2>(
Expand Down
10 changes: 2 additions & 8 deletions src/rpc/client/authors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,7 @@ use quic_rpc::{client::BoxedConnector, Connector};
use super::flatten;
#[doc(inline)]
pub use crate::engine::{Origin, SyncEvent, SyncReason};
use crate::{
rpc::proto::{
AuthorCreateRequest, AuthorDeleteRequest, AuthorExportRequest, AuthorGetDefaultRequest,
AuthorImportRequest, AuthorListRequest, AuthorSetDefaultRequest, RpcService,
},
Author, AuthorId,
};
use crate::{actor::ImportAuthorAction, rpc::proto::RpcService, Author, AuthorId};

/// Iroh docs client.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -85,7 +79,7 @@ impl<C: Connector<RpcService>> Client<C> {
///
/// Warning: The [`Author`] struct contains sensitive data.
pub async fn import(&self, author: Author) -> Result<()> {
self.rpc.rpc(AuthorImportRequest { author }).await??;
self.rpc.rpc(ImportAuthorAction { author }).await??;
Ok(())
}

Expand Down
7 changes: 1 addition & 6 deletions src/rpc/client/docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use quic_rpc::{
};
use serde::{Deserialize, Serialize};

use super::{authors, flatten};
use super::flatten;
use crate::{
actor::OpenState,
rpc::proto::{
Expand Down Expand Up @@ -56,11 +56,6 @@ impl<C: Connector<RpcService>> Client<C> {
Self { rpc }
}

/// Returns an authors client.
pub fn authors(&self) -> authors::Client<C> {
authors::Client::new(self.rpc.clone())
}

/// Creates a client.
pub async fn create(&self) -> Result<Doc<C>> {
let res = self.rpc.rpc(CreateRequest {}).await??;
Expand Down
Loading
Loading