Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add routing peer support #441

Merged
merged 41 commits into from
Sep 5, 2022
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
b913bdb
Add routing peer support
mlsmaycon Aug 24, 2022
115728c
Align Route changes and status new method
mlsmaycon Aug 24, 2022
f2a2fc3
add get peer status test
mlsmaycon Aug 24, 2022
4e4fc8b
Rename methods and types with network
mlsmaycon Aug 24, 2022
3df1399
reorganize code and handle context done
mlsmaycon Aug 27, 2022
aba1ad5
handle errors and make consts and global vars private
mlsmaycon Aug 27, 2022
ffc01f2
handle errors
mlsmaycon Aug 27, 2022
9e249b8
handle iptables errors and document
mlsmaycon Aug 27, 2022
a86726d
unexport consts and types and further docs
mlsmaycon Aug 27, 2022
b090e7c
handle possible default route
mlsmaycon Aug 27, 2022
817cfba
Add status peer update notification
mlsmaycon Aug 27, 2022
a56ecc0
act on peers state changes
mlsmaycon Aug 27, 2022
e11fb07
add route manager to engine
mlsmaycon Aug 27, 2022
ccd6e39
fix lint and codacy comments
mlsmaycon Aug 27, 2022
50907f2
Merge branch 'main' into feature/routing-peers-support
mlsmaycon Aug 28, 2022
1abd480
Ensure we always call UpdateRoutes
mlsmaycon Aug 28, 2022
a45498b
init route manager
mlsmaycon Aug 29, 2022
b81ae21
use protoRoutes
mlsmaycon Aug 29, 2022
ee0abef
remove chosen route if removed route id matches
mlsmaycon Aug 29, 2022
435267a
ensure update events are done in the watch client networks method
mlsmaycon Aug 30, 2022
56517bb
refactor router manager client and server updates
mlsmaycon Aug 31, 2022
b11117d
add sendUpdateToClientNetworkWatcher
mlsmaycon Aug 31, 2022
3586248
Update Readme Network Routes feature naming
braginini Aug 31, 2022
4c01231
check peer state before sending update or removing allowed IPs
mlsmaycon Aug 31, 2022
556894c
update serial in the watcher
mlsmaycon Aug 31, 2022
aa21ac3
Merge remote-tracking branch 'origin/feature/routing-peers-support' i…
mlsmaycon Aug 31, 2022
8baff38
set chosen route nil when no route is chosen
mlsmaycon Aug 31, 2022
e1f4478
clean jump rules before removing chains
mlsmaycon Sep 1, 2022
d837de9
Merge branch 'main' into feature/routing-peers-support
mlsmaycon Sep 3, 2022
f4342ad
Add manager and iptables tests
mlsmaycon Sep 4, 2022
382f631
Add nftables tests
mlsmaycon Sep 4, 2022
1d3d31e
Check if routes exists and routing tests
mlsmaycon Sep 4, 2022
f601f27
remove test sleep
mlsmaycon Sep 4, 2022
869cac8
should test against default gateway interface
mlsmaycon Sep 4, 2022
baca81a
return RouteNotFound error
mlsmaycon Sep 4, 2022
08fcee4
Test if route exist using local gateway response
mlsmaycon Sep 4, 2022
a328adc
remove replace for go-netroute
mlsmaycon Sep 4, 2022
be59748
remove unused constant and just log UpdateRoutes call in engine
mlsmaycon Sep 4, 2022
994e0c0
use route manager interface and rename struct
mlsmaycon Sep 4, 2022
dc4ec2f
Adding route update test
mlsmaycon Sep 4, 2022
e94b217
fix lint notes
mlsmaycon Sep 4, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ NetBird creates an overlay peer-to-peer network connecting machines automaticall
- \[x] Remote SSH access without managing SSH keys.

