Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ All notable changes to this project will be documented in this file.

- CLI
- Remove log noise on resolve route
- Remove "unknown" status from doublezero status command and implement "failed" and "unreachable" statuses
- Onchain programs
- Removed device and user allowlist functionality, updating the global state, initialization flow, tests, and processors accordingly, and cleaning up unused account checks.
- Device Health Oracle
Expand Down
20 changes: 14 additions & 6 deletions client/doublezero/src/command/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ impl StatusCliCommand {
let is_current = user
.map(|u| best.device_pk == u.device_pk.to_string())
.unwrap_or(false);
if self.json || response.doublezero_status.session_status != "up" {
if self.json
|| response.doublezero_status.session_status != "BGP Session Up"
{
best.device_code
} else if is_current || current_device.is_none() {
format!("✅ {}", best.device_code)
Expand Down Expand Up @@ -128,7 +130,7 @@ mod tests {
let mut exchanges = std::collections::HashMap::<Pubkey, Exchange>::new();
let status_responses = vec![StatusResponse {
doublezero_status: DoubleZeroStatus {
session_status: "up".to_string(),
session_status: "BGP Session Up".to_string(),
last_session_update: Some(1625247600),
},
tunnel_name: Some("tunnel_name".to_string()),
Expand Down Expand Up @@ -301,7 +303,10 @@ mod tests {
let result = result.unwrap();
assert_eq!(result.len(), 1);
let status_response = &result[0].response;
assert_eq!(status_response.doublezero_status.session_status, "up");
assert_eq!(
status_response.doublezero_status.session_status,
"BGP Session Up"
);
assert_eq!(status_response.tunnel_name.as_deref(), Some("tunnel_name"));
assert_eq!(status_response.tunnel_src.as_deref(), Some("1.2.3.4"));
assert_eq!(status_response.tunnel_dst.as_deref(), Some("42.42.42.42"));
Expand All @@ -323,7 +328,7 @@ mod tests {
let mut exchanges = std::collections::HashMap::<Pubkey, Exchange>::new();
let status_responses = vec![StatusResponse {
doublezero_status: DoubleZeroStatus {
session_status: "down".to_string(),
session_status: "BGP Session Down".to_string(),
last_session_update: None,
},
tunnel_name: None,
Expand Down Expand Up @@ -473,7 +478,10 @@ mod tests {
let result = result.unwrap();
assert_eq!(result.len(), 1);
let status_response = &result[0].response;
assert_eq!(status_response.doublezero_status.session_status, "down");
assert_eq!(
status_response.doublezero_status.session_status,
"BGP Session Down"
);
assert_eq!(status_response.tunnel_name.as_deref(), None);
assert_eq!(status_response.tunnel_src.as_deref(), None);
assert_eq!(status_response.tunnel_dst.as_deref(), None);
Expand All @@ -495,7 +503,7 @@ mod tests {
let mut exchanges = std::collections::HashMap::<Pubkey, Exchange>::new();
let status_responses = vec![StatusResponse {
doublezero_status: DoubleZeroStatus {
session_status: "up".to_string(),
session_status: "BGP Session Up".to_string(),
last_session_update: Some(1625247600),
},
tunnel_name: Some("tunnel_name".to_string()),
Expand Down
32 changes: 18 additions & 14 deletions client/doublezerod/internal/bgp/bgp.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ type SessionEvent struct {
type SessionStatus int

const (
SessionStatusunknown SessionStatus = iota
SessionStatusPending
SessionStatusPending SessionStatus = iota
SessionStatusInitializing
SessionStatusDown
SessionStatusUp
SessionStatusFailed
SessionStatusUnreachable
)

func (s *Session) MarshalJSON() ([]byte, error) {
Expand All @@ -51,22 +52,23 @@ type Session struct {

func (s SessionStatus) String() string {
return [...]string{
"unknown",
"pending",
"initializing",
"down",
"up",
"Pending BGP Session",
"Initializing BGP Session",
"BGP Session Down",
"BGP Session Up",
"BGP Session Failed",
"Network Unreachable",
}[s]

}

func (s SessionStatus) FromString(sessionStatus string) SessionStatus {
return map[string]SessionStatus{
"unknown": SessionStatusunknown,
"pending": SessionStatusPending,
"initializing": SessionStatusInitializing,
"down": SessionStatusDown,
"up": SessionStatusUp,
"Pending BGP Session": SessionStatusPending,
"Initializing BGP Session": SessionStatusInitializing,
"BGP Session Down": SessionStatusDown,
"BGP Session Up": SessionStatusUp,
"BGP Session Failed": SessionStatusFailed,
"Network Unreachable": SessionStatusUnreachable,
}[sessionStatus]
}

Expand Down Expand Up @@ -153,6 +155,8 @@ func (b *BgpServer) AddPeer(p *PeerConfig, advertised []NLRI) error {
rrw = liveness.NewRouteReaderWriter(b.livenessManager, b.routeReaderWriter, p.Interface, p.NoUninstall)
}
plugin := NewBgpPlugin(advertised, p.RouteSrc, p.RouteTable, b.peerStatusChan, p.NoInstall, rrw)
plugin.peerAddr = p.RemoteAddress
plugin.startSessionTimeout()
err := b.server.AddPeer(corebgp.PeerConfig{
RemoteAddress: netip.MustParseAddr(p.RemoteAddress.String()),
LocalAS: p.LocalAs,
Expand Down Expand Up @@ -189,5 +193,5 @@ func (b *BgpServer) GetPeerStatus(ip net.IP) Session {
if peerStatus, ok := b.peerStatus[ip.String()]; ok {
return peerStatus
}
return Session{SessionStatus: SessionStatusunknown}
return Session{SessionStatus: SessionStatusPending}
}
68 changes: 68 additions & 0 deletions client/doublezerod/internal/bgp/plugin.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bgp

import (
"context"
"log/slog"
"net"
"net/netip"
Expand All @@ -24,8 +25,20 @@ type Plugin struct {
// These fields are used to track the initial establishment of the BGP session.
startedAt time.Time
initialallyEstablished atomic.Bool
currentlyEstablished atomic.Bool // for timeout, reset on close

// peerAddr is stored so the timeout goroutine can emit events
peerAddr net.IP

cancelTimeout context.CancelFunc

tcpConnected atomic.Bool // set when GetCapabilities is called
}

const (
BGPSessionTimeout = 30 * time.Second
)

func NewBgpPlugin(
advertised []NLRI,
routeSrc net.IP,
Expand All @@ -45,15 +58,57 @@ func NewBgpPlugin(
}

func (p *Plugin) GetCapabilities(peer corebgp.PeerConfig) []corebgp.Capability {
p.tcpConnected.Store(true)
caps := make([]corebgp.Capability, 0)
caps = append(caps, corebgp.NewMPExtensionsCapability(corebgp.AFI_IPV4, corebgp.SAFI_UNICAST))
p.peerAddr = net.ParseIP(peer.RemoteAddress.String())
p.PeerStatusChan <- SessionEvent{
PeerAddr: net.ParseIP(peer.RemoteAddress.String()),
Session: Session{SessionStatus: SessionStatusPending, LastSessionUpdate: time.Now().Unix()},
}
return caps
}

func (p *Plugin) startSessionTimeout() {
if p.cancelTimeout != nil {
p.cancelTimeout()
}
ctx, cancel := context.WithCancel(context.Background())
p.cancelTimeout = cancel
go func() {
select {
case <-ctx.Done():
return
case <-time.After(BGPSessionTimeout):
p.emitTimeoutStatus()
}
}()
}

// emitTimeoutStatus checks the current session state and emits the appropriate
// timeout status (Failed or Unreachable)
func (p *Plugin) emitTimeoutStatus() bool {
if p.currentlyEstablished.Load() {
return false
}

var status SessionStatus
if !p.tcpConnected.Load() {
status = SessionStatusUnreachable
slog.Warn("bgp: network unreachable - TCP connection never established", "peer", p.peerAddr)
} else {
status = SessionStatusFailed
slog.Warn("bgp: session failed - BGP handshake incomplete", "peer", p.peerAddr)
}

p.PeerStatusChan <- SessionEvent{
PeerAddr: p.peerAddr,
Session: Session{SessionStatus: status, LastSessionUpdate: time.Now().Unix()},
}
MetricSessionStatus.Set(0)
return true
}

func (p *Plugin) OnOpenMessage(peer corebgp.PeerConfig, routerID netip.Addr, capabilities []corebgp.Capability) *corebgp.Notification {
slog.Info("bgp: peer initializing", "peer", peer.RemoteAddress)
p.PeerStatusChan <- SessionEvent{
Expand All @@ -65,6 +120,12 @@ func (p *Plugin) OnOpenMessage(peer corebgp.PeerConfig, routerID netip.Addr, cap
}

func (p *Plugin) OnEstablished(peer corebgp.PeerConfig, writer corebgp.UpdateMessageWriter) corebgp.UpdateMessageHandler {
if p.cancelTimeout != nil {
p.cancelTimeout()
}

p.currentlyEstablished.Store(true)

if p.initialallyEstablished.CompareAndSwap(false, true) {
// If this is the first time we've established the session, record the duration.
// If the session is closed and then re-established within the lifetime of the same BGP plugin,
Expand Down Expand Up @@ -95,6 +156,12 @@ func (p *Plugin) OnEstablished(peer corebgp.PeerConfig, writer corebgp.UpdateMes
}

func (p *Plugin) OnClose(peer corebgp.PeerConfig) {
if p.cancelTimeout != nil {
p.cancelTimeout()
}

p.currentlyEstablished.Store(false)

slog.Info("bgp: peer closed", "peer", peer.RemoteAddress)
p.PeerStatusChan <- SessionEvent{
PeerAddr: net.ParseIP(peer.RemoteAddress.String()),
Expand All @@ -115,6 +182,7 @@ func (p *Plugin) OnClose(peer corebgp.PeerConfig) {
}

MetricSessionStatus.Set(0)
p.startSessionTimeout() // start a new timeout for the next session
}

func (p *Plugin) handleUpdate(peer corebgp.PeerConfig, u []byte) *corebgp.Notification {
Expand Down
60 changes: 60 additions & 0 deletions client/doublezerod/internal/bgp/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,3 +327,63 @@ func TestClient_BGPPlugin_BuildUpdateRoundTrip(t *testing.T) {
require.Equal(t, nlri.AsPath[i], gotAsPath[i], "unexpected AS at index %d", i)
}
}

func TestEmitTimeoutStatus_Unreachable(t *testing.T) {
// When tcpConnected is false and session is not established,
// emitTimeoutStatus should emit SessionStatusUnreachable
statusChan := make(chan SessionEvent, 1)
plugin := &Plugin{
PeerStatusChan: statusChan,
peerAddr: net.ParseIP("10.0.0.1"),
}

// Neither tcpConnected nor currentlyEstablished are set (both default to false)
emitted := plugin.emitTimeoutStatus()

require.True(t, emitted, "expected status to be emitted")
event := <-statusChan
require.Equal(t, SessionStatusUnreachable, event.Session.SessionStatus)
require.True(t, event.PeerAddr.Equal(net.ParseIP("10.0.0.1")))
}

func TestEmitTimeoutStatus_Failed(t *testing.T) {
// When tcpConnected is true but session is not established,
// emitTimeoutStatus should emit SessionStatusFailed
statusChan := make(chan SessionEvent, 1)
plugin := &Plugin{
PeerStatusChan: statusChan,
peerAddr: net.ParseIP("10.0.0.1"),
}

// TCP connected but BGP handshake didn't complete
plugin.tcpConnected.Store(true)

emitted := plugin.emitTimeoutStatus()

require.True(t, emitted, "expected status to be emitted")
event := <-statusChan
require.Equal(t, SessionStatusFailed, event.Session.SessionStatus)
require.True(t, event.PeerAddr.Equal(net.ParseIP("10.0.0.1")))
}

func TestEmitTimeoutStatus_NoEmitWhenEstablished(t *testing.T) {
// When session is already established, emitTimeoutStatus should not emit
statusChan := make(chan SessionEvent, 1)
plugin := &Plugin{
PeerStatusChan: statusChan,
peerAddr: net.ParseIP("10.0.0.1"),
}

// Session is established
plugin.currentlyEstablished.Store(true)

emitted := plugin.emitTimeoutStatus()

require.False(t, emitted, "expected no status to be emitted when established")
select {
case <-statusChan:
t.Fatal("unexpected event in status channel")
default:
// Expected: channel should be empty
}
}
2 changes: 1 addition & 1 deletion client/doublezerod/internal/manager/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func TestHttpStatus(t *testing.T) {
if resp.StatusCode != http.StatusOK {
t.Fatalf("wanted 200 response; got %d", resp.StatusCode)
}
want := `[{"tunnel_name":"doublezero0","tunnel_src":"1.1.1.1","tunnel_dst":"2.2.2.2","doublezero_ip":"3.3.3.3","doublezero_status":{"session_status":"unknown","last_session_update":0},"user_type":"IBRL"}]` + "\n"
want := `[{"tunnel_name":"doublezero0","tunnel_src":"1.1.1.1","tunnel_dst":"2.2.2.2","doublezero_ip":"3.3.3.3","doublezero_status":{"session_status":"Pending BGP Session","last_session_update":0},"user_type":"IBRL"}]` + "\n"
got, _ := io.ReadAll(resp.Body)
if diff := cmp.Diff(want, string(got), cmpopts.IgnoreFields(bgp.Session{}, "LastSessionUpdate")); diff != "" {
t.Fatalf("Response body mismatch (-want +got): %s\n", diff)
Expand Down
2 changes: 1 addition & 1 deletion e2e/fixtures/ibrl/doublezero_status_connected.tmpl
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Tunnel Status | Last Session Update | Tunnel Name | Tunnel Src | Tunnel Dst | Doublezero IP | User Type | Current Device | Lowest Latency Device | Metro | Network
up | 2025-10-14 17:59:41 UTC | doublezero0 | {{.ClientIP}} | {{.DeviceIP}} | {{.ClientIP}} | IBRL | ny5-dz01 | ✅ ny5-dz01 | New York | local
BGP Session Up | 2025-10-14 17:59:41 UTC | doublezero0 | {{.ClientIP}} | {{.DeviceIP}} | {{.ClientIP}} | IBRL | ny5-dz01 | ✅ ny5-dz01 | New York | local
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Tunnel Status | Last Session Update | Tunnel Name | Tunnel Src | Tunnel Dst | Doublezero IP | User Type | Current Device | Lowest Latency Device | Metro | Network
up | 2025-06-03 19:58:21 UTC | doublezero0 | {{.ClientIP}} | {{.DeviceIP}} | {{.ExpectedAllocatedClientIP}} | IBRLWithAllocatedIP | ny5-dz01 | ✅ ny5-dz01 | New York | local
BGP Session Up | 2025-06-03 19:58:21 UTC | doublezero0 | {{.ClientIP}} | {{.DeviceIP}} | {{.ExpectedAllocatedClientIP}} | IBRLWithAllocatedIP | ny5-dz01 | ✅ ny5-dz01 | New York | local
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Tunnel Status | Last Session Update | Tunnel Name | Tunnel Src | Tunnel Dst | Doublezero IP | User Type | Current Device | Lowest Latency Device | Metro | Network
up | 2025-06-03 19:58:21 UTC | doublezero1 | {{.ClientIP}} | {{.DeviceIP}} | {{.ExpectedAllocatedClientIP}} | Multicast | ny5-dz01 | ✅ ny5-dz01 | New York | local
BGP Session Up | 2025-06-03 19:58:21 UTC | doublezero1 | {{.ClientIP}} | {{.DeviceIP}} | {{.ExpectedAllocatedClientIP}} | Multicast | ny5-dz01 | ✅ ny5-dz01 | New York | local
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Tunnel Status | Last Session Update | Tunnel Name | Tunnel Src | Tunnel Dst | Doublezero IP | User Type | Current Device | Lowest Latency Device | Metro | Network
up | 2025-06-03 19:58:21 UTC | doublezero1 | {{.ClientIP}} | {{.DeviceIP}} | | Multicast | N/A | ✅ ny5-dz01 | N/A | local
BGP Session Up | 2025-06-03 19:58:21 UTC | doublezero1 | {{.ClientIP}} | {{.DeviceIP}} | | Multicast | N/A | ✅ ny5-dz01 | N/A | local
4 changes: 2 additions & 2 deletions e2e/internal/devnet/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,8 @@ type ClientSession struct {
}

const (
ClientSessionStatusUp ClientSessionStatus = "up"
ClientSessionStatusDown ClientSessionStatus = "down"
ClientSessionStatusUp ClientSessionStatus = "BGP Session Up"
ClientSessionStatusDown ClientSessionStatus = "BGP Session Down"
ClientSessionStatusDisconnected ClientSessionStatus = "disconnected"
)

Expand Down
2 changes: 1 addition & 1 deletion e2e/internal/qa/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (
grpcDialTimeout = 10 * time.Second
grpcDialMaxRetries = 5

UserStatusUp = "up"
UserStatusUp = "BGP Session Up"
UserStatusDisconnected = "disconnected"
)

Expand Down
4 changes: 2 additions & 2 deletions e2e/internal/rpc/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestQAAgentConnectivity(t *testing.T) {
// Create a mock HTTP server to simulate the doublezerod unix socket API
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/status" {
_, _ = w.Write([]byte(`[{"tunnel_name":"dz-1","doublezero_ip":"100.64.0.1","user_type":"ibrl","doublezero_status":{"session_status":"up"}}]`))
_, _ = w.Write([]byte(`[{"tunnel_name":"dz-1","doublezero_ip":"100.64.0.1","user_type":"ibrl","doublezero_status":{"session_status":"BGP Session Up"}}]`))
}
if r.URL.Path == "/latency" {
_, _ = w.Write([]byte(`[{"device_pk":"8PQkip3CxWhQTdP7doCyhT2kwjSL2csRTdnRg2zbDPs1","device_code":"chi-dn-dzd1","device_ip":"100.0.0.1","min_latency_ns":24989983,"max_latency_ns":25115111,"avg_latency_ns":25063568,"reachable":true}]`))
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestQAAgentConnectivity(t *testing.T) {
statusResult, err := client.GetStatus(ctx, &emptypb.Empty{})
require.NoError(t, err)
require.NotNil(t, statusResult)
require.Equal(t, "up", statusResult.GetStatus()[0].GetSessionStatus())
require.Equal(t, "BGP Session Up", statusResult.GetStatus()[0].GetSessionStatus())
})

t.Run("GetLatency", func(t *testing.T) {
Expand Down
Loading