Skip to content

Commit

Permalink
Optimize NodePort performance by reducing request packets CT actions
Browse files Browse the repository at this point in the history
For a NodePort connection sourced from external network or local Node,
destination IP will be DNATed with a virtual IP, then the connection
will be forwarded to OVS via Antrea gateway. However, in UnSNATTable,
a flow is installed to unSNAT replied packets of SNATed connections by
matching the virtual IP as destination IP. The flow is like the following:

```
table=UnSNAT, priority=200,ip,nw_dst=169.254.0.253 actions=ct(table=ConntrackZone,zone=65521,nat)
```

Note that, the request packets of a DNATed NodePort connection are also
matched by the flow above, but it is unnecessary. To optimize the
performance of NodePort, another virtual IP is used to DNAT NodePort
connections.

TCP_RR and TCP_CRR improvement is as fowllows:

```
Test      old TPS     new TPS    delta
TCP_CRR   3510.28     3847.76    +%9.61
TCP_RR    9574.29     10457.6    +%9.23
```

Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
  • Loading branch information
hongliangl committed Jun 8, 2022
1 parent 5115ee2 commit 17a100a
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 50 deletions.
20 changes: 12 additions & 8 deletions pkg/agent/config/node_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,22 @@ const (
)

var (
// VirtualServiceIPv4 / VirtualServiceIPv6 are used in the following situations:
// - Use the virtual IP to perform SNAT for packets of Service from Antrea gateway and the Endpoint is not on
// local Pod CIDR or any remote Pod CIDRs. It is used in OVS flow of table serviceConntrackCommitTable.
// - Use the virtual IP to perform DNAT for packets of NodePort on host. It is used in iptables rules on host.
// - Use the virtual IP as onlink routing entry gateway in host routing entry.
// - Use the virtual IP as destination IP in host routing entry. It is used to forward DNATed NodePort packets
// or replied SNATed Service packets back to Antrea gateway.
// - Use the virtual IP for InternalIPAddress parameter of Add-NetNatStaticMapping.
// VirtualServiceIPv4 or VirtualServiceIPv6 is used in the following scenarios:
// - The IP is used to perform SNAT for packets of Service sourced from Antrea gateway and destined for external
// network via Antrea gateway.
// - The IP is used as destination IP in host routing entry to forward replied SNATed Service packets back to Antrea
// gateway.
// - The IP is used for InternalIPAddress parameter of Add-NetNatStaticMapping.
// The IP cannot be one used in the network, and cannot be within the 169.254.1.0 - 169.254.254.255 range
// according to https://datatracker.ietf.org/doc/html/rfc3927#section-2.1
VirtualServiceIPv4 = net.ParseIP("169.254.0.253")
VirtualServiceIPv6 = net.ParseIP("fc01::aabb:ccdd:eeff")

// VirtualNodePortDNATIPv4 or VirtualNodePortDNATIPv6 is used in the following scenarios:
// - The IP is used to perform DNAT on host for packets of NodePort sourced from local Node or external network.
// - The IP is used as destination IP in host routing entry to forward DNATed NodePort packets to Antrea gateway
VirtualNodePortDNATIPv4 = net.ParseIP("169.254.0.252")
VirtualNodePortDNATIPv6 = net.ParseIP("fc01::aabb:ccdd:eefe")
)

