Skip to content

Commit

Permalink
Merge 8c3e22f into be83891
Browse files Browse the repository at this point in the history
  • Loading branch information
luolanzone authored Feb 27, 2023
2 parents be83891 + 8c3e22f commit 858cb38
Show file tree
Hide file tree
Showing 13 changed files with 325 additions and 87 deletions.
6 changes: 3 additions & 3 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,13 +376,13 @@ func run(o *Options) error {

switch {
case v4Enabled && v6Enabled:
proxier = proxy.NewDualStackProxier(nodeConfig.Name, informerFactory, ofClient, routeClient, nodePortAddressesIPv4, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter, v6GroupCounter)
proxier = proxy.NewDualStackProxier(nodeConfig.Name, informerFactory, ofClient, routeClient, nodePortAddressesIPv4, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter, v6GroupCounter, enableMulticlusterGW, serviceCIDRProvider)
groupCounters = append(groupCounters, v4GroupCounter, v6GroupCounter)
case v4Enabled:
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, false, routeClient, nodePortAddressesIPv4, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter)
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, false, routeClient, nodePortAddressesIPv4, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter, enableMulticlusterGW, serviceCIDRProvider)
groupCounters = append(groupCounters, v4GroupCounter)
case v6Enabled:
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, true, routeClient, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v6GroupCounter)
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, true, routeClient, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v6GroupCounter, enableMulticlusterGW, serviceCIDRProvider)
groupCounters = append(groupCounters, v6GroupCounter)
default:
return fmt.Errorf("at least one of IPv4 or IPv6 should be enabled")
Expand Down
7 changes: 4 additions & 3 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/openflow/cookie"
openflowtypes "antrea.io/antrea/pkg/agent/openflow/types"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/apis/crd/v1alpha2"
Expand Down Expand Up @@ -84,7 +85,7 @@ type Client interface {

// InstallServiceGroup installs a group for Service LB. Each endpoint
// is a bucket of the group. For now, each bucket has the same weight.
InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints []proxy.Endpoint) error
InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, mcsLocalService *openflowtypes.ServiceGroupInfo, endpoints []proxy.Endpoint) error
// UninstallServiceGroup removes the group and its buckets that are
// installed by InstallServiceGroup.
UninstallServiceGroup(groupID binding.GroupIDType) error
Expand Down Expand Up @@ -618,11 +619,11 @@ func (c *client) GetPodFlowKeys(interfaceName string) []string {
return c.getFlowKeysFromCache(c.featurePodConnectivity.podCachedFlows, interfaceName)
}

func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints []proxy.Endpoint) error {
func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, mcsLocalService *openflowtypes.ServiceGroupInfo, endpoints []proxy.Endpoint) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()

