Skip to content

Commit

Permalink
Skip peer update on unchanged network map (#2236)
Browse files Browse the repository at this point in the history
* Enhance network updates by skipping unchanged messages

Optimizes the network update process
by skipping updates where no changes in the peer update message received.

* Add unit tests

* add locks

* Improve concurrency and update peer message handling

* Refactor account manager network update tests

* fix test

* Fix inverted network map update condition

* Add default group and policy to test data

* Run peer updates in a separate goroutine

* Refactor

* Refactor lock
  • Loading branch information
bcmmbaga authored Jul 18, 2024
1 parent a711e11 commit f17016b
Show file tree
Hide file tree
Showing 9 changed files with 410 additions and 148 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ require (
github.com/pion/transport/v3 v3.0.1
github.com/pion/turn/v3 v3.0.1
github.com/prometheus/client_golang v1.19.1
github.com/r3labs/diff v1.1.0
github.com/rs/xid v1.3.0
github.com/shirou/gopsutil/v3 v3.24.4
github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,8 @@ github.com/prometheus/common v0.53.0 h1:U2pL9w9nmJwJDa4qqLQ3ZaePJ6ZTwt7cMD3AG3+a
github.com/prometheus/common v0.53.0/go.mod h1:BrxBKv3FWBIGXw89Mg1AeBq7FSyRzXWI3l3e7W3RN5U=
github.com/prometheus/procfs v0.15.0 h1:A82kmvXJq2jTu5YUhSGNlYoxh85zLnKgPz4bMZgI5Ek=
github.com/prometheus/procfs v0.15.0/go.mod h1:Y0RJ/Y5g5wJpkTisOtqwDSo4HwhGmLB4VQSw2sQJLHk=
github.com/r3labs/diff v1.1.0 h1:V53xhrbTHrWFWq3gI4b94AjgEJOerO1+1l0xyHOBi8M=
github.com/r3labs/diff v1.1.0/go.mod h1:7WjXasNzi0vJetRcB/RqNl5dlIsmXcTTLmF5IoH6Xig=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/rs/cors v1.8.0 h1:P2KMzcFwrPoSjkF1WLRPsp3UMLyql8L4v9hQpVeK5so=
Expand Down
302 changes: 174 additions & 128 deletions management/server/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1108,70 +1108,70 @@ func TestAccountManager_AddPeerWithUserID(t *testing.T) {
assert.Equal(t, peer.IP.String(), fmt.Sprint(ev.Meta["ip"]))
}

func TestAccountManager_NetworkUpdates(t *testing.T) {
manager, err := createManager(t)
if err != nil {
t.Fatal(err)
return
}
func TestAccountManager_NetworkUpdates_SaveGroup(t *testing.T) {
manager, account, peer1, peer2, peer3 := setupNetworkMapTest(t)

userID := "account_creator"
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
defer manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)

account, err := createAccount(manager, "test_account", userID, "")
if err != nil {
t.Fatal(err)
group := group.Group{
ID: "group-id",
Name: "GroupA",
Peers: []string{peer1.ID, peer2.ID, peer3.ID},
}

setupKey, err := manager.CreateSetupKey(context.Background(), account.Id, "test-key", SetupKeyReusable, time.Hour, nil, 999, userID, false)
if err != nil {
t.Fatal("error creating setup key")
return
}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()

if account.Network.Serial != 0 {
t.Errorf("expecting account network to have an initial Serial=0")
message := <-updMsg
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 2 {
t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers))
}
}()

if err := manager.SaveGroup(context.Background(), account.Id, userID, &group); err != nil {
t.Errorf("save group: %v", err)
return
}

getPeer := func() *nbpeer.Peer {
key, err := wgtypes.GeneratePrivateKey()
if err != nil {
t.Fatal(err)
return nil
}
expectedPeerKey := key.PublicKey().String()
wg.Wait()
}

peer, _, _, err := manager.AddPeer(context.Background(), setupKey.Key, "", &nbpeer.Peer{
Key: expectedPeerKey,
Meta: nbpeer.PeerSystemMeta{Hostname: expectedPeerKey},
})
if err != nil {
t.Fatalf("expecting peer1 to be added, got failure %v", err)
return nil
}
func TestAccountManager_NetworkUpdates_DeletePolicy(t *testing.T) {
manager, account, peer1, _, _ := setupNetworkMapTest(t)

return peer
}
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
defer manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)

peer1 := getPeer()
peer2 := getPeer()
peer3 := getPeer()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()

account, err = manager.Store.GetAccount(context.Background(), account.Id)
if err != nil {
t.Fatal(err)
message := <-updMsg
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 0 {
t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers))
}
}()

if err := manager.DeletePolicy(context.Background(), account.Id, account.Policies[0].ID, userID); err != nil {
t.Errorf("delete default rule: %v", err)
return
}

wg.Wait()
}

func TestAccountManager_NetworkUpdates_SavePolicy(t *testing.T) {
manager, account, peer1, _, _ := setupNetworkMapTest(t)

updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
defer manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)

group := group.Group{
ID: "group-id",
Name: "GroupA",
Peers: []string{peer1.ID, peer2.ID, peer3.ID},
}

policy := Policy{
Enabled: true,
Rules: []*PolicyRule{
Expand All @@ -1186,107 +1186,110 @@ func TestAccountManager_NetworkUpdates(t *testing.T) {
}

wg := sync.WaitGroup{}
t.Run("save group update", func(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()

message := <-updMsg
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 2 {
t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers))
}
}()
wg.Add(1)
go func() {
defer wg.Done()

if err := manager.SaveGroup(context.Background(), account.Id, userID, &group); err != nil {
t.Errorf("save group: %v", err)
return
message := <-updMsg
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 2 {
t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers))
}
}()

