Skip to content

Commit

Permalink
Merge pull request #12 from bastion-rs/async-cluster-construct
Browse files Browse the repository at this point in the history
Async cluster construction
  • Loading branch information
vertexclique authored Feb 29, 2020
2 parents db67cd0 + f0cb401 commit fe12258
Show file tree
Hide file tree
Showing 16 changed files with 329 additions and 198 deletions.
2 changes: 2 additions & 0 deletions artillery-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ uuid = { version = "0.8", features = ["serde", "v4"] }
chrono = { version = "0.4", features = ["serde"] }
rand = "0.7.3"
mio = { version = "0.7.0-alpha.1", features = ["os-poll", "udp"] }
futures = "0.3"
libp2p = "0.16.0"
bastion-executor = "0.3.4"
lightproc = "0.3.4"
crossbeam-channel = "0.4.2"

[dev-dependencies]
bincode = "1.2.1"
Expand Down
68 changes: 47 additions & 21 deletions artillery-core/examples/cball_ap_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,28 @@ extern crate pretty_env_logger;
#[macro_use]
extern crate log;

use clap::*;
use std::convert::TryInto;
use std::fs::File;
use std::io::{Read, Write};
use std::net::ToSocketAddrs;
use std::path::Path;

use uuid::Uuid;

use artillery_core::epidemic::prelude::*;
use artillery_core::service_discovery::mdns::prelude::*;

use once_cell::sync::OnceCell;
use serde::*;
use artillery_core::cluster::ap::*;
use futures::future;

use std::thread;
use std::time::Duration;
use artillery_core::cluster::ap_cluster::*;
use bastion_executor::prelude::*;

use bastion_executor::blocking::spawn_blocking;
use lightproc::proc_handle::ProcHandle;
use lightproc::proc_stack::ProcStack;


#[derive(Serialize, Deserialize, Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
struct ExampleSDReply {
ip: String,
port: u16,
}
use std::sync::Arc;

fn main() {
pretty_env_logger::init();

// Let's find a broadcast port
let port = get_port();

// Initialize our cluster configuration
let ap_cluster_config = ArtilleryAPClusterConfig {
app_name: String::from("artillery-ap"),
node_id: Uuid::new_v4(),
Expand All @@ -58,8 +47,45 @@ fn main() {
},
};

let ap_cluster = ArtilleryAPCluster::new(ap_cluster_config).unwrap();
spawn_blocking(async { ap_cluster.launch().await }, ProcStack::default());
// Configure our cluster node
let ap_cluster = Arc::new(ArtilleryAPCluster::new(ap_cluster_config).unwrap());

// Launch the cluster node
run(
async {
let cluster_stack = ProcStack::default().with_pid(2);
let events_stack = ProcStack::default().with_pid(3);

let ap_events = ap_cluster.clone();

// Detach cluster launch
let cluster_handle =
spawn_blocking(async move { ap_cluster.launch().await }, cluster_stack);

// Detach event consumption
let events_handle = spawn_blocking(
async move {
warn!("STARTED: Event Poller");
for (members, event) in ap_events.cluster().clone().events.iter() {
warn!("");
warn!(" CLUSTER EVENT ");
warn!("===============");
warn!("{:?}", event);
warn!("");

for member in members {
info!("MEMBER {:?}", member);
}
}
warn!("STOPPED: Event Poller");
},
events_stack,
);

future::join(events_handle, cluster_handle).await
},
ProcStack::default().with_pid(1),
);
}

