Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #526] feat(rust): support ak/sk authorization #527

Merged
merged 7 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ prost-types = "0.11.8"
thiserror = "1.0"
anyhow = "1.0.70"
parking_lot = "0.12"
hmac = "0.12"
hostname = "0.3.1"
os_type = "2.6.0"

Expand All @@ -67,6 +66,7 @@ mockall = "0.11.4"
mockall_double= "0.3.0"

siphasher = "0.3.10"
ring = "0.16.20"

[build-dependencies]
tonic-build = "0.9.0"
Expand Down
25 changes: 24 additions & 1 deletion rust/src/conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

//! Configuration of RocketMQ rust client.

use std::time::Duration;

use crate::model::common::ClientType;
#[allow(unused_imports)]
use crate::producer::Producer;
#[allow(unused_imports)]
use crate::simple_consumer::SimpleConsumer;
use std::time::Duration;

/// [`ClientOption`] is the configuration of internal client, which manages the connection and request with RocketMQ proxy.
#[derive(Debug, Clone)]
Expand All @@ -34,6 +35,8 @@ pub struct ClientOption {
pub(crate) enable_tls: bool,
pub(crate) timeout: Duration,
pub(crate) long_polling_timeout: Duration,
pub(crate) access_key: String,
pub(crate) secret_key: String,
}

impl Default for ClientOption {
Expand All @@ -46,6 +49,8 @@ impl Default for ClientOption {
enable_tls: true,
timeout: Duration::from_secs(3),
long_polling_timeout: Duration::from_secs(40),
access_key: "".to_string(),
ShadowySpirits marked this conversation as resolved.
Show resolved Hide resolved
secret_key: "".to_string(),
}
}
}
Expand Down Expand Up @@ -88,6 +93,24 @@ impl ClientOption {
pub fn set_long_polling_timeout(&mut self, long_polling_timeout: Duration) {
self.long_polling_timeout = long_polling_timeout;
}

/// Get the access key
pub fn access_key(&self) -> &str {
&self.access_key
}
/// Set the access key
pub fn set_access_key(&mut self, access_key: impl Into<String>) {
self.access_key = access_key.into();
}

/// Get the secret key
pub fn secret_key(&self) -> &str {
&self.secret_key
}
/// Set the secret key
pub fn set_secret_key(&mut self, secret_key: impl Into<String>) {
self.secret_key = secret_key.into();
}
}

/// Log format for output.
Expand Down
36 changes: 35 additions & 1 deletion rust/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use std::collections::HashMap;

use async_trait::async_trait;
use mockall::automock;
use ring::hmac;
use slog::{debug, error, info, o, Logger};
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use tokio::sync::{mpsc, Mutex};
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
Expand Down Expand Up @@ -217,6 +220,37 @@ impl Session {
"x-mq-protocol-version",
AsciiMetadataValue::from_static(PROTOCOL_VERSION),
);

let date_time_result = OffsetDateTime::now_local();
let date_time = if let Ok(result) = date_time_result {
result
} else {
OffsetDateTime::now_utc()
};

let date_time = date_time.format(&Rfc3339).unwrap();

metadata.insert(
"x-mq-date-time",
AsciiMetadataValue::try_from(&date_time).unwrap(),
);

if !self.option.secret_key.is_empty() {
let key = hmac::Key::new(
hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY,
self.option.secret_key.as_bytes(),
);
let signature = hmac::sign(&key, date_time.as_bytes());
let signature = hex::encode(signature.as_ref());
let authorization = format!(
"MQv2-HMAC-SHA1 Credential={}, SignedHeaders=x-mq-date-time, Signature={}",
self.option.access_key, signature
);
metadata.insert(
"authorization",
AsciiMetadataValue::try_from(authorization).unwrap(),
);
}
}

pub(crate) async fn start(&mut self, settings: TelemetryCommand) -> Result<(), ClientError> {
Expand Down Expand Up @@ -458,10 +492,10 @@ impl SessionManager {

#[cfg(test)]
mod tests {
use crate::conf::ProducerOption;
use slog::debug;
use wiremock_grpc::generate;

use crate::conf::ProducerOption;
use crate::log::terminal_logger;
use crate::util::build_producer_settings;

Expand Down