diff --git a/src/error.rs b/src/error.rs index 68b5e8b..ab80935 100644 --- a/src/error.rs +++ b/src/error.rs @@ -134,13 +134,30 @@ impl ServiceError { } } -#[derive(Debug, Error, Diagnostic)] -pub enum BasinNameOrUriParseError { - #[error(transparent)] - #[diagnostic(help("Are you trying to operate on an invalid basin?"))] - BasinName(#[from] ConvertError), +#[derive(Debug, Error)] +pub enum S2UriParseError { + #[error("S2 URI must begin with `s2://`")] + MissingUriScheme, + #[error("Invalid S2 URI scheme `{0}://`. Must be `s2://`")] + InvalidUriScheme(String), + #[error("{0}")] + InvalidBasinName(ConvertError), + #[error("Only basin name expected but found both basin and stream names")] + UnexpectedStreamName, + #[error("Missing stream name in S2 URI")] + MissingStreamName, +} - #[error("Invalid S2 URI: {0}")] - #[diagnostic(transparent)] - InvalidUri(miette::Report), +#[cfg(test)] +impl PartialEq for S2UriParseError { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::MissingUriScheme, Self::MissingUriScheme) => true, + (Self::InvalidUriScheme(s), Self::InvalidUriScheme(o)) if s.eq(o) => true, + (Self::InvalidBasinName(_), Self::InvalidBasinName(_)) => true, + (Self::MissingStreamName, Self::MissingStreamName) => true, + (Self::UnexpectedStreamName, Self::UnexpectedStreamName) => true, + _ => false, + } + } } diff --git a/src/main.rs b/src/main.rs index 26c9192..99cc9fd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -37,8 +37,8 @@ use tokio_stream::{ use tracing::trace; use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt}; use types::{ - BasinConfig, BasinNameAndMaybeStreamUri, BasinNameAndStreamArgs, BasinNameOnlyUri, - StreamConfig, RETENTION_POLICY_PATH, STORAGE_CLASS_PATH, + BasinConfig, S2BasinAndMaybeStreamUri, S2BasinAndStreamUri, S2BasinUri, StreamConfig, + RETENTION_POLICY_PATH, STORAGE_CLASS_PATH, }; mod account; @@ -96,7 +96,7 @@ enum Commands { /// Create a basin. CreateBasin { /// Name of the basin to create. - basin: BasinNameOnlyUri, + basin: S2BasinUri, #[command(flatten)] config: BasinConfig, @@ -105,19 +105,19 @@ enum Commands { /// Delete a basin. DeleteBasin { /// Name of the basin to delete. - basin: BasinNameOnlyUri, + basin: S2BasinUri, }, /// Get basin config. GetBasinConfig { /// Basin name to get config for. - basin: BasinNameOnlyUri, + basin: S2BasinUri, }, /// Reconfigure a basin. ReconfigureBasin { /// Name of the basin to reconfigure. - basin: BasinNameOnlyUri, + basin: S2BasinUri, /// Configuration to apply. #[command(flatten)] @@ -129,7 +129,7 @@ enum Commands { ListStreams { /// Name of the basin to manage or S2 URI with basin and prefix. #[arg(value_name = "BASIN|S2_URI")] - basin: BasinNameAndMaybeStreamUri, + uri: S2BasinAndMaybeStreamUri, /// Filter to stream names that begin with this prefix. #[arg(short = 'p', long)] @@ -146,8 +146,8 @@ enum Commands { /// Create a stream. CreateStream { - #[command(flatten)] - args: BasinNameAndStreamArgs, + #[arg(value_name = "S2_URI")] + uri: S2BasinAndStreamUri, /// Configuration to apply. #[command(flatten)] @@ -157,20 +157,20 @@ enum Commands { /// Delete a stream. #[command(alias = "rm")] DeleteStream { - #[command(flatten)] - args: BasinNameAndStreamArgs, + #[arg(value_name = "S2_URI")] + uri: S2BasinAndStreamUri, }, /// Get stream config. GetStreamConfig { - #[command(flatten)] - args: BasinNameAndStreamArgs, + #[arg(value_name = "S2_URI")] + uri: S2BasinAndStreamUri, }, /// Reconfigure a stream. ReconfigureStream { - #[command(flatten)] - args: BasinNameAndStreamArgs, + #[arg(value_name = "S2_URI")] + uri: S2BasinAndStreamUri, /// Configuration to apply. #[command(flatten)] @@ -179,8 +179,8 @@ enum Commands { /// Get the next sequence number that will be assigned by a stream. CheckTail { - #[command(flatten)] - args: BasinNameAndStreamArgs, + #[arg(value_name = "S2_URI")] + uri: S2BasinAndStreamUri, }, /// Set the trim point for the stream. @@ -188,8 +188,8 @@ enum Commands { /// Trimming is eventually consistent, and trimmed records may be visible /// for a brief period. Trim { - #[command(flatten)] - args: BasinNameAndStreamArgs, + #[arg(value_name = "S2_URI")] + uri: S2BasinAndStreamUri, /// Earliest sequence number that should be retained. /// This sequence number is only allowed to advance, @@ -213,8 +213,8 @@ enum Commands { /// Note that fencing is a cooperative mechanism, /// and it is only enforced when a token is provided. Fence { - #[command(flatten)] - args: BasinNameAndStreamArgs, + #[arg(value_name = "S2_URI")] + uri: S2BasinAndStreamUri, /// New fencing token specified in hex. /// It may be upto 16 bytes, and can be empty. @@ -234,8 +234,8 @@ enum Commands { /// /// Currently, only newline delimited records are supported. Append { - #[command(flatten)] - args: BasinNameAndStreamArgs, + #[arg(value_name = "S2_URI")] + uri: S2BasinAndStreamUri, /// Enforce fencing token specified in hex. #[arg(short = 'f', long, value_parser = parse_fencing_token)] @@ -257,8 +257,8 @@ enum Commands { /// If a limit if specified, reading will stop when the limit is reached or there are no more records on the stream. /// If a limit is not specified, the reader will keep tailing and wait for new records. Read { - #[command(flatten)] - args: BasinNameAndStreamArgs, + #[arg(value_name = "S2_URI")] + uri: S2BasinAndStreamUri, /// Starting sequence number (inclusive). #[arg(short = 's', long, default_value_t = 0)] @@ -280,8 +280,8 @@ enum Commands { /// Ping the stream to get append acknowledgement and end-to-end latencies. Ping { - #[command(flatten)] - args: BasinNameAndStreamArgs, + #[arg(value_name = "S2_URI")] + uri: S2BasinAndStreamUri, /// Send a batch after this interval. /// @@ -532,15 +532,15 @@ async fn run() -> Result<(), S2CliError> { } Commands::ListStreams { - basin, + uri, prefix, start_after, limit, } => { - let BasinNameAndMaybeStreamUri { + let S2BasinAndMaybeStreamUri { basin, stream: maybe_prefix, - } = basin; + } = uri; let prefix = match (maybe_prefix, prefix) { (Some(_), Some(_)) => { return Err(S2CliError::InvalidArgs(miette::miette!( @@ -582,8 +582,8 @@ async fn run() -> Result<(), S2CliError> { } } - Commands::CreateStream { args, config } => { - let (basin, stream) = args.try_into_parts()?; + Commands::CreateStream { uri, config } => { + let S2BasinAndStreamUri { basin, stream } = uri; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let basin_client = BasinClient::new(client_config, basin); @@ -593,8 +593,8 @@ async fn run() -> Result<(), S2CliError> { eprintln!("{}", "✓ Stream created".green().bold()); } - Commands::DeleteStream { args } => { - let (basin, stream) = args.try_into_parts()?; + Commands::DeleteStream { uri } => { + let S2BasinAndStreamUri { basin, stream } = uri; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let basin_client = BasinClient::new(client_config, basin); @@ -604,8 +604,8 @@ async fn run() -> Result<(), S2CliError> { eprintln!("{}", "✓ Stream deletion requested".green().bold()); } - Commands::GetStreamConfig { args } => { - let (basin, stream) = args.try_into_parts()?; + Commands::GetStreamConfig { uri } => { + let S2BasinAndStreamUri { basin, stream } = uri; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let basin_client = BasinClient::new(client_config, basin); @@ -616,8 +616,8 @@ async fn run() -> Result<(), S2CliError> { println!("{}", serde_json::to_string_pretty(&config)?); } - Commands::ReconfigureStream { args, config } => { - let (basin, stream) = args.try_into_parts()?; + Commands::ReconfigureStream { uri, config } => { + let S2BasinAndStreamUri { basin, stream } = uri; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let basin_client = BasinClient::new(client_config, basin); @@ -640,8 +640,8 @@ async fn run() -> Result<(), S2CliError> { println!("{}", serde_json::to_string_pretty(&config)?); } - Commands::CheckTail { args } => { - let (basin, stream) = args.try_into_parts()?; + Commands::CheckTail { uri } => { + let S2BasinAndStreamUri { basin, stream } = uri; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let stream_client = StreamClient::new(client_config, basin, stream); @@ -650,12 +650,12 @@ async fn run() -> Result<(), S2CliError> { } Commands::Trim { - args, + uri, trim_point, fencing_token, match_seq_num, } => { - let (basin, stream) = args.try_into_parts()?; + let S2BasinAndStreamUri { basin, stream } = uri; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let stream_client = StreamClient::new(client_config, basin, stream); @@ -675,12 +675,12 @@ async fn run() -> Result<(), S2CliError> { } Commands::Fence { - args, + uri, new_fencing_token, fencing_token, match_seq_num, } => { - let (basin, stream) = args.try_into_parts()?; + let S2BasinAndStreamUri { basin, stream } = uri; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let stream_client = StreamClient::new(client_config, basin, stream); @@ -700,12 +700,12 @@ async fn run() -> Result<(), S2CliError> { } Commands::Append { - args, + uri, input, fencing_token, match_seq_num, } => { - let (basin, stream) = args.try_into_parts()?; + let S2BasinAndStreamUri { basin, stream } = uri; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let stream_client = StreamClient::new(client_config, basin, stream); @@ -763,13 +763,13 @@ async fn run() -> Result<(), S2CliError> { } Commands::Read { - args, + uri, start_seq_num, output, limit_count, limit_bytes, } => { - let (basin, stream) = args.try_into_parts()?; + let S2BasinAndStreamUri { basin, stream } = uri; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let stream_client = StreamClient::new(client_config, basin, stream); @@ -889,12 +889,12 @@ async fn run() -> Result<(), S2CliError> { } Commands::Ping { - args, + uri, interval, batch_bytes, num_batches, } => { - let (basin, stream) = args.try_into_parts()?; + let S2BasinAndStreamUri { basin, stream } = uri; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let stream_client = StreamService::new(StreamClient::new(client_config, basin, stream)); diff --git a/src/types.rs b/src/types.rs index cc249d4..6925035 100644 --- a/src/types.rs +++ b/src/types.rs @@ -5,124 +5,140 @@ use s2::types::BasinName; use serde::Serialize; use std::{str::FromStr, time::Duration}; -use crate::error::{BasinNameOrUriParseError, S2CliError}; +use crate::error::S2UriParseError; pub const STORAGE_CLASS_PATH: &str = "default_stream_config.storage_class"; pub const RETENTION_POLICY_PATH: &str = "default_stream_config.retention_policy"; #[derive(Debug, Clone)] -pub struct BasinNameOrUri { - pub basin: BasinName, - pub stream: S, +struct S2Uri { + basin: BasinName, + stream: Option, } -impl From> for BasinName { - fn from(value: BasinNameOrUri) -> Self { - value.basin +#[cfg(test)] +impl PartialEq for S2Uri { + fn eq(&self, other: &Self) -> bool { + self.basin.as_ref().eq(other.basin.as_ref()) && self.stream.eq(&other.stream) } } -fn parse_maybe_basin_or_uri( - s: &str, -) -> Result<(BasinName, Option), BasinNameOrUriParseError> { - match BasinName::from_str(s) { - Ok(basin) => { - // Definitely a basin name since a valid basin name cannot have `:` - // which is required for the URI. - Ok((basin, None)) - } - Err(parse_basin_err) => { - // Should definitely be a URI else error. - let uri = http::Uri::from_str(s).map_err(|_| parse_basin_err.clone())?; - - match uri.scheme_str() { - Some("s2") => (), - Some(other) => { - return Err(BasinNameOrUriParseError::InvalidUri(miette::miette!( - help = "Does the URI start with 's2://'?", - "Unsupported URI scheme '{}'", - other - ))); - } - None => { - // It's probably not an attempt to enter a URI. Safe to - // assume this is simply an invalid basin. - return Err(parse_basin_err.into()); - } - }; +impl FromStr for S2Uri { + type Err = S2UriParseError; - let basin = uri.host().ok_or_else(|| { - BasinNameOrUriParseError::InvalidUri(miette::miette!( - help = "Is there an extra '/' after 's2://'?", - "Missing basin name (URI host)" - )) - })?; - let basin = BasinName::from_str(basin)?; + fn from_str(s: &str) -> Result { + let (scheme, s) = s + .split_once("://") + .ok_or(S2UriParseError::MissingUriScheme)?; + if scheme != "s2" { + return Err(S2UriParseError::InvalidUriScheme(scheme.to_owned())); + } - let stream = uri.path().trim_start_matches('/'); + let (basin, stream) = if let Some((basin, stream)) = s.split_once("/") { let stream = if stream.is_empty() { None } else { - Some(stream.to_string()) + Some(stream.to_owned()) }; + (basin, stream) + } else { + (s, None) + }; - Ok((basin, stream)) - } + Ok(S2Uri { + basin: basin.parse().map_err(S2UriParseError::InvalidBasinName)?, + stream, + }) + } +} + +#[derive(Debug, Clone)] +pub struct S2BasinUri(pub BasinName); + +impl From for BasinName { + fn from(value: S2BasinUri) -> Self { + value.0 } } -pub type BasinNameOnlyUri = BasinNameOrUri<()>; +#[cfg(test)] +impl PartialEq for S2BasinUri { + fn eq(&self, other: &Self) -> bool { + self.0.as_ref().eq(other.0.as_ref()) + } +} -impl FromStr for BasinNameOnlyUri { - type Err = BasinNameOrUriParseError; +impl FromStr for S2BasinUri { + type Err = S2UriParseError; fn from_str(s: &str) -> Result { - let (basin, stream) = parse_maybe_basin_or_uri(s)?; - if stream.is_none() { - Ok(Self { basin, stream: () }) - } else { - Err(BasinNameOrUriParseError::InvalidUri(miette::miette!( - help = "Try providing the basin name directly or URI like 's2://basin-name'", - "Must not contain stream name (URI path)" - ))) + match S2Uri::from_str(s) { + Ok(S2Uri { + basin, + stream: None, + }) => Ok(Self( + basin.parse().map_err(S2UriParseError::InvalidBasinName)?, + )), + Ok(S2Uri { + basin: _, + stream: Some(_), + }) => Err(S2UriParseError::UnexpectedStreamName), + Err(S2UriParseError::MissingUriScheme) => { + Ok(Self(s.parse().map_err(S2UriParseError::InvalidBasinName)?)) + } + Err(other) => Err(other), } } } -pub type BasinNameAndMaybeStreamUri = BasinNameOrUri>; +#[derive(Debug, Clone)] +pub struct S2BasinAndMaybeStreamUri { + pub basin: BasinName, + pub stream: Option, +} -impl FromStr for BasinNameAndMaybeStreamUri { - type Err = BasinNameOrUriParseError; +#[cfg(test)] +impl PartialEq for S2BasinAndMaybeStreamUri { + fn eq(&self, other: &Self) -> bool { + self.basin.as_ref().eq(other.basin.as_ref()) && self.stream.eq(&other.stream) + } +} + +impl FromStr for S2BasinAndMaybeStreamUri { + type Err = S2UriParseError; fn from_str(s: &str) -> Result { - let (basin, stream) = parse_maybe_basin_or_uri(s)?; - Ok(Self { basin, stream }) + match S2Uri::from_str(s) { + Ok(S2Uri { basin, stream }) => Ok(Self { basin, stream }), + Err(S2UriParseError::MissingUriScheme) => Ok(Self { + basin: s.parse().map_err(S2UriParseError::InvalidBasinName)?, + stream: None, + }), + Err(other) => Err(other), + } } } -#[derive(Parser, Debug, Clone)] -pub struct BasinNameAndStreamArgs { - /// Name of the basin to manage or S2 URI with basin and stream. - #[arg(value_name = "BASIN|S2_URI")] - uri: BasinNameAndMaybeStreamUri, - /// Name of the stream. - stream: Option, +#[derive(Debug, Clone)] +pub struct S2BasinAndStreamUri { + pub basin: BasinName, + pub stream: String, } -impl BasinNameAndStreamArgs { - pub fn try_into_parts(self) -> Result<(BasinName, String), S2CliError> { - let stream = match (self.stream, self.uri.stream) { - (Some(_), Some(_)) => return Err(S2CliError::InvalidArgs(miette::miette!( - help = "Make sure to provide the stream name once either in URI or as argument", - "Multiple stream names provided" - ))), - (None, None) => return Err(S2CliError::InvalidArgs(miette::miette!( - help = "Try providing the stream name as another argument or in URI like 's2://basin-name/stream/name'", - "Missing stream name" - ))), - (Some(s), None) | (None, Some(s)) => s, - }; - Ok((self.uri.basin, stream)) +#[cfg(test)] +impl PartialEq for S2BasinAndStreamUri { + fn eq(&self, other: &Self) -> bool { + self.basin.as_ref().eq(other.basin.as_ref()) && self.stream == other.stream + } +} + +impl FromStr for S2BasinAndStreamUri { + type Err = S2UriParseError; + + fn from_str(s: &str) -> Result { + let S2Uri { basin, stream } = s.parse()?; + let stream = stream.ok_or(S2UriParseError::MissingStreamName)?; + Ok(Self { basin, stream }) } } @@ -248,42 +264,122 @@ impl From for StreamConfig { #[cfg(test)] mod tests { - use std::str::FromStr; + use crate::{error::S2UriParseError, types::S2BasinAndStreamUri}; - use crate::types::BasinNameOnlyUri; - - use super::BasinNameAndMaybeStreamUri; + use super::{S2BasinAndMaybeStreamUri, S2BasinUri, S2Uri}; #[test] - fn test_basin_name_or_uri_parse() { + fn test_s2_uri_parse() { let test_cases = vec![ - ("valid-basin", Some(("valid-basin", None))), - ("s2://valid-basin", Some(("valid-basin", None))), - ("s2://valid-basin/", Some(("valid-basin", None))), + ( + "valid-basin", + Err(S2UriParseError::MissingUriScheme), + Ok(S2BasinUri("valid-basin".parse().unwrap())), + Err(S2UriParseError::MissingUriScheme), + Ok(S2BasinAndMaybeStreamUri { + basin: "valid-basin".parse().unwrap(), + stream: None, + }), + ), + ( + "s2://valid-basin", + Ok(S2Uri { + basin: "valid-basin".parse().unwrap(), + stream: None, + }), + Ok(S2BasinUri("valid-basin".parse().unwrap())), + Err(S2UriParseError::MissingStreamName), + Ok(S2BasinAndMaybeStreamUri { + basin: "valid-basin".parse().unwrap(), + stream: None, + }), + ), + ( + "s2://valid-basin/", + Ok(S2Uri { + basin: "valid-basin".parse().unwrap(), + stream: None, + }), + Ok(S2BasinUri("valid-basin".parse().unwrap())), + Err(S2UriParseError::MissingStreamName), + Ok(S2BasinAndMaybeStreamUri { + basin: "valid-basin".parse().unwrap(), + stream: None, + }), + ), ( "s2://valid-basin/stream/name", - Some(("valid-basin", Some("stream/name"))), + Ok(S2Uri { + basin: "valid-basin".parse().unwrap(), + stream: Some("stream/name".to_owned()), + }), + Err(S2UriParseError::UnexpectedStreamName), + Ok(S2BasinAndStreamUri { + basin: "valid-basin".parse().unwrap(), + stream: "stream/name".to_owned(), + }), + Ok(S2BasinAndMaybeStreamUri { + basin: "valid-basin".parse().unwrap(), + stream: Some("stream/name".to_owned()), + }), + ), + ( + "-invalid-basin", + Err(S2UriParseError::MissingUriScheme), + Err(S2UriParseError::InvalidBasinName("".into())), + Err(S2UriParseError::MissingUriScheme), + Err(S2UriParseError::InvalidBasinName("".into())), + ), + ( + "http://valid-basin", + Err(S2UriParseError::InvalidUriScheme("http".to_owned())), + Err(S2UriParseError::InvalidUriScheme("http".to_owned())), + Err(S2UriParseError::InvalidUriScheme("http".to_owned())), + Err(S2UriParseError::InvalidUriScheme("http".to_owned())), + ), + ( + "s2://-invalid-basin", + Err(S2UriParseError::InvalidBasinName("".into())), + Err(S2UriParseError::InvalidBasinName("".into())), + Err(S2UriParseError::InvalidBasinName("".into())), + Err(S2UriParseError::InvalidBasinName("".into())), + ), + ( + "s2:///stream/name", + Err(S2UriParseError::InvalidBasinName("".into())), + Err(S2UriParseError::InvalidBasinName("".into())), + Err(S2UriParseError::InvalidBasinName("".into())), + Err(S2UriParseError::InvalidBasinName("".into())), + ), + ( + "random:::string", + Err(S2UriParseError::MissingUriScheme), + Err(S2UriParseError::InvalidBasinName("".into())), + Err(S2UriParseError::MissingUriScheme), + Err(S2UriParseError::InvalidBasinName("".into())), ), - ("-invalid-basin", None), - ("http://valid-basin", None), - ("s2://-invalid-basin", None), - ("s2:///stream/name", None), - ("random:::string", None), ]; - for (s, expected) in test_cases { - let b = BasinNameAndMaybeStreamUri::from_str(s); - if let Some((expected_basin, expected_stream)) = expected { - let b = b.unwrap(); - assert_eq!(b.basin.as_ref(), expected_basin); - assert_eq!(b.stream.as_deref(), expected_stream); - assert_eq!( - expected_stream.is_some(), - BasinNameOnlyUri::from_str(s).is_err() - ); - } else { - assert!(b.is_err()); - } + for ( + s, + expected_uri, + expected_basin_uri, + expected_basin_and_stream_uri, + expected_basin_and_maybe_stream_uri, + ) in test_cases + { + assert_eq!(s.parse(), expected_uri, "S2Uri: {s}"); + assert_eq!(s.parse(), expected_basin_uri, "S2BasinUri: {s}"); + assert_eq!( + s.parse(), + expected_basin_and_stream_uri, + "S2BasinAndStreamUri: {s}" + ); + assert_eq!( + s.parse(), + expected_basin_and_maybe_stream_uri, + "S2BasinAndMaybeStreamUri: {s}" + ); } } }