Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Agent Caching for Service Discovery Results #4541

Merged
merged 8 commits into from
Sep 6, 2018
35 changes: 32 additions & 3 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3174,8 +3174,12 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
// care should be taken to call this exactly once after the cache
// field has been initialized.
func (a *Agent) registerCache() {
// Note that you should register the _agent_ as the RPC implementation and not
// the a.delegate directly, otherwise tests that rely on overriding RPC
// routing via a.registerEndpoint will not work.

a.cache.RegisterType(cachetype.ConnectCARootName, &cachetype.ConnectCARoot{
RPC: a.delegate,
RPC: a,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
Expand All @@ -3184,7 +3188,7 @@ func (a *Agent) registerCache() {
})

a.cache.RegisterType(cachetype.ConnectCALeafName, &cachetype.ConnectCALeaf{
RPC: a.delegate,
RPC: a,
Cache: a.cache,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Expand All @@ -3194,13 +3198,38 @@ func (a *Agent) registerCache() {
})

a.cache.RegisterType(cachetype.IntentionMatchName, &cachetype.IntentionMatch{
RPC: a.delegate,
RPC: a,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})

a.cache.RegisterType(cachetype.CatalogServicesName, &cachetype.CatalogServices{
RPC: a,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})

a.cache.RegisterType(cachetype.HealthServicesName, &cachetype.HealthServices{
RPC: a,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})

a.cache.RegisterType(cachetype.PreparedQueryName, &cachetype.PreparedQuery{
RPC: a,
}, &cache.RegisterOptions{
// Prepared queries don't support blocking
Refresh: false,
})
}

// defaultProxyCommand returns the default Connect managed proxy command.
Expand Down
52 changes: 52 additions & 0 deletions agent/cache-types/catalog_services.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package cachetype

import (
"fmt"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)

// Recommended name for registration.
const CatalogServicesName = "catalog-services"

// CatalogServices supports fetching discovering service instances via the
// catalog.
type CatalogServices struct {
RPC RPC
}

func (c *CatalogServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult

// The request should be a DCSpecificRequest.
reqReal, ok := req.(*structs.ServiceSpecificRequest)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: request wrong type: %T", req)
}

// Set the minimum query index to our current index so we block
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
reqReal.QueryOptions.MaxQueryTime = opts.Timeout

// Allways allow stale - there's no point in hitting leader if the request is
// going to be served from cache and endup arbitrarily stale anyway. This
// allows cached service-discover to automatically read scale across all
// servers too.
reqReal.AllowStale = true

// Fetch
var reply structs.IndexedServiceNodes
if err := c.RPC.RPC("Catalog.ServiceNodes", reqReal, &reply); err != nil {
return result, err
}

result.Value = &reply
result.Index = reply.QueryMeta.Index
return result, nil
}

func (c *CatalogServices) SupportsBlocking() bool {
return true
}
64 changes: 64 additions & 0 deletions agent/cache-types/catalog_services_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package cachetype

import (
"testing"
"time"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestCatalogServices(t *testing.T) {
require := require.New(t)
rpc := TestRPC(t)
defer rpc.AssertExpectations(t)
typ := &CatalogServices{RPC: rpc}

// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedServiceNodes
rpc.On("RPC", "Catalog.ServiceNodes", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ServiceSpecificRequest)
require.Equal(uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(1*time.Second, req.QueryOptions.MaxQueryTime)
require.Equal("web", req.ServiceName)
require.Equal("canary", req.ServiceTag)
require.True(req.AllowStale)

reply := args.Get(2).(*structs.IndexedServiceNodes)
reply.QueryMeta.Index = 48
resp = reply
})

// Fetch
result, err := typ.Fetch(cache.FetchOptions{
MinIndex: 24,
Timeout: 1 * time.Second,
}, &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
ServiceTag: "canary",
})
require.NoError(err)
require.Equal(cache.FetchResult{
Value: resp,
Index: 48,
}, result)
}

func TestCatalogServices_badReqType(t *testing.T) {
require := require.New(t)
rpc := TestRPC(t)
defer rpc.AssertExpectations(t)
typ := &CatalogServices{RPC: rpc}

// Fetch
_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
require.Error(err)
require.Contains(err.Error(), "wrong type")

}
4 changes: 4 additions & 0 deletions agent/cache-types/connect_ca_leaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ func (c *ConnectCALeaf) waitNewRootCA(datacenter string, ch chan<- error,
ch <- nil
}

func (c *ConnectCALeaf) SupportsBlocking() bool {
return true
}

