Skip to content

Commit

Permalink
feat(mongodb-query-connector): add full queries to the spans (#5014)
Browse files Browse the repository at this point in the history
Add full queries to the `prisma:engine:db_queries` spans in MongoDB.

Closes: prisma/prisma#24463
  • Loading branch information
aqrln authored Oct 9, 2024
1 parent 00f0d73 commit 6a192e2
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::{constants::*, output_meta, query_builder::MongoReadQueryBuilder, val
use connector_interface::*;
use mongodb::{bson::Document, ClientSession, Database};
use query_structure::{prelude::*, Filter, QueryArguments};
use tracing::{info_span, Instrument};

pub async fn aggregate<'conn>(
database: &Database,
Expand All @@ -17,17 +16,11 @@ pub async fn aggregate<'conn>(
let is_group_by = !group_by.is_empty();
let coll = database.collection(model.db_name());

let span = info_span!(
"prisma:engine:db_query",
user_facing = true,
"db.statement" = &format_args!("db.{}.aggregate(*)", coll.name())
);

let query = MongoReadQueryBuilder::from_args(query_arguments)?
.with_groupings(group_by, &selections, having)?
.build()?;

let docs = query.execute(coll, session).instrument(span).await?;
let docs = query.execute(coll, session).await?;

if is_group_by && docs.is_empty() {
Ok(vec![])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ use query_engine_metrics::{
PRISMA_DATASOURCE_QUERIES_TOTAL,
};
use query_structure::*;
use std::sync::Arc;
use std::time::Instant;
use tracing::debug;
use tracing::{debug, info_span};
use tracing_futures::Instrument;

/// Transforms a document to a `Record`, fields ordered as defined in `fields`.
fn document_to_record(mut doc: Document, fields: &[String], meta_mapping: &OutputMetaMapping) -> crate::Result<Record> {
Expand Down Expand Up @@ -59,19 +61,27 @@ where
F: FnOnce() -> U + 'a,
U: Future<Output = mongodb::error::Result<T>>,
{
// TODO: build the string lazily in the Display impl so it doesn't have to be built if neither
// logs nor traces are enabled. This is tricky because whatever we store in the span has to be
// 'static, and all `QueryString` implementations aren't, so this requires some refactoring.
let query_string: Arc<str> = builder.build().into();

let span = info_span!(
"prisma:engine:db_query",
user_facing = true,
"db.statement" = %Arc::clone(&query_string)
);

let start = Instant::now();
let res = f().await;
let res = f().instrument(span).await;
let elapsed = start.elapsed().as_millis() as f64;

histogram!(PRISMA_DATASOURCE_QUERIES_DURATION_HISTOGRAM_MS, elapsed);
increment_counter!(PRISMA_DATASOURCE_QUERIES_TOTAL);

// TODO: emit tracing event only when "debug" level query logs are enabled.
// TODO prisma/team-orm#136: fix log subscription.
let query_string = builder.build();
// NOTE: `params` is a part of the interface for query logs.
let params: Vec<i32> = vec![];
debug!(target: "mongodb_query_connector::query", item_type = "query", is_query = true, query = %query_string, params = ?params, duration_ms = elapsed);
debug!(target: "mongodb_query_connector::query", item_type = "query", is_query = true, query = %query_string, params = %"[]", duration_ms = elapsed);

res
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::{
use mongodb::{bson::doc, options::FindOptions, ClientSession, Database};
use query_structure::*;
use std::future::IntoFuture;
use tracing::{info_span, Instrument};

/// Finds a single record. Joins are not required at the moment because the selector is always a unique one.
pub async fn get_single_record<'conn>(
Expand All @@ -18,20 +17,14 @@ pub async fn get_single_record<'conn>(
) -> crate::Result<Option<SingleRecord>> {
let coll = database.collection(model.db_name());

let span = info_span!(
"prisma:engine:db_query",
user_facing = true,
"db.statement" = &format_args!("db.{}.findOne(*)", coll.name())
);

let meta_mapping = output_meta::from_selected_fields(selected_fields);
let query_arguments: QueryArguments = (model.clone(), filter.clone()).into();
let query = MongoReadQueryBuilder::from_args(query_arguments)?
.with_model_projection(selected_fields.clone())?
.with_virtual_fields(selected_fields.virtuals())?
.build()?;

let docs = query.execute(coll, session).instrument(span).await?;
let docs = query.execute(coll, session).await?;

if docs.is_empty() {
Ok(None)
Expand Down Expand Up @@ -60,12 +53,6 @@ pub async fn get_many_records<'conn>(
) -> crate::Result<ManyRecords> {
let coll = database.collection(model.db_name());

let span = info_span!(
"prisma:engine:db_query",
user_facing = true,
"db.statement" = &format_args!("db.{}.findMany(*)", coll.name())
);

let reverse_order = query_arguments.take.map(|t| t < 0).unwrap_or(false);
let field_names: Vec<_> = selected_fields.db_names().collect();

Expand All @@ -81,7 +68,7 @@ pub async fn get_many_records<'conn>(
.with_virtual_fields(selected_fields.virtuals())?
.build()?;

let docs = query.execute(coll, session).instrument(span).await?;
let docs = query.execute(coll, session).await?;
for doc in docs {
let record = document_to_record(doc, &field_names, &meta_mapping)?;
records.push(record)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use mongodb::{
use query_structure::{Model, PrismaValue, SelectionResult};
use std::future::IntoFuture;
use std::{collections::HashMap, convert::TryInto};
use tracing::{info_span, Instrument};
use update::IntoUpdateDocumentExtension;

/// Create a single record to the database resulting in a
Expand All @@ -31,12 +30,6 @@ pub async fn create_record<'conn>(
) -> crate::Result<SingleRecord> {
let coll = database.collection::<Document>(model.db_name());

let span = info_span!(
"prisma:engine:db_query",
user_facing = true,
"db.statement" = &format_args!("db.{}.insertOne(*)", coll.name())
);

let id_field = pick_singular_id(model);

// Fields to write to the document.
Expand Down Expand Up @@ -66,9 +59,7 @@ pub async fn create_record<'conn>(
}

let query_builder = InsertOne::new(&doc, coll.name());
let insert_result = observing(&query_builder, || coll.insert_one(&doc).session(session).into_future())
.instrument(span)
.await?;
let insert_result = observing(&query_builder, || coll.insert_one(&doc).session(session).into_future()).await?;
let id_value = value_from_bson(insert_result.inserted_id, &id_meta)?;

Ok(SingleRecord {
Expand All @@ -86,12 +77,6 @@ pub async fn create_records<'conn>(
) -> crate::Result<usize> {
let coll = database.collection::<Document>(model.db_name());

let span = info_span!(
"prisma:engine:db_query",
user_facing = true,
"db.statement" = &format_args!("db.{}.insertMany(*)", coll.name())
);

let num_records = args.len();
let fields: Vec<_> = model.fields().non_relational();

Expand Down Expand Up @@ -128,8 +113,7 @@ pub async fn create_records<'conn>(
.with_options(options)
.session(session)
.into_future()
})
.instrument(span);
});

match insert.await {
Ok(insert_result) => Ok(insert_result.inserted_ids.len()),
Expand Down Expand Up @@ -184,19 +168,13 @@ pub async fn update_records<'conn>(
.collect::<crate::Result<Vec<_>>>()?
} else {
let filter = MongoFilterVisitor::new(FilterPrefix::default(), false).visit(record_filter.filter)?;
find_ids(database, coll.clone(), session, model, filter).await?
find_ids(coll.clone(), session, model, filter).await?
};

if ids.is_empty() {
return Ok(vec![]);
}

let span = info_span!(
"prisma:engine:db_query",
user_facing = true,
"db.statement" = &format_args!("db.{}.updateMany(*)", coll.name())
);

let filter = doc! { id_field.db_name(): { "$in": ids.clone() } };
let fields: Vec<_> = model
.fields()
Expand All @@ -222,7 +200,6 @@ pub async fn update_records<'conn>(
.session(session)
.into_future()
})
.instrument(span)
.await?;

// It's important we check the `matched_count` and not the `modified_count` here.
Expand Down Expand Up @@ -266,25 +243,18 @@ pub async fn delete_records<'conn>(
.collect::<crate::Result<Vec<_>>>()?
} else {
let filter = MongoFilterVisitor::new(FilterPrefix::default(), false).visit(record_filter.filter)?;
find_ids(database, coll.clone(), session, model, filter).await?
find_ids(coll.clone(), session, model, filter).await?
};

if ids.is_empty() {
return Ok(0);
}

let span = info_span!(
"prisma:engine:db_query",
user_facing = true,
"db.statement" = &format_args!("db.{}.deleteMany(*)", coll.name())
);

let filter = doc! { id_field.db_name(): { "$in": ids } };
let query_string_builder = DeleteMany::new(&filter, coll.name());
let delete_result = observing(&query_string_builder, || {
coll.delete_many(filter.clone()).session(session).into_future()
})
.instrument(span)
.await?;

Ok(delete_result.deleted_count as usize)
Expand Down Expand Up @@ -312,16 +282,10 @@ pub async fn delete_record<'conn>(
"$expr": filter,
};

let span = info_span!(
"prisma:engine:db_query",
user_facing = true,
"db.statement" = &format_args!("db.{}.findAndModify(*)", coll.name())
);
let query_string_builder = DeleteOne::new(&filter, coll.name());
let document = observing(&query_string_builder, || {
coll.find_one_and_delete(filter.clone()).session(session).into_future()
})
.instrument(span)
.await?
.ok_or(MongoError::RecordDoesNotExist {
cause: "Record to delete does not exist.".to_owned(),
Expand All @@ -335,20 +299,11 @@ pub async fn delete_record<'conn>(

/// Retrives document ids based on the given filter.
async fn find_ids(
database: &Database,
collection: Collection<Document>,
session: &mut ClientSession,
model: &Model,
filter: MongoFilter,
) -> crate::Result<Vec<Bson>> {
let coll = database.collection::<Document>(model.db_name());

let span = info_span!(
"prisma:engine:db_query",
user_facing = true,
"db.statement" = &format_args!("db.{}.findMany(*)", coll.name())
);

let id_field = model.primary_identifier();
let mut builder = MongoReadQueryBuilder::new(model.clone());

Expand All @@ -363,7 +318,7 @@ async fn find_ids(

let builder = builder.with_model_projection(id_field)?;
let query = builder.build()?;
let docs = query.execute(collection, session).instrument(span).await?;
let docs = query.execute(collection, session).await?;
let ids = docs.into_iter().map(|mut doc| doc.remove("_id").unwrap()).collect();

Ok(ids)
Expand Down Expand Up @@ -533,13 +488,6 @@ pub async fn query_raw<'conn>(
inputs: HashMap<String, PrismaValue>,
query_type: Option<String>,
) -> crate::Result<RawJson> {
let db_statement = get_raw_db_statement(&query_type, &model, database);
let span = info_span!(
"prisma:engine:db_query",
user_facing = true,
"db.statement" = &&db_statement.as_str()
);

let mongo_command = MongoCommand::from_raw_query(model, inputs, query_type)?;

async {
Expand Down Expand Up @@ -601,17 +549,5 @@ pub async fn query_raw<'conn>(

Ok(RawJson::try_new(json_result)?)
}
.instrument(span)
.await
}

fn get_raw_db_statement(query_type: &Option<String>, model: &Option<&Model>, database: &Database) -> String {
match (query_type.as_deref(), model) {
(Some("findRaw"), Some(m)) => format!("db.{}.findRaw(*)", database.collection::<Document>(m.db_name()).name()),
(Some("aggregateRaw"), Some(m)) => format!(
"db.{}.aggregateRaw(*)",
database.collection::<Document>(m.db_name()).name()
),
_ => "db.runCommandRaw(*)".to_string(),
}
}

0 comments on commit 6a192e2

Please sign in to comment.