Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions activator/src/activator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use doublezero_sdk::{
link::list::ListLinkCommand, location::list::ListLocationCommand,
user::list::ListUserCommand,
},
AccountData, DZClient, Device, DeviceStatus, Exchange, GetGlobalConfigCommand, LinkStatus,
Location, MulticastGroup, ProgramVersion, UserStatus,
AccountData, DZClient, Device, DeviceStatus, Exchange, GetGlobalConfigCommand, InterfaceType,
LinkStatus, Location, MulticastGroup, ProgramVersion, UserStatus,
};
use log::{debug, error, info, warn};
use solana_sdk::pubkey::Pubkey;
Expand All @@ -37,6 +37,7 @@ pub struct Activator {
pub client: DZClient,

pub link_ids: IDAllocator,
pub segment_routing_ids: IDAllocator,
pub link_ips: IPBlockAllocator,
pub multicastgroup_tunnel_ips: IPBlockAllocator,

Expand Down Expand Up @@ -93,6 +94,7 @@ impl Activator {
client,
link_ips: IPBlockAllocator::new(config.device_tunnel_block.into()),
link_ids: IDAllocator::new(0, vec![]),
segment_routing_ids: IDAllocator::new(1, vec![]),
multicastgroup_tunnel_ips: IPBlockAllocator::new(config.multicastgroup_block.into()),
user_tunnel_ips: IPBlockAllocator::new(config.user_tunnel_block.into()),
devices: HashMap::new(),
Expand Down Expand Up @@ -127,6 +129,14 @@ impl Activator {
.iter()
.filter(|(_, d)| d.status == DeviceStatus::Activated)
{
device.interfaces.iter().for_each(|interface| {
if interface.node_segment_idx > 0 {
self.segment_routing_ids.assign(interface.node_segment_idx);
}
if interface.interface_type == InterfaceType::Loopback {
self.link_ips.assign_block(interface.ip_net.into());
}
});
self.add_device(pubkey, device);
}

Expand Down Expand Up @@ -198,14 +208,23 @@ impl Activator {
let multicastgroups = &mut self.multicastgroups;
let solana_info = &self.solana_info;
let state_transitions = &mut self.state_transitions;
let segment_routing_ids = &mut self.segment_routing_ids;

self.client.gets_and_subscribe(
move |client, pubkey, data| {
debug!("Event: {pubkey} {data:?}");

match data {
AccountData::Device(device) => {
process_device_event(client, pubkey, devices, device, state_transitions);
process_device_event(
client,
pubkey,
devices,
device,
state_transitions,
segment_routing_ids,
link_ips,
);
}
AccountData::Link(tunnel) => {
process_tunnel_event(
Expand Down
198 changes: 190 additions & 8 deletions activator/src/process/device.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use crate::{idallocator::IDAllocator, ipblockallocator::IPBlockAllocator};
use doublezero_sdk::{
commands::device::{activate::ActivateDeviceCommand, closeaccount::CloseAccountDeviceCommand},
Device, DeviceStatus, DoubleZeroClient,
commands::device::{
activate::ActivateDeviceCommand, closeaccount::CloseAccountDeviceCommand,
update::UpdateDeviceCommand,
},
Device, DeviceStatus, DoubleZeroClient, InterfaceType, LoopbackType, NetworkV4,
};
use log::info;
use log::{error, info};
use solana_sdk::pubkey::Pubkey;
use std::{
collections::{hash_map::Entry, HashMap},
Expand All @@ -17,6 +21,8 @@ pub fn process_device_event(
devices: &mut DeviceMap,
device: &Device,
state_transitions: &mut HashMap<&'static str, usize>,
segment_routing_ids: &mut IDAllocator,
link_ips: &mut IPBlockAllocator,
) {
match device.status {
DeviceStatus::Pending => {
Expand Down Expand Up @@ -84,17 +90,84 @@ pub fn process_device_event(
}
_ => {}
}

if device.status == DeviceStatus::Deleting || device.status == DeviceStatus::Rejected {
return;
}

let mut do_update = false;
let mut interfaces = device.interfaces.clone();
for interface in &mut interfaces {
if interface.interface_type == InterfaceType::Loopback {
if interface.node_segment_idx == 0 && interface.loopback_type == LoopbackType::Vpnv4 {
let id = segment_routing_ids.next_available();
info!(
"Assigning segment routing ID {} to device {} interface {}",
id, device.code, interface.name
);
interface.node_segment_idx = id;
do_update = true;
}
if interface.ip_net == NetworkV4::default() {
// Assign a loopback IP if not already set
interface.ip_net = link_ips
.next_available_block(1, 1)
.unwrap_or_else(|| {
error!(
"No available loopback IP block for device {} interface {}",
device.code, interface.name
);
"0.0.0.0/0".parse().unwrap()
})
.into();
info!(
"Assigning IP {} to device {} interface {}",
interface.ip_net, device.code, interface.name
);
do_update = true;
}
}
}

if do_update {
info!("Updating device {}", device.code);
UpdateDeviceCommand {
pubkey: *pubkey,
code: None,
device_type: None,
public_ip: None,
dz_prefixes: None,
metrics_publisher: None,
contributor_pk: None,
bgp_asn: None,
dia_bgp_asn: None,
mgmt_vrf: None,
dns_servers: None,
ntp_servers: None,
interfaces: Some(interfaces),
}
.execute(client)
.map_err(|err| {
error!("Failed to update device {}: {}", device.code, err);
})
.ok();
}
}

#[cfg(test)]
mod tests {
use crate::tests::utils::{create_test_client, get_device_bump_seed};

use super::*;
use doublezero_sdk::{AccountData, AccountType, DeviceType};
use doublezero_sdk::{
AccountData, AccountType, DeviceType, Interface, CURRENT_INTERFACE_VERSION,
};
use doublezero_serviceability::{
instructions::DoubleZeroInstruction,
processors::device::{activate::DeviceActivateArgs, closeaccount::DeviceCloseAccountArgs},
processors::device::{
activate::DeviceActivateArgs, closeaccount::DeviceCloseAccountArgs,
update::DeviceUpdateArgs,
},
};
use mockall::{predicate, Sequence};
use solana_sdk::signature::Signature;
Expand All @@ -105,6 +178,7 @@ mod tests {
let mut seq = Sequence::new();
let mut devices = HashMap::new();
let mut client = create_test_client();
let mut segment_ids = IDAllocator::new(1, vec![]);

let device_pubkey = Pubkey::from_str_const("8KvLQiyKgrK3KyVGVVyT1Pmg7ahPVFsvHUVPg97oYynV");
let mut device = Device {
Expand All @@ -127,9 +201,34 @@ mod tests {
mgmt_vrf: "default".to_string(),
dns_servers: vec![[8, 8, 8, 8].into(), [8, 8, 4, 4].into()],
ntp_servers: vec![[192, 168, 1, 1].into(), [192, 168, 1, 2].into()],
interfaces: vec![],
interfaces: vec![
Interface {
version: CURRENT_INTERFACE_VERSION,
name: "eth0".to_string(),
interface_type: InterfaceType::Physical,
loopback_type: LoopbackType::None,
vlan_id: 0,
ip_net: NetworkV4::default(),
node_segment_idx: 0,
user_tunnel_endpoint: false,
},
Interface {
version: CURRENT_INTERFACE_VERSION,
name: "lo0".to_string(),
interface_type: InterfaceType::Loopback,
loopback_type: LoopbackType::Vpnv4,
vlan_id: 0,
ip_net: NetworkV4::default(),
node_segment_idx: 0,
user_tunnel_endpoint: false,
},
],
};

let mut expected_interfaces = device.interfaces.clone();
expected_interfaces[1].ip_net = "1.1.1.1/32".parse().unwrap();
expected_interfaces[1].node_segment_idx = 1;

let mut state_transitions: HashMap<&'static str, usize> = HashMap::new();

client
Expand All @@ -142,12 +241,39 @@ mod tests {
)
.returning(|_, _| Ok(Signature::new_unique()));

client
.expect_execute_transaction()
.times(1)
.in_sequence(&mut seq)
.with(
predicate::eq(DoubleZeroInstruction::UpdateDevice(DeviceUpdateArgs {
code: None,
device_type: None,
public_ip: None,
dz_prefixes: None,
metrics_publisher_pk: None,
contributor_pk: None,
bgp_asn: None,
dia_bgp_asn: None,
mgmt_vrf: None,
dns_servers: None,
ntp_servers: None,
interfaces: Some(expected_interfaces.clone()),
})),
predicate::always(),
)
.returning(|_, _| Ok(Signature::new_unique()));

let mut ip_block_allocator = IPBlockAllocator::new("1.1.1.0/24".parse().unwrap());

process_device_event(
&client,
&device_pubkey,
&mut devices,
&device,
&mut state_transitions,
&mut segment_ids,
&mut ip_block_allocator,
);

assert!(devices.contains_key(&device_pubkey));
Expand Down Expand Up @@ -183,6 +309,8 @@ mod tests {
&mut devices,
&device,
&mut state_transitions,
&mut segment_ids,
&mut ip_block_allocator,
);
assert!(!devices.contains_key(&device_pubkey));
assert_eq!(state_transitions.len(), 2);
Expand All @@ -193,8 +321,9 @@ mod tests {
#[test]
fn test_process_device_event_activated() {
let mut devices = HashMap::new();
let client = create_test_client();
let mut client = create_test_client();
let pubkey = Pubkey::new_unique();
let mut segment_ids = IDAllocator::new(1, vec![]);

let mut device = Device {
account_type: AccountType::Device,
Expand All @@ -216,29 +345,82 @@ mod tests {
mgmt_vrf: "default".to_string(),
dns_servers: vec![[8, 8, 8, 8].into(), [8, 8, 4, 4].into()],
ntp_servers: vec![[192, 168, 1, 1].into(), [192, 168, 1, 2].into()],
interfaces: vec![],
interfaces: vec![
Interface {
version: CURRENT_INTERFACE_VERSION,
name: "eth0".to_string(),
interface_type: InterfaceType::Physical,
loopback_type: LoopbackType::None,
vlan_id: 0,
ip_net: NetworkV4::default(),
node_segment_idx: 0,
user_tunnel_endpoint: false,
},
Interface {
version: CURRENT_INTERFACE_VERSION,
name: "lo0".to_string(),
interface_type: InterfaceType::Loopback,
loopback_type: LoopbackType::Vpnv4,
vlan_id: 0,
ip_net: NetworkV4::default(),
node_segment_idx: 0,
user_tunnel_endpoint: false,
},
],
};

let mut state_transitions: HashMap<&'static str, usize> = HashMap::new();
let mut ip_block_allocator = IPBlockAllocator::new("1.1.1.0/24".parse().unwrap());

let mut expected_interfaces = device.interfaces.clone();
expected_interfaces[1].ip_net = "1.1.1.1/32".parse().unwrap();
expected_interfaces[1].node_segment_idx = 1;

client
.expect_execute_transaction()
.times(1)
.with(
predicate::eq(DoubleZeroInstruction::UpdateDevice(DeviceUpdateArgs {
code: None,
device_type: None,
public_ip: None,
dz_prefixes: None,
metrics_publisher_pk: None,
contributor_pk: None,
bgp_asn: None,
dia_bgp_asn: None,
mgmt_vrf: None,
dns_servers: None,
ntp_servers: None,
interfaces: Some(expected_interfaces.clone()),
})),
predicate::always(),
)
.returning(|_, _| Ok(Signature::new_unique()));

process_device_event(
&client,
&pubkey,
&mut devices,
&device,
&mut state_transitions,
&mut segment_ids,
&mut ip_block_allocator,
);

assert!(devices.contains_key(&pubkey));
assert_eq!(devices.get(&pubkey).unwrap().device, device);

device.dz_prefixes.push("10.0.1.1/24".parse().unwrap());
device.interfaces = expected_interfaces;
process_device_event(
&client,
&pubkey,
&mut devices,
&device,
&mut state_transitions,
&mut segment_ids,
&mut ip_block_allocator,
);

assert!(devices.contains_key(&pubkey));
Expand Down
Loading
Loading