Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip peer update on unchanged network map #2236

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ require (
github.com/pion/transport/v3 v3.0.1
github.com/pion/turn/v3 v3.0.1
github.com/prometheus/client_golang v1.19.1
github.com/r3labs/diff v1.1.0
github.com/rs/xid v1.3.0
github.com/shirou/gopsutil/v3 v3.24.4
github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,8 @@ github.com/prometheus/common v0.53.0 h1:U2pL9w9nmJwJDa4qqLQ3ZaePJ6ZTwt7cMD3AG3+a
github.com/prometheus/common v0.53.0/go.mod h1:BrxBKv3FWBIGXw89Mg1AeBq7FSyRzXWI3l3e7W3RN5U=
github.com/prometheus/procfs v0.15.0 h1:A82kmvXJq2jTu5YUhSGNlYoxh85zLnKgPz4bMZgI5Ek=
github.com/prometheus/procfs v0.15.0/go.mod h1:Y0RJ/Y5g5wJpkTisOtqwDSo4HwhGmLB4VQSw2sQJLHk=
github.com/r3labs/diff v1.1.0 h1:V53xhrbTHrWFWq3gI4b94AjgEJOerO1+1l0xyHOBi8M=
github.com/r3labs/diff v1.1.0/go.mod h1:7WjXasNzi0vJetRcB/RqNl5dlIsmXcTTLmF5IoH6Xig=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/rs/cors v1.8.0 h1:P2KMzcFwrPoSjkF1WLRPsp3UMLyql8L4v9hQpVeK5so=
Expand Down
4 changes: 2 additions & 2 deletions management/server/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ type Network struct {
Dns string
// Serial is an ID that increments by 1 when any change to the network happened (e.g. new peer has been added).
// Used to synchronize state to the client apps.
Serial uint64
Serial uint64 `diff:"-"`

mu sync.Mutex `json:"-" gorm:"-"`
mu sync.Mutex `json:"-" gorm:"-" diff:"-"`
}

// NewNetwork creates a new Network initializing it with a Serial=0
Expand Down
4 changes: 3 additions & 1 deletion management/server/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ func (am *DefaultAccountManager) deletePeers(ctx context.Context, account *Accou
FirewallRulesIsEmpty: true,
},
},
NetworkMap: &NetworkMap{},
Checks: []*posture.Checks{},
})
am.peersUpdateManager.CloseChannel(ctx, peer.ID)
am.StoreEvent(ctx, userID, peer.ID, account.Id, activity.PeerRemovedByUser, peer.EventMeta(am.GetDNSDomain()))
Expand Down Expand Up @@ -932,6 +934,6 @@ func (am *DefaultAccountManager) updateAccountPeers(ctx context.Context, account
postureChecks := am.getPeerPostureChecks(account, peer)
remotePeerNetworkMap := account.GetPeerNetworkMap(ctx, peer.ID, am.dnsDomain, approvedPeersMap)
update := toSyncResponse(ctx, nil, peer, nil, remotePeerNetworkMap, am.GetDNSDomain(), postureChecks)
am.peersUpdateManager.SendUpdate(ctx, peer.ID, &UpdateMessage{Update: update})
am.peersUpdateManager.SendUpdate(ctx, peer.ID, &UpdateMessage{Update: update, NetworkMap: remotePeerNetworkMap, Checks: postureChecks})
}
}
18 changes: 9 additions & 9 deletions management/server/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,35 @@ type Peer struct {
// WireGuard public key
Key string `gorm:"index"`
// A setup key this peer was registered with
SetupKey string
SetupKey string `diff:"-"`
// IP address of the Peer
IP net.IP `gorm:"serializer:json"`
// Meta is a Peer system meta data
Meta PeerSystemMeta `gorm:"embedded;embeddedPrefix:meta_"`
Meta PeerSystemMeta `gorm:"embedded;embeddedPrefix:meta_" diff:"-"`
// Name is peer's name (machine name)
Name string
// DNSLabel is the parsed peer name for domain resolution. It is used to form an FQDN by appending the account's
// domain to the peer label. e.g. peer-dns-label.netbird.cloud
DNSLabel string
// Status peer's management connection status
Status *PeerStatus `gorm:"embedded;embeddedPrefix:peer_status_"`
Status *PeerStatus `gorm:"embedded;embeddedPrefix:peer_status_" diff:"-"`
// The user ID that registered the peer
UserID string
UserID string `diff:"-"`
// SSHKey is a public SSH key of the peer
SSHKey string
// SSHEnabled indicates whether SSH server is enabled on the peer
SSHEnabled bool
// LoginExpirationEnabled indicates whether peer's login expiration is enabled and once expired the peer has to re-login.
// Works with LastLogin
LoginExpirationEnabled bool
LoginExpirationEnabled bool `diff:"-"`
// LastLogin the time when peer performed last login operation
LastLogin time.Time
LastLogin time.Time `diff:"-"`
// CreatedAt records the time the peer was created
CreatedAt time.Time
CreatedAt time.Time `diff:"-"`
// Indicate ephemeral peer attribute
Ephemeral bool
Ephemeral bool `diff:"-"`
// Geo location based on connection IP
Location Location `gorm:"embedded;embeddedPrefix:location_"`
Location Location `gorm:"embedded;embeddedPrefix:location_" diff:"-"`
}

