Skip to content

Commit

Permalink
fix(mongodb): add missing query logs (#4559)
Browse files Browse the repository at this point in the history
  • Loading branch information
laplab authored Dec 14, 2023
1 parent ade9cdb commit 974dce5
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 111 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion query-engine/connectors/mongodb-query-connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ tracing = "0.1"
tracing-futures = "0.2"
uuid.workspace = true
indexmap = "1.7"
query-engine-metrics = {path = "../../metrics"}
query-engine-metrics = { path = "../../metrics" }
cuid = { git = "https://github.com/prisma/cuid-rust", branch = "wasm32-support" }
derive_more = "0.99.17"

[dependencies.query-structure]
path = "../../query-structure"
Expand Down Expand Up @@ -50,3 +51,6 @@ workspace = true
[dependencies.user-facing-errors]
features = ["sql"]
workspace = true

[dev-dependencies]
pretty_assertions = "1.4.0"
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl ReadQuery {
) -> crate::Result<Vec<Document>> {
let opts = AggregateOptions::builder().allow_disk_use(true).build();
let query_string_builder = Aggregate::new(&self.stages, on_collection.name());
let cursor = observing(Some(&query_string_builder), || {
let cursor = observing(&query_string_builder, || {
on_collection.aggregate_with_session(self.stages.clone(), opts, with_session)
})
.await?;
Expand Down
171 changes: 107 additions & 64 deletions query-engine/connectors/mongodb-query-connector/src/query_strings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,81 @@
//! There is a struct for each different type of query to generate. Each of them implement the
//! QueryStringBuilder trait, which is dynamically dispatched to a specific query string builder by
//! `root_queries::observing`
use derive_more::Constructor;
use mongodb::bson::{Bson, Document};
use std::fmt::Write;

pub(crate) trait QueryString: Sync + Send {
fn build(&self) -> String {
let mut buffer = String::new();

write!(&mut buffer, "db.{}.{}(", self.collection(), self.query_type()).unwrap();
if let Some(coll_name) = self.collection() {
write!(&mut buffer, "db.{}.{}(", coll_name, self.query_type()).unwrap();
} else {
write!(&mut buffer, "db.{}(", self.query_type()).unwrap();
}
self.write_query(&mut buffer);
write!(&mut buffer, ")").unwrap();

buffer
}

fn collection(&self) -> &str;
fn collection(&self) -> Option<&str>;
fn query_type(&self) -> &str;
fn write_query(&self, buffer: &mut String);
}

pub(crate) struct Aggregate<'a> {
stages: &'a [Document],
#[derive(Constructor)]
pub(crate) struct RunCommand<'a> {
cmd: &'a Document,
}

impl QueryString for RunCommand<'_> {
fn collection(&self) -> Option<&str> {
None
}

fn query_type(&self) -> &str {
"runCommand"
}

fn write_query(&self, buffer: &mut String) {
fmt_doc(buffer, self.cmd, 1).unwrap();
}
}

#[derive(Constructor)]
pub(crate) struct Find<'a> {
filter: &'a Document,
projection: &'a Document,
coll_name: &'a str,
}

impl Aggregate<'_> {
pub(crate) fn new<'a>(stages: &'a [Document], coll_name: &'a str) -> Aggregate<'a> {
Aggregate { stages, coll_name }
impl QueryString for Find<'_> {
fn collection(&self) -> Option<&str> {
Some(self.coll_name)
}

fn query_type(&self) -> &str {
"find"
}

fn write_query(&self, buffer: &mut String) {
fmt_doc(buffer, self.filter, 1).unwrap();
write!(buffer, ", ").unwrap();
fmt_doc(buffer, self.projection, 1).unwrap();
}
}

#[derive(Constructor)]
pub(crate) struct Aggregate<'a> {
stages: &'a [Document],
coll_name: &'a str,
}

impl QueryString for Aggregate<'_> {
fn collection(&self) -> &str {
self.coll_name
fn collection(&self) -> Option<&str> {
Some(self.coll_name)
}

fn query_type(&self) -> &str {
Expand All @@ -51,20 +93,15 @@ impl QueryString for Aggregate<'_> {
}
}

