From 0e9fecd5b44083442debc8e37eac94fff2b5624a Mon Sep 17 00:00:00 2001 From: SSpirits Date: Sun, 28 May 2023 16:36:42 +0800 Subject: [PATCH] refactor(rust): support ak/sk authorization --- rust/Cargo.toml | 2 +- rust/src/conf.rs | 25 ++++++++++++++++++++++++- rust/src/session.rs | 36 +++++++++++++++++++++++++++++++++++- 3 files changed, 60 insertions(+), 3 deletions(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 80c06fe23..d7655d1ff 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -43,7 +43,6 @@ prost-types = "0.11.8" thiserror = "1.0" anyhow = "1.0.70" parking_lot = "0.12" -hmac = "0.12" hostname = "0.3.1" os_type = "2.6.0" @@ -67,6 +66,7 @@ mockall = "0.11.4" mockall_double= "0.3.0" siphasher = "0.3.10" +ring = "0.16.20" [build-dependencies] tonic-build = "0.9.0" diff --git a/rust/src/conf.rs b/rust/src/conf.rs index 7ecb69179..55d53ed4c 100644 --- a/rust/src/conf.rs +++ b/rust/src/conf.rs @@ -17,12 +17,13 @@ //! Configuration of RocketMQ rust client. +use std::time::Duration; + use crate::model::common::ClientType; #[allow(unused_imports)] use crate::producer::Producer; #[allow(unused_imports)] use crate::simple_consumer::SimpleConsumer; -use std::time::Duration; /// [`ClientOption`] is the configuration of internal client, which manages the connection and request with RocketMQ proxy. #[derive(Debug, Clone)] @@ -34,6 +35,8 @@ pub struct ClientOption { pub(crate) enable_tls: bool, pub(crate) timeout: Duration, pub(crate) long_polling_timeout: Duration, + pub(crate) access_key: String, + pub(crate) secret_key: String, } impl Default for ClientOption { @@ -46,6 +49,8 @@ impl Default for ClientOption { enable_tls: true, timeout: Duration::from_secs(3), long_polling_timeout: Duration::from_secs(40), + access_key: "".to_string(), + secret_key: "".to_string(), } } } @@ -88,6 +93,24 @@ impl ClientOption { pub fn set_long_polling_timeout(&mut self, long_polling_timeout: Duration) { self.long_polling_timeout = long_polling_timeout; } + + /// Get the access key + pub fn access_key(&self) -> &str { + &self.access_key + } + /// Set the access key + pub fn set_access_key(&mut self, access_key: impl Into) { + self.access_key = access_key.into(); + } + + /// Get the secret key + pub fn secret_key(&self) -> &str { + &self.secret_key + } + /// Set the secret key + pub fn set_secret_key(&mut self, secret_key: impl Into) { + self.secret_key = secret_key.into(); + } } /// Log format for output. diff --git a/rust/src/session.rs b/rust/src/session.rs index 0660931f5..ed6de23b6 100644 --- a/rust/src/session.rs +++ b/rust/src/session.rs @@ -18,7 +18,10 @@ use std::collections::HashMap; use async_trait::async_trait; use mockall::automock; +use ring::hmac; use slog::{debug, error, info, o, Logger}; +use time::format_description::well_known::Rfc3339; +use time::OffsetDateTime; use tokio::sync::{mpsc, Mutex}; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; @@ -217,6 +220,37 @@ impl Session { "x-mq-protocol-version", AsciiMetadataValue::from_static(PROTOCOL_VERSION), ); + + let date_time_result = OffsetDateTime::now_local(); + let date_time = if let Ok(result) = date_time_result { + result + } else { + OffsetDateTime::now_utc() + }; + + let date_time = date_time.format(&Rfc3339).unwrap(); + + metadata.insert( + "x-mq-date-time", + AsciiMetadataValue::try_from(&date_time).unwrap(), + ); + + if !self.option.secret_key.is_empty() { + let key = hmac::Key::new( + hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY, + self.option.secret_key.as_bytes(), + ); + let signature = hmac::sign(&key, date_time.as_bytes()); + let signature = hex::encode(signature.as_ref()); + let authorization = format!( + "MQv2-HMAC-SHA1 Credential={}, SignedHeaders=x-mq-date-time, Signature={}", + self.option.access_key, signature + ); + metadata.insert( + "authorization", + AsciiMetadataValue::try_from(authorization).unwrap(), + ); + } } pub(crate) async fn start(&mut self, settings: TelemetryCommand) -> Result<(), ClientError> { @@ -458,10 +492,10 @@ impl SessionManager { #[cfg(test)] mod tests { - use crate::conf::ProducerOption; use slog::debug; use wiremock_grpc::generate; + use crate::conf::ProducerOption; use crate::log::terminal_logger; use crate::util::build_producer_settings;