type PeerStatus struct { //nolint:revive
Expand Down
78 changes: 73 additions & 5 deletions management/server/updatechannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package server

import (
"context"
"fmt"
"sync"
"time"

"github.com/netbirdio/netbird/management/server/posture"
"github.com/r3labs/diff"
log "github.com/sirupsen/logrus"

"github.com/netbirdio/netbird/management/proto"
Expand All @@ -14,24 +17,29 @@ import (
const channelBufferSize = 100

type UpdateMessage struct {
Update *proto.SyncResponse
Update *proto.SyncResponse
NetworkMap *NetworkMap
Checks []*posture.Checks
}

type PeersUpdateManager struct {
// peerChannels is an update channel indexed by Peer.ID
peerChannels map[string]chan *UpdateMessage
// peerNetworkMaps is the UpdateMessage indexed by Peer.ID.
peerUpdateMessage map[string]*UpdateMessage
// channelsMux keeps the mutex to access peerChannels
channelsMux *sync.Mutex
channelsMux *sync.RWMutex
// metrics provides method to collect application metrics
metrics telemetry.AppMetrics
}

// NewPeersUpdateManager returns a new instance of PeersUpdateManager
func NewPeersUpdateManager(metrics telemetry.AppMetrics) *PeersUpdateManager {
return &PeersUpdateManager{
peerChannels: make(map[string]chan *UpdateMessage),
channelsMux: &sync.Mutex{},
metrics: metrics,
peerChannels: make(map[string]chan *UpdateMessage),
peerUpdateMessage: make(map[string]*UpdateMessage),
channelsMux: &sync.RWMutex{},
metrics: metrics,
}
}

Expand All @@ -40,6 +48,15 @@ func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, upda
start := time.Now()
var found, dropped bool

// skip sending sync update to the peer if there is no change in update message,
// it will not check on turn credential refresh as we do not send network map or client posture checks
if update.NetworkMap != nil {
updated := p.handlePeerMessageUpdate(ctx, peerID, update)
if !updated {
return
}
}

p.channelsMux.Lock()
defer func() {
p.channelsMux.Unlock()
Expand Down Expand Up @@ -80,6 +97,7 @@ func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) c
closed = true
delete(p.peerChannels, peerID)
close(channel)
delete(p.peerUpdateMessage, peerID)
}
// mbragin: todo shouldn't it be more? or configurable?
channel := make(chan *UpdateMessage, channelBufferSize)
Expand All @@ -94,6 +112,7 @@ func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) {
if channel, ok := p.peerChannels[peerID]; ok {
delete(p.peerChannels, peerID)
close(channel)
delete(p.peerUpdateMessage, peerID)
}

log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID)
Expand Down Expand Up @@ -170,3 +189,52 @@ func (p *PeersUpdateManager) HasChannel(peerID string) bool {

return ok
}

// handlePeerMessageUpdate checks if the update message for a peer is new and should be sent.
func (p *PeersUpdateManager) handlePeerMessageUpdate(ctx context.Context, peerID string, update *UpdateMessage) bool {
p.channelsMux.Lock()
defer p.channelsMux.Unlock()

previousUpdateMsg := p.peerUpdateMessage[peerID]

if previousUpdateMsg != nil {
updated, err := isNewPeerUpdateMessage(previousUpdateMsg, update)
if err != nil {
log.WithContext(ctx).Errorf("error checking for SyncResponse updates: %v", err)
return false
}
if !updated {
log.WithContext(ctx).Debugf("peer %s network map is not updated, skip sending update", peerID)
return false
}
}

if p.peerUpdateMessage[peerID] != nil && p.peerUpdateMessage[peerID].NetworkMap.Network.Serial < update.Update.NetworkMap.GetSerial() {
log.WithContext(ctx).Debugf("peer %s network map serial not changed, skip sending update", peerID)
return false
}
p.peerUpdateMessage[peerID] = update

return true
}

// isNewPeerUpdateMessage checks if there are any changes between the previous and current UpdateMessage.
func isNewPeerUpdateMessage(prevResponse, currResponse *UpdateMessage) (bool, error) {
changelog, err := diff.Diff(prevResponse.Checks, currResponse.Checks)
if err != nil {
return false, fmt.Errorf("failed to diff checks: %v", err)
}
if len(changelog) > 0 {
return true, nil
}

changelog, err = diff.Diff(prevResponse.NetworkMap, currResponse.NetworkMap)
if err != nil {
return false, fmt.Errorf("failed to diff network map: %v", err)
}
if len(changelog) > 0 {
return true, nil
}

return false, nil
}
116 changes: 116 additions & 0 deletions management/server/updatechannel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"time"

"github.com/netbirdio/netbird/management/proto"
"github.com/netbirdio/netbird/management/server/posture"
"github.com/stretchr/testify/assert"
)