#[derive(Constructor)]
pub(crate) struct InsertOne<'a> {
doc: &'a Document,
coll_name: &'a str,
}

impl InsertOne<'_> {
pub(crate) fn new<'a>(doc: &'a Document, coll_name: &'a str) -> InsertOne<'a> {
InsertOne { doc, coll_name }
}
}

impl QueryString for InsertOne<'_> {
fn collection(&self) -> &str {
self.coll_name
fn collection(&self) -> Option<&str> {
Some(self.coll_name)
}

fn query_type(&self) -> &str {
Expand All @@ -76,25 +113,16 @@ impl QueryString for InsertOne<'_> {
}
}

#[derive(Constructor)]
pub(crate) struct UpdateMany<'a> {
filter: &'a Document,
update_docs: &'a [Document],
coll_name: &'a str,
}

impl UpdateMany<'_> {
pub(crate) fn new<'a>(filter: &'a Document, update_docs: &'a [Document], coll_name: &'a str) -> UpdateMany<'a> {
UpdateMany {
filter,
update_docs,
coll_name,
}
}
}

impl QueryString for UpdateMany<'_> {
fn collection(&self) -> &str {
self.coll_name
fn collection(&self) -> Option<&str> {
Some(self.coll_name)
}

fn query_type(&self) -> &str {
Expand Down Expand Up @@ -127,25 +155,16 @@ impl QueryString for UpdateMany<'_> {
}
}

#[derive(Constructor)]
pub(crate) struct UpdateOne<'a> {
filter: &'a Document,
update_doc: &'a Document,
coll_name: &'a str,
}

impl UpdateOne<'_> {
pub(crate) fn new<'a>(filter: &'a Document, update_doc: &'a Document, coll_name: &'a str) -> UpdateOne<'a> {
UpdateOne {
filter,
update_doc,
coll_name,
}
}
}

impl QueryString for UpdateOne<'_> {
fn collection(&self) -> &str {
self.coll_name
fn collection(&self) -> Option<&str> {
Some(self.coll_name)
}

fn query_type(&self) -> &str {
Expand All @@ -156,7 +175,7 @@ impl QueryString for UpdateOne<'_> {
fmt_doc(buffer, self.filter, 1).unwrap();

if cfg!(debug_assertions) {
writeln!(buffer, ", ").unwrap();
writeln!(buffer, ",").unwrap();
} else {
write!(buffer, ", ").unwrap();
}
Expand All @@ -165,20 +184,15 @@ impl QueryString for UpdateOne<'_> {
}
}

#[derive(Constructor)]
pub(crate) struct DeleteMany<'a> {
filter: &'a Document,
coll_name: &'a str,
}

impl DeleteMany<'_> {
pub(crate) fn new<'a>(filter: &'a Document, coll_name: &'a str) -> DeleteMany<'a> {
DeleteMany { filter, coll_name }
}
}

impl QueryString for DeleteMany<'_> {
fn collection(&self) -> &str {
self.coll_name
fn collection(&self) -> Option<&str> {
Some(self.coll_name)
}

fn query_type(&self) -> &str {
Expand All @@ -190,25 +204,16 @@ impl QueryString for DeleteMany<'_> {
}
}

#[derive(Constructor)]
pub(crate) struct InsertMany<'a> {
insert_docs: &'a [Document],
coll_name: &'a str,
ordered: bool,
}

impl InsertMany<'_> {
pub(crate) fn new<'a>(insert_docs: &'a [Document], ordered: bool, coll_name: &'a str) -> InsertMany<'a> {
InsertMany {
insert_docs,
coll_name,
ordered,
}
}
}

