Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for parsing Firestore database references from string #24

Merged
merged 1 commit into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/firestore-database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ futures = { workspace = true }
googleapis = { workspace = true }
itertools = { workspace = true }
string_cache = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tonic = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
rstest = "0.18.2"
55 changes: 26 additions & 29 deletions crates/firestore-database/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use self::{
field_path::FieldPath,
listener::Listener,
query::Query,
reference::{CollectionRef, DocumentRef, Ref, RootRef},
transaction::{RunningTransactions, Transaction, TransactionId},
};
use crate::{
Expand All @@ -40,19 +41,22 @@ pub mod event;
mod field_path;
mod listener;
mod query;
pub mod reference;
mod transaction;

const MAX_EVENT_BACKLOG: usize = 1024;

pub struct Database {
pub name: RootRef,
collections: RwLock<HashMap<DefaultAtom, Arc<Collection>>>,
transactions: RunningTransactions,
events: broadcast::Sender<Arc<DatabaseEvent>>,
}

impl Database {
pub fn new() -> Arc<Self> {
pub fn new(name: RootRef) -> Arc<Self> {
Arc::new_cyclic(|database| Database {
name,
collections: Default::default(),
transactions: RunningTransactions::new(Weak::clone(database)),
events: broadcast::channel(MAX_EVENT_BACKLOG).0,
Expand All @@ -64,10 +68,10 @@ impl Database {
#[instrument(skip_all, err, fields(in_txn = consistency.is_transaction(), found))]
pub async fn get_doc(
&self,
name: &DefaultAtom,
name: &DocumentRef,
consistency: &ReadConsistency,
) -> Result<Option<Document>> {
info!(name = name.deref());
info!(%name);
let version = if let Some(txn) = self.get_txn_for_consistency(consistency).await? {
txn.read_doc(name)
.await?
Expand All @@ -85,28 +89,32 @@ impl Database {
Ok(version)
}

pub async fn get_collection(&self, collection_name: &DefaultAtom) -> Arc<Collection> {
pub async fn get_collection(&self, collection_name: &CollectionRef) -> Arc<Collection> {
debug_assert_eq!(self.name, collection_name.root_ref);
Arc::clone(
&*self
.collections
.get_or_insert(collection_name, || {
Arc::new(Collection::new(collection_name.into()))
.get_or_insert(&collection_name.collection_id, || {
Arc::new(Collection::new(collection_name.clone()))
})
.await,
)
}

#[instrument(skip_all, err)]
pub async fn get_doc_meta(&self, name: &DefaultAtom) -> Result<Arc<DocumentMeta>> {
let collection = collection_name(name)?;
let meta = self.get_collection(&collection).await.get_doc(name).await;
pub async fn get_doc_meta(&self, name: &DocumentRef) -> Result<Arc<DocumentMeta>> {
let meta = self
.get_collection(&name.collection_ref)
.await
.get_doc(name)
.await;
Ok(meta)
}

#[instrument(skip_all, err)]
pub async fn get_doc_meta_mut_no_txn(
&self,
name: &DefaultAtom,
name: &DocumentRef,
) -> Result<OwnedDocumentContentsWriteGuard> {
self.get_doc_meta(name)
.await?
Expand Down Expand Up @@ -134,7 +142,7 @@ impl Database {
}

#[instrument(skip_all)]
pub async fn get_collection_ids(&self, parent: &DefaultAtom) -> Result<Vec<DefaultAtom>> {
pub async fn get_collection_ids(&self, parent_doc: &DocumentRef) -> Result<Vec<DefaultAtom>> {
// Get all collections asap in order to keep the read lock time minimal.
let all_collections = self
.collections
Expand All @@ -146,11 +154,7 @@ impl Database {
// Cannot use `filter_map` because of the `await`.
let mut result = vec![];
for col in all_collections {
let Some(path) = col
.name
.strip_prefix(parent.deref())
.and_then(|p| p.strip_prefix('/'))
else {
let Some(path) = col.name.strip_document_prefix(parent_doc) else {
continue;
};
if col.has_doc().await? {
Expand All @@ -163,13 +167,14 @@ impl Database {
#[instrument(skip_all, err)]
pub async fn run_query(
&self,
parent: String,
parent: Ref,
query: StructuredQuery,
consistency: ReadConsistency,
) -> Result<Vec<Document>> {
let mut query = Query::from_structured(parent, query, consistency)?;
info!(?query);
query.once(self).await
let result = query.once(self).await?;
Ok(result.into_iter().map(|t| t.1).collect())
}

#[instrument(skip_all, err)]
Expand All @@ -183,7 +188,7 @@ impl Database {

let mut write_results = vec![];
let mut updates = HashMap::new();
let mut write_guard_cache = HashMap::<DefaultAtom, OwnedDocumentContentsWriteGuard>::new();
let mut write_guard_cache = HashMap::<DocumentRef, OwnedDocumentContentsWriteGuard>::new();
// This must be done in two phases. First acquire the lock on all docs, only then start to
// update them.
for write in &writes {
Expand Down Expand Up @@ -391,15 +396,7 @@ fn apply_transform(
Ok(result)
}

fn collection_name(name: &DefaultAtom) -> Result<DefaultAtom> {
Ok(name
.rsplit_once('/')
.ok_or_else(|| Status::invalid_argument("invalid document path, missing collection-name"))?
.0
.into())
}

pub fn get_doc_name_from_write(write: &Write) -> Result<DefaultAtom> {
pub fn get_doc_name_from_write(write: &Write) -> Result<DocumentRef> {
let operation = write
.operation
.as_ref()
Expand All @@ -410,7 +407,7 @@ pub fn get_doc_name_from_write(write: &Write) -> Result<DefaultAtom> {
Delete(name) => name,
Transform(trans) => &trans.document,
};
Ok(DefaultAtom::from(name))
Ok(name.parse()?)
}

#[derive(Clone, Debug)]
Expand Down
16 changes: 10 additions & 6 deletions crates/firestore-database/src/database/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,32 @@ use tokio::sync::RwLock;
use tonic::Result;
use tracing::instrument;

use super::document::DocumentMeta;
use super::{
document::DocumentMeta,
reference::{CollectionRef, DocumentRef},
};
use crate::utils::RwLockHashMapExt;

pub struct Collection {
pub name: DefaultAtom,
pub name: CollectionRef,
documents: RwLock<HashMap<DefaultAtom, Arc<DocumentMeta>>>,
}

impl Collection {
#[instrument(skip_all)]
pub fn new(name: DefaultAtom) -> Self {
pub fn new(name: CollectionRef) -> Self {
Self {
name,
documents: Default::default(),
}
}

pub async fn get_doc(self: &Arc<Self>, name: &DefaultAtom) -> Arc<DocumentMeta> {
pub async fn get_doc(self: &Arc<Self>, name: &DocumentRef) -> Arc<DocumentMeta> {
debug_assert_eq!(self.name, name.collection_ref);
Arc::clone(
self.documents
.get_or_insert(name, || {
Arc::new(DocumentMeta::new(name.clone(), self.name.clone()))
.get_or_insert(&name.document_id, || {
Arc::new(DocumentMeta::new(name.clone()))
})
.await
.deref(),
Expand Down
Loading
Loading