diff --git a/TODO.md b/TODO.md index c2f2235..f1a53b2 100644 --- a/TODO.md +++ b/TODO.md @@ -1,6 +1,4 @@ - common-document-store-backend - - UNIMPLEMENTED: run_aggregation_query is not supported yet - wao-claim-appengine-backend - - UNIMPLEMENTED: mask is not supported yet - UNIMPLEMENTED: target_id should always be 1 is not supported yet - polis-soc-reg-backend - ??? diff --git a/crates/emulator-grpc/src/lib.rs b/crates/emulator-grpc/src/lib.rs index 6daa756..f6cd934 100644 --- a/crates/emulator-grpc/src/lib.rs +++ b/crates/emulator-grpc/src/lib.rs @@ -3,6 +3,7 @@ use std::{mem, sync::Arc}; use firestore_database::{ event::DatabaseEvent, get_doc_name_from_write, + projection::{Project, Projection}, read_consistency::ReadConsistency, reference::{DocumentRef, Ref}, FirestoreDatabase, FirestoreProject, @@ -59,10 +60,11 @@ impl firestore_server::Firestore for FirestoreEmulator { mask, consistency_selector, } = request.into_inner(); - unimplemented_option!(mask); let name: DocumentRef = name.parse()?; + let projection = mask.map(Projection::try_from).transpose()?; + let doc = self .project .database(&name.collection_ref.root_ref) @@ -70,7 +72,7 @@ impl firestore_server::Firestore for FirestoreEmulator { .get_doc(&name, &consistency_selector.try_into()?) .await? .ok_or_else(|| Status::not_found(Code::NotFound.description()))?; - Ok(Response::new(doc)) + Ok(Response::new(projection.project(&doc))) } /// Server streaming response type for the BatchGetDocuments method. @@ -94,13 +96,13 @@ impl firestore_server::Firestore for FirestoreEmulator { mask, consistency_selector, } = request.into_inner(); - unimplemented_option!(mask); let database = self.project.database(&database.parse()?).await; let documents: Vec<_> = documents .into_iter() .map(|name| name.parse::()) .try_collect()?; + let projection = mask.map(Projection::try_from).transpose()?; let ( // Only used for new transactions. @@ -125,7 +127,7 @@ impl firestore_server::Firestore for FirestoreEmulator { Ok(doc) => Ok(BatchGetDocumentsResponse { result: Some(match doc { None => Missing(name.to_string()), - Some(doc) => Found(doc), + Some(doc) => Found(projection.project(&doc)), }), read_time: Some(Timestamp::now()), transaction: mem::take(&mut new_transaction), diff --git a/crates/emulator-ui/src/routes/emulator.rs b/crates/emulator-ui/src/routes/emulator.rs index 3dc5a40..0c33d7f 100644 --- a/crates/emulator-ui/src/routes/emulator.rs +++ b/crates/emulator-ui/src/routes/emulator.rs @@ -45,7 +45,7 @@ async fn get_by_ref( .into_response()), Ref::Document(r) => Ok(Json(json!({ "type": "document", - "document": database.get_doc(&r, &ReadConsistency::Default).await?, + "document": database.get_doc(&r, &ReadConsistency::Default).await?.map(|d| d.to_document()), "collections": database.get_collection_ids(&Ref::Document(r)).await?, })) .into_response()), diff --git a/crates/firestore-database/src/database.rs b/crates/firestore-database/src/database.rs index f852e65..baf4075 100644 --- a/crates/firestore-database/src/database.rs +++ b/crates/firestore-database/src/database.rs @@ -1,6 +1,5 @@ use std::{ collections::{hash_map::Entry, HashMap, HashSet}, - ops::Deref, sync::{Arc, Weak}, }; @@ -25,7 +24,10 @@ use tracing::{info, instrument, Span}; use self::{ collection::Collection, - document::{DocumentContents, DocumentMeta, DocumentVersion, OwnedDocumentContentsWriteGuard}, + document::{ + DocumentContents, DocumentMeta, DocumentVersion, OwnedDocumentContentsWriteGuard, + StoredDocumentVersion, + }, event::DatabaseEvent, field_path::FieldPath, query::Query, @@ -42,6 +44,7 @@ mod collection; pub(crate) mod document; pub mod event; mod field_path; +pub mod projection; pub(crate) mod query; pub mod read_consistency; pub mod reference; @@ -78,20 +81,20 @@ impl FirestoreDatabase { &self, name: &DocumentRef, consistency: &ReadConsistency, - ) -> Result> { + ) -> Result>> { info!(%name); let version = if let Some(txn) = self.get_txn_for_consistency(consistency).await? { txn.read_doc(name) .await? .version_for_consistency(consistency)? - .map(|version| version.to_document()) + .map(Arc::clone) } else { self.get_doc_meta(name) .await? .read() .await? .version_for_consistency(consistency)? - .map(|version| version.to_document()) + .map(Arc::clone) }; Span::current().record("found", version.is_some()); Ok(version) @@ -230,10 +233,10 @@ impl FirestoreDatabase { info!(?query); let result = query.once(self).await?; info!(result_count = result.len()); - result + Ok(result .into_iter() .map(|version| query.project(&version)) - .try_collect() + .collect()) } #[instrument(skip_all, err)] @@ -394,7 +397,7 @@ impl FirestoreDatabase { operation, } = write; - let operation = operation.ok_or(GenericDatabaseError::not_implemented( + let operation = operation.ok_or(GenericDatabaseError::invalid_argument( "missing operation in write", ))?; let condition = current_document @@ -408,6 +411,8 @@ impl FirestoreDatabase { use write::Operation::*; let document_version = match operation { Update(doc) => { + info!(name = %contents.name, "Update"); + let mut fields = if let Some(mask) = update_mask { apply_updates(contents, mask, &doc.fields)? } else { @@ -428,6 +433,8 @@ impl FirestoreDatabase { contents.add_version(fields, commit_time.clone()).await } Delete(_) => { + info!(name = %contents.name, "Delete"); + unimplemented_option!(update_mask); unimplemented_collection!(update_transforms); contents.delete(commit_time.clone()).await @@ -495,7 +502,7 @@ fn apply_updates( .map(|v| v.fields.clone()) .unwrap_or_default(); for field_path in mask.field_paths { - let field_path: FieldPath = field_path.deref().try_into()?; + let field_path: FieldPath = field_path.parse()?; match field_path.get_value(updated_values) { Some(new_value) => field_path.set_value(&mut fields, new_value.clone()), None => { @@ -512,7 +519,7 @@ fn apply_transform( transform: TransformType, commit_time: &Timestamp, ) -> Result { - let field_path: FieldPath = path.deref().try_into()?; + let field_path: FieldPath = path.parse()?; let result = match transform { TransformType::SetToServerValue(code) => { match ServerValue::try_from(code).map_err(|_| { diff --git a/crates/firestore-database/src/database/document.rs b/crates/firestore-database/src/database/document.rs index 1d3edd4..e020d5d 100644 --- a/crates/firestore-database/src/database/document.rs +++ b/crates/firestore-database/src/database/document.rs @@ -18,7 +18,7 @@ use tokio::{ }, time::{error::Elapsed, timeout}, }; -use tracing::{instrument, trace, Level}; +use tracing::{info, instrument, trace, Level}; use super::{read_consistency::ReadConsistency, reference::DocumentRef}; use crate::{error::Result, GenericDatabaseError}; @@ -182,6 +182,16 @@ impl DocumentContents { update_time, fields, })); + if let Some(last) = self.versions.last_mut() { + if last.update_time() == version.update_time() { + last.clone_from(&version); + return version; + } + assert!( + last.update_time() < version.update_time(), + "update or commit time earlier than last version" + ); + } self.versions.push(version.clone()); version } @@ -209,6 +219,7 @@ pub type OwnedDocumentContentsWriteGuard = OwnedRwLockWriteGuard Result { + info!(name = %self.meta.name); let check_time = self.guard.last_updated(); let OwnedDocumentContentsReadGuard { meta, diff --git a/crates/firestore-database/src/database/field_path.rs b/crates/firestore-database/src/database/field_path.rs index 1417d08..bfd31c5 100644 --- a/crates/firestore-database/src/database/field_path.rs +++ b/crates/firestore-database/src/database/field_path.rs @@ -1,4 +1,4 @@ -use std::{borrow::Cow, collections::HashMap, convert::Infallible, mem::take, ops::Deref}; +use std::{borrow::Cow, collections::HashMap, convert::Infallible, mem::take, str::FromStr}; use googleapis::google::firestore::v1::*; @@ -35,17 +35,17 @@ impl TryFrom<&structured_query::FieldReference> for FieldReference { type Error = GenericDatabaseError; fn try_from(value: &structured_query::FieldReference) -> Result { - value.field_path.deref().try_into() + value.field_path.parse() } } -impl TryFrom<&str> for FieldReference { - type Error = GenericDatabaseError; +impl FromStr for FieldReference { + type Err = GenericDatabaseError; - fn try_from(path: &str) -> Result { + fn from_str(path: &str) -> Result { match path { DOC_NAME => Ok(Self::DocumentName), - path => Ok(Self::FieldPath((path.try_into())?)), + path => Ok(Self::FieldPath((path.parse())?)), } } } @@ -124,10 +124,10 @@ impl FieldPath { } } -impl TryFrom<&str> for FieldPath { - type Error = GenericDatabaseError; +impl FromStr for FieldPath { + type Err = GenericDatabaseError; - fn try_from(path: &str) -> Result { + fn from_str(path: &str) -> Result { if path.is_empty() { return Err(GenericDatabaseError::invalid_argument( "invalid empty field path", diff --git a/crates/firestore-database/src/database/projection.rs b/crates/firestore-database/src/database/projection.rs new file mode 100644 index 0000000..dfa6170 --- /dev/null +++ b/crates/firestore-database/src/database/projection.rs @@ -0,0 +1,63 @@ +use googleapis::google::firestore::v1::{structured_query, Document, DocumentMask}; +use itertools::Itertools; + +use super::field_path::FieldReference; +use crate::{document::StoredDocumentVersion, GenericDatabaseError}; + +#[derive(Debug)] +pub struct Projection { + fields: Vec, +} + +pub trait Project { + fn project(&self, version: &StoredDocumentVersion) -> Document; +} + +impl Project for Projection { + fn project(&self, version: &StoredDocumentVersion) -> Document { + let mut doc = Document { + fields: Default::default(), + create_time: Some(version.create_time.clone()), + update_time: Some(version.update_time.clone()), + name: version.name.to_string(), + }; + for field in &self.fields { + match field { + FieldReference::DocumentName => continue, + FieldReference::FieldPath(path) => { + if let Some(val) = path.get_value(&version.fields) { + path.set_value(&mut doc.fields, val.clone()); + } + } + } + } + doc + } +} + +impl Project for Option { + fn project(&self, version: &StoredDocumentVersion) -> Document { + match self { + Some(projection) => projection.project(version), + None => version.to_document(), + } + } +} + +impl TryFrom for Projection { + type Error = GenericDatabaseError; + + fn try_from(value: structured_query::Projection) -> Result { + let fields = value.fields.iter().map(TryInto::try_into).try_collect()?; + Ok(Self { fields }) + } +} + +impl TryFrom for Projection { + type Error = GenericDatabaseError; + + fn try_from(value: DocumentMask) -> Result { + let fields = value.field_paths.iter().map(|s| s.parse()).try_collect()?; + Ok(Self { fields }) + } +} diff --git a/crates/firestore-database/src/database/query.rs b/crates/firestore-database/src/database/query.rs index 362daa8..3156443 100644 --- a/crates/firestore-database/src/database/query.rs +++ b/crates/firestore-database/src/database/query.rs @@ -9,6 +9,7 @@ use super::{ collection::Collection, document::StoredDocumentVersion, field_path::FieldReference, + projection::{Project, Projection}, read_consistency::ReadConsistency, reference::{CollectionRef, Ref}, FirestoreDatabase, @@ -27,7 +28,7 @@ pub(crate) struct Query { /// This acts as a [DocumentMask][google.firestore.v1.DocumentMask] over the /// documents returned from a query. When not set, assumes that the caller /// wants all fields returned. - select: Option>, + select: Option, /// The collections to query. from: Vec, @@ -133,15 +134,7 @@ impl Query { limit, } = query; - let select = select - .map(|projection| { - projection - .fields - .iter() - .map(TryInto::try_into) - .try_collect() - }) - .transpose()?; + let select = select.map(Projection::try_from).transpose()?; let filter: Option = filter.map(TryInto::try_into).transpose()?; let mut order_by: Vec = order_by.into_iter().map(TryInto::try_into).try_collect()?; if order_by.is_empty() { @@ -286,37 +279,8 @@ impl Query { Ok(true) } - pub fn project(&self, version: &StoredDocumentVersion) -> Result { - let Some(projection) = &self.select else { - return Ok(version.to_document()); - }; - - match &projection[..] { - [] => Ok(version.to_document()), - [FieldReference::DocumentName] => Ok(Document { - fields: Default::default(), - create_time: Some(version.create_time.clone()), - update_time: Some(version.update_time.clone()), - name: version.name.to_string(), - }), - fields => { - let mut doc = Document { - fields: Default::default(), - create_time: Some(version.create_time.clone()), - update_time: Some(version.update_time.clone()), - name: version.name.to_string(), - }; - for field in fields { - let FieldReference::FieldPath(path) = field else { - continue; - }; - if let Some(val) = path.get_value(&version.fields) { - path.set_value(&mut doc.fields, val.clone()); - } - } - Ok(doc) - } - } + pub fn project(&self, version: &StoredDocumentVersion) -> Document { + self.select.project(version) } fn order_by_cmp( @@ -381,8 +345,7 @@ impl TryFrom for Order { .as_ref() .ok_or_else(|| GenericDatabaseError::invalid_argument("order_by without field"))? .field_path - .deref() - .try_into()?, + .parse()?, direction: value.direction().try_into()?, }) } diff --git a/crates/firestore-database/src/database/query/filter.rs b/crates/firestore-database/src/database/query/filter.rs index a4fe167..f50e1f7 100644 --- a/crates/firestore-database/src/database/query/filter.rs +++ b/crates/firestore-database/src/database/query/filter.rs @@ -1,5 +1,3 @@ -use std::ops::Deref; - use googleapis::google::firestore::v1::{structured_query, Value}; use itertools::Itertools; @@ -175,8 +173,7 @@ impl TryFrom for FieldFilter { .as_ref() .expect("missing field in FieldFilter") .field_path - .deref() - .try_into()?, + .parse()?, op: value.op().try_into()?, value: value.value.expect("missing value in FieldFilter"), }) @@ -336,7 +333,7 @@ impl TryFrom for UnaryFilter { .as_ref() .expect("missing operand_type in UnaryFilter"); Ok(Self { - field: field.field_path.deref().try_into()?, + field: field.field_path.parse()?, op: value.op().try_into()?, }) } diff --git a/crates/firestore-database/src/listener.rs b/crates/firestore-database/src/listener.rs index 507cb72..fa86781 100644 --- a/crates/firestore-database/src/listener.rs +++ b/crates/firestore-database/src/listener.rs @@ -244,7 +244,7 @@ impl Listener { let send_initial = match &send_if_newer_than { Some(previous_time) => !doc .as_ref() - .is_some_and(|v| (v.update_time.as_ref().unwrap()) <= (previous_time)), + .is_some_and(|v| (&v.update_time) <= (previous_time)), _ => true, }; @@ -252,7 +252,7 @@ impl Listener { // Response: This is the current version, whether you like it or not. let msg = match doc { Some(d) => ResponseType::DocumentChange(DocumentChange { - document: Some(d), + document: Some(d.to_document()), target_ids: vec![TARGET_ID], removed_target_ids: vec![], }), @@ -489,7 +489,7 @@ impl QueryTarget { }, ); msgs.push(ResponseType::DocumentChange(DocumentChange { - document: Some(self.query.project(&version)?), + document: Some(self.query.project(&version)), target_ids: vec![TARGET_ID], removed_target_ids: vec![], })) diff --git a/test-suite/tests/2-basic-query.test.ts b/test-suite/tests/2-basic-query.test.ts index a7b1a64..b5e63bc 100644 --- a/test-suite/tests/2-basic-query.test.ts +++ b/test-suite/tests/2-basic-query.test.ts @@ -1,4 +1,4 @@ -import { orderBy, reverse, without } from 'lodash'; +import { orderBy, reverse, uniq, without } from 'lodash'; import { fs } from './utils'; interface Data { @@ -328,6 +328,17 @@ describe('paginating results', () => { }); }); +describe('projection', () => { + test.each([ + { requested: [], got: [] }, + { requested: ['type'], got: ['type'] }, + { requested: ['type', 'ordered', 'non_existing'], got: ['type', 'ordered'] }, + ])('select subset of fields', async ({ requested, got }) => { + const result = await fs.collection.select(...requested).get(); + expect(uniq(result.docs.flatMap(d => Object.keys(d.data())))).toIncludeSameMembers(got); + }); +}); + describe('aggregation queries', () => { const cities = [ { city: 'Buren', population: 27_718, area: 142.92 }, diff --git a/test-suite/tests/3-basic-transaction.test.ts b/test-suite/tests/3-basic-transaction.test.ts index ab2a07a..743f944 100644 --- a/test-suite/tests/3-basic-transaction.test.ts +++ b/test-suite/tests/3-basic-transaction.test.ts @@ -26,6 +26,18 @@ describe('concurrent tests', () => { expect(await getData(docRef1.get())).toEqual({ foo: 'bar' }); }); + test.concurrent('updating same doc multiple times', async () => { + const [docRef1] = refs(); + + await fs.firestore.runTransaction(async txn => { + expect(await txn.get(docRef1)).toHaveProperty('exists', false); + + txn.set(docRef1, writeData({ foo: fs.exported.FieldValue.increment(1) })); + txn.update(docRef1, { foo: fs.exported.FieldValue.increment(1) }); + }); + expect(await getData(docRef1.get())).toEqual({ foo: 2 }); + }); + test.concurrent('using txn.getAll', async () => { const [docRef1, docRef2] = refs(); diff --git a/test-suite/tests/4-live-docs.test.ts b/test-suite/tests/4-live-docs.test.ts index 47482d9..d69bc81 100644 --- a/test-suite/tests/4-live-docs.test.ts +++ b/test-suite/tests/4-live-docs.test.ts @@ -70,6 +70,38 @@ describe('listen to document updates', () => { for (const { stop } of listeners) stop(); }); + + test.concurrent('receiving a single update on multiple updates in single txn', async () => { + const doc = fs.collection.doc(); + const { stop, getCurrent, getNext } = listen(doc); + + const firstSnap = await getCurrent(); + expect(firstSnap.exists).toBeFalse(); + + const nextSnap = getNext(); + + await fs.firestore.runTransaction(async txn => { + expect(await txn.get(doc)).toHaveProperty('exists', false); + txn.create(doc, { created: fs.exported.FieldValue.serverTimestamp() }); + txn.update(doc, { updated: fs.exported.FieldValue.serverTimestamp() }); + txn.update(doc, { counter: fs.exported.FieldValue.increment(1) }); + txn.update(doc, { counter: fs.exported.FieldValue.increment(1) }); + txn.update(doc, { array: fs.exported.FieldValue.arrayUnion({ id: 1 }, { id: 2 }) }); + txn.update(doc, { array: fs.exported.FieldValue.arrayUnion({ id: 2 }, { id: 3 }) }); + txn.update(doc, { array: fs.exported.FieldValue.arrayRemove({ id: 1 }) }); + }); + + const snap = (await nextSnap).data(); + expect(snap).toEqual({ + created: expect.any(fs.exported.Timestamp), + updated: expect.any(fs.exported.Timestamp), + counter: 2, + array: [{ id: 2 }, { id: 3 }], + }); + expect(snap?.created).toEqual(snap?.updated); + + stop(); + }); }); function listen(doc: FirebaseFirestore.DocumentReference) {