**Coming soon:**
- \[ ] Router nodes
- \[ ] Network Routes.
- \[ ] Private DNS.
- \[ ] Mobile clients.
- \[ ] Network Activity Monitoring.
Expand Down
36 changes: 36 additions & 0 deletions client/internal/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package internal
import (
"context"
"fmt"
"github.com/netbirdio/netbird/client/internal/routemanager"
nbssh "github.com/netbirdio/netbird/client/ssh"
nbstatus "github.com/netbirdio/netbird/client/status"
"github.com/netbirdio/netbird/route"
"math/rand"
"net"
"reflect"
Expand Down Expand Up @@ -99,6 +101,8 @@ type Engine struct {
sshServer nbssh.Server

statusRecorder *nbstatus.Status

routeManager *routemanager.Manager
}

// Peer is an instance of the Connection Peer
Expand Down Expand Up @@ -182,6 +186,10 @@ func (e *Engine) Stop() error {
}
}

if e.routeManager != nil {
e.routeManager.Stop()
}

log.Infof("stopped Netbird Engine")

return nil
Expand Down Expand Up @@ -232,6 +240,8 @@ func (e *Engine) Start() error {
return err
}

e.routeManager = routemanager.NewManager(e.ctx, e.config.WgPrivateKey.PublicKey().String(), e.wgInterface, e.statusRecorder)

e.receiveSignalEvents()
e.receiveManagementEvents()

Expand Down Expand Up @@ -619,11 +629,37 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
}
}
}
protoRoutes := networkMap.GetRoutes()
if protoRoutes == nil {
protoRoutes = []*mgmProto.Route{}
}
err := e.routeManager.UpdateRoutes(serial, toRoutes(protoRoutes))
if err != nil {
log.Errorf("failed to update routes, err: %v", err)
}

e.networkSerial = serial
return nil
}

func toRoutes(protoRoutes []*mgmProto.Route) []*route.Route {
routes := make([]*route.Route, 0)
for _, protoRoute := range protoRoutes {
_, prefix, _ := route.ParseNetwork(protoRoute.Network)
convertedRoute := &route.Route{
ID: protoRoute.ID,
Network: prefix,
NetID: protoRoute.NetID,
NetworkType: route.NetworkType(protoRoute.NetworkType),
Peer: protoRoute.Peer,
Metric: int(protoRoute.Metric),
Masquerade: protoRoute.Masquerade,
}
routes = append(routes, convertedRoute)
}
return routes
}

