Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Only accept URIs in basin+stream args #100

Merged
merged 2 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,30 @@ impl ServiceError {
}
}

#[derive(Debug, Error, Diagnostic)]
pub enum BasinNameOrUriParseError {
#[error(transparent)]
#[diagnostic(help("Are you trying to operate on an invalid basin?"))]
BasinName(#[from] ConvertError),
#[derive(Debug, Error)]
pub enum S2UriParseError {
#[error("S2 URI must begin with `s2://`")]
MissingUriScheme,
#[error("Invalid S2 URI scheme `{0}://`. Must be `s2://`")]
InvalidUriScheme(String),
#[error("{0}")]
InvalidBasinName(ConvertError),
#[error("Only basin name expected but found both basin and stream names")]
UnexpectedStreamName,
#[error("Missing stream name in S2 URI")]
MissingStreamName,
}

#[error("Invalid S2 URI: {0}")]
#[diagnostic(transparent)]
InvalidUri(miette::Report),
#[cfg(test)]
impl PartialEq for S2UriParseError {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::MissingUriScheme, Self::MissingUriScheme) => true,
(Self::InvalidUriScheme(s), Self::InvalidUriScheme(o)) if s.eq(o) => true,
(Self::InvalidBasinName(_), Self::InvalidBasinName(_)) => true,
(Self::MissingStreamName, Self::MissingStreamName) => true,
(Self::UnexpectedStreamName, Self::UnexpectedStreamName) => true,
_ => false,
}
}
}
100 changes: 50 additions & 50 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ use tokio_stream::{
use tracing::trace;
use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt};
use types::{
BasinConfig, BasinNameAndMaybeStreamUri, BasinNameAndStreamArgs, BasinNameOnlyUri,
StreamConfig, RETENTION_POLICY_PATH, STORAGE_CLASS_PATH,
BasinConfig, S2BasinAndMaybeStreamUri, S2BasinAndStreamUri, S2BasinUri, StreamConfig,
RETENTION_POLICY_PATH, STORAGE_CLASS_PATH,
};

