Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send all peer updates in a separate go routine #3096

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/golang-test-linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ jobs:
matrix:
arch: [ '386','amd64' ]
store: [ 'sqlite', 'postgres']
runs: ['1','2','3','4','5','6','7','8','9','10']
runs-on: ubuntu-22.04
steps:
- name: Install Go
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,4 @@ We use open-source technologies like [WireGuard®](https://www.wireguard.com/),
### Legal
_WireGuard_ and the _WireGuard_ logo are [registered trademarks](https://www.wireguard.com/trademark-policy/) of Jason A. Donenfeld.


2 changes: 1 addition & 1 deletion client/internal/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,7 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {

var dnsRouteFeatureFlag bool
if networkMap.PeerConfig != nil {
dnsRouteFeatureFlag = networkMap.PeerConfig.RoutingPeerDnsResolutionEnabled
dnsRouteFeatureFlag = false
}
routedDomains, routes := toRoutes(networkMap.GetRoutes())

Expand Down
499 changes: 243 additions & 256 deletions management/proto/management.pb.go

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions management/proto/management.proto
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,6 @@ message PeerConfig {
SSHConfig sshConfig = 3;
// Peer fully qualified domain name
string fqdn = 4;

bool RoutingPeerDnsResolutionEnabled = 5;
}

// NetworkMap represents a network state of the peer with the corresponding configuration parameters to establish peer-to-peer connections
Expand Down
27 changes: 13 additions & 14 deletions management/server/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,16 +389,16 @@ func (am *DefaultAccountManager) UpdateAccountSettings(ctx context.Context, acco
am.checkAndSchedulePeerLoginExpiration(ctx, account)
}

updateAccountPeers := false
if oldSettings.RoutingPeerDNSResolutionEnabled != newSettings.RoutingPeerDNSResolutionEnabled {
if newSettings.RoutingPeerDNSResolutionEnabled {
am.StoreEvent(ctx, userID, accountID, accountID, activity.AccountRoutingPeerDNSResolutionEnabled, nil)
} else {
am.StoreEvent(ctx, userID, accountID, accountID, activity.AccountRoutingPeerDNSResolutionDisabled, nil)
}
updateAccountPeers = true
account.Network.Serial++
}
// updateAccountPeers := false
// if oldSettings.RoutingPeerDNSResolutionEnabled != newSettings.RoutingPeerDNSResolutionEnabled {
// if newSettings.RoutingPeerDNSResolutionEnabled {
// am.StoreEvent(ctx, userID, accountID, accountID, activity.AccountRoutingPeerDNSResolutionEnabled, nil)
// } else {
// am.StoreEvent(ctx, userID, accountID, accountID, activity.AccountRoutingPeerDNSResolutionDisabled, nil)
// }
// updateAccountPeers = true
// account.Network.Serial++
// }

err = am.handleInactivityExpirationSettings(ctx, account, oldSettings, newSettings, userID, accountID)
if err != nil {
Expand All @@ -417,9 +417,9 @@ func (am *DefaultAccountManager) UpdateAccountSettings(ctx context.Context, acco
return nil, err
}

if updateAccountPeers {
go am.UpdateAccountPeers(ctx, accountID)
}
// if updateAccountPeers {
// am.UpdateAccountPeers(ctx, accountID)
// }

return updatedAccount, nil
}
Expand Down Expand Up @@ -1807,7 +1807,6 @@ func newAccountWithId(ctx context.Context, accountID, userID, domain string) *ty

PeerInactivityExpirationEnabled: false,
PeerInactivityExpiration: types.DefaultPeerInactivityExpiration,
RoutingPeerDNSResolutionEnabled: true,
},
}

Expand Down
17 changes: 8 additions & 9 deletions management/server/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,10 +608,9 @@ func toPeerConfig(peer *nbpeer.Peer, network *types.Network, dnsName string, dns
netmask, _ := network.Net.Mask.Size()
fqdn := peer.FQDN(dnsName)
return &proto.PeerConfig{
Address: fmt.Sprintf("%s/%d", peer.IP.String(), netmask), // take it from the network
SshConfig: &proto.SSHConfig{SshEnabled: peer.SSHEnabled},
Fqdn: fqdn,
RoutingPeerDnsResolutionEnabled: dnsResolutionOnRoutingPeerEnabled,
Address: fmt.Sprintf("%s/%d", peer.IP.String(), netmask), // take it from the network
SshConfig: &proto.SSHConfig{SshEnabled: peer.SSHEnabled},
Fqdn: fqdn,
}
}

Expand Down Expand Up @@ -686,12 +685,12 @@ func (s *GRPCServer) sendInitialSync(ctx context.Context, peerKey wgtypes.Key, p
}
}

