Skip to content

Commit

Permalink
models/protocol: add backfill counter and stateKey
Browse files Browse the repository at this point in the history
Add `backfill` counter to models for capture & materialization bindings
as well as derivation transforms.

Propagate the `backfill` value through the corresponding Validate request
types and into built task specifications.

Refactor `journal_read_suffix` encoding logic for resource paths to be
aware of the `backfill` version. When `backfill` is zero, the encoded
resource path is unchanged. Larger values of `backfill` add a new `.v1`,
`.v2`, and so on suffix to the encoded resource path.

Attach encoded resource paths to built specifications as a new
`state_key` field, which connectors can use to key connector states.

Also re-define the current `journal_read_suffix` fields in terms of the
computed `state_key`.

We may eventually remove `journal_read_suffix`, but not yet.

Issue #1276
  • Loading branch information
jgraettinger committed Nov 21, 2023
1 parent 7294430 commit f830670
Show file tree
Hide file tree
Showing 57 changed files with 1,617 additions and 931 deletions.
1 change: 1 addition & 0 deletions crates/agent/src/discovers/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ pub fn merge_capture(
target: models::Collection::new(format!("{capture_prefix}/{recommended_name}")),
disable: discovered_binding.disable,
resource: models::RawValue::from_value(&resource),
backfill: 0,
});
filtered_bindings.push(discovered_binding);
}
Expand Down
1 change: 1 addition & 0 deletions crates/agent/src/publications/linked_materializations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ async fn update_linked_materialization(
disable: false,
fields: Default::default(),
priority: Default::default(),
backfill: 0,
};
materialization.bindings.push(binding);
}
Expand Down
58 changes: 38 additions & 20 deletions crates/assemble/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,21 +526,25 @@ fn push_partitions(fields: &BTreeMap<String, Vec<Value>>, out: &mut Vec<broker::
}
}

