Skip to content

Commit

Permalink
Merge branch 'feat/nonglobal-node-no'
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Aug 8, 2024
2 parents c7f8dd5 + a2ab78f commit ed2c3d5
Show file tree
Hide file tree
Showing 51 changed files with 316 additions and 239 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
<!-- next-header -->

## [Unreleased] - ReleaseDate
### Added
- core/topology: add `Topology::set_node_no()` and `Topology::node_no()`.

### Changed
- **BREAKING** core/node: remove `node` module, `NodeNo` is moved to `addr`.
- core/addr: expose the `addr` module.
- core: generate `NodeNo` randomly if not provided.

### Fixed
- core: update the `idr-ebr` crate to v0.3 to fix possible crash in `Context::finished()`.

Expand Down
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ dashmap = "6.0.1"
toml = "0.8.14"
bytesize = { version = "1.2.0", features = ["serde"] }

[workspace.dependencies.derive_more]
version = "1"
features = ["constructor", "deref", "display", "error", "from", "into", "is_variant"]

[workspace.lints.rust]
rust_2018_idioms = { level = "warn", priority = -1 }
unreachable_pub = "warn"
Expand Down
2 changes: 1 addition & 1 deletion benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ elfo-utils = { version = "0.2.6", path = "../elfo-utils" }

metrics.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
derive_more.workspace = true
criterion = "0.5.1"
futures = "0.3"
derive_more = "0.99.17"
mimalloc = { version = "0.1.39", default-features = false }
jemallocator = "0.5.4"
tcmalloc = { version = "0.3.0", features = ["bundled"] }
Expand Down
2 changes: 1 addition & 1 deletion benches/messaging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ fn case<const FLAGS: Flags>(c: &mut Criterion) {
}