// addNewPeers adds peers that were not know before but arrived from the Management service with the update
func (e *Engine) addNewPeers(peersUpdate []*mgmProto.RemotePeerConfig) error {
for _, p := range peersUpdate {
Expand Down
2 changes: 2 additions & 0 deletions client/internal/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package internal
import (
"context"
"fmt"
"github.com/netbirdio/netbird/client/internal/routemanager"
"github.com/netbirdio/netbird/client/ssh"
nbstatus "github.com/netbirdio/netbird/client/status"
"github.com/netbirdio/netbird/iface"
Expand Down Expand Up @@ -196,6 +197,7 @@ func TestEngine_UpdateNetworkMap(t *testing.T) {
WgPort: 33100,
}, nbstatus.NewRecorder())
engine.wgInterface, err = iface.NewWGIFace("utun102", "100.64.0.1/24", iface.DefaultMTU)
engine.routeManager = routemanager.NewManager(ctx, key.PublicKey().String(), engine.wgInterface, engine.statusRecorder)
mlsmaycon marked this conversation as resolved.
Show resolved Hide resolved

type testCase struct {
name string
Expand Down
285 changes: 285 additions & 0 deletions client/internal/routemanager/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
package routemanager

import (
"context"
"fmt"
"github.com/netbirdio/netbird/client/internal/peer"
"github.com/netbirdio/netbird/client/status"
"github.com/netbirdio/netbird/iface"
"github.com/netbirdio/netbird/route"
log "github.com/sirupsen/logrus"
"net/netip"
)

type routerPeerStatus struct {
connected bool
relayed bool
direct bool
}

type routesUpdate struct {
updateSerial uint64
routes []*route.Route
}

type clientNetwork struct {
ctx context.Context
stop context.CancelFunc
statusRecorder *status.Status
wgInterface *iface.WGIface
routes map[string]*route.Route
routeUpdate chan routesUpdate
peerStateUpdate chan struct{}
routePeersNotifiers map[string]chan struct{}
chosenRoute *route.Route
network netip.Prefix
updateSerial uint64
}

func newClientNetworkWatcher(ctx context.Context, wgInterface *iface.WGIface, statusRecorder *status.Status, network netip.Prefix) *clientNetwork {
ctx, cancel := context.WithCancel(ctx)
client := &clientNetwork{
ctx: ctx,
stop: cancel,
statusRecorder: statusRecorder,
wgInterface: wgInterface,
routes: make(map[string]*route.Route),
routePeersNotifiers: make(map[string]chan struct{}),
routeUpdate: make(chan routesUpdate),
peerStateUpdate: make(chan struct{}),
network: network,
}
return client
}

func getClientNetworkID(input *route.Route) string {
return input.NetID + "-" + input.Network.String()
}

func (c *clientNetwork) getRouterPeerStatuses() map[string]routerPeerStatus {
routePeerStatuses := make(map[string]routerPeerStatus)
for _, r := range c.routes {
peerStatus, err := c.statusRecorder.GetPeer(r.Peer)
if err != nil {
log.Debugf("couldn't fetch peer state: %v", err)
continue
}
routePeerStatuses[r.ID] = routerPeerStatus{
connected: peerStatus.ConnStatus == peer.StatusConnected.String(),
relayed: peerStatus.Relayed,
direct: peerStatus.Direct,
}
}
return routePeerStatuses
}

func (c *clientNetwork) getBestRouteFromStatuses(routePeerStatuses map[string]routerPeerStatus) string {
var chosen string
chosenScore := 0

currID := ""
if c.chosenRoute != nil {
currID = c.chosenRoute.ID
}

for _, r := range c.routes {
tempScore := 0
peerStatus, found := routePeerStatuses[r.ID]
if !found || !peerStatus.connected {
continue
}
if r.Metric < route.MaxMetric {
metricDiff := route.MaxMetric - r.Metric
tempScore = metricDiff * 10
}
if !peerStatus.relayed {
tempScore++
}
if !peerStatus.direct {
tempScore++
}
if tempScore > chosenScore || (tempScore == chosenScore && currID == r.ID) {
chosen = r.ID
chosenScore = tempScore
}
}

if chosen == "" {
var peers []string
for _, r := range c.routes {
peers = append(peers, r.Peer)
}
log.Warnf("no route was chosen for network %s because no peers from list %s were connected", c.network, peers)
} else if chosen != currID {
log.Infof("new chosen route is %s with peer %s with score %d", chosen, c.routes[chosen].Peer, chosenScore)
}

return chosen
}

func (c *clientNetwork) watchPeerStatusChanges(ctx context.Context, peerKey string, peerStateUpdate chan struct{}, closer chan struct{}) {
for {
select {
case <-ctx.Done():
return
case <-closer:
return
case <-c.statusRecorder.GetPeerStateChangeNotifier(peerKey):
state, err := c.statusRecorder.GetPeer(peerKey)
if err != nil || state.ConnStatus == peer.StatusConnecting.String() {
continue
}
peerStateUpdate <- struct{}{}
log.Debugf("triggered route state update for Peer %s, state: %s", peerKey, state.ConnStatus)
}
}
}

func (c *clientNetwork) startPeersStatusChangeWatcher() {
for _, r := range c.routes {
_, found := c.routePeersNotifiers[r.Peer]
if !found {
c.routePeersNotifiers[r.Peer] = make(chan struct{})
go c.watchPeerStatusChanges(c.ctx, r.Peer, c.peerStateUpdate, c.routePeersNotifiers[r.Peer])
}
}
}

func (c *clientNetwork) removeRouteFromWireguardPeer(peerKey string) error {
state, err := c.statusRecorder.GetPeer(peerKey)
if err != nil || state.ConnStatus != peer.StatusConnected.String() {
return nil
}

err = c.wgInterface.RemoveAllowedIP(peerKey, c.network.String())
if err != nil {
return fmt.Errorf("couldn't remove allowed IP %s removed for peer %s, err: %v",
c.network, c.chosenRoute.Peer, err)
}
return nil
}

func (c *clientNetwork) removeRouteFromPeerAndSystem() error {
if c.chosenRoute != nil {
err := c.removeRouteFromWireguardPeer(c.chosenRoute.Peer)
if err != nil {
return err
}
err = removeFromRouteTableIfNonSystem(c.network, c.wgInterface.GetAddress().IP.String())
if err != nil {
return fmt.Errorf("couldn't remove route %s from system, err: %v",
c.network, err)
}
}
return nil
}

func (c *clientNetwork) recalculateRouteAndUpdatePeerAndSystem() error {

var err error

routerPeerStatuses := c.getRouterPeerStatuses()

chosen := c.getBestRouteFromStatuses(routerPeerStatuses)
if chosen == "" {
err = c.removeRouteFromPeerAndSystem()
if err != nil {
return err
}

c.chosenRoute = nil

return nil
}

if c.chosenRoute != nil && c.chosenRoute.ID == chosen {
if c.chosenRoute.IsEqual(c.routes[chosen]) {
return nil
}
}

if c.chosenRoute != nil {
err = c.removeRouteFromWireguardPeer(c.chosenRoute.Peer)
if err != nil {
return err
}
} else {
err = addToRouteTableIfNoExists(c.network, c.wgInterface.GetAddress().IP.String())
if err != nil {
return fmt.Errorf("route %s couldn't be added for peer %s, err: %v",
c.chosenRoute.Network.String(), c.wgInterface.GetAddress().IP.String(), err)
}
}

c.chosenRoute = c.routes[chosen]
err = c.wgInterface.AddAllowedIP(c.chosenRoute.Peer, c.network.String())
if err != nil {
log.Errorf("couldn't add allowed IP %s added for peer %s, err: %v",
c.network, c.chosenRoute.Peer, err)
}

return nil
}

func (c *clientNetwork) sendUpdateToClientNetworkWatcher(update routesUpdate) {
mlsmaycon marked this conversation as resolved.
Show resolved Hide resolved
go func() {
c.routeUpdate <- update
}()
}

func (c *clientNetwork) handleUpdate(update routesUpdate) {
updateMap := make(map[string]*route.Route)

for _, r := range update.routes {
updateMap[r.ID] = r
}

for id, r := range c.routes {
_, found := updateMap[id]
if !found {
close(c.routePeersNotifiers[r.Peer])
delete(c.routePeersNotifiers, r.Peer)
}
}

c.routes = updateMap
}

// peersStateAndUpdateWatcher is the main point of reacting on client network routing events.
// All the processing related to the client network should be done here. Thread-safe.
func (c *clientNetwork) peersStateAndUpdateWatcher() {
mlsmaycon marked this conversation as resolved.
Show resolved Hide resolved
for {
select {
case <-c.ctx.Done():
log.Debugf("stopping watcher for network %s", c.network)
err := c.removeRouteFromPeerAndSystem()
if err != nil {
log.Error(err)
}
return
case <-c.peerStateUpdate:
err := c.recalculateRouteAndUpdatePeerAndSystem()
if err != nil {
log.Error(err)
}
case update := <-c.routeUpdate:
if update.updateSerial < c.updateSerial {
log.Warnf("received a routes update with smaller serial number, ignoring it")
continue
}

log.Debugf("received a new client network route update for %s", c.network)

c.handleUpdate(update)

c.updateSerial = update.updateSerial

err := c.recalculateRouteAndUpdatePeerAndSystem()
if err != nil {
log.Error(err)
}

c.startPeersStatusChangeWatcher()
}
}
}
Loading