Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: cluster websocket not send message to client #122

Merged
merged 6 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/cicd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ jobs:

publish-tardis:
if: startsWith(github.ref, 'refs/tags/')
needs: [check, publish-macros]
needs: [check]
runs-on: ubuntu-latest
steps:
- name: Check out the repo
Expand Down
9 changes: 7 additions & 2 deletions tardis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@ k8s = ["future", "kube", "k8s-openapi"]
fs = ["tokio/fs", "tokio/io-util"]
process = ["tokio/process"]
test = ["testcontainers", "testcontainers-modules"]
tracing = ["tracing-opentelemetry", "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk"]
tracing = [
"tracing-opentelemetry",
"opentelemetry",
"opentelemetry-otlp",
"opentelemetry_sdk",
]
tokio-console = ["console-subscriber"]
tracing-appender = ["dep:tracing-appender"]
web-server-grpc = ["web-server", "dep:poem-grpc"]
Expand Down Expand Up @@ -90,7 +95,7 @@ tokio = { version = "1", features = [
] }
tokio-util = { version = "0.7.10" }
# Tardis Macros
tardis-macros = { version = "0.1.0-rc.14", workspace = true, optional = true }
tardis-macros = { version = "0.1.0-rc.14", optional = true }
# Log
tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Expand Down
4 changes: 3 additions & 1 deletion tardis/src/cluster/cluster_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ where
format!("tardis/broadcast/{}", self.ident)
}
pub fn send(&self, message: T) {
// dbg!(self.local_broadcast_channel.send(message.clone()));
if let Err(result) = self.local_broadcast_channel.send(message.clone()) {
tracing::error!("[Tardis.Cluster] broadcast channel send error: {:?}", result);
}
let event = format!("tardis/broadcast/{}", self.ident);
tokio::spawn(async move {
if let Ok(json_value) = serde_json::to_value(message) {
Expand Down
19 changes: 18 additions & 1 deletion tardis/src/config/config_dto/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub use mail::*;
pub(crate) mod os;
pub use os::*;

use crate::redact::Redact;

/// # Tardis Component Configuration
///
/// common structure for components with one defualt module and many submodules
Expand Down Expand Up @@ -121,7 +123,7 @@ pub struct AdvConfig {
pub salt: String,
}

#[derive(Debug, Serialize, Deserialize, Clone, TypedBuilder)]
#[derive(Serialize, Deserialize, Clone, TypedBuilder)]
pub struct ConfCenterConfig {
#[builder(default = "nacos".to_string())]
pub kind: String,
Expand All @@ -140,3 +142,18 @@ pub struct ConfCenterConfig {
/// config change polling interval, in milliseconds, default is 30000ms / 配置变更轮询间隔,单位毫秒, 默认30000ms
pub config_change_polling_interval: Option<u64>,
}

impl std::fmt::Debug for ConfCenterConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ConfCenterConfig")
.field("kind", &self.kind)
.field("url", &self.url)
.field("username", &self.username)
.field("password", &self.password.redact())
.field("group", &self.group)
.field("format", &self.format)
.field("namespace", &self.namespace)
.field("config_change_polling_interval", &self.config_change_polling_interval)
.finish()
}
}
10 changes: 9 additions & 1 deletion tardis/src/config/config_dto/component/cache.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use serde::{Deserialize, Serialize};
use typed_builder::TypedBuilder;
use url::Url;

use crate::redact::Redact;
/// Distributed cache configuration / 分布式缓存配置
///
/// Distributed cache operations need to be enabled ```#[cfg(feature = "cache")]``` .
Expand All @@ -15,8 +17,14 @@ use url::Url;
/// ..Default::default()
///};
/// ```
#[derive(Debug, Serialize, Deserialize, Clone, TypedBuilder)]
#[derive(Serialize, Deserialize, Clone, TypedBuilder)]
pub struct CacheModuleConfig {
/// Cache access Url, Url with permission information / 缓存访问Url,Url带权限信息
pub url: Url,
}

impl std::fmt::Debug for CacheModuleConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CacheModuleConfig").field("url", &self.url.redact()).finish()
}
}
22 changes: 21 additions & 1 deletion tardis/src/config/config_dto/component/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize};

use typed_builder::TypedBuilder;

use crate::redact::Redact;

/// Database module configuration / 数据库模块配置
///
/// Database operations need to be enabled ```#[cfg(feature = "reldb")]``` .
Expand All @@ -16,7 +18,7 @@ use typed_builder::TypedBuilder;
/// ..Default::default()
/// };
/// ```
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, TypedBuilder)]
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, TypedBuilder)]
#[serde(default)]
pub struct DBModuleConfig {
#[builder(setter(into))]
Expand Down Expand Up @@ -45,6 +47,24 @@ impl Default for DBModuleConfig {
}
}

