From 5a4a393563ab9f447225e18b2be379a66c5a52fe Mon Sep 17 00:00:00 2001 From: bcmmbaga Date: Fri, 12 Jul 2024 10:56:44 +0300 Subject: [PATCH] Improve concurrency and update peer message handling --- management/server/updatechannel.go | 18 +++++++------- management/server/updatechannel_test.go | 33 ++++++++----------------- 2 files changed, 19 insertions(+), 32 deletions(-) diff --git a/management/server/updatechannel.go b/management/server/updatechannel.go index 12441e1c8de..bb759881b87 100644 --- a/management/server/updatechannel.go +++ b/management/server/updatechannel.go @@ -58,6 +58,8 @@ func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, upda } p.channelsMux.Lock() + p.peerUpdateMessage[peerID] = update + defer func() { p.channelsMux.Unlock() if p.metrics != nil { @@ -65,6 +67,11 @@ func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, upda } }() + if p.peerUpdateMessage[peerID] != nil && p.peerUpdateMessage[peerID].NetworkMap.Network.Serial < update.Update.NetworkMap.GetSerial() { + log.WithContext(ctx).Debugf("peer %s network map serial not changed, skip sending update", peerID) + return + } + if channel, ok := p.peerChannels[peerID]; ok { found = true select { @@ -192,11 +199,10 @@ func (p *PeersUpdateManager) HasChannel(peerID string) bool { // handlePeerMessageUpdate checks if the update message for a peer is new and should be sent. func (p *PeersUpdateManager) handlePeerMessageUpdate(ctx context.Context, peerID string, update *UpdateMessage) bool { - p.channelsMux.Lock() - defer p.channelsMux.Unlock() + p.channelsMux.RLock() + defer p.channelsMux.RUnlock() previousUpdateMsg := p.peerUpdateMessage[peerID] - if previousUpdateMsg != nil { updated, err := isNewPeerUpdateMessage(previousUpdateMsg, update) if err != nil { @@ -209,12 +215,6 @@ func (p *PeersUpdateManager) handlePeerMessageUpdate(ctx context.Context, peerID } } - if p.peerUpdateMessage[peerID] != nil && p.peerUpdateMessage[peerID].NetworkMap.Network.Serial < update.Update.NetworkMap.GetSerial() { - log.WithContext(ctx).Debugf("peer %s network map serial not changed, skip sending update", peerID) - return false - } - p.peerUpdateMessage[peerID] = update - return true } diff --git a/management/server/updatechannel_test.go b/management/server/updatechannel_test.go index 1f7f6d97a70..420d611026e 100644 --- a/management/server/updatechannel_test.go +++ b/management/server/updatechannel_test.go @@ -82,12 +82,11 @@ func TestCloseChannel(t *testing.T) { func TestHandlePeerMessageUpdate(t *testing.T) { tests := []struct { - name string - peerID string - existingUpdate *UpdateMessage - newUpdate *UpdateMessage - expectedResult bool - expectedUpdateSave bool + name string + peerID string + existingUpdate *UpdateMessage + newUpdate *UpdateMessage + expectedResult bool }{ { name: "update message with turn credentials update", @@ -97,8 +96,7 @@ func TestHandlePeerMessageUpdate(t *testing.T) { WiretrusteeConfig: &proto.WiretrusteeConfig{}, }, }, - expectedResult: true, - expectedUpdateSave: false, + expectedResult: true, }, { name: "update message for peer without existing update", @@ -109,8 +107,7 @@ func TestHandlePeerMessageUpdate(t *testing.T) { }, NetworkMap: &NetworkMap{Network: &Network{Serial: 1}}, }, - expectedResult: true, - expectedUpdateSave: true, + expectedResult: true, }, { name: "update message with no changes in update", @@ -129,8 +126,7 @@ func TestHandlePeerMessageUpdate(t *testing.T) { NetworkMap: &NetworkMap{Network: &Network{Serial: 1}}, Checks: []*posture.Checks{}, }, - expectedResult: false, - expectedUpdateSave: false, + expectedResult: false, }, { name: "update message with changes in checks", @@ -149,8 +145,7 @@ func TestHandlePeerMessageUpdate(t *testing.T) { NetworkMap: &NetworkMap{Network: &Network{Serial: 1}}, Checks: []*posture.Checks{{ID: "check1"}}, }, - expectedResult: true, - expectedUpdateSave: true, + expectedResult: true, }, { name: "update message with lower serial number", @@ -167,8 +162,7 @@ func TestHandlePeerMessageUpdate(t *testing.T) { }, NetworkMap: &NetworkMap{Network: &Network{Serial: 1}}, }, - expectedResult: false, - expectedUpdateSave: false, + expectedResult: false, }, } @@ -182,14 +176,7 @@ func TestHandlePeerMessageUpdate(t *testing.T) { } result := p.handlePeerMessageUpdate(ctx, tt.peerID, tt.newUpdate) - assert.Equal(t, tt.expectedResult, result) - - if tt.expectedUpdateSave { - assert.Equal(t, tt.newUpdate, p.peerUpdateMessage[tt.peerID]) - } else if tt.existingUpdate != nil { - assert.Equal(t, tt.existingUpdate, p.peerUpdateMessage[tt.peerID]) - } }) } }