Skip to content

Commit

Permalink
[BPF] when host IP change kube-proxy replaces only Syncer
Browse files Browse the repository at this point in the history
Restarting the whole proxy is an overkill, only the syncer eed the set
of host IPs. Also the health check servers created by kube-proxy
frontend are not easily carried over and thus restarting kube-proxy
cannot restart the health checks.

As a side-effect, the "restart" logic in kube-proxy is much simpler as
we do no tneed to stop that much.
  • Loading branch information
tomastigera committed Dec 12, 2023
1 parent 02cfa12 commit d70043a
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 42 deletions.
73 changes: 34 additions & 39 deletions felix/bpf/proxy/kube-proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
// KubeProxy is a wrapper of Proxy that deals with higher level issue like
// configuration, restarting etc.
type KubeProxy struct {
proxy Proxy
proxy ProxyFrontend
syncer DPSyncer

ipFamily int
Expand Down Expand Up @@ -133,25 +133,40 @@ func (kp *KubeProxy) run(hostIPs []net.IP) error {
return errors.WithMessage(err, "new bpf syncer")
}

proxy, err := New(kp.k8s, syncer, kp.hostname, kp.opts...)
if err != nil {
return errors.WithMessage(err, "new proxy")
}
kp.proxy.SetSyncer(syncer)

log.Infof("kube-proxy v%d started, hostname=%q hostIPs=%+v", kp.ipFamily, kp.hostname, hostIPs)
log.Infof("kube-proxy v%d node info updated, hostname=%q hostIPs=%+v", kp.ipFamily, kp.hostname, hostIPs)

kp.proxy = proxy
kp.syncer = syncer

return nil
}

