diff --git a/crates/datasources/src/bson/builder.rs b/crates/datasources/src/bson/builder.rs index 20fb1d4ed..3138f04ab 100644 --- a/crates/datasources/src/bson/builder.rs +++ b/crates/datasources/src/bson/builder.rs @@ -82,15 +82,14 @@ impl RecordStructBuilder { .field_index .get(key) .ok_or_else(|| BsonError::ColumnNotInInferredSchema(key.to_string()))?; + println!("{}->{}", key, idx); if *cols_set.get(idx).unwrap() { - println!("DUPLICATE SET: {}, {:?}", key, doc); + continue; } // Add value to columns. - let typ = self.fields.get(idx).unwrap().data_type(); // Programmer error if data type doesn't exist. - let col = self.builders.get_mut(idx).unwrap(); // Programmer error if this doesn't exist. - append_value(val, typ, col.as_mut())?; + self.add_value_at_index(idx, Some(val))?; // Track which columns we've added values to. cols_set.set(idx, true); @@ -102,10 +101,46 @@ impl RecordStructBuilder { // Append nulls to all columns not included in the doc. for (idx, did_set) in cols_set.iter().enumerate() { if !did_set { - // Add nulls... - let typ = self.fields.get(idx).unwrap().data_type(); // Programmer error if data type doesn't exist. - let col = self.builders.get_mut(idx).unwrap(); // Programmer error if column doesn't exist. - append_null(typ, col.as_mut())?; + // Add null... + self.add_value_at_index(idx, None)?; + } + } + + Ok(()) + } + + pub fn project_and_append(&mut self, doc: &RawDocument) -> Result<()> { + let mut cols_set: BitVec = BitVec::repeat(false, self.fields.len()); + + for iter_result in doc { + match iter_result { + Ok((key, val)) => { + if let Some(&idx) = self.field_index.get(key) { + if cols_set.get(idx).is_some_and(|v| v == true) { + // If this happens it means that the bson document has a field + // name that appears more than once. This is legal and possible to build + // with some libraries but isn't forbidden, and (I think?) historically + // not (always?) rejected by MongoDB. Regardless "ignoring second + // appearances of the key" is a reasonable semantic. + continue; + } + + // Add value to columns. + self.add_value_at_index(idx, Some(val))?; + + // Track which columns we've added values to. + cols_set.set(idx, true); + }; + } + Err(_) => return Err(BsonError::FailedToReadRawBsonDocument), + } + } + + // Append nulls to all columns not included in the doc. + for (idx, did_set) in cols_set.iter().enumerate() { + if !did_set { + // Add null... + self.add_value_at_index(idx, None)?; } } @@ -115,6 +150,16 @@ impl RecordStructBuilder { pub fn into_fields_and_builders(self) -> (Fields, Vec>) { (self.fields, self.builders) } + + fn add_value_at_index(&mut self, idx: usize, val: Option) -> Result<()> { + let typ = self.fields.get(idx).unwrap().data_type(); // Programmer error if data type doesn't exist. + let col = self.builders.get_mut(idx).unwrap(); // Programmer error if column doesn't exist. + + match val { + Some(v) => append_value(v, typ, col.as_mut()), + None => append_null(typ, col.as_mut()), + } + } } impl ArrayBuilder for RecordStructBuilder { @@ -288,7 +333,7 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) -> .as_any_mut() .downcast_mut::() .unwrap(); - builder.append_record(nested)?; + builder.project_and_append(nested)?; } // Array @@ -410,3 +455,92 @@ fn column_builders_for_fields( Ok(cols) } + +#[cfg(test)] +mod test { + use bson::oid::ObjectId; + + use super::*; + + #[test] + fn test_duplicate_field_handling() { + let fields = Fields::from_iter(vec![ + Field::new("_id", DataType::Binary, true), + Field::new("idx", DataType::Int64, true), + Field::new("value", DataType::Utf8, true), + ]); + let mut rsb = RecordStructBuilder::new_with_capacity(fields, 100).unwrap(); + for idx in 0..100 { + let mut buf = bson::RawDocumentBuf::new(); + + buf.append("_id", ObjectId::new()); + buf.append("idx", idx as i64); + buf.append("value", "first"); + buf.append("value", "second"); + assert_eq!(buf.iter().count(), 4); + + rsb.append_record(RawDocument::from_bytes(&buf.into_bytes()).unwrap()) + .unwrap(); + } + assert_eq!(rsb.len(), 100); + for value in rsb + .builders + .get_mut(2) + .unwrap() + .as_any_mut() + .downcast_mut::() + .unwrap() + .finish_cloned() + .iter() + { + let v = value.unwrap(); + assert_eq!(v, "first"); + } + } + + #[test] + fn test_unexpected_schema_change() { + let fields = Fields::from_iter(vec![ + Field::new("_id", DataType::Binary, true), + Field::new("idx", DataType::Int64, true), + Field::new("value", DataType::Utf8, true), + ]); + let mut rsb = RecordStructBuilder::new_with_capacity(fields, 100).unwrap(); + let mut buf = bson::RawDocumentBuf::new(); + + buf.append("_id", ObjectId::new()); + buf.append("idx", 0); + buf.append("value", "first"); + assert_eq!(buf.iter().count(), 3); + + rsb.append_record(RawDocument::from_bytes(&buf.into_bytes()).unwrap()) + .expect("first record matchex expectations"); + assert_eq!(rsb.len(), 1); + + let mut buf = bson::RawDocumentBuf::new(); + buf.append("index", 1); + buf.append("values", 3); + assert_eq!(buf.iter().count(), 2); + rsb.append_record(RawDocument::from_bytes(&buf.clone().into_bytes()).unwrap()) + .expect_err("for append_record schema changes are an error"); + assert_eq!(rsb.len(), 1); + rsb.project_and_append(RawDocument::from_bytes(&buf.clone().into_bytes()).unwrap()) + .expect("project and append should filter out unrequired fields"); + assert_eq!(rsb.len(), 2); + + let mut buf = bson::RawDocumentBuf::new(); + buf.append("_id", ObjectId::new()); + buf.append("index", 1); + buf.append("values", 3); + assert_eq!(buf.iter().count(), 3); + + rsb.append_record(RawDocument::from_bytes(&buf.clone().into_bytes()).unwrap()) + .expect_err("for append_record schema changes are an error"); + // the first value was added successfully to another buffer to the rsb grew + assert_eq!(rsb.len(), 3); + + rsb.project_and_append(RawDocument::from_bytes(&buf.clone().into_bytes()).unwrap()) + .expect("project and append should filter out unrequired fields"); + assert_eq!(rsb.len(), 4); + } +} diff --git a/crates/datasources/src/bson/stream.rs b/crates/datasources/src/bson/stream.rs index dd272254c..7635ef3f1 100644 --- a/crates/datasources/src/bson/stream.rs +++ b/crates/datasources/src/bson/stream.rs @@ -58,8 +58,7 @@ impl BsonStream { let mut builder = RecordStructBuilder::new_with_capacity(schema.fields().to_owned(), 100)?; for result in results { - // TOOD: shouldn't convert here. - builder.append_record(&result?)?; + builder.project_and_append(&result?)?; } let (fields, builders) = builder.into_fields_and_builders(); diff --git a/tests/tests/dupes.bson b/tests/tests/dupes.bson new file mode 100644 index 000000000..1e26e66ed Binary files /dev/null and b/tests/tests/dupes.bson differ