/// `encode_resource_path` encodes path components into a string which is
/// suitable for use within a Gazette path, such as a Journal name or suffix, or
/// a Shard ID.
/// `encode_state_key` encodes resource path components and a backfill counter
/// into a stable string value which is suited for indexing within a persistent
/// binding state, such as a Flow runtime checkpoint or a connector state.
///
/// Paths are restricted to unicode letters and numbers, plus the symbols
/// `-_+/.=%`. All other runes are percent-encoded.
/// State keys have a restricted set of allowed characters, due to the way
/// they're represented within Flow runtime checkpoints and, internal to those
/// checkpoints, as suffixes attached to Gazette Journal names.
///
/// State keys are restricted to unicode letters and numbers, plus the symbols
/// `-_+.=`. All other runes are percent-encoded.
///
/// See Gazette for more details:
/// - Path Tokens: broker/protocol/validator.go
/// - Path Validation Rules: broker/protocol/journal_spec_extensions.go
pub fn encode_resource_path(resource_path: &[impl AsRef<str>]) -> String {
pub fn encode_state_key(resource_path: &[impl AsRef<str>], backfill: u32) -> String {
let mut parts = Vec::new();
parts.extend(resource_path.iter().map(AsRef::as_ref));

let mut name = String::new();
let mut key = String::new();

for c in parts.join("/").chars() {
match c {
Expand All @@ -549,16 +553,20 @@ pub fn encode_resource_path(resource_path: &[impl AsRef<str>]) -> String {
// certain positions, no repeats, etc. As a resource_path
// potentially contains arbitrary user input, we percent encode any
// `/` characters here to avoid duplicating that validation logic.
'-' | '_' | '+' | '.' | '=' => name.push(c),
_ if c.is_alphanumeric() => name.push(c),
c => name.extend(percent_encoding::utf8_percent_encode(
'-' | '_' | '+' | '.' | '=' => key.push(c),
_ if c.is_alphanumeric() => key.push(c),
c => key.extend(percent_encoding::utf8_percent_encode(
&c.to_string(),
percent_encoding::NON_ALPHANUMERIC,
)),
}
}

name
if backfill != 0 {
key.extend(format!(".v{backfill}").chars());
}

key
}

pub fn compression_codec(t: models::CompressionCodec) -> broker::CompressionCodec {
Expand Down Expand Up @@ -650,18 +658,28 @@ mod test {
}

#[test]
fn test_name_escapes() {
let out = encode_resource_path(&vec![
"he!lo৬".to_string(),
"a/part%".to_string(),
"_¾the-=res+.".to_string(),
]);
assert_eq!(&out, "he%21lo৬%2Fa%2Fpart%25%2F_¾the-=res+.");
fn test_state_key_escapes() {
let out = encode_state_key(&["table"], 0);
assert_eq!(&out, "table");
let out = encode_state_key(&["public", "table"], 0);
assert_eq!(&out, "public%2Ftable");
let out = encode_state_key(&["public", "table"], 1);
assert_eq!(&out, "public%2Ftable.v1");

let out = encode_state_key(
&vec![
"he!lo৬".to_string(),
"a/part%".to_string(),
"_¾the-=res+.".to_string(),
],
3,
);
assert_eq!(&out, "he%21lo৬%2Fa%2Fpart%25%2F_¾the-=res+..v3");

let gross_url =
"http://user:password@foo.bar.example.com:9000/hooks///baz?type=critical&test=true";
let out = encode_resource_path(&vec!["prefix".to_string(), gross_url.to_string()]);
assert_eq!(&out, "prefix%2Fhttp%3A%2F%2Fuser%3Apassword%40foo.bar.example.com%3A9000%2Fhooks%2F%2F%2Fbaz%3Ftype=critical%26test=true");
let out = encode_state_key(&vec!["prefix".to_string(), gross_url.to_string()], 42);
assert_eq!(&out, "prefix%2Fhttp%3A%2F%2Fuser%3Apassword%40foo.bar.example.com%3A9000%2Fhooks%2F%2F%2Fbaz%3Ftype=critical%26test=true.v42");
}

#[test]
Expand Down
1 change: 1 addition & 0 deletions crates/derive-sqlite/src/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub fn parse_validate(
collection: source,
lambda_config_json,
shuffle_lambda_config_json: _,
backfill: _,
} = transform;

let source = source.as_ref().unwrap();
Expand Down
1 change: 1 addition & 0 deletions crates/flowctl/src/raw/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ pub async fn do_discover(
target: collection.clone(),
disable: false,
resource: models::RawValue::from_string(binding.resource_config_json)?,
backfill: 0,
});

collections.insert(
Expand Down
3 changes: 3 additions & 0 deletions crates/json/src/schema/formats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub enum Format {
Regex,
#[serde(rename = "relative-json-pointer")]
RelativeJsonPointer,
#[serde(alias = "uint32", alias = "uint64")]
Integer,
Number,
}
Expand Down Expand Up @@ -266,6 +267,8 @@ mod test {
("integer", "1_234", true),
("integer", "1.234", false),
("integer", " 1234", false),
("uint32", "1234", true),
("uint64", "1234", true),
("number", "1234", true),
("number", "-1234", true),
("number", "1_234", true),
Expand Down
13 changes: 13 additions & 0 deletions crates/models/src/captures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ pub struct CaptureBinding {
// target, we should turn this into a Target enum as has already been done
// with Source (used by Materialization & Derive).
pub target: Collection,
/// # Backfill counter for this binding.
/// Every increment of this counter will result in a new backfill of this
/// binding from the captured endpoint. For example when capturing from a
/// SQL table, incrementing this counter will cause the table to be
/// re-captured in its entirety from the source database.
///
/// Note that a backfill does *not* truncate the target collection,
/// and documents published by a backfilled binding will coexist with
/// (and be ordered after) any documents which were published as part
/// of a preceding backfill.
#[serde(default, skip_serializing_if = "super::is_u32_zero")]
pub backfill: u32,
}

impl CaptureDef {
Expand Down Expand Up @@ -111,6 +123,7 @@ impl CaptureBinding {
resource: serde_json::from_value(json!({"stream": "a_stream"})).unwrap(),
disable: false,
target: Collection::new("target/collection"),
backfill: 0,
}
}
}
12 changes: 11 additions & 1 deletion crates/models/src/derivation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,21 @@ pub struct TransformDef {
/// or as a relative URL to a file containing the lambda.
#[serde(default, skip_serializing_if = "RawValue::is_null")]
pub lambda: RawValue,

/// # Whether to disable this transform.
/// Disabled transforms are completely ignored at runtime and are not validated.
#[serde(default, skip_serializing_if = "super::is_false")]
pub disable: bool,
/// # Backfill counter for this transform.
/// Every increment of this counter will result in a new backfill of this
/// transform. Specifically, the transform's lambda will be re-invoked for
/// every applicable document of its source collection.
///
/// Note that a backfill does *not* truncate the derived collection,
/// and documents published by a backfilled transform will coexist with
/// (and be ordered after) any documents which were published as part
/// of a preceding backfill.
#[serde(default, skip_serializing_if = "super::is_u32_zero")]
pub backfill: u32,
}

/// A Shuffle specifies how a shuffling key is to be extracted from
Expand Down
4 changes: 4 additions & 0 deletions crates/models/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,7 @@ fn option_datetime_schema(_: &mut schemars::gen::SchemaGenerator) -> schemars::s
/// Returns `true` when the referenced flag is `false`.
///
/// Referenced by name from `skip_serializing_if` attributes (for example on
/// the `disable` field of a transform), so that a `false` flag is omitted
/// from serialized output.
fn is_false(b: &bool) -> bool {
    matches!(*b, false)
}

/// Returns `true` when the referenced counter is zero.
///
/// Referenced by name from `skip_serializing_if` attributes on the `backfill`
/// counters, so that a zero-valued counter is omitted from serialized output.
fn is_u32_zero(u: &u32) -> bool {
    matches!(*u, 0)
}
9 changes: 9 additions & 0 deletions crates/models/src/materializations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ pub struct MaterializationBinding {
/// # Selected projections for this materialization.
#[serde(default)]
pub fields: MaterializationFields,
/// # Backfill counter for this binding.
/// Every increment of this counter will result in a new backfill of this
/// binding from its source collection to its materialized resource.
/// For example when materializing to a SQL table, incrementing this counter
/// causes the table to be dropped and then rebuilt by re-reading the source
/// collection.
#[serde(default, skip_serializing_if = "super::is_u32_zero")]
pub backfill: u32,
}

/// MaterializationFields defines a selection of projections to materialize,
Expand Down Expand Up @@ -104,6 +112,7 @@ impl MaterializationBinding {
disable: false,
priority: 0,
fields: MaterializationFields::default(),
backfill: 0,
}
}

Expand Down
3 changes: 3 additions & 0 deletions crates/proto-flow/src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ pub mod request {
pub collection: ::core::option::Option<
super::super::super::flow::CollectionSpec,
>,
/// Backfill counter for this binding.
#[prost(uint32, tag = "3")]
pub backfill: u32,
}
}
/// Apply a capture configuration and bindings to its endpoint.
Expand Down
19 changes: 19 additions & 0 deletions crates/proto-flow/src/capture.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -940,13 +940,19 @@ impl serde::Serialize for request::validate::Binding {
if self.collection.is_some() {
len += 1;
}
if self.backfill != 0 {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("capture.Request.Validate.Binding", len)?;
if !self.resource_config_json.is_empty() {
struct_ser.serialize_field("resourceConfig", crate::as_raw_json(&self.resource_config_json)?)?;
}
if let Some(v) = self.collection.as_ref() {
struct_ser.serialize_field("collection", v)?;
}
if self.backfill != 0 {
struct_ser.serialize_field("backfill", &self.backfill)?;
}
struct_ser.end()
}
}
Expand All @@ -960,12 +966,14 @@ impl<'de> serde::Deserialize<'de> for request::validate::Binding {
"resource_config_json",
"resourceConfig",
"collection",
"backfill",
];

#[allow(clippy::enum_variant_names)]
enum GeneratedField {
ResourceConfigJson,
Collection,
Backfill,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
Expand All @@ -989,6 +997,7 @@ impl<'de> serde::Deserialize<'de> for request::validate::Binding {
match value {
"resourceConfig" | "resource_config_json" => Ok(GeneratedField::ResourceConfigJson),
"collection" => Ok(GeneratedField::Collection),
"backfill" => Ok(GeneratedField::Backfill),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
Expand All @@ -1010,6 +1019,7 @@ impl<'de> serde::Deserialize<'de> for request::validate::Binding {
{
let mut resource_config_json__ : Option<Box<serde_json::value::RawValue>> = None;
let mut collection__ = None;
let mut backfill__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::ResourceConfigJson => {
Expand All @@ -1024,11 +1034,20 @@ impl<'de> serde::Deserialize<'de> for request::validate::Binding {
}
collection__ = map.next_value()?;
}
GeneratedField::Backfill => {
if backfill__.is_some() {
return Err(serde::de::Error::duplicate_field("backfill"));
}
backfill__ =
Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
;
}
}
}
Ok(request::validate::Binding {
resource_config_json: resource_config_json__.map(|r| Box::<str>::from(r).into()).unwrap_or_default(),
collection: collection__,
backfill: backfill__.unwrap_or_default(),
})
}
}
Expand Down
3 changes: 3 additions & 0 deletions crates/proto-flow/src/derive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ pub mod request {
/// JSON-encoded object which specifies the lambda configuration.
#[prost(string, tag = "4")]
pub lambda_config_json: ::prost::alloc::string::String,
/// Backfill counter for this transform.
#[prost(uint32, tag = "5")]
pub backfill: u32,
}
}
/// Open a derivation stream.
Expand Down
19 changes: 19 additions & 0 deletions crates/proto-flow/src/derive.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1202,6 +1202,9 @@ impl serde::Serialize for request::validate::Transform {
if !self.lambda_config_json.is_empty() {
len += 1;
}
if self.backfill != 0 {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("derive.Request.Validate.Transform", len)?;
if !self.name.is_empty() {
struct_ser.serialize_field("name", &self.name)?;
Expand All @@ -1215,6 +1218,9 @@ impl serde::Serialize for request::validate::Transform {
if !self.lambda_config_json.is_empty() {
struct_ser.serialize_field("lambdaConfig", crate::as_raw_json(&self.lambda_config_json)?)?;
}
if self.backfill != 0 {
struct_ser.serialize_field("backfill", &self.backfill)?;
}
struct_ser.end()
}
}
Expand All @@ -1231,6 +1237,7 @@ impl<'de> serde::Deserialize<'de> for request::validate::Transform {
"shuffleLambdaConfig",
"lambda_config_json",
"lambdaConfig",
"backfill",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -1239,6 +1246,7 @@ impl<'de> serde::Deserialize<'de> for request::validate::Transform {
Collection,
ShuffleLambdaConfigJson,
LambdaConfigJson,
Backfill,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
Expand All @@ -1264,6 +1272,7 @@ impl<'de> serde::Deserialize<'de> for request::validate::Transform {
"collection" => Ok(GeneratedField::Collection),
"shuffleLambdaConfig" | "shuffle_lambda_config_json" => Ok(GeneratedField::ShuffleLambdaConfigJson),
"lambdaConfig" | "lambda_config_json" => Ok(GeneratedField::LambdaConfigJson),
"backfill" => Ok(GeneratedField::Backfill),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
Expand All @@ -1287,6 +1296,7 @@ impl<'de> serde::Deserialize<'de> for request::validate::Transform {
let mut collection__ = None;
let mut shuffle_lambda_config_json__ : Option<Box<serde_json::value::RawValue>> = None;
let mut lambda_config_json__ : Option<Box<serde_json::value::RawValue>> = None;
let mut backfill__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::Name => {
Expand All @@ -1313,13 +1323,22 @@ impl<'de> serde::Deserialize<'de> for request::validate::Transform {
}
lambda_config_json__ = Some(map.next_value()?);
}
GeneratedField::Backfill => {
if backfill__.is_some() {
return Err(serde::de::Error::duplicate_field("backfill"));
}
backfill__ =
Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
;
}
}
}
Ok(request::validate::Transform {
name: name__.unwrap_or_default(),
collection: collection__,
shuffle_lambda_config_json: shuffle_lambda_config_json__.map(|r| Box::<str>::from(r).into()).unwrap_or_default(),
lambda_config_json: lambda_config_json__.map(|r| Box::<str>::from(r).into()).unwrap_or_default(),
backfill: backfill__.unwrap_or_default(),
})
}
}
Expand Down
Loading

0 comments on commit f830670

Please sign in to comment.