wg.Wait()
})
if err := manager.SavePolicy(context.Background(), account.Id, userID, &policy); err != nil {
t.Errorf("save policy: %v", err)
return
}

t.Run("delete policy update", func(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
wg.Wait()
}

message := <-updMsg
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 0 {
t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers))
}
}()
func TestAccountManager_NetworkUpdates_DeletePeer(t *testing.T) {
manager, account, peer1, _, peer3 := setupNetworkMapTest(t)

if err := manager.DeletePolicy(context.Background(), account.Id, account.Policies[0].ID, userID); err != nil {
t.Errorf("delete default rule: %v", err)
return
}
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
defer manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)

wg.Wait()
})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()

t.Run("save policy update", func(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
message := <-updMsg
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 1 {
t.Errorf("mismatch peers count: 1 expected, got %v", len(networkMap.RemotePeers))
}
}()

message := <-updMsg
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 2 {
t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers))
}
}()
if err := manager.DeletePeer(context.Background(), account.Id, peer3.ID, userID); err != nil {
t.Errorf("delete peer: %v", err)
return
}

if err := manager.SavePolicy(context.Background(), account.Id, userID, &policy); err != nil {
t.Errorf("delete default rule: %v", err)
return
}
wg.Wait()
}

wg.Wait()
})
t.Run("delete peer update", func(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()

message := <-updMsg
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 1 {
t.Errorf("mismatch peers count: 1 expected, got %v", len(networkMap.RemotePeers))
}
}()
func TestAccountManager_NetworkUpdates_DeleteGroup(t *testing.T) {
manager, account, peer1, peer2, peer3 := setupNetworkMapTest(t)

if err := manager.DeletePeer(context.Background(), account.Id, peer3.ID, userID); err != nil {
t.Errorf("delete peer: %v", err)
return
}
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
defer manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)

wg.Wait()
})
group := group.Group{
ID: "group-id",
Name: "GroupA",
Peers: []string{peer1.ID, peer2.ID, peer3.ID},
}

policy := Policy{
Enabled: true,
Rules: []*PolicyRule{
{
Enabled: true,
Sources: []string{"group-id"},
Destinations: []string{"group-id"},
Bidirectional: true,
Action: PolicyTrafficActionAccept,
},
},
}

t.Run("delete group update", func(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
if err := manager.DeletePolicy(context.Background(), account.Id, account.Policies[0].ID, userID); err != nil {
t.Errorf("delete default rule: %v", err)
return
}

message := <-updMsg
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 0 {
t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers))
}
}()
if err := manager.SavePolicy(context.Background(), account.Id, userID, &policy); err != nil {
t.Errorf("save policy: %v", err)
return
}

// clean policy is pre requirement for delete group
_ = manager.DeletePolicy(context.Background(), account.Id, policy.ID, userID)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()

if err := manager.DeleteGroup(context.Background(), account.Id, "", group.ID); err != nil {
t.Errorf("delete group: %v", err)
return
message := <-updMsg
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 0 {
t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers))
}
}()

wg.Wait()
})
// clean policy is pre requirement for delete group
if err := manager.DeletePolicy(context.Background(), account.Id, policy.ID, userID); err != nil {
t.Errorf("delete default rule: %v", err)
return
}

if err := manager.DeleteGroup(context.Background(), account.Id, "", group.ID); err != nil {
t.Errorf("delete group: %v", err)
return
}

wg.Wait()
}

func TestAccountManager_DeletePeer(t *testing.T) {
Expand Down Expand Up @@ -2328,3 +2331,46 @@ func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
return true
}
}

func setupNetworkMapTest(t *testing.T) (*DefaultAccountManager, *Account, *nbpeer.Peer, *nbpeer.Peer, *nbpeer.Peer) {
t.Helper()

manager, err := createManager(t)
if err != nil {
t.Fatal(err)
}

account, err := createAccount(manager, "test_account", userID, "")
if err != nil {
t.Fatal(err)
}

setupKey, err := manager.CreateSetupKey(context.Background(), account.Id, "test-key", SetupKeyReusable, time.Hour, nil, 999, userID, false)
if err != nil {
t.Fatal("error creating setup key")
}

getPeer := func(manager *DefaultAccountManager, setupKey *SetupKey) *nbpeer.Peer {
key, err := wgtypes.GeneratePrivateKey()
if err != nil {
t.Fatal(err)
}
expectedPeerKey := key.PublicKey().String()

peer, _, _, err := manager.AddPeer(context.Background(), setupKey.Key, "", &nbpeer.Peer{
Key: expectedPeerKey,
Meta: nbpeer.PeerSystemMeta{Hostname: expectedPeerKey},
})
if err != nil {
t.Fatalf("expecting peer to be added, got failure %v", err)
}

return peer
}

peer1 := getPeer(manager, setupKey)
peer2 := getPeer(manager, setupKey)
peer3 := getPeer(manager, setupKey)

return manager, account, peer1, peer2, peer3
}
4 changes: 2 additions & 2 deletions management/server/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ type Network struct {
Dns string
// Serial is an ID that increments by 1 when any change to the network happened (e.g. new peer has been added).
// Used to synchronize state to the client apps.
Serial uint64
Serial uint64 `diff:"-"`

mu sync.Mutex `json:"-" gorm:"-"`
mu sync.Mutex `json:"-" gorm:"-" diff:"-"`
}

// NewNetwork creates a new Network initializing it with a Serial=0
Expand Down
Loading

0 comments on commit f17016b

Please sign in to comment.