Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ All notable changes to this project will be documented in this file.
- feat(smartcontract): add use_onchain_deallocation flag to MulticastGroup ([#2748](https://github.com/malbeclabs/doublezero/pull/2748))
- CLI
- Remove restriction for a single tunnel per user; now a user can have a unicast and multicast tunnel concurrently (but can only be a publisher _or_ a subscriber) ([#2728](https://github.com/malbeclabs/doublezero/pull/2728))
- Device agents
- Reduce config agent network and CPU usage by checking config checksums every 5 seconds, and reducing full config check frequency to 1m

## [v0.8.3](https://github.com/malbeclabs/doublezero/compare/client/v0.8.2...client/v0.8.3) – 2026-01-22

Expand Down
95 changes: 68 additions & 27 deletions controlplane/agent/cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import (
"context"
"crypto/sha256"
"encoding/hex"
"flag"
"fmt"
"log"
Expand All @@ -20,52 +22,51 @@
)

var (
localDevicePubkey = flag.String("pubkey", "frtyt4WKYudUpqTsvJzwN6Bd4btYxrkaYNhBNAaUVGWn", "This device's public key on the doublezero network")
controllerAddress = flag.String("controller", "18.116.166.35:7000", "The DoubleZero controller IP address and port to connect to")
device = flag.String("device", "127.0.0.1:9543", "IP Address and port of the Arist EOS API. Should always be the local switch at 127.0.0.1:9543.")
sleepIntervalInSeconds = flag.Float64("sleep-interval-in-seconds", 5, "How long to sleep in between polls")
controllerTimeoutInSeconds = flag.Float64("controller-timeout-in-seconds", 30, "How long to wait for a response from the controller before giving up")
maxLockAge = flag.Int("max-lock-age-in-seconds", 3600, "If agent detects a config lock that older than the specified age, it will force unlock.")
verbose = flag.Bool("verbose", false, "Enable verbose logging")
showVersion = flag.Bool("version", false, "Print the version of the doublezero-agent and exit")
metricsEnable = flag.Bool("metrics-enable", false, "Enable prometheus metrics")
metricsAddr = flag.String("metrics-addr", ":8080", "Address to listen on for prometheus metrics")
localDevicePubkey = flag.String("pubkey", "frtyt4WKYudUpqTsvJzwN6Bd4btYxrkaYNhBNAaUVGWn", "This device's public key on the doublezero network")

Check failure on line 25 in controlplane/agent/cmd/agent/main.go

View workflow job for this annotation

GitHub Actions / go-lint

File is not properly formatted (gofmt)
controllerAddress = flag.String("controller", "18.116.166.35:7000", "The DoubleZero controller IP address and port to connect to")
device = flag.String("device", "127.0.0.1:9543", "IP Address and port of the Arist EOS API. Should always be the local switch at 127.0.0.1:9543.")
sleepIntervalInSeconds = flag.Float64("sleep-interval-in-seconds", 5, "How long to sleep in between polls")
controllerTimeoutInSeconds = flag.Float64("controller-timeout-in-seconds", 30, "How long to wait for a response from the controller before giving up")
configCacheTimeoutInSeconds = flag.Int("config-cache-timeout-in-seconds", 60, "Force full config fetch after this many seconds, even if hash unchanged")
maxLockAge = flag.Int("max-lock-age-in-seconds", 3600, "If agent detects a config lock that older than the specified age, it will force unlock.")
verbose = flag.Bool("verbose", false, "Enable verbose logging")
showVersion = flag.Bool("version", false, "Print the version of the doublezero-agent and exit")
metricsEnable = flag.Bool("metrics-enable", false, "Enable prometheus metrics")
metricsAddr = flag.String("metrics-addr", ":8080", "Address to listen on for prometheus metrics")

// set by LDFLAGS
version = "dev"
commit = "none"
date = "unknown"
)

func pollControllerAndConfigureDevice(ctx context.Context, dzclient pb.ControllerClient, eapiClient *arista.EAPIClient, pubkey string, verbose *bool, maxLockAge int, agentVersion string, agentCommit string, agentDate string) error {
var err error

// The dz controller needs to know what BGP sessions we have configured locally
var neighborIpMap map[string][]string
neighborIpMap, err = eapiClient.GetBgpNeighbors(ctx)
if err != nil {
log.Println("pollControllerAndConfigureDevice: eapiClient.GetBgpNeighbors returned error:", err)
agent.ErrorsBgpNeighbors.Inc()
}
// computeChecksum returns the lowercase hexadecimal SHA-256 digest of data.
func computeChecksum(data string) string {
	digest := sha256.Sum256([]byte(data))

	// Hex-encode the fixed-size digest into an exactly-sized buffer.
	encoded := make([]byte, hex.EncodedLen(len(digest)))
	hex.Encode(encoded, digest[:])
	return string(encoded)
}

var configText string
func fetchConfigFromController(ctx context.Context, dzclient pb.ControllerClient, pubkey string, neighborIpMap map[string][]string, verbose *bool, agentVersion string, agentCommit string, agentDate string) (configText string, configHash string, err error) {
configText, err = agent.GetConfigFromServer(ctx, dzclient, pubkey, neighborIpMap, controllerTimeoutInSeconds, agentVersion, agentCommit, agentDate)
if err != nil {
log.Printf("pollControllerAndConfigureDevice failed to call agent.GetConfigFromServer: %q", err)
log.Printf("fetchConfigFromController failed to call agent.GetConfigFromServer: %q", err)
agent.ErrorsGetConfig.Inc()
return err
return "", "", err
}

if *verbose {
log.Printf("controller returned the following config: '%s'", configText)
}

configHash = computeChecksum(configText)
return configText, configHash, nil
}

func applyConfig(ctx context.Context, eapiClient *arista.EAPIClient, configText string, maxLockAge int) error {
if configText == "" {
// Controller returned empty config
return nil
}

_, err = eapiClient.AddConfigToDevice(ctx, configText, nil, maxLockAge) // 3rd arg (diffCmd) is only used for testing
_, err := eapiClient.AddConfigToDevice(ctx, configText, nil, maxLockAge)
if err != nil {
agent.ErrorsApplyConfig.Inc()
return err
Expand Down Expand Up @@ -121,15 +122,55 @@
client := aristapb.NewEapiMgrServiceClient(clientConn)
eapiClient = arista.NewEAPIClient(slog.Default(), client)

var cachedConfigHash string
var configCacheTime time.Time
configCacheTimeout := time.Duration(*configCacheTimeoutInSeconds) * time.Second

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
err := pollControllerAndConfigureDevice(ctx, dzclient, eapiClient, *localDevicePubkey, verbose, *maxLockAge, version, commit, date)
neighborIpMap, err := eapiClient.GetBgpNeighbors(ctx)
if err != nil {
log.Println("ERROR: eapiClient.GetBgpNeighbors returned", err)
agent.ErrorsBgpNeighbors.Inc()
}

shouldFetchAndApply := false

if cachedConfigHash == "" {
shouldFetchAndApply = true
} else if time.Since(configCacheTime) >= configCacheTimeout {
shouldFetchAndApply = true
} else {
hash, err := agent.GetConfigHashFromServer(ctx, dzclient, *localDevicePubkey, neighborIpMap, controllerTimeoutInSeconds, version, commit, date)
if err != nil {
log.Println("ERROR: GetConfigHashFromServer returned", err)
continue
}
if hash != cachedConfigHash {
shouldFetchAndApply = true
}
}

if !shouldFetchAndApply {
continue
}

configText, configHash, err := fetchConfigFromController(ctx, dzclient, *localDevicePubkey, neighborIpMap, verbose, version, commit, date)
if err != nil {
log.Println("ERROR: fetchConfigFromController returned", err)
continue
}

err = applyConfig(ctx, eapiClient, configText, *maxLockAge)
if err != nil {
log.Println("ERROR: pollAndConfigureDevice returned", err)
log.Println("ERROR: applyConfig returned", err)
continue
}
cachedConfigHash = configHash
configCacheTime = time.Now()
}
}
}
22 changes: 22 additions & 0 deletions controlplane/agent/internal/agent/dzclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,28 @@ func GetConfigFromServer(ctx context.Context, client pb.ControllerClient, localD
return config, nil
}

// GetConfigHashFromServer calls the controller's GetConfigHash RPC and returns
// the hash carried in the response.
//
// The request contains the device pubkey, a sorted flat list of every BGP peer
// found in neighborIpMap, the same peers grouped by VRF, and the agent's
// version/commit/date. The call is bounded by a timeout of
// *controllerTimeoutInSeconds seconds layered on top of ctx. On RPC failure the
// error is logged and returned together with an empty hash.
func GetConfigHashFromServer(ctx context.Context, client pb.ControllerClient, localDevicePubkey string, neighborIpMap map[string][]string, controllerTimeoutInSeconds *float64, agentVersion string, agentCommit string, agentDate string) (hash string, err error) {
	timeout := time.Duration(*controllerTimeoutInSeconds * float64(time.Second))
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	// Build both views of the peers in one pass: flattened and per-VRF.
	var allPeers []string
	peersByVrf := make(map[string]*pb.BgpPeers, len(neighborIpMap))
	for vrfName, vrfPeers := range neighborIpMap {
		allPeers = append(allPeers, vrfPeers...)
		peersByVrf[vrfName] = &pb.BgpPeers{Peers: vrfPeers}
	}
	// Map iteration order is random; sort so the flat list is deterministic.
	slices.Sort(allPeers)

	request := &pb.ConfigRequest{
		Pubkey:        localDevicePubkey,
		BgpPeers:      allPeers,
		BgpPeersByVrf: peersByVrf,
		AgentVersion:  &agentVersion,
		AgentCommit:   &agentCommit,
		AgentDate:     &agentDate,
	}

	resp, err := client.GetConfigHash(ctx, request)
	if err != nil {
		log.Printf("Error calling GetConfigHash: %v\n", err)
		return "", err
	}
	return resp.GetHash(), nil
}

func GetDzClient(controllerAddressAndPort string) (pb.ControllerClient, error) {
conn, err := grpc.NewClient(controllerAddressAndPort, grpc.WithTransportCredentials(insecure.NewCredentials()))
log.Printf("controllerAddressAndPort %s\n", controllerAddressAndPort)
Expand Down
86 changes: 86 additions & 0 deletions controlplane/controller/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,92 @@

The controller generates device configurations from Solana smart contract state and serves them to agents running on network devices via gRPC.

## Architecture

### Agent-Controller Communication Flow

The controller provides two gRPC endpoints, GetConfig and GetConfigHash, that agents use to detect and apply configuration changes. Here's how the agent uses the endpoints.

```
┌─────────┐ ┌────────────┐ ┌────────────┐ ┌─────────┐
│ Agent │ │ Controller │ │ Controller │ │ EOS │
│ main() │ │GetConfigHash │ Config │ │ Device │
│ │ │ GetConfig()│ │ Generator │ │ │
└────┬────┘ └─────┬──────┘ └─────┬──────┘ └────┬────┘
│ │ │ │
│ Every 5s: │ │ │
│ │ │ │
│ GetBgpNeighbors() │ │ │
├─────────────────────────────────────────────────────────────────────────────────────────►│
│◄─────────────────────────────────────────────────────────────────────────────────────────┤
│ [peer IPs] │ │ │
│ │ │ │
│ Decision: should fetch? │ │ │
│ • First run (no hash)? │ │ │
│ • 1m since last apply? │ │ │
│ • Hash changed? │ │ │
│ │ │ │
│ GetConfigHashFromServer() │ │ │
├───────────────────────────►│ │ │
│ │ processConfigRequest() │ │
│ ├─────────────────────────────►│ │
│ │ │ generateConfig() │
│ │ │ • deduplicateTunnels() │
│ │ │ • renderConfig() │
│ │ │ SHA256(config) │
│ │◄─────────────────────────────┤ │
│ │ [hash only] │ │
│◄───────────────────────────┤ │ │
│ ConfigHashResponse │ │ │
│ {hash: "abc123..."} │ │ │
│ (64 bytes) │ │ │
│ │ │ │
│ Compare: hash != lastHash? │ │ │
│ │ │ │
├─── if YES (or first run or 1m timeout): │
│ │ │ │
│ fetchConfigFromController() │ │
│ ├─► GetConfigFromServer() │ │
│ │ ──────────────────► │ │ │
│ │ │ processConfigRequest() │ │
│ │ ├─────────────────────────────►│ │
│ │ │ │ generateConfig() │
│ │ │ │ • deduplicateTunnels() │
│ │ │ │ • renderConfig() │
│ │ │ │ (entire config text) │
│ │ │◄─────────────────────────────┤ │
│ │ ◄──────────────────│ [config string] │ │
│ │ ConfigResponse │ │ │
│ │ {config: "..."} │ │ │
│ │ │ │ │
│ ├─► computeChecksum(config) │ │
│ │ [local SHA256] │ │ │
│ │ │ │ │
│ └─► return config+hash │ │ │
│ │ │ │
│ applyConfig() │ │ │
│ └─► AddConfigToDevice(config) │ │
│ ─────────────────────────────────────────────────────────────────────────────────►│
│ ◄─────────────────────────────────────────────────────────────────────────────────┤
│ [config applied] │ │ │
│ │ │ │
│ lastHash = hash │ │ │
│ lastApplyTime = now │ │ │
│ │ │ │
├─── else: skip this cycle (hash unchanged, no work needed) │ │
│ │ │ │
│ sleep(5s) │ │ │
│ goto top │ │ │
│ │ │ │
```

**Key Benefits:**
- **Network**: 64 bytes vs ~50KB on most cycles (99%+ reduction when config unchanged)
- **CPU**: Config generation still happens on controller (for hash), but EOS device skips apply
- **Safety**: Full config check every 1 minute (60s by default, set by the agent's `config-cache-timeout-in-seconds` flag) as fallback
- **Responsiveness**: Still checks for changes every 5 seconds
- **Decision points**: First run, 1m timeout, or hash mismatch triggers full fetch

## Configuration

### ClickHouse Integration
Expand Down
8 changes: 8 additions & 0 deletions controlplane/controller/internal/controller/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ var (
[]string{"pubkey", "device_code", "contributor_code", "exchange_code", "location_code", "device_status", "agent_version", "agent_commit", "agent_date"},
)

getConfigHashOps = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "controller_grpc_getconfighash_requests_total",
Help: "The total number of getconfighash requests",
},
[]string{"pubkey", "device_code", "contributor_code", "exchange_code", "location_code", "device_status", "agent_version", "agent_commit", "agent_date"},
)

getConfigMsgSize = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "controller_grpc_getconfig_msg_size_bytes",
Help: "The size of GetConfig response messages in bytes",
Expand Down Expand Up @@ -101,6 +108,7 @@ func init() {
prometheus.MustRegister(getConfigRenderErrors)
prometheus.MustRegister(duplicateTunnelPairs)
prometheus.MustRegister(getConfigOps)
prometheus.MustRegister(getConfigHashOps)
prometheus.MustRegister(getConfigMsgSize)
prometheus.MustRegister(getConfigDuration)

Expand Down
Loading
Loading