Skip to content

Commit

Permalink
Set east-west load balancing to use direct routing
Browse files Browse the repository at this point in the history
Modify the load balancing for east-west traffic to use direct routing
rather than NAT, and update tasks to use direct server return under
Linux. This avoids hiding the source address of the sender and improves
the performance in single-client/single-server tests.

Signed-off-by: Chris Telfer <ctelfer@docker.com>
  • Loading branch information
ctelfer committed Sep 7, 2018
1 parent a9cd636 commit 056e620
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 9 deletions.
19 changes: 19 additions & 0 deletions ipvs/ipvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,25 @@ type SvcStats struct {
BPSIn uint32
}

// Connection flags for IPVS destinations. The low three bits, selected by
// CONN_F_FWD_MASK, encode the packet-forwarding method; the higher values
// are individual connection-state bits.
//
// NOTE(review): the names and values appear to mirror the Linux kernel's
// IP_VS_CONN_F_* definitions — confirm against the kernel headers.
const (
	// CONN_F_FWD_MASK is the mask for the forwarding methods.
	CONN_F_FWD_MASK = 0x7
	// CONN_F_MASQ selects masquerading/NAT.
	CONN_F_MASQ = 0x0
	// CONN_F_LOCALNODE selects the local node.
	CONN_F_LOCALNODE = 0x1
	// CONN_F_TUNNEL selects tunneling.
	CONN_F_TUNNEL = 0x2
	// CONN_F_DROUTE selects direct routing.
	CONN_F_DROUTE = 0x3
	// CONN_F_BYPASS selects cache bypass.
	CONN_F_BYPASS = 0x4

	// CONN_F_SYNC marks an entry created by sync.
	CONN_F_SYNC = 1 << 5
	// CONN_F_HASHED marks a hashed entry.
	CONN_F_HASHED = 1 << 6
	// CONN_F_NOOUTPUT means no output packets.
	CONN_F_NOOUTPUT = 1 << 7
	// CONN_F_INACTIVE means the connection is not established.
	CONN_F_INACTIVE = 1 << 8
	// CONN_F_OUT_SEQ means output sequence adjustment must be done.
	CONN_F_OUT_SEQ = 1 << 9
	// CONN_F_IN_SEQ means input sequence adjustment must be done.
	CONN_F_IN_SEQ = 1 << 10
	// CONN_F_SEQ_MASK is the in/out sequence mask.
	CONN_F_SEQ_MASK = CONN_F_OUT_SEQ | CONN_F_IN_SEQ
	// CONN_F_NO_CPORT means no client port has been set yet.
	CONN_F_NO_CPORT = 1 << 11
	// CONN_F_TEMPLATE marks a template rather than a connection.
	CONN_F_TEMPLATE = 1 << 12
	// CONN_F_ONE_PACKET means only one packet is forwarded.
	CONN_F_ONE_PACKET = 1 << 13
)

