Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add more documents about cluster #114

Merged
merged 2 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions tardis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ anyhow = { version = "1.0", optional = true }

# K8s
kube = { version = "^0.87", features = ["runtime"], optional = true }
# temporarily fix https://github.com/kube-rs/kube/issues/1486
json-patch = "=1.2.0"
k8s-openapi = { version = "^0.20", features = ["earliest"], optional = true }

# Test
Expand Down
12 changes: 10 additions & 2 deletions tardis/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ tardis = { version = "^0", features = ["web-server"] }

Processor Configuration

```ignore
```rust
use tardis::basic::error::TardisError;
use tardis::web::poem_openapi;
use tardis::web::poem_openapi::param::Query;
Expand All @@ -100,7 +100,7 @@ impl Api {

Startup class configuration

```ignore
```rust
use tardis::basic::result::TardisResult;
use tardis::tokio;
use tardis::TardisFuns;
Expand All @@ -117,6 +117,14 @@ async fn main() -> TardisResult<()> {
Ok(())
}
```
### Run with cluster mode
You can enable cluster mode when it has multi nodes, especially when it comes to k8s.
```toml
[fw.cluster]
watch_kind = "k8s"
k8s_svc = "my-service"
k8s_ns = "my-namespace"
```

### Dependencies

Expand Down
5 changes: 5 additions & 0 deletions tardis/src/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
/// Broadcast channel between cluster nodes.
pub mod cluster_broadcast;
/// Sync map between cluster nodes.
pub mod cluster_hashmap;
/// Cluster processor.
pub mod cluster_processor;
/// Event publish
pub mod cluster_publish;
/// Event receive
pub mod cluster_receive;
mod cluster_watch_by_cache;
#[cfg(feature = "k8s")]
Expand Down
13 changes: 13 additions & 0 deletions tardis/src/cluster/cluster_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use crate::{basic::result::TardisResult, TardisFuns};
use async_trait::async_trait;

pub const CLUSTER_NODE_WHOAMI: &str = "__cluster_node_who_am_i__";
/// cluster ping event
pub const EVENT_PING: &str = "tardis/ping";
/// cluster status check event
pub const EVENT_STATUS: &str = "tardis/status";
pub const CLUSTER_MESSAGE_CACHE_SIZE: usize = 10000;
pub const WHOAMI_TIMEOUT: Duration = Duration::from_secs(30);
Expand Down Expand Up @@ -97,6 +99,10 @@ impl std::fmt::Display for ClusterRemoteNodeKey {
pub type ClusterMessageId = String;

#[async_trait]
/// Cluster event subscriber trait, a subscriber object can be registered to the cluster event system and respond to the event
///
/// # Register
/// see [`subscribe`], [`subscribe_boxed`] and [`subscribe_if_not_exist`]
pub trait TardisClusterSubscriber: Send + Sync + 'static {
fn event_name(&self) -> Cow<'static, str>;
async fn subscribe(&self, message_req: TardisClusterMessageReq) -> TardisResult<Option<Value>>;
Expand Down Expand Up @@ -310,16 +316,19 @@ async fn add_remote_node(socket_addr: SocketAddr) -> TardisResult<TardisClusterN
Ok(remote)
}

/// subscribe a boxed cluster event
pub async fn subscribe_boxed(subscriber: Box<dyn TardisClusterSubscriber>) {
let event_name = subscriber.event_name();
info!("[Tardis.Cluster] [Server] subscribe event {event_name}");
subscribers().write().await.insert(event_name, subscriber);
}

/// subscribe a cluster event
pub async fn subscribe<S: TardisClusterSubscriber>(subscriber: S) {
subscribe_boxed(Box::new(subscriber)).await;
}

/// subscribe a cluster event if not exist
pub async fn subscribe_if_not_exist<S: TardisClusterSubscriber>(subscriber: S) {
let mut wg = subscribers().write().await;
let event_name = subscriber.event_name();
Expand All @@ -330,11 +339,13 @@ pub async fn subscribe_if_not_exist<S: TardisClusterSubscriber>(subscriber: S) {
}
}

/// unsubscribe a cluster event
pub async fn unsubscribe(event_name: &str) {
info!("[Tardis.Cluster] [Server] unsubscribe event {event_name}");
subscribers().write().await.remove(event_name);
}

/// a request message for cluster
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct TardisClusterMessageReq {
pub(crate) msg_id: String,
Expand All @@ -358,6 +369,8 @@ impl TardisClusterMessageReq {
}
}

/// a response message for cluster

#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct TardisClusterMessageResp {
pub(crate) msg_id: String,
Expand Down
22 changes: 21 additions & 1 deletion tardis/src/cluster/cluster_publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ use serde::Serialize;
use serde_json::Value;
use tracing::{error, trace};

/// Cluster-wide event
///
/// `<L>` is the listener type, default is [`Once`], which implies that the response message will be received only once.
///
/// # Example
/// ```
/// # use tardis::cluster::cluster_publish::ClusterEvent;
/// let event = ClusterEvent::new("hello").no_response().message(&("hello", "world"));
/// ```
#[derive(Debug, Clone)]
pub struct ClusterEvent<L = Once> {
event: Cow<'static, str>,
Expand Down Expand Up @@ -43,6 +52,7 @@ impl<L> ClusterEvent<L> {
listener,
}
}
/// Set the listener to receive only one response.
pub fn one_response(self, timeout: Option<Duration>) -> ClusterEvent<Once> {
ClusterEvent {
event: self.event,
Expand All @@ -51,6 +61,7 @@ impl<L> ClusterEvent<L> {
listener: Once { timeout },
}
}
/// Don't expect any response.
pub fn no_response(self) -> ClusterEvent<Never> {
ClusterEvent {
event: self.event,
Expand All @@ -59,6 +70,7 @@ impl<L> ClusterEvent<L> {
listener: Never,
}
}
/// Set the message of the event.
pub fn message<T: Serialize>(self, message: &T) -> TardisResult<Self> {
Ok(Self {
message: crate::TardisFuns::json.obj_to_json(message)?,
Expand All @@ -68,12 +80,16 @@ impl<L> ClusterEvent<L> {
pub fn json_message(self, message: Value) -> Self {
Self { message, ..self }
}
/// Set the target of the event.
///
/// see [`ClusterEventTarget`]
pub fn target(self, target: impl Into<ClusterEventTarget>) -> Self {
Self { target: target.into(), ..self }
}
}

impl ClusterEvent<Once> {
/// Publish the event and receive only one response.
pub async fn publish_one_response(self) -> TardisResult<TardisClusterMessageResp> {
publish_event_with_listener(self.event, self.message, self.target, self.listener).await?.await.map_err(|e| {
let error_info = format!("[Tardis.Cluster] [Client] Oneshot receive error: {e}, this may caused by timeout");
Expand All @@ -84,15 +100,18 @@ impl ClusterEvent<Once> {
}

impl<L: Listener> ClusterEvent<L> {
/// Publish the event.
pub async fn publish(self) -> TardisResult<L::Reply> {
publish_event_with_listener(self.event, self.message, self.target, self.listener).await
}
}

/// Publish an event with no response.
pub async fn publish_event_no_response(event: impl Into<Cow<'static, str>>, message: Value, target: impl Into<ClusterEventTarget>) -> TardisResult<String> {
publish_event_with_listener(event, message, target, Never).await
}

/// Publish an event and receive only one response.
pub async fn publish_event_one_response(
event: impl Into<Cow<'static, str>>,
message: Value,
Expand All @@ -106,6 +125,7 @@ pub async fn publish_event_one_response(
})
}

/// Publish an event
pub async fn publish_event_with_listener<S: Listener>(
event: impl Into<Cow<'static, str>>,
message: Value,
Expand Down Expand Up @@ -147,7 +167,7 @@ pub async fn publish_event_with_listener<S: Listener>(
Ok(reply)
}

pub async fn do_publish_event(message_req: TardisClusterMessageReq, clients: impl IntoIterator<Item = Arc<TardisWSClient>>) -> TardisResult<()> {
pub(crate) async fn do_publish_event(message_req: TardisClusterMessageReq, clients: impl IntoIterator<Item = Arc<TardisWSClient>>) -> TardisResult<()> {
let ws_message = tokio_tungstenite::tungstenite::Message::Text(TardisFuns::json.obj_to_string(&message_req)?);
let publish_result = join_all(clients.into_iter().map(|client| {
let ws_message = ws_message.clone();
Expand Down
2 changes: 1 addition & 1 deletion tardis/src/cluster/cluster_receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ tardis_static! {
responser_subscribers: RwLock<HashMap::<String, ResponseFn>>;
}

pub async fn listen_reply<S: Listener>(strategy: S, id: String) -> S::Reply {
pub(crate) async fn listen_reply<S: Listener>(strategy: S, id: String) -> S::Reply {
strategy.subscribe(id).await
}

Expand Down
Loading