Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement prepared query upstreams watching for envoy #5224

Merged
merged 11 commits into from
Jan 18, 2019
162 changes: 119 additions & 43 deletions agent/cache/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@ package cache
import (
"context"
"fmt"
"reflect"
"time"

"github.com/hashicorp/consul/lib"
)

const (
	// NotifyDefaultPollingInterval is the default interval used when polling
	// cache types that cannot perform blocking queries. Presumably used by
	// callers when no explicit MaxAge is available — TODO confirm at call sites.
	NotifyDefaultPollingInterval = 30 * time.Second
)

// UpdateEvent is a struct summarising an update to a cache entry
Expand Down Expand Up @@ -57,66 +64,135 @@ func (c *Cache) Notify(ctx context.Context, t string, r Request,
if !ok {
return fmt.Errorf("unknown type in cache: %s", t)
}
if !tEntry.Type.SupportsBlocking() {
return fmt.Errorf("watch requires the type to support blocking")
if tEntry.Type.SupportsBlocking() {
go c.notifyBlockingQuery(ctx, t, r, correlationID, ch)
} else {
info := r.CacheInfo()
if info.MaxAge == 0 {
return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge")
banks marked this conversation as resolved.
Show resolved Hide resolved
}
go c.notifyPollingQuery(info.MaxAge, ctx, t, r, correlationID, ch)
}

// Always start at 0 index to deliver the inital (possibly currently cached
return nil
}

func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, correlationID string, ch chan<- UpdateEvent) {
// Always start at 0 index to deliver the initial (possibly currently cached
// value).
index := uint64(0)
failures := uint(0)

for {
// Check context hasn't been cancelled
if ctx.Err() != nil {
return
}

go func() {
var failures uint
// Blocking request
res, meta, err := c.getWithIndex(t, r, index)

for {
// Check context hasn't been cancelled
if ctx.Err() != nil {
// Check context hasn't been cancelled
if ctx.Err() != nil {
return
}

// Check the index of the value returned in the cache entry to be sure it
// changed
if index < meta.Index {
u := UpdateEvent{correlationID, res, meta, err}
select {
case ch <- u:
case <-ctx.Done():
return
}

// Blocking request
res, meta, err := c.getWithIndex(t, r, index)
// Update index for next request
index = meta.Index
}

// Check context hasn't been cancelled
if ctx.Err() != nil {
// Handle errors with backoff. Badly behaved blocking calls that returned
// a zero index are considered as failures since we need to not get stuck
// in a busy loop.
if err == nil && meta.Index > 0 {
failures = 0
} else {
failures++
}
if wait := backOffWait(failures); wait > 0 {
select {
case <-time.After(wait):
case <-ctx.Done():
return
}
}
// Sanity check we always request blocking on second pass
if index < 1 {
index = 1
}
}
}

func (c *Cache) notifyPollingQuery(maxAge time.Duration, ctx context.Context, t string, r Request, correlationID string, ch chan<- UpdateEvent) {
mkeeler marked this conversation as resolved.
Show resolved Hide resolved
index := uint64(0)
failures := uint(0)

var lastValue interface{} = nil

for {
// Check context hasn't been cancelled
if ctx.Err() != nil {
return
}

// Make the request
res, meta, err := c.getWithIndex(t, r, index)

// Check the index of the value returned in the cache entry to be sure it
// changed
if index < meta.Index {
u := UpdateEvent{correlationID, res, meta, err}
select {
case ch <- u:
case <-ctx.Done():
return
}

// Update index for next request
index = meta.Index
// Check context hasn't been cancelled
if ctx.Err() != nil {
return
}

// Check for a change in the value and
mkeeler marked this conversation as resolved.
Show resolved Hide resolved
if index < meta.Index || !reflect.DeepEqual(lastValue, res) {
u := UpdateEvent{correlationID, res, meta, err}
select {
case ch <- u:
case <-ctx.Done():
return
}

// Handle errors with backoff. Badly behaved blocking calls that returned
// a zero index are considered as failures since we need to not get stuck
// in a busy loop.
if err == nil && meta.Index > 0 {
failures = 0
} else {
failures++
// Update index and lastResult
mkeeler marked this conversation as resolved.
Show resolved Hide resolved
lastValue = res
index = meta.Index
}

// Handle errors with backoff.
if err == nil {
rboyer marked this conversation as resolved.
Show resolved Hide resolved
failures = 0
} else {
failures += 1
mkeeler marked this conversation as resolved.
Show resolved Hide resolved
}

errWait := backOffWait(failures)
if errWait > 0 {
select {
case <-time.After(errWait):
case <-ctx.Done():
return
}
if wait := backOffWait(failures); wait > 0 {
select {
case <-time.After(wait):
case <-ctx.Done():
return
}
} else {
pollWait := 0 * time.Second
mkeeler marked this conversation as resolved.
Show resolved Hide resolved
if meta.Age <= maxAge {
pollWait = maxAge - meta.Age
mkeeler marked this conversation as resolved.
Show resolved Hide resolved
}
// Sanity check we always request blocking on second pass
if index < 1 {
index = 1

pollWait += lib.RandomStagger(maxAge / 16)
mkeeler marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-time.After(pollWait):
case <-ctx.Done():
return
}
}
}()

return nil
}
}
180 changes: 180 additions & 0 deletions agent/cache/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,120 @@ func TestCacheNotify(t *testing.T) {
// important things to get working.
}

func TestCacheNotifyPolling(t *testing.T) {
t.Parallel()

typ := TestTypeNonBlocking(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, &RegisterOptions{
Refresh: false,
})

// Configure the type
typ.Static(FetchResult{Value: 1, Index: 1}, nil).Once().Run(func(args mock.Arguments) {
// Assert the right request type - all real Fetch implementations do this so
// it keeps us honest that Watch doesn't require type mangling which will
// break in real life (hint: it did on the first attempt)
_, ok := args.Get(1).(*MockRequest)
require.True(t, ok)
})
typ.Static(FetchResult{Value: 12, Index: 1}, nil).Once()
typ.Static(FetchResult{Value: 42, Index: 1}, nil).Once()

require := require.New(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ch := make(chan UpdateEvent)

err := c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello", MaxAge: 100 * time.Millisecond}), "test", ch)
require.NoError(err)

// Should receive the first result pretty soon
TestCacheNotifyChResult(t, ch, UpdateEvent{
CorrelationID: "test",
Result: 1,
Meta: ResultMeta{Hit: false, Index: 1},
Err: nil,
})

// There should be no more updates delivered yet
require.Len(ch, 0)

// make sure the updates do not come too quickly
select {
case <-time.After(50 * time.Millisecond):
case <-ch:
require.Fail("Received update too early")
}

// make sure we get the update not too far out.
select {
case <-time.After(100 * time.Millisecond):
require.Fail("Didn't receive the notification")
case result := <-ch:
require.Equal(result.Result, 12)
require.Equal(result.CorrelationID, "test")
require.Equal(result.Meta.Hit, false)
require.Equal(result.Meta.Index, uint64(1))
// pretty conservative check it should be even newer because without a second
// notifier each value returned will have been executed just then and not served
// from the cache.
require.True(result.Meta.Age < 50*time.Millisecond)
require.NoError(result.Err)
}

require.Len(ch, 0)

// Registere a second observer using same chan and request. Note that this is
mkeeler marked this conversation as resolved.
Show resolved Hide resolved
// testing a few things implicitly:
// - that multiple watchers on the same cache entity are de-duped in their
// requests to the "backend"
// - that multiple watchers can distinguish their results using correlationID
err = c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello", MaxAge: 100 * time.Millisecond}), "test2", ch)
require.NoError(err)

// Should get test2 notify immediately, and it should be a cache hit
TestCacheNotifyChResult(t, ch, UpdateEvent{
CorrelationID: "test2",
Result: 12,
Meta: ResultMeta{Hit: true, Index: 1},
Err: nil,
})

require.Len(ch, 0)

// wait for the next batch of responses
events := make([]UpdateEvent, 0)
// 110 is needed to allow for the jitter
timeout := time.After(110 * time.Millisecond)

for i := 0; i < 2; i++ {
select {
case <-timeout:
require.Fail("UpdateEvent not received in time")
case eve := <-ch:
events = append(events, eve)
}
}

require.Equal(events[0].Result, 42)
require.Equal(events[0].Meta.Hit, false)
require.Equal(events[0].Meta.Index, uint64(1))
require.True(events[0].Meta.Age < 50*time.Millisecond)
require.NoError(events[0].Err)
require.Equal(events[1].Result, 42)
// Sometimes this would be a hit and others not. It all depends on when the various getWithIndex calls got fired.
// If both are done concurrently then it will not be a cache hit but the request gets single flighted and both
// get notified at the same time.
// require.Equal(events[1].Meta.Hit, true)
require.Equal(events[1].Meta.Index, uint64(1))
require.True(events[1].Meta.Age < 100*time.Millisecond)
require.NoError(events[1].Err)
}

// Test that a refresh performs a backoff.
func TestCacheWatch_ErrorBackoff(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -206,3 +320,69 @@ OUT:
actual := atomic.LoadUint32(&retries)
require.True(actual < 10, fmt.Sprintf("actual: %d", actual))
}

// Test that a refresh performs a backoff.
func TestCacheWatch_ErrorBackoffNonBlocking(t *testing.T) {
t.Parallel()

typ := TestTypeNonBlocking(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, &RegisterOptions{
Refresh: false,
})

// Configure the type
var retries uint32
fetchErr := fmt.Errorf("test fetch error")
typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once()
typ.Static(FetchResult{Value: nil, Index: 5}, fetchErr).Run(func(args mock.Arguments) {
atomic.AddUint32(&retries, 1)
})

require := require.New(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ch := make(chan UpdateEvent)

err := c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello", MaxAge: 100 * time.Millisecond}), "test", ch)
require.NoError(err)

// Should receive the first result pretty soon
TestCacheNotifyChResult(t, ch, UpdateEvent{
CorrelationID: "test",
Result: 1,
Meta: ResultMeta{Hit: false, Index: 4},
Err: nil,
})

numErrors := 0
// Loop for a little while and count how many errors we see reported. If this
// was running as fast as it could go we'd expect this to be huge. We have to
// be a little careful here because the watch chan ch doesn't have a large
// buffer so we could be artificially slowing down the loop without the
// backoff actualy taking affect. We can validate that by ensuring this test
mkeeler marked this conversation as resolved.
Show resolved Hide resolved
// fails without the backoff code reliably.
//
// 100 + 500 milliseconds. 100 because the first retry will not happen until
// the 100 + jitter milliseconds have elapsed.
timeoutC := time.After(600 * time.Millisecond)
OUT:
for {
select {
case <-timeoutC:
break OUT
case u := <-ch:
numErrors++
require.Error(u.Err)
}
}
// Must be fewer than 10 failures in that time
require.True(numErrors < 10, fmt.Sprintf("numErrors: %d", numErrors))

// Check the number of RPCs as a sanity check too
actual := atomic.LoadUint32(&retries)
require.True(actual < 10, fmt.Sprintf("actual: %d", actual))
}
Loading