Phil/discover merge #1291

Merged
merged 3 commits into from
Nov 21, 2023
30 changes: 29 additions & 1 deletion crates/agent-sql/src/connector_tags.rs
@@ -133,14 +133,16 @@ pub async fn update_tag_fields(
endpoint_spec_schema: Box<RawValue>,
protocol: String,
resource_spec_schema: Box<RawValue>,
resource_path_pointers: Vec<String>,
txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> sqlx::Result<()> {
sqlx::query!(
r#"update connector_tags set
documentation_url = $2,
endpoint_spec_schema = $3,
protocol = $4,
resource_spec_schema = $5
resource_spec_schema = $5,
resource_path_pointers = $6
where id = $1
returning 1 as "must_exist";
"#,
@@ -149,9 +151,35 @@ pub async fn update_tag_fields(
Json(endpoint_spec_schema) as Json<Box<RawValue>>,
protocol,
Json(resource_spec_schema) as Json<Box<RawValue>>,
resource_path_pointers as Vec<String>,
)
.fetch_one(&mut *txn)
.await?;

Ok(())
}

/// Returns the `resource_path_pointers` for the given image and tag. Returns
/// an empty `Vec` if there are no matching rows, or if the
/// `resource_path_pointers` column value is null.
pub async fn fetch_resource_path_pointers(
image_name: &str,
image_tag: &str,
txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> sqlx::Result<Vec<String>> {
let row = sqlx::query!(
r#"
select ct.resource_path_pointers as "pointers: Vec<String>"
from connectors c
join connector_tags ct on c.id = ct.connector_id
where c.image_name = $1
and ct.image_tag = $2
"#,
image_name,
image_tag
)
.fetch_optional(txn)
.await?;

Ok(row.and_then(|r| r.pointers).unwrap_or_default())
}
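
The final expression of `fetch_resource_path_pointers` folds two distinct "missing" cases (no matching row, and a NULL column) into one empty vector. A minimal std-only sketch of that idiom follows; `PointersRow` and `pointers_or_default` are hypothetical names standing in for the anonymous record that `sqlx::query!` generates, not part of the PR.

```rust
/// Hypothetical stand-in for the record produced by `sqlx::query!`.
/// The column is nullable, so the field is an `Option`.
struct PointersRow {
    pointers: Option<Vec<String>>,
}

/// Collapse "no row" (outer `None`) and "NULL column" (inner `None`)
/// into an empty vector, mirroring the last line of the function above.
fn pointers_or_default(row: Option<PointersRow>) -> Vec<String> {
    row.and_then(|r| r.pointers).unwrap_or_default()
}
```

Callers then only need to check `is_empty()`, which is what the discover handler does before logging its legacy-behavior warning.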
194 changes: 155 additions & 39 deletions crates/agent/src/connector_tags.rs
@@ -1,6 +1,10 @@
use std::time::Duration;

use super::{jobs, logs, Handler, HandlerStatus, Id};
use agent_sql::connector_tags::Row;
use anyhow::Context;
use proto_flow::flow;
use runtime::{LogHandler, Runtime, RuntimeProtocol};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use tracing::info;
@@ -19,15 +23,13 @@ pub enum JobStatus {
/// A TagHandler is a Handler which evaluates tagged connector images.
pub struct TagHandler {
connector_network: String,
bindir: String,
logs_tx: logs::Tx,
}

impl TagHandler {
pub fn new(connector_network: &str, bindir: &str, logs_tx: &logs::Tx) -> Self {
pub fn new(connector_network: &str, logs_tx: &logs::Tx) -> Self {
Self {
connector_network: connector_network.to_string(),
bindir: bindir.to_string(),
logs_tx: logs_tx.clone(),
}
}
@@ -96,57 +98,171 @@ impl TagHandler {
}
}

// Fetch its connector specification.
let spec = jobs::run_with_output(
"spec",
&self.logs_tx,
row.logs_token,
async_process::Command::new(format!("{}/flowctl-go", &self.bindir))
.arg("api")
.arg("spec")
.arg("--image")
.arg(&image_composed)
.arg("--network")
.arg(&self.connector_network),
)
.await?;
let proto_type = match runtime::flow_runtime_protocol(&image_composed).await {
Review comment (Member):

FYI no action -- we've not really talked about it IIRC -- but I've been thinking / endeavoring to keep the protobuf tags and names equal so that Spec request / response can be a "universal" RPC that's applied without knowing its type a priori.

This is still pretty half baked. I think it would mean, for example, arbitrarily picking Runtime::unary_capture to get a spec response even though you have no idea what the actual connector type is.

(And Derivation connectors do also have a Spec RPC)

Reply (Member, Author):

Yeah, and this code makes the problem pretty apparent. For image connectors, we can know the protocol up front by inspecting the image. But for other (local) connectors, there's no way to tell ahead of time unless we store the configuration elsewhere.

I think it would be nice if we could come up with a way to make spec a "universal" RPC. Another idea would be to pull the protocol from the connector_tags table instead of the other way around. Basically, make whoever deploys the connector specify what type of connector it is.

Ok(ct) => ct,
Err(err) => {
tracing::warn!(image = %image_composed, error = %err, "failed to determine connector protocol");
return Ok((row.tag_id, JobStatus::SpecFailed));
}
};
let log_handler =
logs::ops_handler(self.logs_tx.clone(), "spec".to_string(), row.logs_token);
let runtime = Runtime::new(
false, // Don't allow local
self.connector_network.clone(),
log_handler,
None, // no need to change log level
"ops/connector-tags-job".to_string(),
);

if !spec.0.success() {
return Ok((row.tag_id, JobStatus::SpecFailed));
}
let spec_result = match proto_type {
RuntimeProtocol::Capture => spec_capture(&image_composed, runtime).await,
RuntimeProtocol::Materialization => {
spec_materialization(&image_composed, runtime).await
}
};

/// Spec is the output shape of the `flowctl api spec` command.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct Spec {
protocol: String,
documentation_url: String,
config_schema: Box<RawValue>,
resource_config_schema: Box<RawValue>,
oauth2: Option<Box<RawValue>>,
}
let Spec {
documentation_url,
config_schema,
protocol,
let spec = match spec_result {
Ok(s) => s,
Err(err) => {
tracing::warn!(error = ?err, image = %image_composed, "connector Spec RPC failed");
return Ok((row.tag_id, JobStatus::SpecFailed));
}
};

let ConnectorSpec {
endpoint_config_schema,
resource_config_schema,
documentation_url,
oauth2,
} = serde_json::from_slice(&spec.1).context("parsing connector spec output")?;
resource_path_pointers,
} = spec;

if proto_type == RuntimeProtocol::Capture && resource_path_pointers.is_empty() {
tracing::warn!(image = %image_composed, "capture connector spec omits resource_path_pointers");
}

agent_sql::connector_tags::update_tag_fields(
row.tag_id,
documentation_url,
config_schema,
protocol,
resource_config_schema,
endpoint_config_schema.into(),
proto_type.to_string(),
resource_config_schema.into(),
resource_path_pointers,
txn,
)
.await?;

if let Some(oauth2) = oauth2 {
agent_sql::connector_tags::update_oauth2_spec(row.connector_id, oauth2, txn).await?;
agent_sql::connector_tags::update_oauth2_spec(row.connector_id, oauth2.into(), txn)
.await?;
}

return Ok((row.tag_id, JobStatus::Success));
}
}

struct ConnectorSpec {
documentation_url: String,
endpoint_config_schema: Box<RawValue>,
resource_config_schema: Box<RawValue>,
resource_path_pointers: Vec<String>,
oauth2: Option<Box<RawValue>>,
}

async fn spec_materialization(
image: &str,
runtime: Runtime<impl LogHandler>,
) -> anyhow::Result<ConnectorSpec> {
use proto_flow::materialize;

let req = materialize::Request {
spec: Some(materialize::request::Spec {
connector_type: flow::materialization_spec::ConnectorType::Image as i32,
config_json: serde_json::to_string(&serde_json::json!({"image": image, "config":{}}))
.unwrap(),
}),
..Default::default()
};

let spec = runtime
.unary_materialize(req, SPEC_TIMEOUT)
.await?
.spec
.ok_or_else(|| anyhow::anyhow!("connector didn't send expected Spec response"))?;

let materialize::response::Spec {
protocol: _,
config_schema_json,
resource_config_schema_json,
documentation_url,
oauth2,
} = spec;

let oauth = if let Some(oa) = oauth2 {
Some(serde_json::value::to_raw_value(&oa).expect("serializing oauth2 config"))
} else {
None
};
Ok(ConnectorSpec {
documentation_url,
endpoint_config_schema: RawValue::from_string(config_schema_json)
.context("parsing endpoint config schema")?,
resource_config_schema: RawValue::from_string(resource_config_schema_json)
.context("parsing resource config schema")?,

// Materialization connectors don't currently specify resource_path_pointers, though perhaps they should.
resource_path_pointers: Vec::new(),
oauth2: oauth,
})
}

async fn spec_capture(
image: &str,
runtime: Runtime<impl LogHandler>,
) -> anyhow::Result<ConnectorSpec> {
use proto_flow::capture;
let req = capture::Request {
spec: Some(capture::request::Spec {
connector_type: flow::capture_spec::ConnectorType::Image as i32,
config_json: serde_json::to_string(&serde_json::json!({"image": image, "config": {}}))
.unwrap(),
}),
..Default::default()
};

let spec = runtime
.unary_capture(req, SPEC_TIMEOUT)
.await?
.spec
.ok_or_else(|| anyhow::anyhow!("connector didn't send expected Spec response"))?;
let capture::response::Spec {
// protocol here is the numeric version of the capture protocol
protocol: _,
config_schema_json,
resource_config_schema_json,
documentation_url,
oauth2,
resource_path_pointers,
} = spec;

let oauth = if let Some(oa) = oauth2 {
Some(
RawValue::from_string(serde_json::to_string(&oa).expect("can serialize oauth2 config"))
.expect("serialization of oauth2 config cannot fail"),
)
} else {
None
};
Ok(ConnectorSpec {
documentation_url,
endpoint_config_schema: RawValue::from_string(config_schema_json)
.context("parsing endpoint config schema")?,
resource_config_schema: RawValue::from_string(resource_config_schema_json)
.context("parsing resource config schema")?,
resource_path_pointers,
oauth2: oauth,
})
}

const SPEC_TIMEOUT: Duration = Duration::from_secs(10);
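
The review thread earlier in this file floats the idea of having whoever deploys a connector declare its protocol, so the handler reads it from `connector_tags` instead of inspecting the image. A rough std-only sketch of that direction is below. Everything in it is an assumption made for illustration: the `StoredProtocol` enum, the `spec_rpc_for` helper, and the stored strings `"capture"` and `"materialization"` are hypothetical and do not appear in the PR.

```rust
use std::str::FromStr;

/// Hypothetical protocol value as it might be stored on a connector tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StoredProtocol {
    Capture,
    Materialization,
}

impl FromStr for StoredProtocol {
    type Err = String;

    // The accepted strings are assumed, not taken from the PR.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "capture" => Ok(Self::Capture),
            "materialization" => Ok(Self::Materialization),
            other => Err(format!("unknown connector protocol: {other:?}")),
        }
    }
}

/// Pick the name of the unary RPC to use for Spec from the stored protocol.
/// A missing or unrecognized value is an error rather than a guess.
fn spec_rpc_for(stored: Option<&str>) -> Result<&'static str, String> {
    let stored = stored.ok_or_else(|| "connector tag has no protocol recorded".to_string())?;
    Ok(match stored.parse::<StoredProtocol>()? {
        StoredProtocol::Capture => "unary_capture",
        StoredProtocol::Materialization => "unary_materialize",
    })
}
```

Under this arrangement a tag without a declared protocol would fail the job up front, much as the current code returns `JobStatus::SpecFailed` when image inspection fails.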
11 changes: 11 additions & 0 deletions crates/agent/src/discovers.rs
@@ -272,6 +272,16 @@ impl DiscoverHandler {
return Ok(Err(errors));
}

// TODO: As of 2023-11, resource_path_pointers are allowed to be empty.
// `merge_capture` will just log a warning if they are. But we plan to
// soon require that they are never empty.
let resource_path_pointers =
agent_sql::connector_tags::fetch_resource_path_pointers(image_name, image_tag, txn)
.await?;
if resource_path_pointers.is_empty() {
tracing::warn!(%image_name, %image_tag, %capture_name, "merging bindings using legacy behavior because resource_path_pointers are missing");
}

// Deeply merge the capture and its bindings.
let capture_name = models::Capture::new(capture_name);
let (capture, discovered_bindings) = specs::merge_capture(
@@ -280,6 +290,7 @@ impl DiscoverHandler {
discovered_bindings,
catalog.captures.remove(&capture_name),
update_only,
&resource_path_pointers,
);
let targets = capture
.bindings
@@ -0,0 +1,76 @@
---
source: crates/agent/src/discovers/specs.rs
expression: json!(out)
---
[
{
"bindings": [
{
"resource": {
"modified": 1,
"stream": "foo"
},
"target": "acmeCo/renamed"
},
{
"resource": {
"modified": 2,
"namespace": "spacename",
"stream": "foo"
},
"target": "acmeCo/renamed-namepaced"
},
{
"disable": true,
"resource": {
"modified": "yup",
"stream": "disabled"
},
"target": "test/collection/disabled"
}
],
"endpoint": {
"connector": {
"config": {
"discovered": 1
},
"image": "new/image"
}
},
"interval": "34s",
"shards": {
"maxTxnDuration": "12s"
}
},
[
{
"documentSchema": {
"const": "discovered"
},
"recommendedName": "suggested",
"resourceConfig": {
"modified": 0,
"stream": "foo"
}
},
{
"documentSchema": {
"const": "discovered-namepaced"
},
"recommendedName": "suggested2",
"resourceConfig": {
"modified": 0,
"namespace": "spacename",
"stream": "foo"
}
},
{
"documentSchema": false,
"recommendedName": "other",
"resourceConfig": {
"modified": 0,
"stream": "disabled"
}
}
]
]
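
In the snapshot above, the two existing bindings for stream `foo` (one with a `namespace`, one without) each stay paired with their own discovered counterpart. One plausible way pointers could produce that pairing is sketched below. This is not the `merge_capture` implementation, which this diff does not show. It assumes flat string-valued resource configs, single-token pointers, and the pointer list `["/namespace", "/stream"]`; all helper names are hypothetical.

```rust
use std::collections::BTreeMap;

/// Simplified, flat resource config. Real configs are arbitrary JSON.
type ResourceConfig = BTreeMap<String, String>;

/// Evaluate single-token pointers such as "/stream" against a flat config.
/// A missing field yields `None`, so a config without "namespace" gets a
/// different path than one with it.
fn resource_path(config: &ResourceConfig, pointers: &[&str]) -> Vec<Option<String>> {
    pointers
        .iter()
        .map(|ptr| config.get(ptr.trim_start_matches('/')).cloned())
        .collect()
}

/// Index of the first existing binding whose resource path equals the
/// discovered binding's path, if any.
fn find_existing(
    existing: &[ResourceConfig],
    discovered: &ResourceConfig,
    pointers: &[&str],
) -> Option<usize> {
    let want = resource_path(discovered, pointers);
    existing
        .iter()
        .position(|cfg| resource_path(cfg, pointers) == want)
}

/// Small helper for building configs in examples.
fn config(fields: &[(&str, &str)]) -> ResourceConfig {
    fields
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}
```

Because only the pointed-to fields take part in the comparison, other fields such as `modified` can differ between the existing and discovered configs without breaking the match.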