Skip to content

Commit

Permalink
[eclipse-iceoryx#550] Introduce health monitoring example
Browse files Browse the repository at this point in the history
  • Loading branch information
elfenpiff committed Dec 19, 2024
1 parent 47802c0 commit 5ad90b2
Show file tree
Hide file tree
Showing 10 changed files with 433 additions and 1 deletion.
18 changes: 18 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,24 @@ path = "rust/event_based_communication/publisher.rs"
name = "event_based_comm_subscriber"
path = "rust/event_based_communication/subscriber.rs"

# event based communication

[[example]]
name = "health_monitoring_publisher_1"
path = "rust/health_monitoring/publisher_1.rs"

[[example]]
name = "health_monitoring_publisher_2"
path = "rust/health_monitoring/publisher_2.rs"

[[example]]
name = "health_monitoring_subscriber"
path = "rust/health_monitoring/subscriber.rs"

[[example]]
name = "health_monitoring_central_daemon"
path = "rust/health_monitoring/central_daemon.rs"

# publish_subscribe

[[example]]
Expand Down
25 changes: 25 additions & 0 deletions examples/rust/_examples_common/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,28 @@ mod transmission_data;
pub use custom_header::CustomHeader;
pub use pubsub_event::PubSubEvent;
pub use transmission_data::TransmissionData;

use iceoryx2::{
prelude::*,
service::port_factory::{event, publish_subscribe},
};

pub fn open_service(
node: &Node<ipc::Service>,
service_name: &ServiceName,
) -> Result<
(
event::PortFactory<ipc::Service>,
publish_subscribe::PortFactory<ipc::Service, u64, ()>,
),
Box<dyn std::error::Error>,
> {
let service_pubsub = node
.service_builder(&service_name)
.publish_subscribe::<u64>()
.open()?;

let service_event = node.service_builder(&service_name).event().open()?;

Ok((service_event, service_pubsub))
}
2 changes: 2 additions & 0 deletions examples/rust/_examples_common/pubsub_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub enum PubSubEvent {
SentSample = 4,
ReceivedSample = 5,
SentHistory = 6,
ProcessDied = 7,
Unknown,
}

Expand All @@ -39,6 +40,7 @@ impl From<EventId> for PubSubEvent {
4 => PubSubEvent::SentSample,
5 => PubSubEvent::ReceivedSample,
6 => PubSubEvent::SentHistory,
7 => PubSubEvent::ProcessDied,
_ => PubSubEvent::Unknown,
}
}
Expand Down
57 changes: 57 additions & 0 deletions examples/rust/health_monitoring/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright (c) 2024 Contributors to the Eclipse Foundation
#
# See the NOTICE file(s) distributed with this work for additional
# information regarding copyright ownership.
#
# This program and the accompanying materials are made available under the
# terms of the Apache Software License 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
# which is available at https://opensource.org/licenses/MIT.
#
# SPDX-License-Identifier: Apache-2.0 OR MIT

load("@rules_rust//rust:defs.bzl", "rust_binary")

rust_binary(
name = "publisher_1",
srcs = [
"publisher_1.rs",
],
deps = [
"//iceoryx2:iceoryx2",
"//examples/rust:examples-common",
],
)

rust_binary(
name = "publisher_2",
srcs = [
"publisher_2.rs",
],
deps = [
"//iceoryx2:iceoryx2",
"//examples/rust:examples-common",
],
)

rust_binary(
name = "subscriber",
srcs = [
"subscriber.rs",
],
deps = [
"//iceoryx2:iceoryx2",
"//examples/rust:examples-common",
],
)

rust_binary(
name = "central_daemon",
srcs = [
"central_daemon.rs",
],
deps = [
"//iceoryx2:iceoryx2",
"//examples/rust:examples-common",
],
)
1 change: 1 addition & 0 deletions examples/rust/health_monitoring/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Health Monitoring
95 changes: 95 additions & 0 deletions examples/rust/health_monitoring/central_daemon.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright (c) 2024 Contributors to the Eclipse Foundation
//
// See the NOTICE file(s) distributed with this work for additional
// information regarding copyright ownership.
//
// This program and the accompanying materials are made available under the
// terms of the Apache Software License 2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
// which is available at https://opensource.org/licenses/MIT.
//
// SPDX-License-Identifier: Apache-2.0 OR MIT

use core::time::Duration;
use examples_common::PubSubEvent;
use iceoryx2::{node::NodeView, prelude::*};

const CYCLE_TIME: Duration = Duration::from_millis(100);