func (kp *KubeProxy) start() error {
var withLocalNP []net.IP
if kp.ipFamily == 4 {
withLocalNP = append(withLocalNP, podNPIP)
} else {
withLocalNP = append(withLocalNP, podNPIPV6)
}

syncer, err := NewSyncer(kp.ipFamily, withLocalNP, kp.frontendMap, kp.backendMap, kp.affinityMap, kp.rt)
if err != nil {
return errors.WithMessage(err, "new bpf syncer")
}

proxy, err := New(kp.k8s, syncer, kp.hostname, kp.opts...)
if err != nil {
return errors.WithMessage(err, "new proxy")
}

kp.proxy = proxy
kp.syncer = syncer

// wait for the initial update
hostIPs := <-kp.hostIPUpdates

err := kp.run(hostIPs)
err = kp.run(hostIPs)
if err != nil {
return err
}
Expand All @@ -160,39 +175,19 @@ func (kp *KubeProxy) start() error {
go func() {
defer kp.wg.Done()
for {
hostIPs, ok := <-kp.hostIPUpdates
if !ok {
defer log.Error("kube-proxy stopped since hostIPUpdates closed")
kp.proxy.Stop()
return
}

stopped := make(chan struct{})

go func() {
defer close(stopped)
defer log.Info("kube-proxy stopped to restart with updated host IPs")
kp.proxy.Stop()
}()

waitforstop:
for {
select {
case hostIPs, ok = <-kp.hostIPUpdates:
if !ok {
log.Error("kube-proxy: hostIPUpdates closed")
return
}
case <-kp.exiting:
log.Info("kube-proxy: exiting")
select {
case hostIPs, ok := <-kp.hostIPUpdates:
if !ok {
log.Error("kube-proxy: hostIPUpdates closed")
return
case <-stopped:
err = kp.run(hostIPs)
if err != nil {
log.Panic("kube-proxy failed to start after host IPs update")
}
break waitforstop
}
err = kp.run(hostIPs)
if err != nil {
log.Panic("kube-proxy failed to resync after host IPs update")
}
case <-kp.exiting:
log.Info("kube-proxy: exiting")
return
}
}
}()
Expand Down
34 changes: 33 additions & 1 deletion felix/bpf/proxy/kube-proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package proxy_test

import (
"fmt"
"net"
"net/http"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand All @@ -41,16 +43,20 @@ var _ = Describe("BPF kube-proxy", func() {

var p *proxy.KubeProxy

healthCheckNodePort := 1212

BeforeEach(func() {
testSvc := &v1.Service{
TypeMeta: typeMetaV1("Service"),
ObjectMeta: objectMetaV1("testService"),
Spec: v1.ServiceSpec{
ClusterIP: "10.1.0.1",
Type: v1.ServiceTypeClusterIP,
Type: v1.ServiceTypeLoadBalancer,
Selector: map[string]string{
"app": "test",
},
ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal,
HealthCheckNodePort: int32(healthCheckNodePort),
Ports: []v1.ServicePort{
{
Protocol: v1.ProtocolTCP,
Expand Down Expand Up @@ -104,6 +110,19 @@ var _ = Describe("BPF kube-proxy", func() {
}).Should(BeTrue())
})

By("checking that the healthCheckNodePort is accessible", func() {
Eventually(func() error {
result, err := http.Get(fmt.Sprintf("http://localhost:%d", healthCheckNodePort))
if err != nil {
return err
}
if result.StatusCode != 503 {
return fmt.Errorf("Unexpected status code %d; expected 503", result.StatusCode)
}
return nil
}, "5s", "200ms").Should(Succeed())
})

By("checking nodeport has the updated IP and not the initial IP", func() {
updatedIP := net.IPv4(2, 2, 2, 2)
p.OnHostIPsUpdate([]net.IP{updatedIP})
Expand All @@ -124,6 +143,19 @@ var _ = Describe("BPF kube-proxy", func() {
}).Should(BeTrue())
})

By("checking that the healthCheckNodePort is still accessible", func() {
Eventually(func() error {
result, err := http.Get(fmt.Sprintf("http://localhost:%d", healthCheckNodePort))
if err != nil {
return err
}
if result.StatusCode != 503 {
return fmt.Errorf("Unexpected status code %d; expected 503", result.StatusCode)
}
return nil
}, "5s", "200ms").Should(Succeed())
})

By("checking nodeport has 2 updated IPs", func() {
ip1 := net.IPv4(3, 3, 3, 3)
ip2 := net.IPv4(4, 4, 4, 4)
Expand Down
24 changes: 22 additions & 2 deletions felix/bpf/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ type Proxy interface {
setIpFamily(int)
}

type ProxyFrontend interface {
Proxy
SetSyncer(DPSyncer)
}

// DPSyncerState groups the information passed to the DPSyncer's Apply
type DPSyncerState struct {
SvcMap k8sp.ServicePortMap
Expand Down Expand Up @@ -83,7 +88,8 @@ type proxy struct {
svcMap k8sp.ServicePortMap
epsMap k8sp.EndpointsMap

dpSyncer DPSyncer
dpSyncer DPSyncer
syncerLck sync.Mutex
// executes periodic the dataplane updates
runner *async.BoundedFrequencyRunner
// ensures that only one invocation runs at any time
Expand All @@ -110,7 +116,7 @@ type stoppableRunner interface {
}

// New returns a new Proxy for the given k8s interface
func New(k8s kubernetes.Interface, dp DPSyncer, hostname string, opts ...Option) (Proxy, error) {
func New(k8s kubernetes.Interface, dp DPSyncer, hostname string, opts ...Option) (ProxyFrontend, error) {

if k8s == nil {
return nil, errors.Errorf("no k8s client")
Expand Down Expand Up @@ -214,6 +220,9 @@ func (p *proxy) setIpFamily(ipFamily int) {
func (p *proxy) Stop() {
p.stopOnce.Do(func() {
log.Info("Proxy stopping")
// Pass empty update to close all the health checks.
_ = p.svcHealthServer.SyncServices(map[types.NamespacedName]uint16{})
_ = p.svcHealthServer.SyncEndpoints(map[types.NamespacedName]int{})
p.dpSyncer.Stop()
close(p.stopCh)
p.stopWg.Wait()
Expand Down Expand Up @@ -255,11 +264,13 @@ func (p *proxy) invokeDPSyncer() {
log.WithError(err).Error("Error syncing healthcheck endpoints")
}

p.syncerLck.Lock()
err := p.dpSyncer.Apply(DPSyncerState{
SvcMap: p.svcMap,
EpsMap: p.epsMap,
NodeZone: p.nodeZone,
})
p.syncerLck.Unlock()

if err != nil {
log.WithError(err).Errorf("applying changes failed")
Expand Down Expand Up @@ -314,6 +325,15 @@ func (p *proxy) OnEndpointSlicesSynced() {
p.forceSyncDP()
}

func (p *proxy) SetSyncer(s DPSyncer) {
p.syncerLck.Lock()
p.dpSyncer.Stop()
p.dpSyncer = s
p.syncerLck.Unlock()

p.forceSyncDP()
}

type initState struct {
lck sync.RWMutex
svcsSynced bool
Expand Down

0 comments on commit d70043a

Please sign in to comment.