// ConnectCALeafRequest is the cache.Request implementation for the
// ConnectCALeaf cache type. This is implemented here and not in structs
// since this is only used for cache-related requests and not forwarded
Expand Down
4 changes: 4 additions & 0 deletions agent/cache-types/connect_ca_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,7 @@ func (c *ConnectCARoot) Fetch(opts cache.FetchOptions, req cache.Request) (cache
result.Index = reply.QueryMeta.Index
return result, nil
}

func (c *ConnectCARoot) SupportsBlocking() bool {
return true
}
52 changes: 52 additions & 0 deletions agent/cache-types/health_services.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package cachetype

import (
"fmt"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)

// Recommended name for registration.
const HealthServicesName = "health-services"

// HealthServices supports fetching discovering service instances via the
// catalog.
type HealthServices struct {
RPC RPC
}

func (c *HealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult

// The request should be a DCSpecificRequest.
reqReal, ok := req.(*structs.ServiceSpecificRequest)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: request wrong type: %T", req)
}

// Set the minimum query index to our current index so we block
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
reqReal.QueryOptions.MaxQueryTime = opts.Timeout

// Allways allow stale - there's no point in hitting leader if the request is
// going to be served from cache and endup arbitrarily stale anyway. This
// allows cached service-discover to automatically read scale across all
// servers too.
reqReal.AllowStale = true

// Fetch
var reply structs.IndexedCheckServiceNodes
if err := c.RPC.RPC("Health.ServiceNodes", reqReal, &reply); err != nil {
return result, err
}

result.Value = &reply
result.Index = reply.QueryMeta.Index
return result, nil
}

func (c *HealthServices) SupportsBlocking() bool {
return true
}
64 changes: 64 additions & 0 deletions agent/cache-types/health_services_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package cachetype

import (
"testing"
"time"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestHealthServices(t *testing.T) {
require := require.New(t)
rpc := TestRPC(t)
defer rpc.AssertExpectations(t)
typ := &HealthServices{RPC: rpc}

// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedCheckServiceNodes
rpc.On("RPC", "Health.ServiceNodes", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ServiceSpecificRequest)
require.Equal(uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(1*time.Second, req.QueryOptions.MaxQueryTime)
require.Equal("web", req.ServiceName)
require.Equal("canary", req.ServiceTag)
require.True(req.AllowStale)

reply := args.Get(2).(*structs.IndexedCheckServiceNodes)
reply.QueryMeta.Index = 48
resp = reply
})

// Fetch
result, err := typ.Fetch(cache.FetchOptions{
MinIndex: 24,
Timeout: 1 * time.Second,
}, &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
ServiceTag: "canary",
})
require.NoError(err)
require.Equal(cache.FetchResult{
Value: resp,
Index: 48,
}, result)
}

func TestHealthServices_badReqType(t *testing.T) {
require := require.New(t)
rpc := TestRPC(t)
defer rpc.AssertExpectations(t)
typ := &HealthServices{RPC: rpc}

// Fetch
_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
require.Error(err)
require.Contains(err.Error(), "wrong type")

}
4 changes: 4 additions & 0 deletions agent/cache-types/intention_match.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,7 @@ func (c *IntentionMatch) Fetch(opts cache.FetchOptions, req cache.Request) (cach
result.Index = reply.Index
return result, nil
}

func (c *IntentionMatch) SupportsBlocking() bool {
return true
}
50 changes: 50 additions & 0 deletions agent/cache-types/prepared_query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package cachetype

import (
"fmt"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)

// Recommended name for registration.
const PreparedQueryName = "prepared-query"

// PreparedQuery supports fetching discovering service instances via prepared
// queries.
type PreparedQuery struct {
RPC RPC
}

func (c *PreparedQuery) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult

// The request should be a PreparedQueryExecuteRequest.
reqReal, ok := req.(*structs.PreparedQueryExecuteRequest)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: request wrong type: %T", req)
}

// Allways allow stale - there's no point in hitting leader if the request is
// going to be served from cache and endup arbitrarily stale anyway. This
// allows cached service-discover to automatically read scale across all
// servers too.
reqReal.AllowStale = true

// Fetch
var reply structs.PreparedQueryExecuteResponse
if err := c.RPC.RPC("PreparedQuery.Execute", reqReal, &reply); err != nil {
return result, err
}

result.Value = &reply
result.Index = reply.QueryMeta.Index

return result, nil
}

func (c *PreparedQuery) SupportsBlocking() bool {
// Prepared queries don't support blocking.
return false
}
Loading