Skip to content

Commit

Permalink
Refine Endpoint selection for MC Service
Browse files Browse the repository at this point in the history
Add a new flow for the Service's ClusterIP in the EndpointDNAT table with
group action. When an Endpoint of a Multi-cluster Service is a local Service
ClusterIP and being selected, it will go to the corresponding exported Service's
group to select the final Endpoint. This can avoid that the traffic goes out of the
OVS bridge from antrea-gw0 (and handled by kube-proxy when it is running) and
comes back again.

The proposal details can be found in the comment:
antrea-io#4508 (comment)

Signed-off-by: Lan Luo <luola@vmware.com>
  • Loading branch information
luolanzone committed Mar 9, 2023
1 parent d0ecfb3 commit cd7e9e3
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 130 deletions.
6 changes: 3 additions & 3 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,13 +376,13 @@ func run(o *Options) error {

switch {
case v4Enabled && v6Enabled:
proxier = proxy.NewDualStackProxier(nodeConfig.Name, informerFactory, ofClient, routeClient, nodePortAddressesIPv4, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter, v6GroupCounter)
proxier = proxy.NewDualStackProxier(nodeConfig.Name, informerFactory, ofClient, routeClient, nodePortAddressesIPv4, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter, v6GroupCounter, enableMulticlusterGW)
groupCounters = append(groupCounters, v4GroupCounter, v6GroupCounter)
case v4Enabled:
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, false, routeClient, nodePortAddressesIPv4, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter)
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, false, routeClient, nodePortAddressesIPv4, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter, enableMulticlusterGW)
groupCounters = append(groupCounters, v4GroupCounter)
case v6Enabled:
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, true, routeClient, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v6GroupCounter)
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, true, routeClient, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v6GroupCounter, enableMulticlusterGW)
groupCounters = append(groupCounters, v6GroupCounter)
default:
return fmt.Errorf("at least one of IPv4 or IPv6 should be enabled")
Expand Down
33 changes: 30 additions & 3 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,17 @@ type Client interface {
// otherwise the installation will fail.
// nodeLocalExternal represents if the externalTrafficPolicy is Local or not. This field is meaningful only when
// the svcType is NodePort or LoadBalancer.
InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool, svcType v1.ServiceType) error
// nestedEndpointSupport represents if the Service has the Endpoints which is other Service's ClusterIP.
InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool, svcType v1.ServiceType, nestedEndpointSupport bool) error
// UninstallServiceFlows removes flows installed by InstallServiceFlows.
UninstallServiceFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error

// InstallServiceClusterIPFlows install flows for accessing Endpoints which is Service's ClusterIP, and has an action
// to Service's corresponding group.
InstallServiceClusterIPFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol, groupID binding.GroupIDType) error
// UninstallServiceClusterIPFlows removes flows installed by InstallServiceClusterIPFlows.
UninstallServiceClusterIPFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol, groupID binding.GroupIDType) error

// GetFlowTableStatus should return an array of flow table status, all existing flow tables should be included in the list.
GetFlowTableStatus() []binding.TableStatus

Expand Down Expand Up @@ -658,6 +665,10 @@ func generateServicePortFlowCacheKey(svcIP net.IP, svcPort uint16, protocol bind
return fmt.Sprintf("S%s%s%x", svcIP, protocol, svcPort)
}

func generateServiceClusterIPFlowCacheKey(svcIP net.IP, svcPort uint16, protocol binding.Protocol, groupID binding.GroupIDType) string {
return fmt.Sprintf("S%s%s%x%x", svcIP, protocol, svcPort, groupID)
}

func (c *client) InstallEndpointFlows(protocol binding.Protocol, endpoints []proxy.Endpoint) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
Expand Down Expand Up @@ -692,11 +703,11 @@ func (c *client) UninstallEndpointFlows(protocol binding.Protocol, endpoint prox
return c.deleteFlows(c.featureService.cachedFlows, cacheKey)
}

func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool, svcType v1.ServiceType) error {
func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool, svcType v1.ServiceType, nestedEndpointSupport bool) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
var flows []binding.Flow
flows = append(flows, c.featureService.serviceLBFlow(groupID, svcIP, svcPort, protocol, affinityTimeout != 0, nodeLocalExternal, svcType))
flows = append(flows, c.featureService.serviceLBFlow(groupID, svcIP, svcPort, protocol, affinityTimeout != 0, nodeLocalExternal, svcType, nestedEndpointSupport))
if affinityTimeout != 0 {
flows = append(flows, c.featureService.serviceLearnFlow(groupID, svcIP, svcPort, protocol, affinityTimeout, nodeLocalExternal, svcType))
}
Expand All @@ -711,6 +722,22 @@ func (c *client) UninstallServiceFlows(svcIP net.IP, svcPort uint16, protocol bi
return c.deleteFlows(c.featureService.cachedFlows, cacheKey)
}

func (c *client) InstallServiceClusterIPFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol, groupID binding.GroupIDType) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
var flows []binding.Flow
flows = append(flows, c.featureService.serviceClusterIPFlow(svcIP, svcPort, protocol, groupID))
cacheKey := generateServiceClusterIPFlowCacheKey(svcIP, svcPort, protocol, groupID)
return c.addFlows(c.featureService.cachedFlows, cacheKey, flows)
}

func (c *client) UninstallServiceClusterIPFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol, groupID binding.GroupIDType) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
cacheKey := generateServiceClusterIPFlowCacheKey(svcIP, svcPort, protocol, groupID)
return c.deleteFlows(c.featureService.cachedFlows, cacheKey)
}

