Skip to content

Commit

Permalink
Merge pull request #890 from 4t145/fix-mq-in-k8s-cluster
Browse files Browse the repository at this point in the history
event: fix mq in k8s cluster
  • Loading branch information
4t145 authored Jan 2, 2025
2 parents c41a32a + c62fe79 commit 1525521
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 20 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ strum = { version = "0.26", features = ["derive"] }
# tardis = { version = "0.2.0", path = "../tardis/tardis" }
tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "66d4c63" }
# asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "d59c64d" }
asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "726c8dd" }
asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "b26fa4f" }
# asteroid-mq = { version = "0.1.0-alpha.5" }
asteroid-mq-sdk = { git = "https://github.com/4t145/asteroid-mq.git", rev = "726c8dd" }
asteroid-mq-sdk = { git = "https://github.com/4t145/asteroid-mq.git", rev = "b26fa4f" }
# asteroid-mq-sdk = { version = "0.1.0-alpha.5" }
#spacegate

Expand Down
28 changes: 19 additions & 9 deletions backend/middlewares/event/src/event_config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use asteroid_mq::openraft;
use bios_basic::rbum::rbum_config::RbumConfig;

use serde::{Deserialize, Serialize};
Expand All @@ -14,13 +13,30 @@ pub struct EventConfig {
pub rbum: RbumConfig,
pub enable: bool,
pub svc: String,
pub raft: openraft::Config,
pub raft: Option<RaftConfig>,
// default by 5000ms
pub startup_timeout: u64,
pub durable: bool,
pub avatars: Vec<String>,
pub cluster: Option<String>,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct RaftConfig {
pub election_timeout_min: u64,
pub election_timeout_max: u64,
pub heartbeat_interval: u64,
}

impl Default for RaftConfig {
fn default() -> Self {
RaftConfig {
election_timeout_max: 1000,
election_timeout_min: 500,
heartbeat_interval: 100,
}
}
}

impl EventConfig {
pub const CLUSTER_K8S: &str = "k8s";
Expand All @@ -37,13 +53,7 @@ impl Default for EventConfig {
startup_timeout: 5000,
durable: true,
cluster: Some(Self::CLUSTER_K8S.to_string()),
raft: openraft::Config {
cluster_name: "bios".to_string(),
election_timeout_max: 1000,
election_timeout_min: 500,
heartbeat_interval: 100,
..Default::default()
},
raft: None,
}
}
}
Expand Down
21 changes: 13 additions & 8 deletions backend/middlewares/event/src/event_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use asteroid_mq::{
prelude::{DurableService, Node, NodeConfig, NodeId, TopicConfig, TopicOverflowConfig},
protocol::node::{
edge::auth::EdgeAuthService,
raft::cluster::{K8sClusterProvider, StaticClusterProvider},
raft::cluster::{this_pod_id, K8sClusterProvider, StaticClusterProvider},
},
};
use bios_basic::rbum::{
Expand Down Expand Up @@ -154,34 +154,39 @@ async fn init_mq_cluster(config: &EventConfig, funs: TardisFunsInst, ctx: Tardis

pub async fn init_mq_node(config: &EventConfig, funs: Arc<TardisFunsInst>, ctx: &TardisContext) -> asteroid_mq::prelude::Node {
let timeout = Duration::from_secs(config.startup_timeout);
const ENV_POD_UID: &str = "POD_UID";
if let Some(node) = TardisFuns::store().get_singleton::<asteroid_mq::prelude::Node>() {
node
} else {
let raft_config = config.raft.clone().unwrap_or_default();
let raft_config = asteroid_mq::openraft::Config {
election_timeout_min: raft_config.election_timeout_min,
election_timeout_max: raft_config.election_timeout_max,
heartbeat_interval: raft_config.heartbeat_interval,
..Default::default()
};
let node = match config.cluster.as_deref() {
Some(EventConfig::CLUSTER_K8S) => {
let uid = std::env::var(ENV_POD_UID).expect("POD_UID is required");
let node = Node::new(NodeConfig {
id: NodeId::sha256(uid.as_bytes()),
raft: config.raft.clone(),
id: this_pod_id(),
raft: raft_config,
durable: config.durable.then_some(DurableService::new(BiosDurableAdapter::new(funs.clone(), ctx.clone()))),
edge_auth: Some(EdgeAuthService::new(BiosEdgeAuthAdapter::new(funs.clone(), ctx.clone()))),
..Default::default()
});
let cluster_provider = K8sClusterProvider::new(config.svc.clone(), asteroid_mq::DEFAULT_TCP_PORT).await;
let cluster_provider = K8sClusterProvider::new(asteroid_mq::DEFAULT_TCP_PORT).await;
node.start(cluster_provider).await.expect("fail to init raft");
node
}
Some(EventConfig::NO_CLUSTER) | None => {
let node = Node::new(NodeConfig {
id: NodeId::snowflake(),
raft: config.raft.clone(),
raft: raft_config,
durable: config.durable.then_some(DurableService::new(BiosDurableAdapter::new(funs.clone(), ctx.clone()))),
edge_auth: Some(EdgeAuthService::new(BiosEdgeAuthAdapter::new(funs.clone(), ctx.clone()))),
..Default::default()
});
// singleton mode
let cluster_provider = StaticClusterProvider::singleton(node.id(), node.config().addr);
let cluster_provider = StaticClusterProvider::singleton(node.id(), node.config().addr.to_string());
node.start(cluster_provider).await.expect("fail to init raft");
node
}
Expand Down
5 changes: 4 additions & 1 deletion backend/services/bios-all/build.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
REPO=$TAG

CURRENT_DATE=$(date +"%Y%m%d%H%M")
### Rust
if [ -z "$TARGET" ]; then
echo "Please choose a target:"
Expand Down Expand Up @@ -44,6 +44,9 @@ if [ -z "$TAG" ]; then
echo "Please enter a tag:"
read TAG
fi
if [ -z "$TAG" ]; then
TAG="test-$CURRENT_DATE"
fi
docker build -t $TAG ./


Expand Down

0 comments on commit 1525521

Please sign in to comment.