Skip to content

Commit

Permalink
Merge pull request #24 from skunkteam/Add-reference-struct-implementa…
Browse files Browse the repository at this point in the history
…tion

feat: add support for parsing Firestore database references from string
  • Loading branch information
pavadeli authored Mar 8, 2024
2 parents af69ef1 + 0659017 commit 37634a5
Show file tree
Hide file tree
Showing 14 changed files with 748 additions and 200 deletions.
65 changes: 65 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/firestore-database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ futures = { workspace = true }
googleapis = { workspace = true }
itertools = { workspace = true }
string_cache = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tonic = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
rstest = "0.18.2"
55 changes: 26 additions & 29 deletions crates/firestore-database/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use self::{
field_path::FieldPath,
listener::Listener,
query::Query,
reference::{CollectionRef, DocumentRef, Ref, RootRef},
transaction::{RunningTransactions, Transaction, TransactionId},
};
use crate::{
Expand All @@ -40,19 +41,22 @@ pub mod event;
mod field_path;
mod listener;
mod query;
pub mod reference;
mod transaction;

const MAX_EVENT_BACKLOG: usize = 1024;

pub struct Database {
pub name: RootRef,
collections: RwLock<HashMap<DefaultAtom, Arc<Collection>>>,
transactions: RunningTransactions,
events: broadcast::Sender<Arc<DatabaseEvent>>,
}

impl Database {
pub fn new() -> Arc<Self> {
pub fn new(name: RootRef) -> Arc<Self> {
Arc::new_cyclic(|database| Database {
name,
collections: Default::default(),
transactions: RunningTransactions::new(Weak::clone(database)),
events: broadcast::channel(MAX_EVENT_BACKLOG).0,
Expand All @@ -64,10 +68,10 @@ impl Database {
#[instrument(skip_all, err, fields(in_txn = consistency.is_transaction(), found))]
pub async fn get_doc(
&self,
name: &DefaultAtom,
name: &DocumentRef,
consistency: &ReadConsistency,
) -> Result<Option<Document>> {
info!(name = name.deref());
info!(%name);
let version = if let Some(txn) = self.get_txn_for_consistency(consistency).await? {
txn.read_doc(name)
.await?
Expand All @@ -85,28 +89,32 @@ impl Database {
Ok(version)
}

pub async fn get_collection(&self, collection_name: &DefaultAtom) -> Arc<Collection> {
pub async fn get_collection(&self, collection_name: &CollectionRef) -> Arc<Collection> {
debug_assert_eq!(self.name, collection_name.root_ref);
Arc::clone(
&*self
.collections
.get_or_insert(collection_name, || {
Arc::new(Collection::new(collection_name.into()))
.get_or_insert(&collection_name.collection_id, || {
Arc::new(Collection::new(collection_name.clone()))
})
.await,
)
}

#[instrument(skip_all, err)]
pub async fn get_doc_meta(&self, name: &DefaultAtom) -> Result<Arc<DocumentMeta>> {
let collection = collection_name(name)?;
let meta = self.get_collection(&collection).await.get_doc(name).await;
pub async fn get_doc_meta(&self, name: &DocumentRef) -> Result<Arc<DocumentMeta>> {
let meta = self
.get_collection(&name.collection_ref)
.await
.get_doc(name)
.await;
Ok(meta)
}

#[instrument(skip_all, err)]
pub async fn get_doc_meta_mut_no_txn(
&self,
name: &DefaultAtom,
name: &DocumentRef,
) -> Result<OwnedDocumentContentsWriteGuard> {
self.get_doc_meta(name)
.await?
Expand Down Expand Up @@ -134,7 +142,7 @@ impl Database {
}

#[instrument(skip_all)]
pub async fn get_collection_ids(&self, parent: &DefaultAtom) -> Result<Vec<DefaultAtom>> {
pub async fn get_collection_ids(&self, parent_doc: &DocumentRef) -> Result<Vec<DefaultAtom>> {
// Get all collections asap in order to keep the read lock time minimal.
let all_collections = self
.collections
Expand All @@ -146,11 +154,7 @@ impl Database {
// Cannot use `filter_map` because of the `await`.
let mut result = vec![];
for col in all_collections {
let Some(path) = col
.name
.strip_prefix(parent.deref())
.and_then(|p| p.strip_prefix('/'))
else {
let Some(path) = col.name.strip_document_prefix(parent_doc) else {
continue;
};
if col.has_doc().await? {
Expand All @@ -163,13 +167,14 @@ impl Database {
#[instrument(skip_all, err)]
pub async fn run_query(
&self,
parent: String,
parent: Ref,
query: StructuredQuery,
consistency: ReadConsistency,
) -> Result<Vec<Document>> {
let mut query = Query::from_structured(parent, query, consistency)?;
info!(?query);
query.once(self).await
let result = query.once(self).await?;
Ok(result.into_iter().map(|t| t.1).collect())
}

#[instrument(skip_all, err)]
Expand All @@ -183,7 +188,7 @@ impl Database {

let mut write_results = vec![];
let mut updates = HashMap::new();
let mut write_guard_cache = HashMap::<DefaultAtom, OwnedDocumentContentsWriteGuard>::new();
let mut write_guard_cache = HashMap::<DocumentRef, OwnedDocumentContentsWriteGuard>::new();
// This must be done in two phases. First acquire the lock on all docs, only then start to
// update them.
for write in &writes {
Expand Down Expand Up @@ -391,15 +396,7 @@ fn apply_transform(
Ok(result)
}

fn collection_name(name: &DefaultAtom) -> Result<DefaultAtom> {
Ok(name
.rsplit_once('/')
.ok_or_else(|| Status::invalid_argument("invalid document path, missing collection-name"))?
.0
.into())
}

pub fn get_doc_name_from_write(write: &Write) -> Result<DefaultAtom> {
pub fn get_doc_name_from_write(write: &Write) -> Result<DocumentRef> {
let operation = write
.operation
.as_ref()
Expand All @@ -410,7 +407,7 @@ pub fn get_doc_name_from_write(write: &Write) -> Result<DefaultAtom> {
Delete(name) => name,
Transform(trans) => &trans.document,
};
Ok(DefaultAtom::from(name))
Ok(name.parse()?)
}

#[derive(Clone, Debug)]
Expand Down
16 changes: 10 additions & 6 deletions crates/firestore-database/src/database/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,32 @@ use tokio::sync::RwLock;
use tonic::Result;
use tracing::instrument;

use super::document::DocumentMeta;
use super::{
document::DocumentMeta,
reference::{CollectionRef, DocumentRef},
};
use crate::utils::RwLockHashMapExt;

pub struct Collection {
pub name: DefaultAtom,
pub name: CollectionRef,
documents: RwLock<HashMap<DefaultAtom, Arc<DocumentMeta>>>,
}

impl Collection {
#[instrument(skip_all)]
pub fn new(name: DefaultAtom) -> Self {
pub fn new(name: CollectionRef) -> Self {
Self {
name,
documents: Default::default(),
}
}

pub async fn get_doc(self: &Arc<Self>, name: &DefaultAtom) -> Arc<DocumentMeta> {
pub async fn get_doc(self: &Arc<Self>, name: &DocumentRef) -> Arc<DocumentMeta> {
debug_assert_eq!(self.name, name.collection_ref);
Arc::clone(
self.documents
.get_or_insert(name, || {
Arc::new(DocumentMeta::new(name.clone(), self.name.clone()))
.get_or_insert(&name.document_id, || {
Arc::new(DocumentMeta::new(name.clone()))
})
.await
.deref(),
Expand Down
Loading

0 comments on commit 37634a5

Please sign in to comment.