Skip to content

Commit

Permalink
Add State storage and LastResult argument into Cache so that cache.Ty…
Browse files Browse the repository at this point in the history
…pes can safely store additional data that is eventually expired.
  • Loading branch information
banks committed Dec 12, 2018
1 parent 43d882c commit 718cb2c
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 14 deletions.
8 changes: 8 additions & 0 deletions agent/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,13 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-
fOpts.MinIndex = entry.Index
fOpts.Timeout = tEntry.Opts.RefreshTimeout
}
if entry.Valid {
fOpts.LastResult = &FetchResult{
Value: entry.Value,
State: entry.State,
Index: entry.Index,
}
}

// Start building the new entry by blocking on the fetch.
result, err := tEntry.Type.Fetch(fOpts, r)
Expand All @@ -457,6 +464,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-
if result.Value != nil {
// A new value was given, so we create a brand new entry.
newEntry.Value = result.Value
newEntry.State = result.State
newEntry.Index = result.Index
newEntry.FetchedAt = time.Now()
if newEntry.Index < 1 {
Expand Down
35 changes: 33 additions & 2 deletions agent/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,17 @@ func TestCacheGet_emptyFetchResult(t *testing.T) {
c := TestCache(t)
c.RegisterType("t", typ, nil)

stateCh := make(chan int, 1)

// Configure the type
typ.Static(FetchResult{Value: 42, Index: 1}, nil).Times(1)
typ.Static(FetchResult{Value: nil}, nil)
typ.Static(FetchResult{Value: 42, State: 31, Index: 1}, nil).Times(1)
// Return different State, it should be ignored
typ.Static(FetchResult{Value: nil, State: 32}, nil).Run(func(args mock.Arguments) {
// We should get back the original state
opts := args.Get(0).(FetchOptions)
require.NotNil(opts.LastResult)
stateCh <- opts.LastResult.State.(int)
})

// Get, should fetch
req := TestRequest(t, RequestInfo{Key: "hello"})
Expand All @@ -337,6 +345,29 @@ func TestCacheGet_emptyFetchResult(t *testing.T) {
require.Equal(42, result)
require.False(meta.Hit)

// State delivered to second call should be the result from first call.
select {
case state := <-stateCh:
require.Equal(31, state)
case <-time.After(20 * time.Millisecond):
t.Fatal("timed out")
}

// Next request should get the first returned state too since last Fetch
// returned nil result.
req = TestRequest(t, RequestInfo{
Key: "hello", MinIndex: 1, Timeout: 100 * time.Millisecond})
result, meta, err = c.Get("t", req)
require.NoError(err)
require.Equal(42, result)
require.False(meta.Hit)
select {
case state := <-stateCh:
require.Equal(31, state)
case <-time.After(20 * time.Millisecond):
t.Fatal("timed out")
}

// Sleep a tiny bit just to let maybe some background calls happen
// then verify that we still only got the one call
time.Sleep(20 * time.Millisecond)
Expand Down
7 changes: 7 additions & 0 deletions agent/cache/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ import (
type cacheEntry struct {
// Fields pertaining to the actual value
Value interface{}
// State can be used to store info needed by the cache type but that should
// not be part of the result the client gets. For example the Connect Leaf
// type needs to store additional data about when it last attempted a renewal
// that is not part of the actual IssuedCert struct it returns. It's opaque to
// the Cache but allows types to store additional data that is coupled to the
// cache entry's lifetime and will be aged out by TTL etc.
State interface{}
Error error
Index uint64

Expand Down
43 changes: 31 additions & 12 deletions agent/cache/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,25 @@ import (
type Type interface {
// Fetch fetches a single unique item.
//
// The FetchOptions contain the index and timeouts for blocking queries.
// The MinIndex value on the Request itself should NOT be used
// as the blocking index since a request may be reused multiple times
// as part of Refresh behavior.
// The FetchOptions contain the index and timeouts for blocking queries. The
// MinIndex value on the Request itself should NOT be used as the blocking
// index since a request may be reused multiple times as part of Refresh
// behavior.
//
// The return value is a FetchResult which contains information about
// the fetch. If an error is given, the FetchResult is ignored. The
// cache does not support backends that return partial values.
// The return value is a FetchResult which contains information about the
// fetch. If an error is given, the FetchResult is ignored. The cache does not
// support backends that return partial values. Optional State can be added to
// the FetchResult which will be stored with the cache entry and provided to
// the next Fetch call but will not be returned to clients. This allows types
// to add additional bookkeeping data per cache entry that will still be aged
// out along with the entry's TTL.
//
// On timeout, FetchResult can behave one of two ways. First, it can
// return the last known value. This is the default behavior of blocking
// RPC calls in Consul so this allows cache types to be implemented with
// no extra logic. Second, FetchResult can return an unset value and index.
// In this case, the cache will reuse the last value automatically.
// On timeout, FetchResult can behave one of two ways. First, it can return
// the last known value. This is the default behavior of blocking RPC calls in
// Consul so this allows cache types to be implemented with no extra logic.
// Second, FetchResult can return an unset value and index. In this case, the
// cache will reuse the last value automatically. If a nil Value is returned,
// the State field will also be ignored currently.
Fetch(FetchOptions, Request) (FetchResult, error)

// SupportsBlocking should return true if the type supports blocking queries.
Expand All @@ -41,6 +46,14 @@ type FetchOptions struct {
// Timeout is the maximum time for the query. This must be implemented
// in the Fetch itself.
Timeout time.Duration

// LastResult is the result from the last successful Fetch and represents the
// value currently stored in the cache at the time Fetch is invoked. It will
// be nil on first call where there is no current cache value. There may have
// been other Fetch attempts that resulted in an error in the mean time. These
// are not explicitly represented currently. We could add that if needed this
// was just simpler for now.
LastResult *FetchResult
}

// FetchResult is the result of a Type Fetch operation and contains the
Expand All @@ -49,6 +62,12 @@ type FetchResult struct {
// Value is the result of the fetch.
Value interface{}

// State is opaque data stored in the cache but not returned to clients. It
// can be used by Types to maintain any bookkeeping they need between fetches
// (using FetchOptions.LastResult) in a way that gets automatically cleaned up
// by TTL expiry etc.
State interface{}

// Index is the corresponding index value for this data.
Index uint64
}

0 comments on commit 718cb2c

Please sign in to comment.