Skip to content

Commit

Permalink
This change implements best of two load balanciung algorithm.
Browse files Browse the repository at this point in the history
This is the next step of the steps outlined in knative#5692.
This change is a drop in replacement of the current iterative algorithm with the
power of two choices algorithm.

/assign @markusthoemmes mattmoor
  • Loading branch information
vagababov committed Sep 27, 2019
1 parent fa69a37 commit 785e7aa
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 28 deletions.
65 changes: 37 additions & 28 deletions pkg/activator/net/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -108,6 +109,34 @@ func newRevisionThrottler(revID types.NamespacedName,
}
}

// pickP2C implements power of two choices algorithm for fast load balancing.
// See here: https://www.eecs.harvard.edu/~michaelm/postscripts/mythesis.pdf.
// The function picks a target to send request to, given list of `podIPTracker` objects, a
// callback to release the slot is returned as well.
func pickP2C(tgts []*podIPTracker) (string, func()) {
if len(tgts) == 0 {
return "", nil
}
i1 := rand.Intn(len(tgts))
t1 := tgts[i1]
// Single target, no reason to try to pick another one.
// TODO(vagababov): perhaps carve out |2| case?
if len(tgts) > 1 {
i2 := rand.Intn(len(tgts))
for i1 == i2 {
i2 = rand.Intn(len(tgts))
}
t2 := tgts[i2]
if atomic.LoadInt32(&t1.requests) > atomic.LoadInt32(&t2.requests) {
t1 = t2
}
}
atomic.AddInt32(&t1.requests, 1)
return t1.dest, func() {
atomic.AddInt32(&t1.requests, -1)
}
}

// Returns clusterIP if it is the current valid dest. If neither clusterIP or podIPs are valid
// dests returns error.
func (rt *revisionThrottler) checkClusterIPDest() (string, error) {
Expand All @@ -122,39 +151,19 @@ func (rt *revisionThrottler) checkClusterIPDest() (string, error) {
return rt.clusterIPDest, nil
}

func nonce() {}

// Returns a dest after incrementing its request count and a completion callback
// to be called after request completion. If no dest is found it returns "", nil.
func (rt *revisionThrottler) acquireDest() (string, func()) {
var leastConn *podIPTracker
clusterIPDest := func() string {
rt.mux.Lock()
defer rt.mux.Unlock()
rt.mux.Lock()
defer rt.mux.Unlock()

// This is intended to be called only after performing a read lock check on clusterIPDest
if rt.clusterIPDest != "" {
return rt.clusterIPDest
}

// Find the dest with fewest active connections.
for _, tracker := range rt.podIPTrackers {
if leastConn == nil || atomic.LoadInt32(&leastConn.requests) > tracker.requests {
leastConn = tracker
}
}

return ""
}()
if clusterIPDest != "" {
return clusterIPDest, func() {}
}

if leastConn != nil {
atomic.AddInt32(&leastConn.requests, 1)
return leastConn.dest, func() {
atomic.AddInt32(&leastConn.requests, -1)
}
// This is intended to be called only after performing a read lock check on clusterIPDest
if rt.clusterIPDest != "" {
return rt.clusterIPDest, nonce
}
return "", nil
return pickP2C(rt.podIPTrackers)
}

func (rt *revisionThrottler) try(ctx context.Context, function func(string) error) error {
Expand Down
64 changes: 64 additions & 0 deletions pkg/activator/net/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package activator

import (
"context"
"regexp"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -535,3 +536,66 @@ func TestInferIndex(t *testing.T) {
})
}
}

func TestPickP2C(t *testing.T) {
tests := []struct {
l string
tgts []*podIPTracker
wantRE string
}{{
l: "empty",
tgts: []*podIPTracker{},
wantRE: "",
}, {
l: "single",
tgts: []*podIPTracker{
&podIPTracker{
"good-place",
1,
},
},
wantRE: "good-place",
}, {
l: "two",
tgts: []*podIPTracker{
&podIPTracker{
"bad-place",
11,
},
&podIPTracker{
"good-place",
1,
},
},
wantRE: "good-place",
}, {
l: "three",
tgts: []*podIPTracker{
&podIPTracker{
"bad",
11,
},
&podIPTracker{
"neutral",
5,
},
&podIPTracker{
"good",
1,
},
},
wantRE: "good|neutral",
}}
for _, test := range tests {
t.Run(test.l, func(t *testing.T) {
re := regexp.MustCompile(test.wantRE)
got, cb := pickP2C(test.tgts)
if !re.MatchString(got) {
t.Errorf("target = %s, want to match: %s", got, test.wantRE)
}
if cb != nil {
cb()
}
})
}
}

0 comments on commit 785e7aa

Please sign in to comment.