fn main() -> Result<(), Box<dyn std::error::Error>> {
let service_name_1 = ServiceName::new("service_1")?;
let service_name_2 = ServiceName::new("service_2")?;

let node = NodeBuilder::new()
.name(&"watchdog and resource creator".try_into()?)
.create::<ipc::Service>()?;

// The central daemon is responsible to create all services before hand and the other processes
// just open the communication resources and start communicating.
let _service_pubsub_1 = node
.service_builder(&service_name_1)
.publish_subscribe::<u64>()
// We use here open_or_create so that, in case of a crash of the central daemon, it can
// be restarted.
.open_or_create()?;

let _service_event_1 = node
.service_builder(&service_name_1)
.event()
// Whenever a new notifier is created the PublisherConnected event is emitted. this makes
// sense since in this example a notifier is always created after a new publisher was
// created.
// The task of the notifier/event is it to inform and wake up other processes when certain
// system event have happened.
.notifier_created_event(PubSubEvent::PublisherConnected.into())
.notifier_dropped_event(PubSubEvent::PublisherDisconnected.into())
// This event is emitted when either the central daemon or a decentralized process detects
// a dead node and cleaned up all of its stale resources succesfully.
.notifier_dead_event(PubSubEvent::ProcessDied.into())
.open_or_create()?;

let _service_pubsub_2 = node
.service_builder(&service_name_2)
.publish_subscribe::<u64>()
.open_or_create()?;

let _service_event_2 = node
.service_builder(&service_name_2)
.event()
.notifier_created_event(PubSubEvent::PublisherConnected.into())
.notifier_dropped_event(PubSubEvent::PublisherDisconnected.into())
.notifier_dead_event(PubSubEvent::ProcessDied.into())
.open_or_create()?;

let waitset = WaitSetBuilder::new().create::<ipc::Service>()?;
let _cycle_guard = waitset.attach_interval(CYCLE_TIME);

println!("Central daemon up and running.");
waitset.wait_and_process(|_| {
// The only task of our central daemon is it to monitor all running nodes and cleanup their
// resources if a process has died.
//
// Since we added the notifier_dead_event to the service, all listeners, that are waiting
// on a service where one participant has died, will be woken up and they receive
// the PubSubEvent::ProcessDied
find_and_cleanup_dead_nodes();
CallbackProgression::Continue
})?;

Ok(())
}

fn find_and_cleanup_dead_nodes() {
Node::<ipc::Service>::list(Config::global_config(), |node_state| {
if let NodeState::Dead(state) = node_state {
println!(
"detected dead node: {:?}",
state.details().as_ref().map(|v| v.name())
);
state.remove_stale_resources().expect("");
}

CallbackProgression::Continue
})
.expect("");
}
55 changes: 55 additions & 0 deletions examples/rust/health_monitoring/publisher_1.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) 2024 Contributors to the Eclipse Foundation
//
// See the NOTICE file(s) distributed with this work for additional
// information regarding copyright ownership.
//
// This program and the accompanying materials are made available under the
// terms of the Apache Software License 2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
// which is available at https://opensource.org/licenses/MIT.
//
// SPDX-License-Identifier: Apache-2.0 OR MIT

use core::time::Duration;
use examples_common::{open_service, PubSubEvent};
use iceoryx2::prelude::*;

const CYCLE_TIME: Duration = Duration::from_millis(1000);

fn main() -> Result<(), Box<dyn std::error::Error>> {
let service_name = ServiceName::new("service_1")?;
let node = NodeBuilder::new()
.name(&"publisher 1".try_into()?)
.create::<ipc::Service>()?;

let (service_event, service_pubsub) = open_service(&node, &service_name)?;

let publisher = service_pubsub.publisher_builder().create()?;
let notifier = service_event
.notifier_builder()
// we only want to notify the other side explicitly when we have sent a sample
// so we can define it as default event id
.default_event_id(PubSubEvent::SentSample.into())
.create()?;
let mut counter: u64 = 0;

let waitset = WaitSetBuilder::new().create::<ipc::Service>()?;

// we need to send out a sample with an interval of CYCLE_TIME, therefore we attach an
// interval that wakes us up regularly to send out the next sample
let _cycle_guard = waitset.attach_interval(CYCLE_TIME);

waitset.wait_and_process(|_| {
println!("{:?}: Send sample {} ...", service_name, counter);
publisher
.send_copy(counter)
.expect("sample delivery successful.");
notifier.notify().expect("notification successful.");
counter += 1;
CallbackProgression::Continue
})?;

println!("exit");

Ok(())
}
55 changes: 55 additions & 0 deletions examples/rust/health_monitoring/publisher_2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) 2024 Contributors to the Eclipse Foundation
//
// See the NOTICE file(s) distributed with this work for additional
// information regarding copyright ownership.
//
// This program and the accompanying materials are made available under the
// terms of the Apache Software License 2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
// which is available at https://opensource.org/licenses/MIT.
//
// SPDX-License-Identifier: Apache-2.0 OR MIT

use core::time::Duration;
use examples_common::{open_service, PubSubEvent};
use iceoryx2::prelude::*;

const CYCLE_TIME: Duration = Duration::from_millis(1500);

fn main() -> Result<(), Box<dyn std::error::Error>> {
let service_name = ServiceName::new("service_2")?;
let node = NodeBuilder::new()
.name(&"publisher 2".try_into()?)
.create::<ipc::Service>()?;

let (service_event, service_pubsub) = open_service(&node, &service_name)?;

let publisher = service_pubsub.publisher_builder().create()?;
let notifier = service_event
.notifier_builder()
// we only want to notify the other side explicitly when we have sent a sample
// so we can define it as default event id
.default_event_id(PubSubEvent::SentSample.into())
.create()?;
let mut counter: u64 = 1000000;

let waitset = WaitSetBuilder::new().create::<ipc::Service>()?;

// we only want to notify the other side explicitly when we have sent a sample
// so we can define it as default event id
let _cycle_guard = waitset.attach_interval(CYCLE_TIME);

waitset.wait_and_process(|_| {
println!("{:?}: Send sample {} ...", service_name, counter);
publisher
.send_copy(counter)
.expect("sample delivery successful.");
notifier.notify().expect("notification successful.");
counter += 1;
CallbackProgression::Continue
})?;

println!("exit");

Ok(())
}
Loading

0 comments on commit 5ad90b2

Please sign in to comment.