diff --git a/Cargo.lock b/Cargo.lock index 80abaac18974..e3a56425f0ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2490,11 +2490,13 @@ dependencies = [ "bson", "chrono", "cuid", + "derive_more", "futures", "indexmap 1.9.3", "itertools", "mongodb", "mongodb-client", + "pretty_assertions", "prisma-value", "psl", "query-connector", diff --git a/query-engine/connectors/mongodb-query-connector/Cargo.toml b/query-engine/connectors/mongodb-query-connector/Cargo.toml index 385487917d21..089ec0974497 100644 --- a/query-engine/connectors/mongodb-query-connector/Cargo.toml +++ b/query-engine/connectors/mongodb-query-connector/Cargo.toml @@ -21,8 +21,9 @@ tracing = "0.1" tracing-futures = "0.2" uuid.workspace = true indexmap = "1.7" -query-engine-metrics = {path = "../../metrics"} +query-engine-metrics = { path = "../../metrics" } cuid = { git = "https://github.com/prisma/cuid-rust", branch = "wasm32-support" } +derive_more = "0.99.17" [dependencies.query-structure] path = "../../query-structure" @@ -50,3 +51,6 @@ workspace = true [dependencies.user-facing-errors] features = ["sql"] workspace = true + +[dev-dependencies] +pretty_assertions = "1.4.0" diff --git a/query-engine/connectors/mongodb-query-connector/src/query_builder/read_query_builder.rs b/query-engine/connectors/mongodb-query-connector/src/query_builder/read_query_builder.rs index fcf749fc2d35..e8cdf26caba0 100644 --- a/query-engine/connectors/mongodb-query-connector/src/query_builder/read_query_builder.rs +++ b/query-engine/connectors/mongodb-query-connector/src/query_builder/read_query_builder.rs @@ -36,7 +36,7 @@ impl ReadQuery { ) -> crate::Result> { let opts = AggregateOptions::builder().allow_disk_use(true).build(); let query_string_builder = Aggregate::new(&self.stages, on_collection.name()); - let cursor = observing(Some(&query_string_builder), || { + let cursor = observing(&query_string_builder, || { on_collection.aggregate_with_session(self.stages.clone(), opts, with_session) }) .await?; diff --git a/query-engine/connectors/mongodb-query-connector/src/query_strings.rs b/query-engine/connectors/mongodb-query-connector/src/query_strings.rs index b51adc663f1a..fa010a1a5c2f 100644 --- a/query-engine/connectors/mongodb-query-connector/src/query_strings.rs +++ b/query-engine/connectors/mongodb-query-connector/src/query_strings.rs @@ -5,6 +5,7 @@ //! There is a struct for each different type of query to generate. Each of them implement the //! QueryStringBuilder trait, which is dynamically dispatched to a specific query string builder by //! `root_queries::observing` +use derive_more::Constructor; use mongodb::bson::{Bson, Document}; use std::fmt::Write; @@ -12,32 +13,73 @@ pub(crate) trait QueryString: Sync + Send { fn build(&self) -> String { let mut buffer = String::new(); - write!(&mut buffer, "db.{}.{}(", self.collection(), self.query_type()).unwrap(); + if let Some(coll_name) = self.collection() { + write!(&mut buffer, "db.{}.{}(", coll_name, self.query_type()).unwrap(); + } else { + write!(&mut buffer, "db.{}(", self.query_type()).unwrap(); + } self.write_query(&mut buffer); write!(&mut buffer, ")").unwrap(); buffer } - fn collection(&self) -> &str; + fn collection(&self) -> Option<&str>; fn query_type(&self) -> &str; fn write_query(&self, buffer: &mut String); } -pub(crate) struct Aggregate<'a> { - stages: &'a [Document], +#[derive(Constructor)] +pub(crate) struct RunCommand<'a> { + cmd: &'a Document, +} + +impl QueryString for RunCommand<'_> { + fn collection(&self) -> Option<&str> { + None + } + + fn query_type(&self) -> &str { + "runCommand" + } + + fn write_query(&self, buffer: &mut String) { + fmt_doc(buffer, self.cmd, 1).unwrap(); + } +} + +#[derive(Constructor)] +pub(crate) struct Find<'a> { + filter: &'a Document, + projection: &'a Document, coll_name: &'a str, } -impl Aggregate<'_> { - pub(crate) fn new<'a>(stages: &'a [Document], coll_name: &'a str) -> Aggregate<'a> { - Aggregate { stages, coll_name } +impl QueryString for Find<'_> { + fn collection(&self) -> Option<&str> { + Some(self.coll_name) + } + + fn query_type(&self) -> &str { + "find" + } + + fn write_query(&self, buffer: &mut String) { + fmt_doc(buffer, self.filter, 1).unwrap(); + write!(buffer, ", ").unwrap(); + fmt_doc(buffer, self.projection, 1).unwrap(); } } +#[derive(Constructor)] +pub(crate) struct Aggregate<'a> { + stages: &'a [Document], + coll_name: &'a str, +} + impl QueryString for Aggregate<'_> { - fn collection(&self) -> &str { - self.coll_name + fn collection(&self) -> Option<&str> { + Some(self.coll_name) } fn query_type(&self) -> &str { @@ -51,20 +93,15 @@ impl QueryString for Aggregate<'_> { } } +#[derive(Constructor)] pub(crate) struct InsertOne<'a> { doc: &'a Document, coll_name: &'a str, } -impl InsertOne<'_> { - pub(crate) fn new<'a>(doc: &'a Document, coll_name: &'a str) -> InsertOne<'a> { - InsertOne { doc, coll_name } - } -} - impl QueryString for InsertOne<'_> { - fn collection(&self) -> &str { - self.coll_name + fn collection(&self) -> Option<&str> { + Some(self.coll_name) } fn query_type(&self) -> &str { @@ -76,25 +113,16 @@ impl QueryString for InsertOne<'_> { } } +#[derive(Constructor)] pub(crate) struct UpdateMany<'a> { filter: &'a Document, update_docs: &'a [Document], coll_name: &'a str, } -impl UpdateMany<'_> { - pub(crate) fn new<'a>(filter: &'a Document, update_docs: &'a [Document], coll_name: &'a str) -> UpdateMany<'a> { - UpdateMany { - filter, - update_docs, - coll_name, - } - } -} - impl QueryString for UpdateMany<'_> { - fn collection(&self) -> &str { - self.coll_name + fn collection(&self) -> Option<&str> { + Some(self.coll_name) } fn query_type(&self) -> &str { @@ -127,25 +155,16 @@ impl QueryString for UpdateMany<'_> { } } +#[derive(Constructor)] pub(crate) struct UpdateOne<'a> { filter: &'a Document, update_doc: &'a Document, coll_name: &'a str, } -impl UpdateOne<'_> { - pub(crate) fn new<'a>(filter: &'a Document, update_doc: &'a Document, coll_name: &'a str) -> UpdateOne<'a> { - UpdateOne { - filter, - update_doc, - coll_name, - } - } -} - impl QueryString for UpdateOne<'_> { - fn collection(&self) -> &str { - self.coll_name + fn collection(&self) -> Option<&str> { + Some(self.coll_name) } fn query_type(&self) -> &str { @@ -156,7 +175,7 @@ impl QueryString for UpdateOne<'_> { fmt_doc(buffer, self.filter, 1).unwrap(); if cfg!(debug_assertions) { - writeln!(buffer, ", ").unwrap(); + writeln!(buffer, ",").unwrap(); } else { write!(buffer, ", ").unwrap(); } @@ -165,20 +184,15 @@ impl QueryString for UpdateOne<'_> { } } +#[derive(Constructor)] pub(crate) struct DeleteMany<'a> { filter: &'a Document, coll_name: &'a str, } -impl DeleteMany<'_> { - pub(crate) fn new<'a>(filter: &'a Document, coll_name: &'a str) -> DeleteMany<'a> { - DeleteMany { filter, coll_name } - } -} - impl QueryString for DeleteMany<'_> { - fn collection(&self) -> &str { - self.coll_name + fn collection(&self) -> Option<&str> { + Some(self.coll_name) } fn query_type(&self) -> &str { @@ -190,25 +204,16 @@ impl QueryString for DeleteMany<'_> { } } +#[derive(Constructor)] pub(crate) struct InsertMany<'a> { insert_docs: &'a [Document], coll_name: &'a str, ordered: bool, } -impl InsertMany<'_> { - pub(crate) fn new<'a>(insert_docs: &'a [Document], ordered: bool, coll_name: &'a str) -> InsertMany<'a> { - InsertMany { - insert_docs, - coll_name, - ordered, - } - } -} - impl QueryString for InsertMany<'_> { - fn collection(&self) -> &str { - self.coll_name + fn collection(&self) -> Option<&str> { + Some(self.coll_name) } fn query_type(&self) -> &str { @@ -300,6 +305,7 @@ fn fmt_val(buffer: &mut String, val: &Bson, depth: usize) -> std::fmt::Result { mod tests { use super::*; use bson::doc; + use pretty_assertions::assert_eq; #[test] fn test_aggregate() { @@ -414,7 +420,7 @@ db.collection.updateMany({ query.trim(), r#"db.collection.updateOne({ name: "Jane", -}, +}, { $set: { position: { @@ -448,7 +454,7 @@ db.collection.deleteMany({ doc! { "name": "Jane", "position": {"department": "engineering", "title": "principal"} }, doc! { "name": "John", "position": {"department": "product", "title": "senior manager"} }, ]; - let insert = InsertMany::new(&docs, true, "collection"); + let insert = InsertMany::new(&docs, "collection", true); let query = insert.build(); assert_eq!( query.trim(), @@ -470,4 +476,41 @@ db.collection.insertMany([{ .trim() ); } + + #[test] + fn test_find() { + let filter = doc! { + "department": "product", + "title": "senior manager", + }; + let project = doc! { + "department": 1, + }; + let find = Find::new(&filter, &project, "collection"); + let query = find.build(); + assert_eq!( + query.trim(), + r#"db.collection.find({ + department: "product", + title: "senior manager", +}, { + department: 1, +})"# + ); + } + + #[test] + fn test_run_command() { + let cmd = doc! { + "hello": 1, + }; + let run_command = RunCommand::new(&cmd); + let query = run_command.build(); + assert_eq!( + query.trim(), + r#"db.runCommand({ + hello: 1, +})"# + ); + } } diff --git a/query-engine/connectors/mongodb-query-connector/src/root_queries/mod.rs b/query-engine/connectors/mongodb-query-connector/src/root_queries/mod.rs index f66adbac3e3b..5bc5f68db883 100644 --- a/query-engine/connectors/mongodb-query-connector/src/root_queries/mod.rs +++ b/query-engine/connectors/mongodb-query-connector/src/root_queries/mod.rs @@ -47,20 +47,14 @@ fn pick_singular_id(model: &Model) -> ScalarFieldRef { .unwrap() } -// Performs both metrics pushing and query logging. Query logging might be disabled and thus -// the query_string might not need to be built, that's why rather than a query_string -// we receive a Builder, as it's not trivial to buid a query and we want to skip that when possible. -// -// As a reminder, the query string is not fed into mongo db directly, we built it for debugging -// purposes and it's only used when the query log is enabled. For querying mongo, we use the driver -// wire protocol to build queries from a graphql query rather than executing raw mongodb statements. -// -// As we don't have a mongodb query string, we need to create it from the driver object model, which -// we better skip it if we don't need it (i.e. when the query log is disabled.) -pub(crate) async fn observing<'a, 'b, F, T, U>( - query_string_builder: Option<&'b dyn QueryString>, - f: F, -) -> mongodb::error::Result +/// Logs the query and updates metrics for an operation performed by a passed function. +/// +/// NOTE: +/// 1. `dyn QueryString` is used instead of a `String` to skip expensive query serialization when +/// query logs are disabled. This, however, is not currently implemented. +/// 2. Query strings logged are for debugging purposes only. The actual queries constructed by +/// MongoDB driver might look slightly different. +pub(crate) async fn observing<'a, 'b, F, T, U>(builder: &'b dyn QueryString, f: F) -> mongodb::error::Result where F: FnOnce() -> U + 'a, U: Future>, @@ -72,13 +66,10 @@ where histogram!(PRISMA_DATASOURCE_QUERIES_DURATION_HISTOGRAM_MS, elapsed); increment_counter!(PRISMA_DATASOURCE_QUERIES_TOTAL); - let params: Vec = Vec::new(); - - // todo: emit tracing event with the appropriate log level only query_log is enabled. And fix log suscription - if let Some(qs) = query_string_builder { - let qs = qs.build(); - debug!(target: "mongodb_query_connector::query", item_type = "query", is_query = true, query = %qs, params = ?params, duration_ms=elapsed); - } + // TODO: emit tracing event only when "debug" level query logs are enabled. + // TODO prisma/team-orm#136: fix log subscription. + let query_string = builder.build(); + debug!(target: "mongodb_query_connector::query", item_type = "query", is_query = true, query = %query_string, duration_ms = elapsed); res } diff --git a/query-engine/connectors/mongodb-query-connector/src/root_queries/read.rs b/query-engine/connectors/mongodb-query-connector/src/root_queries/read.rs index 0d9ac09ae26a..c4bceeaab7f9 100644 --- a/query-engine/connectors/mongodb-query-connector/src/root_queries/read.rs +++ b/query-engine/connectors/mongodb-query-connector/src/root_queries/read.rs @@ -1,7 +1,7 @@ use super::*; use crate::{ error::DecorateErrorWithFieldInformationExtension, output_meta, query_builder::MongoReadQueryBuilder, - vacuum_cursor, IntoBson, + query_strings::Find, vacuum_cursor, IntoBson, }; use connector_interface::RelAggregationSelection; use mongodb::{bson::doc, options::FindOptions, ClientSession, Database}; @@ -124,16 +124,20 @@ pub async fn get_related_m2m_record_ids<'conn>( }) .collect::>>()?; - let filter = doc! { id_field.db_name(): { "$in": ids } }; - // Scalar field name where the relation ids list is on `model`. let id_holder_field = from_field.scalar_fields().into_iter().next().unwrap(); let relation_ids_field_name = id_holder_field.name().to_owned(); - let find_options = FindOptions::builder() - .projection(doc! { id_field.db_name(): 1, relation_ids_field_name: 1 }) - .build(); - let cursor = observing(None, || coll.find_with_session(filter, Some(find_options), session)).await?; + let filter = doc! { id_field.db_name(): { "$in": ids } }; + let projection = doc! { id_field.db_name(): 1, relation_ids_field_name: 1 }; + + let query_string_builder = Find::new(&filter, &projection, coll.name()); + let find_options = FindOptions::builder().projection(projection.clone()).build(); + + let cursor = observing(&query_string_builder, || { + coll.find_with_session(filter.clone(), Some(find_options), session) + }) + .await?; let docs = vacuum_cursor(cursor, session).await?; let parent_id_meta = output_meta::from_scalar_field(&id_field); let related_ids_holder_meta = output_meta::from_scalar_field(&id_holder_field); diff --git a/query-engine/connectors/mongodb-query-connector/src/root_queries/write.rs b/query-engine/connectors/mongodb-query-connector/src/root_queries/write.rs index c0c43f108aec..a821eddc3c69 100644 --- a/query-engine/connectors/mongodb-query-connector/src/root_queries/write.rs +++ b/query-engine/connectors/mongodb-query-connector/src/root_queries/write.rs @@ -4,7 +4,7 @@ use crate::{ filter::{FilterPrefix, MongoFilter, MongoFilterVisitor}, output_meta, query_builder::MongoReadQueryBuilder, - query_strings::{DeleteMany, InsertMany, InsertOne, UpdateMany, UpdateOne}, + query_strings::{Aggregate, DeleteMany, Find, InsertMany, InsertOne, RunCommand, UpdateMany, UpdateOne}, root_queries::raw::{MongoCommand, MongoOperation}, IntoBson, }; @@ -65,11 +65,9 @@ pub async fn create_record<'conn>( } let query_builder = InsertOne::new(&doc, coll.name()); - let insert_result = observing(Some(&query_builder), || { - coll.insert_one_with_session(&doc, None, session) - }) - .instrument(span) - .await?; + let insert_result = observing(&query_builder, || coll.insert_one_with_session(&doc, None, session)) + .instrument(span) + .await?; let id_value = value_from_bson(insert_result.inserted_id, &id_meta)?; Ok(SingleRecord { @@ -122,9 +120,9 @@ pub async fn create_records<'conn>( let ordered = !skip_duplicates; let options = Some(InsertManyOptions::builder().ordered(ordered).build()); - let query_string_builder = InsertMany::new(&docs, ordered, coll.name()); + let query_string_builder = InsertMany::new(&docs, coll.name(), ordered); let docs_iter = docs.iter(); - let insert = observing(Some(&query_string_builder), || { + let insert = observing(&query_string_builder, || { coll.insert_many_with_session(docs_iter, options, session) }) .instrument(span); @@ -206,7 +204,7 @@ pub async fn update_records<'conn>( if !update_docs.is_empty() { let query_string_builder = UpdateMany::new(&filter, &update_docs, coll.name()); - let res = observing(Some(&query_string_builder), || { + let res = observing(&query_string_builder, || { coll.update_many_with_session(filter.clone(), update_docs.clone(), None, session) }) .instrument(span) @@ -268,7 +266,7 @@ pub async fn delete_records<'conn>( let filter = doc! { id_field.db_name(): { "$in": ids } }; let query_string_builder = DeleteMany::new(&filter, coll.name()); - let delete_result = observing(Some(&query_string_builder), || { + let delete_result = observing(&query_string_builder, || { coll.delete_many_with_session(filter.clone(), None, session) }) .instrument(span) @@ -352,7 +350,7 @@ pub async fn m2m_connect<'conn>( let query_string_builder = UpdateOne::new(&parent_filter, &parent_update, parent_coll.name()); - observing(Some(&query_string_builder), || { + observing(&query_string_builder, || { parent_coll.update_one_with_session(parent_filter.clone(), parent_update.clone(), None, session) }) .await?; @@ -373,7 +371,7 @@ pub async fn m2m_connect<'conn>( let child_updates = vec![child_update.clone()]; let query_string_builder = UpdateMany::new(&child_filter, &child_updates, child_coll.name()); - observing(Some(&query_string_builder), || { + observing(&query_string_builder, || { child_coll.update_many_with_session(child_filter.clone(), child_update.clone(), None, session) }) .await?; @@ -418,7 +416,7 @@ pub async fn m2m_disconnect<'conn>( // First update the parent and remove all child IDs to the m:n scalar field. let query_string_builder = UpdateOne::new(&parent_filter, &parent_update, parent_coll.name()); - observing(Some(&query_string_builder), || { + observing(&query_string_builder, || { parent_coll.update_one_with_session(parent_filter.clone(), parent_update.clone(), None, session) }) .await?; @@ -440,7 +438,7 @@ pub async fn m2m_disconnect<'conn>( let child_updates = vec![child_update.clone()]; let query_string_builder = UpdateMany::new(&child_filter, &child_updates, child_coll.name()); - observing(Some(&query_string_builder), || { + observing(&query_string_builder, || { child_coll.update_many_with_session(child_filter.clone(), child_update, None, session) }) .await?; @@ -477,7 +475,11 @@ pub async fn query_raw<'conn>( async { let json_result = match mongo_command { MongoCommand::Raw { cmd } => { - let mut result = observing(None, || database.run_command_with_session(cmd, None, session)).await?; + let query_string_builder = RunCommand::new(&cmd); + let mut result = observing(&query_string_builder, || { + database.run_command_with_session(cmd.clone(), None, session) + }) + .await?; // Removes unnecessary properties from raw response // See https://docs.mongodb.com/v5.0/reference/method/db.runCommand @@ -495,12 +497,25 @@ pub async fn query_raw<'conn>( match operation { MongoOperation::Find(filter, options) => { - let cursor = coll.find_with_session(filter, options, session).await?; + let unwrapped_filter = filter.clone().unwrap_or_default(); + let projection = options + .as_ref() + .and_then(|options| options.projection.clone()) + .unwrap_or_default(); + let query_string_builder = Find::new(&unwrapped_filter, &projection, coll.name()); + let cursor = observing(&query_string_builder, || { + coll.find_with_session(filter, options, session) + }) + .await?; raw::cursor_to_json(cursor, session).await? } MongoOperation::Aggregate(pipeline, options) => { - let cursor = coll.aggregate_with_session(pipeline, options, session).await?; + let query_string_builder = Aggregate::new(&pipeline, coll.name()); + let cursor = observing(&query_string_builder, || { + coll.aggregate_with_session(pipeline.clone(), options, session) + }) + .await?; raw::cursor_to_json(cursor, session).await? }