mod account;
Expand Down Expand Up @@ -96,7 +96,7 @@ enum Commands {
/// Create a basin.
CreateBasin {
/// Name of the basin to create.
basin: BasinNameOnlyUri,
basin: S2BasinUri,

#[command(flatten)]
config: BasinConfig,
Expand All @@ -105,19 +105,19 @@ enum Commands {
/// Delete a basin.
DeleteBasin {
/// Name of the basin to delete.
basin: BasinNameOnlyUri,
basin: S2BasinUri,
},

/// Get basin config.
GetBasinConfig {
/// Basin name to get config for.
basin: BasinNameOnlyUri,
basin: S2BasinUri,
},

/// Reconfigure a basin.
ReconfigureBasin {
/// Name of the basin to reconfigure.
basin: BasinNameOnlyUri,
basin: S2BasinUri,

/// Configuration to apply.
#[command(flatten)]
Expand All @@ -129,7 +129,7 @@ enum Commands {
ListStreams {
/// Name of the basin to manage or S2 URI with basin and prefix.
#[arg(value_name = "BASIN|S2_URI")]
basin: BasinNameAndMaybeStreamUri,
uri: S2BasinAndMaybeStreamUri,

/// Filter to stream names that begin with this prefix.
#[arg(short = 'p', long)]
Expand All @@ -146,8 +146,8 @@ enum Commands {

/// Create a stream.
CreateStream {
#[command(flatten)]
args: BasinNameAndStreamArgs,
#[arg(value_name = "S2_URI")]
uri: S2BasinAndStreamUri,

/// Configuration to apply.
#[command(flatten)]
Expand All @@ -157,20 +157,20 @@ enum Commands {
/// Delete a stream.
#[command(alias = "rm")]
DeleteStream {
#[command(flatten)]
args: BasinNameAndStreamArgs,
#[arg(value_name = "S2_URI")]
uri: S2BasinAndStreamUri,
},

/// Get stream config.
GetStreamConfig {
#[command(flatten)]
args: BasinNameAndStreamArgs,
#[arg(value_name = "S2_URI")]
uri: S2BasinAndStreamUri,
},

/// Reconfigure a stream.
ReconfigureStream {
#[command(flatten)]
args: BasinNameAndStreamArgs,
#[arg(value_name = "S2_URI")]
uri: S2BasinAndStreamUri,

/// Configuration to apply.
#[command(flatten)]
Expand All @@ -179,17 +179,17 @@ enum Commands {

/// Get the next sequence number that will be assigned by a stream.
CheckTail {
#[command(flatten)]
args: BasinNameAndStreamArgs,
#[arg(value_name = "S2_URI")]
uri: S2BasinAndStreamUri,
},

/// Set the trim point for the stream.
///
/// Trimming is eventually consistent, and trimmed records may be visible
/// for a brief period.
Trim {
#[command(flatten)]
args: BasinNameAndStreamArgs,
#[arg(value_name = "S2_URI")]
uri: S2BasinAndStreamUri,

/// Earliest sequence number that should be retained.
/// This sequence number is only allowed to advance,
Expand All @@ -213,8 +213,8 @@ enum Commands {
/// Note that fencing is a cooperative mechanism,
/// and it is only enforced when a token is provided.
Fence {
#[command(flatten)]
args: BasinNameAndStreamArgs,
#[arg(value_name = "S2_URI")]
uri: S2BasinAndStreamUri,

/// New fencing token specified in hex.
/// It may be upto 16 bytes, and can be empty.
Expand All @@ -234,8 +234,8 @@ enum Commands {
///
/// Currently, only newline delimited records are supported.
Append {
#[command(flatten)]
args: BasinNameAndStreamArgs,
#[arg(value_name = "S2_URI")]
uri: S2BasinAndStreamUri,

/// Enforce fencing token specified in hex.
#[arg(short = 'f', long, value_parser = parse_fencing_token)]
Expand All @@ -257,8 +257,8 @@ enum Commands {
/// If a limit if specified, reading will stop when the limit is reached or there are no more records on the stream.
/// If a limit is not specified, the reader will keep tailing and wait for new records.
Read {
#[command(flatten)]
args: BasinNameAndStreamArgs,
#[arg(value_name = "S2_URI")]
uri: S2BasinAndStreamUri,

/// Starting sequence number (inclusive).
#[arg(short = 's', long, default_value_t = 0)]
Expand All @@ -280,8 +280,8 @@ enum Commands {

/// Ping the stream to get append acknowledgement and end-to-end latencies.
Ping {
#[command(flatten)]
args: BasinNameAndStreamArgs,
#[arg(value_name = "S2_URI")]
uri: S2BasinAndStreamUri,

/// Send a batch after this interval.
///
Expand Down Expand Up @@ -532,15 +532,15 @@ async fn run() -> Result<(), S2CliError> {
}

Commands::ListStreams {
basin,
uri,
prefix,
start_after,
limit,
} => {
let BasinNameAndMaybeStreamUri {
let S2BasinAndMaybeStreamUri {
basin,
stream: maybe_prefix,
} = basin;
} = uri;
let prefix = match (maybe_prefix, prefix) {
(Some(_), Some(_)) => {
return Err(S2CliError::InvalidArgs(miette::miette!(
Expand Down Expand Up @@ -582,8 +582,8 @@ async fn run() -> Result<(), S2CliError> {
}
}

Commands::CreateStream { args, config } => {
let (basin, stream) = args.try_into_parts()?;
Commands::CreateStream { uri, config } => {
let S2BasinAndStreamUri { basin, stream } = uri;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let basin_client = BasinClient::new(client_config, basin);
Expand All @@ -593,8 +593,8 @@ async fn run() -> Result<(), S2CliError> {
eprintln!("{}", "✓ Stream created".green().bold());
}

Commands::DeleteStream { args } => {
let (basin, stream) = args.try_into_parts()?;
Commands::DeleteStream { uri } => {
let S2BasinAndStreamUri { basin, stream } = uri;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let basin_client = BasinClient::new(client_config, basin);
Expand All @@ -604,8 +604,8 @@ async fn run() -> Result<(), S2CliError> {
eprintln!("{}", "✓ Stream deletion requested".green().bold());
}

Commands::GetStreamConfig { args } => {
let (basin, stream) = args.try_into_parts()?;
Commands::GetStreamConfig { uri } => {
let S2BasinAndStreamUri { basin, stream } = uri;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let basin_client = BasinClient::new(client_config, basin);
Expand All @@ -616,8 +616,8 @@ async fn run() -> Result<(), S2CliError> {
println!("{}", serde_json::to_string_pretty(&config)?);
}

Commands::ReconfigureStream { args, config } => {
let (basin, stream) = args.try_into_parts()?;
Commands::ReconfigureStream { uri, config } => {
let S2BasinAndStreamUri { basin, stream } = uri;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let basin_client = BasinClient::new(client_config, basin);
Expand All @@ -640,8 +640,8 @@ async fn run() -> Result<(), S2CliError> {
println!("{}", serde_json::to_string_pretty(&config)?);
}

Commands::CheckTail { args } => {
let (basin, stream) = args.try_into_parts()?;
Commands::CheckTail { uri } => {
let S2BasinAndStreamUri { basin, stream } = uri;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let stream_client = StreamClient::new(client_config, basin, stream);
Expand All @@ -650,12 +650,12 @@ async fn run() -> Result<(), S2CliError> {
}

Commands::Trim {
args,
uri,
trim_point,
fencing_token,
match_seq_num,
} => {
let (basin, stream) = args.try_into_parts()?;
let S2BasinAndStreamUri { basin, stream } = uri;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let stream_client = StreamClient::new(client_config, basin, stream);
Expand All @@ -675,12 +675,12 @@ async fn run() -> Result<(), S2CliError> {
}

Commands::Fence {
args,
uri,
new_fencing_token,
fencing_token,
match_seq_num,
} => {
let (basin, stream) = args.try_into_parts()?;
let S2BasinAndStreamUri { basin, stream } = uri;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let stream_client = StreamClient::new(client_config, basin, stream);
Expand All @@ -700,12 +700,12 @@ async fn run() -> Result<(), S2CliError> {
}

Commands::Append {
args,
uri,
input,
fencing_token,
match_seq_num,
} => {
let (basin, stream) = args.try_into_parts()?;
let S2BasinAndStreamUri { basin, stream } = uri;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let stream_client = StreamClient::new(client_config, basin, stream);
Expand Down Expand Up @@ -763,13 +763,13 @@ async fn run() -> Result<(), S2CliError> {
}

Commands::Read {
args,
uri,
start_seq_num,
output,
limit_count,
limit_bytes,
} => {
let (basin, stream) = args.try_into_parts()?;
let S2BasinAndStreamUri { basin, stream } = uri;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let stream_client = StreamClient::new(client_config, basin, stream);
Expand Down Expand Up @@ -889,12 +889,12 @@ async fn run() -> Result<(), S2CliError> {
}

Commands::Ping {
args,
uri,
interval,
batch_bytes,
num_batches,
} => {
let (basin, stream) = args.try_into_parts()?;
let S2BasinAndStreamUri { basin, stream } = uri;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let stream_client = StreamService::new(StreamClient::new(client_config, basin, stream));
Expand Down
Loading
Loading