Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion activator/src/process/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use doublezero_sdk::{
},
DoubleZeroClient, Link, LinkStatus,
};
use doublezero_serviceability::error::DoubleZeroError;
use log::info;
use solana_sdk::pubkey::Pubkey;
use std::fmt::Write;
Expand Down Expand Up @@ -70,7 +71,19 @@ pub fn process_link_event(
)
.increment(1);
}
Err(e) => write!(&mut log_msg, " Error {e}").unwrap(),
Err(e) => {
if let Some(dz_err) = e.downcast_ref::<DoubleZeroError>() {
if matches!(dz_err, DoubleZeroError::InvalidStatus) {
write!(&mut log_msg, " [already activated]").unwrap();
metrics::counter!("doublezero_activator_invalid_status_encountered", "entity_type" => "link")
.increment(1);
} else {
write!(&mut log_msg, " Error {e}").unwrap();
}
} else {
write!(&mut log_msg, " Error {e}").unwrap();
}
}
}
}
None => {
Expand Down
4 changes: 3 additions & 1 deletion activator/src/process/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,9 @@ fn log_error_ignore_invalid_status(log_msg: &mut String, e: eyre::ErrReport) {
// Ignore DoubleZeroError::InvalidStatus errors since this only happens when the user is already activated
if let Some(dz_err) = e.downcast_ref::<DoubleZeroError>() {
if matches!(dz_err, DoubleZeroError::InvalidStatus) {
// Do nothing
write!(log_msg, " [already activated]").unwrap();
metrics::counter!("doublezero_activator_invalid_status_encountered", "entity_type" => "user")
.increment(1);
} else {
write!(log_msg, "Error: {e}").unwrap();
}
Expand Down
235 changes: 234 additions & 1 deletion activator/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use doublezero_sdk::{
use log::{debug, error, info, warn};
use solana_sdk::pubkey::Pubkey;
use std::{
collections::HashMap,
collections::{HashMap, HashSet},
sync::{atomic::AtomicBool, Arc},
time::Duration,
};
Expand All @@ -46,6 +46,7 @@ pub struct Processor<T: DoubleZeroClient> {
locations: LocationMap,
exchanges: ExchangeMap,
multicastgroups: MulticastGroupMap,
in_flight_activations: HashSet<Pubkey>,
}

impl<T: DoubleZeroClient> Processor<T> {
Expand Down Expand Up @@ -143,6 +144,7 @@ impl<T: DoubleZeroClient> Processor<T> {
locations,
exchanges,
multicastgroups: HashMap::new(),
in_flight_activations: HashSet::new(),
})
}

Expand Down Expand Up @@ -171,15 +173,46 @@ impl<T: DoubleZeroClient> Processor<T> {
);
}
AccountData::Link(link) => {
if link.status == LinkStatus::Pending {
if self.in_flight_activations.contains(pubkey) {
info!(
"Skipping duplicate Link(Pending) event for {} - activation already in flight",
pubkey
);
metrics::counter!("doublezero_activator_duplicate_event_skipped", "entity_type" => "link")
.increment(1);
return;
}
self.in_flight_activations.insert(*pubkey);
}

process_link_event(
self.client.as_ref(),
pubkey,
&mut self.link_ips,
&mut self.link_ids,
link,
);

self.in_flight_activations.remove(pubkey);
}
AccountData::User(user) => {
let is_activation_trigger =
user.status == UserStatus::Pending || user.status == UserStatus::Updating;

if is_activation_trigger {
if self.in_flight_activations.contains(pubkey) {
info!(
"Skipping duplicate User({:?}) event for {} - activation already in flight",
user.status, pubkey
);
metrics::counter!("doublezero_activator_duplicate_event_skipped", "entity_type" => "user")
.increment(1);
return;
}
self.in_flight_activations.insert(*pubkey);
}

process_user_event(
self.client.as_ref(),
pubkey,
Expand All @@ -190,6 +223,8 @@ impl<T: DoubleZeroClient> Processor<T> {
&self.locations,
&self.exchanges,
);

self.in_flight_activations.remove(pubkey);
}
AccountData::Location(location) => {
process_location_event(pubkey, &mut self.locations, location);
Expand Down Expand Up @@ -224,3 +259,201 @@ impl<T: DoubleZeroClient> Processor<T> {
metrics::counter!("doublezero_activator_event_handled").increment(1);
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::test_helpers::MetricsSnapshot;
use doublezero_program_common::types::NetworkV4;
use doublezero_sdk::{AccountType, User, UserCYOA, UserStatus, UserType};
use metrics_util::debugging::DebuggingRecorder;
use std::net::Ipv4Addr;

#[test]
fn test_duplicate_user_pending_event_is_skipped() {
let recorder = DebuggingRecorder::new();
let snapshotter = recorder.snapshotter();

metrics::with_local_recorder(&recorder, || {
let user_pubkey = Pubkey::new_unique();
let user = User {
account_type: AccountType::User,
owner: Pubkey::new_unique(),
index: 0,
bump_seed: 0,
user_type: UserType::IBRL,
tenant_pk: Pubkey::new_unique(),
device_pk: Pubkey::new_unique(),
cyoa_type: UserCYOA::GREOverDIA,
client_ip: [192, 168, 1, 1].into(),
dz_ip: Ipv4Addr::UNSPECIFIED,
tunnel_id: 0,
tunnel_net: NetworkV4::default(),
status: UserStatus::Pending,
publishers: vec![],
subscribers: vec![],
validator_pubkey: Pubkey::default(),
};

let mut in_flight: HashSet<Pubkey> = HashSet::new();

let is_activation_trigger =
user.status == UserStatus::Pending || user.status == UserStatus::Updating;

assert!(is_activation_trigger);
assert!(!in_flight.contains(&user_pubkey));
in_flight.insert(user_pubkey);

if in_flight.contains(&user_pubkey) {
metrics::counter!("doublezero_activator_duplicate_event_skipped", "entity_type" => "user")
.increment(1);
} else {
panic!("Expected in_flight to contain the pubkey");
}

let mut snapshot = MetricsSnapshot::new(snapshotter.snapshot());
snapshot
.expect_counter(
"doublezero_activator_duplicate_event_skipped",
vec![("entity_type", "user")],
1,
)
.verify();
});
}

#[test]
fn test_duplicate_link_pending_event_is_skipped() {
let recorder = DebuggingRecorder::new();
let snapshotter = recorder.snapshotter();

metrics::with_local_recorder(&recorder, || {
use doublezero_sdk::{Link, LinkLinkType, LinkStatus};

let link_pubkey = Pubkey::new_unique();
let link = Link {
account_type: AccountType::Link,
owner: Pubkey::new_unique(),
index: 0,
bump_seed: 0,
contributor_pk: Pubkey::new_unique(),
side_a_pk: Pubkey::new_unique(),
side_z_pk: Pubkey::new_unique(),
link_type: LinkLinkType::WAN,
bandwidth: 10_000_000_000,
mtu: 1500,
delay_ns: 20_000,
jitter_ns: 100,
delay_override_ns: 0,
tunnel_id: 0,
tunnel_net: "10.1.2.0/31".parse().unwrap(),
status: LinkStatus::Pending,
code: "TestLink".to_string(),
side_a_iface_name: "Ethernet0".to_string(),
side_z_iface_name: "Ethernet1".to_string(),
};

let mut in_flight: HashSet<Pubkey> = HashSet::new();

let is_activation_trigger = link.status == LinkStatus::Pending;
assert!(is_activation_trigger);
assert!(!in_flight.contains(&link_pubkey));
in_flight.insert(link_pubkey);

if in_flight.contains(&link_pubkey) {
metrics::counter!("doublezero_activator_duplicate_event_skipped", "entity_type" => "link")
.increment(1);
}

let mut snapshot = MetricsSnapshot::new(snapshotter.snapshot());
snapshot
.expect_counter(
"doublezero_activator_duplicate_event_skipped",
vec![("entity_type", "link")],
1,
)
.verify();
});
}

#[test]
fn test_terminal_state_not_subject_to_in_flight_guard() {
let user = User {
account_type: AccountType::User,
owner: Pubkey::new_unique(),
index: 0,
bump_seed: 0,
user_type: UserType::IBRL,
tenant_pk: Pubkey::new_unique(),
device_pk: Pubkey::new_unique(),
cyoa_type: UserCYOA::GREOverDIA,
client_ip: [192, 168, 1, 1].into(),
dz_ip: [10, 0, 0, 1].into(),
tunnel_id: 500,
tunnel_net: "169.254.0.0/31".parse().unwrap(),
status: UserStatus::Activated,
publishers: vec![],
subscribers: vec![],
validator_pubkey: Pubkey::default(),
};

let is_activation_trigger =
user.status == UserStatus::Pending || user.status == UserStatus::Updating;

assert!(
!is_activation_trigger,
"Activated status should NOT be an activation trigger"
);

}

#[test]
fn test_updating_status_triggers_in_flight_guard() {
let recorder = DebuggingRecorder::new();
let snapshotter = recorder.snapshotter();

metrics::with_local_recorder(&recorder, || {
let user_pubkey = Pubkey::new_unique();
let user = User {
account_type: AccountType::User,
owner: Pubkey::new_unique(),
index: 0,
bump_seed: 0,
user_type: UserType::IBRL,
tenant_pk: Pubkey::new_unique(),
device_pk: Pubkey::new_unique(),
cyoa_type: UserCYOA::GREOverDIA,
client_ip: [192, 168, 1, 1].into(),
dz_ip: [10, 0, 0, 1].into(),
tunnel_id: 500,
tunnel_net: "169.254.0.0/31".parse().unwrap(),
status: UserStatus::Updating,
publishers: vec![],
subscribers: vec![],
validator_pubkey: Pubkey::default(),
};

let mut in_flight: HashSet<Pubkey> = HashSet::new();

let is_activation_trigger =
user.status == UserStatus::Pending || user.status == UserStatus::Updating;
assert!(is_activation_trigger, "Updating should be an activation trigger");

in_flight.insert(user_pubkey);

if in_flight.contains(&user_pubkey) {
metrics::counter!("doublezero_activator_duplicate_event_skipped", "entity_type" => "user")
.increment(1);
}

let mut snapshot = MetricsSnapshot::new(snapshotter.snapshot());
snapshot
.expect_counter(
"doublezero_activator_duplicate_event_skipped",
vec![("entity_type", "user")],
1,
)
.verify();
});
}
}