impl std::fmt::Debug for DBModuleConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let url_debug = if let Ok(url) = url::Url::parse(&self.url) {
url.redact().to_string()
} else {
self.url.to_string()
};

f.debug_struct("DBModuleConfig")
.field("url", &url_debug)
.field("max_connections", &self.max_connections)
.field("min_connections", &self.min_connections)
.field("connect_timeout_sec", &self.connect_timeout_sec)
.field("idle_timeout_sec", &self.idle_timeout_sec)
.field("compatible_type", &self.compatible_type)
.finish()
}
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
pub enum CompatibleType {
#[default]
Expand Down
17 changes: 16 additions & 1 deletion tardis/src/config/config_dto/component/mail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ use serde::{Deserialize, Serialize};

use typed_builder::TypedBuilder;

use crate::redact::Redact;

/// Mail module configuration / 邮件模块配置
///
#[derive(Debug, Serialize, Deserialize, Clone, TypedBuilder)]
#[derive(Serialize, Deserialize, Clone, TypedBuilder)]
#[serde(default)]
pub struct MailModuleConfig {
/// SMTP host
Expand All @@ -27,6 +29,19 @@ pub struct MailModuleConfig {
pub starttls: bool,
}

impl std::fmt::Debug for MailModuleConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MailModuleConfig")
.field("smtp_host", &self.smtp_host)
.field("smtp_port", &self.smtp_port)
.field("smtp_username", &self.smtp_username)
.field("smtp_password", &self.smtp_password.redact())
.field("default_from", &self.default_from)
.field("starttls", &self.starttls)
.finish()
}
}

impl Default for MailModuleConfig {
fn default() -> Self {
MailModuleConfig::builder().build()
Expand Down
10 changes: 9 additions & 1 deletion tardis/src/config/config_dto/component/mq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize};
use typed_builder::TypedBuilder;
use url::Url;

use crate::redact::Redact;

/// Message queue configuration / 消息队列配置
///
/// Message queue operation needs to be enabled ```#[cfg(feature = "mq")]``` .
Expand All @@ -16,8 +18,14 @@ use url::Url;
/// ..Default::default()
///};
/// ```
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, TypedBuilder)]
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, TypedBuilder)]
pub struct MQModuleConfig {
/// Message queue access Url, Url with permission information / 消息队列访问Url,Url带权限信息
pub url: Url,
}

impl std::fmt::Debug for MQModuleConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MQModuleConfig").field("url", &self.url.redact()).finish()
}
}
17 changes: 16 additions & 1 deletion tardis/src/config/config_dto/component/os.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use serde::{Deserialize, Serialize};

use typed_builder::TypedBuilder;

#[derive(Debug, Serialize, Deserialize, Clone, TypedBuilder)]
use crate::redact::Redact;

#[derive(Serialize, Deserialize, Clone, TypedBuilder)]
#[serde(default)]
pub struct OSModuleConfig {
/// s3/oss/obs, Support amazon s3 / aliyun oss / huaweicloud obs
Expand All @@ -20,6 +22,19 @@ pub struct OSModuleConfig {
pub default_bucket: String,
}

impl std::fmt::Debug for OSModuleConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OSModuleConfig")
.field("kind", &self.kind)
.field("endpoint", &self.endpoint)
.field("ak", &self.ak)
.field("sk", &self.sk.redact())
.field("region", &self.region)
.field("default_bucket", &self.default_bucket)
.finish()
}
}

impl Default for OSModuleConfig {
fn default() -> Self {
Self::builder().build()
Expand Down
10 changes: 9 additions & 1 deletion tardis/src/config/config_dto/component/search.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use serde::{Deserialize, Serialize};
use typed_builder::TypedBuilder;
use url::Url;

use crate::redact::Redact;
/// Search configuration / 搜索配置
///
/// Search operation needs to be enabled ```#[cfg(feature = "web-client")]``` .
Expand All @@ -15,11 +17,17 @@ use url::Url;
/// ..Default::default()
///};
/// ```
#[derive(Debug, Serialize, Deserialize, Clone, TypedBuilder)]
#[derive(Serialize, Deserialize, Clone, TypedBuilder)]
pub struct SearchModuleConfig {
/// Search access Url, Url with permission information / 搜索访问Url,Url带权限信息
pub url: Url,
#[builder(default = 60)]
/// Timeout / 操作超时时间
pub timeout_sec: u64,
}

impl std::fmt::Debug for SearchModuleConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SearchModuleConfig").field("url", &self.url.redact()).field("timeout_sec", &self.timeout_sec).finish()
}
}
2 changes: 1 addition & 1 deletion tardis/src/config/config_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ impl ConfCenterConfig {
}

