Skip to content

Commit

Permalink
[1.9.x] api: ensure v1/health/ingress/:service endpoint works properl…
Browse files Browse the repository at this point in the history
…y when streaming is enabled

Backport of #9967 to 1.9.x
  • Loading branch information
rboyer committed Apr 5, 2021
1 parent c99e94a commit 1833e66
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 18 deletions.
3 changes: 3 additions & 0 deletions .changelog/9967.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
api: ensure v1/health/ingress/:service endpoint works properly when streaming is enabled
```
8 changes: 7 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ type Agent struct {

// TODO: pass directly to HTTPHandlers and DNSServer once those are passed
// into Agent, which will allow us to remove this field.
rpcClientHealth *health.Client
rpcClientHealth *health.Client
rpcClientHealthNoStreaming *health.Client

// enterpriseAgent embeds fields that we only access in consul-enterprise builds
enterpriseAgent
Expand Down Expand Up @@ -378,6 +379,11 @@ func New(bd BaseDeps) (*Agent, error) {
// Temporarily until streaming supports all connect events
CacheNameConnect: cachetype.HealthServicesName,
}
a.rpcClientHealthNoStreaming = &health.Client{
Cache: bd.Cache,
NetRPC: &a,
CacheName: cachetype.HealthServicesName,
}

a.serviceManager = NewServiceManager(&a)

Expand Down
9 changes: 7 additions & 2 deletions agent/health_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,19 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re
return nil, nil
}

useStreaming := s.agent.config.UseStreamingBackend && args.MinQueryIndex > 0
useStreaming := s.agent.config.UseStreamingBackend && args.MinQueryIndex > 0 && !args.Ingress
args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && (args.QueryOptions.UseCache || useStreaming)

if args.QueryOptions.UseCache && useStreaming && args.Source.Node != "" {
return nil, BadRequestError{Reason: "'near' query param can not be used with streaming"}
}

out, md, err := s.agent.rpcClientHealth.ServiceNodes(req.Context(), args)
healthClient := s.agent.rpcClientHealth
if args.Ingress {
healthClient = s.agent.rpcClientHealthNoStreaming
}

out, md, err := healthClient.ServiceNodes(req.Context(), args)
if err != nil {
return nil, err
}
Expand Down
69 changes: 54 additions & 15 deletions agent/health_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1310,9 +1310,18 @@ func TestHealthConnectServiceNodes(t *testing.T) {
}

func TestHealthIngressServiceNodes(t *testing.T) {
t.Parallel()
t.Run("no streaming", func(t *testing.T) {
testHealthIngressServiceNodes(t, ` rpc { enable_streaming = false } use_streaming_backend = false `)
})
t.Run("cache with streaming", func(t *testing.T) {
testHealthIngressServiceNodes(t, ` rpc { enable_streaming = true } use_streaming_backend = true `)
})
}

a := NewTestAgent(t, "")
func testHealthIngressServiceNodes(t *testing.T, agentHCL string) {
t.Helper()

a := NewTestAgent(t, agentHCL)
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")

Expand Down Expand Up @@ -1349,34 +1358,64 @@ func TestHealthIngressServiceNodes(t *testing.T) {
require.Nil(t, a.RPC("ConfigEntry.Apply", req, &outB))
require.True(t, outB)

t.Run("associated service", func(t *testing.T) {
assert := assert.New(t)
checkResults := func(t *testing.T, obj interface{}) {
nodes := obj.(structs.CheckServiceNodes)
require.Len(t, nodes, 1)
require.Equal(t, structs.ServiceKindIngressGateway, nodes[0].Service.Kind)
require.Equal(t, gatewayArgs.Service.Address, nodes[0].Service.Address)
require.Equal(t, gatewayArgs.Service.Proxy, nodes[0].Service.Proxy)
}

require.True(t, t.Run("associated service", func(t *testing.T) {
req, _ := http.NewRequest("GET", fmt.Sprintf(
"/v1/health/ingress/%s", args.Service.Service), nil)
resp := httptest.NewRecorder()
obj, err := a.srv.HealthIngressServiceNodes(resp, req)
assert.Nil(err)
require.NoError(t, err)
assertIndex(t, resp)

nodes := obj.(structs.CheckServiceNodes)
require.Len(t, nodes, 1)
require.Equal(t, structs.ServiceKindIngressGateway, nodes[0].Service.Kind)
require.Equal(t, gatewayArgs.Service.Address, nodes[0].Service.Address)
require.Equal(t, gatewayArgs.Service.Proxy, nodes[0].Service.Proxy)
})
checkResults(t, obj)
}))

t.Run("non-associated service", func(t *testing.T) {
assert := assert.New(t)
require.True(t, t.Run("non-associated service", func(t *testing.T) {
req, _ := http.NewRequest("GET",
"/v1/health/connect/notexist", nil)
resp := httptest.NewRecorder()
obj, err := a.srv.HealthIngressServiceNodes(resp, req)
assert.Nil(err)
require.NoError(t, err)
assertIndex(t, resp)

nodes := obj.(structs.CheckServiceNodes)
require.Len(t, nodes, 0)
})
}))

require.True(t, t.Run("test caching miss", func(t *testing.T) {
// List instances with cache enabled
req, _ := http.NewRequest("GET", fmt.Sprintf(
"/v1/health/ingress/%s?cached", args.Service.Service), nil)
resp := httptest.NewRecorder()
obj, err := a.srv.HealthIngressServiceNodes(resp, req)
require.NoError(t, err)

checkResults(t, obj)

// Should be a cache miss
require.Equal(t, "MISS", resp.Header().Get("X-Cache"))
}))

require.True(t, t.Run("test caching hit", func(t *testing.T) {
// List instances with cache enabled
req, _ := http.NewRequest("GET", fmt.Sprintf(
"/v1/health/ingress/%s?cached", args.Service.Service), nil)
resp := httptest.NewRecorder()
obj, err := a.srv.HealthIngressServiceNodes(resp, req)
require.NoError(t, err)

checkResults(t, obj)

// Should be a cache HIT now!
require.Equal(t, "HIT", resp.Header().Get("X-Cache"))
}))
}

func TestHealthConnectServiceNodes_Filter(t *testing.T) {
Expand Down

0 comments on commit 1833e66

Please sign in to comment.