diff --git a/crates/flowctl/src/auth/mod.rs b/crates/flowctl/src/auth/mod.rs index 4f4c9acb38..17ead583a8 100644 --- a/crates/flowctl/src/auth/mod.rs +++ b/crates/flowctl/src/auth/mod.rs @@ -2,8 +2,6 @@ mod roles; use anyhow::Context; -use crate::controlplane; - #[derive(Debug, clap::Args)] #[clap(rename_all = "kebab-case")] pub struct Auth { @@ -47,21 +45,6 @@ pub enum Command { /// Unlike 'read' or 'write', the subject of an 'admin' grant also inherits /// capabilities granted to the object role from still-other roles. Roles(roles::Roles), - - /// Fetches and prints an auth token that can be used to access a Flow data plane. - /// - /// The returned token can be used to access the Flow data plane with 3rd party tools. - /// For example, you can use curl to access a private port of a running task by running: - /// ```ignore - /// curl -H "Authorization: Bearer $(flowctl auth data-plane-access-token --prefix myTenant/)" https://myPort.myHost.data-plane.example/ - /// ``` - DataPlaneAccessToken(DataPlaneAccessToken), -} - -#[derive(Debug, clap::Args)] -pub struct DataPlaneAccessToken { - #[clap(long, required = true)] - prefix: Vec, } #[derive(Debug, clap::Args)] @@ -76,12 +59,11 @@ impl Auth { match &self.cmd { Command::Login => do_login(ctx).await, Command::Token(Token { token }) => { - controlplane::configure_new_access_token(ctx, token.clone()).await?; + ctx.config.user_access_token = Some(token.clone()); println!("Configured access token."); Ok(()) } Command::Roles(roles) => roles.run(ctx).await, - Command::DataPlaneAccessToken(args) => do_data_plane_access_token(ctx, args).await, } } } @@ -89,7 +71,11 @@ impl Auth { async fn do_login(ctx: &mut crate::CliContext) -> anyhow::Result<()> { use crossterm::tty::IsTty; - let url = ctx.config().get_dashboard_url("/admin/api")?.to_string(); + let url = ctx + .config + .get_dashboard_url() + .join("/admin/api")? + .to_string(); println!("\nOpening browser to: {url}"); if let Err(_) = open::that(&url) { @@ -118,7 +104,7 @@ async fn do_login(ctx: &mut crate::CliContext) -> anyhow::Result<()> { // Copied credentials will often accidentally contain extra whitespace characters. let token = token.trim().to_string(); - controlplane::configure_new_access_token(ctx, token).await?; + ctx.config.user_access_token = Some(token); println!("\nConfigured access token."); Ok(()) } else { @@ -131,14 +117,3 @@ async fn do_login(ctx: &mut crate::CliContext) -> anyhow::Result<()> { ); } } - -async fn do_data_plane_access_token( - ctx: &mut crate::CliContext, - args: &DataPlaneAccessToken, -) -> anyhow::Result<()> { - let client = ctx.controlplane_client().await?; - let access = - crate::dataplane::fetch_data_plane_access_token(client, args.prefix.clone()).await?; - println!("{}", access.auth_token); - Ok(()) -} diff --git a/crates/flowctl/src/auth/roles.rs b/crates/flowctl/src/auth/roles.rs index 132da95a8e..41710db471 100644 --- a/crates/flowctl/src/auth/roles.rs +++ b/crates/flowctl/src/auth/roles.rs @@ -138,8 +138,7 @@ pub async fn do_list(ctx: &mut crate::CliContext) -> anyhow::Result<()> { } } let rows: Vec = api_exec_paginated( - ctx.controlplane_client() - .await? + ctx.client .from("combined_grants_ext") .select( vec![ @@ -177,8 +176,7 @@ pub async fn do_grant( // Upsert user grants to `user_grants` and role grants to `role_grants`. let rows: Vec = if let Some(subject_user_id) = subject_user_id { api_exec_paginated( - ctx.controlplane_client() - .await? + ctx.client .from("user_grants") .select(grant_revoke_columns()) .upsert( @@ -195,8 +193,7 @@ pub async fn do_grant( .await? } else if let Some(subject_role) = subject_role { api_exec_paginated( - ctx.controlplane_client() - .await? + ctx.client .from("role_grants") .select(grant_revoke_columns()) .upsert( @@ -231,8 +228,7 @@ pub async fn do_revoke( // Revoke user grants from `user_grants` and role grants from `role_grants`. let rows: Vec = if let Some(subject_user_id) = subject_user_id { api_exec_paginated( - ctx.controlplane_client() - .await? + ctx.client .from("user_grants") .select(grant_revoke_columns()) .eq("user_id", subject_user_id.to_string()) @@ -242,8 +238,7 @@ pub async fn do_revoke( .await? } else if let Some(subject_role) = subject_role { api_exec_paginated( - ctx.controlplane_client() - .await? + ctx.client .from("role_grants") .select(grant_revoke_columns()) .eq("subject_role", subject_role) diff --git a/crates/flowctl/src/catalog/delete.rs b/crates/flowctl/src/catalog/delete.rs index 6cb84be8fd..e501377632 100644 --- a/crates/flowctl/src/catalog/delete.rs +++ b/crates/flowctl/src/catalog/delete.rs @@ -69,9 +69,8 @@ pub async fn do_delete( type_selector: type_selector.clone(), }; - let client = ctx.controlplane_client().await?; let specs = catalog::fetch_live_specs::( - client.clone(), + &ctx.client, &list_args, vec![ "id", @@ -98,7 +97,7 @@ pub async fn do_delete( anyhow::bail!("delete operation cancelled"); } - let draft = draft::create_draft(client.clone()) + let draft = draft::create_draft(&ctx.client) .await .context("failed to create draft")?; println!( @@ -121,8 +120,7 @@ pub async fn do_delete( .collect::>(); api_exec::>( - ctx.controlplane_client() - .await? + ctx.client .from("draft_specs") //.select("catalog_name,spec_type") .upsert(serde_json::to_string(&draft_specs).unwrap()) @@ -131,7 +129,7 @@ pub async fn do_delete( .await?; tracing::debug!("added deletions to draft"); - draft::publish(client.clone(), "", draft.id, false).await?; + draft::publish(&ctx.client, "", draft.id, false).await?; // extra newline before, since `publish` will output a bunch of logs println!("\nsuccessfully deleted {} spec(s)", draft_specs.len()); diff --git a/crates/flowctl/src/catalog/mod.rs b/crates/flowctl/src/catalog/mod.rs index 2fc3ebd70d..4b27ad6c3c 100644 --- a/crates/flowctl/src/catalog/mod.rs +++ b/crates/flowctl/src/catalog/mod.rs @@ -4,7 +4,7 @@ mod pull_specs; mod test; use crate::{ - api_exec, api_exec_paginated, controlplane, + api_exec, api_exec_paginated, output::{to_table_row, CliOutput, JsonCell}, }; use anyhow::Context; @@ -226,7 +226,7 @@ impl Catalog { /// # Panics /// If the name_selector `name` and `prefix` are both non-empty. pub async fn fetch_live_specs( - cp_client: controlplane::Client, + client: &crate::Client, list: &List, columns: Vec<&'static str>, ) -> anyhow::Result> @@ -242,7 +242,7 @@ where panic!("cannot specify both 'name' and 'prefix' for filtering live specs"); } - let builder = cp_client.from("live_specs_ext").select(columns.join(",")); + let builder = client.from("live_specs_ext").select(columns.join(",")); let builder = list.type_selector.add_spec_type_filters(builder); // Drive the actual request(s) based on the name selector, since the arguments there may @@ -448,8 +448,7 @@ async fn do_list(ctx: &mut crate::CliContext, list_args: &List) -> anyhow::Resul columns.push("reads_from"); columns.push("writes_to"); } - let client = ctx.controlplane_client().await?; - let rows = fetch_live_specs::(client, list_args, columns).await?; + let rows = fetch_live_specs::(&ctx.client, list_args, columns).await?; ctx.write_all(rows, list_args.flows) } @@ -499,8 +498,7 @@ async fn do_history(ctx: &mut crate::CliContext, History { name }: &History) -> } } let rows: Vec = api_exec_paginated( - ctx.controlplane_client() - .await? + ctx.client .from("publication_specs_ext") .like("catalog_name", format!("{name}%")) .select( @@ -531,7 +529,7 @@ async fn do_draft( publication_id, }: &Draft, ) -> anyhow::Result<()> { - let draft_id = ctx.config().cur_draft()?; + let draft_id = ctx.config.selected_draft()?; #[derive(Deserialize)] struct Row { @@ -550,8 +548,7 @@ async fn do_draft( spec_type, } = if let Some(publication_id) = publication_id { api_exec( - ctx.controlplane_client() - .await? + ctx.client .from("publication_specs_ext") .eq("catalog_name", name) .eq("pub_id", publication_id.to_string()) @@ -561,8 +558,7 @@ async fn do_draft( .await? } else { api_exec( - ctx.controlplane_client() - .await? + ctx.client .from("live_specs") .eq("catalog_name", name) .not("is", "spec_type", "null") @@ -596,8 +592,7 @@ async fn do_draft( tracing::debug!(?draft_spec, "inserting draft"); let rows: Vec = api_exec( - ctx.controlplane_client() - .await? + ctx.client .from("draft_specs") .select("catalog_name,spec_type") .upsert(serde_json::to_string(&draft_spec).unwrap()) diff --git a/crates/flowctl/src/catalog/publish.rs b/crates/flowctl/src/catalog/publish.rs index 2e620235e4..ec3b6f9ec7 100644 --- a/crates/flowctl/src/catalog/publish.rs +++ b/crates/flowctl/src/catalog/publish.rs @@ -1,4 +1,4 @@ -use crate::{catalog::SpecSummaryItem, controlplane, draft, local_specs, CliContext}; +use crate::{catalog::SpecSummaryItem, draft, local_specs, CliContext}; use anyhow::Context; #[derive(Debug, clap::Args)] @@ -24,19 +24,17 @@ pub async fn do_publish(ctx: &mut CliContext, args: &Publish) -> anyhow::Result< // in common error scenarios. For example, we don't create the draft until after bundling, because // then we'd have to clean up the empty draft if the bundling fails. The very first thing is to create the client, // since that can fail due to missing/expired credentials. - let client = ctx.controlplane_client().await?; - anyhow::ensure!(args.auto_approve || std::io::stdin().is_tty(), "The publish command must be run interactively unless the `--auto-approve` flag is provided"); let (draft_catalog, _validations) = - local_specs::load_and_validate(client.clone(), &args.source).await?; + local_specs::load_and_validate(&ctx.client, &args.source).await?; - let draft = draft::create_draft(client.clone()).await?; + let draft = draft::create_draft(&ctx.client).await?; println!("Created draft: {}", &draft.id); tracing::info!(draft_id = %draft.id, "created draft"); - draft::upsert_draft_specs(client.clone(), draft.id, &draft_catalog).await?; + draft::upsert_draft_specs(&ctx.client, draft.id, &draft_catalog).await?; - let removed = draft::remove_unchanged(&client, draft.id).await?; + let removed = draft::remove_unchanged(&ctx.client, draft.id).await?; if !removed.is_empty() { println!("The following specs are identical to the currently published specs, and have been pruned from the draft:"); for name in removed.iter() { @@ -50,7 +48,7 @@ pub async fn do_publish(ctx: &mut CliContext, args: &Publish) -> anyhow::Result< if summary.is_empty() { println!("No specs would be changed by this publication, nothing to publish."); - try_delete_draft(client, draft.id).await; + try_delete_draft(&ctx.client, draft.id).await; return Ok(()); } @@ -59,17 +57,17 @@ pub async fn do_publish(ctx: &mut CliContext, args: &Publish) -> anyhow::Result< if !(args.auto_approve || prompt_to_continue().await) { println!("\nCancelling"); - try_delete_draft(client.clone(), draft.id).await; + try_delete_draft(&ctx.client, draft.id).await; anyhow::bail!("publish cancelled"); } println!("Proceeding to publish..."); let publish_result = - draft::publish(client.clone(), &args.default_data_plane, draft.id, false).await; + draft::publish(&ctx.client, &args.default_data_plane, draft.id, false).await; // The draft will have been deleted automatically if the publish was successful. if let Err(err) = publish_result.as_ref() { tracing::error!(draft_id = %draft.id, error = %err, "publication error"); - try_delete_draft(client, draft.id).await; + try_delete_draft(&ctx.client, draft.id).await; } publish_result.context("Publish failed")?; println!("\nPublish successful"); @@ -90,8 +88,8 @@ async fn prompt_to_continue() -> bool { } } -async fn try_delete_draft(client: controlplane::Client, draft_id: models::Id) { - if let Err(del_err) = draft::delete_draft(client.clone(), draft_id).await { +async fn try_delete_draft(client: &crate::Client, draft_id: models::Id) { + if let Err(del_err) = draft::delete_draft(client, draft_id).await { tracing::error!(draft_id = %draft_id, error = %del_err, "failed to delete draft"); } } diff --git a/crates/flowctl/src/catalog/pull_specs.rs b/crates/flowctl/src/catalog/pull_specs.rs index 45ab0d0936..1edf371893 100644 --- a/crates/flowctl/src/catalog/pull_specs.rs +++ b/crates/flowctl/src/catalog/pull_specs.rs @@ -23,10 +23,9 @@ pub struct PullSpecs { } pub async fn do_pull_specs(ctx: &mut CliContext, args: &PullSpecs) -> anyhow::Result<()> { - let client = ctx.controlplane_client().await?; // Retrieve identified live specifications. let live_specs = fetch_live_specs::( - client.clone(), + &ctx.client, &List { flows: false, name_selector: args.name_selector.clone(), @@ -58,7 +57,7 @@ pub async fn do_pull_specs(ctx: &mut CliContext, args: &PullSpecs) -> anyhow::Re let sources = local_specs::indirect_and_write_resources(sources)?; println!("Wrote {count} specifications under {target}."); - let () = local_specs::generate_files(client, sources).await?; + let () = local_specs::generate_files(&ctx.client, sources).await?; Ok(()) } diff --git a/crates/flowctl/src/catalog/test.rs b/crates/flowctl/src/catalog/test.rs index 41e9126c89..b5a01f7f53 100644 --- a/crates/flowctl/src/catalog/test.rs +++ b/crates/flowctl/src/catalog/test.rs @@ -16,24 +16,22 @@ pub struct TestArgs { /// and discoverable to users. There's also no need for any confirmation steps, since we're not /// actually modifying the published specs. pub async fn do_test(ctx: &mut CliContext, args: &TestArgs) -> anyhow::Result<()> { - let client = ctx.controlplane_client().await?; - let (draft_catalog, _validations) = - local_specs::load_and_validate(client.clone(), &args.source).await?; + local_specs::load_and_validate(&ctx.client, &args.source).await?; - let draft = draft::create_draft(client.clone()).await?; + let draft = draft::create_draft(&ctx.client).await?; println!("Created draft: {}", &draft.id); tracing::info!(draft_id = %draft.id, "created draft"); - let spec_rows = draft::upsert_draft_specs(client.clone(), draft.id, &draft_catalog).await?; + let spec_rows = draft::upsert_draft_specs(&ctx.client, draft.id, &draft_catalog).await?; println!("Running tests for catalog items:"); ctx.write_all(spec_rows, ())?; println!("Starting tests..."); // Technically, test is just a publish with the dry-run flag set to true. let publish_result = - draft::publish(client.clone(), &args.default_data_plane, draft.id, true).await; + draft::publish(&ctx.client, &args.default_data_plane, draft.id, true).await; - if let Err(del_err) = draft::delete_draft(client.clone(), draft.id).await { + if let Err(del_err) = draft::delete_draft(&ctx.client, draft.id).await { tracing::error!(draft_id = %draft.id, error = %del_err, "failed to delete draft"); } publish_result.context("Tests failed")?; diff --git a/crates/flowctl/src/client.rs b/crates/flowctl/src/client.rs new file mode 100644 index 0000000000..2f1fe540c3 --- /dev/null +++ b/crates/flowctl/src/client.rs @@ -0,0 +1,209 @@ +/// Client encapsulates sub-clients for various control-plane +/// and data-plane services that `flowctl` interacts with. +#[derive(Clone)] +pub struct Client { + // URL of the control-plane agent API. + agent_endpoint: url::Url, + // HTTP client to use for REST requests. + http_client: reqwest::Client, + // PostgREST client. + pg_client: postgrest::Postgrest, + // User's access token, if authenticated. + user_access_token: Option, + // Base shard client which is cloned to build token-specific clients. + shard_client: gazette::shard::Client, + // Base journal client which is cloned to build token-specific clients. + journal_client: gazette::journal::Client, +} + +impl Client { + /// Build a new Client from the Config. + pub fn new(config: &crate::config::Config) -> Self { + let user_access_token = config.user_access_token.clone(); + + let mut pg_client = postgrest::Postgrest::new(config.get_pg_url().as_str()) + .insert_header("apikey", config.get_pg_public_token()); + + if let Some(token) = user_access_token.as_ref() { + pg_client = pg_client.insert_header("Authorization", &format!("Bearer {token}")); + } + + // Build journal and shard clients with an empty default service address. + // We'll use their with_endpoint_and_metadata() routines to cheaply clone + // new clients using dynamic addresses and access tokens, while re-using + // underlying connections. + let router = gazette::Router::new("local"); + + let journal_client = gazette::journal::Client::new( + String::new(), + gazette::Metadata::default(), + router.clone(), + ); + let shard_client = gazette::shard::Client::new( + String::new(), + gazette::Metadata::default(), + router.clone(), + ); + + Self { + agent_endpoint: config.get_agent_url().clone(), + http_client: reqwest::Client::new(), + journal_client, + pg_client, + shard_client, + user_access_token, + } + } + + pub fn from(&self, table: &str) -> postgrest::Builder { + self.pg_client.from(table) + } + + pub fn rpc(&self, function: &str, params: String) -> postgrest::Builder { + self.pg_client.rpc(function, params) + } + + pub fn is_authenticated(&self) -> bool { + self.user_access_token.is_some() + } + + pub async fn agent_unary( + &self, + path: &str, + request: &Request, + ) -> anyhow::Result + where + Request: serde::Serialize, + Response: serde::de::DeserializeOwned, + { + let mut builder = self + .http_client + .post(self.agent_endpoint.join(path)?) + .json(request); + + if let Some(token) = &self.user_access_token { + builder = builder.bearer_auth(token); + } + + let response = self + .http_client + .execute(builder.build()?) + .await? + .error_for_status()? + .json() + .await?; + + Ok(response) + } +} + +#[tracing::instrument(skip(client), err)] +pub async fn fetch_task_authorization( + client: &Client, + task: &str, +) -> anyhow::Result<( + String, + String, + String, + gazette::shard::Client, + gazette::journal::Client, +)> { + let started_unix = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + + let models::authorizations::UserTaskAuthorization { + broker_address, + broker_token, + ops_logs_journal, + ops_stats_journal, + reactor_address, + reactor_token, + shard_id_prefix, + retry_millis: _, + } = loop { + let response: models::authorizations::UserTaskAuthorization = client + .agent_unary( + "/authorize/user/task", + &models::authorizations::UserTaskAuthorizationRequest { + started_unix, + task: models::Name::new(task), + }, + ) + .await?; + + if response.retry_millis != 0 { + tracing::debug!(response.retry_millis, "sleeping before retrying request"); + () = tokio::time::sleep(std::time::Duration::from_millis(response.retry_millis)).await; + continue; + } + break response; + }; + + let mut md = gazette::Metadata::default(); + md.bearer_token(&reactor_token)?; + + let shard_client = client + .shard_client + .with_endpoint_and_metadata(reactor_address, md); + + let mut md = gazette::Metadata::default(); + md.bearer_token(&broker_token)?; + + let journal_client = client + .journal_client + .with_endpoint_and_metadata(broker_address, md); + + Ok(( + shard_id_prefix, + ops_logs_journal, + ops_stats_journal, + shard_client, + journal_client, + )) +} + +#[tracing::instrument(skip(client), err)] +pub async fn fetch_collection_authorization( + client: &Client, + collection: &str, +) -> anyhow::Result<(String, gazette::journal::Client)> { + let started_unix = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + + let models::authorizations::UserCollectionAuthorization { + broker_address, + broker_token, + journal_name_prefix, + retry_millis: _, + } = loop { + let response: models::authorizations::UserCollectionAuthorization = client + .agent_unary( + "/authorize/user/collection", + &models::authorizations::UserCollectionAuthorizationRequest { + started_unix, + collection: models::Collection::new(collection), + }, + ) + .await?; + + if response.retry_millis != 0 { + tracing::debug!(response.retry_millis, "sleeping before retrying request"); + () = tokio::time::sleep(std::time::Duration::from_millis(response.retry_millis)).await; + continue; + } + break response; + }; + + let mut md = gazette::Metadata::default(); + md.bearer_token(&broker_token)?; + + let journal_client = client + .journal_client + .with_endpoint_and_metadata(broker_address, md); + + Ok((journal_name_prefix, journal_client)) +} diff --git a/crates/flowctl/src/collection/mod.rs b/crates/flowctl/src/collection/mod.rs index 80adf777ca..6088a947f3 100644 --- a/crates/flowctl/src/collection/mod.rs +++ b/crates/flowctl/src/collection/mod.rs @@ -6,7 +6,6 @@ use proto_flow::flow; use proto_gazette::broker; use time::OffsetDateTime; -use crate::dataplane::journal_client_for; use crate::output::{to_table_row, CliOutput, JsonCell}; use self::read::ReadArgs; @@ -30,12 +29,13 @@ fn parse_partition_selector(arg: &str) -> Result broker::LabelSelector { + pub fn build_label_selector(&self, journal_name_prefix: String) -> broker::LabelSelector { assemble::journal_selector( + // Synthesize a minimal CollectionSpec to satisfy `journal_selector()`. &flow::CollectionSpec { name: self.collection.to_string(), partition_template: Some(broker::JournalSpec { - name: self.collection.to_string(), + name: journal_name_prefix, ..Default::default() }), ..Default::default() @@ -110,13 +110,15 @@ pub struct ListFragmentsArgs { #[clap(flatten)] pub selector: CollectionJournalSelector, - /// If provided, then the frament listing will include a pre-signed URL for each fragment, which is valid for the given duration. + /// If provided, then the fragment listing will include a pre-signed URL for each fragment, + /// which is valid for the given duration. /// This can be used to fetch fragment data directly from cloud storage. #[clap(long)] pub signature_ttl: Option, /// Only include fragments which were written within the provided duration from the present. - /// For example, `--since 10m` will only output fragments that have been written within the last 10 minutes. + /// For example, `--since 10m` will only output fragments that have been written within + /// the last 10 minutes. #[clap(long)] pub since: Option, } @@ -176,22 +178,23 @@ impl CliOutput for broker::fragments_response::Fragment { async fn do_list_fragments( ctx: &mut crate::CliContext, - args: &ListFragmentsArgs, + ListFragmentsArgs { + selector, + signature_ttl, + since, + }: &ListFragmentsArgs, ) -> Result<(), anyhow::Error> { - let client = journal_client_for( - ctx.controlplane_client().await?, - vec![args.selector.collection.clone()], - ) - .await?; + let (journal_name_prefix, client) = + crate::client::fetch_collection_authorization(&ctx.client, &selector.collection).await?; let list_resp = client .list(broker::ListRequest { - selector: Some(args.selector.build_label_selector()), + selector: Some(selector.build_label_selector(journal_name_prefix)), ..Default::default() }) .await?; - let start_time = if let Some(since) = args.since { + let start_time = if let Some(since) = *since { let timepoint = OffsetDateTime::now_utc() - *since; tracing::debug!(%since, begin_mod_time = %timepoint, "resolved --since to begin_mod_time"); timepoint.unix_timestamp() @@ -199,9 +202,7 @@ async fn do_list_fragments( 0 }; - let signature_ttl = args - .signature_ttl - .map(|ttl| std::time::Duration::from(*ttl).into()); + let signature_ttl = signature_ttl.map(|ttl| std::time::Duration::from(*ttl).into()); let mut fragments = Vec::with_capacity(32); for journal in list_resp.journals { let req = broker::FragmentsRequest { @@ -216,22 +217,19 @@ async fn do_list_fragments( fragments.extend(frag_resp.fragments); } - ctx.write_all(fragments, args.signature_ttl.is_some()) + ctx.write_all(fragments, signature_ttl.is_some()) } async fn do_list_journals( ctx: &mut crate::CliContext, - args: &CollectionJournalSelector, + selector: &CollectionJournalSelector, ) -> Result<(), anyhow::Error> { - let client = journal_client_for( - ctx.controlplane_client().await?, - vec![args.collection.clone()], - ) - .await?; + let (journal_name_prefix, client) = + crate::client::fetch_collection_authorization(&ctx.client, &selector.collection).await?; let list_resp = client .list(broker::ListRequest { - selector: Some(args.build_label_selector()), + selector: Some(selector.build_label_selector(journal_name_prefix)), ..Default::default() }) .await?; diff --git a/crates/flowctl/src/collection/read/mod.rs b/crates/flowctl/src/collection/read/mod.rs index 2ebc4ae953..81171247bd 100644 --- a/crates/flowctl/src/collection/read/mod.rs +++ b/crates/flowctl/src/collection/read/mod.rs @@ -1,11 +1,10 @@ -use crate::dataplane::{self}; use crate::{collection::CollectionJournalSelector, output::OutputType}; use anyhow::Context; use futures::StreamExt; use gazette::journal::ReadJsonLine; use proto_gazette::broker; +use std::io::Write; use time::OffsetDateTime; -use tokio::io::AsyncWriteExt; #[derive(clap::Args, Default, Debug, Clone)] pub struct ReadArgs { @@ -20,8 +19,6 @@ pub struct ReadArgs { /// the default. #[clap(long)] pub uncommitted: bool, - #[clap(skip)] - pub auth_prefixes: Vec, } /// Common definition for arguments specifying the begin and and bounds of a read command. @@ -42,18 +39,21 @@ pub struct ReadBounds { /// - Only uncommitted reads are supported /// - Any acknowledgements (documents with `/_meta/ack` value `true`) are also printed /// These limitations should all be addressed in the future when we add support for committed reads. -pub async fn read_collection(ctx: &mut crate::CliContext, args: &ReadArgs) -> anyhow::Result<()> { - if !args.uncommitted { +pub async fn read_collection( + ctx: &mut crate::CliContext, + ReadArgs { + selector, + bounds, + uncommitted, + }: &ReadArgs, +) -> anyhow::Result<()> { + if !uncommitted { anyhow::bail!("missing the `--uncommitted` flag. This flag is currently required, though a future release will add support for committed reads, which will be the default."); } // output can be either None or Some(OutputType::Json), but cannot be explicitly set to // anything else. _Eventually_, we may want to support outputting collection data as yaml // or a table, but certainly not right now. - if let Some(naughty_output_type) = ctx - .output_args() - .output - .filter(|ot| *ot != OutputType::Json) - { + if let Some(naughty_output_type) = ctx.output.output.filter(|ot| *ot != OutputType::Json) { let clap_enum = clap::ValueEnum::to_possible_value(&naughty_output_type) .expect("possible value cannot be None"); let name = clap_enum.get_name(); @@ -62,17 +62,12 @@ pub async fn read_collection(ctx: &mut crate::CliContext, args: &ReadArgs) -> an ); } - let auth_prefixes = if args.auth_prefixes.is_empty() { - vec![args.selector.collection.clone()] - } else { - args.auth_prefixes.clone() - }; - let cp_client = ctx.controlplane_client().await?; - let client = dataplane::journal_client_for(cp_client, auth_prefixes).await?; + let (journal_name_prefix, journal_client) = + crate::client::fetch_collection_authorization(&ctx.client, &selector.collection).await?; - let list_resp = client + let list_resp = journal_client .list(broker::ListRequest { - selector: Some(args.selector.build_label_selector()), + selector: Some(selector.build_label_selector(journal_name_prefix)), ..Default::default() }) .await @@ -84,7 +79,7 @@ pub async fn read_collection(ctx: &mut crate::CliContext, args: &ReadArgs) -> an .map(|j| j.spec.unwrap()) .collect::>(); - tracing::debug!(journal_count = journals.len(), collection = %args.selector.collection, "listed journals"); + tracing::debug!(journal_count = journals.len(), collection = %selector.collection, "listed journals"); let maybe_journal = journals.pop(); if !journals.is_empty() { // TODO: implement a sequencer and allow reading from multiple journals @@ -94,11 +89,19 @@ pub async fn read_collection(ctx: &mut crate::CliContext, args: &ReadArgs) -> an let journal = maybe_journal.ok_or_else(|| { anyhow::anyhow!( "collection '{}' does not exist or has never been written to (it has no journals)", - args.selector.collection + selector.collection ) })?; - let begin_mod_time = if let Some(since) = args.bounds.since { + read_collection_journal(journal_client, &journal.name, bounds).await +} + +pub async fn read_collection_journal( + journal_client: gazette::journal::Client, + journal_name: &str, + bounds: &ReadBounds, +) -> anyhow::Result<()> { + let begin_mod_time = if let Some(since) = bounds.since { let start_time = OffsetDateTime::now_utc() - *since; tracing::debug!(%since, begin_mod_time = %start_time, "resolved --since to begin_mod_time"); (start_time - OffsetDateTime::UNIX_EPOCH).as_seconds_f64() as i64 @@ -106,27 +109,23 @@ pub async fn read_collection(ctx: &mut crate::CliContext, args: &ReadArgs) -> an 0 }; - let mut lines = client.read_json_lines( + let mut lines = journal_client.read_json_lines( broker::ReadRequest { - journal: journal.name.clone(), + journal: journal_name.to_string(), offset: 0, - block: args.bounds.follow, + block: bounds.follow, begin_mod_time, ..Default::default() }, 1, ); - tracing::debug!(journal = %journal.name, "starting read of journal"); + tracing::debug!(%journal_name, "starting read of journal"); let policy = doc::SerPolicy::noop(); + let mut stdout = std::io::stdout(); while let Some(line) = lines.next().await { match line { - Err(err) if err.is_transient() => { - tracing::warn!(%err, "error reading collection (will retry)"); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - } - Err(err) => anyhow::bail!(err), Ok(ReadJsonLine::Meta(_)) => (), Ok(ReadJsonLine::Doc { root, @@ -134,8 +133,16 @@ pub async fn read_collection(ctx: &mut crate::CliContext, args: &ReadArgs) -> an }) => { let mut v = serde_json::to_vec(&policy.on(root.get())).unwrap(); v.push(b'\n'); - tokio::io::stdout().write_all(&v).await?; + () = stdout.write_all(&v)?; + } + Err(gazette::Error::BrokerStatus(broker::Status::OffsetNotYetAvailable)) => { + break; // Graceful EOF of non-blocking read. } + Err(err) if err.is_transient() => { + tracing::warn!(%err, "error reading collection (will retry)"); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + Err(err) => anyhow::bail!(err), } } diff --git a/crates/flowctl/src/config.rs b/crates/flowctl/src/config.rs index 6a7959c560..c03d4a6c9f 100644 --- a/crates/flowctl/src/config.rs +++ b/crates/flowctl/src/config.rs @@ -1,29 +1,114 @@ use anyhow::Context; -use serde::{Deserialize, Serialize}; use std::path::PathBuf; -lazy_static::lazy_static! { - static ref DEFAULT_DASHBOARD_URL: url::Url = url::Url::parse("https://dashboard.estuary.dev/").unwrap(); -} - -#[derive(Debug, Serialize, Deserialize, Default)] +/// Configuration of `flowctl`. +/// +/// We generally keep this minimal and prefer to use built-in default +/// or local value fallbacks, because that means we can update these +/// defaults in future releases of flowctl without breaking local +/// User configuration. +#[derive(Debug, Default, serde::Serialize, serde::Deserialize)] pub struct Config { + /// URL endpoint of the Flow control-plane Agent API. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub agent_url: Option, /// URL of the Flow UI, which will be used as a base when flowctl generates links to it. + #[serde(default, skip_serializing_if = "Option::is_none")] pub dashboard_url: Option, /// ID of the current draft, or None if no draft is configured. + #[serde(default, skip_serializing_if = "Option::is_none")] pub draft: Option, - // Current access token, or None if no token is set. - pub api: Option, + /// Public (shared) anonymous token of the control-plane API. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub pg_public_token: Option, + /// URL endpoint of the Flow control-plane PostgREST API. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub pg_url: Option, + /// Users's access token for the control-plane API. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub user_access_token: Option, + /// User's refresh token for the control-plane API, + /// used to generate access_token when it's unset or expires. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub user_refresh_token: Option, + + #[serde(skip)] + is_local: bool, + + // Legacy API stanza, which is being phased out. + #[serde(default, skip_serializing)] + api: Option, +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub struct RefreshToken { + pub id: models::Id, + pub secret: String, +} + +#[derive(Debug, serde::Deserialize)] +struct DeprecatedAPISection { + #[allow(dead_code)] + endpoint: url::Url, + #[allow(dead_code)] + public_token: String, + access_token: String, + refresh_token: Option, } impl Config { + pub fn selected_draft(&self) -> anyhow::Result { + self.draft + .ok_or(anyhow::anyhow!("No draft is currently selected")) + } + + pub fn get_agent_url(&self) -> &url::Url { + if let Some(agent_url) = &self.agent_url { + agent_url + } else if self.is_local { + &LOCAL_AGENT_URL + } else { + &DEFAULT_AGENT_URL + } + } + + pub fn get_dashboard_url(&self) -> &url::Url { + if let Some(dashboard_url) = &self.dashboard_url { + dashboard_url + } else if self.is_local { + &LOCAL_DASHBOARD_URL + } else { + &DEFAULT_DASHBOARD_URL + } + } + + pub fn get_pg_public_token(&self) -> &str { + if let Some(pg_public_token) = &self.pg_public_token { + pg_public_token + } else if self.is_local { + LOCAL_PG_PUBLIC_TOKEN + } else { + DEFAULT_PG_PUBLIC_TOKEN + } + } + + pub fn get_pg_url(&self) -> &url::Url { + if let Some(pg_url) = &self.pg_url { + pg_url + } else if self.is_local { + &LOCAL_PG_URL + } else { + &DEFAULT_PG_URL + } + } + /// Loads the config corresponding to the given named `profile`. /// This loads from: /// - $HOME/.config/flowctl/${profile}.json on linux /// - $HOME/Library/Application Support/flowctl/${profile}.json on macos pub fn load(profile: &str) -> anyhow::Result { let config_file = Config::file_path(profile)?; - let config = match std::fs::read(&config_file) { + let mut config = match std::fs::read(&config_file) { Ok(v) => { let cfg = serde_json::from_slice(&v).with_context(|| { format!( @@ -43,9 +128,35 @@ impl Config { Config::default() } Err(err) => { - return Err(err).context("opening config"); + return Err(err).context("failed to read config"); } }; + + // Migrate legacy portions of the config. + if let Some(DeprecatedAPISection { + endpoint: _, + public_token: _, + access_token, + refresh_token, + }) = config.api.take() + { + config.user_access_token = Some(access_token); + config.user_refresh_token = refresh_token; + } + + // If a refresh token is not defined, attempt to parse one from the environment. + if config.user_refresh_token.is_none() { + if let Ok(env_token) = std::env::var(FLOW_AUTH_TOKEN) { + let decoded = base64::decode(env_token).context("FLOW_AUTH_TOKEN is not base64")?; + let token: RefreshToken = + serde_json::from_slice(&decoded).context("FLOW_AUTH_TOKEN is invalid JSON")?; + + tracing::info!("using refresh token from environment variable {FLOW_AUTH_TOKEN}"); + config.user_refresh_token = Some(token); + } + } + config.is_local = profile == "local"; + Ok(config) } @@ -83,86 +194,21 @@ impl Config { let path = Config::config_dir()?.join(format!("{profile}.json")); Ok(path) } - - pub fn cur_draft(&self) -> anyhow::Result { - match self.draft { - Some(draft) => Ok(draft), - None => { - anyhow::bail!("You must create or select a draft"); - } - } - } - - pub fn set_access_token(&mut self, access_token: String) { - // Don't overwrite the other fields of api if they are already present. - if let Some(api) = self.api.as_mut() { - api.access_token = access_token; - } else { - self.api = Some(API::managed(access_token)); - } - } - - pub fn set_refresh_token(&mut self, refresh_token: RefreshToken) { - // Don't overwrite the other fields of api if they are already present. - if let Some(api) = self.api.as_mut() { - api.refresh_token = Some(refresh_token); - } - } - - pub fn get_dashboard_url(&self, path: &str) -> anyhow::Result { - let base = self - .dashboard_url - .as_ref() - .unwrap_or(&*DEFAULT_DASHBOARD_URL); - let url = base.join(path).context( - "failed to join path to configured dashboard_url, the dashboard_url is likely invalid", - )?; - Ok(url) - } -} - -#[derive(Deserialize, Serialize, Debug)] -pub struct RefreshToken { - pub id: String, - pub secret: String, } -impl RefreshToken { - pub fn from_base64(encoded_token: &str) -> anyhow::Result { - let decoded = base64::decode(encoded_token).context("invalid base64")?; - let tk: RefreshToken = serde_json::from_slice(&decoded)?; - Ok(tk) - } - - pub fn to_base64(&self) -> anyhow::Result { - let ser = serde_json::to_vec(self)?; - Ok(base64::encode(&ser)) - } -} +lazy_static::lazy_static! { + static ref DEFAULT_AGENT_URL: url::Url = url::Url::parse("https://agent-api-1084703453822.us-central1.run.app").unwrap(); + static ref DEFAULT_DASHBOARD_URL: url::Url = url::Url::parse("https://dashboard.estuary.dev/").unwrap(); + static ref DEFAULT_PG_URL: url::Url = url::Url::parse("https://eyrcnmuzzyriypdajwdk.supabase.co/rest/v1").unwrap(); -#[derive(Debug, Serialize, Deserialize)] -pub struct API { - // URL endpoint of the Flow control-plane Rest API. - pub endpoint: url::Url, - // Public (shared) anonymous token of the control-plane API. - pub public_token: String, - // Secret access token of the control-plane API. - pub access_token: String, - // Secret refresh token of the control-plane API, used to generate access_token when it expires. - pub refresh_token: Option, + // Used only when profile is "local". + static ref LOCAL_AGENT_URL: url::Url = url::Url::parse("http://localhost:8675/").unwrap(); + static ref LOCAL_DASHBOARD_URL: url::Url = url::Url::parse("http://localhost:3000/").unwrap(); + static ref LOCAL_PG_URL: url::Url = url::Url::parse("http://localhost:5431/rest/v1").unwrap(); } -pub const PUBLIC_TOKEN: &str = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImV5cmNubXV6enlyaXlwZGFqd2RrIiwicm9sZSI6ImFub24iLCJpYXQiOjE2NDg3NTA1NzksImV4cCI6MTk2NDMyNjU3OX0.y1OyXD3-DYMz10eGxzo1eeamVMMUwIIeOoMryTRAoco"; - -pub const ENDPOINT: &str = "https://eyrcnmuzzyriypdajwdk.supabase.co/rest/v1"; +const DEFAULT_PG_PUBLIC_TOKEN: &str = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImV5cmNubXV6enlyaXlwZGFqd2RrIiwicm9sZSI6ImFub24iLCJpYXQiOjE2NDg3NTA1NzksImV4cCI6MTk2NDMyNjU3OX0.y1OyXD3-DYMz10eGxzo1eeamVMMUwIIeOoMryTRAoco"; +const LOCAL_PG_PUBLIC_TOKEN: &str = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0"; -impl API { - fn managed(access_token: String) -> Self { - Self { - endpoint: url::Url::parse(ENDPOINT).unwrap(), - public_token: PUBLIC_TOKEN.to_string(), - access_token, - refresh_token: None, - } - } -} +// Environment variable which is inspected for a base64-encoded refresh token. +const FLOW_AUTH_TOKEN: &str = "FLOW_AUTH_TOKEN"; diff --git a/crates/flowctl/src/controlplane.rs b/crates/flowctl/src/controlplane.rs deleted file mode 100644 index bfaf4cb9ee..0000000000 --- a/crates/flowctl/src/controlplane.rs +++ /dev/null @@ -1,170 +0,0 @@ -use crate::config::{RefreshToken, ENDPOINT, PUBLIC_TOKEN}; -use crate::{api_exec, CliContext}; -use anyhow::Context; -use serde::Deserialize; -use std::fmt::{self, Debug}; -use std::ops::Deref; -use std::sync::Arc; - -/// A wafer-thin wrapper around a `Postgrest` client that makes it -/// cheaply cloneable and implements `Debug`. This allows us to create -/// a client once and then store it in the `CliContext` for future re-use. -/// This client implements `Deref`, so you can use it -/// just like you would the normal `Postgrest` client. -#[derive(Clone)] -pub struct Client(Arc, bool); - -impl Client { - /// Is this client authenticated (versus being an anonymous user)? - pub fn is_authenticated(&self) -> bool { - self.1 - } -} - -impl Debug for Client { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // We can't really give a better debug impl since Postgrest - // keeps all of its members private. - f.write_str("controlplane::Client") - } -} - -impl Deref for Client { - type Target = postgrest::Postgrest; - - fn deref(&self) -> &Self::Target { - self.0.deref() - } -} - -#[derive(Deserialize)] -struct AccessTokenResponse { - access_token: String, -} - -/// Creates a new client. **you should instead call `CliContext::controlplane_client(&mut Self)`**, which -/// will re-use the existing client if possible. -// TODO(johnny): This really needs a deep overhaul. We're not updating refresh -// tokens as we should be, and the structure of the Config is problematic. -// I'm resisting refactoring it more substantially right now, but it needs it. -pub(crate) async fn new_client(ctx: &mut CliContext) -> anyhow::Result { - match ctx.config_mut().api { - Some(ref mut api) => { - let client = postgrest::Postgrest::new(api.endpoint.as_str()); - let client = client.insert_header("apikey", &api.public_token); - - // Try to give users a more friendly error message if we know their credentials are expired. - if let Err(e) = check_access_token(&api.access_token) { - if let Some(refresh_token) = &api.refresh_token { - let response = api_exec::(client.rpc( - "generate_access_token", - format!( - r#"{{"refresh_token_id": "{}", "secret": "{}"}}"#, - refresh_token.id, refresh_token.secret - ), - )) - .await?; - api.access_token = response.access_token; - } else { - return Err(e); - } - } - let client = - client.insert_header("Authorization", format!("Bearer {}", &api.access_token)); - Ok(Client(Arc::new(client), true)) - } - None => { - // If there has been no prior login, but FLOW_AUTH_TOKEN is available, we use that to - // generate an access_token and automatically login the user - if let Ok(env_token) = std::env::var(FLOW_AUTH_TOKEN) { - let client = postgrest::Postgrest::new(ENDPOINT); - let client = client.insert_header("apikey", PUBLIC_TOKEN); - - let refresh_token = RefreshToken::from_base64(&env_token)?; - let response = api_exec::(client.rpc( - "generate_access_token", - format!( - r#"{{"refresh_token_id": "{}", "secret": "{}"}}"#, - refresh_token.id, refresh_token.secret - ), - )) - .await?; - - let _jwt = check_access_token(&response.access_token)?; - ctx.config_mut() - .set_access_token(response.access_token.clone()); - - let client = client - .insert_header("Authorization", format!("Bearer {}", response.access_token)); - Ok(Client(Arc::new(client), true)) - } else { - tracing::warn!("You are not authenticated. Run `auth login` to login to Flow."); - - let client = postgrest::Postgrest::new(ENDPOINT); - let client = client.insert_header("apikey", PUBLIC_TOKEN); - Ok(Client(Arc::new(client), false)) - } - } - } -} - -pub async fn configure_new_access_token( - ctx: &mut CliContext, - access_token: String, -) -> anyhow::Result<()> { - let jwt = check_access_token(&access_token)?; - ctx.config_mut().set_access_token(access_token); - let client = ctx.controlplane_client().await?; - - let refresh_token = api_exec::(client.rpc( - "create_refresh_token", - r#"{"multi_use": true, "valid_for": "90d", "detail": "Created by flowctl"}"#, - )) - .await?; - ctx.config_mut().set_refresh_token(refresh_token); - - let message = if let Some(email) = jwt.email { - format!("Configured access token for user '{email}'") - } else { - "Configured access token".to_string() - }; - println!("{}", message); - Ok(()) -} - -fn check_access_token(access_token: &str) -> anyhow::Result { - let jwt = parse_jwt(access_token).context("invalid access_token")?; - // Try to give users a more friendly error message if we know their credentials are expired. - if jwt.is_expired() { - anyhow::bail!("access token is expired, please re-authenticate and then try again"); - } - Ok(jwt) -} - -const FLOW_AUTH_TOKEN: &str = "FLOW_AUTH_TOKEN"; -#[derive(Deserialize)] -struct JWT { - exp: i64, - email: Option, -} - -impl JWT { - fn is_expired(&self) -> bool { - let exp = time::OffsetDateTime::from_unix_timestamp(self.exp).unwrap_or_else(|err| { - tracing::error!(exp = self.exp, error = %err, "invalid exp in JWT"); - time::OffsetDateTime::UNIX_EPOCH - }); - time::OffsetDateTime::now_utc() >= exp - } -} - -fn parse_jwt(jwt: &str) -> anyhow::Result { - let payload = jwt - .split('.') - .nth(1) - .ok_or_else(|| anyhow::anyhow!("invalid JWT"))?; - let json_data = - base64::decode_config(payload, base64::URL_SAFE_NO_PAD).context("invalid JWT")?; - let data: JWT = serde_json::from_slice(&json_data).context("parsing JWT data")?; - Ok(data) -} diff --git a/crates/flowctl/src/dataplane.rs b/crates/flowctl/src/dataplane.rs deleted file mode 100644 index c10e0c5313..0000000000 --- a/crates/flowctl/src/dataplane.rs +++ /dev/null @@ -1,66 +0,0 @@ -use crate::controlplane; -use anyhow::Context; -use serde::Deserialize; - -#[derive(Deserialize, Clone, PartialEq, Debug)] -pub struct DataPlaneAccess { - #[serde(rename = "token")] - pub auth_token: String, - pub gateway_url: String, -} - -/// Fetches connection info for accessing a data plane for the given catalog namespace prefixes. -pub async fn fetch_data_plane_access_token( - client: controlplane::Client, - prefixes: Vec, -) -> anyhow::Result { - tracing::debug!(?prefixes, "requesting data-plane access token for prefixes"); - - let body = serde_json::to_string(&serde_json::json!({ - "prefixes": prefixes, - })) - .context("serializing prefix parameters")?; - - let req = client.rpc("gateway_auth_token", body).build(); - tracing::trace!(?req, "built request to execute"); - let resp = req - .send() - .await - .and_then(|r| r.error_for_status()) - .context("requesting data plane gateway auth token")?; - let json: serde_json::Value = resp.json().await?; - tracing::trace!(response_body = ?json, "got response from control-plane"); - let mut auths: Vec = - serde_json::from_value(json).context("failed to decode response")?; - let access = auths.pop().ok_or_else(|| { - anyhow::anyhow!( - "no data-plane access tokens were returned for the given prefixes, access is denied" - ) - })?; - if !auths.is_empty() { - let num_tokens = auths.len() + 1; - anyhow::bail!("received {} tokens for the given set of prefixes: {:?}. This is not yet implemented in flowctl", num_tokens, prefixes); - } - Ok(access) -} - -/// Returns an authenticated journal client that's authorized to the given prefixes. -pub async fn journal_client_for( - cp_client: controlplane::Client, - prefixes: Vec, -) -> anyhow::Result { - let DataPlaneAccess { - auth_token, - gateway_url, - } = fetch_data_plane_access_token(cp_client, prefixes).await?; - tracing::debug!(%gateway_url, "acquired data-plane-gateway access token"); - - let mut metadata = gazette::Metadata::default(); - metadata.bearer_token(&auth_token)?; - - let router = gazette::Router::new(&gateway_url, "local")?; - let client = gazette::journal::Client::new(Default::default(), router, metadata); - - tracing::debug!(%gateway_url, "connected data-plane client"); - Ok(client) -} diff --git a/crates/flowctl/src/draft/author.rs b/crates/flowctl/src/draft/author.rs index b65b105244..1b628bad82 100644 --- a/crates/flowctl/src/draft/author.rs +++ b/crates/flowctl/src/draft/author.rs @@ -1,4 +1,4 @@ -use crate::{api_exec, catalog::SpecSummaryItem, controlplane, local_specs}; +use crate::{api_exec, catalog::SpecSummaryItem, local_specs}; use anyhow::Context; use futures::{stream::FuturesOrdered, StreamExt}; use serde::Serialize; @@ -11,7 +11,7 @@ pub struct Author { source: String, } -pub async fn clear_draft(client: controlplane::Client, draft_id: models::Id) -> anyhow::Result<()> { +pub async fn clear_draft(client: &crate::Client, draft_id: models::Id) -> anyhow::Result<()> { tracing::info!(%draft_id, "clearing existing specs from draft"); api_exec::>( client @@ -25,7 +25,7 @@ pub async fn clear_draft(client: controlplane::Client, draft_id: models::Id) -> } pub async fn upsert_draft_specs( - client: controlplane::Client, + client: &crate::Client, draft_id: models::Id, draft: &tables::DraftCatalog, ) -> anyhow::Result> { @@ -130,12 +130,11 @@ pub async fn do_author( ctx: &mut crate::CliContext, Author { source }: &Author, ) -> anyhow::Result<()> { - let client = ctx.controlplane_client().await?; - let draft_id = ctx.config().cur_draft()?; - let (draft, _) = local_specs::load_and_validate(client.clone(), &source).await?; + let draft_id = ctx.config.selected_draft()?; + let (draft, _) = local_specs::load_and_validate(&ctx.client, &source).await?; - clear_draft(client.clone(), draft_id).await?; - let rows = upsert_draft_specs(client, draft_id, &draft).await?; + clear_draft(&ctx.client, draft_id).await?; + let rows = upsert_draft_specs(&ctx.client, draft_id, &draft).await?; ctx.write_all(rows, ()) } diff --git a/crates/flowctl/src/draft/develop.rs b/crates/flowctl/src/draft/develop.rs index 0d24d8f143..03c46d2d47 100644 --- a/crates/flowctl/src/draft/develop.rs +++ b/crates/flowctl/src/draft/develop.rs @@ -24,10 +24,9 @@ pub async fn do_develop( flat, }: &Develop, ) -> anyhow::Result<()> { - let draft_id = ctx.config().cur_draft()?; - let client = ctx.controlplane_client().await?; + let draft_id = ctx.config.selected_draft()?; let rows: Vec = api_exec_paginated( - client + ctx.client .from("draft_specs") .select("catalog_name,spec,spec_type,expect_pub_id") .not("is", "spec_type", "null") @@ -46,7 +45,7 @@ pub async fn do_develop( let sources = local_specs::indirect_and_write_resources(sources)?; println!("Wrote {count} specifications under {target}."); - let () = local_specs::generate_files(client, sources).await?; + let () = local_specs::generate_files(&ctx.client, sources).await?; Ok(()) } diff --git a/crates/flowctl/src/draft/mod.rs b/crates/flowctl/src/draft/mod.rs index f24960b23f..c577146272 100644 --- a/crates/flowctl/src/draft/mod.rs +++ b/crates/flowctl/src/draft/mod.rs @@ -2,7 +2,6 @@ use std::collections::BTreeSet; use crate::{ api_exec, api_exec_paginated, - controlplane::Client, output::{to_table_row, CliOutput, JsonCell}, }; use anyhow::Context; @@ -128,7 +127,7 @@ impl CliOutput for DraftRow { } } -pub async fn create_draft(client: Client) -> Result { +pub async fn create_draft(client: &crate::Client) -> Result { let row: DraftRow = api_exec( client .from("drafts") @@ -141,7 +140,10 @@ pub async fn create_draft(client: Client) -> Result { Ok(row) } -pub async fn delete_draft(client: Client, draft_id: models::Id) -> Result { +pub async fn delete_draft( + client: &crate::Client, + draft_id: models::Id, +) -> Result { let row: DraftRow = api_exec( client .from("drafts") @@ -156,10 +158,9 @@ pub async fn delete_draft(client: Client, draft_id: models::Id) -> Result anyhow::Result<()> { - let client = ctx.controlplane_client().await?; - let row = create_draft(client).await?; + let row = create_draft(&ctx.client).await?; - ctx.config_mut().draft = Some(row.id.clone()); + ctx.config.draft = Some(row.id.clone()); ctx.write_all(Some(row), ()) } @@ -181,11 +182,10 @@ async fn do_delete(ctx: &mut crate::CliContext) -> anyhow::Result<()> { to_table_row(self, &["/id", "/updated_at"]) } } - let client = ctx.controlplane_client().await?; - let draft_id = ctx.config().cur_draft()?; - let row = delete_draft(client, draft_id).await?; + let draft_id = ctx.config.selected_draft()?; + let row = delete_draft(&ctx.client, draft_id).await?; - ctx.config_mut().draft.take(); + ctx.config.draft.take(); ctx.write_all(Some(row), ()) } @@ -223,8 +223,7 @@ async fn do_describe(ctx: &mut crate::CliContext) -> anyhow::Result<()> { } } let rows: Vec = api_exec_paginated( - ctx.controlplane_client() - .await? + ctx.client .from("draft_specs_ext") .select( vec![ @@ -237,7 +236,7 @@ async fn do_describe(ctx: &mut crate::CliContext) -> anyhow::Result<()> { ] .join(","), ) - .eq("draft_id", ctx.config().cur_draft()?.to_string()), + .eq("draft_id", ctx.config.selected_draft()?.to_string()), ) .await?; @@ -269,8 +268,7 @@ async fn do_list(ctx: &mut crate::CliContext) -> anyhow::Result<()> { } } let rows: Vec = api_exec_paginated( - ctx.controlplane_client() - .await? + ctx.client .from("drafts_ext") .select("created_at,detail,id,num_specs,updated_at"), ) @@ -278,7 +276,7 @@ async fn do_list(ctx: &mut crate::CliContext) -> anyhow::Result<()> { // Decorate the id to mark the selected draft, but only if we're outputting a table let cur_draft = ctx - .config() + .config .draft .map(|id| id.to_string()) .unwrap_or_default(); @@ -298,7 +296,7 @@ async fn do_list(ctx: &mut crate::CliContext) -> anyhow::Result<()> { /// that are identical to their live specs, accounting for changes to inferred schemas. /// Returns the set of specs that were removed from the draft (as a `BTreeSet` so they're ordered). pub async fn remove_unchanged( - client: &Client, + client: &crate::Client, draft_id: models::Id, ) -> anyhow::Result> { #[derive(Deserialize)] @@ -321,8 +319,7 @@ async fn do_select( Select { id: select_id }: &Select, ) -> anyhow::Result<()> { let matched: Vec = api_exec_paginated( - ctx.controlplane_client() - .await? + ctx.client .from("drafts") .eq("id", select_id.to_string()) .select("id"), @@ -333,7 +330,7 @@ async fn do_select( anyhow::bail!("draft {select_id} does not exist"); } - ctx.config_mut().draft = Some(select_id.clone()); + ctx.config.draft = Some(select_id.clone()); do_list(ctx).await } @@ -342,19 +339,18 @@ async fn do_publish( data_plane_name: &str, dry_run: bool, ) -> anyhow::Result<()> { - let draft_id = ctx.config().cur_draft()?; - let client = ctx.controlplane_client().await?; + let draft_id = ctx.config.selected_draft()?; - publish(client, data_plane_name, draft_id, dry_run).await?; + publish(&ctx.client, data_plane_name, draft_id, dry_run).await?; if !dry_run { - ctx.config_mut().draft.take(); + ctx.config.draft.take(); } Ok(()) } pub async fn publish( - client: Client, + client: &crate::Client, default_data_plane_name: &str, draft_id: models::Id, dry_run: bool, diff --git a/crates/flowctl/src/generate/mod.rs b/crates/flowctl/src/generate/mod.rs index bdf4ea9d59..f30142ce3e 100644 --- a/crates/flowctl/src/generate/mod.rs +++ b/crates/flowctl/src/generate/mod.rs @@ -44,8 +44,7 @@ impl Generate { build::write_files(&project_root, files)?; - let client = ctx.controlplane_client().await?; - let () = local_specs::generate_files(client, draft).await?; + let () = local_specs::generate_files(&ctx.client, draft).await?; Ok(()) } } diff --git a/crates/flowctl/src/lib.rs b/crates/flowctl/src/lib.rs index e967436543..59649aaf11 100644 --- a/crates/flowctl/src/lib.rs +++ b/crates/flowctl/src/lib.rs @@ -5,10 +5,9 @@ use clap::Parser; mod auth; mod catalog; +mod client; mod collection; mod config; -mod controlplane; -mod dataplane; mod draft; mod generate; mod local_specs; @@ -19,8 +18,8 @@ mod poll; mod preview; mod raw; +use client::Client; use output::{Output, OutputType}; -use pagination::into_items; use poll::poll_while_queued; /// A command-line tool for working with Estuary Flow. @@ -97,38 +96,14 @@ pub enum Command { Raw(raw::Advanced), } -#[derive(Debug)] pub struct CliContext { + client: Client, config: config::Config, output: output::Output, - controlplane_client: Option, } impl CliContext { - /// Returns a client to the controlplane, creating a new one if necessary. - /// This function will return an error if the authentication credentials - /// are missing, invalid, or expired. - pub async fn controlplane_client(&mut self) -> anyhow::Result { - if self.controlplane_client.is_none() { - let client = controlplane::new_client(self).await?; - self.controlplane_client = Some(client.clone()) - } - Ok(self.controlplane_client.clone().unwrap()) - } - - pub fn config_mut(&mut self) -> &mut config::Config { - &mut self.config - } - - pub fn config(&self) -> &config::Config { - &self.config - } - - pub fn output_args(&self) -> &output::Output { - &self.output - } - - pub fn write_all(&mut self, items: I, table_alt: T::TableAlt) -> anyhow::Result<()> + fn write_all(&mut self, items: I, table_alt: T::TableAlt) -> anyhow::Result<()> where T: output::CliOutput, I: IntoIterator, @@ -140,7 +115,7 @@ impl CliContext { } } - pub fn get_output_type(&mut self) -> OutputType { + fn get_output_type(&mut self) -> OutputType { use crossterm::tty::IsTty; if let Some(ty) = self.output.output { @@ -157,12 +132,71 @@ impl CliContext { impl Cli { pub async fn run(&self) -> anyhow::Result<()> { - let config = config::Config::load(&self.profile)?; + let mut config = config::Config::load(&self.profile)?; let output = self.output.clone(); + + // If the configured access token has expired then remove it before continuing. + if let Some(token) = &config.user_access_token { + let claims: models::authorizations::ControlClaims = + parse_jwt_claims(token).context("failed to parse control-plane access token")?; + + let now = time::OffsetDateTime::now_utc(); + let exp = time::OffsetDateTime::from_unix_timestamp(claims.exp as i64).unwrap(); + + if now + std::time::Duration::from_secs(60) > exp { + tracing::info!(expired=%exp, "removing expired user access token from configuration"); + config.user_access_token = None; + } + } + + if config.user_access_token.is_some() && config.user_refresh_token.is_some() { + // Authorization is current: nothing to do. + } else if config.user_access_token.is_some() { + // We have an access token but no refresh token. Create one. + let refresh_token = api_exec::( + Client::new(&config).rpc( + "create_refresh_token", + serde_json::json!({"multi_use": true, "valid_for": "90d", "detail": "Created by flowctl"}) + .to_string(), + ), + ) + .await?; + + config.user_refresh_token = Some(refresh_token); + + tracing::info!("created new refresh token"); + } else if let Some(config::RefreshToken { id, secret }) = &config.user_refresh_token { + // We have a refresh token but no access token. Generate one. + + #[derive(serde::Deserialize)] + struct Response { + access_token: String, + refresh_token: Option, // Set iff the token was single-use. + } + let Response { + access_token, + refresh_token: next_refresh_token, + } = api_exec::(Client::new(&config).rpc( + "generate_access_token", + serde_json::json!({"refresh_token_id": id, "secret": secret}).to_string(), + )) + .await + .context("failed to obtain access token")?; + + if next_refresh_token.is_some() { + config.user_refresh_token = next_refresh_token; + } + config.user_access_token = Some(access_token); + + tracing::info!("generated a new access token"); + } else { + tracing::warn!("You are not authenticated. Run `auth login` to login to Flow."); + } + let mut context = CliContext { + client: Client::new(&config), config, output, - controlplane_client: None, }; match &self.cmd { @@ -176,7 +210,7 @@ impl Cli { Command::Raw(advanced) => advanced.run(&mut context).await, }?; - context.config().write(&self.profile)?; + context.config.write(&self.profile)?; Ok(()) } @@ -213,7 +247,7 @@ where { use futures::TryStreamExt; - let pages = into_items(b).try_collect().await?; + let pages = pagination::into_items(b).try_collect().await?; Ok(pages) } @@ -258,3 +292,12 @@ fn format_user(email: Option, full_name: Option, id: Option(token: &str) -> anyhow::Result { + let claims = token + .split('.') + .nth(1) + .ok_or_else(|| anyhow::anyhow!("malformed token"))?; + let claims = base64::decode_config(claims, base64::URL_SAFE_NO_PAD)?; + anyhow::Result::Ok(serde_json::from_slice(&claims)?) +} diff --git a/crates/flowctl/src/local_specs.rs b/crates/flowctl/src/local_specs.rs index 9ea708bf16..1f4ef366df 100644 --- a/crates/flowctl/src/local_specs.rs +++ b/crates/flowctl/src/local_specs.rs @@ -6,7 +6,7 @@ use tables::CatalogResolver; /// Load and validate sources and derivation connectors (only). /// Capture and materialization connectors are not validated. pub(crate) async fn load_and_validate( - client: crate::controlplane::Client, + client: &crate::Client, source: &str, ) -> anyhow::Result<(tables::DraftCatalog, tables::Validations)> { let source = build::arg_source_to_url(source, false)?; @@ -17,7 +17,7 @@ pub(crate) async fn load_and_validate( /// Load and validate sources and all connectors. pub(crate) async fn load_and_validate_full( - client: crate::controlplane::Client, + client: &crate::Client, source: &str, network: &str, ) -> anyhow::Result<(tables::DraftCatalog, tables::Validations)> { @@ -29,7 +29,7 @@ pub(crate) async fn load_and_validate_full( /// Generate connector files by validating sources with derivation connectors. pub(crate) async fn generate_files( - client: crate::controlplane::Client, + client: &crate::Client, sources: tables::DraftCatalog, ) -> anyhow::Result<()> { let (mut draft, built) = validate(client, true, false, true, sources, "").await; @@ -67,7 +67,7 @@ pub(crate) async fn load(source: &url::Url) -> tables::DraftCatalog { } async fn validate( - client: crate::controlplane::Client, + client: &crate::Client, noop_captures: bool, noop_derivations: bool, noop_materializations: bool, @@ -77,7 +77,11 @@ async fn validate( let source = &draft.fetches[0].resource.clone(); let project_root = build::project_root(source); - let mut live = Resolver { client }.resolve(draft.all_catalog_names()).await; + let mut live = Resolver { + client: client.clone(), + } + .resolve(draft.all_catalog_names()) + .await; let output = if !live.errors.is_empty() { // If there's a live catalog resolution error, surface it through built tables. @@ -191,7 +195,7 @@ pub(crate) fn pick_policy( } pub(crate) struct Resolver { - pub client: crate::controlplane::Client, + pub client: crate::Client, } impl tables::CatalogResolver for Resolver { diff --git a/crates/flowctl/src/ops.rs b/crates/flowctl/src/ops.rs index beb0b5fc9d..7649323d68 100644 --- a/crates/flowctl/src/ops.rs +++ b/crates/flowctl/src/ops.rs @@ -1,9 +1,4 @@ -use serde_json::Value; - -use crate::collection::{ - read::{read_collection, ReadArgs, ReadBounds}, - CollectionJournalSelector, -}; +use crate::collection::read::ReadBounds; #[derive(clap::Args, Debug)] pub struct Logs { @@ -14,17 +9,23 @@ pub struct Logs { pub bounds: ReadBounds, } +/// Selects a Flow task. +#[derive(clap::Args, Debug, Default, Clone)] +pub struct TaskSelector { + /// The name of the task + #[clap(long)] + pub task: String, +} + impl Logs { pub async fn run(&self, ctx: &mut crate::CliContext) -> anyhow::Result<()> { - let uncommitted = true; // logs reads are always 'uncommitted' because logs aren't written inside transactions. - let read_args = read_args( + read_task_ops_journal( + &ctx.client, &self.task.task, OpsCollection::Logs, &self.bounds, - uncommitted, - ); - read_collection(ctx, &read_args).await?; - Ok(()) + ) + .await } } @@ -34,161 +35,18 @@ pub enum OpsCollection { Stats, } -pub fn read_args( +pub async fn read_task_ops_journal( + client: &crate::Client, task_name: &str, collection: OpsCollection, bounds: &ReadBounds, - uncommitted: bool, -) -> ReadArgs { - let logs_or_stats = match collection { - OpsCollection::Logs => "logs", - OpsCollection::Stats => "stats", - }; - // Once we implement federated data planes, we'll need to update this to - // fetch the name of the data plane based on the tenant. - let collection = format!("ops.us-central1.v1/{logs_or_stats}"); +) -> anyhow::Result<()> { + let (_shard_id_prefix, ops_logs_journal, ops_stats_journal, _shard_client, journal_client) = + crate::client::fetch_task_authorization(client, task_name).await?; - let mut include = std::collections::BTreeMap::new(); - include.insert( - "name".to_string(), - vec![Value::String(task_name.to_string())], - ); - let selector = CollectionJournalSelector { - collection, - partitions: Some(models::PartitionSelector { - include, - exclude: Default::default(), - }), - ..Default::default() + let journal_name = match collection { + OpsCollection::Logs => ops_logs_journal, + OpsCollection::Stats => ops_stats_journal, }; - ReadArgs { - selector, - uncommitted, - bounds: bounds.clone(), - auth_prefixes: vec![task_name.to_string()], - } -} - -/// Selects one or more Flow tasks within a single tenant. -#[derive(clap::Args, Debug, Default, Clone)] -pub struct TaskSelector { - /// The name of the task - #[clap(long)] - pub task: String, - // Selects all tasks with the given type - // - // Requires the `--tenant ` argument - //#[clap(long, arg_enum, requires("tenant"))] - //pub task_type: Option, - - // Selects all tasks within the given tenant - // - // The `--task-type` may also be specified to further limit the selection to only tasks of the given - // type. - //#[clap(long)] - //pub tenant: Option, -} - -/* -#[derive(Debug, clap::ArgEnum, PartialEq, Eq, Clone, Copy)] -pub enum TaskType { - Capture, - Derivation, - Materialization, -} - -impl TaskType { - fn label_value(&self) -> &'static str { - match self { - TaskType::Capture => "capture", - TaskType::Derivation => "derivation", - TaskType::Materialization => "materialization", - } - } -} - -impl TaskSelector { - fn tenant_name(&self) -> Result<&str, anyhow::Error> { - self.tenant - .as_deref() - .or_else(|| self.task.as_deref().map(tenant)) - .ok_or_else(|| anyhow::anyhow!("missing required task selector argument")) - } -} - -*/ - -#[cfg(test)] -mod test { - // use super::*; - - /* - #[test] - fn logs_translates_into_journals_read_commands() { - assert_logs_command( - TaskSelector { - task: Some(String::from("acmeCo/test/capture")), - ..Default::default() - }, - "estuary.dev/collection=ops/acmeCo/logs,estuary.dev/field/name=acmeCo%2Ftest%2Fcapture", - ); - assert_logs_command( - TaskSelector { - task_type: Some(TaskType::Capture), - tenant: Some("acmeCo".to_owned()), - task: None, - }, - "estuary.dev/collection=ops/acmeCo/logs,estuary.dev/field/kind=capture", - ); - assert_logs_command( - TaskSelector { - task_type: Some(TaskType::Derivation), - tenant: Some("acmeCo".to_owned()), - task: None, - }, - "estuary.dev/collection=ops/acmeCo/logs,estuary.dev/field/kind=derivation", - ); - assert_logs_command( - TaskSelector { - task_type: Some(TaskType::Materialization), - tenant: Some("acmeCo".to_owned()), - task: None, - }, - "estuary.dev/collection=ops/acmeCo/logs,estuary.dev/field/kind=materialization", - ); - assert_logs_command( - TaskSelector { - tenant: Some(String::from("acmeCo")), - ..Default::default() - }, - "estuary.dev/collection=ops/acmeCo/logs", - ); - } - - fn assert_logs_command(selector: TaskSelector, expected_label_selector: &str) { - let args = Args { - task: selector.clone(), - // Any extra arguments should be appended to whatever is generated - other: vec![String::from("an extra arg")], - }; - let cmd = args - .try_into_exec_external() - .expect("failed to convert args"); - let expected = ExecExternal::from(( - GO_FLOWCTL, - vec![ - "journals", - "read", - "--selector", - expected_label_selector, - "an extra arg", - ], - )); - assert_eq!( - expected, cmd, - "expected selector: {:?} to return journal selector: '{}', but got: {:?}", - selector, expected_label_selector, cmd - ); - } - */ + crate::collection::read::read_collection_journal(journal_client, &journal_name, bounds).await } diff --git a/crates/flowctl/src/output.rs b/crates/flowctl/src/output.rs index b43366cccb..e911668db0 100644 --- a/crates/flowctl/src/output.rs +++ b/crates/flowctl/src/output.rs @@ -15,7 +15,7 @@ pub enum OutputType { Json, /// Format output as YAML Yaml, - /// Format the output as a prett-printed table + /// Format the output as a pretty-printed table Table, } diff --git a/crates/flowctl/src/poll.rs b/crates/flowctl/src/poll.rs index 5159159052..07db9b1764 100644 --- a/crates/flowctl/src/poll.rs +++ b/crates/flowctl/src/poll.rs @@ -3,7 +3,7 @@ use serde::Deserialize; // Poll an async task in `table` having `id` until it's no longer queued. // While we're waiting print out logs under `logs_token`. pub async fn poll_while_queued( - client: &postgrest::Postgrest, + client: &crate::Client, table: &str, id: models::Id, logs_token: &str, diff --git a/crates/flowctl/src/preview/journal_reader.rs b/crates/flowctl/src/preview/journal_reader.rs index 9fa27d9234..2181bf2192 100644 --- a/crates/flowctl/src/preview/journal_reader.rs +++ b/crates/flowctl/src/preview/journal_reader.rs @@ -7,7 +7,7 @@ use proto_gazette::{broker, consumer}; /// collection journals. #[derive(Clone)] pub struct Reader { - control_plane: crate::controlplane::Client, + client: crate::Client, delay: std::time::Duration, } @@ -25,9 +25,9 @@ impl Reader { /// /// `delay` is an artificial, injected delay between a read and a subsequent checkpoint. /// It emulates back-pressure and encourages amortized transactions and reductions. - pub fn new(control_plane: crate::controlplane::Client, delay: std::time::Duration) -> Self { + pub fn new(client: &crate::Client, delay: std::time::Duration) -> Self { Self { - control_plane, + client: client.clone(), delay, } } @@ -38,50 +38,42 @@ impl Reader { mut resume: proto_gazette::consumer::Checkpoint, ) -> mpsc::Receiver> { let reader = coroutines::try_coroutine(move |mut co| async move { - // We must be able to access all sourced collections. - let access_prefixes = sources - .iter() - .map(|source| source.collection.clone()) - .collect(); - - let data_plane_client = - crate::dataplane::journal_client_for(self.control_plane, access_prefixes).await?; + // Concurrently fetch authorizations for all sourced collections. + let sources = futures::future::try_join_all(sources.iter().map(|source| { + crate::client::fetch_collection_authorization(&self.client, &source.collection) + .map_ok(move |(_journal_name_prefix, client)| (source, client)) + })) + .await?; // Concurrently list the journals of every Source. - let journals: Vec<(&Source, Vec)> = - futures::future::try_join_all(sources.iter().map(|source| { - Self::list_journals(source, data_plane_client.clone()) - .map_ok(move |l| (source, l)) + let journals: Vec<(&Source, Vec, &gazette::journal::Client)> = + futures::future::try_join_all(sources.iter().map(|(source, client)| { + Self::list_journals(*source, client).map_ok(move |l| (*source, l, client)) })) .await?; - // Flatten into (binding, source, journal). - let journals: Vec<(u32, &Source, String)> = journals - .into_iter() + // Flatten into (binding, source, journal, client). + let journals: Vec<(u32, &Source, String, &gazette::journal::Client)> = journals + .iter() .enumerate() - .flat_map(|(binding, (source, journals))| { + .flat_map(|(binding, (source, journals, client))| { journals.into_iter().map(move |journal| { ( binding as u32, - source, + *source, format!("{};{}", journal.name, source.read_suffix), + *client, ) }) }) .collect(); // Map into a stream that yields lines from across all journals, as they're ready. - let mut journals = - futures::stream::select_all(journals.iter().map(|(binding, source, journal)| { - Self::read_journal_lines( - *binding, - data_plane_client.clone(), - journal, - &resume, - source, - ) - .boxed() - })); + let mut journals = futures::stream::select_all(journals.iter().map( + |(binding, source, journal, client)| { + Self::read_journal_lines(*binding, client, journal, &resume, source).boxed() + }, + )); // Reset-able timer for delivery of delayed checkpoints. let deadline = tokio::time::sleep(std::time::Duration::MAX); @@ -147,7 +139,7 @@ impl Reader { async fn list_journals( source: &Source, - client: gazette::journal::Client, + client: &gazette::journal::Client, ) -> anyhow::Result> { let resp = client .list(broker::ListRequest { @@ -179,7 +171,7 @@ impl Reader { fn read_journal_lines<'s>( binding: u32, - client: gazette::journal::Client, + client: &gazette::journal::Client, journal: &'s String, resume: &consumer::Checkpoint, source: &Source, @@ -198,7 +190,7 @@ impl Reader { .map(|b| b.seconds) .unwrap_or_default(); - let mut lines = client.read_json_lines( + let mut lines = client.clone().read_json_lines( broker::ReadRequest { journal: journal.clone(), offset, diff --git a/crates/flowctl/src/preview/mod.rs b/crates/flowctl/src/preview/mod.rs index 9568d11aa9..3d295d72ec 100644 --- a/crates/flowctl/src/preview/mod.rs +++ b/crates/flowctl/src/preview/mod.rs @@ -90,11 +90,10 @@ impl Preview { } = self; let source = build::arg_source_to_url(source, false)?; - let client = ctx.controlplane_client().await?; // TODO(johnny): validate only `name`, if presented. let (_sources, validations) = - local_specs::load_and_validate_full(client, source.as_str(), &network).await?; + local_specs::load_and_validate_full(&ctx.client, source.as_str(), &network).await?; let runtime = runtime::Runtime::new( true, // Allow local. @@ -134,7 +133,7 @@ impl Preview { } else { None }; - let journal_reader = journal_reader::Reader::new(ctx.controlplane_client().await?, delay); + let journal_reader = journal_reader::Reader::new(&ctx.client, delay); let initial_state = models::RawValue::from_str(initial_state).context("initial state is not valid JSON")?; @@ -274,8 +273,16 @@ async fn preview_capture( output_state: bool, output_apply: bool, ) -> anyhow::Result<()> { - let responses_rx = - runtime::harness::run_capture(delay, runtime, sessions, &spec, state, state_dir, timeout, output_apply); + let responses_rx = runtime::harness::run_capture( + delay, + runtime, + sessions, + &spec, + state, + state_dir, + timeout, + output_apply, + ); tokio::pin!(responses_rx); while let Some(response) = responses_rx.try_next().await? { @@ -303,7 +310,11 @@ async fn preview_capture( internal.checkpoint.unwrap_or_default(); let collection = "connectorState"; - let state_json = state.as_ref().map(|s| serde_json::to_string(s)).transpose()?.unwrap_or("{}".to_string()); + let state_json = state + .as_ref() + .map(|s| serde_json::to_string(s)) + .transpose()? + .unwrap_or("{}".to_string()); if output_state { print!("[{collection:?},{state_json}]\n"); @@ -350,7 +361,11 @@ async fn preview_derivation( tracing::debug!(stats=?ops::DebugJson(stats), "flushed"); } else if let Some(derive::response::StartedCommit { state }) = response.started_commit { let collection = "connectorState"; - let state_json = state.as_ref().map(|s| serde_json::to_string(s)).transpose()?.unwrap_or("{}".to_string()); + let state_json = state + .as_ref() + .map(|s| serde_json::to_string(s)) + .transpose()? + .unwrap_or("{}".to_string()); if output_state { print!("[{collection:?},{state_json}]\n"); } @@ -373,7 +388,14 @@ async fn preview_materialization( output_apply: bool, ) -> anyhow::Result<()> { let responses_rx = runtime::harness::run_materialize( - reader, runtime, sessions, &spec, state, state_dir, timeout, output_apply, + reader, + runtime, + sessions, + &spec, + state, + state_dir, + timeout, + output_apply, ); tokio::pin!(responses_rx); @@ -389,7 +411,11 @@ async fn preview_materialization( } else if let Some(materialize::response::StartedCommit { state }) = response.started_commit { let collection = "connectorState"; - let state_json = state.as_ref().map(|s| serde_json::to_string(s)).transpose()?.unwrap_or("{}".to_string()); + let state_json = state + .as_ref() + .map(|s| serde_json::to_string(s)) + .transpose()? + .unwrap_or("{}".to_string()); if output_state { print!("[{collection:?},{state_json}]\n"); } diff --git a/crates/flowctl/src/raw/mod.rs b/crates/flowctl/src/raw/mod.rs index 9214e11c12..e977e950d1 100644 --- a/crates/flowctl/src/raw/mod.rs +++ b/crates/flowctl/src/raw/mod.rs @@ -1,5 +1,5 @@ use crate::{ - collection::read::{read_collection, ReadBounds}, + collection::read::ReadBounds, local_specs, ops::{OpsCollection, TaskSelector}, }; @@ -163,14 +163,13 @@ pub struct Stats { impl Stats { pub async fn run(&self, ctx: &mut crate::CliContext) -> anyhow::Result<()> { - let read_args = crate::ops::read_args( + crate::ops::read_task_ops_journal( + &ctx.client, &self.task.task, OpsCollection::Stats, &self.bounds, - self.uncommitted, - ); - read_collection(ctx, &read_args).await?; - Ok(()) + ) + .await } } @@ -199,8 +198,7 @@ impl Advanced { } async fn do_get(ctx: &mut crate::CliContext, Get { table, query }: &Get) -> anyhow::Result<()> { - let client = ctx.controlplane_client().await?; - let req = client.from(table).build().query(query); + let req = ctx.client.from(table).build().query(query); tracing::debug!(?req, "built request to execute"); println!("{}", req.send().await?.text().await?); @@ -211,8 +209,7 @@ async fn do_update( ctx: &mut crate::CliContext, Update { table, query, body }: &Update, ) -> anyhow::Result<()> { - let client = ctx.controlplane_client().await?; - let req = client.from(table).update(body).build().query(query); + let req = ctx.client.from(table).update(body).build().query(query); tracing::debug!(?req, "built request to execute"); println!("{}", req.send().await?.text().await?); @@ -227,8 +224,7 @@ async fn do_rpc( body, }: &Rpc, ) -> anyhow::Result<()> { - let client = ctx.controlplane_client().await?; - let req = client.rpc(function, body).build().query(query); + let req = ctx.client.rpc(function, body.clone()).build().query(query); tracing::debug!(?req, "built request to execute"); println!("{}", req.send().await?.text().await?); @@ -236,8 +232,9 @@ async fn do_rpc( } async fn do_build(ctx: &mut crate::CliContext, build: &Build) -> anyhow::Result<()> { - let client = ctx.controlplane_client().await?; - let resolver = local_specs::Resolver { client }; + let resolver = local_specs::Resolver { + client: ctx.client.clone(), + }; let Build { db_path, @@ -299,8 +296,7 @@ async fn do_combine( ctx: &mut crate::CliContext, Combine { source, collection }: &Combine, ) -> anyhow::Result<()> { - let (_sources, validations) = - local_specs::load_and_validate(ctx.controlplane_client().await?, source).await?; + let (_sources, validations) = local_specs::load_and_validate(&ctx.client, source).await?; let collection = match validations .built_collections diff --git a/crates/flowctl/src/raw/oauth.rs b/crates/flowctl/src/raw/oauth.rs index 112e262e91..1736827d34 100644 --- a/crates/flowctl/src/raw/oauth.rs +++ b/crates/flowctl/src/raw/oauth.rs @@ -54,6 +54,9 @@ pub async fn do_oauth( injected_values, }: &Oauth, ) -> anyhow::Result<()> { + let Some(user_access_token) = &ctx.config.user_access_token else { + anyhow::bail!("This comment can only be run when authenticated"); + }; let source = build::arg_source_to_url(source, false)?; let draft = local_specs::surface_errors(local_specs::load(&source).await.into_result())?; @@ -175,13 +178,8 @@ pub async fn do_oauth( tracing::warn!( "Make sure that your application has {redirect_uri} set as an allowed redirect URL" ); - let api = ctx - .config - .api - .as_ref() - .expect("Cannot connect to edge functions"); - let mut oauth_endpoint = api.endpoint.clone(); + let mut oauth_endpoint = ctx.config.get_pg_url().clone(); oauth_endpoint.set_path("functions/v1/oauth"); #[derive(serde::Deserialize, serde::Serialize)] @@ -192,8 +190,8 @@ pub async fn do_oauth( let authorize_response_bytes = reqwest::Client::new() .post(oauth_endpoint.clone()) - .bearer_auth(api.access_token.to_owned()) - .header("apikey", api.public_token.to_owned()) + .bearer_auth(user_access_token) + .header("apikey", ctx.config.get_pg_public_token()) .json(&serde_json::json!({ "operation": "auth-url", "connector_config": { @@ -253,8 +251,8 @@ pub async fn do_oauth( let code_response = reqwest::Client::new() .post(oauth_endpoint) - .bearer_auth(api.access_token.to_owned()) - .header("apikey", api.public_token.to_owned()) + .bearer_auth(user_access_token) + .header("apikey", ctx.config.get_pg_public_token()) .json(&code_request_body) .send() .await?