Skip to content

Commit

Permalink
flowctl: refactor config and support new collection & task authorizat…
Browse files Browse the repository at this point in the history
…ions

This change introduces the agent API to `flowctl`, which is the
proverbial straw which motivated a deeper refactor of flowctl
configuration.

As a headline feature, `flowctl` supports the new task and collection
authorization APIs and uses them in support of serving existing
subcommands for reading collections, previews, and read ops logs or
stats.

Clean up management of access and refresh tokens by obtaining access
tokens or generating refresh tokens prior to calling into a particular
sub-command. Preserve the ability to run `flowctl` in an unauthenticated
mode.

Make it easier to use `flowctl` against a local stack by introducing
alternative defaults when running under a "local" profile.

Also fix handling of single-use refresh tokens, where we must retain the
updated secret after using it to generate a new access token. We could
now consider having `flowctl` create single-use refresh tokens rather
than multi-use ones, but I didn't want to take that step just yet.

Also fix mis-ordering of output when reading journals.

Also fix OffsetNotYetAvailable error when reading a journal in non-blocking mode.

Issue #1627
  • Loading branch information
jgraettinger committed Sep 17, 2024
1 parent becb718 commit ca30917
Show file tree
Hide file tree
Showing 26 changed files with 671 additions and 779 deletions.
39 changes: 7 additions & 32 deletions crates/flowctl/src/auth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ mod roles;

use anyhow::Context;

use crate::controlplane;

#[derive(Debug, clap::Args)]
#[clap(rename_all = "kebab-case")]
pub struct Auth {
Expand Down Expand Up @@ -47,21 +45,6 @@ pub enum Command {
/// Unlike 'read' or 'write', the subject of an 'admin' grant also inherits
/// capabilities granted to the object role from still-other roles.
Roles(roles::Roles),

/// Fetches and prints an auth token that can be used to access a Flow data plane.
///
/// The returned token can be used to access the Flow data plane with 3rd party tools.
/// For example, you can use curl to access a private port of a running task by running:
/// ```ignore
/// curl -H "Authorization: Bearer $(flowctl auth data-plane-access-token --prefix myTenant/)" https://myPort.myHost.data-plane.example/
/// ```
DataPlaneAccessToken(DataPlaneAccessToken),
}

#[derive(Debug, clap::Args)]
pub struct DataPlaneAccessToken {
#[clap(long, required = true)]
prefix: Vec<String>,
}

#[derive(Debug, clap::Args)]
Expand All @@ -76,20 +59,23 @@ impl Auth {
match &self.cmd {
Command::Login => do_login(ctx).await,
Command::Token(Token { token }) => {
controlplane::configure_new_access_token(ctx, token.clone()).await?;
ctx.config.user_access_token = Some(token.clone());
println!("Configured access token.");
Ok(())
}
Command::Roles(roles) => roles.run(ctx).await,
Command::DataPlaneAccessToken(args) => do_data_plane_access_token(ctx, args).await,
}
}
}

