diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 1637ecd88c79..225a50e2b999 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -22,6 +22,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/hashicorp/consul/lib" ) //go:generate mockery -all -inpkg @@ -616,7 +617,7 @@ func backOffWait(failures uint) time.Duration { if waitTime > CacheRefreshMaxWait { waitTime = CacheRefreshMaxWait } - return waitTime + return waitTime + lib.RandomStagger(waitTime) } return 0 } diff --git a/agent/cache/watch.go b/agent/cache/watch.go index 3591b95695b4..b22499f1c62e 100644 --- a/agent/cache/watch.go +++ b/agent/cache/watch.go @@ -3,7 +3,10 @@ package cache import ( "context" "fmt" + "reflect" "time" + + "github.com/hashicorp/consul/lib" ) // UpdateEvent is a struct summarising an update to a cache entry @@ -57,66 +60,175 @@ func (c *Cache) Notify(ctx context.Context, t string, r Request, if !ok { return fmt.Errorf("unknown type in cache: %s", t) } - if !tEntry.Type.SupportsBlocking() { - return fmt.Errorf("watch requires the type to support blocking") + if tEntry.Type.SupportsBlocking() { + go c.notifyBlockingQuery(ctx, t, r, correlationID, ch) + } else { + info := r.CacheInfo() + if info.MaxAge == 0 { + return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge") + } + go c.notifyPollingQuery(ctx, t, r, correlationID, ch, info.MaxAge) } - // Always start at 0 index to deliver the inital (possibly currently cached + return nil +} + +func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, correlationID string, ch chan<- UpdateEvent) { + // Always start at 0 index to deliver the initial (possibly currently cached // value). index := uint64(0) + failures := uint(0) - go func() { - var failures uint + for { + // Check context hasn't been cancelled + if ctx.Err() != nil { + return + } + + // Blocking request + res, meta, err := c.getWithIndex(t, r, index) - for { - // Check context hasn't been cancelled - if ctx.Err() != nil { + // Check context hasn't been cancelled + if ctx.Err() != nil { + return + } + + // Check the index of the value returned in the cache entry to be sure it + // changed + if index < meta.Index { + u := UpdateEvent{correlationID, res, meta, err} + select { + case ch <- u: + case <-ctx.Done(): return } - // Blocking request - res, meta, err := c.getWithIndex(t, r, index) + // Update index for next request + index = meta.Index + } - // Check context hasn't been cancelled - if ctx.Err() != nil { + // Handle errors with backoff. Badly behaved blocking calls that returned + // a zero index are considered as failures since we need to not get stuck + // in a busy loop. + wait := 0 * time.Second + if err == nil && meta.Index > 0 { + failures = 0 + } else { + failures++ + wait = backOffWait(failures) + } + + if wait > 0 { + select { + case <-time.After(wait): + case <-ctx.Done(): return } + } + // Sanity check we always request blocking on second pass + if index < 1 { + index = 1 + } + } +} + +func (c *Cache) notifyPollingQuery(ctx context.Context, t string, r Request, correlationID string, ch chan<- UpdateEvent, maxAge time.Duration) { + index := uint64(0) + failures := uint(0) + + var lastValue interface{} = nil - // Check the index of the value returned in the cache entry to be sure it - // changed - if index < meta.Index { - u := UpdateEvent{correlationID, res, meta, err} - select { - case ch <- u: - case <-ctx.Done(): - return - } - - // Update index for next request - index = meta.Index + for { + // Check context hasn't been cancelled + if ctx.Err() != nil { + return + } + + // Make the request + res, meta, err := c.getWithIndex(t, r, index) + + // Check context hasn't been cancelled + if ctx.Err() != nil { + return + } + + // Check for a change in the value or an index change + if index < meta.Index || !reflect.DeepEqual(lastValue, res) { + u := UpdateEvent{correlationID, res, meta, err} + select { + case ch <- u: + case <-ctx.Done(): + return } - // Handle errors with backoff. Badly behaved blocking calls that returned - // a zero index are considered as failures since we need to not get stuck - // in a busy loop. - if err == nil && meta.Index > 0 { - failures = 0 - } else { - failures++ + // Update index and lastValue + lastValue = res + index = meta.Index + } + + // Reset or increment failure counter + if err == nil { + failures = 0 + } else { + failures++ + } + + // Determining how long to wait before the next poll is complicated. + // First off the happy path and the error path waits are handled distinctly + // + // Once fetching the data through the cache returns an error (and until a + // non-error value is returned) the wait time between each round of the loop + // gets controlled by the backOffWait function. Because we would have waited + // at least until the age of the cached data was too old the error path should + // immediately retry the fetch and backoff on the time as needed for persistent + // failures which potentially will wait much longer than the MaxAge of the request + // + // When on the happy path we just need to fetch from the cache often enough to ensure + // that the data is not older than the MaxAge. Therefore after fetching the data from + // the cache we can sleep until the age of that data would exceed the MaxAge. Sometimes + // this will be for the MaxAge duration (like when only a single notify was executed so + // only 1 go routine is keeping the cache updated). Other times this will be some smaller + // duration than MaxAge (when multiple notify calls were executed and this go routine just + // got data back from the cache that was a cache hit after the other go routine fetched it + // without a hit). We cannot just set MustRevalidate on the request and always sleep for MaxAge + // as this would eliminate the single-flighting of these requests in the cache and + // the efficiencies gained by it. + if failures > 0 { + + errWait := backOffWait(failures) + select { + case <-time.After(errWait): + case <-ctx.Done(): + return } - if wait := backOffWait(failures); wait > 0 { - select { - case <-time.After(wait): - case <-ctx.Done(): - return - } + } else { + // Default to immediately re-poll. This only will happen if the data + // we just got out of the cache is already too stale + pollWait := 0 * time.Second + + // Calculate when the cached data's Age will get too stale and + // need to be re-queried. When the data's Age already exceeds the + // maxAge the pollWait value is left at 0 to immediately re-poll + if meta.Age <= maxAge { + pollWait = maxAge - meta.Age } - // Sanity check we always request blocking on second pass - if index < 1 { - index = 1 + + // Add a small amount of random jitter to the polling time. One + // purpose of the jitter is to ensure that the next time + // we fetch from the cache the data will be stale (unless another + // notify go routine has updated it while this one is sleeping). + // Without this it would be possible to wake up, fetch the data + // again where the age of the data is strictly equal to the MaxAge + // and then immediately have to re-fetch again. That wouldn't + // be terrible but it would expend a bunch more cpu cycles when + // we can definitely avoid it. + pollWait += lib.RandomStagger(maxAge / 16) + + select { + case <-time.After(pollWait): + case <-ctx.Done(): + return } } - }() - - return nil + } } diff --git a/agent/cache/watch_test.go b/agent/cache/watch_test.go index 7bd6de94353e..2d166a7d2cb6 100644 --- a/agent/cache/watch_test.go +++ b/agent/cache/watch_test.go @@ -80,7 +80,7 @@ func TestCacheNotify(t *testing.T) { Err: nil, }) - // Registere a second observer using same chan and request. Note that this is + // Register a second observer using same chan and request. Note that this is // testing a few things implicitly: // - that multiple watchers on the same cache entity are de-duped in their // requests to the "backend" @@ -144,6 +144,120 @@ func TestCacheNotify(t *testing.T) { // important things to get working. } +func TestCacheNotifyPolling(t *testing.T) { + t.Parallel() + + typ := TestTypeNonBlocking(t) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ, &RegisterOptions{ + Refresh: false, + }) + + // Configure the type + typ.Static(FetchResult{Value: 1, Index: 1}, nil).Once().Run(func(args mock.Arguments) { + // Assert the right request type - all real Fetch implementations do this so + // it keeps us honest that Watch doesn't require type mangling which will + // break in real life (hint: it did on the first attempt) + _, ok := args.Get(1).(*MockRequest) + require.True(t, ok) + }) + typ.Static(FetchResult{Value: 12, Index: 1}, nil).Once() + typ.Static(FetchResult{Value: 42, Index: 1}, nil).Once() + + require := require.New(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := make(chan UpdateEvent) + + err := c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello", MaxAge: 100 * time.Millisecond}), "test", ch) + require.NoError(err) + + // Should receive the first result pretty soon + TestCacheNotifyChResult(t, ch, UpdateEvent{ + CorrelationID: "test", + Result: 1, + Meta: ResultMeta{Hit: false, Index: 1}, + Err: nil, + }) + + // There should be no more updates delivered yet + require.Len(ch, 0) + + // make sure the updates do not come too quickly + select { + case <-time.After(50 * time.Millisecond): + case <-ch: + require.Fail("Received update too early") + } + + // make sure we get the update not too far out. + select { + case <-time.After(100 * time.Millisecond): + require.Fail("Didn't receive the notification") + case result := <-ch: + require.Equal(result.Result, 12) + require.Equal(result.CorrelationID, "test") + require.Equal(result.Meta.Hit, false) + require.Equal(result.Meta.Index, uint64(1)) + // pretty conservative check it should be even newer because without a second + // notifier each value returned will have been executed just then and not served + // from the cache. + require.True(result.Meta.Age < 50*time.Millisecond) + require.NoError(result.Err) + } + + require.Len(ch, 0) + + // Register a second observer using same chan and request. Note that this is + // testing a few things implicitly: + // - that multiple watchers on the same cache entity are de-duped in their + // requests to the "backend" + // - that multiple watchers can distinguish their results using correlationID + err = c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello", MaxAge: 100 * time.Millisecond}), "test2", ch) + require.NoError(err) + + // Should get test2 notify immediately, and it should be a cache hit + TestCacheNotifyChResult(t, ch, UpdateEvent{ + CorrelationID: "test2", + Result: 12, + Meta: ResultMeta{Hit: true, Index: 1}, + Err: nil, + }) + + require.Len(ch, 0) + + // wait for the next batch of responses + events := make([]UpdateEvent, 0) + // 110 is needed to allow for the jitter + timeout := time.After(110 * time.Millisecond) + + for i := 0; i < 2; i++ { + select { + case <-timeout: + require.Fail("UpdateEvent not received in time") + case eve := <-ch: + events = append(events, eve) + } + } + + require.Equal(events[0].Result, 42) + require.Equal(events[0].Meta.Hit, false) + require.Equal(events[0].Meta.Index, uint64(1)) + require.True(events[0].Meta.Age < 50*time.Millisecond) + require.NoError(events[0].Err) + require.Equal(events[1].Result, 42) + // Sometimes this would be a hit and others not. It all depends on when the various getWithIndex calls got fired. + // If both are done concurrently then it will not be a cache hit but the request gets single flighted and both + // get notified at the same time. + // require.Equal(events[1].Meta.Hit, true) + require.Equal(events[1].Meta.Index, uint64(1)) + require.True(events[1].Meta.Age < 100*time.Millisecond) + require.NoError(events[1].Err) +} + // Test that a refresh performs a backoff. func TestCacheWatch_ErrorBackoff(t *testing.T) { t.Parallel() @@ -186,7 +300,7 @@ func TestCacheWatch_ErrorBackoff(t *testing.T) { // was running as fast as it could go we'd expect this to be huge. We have to // be a little careful here because the watch chan ch doesn't have a large // buffer so we could be artificially slowing down the loop without the - // backoff actualy taking affect. We can validate that by ensuring this test + // backoff actually taking effect. We can validate that by ensuring this test // fails without the backoff code reliably. timeoutC := time.After(500 * time.Millisecond) OUT: @@ -206,3 +320,69 @@ OUT: actual := atomic.LoadUint32(&retries) require.True(actual < 10, fmt.Sprintf("actual: %d", actual)) } + +// Test that a refresh performs a backoff. +func TestCacheWatch_ErrorBackoffNonBlocking(t *testing.T) { + t.Parallel() + + typ := TestTypeNonBlocking(t) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ, &RegisterOptions{ + Refresh: false, + }) + + // Configure the type + var retries uint32 + fetchErr := fmt.Errorf("test fetch error") + typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once() + typ.Static(FetchResult{Value: nil, Index: 5}, fetchErr).Run(func(args mock.Arguments) { + atomic.AddUint32(&retries, 1) + }) + + require := require.New(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := make(chan UpdateEvent) + + err := c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello", MaxAge: 100 * time.Millisecond}), "test", ch) + require.NoError(err) + + // Should receive the first result pretty soon + TestCacheNotifyChResult(t, ch, UpdateEvent{ + CorrelationID: "test", + Result: 1, + Meta: ResultMeta{Hit: false, Index: 4}, + Err: nil, + }) + + numErrors := 0 + // Loop for a little while and count how many errors we see reported. If this + // was running as fast as it could go we'd expect this to be huge. We have to + // be a little careful here because the watch chan ch doesn't have a large + // buffer so we could be artificially slowing down the loop without the + // backoff actually taking effect. We can validate that by ensuring this test + // fails without the backoff code reliably. + // + // 100 + 500 milliseconds. 100 because the first retry will not happen until + // the 100 + jitter milliseconds have elapsed. + timeoutC := time.After(600 * time.Millisecond) +OUT: + for { + select { + case <-timeoutC: + break OUT + case u := <-ch: + numErrors++ + require.Error(u.Err) + } + } + // Must be fewer than 10 failures in that time + require.True(numErrors < 10, fmt.Sprintf("numErrors: %d", numErrors)) + + // Check the number of RPCs as a sanity check too + actual := atomic.LoadUint32(&retries) + require.True(actual < 10, fmt.Sprintf("actual: %d", actual)) +} diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 956b4f035dec..a1b927569882 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -16,12 +16,13 @@ import ( ) const ( - coalesceTimeout = 200 * time.Millisecond - rootsWatchID = "roots" - leafWatchID = "leaf" - intentionsWatchID = "intentions" - serviceIDPrefix = string(structs.UpstreamDestTypeService) + ":" - preparedQueryIDPrefix = string(structs.UpstreamDestTypePreparedQuery) + ":" + coalesceTimeout = 200 * time.Millisecond + rootsWatchID = "roots" + leafWatchID = "leaf" + intentionsWatchID = "intentions" + serviceIDPrefix = string(structs.UpstreamDestTypeService) + ":" + preparedQueryIDPrefix = string(structs.UpstreamDestTypePreparedQuery) + ":" + defaultPreparedQueryPollInterval = 30 * time.Second ) // state holds all the state needed to maintain the config for a registered @@ -164,15 +165,12 @@ func (s *state) initWatches() error { switch u.DestinationType { case structs.UpstreamDestTypePreparedQuery: - // TODO(banks): prepared queries don't support blocking. We need to come - // up with an alternative to Notify that will poll at a sensible rate. - - // err = c.Notify(ctx, cachetype.PreparedQueryName, &structs.PreparedQueryExecuteRequest{ - // Datacenter: dc, - // QueryOptions: structs.QueryOptions{Token: token}, - // QueryIDOrName: u.DestinationName, - // Connect: true, - // }, u.Identifier(), ch) + err = s.cache.Notify(s.ctx, cachetype.PreparedQueryName, &structs.PreparedQueryExecuteRequest{ + Datacenter: dc, + QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: defaultPreparedQueryPollInterval}, + QueryIDOrName: u.DestinationName, + Connect: true, + }, u.Identifier(), s.ch) case structs.UpstreamDestTypeService: fallthrough case "": // Treat unset as the default Service type