group := c.featureService.serviceEndpointGroup(groupID, withSessionAffinity, endpoints...)
group := c.featureService.serviceEndpointGroup(groupID, withSessionAffinity, mcsLocalService, endpoints...)
_, installed := c.featureService.groupCache.Load(groupID)
if !installed {
if err := c.ofEntryOperations.AddOFEntries([]binding.OFEntry{group}); err != nil {
Expand Down
19 changes: 17 additions & 2 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/openflow/cookie"
oftest "antrea.io/antrea/pkg/agent/openflow/testing"
"antrea.io/antrea/pkg/agent/openflow/types"
"antrea.io/antrea/pkg/apis/crd/v1alpha2"
binding "antrea.io/antrea/pkg/ovs/openflow"
ovsoftest "antrea.io/antrea/pkg/ovs/openflow/testing"
Expand Down Expand Up @@ -844,12 +845,17 @@ func Test_client_GetPodFlowKeys(t *testing.T) {

func Test_client_InstallServiceGroup(t *testing.T) {
groupID := binding.GroupIDType(100)
mcsLocalService := &types.ServiceGroupInfo{
GroupID: binding.GroupIDType(2),
Endpoint: proxy.NewBaseEndpointInfo("10.10.0.101", "", "", 80, false, true, false, false, nil),
}

testCases := []struct {
name string
withSessionAffinity bool
endpoints []proxy.Endpoint
expectedGroup string
mcsLocalService *types.ServiceGroupInfo
}{
{
name: "IPv4 Endpoints",
Expand All @@ -861,6 +867,16 @@ func Test_client_InstallServiceGroup(t *testing.T) {
"bucket=bucket_id:0,weight:100,actions=set_field:0xa0a0064->reg3,set_field:0x50/0xffff->reg4,resubmit:EndpointDNAT," +
"bucket=bucket_id:1,weight:100,actions=set_field:0xa0a0065->reg3,set_field:0x50/0xffff->reg4,resubmit:EndpointDNAT",
},
{
name: "IPv4 Endpoints with multi-cluster enabled",
endpoints: []proxy.Endpoint{
proxy.NewBaseEndpointInfo("10.10.0.100", "", "", 80, false, true, false, false, nil),
},
mcsLocalService: mcsLocalService,
expectedGroup: "group_id=100,type=select," +
"bucket=bucket_id:0,weight:100,actions=set_field:0xa0a0064->reg3,set_field:0x50/0xffff->reg4,resubmit:EndpointDNAT," +
"bucket=bucket_id:1,weight:100,actions=group:2",
},
{
name: "IPv6 Endpoints",
endpoints: []proxy.Endpoint{
Expand Down Expand Up @@ -906,8 +922,7 @@ func Test_client_InstallServiceGroup(t *testing.T) {

m.EXPECT().AddOFEntries(gomock.Any()).Return(nil).Times(1)
m.EXPECT().DeleteOFEntries(gomock.Any()).Return(nil).Times(1)

assert.NoError(t, fc.InstallServiceGroup(groupID, tc.withSessionAffinity, tc.endpoints))
assert.NoError(t, fc.InstallServiceGroup(groupID, tc.withSessionAffinity, tc.mcsLocalService, tc.endpoints))
gCacheI, ok := fc.featureService.groupCache.Load(groupID)
require.True(t, ok)
group := getGroupFromCache(gCacheI.(binding.Group))
Expand Down
17 changes: 14 additions & 3 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/metrics"
"antrea.io/antrea/pkg/agent/openflow/cookie"
openflowtypes "antrea.io/antrea/pkg/agent/openflow/types"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
binding "antrea.io/antrea/pkg/ovs/openflow"
Expand Down Expand Up @@ -199,6 +200,9 @@ var (
// between the uplink and its pair port directly.
NonIPTable = newTable("NonIP", stageClassifier, pipelineNonIP, defaultDrop)

// Default bucket weight in an Openflow Group
defaultGroupBucketWeight = uint16(100)

// Flow priority level
priorityHigh = uint16(210)
priorityNormal = uint16(200)
Expand Down Expand Up @@ -2442,7 +2446,9 @@ func (f *featureService) endpointDNATFlow(endpointIP net.IP, endpointPort uint16
// serviceEndpointGroup creates/modifies the group/buckets of Endpoints. If the withSessionAffinity is true, then buckets
// will resubmit packets back to ServiceLBTable to trigger the learn flow, the learn flow will then send packets to
// EndpointDNATTable. Otherwise, buckets will resubmit packets to EndpointDNATTable directly.
func (f *featureService) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints ...proxy.Endpoint) binding.Group {
// When mcsLocalService is not nil, the Service is a Multi-cluster Service with a member Service in the local cluster,
// in which case the packets should go to the local Service's group, the action will go to the group of the exported Service.
func (f *featureService) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAffinity bool, mcsLocalService *openflowtypes.ServiceGroupInfo, endpoints ...proxy.Endpoint) binding.Group {
group := f.bridge.CreateGroup(groupID).ResetBuckets()
var resubmitTableID uint8
if withSessionAffinity {
Expand All @@ -2458,20 +2464,25 @@ func (f *featureService) serviceEndpointGroup(groupID binding.GroupIDType, withS

if ipProtocol == binding.ProtocolIP {
ipVal := binary.BigEndian.Uint32(endpointIP.To4())
group = group.Bucket().Weight(100).
group = group.Bucket().Weight(defaultGroupBucketWeight).
LoadToRegField(EndpointIPField, ipVal).
LoadToRegField(EndpointPortField, uint32(portVal)).
ResubmitToTable(resubmitTableID).
Done()
} else if ipProtocol == binding.ProtocolIPv6 {
ipVal := []byte(endpointIP)
group = group.Bucket().Weight(100).
group = group.Bucket().Weight(defaultGroupBucketWeight).
LoadXXReg(EndpointIP6Field.GetRegID(), ipVal).
LoadToRegField(EndpointPortField, uint32(portVal)).
ResubmitToTable(resubmitTableID).
Done()
}
}
if mcsLocalService != nil {
group = group.Bucket().Weight(defaultGroupBucketWeight).
Group(uint32(mcsLocalService.GroupID)).
Done()
}
return group
}

Expand Down
47 changes: 24 additions & 23 deletions pkg/agent/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions pkg/agent/openflow/types/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2023 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package types

import (
binding "antrea.io/antrea/pkg/ovs/openflow"
k8sproxy "antrea.io/antrea/third_party/proxy"
)

// ServiceGroupInfo is used in AntreaProxy for Multi-cluster Service load-balancing.
// It stores a local exported Service's GroupID and its ClusterIP.
type ServiceGroupInfo struct {
// GroupID of an exported Service.
GroupID binding.GroupIDType
// ClusterIP info of an exported Service.
Endpoint k8sproxy.Endpoint
}
Loading

0 comments on commit 858cb38

Please sign in to comment.