flowcontrol.Backoff: add init Duration and retry number
For the "Backoff policy and failed pod limit" implementation
(kubernetes/community#583), we need to retrieve
the number of backoff retry and also be able to set a different
initial backoff duration for each entry.

This pull request adds two new methods to `Backoff`:
- `NextWithInitDuration`: like `Next`, this method increments the backoff
delay exponentially. In addition, the caller provides the initial delay
duration, which is used if the entry is not already present.

- `GetWithRetryNumber`: returns the backoff delay associated with an entry,
together with the retry number, i.e. the number of times
`Next`/`NextWithInitDuration` has been called for that entry (see the usage
sketch below).
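
For illustration, a minimal sketch of how a caller might combine the two new methods; the `key` and `maxRetries` values are hypothetical and not part of this change:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	// 1s default initial delay, capped at 50s.
	backoff := flowcontrol.NewBackOff(1*time.Second, 50*time.Second)

	key := "job-xyz"       // hypothetical per-entry key
	maxRetries := int32(6) // hypothetical retry limit

	// Ask how long to wait and how many retries have happened so far.
	delay, retries := backoff.GetWithRetryNumber(key)
	if retries >= maxRetries {
		fmt.Println("giving up: retry limit reached")
		return
	}
	fmt.Printf("waiting %v before retry %d\n", delay, retries+1)

	// Record another failure: a new entry starts at the given initial
	// duration (2s here); an existing entry has its delay doubled and
	// its retry count incremented.
	backoff.NextWithInitDuration(key, 2*time.Second, time.Now())
}
```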
clamoriniere1A committed Aug 30, 2017
1 parent e3210c6 commit 06cf54f
Showing 2 changed files with 111 additions and 4 deletions.
40 changes: 36 additions & 4 deletions staging/src/k8s.io/client-go/util/flowcontrol/backoff.go
```diff
@@ -27,6 +27,7 @@ import (
 type backoffEntry struct {
 	backoff    time.Duration
 	lastUpdate time.Time
+	retry      int32
 }
 
 type Backoff struct {
```
```diff
@@ -57,25 +58,39 @@ func NewBackOff(initial, max time.Duration) *Backoff {
 
 // Get the current backoff Duration
 func (p *Backoff) Get(id string) time.Duration {
+	delay, _ := p.GetWithRetryNumber(id)
+	return delay
+}
+
+// GetWithRetryNumber returns the current backoff Duration and the number of previous retries
+func (p *Backoff) GetWithRetryNumber(id string) (time.Duration, int32) {
 	p.Lock()
 	defer p.Unlock()
 	var delay time.Duration
+	var nbRetry int32
 	entry, ok := p.perItemBackoff[id]
 	if ok {
 		delay = entry.backoff
+		nbRetry = entry.retry
 	}
-	return delay
+	return delay, nbRetry
 }
 
 // move backoff to the next mark, capping at maxDuration
 func (p *Backoff) Next(id string, eventTime time.Time) {
+	p.NextWithInitDuration(id, p.defaultDuration, eventTime)
+}
+
+// NextWithInitDuration moves backoff to the next mark and increments the retry count, capping at maxDuration
+func (p *Backoff) NextWithInitDuration(id string, initial time.Duration, eventTime time.Time) {
 	p.Lock()
 	defer p.Unlock()
 	entry, ok := p.perItemBackoff[id]
 	if !ok || hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
-		entry = p.initEntryUnsafe(id)
+		entry = p.initEntryUnsafe(id, initial)
 	} else {
 		delay := entry.backoff * 2 // exponential
+		entry.retry++
 		entry.backoff = time.Duration(integer.Int64Min(int64(delay), int64(p.maxDuration)))
 	}
 	entry.lastUpdate = p.Clock.Now()
```
```diff
@@ -136,9 +151,26 @@ func (p *Backoff) DeleteEntry(id string) {
 	delete(p.perItemBackoff, id)
 }
 
+// StartBackoffGC starts the Backoff garbage collection mechanism;
+// Backoff.GC() will be executed every minute.
+func StartBackoffGC(backoff *Backoff, stopCh <-chan struct{}) {
+	go func() {
+		for {
+			select {
+			case <-time.After(time.Minute):
+				backoff.GC()
+			case <-stopCh:
+				return
+			}
+		}
+	}()
+}
+
 // Take a lock on *Backoff, before calling initEntryUnsafe
-func (p *Backoff) initEntryUnsafe(id string) *backoffEntry {
-	entry := &backoffEntry{backoff: p.defaultDuration}
+func (p *Backoff) initEntryUnsafe(id string, backoff time.Duration) *backoffEntry {
+	entry := &backoffEntry{
+		backoff: backoff,
+	}
 	p.perItemBackoff[id] = entry
 	return entry
 }
```
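The doubling-and-cap arithmetic in `Next`/`NextWithInitDuration` can be checked in isolation. A small self-contained sketch, where `int64Min` stands in for the `integer.Int64Min` helper that backoff.go uses:

```go
package main

import (
	"fmt"
	"time"
)

// int64Min mirrors integer.Int64Min, which backoff.go uses to cap the
// doubled delay at maxDuration.
func int64Min(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}

func main() {
	initial := 1 * time.Second
	maxDuration := 50 * time.Second

	delay := initial
	for retry := 0; retry < 8; retry++ {
		fmt.Printf("retry %d: delay %v\n", retry, delay)
		// The same update applied on each failure: double the delay,
		// then cap it at maxDuration.
		delay = time.Duration(int64Min(int64(delay*2), int64(maxDuration)))
	}
	// Prints 1s 2s 4s 8s 16s 32s 50s 50s — the progression the test below expects.
}
```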
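A usage sketch for the new `StartBackoffGC` helper: it prunes expired entries once a minute until the stop channel is closed. The channel wiring here is illustrative, not part of the commit:

```go
package main

import (
	"time"

	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	backoff := flowcontrol.NewBackOff(1*time.Second, 50*time.Second)

	stopCh := make(chan struct{})
	// Runs backoff.GC() every minute in a background goroutine.
	flowcontrol.StartBackoffGC(backoff, stopCh)

	// ... hand backoff to controllers that call Next/Get on it ...
	time.Sleep(100 * time.Millisecond)

	close(stopCh) // terminates the GC goroutine
}
```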
75 changes: 75 additions & 0 deletions staging/src/k8s.io/client-go/util/flowcontrol/backoff_test.go
```diff
@@ -193,3 +193,78 @@ func TestIsInBackOffSinceUpdate(t *testing.T) {
 		}
 	}
 }
+
+func TestSlowBackoffWithNbRetry(t *testing.T) {
+	id := "_idSlow"
+	tc := clock.NewFakeClock(time.Now())
+	step := time.Second
+	maxDuration := 50 * step
+
+	b := NewFakeBackOff(step, maxDuration, tc)
+	cases := []struct {
+		// expected values returned by GetWithRetryNumber
+		expectedRetry int32
+		expectedDelay time.Duration
+	}{
+		{
+			0,
+			time.Duration(0) * time.Second,
+		},
+		{
+			0,
+			time.Duration(1) * time.Second,
+		},
+		{
+			1,
+			time.Duration(2) * time.Second,
+		},
+		{
+			2,
+			time.Duration(4) * time.Second,
+		},
+		{
+			3,
+			time.Duration(8) * time.Second,
+		},
+		{
+			4,
+			time.Duration(16) * time.Second,
+		},
+		{
+			5,
+			time.Duration(32) * time.Second,
+		},
+		{
+			6,
+			time.Duration(50) * time.Second,
+		},
+		{
+			7,
+			time.Duration(50) * time.Second,
+		},
+		{
+			8,
+			time.Duration(50) * time.Second,
+		},
+	}
+	// The first Get sees no entry (0s, retry 0). The first NextWithInitDuration
+	// initializes the entry at `step` with retry still 0; each subsequent call
+	// doubles the delay (capped at maxDuration) and increments the retry count.
+	for ix, c := range cases {
+		tc.Step(step)
+		w, retry := b.GetWithRetryNumber(id)
+		if retry != c.expectedRetry {
+			t.Errorf("input: '%d': retry expected %d, got %d", ix, c.expectedRetry, retry)
+		}
+		if w != c.expectedDelay {
+			t.Errorf("input: '%d': expected %s, got %s", ix, c.expectedDelay, w)
+		}
+		b.NextWithInitDuration(id, step, tc.Now())
+	}
+
+	// Now confirm that Reset cancels the backoff.
+	b.NextWithInitDuration(id, step, tc.Now())
+	b.Reset(id)
+	backoff, counter := b.GetWithRetryNumber(id)
+	if backoff != 0 || counter != 0 {
+		t.Errorf("Reset didn't clear the backoff.")
+	}
+}
```