settings, err := s.settingsManager.GetSettings(ctx, peer.AccountID, peer.UserID)
if err != nil {
return status.Errorf(codes.Internal, "error handling request")
}
// settings, err := s.settingsManager.GetSettings(ctx, peer.AccountID, peer.UserID)
// if err != nil {
// return status.Errorf(codes.Internal, "error handling request")
// }

plainResp := toSyncResponse(ctx, s.config, peer, turnToken, relayToken, networkMap, s.accountManager.GetDNSDomain(), postureChecks, nil, settings.RoutingPeerDNSResolutionEnabled)
plainResp := toSyncResponse(ctx, s.config, peer, turnToken, relayToken, networkMap, s.accountManager.GetDNSDomain(), postureChecks, nil, false)

encryptedResp, err := encryption.EncryptMessage(peerKey, s.wgKey, plainResp)
if err != nil {
Expand Down
7 changes: 3 additions & 4 deletions management/server/http/handlers/accounts/accounts_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ func (h *handler) updateAccount(w http.ResponseWriter, r *http.Request) {
if req.Settings.JwtAllowGroups != nil {
settings.JWTAllowGroups = *req.Settings.JwtAllowGroups
}
if req.Settings.RoutingPeerDnsResolutionEnabled != nil {
settings.RoutingPeerDNSResolutionEnabled = *req.Settings.RoutingPeerDnsResolutionEnabled
}
// if req.Settings.RoutingPeerDnsResolutionEnabled != nil {
// settings.RoutingPeerDNSResolutionEnabled = *req.Settings.RoutingPeerDnsResolutionEnabled
// }

updatedAccount, err := h.accountManager.UpdateAccountSettings(r.Context(), accountID, userID, settings)
if err != nil {
Expand Down Expand Up @@ -158,7 +158,6 @@ func toAccountResponse(accountID string, settings *types.Settings) *api.Account
JwtGroupsClaimName: &settings.JWTGroupsClaimName,
JwtAllowGroups: &jwtAllowGroups,
RegularUsersViewBlocked: settings.RegularUsersViewBlocked,
RoutingPeerDnsResolutionEnabled: &settings.RoutingPeerDNSResolutionEnabled,
}

if settings.Extra != nil {
Expand Down
76 changes: 28 additions & 48 deletions management/server/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,21 +152,21 @@ func (am *DefaultAccountManager) updatePeerStatusAndLocation(ctx context.Context
}
peer.Status = newStatus

if am.geo != nil && realIP != nil {
location, err := am.geo.Lookup(realIP)
if err != nil {
log.WithContext(ctx).Warnf("failed to get location for peer %s realip: [%s]: %v", peer.ID, realIP.String(), err)
} else {
peer.Location.ConnectionIP = realIP
peer.Location.CountryCode = location.Country.ISOCode
peer.Location.CityName = location.City.Names.En
peer.Location.GeoNameID = location.City.GeonameID
err = am.Store.SavePeerLocation(account.Id, peer)
if err != nil {
log.WithContext(ctx).Warnf("could not store location for peer %s: %s", peer.ID, err)
}
}
}
// if am.geo != nil && realIP != nil {
// location, err := am.geo.Lookup(realIP)
// if err != nil {
// log.WithContext(ctx).Warnf("failed to get location for peer %s realip: [%s]: %v", peer.ID, realIP.String(), err)
// } else {
// peer.Location.ConnectionIP = realIP
// peer.Location.CountryCode = location.Country.ISOCode
// peer.Location.CityName = location.City.Names.En
// peer.Location.GeoNameID = location.City.GeonameID
// err = am.Store.SavePeerLocation(account.Id, peer)
// if err != nil {
// log.WithContext(ctx).Warnf("could not store location for peer %s: %s", peer.ID, err)
// }
// }
// }

account.UpdatePeer(peer)

Expand Down Expand Up @@ -423,13 +423,6 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
return nil, nil, nil, status.Errorf(status.NotFound, "failed adding new peer: account not found")
}

unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer func() {
if unlock != nil {
unlock()
}
}()

// This is a handling for the case when the same machine (with the same WireGuard pub key) tries to register twice.
// Such case is possible when AddPeer function takes long time to finish after AcquireWriteLockByUID (e.g., database is slow)
// and the peer disconnects with a timeout and tries to register again.
Expand Down Expand Up @@ -524,16 +517,16 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
opEvent.Meta["setup_key_name"] = setupKeyName
}

if am.geo != nil && newPeer.Location.ConnectionIP != nil {
location, err := am.geo.Lookup(newPeer.Location.ConnectionIP)
if err != nil {
log.WithContext(ctx).Warnf("failed to get location for new peer realip: [%s]: %v", newPeer.Location.ConnectionIP.String(), err)
} else {
newPeer.Location.CountryCode = location.Country.ISOCode
newPeer.Location.CityName = location.City.Names.En
newPeer.Location.GeoNameID = location.City.GeonameID
}
}
// if am.geo != nil && newPeer.Location.ConnectionIP != nil {
// location, err := am.geo.Lookup(newPeer.Location.ConnectionIP)
// if err != nil {
// log.WithContext(ctx).Warnf("failed to get location for new peer realip: [%s]: %v", newPeer.Location.ConnectionIP.String(), err)
// } else {
// newPeer.Location.CountryCode = location.Country.ISOCode
// newPeer.Location.CityName = location.City.Names.En
// newPeer.Location.GeoNameID = location.City.GeonameID
// }
// }

settings, err := transaction.GetAccountSettings(ctx, store.LockingStrengthShare, accountID)
if err != nil {
Expand Down Expand Up @@ -591,9 +584,6 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s

am.StoreEvent(ctx, opEvent.InitiatorID, opEvent.TargetID, opEvent.AccountID, opEvent.Activity, opEvent.Meta)

unlock()
unlock = nil

account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
if err != nil {
return nil, nil, nil, status.NewGetAccountError(err)
Expand Down Expand Up @@ -746,16 +736,8 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login PeerLogin)
if err != nil {
return nil, nil, nil, err
}
}

unlockAccount := am.Store.AcquireReadLockByUID(ctx, accountID)
defer unlockAccount()
unlockPeer := am.Store.AcquireWriteLockByUID(ctx, login.WireGuardPubKey)
defer func() {
if unlockPeer != nil {
unlockPeer()
}
}()
}

