Skip to content

Commit

Permalink
Merge pull request #2270 from ctelfer/lbdsr
Browse files Browse the repository at this point in the history
Use direct server return in east-west overlay load balancing
  • Loading branch information
Flavio Crisciani authored Oct 12, 2018
2 parents 8edd197 + 0c76756 commit be20dfe
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 49 deletions.
37 changes: 28 additions & 9 deletions controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,9 @@ func (c *controller) RegisterDriver(networkType string, driver driverapi.Driver,
return nil
}

// XXX This should be made driver agnostic. See comment below.
const overlayDSROptionString = "dsr"

// NewNetwork creates a new network of the specified network type. The options
// are network specific and modeled in a generic way.
func (c *controller) NewNetwork(networkType, name string, id string, options ...NetworkOption) (Network, error) {
Expand All @@ -723,15 +726,16 @@ func (c *controller) NewNetwork(networkType, name string, id string, options ...
defaultIpam := defaultIpamForNetworkType(networkType)
// Construct the network object
network := &network{
name: name,
networkType: networkType,
generic: map[string]interface{}{netlabel.GenericData: make(map[string]string)},
ipamType: defaultIpam,
id: id,
created: time.Now(),
ctrlr: c,
persist: true,
drvOnce: &sync.Once{},
name: name,
networkType: networkType,
generic: map[string]interface{}{netlabel.GenericData: make(map[string]string)},
ipamType: defaultIpam,
id: id,
created: time.Now(),
ctrlr: c,
persist: true,
drvOnce: &sync.Once{},
loadBalancerMode: loadBalancerModeDefault,
}

network.processOptions(options...)
Expand Down Expand Up @@ -829,6 +833,21 @@ func (c *controller) NewNetwork(networkType, name string, id string, options ...
}
}()

// XXX If the driver type is "overlay" check the options for DSR
// being set. If so, set the network's load balancing mode to DSR.
// This should really be done in a network option, but due to
// time pressure to get this in without adding changes to moby,
// swarm and CLI, it is being implemented as a driver-specific
// option. Unfortunately, drivers can't influence the core
// "libnetwork.network" data type. Hence we need this hack code
// to implement in this manner.
if gval, ok := network.generic[netlabel.GenericData]; ok && network.networkType == "overlay" {
optMap := gval.(map[string]string)
if _, ok := optMap[overlayDSROptionString]; ok {
network.loadBalancerMode = loadBalancerModeDSR
}
}

addToStore:
// First store the endpoint count, then the network. To avoid to
// end up with a datastore containing a network and not an epCnt,
Expand Down
20 changes: 20 additions & 0 deletions ipvs/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,23 @@ const (
// addresses.
SourceHashing = "sh"
)

const (
// ConnFwdMask is a mask for the fwd methods
ConnFwdMask = 0x0007

// ConnFwdMasq denotes forwarding via masquerading/NAT
ConnFwdMasq = 0x0000

// ConnFwdLocalNode denotes forwarding to a local node
ConnFwdLocalNode = 0x0001

// ConnFwdTunnel denotes forwarding via a tunnel
ConnFwdTunnel = 0x0002

// ConnFwdDirectRoute denotes forwarding via direct routing
ConnFwdDirectRoute = 0x0003

// ConnFwdBypass denotes forwarding while bypassing the cache
ConnFwdBypass = 0x0004
)
81 changes: 47 additions & 34 deletions network.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,43 +199,50 @@ func (i *IpamInfo) UnmarshalJSON(data []byte) error {
}

type network struct {
ctrlr *controller
name string
networkType string
id string
created time.Time
scope string // network data scope
labels map[string]string
ipamType string
ipamOptions map[string]string
addrSpace string
ipamV4Config []*IpamConf
ipamV6Config []*IpamConf
ipamV4Info []*IpamInfo
ipamV6Info []*IpamInfo
enableIPv6 bool
postIPv6 bool
epCnt *endpointCnt
generic options.Generic
dbIndex uint64
dbExists bool
persist bool
stopWatchCh chan struct{}
drvOnce *sync.Once
resolverOnce sync.Once
resolver []Resolver
internal bool
attachable bool
inDelete bool
ingress bool
driverTables []networkDBTable
dynamic bool
configOnly bool
configFrom string
loadBalancerIP net.IP
ctrlr *controller
name string
networkType string
id string
created time.Time
scope string // network data scope
labels map[string]string
ipamType string
ipamOptions map[string]string
addrSpace string
ipamV4Config []*IpamConf
ipamV6Config []*IpamConf
ipamV4Info []*IpamInfo
ipamV6Info []*IpamInfo
enableIPv6 bool
postIPv6 bool
epCnt *endpointCnt
generic options.Generic
dbIndex uint64
dbExists bool
persist bool
stopWatchCh chan struct{}
drvOnce *sync.Once
resolverOnce sync.Once
resolver []Resolver
internal bool
attachable bool
inDelete bool
ingress bool
driverTables []networkDBTable
dynamic bool
configOnly bool
configFrom string
loadBalancerIP net.IP
loadBalancerMode string
sync.Mutex
}

const (
loadBalancerModeNAT = "NAT"
loadBalancerModeDSR = "DSR"
loadBalancerModeDefault = loadBalancerModeNAT
)

func (n *network) Name() string {
n.Lock()
defer n.Unlock()
Expand Down Expand Up @@ -475,6 +482,7 @@ func (n *network) CopyTo(o datastore.KVObject) error {
dstN.configOnly = n.configOnly
dstN.configFrom = n.configFrom
dstN.loadBalancerIP = n.loadBalancerIP
dstN.loadBalancerMode = n.loadBalancerMode

// copy labels
if dstN.labels == nil {
Expand Down Expand Up @@ -592,6 +600,7 @@ func (n *network) MarshalJSON() ([]byte, error) {
netMap["configOnly"] = n.configOnly
netMap["configFrom"] = n.configFrom
netMap["loadBalancerIP"] = n.loadBalancerIP
netMap["loadBalancerMode"] = n.loadBalancerMode
return json.Marshal(netMap)
}

Expand Down Expand Up @@ -705,6 +714,10 @@ func (n *network) UnmarshalJSON(b []byte) (err error) {
if v, ok := netMap["loadBalancerIP"]; ok {
n.loadBalancerIP = net.ParseIP(v.(string))
}
n.loadBalancerMode = loadBalancerModeDefault
if v, ok := netMap["loadBalancerMode"]; ok {
n.loadBalancerMode = v.(string)
}
// Reconcile old networks with the recently added `--ipv6` flag
if !n.enableIPv6 {
n.enableIPv6 = len(n.ipamV6Info) > 0
Expand Down
30 changes: 30 additions & 0 deletions osl/namespace_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,36 @@ func (n *networkNamespace) RemoveAliasIP(ifName string, ip *net.IPNet) error {
return n.nlHandle.AddrDel(iface, &netlink.Addr{IPNet: ip})
}

func (n *networkNamespace) DisableARPForVIP(srcName string) (Err error) {
dstName := ""
for _, i := range n.Interfaces() {
if i.SrcName() == srcName {
dstName = i.DstName()
break
}
}
if dstName == "" {
return fmt.Errorf("failed to find interface %s in sandbox", srcName)
}

err := n.InvokeFunc(func() {
path := filepath.Join("/proc/sys/net/ipv4/conf", dstName, "arp_ignore")
if err := ioutil.WriteFile(path, []byte{'1', '\n'}, 0644); err != nil {
Err = fmt.Errorf("Failed to set %s to 1: %v", path, err)
return
}
path = filepath.Join("/proc/sys/net/ipv4/conf", dstName, "arp_announce")
if err := ioutil.WriteFile(path, []byte{'2', '\n'}, 0644); err != nil {
Err = fmt.Errorf("Failed to set %s to 2: %v", path, err)
return
}
})
if err != nil {
return err
}
return
}

func (n *networkNamespace) InvokeFunc(f func()) error {
return nsInvoke(n.nsPath(), func(nsFD int) error { return nil }, func(callerFD int) error {
f()
Expand Down
4 changes: 4 additions & 0 deletions osl/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type Sandbox interface {
// RemoveAliasIP removes the passed IP address from the named interface
RemoveAliasIP(ifName string, ip *net.IPNet) error

// DisableARPForVIP disables ARP replies and requests for VIP addresses
// on a particular interface
DisableARPForVIP(ifName string) error

// Add a static route to the sandbox.
AddStaticRoute(*types.StaticRoute) error

Expand Down
22 changes: 22 additions & 0 deletions sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,8 +742,17 @@ func releaseOSSboxResources(osSbox osl.Sandbox, ep *endpoint) {

ep.Lock()
joinInfo := ep.joinInfo
vip := ep.virtualIP
lbModeIsDSR := ep.network.loadBalancerMode == loadBalancerModeDSR
ep.Unlock()

if len(vip) > 0 && lbModeIsDSR {
ipNet := &net.IPNet{IP: vip, Mask: net.CIDRMask(32, 32)}
if err := osSbox.RemoveAliasIP(osSbox.GetLoopbackIfaceName(), ipNet); err != nil {
logrus.WithError(err).Debugf("failed to remove virtual ip %v to loopback", ipNet)
}
}

if joinInfo == nil {
return
}
Expand Down Expand Up @@ -831,6 +840,7 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error {
ep.Lock()
joinInfo := ep.joinInfo
i := ep.iface
lbModeIsDSR := ep.network.loadBalancerMode == loadBalancerModeDSR
ep.Unlock()

if ep.needResolver() {
Expand All @@ -854,6 +864,18 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error {
if err := sb.osSbox.AddInterface(i.srcName, i.dstPrefix, ifaceOptions...); err != nil {
return fmt.Errorf("failed to add interface %s to sandbox: %v", i.srcName, err)
}

if len(ep.virtualIP) > 0 && lbModeIsDSR {
if sb.loadBalancerNID == "" {
if err := sb.osSbox.DisableARPForVIP(i.srcName); err != nil {
return fmt.Errorf("failed disable ARP for VIP: %v", err)
}
}
ipNet := &net.IPNet{IP: ep.virtualIP, Mask: net.CIDRMask(32, 32)}
if err := sb.osSbox.AddAliasIP(sb.osSbox.GetLoopbackIfaceName(), ipNet); err != nil {
return fmt.Errorf("failed to add virtual ip %v to loopback: %v", ipNet, err)
}
}
}

if joinInfo != nil {
Expand Down
19 changes: 13 additions & 6 deletions service_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (n *network) addLBBackend(ip net.IP, lb *loadBalancer) {
}

logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v in sbox %.7s (%.7s)", lb.vip, lb.fwMark, lb.service.ingressPorts, sb.ID(), sb.ContainerID())
if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, false); err != nil {
if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, false, n.loadBalancerMode); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %.7s (%.7s): %v", sb.ID(), sb.ContainerID(), err)
return
}
Expand All @@ -158,6 +158,9 @@ func (n *network) addLBBackend(ip net.IP, lb *loadBalancer) {
Address: ip,
Weight: 1,
}
if n.loadBalancerMode == loadBalancerModeDSR {
d.ConnectionFlags = ipvs.ConnFwdDirectRoute
}

// Remove the sched name before using the service to add
// destination.
Expand Down Expand Up @@ -203,6 +206,9 @@ func (n *network) rmLBBackend(ip net.IP, lb *loadBalancer, rmService bool, fullR
Address: ip,
Weight: 1,
}
if n.loadBalancerMode == loadBalancerModeDSR {
d.ConnectionFlags = ipvs.ConnFwdDirectRoute
}

if fullRemove {
if err := i.DelDestination(s, d); err != nil && err != syscall.ENOENT {
Expand Down Expand Up @@ -231,7 +237,7 @@ func (n *network) rmLBBackend(ip net.IP, lb *loadBalancer, rmService bool, fullR
}
}

if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, true); err != nil {
if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, true, n.loadBalancerMode); err != nil {
logrus.Errorf("Failed to delete firewall mark rule in sbox %.7s (%.7s): %v", sb.ID(), sb.ContainerID(), err)
}

Expand Down Expand Up @@ -566,7 +572,7 @@ func readPortsFromFile(fileName string) ([]*PortConfig, error) {

// Invoke fwmarker reexec routine to mark vip destined packets with
// the passed firewall mark.
func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error {
func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool, lbMode string) error {
var ingressPortsFile string

if len(ingressPorts) != 0 {
Expand All @@ -586,7 +592,7 @@ func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*Port

cmd := &exec.Cmd{
Path: reexec.Self(),
Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.String()),
Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.String(), lbMode),
Stdout: os.Stdout,
Stderr: os.Stderr,
}
Expand All @@ -603,7 +609,7 @@ func fwMarker() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()

if len(os.Args) < 7 {
if len(os.Args) < 8 {
logrus.Error("invalid number of arguments..")
os.Exit(1)
}
Expand Down Expand Up @@ -645,7 +651,8 @@ func fwMarker() {
os.Exit(5)
}

if addDelOpt == "-A" {
lbMode := os.Args[7]
if addDelOpt == "-A" && lbMode == loadBalancerModeNAT {
eIP, subnet, err := net.ParseCIDR(os.Args[6])
if err != nil {
logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[6], err)
Expand Down

0 comments on commit be20dfe

Please sign in to comment.