Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[management] Add more logs to the peer update processes #2881

Merged
merged 2 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions management/server/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -2129,7 +2129,7 @@ func (am *DefaultAccountManager) syncJWTGroups(ctx context.Context, accountID st
if settings.GroupsPropagationEnabled {
account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
if err != nil {
return fmt.Errorf("error getting account: %w", err)
return status.NewGetAccountError(err)
}

if areGroupChangesAffectPeers(account, addNewGroups) || areGroupChangesAffectPeers(account, removeOldGroups) {
Expand Down Expand Up @@ -2290,12 +2290,12 @@ func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID

account, err := am.Store.GetAccount(ctx, accountID)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, status.NewGetAccountError(err)
}

peer, netMap, postureChecks, err := am.SyncPeer(ctx, PeerSync{WireGuardPubKey: peerPubKey, Meta: meta}, account)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, fmt.Errorf("error syncing peer: %w", err)
}

err = am.MarkPeerConnected(ctx, peerPubKey, true, realIP, account)
Expand All @@ -2314,7 +2314,7 @@ func (am *DefaultAccountManager) OnPeerDisconnected(ctx context.Context, account

account, err := am.Store.GetAccount(ctx, accountID)
if err != nil {
return err
return status.NewGetAccountError(err)
}

err = am.MarkPeerConnected(ctx, peerPubKey, false, nil, account)
Expand Down
9 changes: 8 additions & 1 deletion management/server/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi

peer, netMap, postureChecks, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), extractPeerMeta(ctx, syncReq.GetMeta()), realIP)
if err != nil {
log.WithContext(ctx).Debugf("error while syncing peer %s: %v", peerKey.String(), err)
return mapError(ctx, err)
}

Expand Down Expand Up @@ -207,6 +208,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi

