diff --git a/crates/agent/src/publications/specs.rs b/crates/agent/src/publications/specs.rs
index 5c6d7bb49e..e9aa09fbd3 100644
--- a/crates/agent/src/publications/specs.rs
+++ b/crates/agent/src/publications/specs.rs
@@ -886,7 +886,7 @@ mod test {
     }
 
     #[tokio::test]
-    #[serial_test::parallel]
+    #[serial_test::serial]
     async fn test_source_capture_validation() {
         let mut conn = sqlx::postgres::PgConnection::connect(&FIXED_DATABASE_URL)
             .await
diff --git a/crates/agent/src/resource_configs.rs b/crates/agent/src/resource_configs.rs
index c02f50baf7..ace7223521 100644
--- a/crates/agent/src/resource_configs.rs
+++ b/crates/agent/src/resource_configs.rs
@@ -80,7 +80,7 @@ pub fn pointer_for_schema(schema_json: &str) -> anyhow::Result<doc::Pointer> {
 
     for (ptr, _, prop_shape, _) in shape.locations() {
         if prop_shape.annotations.contains_key("x-collection-name") {
-            return Ok(doc::Pointer::from_str(&ptr));
+            return Ok(ptr);
         }
     }
     Err(anyhow::anyhow!(
diff --git a/crates/derive/src/combine_api.rs b/crates/derive/src/combine_api.rs
index 28ac6e47f9..a6de1e86dd 100644
--- a/crates/derive/src/combine_api.rs
+++ b/crates/derive/src/combine_api.rs
@@ -1,7 +1,7 @@
 use crate::{new_validator, DebugJson, DocCounter, JsonError, StatsAccumulator};
 use anyhow::Context;
 use bytes::Buf;
-use doc::shape::{limits::enforce_field_count_limits, schema::to_schema};
+use doc::shape::{limits::{enforce_shape_complexity_limit, DEFAULT_SCHEMA_COMPLEXITY_LIMIT}, schema::to_schema};
 use prost::Message;
 use proto_flow::flow::combine_api::{self, Code};
 
@@ -291,7 +291,7 @@ pub fn drain_chunk(
             doc::LazyNode::Heap(h) => shape.widen(h),
         };
         if changed {
-            enforce_field_count_limits(shape, json::Location::Root);
+            enforce_shape_complexity_limit(shape, DEFAULT_SCHEMA_COMPLEXITY_LIMIT);
             *did_change = true;
         }
     }
@@ -742,8 +742,8 @@ pub mod test {
 
             // Test projection fields == their pointer.
             flow::Projection {
-                field: ptr.clone(),
-                ptr: ptr,
+                field: ptr.to_string(),
+                ptr: ptr.to_string(),
                 inference: Some(flow::Inference {
                     default_json: shape
                         .default
diff --git a/crates/doc/src/ptr.rs b/crates/doc/src/ptr.rs
index 2e1927d17e..bd1ea1f9e5 100644
--- a/crates/doc/src/ptr.rs
+++ b/crates/doc/src/ptr.rs
@@ -39,7 +39,7 @@ impl<'t> std::fmt::Display for Token {
 }
 
 /// Pointer is a parsed JSON pointer.
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
 pub struct Pointer(pub Vec<Token>);
 
 impl Pointer {
diff --git a/crates/doc/src/reduce/schema.rs b/crates/doc/src/reduce/schema.rs
index 10b5b00246..12e3739c1f 100644
--- a/crates/doc/src/reduce/schema.rs
+++ b/crates/doc/src/reduce/schema.rs
@@ -1,5 +1,9 @@
 use super::{count_nodes, Cursor, Error, Result};
-use crate::{shape::limits, shape::schema::to_schema, AsNode, HeapNode, Shape};
+use crate::{
+    shape::limits,
+    shape::{limits::DEFAULT_SCHEMA_COMPLEXITY_LIMIT, schema::to_schema},
+    AsNode, HeapNode, Shape,
+};
 use json::schema::index::IndexBuilder;
 
 pub fn json_schema_merge<'alloc, L: AsNode, R: AsNode>(
@@ -32,7 +36,7 @@ pub fn json_schema_merge<'alloc, L: AsNode, R: AsNode>(
     let right = shape_from_node(rhs).map_err(|e| Error::with_location(e, loc))?;
 
     let mut merged_shape = Shape::union(left, right);
-    limits::enforce_field_count_limits(&mut merged_shape, json::Location::Root);
+    limits::enforce_shape_complexity_limit(&mut merged_shape, DEFAULT_SCHEMA_COMPLEXITY_LIMIT);
 
     // Union together the LHS and RHS, and convert back from `Shape` into `HeapNode`.
     let merged_doc = serde_json::to_value(to_schema(merged_shape))
diff --git a/crates/doc/src/shape/limits.rs b/crates/doc/src/shape/limits.rs
index 0b37bc7aec..4836c4ea9f 100644
--- a/crates/doc/src/shape/limits.rs
+++ b/crates/doc/src/shape/limits.rs
@@ -1,63 +1,170 @@
 // This module defines limits which are used to simplify complex,
 // typically inferred schema Shapes.
 use super::*;
-use json::Location;
-
-// Prune any locations in this shape that have more than the allowed fields,
-// squashing those fields into the `additionalProperties` subschema for that location.
-pub fn enforce_field_count_limits(slf: &mut Shape, loc: Location) {
-    // TODO: If we implement inference/widening of array tuple shapes
-    // then we'll need to also check that those aren't excessively large.
-    if slf.type_.overlaps(types::ARRAY) {
-        if let Some(array_shape) = slf.array.additional_items.as_mut() {
-            enforce_field_count_limits(array_shape, loc.push_item(0));
+use crate::ptr::Token;
+use itertools::Itertools;
+use std::cmp::Ordering;
+
+// Potential future improvement: currently this squashes any non-INVALID `additional*`
+// shape to accept anything, the equivalent of the `true` schema. But really, we just
+// want to remove recursive shapes if we overlap with `OBJECT` or `ARRAY`, and could
+// happily leave other non-recursive/atomic types alone, retaining e.g. integer or string bounds.
+fn squash_addl(props: Option<Box<Shape>>) -> Option<Box<Shape>> {
+    match props {
+        Some(inner) if inner.type_.eq(&types::INVALID) => Some(Box::new(Shape::nothing())),
+        Some(_) | None => None,
+    }
+}
+
+// Squashing a shape inside an array tuple is special because the location
+// of shapes inside the tuple is _itself_ the key into that container.
+// This means that if we do anything to shift the keys of still-existing shapes,
+// they won't be valid any longer. With that in mind, there's also no good reason
+// to squash one object field over any other, so let's just treat
+// Token::Index and Token::Property as signals to squash _an_ index or property,
+// leaving it up to the implementation to determine which one.
+fn squash_location_inner(shape: &mut Shape, name: &Token) {
+    match name {
+        // Squashing of `additional*` fields is not possible here because we don't
+        // have access to the parent shape
+        Token::NextIndex => unreachable!(),
+        Token::Property(prop) if prop == "*" => unreachable!(),
+
+        Token::Index(_) => {
+            // Pop the last element from the array tuple shape to avoid
+            // shifting the indexes of any other tuple shapes
+            let mut shape_to_squash = shape
+                .array
+                .tuple
+                .pop()
+                .expect("No array tuple property to squash");
+
+            shape_to_squash.array.additional_items =
+                squash_addl(shape_to_squash.array.additional_items);
+            shape_to_squash.object.additional_properties =
+                squash_addl(shape_to_squash.object.additional_properties);
+
+            if let Some(addl_items) = shape.array.additional_items.take() {
+                shape.array.additional_items =
+                    Some(Box::new(Shape::union(*addl_items, shape_to_squash)));
+            } else {
+                shape.array.additional_items = Some(Box::new(shape_to_squash));
+            }
+        }
+        Token::Property(_) => {
+            // Remove location from parent properties
+            let ObjProperty {
+                shape: mut shape_to_squash,
+                name: prop_name,
+                ..
+            } = shape
+                .object
+                .properties
+                .pop()
+                .expect("No object property to squash");
+
+            shape_to_squash.array.additional_items =
+                squash_addl(shape_to_squash.array.additional_items);
+            shape_to_squash.object.additional_properties =
+                squash_addl(shape_to_squash.object.additional_properties);
+
+            // First check to see if it matches a pattern
+            // and if so squash into that pattern's shape
+            if let Some(pattern) = shape
+                .object
+                .pattern_properties
+                .iter_mut()
+                .find(|pattern| regex_matches(&pattern.re, &prop_name))
+            {
+                pattern.shape = Shape::union(
+                    // Ideally we'd use a function like `replace_with` to allow replacing
+                    // pattern.shape with a value mapped from its previous value, but
+                    // that function doesn't exist yet. See https://github.com/rust-lang/rfcs/pull/1736
+                    // Instead, we must replace it with something temporarily while
+                    // Shape::union runs. Once it finishes, this `Shape::nothing()` is discarded.
+                    std::mem::replace(&mut pattern.shape, Shape::nothing()),
+                    shape_to_squash,
+                )
+            } else if let Some(addl_properties) = shape.object.additional_properties.take() {
+                shape.object.additional_properties =
+                    Some(Box::new(Shape::union(*addl_properties, shape_to_squash)));
+            } else {
+                shape.object.additional_properties = Some(Box::new(shape_to_squash))
+            }
+        }
+    }
+}
+
+fn squash_location(shape: &mut Shape, location: &[Token]) {
+    match location {
+        [] => unreachable!(),
+        [Token::NextIndex] => unreachable!(),
+        [Token::Property(prop_name)] if prop_name == "*" => unreachable!(),
+
+        [first] => squash_location_inner(shape, first),
+        [first, more @ ..] => {
+            let inner = match first {
+                Token::NextIndex => shape.array.additional_items.as_deref_mut(),
+                Token::Property(prop_name) if prop_name == "*" => {
+                    shape.object.additional_properties.as_deref_mut()
+                }
+                Token::Index(idx) => shape.array.tuple.get_mut(*idx),
+                Token::Property(prop_name) => shape
+                    .object
+                    .properties
+                    .binary_search_by(|prop| prop.name.as_ref().cmp(&prop_name))
+                    .ok()
+                    .and_then(|idx| shape.object.properties.get_mut(idx))
+                    .map(|inner| &mut inner.shape),
+            }
+            .expect(&format!(
+                "Attempted to find property {first} that does not exist (more: {more:?})"
+            ));
+            squash_location(inner, more)
         }
     }
+}
+
+/// Reduce the size/complexity of a shape while making sure that all
+/// objects that used to pass validation still do.
+pub fn enforce_shape_complexity_limit(shape: &mut Shape, limit: usize) {
+    let mut pointers = shape
+        .locations()
+        .into_iter()
+        .filter_map(|(ptr, _, _, _)| match ptr.0.as_slice() {
+            // We need to include `/*/foo` in order to squash inside `additional*` subschemas,
+            // but we don't want to include those locations that are leaf nodes, since
+            // leaf node recursion is squashed every time we squash a concrete property.
+            [.., Token::NextIndex] => None,
+            [.., Token::Property(prop_name)] if prop_name == "*" => None,
+            [] => None,
+            _ => Some(ptr),
+        })
+        .collect_vec();
 
-    if !slf.type_.overlaps(types::OBJECT) {
+    if pointers.len() < limit {
         return;
     }
 
-    let limit = match loc {
-        Location::Root => MAX_ROOT_FIELDS,
-        _ => MAX_NESTED_FIELDS,
-    };
-
-    if slf.object.properties.len() > limit {
-        // Take all of the properties' shapes and
-        // union them into additionalProperties
-
-        let existing_additional_properties = slf
-            .object
-            .additional_properties
-            // `Shape::union` takes owned Shapes which is why we
-            // have to take ownership here.
-            .take()
-            .map(|boxed| *boxed)
-            .unwrap_or(Shape::nothing());
-
-        let merged_additional_properties = slf
-            .object
-            .properties
-            // As part of squashing all known property shapes together into
-            // additionalProperties, we need to also remove those explicit properties.
-            .drain(..)
-            .fold(existing_additional_properties, |accum, mut prop| {
-                // Recur here to avoid excessively large `additionalProperties` shapes
-                enforce_field_count_limits(&mut prop.shape, loc.push_prop(&prop.name));
-                Shape::union(accum, prop.shape)
-            });
-
-        slf.object.additional_properties = Some(Box::new(merged_additional_properties));
-    } else {
-        for prop in slf.object.properties.iter_mut() {
-            enforce_field_count_limits(&mut prop.shape, loc.push_prop(&prop.name))
+    pointers.sort_by(|a_ptr, b_ptr| {
+        // order by depth, then by pointer location
+        match a_ptr.0.len().cmp(&b_ptr.0.len()) {
+            // Same depth, stably sort by pointer location
+            Ordering::Equal => a_ptr.cmp(&b_ptr),
+            depth => depth,
         }
+    });
+
+    while pointers.len() > limit {
+        let location_ptr = pointers
+            .pop()
+            .expect("locations vec was just checked to be non-empty");
+
+        squash_location(shape, location_ptr.0.as_slice());
     }
 }
 
-pub const MAX_ROOT_FIELDS: usize = 750;
-pub const MAX_NESTED_FIELDS: usize = 200;
+pub const DEFAULT_SCHEMA_COMPLEXITY_LIMIT: usize = 1_000;
 
 #[cfg(test)]
 mod test {
@@ -70,7 +177,7 @@ mod test {
         initial_schema: Option<&str>,
         expected_schema: &str,
         docs: &[serde_json::Value],
-        enforce_limits: bool,
+        enforce_limits: Option<usize>,
     ) -> Shape {
         let mut schema = match initial_schema {
             Some(initial) => shape_from(initial),
@@ -83,8 +190,8 @@ mod test {
 
         let expected = shape_from(expected_schema);
 
-        if enforce_limits {
-            enforce_field_count_limits(&mut schema, Location::Root);
+        if let Some(limit) = enforce_limits {
+            enforce_shape_complexity_limit(&mut schema, limit);
         }
 
         assert_eq!(expected, schema);
@@ -113,7 +220,7 @@ mod test {
                 maximum: 10000
             "#,
             dynamic_keys.as_slice(),
-            true,
+            Some(0),
         );
     }
 
@@ -122,8 +229,10 @@ mod test {
         // Create an object like
        // {
        //   "big_key": {
-        //     ...751 properties...
+        //     "key-0": 5,
+        //     ...750 more properties...
+        //   },
+        //   "key-0": 5,
         //   ...750 more properties...
         // }
         let mut root = BTreeMap::new();
@@ -143,13 +252,9 @@ mod test {
                 minimum: 0
                 maximum: 10000
-              type: object
-              additionalProperties:
-                type: integer
-                minimum: 0
-                maximum: 10000
             "#,
             &[json!(root)],
-            true,
+            Some(0),
         );
     }
@@ -178,7 +283,7 @@ mod test {
                 maximum: 0
             "#,
             &[json!({ "container": nested })],
-            true,
+            Some(4),
         );
 
         for id in 0..300 {
@@ -200,7 +305,7 @@ mod test {
                 maximum: 10000
             "#,
             &[json!({ "container": nested })],
-            true,
+            Some(1),
         );
     }
@@ -223,9 +328,9 @@ mod test {
                 maxLength: 4
             "#,
             &[json!([{"key": "test"}])],
-            true,
+            Some(3),
         );
 
-        let dynamic_array_objects = (0..800)
+        let dynamic_array_objects = (0..8)
             .map(|id| {
                 json!([{
                     format!("key-{id}"): "test"
@@ -262,7 +367,7 @@ mod test {
                 maxLength: 4
             "#,
             &dynamic_array_objects,
-            true,
+            Some(0),
         );
     }
@@ -294,7 +399,58 @@ mod test {
                 maximum: 0
             "#,
             dynamic_keys.as_slice(),
-            true,
+            Some(20),
         );
     }
+
+    #[test]
+    fn test_deep_nesting() {
+        let mut doc = json!({});
+        for idx in 0..10 {
+            doc = json!({format!("foo{idx}"): doc, format!("bar{idx}"): doc});
+        }
+
+        widening_snapshot_helper(
+            None,
+            r#"
+            type: object
+            additionalProperties:
+                type: object
+            "#,
+            &[doc],
+            Some(0),
+        );
+    }
+
+    #[test]
+    fn test_quickcheck_regression() {
+        widening_snapshot_helper(
+            None,
+            r#"
+            type: array
+            maxItems: 1
+            items:
+                type: object
+                additionalProperties: false
+            "#,
+            &[json!([{}])],
+            Some(0),
+        );
+    }
+
+    #[test]
+    fn test_quickcheck_regression_2() {
+        widening_snapshot_helper(
+            None,
+            r#"
+            type: object
+            additionalProperties:
+                type: array
+                maxItems: 0
+                additionalItems: false
+            "#,
+            &[json!({"foo":[]})],
+            Some(0),
+        );
+    }
 }
diff --git a/crates/doc/src/shape/location.rs b/crates/doc/src/shape/location.rs
index 633f8eb2e6..2434a1bdb6 100644
--- a/crates/doc/src/shape/location.rs
+++ b/crates/doc/src/shape/location.rs
@@ -70,7 +70,7 @@ impl Shape {
     /// Produce flattened locations of nested items and properties of this Shape,
     /// as tuples of the encoded location JSON Pointer, an indication of whether
     /// the pointer is a pattern, its Shape, and an Exists constraint.
-    pub fn locations(&self) -> Vec<(String, bool, &Shape, Exists)> {
+    pub fn locations(&self) -> Vec<(Pointer, bool, &Shape, Exists)> {
         let mut out = Vec::new();
         self.locations_inner(Location::Root, Exists::Must, false, &mut out);
         out
     }
@@ -167,14 +167,14 @@ impl Shape {
         location: Location<'_>,
         exists: Exists,
         pattern: bool,
-        out: &mut Vec<(String, bool, &'s Shape, Exists)>,
+        out: &mut Vec<(Pointer, bool, &'s Shape, Exists)>,
     ) {
         let exists = if self.type_ == types::INVALID {
             Exists::Cannot
         } else {
             exists
         };
-        out.push((location.pointer_str().to_string(), pattern, self, exists));
+        out.push((Pointer::from_location(&location), pattern, self, exists));
 
         // Traverse sub-locations of this location when it takes an object
         // or array type. As a rule, children must exist only if their parent
@@ -338,8 +338,10 @@ mod test {
             (&arr2, "/-", ("", Exists::Cannot)),
         ];
 
-        for (shape, ptr, expect) in cases {
-            let actual = shape.locate(&Pointer::from(ptr));
+        for (&ref shape, ptr, expect) in cases {
+            let mut_shape = &mut shape.clone();
+
+            let actual = mut_shape.locate(&Pointer::from(ptr));
             let actual = (
                 actual
                     .0
@@ -349,54 +351,79 @@ mod test {
                     .unwrap_or(""),
                 actual.1,
             );
-            assert_eq!(expect, &actual, "case {:?}", ptr);
+            assert_eq!(*expect, actual, "case {:?}", ptr);
         }
 
         let obj_locations = obj.locations();
         let obj_locations = obj_locations
             .iter()
-            .map(|(ptr, pattern, shape, exists)| (ptr.as_ref(), *pattern, shape.type_, *exists))
+            .map(|(ptr, pattern, shape, exists)| (ptr.to_string(), *pattern, shape.type_, *exists))
             .collect::<Vec<_>>();
 
         assert_eq!(
             obj_locations,
             vec![
-                ("", false, types::OBJECT, Exists::Must),
-                ("/1", false, types::OBJECT, Exists::Must),
-                ("/1/-", false, types::OBJECT, Exists::Must),
-                ("/1/-/2", false, types::STRING, Exists::Must),
+                ("".to_string(), false, types::OBJECT, Exists::Must),
+                ("/1".to_string(), false, types::OBJECT, Exists::Must),
+                ("/1/-".to_string(), false, types::OBJECT, Exists::Must),
+                ("/1/-/2".to_string(), false, types::STRING, Exists::Must),
                 (
-                    "/multi-type",
+                    "/multi-type".to_string(),
                     false,
                     types::ARRAY | types::OBJECT,
                     Exists::May
                 ),
-                ("/multi-type/child", false, types::STRING, Exists::May),
-                ("/parent", false, types::OBJECT, Exists::Must),
-                ("/parent/40two", false, types::STRING, Exists::May),
-                ("/parent/impossible", false, types::INVALID, Exists::Cannot),
-                ("/parent/opt-child", false, types::STRING, Exists::May),
-                ("/parent/req-child", false, types::STRING, Exists::Must),
-                ("/prop", false, types::STRING, Exists::May),
-                ("/pattern+", true, types::STRING, Exists::May),
-                ("/*", true, types::STRING, Exists::May),
+                (
+                    "/multi-type/child".to_string(),
+                    false,
+                    types::STRING,
+                    Exists::May
+                ),
+                ("/parent".to_string(), false, types::OBJECT, Exists::Must),
+                (
+                    "/parent/40two".to_string(),
+                    false,
+                    types::STRING,
+                    Exists::May
+                ),
+                (
+                    "/parent/impossible".to_string(),
+                    false,
+                    types::INVALID,
+                    Exists::Cannot
+                ),
+                (
+                    "/parent/opt-child".to_string(),
+                    false,
+                    types::STRING,
+                    Exists::May
+                ),
+                (
+                    "/parent/req-child".to_string(),
+                    false,
+                    types::STRING,
+                    Exists::Must
+                ),
+                ("/prop".to_string(), false, types::STRING, Exists::May),
+                ("/pattern+".to_string(), true, types::STRING, Exists::May),
+                ("/*".to_string(), true, types::STRING, Exists::May),
             ]
         );
 
         let arr_locations = arr1.locations();
         let arr_locations = arr_locations
             .iter()
-            .map(|(ptr, pattern, shape, exists)| (ptr.as_ref(), *pattern, shape.type_, *exists))
+            .map(|(ptr, pattern, shape, exists)| (ptr.to_string(), *pattern, shape.type_, *exists))
            .collect::<Vec<_>>();
 
         assert_eq!(
             arr_locations,
             vec![
-                ("", false, types::ARRAY, Exists::Must),
-                ("/0", false, types::STRING, Exists::Must),
-                ("/1", false, types::STRING, Exists::Must),
-                ("/2", false, types::STRING, Exists::May),
-                ("/-", true, types::STRING, Exists::May),
+                ("".to_string(), false, types::ARRAY, Exists::Must),
+                ("/0".to_string(), false, types::STRING, Exists::Must),
+                ("/1".to_string(), false, types::STRING, Exists::Must),
+                ("/2".to_string(), false, types::STRING, Exists::May),
+                ("/-".to_string(), true, types::STRING, Exists::May),
             ]
         );
     }
 }
diff --git a/crates/doc/tests/shape_fuzz.rs b/crates/doc/tests/shape_fuzz.rs
index 01df67b0f0..aa0242c577 100644
--- a/crates/doc/tests/shape_fuzz.rs
+++ b/crates/doc/tests/shape_fuzz.rs
@@ -1,6 +1,5 @@
-use doc::{shape::schema::to_schema, Shape, Validator};
 use itertools::Itertools;
-use quickcheck::{Arbitrary, Gen, QuickCheck};
+use quickcheck::Arbitrary;
 use serde_json::{Map, Number, Value};
 use std::{collections::BTreeMap, ops::Range};
 
@@ -80,11 +79,13 @@ fn gen_value(g: &mut quickcheck::Gen, n: usize) -> Value {
 }
 
 fn gen_array(g: &mut quickcheck::Gen, n: usize) -> Vec<Value> {
-    (0..gen_range(g, 2..20)).map(|_| gen_value(g, n)).collect()
+    (0..gen_range(g, 2..(n as u64) + 3))
+        .map(|_| gen_value(g, n))
+        .collect()
 }
 
 fn gen_map(g: &mut quickcheck::Gen, n: usize) -> Map<String, Value> {
-    (0..gen_range(g, 2..20))
+    (0..gen_range(g, 2..(n as u64) + 3))
         .map(|_| {
             (
                 <String as Arbitrary>::arbitrary(g),
@@ -94,55 +95,122 @@ fn gen_map(g: &mut quickcheck::Gen, n: usize) -> Map<String, Value> {
         .collect()
 }
 
-fn roundtrip_schema_widening_validation(vals: Vec<Value>) -> bool {
-    let mut shape = Shape::nothing();
-    for val in &vals {
-        shape.widen(val);
-    }
+#[cfg(test)]
+mod test {
+    use crate::ArbitraryValue;
+    use doc::{
+        shape::{limits::enforce_shape_complexity_limit, schema::to_schema},
+        Shape, Validator,
+    };
+    use itertools::Itertools;
+    use quickcheck::{Gen, QuickCheck, TestResult};
+    use serde_json::{json, Value};
 
-    let schema = json::schema::build::build_schema(
-        url::Url::parse("https://example").unwrap(),
-        &serde_json::to_value(to_schema(shape.clone())).unwrap(),
-    )
-    .unwrap();
+    fn assert_docs_fit_schema(docs: Vec<Value>, shape: Shape) -> bool {
+        let schema = json::schema::build::build_schema(
+            url::Url::parse("https://example").unwrap(),
+            &serde_json::to_value(to_schema(shape.clone())).unwrap(),
+        )
+        .unwrap();
 
-    let schema_yaml = serde_yaml::to_string(&to_schema(shape)).unwrap();
+        let schema_yaml = serde_yaml::to_string(&to_schema(shape)).unwrap();
 
-    let mut validator = Validator::new(schema).unwrap();
+        let mut validator = Validator::new(schema).unwrap();
 
-    for val in vals {
-        let res = validator.validate(None, &val);
-        if let Ok(validation) = res {
-            if validation.validator.invalid() {
-                let errs = validation
-                    .validator
-                    .outcomes()
-                    .iter()
-                    .map(|(outcome, _span)| format!("{}", outcome))
-                    .collect_vec()
-                    .join(r#","#);
+        for val in docs {
+            let res = validator.validate(None, &val);
+            if let Ok(validation) = res {
+                if validation.validator.invalid() {
+                    let errs = validation
+                        .validator
+                        .outcomes()
+                        .iter()
+                        .map(|(outcome, _span)| format!("{}", outcome))
+                        .collect_vec()
+                        .join(r#","#);
 
-                println!(
-                    r#"Schema {} failed validation for document {}: "{}\n"#,
-                    schema_yaml, val, errs
-                );
+                    println!(
+                        r#"Schema {} failed validation for document {}: "{}\n"#,
+                        schema_yaml, val, errs
+                    );
+                    return false;
+                }
+            } else {
                 return false;
             }
-        } else {
+        }
+        return true;
+    }
+
+    fn shape_limits(vals: Vec<Value>, limit: usize) -> bool {
+        let mut shape = Shape::nothing();
+        for val in &vals {
+            shape.widen(val);
+        }
+
+        let initial_locations = shape.locations().len();
+        let initial_schema_yaml = serde_yaml::to_string(&to_schema(shape.clone())).unwrap();
+
+        enforce_shape_complexity_limit(&mut shape, limit);
+
+        let enforced_locations = shape
+            .locations()
+            .iter()
+            .filter(|(ptr, pattern, _, _)| !pattern && ptr.0.len() > 0)
+            .collect_vec()
+            .len();
+
+        if enforced_locations > limit || !assert_docs_fit_schema(vals.clone(), shape.clone()) {
+            let schema_yaml = serde_yaml::to_string(&to_schema(shape)).unwrap();
+            println!("Started with {initial_locations} initial locations, enforced down to {enforced_locations}, limit was {limit}");
+            println!(
+                "start: {initial_schema_yaml}\nenforced down to: {schema_yaml}\ndocs:{vals:?}"
+            );
             return false;
+        } else {
+            return true;
         }
     }
 
-    return true;
-}
-
-#[test]
-fn fuzz_json() {
-    fn inner_test(av: Vec<ArbitraryValue>) -> bool {
-        let vals = av.into_iter().map(|v| v.0).collect_vec();
-        roundtrip_schema_widening_validation(vals)
+    fn roundtrip_schema_widening_validation(vals: Vec<Value>) -> bool {
+        let mut shape = Shape::nothing();
+        for val in &vals {
+            shape.widen(val);
+        }
+
+        assert_docs_fit_schema(vals, shape)
     }
 
-    QuickCheck::new()
-        .gen(Gen::new(20))
-        .quickcheck(inner_test as fn(Vec<ArbitraryValue>) -> bool);
+    #[test]
+    fn test_case_obj() {
+        assert_eq!(true, shape_limits(vec![json!({"":{"":55}})], 1));
+    }
+
+    #[test]
+    fn fuzz_roundtrip() {
+        fn inner_test(av: Vec<ArbitraryValue>) -> bool {
+            let vals = av.into_iter().map(|v| v.0).collect_vec();
+            roundtrip_schema_widening_validation(vals)
+        }
+
+        QuickCheck::new()
+            .gen(Gen::new(100))
+            .quickcheck(inner_test as fn(Vec<ArbitraryValue>) -> bool);
+    }
+
+    #[test]
+    fn fuzz_limiting() {
+        fn inner_test(av: Vec<ArbitraryValue>, limit: usize) -> TestResult {
+            if limit < 1 {
+                return TestResult::discard();
+            }
+            let vals = av.into_iter().map(|v| v.0).collect_vec();
+            TestResult::from_bool(shape_limits(vals, limit))
+        }
+
+        QuickCheck::new()
+            .gen(Gen::new(100))
+            .tests(1000)
+            .quickcheck(inner_test as fn(Vec<ArbitraryValue>, usize) -> TestResult);
+    }
 }
diff --git a/crates/flow-web/src/lib.rs b/crates/flow-web/src/lib.rs
index 46cc8153c5..943e19c212 100644
--- a/crates/flow-web/src/lib.rs
+++ b/crates/flow-web/src/lib.rs
@@ -87,10 +87,10 @@ pub fn infer(schema: JsValue) -> Result<JsValue, JsValue> {
         .locations()
         .into_iter()
         .map(|(ptr, is_pattern, prop_shape, exists)| {
-            let name = if ptr.is_empty() || is_pattern {
+            let name = if ptr.0.is_empty() || is_pattern {
                 None
             } else {
-                Some((&ptr[1..]).to_string())
+                Some((&ptr.to_string()[1..]).to_string())
             };
 
             let types = prop_shape.type_.iter().map(|ty| ty.to_string()).collect();
@@ -109,7 +109,7 @@ pub fn infer(schema: JsValue) -> Result<JsValue, JsValue> {
                 title: prop_shape.title.clone().map(Into::into),
                 description: prop_shape.description.clone().map(Into::into),
                 reduction: reduce_description(prop_shape.reduction.clone()).to_string(),
-                pointer: ptr,
+                pointer: ptr.to_string(),
                 types,
                 enum_vals,
                 string_format,
diff --git a/crates/flowctl/src/raw/discover.rs b/crates/flowctl/src/raw/discover.rs
index 90ce1cfc2a..70630f0e41 100644
--- a/crates/flowctl/src/raw/discover.rs
+++ b/crates/flowctl/src/raw/discover.rs
@@ -213,8 +213,7 @@ fn schema_to_sample_json(schema_shape: &Shape) -> Result
 anyhow::Result<()> {
     println!("|---|---|---|---|---|");
     for (ptr, pattern, shape, exists) in shape.locations() {
-        if args.exclude.contains(&ptr) {
+        let ptr_str = ptr.to_string();
+        if args.exclude.contains(&ptr_str) {
             continue;
         }
         let formatted_ptr = surround_if(
             exists.cannot(),
             "~~",
-            surround_if(exists.must(), "**", surround_if(pattern, "_", Code(&ptr))),
+            surround_if(
+                exists.must(),
+                "**",
+                surround_if(pattern, "_", Code(ptr_str.as_str())),
+            ),
         );
 
         let title = shape.title.as_deref().unwrap_or("");
diff --git a/crates/validation/src/collection.rs b/crates/validation/src/collection.rs
index 6de753cfa6..989b811d58 100644
--- a/crates/validation/src/collection.rs
+++ b/crates/validation/src/collection.rs
@@ -288,11 +288,11 @@ fn walk_collection_projections(
     // Now add all statically inferred locations from the read-time JSON schema
     // which are not patterns or the document root.
     for (ptr, pattern, r_shape, r_exists) in effective_read_schema.shape.locations() {
-        if pattern || ptr.is_empty() {
+        if pattern || ptr.0.is_empty() {
             continue;
         }
         // Canonical-ize by stripping the leading "/".
-        let field = ptr[1..].to_string();
+        let field = ptr.to_string()[1..].to_string();
         // Special case to avoid creating a conflicting projection when the collection
         // schema contains a field with the same name as the default root projection.
         if field == FLOW_DOCUMENT {
diff --git a/go/runtime/capture.go b/go/runtime/capture.go
index d6f5369eee..e8f96370f3 100644
--- a/go/runtime/capture.go
+++ b/go/runtime/capture.go
@@ -147,8 +147,9 @@ func (c *Capture) StartReadingMessages(
 	cp pf.Checkpoint,
 	_ *flow.Timepoint,
 	ch chan<- consumer.EnvelopeOrError,
+	enableSchemaInference bool,
 ) {
-	if err := c.startReadingMessages(shard, cp, ch); err != nil {
+	if err := c.startReadingMessages(shard, cp, ch, enableSchemaInference); err != nil {
 		ch <- consumer.EnvelopeOrError{Error: err}
 	}
 }
@@ -157,6 +158,7 @@ func (c *Capture) startReadingMessages(
 	shard consumer.Shard,
 	cp pf.Checkpoint,
 	ch chan<- consumer.EnvelopeOrError,
+	enableSchemaInference bool,
 ) error {
 	// A consumer.Envelope requires a JournalSpec, of which only the Name is actually
 	// used (for sequencing messages and producing checkpoints).
@@ -271,9 +273,7 @@ func (c *Capture) startReadingMessages(
 			binding.Collection.Key,
 			binding.Collection.PartitionFields,
 			binding.Collection.Projections,
-			// Enable schema inference for captures
-			// true,
-			false, // TODO(johnny): temporarily disable schema inference.
+			enableSchemaInference,
 		)
 	}
diff --git a/go/runtime/flow_consumer.go b/go/runtime/flow_consumer.go
index a5a1181ce9..1969993845 100644
--- a/go/runtime/flow_consumer.go
+++ b/go/runtime/flow_consumer.go
@@ -27,10 +27,11 @@ import (
 type FlowConsumerConfig struct {
 	runconsumer.BaseConfig
 	Flow struct {
-		BuildsRoot string `long:"builds-root" required:"true" env:"BUILDS_ROOT" description:"Base URL for fetching Flow catalog builds"`
-		BrokerRoot string `long:"broker-root" required:"true" env:"BROKER_ROOT" default:"/gazette/cluster" description:"Broker Etcd base prefix"`
-		Network    string `long:"network" description:"The Docker network that connector containers are given access to, defaults to the bridge network"`
-		TestAPIs   bool   `long:"test-apis" description:"Enable APIs exclusively used while running catalog tests"`
+		BuildsRoot            string `long:"builds-root" required:"true" env:"BUILDS_ROOT" description:"Base URL for fetching Flow catalog builds"`
+		BrokerRoot            string `long:"broker-root" required:"true" env:"BROKER_ROOT" default:"/gazette/cluster" description:"Broker Etcd base prefix"`
+		Network               string `long:"network" description:"The Docker network that connector containers are given access to, defaults to the bridge network"`
+		TestAPIs              bool   `long:"test-apis" description:"Enable APIs exclusively used while running catalog tests"`
+		EnableSchemaInference bool   `long:"enable-schema-inference" description:"Enable schema inference for capture tasks"`
 	} `group:"flow" namespace:"flow" env-namespace:"FLOW"`
 }
 
@@ -118,7 +119,7 @@ func (f *FlowConsumer) StartReadingMessages(shard consumer.Shard, store consumer
 	var tp = f.Timepoint.Now
 	f.Timepoint.Mu.Unlock()
 
-	store.(Application).StartReadingMessages(shard, checkpoint, tp, envOrErr)
+	store.(Application).StartReadingMessages(shard, checkpoint, tp, envOrErr, f.Config.Flow.EnableSchemaInference)
 }
 
 // ReplayRange delegates to the Application.
diff --git a/go/runtime/interfaces.go b/go/runtime/interfaces.go
index 591a67a9b8..f2cc2e5489 100644
--- a/go/runtime/interfaces.go
+++ b/go/runtime/interfaces.go
@@ -20,7 +20,7 @@ type Application interface {
 	FinalizeTxn(consumer.Shard, *message.Publisher) error
 	FinishedTxn(consumer.Shard, consumer.OpFuture)
 
-	StartReadingMessages(consumer.Shard, pc.Checkpoint, *flow.Timepoint, chan<- consumer.EnvelopeOrError)
+	StartReadingMessages(consumer.Shard, pc.Checkpoint, *flow.Timepoint, chan<- consumer.EnvelopeOrError, bool)
 	ReplayRange(_ consumer.Shard, _ pb.Journal, begin, end pb.Offset) message.Iterator
 	ReadThrough(pb.Offsets) (pb.Offsets, error)
 }
diff --git a/go/runtime/task_term.go b/go/runtime/task_term.go
index 388c455ba1..b5544bc3f2 100644
--- a/go/runtime/task_term.go
+++ b/go/runtime/task_term.go
@@ -244,6 +244,8 @@ func (r *taskReader) StartReadingMessages(
 	cp pc.Checkpoint,
 	tp *flow.Timepoint,
 	ch chan<- consumer.EnvelopeOrError,
+	// We'll just ignore this here; we don't want to do schema inference when reading from collections.
+	enableSchemaInference bool,
 ) {
 	shuffle.StartReadingMessages(shard.Context(), r.readBuilder, cp, tp, ch)
 }
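
Usage sketch (trailer note, not part of the patch): a minimal illustration of how
the renamed entry point is meant to be driven, assembled from the call sites above
in combine_api.rs and reduce/schema.rs and from the shape_fuzz.rs tests. The helper
name `infer_bounded_schema` is hypothetical and only for illustration.

    use doc::{
        shape::{
            limits::{enforce_shape_complexity_limit, DEFAULT_SCHEMA_COMPLEXITY_LIMIT},
            schema::to_schema,
        },
        Shape,
    };

    // Widen a Shape over observed documents, then bound its complexity before
    // converting it back into a JSON schema, as the runtime call sites do.
    fn infer_bounded_schema(docs: &[serde_json::Value]) -> serde_json::Value {
        let mut shape = Shape::nothing();
        for doc in docs {
            shape.widen(doc);
        }

        // Squashes the deepest (last-sorted) locations first until at most the given
        // number of concrete locations remain. Per the doc comment on
        // enforce_shape_complexity_limit, documents that validated against the
        // widened shape still validate against the squashed one.
        enforce_shape_complexity_limit(&mut shape, DEFAULT_SCHEMA_COMPLEXITY_LIMIT);

        serde_json::to_value(to_schema(shape)).expect("schema serializes to JSON")
    }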