Skip to content

Commit

Permalink
Fixes #4421: General solution to stop blocking queries with index 0 (#4437)
Browse files Browse the repository at this point in the history

* Fix theoretical cache collision bug if/when we use more cache types with same result type

* Generalized fix for blocking query handling when state store methods return zero index

* Refactor test retry to only affect CI

* Undo make file merge

* Add hint to error message returned to end-user requests if Connect is not enabled when they try to request cert

* Explicit error for Roots endpoint if connect is disabled

* Fix tests that were asserting old behaviour
  • Loading branch information
banks authored Jul 25, 2018
1 parent 5635227 commit 8cbeb29
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 29 deletions.
10 changes: 3 additions & 7 deletions agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2304,19 +2304,15 @@ func TestAgent_Token(t *testing.T) {
func TestAgentConnectCARoots_empty(t *testing.T) {
t.Parallel()

assert := assert.New(t)
require := require.New(t)
a := NewTestAgent(t.Name(), "connect { enabled = false }")
defer a.Shutdown()

req, _ := http.NewRequest("GET", "/v1/agent/connect/ca/roots", nil)
resp := httptest.NewRecorder()
obj, err := a.srv.AgentConnectCARoots(resp, req)
require.NoError(err)

value := obj.(structs.IndexedCARoots)
assert.Equal(value.ActiveRootID, "")
assert.Len(value.Roots, 0)
_, err := a.srv.AgentConnectCARoots(resp, req)
require.Error(err)
require.Contains(err.Error(), "Connect must be enabled")
}

func TestAgentConnectCARoots_list(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion agent/cache-types/connect_ca_leaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
return result, errors.New("invalid RootCA response type")
}
if roots.TrustDomain == "" {
return result, errors.New("cluster has no CA bootstrapped")
return result, errors.New("cluster has no CA bootstrapped yet")
}

// Build the service ID
Expand Down
6 changes: 3 additions & 3 deletions agent/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) {
}

// Get the actual key for our entry
key := c.entryKey(&info)
key := c.entryKey(t, &info)

// First time through
first := true
Expand Down Expand Up @@ -278,8 +278,8 @@ RETRY_GET:

