Skip to content

Commit

Permalink
This change implements best of two load balanciung algorithm.
Browse files Browse the repository at this point in the history
This is the next step of the steps outlined in knative#5692.
This change is a drop in replacement of the current iterative algorithm with the
power of two choices algorithm.

/assign @markusthoemmes mattmoor
  • Loading branch information
vagababov committed Sep 27, 2019
1 parent fa69a37 commit 747b4af
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 28 deletions.
65 changes: 37 additions & 28 deletions pkg/activator/net/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -108,6 +109,34 @@ func newRevisionThrottler(revID types.NamespacedName,
}
}

// pickP2C implements power of two choices algorithm for fast load balancing.
// See here: https://www.eecs.harvard.edu/~michaelm/postscripts/mythesis.pdf.
// The function picks a target to send request to, given list of `podIPTracker` objects, a
// callback to release the slot is returned as well.
func pickP2C(tgts []*podIPTracker) (string, func()) {
if len(tgts) == 0 {
return "", nil
}
i1 := rand.Intn(len(tgts))
t1 := tgts[i1]
// Single target, no reason to try to pick another one.
// TODO(vagababov): perhaps carve out |2| case?
if len(tgts) > 1 {
i2 := rand.Intn(len(tgts))
for i1 == i2 {
i2 = rand.Intn(len(tgts))
}
t2 := tgts[i2]
if atomic.LoadInt32(&t1.requests) > atomic.LoadInt32(&t2.requests) {
t1 = t2
}
}
atomic.AddInt32(&t1.requests, 1)
return t1.dest, func() {
atomic.AddInt32(&t1.requests, -1)
}
}

// Returns clusterIP if it is the current valid dest. If neither clusterIP or podIPs are valid
// dests returns error.
func (rt *revisionThrottler) checkClusterIPDest() (string, error) {
Expand All @@ -122,39 +151,19 @@ func (rt *revisionThrottler) checkClusterIPDest() (string, error) {
return rt.clusterIPDest, nil
}

func nonce() {}

// Returns a dest after incrementing its request count and a completion callback
// to be called after request completion. If no dest is found it returns "", nil.
func (rt *revisionThrottler) acquireDest() (string, func()) {
var leastConn *podIPTracker
clusterIPDest := func() string {
rt.mux.Lock()
defer rt.mux.Unlock()
rt.mux.Lock()
defer rt.mux.Unlock()

// This is intended to be called only after performing a read lock check on clusterIPDest
if rt.clusterIPDest != "" {
return rt.clusterIPDest
}

// Find the dest with fewest active connections.
for _, tracker := range rt.podIPTrackers {
if leastConn == nil || atomic.LoadInt32(&leastConn.requests) > tracker.requests {
leastConn = tracker
}
}

return ""
}()
if clusterIPDest != "" {
return clusterIPDest, func() {}
}

if leastConn != nil {
atomic.AddInt32(&leastConn.requests, 1)
return leastConn.dest, func() {
atomic.AddInt32(&leastConn.requests, -1)
}
// This is intended to be called only after performing a read lock check on clusterIPDest
if rt.clusterIPDest != "" {
return rt.clusterIPDest, nonce
}
return "", nil
return pickP2C(rt.podIPTrackers)
}

func (rt *revisionThrottler) try(ctx context.Context, function func(string) error) error {
Expand Down
101 changes: 101 additions & 0 deletions pkg/activator/net/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package activator

import (
"context"
"regexp"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -535,3 +536,103 @@ func TestInferIndex(t *testing.T) {
})
}
}

func TestPickP2C(t *testing.T) {
tests := []struct {
l string
tgts []*podIPTracker
wantRE string
}{{
l: "empty",
tgts: []*podIPTracker{},
wantRE: "",
}, {
l: "single",
tgts: []*podIPTracker{
&podIPTracker{
"good-place",
1,
},
},
wantRE: "good-place",
}, {
l: "two",
tgts: []*podIPTracker{
&podIPTracker{
"bad-place",
11,
},
&podIPTracker{
"good-place",
1,
},
},
wantRE: "good-place",
}, {
l: "three",
tgts: []*podIPTracker{
&podIPTracker{
"bad",
7,
},
&podIPTracker{
"neutral",
5,
},
&podIPTracker{
"good",
1,
},
},
wantRE: "good|neutral",
}}
for _, test := range tests {
t.Run(test.l, func(t *testing.T) {
// Run the same code several times to make sure the selection is stable and callback
// properly removes the load.
for i := 0; i < 5; i++ {
re := regexp.MustCompile(test.wantRE)
got, cb := pickP2C(test.tgts)
if !re.MatchString(got) {
t.Errorf("target = %s, want to match: %s", got, test.wantRE)
}
if cb != nil {
cb()
}
}
})
}
t.Run("multiple", func(t *testing.T) {
tgts := []*podIPTracker{
&podIPTracker{
"bad-place",
3,
},
&podIPTracker{
"good-place",
1,
},
}
cbs := make([]func(), 0, 4)
for i := 0; i < 2; i++ {
got, cb := pickP2C(tgts)
if got != "good-place" {
t.Fatalf("Got = %s, want: good-place", got)
}
cbs = append(cbs, cb)
}
// Now this next request can be `good` or `bad` depending on random # generator
got, cb := pickP2C(tgts)
cbs = append(cbs, cb)
if got == "good-place" {
got, cb = pickP2C(tgts)
cbs = append(cbs, cb)
if got != "bad-place" {
t.Errorf("Got = %s, want: bad-place", got)
}
}
for _, cb := range cbs {
cb()
}
})
}

0 comments on commit 747b4af

Please sign in to comment.