Skip to content

Commit

Permalink
#529 WIP propvalsub index
Browse files Browse the repository at this point in the history
  • Loading branch information
joepio committed Nov 1, 2022
1 parent 3b67266 commit d2e73fb
Show file tree
Hide file tree
Showing 15 changed files with 455 additions and 332 deletions.
2 changes: 1 addition & 1 deletion cli/src/print.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub fn print_resource(
Format::Json => resource.to_json(&context.store)?,
Format::JsonLd => resource.to_json_ld(&context.store)?,
Format::JsonAd => resource.to_json_ad()?,
Format::NTriples => serialize::atoms_to_ntriples(resource.to_atoms()?, &context.store)?,
Format::NTriples => serialize::atoms_to_ntriples(resource.to_atoms(), &context.store)?,
Format::Pretty => pretty_print_resource(resource, &context.store)?,
};
println!("{}", out);
Expand Down
38 changes: 34 additions & 4 deletions lib/src/atoms.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
//! The smallest units of data, consiting of a Subject, a Property and a Value
//! The smallest units of data, consisting of a Subject, a Property and a Value
use crate::{errors::AtomicResult, values::Value};
use crate::{
errors::AtomicResult,
values::{ReferenceString, Value},
};

/// The Atom is the (non-validated) string representation of a piece of data.
/// It's RichAtom sibling provides some extra methods.
/// The Atom is the smallest meaningful piece of data.
/// It describes how one value relates to a subject.
/// A [Resource] can be converted into a bunch of Atoms.
#[derive(Clone, Debug)]
pub struct Atom {
/// The URL where the resource is located
Expand All @@ -27,6 +31,32 @@ impl Atom {
let base_path = format!("{} {}", self.subject, self.property);
self.value.to_subjects(Some(base_path))
}

/// Expands this Atom into one [IndexAtom] per indexable value string.
///
/// Every produced [IndexAtom] carries a copy of this Atom's subject and property.
/// Returns an empty `Vec` when `to_reference_index_strings` yields `None`.
pub fn to_indexable_atoms(&self) -> Vec<IndexAtom> {
    let reference_strings = match self.value.to_reference_index_strings() {
        Some(strings) => strings,
        None => return vec![],
    };

    reference_strings
        .iter()
        .map(|reference| IndexAtom {
            subject: self.subject.clone(),
            property: self.property.clone(),
            value: reference.into(),
        })
        .collect()
}
}

/// Differs from a regular [Atom], since the value here is always a string,
/// and in the case of ResourceArrays, only a _single_ subject is used for each atom.
/// One IndexAtom for every member of the ResourceArray is created.
///
/// Instances are produced by [Atom::to_indexable_atoms].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexAtom {
/// Subject of the [Atom] this index entry was derived from.
pub subject: String,
/// Property of the [Atom] this index entry was derived from.
pub property: String,
/// A single stringified value, converted from one of the strings returned by
/// `to_reference_index_strings` on the Atom's value.
pub value: ReferenceString,
}

