diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go
index 92276e8d0fb..dba15380bd0 100644
--- a/felix/bpf/proxy/kube-proxy.go
+++ b/felix/bpf/proxy/kube-proxy.go
@@ -30,7 +30,7 @@ import (
 // KubeProxy is a wrapper of Proxy that deals with higher level issue like
 // configuration, restarting etc.
 type KubeProxy struct {
-	proxy  Proxy
+	proxy  ProxyFrontend
 	syncer DPSyncer
 
 	ipFamily int
@@ -133,25 +133,42 @@ func (kp *KubeProxy) run(hostIPs []net.IP) error {
 		return errors.WithMessage(err, "new bpf syncer")
 	}
 
-	proxy, err := New(kp.k8s, syncer, kp.hostname, kp.opts...)
-	if err != nil {
-		return errors.WithMessage(err, "new proxy")
-	}
+	kp.proxy.SetSyncer(syncer)
 
-	log.Infof("kube-proxy v%d started, hostname=%q hostIPs=%+v", kp.ipFamily, kp.hostname, hostIPs)
+	log.Infof("kube-proxy v%d node info updated, hostname=%q hostIPs=%+v", kp.ipFamily, kp.hostname, hostIPs)
 
-	kp.proxy = proxy
 	kp.syncer = syncer
 
 	return nil
 }
 
 func (kp *KubeProxy) start() error {
+	var withLocalNP []net.IP
+	if kp.ipFamily == 4 {
+		withLocalNP = append(withLocalNP, podNPIP)
+	} else {
+		withLocalNP = append(withLocalNP, podNPIPV6)
+	}
+
+	syncer, err := NewSyncer(kp.ipFamily, withLocalNP, kp.frontendMap, kp.backendMap, kp.affinityMap, kp.rt)
+	if err != nil {
+		return errors.WithMessage(err, "new bpf syncer")
+	}
+
+	proxy, err := New(kp.k8s, syncer, kp.hostname, kp.opts...)
+	if err != nil {
+		return errors.WithMessage(err, "new proxy")
+	}
+
+	kp.lock.Lock()
+	kp.proxy = proxy
+	kp.syncer = syncer
+	kp.lock.Unlock()
 
 	// wait for the initial update
 	hostIPs := <-kp.hostIPUpdates
 
-	err := kp.run(hostIPs)
+	err = kp.run(hostIPs)
 	if err != nil {
 		return err
 	}
@@ -160,39 +177,19 @@ func (kp *KubeProxy) start() error {
 	go func() {
 		defer kp.wg.Done()
 		for {
-			hostIPs, ok := <-kp.hostIPUpdates
-			if !ok {
-				defer log.Error("kube-proxy stopped since hostIPUpdates closed")
-				kp.proxy.Stop()
-				return
-			}
-
-			stopped := make(chan struct{})
-
-			go func() {
-				defer close(stopped)
-				defer log.Info("kube-proxy stopped to restart with updated host IPs")
-				kp.proxy.Stop()
-			}()
-
-		waitforstop:
-			for {
-				select {
-				case hostIPs, ok = <-kp.hostIPUpdates:
-					if !ok {
-						log.Error("kube-proxy: hostIPUpdates closed")
-						return
-					}
-				case <-kp.exiting:
-					log.Info("kube-proxy: exiting")
+			select {
+			case hostIPs, ok := <-kp.hostIPUpdates:
+				if !ok {
+					log.Error("kube-proxy: hostIPUpdates closed")
 					return
-				case <-stopped:
-					err = kp.run(hostIPs)
-					if err != nil {
-						log.Panic("kube-proxy failed to start after host IPs update")
-					}
-					break waitforstop
 				}
+				err = kp.run(hostIPs)
+				if err != nil {
+					log.Panic("kube-proxy failed to resync after host IPs update")
+				}
+			case <-kp.exiting:
+				log.Info("kube-proxy: exiting")
+				return
 			}
 		}
 	}()
diff --git a/felix/bpf/proxy/kube-proxy_test.go b/felix/bpf/proxy/kube-proxy_test.go
index 263818d46e1..3361f2c5c7d 100644
--- a/felix/bpf/proxy/kube-proxy_test.go
+++ b/felix/bpf/proxy/kube-proxy_test.go
@@ -15,7 +15,9 @@
 package proxy_test
 
 import (
+	"fmt"
 	"net"
+	"net/http"
 
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
@@ -41,16 +43,20 @@ var _ = Describe("BPF kube-proxy", func() {
 
 	var p *proxy.KubeProxy
 
+	healthCheckNodePort := 1212
+
 	BeforeEach(func() {
 		testSvc := &v1.Service{
 			TypeMeta:   typeMetaV1("Service"),
 			ObjectMeta: objectMetaV1("testService"),
 			Spec: v1.ServiceSpec{
 				ClusterIP: "10.1.0.1",
-				Type:      v1.ServiceTypeClusterIP,
+				Type:      v1.ServiceTypeLoadBalancer,
 				Selector: map[string]string{
 					"app": "test",
 				},
+				ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal,
+				HealthCheckNodePort:   int32(healthCheckNodePort),
 				Ports: []v1.ServicePort{
 					{
 						Protocol: v1.ProtocolTCP,
@@ -104,6 +110,19 @@ var _ = Describe("BPF kube-proxy", func() {
 			}).Should(BeTrue())
 		})
 
+		By("checking that the healthCheckNodePort is accessible", func() {
+			Eventually(func() error {
+				result, err := http.Get(fmt.Sprintf("http://localhost:%d", healthCheckNodePort))
+				if err != nil {
+					return err
+				}
+				if result.StatusCode != 503 {
+					return fmt.Errorf("Unexpected status code %d; expected 503", result.StatusCode)
+				}
+				return nil
+			}, "5s", "200ms").Should(Succeed())
+		})
+
 		By("checking nodeport has the updated IP and not the initial IP", func() {
 			updatedIP := net.IPv4(2, 2, 2, 2)
 			p.OnHostIPsUpdate([]net.IP{updatedIP})
@@ -124,6 +143,19 @@ var _ = Describe("BPF kube-proxy", func() {
 			}).Should(BeTrue())
 		})
 
+		By("checking that the healthCheckNodePort is still accessible", func() {
+			Eventually(func() error {
+				result, err := http.Get(fmt.Sprintf("http://localhost:%d", healthCheckNodePort))
+				if err != nil {
+					return err
+				}
+				if result.StatusCode != 503 {
+					return fmt.Errorf("Unexpected status code %d; expected 503", result.StatusCode)
+				}
+				return nil
+			}, "5s", "200ms").Should(Succeed())
+		})
+
 		By("checking nodeport has 2 updated IPs", func() {
 			ip1 := net.IPv4(3, 3, 3, 3)
 			ip2 := net.IPv4(4, 4, 4, 4)
diff --git a/felix/bpf/proxy/proxy.go b/felix/bpf/proxy/proxy.go
index 32603753ecf..6f28a06e9e7 100644
--- a/felix/bpf/proxy/proxy.go
+++ b/felix/bpf/proxy/proxy.go
@@ -51,6 +51,11 @@ type Proxy interface {
 	setIpFamily(int)
 }
 
+type ProxyFrontend interface {
+	Proxy
+	SetSyncer(DPSyncer)
+}
+
 // DPSyncerState groups the information passed to the DPSyncer's Apply
 type DPSyncerState struct {
 	SvcMap k8sp.ServicePortMap
@@ -83,7 +88,8 @@ type proxy struct {
 	svcMap k8sp.ServicePortMap
 	epsMap k8sp.EndpointsMap
 
-	dpSyncer DPSyncer
+	dpSyncer  DPSyncer
+	syncerLck sync.Mutex
 	// executes periodic the dataplane updates
 	runner *async.BoundedFrequencyRunner
 	// ensures that only one invocation runs at any time
@@ -110,7 +116,7 @@ type stoppableRunner interface {
 }
 
 // New returns a new Proxy for the given k8s interface
-func New(k8s kubernetes.Interface, dp DPSyncer, hostname string, opts ...Option) (Proxy, error) {
+func New(k8s kubernetes.Interface, dp DPSyncer, hostname string, opts ...Option) (ProxyFrontend, error) {
 
 	if k8s == nil {
 		return nil, errors.Errorf("no k8s client")
@@ -214,6 +220,9 @@ func (p *proxy) setIpFamily(ipFamily int) {
 func (p *proxy) Stop() {
 	p.stopOnce.Do(func() {
 		log.Info("Proxy stopping")
+		// Pass empty update to close all the health checks.
+		_ = p.svcHealthServer.SyncServices(map[types.NamespacedName]uint16{})
+		_ = p.svcHealthServer.SyncEndpoints(map[types.NamespacedName]int{})
 		p.dpSyncer.Stop()
 		close(p.stopCh)
 		p.stopWg.Wait()
@@ -255,11 +264,13 @@ func (p *proxy) invokeDPSyncer() {
 		log.WithError(err).Error("Error syncing healthcheck endpoints")
 	}
 
+	p.syncerLck.Lock()
 	err := p.dpSyncer.Apply(DPSyncerState{
 		SvcMap:   p.svcMap,
 		EpsMap:   p.epsMap,
 		NodeZone: p.nodeZone,
 	})
+	p.syncerLck.Unlock()
 
 	if err != nil {
 		log.WithError(err).Errorf("applying changes failed")
@@ -314,6 +325,15 @@ func (p *proxy) OnEndpointSlicesSynced() {
 	p.forceSyncDP()
 }
 
+func (p *proxy) SetSyncer(s DPSyncer) {
+	p.syncerLck.Lock()
+	p.dpSyncer.Stop()
+	p.dpSyncer = s
+	p.syncerLck.Unlock()
+
+	p.forceSyncDP()
+}
+
 type initState struct {
 	lck        sync.RWMutex
 	svcsSynced bool
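For illustration only, not part of the patch: a minimal, self-contained Go sketch of the syncer hand-off pattern this change introduces. The demoSyncer, demoFrontend and printSyncer names are hypothetical; the locking mirrors proxy.SetSyncer and proxy.invokeDPSyncer above, where a single mutex guards both the syncer swap and every Apply call, so a syncer is never replaced mid-apply and the proxy itself keeps running across host IP updates.

package main

import (
	"fmt"
	"sync"
)

// demoSyncer stands in for the DPSyncer interface the proxy drives.
type demoSyncer interface {
	Apply(state string) error
	Stop()
}

// demoFrontend mirrors the locking scheme of proxy.SetSyncer/invokeDPSyncer:
// the same mutex serializes the swap and every Apply call.
type demoFrontend struct {
	lck    sync.Mutex
	syncer demoSyncer
}

func (f *demoFrontend) apply(state string) error {
	f.lck.Lock()
	defer f.lck.Unlock()
	return f.syncer.Apply(state)
}

func (f *demoFrontend) setSyncer(s demoSyncer) {
	f.lck.Lock()
	f.syncer.Stop() // stop the old syncer before the new one takes over
	f.syncer = s
	f.lck.Unlock()
}

// printSyncer is a trivial demoSyncer that just logs what it is asked to do.
type printSyncer struct{ name string }

func (p *printSyncer) Apply(state string) error {
	fmt.Printf("%s applying %q\n", p.name, state)
	return nil
}

func (p *printSyncer) Stop() { fmt.Printf("%s stopped\n", p.name) }

func main() {
	f := &demoFrontend{syncer: &printSyncer{name: "syncer-1"}}
	_ = f.apply("initial state")
	// A host IP update swaps only the syncer; the frontend stays alive,
	// which is what keeps long-lived state (such as the health check
	// server in the real proxy) bound across the update.
	f.setSyncer(&printSyncer{name: "syncer-2"})
	_ = f.apply("state after host IP update")
}

Keeping the proxy alive and replacing only the syncer is what allows the healthCheckNodePort to remain reachable across host IP updates, which the new test cases in kube-proxy_test.go assert.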