Skip to content

Commit

Permalink
Improve concurrency and update peer message handling
Browse files Browse the repository at this point in the history
  • Loading branch information
bcmmbaga committed Jul 12, 2024
1 parent 0a69f8a commit 5a4a393
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 32 deletions.
18 changes: 9 additions & 9 deletions management/server/updatechannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,20 @@ func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, upda
}

p.channelsMux.Lock()
p.peerUpdateMessage[peerID] = update

defer func() {
p.channelsMux.Unlock()
if p.metrics != nil {
p.metrics.UpdateChannelMetrics().CountSendUpdateDuration(time.Since(start), found, dropped)
}
}()

if p.peerUpdateMessage[peerID] != nil && p.peerUpdateMessage[peerID].NetworkMap.Network.Serial < update.Update.NetworkMap.GetSerial() {
log.WithContext(ctx).Debugf("peer %s network map serial not changed, skip sending update", peerID)
return
}

if channel, ok := p.peerChannels[peerID]; ok {
found = true
select {
Expand Down Expand Up @@ -192,11 +199,10 @@ func (p *PeersUpdateManager) HasChannel(peerID string) bool {

// handlePeerMessageUpdate checks if the update message for a peer is new and should be sent.
func (p *PeersUpdateManager) handlePeerMessageUpdate(ctx context.Context, peerID string, update *UpdateMessage) bool {
p.channelsMux.Lock()
defer p.channelsMux.Unlock()
p.channelsMux.RLock()
defer p.channelsMux.RUnlock()

previousUpdateMsg := p.peerUpdateMessage[peerID]

if previousUpdateMsg != nil {
updated, err := isNewPeerUpdateMessage(previousUpdateMsg, update)
if err != nil {
Expand All @@ -209,12 +215,6 @@ func (p *PeersUpdateManager) handlePeerMessageUpdate(ctx context.Context, peerID
}
}

if p.peerUpdateMessage[peerID] != nil && p.peerUpdateMessage[peerID].NetworkMap.Network.Serial < update.Update.NetworkMap.GetSerial() {
log.WithContext(ctx).Debugf("peer %s network map serial not changed, skip sending update", peerID)
return false
}
p.peerUpdateMessage[peerID] = update

return true
}

Expand Down
33 changes: 10 additions & 23 deletions management/server/updatechannel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,11 @@ func TestCloseChannel(t *testing.T) {

func TestHandlePeerMessageUpdate(t *testing.T) {
tests := []struct {
name string
peerID string
existingUpdate *UpdateMessage
newUpdate *UpdateMessage
expectedResult bool
expectedUpdateSave bool
name string
peerID string
existingUpdate *UpdateMessage
newUpdate *UpdateMessage
expectedResult bool
}{
{
name: "update message with turn credentials update",
Expand All @@ -97,8 +96,7 @@ func TestHandlePeerMessageUpdate(t *testing.T) {
WiretrusteeConfig: &proto.WiretrusteeConfig{},
},
},
expectedResult: true,
expectedUpdateSave: false,
expectedResult: true,
},
{
name: "update message for peer without existing update",
Expand All @@ -109,8 +107,7 @@ func TestHandlePeerMessageUpdate(t *testing.T) {
},
NetworkMap: &NetworkMap{Network: &Network{Serial: 1}},
},
expectedResult: true,
expectedUpdateSave: true,
expectedResult: true,
},
{
name: "update message with no changes in update",
Expand All @@ -129,8 +126,7 @@ func TestHandlePeerMessageUpdate(t *testing.T) {
NetworkMap: &NetworkMap{Network: &Network{Serial: 1}},
Checks: []*posture.Checks{},
},
expectedResult: false,
expectedUpdateSave: false,
expectedResult: false,
},
{
name: "update message with changes in checks",
Expand All @@ -149,8 +145,7 @@ func TestHandlePeerMessageUpdate(t *testing.T) {
NetworkMap: &NetworkMap{Network: &Network{Serial: 1}},
Checks: []*posture.Checks{{ID: "check1"}},
},
expectedResult: true,
expectedUpdateSave: true,
expectedResult: true,
},
{
name: "update message with lower serial number",
Expand All @@ -167,8 +162,7 @@ func TestHandlePeerMessageUpdate(t *testing.T) {
},
NetworkMap: &NetworkMap{Network: &Network{Serial: 1}},
},
expectedResult: false,
expectedUpdateSave: false,
expectedResult: false,
},
}

Expand All @@ -182,14 +176,7 @@ func TestHandlePeerMessageUpdate(t *testing.T) {
}

result := p.handlePeerMessageUpdate(ctx, tt.peerID, tt.newUpdate)

assert.Equal(t, tt.expectedResult, result)

if tt.expectedUpdateSave {
assert.Equal(t, tt.newUpdate, p.peerUpdateMessage[tt.peerID])
} else if tt.existingUpdate != nil {
assert.Equal(t, tt.existingUpdate, p.peerUpdateMessage[tt.peerID])
}
})
}
}

0 comments on commit 5a4a393

Please sign in to comment.