// Destination defines an IPVS destination (real server) in its
// entirety.
type Destination struct {
Expand Down
23 changes: 20 additions & 3 deletions sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ func (sb *sandbox) DisableService() (err error) {
return nil
}

func releaseOSSboxResources(osSbox osl.Sandbox, ep *endpoint) {
func releaseOSSboxResources(osSbox osl.Sandbox, ep *endpoint, ingress bool) {
for _, i := range osSbox.Info().Interfaces() {
// Only remove the interfaces owned by this endpoint from the sandbox.
if ep.hasInterface(i.SrcName()) {
Expand All @@ -742,8 +742,16 @@ func releaseOSSboxResources(osSbox osl.Sandbox, ep *endpoint) {

ep.Lock()
joinInfo := ep.joinInfo
vip := ep.virtualIP
ep.Unlock()

if len(vip) > 0 && !ingress {
ipNet := &net.IPNet{IP: vip, Mask: net.CIDRMask(32, 32)}
if err := osSbox.RemoveAliasIP(osSbox.GetLoopbackIfaceName(), ipNet); err != nil {
logrus.WithError(err).Debugf("failed to remove virtual ip %v to loopback", ipNet)
}
}

if joinInfo == nil {
return
}
Expand All @@ -760,14 +768,15 @@ func (sb *sandbox) releaseOSSbox() {
sb.Lock()
osSbox := sb.osSbox
sb.osSbox = nil
ingress := sb.ingress
sb.Unlock()

if osSbox == nil {
return
}

for _, ep := range sb.getConnectedEndpoints() {
releaseOSSboxResources(osSbox, ep)
releaseOSSboxResources(osSbox, ep, ingress)
}

osSbox.Destroy()
Expand Down Expand Up @@ -856,6 +865,13 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error {
}
}

if len(ep.virtualIP) > 0 && !sb.ingress {
ipNet := &net.IPNet{IP: ep.virtualIP, Mask: net.CIDRMask(32, 32)}
if err := sb.osSbox.AddAliasIP(sb.osSbox.GetLoopbackIfaceName(), ipNet); err != nil {
return fmt.Errorf("failed to add virtual ip %v to loopback: %v", ipNet, err)
}
}

if joinInfo != nil {
// Set up non-interface routes.
for _, r := range joinInfo.StaticRoutes {
Expand Down Expand Up @@ -904,9 +920,10 @@ func (sb *sandbox) clearNetworkResources(origEp *endpoint) error {
sb.Lock()
osSbox := sb.osSbox
inDelete := sb.inDelete
ingress := sb.ingress
sb.Unlock()
if osSbox != nil {
releaseOSSboxResources(osSbox, ep)
releaseOSSboxResources(osSbox, ep, ingress)
}

sb.Lock()
Expand Down
24 changes: 18 additions & 6 deletions service_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (n *network) addLBBackend(ip net.IP, lb *loadBalancer) {
}

logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v in sbox %.7s (%.7s)", lb.vip, lb.fwMark, lb.service.ingressPorts, sb.ID(), sb.ContainerID())
if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, false); err != nil {
if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, false, n.ingress); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %.7s (%.7s): %v", sb.ID(), sb.ContainerID(), err)
return
}
Expand All @@ -159,6 +159,9 @@ func (n *network) addLBBackend(ip net.IP, lb *loadBalancer) {
Address: ip,
Weight: 1,
}
if !n.ingress {
d.ConnectionFlags = ipvs.CONN_F_DROUTE
}

// Remove the sched name before using the service to add
// destination.
Expand Down Expand Up @@ -204,6 +207,9 @@ func (n *network) rmLBBackend(ip net.IP, lb *loadBalancer, rmService bool, fullR
Address: ip,
Weight: 1,
}
if !n.ingress {
d.ConnectionFlags = ipvs.CONN_F_DROUTE
}

if fullRemove {
if err := i.DelDestination(s, d); err != nil && err != syscall.ENOENT {
Expand Down Expand Up @@ -233,7 +239,7 @@ func (n *network) rmLBBackend(ip net.IP, lb *loadBalancer, rmService bool, fullR
}
}

if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, true); err != nil {
if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, true, n.ingress); err != nil {
logrus.Errorf("Failed to delete firewall mark rule in sbox %.7s (%.7s): %v", sb.ID(), sb.ContainerID(), err)
}

Expand Down Expand Up @@ -544,7 +550,7 @@ func readPortsFromFile(fileName string) ([]*PortConfig, error) {

// Invoke the fwmarker reexec routine to mark VIP-destined packets with
// the passed firewall mark.
func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error {
func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool, isIngress bool) error {
var ingressPortsFile string

if len(ingressPorts) != 0 {
Expand All @@ -562,9 +568,14 @@ func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*Port
addDelOpt = "-D"
}

isIngressOpt := "false"
if isIngress {
isIngressOpt = "true"
}

cmd := &exec.Cmd{
Path: reexec.Self(),
Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.String()),
Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.String(), isIngressOpt),
Stdout: os.Stdout,
Stderr: os.Stderr,
}
Expand All @@ -581,7 +592,7 @@ func fwMarker() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()

if len(os.Args) < 7 {
if len(os.Args) < 8 {
logrus.Error("invalid number of arguments..")
os.Exit(1)
}
Expand Down Expand Up @@ -623,7 +634,8 @@ func fwMarker() {
os.Exit(5)
}

if addDelOpt == "-A" {
isIngressOpt := os.Args[7]
if addDelOpt == "-A" && isIngressOpt == "true" {
eIP, subnet, err := net.ParseCIDR(os.Args[6])
if err != nil {
logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[6], err)
Expand Down

0 comments on commit 056e620

Please sign in to comment.