Skip to content

Commit

Permalink
various cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
isabelatkinson committed Feb 22, 2024
1 parent 6376b32 commit 9269180
Show file tree
Hide file tree
Showing 23 changed files with 203 additions and 301 deletions.
8 changes: 1 addition & 7 deletions src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,7 @@ mod session;
mod shutdown;
mod watch;

pub use bulk_write::{
error::BulkWriteError,
results::BulkWriteResult,
write_models::WriteModel,
BulkWrite,
BulkWriteOptions,
};
pub use bulk_write::BulkWrite;
pub use drop::{DropDatabase, DropDatabaseFuture};
pub use list_databases::{ListDatabases, ListSpecificationsFuture};
pub use perf::{WarmConnectionPool, WarmConnectionPoolFuture};
Expand Down
83 changes: 50 additions & 33 deletions src/action/bulk_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,55 @@ impl<'a> BulkWrite<'a> {
}
}

action_impl! {
impl Action<'a> for BulkWrite<'a> {
type Future = BulkWriteFuture;

async fn execute(mut self) -> Result<BulkWriteResult> {
let mut total_attempted = 0;
let mut execution_status = ExecutionStatus::None;

while total_attempted < self.models.len()
&& execution_status.should_continue(self.is_ordered())
{
let mut operation = BulkWriteOperation::new(
self.client.clone(),
&self.models[total_attempted..],
total_attempted,
self.options.as_ref(),
)
.await;
let result = self
.client
.execute_operation::<BulkWriteOperation>(
&mut operation,
self.session.as_deref_mut(),
)
.await;
total_attempted += operation.n_attempted;

match result {
Ok(result) => {
execution_status = execution_status.with_success(result);
}
Err(error) => {
execution_status = execution_status.with_failure(error);
}
}
}

match execution_status {
ExecutionStatus::Success(bulk_write_result) => Ok(bulk_write_result),
ExecutionStatus::Error(error) => Err(error),
ExecutionStatus::None => Err(ErrorKind::InvalidArgument {
message: "bulk_write must be provided at least one write operation".into(),
}
.into()),
}
}
}
}

enum ExecutionStatus {
Success(BulkWriteResult),
Error(Error),
Expand Down Expand Up @@ -155,7 +204,7 @@ impl ExecutionStatus {
let bulk_write_error: Error = ErrorKind::ClientBulkWrite(BulkWriteError {
write_errors: HashMap::new(),
write_concern_errors: Vec::new(),
partial_result: Some(current_result),
partial_result: current_result,
})
.into();
Self::Error(bulk_write_error.with_source(error))
Expand Down Expand Up @@ -206,35 +255,3 @@ impl ExecutionStatus {
}
}
}

action_impl! {
impl Action<'a> for BulkWrite<'a> {
type Future = SummaryBulkWrite;

async fn execute(mut self) -> Result<BulkWriteResult> {
let mut total_attempted = 0;
let mut execution_status = ExecutionStatus::None;

while total_attempted < self.models.len() && execution_status.should_continue(self.is_ordered()) {
let mut operation = BulkWriteOperation::new(self.client.clone(), &self.models[total_attempted..], total_attempted, self.options.as_ref()).await;
let result = self.client.execute_operation::<BulkWriteOperation>(&mut operation, self.session.as_deref_mut()).await;
total_attempted += operation.n_attempted;

match result {
Ok(result) => {
execution_status = execution_status.with_success(result);
}
Err(error) => {
execution_status = execution_status.with_failure(error);
}
}
}

match execution_status {
ExecutionStatus::Success(bulk_write_result) => Ok(bulk_write_result),
ExecutionStatus::Error(error) => Err(error),
ExecutionStatus::None => Err(ErrorKind::InvalidArgument { message: "bulk_write must be provided at least one write operation".into() }.into())
}
}
}
}
22 changes: 3 additions & 19 deletions src/action/bulk_write/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,17 @@ use super::results::BulkWriteResult;
pub struct BulkWriteError {
pub write_concern_errors: Vec<WriteConcernError>,
pub write_errors: HashMap<usize, WriteError>,
pub partial_result: Option<BulkWriteResult>,
pub partial_result: BulkWriteResult,
}