#[cfg(feature = "crypto")]
fn decryption(text: &str, salt: &str) -> TardisResult<String> {
pub fn decryption(text: &str, salt: &str) -> TardisResult<String> {
use crate::crypto::crypto_aead::algorithm::Aes128;
if salt.len() != 16 {
return Err(TardisError::format_error("[Tardis.Config] [salt] Length must be 16", ""));
Expand Down
2 changes: 1 addition & 1 deletion tardis/src/crypto/crypto_digest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub struct TardisCryptoDigest;
/// algorithms for digest
pub mod algorithm {
pub use digest::Digest;
pub use hex;
pub use hmac::{Hmac, Mac};
pub use md5::Md5;
pub use sha1::Sha1;
Expand Down Expand Up @@ -49,7 +50,6 @@ pub mod output {
}
}
}

/// Digest handle / 摘要处理
///
/// # Examples
Expand Down
16 changes: 10 additions & 6 deletions tardis/src/db/reldb_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ impl TardisRelDBClient {
compatible_type,
}: &DBModuleConfig,
) -> TardisResult<TardisRelDBClient> {
use crate::utils::redact::Redact;
let url = Url::parse(str_url).map_err(|_| TardisError::format_error(&format!("[Tardis.RelDBClient] Invalid url {str_url}"), "406-tardis-reldb-url-error"))?;
info!(
"[Tardis.RelDBClient] Initializing, host:{}, port:{}, max_connections:{}",
Expand Down Expand Up @@ -191,7 +192,7 @@ impl TardisRelDBClient {
match result {
Ok(pool) => Ok(SqlxMySqlConnector::from_sqlx_mysql_pool(pool)),
Err(error) => Err(TardisError::format_error(
&format!("[Tardis.RelDBClient] {str_url} Initialization error: {error}"),
&format!("[Tardis.RelDBClient] {} Initialization error: {error}", url.redact()),
"406-tardis-reldb-conn-init-error",
)),
}
Expand Down Expand Up @@ -219,20 +220,23 @@ impl TardisRelDBClient {
match result {
Ok(pool) => Ok(SqlxPostgresConnector::from_sqlx_postgres_pool(pool)),
Err(error) => Err(TardisError::format_error(
&format!("[Tardis.RelDBClient] {str_url} Initialization error: {error}"),
&format!("[Tardis.RelDBClient] {} Initialization error: {error}", url.redact()),
"406-tardis-reldb-conn-init-error",
)),
}
}
_ => Err(TardisError::format_error(
&format!("[Tardis.RelDBClient] {str_url} , current database does not support setting timezone"),
&format!("[Tardis.RelDBClient] {} , current database does not support setting timezone", url.redact()),
"406-tardis-reldb-conn-init-error",
)),
}
} else {
Database::connect(opt)
.await
.map_err(|error| TardisError::format_error(&format!("[Tardis.RelDBClient] {str_url} Initialization error: {error}"), "406-tardis-reldb-conn-init-error"))
Database::connect(opt).await.map_err(|error| {
TardisError::format_error(
&format!("[Tardis.RelDBClient] {} Initialization error: {error}", url.redact()),
"406-tardis-reldb-conn-init-error",
)
})
}?;
info!(
"[Tardis.RelDBClient] Initialized, host:{}, port:{}, max_connections:{}",
Expand Down
1 change: 1 addition & 0 deletions tardis/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ pub(crate) use tardis_component::*;
pub mod build_info;
pub mod initializer;
pub mod mapper;
pub mod redact;
pub mod tardis_static;
25 changes: 25 additions & 0 deletions tardis/src/utils/redact.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/// Use to mask some sensitive data in the logs
pub trait Redact: Sized {
fn redact(&self) -> Self;
}

const PASSWORD_MASK: &str = "**";
const STRING_MASK: &str = "[REDACTED]";

impl Redact for url::Url {
/// Redact the password part of the URL
fn redact(&self) -> Self {
let mut url = self.clone();
if url.password().is_some() {
let _ = url.set_password(Some(PASSWORD_MASK));
}
url
}
}

impl Redact for String {
/// Redact the string
fn redact(&self) -> Self {
STRING_MASK.to_string()
}
}
1 change: 0 additions & 1 deletion tardis/src/web/ws_processor/cluster_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use super::{TardisWebsocketMgrMessage, WsBroadcastSender};

impl WsBroadcastSender for ClusterBroadcastChannel<TardisWebsocketMgrMessage> {
fn subscribe(&self) -> tokio::sync::broadcast::Receiver<TardisWebsocketMgrMessage> {
// dbg!(self.local_broadcast_channel.receiver_count());
self.local_broadcast_channel.subscribe()
}

Expand Down
Loading