Skip to content

Commit

Permalink
refactor(rust): support ak/sk authorization
Browse files Browse the repository at this point in the history
  • Loading branch information
ShadowySpirits committed May 28, 2023
1 parent 7a3aed9 commit 0e9fecd
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 3 deletions.
2 changes: 1 addition & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ prost-types = "0.11.8"
thiserror = "1.0"
anyhow = "1.0.70"
parking_lot = "0.12"
hmac = "0.12"
hostname = "0.3.1"
os_type = "2.6.0"

Expand All @@ -67,6 +66,7 @@ mockall = "0.11.4"
mockall_double= "0.3.0"

siphasher = "0.3.10"
ring = "0.16.20"

[build-dependencies]
tonic-build = "0.9.0"
Expand Down
25 changes: 24 additions & 1 deletion rust/src/conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

//! Configuration of RocketMQ rust client.
use std::time::Duration;

use crate::model::common::ClientType;
#[allow(unused_imports)]
use crate::producer::Producer;
#[allow(unused_imports)]
use crate::simple_consumer::SimpleConsumer;
use std::time::Duration;

/// [`ClientOption`] is the configuration of internal client, which manages the connection and request with RocketMQ proxy.
#[derive(Debug, Clone)]
Expand All @@ -34,6 +35,8 @@ pub struct ClientOption {
pub(crate) enable_tls: bool,
pub(crate) timeout: Duration,
pub(crate) long_polling_timeout: Duration,
pub(crate) access_key: String,
pub(crate) secret_key: String,
}

impl Default for ClientOption {
Expand All @@ -46,6 +49,8 @@ impl Default for ClientOption {
enable_tls: true,
timeout: Duration::from_secs(3),
long_polling_timeout: Duration::from_secs(40),
access_key: "".to_string(),
secret_key: "".to_string(),
}
}
}
Expand Down Expand Up @@ -88,6 +93,24 @@ impl ClientOption {
pub fn set_long_polling_timeout(&mut self, long_polling_timeout: Duration) {
self.long_polling_timeout = long_polling_timeout;
}

/// Get the access key
pub fn access_key(&self) -> &str {
&self.access_key
}
/// Set the access key
pub fn set_access_key(&mut self, access_key: impl Into<String>) {
self.access_key = access_key.into();
}

/// Get the secret key
pub fn secret_key(&self) -> &str {
&self.secret_key
}
/// Set the secret key
pub fn set_secret_key(&mut self, secret_key: impl Into<String>) {
self.secret_key = secret_key.into();
}
}

/// Log format for output.
Expand Down
36 changes: 35 additions & 1 deletion rust/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use std::collections::HashMap;

use async_trait::async_trait;
use mockall::automock;
use ring::hmac;
use slog::{debug, error, info, o, Logger};
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use tokio::sync::{mpsc, Mutex};
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
Expand Down Expand Up @@ -217,6 +220,37 @@ impl Session {
"x-mq-protocol-version",
AsciiMetadataValue::from_static(PROTOCOL_VERSION),
);

let date_time_result = OffsetDateTime::now_local();
let date_time = if let Ok(result) = date_time_result {
result
} else {
OffsetDateTime::now_utc()
};

let date_time = date_time.format(&Rfc3339).unwrap();

metadata.insert(
"x-mq-date-time",
AsciiMetadataValue::try_from(&date_time).unwrap(),
);

if !self.option.secret_key.is_empty() {
let key = hmac::Key::new(
hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY,
self.option.secret_key.as_bytes(),
);
let signature = hmac::sign(&key, date_time.as_bytes());
let signature = hex::encode(signature.as_ref());
let authorization = format!(
"MQv2-HMAC-SHA1 Credential={}, SignedHeaders=x-mq-date-time, Signature={}",
self.option.access_key, signature
);
metadata.insert(
"authorization",
AsciiMetadataValue::try_from(authorization).unwrap(),
);
}
}

pub(crate) async fn start(&mut self, settings: TelemetryCommand) -> Result<(), ClientError> {
Expand Down Expand Up @@ -458,10 +492,10 @@ impl SessionManager {

#[cfg(test)]
mod tests {
use crate::conf::ProducerOption;
use slog::debug;
use wiremock_grpc::generate;

use crate::conf::ProducerOption;
use crate::log::terminal_logger;
use crate::util::build_producer_settings;

Expand Down

0 comments on commit 0e9fecd

Please sign in to comment.