Skip to content

Commit

Permalink
Multi-cluster support with more modes
Browse files Browse the repository at this point in the history
In order to support multi-cluster traffic when the member cluster is
deployed with networkPolicyOnly, noEcap or hybrid mode, antrea-agent
will be responsible for the following things:

1. Create tunnel interface `antrea-tun0` for cross-cluster traffic
2. Watch all Pods on the Gateway and set up one rule per Pod in L3Fowarding
table as long as the Pod is running on a regular Node instead of the Gateway.
3. Update container interface's MTU with the tunnel header size deducted.

Signed-off-by: Lan Luo <luola@vmware.com>
  • Loading branch information
luolanzone committed Jan 6, 2023
1 parent bdc8450 commit b4a2186
Show file tree
Hide file tree
Showing 17 changed files with 865 additions and 145 deletions.
62 changes: 50 additions & 12 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func run(o *Options) error {
serviceInformer := informerFactory.Core().V1().Services()
endpointsInformer := informerFactory.Core().V1().Endpoints()
namespaceInformer := informerFactory.Core().V1().Namespaces()
podInformer := informerFactory.Core().V1().Pods()

// Create Antrea Clientset for the given config.
antreaClientProvider := agent.NewAntreaClientProvider(o.config.AntreaClientConnection, k8sClient)
Expand Down Expand Up @@ -258,7 +259,8 @@ func run(o *Options) error {
features.DefaultFeatureGate.Enabled(features.AntreaProxy),
o.config.AntreaProxy.ProxyAll,
connectUplinkToBridge,
l7NetworkPolicyEnabled)
l7NetworkPolicyEnabled,
enableMulticlusterGW)
err = agentInitializer.Initialize()
if err != nil {
return fmt.Errorf("error initializing agent: %v", err)
Expand Down Expand Up @@ -320,20 +322,28 @@ func run(o *Options) error {
)
}

var mcRouteController *mcroute.MCRouteController
var mcDefaultRouteController *mcroute.MCDefaultRouteController
var mcStrechedNetworkPolicyController *mcroute.StretchedNetworkPolicyController
var mcPodRouteController *mcroute.MCPodRouteController
var mcInformerFactory mcinformers.SharedInformerFactory
var clusterServiceCIDR *net.IPNet
var mcInformerFactoryWithOption mcinformers.SharedInformerFactory

