diff --git a/crates/agent-sql/src/connector_tags.rs b/crates/agent-sql/src/connector_tags.rs index f14a66fbaa..7cb414175d 100644 --- a/crates/agent-sql/src/connector_tags.rs +++ b/crates/agent-sql/src/connector_tags.rs @@ -133,6 +133,7 @@ pub async fn update_tag_fields( endpoint_spec_schema: Box, protocol: String, resource_spec_schema: Box, + resource_path_pointers: Vec, txn: &mut sqlx::Transaction<'_, sqlx::Postgres>, ) -> sqlx::Result<()> { sqlx::query!( @@ -140,7 +141,8 @@ pub async fn update_tag_fields( documentation_url = $2, endpoint_spec_schema = $3, protocol = $4, - resource_spec_schema = $5 + resource_spec_schema = $5, + resource_path_pointers = $6 where id = $1 returning 1 as "must_exist"; "#, @@ -149,9 +151,35 @@ pub async fn update_tag_fields( Json(endpoint_spec_schema) as Json>, protocol, Json(resource_spec_schema) as Json>, + resource_path_pointers as Vec, ) .fetch_one(&mut *txn) .await?; Ok(()) } + +/// Returns the `resource_path_pointers` for the given image and tag. Returns +/// `None` if there are no matching rows, or if the `resource_path_pointers` +/// column value is null. +pub async fn fetch_resource_path_pointers( + image_name: &str, + image_tag: &str, + txn: &mut sqlx::Transaction<'_, sqlx::Postgres>, +) -> sqlx::Result> { + let row = sqlx::query!( + r#" + select ct.resource_path_pointers as "pointers: Vec" + from connectors c + join connector_tags ct on c.id = ct.connector_id + where c.image_name = $1 + and ct.image_tag = $2 + "#, + image_name, + image_tag + ) + .fetch_optional(txn) + .await?; + + Ok(row.and_then(|r| r.pointers).unwrap_or_default()) +} diff --git a/crates/agent/src/connector_tags.rs b/crates/agent/src/connector_tags.rs index b57b1c2a3f..880b547fe5 100644 --- a/crates/agent/src/connector_tags.rs +++ b/crates/agent/src/connector_tags.rs @@ -1,6 +1,10 @@ +use std::time::Duration; + use super::{jobs, logs, Handler, HandlerStatus, Id}; use agent_sql::connector_tags::Row; use anyhow::Context; +use proto_flow::flow; +use runtime::{LogHandler, Runtime, RuntimeProtocol}; use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; use tracing::info; @@ -19,15 +23,13 @@ pub enum JobStatus { /// A TagHandler is a Handler which evaluates tagged connector images. pub struct TagHandler { connector_network: String, - bindir: String, logs_tx: logs::Tx, } impl TagHandler { - pub fn new(connector_network: &str, bindir: &str, logs_tx: &logs::Tx) -> Self { + pub fn new(connector_network: &str, logs_tx: &logs::Tx) -> Self { Self { connector_network: connector_network.to_string(), - bindir: bindir.to_string(), logs_tx: logs_tx.clone(), } } @@ -96,57 +98,171 @@ impl TagHandler { } } - // Fetch its connector specification. - let spec = jobs::run_with_output( - "spec", - &self.logs_tx, - row.logs_token, - async_process::Command::new(format!("{}/flowctl-go", &self.bindir)) - .arg("api") - .arg("spec") - .arg("--image") - .arg(&image_composed) - .arg("--network") - .arg(&self.connector_network), - ) - .await?; + let proto_type = match runtime::flow_runtime_protocol(&image_composed).await { + Ok(ct) => ct, + Err(err) => { + tracing::warn!(image = %image_composed, error = %err, "failed to determine connector protocol"); + return Ok((row.tag_id, JobStatus::SpecFailed)); + } + }; + let log_handler = + logs::ops_handler(self.logs_tx.clone(), "spec".to_string(), row.logs_token); + let runtime = Runtime::new( + false, // Don't allow local + self.connector_network.clone(), + log_handler, + None, // no need to change log level + "ops/connector-tags-job".to_string(), + ); - if !spec.0.success() { - return Ok((row.tag_id, JobStatus::SpecFailed)); - } + let spec_result = match proto_type { + RuntimeProtocol::Capture => spec_capture(&image_composed, runtime).await, + RuntimeProtocol::Materialization => { + spec_materialization(&image_composed, runtime).await + } + }; - /// Spec is the output shape of the `flowctl api spec` command. - #[derive(Deserialize)] - #[serde(rename_all = "camelCase")] - struct Spec { - protocol: String, - documentation_url: String, - config_schema: Box, - resource_config_schema: Box, - oauth2: Option>, - } - let Spec { - documentation_url, - config_schema, - protocol, + let spec = match spec_result { + Ok(s) => s, + Err(err) => { + tracing::warn!(error = ?err, image = %image_composed, "connector Spec RPC failed"); + return Ok((row.tag_id, JobStatus::SpecFailed)); + } + }; + + let ConnectorSpec { + endpoint_config_schema, resource_config_schema, + documentation_url, oauth2, - } = serde_json::from_slice(&spec.1).context("parsing connector spec output")?; + resource_path_pointers, + } = spec; + + if proto_type == RuntimeProtocol::Capture && resource_path_pointers.is_empty() { + tracing::warn!(image = %image_composed, "capture connector spec omits resource_path_pointers"); + } agent_sql::connector_tags::update_tag_fields( row.tag_id, documentation_url, - config_schema, - protocol, - resource_config_schema, + endpoint_config_schema.into(), + proto_type.to_string(), + resource_config_schema.into(), + resource_path_pointers, txn, ) .await?; if let Some(oauth2) = oauth2 { - agent_sql::connector_tags::update_oauth2_spec(row.connector_id, oauth2, txn).await?; + agent_sql::connector_tags::update_oauth2_spec(row.connector_id, oauth2.into(), txn) + .await?; } return Ok((row.tag_id, JobStatus::Success)); } } + +struct ConnectorSpec { + documentation_url: String, + endpoint_config_schema: Box, + resource_config_schema: Box, + resource_path_pointers: Vec, + oauth2: Option>, +} + +async fn spec_materialization( + image: &str, + runtime: Runtime, +) -> anyhow::Result { + use proto_flow::materialize; + + let req = materialize::Request { + spec: Some(materialize::request::Spec { + connector_type: flow::materialization_spec::ConnectorType::Image as i32, + config_json: serde_json::to_string(&serde_json::json!({"image": image, "config":{}})) + .unwrap(), + }), + ..Default::default() + }; + + let spec = runtime + .unary_materialize(req, SPEC_TIMEOUT) + .await? + .spec + .ok_or_else(|| anyhow::anyhow!("connector didn't send expected Spec response"))?; + + let materialize::response::Spec { + protocol: _, + config_schema_json, + resource_config_schema_json, + documentation_url, + oauth2, + } = spec; + + let oauth = if let Some(oa) = oauth2 { + Some(serde_json::value::to_raw_value(&oa).expect("serializing oauth2 config")) + } else { + None + }; + Ok(ConnectorSpec { + documentation_url, + endpoint_config_schema: RawValue::from_string(config_schema_json) + .context("parsing endpoint config schema")?, + resource_config_schema: RawValue::from_string(resource_config_schema_json) + .context("parsing resource config schema")?, + + // materialization connectors don't currently specify resrouce_path_pointers, though perhaps they should + resource_path_pointers: Vec::new(), + oauth2: oauth, + }) +} + +async fn spec_capture( + image: &str, + runtime: Runtime, +) -> anyhow::Result { + use proto_flow::capture; + let req = capture::Request { + spec: Some(capture::request::Spec { + connector_type: flow::capture_spec::ConnectorType::Image as i32, + config_json: serde_json::to_string(&serde_json::json!({"image": image, "config": {}})) + .unwrap(), + }), + ..Default::default() + }; + + let spec = runtime + .unary_capture(req, SPEC_TIMEOUT) + .await? + .spec + .ok_or_else(|| anyhow::anyhow!("connector didn't send expected Spec response"))?; + let capture::response::Spec { + // protocol here is the numeric version of the capture protocol + protocol: _, + config_schema_json, + resource_config_schema_json, + documentation_url, + oauth2, + resource_path_pointers, + } = spec; + + let oauth = if let Some(oa) = oauth2 { + Some( + RawValue::from_string(serde_json::to_string(&oa).expect("can serialize oauth2 config")) + .expect("serialization of oauth2 config cannot fail"), + ) + } else { + None + }; + Ok(ConnectorSpec { + documentation_url, + endpoint_config_schema: RawValue::from_string(config_schema_json) + .context("parsing endpoint config schema")?, + resource_config_schema: RawValue::from_string(resource_config_schema_json) + .context("parsing resource config schema")?, + resource_path_pointers, + oauth2: oauth, + }) +} + +const SPEC_TIMEOUT: Duration = Duration::from_secs(10); diff --git a/crates/agent/src/discovers.rs b/crates/agent/src/discovers.rs index 88c8692e92..c649c34dc6 100644 --- a/crates/agent/src/discovers.rs +++ b/crates/agent/src/discovers.rs @@ -272,6 +272,16 @@ impl DiscoverHandler { return Ok(Err(errors)); } + // TODO: As of 2023-11, resource_path_pointers are allowed to be empty. + // `merge_capture` will just log a warning if they are. But we plan to + // soon require that they are never empty. + let resource_path_pointers = + agent_sql::connector_tags::fetch_resource_path_pointers(image_name, image_tag, txn) + .await?; + if resource_path_pointers.is_empty() { + tracing::warn!(%image_name, %image_tag, %capture_name, "merging bindings using legacy behavior because resource_path_pointers are missing"); + } + // Deeply merge the capture and its bindings. let capture_name = models::Capture::new(capture_name); let (capture, discovered_bindings) = specs::merge_capture( @@ -280,6 +290,7 @@ impl DiscoverHandler { discovered_bindings, catalog.captures.remove(&capture_name), update_only, + &resource_path_pointers, ); let targets = capture .bindings diff --git a/crates/agent/src/discovers/snapshots/agent__discovers__specs__tests__capture_merge_resource_paths_update.snap b/crates/agent/src/discovers/snapshots/agent__discovers__specs__tests__capture_merge_resource_paths_update.snap new file mode 100644 index 0000000000..95aa1783ab --- /dev/null +++ b/crates/agent/src/discovers/snapshots/agent__discovers__specs__tests__capture_merge_resource_paths_update.snap @@ -0,0 +1,76 @@ +--- +source: crates/agent/src/discovers/specs.rs +expression: json!(out) +--- +[ + { + "bindings": [ + { + "resource": { + "modified": 1, + "stream": "foo" + }, + "target": "acmeCo/renamed" + }, + { + "resource": { + "modified": 2, + "namespace": "spacename", + "stream": "foo" + }, + "target": "acmeCo/renamed-namepaced" + }, + { + "disable": true, + "resource": { + "modified": "yup", + "stream": "disabled" + }, + "target": "test/collection/disabled" + } + ], + "endpoint": { + "connector": { + "config": { + "discovered": 1 + }, + "image": "new/image" + } + }, + "interval": "34s", + "shards": { + "maxTxnDuration": "12s" + } + }, + [ + { + "documentSchema": { + "const": "discovered" + }, + "recommendedName": "suggested", + "resourceConfig": { + "modified": 0, + "stream": "foo" + } + }, + { + "documentSchema": { + "const": "discovered-namepaced" + }, + "recommendedName": "suggested2", + "resourceConfig": { + "modified": 0, + "namespace": "spacename", + "stream": "foo" + } + }, + { + "documentSchema": false, + "recommendedName": "other", + "resourceConfig": { + "modified": 0, + "stream": "disabled" + } + } + ] +] diff --git a/crates/agent/src/discovers/specs.rs b/crates/agent/src/discovers/specs.rs index 8e3aec4f35..18df28fbcd 100644 --- a/crates/agent/src/discovers/specs.rs +++ b/crates/agent/src/discovers/specs.rs @@ -1,5 +1,5 @@ use proto_flow::capture::{response::discovered::Binding, response::Discovered}; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; pub fn parse_response( endpoint_config: &serde_json::value::RawValue, @@ -33,12 +33,51 @@ pub fn parse_response( )) } +type ResourcePath = Vec; + +/// Extracts the value of each of the given `resource_path_pointers` and encodes +/// them into a `ResourcePath`. Each pointer always gets mapped to a string +/// value, with the string `undefined` being used to represent a missing value. +/// All other values are encoded as JSON strings. This allows the `ResourcePath` +/// to be used as a key in hashmaps, which would not be allowed if we used an +/// array of `serde_json::Value`. +fn resource_path( + resource_path_pointers: &[doc::Pointer], + resource: &serde_json::Value, +) -> ResourcePath { + resource_path_pointers + .iter() + .map(|pointer| match pointer.query(resource) { + Some(j) => { + serde_json::to_string(j).expect("serializing resource path component cannot fail") + } + None => String::from("undefined"), + }) + .collect() +} + +fn index_fetched_bindings<'a>( + resource_path_pointers: &'_ [doc::Pointer], + bindings: &'a [models::CaptureBinding], +) -> HashMap { + bindings + .iter() + .map(|binding| { + let resource = serde_json::from_str(binding.resource.get()) + .expect("parsing resource config json cannot fail"); + let rp = resource_path(resource_path_pointers, &resource); + (rp, binding) + }) + .collect() +} + pub fn merge_capture( capture_name: &str, endpoint: models::CaptureEndpoint, discovered_bindings: Vec, fetched_capture: Option, update_only: bool, + resource_path_pointers: &[String], ) -> (models::CaptureDef, Vec) { let capture_prefix = capture_name.rsplit_once("/").unwrap().0; @@ -62,6 +101,17 @@ pub fn merge_capture( ), }; + let pointers = resource_path_pointers + .iter() + .map(|p| doc::Pointer::from_str(p.as_str())) + .collect::>(); + + let fetched_bindings_by_path = if !pointers.is_empty() { + index_fetched_bindings(&pointers, &fetched_bindings) + } else { + Default::default() + }; + let mut capture_bindings = Vec::new(); let mut filtered_bindings = Vec::new(); @@ -78,12 +128,25 @@ pub fn merge_capture( // spec is a strict subset of the fetched resource spec. In other // words the fetched resource spec may have _extra_ locations, // but all locations of the discovered resource spec must be equal. - let fetched_binding = fetched_bindings - .iter() - .filter(|fetched| { - doc::diff(Some(&serde_json::json!(&fetched.resource)), Some(&resource)).is_empty() - }) - .next(); + let fetched_binding = if resource_path_pointers.is_empty() { + // TODO(phil): Legacy matching behavior, to be removed + fetched_bindings + .iter() + .filter(|fetched| { + doc::diff(Some(&serde_json::json!(&fetched.resource)), Some(&resource)) + .is_empty() + }) + .next() + } else { + // New matching behavior + let discovered_resource = serde_json::from_str(&resource_config_json) + .expect("resource config must be valid json"); + let discovered_resource_path = resource_path(&pointers, &discovered_resource); + + fetched_bindings_by_path + .get(&discovered_resource_path) + .map(|b| *b) + }; if let Some(fetched_binding) = fetched_binding { // Preserve the fetched version of a matched CaptureBinding. @@ -194,6 +257,7 @@ fn normalize_recommended_name(name: &str) -> String { #[cfg(test)] mod tests { use super::{normalize_recommended_name, BTreeMap, Binding}; + use proto_flow::capture::response::discovered; use serde_json::json; #[test] @@ -309,10 +373,64 @@ mod tests { insta::assert_display_snapshot!(serde_json::to_string_pretty(&out).unwrap()); } + #[test] + fn test_capture_merge_resource_paths_update() { + // This is meant to test our merge behavior in the presense of additional fields in the + // `resource` that are not part of the resource path. + // Fixture is an update of an existing capture, which uses a non-suggested collection name. + // There is also a disabled binding, which is expected to remain disabled after the merge. + // Additional discovered bindings are filtered. + // Note that fields apart from stream and namespace are modified to demonstrate them being + // ignored for the purposes of matching up discovered and live bindings (since it's done + // by resource_path_pointers now) + let (discovered_endpoint, discovered_bindings, fetched_capture) = + serde_json::from_value::<(models::CaptureEndpoint, Vec, Option)>(json!([ + { "connector": { "config": { "discovered": 1 }, "image": "new/image" } }, + [ + { "recommendedName": "suggested", "resourceConfig": { "stream": "foo", "modified": 0 }, "documentSchema": { "const": "discovered" } }, + { "recommendedName": "suggested2", "resourceConfig": { "stream": "foo", "namespace": "spacename", "modified": 0 }, "documentSchema": { "const": "discovered-namepaced" } }, + { "recommendedName": "other", "resourceConfig": { "stream": "bar", "modified": 0 }, "documentSchema": false }, + { "recommendedName": "other", "resourceConfig": { "stream": "disabled", "modified": 0 }, "documentSchema": false }, + ], + { + "bindings": [ + { "resource": { "stream": "foo", "modified": 1 }, "target": "acmeCo/renamed" }, + { "resource": { "stream": "foo", "namespace": "spacename", "modified": 2 }, "target": "acmeCo/renamed-namepaced" }, + { "resource": { "stream": "removed" }, "target": "acmeCo/discarded" }, + { "resource": { "stream": "disabled", "modified": "yup" }, "disable": true, "target": "test/collection/disabled" }, + ], + "endpoint": { "connector": { "config": { "fetched": 1 }, "image": "old/image" } }, + // Extra fields which are passed-through. + "interval": "34s", + "shards": { + "maxTxnDuration": "12s" + }, + }, + ])) + .unwrap(); + + let out = super::merge_capture( + "acmeCo/my-capture", + discovered_endpoint.clone(), + discovered_bindings.clone(), + fetched_capture.clone(), + true, + &["/stream".to_string(), "/namespace".to_string()], + ); + + // Expect we: + // * Preserved the modified binding configuration. + // * Dropped the removed binding. + // * Updated the endpoint configuration. + // * Preserved unrelated fields of the capture (shard template and interval). + // * The resources that specify a namespace are treated separately + insta::assert_json_snapshot!(json!(out)); + } + #[test] fn test_capture_merge_create() { let (discovered_endpoint, discovered_bindings) = - serde_json::from_value(json!([ + serde_json::from_value::<(models::CaptureEndpoint, Vec)>(json!([ { "connector": { "config": { "discovered": 1 }, "image": "new/image" } }, [ { "recommendedName": "foo", "resourceConfig": { "stream": "foo" }, "key": ["/foo-key"], "documentSchema": { "const": "foo" } }, @@ -322,14 +440,31 @@ mod tests { .unwrap(); let out = super::merge_capture( + "acmeCo/my/capture", + discovered_endpoint.clone(), + discovered_bindings.clone(), + None, + false, + &[], + ); + + insta::assert_json_snapshot!(json!(out)); + // assert that the results of the merge are unchanged when using a valid + // slice of resource path pointers. + let path_merge_out = super::merge_capture( "acmeCo/my/capture", discovered_endpoint, discovered_bindings, None, false, + &["/stream".to_string()], ); - insta::assert_json_snapshot!(json!(out)); + assert_eq!( + json!(out), + json!(path_merge_out), + "resource_path_pointers merge output was different" + ); } #[test] @@ -338,7 +473,7 @@ mod tests { // There is also a disabled binding, which is expected to remain disabled after the merge. // Additional discovered bindings are filtered. let (discovered_endpoint, discovered_bindings, fetched_capture) = - serde_json::from_value(json!([ + serde_json::from_value::<(models::CaptureEndpoint, Vec, Option)>(json!([ { "connector": { "config": { "discovered": 1 }, "image": "new/image" } }, [ { "recommendedName": "suggested", "resourceConfig": { "stream": "foo" }, "documentSchema": { "const": "discovered" } }, @@ -363,10 +498,11 @@ mod tests { let out = super::merge_capture( "acmeCo/my-capture", - discovered_endpoint, - discovered_bindings, - fetched_capture, + discovered_endpoint.clone(), + discovered_bindings.clone(), + fetched_capture.clone(), true, + &[], ); // Expect we: @@ -375,13 +511,29 @@ mod tests { // * Updated the endpoint configuration. // * Preserved unrelated fields of the capture (shard template and interval). insta::assert_json_snapshot!(json!(out)); + // assert that the results of the merge are unchanged when using a valid + // slice of resource path pointers. + let path_merge_out = super::merge_capture( + "acmeCo/my-capture", + discovered_endpoint, + discovered_bindings, + fetched_capture, + true, + &["/stream".to_string()], + ); + + assert_eq!( + json!(out), + json!(path_merge_out), + "resource_path_pointers merge output was different" + ); } #[test] fn test_capture_merge_upsert() { // Fixture is an upsert of an existing capture which uses a non-suggested collection name. let (discovered_endpoint, discovered_bindings, fetched_capture) = - serde_json::from_value(json!([ + serde_json::from_value::<(models::CaptureEndpoint, Vec, Option)>(json!([ { "connector": { "config": { "discovered": 1 }, "image": "new/image" } }, [ { "recommendedName": "foo", "resourceConfig": { "stream": "foo" }, "documentSchema": { "const": 1 }, "disable": true }, @@ -400,10 +552,11 @@ mod tests { let out = super::merge_capture( "acmeCo/my-capture", - discovered_endpoint, - discovered_bindings, - fetched_capture, + discovered_endpoint.clone(), + discovered_bindings.clone(), + fetched_capture.clone(), false, + &[], ); // Expect we: @@ -411,6 +564,23 @@ mod tests { // * Added the new binding. // * Updated the endpoint configuration. insta::assert_json_snapshot!(json!(out)); + + // assert that the results of the merge are unchanged when using a valid + // slice of resource path pointers. + let path_merge_out = super::merge_capture( + "acmeCo/my-capture", + discovered_endpoint, + discovered_bindings, + fetched_capture, + false, + &["/stream".to_string()], + ); + + assert_eq!( + json!(out), + json!(path_merge_out), + "resource_path_pointers merge output was different" + ); } #[test] diff --git a/crates/agent/src/main.rs b/crates/agent/src/main.rs index 3b6953d20b..1ef1ec5085 100644 --- a/crates/agent/src/main.rs +++ b/crates/agent/src/main.rs @@ -108,11 +108,7 @@ async fn main() -> Result<(), anyhow::Error> { &logs_tx, Some(&pg_pool), )), - Box::new(agent::TagHandler::new( - &args.connector_network, - &bindir, - &logs_tx, - )), + Box::new(agent::TagHandler::new(&args.connector_network, &logs_tx)), Box::new(agent::DiscoverHandler::new( &args.connector_network, &bindir, diff --git a/crates/flowctl/src/raw/spec.rs b/crates/flowctl/src/raw/spec.rs index 11f4919246..24d63c43aa 100644 --- a/crates/flowctl/src/raw/spec.rs +++ b/crates/flowctl/src/raw/spec.rs @@ -73,7 +73,7 @@ pub async fn do_spec( } let spec_response = runtime::Runtime::new( - true, // All local. + true, // Allow local. network.clone(), ops::tracing_log_handler, None, diff --git a/crates/proto-flow/src/capture.rs b/crates/proto-flow/src/capture.rs index 20a22fe794..b333206202 100644 --- a/crates/proto-flow/src/capture.rs +++ b/crates/proto-flow/src/capture.rs @@ -188,6 +188,18 @@ pub mod response { /// Optional OAuth2 configuration. #[prost(message, optional, tag = "5")] pub oauth2: ::core::option::Option, + /// One or more JSON pointers, which are used to extract the `resource_path` + /// from a given `resource` of this connector. For example, a database + /// capture connector might have a `resource` that's represented like: + /// `{"schema": "foo", "table": "bar", "otherConfig": true}`. In that case + /// it could use `resource_path_pointers: ["/schema", "/table"]`, which + /// would result in a `resource_path` of `["foo", "bar"]`. This allows + /// `otherConfig` to be changed by the user without impacting the identity of + /// the resource. + #[prost(string, repeated, tag = "6")] + pub resource_path_pointers: ::prost::alloc::vec::Vec< + ::prost::alloc::string::String, + >, } /// Discovered responds to Request.Discover. #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/crates/proto-flow/src/capture.serde.rs b/crates/proto-flow/src/capture.serde.rs index 466059a023..1d2c1ae3b8 100644 --- a/crates/proto-flow/src/capture.serde.rs +++ b/crates/proto-flow/src/capture.serde.rs @@ -1928,6 +1928,9 @@ impl serde::Serialize for response::Spec { if self.oauth2.is_some() { len += 1; } + if !self.resource_path_pointers.is_empty() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("capture.Response.Spec", len)?; if self.protocol != 0 { struct_ser.serialize_field("protocol", &self.protocol)?; @@ -1944,6 +1947,9 @@ impl serde::Serialize for response::Spec { if let Some(v) = self.oauth2.as_ref() { struct_ser.serialize_field("oauth2", v)?; } + if !self.resource_path_pointers.is_empty() { + struct_ser.serialize_field("resourcePathPointers", &self.resource_path_pointers)?; + } struct_ser.end() } } @@ -1962,6 +1968,8 @@ impl<'de> serde::Deserialize<'de> for response::Spec { "documentation_url", "documentationUrl", "oauth2", + "resource_path_pointers", + "resourcePathPointers", ]; #[allow(clippy::enum_variant_names)] @@ -1971,6 +1979,7 @@ impl<'de> serde::Deserialize<'de> for response::Spec { ResourceConfigSchemaJson, DocumentationUrl, Oauth2, + ResourcePathPointers, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -1997,6 +2006,7 @@ impl<'de> serde::Deserialize<'de> for response::Spec { "resourceConfigSchema" | "resource_config_schema_json" => Ok(GeneratedField::ResourceConfigSchemaJson), "documentationUrl" | "documentation_url" => Ok(GeneratedField::DocumentationUrl), "oauth2" => Ok(GeneratedField::Oauth2), + "resourcePathPointers" | "resource_path_pointers" => Ok(GeneratedField::ResourcePathPointers), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -2021,6 +2031,7 @@ impl<'de> serde::Deserialize<'de> for response::Spec { let mut resource_config_schema_json__ : Option> = None; let mut documentation_url__ = None; let mut oauth2__ = None; + let mut resource_path_pointers__ = None; while let Some(k) = map.next_key()? { match k { GeneratedField::Protocol => { @@ -2055,6 +2066,12 @@ impl<'de> serde::Deserialize<'de> for response::Spec { } oauth2__ = map.next_value()?; } + GeneratedField::ResourcePathPointers => { + if resource_path_pointers__.is_some() { + return Err(serde::de::Error::duplicate_field("resourcePathPointers")); + } + resource_path_pointers__ = Some(map.next_value()?); + } } } Ok(response::Spec { @@ -2063,6 +2080,7 @@ impl<'de> serde::Deserialize<'de> for response::Spec { resource_config_schema_json: resource_config_schema_json__.map(|r| Box::::from(r).into()).unwrap_or_default(), documentation_url: documentation_url__.unwrap_or_default(), oauth2: oauth2__, + resource_path_pointers: resource_path_pointers__.unwrap_or_default(), }) } } diff --git a/crates/proto-flow/tests/regression.rs b/crates/proto-flow/tests/regression.rs index 715645ecae..127dd55129 100644 --- a/crates/proto-flow/tests/regression.rs +++ b/crates/proto-flow/tests/regression.rs @@ -395,6 +395,7 @@ fn ex_capture_response() -> capture::Response { resource_config_schema_json: json!({"resource": "schema"}).to_string(), documentation_url: "https://example/docs".to_string(), oauth2: Some(ex_oauth2()), + resource_path_pointers: vec!["/stream".to_string()], }), discovered: Some(capture::response::Discovered { bindings: vec![capture::response::discovered::Binding { diff --git a/crates/proto-flow/tests/snapshots/regression__capture_response_json.snap b/crates/proto-flow/tests/snapshots/regression__capture_response_json.snap index 0fb6a5f70e..3d6c6ebf8d 100644 --- a/crates/proto-flow/tests/snapshots/regression__capture_response_json.snap +++ b/crates/proto-flow/tests/snapshots/regression__capture_response_json.snap @@ -31,7 +31,10 @@ expression: json_test(msg) "refreshTokenResponseMap": { "access": "here" } - } + }, + "resourcePathPointers": [ + "/stream" + ] }, "discovered": { "bindings": [ diff --git a/crates/proto-flow/tests/snapshots/regression__capture_response_proto.snap b/crates/proto-flow/tests/snapshots/regression__capture_response_proto.snap index 427c82e041..da6cec88ad 100644 --- a/crates/proto-flow/tests/snapshots/regression__capture_response_proto.snap +++ b/crates/proto-flow/tests/snapshots/regression__capture_response_proto.snap @@ -2,7 +2,7 @@ source: crates/proto-flow/tests/regression.rs expression: proto_test(msg) --- -|0ab00208 d787b901 12137b22 636f6e66| ..........{"conf 00000000 +|0ab90208 d787b901 12137b22 636f6e66| ..........{"conf 00000000 |6967223a 22736368 656d6122 7d1a157b| ig":"schema"}..{ 00000010 |22726573 6f757263 65223a22 73636865| "resource":"sche 00000020 |6d61227d 22146874 7470733a 2f2f6578| ma"}".https://ex 00000030 @@ -21,16 +21,16 @@ expression: proto_test(msg) |65651211 7b227265 66726573 68223a22| ee..{"refresh":" 00000100 |68647222 7d52100a 06616363 65737312| hdr"}R...access. 00000110 |06226865 7265225a 04504f53 54620450| ."here"Z.POSTb.P 00000120 -|4f535412 4b0a490a 10726563 6f6d6d65| OST.K.I..recomme 00000130 -|6e646564 206e616d 6512117b 22726573| nded name..{"res 00000140 -|6f757263 65223a31 3233347d 1a107b22| ource":1234}..{" 00000150 -|646f6322 3a227363 68656d61 227d2208| doc":"schema"}". 00000160 -|2f6b6579 2f707472 28013204 31323334| /key/ptr(.2.1234 00000170 -|1a0e0a0c 0a04736f 6d650a04 70617468| ......some..path 00000180 -|22120a10 49206469 6420736f 6d652073| "...I did some s 00000190 -|74756666 2a020801 32160802 12127b22| tuff*...2.....{" 000001a0 -|63617074 75726564 223a2264 6f63227d| captured":"doc"} 000001b0 -|3a180a16 0a127b22 73746174 65223a22| :.....{"state":" 000001c0 -|75706461 7465227d 1001a206 06120248| update"}.......H 000001d0 -|691801| i.. 000001e0 - 000001e3 +|4f535432 072f7374 7265616d 124b0a49| OST2./stream.K.I 00000130 +|0a107265 636f6d6d 656e6465 64206e61| ..recommended na 00000140 +|6d651211 7b227265 736f7572 6365223a| me..{"resource": 00000150 +|31323334 7d1a107b 22646f63 223a2273| 1234}..{"doc":"s 00000160 +|6368656d 61227d22 082f6b65 792f7074| chema"}"./key/pt 00000170 +|72280132 04313233 341a0e0a 0c0a0473| r(.2.1234......s 00000180 +|6f6d650a 04706174 6822120a 10492064| ome..path"...I d 00000190 +|69642073 6f6d6520 73747566 662a0208| id some stuff*.. 000001a0 +|01321608 0212127b 22636170 74757265| .2.....{"capture 000001b0 +|64223a22 646f6322 7d3a180a 160a127b| d":"doc"}:.....{ 000001c0 +|22737461 7465223a 22757064 61746522| "state":"update" 000001d0 +|7d1001a2 06061202 48691801| }.......Hi.. 000001e0 + 000001ec diff --git a/crates/runtime/src/container.rs b/crates/runtime/src/container.rs index 1287ebee25..90de78b5a1 100644 --- a/crates/runtime/src/container.rs +++ b/crates/runtime/src/container.rs @@ -1,3 +1,4 @@ +use crate::RuntimeProtocol; use anyhow::Context; use futures::channel::oneshot; use proto_flow::{flow, runtime}; @@ -11,6 +12,32 @@ use tokio::io::AsyncBufReadExt; // connectors. const CONNECTOR_INIT_PORT: u16 = 49092; +/// Determines the protocol of an image. If the image has a `FLOW_RUNTIME_PROTOCOL` label, +/// then it's value is used. Otherwise, this will apply a simple heuristic based on the image name, +/// for backward compatibility purposes. An error will be returned if it fails to inspect the image +/// or parse the label. +pub async fn flow_runtime_protocol(image: &str) -> anyhow::Result { + let inspect_output = docker_cmd(&["inspect", image]) + .await + .context("inspecting image")?; + let inspect_json: serde_json::Value = serde_json::from_slice(&inspect_output)?; + + if let Some(label) = inspect_json + .pointer("/Config/Labels/FLOW_RUNTIME_PROTOCOL") + .and_then(|v| v.as_str()) + { + RuntimeProtocol::try_from(label).map_err(|unknown| { + anyhow::anyhow!("image labels specify unknown protocol FLOW_RUNTIME_PROTOCOL={unknown}") + }) + } else { + if image.starts_with("ghcr.io/estuary/materialize-") { + Ok(RuntimeProtocol::Materialization) + } else { + Ok(RuntimeProtocol::Capture) + } + } +} + /// Start an image connector container, returning its description and a dialed tonic Channel. /// The container is attached to the given `network`, and its logs are dispatched to `log_handler`. /// `task_name` and `task_type` are used only to label the container. diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index 5a64095dfe..c613dff48a 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -1,4 +1,5 @@ use futures::TryStreamExt; +use std::fmt::{self, Display}; use std::sync::Arc; mod capture; @@ -15,6 +16,7 @@ mod unary; mod unseal; pub mod uuid; +pub use container::flow_runtime_protocol; pub use task_service::TaskService; pub use tokio_context::TokioContext; @@ -29,6 +31,37 @@ pub const UUID_PLACEHOLDER: &str = "DocUUIDPlaceholder-329Bb50aa48EAa9ef"; /// documents without yielding, so it should not be *too* small. pub const CHANNEL_BUFFER: usize = 8; +/// Describes the basic type of runtime protocol. This corresponds to the +/// `FLOW_RUNTIME_PROTOCOL` label that's used on docker images. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum RuntimeProtocol { + Capture, + Materialization, + // Derivation, // eventually, maybe +} + +impl<'a> TryFrom<&'a str> for RuntimeProtocol { + type Error = &'a str; + + fn try_from(value: &'a str) -> Result { + match value { + "capture" => Ok(RuntimeProtocol::Capture), + "materialization" => Ok(RuntimeProtocol::Materialization), + other => Err(other), + } + } +} + +impl Display for RuntimeProtocol { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self { + RuntimeProtocol::Capture => "capture", + RuntimeProtocol::Materialization => "materialization", + }; + f.write_str(s) + } +} + fn anyhow_to_status(err: anyhow::Error) -> tonic::Status { tonic::Status::internal(format!("{err:?}")) } diff --git a/go/protocols/capture/capture.pb.go b/go/protocols/capture/capture.pb.go index cc7a0151ad..108be504b1 100644 --- a/go/protocols/capture/capture.pb.go +++ b/go/protocols/capture/capture.pb.go @@ -475,10 +475,19 @@ type Response_Spec struct { // URL for connector's documention. DocumentationUrl string `protobuf:"bytes,4,opt,name=documentation_url,json=documentationUrl,proto3" json:"documentation_url,omitempty"` // Optional OAuth2 configuration. - Oauth2 *flow.OAuth2 `protobuf:"bytes,5,opt,name=oauth2,proto3" json:"oauth2,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Oauth2 *flow.OAuth2 `protobuf:"bytes,5,opt,name=oauth2,proto3" json:"oauth2,omitempty"` + // One or more JSON pointers, which are used to extract the `resource_path` + // from a given `resource` of this connector. For example, a database + // capture connector might have a `resource` that's represented like: + // `{"schema": "foo", "table": "bar", "otherConfig": true}`. In that case + // it could use `resource_path_pointers: ["/schema", "/table"]`, which + // would result in a `resource_path` of `["foo", "bar"]`. This allows + // `otherConfig` to be changed by the user without impacting the identity of + // the resource. + ResourcePathPointers []string `protobuf:"bytes,6,rep,name=resource_path_pointers,json=resourcePathPointers,proto3" json:"resource_path_pointers,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *Response_Spec) Reset() { *m = Response_Spec{} } @@ -908,77 +917,79 @@ func init() { } var fileDescriptor_841a70e6e6288f13 = []byte{ - // 1116 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x56, 0xcb, 0x6e, 0x23, 0x45, - 0x17, 0x9e, 0xb6, 0x1d, 0xbb, 0x7d, 0x9c, 0x64, 0x92, 0x52, 0xfe, 0x9f, 0x4e, 0x27, 0x4a, 0x42, - 0x26, 0x48, 0x99, 0x0b, 0xf6, 0xe0, 0x68, 0x60, 0x86, 0xcb, 0x0c, 0xb9, 0x30, 0x12, 0x23, 0x41, - 0x50, 0x85, 0x8b, 0x60, 0x63, 0x75, 0xaa, 0x6a, 0xec, 0x26, 0xed, 0xae, 0xa6, 0xbb, 0x9d, 0x60, - 0x89, 0x17, 0x60, 0xc1, 0x82, 0x0d, 0x3b, 0x24, 0xf6, 0x88, 0xe7, 0x20, 0x4b, 0x9e, 0x20, 0x12, - 0xc3, 0x16, 0xf1, 0x00, 0xb3, 0x42, 0x75, 0x6b, 0x77, 0xd2, 0x71, 0x64, 0xd0, 0x2c, 0xd8, 0x24, - 0xae, 0x73, 0xbe, 0xaf, 0x2e, 0xdf, 0xa9, 0xf3, 0x55, 0xc3, 0x7a, 0x97, 0xb7, 0xa2, 0x98, 0xa7, - 0x9c, 0xf0, 0x20, 0x69, 0x11, 0x2f, 0x4a, 0x07, 0x31, 0x33, 0xff, 0x9b, 0x32, 0x83, 0x6a, 0x7a, - 0xe8, 0x2e, 0x9f, 0x03, 0x3f, 0x0d, 0xf8, 0x89, 0xfc, 0xa3, 0x60, 0xee, 0x42, 0x97, 0x77, 0xb9, - 0xfc, 0xd9, 0x12, 0xbf, 0x54, 0x74, 0xfd, 0x4f, 0x80, 0x1a, 0x66, 0x5f, 0x0d, 0x58, 0x92, 0xa2, - 0x9b, 0x50, 0x49, 0x22, 0x46, 0x1c, 0x6b, 0xcd, 0xda, 0x6c, 0xb4, 0xff, 0xd7, 0x34, 0xcb, 0xe8, - 0x7c, 0xf3, 0x20, 0x62, 0x04, 0x4b, 0x08, 0xba, 0x07, 0x36, 0xf5, 0x13, 0xc2, 0x8f, 0x59, 0xec, - 0x94, 0x24, 0x7c, 0xb1, 0x00, 0xdf, 0xd3, 0x00, 0x9c, 0x41, 0x05, 0xed, 0xd8, 0x0b, 0x7c, 0xea, - 0xa5, 0xcc, 0x29, 0x8f, 0xa1, 0x7d, 0xaa, 0x01, 0x38, 0x83, 0xa2, 0x3b, 0x30, 0xe5, 0x45, 0x51, - 0x30, 0x74, 0x2a, 0x92, 0xf3, 0xff, 0x02, 0x67, 0x5b, 0x64, 0xb1, 0x02, 0x89, 0x63, 0xf0, 0x88, - 0x85, 0xce, 0xd4, 0x98, 0x63, 0xec, 0x47, 0x2c, 0xc4, 0x12, 0x82, 0x1e, 0x42, 0xc3, 0x23, 0x47, - 0x21, 0x3f, 0x09, 0x18, 0xed, 0x32, 0xa7, 0x2a, 0x19, 0xcb, 0xc5, 0xe9, 0x47, 0x18, 0x9c, 0x27, - 0xa0, 0x25, 0xb0, 0xfd, 0x30, 0x65, 0x71, 0xe8, 0x05, 0x0e, 0x5d, 0xb3, 0x36, 0xa7, 0x71, 0x7d, - 0xc3, 0x04, 0xdc, 0x6f, 0x2d, 0xa8, 0x08, 0xc9, 0xd0, 0x63, 0x98, 0x25, 0x3c, 0x0c, 0x19, 0x49, - 0x79, 0xdc, 0x49, 0x87, 0x11, 0x93, 0x0a, 0xcf, 0xb6, 0x57, 0x9b, 0xb2, 0x3c, 0xbb, 0x6a, 0x35, - 0x01, 0x6d, 0xee, 0x1a, 0xdc, 0xc7, 0xc3, 0x88, 0xe1, 0x19, 0x92, 0x1f, 0xa2, 0x07, 0xd0, 0x20, - 0x3c, 0x7c, 0xea, 0x77, 0x3b, 0x5f, 0x26, 0x3c, 0x94, 0xba, 0xd7, 0x77, 0x96, 0x9f, 0x9f, 0xad, - 0x3a, 0x2c, 0x24, 0x9c, 0xfa, 0x61, 0xb7, 0x25, 0x12, 0x4d, 0xec, 0x9d, 0x7c, 0xc0, 0x92, 0xc4, - 0xeb, 0x32, 0x5c, 0x55, 0x04, 0xf7, 0x3b, 0x0b, 0x6c, 0x53, 0x8f, 0xff, 0xc2, 0x7e, 0x7e, 0x2e, - 0x83, 0x6d, 0x0a, 0x8d, 0xde, 0x87, 0x4a, 0xe8, 0xf5, 0xd5, 0x2e, 0xea, 0x3b, 0xf7, 0x9e, 0x9f, - 0xad, 0xbe, 0xd6, 0xf5, 0xd3, 0xde, 0xe0, 0xb0, 0x49, 0x78, 0xbf, 0xc5, 0x92, 0x74, 0xe0, 0xc5, - 0x43, 0x75, 0x9f, 0x0b, 0x37, 0xdc, 0xec, 0x16, 0xcb, 0x29, 0x2e, 0x39, 0x5a, 0xe9, 0x45, 0x1c, - 0xad, 0x3c, 0xf9, 0xd1, 0xd0, 0x3b, 0x60, 0x1f, 0xfa, 0xa1, 0x80, 0x24, 0x4e, 0x65, 0xad, 0xbc, - 0xd9, 0x68, 0xbf, 0x3c, 0xf6, 0x8e, 0x37, 0x77, 0x14, 0x12, 0x67, 0x14, 0xf7, 0x7b, 0x0b, 0x6a, - 0x3a, 0x8a, 0x9e, 0xc0, 0x42, 0xcc, 0x12, 0x3e, 0x88, 0x09, 0xeb, 0xe4, 0xb7, 0x63, 0x4d, 0xb0, - 0x9d, 0x59, 0xc3, 0xdc, 0x55, 0xdb, 0x7a, 0x13, 0x80, 0xf0, 0x20, 0x60, 0x24, 0xf5, 0x75, 0xad, - 0x1a, 0xed, 0x05, 0xad, 0x4a, 0x16, 0x17, 0xc2, 0xec, 0x54, 0x4e, 0xcf, 0x56, 0xaf, 0xe1, 0x1c, - 0xda, 0xed, 0xc2, 0x94, 0xec, 0x30, 0x74, 0x1b, 0x8c, 0xd9, 0x68, 0x93, 0x98, 0x2f, 0xe8, 0x8a, - 0x0d, 0x02, 0x39, 0x50, 0x3b, 0x66, 0x71, 0x62, 0x96, 0xab, 0x63, 0x33, 0x44, 0x2f, 0x41, 0x8d, - 0xc6, 0xc3, 0x4e, 0x3c, 0x50, 0xca, 0xda, 0xb8, 0x4a, 0xe3, 0x21, 0x1e, 0x84, 0xee, 0x2f, 0x16, - 0x54, 0x44, 0x7b, 0xbe, 0xa8, 0x85, 0x5e, 0x81, 0xa9, 0xd8, 0x0b, 0xbb, 0xc6, 0x6c, 0xae, 0xab, - 0x49, 0xb0, 0x08, 0xc9, 0x29, 0x54, 0x16, 0xbd, 0x01, 0x90, 0xa4, 0x5e, 0xca, 0x94, 0xba, 0x95, - 0x09, 0xd4, 0x9d, 0x92, 0x78, 0xb7, 0x05, 0x8d, 0x9c, 0x37, 0xa0, 0x35, 0x68, 0x90, 0x1e, 0x23, - 0x47, 0x11, 0xf7, 0xc3, 0x34, 0x91, 0x3b, 0x9f, 0xc1, 0xf9, 0xd0, 0xfa, 0xaf, 0x0d, 0xb0, 0x31, - 0x4b, 0x22, 0x1e, 0x26, 0x0c, 0xdd, 0x3a, 0xe7, 0xb7, 0x79, 0x57, 0x53, 0x80, 0xbc, 0xe1, 0xbe, - 0x0d, 0x60, 0x5c, 0x94, 0x51, 0x5d, 0xbe, 0xe5, 0x22, 0x63, 0x2f, 0xc3, 0xe0, 0x1c, 0x1e, 0x3d, - 0x80, 0xba, 0x31, 0x53, 0xaa, 0xb5, 0x58, 0x2a, 0x92, 0xcd, 0xad, 0xa4, 0x78, 0x84, 0x46, 0x5b, - 0x50, 0x13, 0xb6, 0xea, 0x33, 0xaa, 0xdd, 0x77, 0xb1, 0x48, 0xdc, 0x56, 0x00, 0x6c, 0x90, 0xe8, - 0x2e, 0x54, 0x85, 0xbf, 0x32, 0xaa, 0x4d, 0xd8, 0x29, 0x72, 0xf6, 0x65, 0x1e, 0x6b, 0x1c, 0x7a, - 0x1d, 0x6c, 0x0d, 0xa1, 0xda, 0x86, 0xdd, 0x22, 0x47, 0x57, 0x9f, 0xe2, 0x0c, 0x2b, 0x74, 0x19, - 0xe9, 0xeb, 0xd4, 0xc6, 0xe9, 0xb2, 0x9b, 0x61, 0x70, 0x0e, 0x7f, 0xb5, 0x7f, 0xff, 0x50, 0xd2, - 0xfe, 0xed, 0x82, 0x6d, 0x4c, 0x47, 0xd7, 0x34, 0x1b, 0xa3, 0xc7, 0x80, 0x74, 0x67, 0x26, 0xa4, - 0xc7, 0xfa, 0xde, 0xe4, 0x56, 0x38, 0xad, 0x78, 0x07, 0x92, 0x86, 0x3e, 0x83, 0xa5, 0x8b, 0xad, - 0x9e, 0x9f, 0x70, 0x12, 0x03, 0x5a, 0x38, 0xdf, 0xf1, 0x7a, 0xe2, 0xdb, 0x30, 0x4f, 0x39, 0x19, - 0xf4, 0x59, 0x98, 0x7a, 0xa2, 0x99, 0x3b, 0x83, 0x38, 0x50, 0x57, 0x1c, 0xcf, 0x9d, 0x4b, 0x7c, - 0x12, 0x07, 0x68, 0x03, 0xaa, 0xdc, 0x1b, 0xa4, 0xbd, 0xb6, 0xae, 0xdb, 0xb4, 0x6a, 0x98, 0xfd, - 0x6d, 0x11, 0xc3, 0x3a, 0xe7, 0xfe, 0x55, 0x02, 0x18, 0x5d, 0x34, 0xf4, 0x6e, 0xce, 0xf0, 0x2c, - 0x69, 0x78, 0x1b, 0x57, 0x5d, 0xcc, 0x4b, 0x3c, 0xef, 0xc7, 0xd2, 0xc8, 0xf3, 0x6e, 0xc2, 0x5c, - 0xcc, 0x08, 0xef, 0xf7, 0x59, 0x48, 0x19, 0xed, 0x8c, 0x1e, 0x06, 0x7c, 0x3d, 0x17, 0xff, 0x50, - 0x98, 0xfd, 0x38, 0x7b, 0x2c, 0xfd, 0x0b, 0x7b, 0x7c, 0x02, 0x0b, 0x46, 0x8d, 0x7f, 0x2c, 0xfc, - 0xac, 0x61, 0x6a, 0xc9, 0xe7, 0xa0, 0x7c, 0xc4, 0x86, 0xd2, 0xfc, 0xeb, 0x58, 0xfc, 0x14, 0x0e, - 0x45, 0xfd, 0xc4, 0x3b, 0x0c, 0x98, 0x14, 0xd6, 0xc6, 0x66, 0x88, 0x6e, 0xc0, 0x4c, 0x76, 0x86, - 0xc8, 0x4b, 0x7b, 0x4e, 0x55, 0xb2, 0xa6, 0x4d, 0xf0, 0x23, 0x2f, 0xed, 0xb9, 0xdf, 0x40, 0x3d, - 0xeb, 0x4d, 0xf4, 0xa8, 0x20, 0xf7, 0x8d, 0x2b, 0x5a, 0xf9, 0x12, 0xb5, 0x9b, 0x23, 0xb1, 0x0b, - 0xab, 0x5b, 0x97, 0xac, 0x7e, 0x1f, 0x6a, 0xba, 0xc1, 0xd1, 0xab, 0x80, 0x3c, 0xf9, 0x24, 0x74, - 0x28, 0x4b, 0x48, 0xec, 0x47, 0xf2, 0x31, 0x51, 0xe5, 0x99, 0x57, 0x99, 0xbd, 0x51, 0xc2, 0x7d, - 0x0f, 0xaa, 0xaa, 0xcd, 0xd1, 0x5b, 0xb0, 0xc8, 0xbe, 0x8e, 0x02, 0x9f, 0xf8, 0x69, 0x27, 0xf7, - 0x01, 0x25, 0x44, 0x53, 0x3e, 0x69, 0x63, 0xc7, 0x00, 0xb6, 0x2f, 0xe4, 0xdd, 0xcf, 0xc1, 0x36, - 0x9d, 0x2f, 0x94, 0xd4, 0x07, 0xd1, 0xad, 0x68, 0x86, 0x68, 0x0b, 0x6c, 0xca, 0xc9, 0xe4, 0x37, - 0xa0, 0x4c, 0x39, 0x71, 0xef, 0x03, 0x8c, 0xac, 0x01, 0xdd, 0x02, 0xe5, 0xeb, 0xda, 0x91, 0xb3, - 0xe7, 0x51, 0x7f, 0x19, 0x1c, 0x88, 0x9c, 0xb6, 0xfe, 0xf6, 0x23, 0xa8, 0x67, 0x09, 0xd4, 0x86, - 0x9a, 0xde, 0x21, 0x9a, 0xbb, 0xf8, 0xd8, 0xbb, 0xf3, 0x85, 0xf2, 0x6c, 0x5a, 0x77, 0xad, 0x9d, - 0x87, 0xa7, 0xbf, 0xaf, 0x5c, 0x3b, 0x7d, 0xb6, 0x62, 0xfd, 0xf6, 0x6c, 0xc5, 0xfa, 0xe9, 0x8f, - 0x15, 0xeb, 0x8b, 0x3b, 0x13, 0x7d, 0xf5, 0xe8, 0xc9, 0x0e, 0xab, 0x32, 0xb4, 0xf5, 0x77, 0x00, - 0x00, 0x00, 0xff, 0xff, 0x21, 0xc7, 0x8f, 0xfd, 0x23, 0x0c, 0x00, 0x00, + // 1138 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x56, 0xdd, 0x6e, 0x1b, 0x45, + 0x14, 0xee, 0xda, 0x8e, 0xbd, 0x3e, 0x4e, 0xd2, 0x64, 0x14, 0xca, 0x66, 0x13, 0x25, 0x21, 0x0d, + 0x52, 0xfa, 0x83, 0x5d, 0x1c, 0x0a, 0x2d, 0x3f, 0x2d, 0xf9, 0xa1, 0x12, 0x95, 0x20, 0xd5, 0x84, + 0x1f, 0xc1, 0x8d, 0xb5, 0x99, 0x99, 0xda, 0x4b, 0xd6, 0x3b, 0xcb, 0xee, 0x3a, 0xc1, 0x12, 0x2f, + 0xc0, 0x05, 0x17, 0x3c, 0x00, 0x12, 0xf7, 0x88, 0xf7, 0xc8, 0x05, 0x17, 0x3c, 0x41, 0x24, 0xca, + 0x2d, 0xe2, 0x01, 0x2a, 0x21, 0xa1, 0xf9, 0x5b, 0x6f, 0xb2, 0x49, 0x64, 0x50, 0x2f, 0xb8, 0xb1, + 0x77, 0xce, 0xf9, 0xce, 0xcc, 0x99, 0xef, 0xcc, 0xf9, 0x66, 0x60, 0xb5, 0xcb, 0x5b, 0x51, 0xcc, + 0x53, 0x4e, 0x78, 0x90, 0xb4, 0x88, 0x17, 0xa5, 0x83, 0x98, 0x99, 0xff, 0xa6, 0xf4, 0xa0, 0x9a, + 0x1e, 0xba, 0x8b, 0xa7, 0xc0, 0x4f, 0x03, 0x7e, 0x24, 0x7f, 0x14, 0xcc, 0x9d, 0xeb, 0xf2, 0x2e, + 0x97, 0x9f, 0x2d, 0xf1, 0xa5, 0xac, 0xab, 0x7f, 0x02, 0xd4, 0x30, 0xfb, 0x7a, 0xc0, 0x92, 0x14, + 0xdd, 0x80, 0x4a, 0x12, 0x31, 0xe2, 0x58, 0x2b, 0xd6, 0x7a, 0xa3, 0xfd, 0x52, 0xd3, 0x2c, 0xa3, + 0xfd, 0xcd, 0xbd, 0x88, 0x11, 0x2c, 0x21, 0xe8, 0x2e, 0xd8, 0xd4, 0x4f, 0x08, 0x3f, 0x64, 0xb1, + 0x53, 0x92, 0xf0, 0xf9, 0x02, 0x7c, 0x47, 0x03, 0x70, 0x06, 0x15, 0x61, 0x87, 0x5e, 0xe0, 0x53, + 0x2f, 0x65, 0x4e, 0xf9, 0x82, 0xb0, 0xcf, 0x34, 0x00, 0x67, 0x50, 0x74, 0x1b, 0x26, 0xbc, 0x28, + 0x0a, 0x86, 0x4e, 0x45, 0xc6, 0x5c, 0x2b, 0xc4, 0x6c, 0x0a, 0x2f, 0x56, 0x20, 0xb1, 0x0d, 0x1e, + 0xb1, 0xd0, 0x99, 0xb8, 0x60, 0x1b, 0xbb, 0x11, 0x0b, 0xb1, 0x84, 0xa0, 0x07, 0xd0, 0xf0, 0xc8, + 0x41, 0xc8, 0x8f, 0x02, 0x46, 0xbb, 0xcc, 0xa9, 0xca, 0x88, 0xc5, 0xe2, 0xf4, 0x23, 0x0c, 0xce, + 0x07, 0xa0, 0x05, 0xb0, 0xfd, 0x30, 0x65, 0x71, 0xe8, 0x05, 0x0e, 0x5d, 0xb1, 0xd6, 0x27, 0x71, + 0x7d, 0xcd, 0x18, 0xdc, 0xef, 0x2c, 0xa8, 0x08, 0xca, 0xd0, 0x23, 0x98, 0x26, 0x3c, 0x0c, 0x19, + 0x49, 0x79, 0xdc, 0x49, 0x87, 0x11, 0x93, 0x0c, 0x4f, 0xb7, 0x97, 0x9b, 0xb2, 0x3c, 0xdb, 0x6a, + 0x35, 0x01, 0x6d, 0x6e, 0x1b, 0xdc, 0x27, 0xc3, 0x88, 0xe1, 0x29, 0x92, 0x1f, 0xa2, 0xfb, 0xd0, + 0x20, 0x3c, 0x7c, 0xea, 0x77, 0x3b, 0x5f, 0x25, 0x3c, 0x94, 0xbc, 0xd7, 0xb7, 0x16, 0x9f, 0x9f, + 0x2c, 0x3b, 0x2c, 0x24, 0x9c, 0xfa, 0x61, 0xb7, 0x25, 0x1c, 0x4d, 0xec, 0x1d, 0x7d, 0xc4, 0x92, + 0xc4, 0xeb, 0x32, 0x5c, 0x55, 0x01, 0xee, 0xf7, 0x16, 0xd8, 0xa6, 0x1e, 0xff, 0x87, 0x7c, 0x7e, + 0x2e, 0x83, 0x6d, 0x0a, 0x8d, 0x3e, 0x84, 0x4a, 0xe8, 0xf5, 0x55, 0x16, 0xf5, 0xad, 0xbb, 0xcf, + 0x4f, 0x96, 0x5f, 0xef, 0xfa, 0x69, 0x6f, 0xb0, 0xdf, 0x24, 0xbc, 0xdf, 0x62, 0x49, 0x3a, 0xf0, + 0xe2, 0xa1, 0x3a, 0xcf, 0x85, 0x13, 0x6e, 0xb2, 0xc5, 0x72, 0x8a, 0x73, 0xb6, 0x56, 0x7a, 0x11, + 0x5b, 0x2b, 0x8f, 0xbf, 0x35, 0xf4, 0x1e, 0xd8, 0xfb, 0x7e, 0x28, 0x20, 0x89, 0x53, 0x59, 0x29, + 0xaf, 0x37, 0xda, 0xaf, 0x5c, 0x78, 0xc6, 0x9b, 0x5b, 0x0a, 0x89, 0xb3, 0x10, 0xf7, 0x07, 0x0b, + 0x6a, 0xda, 0x8a, 0x1e, 0xc3, 0x5c, 0xcc, 0x12, 0x3e, 0x88, 0x09, 0xeb, 0xe4, 0xd3, 0xb1, 0xc6, + 0x48, 0x67, 0xda, 0x44, 0x6e, 0xab, 0xb4, 0xde, 0x06, 0x20, 0x3c, 0x08, 0x18, 0x49, 0x7d, 0x5d, + 0xab, 0x46, 0x7b, 0x4e, 0xb3, 0x92, 0xd9, 0x05, 0x31, 0x5b, 0x95, 0xe3, 0x93, 0xe5, 0x2b, 0x38, + 0x87, 0x76, 0xbb, 0x30, 0x21, 0x3b, 0x0c, 0xdd, 0x02, 0x23, 0x36, 0x5a, 0x24, 0x66, 0x0b, 0xbc, + 0x62, 0x83, 0x40, 0x0e, 0xd4, 0x0e, 0x59, 0x9c, 0x98, 0xe5, 0xea, 0xd8, 0x0c, 0xd1, 0xcb, 0x50, + 0xa3, 0xf1, 0xb0, 0x13, 0x0f, 0x14, 0xb3, 0x36, 0xae, 0xd2, 0x78, 0x88, 0x07, 0xa1, 0xfb, 0x8b, + 0x05, 0x15, 0xd1, 0x9e, 0x2f, 0x6a, 0xa1, 0x57, 0x61, 0x22, 0xf6, 0xc2, 0xae, 0x11, 0x9b, 0xab, + 0x6a, 0x12, 0x2c, 0x4c, 0x72, 0x0a, 0xe5, 0x45, 0x6f, 0x01, 0x24, 0xa9, 0x97, 0x32, 0xc5, 0x6e, + 0x65, 0x0c, 0x76, 0x27, 0x24, 0xde, 0x6d, 0x41, 0x23, 0xa7, 0x0d, 0x68, 0x05, 0x1a, 0xa4, 0xc7, + 0xc8, 0x41, 0xc4, 0xfd, 0x30, 0x4d, 0x64, 0xe6, 0x53, 0x38, 0x6f, 0x5a, 0xfd, 0xbb, 0x01, 0x36, + 0x66, 0x49, 0xc4, 0xc3, 0x84, 0xa1, 0x9b, 0xa7, 0xf4, 0x36, 0xaf, 0x6a, 0x0a, 0x90, 0x17, 0xdc, + 0x77, 0x01, 0x8c, 0x8a, 0x32, 0xaa, 0xcb, 0xb7, 0x58, 0x8c, 0xd8, 0xc9, 0x30, 0x38, 0x87, 0x47, + 0xf7, 0xa1, 0x6e, 0xc4, 0x94, 0x6a, 0x2e, 0x16, 0x8a, 0xc1, 0xe6, 0x54, 0x52, 0x3c, 0x42, 0xa3, + 0x0d, 0xa8, 0x09, 0x59, 0xf5, 0x19, 0xd5, 0xea, 0x3b, 0x5f, 0x0c, 0xdc, 0x54, 0x00, 0x6c, 0x90, + 0xe8, 0x0e, 0x54, 0x85, 0xbe, 0x32, 0xaa, 0x45, 0xd8, 0x29, 0xc6, 0xec, 0x4a, 0x3f, 0xd6, 0x38, + 0xf4, 0x26, 0xd8, 0x1a, 0x42, 0xb5, 0x0c, 0xbb, 0xc5, 0x18, 0x5d, 0x7d, 0x8a, 0x33, 0xac, 0xe0, + 0x65, 0xc4, 0xaf, 0x53, 0xbb, 0x88, 0x97, 0xed, 0x0c, 0x83, 0x73, 0xf8, 0xcb, 0xf5, 0xfb, 0xd7, + 0x92, 0xd6, 0x6f, 0x17, 0x6c, 0x23, 0x3a, 0xba, 0xa6, 0xd9, 0x18, 0x3d, 0x02, 0xa4, 0x3b, 0x33, + 0x21, 0x3d, 0xd6, 0xf7, 0xc6, 0x97, 0xc2, 0x49, 0x15, 0xb7, 0x27, 0xc3, 0xd0, 0xe7, 0xb0, 0x70, + 0xb6, 0xd5, 0xf3, 0x13, 0x8e, 0x23, 0x40, 0x73, 0xa7, 0x3b, 0x5e, 0x4f, 0x7c, 0x0b, 0x66, 0x29, + 0x27, 0x83, 0x3e, 0x0b, 0x53, 0x4f, 0x34, 0x73, 0x67, 0x10, 0x07, 0xea, 0x88, 0xe3, 0x99, 0x53, + 0x8e, 0x4f, 0xe3, 0x00, 0xad, 0x41, 0x95, 0x7b, 0x83, 0xb4, 0xd7, 0xd6, 0x75, 0x9b, 0x54, 0x0d, + 0xb3, 0xbb, 0x29, 0x6c, 0x58, 0xfb, 0xd0, 0x1b, 0x70, 0x2d, 0xcb, 0x35, 0xf2, 0xd2, 0x5e, 0x47, + 0x92, 0xc9, 0xe2, 0xc4, 0xa9, 0xae, 0x94, 0xd7, 0xeb, 0xa3, 0x44, 0x9e, 0x78, 0x69, 0xef, 0x89, + 0xf6, 0xb9, 0x7f, 0x95, 0x00, 0x46, 0xc7, 0x13, 0xbd, 0x9f, 0x93, 0x49, 0x4b, 0xca, 0xe4, 0xda, + 0x65, 0xc7, 0xf9, 0x1c, 0xa5, 0xfc, 0xb1, 0x34, 0x52, 0xca, 0x1b, 0x30, 0x13, 0x33, 0xc2, 0xfb, + 0x7d, 0x16, 0x52, 0x46, 0x3b, 0xa3, 0xeb, 0x04, 0x5f, 0xcd, 0xd9, 0x3f, 0x16, 0x57, 0xc4, 0x45, + 0xa2, 0x5a, 0xfa, 0x0f, 0xa2, 0xfa, 0x18, 0xe6, 0x0c, 0x87, 0xff, 0xba, 0x5c, 0xd3, 0x26, 0x52, + 0x17, 0x6a, 0x06, 0xca, 0x07, 0x6c, 0x28, 0xaf, 0x8c, 0x3a, 0x16, 0x9f, 0x42, 0xd7, 0xa8, 0x9f, + 0x78, 0xfb, 0x01, 0x93, 0xe5, 0xb0, 0xb1, 0x19, 0xa2, 0xeb, 0x30, 0x75, 0xaa, 0x02, 0x9a, 0xf8, + 0xc9, 0x3c, 0xf1, 0xee, 0xb7, 0x50, 0xcf, 0x3a, 0x1a, 0x3d, 0x2c, 0xd0, 0x7d, 0xfd, 0x12, 0x01, + 0x38, 0x87, 0xed, 0xe6, 0x88, 0xec, 0xc2, 0xea, 0xd6, 0x39, 0xab, 0xdf, 0x83, 0x9a, 0x96, 0x05, + 0xf4, 0x1a, 0x20, 0x4f, 0x5e, 0x24, 0x1d, 0xca, 0x12, 0x12, 0xfb, 0x91, 0xbc, 0x82, 0x54, 0x79, + 0x66, 0x95, 0x67, 0x67, 0xe4, 0x70, 0x3f, 0x80, 0xaa, 0x12, 0x07, 0xf4, 0x0e, 0xcc, 0xb3, 0x6f, + 0xa2, 0xc0, 0x27, 0x7e, 0xda, 0xc9, 0x3d, 0xbb, 0x04, 0x69, 0x4a, 0x5d, 0x6d, 0xec, 0x18, 0xc0, + 0xe6, 0x19, 0xbf, 0xfb, 0x05, 0xd8, 0x46, 0x2f, 0x04, 0x93, 0x7a, 0x23, 0xba, 0x81, 0xcd, 0x10, + 0x6d, 0x80, 0x4d, 0x39, 0x19, 0xff, 0x04, 0x94, 0x29, 0x27, 0xee, 0x3d, 0x80, 0x91, 0xa0, 0xa0, + 0x9b, 0xa0, 0x6e, 0x03, 0xad, 0xe3, 0xd9, 0xa5, 0xaa, 0xdf, 0x13, 0x7b, 0xc2, 0xa7, 0x2f, 0x8c, + 0xf6, 0x43, 0xa8, 0x67, 0x0e, 0xd4, 0x86, 0x9a, 0xce, 0x10, 0xcd, 0x9c, 0x7d, 0x22, 0xb8, 0xb3, + 0x85, 0xf2, 0xac, 0x5b, 0x77, 0xac, 0xad, 0x07, 0xc7, 0xbf, 0x2f, 0x5d, 0x39, 0x7e, 0xb6, 0x64, + 0xfd, 0xf6, 0x6c, 0xc9, 0xfa, 0xe9, 0x8f, 0x25, 0xeb, 0xcb, 0xdb, 0x63, 0xbd, 0x95, 0xf4, 0x64, + 0xfb, 0x55, 0x69, 0xda, 0xf8, 0x27, 0x00, 0x00, 0xff, 0xff, 0x2e, 0x31, 0xb5, 0xe6, 0x59, 0x0c, + 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -1680,6 +1691,15 @@ func (m *Response_Spec) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } + if len(m.ResourcePathPointers) > 0 { + for iNdEx := len(m.ResourcePathPointers) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.ResourcePathPointers[iNdEx]) + copy(dAtA[i:], m.ResourcePathPointers[iNdEx]) + i = encodeVarintCapture(dAtA, i, uint64(len(m.ResourcePathPointers[iNdEx]))) + i-- + dAtA[i] = 0x32 + } + } if m.Oauth2 != nil { { size, err := m.Oauth2.MarshalToSizedBuffer(dAtA[:i]) @@ -2335,6 +2355,12 @@ func (m *Response_Spec) ProtoSize() (n int) { l = m.Oauth2.ProtoSize() n += 1 + l + sovCapture(uint64(l)) } + if len(m.ResourcePathPointers) > 0 { + for _, s := range m.ResourcePathPointers { + l = len(s) + n += 1 + l + sovCapture(uint64(l)) + } + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -4208,6 +4234,38 @@ func (m *Response_Spec) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ResourcePathPointers", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCapture + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCapture + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthCapture + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ResourcePathPointers = append(m.ResourcePathPointers, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipCapture(dAtA[iNdEx:]) diff --git a/go/protocols/capture/capture.proto b/go/protocols/capture/capture.proto index 595ec18c6d..980afd0ef1 100644 --- a/go/protocols/capture/capture.proto +++ b/go/protocols/capture/capture.proto @@ -192,6 +192,16 @@ message Response { string documentation_url = 4; // Optional OAuth2 configuration. flow.OAuth2 oauth2 = 5; + + // One or more JSON pointers, which are used to extract the `resource_path` + // from a given `resource` of this connector. For example, a database + // capture connector might have a `resource` that's represented like: + // `{"schema": "foo", "table": "bar", "otherConfig": true}`. In that case + // it could use `resource_path_pointers: ["/schema", "/table"]`, which + // would result in a `resource_path` of `["foo", "bar"]`. This allows + // `otherConfig` to be changed by the user without impacting the identity of + // the resource. + repeated string resource_path_pointers = 6; } Spec spec = 1; @@ -290,4 +300,4 @@ message Response { // Reserved for internal use. bytes internal = 100 [ json_name = "$internal" ]; -} \ No newline at end of file +} diff --git a/supabase/migrations/40_resource_path_pointers.sql b/supabase/migrations/40_resource_path_pointers.sql new file mode 100644 index 0000000000..674c755fb8 --- /dev/null +++ b/supabase/migrations/40_resource_path_pointers.sql @@ -0,0 +1,15 @@ + +begin; + +create domain json_pointer as text check(value = '' or (value ^@ '/' and length(value) > 1)); + + +alter table connector_tags + add column resource_path_pointers json_pointer[] + check(array_length(resource_path_pointers, 1) > 0); + +comment on column connector_tags.resource_path_pointers is +'The resource_path that was returned from the connector spec response'; + +commit; + diff --git a/supabase/tests/json.test.sql b/supabase/tests/json.test.sql index 358d240a74..348023b04d 100644 --- a/supabase/tests/json.test.sql +++ b/supabase/tests/json.test.sql @@ -107,4 +107,17 @@ begin '{"a":1,"b":{"c":{}}}'); end; -$$ language plpgsql; \ No newline at end of file +$$ language plpgsql; + +create function tests.test_json_pointers() +returns setof text as $$ +begin + + return query select lives_ok('select ''/foo''::json_pointer'); + return query select lives_ok('select ''''::json_pointer'); + return query select throws_like( + 'select ''"not a pointer"''::json_pointer', + '% violates check constraint "json_pointer_check"' + ); +end; +$$ language plpgsql;