// handleUpdates sends updates to the connected peer until the updates channel is closed.
func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates chan *UpdateMessage, srv proto.ManagementService_SyncServer) error {
log.WithContext(ctx).Tracef("starting to handle updates for peer %s", peerKey.String())
for {
select {
// condition when there are some updates
Expand Down Expand Up @@ -260,10 +262,15 @@ func (s *GRPCServer) cancelPeerRoutines(ctx context.Context, accountID string, p
unlock := s.acquirePeerLockByUID(ctx, peer.Key)
defer unlock()

_ = s.accountManager.OnPeerDisconnected(ctx, accountID, peer.Key)
err := s.accountManager.OnPeerDisconnected(ctx, accountID, peer.Key)
if err != nil {
log.WithContext(ctx).Errorf("failed to disconnect peer %s properly: %v", peer.Key, err)
}
s.peersUpdateManager.CloseChannel(ctx, peer.ID)
s.secretsManager.CancelRefresh(peer.ID)
s.ephemeralManager.OnPeerDisconnected(ctx, peer)

log.WithContext(ctx).Tracef("peer %s has been disconnected", peer.Key)
}

func (s *GRPCServer) validateToken(ctx context.Context, jwtToken string) (string, error) {
Expand Down
18 changes: 10 additions & 8 deletions management/server/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,16 @@ func (am *DefaultAccountManager) GetPeers(ctx context.Context, accountID, userID
func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubKey string, connected bool, realIP net.IP, account *Account) error {
peer, err := account.FindPeerByPubKey(peerPubKey)
if err != nil {
return err
return fmt.Errorf("failed to find peer by pub key: %w", err)
}

expired, err := am.updatePeerStatusAndLocation(ctx, peer, connected, realIP, account)
if err != nil {
return err
return fmt.Errorf("failed to update peer status and location: %w", err)
}

log.WithContext(ctx).Debugf("mark peer %s connected: %t", peer.ID, connected)

if peer.AddedWithSSOLogin() {
if peer.LoginExpirationEnabled && account.Settings.PeerLoginExpirationEnabled {
am.checkAndSchedulePeerLoginExpiration(ctx, account)
Expand Down Expand Up @@ -168,7 +170,7 @@ func (am *DefaultAccountManager) updatePeerStatusAndLocation(ctx context.Context

err := am.Store.SavePeerStatus(account.Id, peer.ID, *newStatus)
if err != nil {
return false, err
return false, fmt.Errorf("failed to save peer status: %w", err)
}

return oldStatus.LoginExpired, nil
Expand Down Expand Up @@ -587,7 +589,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s

account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
if err != nil {
return nil, nil, nil, fmt.Errorf("error getting account: %w", err)
return nil, nil, nil, status.NewGetAccountError(err)
}

allGroup, err := account.GetGroupAll()
Expand Down Expand Up @@ -640,7 +642,7 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync PeerSync, ac
if peer.UserID != "" {
user, err := account.FindUser(peer.UserID)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, fmt.Errorf("failed to get user: %w", err)
}

err = checkIfPeerOwnerIsBlocked(peer, user)
Expand All @@ -657,7 +659,7 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync PeerSync, ac
if updated {
err = am.Store.SavePeer(ctx, account.Id, peer)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, fmt.Errorf("failed to save peer: %w", err)
}

if sync.UpdateAccountPeers {
Expand All @@ -667,7 +669,7 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync PeerSync, ac

peerNotValid, isStatusChanged, err := am.integratedPeerValidator.IsNotValidPeer(ctx, account.Id, peer, account.GetPeerGroupsList(peer.ID), account.Settings.Extra)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, fmt.Errorf("failed to validate peer: %w", err)
}

var postureChecks []*posture.Checks
Expand All @@ -685,7 +687,7 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync PeerSync, ac

validPeersMap, err := am.GetValidatedPeers(account)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, fmt.Errorf("failed to get validated peers: %w", err)
}
postureChecks = am.getPeerPostureChecks(account, peer)

Expand Down
5 changes: 5 additions & 0 deletions management/server/status/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,8 @@ func NewStoreContextCanceledError(duration time.Duration) error {
func NewInvalidKeyIDError() error {
return Errorf(InvalidArgument, "invalid key ID")
}

// NewGetAccountError creates a new Error with Internal type for an issue getting account
func NewGetAccountError(err error) error {
return Errorf(Internal, "error getting account: %s", err)
}
5 changes: 4 additions & 1 deletion management/server/updatechannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,12 @@ func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) {
if channel, ok := p.peerChannels[peerID]; ok {
delete(p.peerChannels, peerID)
close(channel)

log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID)
return
}

log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID)
log.WithContext(ctx).Debugf("closing updates channel: peer %s has no channel", peerID)
}

// CloseChannels closes updates channel for each given peer
Expand Down
12 changes: 10 additions & 2 deletions management/server/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ import (
"time"

"github.com/google/uuid"
log "github.com/sirupsen/logrus"

"github.com/netbirdio/netbird/management/server/activity"
nbContext "github.com/netbirdio/netbird/management/server/context"
nbgroup "github.com/netbirdio/netbird/management/server/group"
"github.com/netbirdio/netbird/management/server/idp"
"github.com/netbirdio/netbird/management/server/integration_reference"
"github.com/netbirdio/netbird/management/server/jwtclaims"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/status"
log "github.com/sirupsen/logrus"
)

const (
Expand Down Expand Up @@ -1105,15 +1107,21 @@ func (am *DefaultAccountManager) GetUsersFromAccount(ctx context.Context, accoun
func (am *DefaultAccountManager) expireAndUpdatePeers(ctx context.Context, account *Account, peers []*nbpeer.Peer) error {
var peerIDs []string
for _, peer := range peers {
// nolint:staticcheck
ctx = context.WithValue(ctx, nbContext.PeerIDKey, peer.Key)

if peer.Status.LoginExpired {
continue
}
peerIDs = append(peerIDs, peer.ID)
peer.MarkLoginExpired(true)
account.UpdatePeer(peer)
if err := am.Store.SavePeerStatus(account.Id, peer.ID, *peer.Status); err != nil {
return err
return fmt.Errorf("failed saving peer status for peer %s: %s", peer.ID, err)
}

log.WithContext(ctx).Tracef("mark peer %s login expired", peer.ID)

am.StoreEvent(
ctx,
peer.UserID, peer.ID, account.Id,
Expand Down
Loading