// entryKey returns the key for the entry in the cache. See the note
// about the entry key format in the structure docs for Cache.
func (c *Cache) entryKey(r *RequestInfo) string {
return fmt.Sprintf("%s/%s/%s", r.Datacenter, r.Token, r.Key)
func (c *Cache) entryKey(t string, r *RequestInfo) string {
return fmt.Sprintf("%s/%s/%s/%s", t, r.Datacenter, r.Token, r.Key)
}

// fetch triggers a new background fetch for the given Request. If a
Expand Down
48 changes: 48 additions & 0 deletions agent/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,54 @@ func TestCacheGet_expireResetGet(t *testing.T) {
typ.AssertExpectations(t)
}

// Test that two Gets whose requests produce an identical cache key but are
// registered under different "types" are cached independently and return
// separate results.
func TestCacheGet_duplicateKeyDifferentType(t *testing.T) {
	t.Parallel()

	require := require.New(t)

	typA := TestType(t)
	defer typA.AssertExpectations(t)
	typB := TestType(t)
	defer typB.AssertExpectations(t)

	c := TestCache(t)
	c.RegisterType("t", typA, nil)
	c.RegisterType("t2", typB, nil)

	// Each type serves its own static value so hits are distinguishable.
	typA.Static(FetchResult{Value: 100}, nil)
	typB.Static(FetchResult{Value: 200}, nil)

	// First Get on "t" misses the cache and fetches.
	result, meta, err := c.Get("t", TestRequest(t, RequestInfo{Key: "foo"}))
	require.NoError(err)
	require.Equal(100, result)
	require.False(meta.Hit)

	// The same key on "t2" must not collide with the "t" entry, so it
	// fetches as well.
	result, meta, err = c.Get("t2", TestRequest(t, RequestInfo{Key: "foo"}))
	require.NoError(err)
	require.Equal(200, result)
	require.False(meta.Hit)

	// Repeating the "t" Get is now served from cache.
	result, meta, err = c.Get("t", TestRequest(t, RequestInfo{Key: "foo"}))
	require.NoError(err)
	require.Equal(100, result)
	require.True(meta.Hit)

	// Give any stray background fetches a moment to fire, then confirm
	// each type was only ever called the one time.
	time.Sleep(20 * time.Millisecond)
	typA.AssertExpectations(t)
	typB.AssertExpectations(t)
}

// Test that Get partitions the caches based on DC so two equivalent requests
// to different datacenters are automatically cached even if their keys are
// the same.
Expand Down
13 changes: 6 additions & 7 deletions agent/connect_ca_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/agent/connect"
ca "github.com/hashicorp/consul/agent/connect/ca"
"github.com/hashicorp/consul/agent/structs"
Expand All @@ -16,18 +18,15 @@ import (
func TestConnectCARoots_empty(t *testing.T) {
t.Parallel()

assert := assert.New(t)
require := require.New(t)
a := NewTestAgent(t.Name(), "connect { enabled = false }")
defer a.Shutdown()

req, _ := http.NewRequest("GET", "/v1/connect/ca/roots", nil)
resp := httptest.NewRecorder()
obj, err := a.srv.ConnectCARoots(resp, req)
assert.Nil(err)

value := obj.(structs.IndexedCARoots)
assert.Equal(value.ActiveRootID, "")
assert.Len(value.Roots, 0)
_, err := a.srv.ConnectCARoots(resp, req)
require.Error(err)
require.Contains(err.Error(), "Connect must be enabled")
}

func TestConnectCARoots_list(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions agent/consul/connect_ca_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ func (s *ConnectCA) Roots(
return err
}

// Exit early if Connect hasn't been enabled.
if !s.srv.config.ConnectEnabled {
return ErrConnectNotEnabled
}

// Load the ClusterID to generate TrustDomain. We do this outside the loop
// since by definition this value should be immutable once set for lifetime of
// the cluster so we don't need to look it up more than once. We also don't
Expand All @@ -230,6 +235,7 @@ func (s *ConnectCA) Roots(
if err != nil {
return err
}

// Check CA is actually bootstrapped...
if config != nil {
// Build TrustDomain based on the ClusterID stored.
Expand Down
13 changes: 12 additions & 1 deletion agent/consul/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,18 @@ RUN_QUERY:

// Block up to the timeout if we didn't see anything fresh.
err := fn(ws, state)
if err == nil && queryMeta.Index > 0 && queryMeta.Index <= queryOpts.MinQueryIndex {
// Note we check queryOpts.MinQueryIndex is greater than zero to determine if
// blocking was requested by client, NOT meta.Index since the state function
// might return zero if something is not initialised and care wasn't taken to
// handle that special case (in practice this happened a lot so fixing it
// systematically here beats trying to remember to add zero checks in every
// state method). We also need to ensure that unless there is an error, we
// return an index > 0 otherwise the client will never block and burn CPU and
// requests.
if err == nil && queryMeta.Index < 1 {
queryMeta.Index = 1
}
if err == nil && queryOpts.MinQueryIndex > 0 && queryMeta.Index <= queryOpts.MinQueryIndex {
if expired := ws.Watch(timeout.C); !expired {
// If a restore may have woken us up then bail out from
// the query immediately. This is slightly race-ey since
Expand Down
58 changes: 57 additions & 1 deletion agent/consul/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestRPC_NoLeader_Fail(t *testing.T) {
Expand Down Expand Up @@ -101,7 +103,12 @@ func TestRPC_blockingQuery(t *testing.T) {
defer os.RemoveAll(dir)
defer s.Shutdown()

// Perform a non-blocking query.
require := require.New(t)
assert := assert.New(t)

// Perform a non-blocking query. Note that it's significant that the meta has
// a zero index in response - the implied opts.MinQueryIndex is also zero but
// this should not block still.
{
var opts structs.QueryOptions
var meta structs.QueryMeta
Expand Down Expand Up @@ -146,6 +153,55 @@ func TestRPC_blockingQuery(t *testing.T) {
}
}

// Perform a blocking query that returns a zero index from blocking func (e.g.
// no state yet). This should still return an empty response immediately, but
// with index of 1 and then block on the next attempt. In one sense zero index
// is not really a valid response from a state method that is not an error but
// in practice a lot of state store operations do return it unless they
// explicitly add special checks to turn 0 into 1. Often this is not caught or
// covered by tests but eventually when hit in the wild causes blocking
// clients to busy loop and burn CPU. This test ensures that blockingQuery
// systematically does the right thing to prevent future bugs like that.
{
opts := structs.QueryOptions{
MinQueryIndex: 0,
}
var meta structs.QueryMeta
var calls int
fn := func(ws memdb.WatchSet, state *state.Store) error {
if opts.MinQueryIndex > 0 {
// If client requested blocking, block forever. This is simulating
// waiting for the watched resource to be initialized/written to giving
// it a non-zero index. Note the timeout on the query options is relied
// on to stop the test taking forever.
fakeCh := make(chan struct{})
ws.Add(fakeCh)
}
meta.Index = 0
calls++
return nil
}
require.NoError(s.blockingQuery(&opts, &meta, fn))
assert.Equal(1, calls)
assert.Equal(uint64(1), meta.Index,
"expect fake index of 1 to force client to block on next update")

// Simulate client making next request
opts.MinQueryIndex = 1
opts.MaxQueryTime = 20 * time.Millisecond // Don't wait too long

// This time we should block even though the func returns index 0 still
t0 := time.Now()
require.NoError(s.blockingQuery(&opts, &meta, fn))
t1 := time.Now()
assert.Equal(2, calls)
assert.Equal(uint64(1), meta.Index,
"expect fake index of 1 to force client to block on next update")
assert.True(t1.Sub(t0) > 20*time.Millisecond,
"should have actually blocked waiting for timeout")

}

// Perform a query that blocks and gets interrupted when the state store
// is abandoned.
{
Expand Down
7 changes: 3 additions & 4 deletions api/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,10 +1050,9 @@ func TestAPI_AgentConnectCARoots_empty(t *testing.T) {
defer s.Stop()

agent := c.Agent()
list, meta, err := agent.ConnectCARoots(nil)
require.NoError(err)
require.Equal(uint64(1), meta.LastIndex)
require.Len(list.Roots, 0)
_, _, err := agent.ConnectCARoots(nil)
require.Error(err)
require.Contains(err.Error(), "Connect must be enabled")
}

func TestAPI_AgentConnectCARoots_list(t *testing.T) {
Expand Down
9 changes: 4 additions & 5 deletions api/connect_ca_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ func TestAPI_ConnectCARoots_empty(t *testing.T) {
defer s.Stop()

connect := c.Connect()
list, meta, err := connect.CARoots(nil)
require.NoError(err)
require.Equal(uint64(1), meta.LastIndex)
require.Len(list.Roots, 0)
require.Empty(list.TrustDomain)
_, _, err := connect.CARoots(nil)

require.Error(err)
require.Contains(err.Error(), "Connect must be enabled")
}

func TestAPI_ConnectCARoots_list(t *testing.T) {
Expand Down

0 comments on commit 8cbeb29

Please sign in to comment.