diff --git a/CHANGELOG.md b/CHANGELOG.md index aad9e2278..3d787d116 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ By far most changes relate to `atomic-server`, so if not specified, assume the c **Changes to JS assets (including the front-end and JS libraries) are not shown here**, but in [`/browser/CHANGELOG`](/browser/CHANGELOG.md). See [STATUS.md](server/STATUS.md) to learn more about which features will remain stable. +## [UNRELEASED] + +- Speed up Commits by bundling DB transactions #297 +- Introduce `Db::apply_transaction` and `Storelike::apply_commit` +- Deprecate `add_atom_to_index` and `remove_atom_from_index` as public methods. + ## [v0.39.0] - 2024-08-21 - The download endpoint can now optimize images on the fly. This is controlled via query parameters. #257 diff --git a/lib/benches/benchmarks.rs b/lib/benches/benchmarks.rs index ab2a84a48..04b3bf0f0 100644 --- a/lib/benches/benchmarks.rs +++ b/lib/benches/benchmarks.rs @@ -23,14 +23,6 @@ fn random_resource(atom: &Atom) -> Resource { fn criterion_benchmark(c: &mut Criterion) { let store = Db::init_temp("bench").unwrap(); - c.bench_function("add_atom_to_index", |b| { - b.iter(|| { - let atom = random_atom(); - let resource = random_resource(&random_atom()); - store.add_atom_to_index(&atom, &resource).unwrap(); - }) - }); - c.bench_function("add_resource", |b| { b.iter(|| { let resource = random_resource(&random_atom()); diff --git a/lib/src/authentication.rs b/lib/src/authentication.rs index 5c1572f96..407fc4ee2 100644 --- a/lib/src/authentication.rs +++ b/lib/src/authentication.rs @@ -2,9 +2,10 @@ use crate::{ agents::{decode_base64, ForAgent}, - commit::check_timestamp, errors::AtomicResult, - urls, Storelike, + urls, + utils::check_timestamp_in_past, + Storelike, }; /// Set of values extracted from the request. @@ -47,6 +48,8 @@ pub fn check_auth_signature(subject: &str, auth_header: &AuthValues) -> AtomicRe Ok(()) } +const ACCEPTABLE_TIME_DIFFERENCE: i64 = 10000; + /// Get the Agent's subject from [AuthValues] /// Checks if the auth headers are correct, whether signature matches the public key, whether the timestamp is valid. /// by default, returns the public agent @@ -60,7 +63,7 @@ pub fn get_agent_from_auth_values_and_check( check_auth_signature(&auth_vals.requested_subject, &auth_vals) .map_err(|e| format!("Error checking authentication headers. {}", e))?; // check if the timestamp is valid - check_timestamp(auth_vals.timestamp)?; + check_timestamp_in_past(auth_vals.timestamp, ACCEPTABLE_TIME_DIFFERENCE)?; // check if the public key belongs to the agent let found_public_key = store.get_value(&auth_vals.agent_subject, urls::PUBLIC_KEY)?; if found_public_key.to_string() != auth_vals.public_key { diff --git a/lib/src/commit.rs b/lib/src/commit.rs index 30cc04cf9..9470bdf0d 100644 --- a/lib/src/commit.rs +++ b/lib/src/commit.rs @@ -8,7 +8,6 @@ use crate::{ agents::{decode_base64, encode_base64}, datatype::DataType, errors::AtomicResult, - hierarchy, resources::PropVals, urls, values::SubResource, @@ -20,10 +19,23 @@ use crate::{ /// When deleting a resource, the `resource_new` field is None. #[derive(Clone, Debug)] pub struct CommitResponse { + pub commit: Commit, pub commit_resource: Resource, pub resource_new: Option, pub resource_old: Option, - pub commit_struct: Commit, + pub add_atoms: Vec, + pub remove_atoms: Vec, +} + +pub struct CommitApplied { + /// The resource before the Commit was applied + pub resource_old: Resource, + /// The modified resources where the commit has been applied to + pub resource_new: Resource, + /// The atoms that should be added to the store (for updating indexes) + pub add_atoms: Vec, + /// The atoms that should be removed from the store (for updating indexes) + pub remove_atoms: Vec, } #[derive(Clone, Debug)] @@ -47,6 +59,20 @@ pub struct CommitOpts { pub validate_for_agent: Option, } +impl CommitOpts { + pub fn no_validations_no_index() -> Self { + Self { + validate_schema: false, + validate_signature: false, + validate_timestamp: false, + validate_rights: false, + validate_previous_commit: false, + update_index: false, + validate_for_agent: None, + } + } +} + /// A Commit is a set of changes to a Resource. /// Use CommitBuilder if you're programmatically constructing a Delta. #[derive(Clone, Debug, Serialize)] @@ -84,95 +110,128 @@ pub struct Commit { } impl Commit { - /// Apply a single signed Commit to the store. - /// Creates, edits or destroys a resource. - /// Allows for control over which validations should be performed. - /// Returns the generated Commit, the old Resource and the new Resource. - #[tracing::instrument(skip(store))] - pub fn apply_opts( + /// Throws an error if the parent is set to itself + pub fn check_for_circular_parents(&self) -> AtomicResult<()> { + // Check if the set hashset has a parent property and if it matches with this subject. + if let Some(set) = self.set.clone() { + if let Some(parent) = set.get(urls::PARENT) { + if parent.to_string() == self.subject { + return Err("Circular parent reference".into()); + } + } + } + + // TODO: Check for circular parents by going up the parent tree. + Ok(()) + } + + pub fn validate_previous_commit( &self, - store: &impl Storelike, + resource_old: &Resource, + subject_url: &str, + ) -> AtomicResult<()> { + let commit = self; + if let Ok(last_commit_val) = resource_old.get(urls::LAST_COMMIT) { + let last_commit = last_commit_val.to_string(); + + if let Some(prev_commit) = commit.previous_commit.clone() { + // TODO: try auto merge + if last_commit != prev_commit { + return Err(format!( + "previousCommit mismatch. Had lastCommit '{}' in Resource {}, but got in Commit '{}'. Perhaps you created the Commit based on an outdated version of the Resource.", + last_commit, subject_url, prev_commit, + ) + .into()); + } + } else { + return Err(format!("Missing `previousCommit`. Resource {} already exists, and it has a `lastCommit` field, so a `previousCommit` field is required in your Commit.", commit.subject).into()); + } + } else { + // If there is no lastCommit in the Resource, we'll accept the Commit. + tracing::warn!("No `lastCommit` in Resource. This can be a bug, or it could be that the resource was never properly updated."); + } + Ok(()) + } + + /// Check if the Commit's signature matches the signer's public key. + pub fn validate_signature(&self, store: &impl Storelike) -> AtomicResult<()> { + let commit = self; + let signature = match commit.signature.as_ref() { + Some(sig) => sig, + None => return Err("No signature set".into()), + }; + let pubkey_b64 = store + .get_resource(&commit.signer)? + .get(urls::PUBLIC_KEY)? + .to_string(); + let agent_pubkey = decode_base64(&pubkey_b64)?; + let stringified_commit = commit.serialize_deterministically_json_ad(store)?; + let peer_public_key = + ring::signature::UnparsedPublicKey::new(&ring::signature::ED25519, agent_pubkey); + let signature_bytes = decode_base64(signature)?; + peer_public_key + .verify(stringified_commit.as_bytes(), &signature_bytes) + .map_err(|_e| { + format!( + "Incorrect signature for Commit. This could be due to an error during signing or serialization of the commit. Compare this to the serialized commit in the client: {}", + stringified_commit, + ) + })?; + Ok(()) + } + + /// Performs the checks specified in CommitOpts and constructs a new Resource. + /// Warning: Does not save the new resource to the Store - doet not delete if it `destroy: true`. + /// Use [Storelike::apply_commit] to save the resource to the Store. + pub fn validate_and_build_response( + self, opts: &CommitOpts, + store: &impl Storelike, ) -> AtomicResult { - let subject_url = url::Url::parse(&self.subject) - .map_err(|e| format!("Subject '{}' is not a URL. {}", &self.subject, e))?; + let commit = self; + let subject_url = url::Url::parse(&commit.subject) + .map_err(|e| format!("Subject '{}' is not a URL. {}", commit.subject, e))?; if subject_url.query().is_some() { return Err("Subject URL cannot have query parameters".into()); } if opts.validate_signature { - let signature = match self.signature.as_ref() { - Some(sig) => sig, - None => return Err("No signature set".into()), - }; - let pubkey_b64 = store - .get_resource(&self.signer)? - .get(urls::PUBLIC_KEY)? - .to_string(); - let agent_pubkey = decode_base64(&pubkey_b64)?; - let stringified_commit = self.serialize_deterministically_json_ad(store)?; - let peer_public_key = - ring::signature::UnparsedPublicKey::new(&ring::signature::ED25519, agent_pubkey); - let signature_bytes = decode_base64(signature)?; - peer_public_key - .verify(stringified_commit.as_bytes(), &signature_bytes) - .map_err(|_e| { - format!( - "Incorrect signature for Commit. This could be due to an error during signing or serialization of the commit. Compare this to the serialized commit in the client: {}", - stringified_commit, - ) - })?; + commit.validate_signature(store)?; } - // Check if the created_at lies in the past if opts.validate_timestamp { - check_timestamp(self.created_at)?; + commit.validate_timestamp()?; } - self.check_for_circular_parents()?; - - let commit_resource: Resource = self.into_resource(store)?; + commit.check_for_circular_parents()?; let mut is_new = false; - // Create a new resource if it doens't exist yet - let mut resource_old = match store.get_resource(&self.subject) { + // Create a new resource if it doesn't exist yet + let mut resource_old = match store.get_resource(&commit.subject) { Ok(rs) => rs, Err(_) => { is_new = true; - Resource::new(self.subject.clone()) + Resource::new(commit.subject.clone()) } }; // Make sure the one creating the commit had the same idea of what the current state is. if !is_new && opts.validate_previous_commit { - if let Ok(last_commit_val) = resource_old.get(urls::LAST_COMMIT) { - let last_commit = last_commit_val.to_string(); - - if let Some(prev_commit) = self.previous_commit.clone() { - // TODO: try auto merge - if last_commit != prev_commit { - return Err(format!( - "previousCommit mismatch. Had lastCommit '{}' in Resource {}, but got in Commit '{}'. Perhaps you created the Commit based on an outdated version of the Resource.", - last_commit, subject_url, prev_commit, - ) - .into()); - } - } else { - return Err(format!("Missing `previousCommit`. Resource {} already exists, and it has a `lastCommit` field, so a `previousCommit` field is required in your Commit.", self.subject).into()); - } - } else { - // If there is no lastCommit in the Resource, we'll accept the Commit. - tracing::warn!("No `lastCommit` in Resource. This can be a bug, or it could be that the resource was never properly updated."); - } + commit.validate_previous_commit(&resource_old, subject_url.as_str())?; }; - let mut resource_new = self - .apply_changes(resource_old.clone(), store, false) - .map_err(|e| format!("Error applying changes to Resource {}. {}", self.subject, e))?; + let mut applied = commit + .apply_changes(resource_old.clone(), store) + .map_err(|e| { + format!( + "Error applying changes to Resource {}. {}", + commit.subject, e + ) + })?; if opts.validate_rights { - let validate_for = opts.validate_for_agent.as_ref().unwrap_or(&self.signer); + let validate_for = opts.validate_for_agent.as_ref().unwrap_or(&commit.signer); if is_new { - hierarchy::check_append(store, &resource_new, &validate_for.into())?; + crate::hierarchy::check_append(store, &applied.resource_new, &validate_for.into())?; } else { // Set a parent only if the rights checks are to be validated. // If there is no explicit parent set on the previous resource, use a default. @@ -186,110 +245,57 @@ impl Commit { )?; } // This should use the _old_ resource, no the new one, as the new one might maliciously give itself write rights. - hierarchy::check_write(store, &resource_old, &validate_for.into())?; + crate::hierarchy::check_write(store, &resource_old, &validate_for.into())?; } }; // Check if all required props are there if opts.validate_schema { - resource_new.check_required_props(store)?; + applied.resource_new.check_required_props(store)?; } + let commit_resource: Resource = commit.into_resource(store)?; + // Set the `lastCommit` to the newly created Commit - resource_new.set( + applied.resource_new.set( urls::LAST_COMMIT.to_string(), Value::AtomicUrl(commit_resource.get_subject().into()), store, )?; - let _resource_new_classes = resource_new.get_classes(store)?; + let destroyed = commit.destroy.unwrap_or(false); - // BEFORE APPLY COMMIT HANDLERS - #[cfg(feature = "db")] - for class in &_resource_new_classes { - match class.subject.as_str() { - urls::COMMIT => return Err("Commits can not be edited or created directly.".into()), - urls::INVITE => { - crate::plugins::invite::before_apply_commit(store, self, &resource_new)? - } - _other => {} - }; - } - - // If a Destroy field is found, remove the resource and return early - // TODO: Should we remove the existing commits too? Probably. - if let Some(destroy) = self.destroy { - if destroy { - // Note: the value index is updated before this action, in resource.apply_changes() - store.remove_resource(&self.subject)?; - store.add_resource_opts(&commit_resource, false, opts.update_index, false)?; - return Ok(CommitResponse { - resource_new: None, - resource_old: Some(resource_old), - commit_resource, - commit_struct: self.clone(), - }); - } - } - - // We apply the changes again, but this time also update the index - self.apply_changes(resource_old.clone(), store, opts.update_index)?; - - // Save the Commit to the Store. We can skip the required props checking, but we need to make sure the commit hasn't been applied before. - store.add_resource_opts(&commit_resource, false, opts.update_index, false)?; - // Save the resource, but skip updating the index - that has been done in a previous step. - store.add_resource_opts(&resource_new, false, false, true)?; - - let commit_response = CommitResponse { - resource_new: Some(resource_new.clone()), - resource_old: Some(resource_old), + Ok(CommitResponse { + commit, + add_atoms: applied.add_atoms, + remove_atoms: applied.remove_atoms, commit_resource, - commit_struct: self.clone(), - }; - - store.handle_commit(&commit_response); - - // AFTER APPLY COMMIT HANDLERS - // Commit has been checked and saved. - // Here you can add side-effects, such as creating new Commits. - #[cfg(feature = "db")] - for class in _resource_new_classes { - match class.subject.as_str() { - urls::MESSAGE => crate::plugins::chatroom::after_apply_commit_message( - store, - self, - &resource_new, - )?, - _other => {} - }; - } - - Ok(commit_response) + resource_new: if destroyed { + None + } else { + Some(applied.resource_new) + }, + resource_old: if is_new { + None + } else { + Some(applied.resource_old) + }, + }) } - fn check_for_circular_parents(&self) -> AtomicResult<()> { - // Check if the set hashset has a parent property and if it matches with this subject. - if let Some(set) = self.set.clone() { - if let Some(parent) = set.get(urls::PARENT) { - if parent.to_string() == self.subject { - return Err("Circular parent reference".into()); - } - } - } - - // TODO: Check for circular parents by going up the parent tree. - Ok(()) + /// Checks if the Commit has been created in the future or if it is expired. + #[tracing::instrument(skip_all)] + pub fn validate_timestamp(&self) -> AtomicResult<()> { + crate::utils::check_timestamp_in_past(self.created_at, ACCEPTABLE_TIME_DIFFERENCE) } /// Updates the values in the Resource according to the `set`, `remove`, `push`, and `destroy` attributes in the Commit. - /// Optionally also updates the index in the Store. - /// The Old Resource is only needed when `update_index` is true, and is used for checking + /// Optionally also returns the updated Atoms. #[tracing::instrument(skip(store))] pub fn apply_changes( &self, mut resource: Resource, store: &impl Storelike, - update_index: bool, - ) -> AtomicResult { + ) -> AtomicResult { let resource_unedited = resource.clone(); let mut remove_atoms: Vec = Vec::new(); @@ -299,18 +305,15 @@ impl Commit { for prop in remove.iter() { resource.remove_propval(prop); - if update_index { - if let Ok(val) = resource_unedited.get(prop) { - let atom = - Atom::new(resource.get_subject().clone(), prop.into(), val.clone()); - remove_atoms.push(atom); - } else { - // The property does not exist, so nothing to remove. - // - // This may happen if another concurrent commit has removed it first, or - // client removed it without validating it exists. (Currently rust and - // typescript clients do not validate that.) - } + if let Ok(val) = resource_unedited.get(prop) { + let atom = Atom::new(resource.get_subject().clone(), prop.into(), val.clone()); + remove_atoms.push(atom); + } else { + // The property does not exist, so nothing to remove. + // + // This may happen if another concurrent commit has removed it first, or + // client removed it without validating it exists. (Currently rust and + // typescript clients do not validate that.) } } } @@ -325,16 +328,14 @@ impl Commit { ) })?; - if update_index { - let new_atom = - Atom::new(resource.get_subject().clone(), prop.into(), new_val.clone()); - if let Ok(old_val) = resource_unedited.get(prop) { - let old_atom = - Atom::new(resource.get_subject().clone(), prop.into(), old_val.clone()); - remove_atoms.push(old_atom); - } - add_atoms.push(new_atom); + let new_atom = + Atom::new(resource.get_subject().clone(), prop.into(), new_val.clone()); + if let Ok(old_val) = resource_unedited.get(prop) { + let old_atom = + Atom::new(resource.get_subject().clone(), prop.into(), old_val.clone()); + remove_atoms.push(old_atom); } + add_atoms.push(new_atom); } } if let Some(push) = self.push.clone() { @@ -352,15 +353,13 @@ impl Commit { }; old_vec.append(&mut new_vec.clone()); resource.set_unsafe(prop.into(), old_vec.into()); - if update_index { - for added_resource in new_vec { - let atom = Atom::new( - resource.get_subject().clone(), - prop.into(), - added_resource.into(), - ); - add_atoms.push(atom); - } + for added_resource in new_vec { + let atom = Atom::new( + resource.get_subject().clone(), + prop.into(), + added_resource.into(), + ); + add_atoms.push(atom); } } } @@ -373,34 +372,12 @@ impl Commit { } } - if update_index { - for atom in remove_atoms { - store - .remove_atom_from_index(&atom, &resource_unedited) - .map_err(|e| format!("Error removing atom from index: {e} Atom: {e}"))? - } - for atom in add_atoms { - store - .add_atom_to_index(&atom, &resource) - .map_err(|e| format!("Error adding atom to index: {e} Atom: {e}"))?; - } - } - Ok(resource) - } - - /// Applies a commit without performing authorization / signature / schema checks. - /// Does not update the index. - pub fn apply_unsafe(&self, store: &impl Storelike) -> AtomicResult { - let opts = CommitOpts { - validate_schema: false, - validate_signature: false, - validate_timestamp: false, - validate_rights: false, - validate_previous_commit: false, - validate_for_agent: None, - update_index: false, - }; - self.apply_opts(store, &opts) + Ok(CommitApplied { + resource_old: resource_unedited, + resource_new: resource, + add_atoms, + remove_atoms, + }) } /// Converts a Resource of a Commit into a Commit @@ -683,21 +660,6 @@ pub fn sign_message(message: &str, private_key: &str, public_key: &str) -> Atomi /// The amount of milliseconds that a Commit signature is valid for. const ACCEPTABLE_TIME_DIFFERENCE: i64 = 10000; -/// Checks if the Commit has been created in the future or if it is expired. -#[tracing::instrument(skip_all)] -pub fn check_timestamp(timestamp: i64) -> AtomicResult<()> { - let now = crate::utils::now(); - if timestamp > now + ACCEPTABLE_TIME_DIFFERENCE { - return Err(format!( - "Commit CreatedAt timestamp must lie in the past. Check your clock. Timestamp now: {} CreatedAt is: {}", - now, timestamp - ) - .into()); - // TODO: also check that no younger commits exist - } - Ok(()) -} - #[cfg(test)] mod test { lazy_static::lazy_static! { @@ -731,7 +693,7 @@ mod test { commitbuiler.set(property2.into(), value2); let commit = commitbuiler.sign(&agent, &store, &resource).unwrap(); let commit_subject = commit.get_subject().to_string(); - let _created_resource = commit.apply_opts(&store, &OPTS).unwrap(); + let _created_resource = store.apply_commit(commit, &OPTS).unwrap(); let resource = store.get_resource(subject).unwrap(); assert!(resource.get(property1).unwrap().to_string() == value1.to_string()); @@ -829,13 +791,13 @@ mod test { let subject = "https://localhost/?q=invalid"; let commitbuiler = crate::commit::CommitBuilder::new(subject.into()); let commit = commitbuiler.sign(&agent, &store, &resource).unwrap(); - commit.apply_opts(&store, &OPTS).unwrap_err(); + store.apply_commit(commit, &OPTS).unwrap_err(); } { let subject = "https://localhost/valid"; let commitbuiler = crate::commit::CommitBuilder::new(subject.into()); let commit = commitbuiler.sign(&agent, &store, &resource).unwrap(); - commit.apply_opts(&store, &OPTS).unwrap(); + store.apply_commit(commit, &OPTS).unwrap(); } } } diff --git a/lib/src/db.rs b/lib/src/db.rs index cc506bab7..1c23b27b5 100644 --- a/lib/src/db.rs +++ b/lib/src/db.rs @@ -6,6 +6,7 @@ mod prop_val_sub_index; mod query_index; #[cfg(test)] pub mod test; +mod trees; mod val_prop_sub_index; use std::{ @@ -15,11 +16,12 @@ use std::{ }; use tracing::{info, instrument}; +use trees::{Method, Operation, Transaction, Tree}; use crate::{ agents::ForAgent, atoms::IndexAtom, - commit::CommitResponse, + commit::{CommitOpts, CommitResponse}, db::{ query_index::{requires_query_index, NO_VALUE}, val_prop_sub_index::find_in_val_prop_sub_index, @@ -28,21 +30,19 @@ use crate::{ errors::{AtomicError, AtomicResult}, resources::PropVals, storelike::{Query, QueryResult, Storelike}, + urls, values::SortableValue, - Atom, Resource, + Atom, Commit, Resource, }; use self::{ migrations::migrate_maybe, - prop_val_sub_index::{ - add_atom_to_prop_val_sub_index, find_in_prop_val_sub_index, - remove_atom_from_prop_val_sub_index, - }, + prop_val_sub_index::{add_atom_to_prop_val_sub_index, find_in_prop_val_sub_index}, query_index::{ check_if_atom_matches_watched_query_filters, query_sorted_indexed, should_include_resource, update_indexed_member, IndexIterator, QueryFilter, }, - val_prop_sub_index::{add_atom_to_reference_index, remove_atom_from_reference_index}, + val_prop_sub_index::add_atom_to_valpropsub_index, }; // A function called by the Store when a Commit is accepted @@ -69,15 +69,13 @@ pub struct Db { default_agent: Arc>>, /// Stores all resources. The Key is the Subject as a `string.as_bytes()`, the value a [PropVals]. Propvals must be serialized using [bincode]. resources: sled::Tree, - /// Index of all Atoms, sorted by {Value}-{Property}-{Subject}. - /// See [reference_index] + /// [Tree::ValPropSub] reference_index: sled::Tree, - /// Index sorted by property + value. - /// Used for queries where the property is known. + /// [Tree::PropValSub] prop_val_sub_index: sled::Tree, - /// Stores the members of Collections, easily sortable. + /// [Tree::QueryMembers] query_index: sled::Tree, - /// A list of all the Collections currently being used. Is used to update `query_index`. + /// [Tree::WatchedQueries] watched_queries: sled::Tree, /// The address where the db will be hosted, e.g. http://localhost/ server_url: String, @@ -95,11 +93,11 @@ impl Db { tracing::info!("Opening database at {:?}", path); let db = sled::open(path).map_err(|e|format!("Failed opening DB at this location: {:?} . Is another instance of Atomic Server running? {}", path, e))?; - let resources = db.open_tree("resources_v1").map_err(|e|format!("Failed building resources. Your DB might be corrupt. Go back to a previous version and export your data. {}", e))?; - let reference_index = db.open_tree("reference_index_v1")?; - let query_index = db.open_tree("members_index")?; - let prop_val_sub_index = db.open_tree("prop_val_sub_index")?; - let watched_queries = db.open_tree("watched_queries")?; + let resources = db.open_tree(Tree::Resources).map_err(|e| format!("Failed building resources. Your DB might be corrupt. Go back to a previous version and export your data. {}", e))?; + let reference_index = db.open_tree(Tree::ValPropSub)?; + let query_index = db.open_tree(Tree::QueryMembers)?; + let prop_val_sub_index = db.open_tree(Tree::PropValSub)?; + let watched_queries = db.open_tree(Tree::WatchedQueries)?; let store = Db { db, default_agent: Arc::new(Mutex::new(None)), @@ -133,6 +131,47 @@ impl Db { Ok(store) } + #[instrument(skip(self))] + fn add_atom_to_index( + &self, + atom: &Atom, + resource: &Resource, + transaction: &mut Transaction, + ) -> AtomicResult<()> { + for index_atom in atom.to_indexable_atoms() { + add_atom_to_valpropsub_index(&index_atom, transaction)?; + add_atom_to_prop_val_sub_index(&index_atom, transaction)?; + // Also update the query index to keep collections performant + check_if_atom_matches_watched_query_filters( + self, + &index_atom, + atom, + false, + resource, + transaction, + ) + .map_err(|e| format!("Failed to check_if_atom_matches_watched_collections. {}", e))?; + } + Ok(()) + } + + fn add_resource_tx( + &self, + resource: &Resource, + transaction: &mut Transaction, + ) -> AtomicResult<()> { + let subject = resource.get_subject(); + let propvals = resource.get_propvals(); + let resource_bin = bincode::serialize(propvals)?; + transaction.push(Operation { + tree: Tree::Resources, + method: Method::Insert, + key: subject.as_bytes().to_vec(), + val: Some(resource_bin), + }); + Ok(()) + } + #[instrument(skip(self))] fn all_index_atoms(&self, include_external: bool) -> IndexIterator { Box::new( @@ -149,6 +188,22 @@ impl Db { ) } + /// Constructs the value index from all resources in the store. Could take a while. + pub fn build_index(&self, include_external: bool) -> AtomicResult<()> { + tracing::info!("Building index (this could take a few minutes for larger databases)"); + for r in self.all_resources(include_external) { + let mut transaction = Transaction::new(); + for atom in r.to_atoms() { + self.add_atom_to_index(&atom, &r, &mut transaction) + .map_err(|e| format!("Failed to add atom to index {}. {}", atom, e))?; + } + self.apply_transaction(&mut transaction) + .map_err(|e| format!("Failed to commit transaction. {}", e))?; + } + tracing::info!("Building index finished!"); + Ok(()) + } + /// Internal method for fetching Resource data. #[instrument(skip(self))] fn set_propvals(&self, subject: &str, propvals: &PropVals) -> AtomicResult<()> { @@ -220,6 +275,7 @@ impl Db { &self, atom: &IndexAtom, query_filter: &QueryFilter, + transaction: &mut Transaction, ) -> AtomicResult<()> { // Get the SortableValue either from the Atom or the Resource. let sort_val: SortableValue = if let Some(sort) = &query_filter.sort_by { @@ -238,7 +294,7 @@ impl Db { atom.sort_value.clone() }; - update_indexed_member(self, query_filter, &atom.subject, &sort_val, false)?; + update_indexed_member(query_filter, &atom.subject, &sort_val, false, transaction)?; Ok(()) } @@ -250,6 +306,71 @@ impl Db { } } + /// Apply made changes to the store. + #[instrument(skip(self))] + fn apply_transaction(&self, transaction: &mut Transaction) -> AtomicResult<()> { + let mut batch_resources = sled::Batch::default(); + let mut batch_propvalsub = sled::Batch::default(); + let mut batch_valpropsub = sled::Batch::default(); + let mut batch_watched_queries = sled::Batch::default(); + let mut batch_query_members = sled::Batch::default(); + + for op in transaction.iter() { + match op.tree { + trees::Tree::Resources => match op.method { + trees::Method::Insert => { + batch_resources.insert::<&[u8], &[u8]>(&op.key, op.val.as_ref().unwrap()); + } + trees::Method::Delete => { + batch_resources.remove(op.key.clone()); + } + }, + trees::Tree::PropValSub => match op.method { + trees::Method::Insert => { + batch_propvalsub.insert::<&[u8], &[u8]>(&op.key, op.val.as_ref().unwrap()); + } + trees::Method::Delete => { + batch_propvalsub.remove(op.key.clone()); + } + }, + trees::Tree::ValPropSub => match op.method { + trees::Method::Insert => { + batch_valpropsub.insert::<&[u8], &[u8]>(&op.key, op.val.as_ref().unwrap()); + } + trees::Method::Delete => { + batch_valpropsub.remove(op.key.clone()); + } + }, + trees::Tree::WatchedQueries => match op.method { + trees::Method::Insert => { + batch_watched_queries + .insert::<&[u8], &[u8]>(&op.key, op.val.as_ref().unwrap()); + } + trees::Method::Delete => { + batch_watched_queries.remove(op.key.clone()); + } + }, + trees::Tree::QueryMembers => match op.method { + trees::Method::Insert => { + batch_query_members + .insert::<&[u8], &[u8]>(&op.key, op.val.as_ref().unwrap()); + } + trees::Method::Delete => { + batch_query_members.remove(op.key.clone()); + } + }, + } + } + + self.resources.apply_batch(batch_resources)?; + self.prop_val_sub_index.apply_batch(batch_propvalsub)?; + self.reference_index.apply_batch(batch_valpropsub)?; + self.watched_queries.apply_batch(batch_watched_queries)?; + self.query_index.apply_batch(batch_query_members)?; + + Ok(()) + } + fn query_basic(&self, q: &Query) -> AtomicResult { let self_url = self .get_self_url() @@ -303,10 +424,12 @@ impl Db { let atoms = self.get_index_iterator_for_query(q); q_filter.watch(self)?; + let mut transaction = Transaction::new(); // Build indexes for atom in atoms.flatten() { - self.build_index_for_atom(&atom, &q_filter)?; + self.build_index_for_atom(&atom, &q_filter, &mut transaction)?; } + self.apply_transaction(&mut transaction)?; // Query through the new indexes. (subjects, resources, total_count) = query_sorted_indexed(self, q)?; @@ -318,6 +441,30 @@ impl Db { count: total_count, }) } + + #[instrument(skip(self))] + fn remove_atom_from_index( + &self, + atom: &Atom, + resource: &Resource, + transaction: &mut Transaction, + ) -> AtomicResult<()> { + for index_atom in atom.to_indexable_atoms() { + transaction.push(Operation::remove_atom_from_reference_index(&index_atom)); + transaction.push(Operation::remove_atom_from_prop_val_sub_index(&index_atom)); + + check_if_atom_matches_watched_query_filters( + self, + &index_atom, + atom, + true, + resource, + transaction, + ) + .map_err(|e| format!("Checking atom went wrong: {}", e))?; + } + Ok(()) + } } impl Drop for Db { @@ -359,20 +506,6 @@ impl Storelike for Db { Ok(()) } - #[instrument(skip(self))] - fn add_atom_to_index(&self, atom: &Atom, resource: &Resource) -> AtomicResult<()> { - for index_atom in atom.to_indexable_atoms() { - add_atom_to_reference_index(&index_atom, self)?; - add_atom_to_prop_val_sub_index(&index_atom, self)?; - // Also update the query index to keep collections performant - check_if_atom_matches_watched_query_filters(self, &index_atom, atom, false, resource) - .map_err(|e| { - format!("Failed to check_if_atom_matches_watched_collections. {}", e) - })?; - } - Ok(()) - } - #[instrument(skip(self, resource), fields(sub = %resource.get_subject()))] fn add_resource_opts( &self, @@ -395,35 +528,122 @@ impl Storelike for Db { resource.check_required_props(self)?; } if update_index { + let mut transaction = Transaction::new(); if let Some(pv) = existing { let subject = resource.get_subject(); for (prop, val) in pv.iter() { // Possible performance hit - these clones can be replaced by modifying remove_atom_from_index let remove_atom = crate::Atom::new(subject.into(), prop.into(), val.clone()); - self.remove_atom_from_index(&remove_atom, resource) + self.remove_atom_from_index(&remove_atom, resource, &mut transaction) .map_err(|e| { format!("Failed to remove atom from index {}. {}", remove_atom, e) })?; } } for a in resource.to_atoms() { - self.add_atom_to_index(&a, resource) + self.add_atom_to_index(&a, resource, &mut transaction) .map_err(|e| format!("Failed to add atom to index {}. {}", a, e))?; } + self.apply_transaction(&mut transaction)?; } self.set_propvals(resource.get_subject(), resource.get_propvals()) } - #[instrument(skip(self))] - fn remove_atom_from_index(&self, atom: &Atom, resource: &Resource) -> AtomicResult<()> { - for index_atom in atom.to_indexable_atoms() { - remove_atom_from_reference_index(&index_atom, self)?; - remove_atom_from_prop_val_sub_index(&index_atom, self)?; + /// Apply a single signed Commit to the Db. + /// Creates, edits or destroys a resource. + /// Allows for control over which validations should be performed. + /// Returns the generated Commit, the old Resource and the new Resource. + #[tracing::instrument(skip(self))] + fn apply_commit(&self, commit: Commit, opts: &CommitOpts) -> AtomicResult { + let store = self; + + let commit_response = commit.validate_and_build_response(opts, store)?; + + let mut transaction = Transaction::new(); + + // BEFORE APPLY COMMIT HANDLERS + // TODO: Move to something dynamic + if let Some(resource_new) = &commit_response.resource_new { + let _resource_new_classes = resource_new.get_classes(store)?; + #[cfg(feature = "db")] + for class in &_resource_new_classes { + match class.subject.as_str() { + urls::COMMIT => { + return Err("Commits can not be edited or created directly.".into()) + } + urls::INVITE => crate::plugins::invite::before_apply_commit( + store, + &commit_response.commit, + resource_new, + )?, + _other => {} + }; + } + } - check_if_atom_matches_watched_query_filters(self, &index_atom, atom, true, resource) - .map_err(|e| format!("Checking atom went wrong: {}", e))?; + // Save the Commit to the Store. We can skip the required props checking, but we need to make sure the commit hasn't been applied before. + store.add_resource_tx(&commit_response.commit_resource, &mut transaction)?; + // We still need to index the Commit! + for atom in commit_response.commit_resource.to_atoms() { + store.add_atom_to_index(&atom, &commit_response.commit_resource, &mut transaction)?; } - Ok(()) + + match (&commit_response.resource_old, &commit_response.resource_new) { + (None, None) => { + return Err("Neither an old nor a new resource is returned from the commit - something went wrong.".into()) + }, + (Some(_old), None) => { + assert_eq!(_old.get_subject(), &commit_response.commit.subject); + assert!(&commit_response.commit.destroy.expect("Resource was removed but `commit.destroy` was not set!")); + self.remove_resource(&commit_response.commit.subject)?; + }, + _ => {} + }; + + if let Some(new) = &commit_response.resource_new { + self.add_resource_tx(new, &mut transaction)?; + } + + if opts.update_index { + if let Some(old) = &commit_response.resource_old { + for atom in &commit_response.remove_atoms { + store + .remove_atom_from_index(atom, old, &mut transaction) + .map_err(|e| format!("Error removing atom from index: {e} Atom: {e}"))? + } + } + if let Some(new) = &commit_response.resource_new { + for atom in &commit_response.add_atoms { + store + .add_atom_to_index(atom, new, &mut transaction) + .map_err(|e| format!("Error adding atom to index: {e} Atom: {e}"))? + } + } + } + + store.apply_transaction(&mut transaction)?; + + store.handle_commit(&commit_response); + + // AFTER APPLY COMMIT HANDLERS + // Commit has been checked and saved. + // Here you can add side-effects, such as creating new Commits. + #[cfg(feature = "db")] + if let Some(resource_new) = &commit_response.resource_new { + let _resource_new_classes = resource_new.get_classes(store)?; + #[cfg(feature = "db")] + for class in &_resource_new_classes { + match class.subject.as_str() { + urls::MESSAGE => crate::plugins::chatroom::after_apply_commit_message( + store, + &commit_response.commit, + resource_new, + )?, + _other => {} + }; + } + } + Ok(commit_response) } fn get_server_url(&self) -> &str { @@ -660,11 +880,12 @@ impl Storelike for Db { #[instrument(skip(self))] fn remove_resource(&self, subject: &str) -> AtomicResult<()> { + let mut transaction = Transaction::new(); if let Ok(found) = self.get_propvals(subject) { let resource = Resource::from_propvals(found, subject.to_string()); for (prop, val) in resource.get_propvals() { let remove_atom = crate::Atom::new(subject.into(), prop.clone(), val.clone()); - self.remove_atom_from_index(&remove_atom, &resource)?; + self.remove_atom_from_index(&remove_atom, &resource, &mut transaction)?; } let _found = self.resources.remove(subject.as_bytes())?; } else { @@ -674,6 +895,7 @@ impl Storelike for Db { ) .into()); } + self.apply_transaction(&mut transaction)?; Ok(()) } diff --git a/lib/src/db/migrations.rs b/lib/src/db/migrations.rs index aa4b74939..610b0e6c4 100644 --- a/lib/src/db/migrations.rs +++ b/lib/src/db/migrations.rs @@ -9,10 +9,10 @@ Therefore, we need migrations to convert the old schema to the new one. - Write a function called `v{OLD}_to_v{NEW} that takes a [Db]. Make sure it removed the old `Tree`. Use [assert] to check if the process worked. - In [migrate_maybe] add the key of the outdated Tree - Add the function to the [migrate_maybe] `match` statement, select the older version of the Tree -- Update the Tree key used in [Db::init] +- Update the Tree key used in [crate::db::trees] */ -use crate::{errors::AtomicResult, Db, Storelike}; +use crate::{errors::AtomicResult, Db}; /// Checks the current version(s) of the internal Store, and performs migrations if needed. pub fn migrate_maybe(store: &Db) -> AtomicResult<()> { diff --git a/lib/src/db/prop_val_sub_index.rs b/lib/src/db/prop_val_sub_index.rs index 2a3fc318b..b4d1fb70c 100644 --- a/lib/src/db/prop_val_sub_index.rs +++ b/lib/src/db/prop_val_sub_index.rs @@ -4,7 +4,10 @@ use tracing::instrument; use crate::{atoms::IndexAtom, errors::AtomicResult, Db, Value}; -use super::query_index::{IndexIterator, SEPARATION_BIT}; +use super::{ + query_index::{IndexIterator, SEPARATION_BIT}, + trees::{Method, Operation, Transaction, Tree}, +}; /// Finds all Atoms for a given {property}-{value} tuple. pub fn find_in_prop_val_sub_index(store: &Db, prop: &str, val: Option<&Value>) -> IndexIterator { @@ -19,22 +22,27 @@ pub fn find_in_prop_val_sub_index(store: &Db, prop: &str, val: Option<&Value>) - })) } -#[instrument(skip(store))] -pub fn add_atom_to_prop_val_sub_index(index_atom: &IndexAtom, store: &Db) -> AtomicResult<()> { - let _existing = store - .prop_val_sub_index - .insert(key_from_atom(index_atom), b""); +pub fn add_atom_to_prop_val_sub_index( + index_atom: &IndexAtom, + transaction: &mut Transaction, +) -> AtomicResult<()> { + transaction.push(Operation { + key: propvalsub_key(index_atom), + val: Some(b"".to_vec()), + tree: Tree::PropValSub, + method: Method::Insert, + }); Ok(()) } #[instrument(skip(store))] pub fn remove_atom_from_prop_val_sub_index(index_atom: &IndexAtom, store: &Db) -> AtomicResult<()> { - let _existing = store.prop_val_sub_index.remove(key_from_atom(index_atom)); + let _existing = store.prop_val_sub_index.remove(propvalsub_key(index_atom)); Ok(()) } /// Constructs the Key for the prop_val_sub_index. -fn key_from_atom(atom: &IndexAtom) -> Vec { +pub fn propvalsub_key(atom: &IndexAtom) -> Vec { [ atom.property.as_bytes(), &[SEPARATION_BIT], @@ -79,7 +87,7 @@ mod test { sort_value: "2".into(), subject: "http://example.com/subj".into(), }; - let key = key_from_atom(&atom); + let key = propvalsub_key(&atom); let atom2 = key_to_index_atom(&key).unwrap(); assert_eq!(atom, atom2); } diff --git a/lib/src/db/query_index.rs b/lib/src/db/query_index.rs index 442272624..325b0da0c 100644 --- a/lib/src/db/query_index.rs +++ b/lib/src/db/query_index.rs @@ -7,6 +7,8 @@ use crate::{ }; use serde::{Deserialize, Serialize}; +use super::trees::{self, Operation, Transaction, Tree}; + /// Returned by functions that iterate over [IndexAtom]s pub type IndexIterator = Box>>; @@ -103,7 +105,7 @@ pub fn query_sorted_indexed( .get_self_url() .ok_or("No self_url set, required for Queries")?; - let limit = q.limit.unwrap_or(std::usize::MAX); + let limit = q.limit.unwrap_or(usize::MAX); for (i, kv) in iter.enumerate() { // The user's maximum amount of results has not yet been reached @@ -258,6 +260,7 @@ pub fn check_if_atom_matches_watched_query_filters( atom: &Atom, delete: bool, resource: &Resource, + transaction: &mut Transaction, ) -> AtomicResult<()> { for query in store.watched_queries.iter() { // The keys store all the data @@ -270,7 +273,7 @@ pub fn check_if_atom_matches_watched_query_filters( Ok(val) => val.to_sortable_string(), Err(_e) => NO_VALUE.to_string(), }; - update_indexed_member(store, &q_filter, &atom.subject, &update_val, delete)?; + update_indexed_member(&q_filter, &atom.subject, &update_val, delete, transaction)?; } } else { return Err(format!("Can't deserialize collection index: {:?}", query).into()); @@ -279,14 +282,14 @@ pub fn check_if_atom_matches_watched_query_filters( Ok(()) } -/// Adds or removes a single item (IndexAtom) to the index_members cache. -#[tracing::instrument(skip(store))] +/// Adds or removes a single item (IndexAtom) to the [Tree::QueryMembers] cache. +#[tracing::instrument(skip())] pub fn update_indexed_member( - store: &Db, collection: &QueryFilter, subject: &str, value: &SortableValue, delete: bool, + transaction: &mut Transaction, ) -> AtomicResult<()> { let key = create_query_index_key( collection, @@ -295,9 +298,19 @@ pub fn update_indexed_member( Some(subject), )?; if delete { - store.query_index.remove(key)?; + transaction.push(Operation { + tree: Tree::QueryMembers, + method: trees::Method::Delete, + key, + val: None, + }) } else { - store.query_index.insert(key, b"")?; + transaction.push(Operation { + tree: Tree::QueryMembers, + method: trees::Method::Insert, + key, + val: Some(b"".into()), + }); } Ok(()) } diff --git a/lib/src/db/test.rs b/lib/src/db/test.rs index 0c577f246..f3ec91cd3 100644 --- a/lib/src/db/test.rs +++ b/lib/src/db/test.rs @@ -155,7 +155,15 @@ fn destroy_resource_and_check_collection_and_commits() { "The commits collection did not increase after saving the resource." ); - _res.resource_new.unwrap().destroy(&store).unwrap(); + let clone = _res.resource_new.clone().unwrap(); + let resp = _res.resource_new.unwrap().destroy(&store).unwrap(); + assert!(resp.resource_new.is_none()); + assert_eq!( + resp.resource_old.as_ref().unwrap().to_json_ad().unwrap(), + clone.to_json_ad().unwrap(), + "JSON AD differs between removed resource and resource passed back from commit" + ); + assert!(resp.resource_old.is_some()); let agents_collection_3 = store .get_resource_extended(&agents_url, false, for_agent) .unwrap(); diff --git a/lib/src/db/trees.rs b/lib/src/db/trees.rs new file mode 100644 index 000000000..74607e129 --- /dev/null +++ b/lib/src/db/trees.rs @@ -0,0 +1,87 @@ +use crate::atoms::IndexAtom; + +use super::{prop_val_sub_index::propvalsub_key, val_prop_sub_index::valpropsub_key}; + +#[derive(Debug)] +pub enum Tree { + /// Full resources, Key: Subject, Value: [Resource](crate::Resource) + Resources, + /// Stores the members of Collections, easily sortable. + QueryMembers, + /// A list of all the Collections currently being used. Is used to update `query_index`. + WatchedQueries, + /// Index sorted by {Property}-{Value}-{Subject}. + /// Used for queries where the property is known. + PropValSub, + /// Reference index, used for queries where the value (or one of the values, in case of an array) is but the subject is not. + /// Index sorted by {Value}-{Property}-{Subject}. + ValPropSub, +} + +const RESOURCES: &str = "resources_v1"; +const VALPROPSUB: &str = "reference_index_v1"; +const QUERY_MEMBERS: &str = "members_index"; +const PROPVALSUB: &str = "prop_val_sub_index"; +const QUERIES_WATCHED: &str = "watched_queries"; + +impl std::fmt::Display for Tree { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Tree::Resources => f.write_str(RESOURCES), + Tree::WatchedQueries => f.write_str(QUERIES_WATCHED), + Tree::PropValSub => f.write_str(PROPVALSUB), + Tree::ValPropSub => f.write_str(VALPROPSUB), + Tree::QueryMembers => f.write_str(QUERY_MEMBERS), + } + } +} + +// convert Tree into AsRef<[u8]> by using the string above +impl AsRef<[u8]> for Tree { + fn as_ref(&self) -> &[u8] { + match self { + Tree::Resources => RESOURCES.as_bytes(), + Tree::WatchedQueries => QUERIES_WATCHED.as_bytes(), + Tree::PropValSub => PROPVALSUB.as_bytes(), + Tree::ValPropSub => VALPROPSUB.as_bytes(), + Tree::QueryMembers => QUERY_MEMBERS.as_bytes(), + } + } +} + +#[derive(Debug)] +pub enum Method { + Insert, + Delete, +} + +/// A single operation to be executed on the database. +#[derive(Debug)] +pub struct Operation { + pub tree: Tree, + pub method: Method, + pub key: Vec, + pub val: Option>, +} + +impl Operation { + pub fn remove_atom_from_reference_index(index_atom: &IndexAtom) -> Self { + Operation { + tree: Tree::ValPropSub, + method: Method::Delete, + key: valpropsub_key(index_atom), + val: None, + } + } + pub fn remove_atom_from_prop_val_sub_index(index_atom: &IndexAtom) -> Self { + Operation { + tree: Tree::PropValSub, + method: Method::Delete, + key: propvalsub_key(index_atom), + val: None, + } + } +} + +/// A set of [Operation]s that should be executed atomically by the database. +pub type Transaction = Vec; diff --git a/lib/src/db/val_prop_sub_index.rs b/lib/src/db/val_prop_sub_index.rs index 2b46d50d1..8310c08bb 100644 --- a/lib/src/db/val_prop_sub_index.rs +++ b/lib/src/db/val_prop_sub_index.rs @@ -1,25 +1,26 @@ //! Index sorted by {Value}-{Property}-{Subject}. use crate::{atoms::IndexAtom, errors::AtomicResult, Db, Value}; -use tracing::instrument; -use super::query_index::{IndexIterator, SEPARATION_BIT}; +use super::{ + query_index::{IndexIterator, SEPARATION_BIT}, + trees::{Method, Operation, Transaction, Tree}, +}; -#[instrument(skip(store))] -pub fn add_atom_to_reference_index(index_atom: &IndexAtom, store: &Db) -> AtomicResult<()> { - let _existing = store - .reference_index - .insert(key_from_atom(index_atom), b"")?; - Ok(()) -} - -#[instrument(skip(store))] -pub fn remove_atom_from_reference_index(index_atom: &IndexAtom, store: &Db) -> AtomicResult<()> { - store.reference_index.remove(key_from_atom(index_atom))?; +pub fn add_atom_to_valpropsub_index( + index_atom: &IndexAtom, + transaction: &mut Transaction, +) -> AtomicResult<()> { + transaction.push(Operation { + key: valpropsub_key(index_atom), + val: Some(b"".to_vec()), + tree: Tree::ValPropSub, + method: Method::Insert, + }); Ok(()) } /// Constructs the Key for the prop_val_sub_index. -fn key_from_atom(atom: &IndexAtom) -> Vec { +pub fn valpropsub_key(atom: &IndexAtom) -> Vec { [ atom.ref_value.as_bytes(), &[SEPARATION_BIT], @@ -87,7 +88,7 @@ mod test { sort_value: "2".into(), subject: "http://example.com/subj".into(), }; - let key = key_from_atom(&atom); + let key = valpropsub_key(&atom); let atom2 = key_to_index_atom(&key).unwrap(); assert_eq!(atom, atom2); } diff --git a/lib/src/parse.rs b/lib/src/parse.rs index 11c3c010a..3993d37b3 100644 --- a/lib/src/parse.rs +++ b/lib/src/parse.rs @@ -127,15 +127,7 @@ pub fn parse_json_ad_string( ), _other => return Err("Root JSON element must be an object or array.".into()), } - // For most save menthods, we need to add the atoms to the index here. - // The `Commit` feature adds to index by itself, so we can skip that step here. - if parse_opts.save != SaveOpts::Commit { - for res in &vec { - for atom in res.to_atoms() { - store.add_atom_to_index(&atom, res)?; - } - } - } + Ok(vec) } @@ -351,8 +343,8 @@ fn parse_json_ad_map_to_resource( update_index: true, }; - commit - .apply_opts(store, &opts) + store + .apply_commit(commit, &opts) .map_err(|e| format!("Failed to save {}: {}", r.get_subject(), e))? .resource_new .unwrap() diff --git a/lib/src/plugins/chatroom.rs b/lib/src/plugins/chatroom.rs index 962fbede4..45352abb7 100644 --- a/lib/src/plugins/chatroom.rs +++ b/lib/src/plugins/chatroom.rs @@ -6,7 +6,7 @@ They list a bunch of Messages. use crate::{ agents::ForAgent, - commit::{CommitBuilder, CommitResponse}, + commit::{CommitBuilder, CommitOpts}, errors::AtomicResult, storelike::Query, urls::{self, PARENT}, @@ -101,15 +101,10 @@ pub fn after_apply_commit_message( let new_message = crate::values::SubResource::Resource(Box::new(resource_new.to_owned())); commit_builder.push_propval(urls::MESSAGES, new_message)?; let commit = commit_builder.sign(&store.get_default_agent()?, store, &chat_room)?; + let resp = + commit.validate_and_build_response(&CommitOpts::no_validations_no_index(), store)?; - let commit_response = CommitResponse { - commit_resource: commit.into_resource(store)?, - resource_new: None, - resource_old: None, - commit_struct: commit, - }; - - store.handle_commit(&commit_response); + store.handle_commit(&resp); } Ok(()) } diff --git a/lib/src/plugins/versioning.rs b/lib/src/plugins/versioning.rs index a8eedd558..8ee57fd8b 100644 --- a/lib/src/plugins/versioning.rs +++ b/lib/src/plugins/versioning.rs @@ -95,6 +95,7 @@ fn get_commits_for_resource(subject: &str, store: &impl Storelike) -> AtomicResu let mut q = Query::new_prop_val(urls::SUBJECT, subject); q.sort_by = Some(urls::CREATED_AT.into()); let result = store.query(&q)?; + let filtered: Vec = result .resources .iter() @@ -135,8 +136,8 @@ pub fn construct_version( let mut version = Resource::new(subject.into()); for commit in commits { if let Some(current_commit) = commit.url.clone() { - let updated = commit.apply_changes(version, store, false)?; - version = updated; + let applied = commit.apply_changes(version, store)?; + version = applied.resource_new; // Stop iterating when the target commit has been applied. if current_commit == commit_url { break; @@ -199,7 +200,8 @@ mod test { resource .set_string(crate::urls::DESCRIPTION.into(), second_val, &store) .unwrap(); - let second_commit = resource.save_locally(&store).unwrap().commit_resource; + let commit_resp = resource.save_locally(&store).unwrap(); + let second_commit = commit_resp.commit_resource; let commits = get_commits_for_resource(subject, &store).unwrap(); assert_eq!(commits.len(), 2, "We should have two commits"); diff --git a/lib/src/resources.rs b/lib/src/resources.rs index f9fedbd7f..0963b0e3b 100644 --- a/lib/src/resources.rs +++ b/lib/src/resources.rs @@ -366,7 +366,7 @@ impl Resource { validate_previous_commit: false, update_index: true, }; - let commit_response = commit.apply_opts(store, &opts)?; + let commit_response = store.apply_commit(commit, &opts)?; if let Some(new) = &commit_response.resource_new { self.subject = new.subject.clone(); self.propvals = new.propvals.clone(); @@ -394,7 +394,7 @@ impl Resource { validate_previous_commit: false, update_index: true, }; - let commit_response = commit.apply_opts(store, &opts)?; + let commit_response = store.apply_commit(commit, &opts)?; if let Some(new) = &commit_response.resource_new { self.subject = new.subject.clone(); self.propvals = new.propvals.clone(); @@ -694,9 +694,9 @@ mod test { .clone() .sign(&agent, &store, &new_resource) .unwrap(); - commit - .apply_opts( - &store, + store + .apply_commit( + commit, &CommitOpts { validate_schema: true, validate_signature: true, @@ -792,7 +792,7 @@ mod test { "The first element should be the appended value" ); let resp = resource.save_locally(&store).unwrap(); - assert!(resp.commit_struct.push.is_some()); + assert!(resp.commit_resource.get(urls::PUSH).is_ok()); let new_val = resp .resource_new diff --git a/lib/src/storelike.rs b/lib/src/storelike.rs index 095b17583..54705bfca 100644 --- a/lib/src/storelike.rs +++ b/lib/src/storelike.rs @@ -33,12 +33,6 @@ pub trait Storelike: Sized { )] fn add_atoms(&self, atoms: Vec) -> AtomicResult<()>; - /// Adds an Atom to the PropSubjectMap. Overwrites if already present. - /// The default implementation for this does not do anything, so overwrite it if your store needs indexing. - fn add_atom_to_index(&self, _atom: &Atom, _resource: &Resource) -> AtomicResult<()> { - Ok(()) - } - /// Adds a Resource to the store. /// Replaces existing resource with the contents. /// Updates the index. @@ -63,23 +57,41 @@ pub trait Storelike: Sized { /// If Include_external is false, this is filtered by selecting only resoureces that match the `self` URL of the store. fn all_resources(&self, include_external: bool) -> Box>; - /// Constructs the value index from all resources in the store. Could take a while. - fn build_index(&self, include_external: bool) -> AtomicResult<()> { - tracing::info!("Building index (this could take a few minutes for larger databases)"); - for r in self.all_resources(include_external) { - for atom in r.to_atoms() { - self.add_atom_to_index(&atom, &r) - .map_err(|e| format!("Failed to add atom to index {}. {}", atom, e))?; + /// Takes a Commit and applies it to the Store. + /// This includes changing the resource, writing the changes, verifying the checks specified in your CommitOpts + /// The returned CommitResponse contains the new resource and the saved Commit Resource. + fn apply_commit( + &self, + commit: crate::Commit, + opts: &crate::commit::CommitOpts, + ) -> AtomicResult { + let applied = commit.validate_and_build_response(opts, self)?; + + self.add_resource(&applied.commit_resource)?; + + match (&applied.resource_old, &applied.resource_new) { + (None, None) => { + return Err("Neither an old nor a new resource is returned from the commit - something went wrong.".into()) + }, + (None, Some(new)) => { + self.add_resource(new)?; + }, + (Some(_old), Some(new)) => { + self.add_resource(new)?; + }, + (Some(_old), None) => { + assert_eq!(_old.get_subject(), &applied.commit.subject); + self.remove_resource(&applied.commit.subject)?; } } - tracing::info!("Building index finished!"); - Ok(()) + + Ok(applied) } /// Returns a single [Value] from a [Resource] fn get_value(&self, subject: &str, property: &str) -> AtomicResult { self.get_resource(subject) - .and_then(|r| r.get(property).map(|v| v.clone())) + .and_then(|r| r.get(property).cloned()) } /// Returns the base URL where the default store is. @@ -340,11 +352,6 @@ pub trait Storelike: Sized { /// Search the Store, returns the matching subjects. fn query(&self, q: &Query) -> AtomicResult; - /// Removes an Atom from the PropSubjectMap. - fn remove_atom_from_index(&self, _atom: &Atom, _resource: &Resource) -> AtomicResult<()> { - Ok(()) - } - /// Sets the default Agent for applying commits. fn set_default_agent(&self, agent: crate::agents::Agent); diff --git a/lib/src/utils.rs b/lib/src/utils.rs index 55df89fbc..d40fb1096 100644 --- a/lib/src/utils.rs +++ b/lib/src/utils.rs @@ -45,3 +45,15 @@ pub fn random_string(n: usize) -> String { .collect(); random_string.to_lowercase() } + +pub fn check_timestamp_in_past(timestamp: i64, difference: i64) -> AtomicResult<()> { + let now = crate::utils::now(); + if timestamp > now + difference { + return Err(format!( + "Commit CreatedAt timestamp must lie in the past. Check your clock. Timestamp now: {} CreatedAt is: {}", + now, timestamp + ) + .into()); + } + return Ok(()); +} diff --git a/server/build.rs b/server/build.rs index 44f836333..a3d3234b8 100644 --- a/server/build.rs +++ b/server/build.rs @@ -18,6 +18,8 @@ struct Dirs { } fn main() -> std::io::Result<()> { + // Uncomment this line if you want faster builds during development + // return Ok(()); const BROWSER_ROOT: &str = "../browser/"; let dirs: Dirs = { Dirs { diff --git a/server/src/commit_monitor.rs b/server/src/commit_monitor.rs index e923fb913..4e18a61ca 100644 --- a/server/src/commit_monitor.rs +++ b/server/src/commit_monitor.rs @@ -103,7 +103,7 @@ impl CommitMonitor { /// and update the value index. /// The search index is only updated if the last search commit is 15 seconds or older. fn handle_internal(&mut self, msg: CommitMessage) -> AtomicServerResult<()> { - let target = msg.commit_response.commit_struct.subject.clone(); + let target = msg.commit_response.commit.subject.clone(); // Notify websocket listeners if let Some(subscribers) = self.subscriptions.get(&target) { diff --git a/server/src/handlers/commit.rs b/server/src/handlers/commit.rs index 2ef23b35b..43a7e2797 100644 --- a/server/src/handlers/commit.rs +++ b/server/src/handlers/commit.rs @@ -36,7 +36,7 @@ pub async fn post_commit( validate_for_agent: Some(incoming_commit.signer.to_string()), update_index: true, }; - let commit_response = incoming_commit.apply_opts(store, &opts)?; + let commit_response = store.apply_commit(incoming_commit, &opts)?; let message = commit_response.commit_resource.to_json_ad()?; diff --git a/server/src/serve.rs b/server/src/serve.rs index e7d0123f4..9f7440375 100644 --- a/server/src/serve.rs +++ b/server/src/serve.rs @@ -1,6 +1,5 @@ use actix_cors::Cors; use actix_web::{middleware, web, HttpServer}; -use atomic_lib::Storelike; use crate::errors::AtomicServerResult;