Skip to content

Commit

Permalink
Refactor to fix NOAUTH propagation
Browse files Browse the repository at this point in the history
* Removed ConnectionManager which introduced unnecessary constraints.
* Split RedisError into RedisError (upstream) and TransformError.
* Replaced Subject+Credential with UsernamePasswordToken.
* Re-enabled authenticated connection multiplexing.
  • Loading branch information
XA21X committed Jun 17, 2021
1 parent b203341 commit 9348d6c
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 366 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ wasmer-runtime = "0.17.1"
sodiumoxide = "0.2.5"
rusoto_kms = "0.46.0"
rusoto_signature = "0.46.0"
crossbeam-utils = "0.8.4"

[dev-dependencies]
criterion = "0.3"
Expand Down
11 changes: 3 additions & 8 deletions src/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ pub struct Wrapper<'a> {
// pub next_transform: usize,
transforms: Vec<&'a mut Transforms>,
pub client_details: String,
chain_name: String
chain_name: String,
}

impl<'a> Clone for Wrapper<'a> {
Expand All @@ -268,7 +268,7 @@ impl<'a> Clone for Wrapper<'a> {
message: self.message.clone(),
transforms: vec![],
client_details: self.client_details.clone(),
chain_name: self.chain_name.clone()
chain_name: self.chain_name.clone(),
}
}
}
Expand Down Expand Up @@ -359,10 +359,5 @@ pub trait Transform: Send {
}

pub type ResponseFuturesOrdered = FuturesOrdered<
Pin<
Box<
dyn Future<Output = Result<(Message, Result<Messages, anyhow::Error>)>>
+ std::marker::Send,
>,
>,
Pin<Box<dyn Future<Output = Result<(Message, Result<Messages>)>> + std::marker::Send>>,
>;
65 changes: 31 additions & 34 deletions src/transforms/redis_transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@ pub mod redis_cluster;
pub mod redis_codec_destination;
pub mod timestamp_tagging;

#[derive(thiserror::Error, Debug)]
#[derive(thiserror::Error, Clone, Debug)]
pub enum RedisError {
// TODO: Refactor to avoid mixing levels.

// High level errors.
#[error("authentication is required")]
NotAuthenticated,

Expand All @@ -21,26 +18,28 @@ pub enum RedisError {
#[error("username or password is incorrect")]
BadCredentials,

// TODO: Figure out how to capture context from errors.
// #[error("could not connect to {address} due to {source:?}")]
// ConnectionError {
// address: String,
// #[source]
// source: anyhow::Error,
// },

// Low level errors.
#[error("send error")]
SendError(String),
#[error("unknown error: {0}")]
Unknown(String),
}

#[error("receive error")]
ReceiveError(String),
impl RedisError {
fn from_message(error: &str) -> RedisError {
match error.splitn(2, ' ').next() {
Some("NOAUTH") => RedisError::NotAuthenticated,
Some("NOPERM") => RedisError::NotAuthorized,
Some("WRONGPASS") => RedisError::BadCredentials,
_ => RedisError::Unknown(error.to_string()),
}
}
}

#[error("protocol error")]
ProtocolError(String),
#[derive(thiserror::Error, Debug)]
pub enum TransformError {
#[error(transparent)]
Upstream(#[from] RedisError),

#[error("unknown: {0}")]
Unknown(String),
#[error("protocol error: {0}")]
Protocol(String),

#[error("io error: {0}")]
IO(io::Error),
Expand All @@ -49,24 +48,22 @@ pub enum RedisError {
Other(#[from] anyhow::Error),
}

impl RedisError {
fn from_message(error: &str) -> RedisError {
match error.splitn(2, ' ').next() {
Some(name) => match name {
"NOAUTH" => RedisError::NotAuthenticated,
"NOPERM" => RedisError::NotAuthorized,
"WRONGPASS" => RedisError::BadCredentials,
_ => RedisError::Unknown(error.to_string()),
},
None => RedisError::Unknown(error.to_string()),
impl TransformError {
fn choose_upstream(errors: Vec<TransformError>) -> Option<TransformError> {
match errors.iter().find_map(|e| match e {
TransformError::Upstream(e) => Some(e),
_ => None,
}) {
Some(e) => Some(TransformError::Upstream(e.clone())),
None => errors.into_iter().next(),
}
}
}

impl From<ConnectionError<RedisError>> for RedisError {
fn from(error: ConnectionError<RedisError>) -> Self {
impl From<ConnectionError<TransformError>> for TransformError {
fn from(error: ConnectionError<TransformError>) -> Self {
match error {
ConnectionError::IO(e) => RedisError::IO(e),
ConnectionError::IO(e) => TransformError::IO(e),
ConnectionError::Authenticator(e) => e,
}
}
Expand Down
Loading

0 comments on commit 9348d6c

Please sign in to comment.