impl std::fmt::Display for Atom {
Expand Down
10 changes: 7 additions & 3 deletions lib/src/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,18 +344,22 @@ impl Commit {
// Remove all atoms from index if destroy
if let Some(destroy) = self.destroy {
if destroy {
for atom in resource.to_atoms()?.into_iter() {
for atom in resource.to_atoms().into_iter() {
remove_atoms.push(atom);
}
}
}

if update_index {
for atom in remove_atoms {
store.remove_atom_from_index(&atom, &resource_unedited)?;
store
    .remove_atom_from_index(&atom, &resource_unedited)
    // Include the offending atom in the message; previously the error was printed twice.
    .map_err(|e| format!("Error removing atom from index: {e} Atom: {atom}"))?;
}
for atom in add_atoms {
store.add_atom_to_index(&atom, &resource)?;
store
    .add_atom_to_index(&atom, &resource)
    // Include the offending atom in the message; previously the error was printed twice.
    .map_err(|e| format!("Error adding atom to index: {e} Atom: {atom}"))?;
}
}
Ok(resource)
Expand Down
192 changes: 65 additions & 127 deletions lib/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
//! Persistent, ACID compliant, threadsafe to-disk store.
//! Powered by Sled - an embedded database.
mod migrations;
mod prop_val_sub_index;
mod query_index;
mod reference_index;
#[cfg(test)]
pub mod test;

use std::{
collections::{HashMap, HashSet},
sync::{Arc, Mutex},
};

use tracing::{instrument, trace};
use tracing::{info, instrument, trace};

use crate::{
atoms::IndexAtom,
commit::CommitResponse,
db::reference_index::key_to_atom,
endpoints::{default_endpoints, Endpoint},
errors::{AtomicError, AtomicResult},
resources::PropVals,
Expand All @@ -19,20 +28,20 @@ use crate::{

use self::{
migrations::migrate_maybe,
prop_val_sub_index::{
add_atom_to_prop_val_sub_index, find_in_prop_val_sub_index,
remove_atom_from_prop_val_sub_index,
},
query_index::{
atom_to_indexable_atoms, check_if_atom_matches_watched_query_filters, query_indexed,
update_indexed_member, watch_collection, IndexAtom, QueryFilter, END_CHAR,
check_if_atom_matches_watched_query_filters, query_indexed, update_indexed_member,
watch_collection, IndexIterator, QueryFilter,
},
reference_index::{add_atom_to_reference_index, remove_atom_from_reference_index},
};

// A function called by the Store when a Commit is accepted
type HandleCommit = Box<dyn Fn(&CommitResponse) + Send + Sync>;

mod migrations;
mod query_index;
#[cfg(test)]
pub mod test;

/// Inside the reference_index, each value is mapped to this type.
/// The String on the left represents a Property URL, and the second one is the set of subjects.
pub type PropSubjectMap = HashMap<String, HashSet<String>>;
Expand All @@ -54,13 +63,15 @@ pub struct Db {
default_agent: Arc<Mutex<Option<crate::agents::Agent>>>,
/// Stores all resources. The Key is the Subject as a `string.as_bytes()`, the value a [PropVals]. Propvals must be serialized using [bincode].
resources: sled::Tree,
/// Index for all AtomicURLs, indexed by their Value. Used to speed up TPF queries. See [key_for_reference_index]
/// Index of all Atoms, sorted by {Value}-{Property}-{Subject}.
/// See [reference_index]
reference_index: sled::Tree,
/// Index sorted by property + value.
/// Used for TPF queries where the property is known.
prop_val_sub_index: sled::Tree,
/// Stores the members of Collections, easily sortable.
/// See [collections_index]
members_index: sled::Tree,
/// A list of all the Collections currently being used. Is used to update `members_index`.
/// See [collections_index]
query_index: sled::Tree,
/// A list of all the Collections currently being used. Is used to update `query_index`.
watched_queries: sled::Tree,
/// The address where the db will be hosted, e.g. http://localhost/
server_url: String,
Expand All @@ -78,14 +89,16 @@ impl Db {
let db = sled::open(path).map_err(|e|format!("Failed opening DB at this location: {:?} . Is another instance of Atomic Server running? {}", path, e))?;
let resources = db.open_tree("resources_v1").map_err(|e|format!("Failed building resources. Your DB might be corrupt. Go back to a previous version and export your data. {}", e))?;
let reference_index = db.open_tree("reference_index")?;
let members_index = db.open_tree("members_index")?;
let query_index = db.open_tree("members_index")?;
let prop_val_sub_index = db.open_tree("prop_val_sub_index")?;
let watched_queries = db.open_tree("watched_queries")?;
let store = Db {
db,
default_agent: Arc::new(Mutex::new(None)),
resources,
reference_index,
members_index,
query_index,
prop_val_sub_index,
server_url,
watched_queries,
endpoints: default_endpoints(),
Expand All @@ -112,6 +125,22 @@ impl Db {
Ok(store)
}

/// Returns an iterator over the [IndexAtom]s of every resource yielded by
/// `all_resources(include_external)`, each wrapped in `Ok`.
#[instrument(skip(self))]
fn all_index_atoms(&self, include_external: bool) -> IndexIterator {
    let index_atoms = self
        .all_resources(include_external)
        .flat_map(|resource| {
            // Collect per resource: the atoms are owned by a temporary that ends with this closure.
            resource
                .to_atoms()
                .iter()
                .flat_map(|atom| atom.to_indexable_atoms())
                .collect::<Vec<IndexAtom>>()
        })
        .map(Ok);
    Box::new(index_atoms)
}

/// Internal method for setting Resource data.
#[instrument(skip(self))]
fn set_propvals(&self, subject: &str, propvals: &PropVals) -> AtomicResult<()> {
Expand Down Expand Up @@ -152,15 +181,11 @@ impl Db {
}
}

/// Returns true if the index has been built.
pub fn has_index(&self) -> bool {
!self.reference_index.is_empty()
}

/// Removes all values from the indexes.
pub fn clear_index(&self) -> AtomicResult<()> {
self.reference_index.clear()?;
self.members_index.clear()?;
self.prop_val_sub_index.clear()?;
self.query_index.clear()?;
self.watched_queries.clear()?;
Ok(())
}
Expand Down Expand Up @@ -216,9 +241,9 @@ impl Storelike for Db {

#[instrument(skip(self))]
fn add_atom_to_index(&self, atom: &Atom, resource: &Resource) -> AtomicResult<()> {
for index_atom in atom_to_indexable_atoms(atom)? {
// It's OK if this overwrites a value
for index_atom in atom.to_indexable_atoms() {
add_atom_to_reference_index(&index_atom, self)?;
add_atom_to_prop_val_sub_index(&index_atom, self)?;
// Also update the query index to keep collections performant
check_if_atom_matches_watched_query_filters(self, &index_atom, atom, false, resource)
.map_err(|e| {
Expand Down Expand Up @@ -261,7 +286,7 @@ impl Storelike for Db {
})?;
}
}
for a in resource.to_atoms()? {
for a in resource.to_atoms() {
self.add_atom_to_index(&a, resource)
.map_err(|e| format!("Failed to add atom to index {}. {}", a, e))?;
}
Expand All @@ -271,8 +296,9 @@ impl Storelike for Db {

#[instrument(skip(self))]
fn remove_atom_from_index(&self, atom: &Atom, resource: &Resource) -> AtomicResult<()> {
for index_atom in atom_to_indexable_atoms(atom)? {
delete_atom_from_reference_index(&index_atom, self)?;
for index_atom in atom.to_indexable_atoms() {
remove_atom_from_reference_index(&index_atom, self)?;
remove_atom_from_prop_val_sub_index(&index_atom, self)?;

check_if_atom_matches_watched_query_filters(self, &index_atom, atom, true, resource)
.map_err(|e| format!("Checking atom went wrong: {}", e))?;
Expand Down Expand Up @@ -455,75 +481,21 @@ impl Storelike for Db {
}
}

// No cache hit, perform the query
let mut atoms = self.tpf(
None,
q.property.as_deref(),
q.value.as_ref(),
// We filter later on, not here
true,
)?;
let count = atoms.len();

let mut subjects = Vec::new();
let mut resources = Vec::new();
for atom in atoms.iter() {
// These nested resources are not fully calculated - they will be presented as -is
subjects.push(atom.subject.clone());
// We need the Resources if we want to sort by a non-subject value
if q.include_nested || q.sort_by.is_some() {
// We skip checking for Agent, because we don't return these results directly anyway
match self.get_resource_extended(&atom.subject, true, None) {
Ok(resource) => {
resources.push(resource);
}
Err(e) => match &e.error_type {
crate::AtomicErrorType::NotFoundError => {}
crate::AtomicErrorType::UnauthorizedError => {}
_err => {
return Err(
format!("Error when getting resource in collection: {}", e).into()
)
}
},
}
}
}

if atoms.is_empty() {
return Ok(QueryResult {
subjects: vec![],
resources: vec![],
count,
});
}

// If there is a sort value, we need to change the atoms to contain that sorted value, instead of the one matched in the TPF query
if let Some(sort_prop) = &q.sort_by {
// We don't use the existing array, we clear it.
atoms = Vec::new();
for r in &resources {
// Users _can_ sort by optional properties! So we need a fallback defauil
let fallback_default = crate::Value::String(END_CHAR.into());
let sorted_val = r.get(sort_prop).unwrap_or(&fallback_default);
let atom = Atom {
subject: r.get_subject().to_string(),
property: sort_prop.to_string(),
value: sorted_val.to_owned(),
};
atoms.push(atom)
}
// Now we sort by the value that the user wants to sort by
atoms.sort_by(|a, b| a.value.to_string().cmp(&b.value.to_string()));
}

let q_filter: QueryFilter = q.into();

// Maybe make this optional?
watch_collection(self, &q_filter)?;

// Add the atoms to the query_index
for atom in atoms {
info!(filter = ?q_filter, "Building query index");

// Pick the cheapest source of atoms for the shape of the query.
let atoms: IndexIterator = match (&q.property, q.value.as_ref()) {
    (Some(prop), val) => find_in_prop_val_sub_index(self, prop, val),
    (None, None) => self.all_index_atoms(q.include_external),
    // Not implemented yet. Return an error rather than panicking, so a query
    // with a value but no property cannot crash the caller.
    (None, Some(_)) => {
        return Err(
            "Queries that filter on a value without a property are not supported yet".into(),
        )
    }
};

for a in atoms {
let atom = a?;
update_indexed_member(self, &q_filter, &atom.subject, &atom.value, false)?;
}

Expand Down Expand Up @@ -663,7 +635,7 @@ impl Storelike for Db {
find_in_resource(&resource);
Ok(vec)
} else {
resource.to_atoms()
Ok(resource.to_atoms())
}
}
Err(_) => Ok(vec),
Expand Down Expand Up @@ -697,42 +669,8 @@ impl Storelike for Db {
}
}

#[instrument(skip(store))]
fn add_atom_to_reference_index(index_atom: &IndexAtom, store: &Db) -> AtomicResult<()> {
let _existing = store
.reference_index
.insert(key_for_reference_index(index_atom).as_bytes(), b"")?;
Ok(())
}

#[instrument(skip(store))]
fn delete_atom_from_reference_index(index_atom: &IndexAtom, store: &Db) -> AtomicResult<()> {
store
.reference_index
.remove(key_for_reference_index(index_atom).as_bytes())?;
Ok(())
}

/// Constructs the Key for the index_value cache.
fn key_for_reference_index(atom: &IndexAtom) -> String {
format!("{}\n{}\n{}", atom.value, atom.property, atom.subject)
}

/// Parses a Value index key string, converts it into an atom. Note that the Value of the atom will allways be a single AtomicURL here.
fn key_to_atom(key: &str) -> AtomicResult<Atom> {
let mut parts = key.split('\n');
let val = parts.next().ok_or("Invalid key for value index")?;
let prop = parts.next().ok_or("Invalid key for value index")?;
let subj = parts.next().ok_or("Invalid key for value index")?;
Ok(Atom::new(
subj.into(),
prop.into(),
Value::AtomicUrl(val.into()),
))
}

fn corrupt_db_message(subject: &str) -> String {
format!("Could not deserialize item {} from database. DB is possibly corrupt, could be due to an update or a lack of migrations. Restore to a previous version, export / serialize your data and import your data again.", subject)
format!("Could not deserialize item {} from database. DB is possibly corrupt, could be due to an update or a lack of migrations. Restore to a previous version, export your data and import your data again.", subject)
}

const DB_CORRUPT_MSG: &str = "Could not deserialize item from database. DB is possibly corrupt, could be due to an update or a lack of migrations. Restore to a previous version, export / serialize your data and import your data again.";
const DB_CORRUPT_MSG: &str = "Could not deserialize item from database. DB is possibly corrupt, could be due to an update or a lack of migrations. Restore to a previous version, export your data and import your data again.";
Loading

0 comments on commit d2e73fb

Please sign in to comment.