diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index 71896837a62..e140f3c0790 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -18,7 +18,7 @@ featureGates: # Enable support for cleaning up stale UDP Service conntrack connections in AntreaProxy. This requires AntreaProxy to # be enabled, otherwise this flag will not take effect. -{{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "CleanupStaleUDPSvcConntrack" "default" false) }} +{{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "CleanupStaleUDPSvcConntrack" "default" true) }} # Enable traceflow which provides packet tracing feature to diagnose network issue. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "Traceflow" "default" true) }} diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 73dcb502d4a..9655b66b5da 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -3605,7 +3605,7 @@ data: # Enable support for cleaning up stale UDP Service conntrack connections in AntreaProxy. This requires AntreaProxy to # be enabled, otherwise this flag will not take effect. - # CleanupStaleUDPSvcConntrack: false + # CleanupStaleUDPSvcConntrack: true # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -4975,7 +4975,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 47a8888bb99a5b1a08dea61e9315bacf613d869d718712ad0eb9964bb73dc0ec + checksum/config: f976029accf54258d01ad907fe19b50ac671eee014cd8aea968c6a0bc7e8f95a labels: app: antrea component: antrea-agent @@ -5213,7 +5213,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 47a8888bb99a5b1a08dea61e9315bacf613d869d718712ad0eb9964bb73dc0ec + checksum/config: f976029accf54258d01ad907fe19b50ac671eee014cd8aea968c6a0bc7e8f95a labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index d0435aef30f..d60cc445413 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -3605,7 +3605,7 @@ data: # Enable support for cleaning up stale UDP Service conntrack connections in AntreaProxy. This requires AntreaProxy to # be enabled, otherwise this flag will not take effect. - # CleanupStaleUDPSvcConntrack: false + # CleanupStaleUDPSvcConntrack: true # Enable traceflow which provides packet tracing feature to diagnose network issue. 
# Traceflow: true @@ -4975,7 +4975,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 47a8888bb99a5b1a08dea61e9315bacf613d869d718712ad0eb9964bb73dc0ec + checksum/config: f976029accf54258d01ad907fe19b50ac671eee014cd8aea968c6a0bc7e8f95a labels: app: antrea component: antrea-agent @@ -5214,7 +5214,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 47a8888bb99a5b1a08dea61e9315bacf613d869d718712ad0eb9964bb73dc0ec + checksum/config: f976029accf54258d01ad907fe19b50ac671eee014cd8aea968c6a0bc7e8f95a labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 8401b0a94b0..8fb55a482f9 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -3605,7 +3605,7 @@ data: # Enable support for cleaning up stale UDP Service conntrack connections in AntreaProxy. This requires AntreaProxy to # be enabled, otherwise this flag will not take effect. - # CleanupStaleUDPSvcConntrack: false + # CleanupStaleUDPSvcConntrack: true # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -4975,7 +4975,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 47c353c05a3d0d6da5534e0cd1202fefb175fff41421eb3517bc9f3dd084e2ce + checksum/config: 5299e6235e262daf606758cf900766470fcb8dd21a0d707a3ae284548bd8c2b2 labels: app: antrea component: antrea-agent @@ -5211,7 +5211,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 47c353c05a3d0d6da5534e0cd1202fefb175fff41421eb3517bc9f3dd084e2ce + checksum/config: 5299e6235e262daf606758cf900766470fcb8dd21a0d707a3ae284548bd8c2b2 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 91964dd82a3..c843452c63c 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -3618,7 +3618,7 @@ data: # Enable support for cleaning up stale UDP Service conntrack connections in AntreaProxy. This requires AntreaProxy to # be enabled, otherwise this flag will not take effect. - # CleanupStaleUDPSvcConntrack: false + # CleanupStaleUDPSvcConntrack: true # Enable traceflow which provides packet tracing feature to diagnose network issue. 
# Traceflow: true @@ -4988,7 +4988,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: ce4457f4d8a5bda332dbdd877c15ad152e3bbbcb0c9626c57ccd17518e407562 + checksum/config: ba93df141f512a1f8483114b5994444c7231b298e7e9133483ddc1f4210ec395 checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4 labels: app: antrea @@ -5270,7 +5270,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: ce4457f4d8a5bda332dbdd877c15ad152e3bbbcb0c9626c57ccd17518e407562 + checksum/config: ba93df141f512a1f8483114b5994444c7231b298e7e9133483ddc1f4210ec395 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 53b32787a9b..60e8df147d4 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -3605,7 +3605,7 @@ data: # Enable support for cleaning up stale UDP Service conntrack connections in AntreaProxy. This requires AntreaProxy to # be enabled, otherwise this flag will not take effect. - # CleanupStaleUDPSvcConntrack: false + # CleanupStaleUDPSvcConntrack: true # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -4975,7 +4975,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 7c86f86246f3b203e46613dacbd724e9ca8eae3428451acdc7bfe54123deb534 + checksum/config: aca23e21519e0fc112647f23d3ce6f92a3dea0bc7ebf1c6d7a7eed2dbe80f0a3 labels: app: antrea component: antrea-agent @@ -5211,7 +5211,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 7c86f86246f3b203e46613dacbd724e9ca8eae3428451acdc7bfe54123deb534 + checksum/config: aca23e21519e0fc112647f23d3ce6f92a3dea0bc7ebf1c6d7a7eed2dbe80f0a3 labels: app: antrea component: antrea-controller diff --git a/docs/feature-gates.md b/docs/feature-gates.md index 20de5037fe7..287d9dacc58 100644 --- a/docs/feature-gates.md +++ b/docs/feature-gates.md @@ -35,7 +35,7 @@ edit the Agent configuration in the | `AntreaProxy` | Agent | `true` | GA | v0.8 | v0.11 | v1.14 | Yes | Must be enabled for Windows. | | `EndpointSlice` | Agent | `true` | GA | v0.13.0 | v1.11 | v1.14 | Yes | | | `TopologyAwareHints` | Agent | `true` | Beta | v1.8 | v1.12 | N/A | Yes | | -| `CleanupStaleUDPSvcConntrack` | Agent | `false` | Alpha | v1.13 | N/A | N/A | Yes | | +| `CleanupStaleUDPSvcConntrack` | Agent | `true` | Beta | v1.13 | v2.1 | N/A | Yes | | | `LoadBalancerModeDSR` | Agent | `false` | Alpha | v1.13 | N/A | N/A | Yes | | | `AntreaPolicy` | Agent + Controller | `true` | Beta | v0.8 | v1.0 | N/A | No | Agent side config required from v0.9.0+. 
| | `Traceflow` | Agent + Controller | `true` | Beta | v0.8 | v0.11 | N/A | Yes | | diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 8778b94842e..72f346fdc89 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -570,7 +570,7 @@ func testLoadBalancerAdd(t *testing.T, options = append(options, withoutEndpointSlice) } bindingProtocol := binding.ProtocolTCP - vIP := agentconfig.VirtualNodePortDNATIPv4 + virtualNodePortDNATIPv4 := agentconfig.VirtualNodePortDNATIPv4 nodePortAddresses := nodePortAddressesIPv4 svcIP := svc1IPv4 externalIP := externalIPv4 @@ -580,7 +580,7 @@ func testLoadBalancerAdd(t *testing.T, loadBalancerIPModeProxyIP := loadBalancerIPModeProxyIPv4 if isIPv6 { bindingProtocol = binding.ProtocolTCPv6 - vIP = agentconfig.VirtualNodePortDNATIPv6 + virtualNodePortDNATIPv4 = agentconfig.VirtualNodePortDNATIPv6 nodePortAddresses = nodePortAddressesIPv6 svcIP = svc1IPv6 externalIP = externalIPv6 @@ -651,7 +651,7 @@ func testLoadBalancerAdd(t *testing.T, ClusterGroupID: 2, }).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, + ServiceIP: virtualNodePortDNATIPv4, ServicePort: uint16(svcNodePort), Protocol: bindingProtocol, TrafficPolicyLocal: nodeLocalExternal, @@ -710,7 +710,7 @@ func testLoadBalancerAdd(t *testing.T, ClusterGroupID: clusterGroupID, }).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, + ServiceIP: virtualNodePortDNATIPv4, ServicePort: uint16(svcNodePort), Protocol: bindingProtocol, TrafficPolicyLocal: nodeLocalExternal, @@ -770,7 +770,7 @@ func testNodePortAdd(t *testing.T, options = append(options, withoutEndpointSlice) } bindingProtocol := binding.ProtocolTCP - vIP := agentconfig.VirtualNodePortDNATIPv4 + virtualNodePortDNATIPv4 := agentconfig.VirtualNodePortDNATIPv4 nodePortAddresses := nodePortAddressesIPv4 svcIP := svc1IPv4 externalIP := externalIPv4 @@ -778,7 +778,7 @@ func testNodePortAdd(t *testing.T, ep2IP := ep2IPv4 if isIPv6 { bindingProtocol = binding.ProtocolTCPv6 - vIP = agentconfig.VirtualNodePortDNATIPv6 + virtualNodePortDNATIPv4 = agentconfig.VirtualNodePortDNATIPv6 nodePortAddresses = nodePortAddressesIPv6 svcIP = svc1IPv6 externalIP = externalIPv6 @@ -844,7 +844,7 @@ func testNodePortAdd(t *testing.T, ClusterGroupID: 2, }).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, + ServiceIP: virtualNodePortDNATIPv4, ServicePort: uint16(svcNodePort), Protocol: bindingProtocol, TrafficPolicyLocal: nodeLocalExternal, @@ -885,7 +885,7 @@ func testNodePortAdd(t *testing.T, ClusterGroupID: clusterGroupID, }).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, + ServiceIP: virtualNodePortDNATIPv4, ServicePort: uint16(svcNodePort), Protocol: bindingProtocol, TrafficPolicyLocal: nodeLocalExternal, @@ -1535,9 +1535,9 @@ func testClusterIPRemove(t *testing.T, bindingProtocol binding.Protocol, isIPv6 mockRouteClient.EXPECT().DeleteExternalIPRoute(externalIP) } if needClearConntrackEntries(bindingProtocol) { - mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, bindingProtocol) + mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, bindingProtocol).Times(1) if externalIP != nil { - mockRouteClient.EXPECT().ClearConntrackEntryForService(externalIP, uint16(svcPort), nil, bindingProtocol) + mockRouteClient.EXPECT().ClearConntrackEntryForService(externalIP, 
uint16(svcPort), nil, bindingProtocol).Times(1) } } fp.syncProxyRules() @@ -1570,14 +1570,14 @@ func testNodePortRemove(t *testing.T, bindingProtocol binding.Protocol, isIPv6 b if !endpointSliceEnabled { options = append(options, withoutEndpointSlice) } - vIP := agentconfig.VirtualNodePortDNATIPv4 + virtualNodePortDNATIP := agentconfig.VirtualNodePortDNATIPv4 svcNodePortIP := svcNodePortIPv4 nodePortAddresses := nodePortAddressesIPv4 svcIP := svc1IPv4 externalIP := externalIPv4 epIP := ep1IPv4 if isIPv6 { - vIP = agentconfig.VirtualNodePortDNATIPv6 + virtualNodePortDNATIP = agentconfig.VirtualNodePortDNATIPv6 svcNodePortIP = svcNodePortIPv6 nodePortAddresses = nodePortAddressesIPv6 svcIP = svc1IPv6 @@ -1620,7 +1620,7 @@ func testNodePortRemove(t *testing.T, bindingProtocol binding.Protocol, isIPv6 b ClusterGroupID: 2, }).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, + ServiceIP: virtualNodePortDNATIP, ServicePort: uint16(svcNodePort), Protocol: bindingProtocol, TrafficPolicyLocal: true, @@ -1645,7 +1645,7 @@ func testNodePortRemove(t *testing.T, bindingProtocol binding.Protocol, isIPv6 b mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) - mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) + mockOFClient.EXPECT().UninstallServiceFlows(virtualNodePortDNATIP, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) if externalIP != nil { @@ -1653,11 +1653,11 @@ func testNodePortRemove(t *testing.T, bindingProtocol binding.Protocol, isIPv6 b mockRouteClient.EXPECT().DeleteExternalIPRoute(externalIP) } if needClearConntrackEntries(bindingProtocol) { - mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, bindingProtocol) - mockRouteClient.EXPECT().ClearConntrackEntryForService(svcNodePortIP, uint16(svcNodePort), nil, bindingProtocol) - mockRouteClient.EXPECT().ClearConntrackEntryForService(vIP, uint16(svcNodePort), nil, bindingProtocol) + mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, bindingProtocol).Times(1) + mockRouteClient.EXPECT().ClearConntrackEntryForService(svcNodePortIP, uint16(svcNodePort), nil, bindingProtocol).Times(1) + mockRouteClient.EXPECT().ClearConntrackEntryForService(virtualNodePortDNATIP, uint16(svcNodePort), nil, bindingProtocol).Times(1) if externalIP != nil { - mockRouteClient.EXPECT().ClearConntrackEntryForService(externalIP, uint16(svcPort), nil, bindingProtocol) + mockRouteClient.EXPECT().ClearConntrackEntryForService(externalIP, uint16(svcPort), nil, bindingProtocol).Times(1) } } fp.syncProxyRules() @@ -1688,7 +1688,7 @@ func testLoadBalancerRemove(t *testing.T, bindingProtocol binding.Protocol, isIP if !endpointSliceEnabled { options = append(options, withoutEndpointSlice) } - vIP := agentconfig.VirtualNodePortDNATIPv4 + virtualNodePortDNATIPv4 := agentconfig.VirtualNodePortDNATIPv4 svcNodePortIP := svcNodePortIPv4 nodePortAddresses := nodePortAddressesIPv4 svcIP := svc1IPv4 @@ -1697,7 +1697,7 @@ func testLoadBalancerRemove(t *testing.T, bindingProtocol binding.Protocol, isIP loadBalancerIP := loadBalancerIPv4 loadBalancerIPModeProxyIP := loadBalancerIPModeProxyIPv4 if isIPv6 { - vIP = 
agentconfig.VirtualNodePortDNATIPv6 + virtualNodePortDNATIPv4 = agentconfig.VirtualNodePortDNATIPv6 svcNodePortIP = svcNodePortIPv6 nodePortAddresses = nodePortAddressesIPv6 svcIP = svc1IPv6 @@ -1747,7 +1747,7 @@ func testLoadBalancerRemove(t *testing.T, bindingProtocol binding.Protocol, isIP ClusterGroupID: 2, }).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, + ServiceIP: virtualNodePortDNATIPv4, ServicePort: uint16(svcNodePort), Protocol: bindingProtocol, TrafficPolicyLocal: true, @@ -1782,7 +1782,7 @@ func testLoadBalancerRemove(t *testing.T, bindingProtocol binding.Protocol, isIP mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) - mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) + mockOFClient.EXPECT().UninstallServiceFlows(virtualNodePortDNATIPv4, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) @@ -1792,12 +1792,12 @@ func testLoadBalancerRemove(t *testing.T, bindingProtocol binding.Protocol, isIP mockRouteClient.EXPECT().DeleteExternalIPRoute(externalIP) } if needClearConntrackEntries(bindingProtocol) { - mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, bindingProtocol) - mockRouteClient.EXPECT().ClearConntrackEntryForService(svcNodePortIP, uint16(svcNodePort), nil, bindingProtocol) - mockRouteClient.EXPECT().ClearConntrackEntryForService(vIP, uint16(svcNodePort), nil, bindingProtocol) - mockRouteClient.EXPECT().ClearConntrackEntryForService(loadBalancerIP, uint16(svcPort), nil, bindingProtocol) + mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, bindingProtocol).Times(1) + mockRouteClient.EXPECT().ClearConntrackEntryForService(svcNodePortIP, uint16(svcNodePort), nil, bindingProtocol).Times(1) + mockRouteClient.EXPECT().ClearConntrackEntryForService(virtualNodePortDNATIPv4, uint16(svcNodePort), nil, bindingProtocol).Times(1) + mockRouteClient.EXPECT().ClearConntrackEntryForService(loadBalancerIP, uint16(svcPort), nil, bindingProtocol).Times(1) if externalIP != nil { - mockRouteClient.EXPECT().ClearConntrackEntryForService(externalIP, uint16(svcPort), nil, bindingProtocol) + mockRouteClient.EXPECT().ClearConntrackEntryForService(externalIP, uint16(svcPort), nil, bindingProtocol).Times(1) } } fp.syncProxyRules() @@ -1962,26 +1962,29 @@ func TestLoadBalancerRemove(t *testing.T) { }) } -func testClusterIPNoEndpoint(t *testing.T, svcIP net.IP, isIPv6 bool) { +func testClusterIPNoEndpoint(t *testing.T, bindingProtocol binding.Protocol, isIPv6 bool) { ctrl := gomock.NewController(t) mockOFClient, mockRouteClient := getMockClients(ctrl) groupAllocator := openflow.NewGroupAllocator() - fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6) + apiProtocol := getAPIProtocol(bindingProtocol) + // Create a ServicePort with a specific protocol, avoiding using the global variable 'svcPortName' which is set to TCP protocol. 
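The refactored test helpers now take a single `binding.Protocol` argument and derive the Kubernetes API protocol from it via `getAPIProtocol`, whose definition is outside this diff. A self-contained sketch of what such a mapping looks like, using stand-in string types instead of the real `binding` and `corev1` types (assumption for illustration only):

```go
package main

import "fmt"

// Stand-ins for the real binding.Protocol and corev1.Protocol types,
// used here only to keep the sketch self-contained.
type bindingProtocol string
type apiProtocol string

const (
	protocolTCP   bindingProtocol = "tcp"
	protocolUDP   bindingProtocol = "udp"
	protocolTCPv6 bindingProtocol = "tcpv6"
	protocolUDPv6 bindingProtocol = "udpv6"
)

// getAPIProtocol maps an OVS binding protocol to the Kubernetes API protocol
// name ("TCP" or "UDP"); the IP family is carried separately by the isIPv6 flag.
func getAPIProtocol(p bindingProtocol) apiProtocol {
	switch p {
	case protocolTCP, protocolTCPv6:
		return "TCP"
	case protocolUDP, protocolUDPv6:
		return "UDP"
	default:
		return ""
	}
}

func main() {
	fmt.Println(getAPIProtocol(protocolUDPv6)) // UDP
}
```

Collapsing the parameter lists this way is what lets each subtest be expressed as just a protocol plus an IP-family flag.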
+ svcPortName := makeSvcPortName("ns", "svc", strconv.Itoa(svcPort), apiProtocol) + svcIP := svc1IPv4 + if isIPv6 { + svcIP = svc1IPv6 + } + fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6, withCleanupStaleUDPSvcConntrack) - svc := makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, nil) - updatedSvc := makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort+1), corev1.ProtocolTCP, nil, nil, false, nil) + svc := makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), apiProtocol, nil, nil, false, nil) + updatedSvc := makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort+1), apiProtocol, nil, nil, false, nil) makeServiceMap(fp, svc) makeEndpointSliceMap(fp) - protocol := binding.ProtocolTCP - if isIPv6 { - protocol = binding.ProtocolTCPv6 - } mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), false, []k8sproxy.Endpoint{}).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: svcIP, ServicePort: uint16(svcPort), - Protocol: protocol, + Protocol: bindingProtocol, TrafficPolicyLocal: false, ClusterGroupID: 1, }).Times(1) @@ -1989,10 +1992,13 @@ func testClusterIPNoEndpoint(t *testing.T, svcIP net.IP, isIPv6 bool) { assert.Contains(t, fp.serviceInstalledMap, svcPortName) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), gomock.Any()).Times(1) + if needClearConntrackEntries(bindingProtocol) { + mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, bindingProtocol).Times(1) + } mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: svcIP, ServicePort: uint16(svcPort + 1), - Protocol: protocol, + Protocol: bindingProtocol, TrafficPolicyLocal: false, ClusterGroupID: 1, }).Times(1) @@ -2001,61 +2007,71 @@ func testClusterIPNoEndpoint(t *testing.T, svcIP net.IP, isIPv6 bool) { } func TestClusterIPNoEndpoint(t *testing.T) { - t.Run("IPv4", func(t *testing.T) { - testClusterIPNoEndpoint(t, svc1IPv4, false) + t.Run("IPv4 TCP", func(t *testing.T) { + testClusterIPNoEndpoint(t, binding.ProtocolTCP, false) }) - t.Run("IPv6", func(t *testing.T) { - testClusterIPNoEndpoint(t, svc1IPv6, true) + t.Run("IPv4 UDP", func(t *testing.T) { + testClusterIPNoEndpoint(t, binding.ProtocolUDP, false) + }) + t.Run("IPv6 TCP", func(t *testing.T) { + testClusterIPNoEndpoint(t, binding.ProtocolTCPv6, true) + }) + t.Run("IPv6 UDP", func(t *testing.T) { + testClusterIPNoEndpoint(t, binding.ProtocolUDPv6, true) }) } -func testNodePortNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP net.IP, isIPv6 bool) { +func testNodePortNoEndpoint(t *testing.T, bindingProtocol binding.Protocol, isIPv6 bool) { ctrl := gomock.NewController(t) mockOFClient, mockRouteClient := getMockClients(ctrl) groupAllocator := openflow.NewGroupAllocator() - fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll) + apiProtocol := getAPIProtocol(bindingProtocol) + // Create a ServicePort with a specific protocol, avoiding using the global variable 'svcPortName' which is set to TCP protocol. 
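`needClearConntrackEntries(bindingProtocol)` guards every new conntrack expectation but is not defined in this hunk. Its call sites suggest it is true only for the UDP protocols, which matches the feature's rationale: UDP conntrack entries linger until they time out, whereas TCP entries go away when the connection closes. A minimal sketch under that assumption (stand-in protocol type, not Antrea's):

```go
package main

import "fmt"

// Stand-in for the real binding.Protocol type (illustration only).
type bindingProtocol string

const (
	protocolTCP   bindingProtocol = "tcp"
	protocolUDP   bindingProtocol = "udp"
	protocolTCPv6 bindingProtocol = "tcpv6"
	protocolUDPv6 bindingProtocol = "udpv6"
)

// needClearConntrackEntries reports whether stale conntrack entries must be
// flushed when a Service or Endpoint using this protocol is removed or updated.
// Only UDP needs this; TCP entries are torn down with the connection.
func needClearConntrackEntries(p bindingProtocol) bool {
	return p == protocolUDP || p == protocolUDPv6
}

func main() {
	fmt.Println(needClearConntrackEntries(protocolUDP))   // true
	fmt.Println(needClearConntrackEntries(protocolTCPv6)) // false
}
```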
+ svcPortName := makeSvcPortName("ns", "svc", strconv.Itoa(svcPort), apiProtocol) + nodePortAddresses := nodePortAddressesIPv4 + svcIP := svc1IPv4 + virtualNodePortDNATIPv4 := agentconfig.VirtualNodePortDNATIPv4 + if isIPv6 { + nodePortAddresses = nodePortAddressesIPv6 + svcIP = svc1IPv6 + virtualNodePortDNATIPv4 = agentconfig.VirtualNodePortDNATIPv6 + } + fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll, withCleanupStaleUDPSvcConntrack) svc := makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), - corev1.ProtocolTCP, + apiProtocol, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeLocal) updatedSvc := makeTestNodePortService(&svcPortName, svcIP, nil, - int32(svcPort+1), - int32(svcNodePort), - corev1.ProtocolTCP, + int32(svcPort), + int32(svcNodePort)+1, + apiProtocol, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeLocal) makeServiceMap(fp, svc) makeEndpointSliceMap(fp) - vIP := agentconfig.VirtualNodePortDNATIPv4 - protocol := binding.ProtocolTCP - if isIPv6 { - vIP = agentconfig.VirtualNodePortDNATIPv6 - protocol = binding.ProtocolTCPv6 - } - mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(2), false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: svcIP, ServicePort: uint16(svcPort), - Protocol: protocol, + Protocol: bindingProtocol, LocalGroupID: 1, ClusterGroupID: 2, }).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, + ServiceIP: virtualNodePortDNATIPv4, ServicePort: uint16(svcNodePort), - Protocol: protocol, + Protocol: bindingProtocol, TrafficPolicyLocal: true, LocalGroupID: 1, ClusterGroupID: 2, @@ -2065,45 +2081,64 @@ func testNodePortNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP net. 
mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) fp.syncProxyRules() - mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), gomock.Any()).Times(1) - mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), gomock.Any()).Times(1) + mockOFClient.EXPECT().UninstallServiceFlows(virtualNodePortDNATIPv4, uint16(svcNodePort), gomock.Any()).Times(1) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + if needClearConntrackEntries(bindingProtocol) { + for _, nodeIP := range nodePortAddresses { + mockRouteClient.EXPECT().ClearConntrackEntryForService(nodeIP, uint16(svcNodePort), nil, bindingProtocol).Times(1) + } + mockRouteClient.EXPECT().ClearConntrackEntryForService(virtualNodePortDNATIPv4, uint16(svcNodePort), nil, bindingProtocol).Times(1) + } mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: svcIP, - ServicePort: uint16(svcPort + 1), - Protocol: protocol, - LocalGroupID: 1, - ClusterGroupID: 2, - }).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, - ServicePort: uint16(svcNodePort), - Protocol: protocol, + ServiceIP: virtualNodePortDNATIPv4, + ServicePort: uint16(svcNodePort) + 1, + Protocol: bindingProtocol, TrafficPolicyLocal: true, LocalGroupID: 1, ClusterGroupID: 2, IsExternal: true, IsNodePort: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort)+1, gomock.Any()).Times(1) fp.serviceChanges.OnServiceUpdate(svc, updatedSvc) fp.syncProxyRules() } func TestNodePortNoEndpoint(t *testing.T) { - t.Run("IPv4", func(t *testing.T) { - testNodePortNoEndpoint(t, nodePortAddressesIPv4, svc1IPv4, false) + t.Run("IPv4 TCP", func(t *testing.T) { + testNodePortNoEndpoint(t, binding.ProtocolTCP, false) }) - t.Run("IPv6", func(t *testing.T) { - testNodePortNoEndpoint(t, nodePortAddressesIPv6, svc1IPv6, true) + t.Run("IPv4 UDP", func(t *testing.T) { + testNodePortNoEndpoint(t, binding.ProtocolUDP, false) + }) + t.Run("IPv6 TCP", func(t *testing.T) { + testNodePortNoEndpoint(t, binding.ProtocolTCPv6, true) + }) + t.Run("IPv6 UDP", func(t *testing.T) { + testNodePortNoEndpoint(t, binding.ProtocolUDPv6, true) }) } -func testLoadBalancerNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP net.IP, loadBalancerIP, loadBalancerIPModeProxyIP net.IP, isIPv6 bool) { +func testLoadBalancerNoEndpoint(t *testing.T, bindingProtocol binding.Protocol, isIPv6 bool) { ctrl := gomock.NewController(t) mockOFClient, mockRouteClient := getMockClients(ctrl) groupAllocator := openflow.NewGroupAllocator() - fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll) + apiProtocol := getAPIProtocol(bindingProtocol) + // Create a ServicePort with a specific protocol, avoiding using the global variable 'svcPortName' which is set to TCP protocol. 
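The expectations added above for the NodePort case spell out which destinations get their UDP conntrack entries flushed when the NodePort changes: every configured node IP serving the NodePort, plus the virtual NodePort DNAT IP. A small sketch that merely assembles that target list (the `flushTarget` type and the example IPs are hypothetical; the real code issues `ClearConntrackEntryForService` calls instead):

```go
package main

import (
	"fmt"
	"net"
)

// flushTarget is a hypothetical tuple describing one conntrack flush request:
// entries whose original destination is IP:Port for the given protocol.
type flushTarget struct {
	IP       net.IP
	Port     uint16
	Protocol string
}

// nodePortFlushTargets lists the destinations whose UDP conntrack entries
// should be cleared when a NodePort is removed: each node IP that serves the
// NodePort, plus the virtual DNAT IP used internally for NodePort traffic.
func nodePortFlushTargets(nodeIPs []net.IP, virtualDNATIP net.IP, nodePort uint16) []flushTarget {
	targets := make([]flushTarget, 0, len(nodeIPs)+1)
	for _, ip := range nodeIPs {
		targets = append(targets, flushTarget{IP: ip, Port: nodePort, Protocol: "udp"})
	}
	targets = append(targets, flushTarget{IP: virtualDNATIP, Port: nodePort, Protocol: "udp"})
	return targets
}

func main() {
	nodeIPs := []net.IP{net.ParseIP("192.168.0.1")} // example node IP
	virtualIP := net.ParseIP("169.254.0.252")       // placeholder virtual DNAT IP
	for _, t := range nodePortFlushTargets(nodeIPs, virtualIP, 30001) {
		fmt.Printf("flush %s dst=%s:%d\n", t.Protocol, t.IP, t.Port)
	}
}
```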
+ svcPortName := makeSvcPortName("ns", "svc", strconv.Itoa(svcPort), apiProtocol) + nodePortAddresses := nodePortAddressesIPv4 + svcIP := svc1IPv4 + loadBalancerIP := loadBalancerIPv4 + loadBalancerIPModeProxyIP := loadBalancerIPModeProxyIPv4 + virtualNodePortDNATIPv4 := agentconfig.VirtualNodePortDNATIPv4 + if isIPv6 { + nodePortAddresses = nodePortAddressesIPv6 + svcIP = svc1IPv6 + loadBalancerIP = loadBalancerIPv6 + loadBalancerIPModeProxyIP = loadBalancerIPModeProxyIPv6 + virtualNodePortDNATIPv4 = agentconfig.VirtualNodePortDNATIPv6 + } + fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll, withCleanupStaleUDPSvcConntrack) internalTrafficPolicy := corev1.ServiceInternalTrafficPolicyCluster externalTrafficPolicy := corev1.ServiceExternalTrafficPolicyTypeLocal @@ -2115,7 +2150,7 @@ func testLoadBalancerNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), - corev1.ProtocolTCP, + apiProtocol, nil, &internalTrafficPolicy, externalTrafficPolicy) @@ -2126,33 +2161,26 @@ func testLoadBalancerNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort+1), int32(svcNodePort), - corev1.ProtocolTCP, + apiProtocol, nil, &internalTrafficPolicy, externalTrafficPolicy) makeServiceMap(fp, svc) makeEndpointSliceMap(fp) - vIP := agentconfig.VirtualNodePortDNATIPv4 - protocol := binding.ProtocolTCP - if isIPv6 { - vIP = agentconfig.VirtualNodePortDNATIPv6 - protocol = binding.ProtocolTCPv6 - } - mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(2), false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: svcIP, ServicePort: uint16(svcPort), - Protocol: protocol, + Protocol: bindingProtocol, LocalGroupID: 1, ClusterGroupID: 2, }).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, + ServiceIP: virtualNodePortDNATIPv4, ServicePort: uint16(svcNodePort), - Protocol: protocol, + Protocol: bindingProtocol, TrafficPolicyLocal: true, LocalGroupID: 1, ClusterGroupID: 2, @@ -2162,7 +2190,7 @@ func testLoadBalancerNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: loadBalancerIP, ServicePort: uint16(svcPort), - Protocol: protocol, + Protocol: bindingProtocol, TrafficPolicyLocal: true, LocalGroupID: 1, ClusterGroupID: 2, @@ -2173,21 +2201,25 @@ func testLoadBalancerNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP fp.syncProxyRules() mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), gomock.Any()).Times(1) - mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), gomock.Any()).Times(1) + mockOFClient.EXPECT().UninstallServiceFlows(virtualNodePortDNATIPv4, uint16(svcNodePort), gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), gomock.Any()).Times(1) + if needClearConntrackEntries(bindingProtocol) { + mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, bindingProtocol).Times(1) + mockRouteClient.EXPECT().ClearConntrackEntryForService(loadBalancerIP, uint16(svcPort), nil, bindingProtocol).Times(1) + } mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) 
mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: svcIP, ServicePort: uint16(svcPort + 1), - Protocol: protocol, + Protocol: bindingProtocol, LocalGroupID: 1, ClusterGroupID: 2, }).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, + ServiceIP: virtualNodePortDNATIPv4, ServicePort: uint16(svcNodePort), - Protocol: protocol, + Protocol: bindingProtocol, TrafficPolicyLocal: true, LocalGroupID: 1, ClusterGroupID: 2, @@ -2197,7 +2229,7 @@ func testLoadBalancerNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: loadBalancerIP, ServicePort: uint16(svcPort + 1), - Protocol: protocol, + Protocol: bindingProtocol, TrafficPolicyLocal: true, LocalGroupID: 1, ClusterGroupID: 2, @@ -2210,18 +2242,30 @@ func testLoadBalancerNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP } func TestLoadBalancerNoEndpoint(t *testing.T) { - t.Run("IPv4", func(t *testing.T) { - testLoadBalancerNoEndpoint(t, nodePortAddressesIPv4, svc1IPv4, loadBalancerIPv4, loadBalancerIPModeProxyIPv4, false) + t.Run("IPv4 TCP", func(t *testing.T) { + testLoadBalancerNoEndpoint(t, binding.ProtocolTCP, false) }) - t.Run("IPv6", func(t *testing.T) { - testLoadBalancerNoEndpoint(t, nodePortAddressesIPv6, svc1IPv6, loadBalancerIPv6, loadBalancerIPModeProxyIPv6, true) + t.Run("IPv4 UDP", func(t *testing.T) { + testLoadBalancerNoEndpoint(t, binding.ProtocolUDP, false) + }) + t.Run("IPv6 TCP", func(t *testing.T) { + testLoadBalancerNoEndpoint(t, binding.ProtocolTCPv6, true) + }) + t.Run("IPv6 UDP", func(t *testing.T) { + testLoadBalancerNoEndpoint(t, binding.ProtocolUDPv6, true) }) } -func testClusterIPRemoveSamePortEndpoint(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { +func testClusterIPRemoveSamePortEndpoint(t *testing.T, isIPv6 bool) { ctrl := gomock.NewController(t) mockOFClient, mockRouteClient := getMockClients(ctrl) groupAllocator := openflow.NewGroupAllocator() + svcIP := svc1IPv4 + epIP := ep1IPv4 + if isIPv6 { + svcIP = svc1IPv6 + epIP = ep1IPv6 + } fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6, withCleanupStaleUDPSvcConntrack) svcPortNameTCP := makeSvcPortName("ns", "svc-tcp", strconv.Itoa(svcPort), corev1.ProtocolTCP) @@ -2279,20 +2323,38 @@ func testClusterIPRemoveSamePortEndpoint(t *testing.T, svcIP net.IP, epIP net.IP func TestClusterIPRemoveSamePortEndpoint(t *testing.T) { t.Run("IPv4", func(t *testing.T) { - testClusterIPRemoveSamePortEndpoint(t, svc1IPv4, ep1IPv4, false) + testClusterIPRemoveSamePortEndpoint(t, false) }) t.Run("IPv6", func(t *testing.T) { - testClusterIPRemoveSamePortEndpoint(t, svc1IPv6, ep1IPv6, true) + testClusterIPRemoveSamePortEndpoint(t, true) }) } -func testLoadBalancerRemoveEndpoints(t *testing.T, nodePortAddresses []net.IP, svcIP, externalIP, epIP, loadBalancerIP, loadBalancerIPModeProxyIP net.IP, bindingProtocol binding.Protocol, isIPv6 bool) { +func testLoadBalancerRemoveEndpoints(t *testing.T, bindingProtocol binding.Protocol, isIPv6 bool) { ctrl := gomock.NewController(t) mockOFClient, mockRouteClient := getMockClients(ctrl) groupAllocator := openflow.NewGroupAllocator() apiProtocol := getAPIProtocol(bindingProtocol) // Create a ServicePort with a specific protocol, avoiding using the global variable 'svcPortName' which is set to TCP protocol. 
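In `testLoadBalancerRemoveEndpoints`, which starts here, the conntrack-clearing expectations that follow pass the removed endpoint's IP as the third argument instead of nil. Judging from the call sites, a nil endpoint IP flushes everything destined to the Service IP and port, while a non-nil one restricts the flush to entries that were DNAT'ed to that backend. A rough sketch of that selection over an assumed, simplified entry structure (not Antrea's implementation):

```go
package main

import (
	"fmt"
	"net"
)

// conntrackEntry is an assumed, simplified view of a DNAT conntrack entry:
// original destination (Service IP/port) and the backend it was translated to.
type conntrackEntry struct {
	OrigDst  net.IP
	OrigPort uint16
	Backend  net.IP // endpoint IP after DNAT
}

// matchesService mimics the filter implied by ClearConntrackEntryForService:
// an entry is flushed when its original destination matches the Service IP and
// port, and, if endpointIP is non-nil, when it was DNAT'ed to that endpoint.
func matchesService(e conntrackEntry, svcIP net.IP, svcPort uint16, endpointIP net.IP) bool {
	if !e.OrigDst.Equal(svcIP) || e.OrigPort != svcPort {
		return false
	}
	return endpointIP == nil || e.Backend.Equal(endpointIP)
}

func main() {
	entries := []conntrackEntry{
		{OrigDst: net.ParseIP("10.96.0.10"), OrigPort: 53, Backend: net.ParseIP("10.10.0.5")},
		{OrigDst: net.ParseIP("10.96.0.10"), OrigPort: 53, Backend: net.ParseIP("10.10.0.6")},
	}
	// Endpoint 10.10.0.6 was removed: only its entry should match.
	for _, e := range entries {
		fmt.Println(matchesService(e, net.ParseIP("10.96.0.10"), 53, net.ParseIP("10.10.0.6")))
	}
}
```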
svcPortName := makeSvcPortName("ns", "svc", strconv.Itoa(svcPort), apiProtocol) + nodePortAddresses := nodePortAddressesIPv4 + svcIP := svc1IPv4 + loadBalancerIP := loadBalancerIPv4 + externalIP := externalIPv4 + loadBalancerIPModeProxyIP := loadBalancerIPModeProxyIPv4 + epIP := ep1IPv4 + virtualNodePortDNATIPv4 := agentconfig.VirtualNodePortDNATIPv4 + svcNodePortIP := svcNodePortIPv4 + if isIPv6 { + nodePortAddresses = nodePortAddressesIPv6 + svcIP = svc1IPv6 + loadBalancerIP = loadBalancerIPv6 + externalIP = externalIPv6 + loadBalancerIPModeProxyIP = loadBalancerIPModeProxyIPv6 + epIP = ep1IPv6 + virtualNodePortDNATIPv4 = agentconfig.VirtualNodePortDNATIPv6 + svcNodePortIP = svcNodePortIPv6 + } fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll, withCleanupStaleUDPSvcConntrack) externalTrafficPolicy := corev1.ServiceExternalTrafficPolicyTypeCluster @@ -2315,13 +2377,6 @@ func testLoadBalancerRemoveEndpoints(t *testing.T, nodePortAddresses []net.IP, s eps := makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, []discovery.Endpoint{*ep}, []discovery.EndpointPort{*epPort}, isIPv6) makeEndpointSliceMap(fp, eps) - vIP := agentconfig.VirtualNodePortDNATIPv4 - svcNodePortIP := svcNodePortIPv4 - if isIPv6 { - vIP = agentconfig.VirtualNodePortDNATIPv6 - svcNodePortIP = svcNodePortIPv6 - } - mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -2331,7 +2386,7 @@ func testLoadBalancerRemoveEndpoints(t *testing.T, nodePortAddresses []net.IP, s ClusterGroupID: 1, }).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, + ServiceIP: virtualNodePortDNATIPv4, ServicePort: uint16(svcNodePort), Protocol: bindingProtocol, IsExternal: true, @@ -2364,10 +2419,10 @@ func testLoadBalancerRemoveEndpoints(t *testing.T, nodePortAddresses []net.IP, s mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), false, gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) if needClearConntrackEntries(bindingProtocol) { - mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), epIP, bindingProtocol) - mockRouteClient.EXPECT().ClearConntrackEntryForService(svcNodePortIP, uint16(svcNodePort), epIP, bindingProtocol) - mockRouteClient.EXPECT().ClearConntrackEntryForService(loadBalancerIP, uint16(svcPort), epIP, bindingProtocol) - mockRouteClient.EXPECT().ClearConntrackEntryForService(externalIP, uint16(svcPort), epIP, bindingProtocol) + mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), epIP, bindingProtocol).Times(1) + mockRouteClient.EXPECT().ClearConntrackEntryForService(svcNodePortIP, uint16(svcNodePort), epIP, bindingProtocol).Times(1) + mockRouteClient.EXPECT().ClearConntrackEntryForService(loadBalancerIP, uint16(svcPort), epIP, bindingProtocol).Times(1) + mockRouteClient.EXPECT().ClearConntrackEntryForService(externalIP, uint16(svcPort), epIP, bindingProtocol).Times(1) } fp.endpointsChanges.OnEndpointSliceUpdate(eps, true) fp.syncProxyRules() @@ -2381,23 +2436,29 @@ func testLoadBalancerRemoveEndpoints(t *testing.T, nodePortAddresses []net.IP, s func TestLoadBalancerRemoveEndpoints(t *testing.T) { t.Run("IPv4 TCP", func(t *testing.T) { - testLoadBalancerRemoveEndpoints(t, nodePortAddressesIPv4, 
svc1IPv4, externalIPv4, ep1IPv4, loadBalancerIPv4, loadBalancerIPModeProxyIPv4, binding.ProtocolTCP, false) + testLoadBalancerRemoveEndpoints(t, binding.ProtocolTCP, false) }) t.Run("IPv4 UDP", func(t *testing.T) { - testLoadBalancerRemoveEndpoints(t, nodePortAddressesIPv4, svc1IPv4, externalIPv4, ep1IPv4, loadBalancerIPv4, loadBalancerIPModeProxyIPv4, binding.ProtocolUDP, false) + testLoadBalancerRemoveEndpoints(t, binding.ProtocolUDP, false) }) t.Run("IPv6 TCP", func(t *testing.T) { - testLoadBalancerRemoveEndpoints(t, nodePortAddressesIPv6, svc1IPv6, externalIPv6, ep1IPv6, loadBalancerIPv6, loadBalancerIPModeProxyIPv6, binding.ProtocolTCPv6, true) + testLoadBalancerRemoveEndpoints(t, binding.ProtocolTCPv6, true) }) t.Run("IPv6 UDP", func(t *testing.T) { - testLoadBalancerRemoveEndpoints(t, nodePortAddressesIPv6, svc1IPv6, externalIPv6, ep1IPv6, loadBalancerIPv6, loadBalancerIPModeProxyIPv6, binding.ProtocolUDPv6, true) + testLoadBalancerRemoveEndpoints(t, binding.ProtocolUDPv6, true) }) } -func testSessionAffinity(t *testing.T, svcIP net.IP, epIP net.IP, affinitySeconds int32, isIPv6 bool) { +func testSessionAffinity(t *testing.T, affinitySeconds int32, isIPv6 bool) { ctrl := gomock.NewController(t) mockOFClient, mockRouteClient := getMockClients(ctrl) groupAllocator := openflow.NewGroupAllocator() + svcIP := svc1IPv4 + epIP := ep1IPv4 + if isIPv6 { + svcIP = svc1IPv6 + epIP = ep1IPv6 + } fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6) svc := makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { @@ -2447,10 +2508,10 @@ func testSessionAffinity(t *testing.T, svcIP net.IP, epIP net.IP, affinitySecond func TestSessionAffinity(t *testing.T) { affinitySeconds := corev1.DefaultClientIPServiceAffinitySeconds t.Run("IPv4", func(t *testing.T) { - testSessionAffinity(t, svc1IPv4, ep1IPv4, affinitySeconds, false) + testSessionAffinity(t, affinitySeconds, false) }) t.Run("IPv6", func(t *testing.T) { - testSessionAffinity(t, svc1IPv6, ep1IPv6, affinitySeconds, true) + testSessionAffinity(t, affinitySeconds, true) }) } @@ -2458,21 +2519,28 @@ func TestSessionAffinityOverflow(t *testing.T) { // Ensure that the SessionAffinity timeout is truncated to the max supported value, instead // of wrapping around. 
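The overflow test whose comment appears above feeds an affinity timeout of `math.MaxUint16 + 10`; the expected behavior is saturation at 65535 rather than modular wrap-around, presumably because OpenFlow idle/hard timeouts are 16-bit fields. A one-function illustration of that clamping (my sketch of the stated behavior, not the proxier's actual code path):

```go
package main

import (
	"fmt"
	"math"
)

// truncateAffinitySeconds clamps a Service's session affinity timeout to the
// maximum value a uint16 timeout field can hold, instead of letting the
// conversion wrap around (e.g. 65545 -> 9).
func truncateAffinitySeconds(seconds int32) uint16 {
	if seconds > math.MaxUint16 {
		return math.MaxUint16
	}
	if seconds < 0 {
		return 0
	}
	return uint16(seconds)
}

func main() {
	fmt.Println(truncateAffinitySeconds(10800))                    // 10800
	fmt.Println(truncateAffinitySeconds(int32(math.MaxUint16)+10)) // 65535, not 9
}
```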
affinitySeconds := int32(math.MaxUint16 + 10) - testSessionAffinity(t, svc1IPv4, ep1IPv4, affinitySeconds, false) + testSessionAffinity(t, affinitySeconds, false) } -func testSessionAffinityNoEndpoint(t *testing.T, svcExternalIPs net.IP, svcIP net.IP, isIPv6 bool) { +func testSessionAffinityNoEndpoint(t *testing.T, isIPv6 bool) { ctrl := gomock.NewController(t) mockOFClient, mockRouteClient := getMockClients(ctrl) groupAllocator := openflow.NewGroupAllocator() fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6) - + protocol := binding.ProtocolTCP + externalIP := externalIPv4 + svcIP := svc1IPv4 + if isIPv6 { + protocol = binding.ProtocolTCPv6 + externalIP = externalIPv6 + svcIP = svc1IPv6 + } timeoutSeconds := corev1.DefaultClientIPServiceAffinitySeconds svc := makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { svc.Spec.Type = corev1.ServiceTypeNodePort svc.Spec.ClusterIP = svcIP.String() - svc.Spec.ExternalIPs = []string{svcExternalIPs.String()} + svc.Spec.ExternalIPs = []string{externalIP.String()} svc.Spec.SessionAffinity = corev1.ServiceAffinityClientIP svc.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{ ClientIP: &corev1.ClientIPConfig{ @@ -2489,10 +2557,6 @@ func testSessionAffinityNoEndpoint(t *testing.T, svcExternalIPs net.IP, svcIP ne makeServiceMap(fp, svc) makeEndpointsMap(fp) - protocol := binding.ProtocolTCP - if isIPv6 { - protocol = binding.ProtocolTCPv6 - } mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), true, []k8sproxy.Endpoint{}).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: svcIP, @@ -2506,53 +2570,54 @@ func testSessionAffinityNoEndpoint(t *testing.T, svcExternalIPs net.IP, svcIP ne func TestSessionAffinityNoEndpoint(t *testing.T) { t.Run("IPv4", func(t *testing.T) { - testSessionAffinityNoEndpoint(t, net.ParseIP("50.60.70.81"), svc1IPv4, false) + testSessionAffinityNoEndpoint(t, false) }) t.Run("IPv6", func(t *testing.T) { - testSessionAffinityNoEndpoint(t, net.ParseIP("5060:70::81"), svc1IPv6, true) + testSessionAffinityNoEndpoint(t, true) }) } -func testServicePortUpdate(t *testing.T, - nodePortAddresses []net.IP, - svcIP net.IP, - loadBalancerIP net.IP, - loadBalancerIPModeProxyIP net.IP, - epIP net.IP, - svcType corev1.ServiceType, - isIPv6 bool) { +func testServicePortUpdate(t *testing.T, bindingProtocol binding.Protocol, isIPv6 bool, svcType corev1.ServiceType) { ctrl := gomock.NewController(t) mockOFClient, mockRouteClient := getMockClients(ctrl) groupAllocator := openflow.NewGroupAllocator() - fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll) + apiProtocol := getAPIProtocol(bindingProtocol) + // Create a ServicePort with a specific protocol, avoiding using the global variable 'svcPortName' which is set to TCP protocol. 
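Most helpers in this diff now pass an extra `withCleanupStaleUDPSvcConntrack` option to `newFakeProxier`, alongside the existing `withProxyAll`. These read like the usual Go functional-options pattern; a generic, self-contained sketch of that pattern (the names mirror the diff but the implementation is illustrative):

```go
package main

import "fmt"

// fakeProxierConfig holds the toggles a test may want to flip.
type fakeProxierConfig struct {
	proxyAll                    bool
	cleanupStaleUDPSvcConntrack bool
}

// option mutates the config; tests pass any number of them.
type option func(*fakeProxierConfig)

func withProxyAll(c *fakeProxierConfig)                    { c.proxyAll = true }
func withCleanupStaleUDPSvcConntrack(c *fakeProxierConfig) { c.cleanupStaleUDPSvcConntrack = true }

// newFakeProxierConfig applies the options over zero-value defaults.
func newFakeProxierConfig(opts ...option) *fakeProxierConfig {
	c := &fakeProxierConfig{}
	for _, o := range opts {
		o(c)
	}
	return c
}

func main() {
	c := newFakeProxierConfig(withProxyAll, withCleanupStaleUDPSvcConntrack)
	fmt.Printf("%+v\n", c)
}
```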
+ svcPortName := makeSvcPortName("ns", "svc", strconv.Itoa(svcPort), apiProtocol) + nodePortAddresses := nodePortAddressesIPv4 + svcIP := svc1IPv4 + loadBalancerIP := loadBalancerIPv4 + virtualNodePortDNATIPv4 := agentconfig.VirtualNodePortDNATIPv4 + epIP := ep1IPv4 + if isIPv6 { + nodePortAddresses = nodePortAddressesIPv6 + svcIP = svc1IPv6 + loadBalancerIP = loadBalancerIPv6 + virtualNodePortDNATIPv4 = agentconfig.VirtualNodePortDNATIPv6 + epIP = ep1IPv6 + } + fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll, withCleanupStaleUDPSvcConntrack) var svc, updatedSvc *corev1.Service switch svcType { case corev1.ServiceTypeClusterIP: - svc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, nil) - updatedSvc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort+1), corev1.ProtocolTCP, nil, nil, false, nil) + svc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), apiProtocol, nil, nil, false, nil) + updatedSvc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort+1), apiProtocol, nil, nil, false, nil) case corev1.ServiceTypeNodePort: - svc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort+1), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) + svc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), apiProtocol, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) + updatedSvc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort+1), int32(svcNodePort), apiProtocol, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) case corev1.ServiceTypeLoadBalancer: - svc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort+1), int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + svc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, nil, int32(svcPort), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, nil, int32(svcPort+1), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) } makeServiceMap(fp, svc) - ep, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), corev1.ProtocolTCP, false) + ep, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), apiProtocol, false) eps := makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, []discovery.Endpoint{*ep}, []discovery.EndpointPort{*epPort}, isIPv6) makeEndpointSliceMap(fp, eps) expectedEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(epIP.String(), "", "", svcPort, false, true, true, false, nil)} - bindingProtocol := 
binding.ProtocolTCP - vIP := agentconfig.VirtualNodePortDNATIPv4 - if isIPv6 { - bindingProtocol = binding.ProtocolTCPv6 - vIP = agentconfig.VirtualNodePortDNATIPv6 - } - mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), false, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -2563,6 +2628,9 @@ func testServicePortUpdate(t *testing.T, }).Times(1) s1 := mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) + if needClearConntrackEntries(bindingProtocol) { + mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, bindingProtocol).Times(1) + } s2 := mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: svcIP, ServicePort: uint16(svcPort + 1), @@ -2573,7 +2641,7 @@ func testServicePortUpdate(t *testing.T, if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, + ServiceIP: virtualNodePortDNATIPv4, ServicePort: uint16(svcNodePort), Protocol: bindingProtocol, ClusterGroupID: 1, @@ -2582,9 +2650,9 @@ func testServicePortUpdate(t *testing.T, }).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) - mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) + mockOFClient.EXPECT().UninstallServiceFlows(virtualNodePortDNATIPv4, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, + ServiceIP: virtualNodePortDNATIPv4, ServicePort: uint16(svcNodePort), Protocol: bindingProtocol, ClusterGroupID: 1, @@ -2605,6 +2673,9 @@ func testServicePortUpdate(t *testing.T, mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) s1 = mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol) + if needClearConntrackEntries(bindingProtocol) { + mockRouteClient.EXPECT().ClearConntrackEntryForService(loadBalancerIP, uint16(svcPort), nil, bindingProtocol).Times(1) + } s2 = mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: loadBalancerIP, ServicePort: uint16(svcPort + 1), @@ -2617,6 +2688,7 @@ func testServicePortUpdate(t *testing.T, mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) } + fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) assert.Contains(t, fp.endpointsInstalledMap, svcPortName) @@ -2628,66 +2700,87 @@ func testServicePortUpdate(t *testing.T, func TestServicePortUpdate(t *testing.T) { t.Run("IPv4", func(t *testing.T) { - t.Run("ClusterIP", func(t *testing.T) { - testServicePortUpdate(t, nil, svc1IPv4, nil, nil, ep1IPv4, corev1.ServiceTypeClusterIP, false) + t.Run("ClusterIP TCP", func(t *testing.T) { + testServicePortUpdate(t, binding.ProtocolTCP, false, corev1.ServiceTypeClusterIP) + }) + t.Run("ClusterIP UDP", func(t *testing.T) { + testServicePortUpdate(t, binding.ProtocolUDP, false, corev1.ServiceTypeClusterIP) }) - t.Run("NodePort", func(t *testing.T) { - testServicePortUpdate(t, nodePortAddressesIPv4, svc1IPv4, nil, nil, ep1IPv4, corev1.ServiceTypeNodePort, false) + t.Run("NodePort TCP", func(t *testing.T) { + testServicePortUpdate(t, binding.ProtocolTCP, false, 
corev1.ServiceTypeNodePort) }) - t.Run("LoadBalancer", func(t *testing.T) { - testServicePortUpdate(t, nodePortAddressesIPv4, svc1IPv4, loadBalancerIPv4, loadBalancerIPModeProxyIPv4, ep1IPv4, corev1.ServiceTypeLoadBalancer, false) + t.Run("NodePort UDP", func(t *testing.T) { + testServicePortUpdate(t, binding.ProtocolUDP, false, corev1.ServiceTypeNodePort) + }) + t.Run("LoadBalancer TCP", func(t *testing.T) { + testServicePortUpdate(t, binding.ProtocolTCP, false, corev1.ServiceTypeLoadBalancer) + }) + t.Run("LoadBalancer UDP", func(t *testing.T) { + testServicePortUpdate(t, binding.ProtocolUDP, false, corev1.ServiceTypeLoadBalancer) }) }) t.Run("IPv6", func(t *testing.T) { - t.Run("ClusterIP", func(t *testing.T) { - testServicePortUpdate(t, nil, svc1IPv6, nil, nil, ep1IPv6, corev1.ServiceTypeClusterIP, true) + t.Run("ClusterIP TCP", func(t *testing.T) { + testServicePortUpdate(t, binding.ProtocolTCPv6, true, corev1.ServiceTypeClusterIP) + }) + t.Run("ClusterIP UDP", func(t *testing.T) { + testServicePortUpdate(t, binding.ProtocolUDPv6, true, corev1.ServiceTypeClusterIP) + }) + t.Run("NodePort TCP", func(t *testing.T) { + testServicePortUpdate(t, binding.ProtocolTCPv6, true, corev1.ServiceTypeNodePort) + }) + t.Run("NodePort UDP", func(t *testing.T) { + testServicePortUpdate(t, binding.ProtocolUDPv6, true, corev1.ServiceTypeNodePort) }) - t.Run("NodePort", func(t *testing.T) { - testServicePortUpdate(t, nodePortAddressesIPv6, svc1IPv6, nil, nil, ep1IPv6, corev1.ServiceTypeNodePort, true) + t.Run("LoadBalancer TCP", func(t *testing.T) { + testServicePortUpdate(t, binding.ProtocolTCPv6, true, corev1.ServiceTypeLoadBalancer) }) - t.Run("LoadBalancer", func(t *testing.T) { - testServicePortUpdate(t, nodePortAddressesIPv6, svc1IPv6, loadBalancerIPv6, loadBalancerIPModeProxyIPv6, ep1IPv6, corev1.ServiceTypeLoadBalancer, true) + t.Run("LoadBalancer UDP", func(t *testing.T) { + testServicePortUpdate(t, binding.ProtocolUDPv6, true, corev1.ServiceTypeLoadBalancer) }) }) } -func testServiceNodePortUpdate(t *testing.T, - nodePortAddresses []net.IP, - svcIP net.IP, - loadBalancerIP net.IP, - loadBalancerIPModeProxyIP net.IP, - epIP net.IP, - svcType corev1.ServiceType, - isIPv6 bool) { +func testServiceNodePortUpdate(t *testing.T, bindingProtocol binding.Protocol, isIPv6 bool, svcType corev1.ServiceType) { ctrl := gomock.NewController(t) mockOFClient, mockRouteClient := getMockClients(ctrl) groupAllocator := openflow.NewGroupAllocator() - fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll) + apiProtocol := getAPIProtocol(bindingProtocol) + // Create a ServicePort with a specific protocol, avoiding using the global variable 'svcPortName' which is set to TCP protocol. 
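With this change the `TestServicePortUpdate` matrix grows to cover every Service type, protocol, and IP family combination as hand-written `t.Run` calls. An equivalent table-driven formulation is sketched below for comparison; the diff itself keeps the explicit style, and the stand-in protocol strings are illustrative:

```go
package example

import "testing"

func TestServicePortUpdateMatrix(t *testing.T) {
	cases := []struct {
		name     string
		protocol string // stand-in for binding.Protocol
		isIPv6   bool
	}{
		{"IPv4 TCP", "tcp", false},
		{"IPv4 UDP", "udp", false},
		{"IPv6 TCP", "tcpv6", true},
		{"IPv6 UDP", "udpv6", true},
	}
	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			// Call the shared helper here, e.g.:
			// testServicePortUpdate(t, tc.protocol, tc.isIPv6, corev1.ServiceTypeClusterIP)
			_ = tc
		})
	}
}
```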
+ svcPortName := makeSvcPortName("ns", "svc", strconv.Itoa(svcPort), apiProtocol) + nodePortAddresses := nodePortAddressesIPv4 + svcIP := svc1IPv4 + loadBalancerIP := loadBalancerIPv4 + loadBalancerIPModeProxyIP := loadBalancerIPModeProxyIPv4 + virtualNodePortDNATIPv4 := agentconfig.VirtualNodePortDNATIPv4 + epIP := ep1IPv4 + if isIPv6 { + nodePortAddresses = nodePortAddressesIPv6 + svcIP = svc1IPv6 + loadBalancerIP = loadBalancerIPv6 + loadBalancerIPModeProxyIP = loadBalancerIPModeProxyIPv6 + virtualNodePortDNATIPv4 = agentconfig.VirtualNodePortDNATIPv6 + epIP = ep1IPv6 + } + fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll, withCleanupStaleUDPSvcConntrack) var svc, updatedSvc *corev1.Service switch svcType { case corev1.ServiceTypeNodePort: - svc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort+1), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) + svc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), apiProtocol, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) + updatedSvc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort+1), apiProtocol, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) case corev1.ServiceTypeLoadBalancer: - svc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort+1), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + svc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort+1), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) } makeServiceMap(fp, svc) - ep, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), corev1.ProtocolTCP, false) + ep, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), apiProtocol, false) eps := makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, []discovery.Endpoint{*ep}, []discovery.EndpointPort{*epPort}, isIPv6) makeEndpointSliceMap(fp, eps) expectedEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(epIP.String(), "", "", svcPort, false, true, true, false, nil)} - bindingProtocol := binding.ProtocolTCP - vIP := agentconfig.VirtualNodePortDNATIPv4 - if isIPv6 { - bindingProtocol = binding.ProtocolTCPv6 - vIP = agentconfig.VirtualNodePortDNATIPv6 - } - mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), false, 
expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -2699,7 +2792,7 @@ func testServiceNodePortUpdate(t *testing.T, if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, + ServiceIP: virtualNodePortDNATIPv4, ServicePort: uint16(svcNodePort), Protocol: bindingProtocol, ClusterGroupID: 1, @@ -2708,10 +2801,16 @@ func testServiceNodePortUpdate(t *testing.T, }).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) - s1 := mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol) + s1 := mockOFClient.EXPECT().UninstallServiceFlows(virtualNodePortDNATIPv4, uint16(svcNodePort), bindingProtocol) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + if needClearConntrackEntries(bindingProtocol) { + for _, nodeIP := range nodePortAddresses { + mockRouteClient.EXPECT().ClearConntrackEntryForService(nodeIP, uint16(svcNodePort), nil, bindingProtocol).Times(1) + } + mockRouteClient.EXPECT().ClearConntrackEntryForService(virtualNodePortDNATIPv4, uint16(svcNodePort), nil, bindingProtocol).Times(1) + } s2 := mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, + ServiceIP: virtualNodePortDNATIPv4, ServicePort: uint16(svcNodePort + 1), Protocol: bindingProtocol, ClusterGroupID: 1, @@ -2743,56 +2842,78 @@ func testServiceNodePortUpdate(t *testing.T, func TestServiceNodePortUpdate(t *testing.T) { t.Run("IPv4", func(t *testing.T) { - t.Run("NodePort", func(t *testing.T) { - testServiceNodePortUpdate(t, nodePortAddressesIPv4, svc1IPv4, nil, nil, ep1IPv4, corev1.ServiceTypeNodePort, false) + t.Run("NodePort TCP", func(t *testing.T) { + testServiceNodePortUpdate(t, binding.ProtocolTCP, false, corev1.ServiceTypeNodePort) + }) + t.Run("NodePort UDP", func(t *testing.T) { + testServiceNodePortUpdate(t, binding.ProtocolUDP, false, corev1.ServiceTypeNodePort) }) - t.Run("LoadBalancer", func(t *testing.T) { - testServiceNodePortUpdate(t, nodePortAddressesIPv4, svc1IPv4, loadBalancerIPv4, loadBalancerIPModeProxyIPv4, ep1IPv4, corev1.ServiceTypeLoadBalancer, false) + t.Run("LoadBalancer TCP", func(t *testing.T) { + testServiceNodePortUpdate(t, binding.ProtocolTCP, false, corev1.ServiceTypeLoadBalancer) + }) + t.Run("LoadBalancer UDP", func(t *testing.T) { + testServiceNodePortUpdate(t, binding.ProtocolUDP, false, corev1.ServiceTypeLoadBalancer) }) }) t.Run("IPv6", func(t *testing.T) { - t.Run("NodePort", func(t *testing.T) { - testServiceNodePortUpdate(t, nodePortAddressesIPv6, svc1IPv6, nil, nil, ep1IPv6, corev1.ServiceTypeNodePort, true) + t.Run("NodePort TCP", func(t *testing.T) { + testServiceNodePortUpdate(t, binding.ProtocolTCPv6, true, corev1.ServiceTypeNodePort) + }) + t.Run("NodePort UDP", func(t *testing.T) { + testServiceNodePortUpdate(t, binding.ProtocolUDPv6, true, corev1.ServiceTypeNodePort) + }) + t.Run("LoadBalancer TCP", func(t *testing.T) { + testServiceNodePortUpdate(t, binding.ProtocolTCPv6, true, corev1.ServiceTypeLoadBalancer) }) - t.Run("LoadBalancer", func(t *testing.T) { - testServiceNodePortUpdate(t, nodePortAddressesIPv6, svc1IPv6, loadBalancerIPv6, loadBalancerIPModeProxyIPv6, ep1IPv6, corev1.ServiceTypeLoadBalancer, true) + t.Run("LoadBalancer UDP", func(t *testing.T) { + testServiceNodePortUpdate(t, binding.ProtocolUDPv6, true, 
corev1.ServiceTypeLoadBalancer) }) }) } -func testServiceExternalTrafficPolicyUpdate(t *testing.T, - nodePortAddresses []net.IP, - svcIP net.IP, - loadBalancerIP net.IP, - loadBalancerIPModeProxyIP net.IP, - externalIP net.IP, - ep1IP net.IP, - ep2IP net.IP, - svcType corev1.ServiceType, - isIPv6 bool) { +func testServiceExternalTrafficPolicyUpdate(t *testing.T, bindingProtocol binding.Protocol, isIPv6 bool, svcType corev1.ServiceType) { ctrl := gomock.NewController(t) mockOFClient, mockRouteClient := getMockClients(ctrl) groupAllocator := openflow.NewGroupAllocator() - fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll) + apiProtocol := getAPIProtocol(bindingProtocol) + // Create a ServicePort with a specific protocol, avoiding using the global variable 'svcPortName' which is set to TCP protocol. + svcPortName := makeSvcPortName("ns", "svc", strconv.Itoa(svcPort), apiProtocol) + nodePortAddresses := nodePortAddressesIPv4 + svcIP := svc1IPv4 + loadBalancerIP := loadBalancerIPv4 + externalIP := externalIPv4 + loadBalancerIPModeProxyIP := loadBalancerIPModeProxyIPv4 + virtualNodePortDNATIPv4 := agentconfig.VirtualNodePortDNATIPv4 + ep1IP := ep1IPv4 + ep2IP := ep2IPv4 + if isIPv6 { + nodePortAddresses = nodePortAddressesIPv6 + svcIP = svc1IPv6 + loadBalancerIP = loadBalancerIPv6 + externalIP = externalIPv6 + loadBalancerIPModeProxyIP = loadBalancerIPModeProxyIPv6 + virtualNodePortDNATIPv4 = agentconfig.VirtualNodePortDNATIPv6 + ep1IP = ep1IPv6 + ep2IP = ep2IPv6 + } + fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll, withCleanupStaleUDPSvcConntrack) var svc, updatedSvc *corev1.Service switch svcType { case corev1.ServiceTypeClusterIP: // ExternalTrafficPolicy defaults to Cluster. 
- svc = makeTestClusterIPService(&svcPortName, svcIP, []net.IP{externalIP}, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, nil) - updatedSvc = svc.DeepCopy() - updatedSvc.Spec.ExternalTrafficPolicy = corev1.ServiceExternalTrafficPolicyTypeLocal + svc = makeTestClusterIPService(&svcPortName, svcIP, []net.IP{externalIP}, int32(svcPort), apiProtocol, nil, nil, false, nil) case corev1.ServiceTypeNodePort: - svc = makeTestNodePortService(&svcPortName, svcIP, []net.IP{externalIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc = makeTestNodePortService(&svcPortName, svcIP, []net.IP{externalIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeLocal) + svc = makeTestNodePortService(&svcPortName, svcIP, []net.IP{externalIP}, int32(svcPort), int32(svcNodePort), apiProtocol, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) case corev1.ServiceTypeLoadBalancer: - svc = makeTestLoadBalancerService(&svcPortName, svcIP, []net.IP{externalIP}, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, []net.IP{externalIP}, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeLocal) + svc = makeTestLoadBalancerService(&svcPortName, svcIP, []net.IP{externalIP}, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) } + updatedSvc = svc.DeepCopy() + updatedSvc.Spec.ExternalTrafficPolicy = corev1.ServiceExternalTrafficPolicyTypeLocal makeServiceMap(fp, svc) - remoteEp, remoteEpPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, ep1IP, int32(svcPort), corev1.ProtocolTCP, false) - localEp, localEpPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, ep2IP, int32(svcPort), corev1.ProtocolTCP, true) + remoteEp, remoteEpPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, ep1IP, int32(svcPort), apiProtocol, false) + localEp, localEpPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, ep2IP, int32(svcPort), apiProtocol, true) eps := makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, []discovery.Endpoint{*remoteEp, *localEp}, @@ -2803,13 +2924,6 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), hostname, "", svcPort, true, true, true, false, nil)} expectedAllEps := append(expectedLocalEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, true, false, nil)) - bindingProtocol := binding.ProtocolTCP - vIP := agentconfig.VirtualNodePortDNATIPv4 - if isIPv6 { - bindingProtocol = binding.ProtocolTCPv6 - vIP = agentconfig.VirtualNodePortDNATIPv6 - } - mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.InAnyOrder(expectedAllEps)).Times(1) mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), false, gomock.InAnyOrder(expectedAllEps)).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -2829,7 +2943,7 @@ func 
testServiceExternalTrafficPolicyUpdate(t *testing.T, if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, + ServiceIP: virtualNodePortDNATIPv4, ServicePort: uint16(svcNodePort), Protocol: bindingProtocol, ClusterGroupID: 1, @@ -2878,9 +2992,9 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, mockRouteClient.EXPECT().AddExternalIPRoute(externalIP).Times(1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { - s1 := mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) + s1 := mockOFClient.EXPECT().UninstallServiceFlows(virtualNodePortDNATIPv4, uint16(svcNodePort), bindingProtocol).Times(1) s2 := mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, + ServiceIP: virtualNodePortDNATIPv4, ServicePort: uint16(svcNodePort), Protocol: bindingProtocol, LocalGroupID: 2, @@ -2917,48 +3031,74 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, func TestServiceExternalTrafficPolicyUpdate(t *testing.T) { t.Run("IPv4", func(t *testing.T) { - t.Run("ClusterIP", func(t *testing.T) { - testServiceExternalTrafficPolicyUpdate(t, nil, svc1IPv4, nil, nil, externalIPv4, ep1IPv4, ep2IPv4, corev1.ServiceTypeClusterIP, false) + t.Run("ClusterIP TCP", func(t *testing.T) { + testServiceExternalTrafficPolicyUpdate(t, binding.ProtocolTCP, false, corev1.ServiceTypeClusterIP) + }) + t.Run("ClusterIP UDP", func(t *testing.T) { + testServiceExternalTrafficPolicyUpdate(t, binding.ProtocolUDP, false, corev1.ServiceTypeClusterIP) + }) + t.Run("NodePort TCP", func(t *testing.T) { + testServiceExternalTrafficPolicyUpdate(t, binding.ProtocolTCP, false, corev1.ServiceTypeNodePort) }) - t.Run("NodePort", func(t *testing.T) { - testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv4, svc1IPv4, nil, nil, externalIPv4, ep1IPv4, ep2IPv4, corev1.ServiceTypeNodePort, false) + t.Run("NodePort UDP", func(t *testing.T) { + testServiceExternalTrafficPolicyUpdate(t, binding.ProtocolUDP, false, corev1.ServiceTypeNodePort) }) - t.Run("LoadBalancer", func(t *testing.T) { - testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv4, svc1IPv4, loadBalancerIPv4, loadBalancerIPModeProxyIPv4, externalIPv4, ep1IPv4, ep2IPv4, corev1.ServiceTypeLoadBalancer, false) + t.Run("LoadBalancer TCP", func(t *testing.T) { + testServiceExternalTrafficPolicyUpdate(t, binding.ProtocolTCP, false, corev1.ServiceTypeLoadBalancer) + }) + t.Run("LoadBalancer UDP", func(t *testing.T) { + testServiceExternalTrafficPolicyUpdate(t, binding.ProtocolUDP, false, corev1.ServiceTypeLoadBalancer) }) }) t.Run("IPv6", func(t *testing.T) { - t.Run("ClusterIP", func(t *testing.T) { - testServiceExternalTrafficPolicyUpdate(t, nil, svc1IPv6, nil, nil, externalIPv6, ep1IPv6, ep2IPv6, corev1.ServiceTypeClusterIP, true) + t.Run("ClusterIP TCP", func(t *testing.T) { + testServiceExternalTrafficPolicyUpdate(t, binding.ProtocolTCPv6, true, corev1.ServiceTypeClusterIP) + }) + t.Run("ClusterIP UDP", func(t *testing.T) { + testServiceExternalTrafficPolicyUpdate(t, binding.ProtocolUDPv6, true, corev1.ServiceTypeClusterIP) + }) + t.Run("NodePort TCP", func(t *testing.T) { + testServiceExternalTrafficPolicyUpdate(t, binding.ProtocolTCPv6, true, corev1.ServiceTypeNodePort) }) - t.Run("NodePort", func(t *testing.T) { - testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv6, svc1IPv6, nil, nil, externalIPv6, ep1IPv6, ep2IPv6, 
corev1.ServiceTypeNodePort, true) + t.Run("NodePort UDP", func(t *testing.T) { + testServiceExternalTrafficPolicyUpdate(t, binding.ProtocolUDPv6, true, corev1.ServiceTypeNodePort) }) - t.Run("LoadBalancer", func(t *testing.T) { - testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv6, svc1IPv6, loadBalancerIPv6, loadBalancerIPModeProxyIPv6, externalIPv6, ep1IPv6, ep2IPv6, corev1.ServiceTypeLoadBalancer, true) + t.Run("LoadBalancer TCP", func(t *testing.T) { + testServiceExternalTrafficPolicyUpdate(t, binding.ProtocolTCPv6, true, corev1.ServiceTypeLoadBalancer) + }) + t.Run("LoadBalancer UDP", func(t *testing.T) { + testServiceExternalTrafficPolicyUpdate(t, binding.ProtocolUDPv6, true, corev1.ServiceTypeLoadBalancer) }) }) } -func testServiceInternalTrafficPolicyUpdate(t *testing.T, - svcIP net.IP, - ep1IP net.IP, - ep2IP net.IP, - isIPv6 bool) { +func testServiceInternalTrafficPolicyUpdate(t *testing.T, bindingProtocol binding.Protocol, isIPv6 bool) { ctrl := gomock.NewController(t) mockOFClient, mockRouteClient := getMockClients(ctrl) groupAllocator := openflow.NewGroupAllocator() + apiProtocol := getAPIProtocol(bindingProtocol) + // Create a ServicePort with a specific protocol, avoiding using the global variable 'svcPortName' which is set to TCP protocol. + svcPortName := makeSvcPortName("ns", "svc", strconv.Itoa(svcPort), apiProtocol) + svcIP := svc1IPv4 + ep1IP := ep1IPv4 + ep2IP := ep2IPv4 + if isIPv6 { + svcIP = svc1IPv6 + ep1IP = ep1IPv6 + ep2IP = ep2IPv6 + } fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6, withProxyAll) internalTrafficPolicyCluster := corev1.ServiceInternalTrafficPolicyCluster internalTrafficPolicyLocal := corev1.ServiceInternalTrafficPolicyLocal - svc := makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, &internalTrafficPolicyCluster, false, nil) - updatedSvc := makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, &internalTrafficPolicyLocal, false, nil) + svc := makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), apiProtocol, nil, &internalTrafficPolicyCluster, false, nil) + updatedSvc := svc.DeepCopy() + updatedSvc.Spec.InternalTrafficPolicy = &internalTrafficPolicyLocal makeServiceMap(fp, svc) - remoteEp, remoteEpPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, ep1IP, int32(svcPort), corev1.ProtocolTCP, false) - localEp, localEpPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, ep2IP, int32(svcPort), corev1.ProtocolTCP, true) + remoteEp, remoteEpPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, ep1IP, int32(svcPort), apiProtocol, false) + localEp, localEpPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, ep2IP, int32(svcPort), apiProtocol, true) endpointSlice := makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, []discovery.Endpoint{*remoteEp, *localEp}, @@ -2970,11 +3110,6 @@ func testServiceInternalTrafficPolicyUpdate(t *testing.T, expectedRemoteEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, true, false, nil)} expectedAllEps := append(expectedLocalEps, expectedRemoteEps...) 
- bindingProtocol := binding.ProtocolTCP - if isIPv6 { - bindingProtocol = binding.ProtocolTCPv6 - } - mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.InAnyOrder(expectedAllEps)).Times(1) mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), false, gomock.InAnyOrder(expectedAllEps)).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -3002,6 +3137,9 @@ func testServiceInternalTrafficPolicyUpdate(t *testing.T, fp.serviceChanges.OnServiceUpdate(svc, updatedSvc) mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, expectedRemoteEps).Times(1) + if needClearConntrackEntries(bindingProtocol) { + mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), ep1IP, bindingProtocol).Times(1) + } mockOFClient.EXPECT().UninstallServiceGroup(binding.GroupIDType(1)).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(2), false, expectedLocalEps).Times(1) @@ -3022,28 +3160,49 @@ func testServiceInternalTrafficPolicyUpdate(t *testing.T, func TestServiceInternalTrafficPolicyUpdate(t *testing.T) { t.Run("IPv4", func(t *testing.T) { - t.Run("ClusterIP", func(t *testing.T) { - testServiceInternalTrafficPolicyUpdate(t, svc1IPv4, ep1IPv4, ep2IPv4, false) + t.Run("ClusterIP TCP", func(t *testing.T) { + testServiceInternalTrafficPolicyUpdate(t, binding.ProtocolTCP, false) + }) + t.Run("ClusterIP UDP", func(t *testing.T) { + testServiceInternalTrafficPolicyUpdate(t, binding.ProtocolUDP, false) }) }) t.Run("IPv6", func(t *testing.T) { - t.Run("ClusterIP", func(t *testing.T) { - testServiceInternalTrafficPolicyUpdate(t, svc1IPv6, ep1IPv6, ep2IPv6, true) + t.Run("ClusterIP TCP", func(t *testing.T) { + testServiceInternalTrafficPolicyUpdate(t, binding.ProtocolTCPv6, true) + }) + t.Run("ClusterIP UDP", func(t *testing.T) { + testServiceInternalTrafficPolicyUpdate(t, binding.ProtocolUDPv6, true) }) }) } -func testServiceIngressIPsUpdate(t *testing.T, - nodePortAddresses []net.IP, - svcIP net.IP, - epIP net.IP, - loadBalancerIPs []net.IP, - updatedLoadBalancerIPs []net.IP, - isIPv6 bool) { +func testServiceIngressIPsAndExternalIPsUpdate(t *testing.T, bindingProtocol binding.Protocol, isIPv6 bool) { ctrl := gomock.NewController(t) mockOFClient, mockRouteClient := getMockClients(ctrl) groupAllocator := openflow.NewGroupAllocator() - fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll) + apiProtocol := getAPIProtocol(bindingProtocol) + // Create a ServicePort with a specific protocol, avoiding using the global variable 'svcPortName' which is set to TCP protocol. 
+ svcPortName := makeSvcPortName("ns", "svc", strconv.Itoa(svcPort), apiProtocol) + nodePortAddresses := nodePortAddressesIPv4 + svcIP := svc1IPv4 + loadBalancerIPs := []net.IP{net.ParseIP("169.254.1.1"), net.ParseIP("169.254.1.2")} + updatedLoadBalancerIPs := []net.IP{net.ParseIP("169.254.1.2"), net.ParseIP("169.254.1.3")} + externalIPs := []net.IP{net.ParseIP("192.168.77.101"), net.ParseIP("192.168.77.102")} + updatedExternalIPs := []net.IP{net.ParseIP("192.168.77.102"), net.ParseIP("192.168.77.103")} + virtualNodePortDNATIPv4 := agentconfig.VirtualNodePortDNATIPv4 + epIP := ep1IPv4 + if isIPv6 { + nodePortAddresses = nodePortAddressesIPv6 + svcIP = svc1IPv6 + loadBalancerIPs = []net.IP{net.ParseIP("fec0::169:254:1:1"), net.ParseIP("fec0::169:254:1:2")} + updatedLoadBalancerIPs = []net.IP{net.ParseIP("fec0::169:254:1:2"), net.ParseIP("fec0::169:254:1:3")} + externalIPs = []net.IP{net.ParseIP("2001::192:168:77:101"), net.ParseIP("2001::192:168:77:102")} + updatedExternalIPs = []net.IP{net.ParseIP("2001::192:168:77:102"), net.ParseIP("2001::192:168:77:103")} + virtualNodePortDNATIPv4 = agentconfig.VirtualNodePortDNATIPv6 + epIP = ep1IPv6 + } + fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll, withCleanupStaleUDPSvcConntrack) var loadBalancerIPStrs, updatedLoadBalancerIPStrs []string for _, ip := range loadBalancerIPs { @@ -3052,24 +3211,24 @@ func testServiceIngressIPsUpdate(t *testing.T, for _, ip := range updatedLoadBalancerIPs { updatedLoadBalancerIPStrs = append(updatedLoadBalancerIPStrs, ip.String()) } + var externalIPStrs, updatedExternalIPStrs []string + for _, ip := range externalIPs { + externalIPStrs = append(externalIPStrs, ip.String()) + } + for _, ip := range updatedExternalIPs { + updatedExternalIPStrs = append(updatedExternalIPStrs, ip.String()) + } - svc := makeTestLoadBalancerService(&svcPortName, svcIP, nil, loadBalancerIPs, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc := makeTestLoadBalancerService(&svcPortName, svcIP, nil, updatedLoadBalancerIPs, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + svc := makeTestLoadBalancerService(&svcPortName, svcIP, externalIPs, loadBalancerIPs, nil, int32(svcPort), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + updatedSvc := makeTestLoadBalancerService(&svcPortName, svcIP, updatedExternalIPs, updatedLoadBalancerIPs, nil, int32(svcPort), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) makeServiceMap(fp, svc) - ep, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), corev1.ProtocolTCP, false) + ep, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), apiProtocol, false) eps := makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, []discovery.Endpoint{*ep}, []discovery.EndpointPort{*epPort}, isIPv6) makeEndpointSliceMap(fp, eps) expectedEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(epIP.String(), "", "", svcPort, false, true, true, false, nil)} - bindingProtocol := binding.ProtocolTCP - vIP := agentconfig.VirtualNodePortDNATIPv4 - if isIPv6 { - bindingProtocol = binding.ProtocolTCPv6 - vIP = agentconfig.VirtualNodePortDNATIPv6 - } - mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.InAnyOrder(expectedEps)).Times(1) 
mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), false, gomock.InAnyOrder(expectedEps)).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -3079,7 +3238,7 @@ func testServiceIngressIPsUpdate(t *testing.T, ClusterGroupID: 1, }).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, + ServiceIP: virtualNodePortDNATIPv4, ServicePort: uint16(svcNodePort), Protocol: bindingProtocol, ClusterGroupID: 1, @@ -3095,16 +3254,31 @@ func testServiceIngressIPsUpdate(t *testing.T, IsExternal: true, }).Times(1) } + for _, ip := range externalIPs { + mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ + ServiceIP: ip, + ServicePort: uint16(svcPort), + Protocol: bindingProtocol, + ClusterGroupID: 1, + IsExternal: true, + }).Times(1) + } mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) for _, ip := range loadBalancerIPs { mockRouteClient.EXPECT().AddExternalIPRoute(ip).Times(1) } + for _, ip := range externalIPs { + mockRouteClient.EXPECT().AddExternalIPRoute(ip).Times(1) + } toDeleteLoadBalancerIPs := smallSliceDifference(loadBalancerIPStrs, updatedLoadBalancerIPStrs) toAddLoadBalancerIPs := smallSliceDifference(updatedLoadBalancerIPStrs, loadBalancerIPStrs) for _, ipStr := range toDeleteLoadBalancerIPs { mockOFClient.EXPECT().UninstallServiceFlows(net.ParseIP(ipStr), uint16(svcPort), bindingProtocol).Times(1) mockRouteClient.EXPECT().DeleteExternalIPRoute(net.ParseIP(ipStr)).Times(1) + if needClearConntrackEntries(bindingProtocol) { + mockRouteClient.EXPECT().ClearConntrackEntryForService(net.ParseIP(ipStr), uint16(svcPort), nil, bindingProtocol).Times(1) + } } for _, ipStr := range toAddLoadBalancerIPs { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -3116,6 +3290,25 @@ func testServiceIngressIPsUpdate(t *testing.T, }).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(net.ParseIP(ipStr)).Times(1) } + toDeleteExternalIPs := smallSliceDifference(externalIPStrs, updatedExternalIPStrs) + toAddLoadExternalIPs := smallSliceDifference(updatedExternalIPStrs, externalIPStrs) + for _, ipStr := range toDeleteExternalIPs { + mockOFClient.EXPECT().UninstallServiceFlows(net.ParseIP(ipStr), uint16(svcPort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().DeleteExternalIPRoute(net.ParseIP(ipStr)).Times(1) + if needClearConntrackEntries(bindingProtocol) { + mockRouteClient.EXPECT().ClearConntrackEntryForService(net.ParseIP(ipStr), uint16(svcPort), nil, bindingProtocol).Times(1) + } + } + for _, ipStr := range toAddLoadExternalIPs { + mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ + ServiceIP: net.ParseIP(ipStr), + ServicePort: uint16(svcPort), + Protocol: bindingProtocol, + ClusterGroupID: 1, + IsExternal: true, + }).Times(1) + mockRouteClient.EXPECT().AddExternalIPRoute(net.ParseIP(ipStr)).Times(1) + } fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -3128,62 +3321,68 @@ func testServiceIngressIPsUpdate(t *testing.T, func TestServiceIngressIPsUpdate(t *testing.T) { t.Run("IPv4", func(t *testing.T) { - t.Run("LoadBalancer", func(t *testing.T) { - loadBalancerIPs := []net.IP{net.ParseIP("169.254.1.1"), net.ParseIP("169.254.1.2")} - updatedLoadBalancerIPs := []net.IP{net.ParseIP("169.254.1.2"), net.ParseIP("169.254.1.3")} - testServiceIngressIPsUpdate(t, nodePortAddressesIPv4, svc1IPv4, ep1IPv4, loadBalancerIPs, updatedLoadBalancerIPs, false) + t.Run("LoadBalancer TCP", func(t 
*testing.T) { + testServiceIngressIPsAndExternalIPsUpdate(t, binding.ProtocolTCP, false) + }) + t.Run("LoadBalancer UDP", func(t *testing.T) { + testServiceIngressIPsAndExternalIPsUpdate(t, binding.ProtocolUDP, false) }) }) t.Run("IPv6", func(t *testing.T) { - t.Run("LoadBalancer", func(t *testing.T) { - loadBalancerIPs := []net.IP{net.ParseIP("fec0::169:254:1:1"), net.ParseIP("fec0::169:254:1:2")} - updatedLoadBalancerIPs := []net.IP{net.ParseIP("fec0::169:254:1:2"), net.ParseIP("fec0::169:254:1:3")} - testServiceIngressIPsUpdate(t, nodePortAddressesIPv6, svc1IPv6, ep1IPv6, loadBalancerIPs, updatedLoadBalancerIPs, true) + t.Run("LoadBalancer TCP", func(t *testing.T) { + testServiceIngressIPsAndExternalIPsUpdate(t, binding.ProtocolTCPv6, true) + }) + t.Run("LoadBalancer UDP", func(t *testing.T) { + testServiceIngressIPsAndExternalIPsUpdate(t, binding.ProtocolUDPv6, true) }) }) } -func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, - nodePortAddresses []net.IP, - svcIP net.IP, - loadBalancerIP net.IP, - epIP net.IP, - svcType corev1.ServiceType, - isIPv6 bool) { +func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, bindingProtocol binding.Protocol, isIPv6 bool, svcType corev1.ServiceType) { ctrl := gomock.NewController(t) mockOFClient, mockRouteClient := getMockClients(ctrl) groupAllocator := openflow.NewGroupAllocator() - fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll) + apiProtocol := getAPIProtocol(bindingProtocol) + // Create a ServicePort with a specific protocol, avoiding using the global variable 'svcPortName' which is set to TCP protocol. + svcPortName := makeSvcPortName("ns", "svc", strconv.Itoa(svcPort), apiProtocol) + nodePortAddresses := nodePortAddressesIPv4 + svcIP := svc1IPv4 + loadBalancerIP := loadBalancerIPv4 + loadBalancerIPModeProxyIP := loadBalancerIPModeProxyIPv4 + virtualNodePortDNATIPv4 := agentconfig.VirtualNodePortDNATIPv4 + epIP := ep1IPv4 + if isIPv6 { + nodePortAddresses = nodePortAddressesIPv6 + svcIP = svc1IPv6 + loadBalancerIP = loadBalancerIPv6 + loadBalancerIPModeProxyIP = loadBalancerIPModeProxyIPv6 + virtualNodePortDNATIPv4 = agentconfig.VirtualNodePortDNATIPv6 + epIP = ep1IPv6 + } + fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll, withCleanupStaleUDPSvcConntrack) var svc, updatedSvc *corev1.Service affinitySeconds := int32(10) updatedAffinitySeconds := int32(100) switch svcType { case corev1.ServiceTypeClusterIP: - svc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, &affinitySeconds, nil, false, nil) - updatedSvc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, &updatedAffinitySeconds, nil, false, nil) + svc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), apiProtocol, &affinitySeconds, nil, false, nil) + updatedSvc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), apiProtocol, &updatedAffinitySeconds, nil, false, nil) case corev1.ServiceTypeNodePort: - svc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, &affinitySeconds, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, &updatedAffinitySeconds, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) + svc 
= makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), apiProtocol, &affinitySeconds, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) + updatedSvc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), apiProtocol, &updatedAffinitySeconds, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) case corev1.ServiceTypeLoadBalancer: - svc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, &affinitySeconds, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, &updatedAffinitySeconds, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + svc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), apiProtocol, &affinitySeconds, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), apiProtocol, &updatedAffinitySeconds, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) } makeServiceMap(fp, svc) - ep, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), corev1.ProtocolTCP, false) + ep, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), apiProtocol, false) eps := makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, []discovery.Endpoint{*ep}, []discovery.EndpointPort{*epPort}, isIPv6) makeEndpointSliceMap(fp, eps) expectedEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(epIP.String(), "", "", svcPort, false, true, true, false, nil)} - bindingProtocol := binding.ProtocolTCP - vIP := agentconfig.VirtualNodePortDNATIPv4 - if isIPv6 { - bindingProtocol = binding.ProtocolTCPv6 - vIP = agentconfig.VirtualNodePortDNATIPv6 - } - mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), true, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -3204,7 +3403,7 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, + ServiceIP: virtualNodePortDNATIPv4, ServicePort: uint16(svcNodePort), Protocol: bindingProtocol, ClusterGroupID: 1, @@ -3213,10 +3412,10 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, AffinityTimeout: uint16(affinitySeconds), }).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) - mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) + mockOFClient.EXPECT().UninstallServiceFlows(virtualNodePortDNATIPv4, uint16(svcNodePort), bindingProtocol).Times(1) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, + ServiceIP: virtualNodePortDNATIPv4, ServicePort: uint16(svcNodePort), Protocol: bindingProtocol, 
ClusterGroupID: 1, @@ -3260,69 +3459,91 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, func TestServiceStickyMaxAgeSecondsUpdate(t *testing.T) { t.Run("IPv4", func(t *testing.T) { - t.Run("ClusterIP", func(t *testing.T) { - testServiceStickyMaxAgeSecondsUpdate(t, nil, svc1IPv4, nil, ep1IPv4, corev1.ServiceTypeClusterIP, false) + t.Run("ClusterIP TCP", func(t *testing.T) { + testServiceStickyMaxAgeSecondsUpdate(t, binding.ProtocolTCP, false, corev1.ServiceTypeClusterIP) + }) + t.Run("ClusterIP UDP", func(t *testing.T) { + testServiceStickyMaxAgeSecondsUpdate(t, binding.ProtocolUDP, false, corev1.ServiceTypeClusterIP) }) - t.Run("NodePort", func(t *testing.T) { - testServiceStickyMaxAgeSecondsUpdate(t, nodePortAddressesIPv4, svc1IPv4, nil, ep1IPv4, corev1.ServiceTypeNodePort, false) + t.Run("NodePort TCP", func(t *testing.T) { + testServiceStickyMaxAgeSecondsUpdate(t, binding.ProtocolTCP, false, corev1.ServiceTypeNodePort) }) - t.Run("LoadBalancer", func(t *testing.T) { - testServiceStickyMaxAgeSecondsUpdate(t, nodePortAddressesIPv4, svc1IPv4, loadBalancerIPv4, ep1IPv4, corev1.ServiceTypeLoadBalancer, false) + t.Run("NodePort UDP", func(t *testing.T) { + testServiceStickyMaxAgeSecondsUpdate(t, binding.ProtocolUDP, false, corev1.ServiceTypeNodePort) + }) + t.Run("LoadBalancer TCP", func(t *testing.T) { + testServiceStickyMaxAgeSecondsUpdate(t, binding.ProtocolTCP, false, corev1.ServiceTypeLoadBalancer) + }) + t.Run("LoadBalancer UDP", func(t *testing.T) { + testServiceStickyMaxAgeSecondsUpdate(t, binding.ProtocolUDP, false, corev1.ServiceTypeLoadBalancer) }) }) t.Run("IPv6", func(t *testing.T) { - t.Run("ClusterIP", func(t *testing.T) { - testServiceStickyMaxAgeSecondsUpdate(t, nil, svc1IPv6, nil, ep1IPv6, corev1.ServiceTypeClusterIP, true) + t.Run("ClusterIP TCP", func(t *testing.T) { + testServiceStickyMaxAgeSecondsUpdate(t, binding.ProtocolTCPv6, true, corev1.ServiceTypeClusterIP) + }) + t.Run("ClusterIP UDP", func(t *testing.T) { + testServiceStickyMaxAgeSecondsUpdate(t, binding.ProtocolUDPv6, true, corev1.ServiceTypeClusterIP) }) - t.Run("NodePort", func(t *testing.T) { - testServiceStickyMaxAgeSecondsUpdate(t, nodePortAddressesIPv6, svc1IPv6, nil, ep1IPv6, corev1.ServiceTypeNodePort, true) + t.Run("NodePort TCP", func(t *testing.T) { + testServiceStickyMaxAgeSecondsUpdate(t, binding.ProtocolTCPv6, true, corev1.ServiceTypeNodePort) }) - t.Run("LoadBalancer", func(t *testing.T) { - testServiceStickyMaxAgeSecondsUpdate(t, nodePortAddressesIPv6, svc1IPv6, loadBalancerIPv6, ep1IPv6, corev1.ServiceTypeLoadBalancer, true) + t.Run("NodePort UDP", func(t *testing.T) { + testServiceStickyMaxAgeSecondsUpdate(t, binding.ProtocolUDPv6, true, corev1.ServiceTypeNodePort) + }) + t.Run("LoadBalancer TCP", func(t *testing.T) { + testServiceStickyMaxAgeSecondsUpdate(t, binding.ProtocolTCPv6, true, corev1.ServiceTypeLoadBalancer) + }) + t.Run("LoadBalancer UDP", func(t *testing.T) { + testServiceStickyMaxAgeSecondsUpdate(t, binding.ProtocolUDPv6, true, corev1.ServiceTypeLoadBalancer) }) }) } -func testServiceSessionAffinityTypeUpdate(t *testing.T, - nodePortAddresses []net.IP, - svcIP net.IP, - loadBalancerIP net.IP, - epIP net.IP, - svcType corev1.ServiceType, - isIPv6 bool) { +func testServiceSessionAffinityTypeUpdate(t *testing.T, bindingProtocol binding.Protocol, isIPv6 bool, svcType corev1.ServiceType) { ctrl := gomock.NewController(t) mockOFClient, mockRouteClient := getMockClients(ctrl) groupAllocator := openflow.NewGroupAllocator() - fp := newFakeProxier(mockRouteClient, mockOFClient, 
nodePortAddresses, groupAllocator, isIPv6, withProxyAll) + apiProtocol := getAPIProtocol(bindingProtocol) + // Create a ServicePort with a specific protocol, avoiding using the global variable 'svcPortName' which is set to TCP protocol. + svcPortName := makeSvcPortName("ns", "svc", strconv.Itoa(svcPort), apiProtocol) + nodePortAddresses := nodePortAddressesIPv4 + svcIP := svc1IPv4 + loadBalancerIP := loadBalancerIPv4 + loadBalancerIPModeProxyIP := loadBalancerIPModeProxyIPv4 + virtualNodePortDNATIPv4 := agentconfig.VirtualNodePortDNATIPv4 + epIP := ep1IPv4 + if isIPv6 { + nodePortAddresses = nodePortAddressesIPv6 + svcIP = svc1IPv6 + loadBalancerIP = loadBalancerIPv6 + loadBalancerIPModeProxyIP = loadBalancerIPModeProxyIPv6 + virtualNodePortDNATIPv4 = agentconfig.VirtualNodePortDNATIPv6 + epIP = ep1IPv6 + } + fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll, withCleanupStaleUDPSvcConntrack) var svc, updatedSvc *corev1.Service affinitySeconds := int32(100) switch svcType { case corev1.ServiceTypeClusterIP: - svc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, nil) - updatedSvc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, &affinitySeconds, nil, false, nil) + svc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), apiProtocol, nil, nil, false, nil) + updatedSvc = makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), apiProtocol, &affinitySeconds, nil, false, nil) case corev1.ServiceTypeNodePort: - svc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, &affinitySeconds, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) + svc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), apiProtocol, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) + updatedSvc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), apiProtocol, &affinitySeconds, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) case corev1.ServiceTypeLoadBalancer: - svc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, &affinitySeconds, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + svc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), apiProtocol, &affinitySeconds, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) } makeServiceMap(fp, svc) - ep, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), 
corev1.ProtocolTCP, false) + ep, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), apiProtocol, false) eps := makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, []discovery.Endpoint{*ep}, []discovery.EndpointPort{*epPort}, isIPv6) makeEndpointSliceMap(fp, eps) expectedEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(epIP.String(), "", "", svcPort, false, true, true, false, nil)} - bindingProtocol := binding.ProtocolTCP - vIP := agentconfig.VirtualNodePortDNATIPv4 - if isIPv6 { - bindingProtocol = binding.ProtocolTCPv6 - vIP = agentconfig.VirtualNodePortDNATIPv6 - } - mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), false, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -3344,7 +3565,7 @@ func testServiceSessionAffinityTypeUpdate(t *testing.T, if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, + ServiceIP: virtualNodePortDNATIPv4, ServicePort: uint16(svcNodePort), Protocol: bindingProtocol, ClusterGroupID: 1, @@ -3353,9 +3574,9 @@ func testServiceSessionAffinityTypeUpdate(t *testing.T, }).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) - mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) + mockOFClient.EXPECT().UninstallServiceFlows(virtualNodePortDNATIPv4, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: vIP, + ServiceIP: virtualNodePortDNATIPv4, ServicePort: uint16(svcNodePort), Protocol: bindingProtocol, ClusterGroupID: 1, @@ -3400,25 +3621,43 @@ func testServiceSessionAffinityTypeUpdate(t *testing.T, func TestServiceSessionAffinityTypeUpdate(t *testing.T) { t.Run("IPv4", func(t *testing.T) { - t.Run("ClusterIP", func(t *testing.T) { - testServiceSessionAffinityTypeUpdate(t, nil, svc1IPv4, nil, ep1IPv4, corev1.ServiceTypeClusterIP, false) + t.Run("ClusterIP TCP", func(t *testing.T) { + testServiceSessionAffinityTypeUpdate(t, binding.ProtocolTCP, false, corev1.ServiceTypeClusterIP) + }) + t.Run("ClusterIP UDP", func(t *testing.T) { + testServiceSessionAffinityTypeUpdate(t, binding.ProtocolUDP, false, corev1.ServiceTypeClusterIP) }) - t.Run("NodePort", func(t *testing.T) { - testServiceSessionAffinityTypeUpdate(t, nodePortAddressesIPv4, svc1IPv4, nil, ep1IPv4, corev1.ServiceTypeNodePort, false) + t.Run("NodePort TCP", func(t *testing.T) { + testServiceSessionAffinityTypeUpdate(t, binding.ProtocolTCP, false, corev1.ServiceTypeNodePort) }) - t.Run("LoadBalancer", func(t *testing.T) { - testServiceSessionAffinityTypeUpdate(t, nodePortAddressesIPv4, svc1IPv4, loadBalancerIPv4, ep1IPv4, corev1.ServiceTypeLoadBalancer, false) + t.Run("NodePort UDP", func(t *testing.T) { + testServiceSessionAffinityTypeUpdate(t, binding.ProtocolUDP, false, corev1.ServiceTypeNodePort) + }) + t.Run("LoadBalancer TCP", func(t *testing.T) { + testServiceSessionAffinityTypeUpdate(t, binding.ProtocolTCP, false, corev1.ServiceTypeLoadBalancer) + }) + t.Run("LoadBalancer UDP", func(t *testing.T) { + testServiceSessionAffinityTypeUpdate(t, binding.ProtocolUDP, false, corev1.ServiceTypeLoadBalancer) }) }) t.Run("IPv6", func(t *testing.T) { - t.Run("ClusterIP", func(t *testing.T) { - 
testServiceSessionAffinityTypeUpdate(t, nil, svc1IPv6, nil, ep1IPv6, corev1.ServiceTypeClusterIP, true) + t.Run("ClusterIP TCP", func(t *testing.T) { + testServiceSessionAffinityTypeUpdate(t, binding.ProtocolTCPv6, true, corev1.ServiceTypeClusterIP) + }) + t.Run("ClusterIP UDP", func(t *testing.T) { + testServiceSessionAffinityTypeUpdate(t, binding.ProtocolUDPv6, true, corev1.ServiceTypeClusterIP) + }) + t.Run("NodePort TCP", func(t *testing.T) { + testServiceSessionAffinityTypeUpdate(t, binding.ProtocolTCPv6, true, corev1.ServiceTypeNodePort) + }) + t.Run("NodePort UDP", func(t *testing.T) { + testServiceSessionAffinityTypeUpdate(t, binding.ProtocolUDPv6, true, corev1.ServiceTypeNodePort) }) - t.Run("NodePort", func(t *testing.T) { - testServiceSessionAffinityTypeUpdate(t, nodePortAddressesIPv6, svc1IPv6, nil, ep1IPv6, corev1.ServiceTypeNodePort, true) + t.Run("LoadBalancer TCP", func(t *testing.T) { + testServiceSessionAffinityTypeUpdate(t, binding.ProtocolTCPv6, true, corev1.ServiceTypeLoadBalancer) }) - t.Run("LoadBalancer", func(t *testing.T) { - testServiceSessionAffinityTypeUpdate(t, nodePortAddressesIPv6, svc1IPv6, loadBalancerIPv6, ep1IPv6, corev1.ServiceTypeLoadBalancer, true) + t.Run("LoadBalancer UDP", func(t *testing.T) { + testServiceSessionAffinityTypeUpdate(t, binding.ProtocolUDPv6, true, corev1.ServiceTypeLoadBalancer) }) }) } diff --git a/pkg/apiserver/handlers/featuregates/handler_test.go b/pkg/apiserver/handlers/featuregates/handler_test.go index 3a8f3d78494..adb630a1324 100644 --- a/pkg/apiserver/handlers/featuregates/handler_test.go +++ b/pkg/apiserver/handlers/featuregates/handler_test.go @@ -33,8 +33,9 @@ import ( ) var ( - egressStatus string - multicastStatus string + egressStatus string + multicastStatus string + cleanupStaleUDPSvcConntrackStatus string ) func Test_getGatesResponse(t *testing.T) { @@ -54,7 +55,7 @@ func Test_getGatesResponse(t *testing.T) { {Component: "agent", Name: "AntreaIPAM", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "AntreaPolicy", Status: "Disabled", Version: "BETA"}, {Component: "agent", Name: "AntreaProxy", Status: "Enabled", Version: "GA"}, - {Component: "agent", Name: "CleanupStaleUDPSvcConntrack", Status: "Disabled", Version: "ALPHA"}, + {Component: "agent", Name: "CleanupStaleUDPSvcConntrack", Status: cleanupStaleUDPSvcConntrackStatus, Version: "BETA"}, {Component: "agent", Name: "Egress", Status: egressStatus, Version: "BETA"}, {Component: "agent", Name: "EgressSeparateSubnet", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "EgressTrafficShaping", Status: "Disabled", Version: "ALPHA"}, @@ -219,8 +220,10 @@ func Test_getControllerGatesResponse(t *testing.T) { func init() { egressStatus = "Enabled" multicastStatus = "Enabled" + cleanupStaleUDPSvcConntrackStatus = "Enabled" if runtime.IsWindowsPlatform() { egressStatus = "Disabled" multicastStatus = "Disabled" + cleanupStaleUDPSvcConntrackStatus = "Disabled" } } diff --git a/pkg/features/antrea_features.go b/pkg/features/antrea_features.go index fe673654552..05cb51a9df5 100644 --- a/pkg/features/antrea_features.go +++ b/pkg/features/antrea_features.go @@ -51,6 +51,7 @@ const ( TopologyAwareHints featuregate.Feature = "TopologyAwareHints" // alpha: v1.13 + // beta: v2.1 // Enable support for cleaning up stale UDP Service conntrack connections in AntreaProxy. 
CleanupStaleUDPSvcConntrack featuregate.Feature = "CleanupStaleUDPSvcConntrack" @@ -181,7 +182,7 @@ var ( Egress: {Default: true, PreRelease: featuregate.Beta}, EndpointSlice: {Default: true, PreRelease: featuregate.GA}, TopologyAwareHints: {Default: true, PreRelease: featuregate.Beta}, - CleanupStaleUDPSvcConntrack: {Default: false, PreRelease: featuregate.Alpha}, + CleanupStaleUDPSvcConntrack: {Default: true, PreRelease: featuregate.Beta}, Traceflow: {Default: true, PreRelease: featuregate.Beta}, AntreaIPAM: {Default: false, PreRelease: featuregate.Alpha}, FlowExporter: {Default: false, PreRelease: featuregate.Alpha},
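Note: the parameterized tests above rely on two helpers, getAPIProtocol and needClearConntrackEntries, which are referenced but not shown in this diff. Below is a minimal sketch of what they are assumed to look like; the helper names appear in the diff, but the bodies, the SCTP branch, and the package/import layout are illustrative assumptions, not the actual implementation.

// Hypothetical sketch (not part of this diff): assumed shape of the test helpers
// used by the parameterized Service tests above.
package proxy

import (
	corev1 "k8s.io/api/core/v1"

	binding "antrea.io/antrea/pkg/ovs/openflow"
)

// getAPIProtocol maps an OVS binding protocol to the corresponding Kubernetes API
// protocol, so that a subtest driven by e.g. binding.ProtocolUDPv6 builds UDP
// Services and EndpointSlices. The SCTP branch is an assumption; the tests above
// only exercise TCP and UDP.
func getAPIProtocol(bindingProtocol binding.Protocol) corev1.Protocol {
	switch bindingProtocol {
	case binding.ProtocolTCP, binding.ProtocolTCPv6:
		return corev1.ProtocolTCP
	case binding.ProtocolUDP, binding.ProtocolUDPv6:
		return corev1.ProtocolUDP
	case binding.ProtocolSCTP, binding.ProtocolSCTPv6:
		return corev1.ProtocolSCTP
	default:
		return corev1.ProtocolTCP
	}
}

// needClearConntrackEntries is assumed to report whether stale conntrack entries
// must be cleared for the given protocol, i.e. only for UDP. This is why the
// ClearConntrackEntryForService expectations above are wrapped in this check and
// never fire for the TCP subtests.
func needClearConntrackEntries(protocol binding.Protocol) bool {
	return protocol == binding.ProtocolUDP || protocol == binding.ProtocolUDPv6
}

Parameterizing each test helper by binding.Protocol, instead of passing pre-resolved IPs per address family, is what lets every existing subtest gain a UDP variant with minimal duplication; the conntrack-clearing expectations then apply only when the protocol is UDP and the fake proxier is created with withCleanupStaleUDPSvcConntrack, consistent with the feature gate being promoted to Beta and enabled by default in this change.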