peer, err := am.Store.GetPeerByPeerPubKey(ctx, store.LockingStrengthUpdate, login.WireGuardPubKey)
if err != nil {
Expand Down Expand Up @@ -825,9 +807,6 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login PeerLogin)
}
}

unlockPeer()
unlockPeer = nil

account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
if err != nil {
return nil, nil, nil, err
Expand Down Expand Up @@ -1007,6 +986,7 @@ func (am *DefaultAccountManager) GetPeer(ctx context.Context, accountID, peerID,
// UpdateAccountPeers updates all peers that belong to an account.
// Should be called when changes have to be synced to peers.
func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, accountID string) {

account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
if err != nil {
log.WithContext(ctx).Errorf("failed to send out updates to peers: %v", err)
Expand Down Expand Up @@ -1055,7 +1035,7 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
}

remotePeerNetworkMap := account.GetPeerNetworkMap(ctx, p.ID, customZone, approvedPeersMap, resourcePolicies, routers, am.metrics.AccountManagerMetrics())
update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, am.GetDNSDomain(), postureChecks, dnsCache, account.Settings.RoutingPeerDNSResolutionEnabled)
update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, am.GetDNSDomain(), postureChecks, dnsCache, false)
am.peersUpdateManager.SendUpdate(ctx, p.ID, &UpdateMessage{Update: update, NetworkMap: remotePeerNetworkMap})
}(peer)
}
Expand Down
2 changes: 0 additions & 2 deletions management/server/store/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,6 @@ func restore(ctx context.Context, file string) (*FileStore, error) {

PeerInactivityExpirationEnabled: false,
PeerInactivityExpiration: types.DefaultPeerInactivityExpiration,

RoutingPeerDNSResolutionEnabled: true,
}
}

Expand Down
5 changes: 0 additions & 5 deletions management/server/types/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ type Settings struct {
// JWTAllowGroups list of groups to which users are allowed access
JWTAllowGroups []string `gorm:"serializer:json"`

// RoutingPeerDNSResolutionEnabled enabled the DNS resolution on the routing peers
RoutingPeerDNSResolutionEnabled bool

// Extra is a dictionary of Account settings
Extra *account.ExtraSettings `gorm:"embedded;embeddedPrefix:extra_"`
}
Expand All @@ -58,8 +55,6 @@ func (s *Settings) Copy() *Settings {

PeerInactivityExpirationEnabled: s.PeerInactivityExpirationEnabled,
PeerInactivityExpiration: s.PeerInactivityExpiration,

RoutingPeerDNSResolutionEnabled: s.RoutingPeerDNSResolutionEnabled,
}
if s.Extra != nil {
settings.Extra = s.Extra.Copy()
Expand Down