// var peersUpdater *PeersUpdateManager
Expand Down Expand Up @@ -77,3 +79,117 @@ func TestCloseChannel(t *testing.T) {
t.Error("Error closing the channel")
}
}

func TestHandlePeerMessageUpdate(t *testing.T) {
tests := []struct {
name string
peerID string
existingUpdate *UpdateMessage
newUpdate *UpdateMessage
expectedResult bool
expectedUpdateSave bool
}{
{
name: "update message with turn credentials update",
peerID: "peer",
newUpdate: &UpdateMessage{
Update: &proto.SyncResponse{
WiretrusteeConfig: &proto.WiretrusteeConfig{},
},
},
expectedResult: true,
expectedUpdateSave: false,
},
{
name: "update message for peer without existing update",
peerID: "peer1",
newUpdate: &UpdateMessage{
Update: &proto.SyncResponse{
NetworkMap: &proto.NetworkMap{Serial: 1},
},
NetworkMap: &NetworkMap{Network: &Network{Serial: 1}},
},
expectedResult: true,
expectedUpdateSave: true,
},
{
name: "update message with no changes in update",
peerID: "peer2",
existingUpdate: &UpdateMessage{
Update: &proto.SyncResponse{
NetworkMap: &proto.NetworkMap{Serial: 1},
},
NetworkMap: &NetworkMap{Network: &Network{Serial: 1}},
Checks: []*posture.Checks{},
},
newUpdate: &UpdateMessage{
Update: &proto.SyncResponse{
NetworkMap: &proto.NetworkMap{Serial: 1},
},
NetworkMap: &NetworkMap{Network: &Network{Serial: 1}},
Checks: []*posture.Checks{},
},
expectedResult: false,
expectedUpdateSave: false,
},
{
name: "update message with changes in checks",
peerID: "peer3",
existingUpdate: &UpdateMessage{
Update: &proto.SyncResponse{
NetworkMap: &proto.NetworkMap{Serial: 1},
},
NetworkMap: &NetworkMap{Network: &Network{Serial: 1}},
Checks: []*posture.Checks{},
},
newUpdate: &UpdateMessage{
Update: &proto.SyncResponse{
NetworkMap: &proto.NetworkMap{Serial: 1},
},
NetworkMap: &NetworkMap{Network: &Network{Serial: 1}},
Checks: []*posture.Checks{{ID: "check1"}},
},
expectedResult: true,
expectedUpdateSave: true,
},
{
name: "update message with lower serial number",
peerID: "peer4",
existingUpdate: &UpdateMessage{
Update: &proto.SyncResponse{
NetworkMap: &proto.NetworkMap{Serial: 2},
},
NetworkMap: &NetworkMap{Network: &Network{Serial: 2}},
},
newUpdate: &UpdateMessage{
Update: &proto.SyncResponse{
NetworkMap: &proto.NetworkMap{Serial: 1},
},
NetworkMap: &NetworkMap{Network: &Network{Serial: 1}},
},
expectedResult: false,
expectedUpdateSave: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := NewPeersUpdateManager(nil)
ctx := context.Background()

if tt.existingUpdate != nil {
p.peerUpdateMessage[tt.peerID] = tt.existingUpdate
}

result := p.handlePeerMessageUpdate(ctx, tt.peerID, tt.newUpdate)

assert.Equal(t, tt.expectedResult, result)

if tt.expectedUpdateSave {
assert.Equal(t, tt.newUpdate, p.peerUpdateMessage[tt.peerID])
} else if tt.existingUpdate != nil {
assert.Equal(t, tt.existingUpdate, p.peerUpdateMessage[tt.peerID])
}
})
}
}
Loading