diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index ae23d185d54d..f7e8535ab93e 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -2304,19 +2304,15 @@ func TestAgent_Token(t *testing.T) { func TestAgentConnectCARoots_empty(t *testing.T) { t.Parallel() - assert := assert.New(t) require := require.New(t) a := NewTestAgent(t.Name(), "connect { enabled = false }") defer a.Shutdown() req, _ := http.NewRequest("GET", "/v1/agent/connect/ca/roots", nil) resp := httptest.NewRecorder() - obj, err := a.srv.AgentConnectCARoots(resp, req) - require.NoError(err) - - value := obj.(structs.IndexedCARoots) - assert.Equal(value.ActiveRootID, "") - assert.Len(value.Roots, 0) + _, err := a.srv.AgentConnectCARoots(resp, req) + require.Error(err) + require.Contains(err.Error(), "Connect must be enabled") } func TestAgentConnectCARoots_list(t *testing.T) { diff --git a/agent/cache-types/connect_ca_leaf.go b/agent/cache-types/connect_ca_leaf.go index 9d3ef3dc0845..87c3ddf7ff4f 100644 --- a/agent/cache-types/connect_ca_leaf.go +++ b/agent/cache-types/connect_ca_leaf.go @@ -108,7 +108,7 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache return result, errors.New("invalid RootCA response type") } if roots.TrustDomain == "" { - return result, errors.New("cluster has no CA bootstrapped") + return result, errors.New("cluster has no CA bootstrapped yet") } // Build the service ID diff --git a/agent/cache/cache.go b/agent/cache/cache.go index e566797beb1f..76494168f58a 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -190,7 +190,7 @@ func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) { } // Get the actual key for our entry - key := c.entryKey(&info) + key := c.entryKey(t, &info) // First time through first := true @@ -278,8 +278,8 @@ RETRY_GET: // entryKey returns the key for the entry in the cache. See the note // about the entry key format in the structure docs for Cache. -func (c *Cache) entryKey(r *RequestInfo) string { - return fmt.Sprintf("%s/%s/%s", r.Datacenter, r.Token, r.Key) +func (c *Cache) entryKey(t string, r *RequestInfo) string { + return fmt.Sprintf("%s/%s/%s/%s", t, r.Datacenter, r.Token, r.Key) } // fetch triggers a new background fetch for the given Request. If a diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index 3c20c58a3e56..e22694be6d7d 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -708,6 +708,54 @@ func TestCacheGet_expireResetGet(t *testing.T) { typ.AssertExpectations(t) } +// Test a Get with a request that returns the same cache key across +// two different "types" returns two separate results. +func TestCacheGet_duplicateKeyDifferentType(t *testing.T) { + t.Parallel() + + require := require.New(t) + + typ := TestType(t) + defer typ.AssertExpectations(t) + typ2 := TestType(t) + defer typ2.AssertExpectations(t) + + c := TestCache(t) + c.RegisterType("t", typ, nil) + c.RegisterType("t2", typ2, nil) + + // Configure the types + typ.Static(FetchResult{Value: 100}, nil) + typ2.Static(FetchResult{Value: 200}, nil) + + // Get, should fetch + req := TestRequest(t, RequestInfo{Key: "foo"}) + result, meta, err := c.Get("t", req) + require.NoError(err) + require.Equal(100, result) + require.False(meta.Hit) + + // Get from t2 with same key, should fetch + req = TestRequest(t, RequestInfo{Key: "foo"}) + result, meta, err = c.Get("t2", req) + require.NoError(err) + require.Equal(200, result) + require.False(meta.Hit) + + // Get from t again with same key, should cache + req = TestRequest(t, RequestInfo{Key: "foo"}) + result, meta, err = c.Get("t", req) + require.NoError(err) + require.Equal(100, result) + require.True(meta.Hit) + + // Sleep a tiny bit just to let maybe some background calls happen + // then verify that we still only got the one call + time.Sleep(20 * time.Millisecond) + typ.AssertExpectations(t) + typ2.AssertExpectations(t) +} + // Test that Get partitions the caches based on DC so two equivalent requests // to different datacenters are automatically cached even if their keys are // the same. diff --git a/agent/connect_ca_endpoint_test.go b/agent/connect_ca_endpoint_test.go index afaa5f049b95..14bc358055d6 100644 --- a/agent/connect_ca_endpoint_test.go +++ b/agent/connect_ca_endpoint_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/agent/connect" ca "github.com/hashicorp/consul/agent/connect/ca" "github.com/hashicorp/consul/agent/structs" @@ -16,18 +18,15 @@ import ( func TestConnectCARoots_empty(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) a := NewTestAgent(t.Name(), "connect { enabled = false }") defer a.Shutdown() req, _ := http.NewRequest("GET", "/v1/connect/ca/roots", nil) resp := httptest.NewRecorder() - obj, err := a.srv.ConnectCARoots(resp, req) - assert.Nil(err) - - value := obj.(structs.IndexedCARoots) - assert.Equal(value.ActiveRootID, "") - assert.Len(value.Roots, 0) + _, err := a.srv.ConnectCARoots(resp, req) + require.Error(err) + require.Contains(err.Error(), "Connect must be enabled") } func TestConnectCARoots_list(t *testing.T) { diff --git a/agent/consul/connect_ca_endpoint.go b/agent/consul/connect_ca_endpoint.go index 4cdb72ff7c63..9a02d6763dc1 100644 --- a/agent/consul/connect_ca_endpoint.go +++ b/agent/consul/connect_ca_endpoint.go @@ -217,6 +217,11 @@ func (s *ConnectCA) Roots( return err } + // Exit early if Connect hasn't been enabled. + if !s.srv.config.ConnectEnabled { + return ErrConnectNotEnabled + } + // Load the ClusterID to generate TrustDomain. We do this outside the loop // since by definition this value should be immutable once set for lifetime of // the cluster so we don't need to look it up more than once. We also don't @@ -230,6 +235,7 @@ func (s *ConnectCA) Roots( if err != nil { return err } + // Check CA is actually bootstrapped... if config != nil { // Build TrustDomain based on the ClusterID stored. diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index b983a868c113..3bd26f430523 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -415,7 +415,18 @@ RUN_QUERY: // Block up to the timeout if we didn't see anything fresh. err := fn(ws, state) - if err == nil && queryMeta.Index > 0 && queryMeta.Index <= queryOpts.MinQueryIndex { + // Note we check queryOpts.MinQueryIndex is greater than zero to determine if + // blocking was requested by client, NOT meta.Index since the state function + // might return zero if something is not initialised and care wasn't taken to + // handle that special case (in practice this happened a lot so fixing it + // systematically here beats trying to remember to add zero checks in every + // state method). We also need to ensure that unless there is an error, we + // return an index > 0 otherwise the client will never block and burn CPU and + // requests. + if err == nil && queryMeta.Index < 1 { + queryMeta.Index = 1 + } + if err == nil && queryOpts.MinQueryIndex > 0 && queryMeta.Index <= queryOpts.MinQueryIndex { if expired := ws.Watch(timeout.C); !expired { // If a restore may have woken us up then bail out from // the query immediately. This is slightly race-ey since diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index 7baa3f235952..62ae3ae246e7 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -12,6 +12,8 @@ import ( "github.com/hashicorp/consul/testutil/retry" "github.com/hashicorp/go-memdb" "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestRPC_NoLeader_Fail(t *testing.T) { @@ -101,7 +103,12 @@ func TestRPC_blockingQuery(t *testing.T) { defer os.RemoveAll(dir) defer s.Shutdown() - // Perform a non-blocking query. + require := require.New(t) + assert := assert.New(t) + + // Perform a non-blocking query. Note that it's significant that the meta has + // a zero index in response - the implied opts.MinQueryIndex is also zero but + // this should not block still. { var opts structs.QueryOptions var meta structs.QueryMeta @@ -146,6 +153,55 @@ func TestRPC_blockingQuery(t *testing.T) { } } + // Perform a blocking query that returns a zero index from blocking func (e.g. + // no state yet). This should still return an empty response immediately, but + // with index of 1 and then block on the next attempt. In one sense zero index + // is not really a valid response from a state method that is not an error but + // in practice a lot of state store operations do return it unless they + // explicitly special checks to turn 0 into 1. Often this is not caught or + // covered by tests but eventually when hit in the wild causes blocking + // clients to busy loop and burn CPU. This test ensure that blockingQuery + // systematically does the right thing to prevent future bugs like that. + { + opts := structs.QueryOptions{ + MinQueryIndex: 0, + } + var meta structs.QueryMeta + var calls int + fn := func(ws memdb.WatchSet, state *state.Store) error { + if opts.MinQueryIndex > 0 { + // If client requested blocking, block forever. This is simulating + // waiting for the watched resource to be initialized/written to giving + // it a non-zero index. Note the timeout on the query options is relied + // on to stop the test taking forever. + fakeCh := make(chan struct{}) + ws.Add(fakeCh) + } + meta.Index = 0 + calls++ + return nil + } + require.NoError(s.blockingQuery(&opts, &meta, fn)) + assert.Equal(1, calls) + assert.Equal(uint64(1), meta.Index, + "expect fake index of 1 to force client to block on next update") + + // Simulate client making next request + opts.MinQueryIndex = 1 + opts.MaxQueryTime = 20 * time.Millisecond // Don't wait too long + + // This time we should block even though the func returns index 0 still + t0 := time.Now() + require.NoError(s.blockingQuery(&opts, &meta, fn)) + t1 := time.Now() + assert.Equal(2, calls) + assert.Equal(uint64(1), meta.Index, + "expect fake index of 1 to force client to block on next update") + assert.True(t1.Sub(t0) > 20*time.Millisecond, + "should have actually blocked waiting for timeout") + + } + // Perform a query that blocks and gets interrupted when the state store // is abandoned. { diff --git a/api/agent_test.go b/api/agent_test.go index c5871a091d6a..8bb9547a5ed1 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -1050,10 +1050,9 @@ func TestAPI_AgentConnectCARoots_empty(t *testing.T) { defer s.Stop() agent := c.Agent() - list, meta, err := agent.ConnectCARoots(nil) - require.NoError(err) - require.Equal(uint64(1), meta.LastIndex) - require.Len(list.Roots, 0) + _, _, err := agent.ConnectCARoots(nil) + require.Error(err) + require.Contains(err.Error(), "Connect must be enabled") } func TestAPI_AgentConnectCARoots_list(t *testing.T) { diff --git a/api/connect_ca_test.go b/api/connect_ca_test.go index 77d047e95386..f5dd6b469509 100644 --- a/api/connect_ca_test.go +++ b/api/connect_ca_test.go @@ -22,11 +22,10 @@ func TestAPI_ConnectCARoots_empty(t *testing.T) { defer s.Stop() connect := c.Connect() - list, meta, err := connect.CARoots(nil) - require.NoError(err) - require.Equal(uint64(1), meta.LastIndex) - require.Len(list.Roots, 0) - require.Empty(list.TrustDomain) + _, _, err := connect.CARoots(nil) + + require.Error(err) + require.Contains(err.Error(), "Connect must be enabled") } func TestAPI_ConnectCARoots_list(t *testing.T) {