Skip to content

Commit

Permalink
Implement prepared query upstreams watching for envoy (#5224)
Browse files Browse the repository at this point in the history
Fixes #4969 

This implements non-blocking request polling at the cache layer which is currently only used for prepared queries. Additionally this enables the proxycfg manager to poll prepared queries for use in envoy proxy upstreams.
  • Loading branch information
mkeeler authored Jan 18, 2019
1 parent 8c87238 commit 7e6b3e6
Show file tree
Hide file tree
Showing 4 changed files with 352 additions and 61 deletions.
3 changes: 2 additions & 1 deletion agent/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/consul/lib"
)

//go:generate mockery -all -inpkg
Expand Down Expand Up @@ -616,7 +617,7 @@ func backOffWait(failures uint) time.Duration {
if waitTime > CacheRefreshMaxWait {
waitTime = CacheRefreshMaxWait
}
return waitTime
return waitTime + lib.RandomStagger(waitTime)
}
return 0
}
Expand Down
198 changes: 155 additions & 43 deletions agent/cache/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package cache
import (
"context"
"fmt"
"reflect"
"time"

"github.com/hashicorp/consul/lib"
)

// UpdateEvent is a struct summarising an update to a cache entry
Expand Down Expand Up @@ -57,66 +60,175 @@ func (c *Cache) Notify(ctx context.Context, t string, r Request,
if !ok {
return fmt.Errorf("unknown type in cache: %s", t)
}
if !tEntry.Type.SupportsBlocking() {
return fmt.Errorf("watch requires the type to support blocking")
if tEntry.Type.SupportsBlocking() {
go c.notifyBlockingQuery(ctx, t, r, correlationID, ch)
} else {
info := r.CacheInfo()
if info.MaxAge == 0 {
return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge")
}
go c.notifyPollingQuery(ctx, t, r, correlationID, ch, info.MaxAge)
}

	// Always start at 0 index to deliver the initial (possibly currently cached
return nil
}

func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, correlationID string, ch chan<- UpdateEvent) {
// Always start at 0 index to deliver the initial (possibly currently cached
// value).
index := uint64(0)
failures := uint(0)

go func() {
var failures uint
for {
// Check context hasn't been cancelled
if ctx.Err() != nil {
return
}

// Blocking request
res, meta, err := c.getWithIndex(t, r, index)

for {
// Check context hasn't been cancelled
if ctx.Err() != nil {
// Check context hasn't been cancelled
if ctx.Err() != nil {
return
}

// Check the index of the value returned in the cache entry to be sure it
// changed
if index < meta.Index {
u := UpdateEvent{correlationID, res, meta, err}
select {
case ch <- u:
case <-ctx.Done():
return
}

// Blocking request
res, meta, err := c.getWithIndex(t, r, index)
// Update index for next request
index = meta.Index
}

// Check context hasn't been cancelled
if ctx.Err() != nil {
// Handle errors with backoff. Badly behaved blocking calls that returned
// a zero index are considered as failures since we need to not get stuck
// in a busy loop.
wait := 0 * time.Second
if err == nil && meta.Index > 0 {
failures = 0
} else {
failures++
wait = backOffWait(failures)
}

if wait > 0 {
select {
case <-time.After(wait):
case <-ctx.Done():
return
}
}
// Sanity check we always request blocking on second pass
if index < 1 {
index = 1
}
}
}

func (c *Cache) notifyPollingQuery(ctx context.Context, t string, r Request, correlationID string, ch chan<- UpdateEvent, maxAge time.Duration) {
index := uint64(0)
failures := uint(0)

var lastValue interface{} = nil

// Check the index of the value returned in the cache entry to be sure it
// changed
if index < meta.Index {
u := UpdateEvent{correlationID, res, meta, err}
select {
case ch <- u:
case <-ctx.Done():
return
}

// Update index for next request
index = meta.Index
for {
// Check context hasn't been cancelled
if ctx.Err() != nil {
return
}

// Make the request
res, meta, err := c.getWithIndex(t, r, index)

// Check context hasn't been cancelled
if ctx.Err() != nil {
return
}

// Check for a change in the value or an index change
if index < meta.Index || !reflect.DeepEqual(lastValue, res) {
u := UpdateEvent{correlationID, res, meta, err}
select {
case ch <- u:
case <-ctx.Done():
return
}

// Handle errors with backoff. Badly behaved blocking calls that returned
// a zero index are considered as failures since we need to not get stuck
// in a busy loop.
if err == nil && meta.Index > 0 {
failures = 0
} else {
failures++
// Update index and lastValue
lastValue = res
index = meta.Index
}

// Reset or increment failure counter
if err == nil {
failures = 0
} else {
failures++
}

		// Determining how long to wait before the next poll is complicated.
		// First off, the happy-path and error-path waits are handled distinctly.
//
// Once fetching the data through the cache returns an error (and until a
// non-error value is returned) the wait time between each round of the loop
// gets controlled by the backOffWait function. Because we would have waited
// at least until the age of the cached data was too old the error path should
// immediately retry the fetch and backoff on the time as needed for persistent
// failures which potentially will wait much longer than the MaxAge of the request
//
// When on the happy path we just need to fetch from the cache often enough to ensure
// that the data is not older than the MaxAge. Therefore after fetching the data from
// the cache we can sleep until the age of that data would exceed the MaxAge. Sometimes
// this will be for the MaxAge duration (like when only a single notify was executed so
// only 1 go routine is keeping the cache updated). Other times this will be some smaller
// duration than MaxAge (when multiple notify calls were executed and this go routine just
// got data back from the cache that was a cache hit after the other go routine fetched it
// without a hit). We cannot just set MustRevalidate on the request and always sleep for MaxAge
// as this would eliminate the single-flighting of these requests in the cache and
// the efficiencies gained by it.
if failures > 0 {

errWait := backOffWait(failures)
select {
case <-time.After(errWait):
case <-ctx.Done():
return
}
if wait := backOffWait(failures); wait > 0 {
select {
case <-time.After(wait):
case <-ctx.Done():
return
}
} else {
			// Default to immediately re-poll. This will only happen if the data
			// we just got out of the cache is already too stale.
pollWait := 0 * time.Second

// Calculate when the cached data's Age will get too stale and
// need to be re-queried. When the data's Age already exceeds the
// maxAge the pollWait value is left at 0 to immediately re-poll
if meta.Age <= maxAge {
pollWait = maxAge - meta.Age
}
// Sanity check we always request blocking on second pass
if index < 1 {
index = 1

// Add a small amount of random jitter to the polling time. One
// purpose of the jitter is to ensure that the next time
// we fetch from the cache the data will be stale (unless another
// notify go routine has updated it while this one is sleeping).
// Without this it would be possible to wake up, fetch the data
// again where the age of the data is strictly equal to the MaxAge
// and then immediately have to re-fetch again. That wouldn't
// be terrible but it would expend a bunch more cpu cycles when
// we can definitely avoid it.
pollWait += lib.RandomStagger(maxAge / 16)

select {
case <-time.After(pollWait):
case <-ctx.Done():
return
}
}
}()

return nil
}
}
Loading

0 comments on commit 7e6b3e6

Please sign in to comment.