async fn do_login(ctx: &mut crate::CliContext) -> anyhow::Result<()> {
use crossterm::tty::IsTty;

let url = ctx.config().get_dashboard_url("/admin/api")?.to_string();
let url = ctx
.config
.get_dashboard_url()
.join("/admin/api")?
.to_string();

println!("\nOpening browser to: {url}");
if let Err(_) = open::that(&url) {
Expand Down Expand Up @@ -118,7 +104,7 @@ async fn do_login(ctx: &mut crate::CliContext) -> anyhow::Result<()> {

// Copied credentials will often accidentally contain extra whitespace characters.
let token = token.trim().to_string();
controlplane::configure_new_access_token(ctx, token).await?;
ctx.config.user_access_token = Some(token);
println!("\nConfigured access token.");
Ok(())
} else {
Expand All @@ -131,14 +117,3 @@ async fn do_login(ctx: &mut crate::CliContext) -> anyhow::Result<()> {
);
}
}

async fn do_data_plane_access_token(
ctx: &mut crate::CliContext,
args: &DataPlaneAccessToken,
) -> anyhow::Result<()> {
let client = ctx.controlplane_client().await?;
let access =
crate::dataplane::fetch_data_plane_access_token(client, args.prefix.clone()).await?;
println!("{}", access.auth_token);
Ok(())
}
15 changes: 5 additions & 10 deletions crates/flowctl/src/auth/roles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ pub async fn do_list(ctx: &mut crate::CliContext) -> anyhow::Result<()> {
}
}
let rows: Vec<Row> = api_exec_paginated(
ctx.controlplane_client()
.await?
ctx.client
.from("combined_grants_ext")
.select(
vec![
Expand Down Expand Up @@ -177,8 +176,7 @@ pub async fn do_grant(
// Upsert user grants to `user_grants` and role grants to `role_grants`.
let rows: Vec<GrantRevokeRow> = if let Some(subject_user_id) = subject_user_id {
api_exec_paginated(
ctx.controlplane_client()
.await?
ctx.client
.from("user_grants")
.select(grant_revoke_columns())
.upsert(
Expand All @@ -195,8 +193,7 @@ pub async fn do_grant(
.await?
} else if let Some(subject_role) = subject_role {
api_exec_paginated(
ctx.controlplane_client()
.await?
ctx.client
.from("role_grants")
.select(grant_revoke_columns())
.upsert(
Expand Down Expand Up @@ -231,8 +228,7 @@ pub async fn do_revoke(
// Revoke user grants from `user_grants` and role grants from `role_grants`.
let rows: Vec<GrantRevokeRow> = if let Some(subject_user_id) = subject_user_id {
api_exec_paginated(
ctx.controlplane_client()
.await?
ctx.client
.from("user_grants")
.select(grant_revoke_columns())
.eq("user_id", subject_user_id.to_string())
Expand All @@ -242,8 +238,7 @@ pub async fn do_revoke(
.await?
} else if let Some(subject_role) = subject_role {
api_exec_paginated(
ctx.controlplane_client()
.await?
ctx.client
.from("role_grants")
.select(grant_revoke_columns())
.eq("subject_role", subject_role)
Expand Down
10 changes: 4 additions & 6 deletions crates/flowctl/src/catalog/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,8 @@ pub async fn do_delete(
type_selector: type_selector.clone(),
};

let client = ctx.controlplane_client().await?;
let specs = catalog::fetch_live_specs::<catalog::LiveSpecRow>(
client.clone(),
&ctx.client,
&list_args,
vec![
"id",
Expand All @@ -98,7 +97,7 @@ pub async fn do_delete(
anyhow::bail!("delete operation cancelled");
}

let draft = draft::create_draft(client.clone())
let draft = draft::create_draft(&ctx.client)
.await
.context("failed to create draft")?;
println!(
Expand All @@ -121,8 +120,7 @@ pub async fn do_delete(
.collect::<Vec<DraftSpec>>();

api_exec::<Vec<serde_json::Value>>(
ctx.controlplane_client()
.await?
ctx.client
.from("draft_specs")
//.select("catalog_name,spec_type")
.upsert(serde_json::to_string(&draft_specs).unwrap())
Expand All @@ -131,7 +129,7 @@ pub async fn do_delete(
.await?;
tracing::debug!("added deletions to draft");

draft::publish(client.clone(), "", draft.id, false).await?;
draft::publish(&ctx.client, "", draft.id, false).await?;

// extra newline before, since `publish` will output a bunch of logs
println!("\nsuccessfully deleted {} spec(s)", draft_specs.len());
Expand Down
23 changes: 9 additions & 14 deletions crates/flowctl/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod pull_specs;
mod test;

use crate::{
api_exec, api_exec_paginated, controlplane,
api_exec, api_exec_paginated,
output::{to_table_row, CliOutput, JsonCell},
};
use anyhow::Context;
Expand Down Expand Up @@ -226,7 +226,7 @@ impl Catalog {
/// # Panics
/// If the name_selector `name` and `prefix` are both non-empty.
pub async fn fetch_live_specs<T>(
cp_client: controlplane::Client,
client: &crate::Client,
list: &List,
columns: Vec<&'static str>,
) -> anyhow::Result<Vec<T>>
Expand All @@ -242,7 +242,7 @@ where
panic!("cannot specify both 'name' and 'prefix' for filtering live specs");
}

let builder = cp_client.from("live_specs_ext").select(columns.join(","));
let builder = client.from("live_specs_ext").select(columns.join(","));
let builder = list.type_selector.add_spec_type_filters(builder);

// Drive the actual request(s) based on the name selector, since the arguments there may
Expand Down Expand Up @@ -448,8 +448,7 @@ async fn do_list(ctx: &mut crate::CliContext, list_args: &List) -> anyhow::Resul
columns.push("reads_from");
columns.push("writes_to");
}
let client = ctx.controlplane_client().await?;
let rows = fetch_live_specs::<LiveSpecRow>(client, list_args, columns).await?;
let rows = fetch_live_specs::<LiveSpecRow>(&ctx.client, list_args, columns).await?;

ctx.write_all(rows, list_args.flows)
}
Expand Down Expand Up @@ -499,8 +498,7 @@ async fn do_history(ctx: &mut crate::CliContext, History { name }: &History) ->
}
}
let rows: Vec<Row> = api_exec_paginated(
ctx.controlplane_client()
.await?
ctx.client
.from("publication_specs_ext")
.like("catalog_name", format!("{name}%"))
.select(
Expand Down Expand Up @@ -531,7 +529,7 @@ async fn do_draft(
publication_id,
}: &Draft,
) -> anyhow::Result<()> {
let draft_id = ctx.config().cur_draft()?;
let draft_id = ctx.config.selected_draft()?;

#[derive(Deserialize)]
struct Row {
Expand All @@ -550,8 +548,7 @@ async fn do_draft(
spec_type,
} = if let Some(publication_id) = publication_id {
api_exec(
ctx.controlplane_client()
.await?
ctx.client
.from("publication_specs_ext")
.eq("catalog_name", name)
.eq("pub_id", publication_id.to_string())
Expand All @@ -561,8 +558,7 @@ async fn do_draft(
.await?
} else {
api_exec(
ctx.controlplane_client()
.await?
ctx.client
.from("live_specs")
.eq("catalog_name", name)
.not("is", "spec_type", "null")
Expand Down Expand Up @@ -596,8 +592,7 @@ async fn do_draft(
tracing::debug!(?draft_spec, "inserting draft");

let rows: Vec<SpecSummaryItem> = api_exec(
ctx.controlplane_client()
.await?
ctx.client
.from("draft_specs")
.select("catalog_name,spec_type")
.upsert(serde_json::to_string(&draft_spec).unwrap())
Expand Down
24 changes: 11 additions & 13 deletions crates/flowctl/src/catalog/publish.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{catalog::SpecSummaryItem, controlplane, draft, local_specs, CliContext};
use crate::{catalog::SpecSummaryItem, draft, local_specs, CliContext};
use anyhow::Context;

#[derive(Debug, clap::Args)]
Expand All @@ -24,19 +24,17 @@ pub async fn do_publish(ctx: &mut CliContext, args: &Publish) -> anyhow::Result<
// in common error scenarios. For example, we don't create the draft until after bundling, because
// then we'd have to clean up the empty draft if the bundling fails. The very first thing is to create the client,
// since that can fail due to missing/expired credentials.
let client = ctx.controlplane_client().await?;

anyhow::ensure!(args.auto_approve || std::io::stdin().is_tty(), "The publish command must be run interactively unless the `--auto-approve` flag is provided");

let (draft_catalog, _validations) =
local_specs::load_and_validate(client.clone(), &args.source).await?;
local_specs::load_and_validate(&ctx.client, &args.source).await?;

let draft = draft::create_draft(client.clone()).await?;
let draft = draft::create_draft(&ctx.client).await?;
println!("Created draft: {}", &draft.id);
tracing::info!(draft_id = %draft.id, "created draft");
draft::upsert_draft_specs(client.clone(), draft.id, &draft_catalog).await?;
draft::upsert_draft_specs(&ctx.client, draft.id, &draft_catalog).await?;

let removed = draft::remove_unchanged(&client, draft.id).await?;
let removed = draft::remove_unchanged(&ctx.client, draft.id).await?;
if !removed.is_empty() {
println!("The following specs are identical to the currently published specs, and have been pruned from the draft:");
for name in removed.iter() {
Expand All @@ -50,7 +48,7 @@ pub async fn do_publish(ctx: &mut CliContext, args: &Publish) -> anyhow::Result<

if summary.is_empty() {
println!("No specs would be changed by this publication, nothing to publish.");
try_delete_draft(client, draft.id).await;
try_delete_draft(&ctx.client, draft.id).await;
return Ok(());
}

Expand All @@ -59,17 +57,17 @@ pub async fn do_publish(ctx: &mut CliContext, args: &Publish) -> anyhow::Result<

if !(args.auto_approve || prompt_to_continue().await) {
println!("\nCancelling");
try_delete_draft(client.clone(), draft.id).await;
try_delete_draft(&ctx.client, draft.id).await;
anyhow::bail!("publish cancelled");
}
println!("Proceeding to publish...");

let publish_result =
draft::publish(client.clone(), &args.default_data_plane, draft.id, false).await;
draft::publish(&ctx.client, &args.default_data_plane, draft.id, false).await;
// The draft will have been deleted automatically if the publish was successful.
if let Err(err) = publish_result.as_ref() {
tracing::error!(draft_id = %draft.id, error = %err, "publication error");
try_delete_draft(client, draft.id).await;
try_delete_draft(&ctx.client, draft.id).await;
}
publish_result.context("Publish failed")?;
println!("\nPublish successful");
Expand All @@ -90,8 +88,8 @@ async fn prompt_to_continue() -> bool {
}
}

async fn try_delete_draft(client: controlplane::Client, draft_id: models::Id) {
if let Err(del_err) = draft::delete_draft(client.clone(), draft_id).await {
async fn try_delete_draft(client: &crate::Client, draft_id: models::Id) {
if let Err(del_err) = draft::delete_draft(client, draft_id).await {
tracing::error!(draft_id = %draft_id, error = %del_err, "failed to delete draft");
}
}
5 changes: 2 additions & 3 deletions crates/flowctl/src/catalog/pull_specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ pub struct PullSpecs {
}

pub async fn do_pull_specs(ctx: &mut CliContext, args: &PullSpecs) -> anyhow::Result<()> {
let client = ctx.controlplane_client().await?;
// Retrieve identified live specifications.
let live_specs = fetch_live_specs::<LiveSpecRow>(
client.clone(),
&ctx.client,
&List {
flows: false,
name_selector: args.name_selector.clone(),
Expand Down Expand Up @@ -58,7 +57,7 @@ pub async fn do_pull_specs(ctx: &mut CliContext, args: &PullSpecs) -> anyhow::Re
let sources = local_specs::indirect_and_write_resources(sources)?;

println!("Wrote {count} specifications under {target}.");
let () = local_specs::generate_files(client, sources).await?;
let () = local_specs::generate_files(&ctx.client, sources).await?;

Ok(())
}
12 changes: 5 additions & 7 deletions crates/flowctl/src/catalog/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,22 @@ pub struct TestArgs {
/// and discoverable to users. There's also no need for any confirmation steps, since we're not
/// actually modifying the published specs.
pub async fn do_test(ctx: &mut CliContext, args: &TestArgs) -> anyhow::Result<()> {
let client = ctx.controlplane_client().await?;

let (draft_catalog, _validations) =
local_specs::load_and_validate(client.clone(), &args.source).await?;
local_specs::load_and_validate(&ctx.client, &args.source).await?;

let draft = draft::create_draft(client.clone()).await?;
let draft = draft::create_draft(&ctx.client).await?;
println!("Created draft: {}", &draft.id);
tracing::info!(draft_id = %draft.id, "created draft");
let spec_rows = draft::upsert_draft_specs(client.clone(), draft.id, &draft_catalog).await?;
let spec_rows = draft::upsert_draft_specs(&ctx.client, draft.id, &draft_catalog).await?;
println!("Running tests for catalog items:");
ctx.write_all(spec_rows, ())?;
println!("Starting tests...");

// Technically, test is just a publish with the dry-run flag set to true.
let publish_result =
draft::publish(client.clone(), &args.default_data_plane, draft.id, true).await;
draft::publish(&ctx.client, &args.default_data_plane, draft.id, true).await;

if let Err(del_err) = draft::delete_draft(client.clone(), draft.id).await {
if let Err(del_err) = draft::delete_draft(&ctx.client, draft.id).await {
tracing::error!(draft_id = %draft.id, error = %del_err, "failed to delete draft");
}
publish_result.context("Tests failed")?;
Expand Down
Loading

0 comments on commit ca30917

Please sign in to comment.