func (c *client) GetServiceFlowKeys(svcIP net.IP, svcPort uint16, protocol binding.Protocol, endpoints []proxy.Endpoint) []string {
cacheKey := generateServicePortFlowCacheKey(svcIP, svcPort, protocol)
flowKeys := c.getFlowKeysFromCache(c.featureService.cachedFlows, cacheKey)
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1149,7 +1149,7 @@ func Test_client_InstallServiceFlows(t *testing.T) {

cacheKey := generateServicePortFlowCacheKey(tc.svcIP, port, tc.protocol)

assert.NoError(t, fc.InstallServiceFlows(groupID, tc.svcIP, port, tc.protocol, tc.affinityTimeout, tc.nodeLocalExternal, tc.svcType))
assert.NoError(t, fc.InstallServiceFlows(groupID, tc.svcIP, port, tc.protocol, tc.affinityTimeout, tc.nodeLocalExternal, tc.svcType, false))
fCacheI, ok := fc.featureService.cachedFlows.Load(cacheKey)
require.True(t, ok)
assert.ElementsMatch(t, tc.expectedFlows, getFlowStrings(fCacheI))
Expand Down Expand Up @@ -1179,7 +1179,7 @@ func Test_client_GetServiceFlowKeys(t *testing.T) {
proxy.NewBaseEndpointInfo("10.10.0.12", "", "", 80, true, true, false, false, nil),
}

assert.NoError(t, fc.InstallServiceFlows(groupID, svcIP, svcPort, bindingProtocol, 100, true, corev1.ServiceTypeLoadBalancer))
assert.NoError(t, fc.InstallServiceFlows(groupID, svcIP, svcPort, bindingProtocol, 100, true, corev1.ServiceTypeLoadBalancer, false))
assert.NoError(t, fc.InstallEndpointFlows(bindingProtocol, endpoints))
flowKeys := fc.GetServiceFlowKeys(svcIP, svcPort, bindingProtocol, endpoints)
expectedFlowKeys := []string{
Expand Down
4 changes: 3 additions & 1 deletion pkg/agent/openflow/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ var (
// externalTrafficPolicy is Cluster.
ToClusterServiceRegMark = binding.NewOneBitRegMark(4, 21)
// reg4[22..23]: Field to store the action of a traffic control rule. Marks in this field include:
TrafficControlActionField = binding.NewRegField(4, 22, 23)
TrafficControlActionField = binding.NewRegField(4, 22, 23)
// reg4[24]: Mark to indicate whether the Service is a Multi-cluster Service.
MulticlusterServiceRegMark = binding.NewOneBitRegMark(4, 24)
TrafficControlMirrorRegMark = binding.NewRegMark(TrafficControlActionField, 0b01)
TrafficControlRedirectRegMark = binding.NewRegMark(TrafficControlActionField, 0b10)

Expand Down
27 changes: 26 additions & 1 deletion pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2364,7 +2364,8 @@ func (f *featureService) serviceLBFlow(groupID binding.GroupIDType,
protocol binding.Protocol,
withSessionAffinity,
nodeLocalExternal bool,
serviceType v1.ServiceType) binding.Flow {
serviceType v1.ServiceType,
nestedEndpointSupport bool) binding.Flow {
cookieID := f.cookieAllocator.Request(f.category).Raw()
var lbResultMark *binding.RegMark
if withSessionAffinity {
Expand Down Expand Up @@ -2406,9 +2407,33 @@ func (f *featureService) serviceLBFlow(groupID binding.GroupIDType,
if f.enableAntreaPolicy {
flowBuilder = flowBuilder.Action().LoadToRegField(ServiceGroupIDField, uint32(groupID))
}
if nestedEndpointSupport {
flowBuilder = flowBuilder.Action().LoadRegMark(MulticlusterServiceRegMark)
}
return flowBuilder.Action().Group(groupID).Done()
}

func (f *featureService) serviceClusterIPFlow(clusterIP net.IP, svcPort uint16, protocol binding.Protocol, groupID binding.GroupIDType) binding.Flow {
unionVal := (EpSelectedRegMark.GetValue() << EndpointPortField.GetRange().Length()) + uint32(svcPort)
flowBuilder := EndpointDNATTable.ofTable.BuildFlow(priorityHigh).
MatchProtocol(protocol).
Cookie(f.cookieAllocator.Request(f.category).Raw()).
MatchRegFieldWithValue(EpUnionField, unionVal)
ipProtocol := getIPProtocol(clusterIP)

if ipProtocol == binding.ProtocolIP {
ipVal := binary.BigEndian.Uint32(clusterIP.To4())
flowBuilder = flowBuilder.MatchRegFieldWithValue(EndpointIPField, ipVal)
} else {
ipVal := []byte(clusterIP)
flowBuilder = flowBuilder.MatchXXReg(EndpointIP6Field.GetRegID(), ipVal)
}
flowBuilder.MatchRegMark(MulticlusterServiceRegMark)
return flowBuilder.Action().
Group(groupID).
Done()
}

// endpointDNATFlow generates the flow which transforms the Service Cluster IP to the Endpoint IP according to the Endpoint
// selection decision which is stored in regs.
func (f *featureService) endpointDNATFlow(endpointIP net.IP, endpointPort uint16, protocol binding.Protocol) binding.Flow {
Expand Down
24 changes: 19 additions & 5 deletions pkg/agent/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit cd7e9e3

Please sign in to comment.