#[derive(Display)]
#[display(fmt = "{}p{}c{}w", producers, consumers, workers)]
#[display("{producers}p{consumers}c{workers}w")]
struct CaseParams {
workers: u32,
producers: u32,
Expand Down
2 changes: 1 addition & 1 deletion elfo-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ elfo-utils = { version = "0.2.6", path = "../elfo-utils" }
stability.workspace = true
metrics.workspace = true
dashmap.workspace = true
derive_more.workspace = true
tokio = { workspace = true, features = ["rt", "sync", "time", "signal", "macros"] }
idr-ebr = "0.3.0"
futures-intrusive = "0.5"
Expand All @@ -35,7 +36,6 @@ parking_lot = "0.12"
smallbox = "0.8.0"
# TODO: avoid the `rc` feature here?
serde = { version = "1.0.120", features = ["derive", "rc"] }
derive_more = "0.99.11"
tracing = "0.1.25"
futures = "0.3.12"
static_assertions = "1.1.0"
Expand Down
41 changes: 29 additions & 12 deletions elfo-core/src/addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,29 @@ use serde::{Deserialize, Serialize};

// === NodeNo ===

/// Represents the node's number.
/// Represents the node's number in a distributed system.
/// Cannot be `0`, it's reserved to represent the local node.
///
/// Nodes with the same `node_no` cannot be connected.
///
/// NOTE: It's 16-bit unsigned integer, which requires manual management for
/// bigger-than-small clusters and will be replaced with [`NodeLaunchId`]
/// totally in the future in order to simplify the management.
#[stability::unstable]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Display, Serialize, Deserialize)]
pub struct NodeNo(NonZeroU16);

impl NodeNo {
#[stability::unstable]
pub(crate) fn generate() -> Self {
Self::from_bits(random_u64().max(1) as u16).unwrap()
}

#[inline]
pub fn from_bits(bits: u16) -> Option<Self> {
NonZeroU16::new(bits).map(NodeNo)
}

#[stability::unstable]
#[inline]
pub fn into_bits(self) -> u16 {
self.0.get()
Expand All @@ -44,15 +52,7 @@ pub struct NodeLaunchId(u64);

impl NodeLaunchId {
pub(crate) fn generate() -> Self {
use std::{
collections::hash_map::RandomState,
hash::{BuildHasher, Hasher},
};

// `RandomState` is randomly seeded.
let mut hasher = RandomState::new().build_hasher();
hasher.write_u64(0xE1F0E1F0E1F0E1F0);
Self(hasher.finish())
Self(random_u64())
}

#[stability::unstable]
Expand Down Expand Up @@ -305,6 +305,23 @@ const_assert_eq!(
GROUP_NO_SHIFT
);

// === random_u64 ===

fn random_u64() -> u64 {
use std::{
collections::hash_map::RandomState,
hash::{BuildHasher, Hash, Hasher},
thread,
time::Instant,
};

let mut hasher = RandomState::new().build_hasher();
0xE1F0E1F0E1F0E1F0u64.hash(&mut hasher);
Instant::now().hash(&mut hasher);
thread::current().id().hash(&mut hasher);
hasher.finish()
}

#[cfg(test)]
mod tests {
use std::collections::HashSet;
Expand Down
21 changes: 9 additions & 12 deletions elfo-core/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use std::{
collections::BTreeMap,
fmt::{Debug, Display},
};
use std::{collections::BTreeMap, fmt::Debug};

use derive_more::{Display, Error};

Expand Down Expand Up @@ -78,7 +75,7 @@ impl Display for StartError {

#[derive(Clone, Debug, Display, Error)]
#[non_exhaustive]
#[display(fmt = "error from group {group}: {reason}")]
#[display("error from group {group}: {reason}")]
pub struct StartGroupError {
pub group: String,
pub reason: String,
Expand All @@ -87,7 +84,7 @@ pub struct StartGroupError {
// === SendError ===

#[derive(Debug, Display, Error)]
#[display(fmt = "mailbox closed")]
#[display("mailbox closed")]
pub struct SendError<T>(#[error(not(source))] pub T);

impl<T> SendError<T> {
Expand All @@ -108,10 +105,10 @@ impl<T> SendError<T> {
#[derive(Debug, Display, Error)]
pub enum TrySendError<T> {
/// The mailbox is full.
#[display(fmt = "mailbox full")]
#[display("mailbox full")]
Full(#[error(not(source))] T),
/// The mailbox has been closed.
#[display(fmt = "mailbox closed")]
#[display("mailbox closed")]
Closed(#[error(not(source))] T),
}

Expand Down Expand Up @@ -159,10 +156,10 @@ impl<T> From<SendError<T>> for TrySendError<T> {
#[derive(Debug, Display, Error)]
pub enum RequestError {
/// Receiver hasn't got the request.
#[display(fmt = "request failed")]
#[display("request failed")]
Failed,
/// Receiver has got the request, but ignored it.
#[display(fmt = "request ignored")]
#[display("request ignored")]
Ignored,
}

Expand All @@ -185,10 +182,10 @@ impl RequestError {
#[derive(Debug, Clone, Display, Error)]
pub enum TryRecvError {
/// The mailbox is empty.
#[display(fmt = "mailbox empty")]
#[display("mailbox empty")]
Empty,
/// The mailbox has been closed.
#[display(fmt = "mailbox closed")]
#[display("mailbox closed")]
Closed,
}

Expand Down
33 changes: 18 additions & 15 deletions elfo-core/src/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{fmt::Debug, future::Future, marker::PhantomData, sync::Arc};
use futures::future::BoxFuture;

use crate::{
addr::NodeNo,
config::Config,
context::Context,
envelope::Envelope,
Expand Down Expand Up @@ -105,20 +106,22 @@ impl<R, C> ActorGroup<R, C> {
ER: ExecResult,
C: Config,
{
let mount = move |ctx: Context, name: String, rt_manager: RuntimeManager| {
let addr = ctx.group();
let sv = Arc::new(Supervisor::new(
ctx,
name,
exec,
self.router,
self.restart_policy,
self.termination_policy,
rt_manager,
));

Object::new(addr, Box::new(Handle(sv)) as Box<dyn GroupHandle>)
};
let mount =
move |ctx: Context, node_no: NodeNo, name: String, rt_manager: RuntimeManager| {
let addr = ctx.group();
let sv = Arc::new(Supervisor::new(
ctx,
node_no,
name,
exec,
self.router,
self.restart_policy,
self.termination_policy,
rt_manager,
));

Object::new(addr, Box::new(Handle(sv)) as Box<dyn GroupHandle>)
};

Blueprint {
mount: Box::new(mount),
Expand Down Expand Up @@ -146,7 +149,7 @@ where
}

pub struct Blueprint {
pub(crate) mount: Box<dyn FnOnce(Context, String, RuntimeManager) -> Object>,
pub(crate) mount: Box<dyn FnOnce(Context, NodeNo, String, RuntimeManager) -> Object>,
pub(crate) stop_order: i8,
}

Expand Down
5 changes: 4 additions & 1 deletion elfo-core/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ pub async fn start(topology: Topology) {
pub async fn try_start(topology: Topology) -> Result<()> {
check_messages_uniqueness()?;

#[cfg(feature = "test-util")]
warn!("elfo is compiled with `test-util` feature, it may affect performance");

let res = do_start(topology, false, termination).await;

if res.is_err() {
Expand Down Expand Up @@ -180,7 +183,7 @@ pub async fn do_start<F: Future>(
Arc::new(SubscriptionManager::new(ctx.clone())),
);

let scope_shared = ScopeGroupShared::new(addr);
let scope_shared = ScopeGroupShared::new(topology.node_no(), addr);
let mut config = SystemConfig::default();
config.logging.max_level = LevelFilter::INFO;
scope_shared.configure(&config);
Expand Down
11 changes: 1 addition & 10 deletions elfo-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ pub use elfo_macros::{message_core as message, msg_core as msg};
#[macro_use]
mod macros;

pub mod addr;
pub mod config;
pub mod coop;
pub mod dumping;
pub mod errors;
pub mod init;
pub mod logging;
pub mod messages;
pub mod node;
pub mod routers;
pub mod scope;
pub mod signal;
Expand All @@ -50,7 +50,6 @@ pub mod tracing;

mod actor;
mod actor_status;
mod addr;
mod address_book;
mod context;
mod demux;
Expand Down Expand Up @@ -80,14 +79,6 @@ mod thread;

#[doc(hidden)]
pub mod _priv {
pub mod node {
pub fn set_node_no(node_no: u16) {
crate::node::set_node_no(node_no)
}
}

#[cfg(feature = "unstable")]
pub use crate::addr::{GroupNo, NodeLaunchId, NodeNo};
pub use crate::{
address_book::AddressBook,
envelope::{EnvelopeBorrowed, EnvelopeOwned, MessageKind},
Expand Down
34 changes: 0 additions & 34 deletions elfo-core/src/node.rs

This file was deleted.

Loading

0 comments on commit ed2c3d5

Please sign in to comment.