impl QueryString for InsertMany<'_> {
fn collection(&self) -> &str {
self.coll_name
fn collection(&self) -> Option<&str> {
Some(self.coll_name)
}

fn query_type(&self) -> &str {
Expand Down Expand Up @@ -300,6 +305,7 @@ fn fmt_val(buffer: &mut String, val: &Bson, depth: usize) -> std::fmt::Result {
mod tests {
use super::*;
use bson::doc;
use pretty_assertions::assert_eq;

#[test]
fn test_aggregate() {
Expand Down Expand Up @@ -414,7 +420,7 @@ db.collection.updateMany({
query.trim(),
r#"db.collection.updateOne({
name: "Jane",
},
},
{
$set: {
position: {
Expand Down Expand Up @@ -448,7 +454,7 @@ db.collection.deleteMany({
doc! { "name": "Jane", "position": {"department": "engineering", "title": "principal"} },
doc! { "name": "John", "position": {"department": "product", "title": "senior manager"} },
];
let insert = InsertMany::new(&docs, true, "collection");
let insert = InsertMany::new(&docs, "collection", true);
let query = insert.build();
assert_eq!(
query.trim(),
Expand All @@ -470,4 +476,41 @@ db.collection.insertMany([{
.trim()
);
}

#[test]
fn test_find() {
let filter = doc! {
"department": "product",
"title": "senior manager",
};
let project = doc! {
"department": 1,
};
let find = Find::new(&filter, &project, "collection");
let query = find.build();
assert_eq!(
query.trim(),
r#"db.collection.find({
department: "product",
title: "senior manager",
}, {
department: 1,
})"#
);
}

#[test]
fn test_run_command() {
let cmd = doc! {
"hello": 1,
};
let run_command = RunCommand::new(&cmd);
let query = run_command.build();
assert_eq!(
query.trim(),
r#"db.runCommand({
hello: 1,
})"#
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,14 @@ fn pick_singular_id(model: &Model) -> ScalarFieldRef {
.unwrap()
}

// Performs both metrics pushing and query logging. Query logging might be disabled and thus
// the query_string might not need to be built, that's why rather than a query_string
// we receive a Builder, as it's not trivial to buid a query and we want to skip that when possible.
//
// As a reminder, the query string is not fed into mongo db directly, we built it for debugging
// purposes and it's only used when the query log is enabled. For querying mongo, we use the driver
// wire protocol to build queries from a graphql query rather than executing raw mongodb statements.
//
// As we don't have a mongodb query string, we need to create it from the driver object model, which
// we better skip it if we don't need it (i.e. when the query log is disabled.)
pub(crate) async fn observing<'a, 'b, F, T, U>(
query_string_builder: Option<&'b dyn QueryString>,
f: F,
) -> mongodb::error::Result<T>
/// Logs the query and updates metrics for an operation performed by a passed function.
///
/// NOTE:
/// 1. `dyn QueryString` is used instead of a `String` to skip expensive query serialization when
/// query logs are disabled. This, however, is not currently implemented.
/// 2. Query strings logged are for debugging purposes only. The actual queries constructed by
/// MongoDB driver might look slightly different.
pub(crate) async fn observing<'a, 'b, F, T, U>(builder: &'b dyn QueryString, f: F) -> mongodb::error::Result<T>
where
F: FnOnce() -> U + 'a,
U: Future<Output = mongodb::error::Result<T>>,
Expand All @@ -72,13 +66,10 @@ where
histogram!(PRISMA_DATASOURCE_QUERIES_DURATION_HISTOGRAM_MS, elapsed);
increment_counter!(PRISMA_DATASOURCE_QUERIES_TOTAL);

let params: Vec<i32> = Vec::new();

// todo: emit tracing event with the appropriate log level only query_log is enabled. And fix log suscription
if let Some(qs) = query_string_builder {
let qs = qs.build();
debug!(target: "mongodb_query_connector::query", item_type = "query", is_query = true, query = %qs, params = ?params, duration_ms=elapsed);
}
// TODO: emit tracing event only when "debug" level query logs are enabled.
// TODO prisma/team-orm#136: fix log subscription.
let query_string = builder.build();
debug!(target: "mongodb_query_connector::query", item_type = "query", is_query = true, query = %query_string, duration_ms = elapsed);

res
}
Loading

0 comments on commit 974dce5

Please sign in to comment.