impl BulkWriteError {
pub(crate) fn is_empty(&self) -> bool {
self.write_concern_errors.is_empty()
&& self.write_errors.is_empty()
&& self
.partial_result
.as_ref()
.map(|r| r.is_empty())
.unwrap_or(true)
}

pub(crate) fn merge(&mut self, other: BulkWriteError) {
self.write_concern_errors.extend(other.write_concern_errors);
self.write_errors.extend(other.write_errors);
if let Some(other_partial_result) = other.partial_result {
self.merge_results(other_partial_result);
}
self.merge_results(other.partial_result);
}

pub(crate) fn merge_results(&mut self, other_result: BulkWriteResult) {
if let Some(ref mut partial_result) = self.partial_result {
partial_result.merge(other_result);
} else {
self.partial_result = Some(other_result);
}
self.partial_result.merge(other_result);
}
}
24 changes: 0 additions & 24 deletions src/action/bulk_write/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,6 @@ pub struct BulkWriteResult {
}

impl BulkWriteResult {
pub(crate) fn new() -> Self {
Self {
inserted_count: 0,
upserted_count: 0,
matched_count: 0,
modified_count: 0,
deleted_count: 0,
insert_results: HashMap::new(),
update_results: HashMap::new(),
delete_results: HashMap::new(),
}
}

pub(crate) fn is_empty(&self) -> bool {
self.inserted_count == 0
&& self.upserted_count == 0
&& self.matched_count == 0
&& self.modified_count == 0
&& self.deleted_count == 0
&& self.insert_results.is_empty()
&& self.update_results.is_empty()
&& self.delete_results.is_empty()
}

pub(crate) fn populate_summary_info(&mut self, summary_info: &BulkWriteSummaryInfo) {
self.inserted_count += summary_info.n_inserted;
self.upserted_count += summary_info.n_upserted;
Expand Down
35 changes: 14 additions & 21 deletions src/action/bulk_write/write_models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use serde::Serialize;
use serde_with::skip_serializing_none;

use crate::{
bson::{rawdoc, Array, Bson, Document, RawBson, RawDocumentBuf},
bson_util::{extend_raw_document_buf, prepend_id_field},
bson::{rawdoc, Array, Bson, Document, RawDocumentBuf},
bson_util::get_or_prepend_id_field,
error::Result,
options::UpdateModifications,
Namespace,
Expand All @@ -18,7 +18,7 @@ pub enum WriteModel {
InsertOne {
#[serde(skip)]
namespace: Namespace,
document: RawDocumentBuf,
document: Document,
},
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -125,32 +125,25 @@ impl WriteModel {
}
}

// Returns this model as a document within the bulkWrite ops list and, if this is an insert
// model, the document's _id field.
pub(crate) fn to_ops_document(
&self,
namespace_index: usize,
) -> Result<(RawDocumentBuf, Option<RawBson>)> {
let mut operation = rawdoc! { self.operation_name(): namespace_index as i32 };

let inserted_id = match self {
// Returns the operation-specific fields that should be included in this model's entry in the
// ops array. Also returns an inserted ID if this is an insert operation.
pub(crate) fn get_ops_document_contents(&self) -> Result<(RawDocumentBuf, Option<Bson>)> {
let (mut model_document, inserted_id) = match self {
Self::InsertOne { document, .. } => {
let mut document = document.clone();
let inserted_id = prepend_id_field(&mut document)?;
operation.append("document", document);
Some(inserted_id)
let mut insert_document = RawDocumentBuf::from_document(document)?;
let inserted_id = get_or_prepend_id_field(&mut insert_document)?;
(rawdoc! { "document": insert_document }, Some(inserted_id))
}
_ => {
let operation_fields = bson::to_raw_document_buf(&self)?;
extend_raw_document_buf(&mut operation, operation_fields)?;
None
let model_document = bson::to_raw_document_buf(&self)?;
(model_document, None)
}
};

if let Some(multi) = self.multi() {
operation.append("multi", multi);
model_document.append("multi", multi);
}

Ok((operation, inserted_id))
Ok((model_document, inserted_id))
}
}
4 changes: 2 additions & 2 deletions src/bson_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ pub(crate) fn extend_raw_document_buf(

/// Returns the _id field of this document, prepending the field to the document if one is not
/// already present.
pub(crate) fn prepend_id_field(doc: &mut RawDocumentBuf) -> Result<RawBson> {
pub(crate) fn get_or_prepend_id_field(doc: &mut RawDocumentBuf) -> Result<Bson> {
match doc.get("_id")? {
Some(id) => Ok(id.to_raw_bson()),
Some(id) => Ok(id.try_into()?),
None => {
let id = ObjectId::new();
let mut new_bytes = rawdoc! { "_id": id }.into_bytes();
Expand Down
31 changes: 7 additions & 24 deletions src/coll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1158,10 +1158,6 @@ where
}

let ordered = options.as_ref().and_then(|o| o.ordered).unwrap_or(true);
#[cfg(feature = "in-use-encryption-unstable")]
let encrypted = self.client().auto_encryption_opts().await.is_some();
#[cfg(not(feature = "in-use-encryption-unstable"))]
let encrypted = false;

let mut cumulative_failure: Option<BulkWriteFailure> = None;
let mut error_labels: HashSet<String> = Default::default();
Expand All @@ -1175,7 +1171,7 @@ where
self.namespace(),
docs,
options.clone(),
encrypted,
self.client().should_auto_encrypt().await,
self.inner.human_readable_serialization,
);

Expand Down Expand Up @@ -1300,16 +1296,11 @@ where
let mut options = options.into();
resolve_write_concern_with_session!(self, options, session.as_ref())?;

#[cfg(feature = "in-use-encryption-unstable")]
let encrypted = self.client().auto_encryption_opts().await.is_some();
#[cfg(not(feature = "in-use-encryption-unstable"))]
let encrypted = false;

let insert = Insert::new(
self.namespace(),
vec![doc],
options.map(InsertManyOptions::from_insert_one_options),
encrypted,
self.client().should_auto_encrypt().await,
self.inner.human_readable_serialization,
);
self.client()
Expand Down Expand Up @@ -1459,24 +1450,16 @@ impl<'de> Deserialize<'de> for Namespace {
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
struct NamespaceHelper {
db: String,
coll: String,
}
#[derive(Deserialize)]
#[serde(untagged)]
enum NamespaceOptions {
enum NamespaceHelper {
String(String),
Object(NamespaceHelper),
Object { db: String, coll: String },
}
match NamespaceOptions::deserialize(deserializer)? {
NamespaceOptions::String(string) => Self::from_str(&string)
match NamespaceHelper::deserialize(deserializer)? {
NamespaceHelper::String(string) => Self::from_str(&string)
.ok_or_else(|| D::Error::custom("Missing one or more fields in namespace")),
NamespaceOptions::Object(object) => Ok(Self {
db: object.db,
coll: object.coll,
}),
NamespaceHelper::Object { db, coll } => Ok(Self { db, coll }),
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize};
use thiserror::Error;

use crate::{
action::BulkWriteError as ClientBulkWriteError,
action::bulk_write::error::BulkWriteError as ClientBulkWriteError,
bson::Document,
options::ServerAddress,
sdam::TopologyVersion,
Expand Down Expand Up @@ -287,7 +287,7 @@ impl Error {
}

/// Gets the code from this error.
#[allow(unused)]
#[cfg(test)]
pub(crate) fn code(&self) -> Option<i32> {
match self.kind.as_ref() {
ErrorKind::Command(command_error) => Some(command_error.code),
Expand Down Expand Up @@ -878,6 +878,7 @@ impl WriteFailure {
}
}

#[cfg(test)]
pub(crate) fn code(&self) -> i32 {
match self {
Self::WriteConcernError(e) => e.code,
Expand Down
20 changes: 5 additions & 15 deletions src/operation/abort_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,28 +50,18 @@ impl OperationWithDefaults for AbortTransaction {
))
}

fn handle_response<'a>(
&'a self,
fn handle_response(
&self,
response: RawCommandResponse,
_description: &'a StreamDescription,
_session: Option<&'a mut ClientSession>,
) -> OperationResponse<'a, Self::O> {
_description: &StreamDescription,
_session: Option<&mut ClientSession>,
) -> OperationResponse<'static, Self::O> {
handle_response_sync! {{
let response: WriteConcernOnlyBody = response.body()?;
response.validate()
}}
}

// handle_response!((
// &self,
// response: RawCommandResponse,
// description: &StreamDescription,
// session: Option<&mut ClientSession
// ) -> Result<Self::O> {
// let response: WriteConcernOnlyBody = response.body()?;
// response.validate()
// });

fn selection_criteria(&self) -> Option<&SelectionCriteria> {
match &self.pinned {
Some(TransactionPin::Mongos(s)) => Some(s),
Expand Down
Loading

0 comments on commit 9269180

Please sign in to comment.