From a6c826e15463bfe55f6a69bd3e6d52988020f249 Mon Sep 17 00:00:00 2001 From: SSpirits Date: Sun, 28 May 2023 16:36:42 +0800 Subject: [PATCH 1/7] feat(rust): support ak/sk authorization Signed-off-by: SSpirits --- rust/Cargo.toml | 2 +- rust/src/conf.rs | 25 ++++++++++++++++++++++++- rust/src/session.rs | 36 +++++++++++++++++++++++++++++++++++- 3 files changed, 60 insertions(+), 3 deletions(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 80c06fe23..d7655d1ff 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -43,7 +43,6 @@ prost-types = "0.11.8" thiserror = "1.0" anyhow = "1.0.70" parking_lot = "0.12" -hmac = "0.12" hostname = "0.3.1" os_type = "2.6.0" @@ -67,6 +66,7 @@ mockall = "0.11.4" mockall_double= "0.3.0" siphasher = "0.3.10" +ring = "0.16.20" [build-dependencies] tonic-build = "0.9.0" diff --git a/rust/src/conf.rs b/rust/src/conf.rs index 7ecb69179..55d53ed4c 100644 --- a/rust/src/conf.rs +++ b/rust/src/conf.rs @@ -17,12 +17,13 @@ //! Configuration of RocketMQ rust client. +use std::time::Duration; + use crate::model::common::ClientType; #[allow(unused_imports)] use crate::producer::Producer; #[allow(unused_imports)] use crate::simple_consumer::SimpleConsumer; -use std::time::Duration; /// [`ClientOption`] is the configuration of internal client, which manages the connection and request with RocketMQ proxy. #[derive(Debug, Clone)] @@ -34,6 +35,8 @@ pub struct ClientOption { pub(crate) enable_tls: bool, pub(crate) timeout: Duration, pub(crate) long_polling_timeout: Duration, + pub(crate) access_key: String, + pub(crate) secret_key: String, } impl Default for ClientOption { @@ -46,6 +49,8 @@ impl Default for ClientOption { enable_tls: true, timeout: Duration::from_secs(3), long_polling_timeout: Duration::from_secs(40), + access_key: "".to_string(), + secret_key: "".to_string(), } } } @@ -88,6 +93,24 @@ impl ClientOption { pub fn set_long_polling_timeout(&mut self, long_polling_timeout: Duration) { self.long_polling_timeout = long_polling_timeout; } + + /// Get the access key + pub fn access_key(&self) -> &str { + &self.access_key + } + /// Set the access key + pub fn set_access_key(&mut self, access_key: impl Into) { + self.access_key = access_key.into(); + } + + /// Get the secret key + pub fn secret_key(&self) -> &str { + &self.secret_key + } + /// Set the secret key + pub fn set_secret_key(&mut self, secret_key: impl Into) { + self.secret_key = secret_key.into(); + } } /// Log format for output. diff --git a/rust/src/session.rs b/rust/src/session.rs index 0660931f5..ed6de23b6 100644 --- a/rust/src/session.rs +++ b/rust/src/session.rs @@ -18,7 +18,10 @@ use std::collections::HashMap; use async_trait::async_trait; use mockall::automock; +use ring::hmac; use slog::{debug, error, info, o, Logger}; +use time::format_description::well_known::Rfc3339; +use time::OffsetDateTime; use tokio::sync::{mpsc, Mutex}; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; @@ -217,6 +220,37 @@ impl Session { "x-mq-protocol-version", AsciiMetadataValue::from_static(PROTOCOL_VERSION), ); + + let date_time_result = OffsetDateTime::now_local(); + let date_time = if let Ok(result) = date_time_result { + result + } else { + OffsetDateTime::now_utc() + }; + + let date_time = date_time.format(&Rfc3339).unwrap(); + + metadata.insert( + "x-mq-date-time", + AsciiMetadataValue::try_from(&date_time).unwrap(), + ); + + if !self.option.secret_key.is_empty() { + let key = hmac::Key::new( + hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY, + self.option.secret_key.as_bytes(), + ); + let signature = hmac::sign(&key, date_time.as_bytes()); + let signature = hex::encode(signature.as_ref()); + let authorization = format!( + "MQv2-HMAC-SHA1 Credential={}, SignedHeaders=x-mq-date-time, Signature={}", + self.option.access_key, signature + ); + metadata.insert( + "authorization", + AsciiMetadataValue::try_from(authorization).unwrap(), + ); + } } pub(crate) async fn start(&mut self, settings: TelemetryCommand) -> Result<(), ClientError> { @@ -458,10 +492,10 @@ impl SessionManager { #[cfg(test)] mod tests { - use crate::conf::ProducerOption; use slog::debug; use wiremock_grpc::generate; + use crate::conf::ProducerOption; use crate::log::terminal_logger; use crate::util::build_producer_settings; From e0bbda2603116e44c42c9197d630d6a19b64d37d Mon Sep 17 00:00:00 2001 From: SSpirits Date: Thu, 1 Jun 2023 19:37:12 +0800 Subject: [PATCH 2/7] feat(rust): change ak/sk type to option Signed-off-by: SSpirits --- rust/src/conf.rs | 20 ++++++++++---------- rust/src/session.rs | 7 ++++--- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/rust/src/conf.rs b/rust/src/conf.rs index 55d53ed4c..95b7adcb0 100644 --- a/rust/src/conf.rs +++ b/rust/src/conf.rs @@ -35,8 +35,8 @@ pub struct ClientOption { pub(crate) enable_tls: bool, pub(crate) timeout: Duration, pub(crate) long_polling_timeout: Duration, - pub(crate) access_key: String, - pub(crate) secret_key: String, + pub(crate) access_key: Option, + pub(crate) secret_key: Option, } impl Default for ClientOption { @@ -49,8 +49,8 @@ impl Default for ClientOption { enable_tls: true, timeout: Duration::from_secs(3), long_polling_timeout: Duration::from_secs(40), - access_key: "".to_string(), - secret_key: "".to_string(), + access_key: None, + secret_key: None, } } } @@ -95,21 +95,21 @@ impl ClientOption { } /// Get the access key - pub fn access_key(&self) -> &str { - &self.access_key + pub fn access_key(&self) -> Option<&String> { + self.access_key.as_ref() } /// Set the access key pub fn set_access_key(&mut self, access_key: impl Into) { - self.access_key = access_key.into(); + self.access_key = Some(access_key.into()); } /// Get the secret key - pub fn secret_key(&self) -> &str { - &self.secret_key + pub fn secret_key(&self) -> Option<&String> { + self.secret_key.as_ref() } /// Set the secret key pub fn set_secret_key(&mut self, secret_key: impl Into) { - self.secret_key = secret_key.into(); + self.secret_key = Some(secret_key.into()); } } diff --git a/rust/src/session.rs b/rust/src/session.rs index ed6de23b6..a38e42c2c 100644 --- a/rust/src/session.rs +++ b/rust/src/session.rs @@ -235,16 +235,17 @@ impl Session { AsciiMetadataValue::try_from(&date_time).unwrap(), ); - if !self.option.secret_key.is_empty() { + if self.option.access_key().is_some() && self.option.secret_key().is_some() { let key = hmac::Key::new( hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY, - self.option.secret_key.as_bytes(), + self.option.secret_key().unwrap().as_bytes(), ); let signature = hmac::sign(&key, date_time.as_bytes()); let signature = hex::encode(signature.as_ref()); let authorization = format!( "MQv2-HMAC-SHA1 Credential={}, SignedHeaders=x-mq-date-time, Signature={}", - self.option.access_key, signature + self.option.access_key().unwrap(), + signature ); metadata.insert( "authorization", From c8f44e6d8c71353120ee0ee6abf009fca69bb13a Mon Sep 17 00:00:00 2001 From: Li Zhanhui Date: Thu, 1 Jun 2023 12:22:36 +0000 Subject: [PATCH 3/7] Make code idiomatic Signed-off-by: Li Zhanhui --- rust/.cargo/config.toml | 2 ++ rust/src/session.rs | 37 ++++++++++++++++++++----------------- 2 files changed, 22 insertions(+), 17 deletions(-) create mode 100644 rust/.cargo/config.toml diff --git a/rust/.cargo/config.toml b/rust/.cargo/config.toml new file mode 100644 index 000000000..70f9eaeb2 --- /dev/null +++ b/rust/.cargo/config.toml @@ -0,0 +1,2 @@ +[registries.crates-io] +protocol = "sparse" diff --git a/rust/src/session.rs b/rust/src/session.rs index a38e42c2c..c537e5e01 100644 --- a/rust/src/session.rs +++ b/rust/src/session.rs @@ -235,23 +235,26 @@ impl Session { AsciiMetadataValue::try_from(&date_time).unwrap(), ); - if self.option.access_key().is_some() && self.option.secret_key().is_some() { - let key = hmac::Key::new( - hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY, - self.option.secret_key().unwrap().as_bytes(), - ); - let signature = hmac::sign(&key, date_time.as_bytes()); - let signature = hex::encode(signature.as_ref()); - let authorization = format!( - "MQv2-HMAC-SHA1 Credential={}, SignedHeaders=x-mq-date-time, Signature={}", - self.option.access_key().unwrap(), - signature - ); - metadata.insert( - "authorization", - AsciiMetadataValue::try_from(authorization).unwrap(), - ); - } + self.option + .access_key() + .zip(self.option.secret_key()) + .and_then(|(access_key, access_secret)| { + let key = hmac::Key::new( + hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY, + access_secret.as_bytes(), + ); + let signature = hmac::sign(&key, date_time.as_bytes()); + let signature = hex::encode(signature.as_ref()); + let authorization = format!( + "MQv2-HMAC-SHA1 Credential={}, SignedHeaders=x-mq-date-time, Signature={}", + access_key, signature + ); + metadata.insert( + "authorization", + AsciiMetadataValue::try_from(authorization).unwrap(), + ); + Some(()) + }); } pub(crate) async fn start(&mut self, settings: TelemetryCommand) -> Result<(), ClientError> { From c8c8c301cde3ea33cdd562fc2811c54ce2c8edd7 Mon Sep 17 00:00:00 2001 From: SSpirits Date: Thu, 1 Jun 2023 20:28:47 +0800 Subject: [PATCH 4/7] feat(rust): fix license Signed-off-by: SSpirits --- rust/.cargo/config.toml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/rust/.cargo/config.toml b/rust/.cargo/config.toml index 70f9eaeb2..311a0281e 100644 --- a/rust/.cargo/config.toml +++ b/rust/.cargo/config.toml @@ -1,2 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# [registries.crates-io] protocol = "sparse" From 64538be4d62fbc8bcb84b6c9e1d841c0446bd58e Mon Sep 17 00:00:00 2001 From: SSpirits Date: Thu, 1 Jun 2023 20:36:07 +0800 Subject: [PATCH 5/7] feat(rust): optimize code Signed-off-by: SSpirits --- rust/src/session.rs | 38 ++++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/rust/src/session.rs b/rust/src/session.rs index c537e5e01..229d9e561 100644 --- a/rust/src/session.rs +++ b/rust/src/session.rs @@ -235,26 +235,24 @@ impl Session { AsciiMetadataValue::try_from(&date_time).unwrap(), ); - self.option - .access_key() - .zip(self.option.secret_key()) - .and_then(|(access_key, access_secret)| { - let key = hmac::Key::new( - hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY, - access_secret.as_bytes(), - ); - let signature = hmac::sign(&key, date_time.as_bytes()); - let signature = hex::encode(signature.as_ref()); - let authorization = format!( - "MQv2-HMAC-SHA1 Credential={}, SignedHeaders=x-mq-date-time, Signature={}", - access_key, signature - ); - metadata.insert( - "authorization", - AsciiMetadataValue::try_from(authorization).unwrap(), - ); - Some(()) - }); + if let Some((access_key, access_secret)) = + self.option.access_key().zip(self.option.secret_key()) + { + let key = hmac::Key::new( + hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY, + access_secret.as_bytes(), + ); + let signature = hmac::sign(&key, date_time.as_bytes()); + let signature = hex::encode(signature.as_ref()); + let authorization = format!( + "MQv2-HMAC-SHA1 Credential={}, SignedHeaders=x-mq-date-time, Signature={}", + access_key, signature + ); + metadata.insert( + "authorization", + AsciiMetadataValue::try_from(authorization).unwrap(), + ); + } } pub(crate) async fn start(&mut self, settings: TelemetryCommand) -> Result<(), ClientError> { From 782b03b3777e20b6d4595988ea5ea5e02a1c277c Mon Sep 17 00:00:00 2001 From: SSpirits Date: Fri, 2 Jun 2023 10:34:37 +0800 Subject: [PATCH 6/7] feat(rust): fix msrv test Signed-off-by: SSpirits --- rust/.cargo/Cargo.lock.min | 65 ++------------------------------------ 1 file changed, 2 insertions(+), 63 deletions(-) diff --git a/rust/.cargo/Cargo.lock.min b/rust/.cargo/Cargo.lock.min index 9262a0806..3b2ccb864 100644 --- a/rust/.cargo/Cargo.lock.min +++ b/rust/.cargo/Cargo.lock.min @@ -136,15 +136,6 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" -[[package]] -name = "block-buffer" -version = "0.10.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" -dependencies = [ - "generic-array", -] - [[package]] name = "bumpalo" version = "3.12.0" @@ -210,16 +201,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crypto-common" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" -dependencies = [ - "generic-array", - "typenum", -] - [[package]] name = "ctor" version = "0.1.26" @@ -249,17 +230,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" -[[package]] -name = "digest" -version = "0.10.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f" -dependencies = [ - "block-buffer", - "crypto-common", - "subtle", -] - [[package]] name = "dirs-next" version = "2.0.0" @@ -439,16 +409,6 @@ dependencies = [ "slab", ] -[[package]] -name = "generic-array" -version = "0.14.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" -dependencies = [ - "typenum", - "version_check", -] - [[package]] name = "getrandom" version = "0.2.9" @@ -521,15 +481,6 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" -[[package]] -name = "hmac" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" -dependencies = [ - "digest", -] - [[package]] name = "hostname" version = "0.3.1" @@ -1315,7 +1266,7 @@ dependencies = [ [[package]] name = "rocketmq" -version = "0.1.0" +version = "0.1.1" dependencies = [ "anyhow", "async-trait", @@ -1323,7 +1274,6 @@ dependencies = [ "byteorder", "futures", "hex", - "hmac", "hostname", "lazy_static", "mac_address", @@ -1338,6 +1288,7 @@ dependencies = [ "prost 0.11.9", "prost-types", "regex", + "ring", "siphasher", "slog", "slog-async", @@ -1578,12 +1529,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" -[[package]] -name = "subtle" -version = "2.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" - [[package]] name = "syn" version = "1.0.109" @@ -1974,12 +1919,6 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" -[[package]] -name = "typenum" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" - [[package]] name = "unicode-ident" version = "1.0.8" From ae801a0aa6cc175d0c04ebacbdd43fe2502226a9 Mon Sep 17 00:00:00 2001 From: SSpirits Date: Tue, 6 Jun 2023 11:49:30 +0800 Subject: [PATCH 7/7] fix(rust): fix msrv test Signed-off-by: SSpirits --- .github/workflows/rust_build.yml | 2 +- rust/src/client.rs | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/.github/workflows/rust_build.yml b/.github/workflows/rust_build.yml index b7e43500b..6689dd702 100644 --- a/.github/workflows/rust_build.yml +++ b/.github/workflows/rust_build.yml @@ -65,7 +65,7 @@ jobs: toolchain: ${{ matrix.msrv }} - name: Check MSRV ${{ matrix.msrv }} working-directory: ./rust - run: cp .cargo/Cargo.lock.min Cargo.lock && cargo fetch && cargo +${{ matrix.msrv }} check --locked --frozen + run: cp .cargo/Cargo.lock.min Cargo.lock && cargo +${{ matrix.msrv }} fetch && cargo +${{ matrix.msrv }} check --locked --frozen build: name: "${{ matrix.os }}" runs-on: ${{ matrix.os }} diff --git a/rust/src/client.rs b/rust/src/client.rs index f13c3a68d..4b601facd 100644 --- a/rust/src/client.rs +++ b/rust/src/client.rs @@ -372,15 +372,10 @@ impl Client { mut rpc_client: T, messages: Vec, ) -> Result, ClientError> { - let message_count = messages.len(); let request = SendMessageRequest { messages }; let response = rpc_client.send_message(request).await?; Self::handle_response_status(response.status, OPERATION_SEND_MESSAGE)?; - if response.entries.len() != message_count { - error!(self.logger, "server do not return illegal send result, this may be a bug. except result count: {}, found: {}", response.entries.len(), message_count); - } - Ok(response .entries .iter()