Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve schema limits #1157

Merged
merged 8 commits into from
Aug 24, 2023
2 changes: 1 addition & 1 deletion crates/agent/src/publications/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,7 @@ mod test {
}

#[tokio::test]
#[serial_test::parallel]
#[serial_test::serial]
async fn test_source_capture_validation() {
let mut conn = sqlx::postgres::PgConnection::connect(&FIXED_DATABASE_URL)
.await
Expand Down
2 changes: 1 addition & 1 deletion crates/agent/src/resource_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub fn pointer_for_schema(schema_json: &str) -> anyhow::Result<doc::Pointer> {

for (ptr, _, prop_shape, _) in shape.locations() {
if prop_shape.annotations.contains_key("x-collection-name") {
return Ok(doc::Pointer::from_str(&ptr));
return Ok(ptr);
}
}
Err(anyhow::anyhow!(
Expand Down
8 changes: 4 additions & 4 deletions crates/derive/src/combine_api.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{new_validator, DebugJson, DocCounter, JsonError, StatsAccumulator};
use anyhow::Context;
use bytes::Buf;
use doc::shape::{limits::enforce_field_count_limits, schema::to_schema};
use doc::shape::{limits::{enforce_shape_complexity_limit, DEFAULT_SCHEMA_COMPLEXITY_LIMIT}, schema::to_schema};
use prost::Message;
use proto_flow::flow::combine_api::{self, Code};

Expand Down Expand Up @@ -291,7 +291,7 @@ pub fn drain_chunk(
doc::LazyNode::Heap(h) => shape.widen(h),
};
if changed {
enforce_field_count_limits(shape, json::Location::Root);
enforce_shape_complexity_limit(shape, DEFAULT_SCHEMA_COMPLEXITY_LIMIT);
*did_change = true;
}
}
Expand Down Expand Up @@ -742,8 +742,8 @@ pub mod test {

// Test projection fields == their pointer.
flow::Projection {
field: ptr.clone(),
ptr: ptr,
field: ptr.to_string(),
ptr: ptr.to_string(),
inference: Some(flow::Inference {
default_json: shape
.default
Expand Down
2 changes: 1 addition & 1 deletion crates/doc/src/ptr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl<'t> std::fmt::Display for Token {
}

/// Pointer is a parsed JSON pointer.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Pointer(pub Vec<Token>);

impl Pointer {
Expand Down
8 changes: 6 additions & 2 deletions crates/doc/src/reduce/schema.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use super::{count_nodes, Cursor, Error, Result};
use crate::{shape::limits, shape::schema::to_schema, AsNode, HeapNode, Shape};
use crate::{
shape::limits,
shape::{limits::DEFAULT_SCHEMA_COMPLEXITY_LIMIT, schema::to_schema},
AsNode, HeapNode, Shape,
};
use json::schema::index::IndexBuilder;

pub fn json_schema_merge<'alloc, L: AsNode, R: AsNode>(
Expand Down Expand Up @@ -32,7 +36,7 @@ pub fn json_schema_merge<'alloc, L: AsNode, R: AsNode>(
let right = shape_from_node(rhs).map_err(|e| Error::with_location(e, loc))?;

let mut merged_shape = Shape::union(left, right);
limits::enforce_field_count_limits(&mut merged_shape, json::Location::Root);
limits::enforce_shape_complexity_limit(&mut merged_shape, DEFAULT_SCHEMA_COMPLEXITY_LIMIT);

// Union together the LHS and RHS, and convert back from `Shape` into `HeapNode`.
let merged_doc = serde_json::to_value(to_schema(merged_shape))
Expand Down
Loading
Loading