fn get_port() -> u16 {
Expand Down
2 changes: 1 addition & 1 deletion artillery-core/examples/cball_mdns_sd_infection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ fn main() {
.expect("cannot start cluster-event-poller");

thread::sleep(Duration::from_secs(1));
for discovery in sd.events {
for discovery in sd.events().iter() {
if discovery.get().port() != this_node_cluster_port {
cluster.add_seed_node(discovery.get());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
use crate::epidemic::prelude::*;
use crate::errors::*;
use crate::service_discovery::mdns::prelude::*;
use bastion_executor::blocking::spawn_blocking;
use lightproc::proc_handle::ProcHandle;
use lightproc::proc_stack::ProcStack;
use uuid::Uuid;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::Receiver;

use lightproc::prelude::*;
use crate::errors::*;
use std::rc::Rc;

use std::future::Future;
use std::error::Error;

use std::sync::Arc;
use uuid::Uuid;

#[derive(Default, Clone)]
pub struct ArtilleryAPClusterConfig {
Expand All @@ -24,7 +20,7 @@ pub struct ArtilleryAPClusterConfig {
pub struct ArtilleryAPCluster {
config: ArtilleryAPClusterConfig,
cluster: Arc<Cluster>,
sd: Arc<MDNSServiceDiscovery>
sd: Arc<MDNSServiceDiscovery>,
}

unsafe impl Send for ArtilleryAPCluster {}
Expand All @@ -34,20 +30,14 @@ pub type DiscoveryLaunch = RecoverableHandle<()>;

impl ArtilleryAPCluster {
pub fn new(config: ArtilleryAPClusterConfig) -> Result<Self> {
let sd =
MDNSServiceDiscovery::new_service_discovery(
config.sd_config.clone())?;
let sd = MDNSServiceDiscovery::new_service_discovery(config.sd_config.clone())?;

let cluster =
Cluster::new_cluster(
config.node_id,
config.cluster_config.clone()
)?;
let cluster = Cluster::new_cluster(config.node_id, config.cluster_config.clone())?;

Ok(Self {
config,
cluster: Arc::new(cluster),
sd: Arc::new(sd)
sd: Arc::new(sd),
})
}

Expand All @@ -59,7 +49,7 @@ impl ArtilleryAPCluster {
self.sd.clone()
}

pub fn launch(&self) -> impl Future<Output=()> + '_ {
pub fn launch(&self) -> impl Future<Output = ()> + '_ {
let config = self.config.clone();
let events = self.service_discovery().events();
let cluster = self.cluster.clone();
Expand All @@ -71,7 +61,9 @@ impl ArtilleryAPCluster {

events_inner
.iter()
.filter(|discovery| discovery.get().port() != config_inner.sd_config.local_service_addr.port())
.filter(|discovery| {
discovery.get().port() != config_inner.sd_config.local_service_addr.port()
})
.for_each(|discovery| cluster_inner.add_seed_node(discovery.get()))
}
}
Expand Down
2 changes: 1 addition & 1 deletion artillery-core/src/cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pub mod ap_cluster;
pub mod ap;
36 changes: 21 additions & 15 deletions artillery-core/src/epidemic/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
use super::state::ArtilleryState;
use super::state::ArtilleryEpidemic;
use crate::epidemic::cluster_config::ClusterConfig;
use crate::epidemic::state::{ArtilleryClusterEvent, ArtilleryClusterRequest};
use crate::errors::*;
use bastion_executor::blocking::spawn_blocking;
use lightproc::proc_stack::ProcStack;
use std::convert::AsRef;
use std::net::SocketAddr;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::{
future::Future,
pin::Pin,
sync::mpsc::{channel, Receiver, Sender},
task::{Context, Poll},
};
use uuid::Uuid;

pub struct Cluster {
Expand All @@ -17,16 +24,17 @@ impl Cluster {
let (event_tx, event_rx) = channel::<ArtilleryClusterEvent>();
let (internal_tx, mut internal_rx) = channel::<ArtilleryClusterRequest>();

let (poll, state) = ArtilleryState::new(host_key, config, event_tx, internal_tx.clone())?;
let (poll, state) =
ArtilleryEpidemic::new(host_key, config, event_tx, internal_tx.clone())?;

debug!("Starting Artillery Cluster");
std::thread::Builder::new()
.name("artillery-epidemic-cluster-state".to_string())
.spawn(move || {
ArtilleryState::event_loop(&mut internal_rx, poll, state)
let _cluster_handle = spawn_blocking(
async move {
ArtilleryEpidemic::event_loop(&mut internal_rx, poll, state)
.expect("Failed to create event loop");
})
.expect("cannot start epidemic cluster state management thread");
},
ProcStack::default(),
);

Ok(Self {
events: event_rx,
Expand Down Expand Up @@ -59,12 +67,10 @@ impl Cluster {
impl Future for Cluster {
type Output = ArtilleryClusterEvent;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
return match self.events.recv() {
Ok(kv) => Poll::Ready(kv),
Err(_) => Poll::Pending
}
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
match self.events.recv() {
Ok(kv) => Poll::Ready(kv),
Err(_) => Poll::Pending,
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions artillery-core/src/epidemic/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ pub fn most_uptodate_member_data<'a>(
lhs: &'a ArtilleryMember,
rhs: &'a ArtilleryMember,
) -> &'a ArtilleryMember {
// Don't apply clippy here.
// It's important bit otherwise we won't understand.
#![allow(clippy::match_same_arms)]

let lhs_overrides = match (
lhs.member_state,
lhs.incarnation_number,
Expand Down
52 changes: 33 additions & 19 deletions artillery-core/src/epidemic/membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl ArtilleryMemberList {
}

fn mut_myself(&mut self) -> &mut ArtilleryMember {
for member in self.members.iter_mut() {
for member in &mut self.members {
if member.is_current() {
return member;
}
Expand Down Expand Up @@ -79,25 +79,32 @@ impl ArtilleryMemberList {

pub fn time_out_nodes(
&mut self,
expired_hosts: HashSet<SocketAddr>,
expired_hosts: &HashSet<SocketAddr>,
) -> (Vec<ArtilleryMember>, Vec<ArtilleryMember>) {
let mut suspect_members = Vec::new();
let mut down_members = Vec::new();

for member in self.members.iter_mut() {
for member in &mut self.members {
if let Some(remote_host) = member.remote_host() {
if !expired_hosts.contains(&remote_host) {
continue;
}

if member.state() == ArtilleryMemberState::Alive {
member.set_state(ArtilleryMemberState::Suspect);
suspect_members.push(member.clone());
} else if member.state() == ArtilleryMemberState::Suspect
&& member.state_change_older_than(Duration::seconds(3))
{
member.set_state(ArtilleryMemberState::Down);
down_members.push(member.clone());
match member.state() {
ArtilleryMemberState::Alive => {
member.set_state(ArtilleryMemberState::Suspect);
suspect_members.push(member.clone());
}
// TODO: Config suspect timeout
ArtilleryMemberState::Suspect
if member.state_change_older_than(Duration::seconds(3)) =>
{
member.set_state(ArtilleryMemberState::Down);
down_members.push(member.clone());
}
ArtilleryMemberState::Suspect
| ArtilleryMemberState::Down
| ArtilleryMemberState::Left => {}
}
}
}
Expand All @@ -106,7 +113,7 @@ impl ArtilleryMemberList {
}

pub fn mark_node_alive(&mut self, src_addr: &SocketAddr) -> Option<ArtilleryMember> {
for member in self.members.iter_mut() {
for member in &mut self.members {
if member.remote_host() == Some(*src_addr)
&& member.state() != ArtilleryMemberState::Alive
{
Expand Down Expand Up @@ -144,8 +151,7 @@ impl ArtilleryMemberList {
match old_member_data {
Entry::Occupied(mut entry) => {
let new_member =
member::most_uptodate_member_data(&new_member_data, entry.get())
.clone();
member::most_uptodate_member_data(new_member_data, entry.get()).clone();
let new_host = new_member
.remote_host()
.or_else(|| entry.get().remote_host())
Expand Down Expand Up @@ -173,6 +179,9 @@ impl ArtilleryMemberList {
(new_nodes, changed_nodes)
}

///
///
/// Random ping enqueuing
pub fn hosts_for_indirect_ping(
&self,
host_count: usize,
Expand All @@ -181,12 +190,16 @@ impl ArtilleryMemberList {
let mut possible_members: Vec<_> = self
.members
.iter()
.filter(|m| {
m.state() == ArtilleryMemberState::Alive
.filter_map(|m| {
if m.state() == ArtilleryMemberState::Alive
&& m.is_remote()
&& m.remote_host() != Some(*target)
{
m.remote_host()
} else {
None
}
})
.map(|m| m.remote_host().unwrap())
.collect();

math::shuffle_linear(&mut possible_members);
Expand All @@ -197,14 +210,15 @@ impl ArtilleryMemberList {
pub fn has_member(&self, remote_host: &SocketAddr) -> bool {
self.members
.iter()
.any(|ref m| m.remote_host() == Some(*remote_host))
.any(|m| m.remote_host() == Some(*remote_host))
}

pub fn add_member(&mut self, member: ArtilleryMember) {
self.members.push(member)
}

/// get_member will return artillery member if the given uuid is matches with any of the
///
/// `get_member` will return artillery member if the given uuid is matches with any of the
/// member in the cluster.
pub fn get_member(&self, id: &Uuid) -> Option<ArtilleryMember> {
let member: Vec<_> = self
Expand Down
Loading

0 comments on commit fe12258

Please sign in to comment.