notEncapMode := networkConfig.TrafficEncapMode != config.TrafficEncapModeEncap
if enableMulticlusterGW {
mcNamespace := env.GetPodNamespace()
if o.config.Multicluster.Namespace != "" {
mcNamespace = o.config.Multicluster.Namespace
}
mcInformerFactory = mcinformers.NewSharedInformerFactory(mcClient, informerDefaultResync)
gwInformer := mcInformerFactory.Multicluster().V1alpha1().Gateways()
ciImportInformer := mcInformerFactory.Multicluster().V1alpha1().ClusterInfoImports()
mcRouteController = mcroute.NewMCRouteController(
mcInformerFactoryWithOption = mcinformers.NewSharedInformerFactoryWithOptions(mcClient,
informerDefaultResync,
mcinformers.WithTweakListOptions(func(lo *metav1.ListOptions) {
lo.FieldSelector = fields.Set{"metadata.namespace": mcNamespace}.String()
}),
)
gwInformer := mcInformerFactoryWithOption.Multicluster().V1alpha1().Gateways()
ciImportInformer := mcInformerFactoryWithOption.Multicluster().V1alpha1().ClusterInfoImports()
mcDefaultRouteController = mcroute.NewMCDefaultRouteController(
mcClient,
gwInformer,
ciImportInformer,
Expand All @@ -345,8 +355,21 @@ func run(o *Options) error {
o.config.Multicluster.EnableStretchedNetworkPolicy,
)
_, clusterServiceCIDR, _ = net.ParseCIDR(o.config.Multicluster.ServiceCIDR)
if notEncapMode {
mcPodRouteController = mcroute.NewMCPodRouteController(
mcClient,
podInformer.Informer(),
gwInformer.Informer(),
ofClient,
ovsBridgeClient,
ifaceStore,
nodeConfig,
mcNamespace,
)
}
}
if enableMulticlusterNP {
mcInformerFactory = mcinformers.NewSharedInformerFactory(mcClient, informerDefaultResync)
labelIDInformer := mcInformerFactory.Multicluster().V1alpha1().LabelIdentities()
mcStrechedNetworkPolicyController = mcroute.NewMCAgentStretchedNetworkPolicyController(
ofClient,
Expand Down Expand Up @@ -485,15 +508,24 @@ func run(o *Options) error {
}
}

var mtuDeduction int
var cniServer *cniserver.CNIServer
var cniPodInfoStore cnipodcache.CNIPodInfoStore
var externalNodeController *externalnode.ExternalNodeController
var localExternalNodeInformer cache.SharedIndexInformer
if o.nodeType == config.K8sNode {
isChaining := false
if networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() {
isChaining = true
if enableMulticlusterGW {
tunnelType := ovsconfig.TunnelType(o.config.TunnelType)
switch tunnelType {
case ovsconfig.GeneveTunnel:
mtuDeduction = config.GeneveOverhead
case ovsconfig.GRETunnel:
mtuDeduction = config.GREOverhead
case ovsconfig.VXLANTunnel:
mtuDeduction = config.VXLANOverhead
}
}
if o.nodeType == config.K8sNode {
isChaining := networkConfig.TrafficEncapMode.IsNetworkPolicyOnly()
cniServer = cniserver.New(
o.config.CNISocket,
o.config.HostProcPathPrefix,
Expand All @@ -504,6 +536,7 @@ func run(o *Options) error {
enableBridgingMode,
enableAntreaIPAM,
o.config.DisableTXChecksumOffload,
mtuDeduction,
networkReadyCh)

if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
Expand Down Expand Up @@ -756,10 +789,15 @@ func run(o *Options) error {
}

if enableMulticlusterGW {
mcInformerFactory.Start(stopCh)
go mcRouteController.Run(stopCh)
mcInformerFactoryWithOption.Start(stopCh)
go mcDefaultRouteController.Run(stopCh)
if notEncapMode {
go mcPodRouteController.Run(stopCh)
}
}

if enableMulticlusterNP {
mcInformerFactory.Start(stopCh)
go mcStrechedNetworkPolicyController.Run(stopCh)
}

Expand Down
9 changes: 4 additions & 5 deletions cmd/antrea-agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func (o *Options) validateAntreaIPAMConfig() error {
return nil
}

func (o *Options) validateMulticlusterConfig(encapMode config.TrafficEncapModeType) error {
func (o *Options) validateMulticlusterConfig(encapMode config.TrafficEncapModeType, encryptionMode config.TrafficEncryptionModeType) error {
if !o.config.Multicluster.EnableGateway && !o.config.Multicluster.EnableStretchedNetworkPolicy {
return nil
}
Expand All @@ -320,9 +320,8 @@ func (o *Options) validateMulticlusterConfig(encapMode config.TrafficEncapModeTy
return fmt.Errorf("Multi-cluster Gateway must be enabled to enable StretchedNetworkPolicy")
}

if encapMode != config.TrafficEncapModeEncap {
// Only Encap mode is supported for Multi-cluster Gateway.
return fmt.Errorf("Multicluster is only applicable to the %s mode", config.TrafficEncapModeEncap)
if encapMode.SupportsEncap() && encryptionMode == config.TrafficEncryptionModeWireGuard {
return fmt.Errorf("Multi-cluster Gateway doesn't support WireGuard encryption mode")
}

if o.config.Multicluster.ServiceCIDR != "" {
Expand Down Expand Up @@ -488,7 +487,7 @@ func (o *Options) validateK8sNodeOptions() error {
}
}
}
if err := o.validateMulticlusterConfig(encapMode); err != nil {
if err := o.validateMulticlusterConfig(encapMode, encryptionMode); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,12 +355,12 @@ func (r *ResourceExportReconciler) refreshEndpointsResourceImport(
}
if len(svcResExport.Status.Conditions) > 0 {
if svcResExport.Status.Conditions[0].Status != corev1.ConditionTrue {
return newResImport, false, fmt.Errorf("corresponding Service type of ResourceExport " + svcResExportName.String() +
"has not been converged successfully, retry later")
err := fmt.Errorf("the Service type of ResourceExport %s has not been converged successfully, retry later", svcResExportName.String())
return newResImport, false, err
}
} else {
return newResImport, false, fmt.Errorf("corresponding Service type of ResourceExport " + svcResExportName.String() +
"has not been converged yet, retry later")
err := fmt.Errorf("the Service type of ResourceExport %s has not been converged yet, retry later", svcResExportName.String())
return newResImport, false, err
}
}

Expand Down
13 changes: 9 additions & 4 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type Initializer struct {
// networkReadyCh should be closed once the Node's network is ready.
// The CNI server will wait for it before handling any CNI Add requests.
proxyAll bool
enableMulticluster bool
networkReadyCh chan<- struct{}
stopCh <-chan struct{}
nodeType config.NodeType
Expand Down Expand Up @@ -144,6 +145,7 @@ func NewInitializer(
proxyAll bool,
connectUplinkToBridge bool,
enableL7NetworkPolicy bool,
enableMulticluster bool,
) *Initializer {
return &Initializer{
ovsBridgeClient: ovsBridgeClient,
Expand All @@ -169,6 +171,7 @@ func NewInitializer(
proxyAll: proxyAll,
connectUplinkToBridge: connectUplinkToBridge,
enableL7NetworkPolicy: enableL7NetworkPolicy,
enableMulticluster: enableMulticluster,
}
}

Expand Down Expand Up @@ -787,10 +790,11 @@ func (i *Initializer) setupDefaultTunnelInterface() error {
// It's not necessary for new Linux kernel versions with the following patch:
// https://github.com/torvalds/linux/commit/89e5c58fc1e2857ccdaae506fb8bc5fed57ee063.
shouldEnableCsum := i.networkConfig.TunnelCsum && (i.networkConfig.TunnelType == ovsconfig.GeneveTunnel || i.networkConfig.TunnelType == ovsconfig.VXLANTunnel)
tunnelInterfaceSupported := i.networkConfig.TrafficEncapMode.SupportsEncap() || i.enableMulticluster

// Check the default tunnel port.
if portExists {
if i.networkConfig.TrafficEncapMode.SupportsEncap() &&
if tunnelInterfaceSupported &&
tunnelIface.TunnelInterfaceConfig.Type == i.networkConfig.TunnelType &&
tunnelIface.TunnelInterfaceConfig.DestinationPort == i.networkConfig.TunnelPort &&
tunnelIface.TunnelInterfaceConfig.LocalIP.Equal(localIP) {
Expand All @@ -807,7 +811,7 @@ func (i *Initializer) setupDefaultTunnelInterface() error {
}

if err := i.ovsBridgeClient.DeletePort(tunnelIface.PortUUID); err != nil {
if i.networkConfig.TrafficEncapMode.SupportsEncap() {
if tunnelInterfaceSupported {
return fmt.Errorf("failed to remove tunnel port %s with wrong tunnel type: %s", tunnelPortName, err)
}
klog.Errorf("Failed to remove tunnel port %s in NoEncapMode: %v", tunnelPortName, err)
Expand All @@ -818,7 +822,7 @@ func (i *Initializer) setupDefaultTunnelInterface() error {
}

// Create the default tunnel port and interface.
if i.networkConfig.TrafficEncapMode.SupportsEncap() {
if tunnelInterfaceSupported {
if tunnelPortName != defaultTunInterfaceName {
// Reset the tunnel interface name to the desired name before
// recreating the tunnel port and interface.
Expand Down Expand Up @@ -1141,7 +1145,8 @@ func (i *Initializer) getNodeMTU(transportInterface *net.Interface) (int, error)
if mtu <= 0 {
return 0, fmt.Errorf("Failed to fetch Node MTU : %v", mtu)
}
if i.networkConfig.TrafficEncapMode.SupportsEncap() {
// When the multi-cluster is enabled, we need deduct MTU for any potential cross-cluster traffic.
if i.networkConfig.TrafficEncapMode.SupportsEncap() || i.enableMulticluster {
if i.networkConfig.TunnelType == ovsconfig.VXLANTunnel {
mtu -= config.VXLANOverhead
} else if i.networkConfig.TunnelType == ovsconfig.GeneveTunnel {
Expand Down
37 changes: 37 additions & 0 deletions pkg/agent/cniserver/interface_configuration_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,43 @@ func (ic *ifConfigurator) configureContainerLink(
}
}

func (ic *ifConfigurator) configureContainerMTU(containerNetNS string, containerIFDev string, mtuDeduction int) error {
var peerIdx int
if err := ns.WithNetNSPath(containerNetNS, func(hostNS ns.NetNS) error {
link, err := netlink.LinkByName(containerIFDev)
if err != nil {
return fmt.Errorf("failed to find interface %s in container %s: %v", containerIFDev, containerNetNS, err)
}
_, peerIdx, err = ip.GetVethPeerIfindex(containerIFDev)
if err != nil {
return fmt.Errorf("failed to get peer index for dev %s in container %s: %w", containerIFDev, containerNetNS, err)
}
err = netlink.LinkSetMTU(link, link.Attrs().MTU-mtuDeduction)
if err != nil {
return fmt.Errorf("failed to set MTU for interface %s in container %s: %v", containerIFDev, containerNetNS, err)
}
return nil
}); err != nil {
return err
}

peerIntf, err := net.InterfaceByIndex(peerIdx)
if err != nil {
return fmt.Errorf("failed to get host interface for index %d: %w", peerIdx, err)
}

hostInterfaceName := peerIntf.Name
link, err := netlink.LinkByName(hostInterfaceName)
if err != nil {
return fmt.Errorf("failed to find host interface %s: %v", hostInterfaceName, err)
}
err = netlink.LinkSetMTU(link, link.Attrs().MTU-mtuDeduction)
if err != nil {
return fmt.Errorf("failed to set MTU for host interface %s: %v", hostInterfaceName, err)
}
return nil
}

func (ic *ifConfigurator) removeContainerLink(containerID, hostInterfaceName string) error {
klog.V(2).Infof("Deleting veth devices for container %s", containerID)
// Don't return an error if the device is already removed as CniDel can be called multiple times.
Expand Down
6 changes: 6 additions & 0 deletions pkg/agent/cniserver/interface_configuration_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ func (ic *ifConfigurator) configureContainerLink(
return nil
}

// configureContainerMTU is only used for Antrea Multi-cluster with networkPolicyOnly
// mode, and this mode doesn't support Windows platform yet.
func (ic *ifConfigurator) configureContainerMTU(containerNetNS string, containerIFDev string, mtuDeduction int) error {
return nil
}

// createContainerLink creates HNSEndpoint using the IP configuration in the IPAM result.
func (ic *ifConfigurator) createContainerLink(endpointName string, result *current.Result, containerID, podName, podNamespace string) (hostLink *hcsshim.HNSEndpoint, err error) {
containerIP, err := findContainerIPConfig(result.IPs)
Expand Down
7 changes: 7 additions & 0 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,13 @@ func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *in
return interfaceConfig
}

func (pc *podConfigurator) configureInterfacesMTU(containerNetNS string, containerIFDev string, mtuDeduction int) error {
if err := pc.ifConfigurator.configureContainerMTU(containerNetNS, containerIFDev, mtuDeduction); err != nil {
return err
}
return nil
}

func (pc *podConfigurator) configureInterfaces(
podName string,
podNameSpace string,
Expand Down
16 changes: 15 additions & 1 deletion pkg/agent/cniserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ type CNIServer struct {
enableSecondaryNetworkIPAM bool
disableTXChecksumOffload bool
secondaryNetworkEnabled bool
mtuDeduction int
// networkReadyCh notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it.
networkReadyCh <-chan struct{}
}
Expand Down Expand Up @@ -618,7 +619,7 @@ func New(
nodeConfig *config.NodeConfig,
kubeClient clientset.Interface,
routeClient route.Interface,
isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool,
isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool, mtuDeduction int,
networkReadyCh <-chan struct{},
) *CNIServer {
return &CNIServer{
Expand All @@ -634,6 +635,7 @@ func New(
enableBridgingMode: enableBridgingMode,
disableTXChecksumOffload: disableTXChecksumOffload,
enableSecondaryNetworkIPAM: enableSecondaryNetworkIPAM,
mtuDeduction: mtuDeduction,
networkReadyCh: networkReadyCh,
}
}
Expand Down Expand Up @@ -710,6 +712,18 @@ func (s *CNIServer) interceptAdd(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, e
return &cnipb.CniCmdResponse{CniResult: []byte("")}, fmt.Errorf("failed to connect container %s to ovs: %w", cniConfig.ContainerId, err)
}

// Packets for multi-cluster traffic will always be encapsulated and go through
// tunnel interface. So here we need to deduce mtu for different tunnel types.
if s.mtuDeduction != 0 {
if err := s.podConfigurator.configureInterfacesMTU(
s.hostNetNsPath(cniConfig.Netns),
cniConfig.Ifname,
s.mtuDeduction,
); err != nil {
return &cnipb.CniCmdResponse{CniResult: []byte("")}, fmt.Errorf("failed to configure container %s's MTU: %w", cniConfig.ContainerId, err)
}
}

// we return prevResult, which should be exactly what we received from
// the runtime, potentially converted to the current CNI version used by
// Antrea.
Expand Down
Loading

0 comments on commit b4a2186

Please sign in to comment.