Skip to content

Commit

Permalink
nftables_netlink
Browse files Browse the repository at this point in the history
  • Loading branch information
aojea committed Jan 5, 2025
1 parent 037df9a commit 7fcb127
Showing 1 changed file with 63 additions and 60 deletions.
123 changes: 63 additions & 60 deletions pkg/networkpolicy/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package networkpolicy
import (
"context"
"fmt"
"net/netip"
"os"
"time"

nfqueue "github.com/florianl/go-nfqueue"
"github.com/google/nftables"

Check failure on line 11 in pkg/networkpolicy/controller.go

View workflow job for this annotation

GitHub Actions / test (1.23.x)

no required module provides package github.com/google/nftables; to add it:
"github.com/mdlayher/netlink"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -103,15 +105,9 @@ func NewController(client clientset.Interface,
if err != nil {
return nil, err
}
klog.V(2).Info("Initializing nftables")
nft, err := knftables.New(knftables.InetFamily, config.NFTableName)
if err != nil {
return nil, err
}

return newController(
client,
nft,
networkpolicyInformer,
namespaceInformer,
podInformer,
Expand All @@ -124,7 +120,6 @@ func NewController(client clientset.Interface,
}

func newController(client clientset.Interface,
nft knftables.Interface,
networkpolicyInformer networkinginformers.NetworkPolicyInformer,
namespaceInformer coreinformers.NamespaceInformer,
podInformer coreinformers.PodInformer,
Expand All @@ -144,7 +139,6 @@ func newController(client clientset.Interface,
c := &Controller{
client: client,
config: config,
nft: nft,
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{Name: controllerName},
Expand Down Expand Up @@ -347,7 +341,6 @@ type Controller struct {
// if an error occurs or the pod is not found, it returns nil
getPodAssignedToIP func(podIP string) *v1.Pod

nft knftables.Interface // install the necessary nftables rules
nfq *nfqueue.Nfqueue
flushed bool
}
Expand Down Expand Up @@ -673,37 +666,33 @@ func (c *Controller) handleErr(err error, key string) {
// and check if network policies must apply.
// TODO: We can divert only the traffic affected by network policies using a set in nftables or an IPset.
func (c *Controller) syncNFTablesRules(ctx context.Context) error {
table := &knftables.Table{
Comment: knftables.PtrTo("rules for kubernetes NetworkPolicy"),
nft, err := nftables.New()
if err != nil {
return fmt.Errorf("fastpath failure, can not start nftables:%v", err)
}
tx := c.nft.NewTransaction()
// do it once to delete the existing table
if !c.flushed {
tx.Add(table)
tx.Delete(table)
c.flushed = true
// add + delete + add to flush the whole table
table := &nftables.Table{
Name: c.config.NFTableName,
Family: nftables.TableFamilyINet,
}
tx.Add(table)

nft.AddTable(table)
nft.DelTable(table)
nft.AddTable(table)

// only if no admin network policies are used
if !c.config.AdminNetworkPolicy && !c.config.BaselineAdminNetworkPolicy {
// add set with Local Pod IPs impacted by network policies
tx.Add(&knftables.Set{
v4Set := &nftables.Set{
Table: table,
Name: podV4IPsSet,
Type: "ipv4_addr",
Comment: ptr.To("Local V4 Pod IPs with Network Policies"),
})
tx.Flush(&knftables.Set{
Name: podV4IPsSet,
})
tx.Add(&knftables.Set{
KeyType: nftables.TypeIPAddr,
}
v6Set := &nftables.Set{
Table: table,
Name: podV6IPsSet,
Type: "ipv6_addr",
Comment: ptr.To("Local V6 Pod IPs with Network Policies"),
})
tx.Flush(&knftables.Set{
Name: podV6IPsSet,
})
KeyType: nftables.TypeIP6Addr,
}

networkPolicies, err := c.networkpolicyLister.List(labels.Everything())
if err != nil {
Expand All @@ -724,33 +713,45 @@ func (c *Controller) syncNFTablesRules(ctx context.Context) error {
}
}

var elementsV4, elementsV6 []nftables.SetElement
for _, ip := range podV4IPs.UnsortedList() {
tx.Add(&knftables.Element{
Set: podV4IPsSet,
Key: []string{ip},
addr, err := netip.ParseAddr(ip)
if err != nil {
continue
}
elementsV4 = append(elementsV4, nftables.SetElement{
Key: addr.AsSlice(),
})
}
for _, ip := range podV6IPs.UnsortedList() {
tx.Add(&knftables.Element{
Set: podV6IPsSet,
Key: []string{ip},
addr, err := netip.ParseAddr(ip)
if err != nil {
continue
}
elementsV6 = append(elementsV6, nftables.SetElement{
Key: addr.AsSlice(),
})
}

if err := nft.SetAddElements(v4Set, elementsV4); err != nil {
return fmt.Errorf("failed to add Set %s : %v", v4Set.Name, err)
}
if err := nft.SetAddElements(v6Set, elementsV6); err != nil {
return fmt.Errorf("failed to add Set %s : %v", v6Set.Name, err)
}
}

// Packets are usually processed on the FORWARD hook, but IPVS packets
// follow a different path in netfilter, so we process everything in
// the POSTROUTING hook before SNAT happens.
// Ref: https://github.com/kubernetes-sigs/kube-network-policies/issues/46
hook := knftables.PostroutingHook
chainName := string(hook)
tx.Add(&knftables.Chain{
Name: chainName,
Type: knftables.PtrTo(knftables.FilterType),
Hook: knftables.PtrTo(hook),
Priority: knftables.PtrTo(knftables.SNATPriority + "-5"),
})
tx.Flush(&knftables.Chain{
Name: chainName,
chain := nft.AddChain(&nftables.Chain{
Name: "postrouting",
Table: table,
Type: nftables.ChainTypeFilter,
Hooknum: nftables.ChainHookPostrouting,
// ChainPriorityNATSource *ChainPriority = ChainPriorityRef(100)
Priority: nftables.ChainPriorityRef(95),
})

// DNS is processed by addDNSRacersWorkaroundRules()
Expand Down Expand Up @@ -910,19 +911,21 @@ func (c *Controller) addDNSRacersWorkaroundRules(ctx context.Context, tx *knftab
}

func (c *Controller) cleanNFTablesRules(ctx context.Context) {
tx := c.nft.NewTransaction()
nft, err := nftables.New()
if err != nil {
klog.Infof("network policies cleanup failure, can not start nftables:%v", err)
return
}
// Add+Delete is idempotent and won't return an error if the table doesn't already
// exist.
tx.Add(&knftables.Table{})
tx.Delete(&knftables.Table{})

// When this function is called, the ctx is likely cancelled. So
// we only use it for logging, and create a context with timeout
// for nft.Run. There is a grace period of 5s in main, so we keep
// this timeout shorter
nctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
defer cancel()
if err := c.nft.Run(nctx, tx); err != nil {
klog.FromContext(ctx).Error(err, "deleting nftables rules")
table := &nftables.Table{
Name: c.config.NFTableName,
Family: nftables.TableFamilyINet,
}
nft.DelTable(table)

err = nft.Flush()
if err != nil {
klog.Infof("error deleting nftables rules %v", err)
}
}

0 comments on commit 7fcb127

Please sign in to comment.