type GatewayConfig struct {
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2211,13 +2211,13 @@ func (f *featureService) nodePortMarkFlows() []binding.Flow {
Action().LoadRegMark(ToNodePortAddressRegMark).
Done())
}
// This generates the flow for the virtual IP. The flow is used to mark the first packet of NodePort connection from
// the Antrea gateway (the connection is performed DNAT with the virtual IP in host netns).
// This generates the flow for the virtual NodePort DNAT IP. The flow is used to mark the first packet of NodePort
// connection sourced from the Antrea gateway (the connection is performed DNAT with the virtual IP in host netns).
flows = append(flows,
NodePortMarkTable.ofTable.BuildFlow(priorityNormal).
Cookie(cookieID).
MatchProtocol(ipProtocol).
MatchDstIP(f.virtualIPs[ipProtocol]).
MatchDstIP(f.virtualNodePortDNATIPs[ipProtocol]).
Action().LoadRegMark(ToNodePortAddressRegMark).
Done())
}
Expand Down
57 changes: 31 additions & 26 deletions pkg/agent/openflow/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ type featureService struct {
cachedFlows *flowCategoryCache
groupCache sync.Map

gatewayIPs map[binding.Protocol]net.IP
virtualIPs map[binding.Protocol]net.IP
dnatCtZones map[binding.Protocol]int
snatCtZones map[binding.Protocol]int
gatewayMAC net.HardwareAddr
nodePortAddresses map[binding.Protocol][]net.IP
serviceCIDRs map[binding.Protocol]net.IPNet
networkConfig *config.NetworkConfig
gatewayIPs map[binding.Protocol]net.IP
virtualIPs map[binding.Protocol]net.IP
virtualNodePortDNATIPs map[binding.Protocol]net.IP
dnatCtZones map[binding.Protocol]int
snatCtZones map[binding.Protocol]int
gatewayMAC net.HardwareAddr
nodePortAddresses map[binding.Protocol][]net.IP
serviceCIDRs map[binding.Protocol]net.IPNet
networkConfig *config.NetworkConfig

enableProxy bool
proxyAll bool
Expand All @@ -66,6 +67,7 @@ func newFeatureService(
connectUplinkToBridge bool) *featureService {
gatewayIPs := make(map[binding.Protocol]net.IP)
virtualIPs := make(map[binding.Protocol]net.IP)
virtualNodePortDNATIPs := make(map[binding.Protocol]net.IP)
dnatCtZones := make(map[binding.Protocol]int)
snatCtZones := make(map[binding.Protocol]int)
nodePortAddresses := make(map[binding.Protocol][]net.IP)
Expand All @@ -74,6 +76,7 @@ func newFeatureService(
if ipProtocol == binding.ProtocolIP {
gatewayIPs[ipProtocol] = nodeConfig.GatewayConfig.IPv4
virtualIPs[ipProtocol] = config.VirtualServiceIPv4
virtualNodePortDNATIPs[ipProtocol] = config.VirtualNodePortDNATIPv4
dnatCtZones[ipProtocol] = CtZone
snatCtZones[ipProtocol] = SNATCtZone
nodePortAddresses[ipProtocol] = serviceConfig.NodePortAddressesIPv4
Expand All @@ -83,6 +86,7 @@ func newFeatureService(
} else if ipProtocol == binding.ProtocolIPv6 {
gatewayIPs[ipProtocol] = nodeConfig.GatewayConfig.IPv6
virtualIPs[ipProtocol] = config.VirtualServiceIPv6
virtualNodePortDNATIPs[ipProtocol] = config.VirtualNodePortDNATIPv6
dnatCtZones[ipProtocol] = CtZoneV6
snatCtZones[ipProtocol] = SNATCtZoneV6
nodePortAddresses[ipProtocol] = serviceConfig.NodePortAddressesIPv6
Expand All @@ -93,24 +97,25 @@ func newFeatureService(
}

return &featureService{
cookieAllocator: cookieAllocator,
ipProtocols: ipProtocols,
bridge: bridge,
cachedFlows: newFlowCategoryCache(),
groupCache: sync.Map{},
gatewayIPs: gatewayIPs,
virtualIPs: virtualIPs,
dnatCtZones: dnatCtZones,
snatCtZones: snatCtZones,
nodePortAddresses: nodePortAddresses,
serviceCIDRs: serviceCIDRs,
gatewayMAC: nodeConfig.GatewayConfig.MAC,
networkConfig: networkConfig,
enableProxy: enableProxy,
proxyAll: proxyAll,
connectUplinkToBridge: connectUplinkToBridge,
ctZoneSrcField: getZoneSrcField(connectUplinkToBridge),
category: cookie.Service,
cookieAllocator: cookieAllocator,
ipProtocols: ipProtocols,
bridge: bridge,
cachedFlows: newFlowCategoryCache(),
groupCache: sync.Map{},
gatewayIPs: gatewayIPs,
virtualIPs: virtualIPs,
virtualNodePortDNATIPs: virtualNodePortDNATIPs,
dnatCtZones: dnatCtZones,
snatCtZones: snatCtZones,
nodePortAddresses: nodePortAddresses,
serviceCIDRs: serviceCIDRs,
gatewayMAC: nodeConfig.GatewayConfig.MAC,
networkConfig: networkConfig,
enableProxy: enableProxy,
proxyAll: proxyAll,
connectUplinkToBridge: connectUplinkToBridge,
ctZoneSrcField: getZoneSrcField(connectUplinkToBridge),
category: cookie.Service,
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,9 @@ func smallSliceDifference(s1, s2 []string) []string {
}

func (p *proxier) installNodePortService(groupID binding.GroupIDType, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error {
svcIP := agentconfig.VirtualServiceIPv4
svcIP := agentconfig.VirtualNodePortDNATIPv4
if p.isIPv6 {
svcIP = agentconfig.VirtualServiceIPv6
svcIP = agentconfig.VirtualNodePortDNATIPv6
}
if err := p.ofClient.InstallServiceFlows(groupID, svcIP, svcPort, protocol, affinityTimeout, nodeLocalExternal, corev1.ServiceTypeNodePort); err != nil {
return fmt.Errorf("failed to install Service NodePort load balancing flows: %w", err)
Expand All @@ -301,9 +301,9 @@ func (p *proxier) installNodePortService(groupID binding.GroupIDType, svcPort ui
}

func (p *proxier) uninstallNodePortService(svcPort uint16, protocol binding.Protocol) error {
svcIP := agentconfig.VirtualServiceIPv4
svcIP := agentconfig.VirtualNodePortDNATIPv4
if p.isIPv6 {
svcIP = agentconfig.VirtualServiceIPv6
svcIP = agentconfig.VirtualNodePortDNATIPv6
}
if err := p.ofClient.UninstallServiceFlows(svcIP, svcPort, protocol); err != nil {
return fmt.Errorf("failed to remove Service NodePort NodePort load balancing flows: %w", err)
Expand Down
54 changes: 49 additions & 5 deletions pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,13 @@ func (c *Client) syncIPTables() error {
})
// Use iptables-restore to configure IPv4 settings.
if c.networkConfig.IPv4Enabled {
iptablesData := c.restoreIptablesData(c.nodeConfig.PodIPv4CIDR, antreaPodIPSet, localAntreaFlexibleIPAMPodIPSet, antreaNodePortIPSet, config.VirtualServiceIPv4, snatMarkToIPv4)
iptablesData := c.restoreIptablesData(c.nodeConfig.PodIPv4CIDR,
antreaPodIPSet,
localAntreaFlexibleIPAMPodIPSet,
antreaNodePortIPSet,
config.VirtualNodePortDNATIPv4,
config.VirtualServiceIPv4,
snatMarkToIPv4)
// Setting --noflush to keep the previous contents (i.e. non antrea managed chains) of the tables.
if err := c.ipt.Restore(iptablesData.Bytes(), false, false); err != nil {
return err
Expand All @@ -451,7 +457,13 @@ func (c *Client) syncIPTables() error {

// Use ip6tables-restore to configure IPv6 settings.
if c.networkConfig.IPv6Enabled {
iptablesData := c.restoreIptablesData(c.nodeConfig.PodIPv6CIDR, antreaPodIP6Set, localAntreaFlexibleIPAMPodIP6Set, antreaNodePortIP6Set, config.VirtualServiceIPv6, snatMarkToIPv6)
iptablesData := c.restoreIptablesData(c.nodeConfig.PodIPv6CIDR,
antreaPodIP6Set,
localAntreaFlexibleIPAMPodIP6Set,
antreaNodePortIP6Set,
config.VirtualNodePortDNATIPv4,
config.VirtualServiceIPv6,
snatMarkToIPv6)
// Setting --noflush to keep the previous contents (i.e. non antrea managed chains) of the tables.
if err := c.ipt.Restore(iptablesData.Bytes(), false, true); err != nil {
return err
Expand All @@ -460,7 +472,13 @@ func (c *Client) syncIPTables() error {
return nil
}

func (c *Client) restoreIptablesData(podCIDR *net.IPNet, podIPSet, localAntreaFlexibleIPAMPodIPSet, nodePortIPSet string, serviceVirtualIP net.IP, snatMarkToIP map[uint32]net.IP) *bytes.Buffer {
func (c *Client) restoreIptablesData(podCIDR *net.IPNet,
podIPSet,
localAntreaFlexibleIPAMPodIPSet,
nodePortIPSet string,
nodePortDNATVirtualIP,
serviceVirtualIP net.IP,
snatMarkToIP map[uint32]net.IP) *bytes.Buffer {
// Create required rules in the antrea chains.
// Use iptables-restore as it flushes the involved chains and creates the desired rules
// with a single call, instead of string matching to clean up stale rules.
Expand Down Expand Up @@ -573,15 +591,15 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet, podIPSet, localAntreaFl
"-m", "comment", "--comment", `"Antrea: DNAT external to NodePort packets"`,
"-m", "set", "--match-set", nodePortIPSet, "dst,dst",
"-j", iptables.DNATTarget,
"--to-destination", serviceVirtualIP.String(),
"--to-destination", nodePortDNATVirtualIP.String(),
}...)
writeLine(iptablesData, iptables.MakeChainLine(antreaOutputChain))
writeLine(iptablesData, []string{
"-A", antreaOutputChain,
"-m", "comment", "--comment", `"Antrea: DNAT local to NodePort packets"`,
"-m", "set", "--match-set", nodePortIPSet, "dst,dst",
"-j", iptables.DNATTarget,
"--to-destination", serviceVirtualIP.String(),
"--to-destination", nodePortDNATVirtualIP.String(),
}...)
}
writeLine(iptablesData, iptables.MakeChainLine(antreaPostRoutingChain))
Expand Down Expand Up @@ -696,11 +714,17 @@ func (c *Client) initServiceIPRoutes() error {
if err := c.addVirtualServiceIPRoute(false); err != nil {
return err
}
if err := c.addVirtualDNATIPRoute(false); err != nil {
return err
}
}
if c.networkConfig.IPv6Enabled {
if err := c.addVirtualServiceIPRoute(true); err != nil {
return err
}
if err := c.addVirtualDNATIPRoute(true); err != nil {
return err
}
}
return nil
}
Expand Down Expand Up @@ -1254,6 +1278,26 @@ func (c *Client) AddClusterIPRoute(svcIP net.IP) error {
return nil
}

func (c *Client) addVirtualDNATIPRoute(isIPv6 bool) error {
linkIndex := c.nodeConfig.GatewayConfig.LinkIndex
vIP := config.VirtualNodePortDNATIPv4
gw := config.VirtualServiceIPv4
mask := ipv4AddrLength
if isIPv6 {
vIP = config.VirtualNodePortDNATIPv6
gw = config.VirtualServiceIPv6
mask = ipv6AddrLength
}
route := generateRoute(vIP, mask, gw, linkIndex, netlink.SCOPE_UNIVERSE)
if err := netlink.RouteReplace(route); err != nil {
return fmt.Errorf("failed to install routing entry for virtual NodePort DNAT IP %s: %w", vIP.String(), err)
}
klog.V(4).InfoS("Added virtual NodePort DNAT IP route", "route", route)
c.serviceRoutes.Store(vIP.String(), route)

return nil
}

// addLoadBalancerIngressIPRoute is used to add routing entry which is used to route LoadBalancer ingress IP to Antrea
// gateway on host.
func (c *Client) addLoadBalancerIngressIPRoute(svcIPStr string) error {
Expand Down
12 changes: 8 additions & 4 deletions pkg/agent/route/route_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ const (
)

var (
antreaNat = util.AntreaNatName
virtualServiceIPv4Net = util.NewIPNet(config.VirtualServiceIPv4)
PodCIDRIPv4 *net.IPNet
antreaNat = util.AntreaNatName
virtualServiceIPv4Net = util.NewIPNet(config.VirtualServiceIPv4)
virtualNodePortDNATIPv4Net = util.NewIPNet(config.VirtualNodePortDNATIPv4)
PodCIDRIPv4 *net.IPNet
)

type Client struct {
Expand Down Expand Up @@ -245,8 +246,11 @@ func (c *Client) addVirtualServiceIPRoute(isIPv6 bool) error {
}
klog.InfoS("Added virtual Service IP neighbor", "neighbor", vNeighbor)

if err := c.addServiceRoute(config.VirtualNodePortDNATIPv4); err != nil {
return err
}
// For NodePort Service, a new NetNat for NetNatStaticMapping is needed.
err := util.NewNetNat(antreaNatNodePort, virtualServiceIPv4Net)
err := util.NewNetNat(antreaNatNodePort, virtualNodePortDNATIPv4Net)
if err != nil {
return err
}
Expand Down

0 comments on commit 17a100a

Please sign in to comment.