Skip to content

Commit

Permalink
Merge pull request #10707 from hashicorp/dnephin/streaming-setup-defa…
Browse files Browse the repository at this point in the history
…ult-timeout

streaming: set default query timeout
  • Loading branch information
dnephin authored Jul 28, 2021
2 parents ee4c243 + 5edee9b commit d2b58cd
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 6 deletions.
6 changes: 6 additions & 0 deletions .changelog/10707.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
```release-note:bug
streaming: set the default wait timeout for health queries
```
```release-note:bug
http: log cancelled requests as such at the INFO level, instead of logging them as errored requests.
```
1 change: 1 addition & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ func New(bd BaseDeps) (*Agent, error) {
Logger: bd.Logger.Named("rpcclient.health"),
},
UseStreamingBackend: a.config.UseStreamingBackend,
QueryOptionDefaults: config.ApplyDefaultQueryOptions(a.config),
}

a.serviceManager = NewServiceManager(&a)
Expand Down
13 changes: 13 additions & 0 deletions agent/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -1930,3 +1930,16 @@ func isFloat(t reflect.Type) bool { return t.Kind() == reflect.Float32 || t.Kind
func isComplex(t reflect.Type) bool {
return t.Kind() == reflect.Complex64 || t.Kind() == reflect.Complex128
}

// ApplyDefaultQueryOptions returns a function which will set default values on
// the options based on the configuration. The RuntimeConfig must not be nil.
func ApplyDefaultQueryOptions(config *RuntimeConfig) func(options *structs.QueryOptions) {
return func(options *structs.QueryOptions) {
switch {
case options.MaxQueryTime > config.MaxQueryTime:
options.MaxQueryTime = config.MaxQueryTime
case options.MaxQueryTime == 0:
options.MaxQueryTime = config.DefaultQueryTime
}
}
}
20 changes: 14 additions & 6 deletions agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,12 +432,20 @@ func (s *HTTPHandlers) wrap(handler endpoint, methods []string) http.HandlerFunc
}

handleErr := func(err error) {
httpLogger.Error("Request error",
"method", req.Method,
"url", logURL,
"from", req.RemoteAddr,
"error", err,
)
if req.Context().Err() != nil {
httpLogger.Info("Request cancelled",
"method", req.Method,
"url", logURL,
"from", req.RemoteAddr,
"error", err)
} else {
httpLogger.Error("Request error",
"method", req.Method,
"url", logURL,
"from", req.RemoteAddr,
"error", err)
}

switch {
case isForbidden(err):
resp.WriteHeader(http.StatusForbidden)
Expand Down
3 changes: 3 additions & 0 deletions agent/rpcclient/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Client struct {
MaterializerDeps MaterializerDeps
CacheName string
UseStreamingBackend bool
QueryOptionDefaults func(options *structs.QueryOptions)
}

type NetRPC interface {
Expand All @@ -38,6 +39,8 @@ func (c *Client) ServiceNodes(
req structs.ServiceSpecificRequest,
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
if c.useStreaming(req) && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0) {
c.QueryOptionDefaults(&req.QueryOptions)

result, err := c.ViewStore.Get(ctx, c.newServiceRequest(req))
if err != nil {
return structs.IndexedCheckServiceNodes{}, cache.ResultMeta{}, err
Expand Down
28 changes: 28 additions & 0 deletions agent/rpcclient/health/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package health
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
)
Expand All @@ -25,6 +27,7 @@ func TestClient_ServiceNodes_BackendRouting(t *testing.T) {
ViewStore: &fakeViewStore{},
CacheName: "cache-no-streaming",
UseStreamingBackend: true,
QueryOptionDefaults: config.ApplyDefaultQueryOptions(&config.RuntimeConfig{}),
}

_, _, err := c.ServiceNodes(context.Background(), tc.req)
Expand Down Expand Up @@ -233,3 +236,28 @@ func TestClient_Notify_BackendRouting(t *testing.T) {
})
}
}

func TestClient_ServiceNodes_SetsDefaults(t *testing.T) {
store := &fakeViewStore{}
c := &Client{
ViewStore: store,
CacheName: "cache-no-streaming",
UseStreamingBackend: true,
QueryOptionDefaults: config.ApplyDefaultQueryOptions(&config.RuntimeConfig{
MaxQueryTime: 200 * time.Second,
DefaultQueryTime: 100 * time.Second,
}),
}

req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
QueryOptions: structs.QueryOptions{MinQueryIndex: 22},
}

_, _, err := c.ServiceNodes(context.Background(), req)
require.NoError(t, err)

require.Len(t, store.calls, 1)
require.Equal(t, 100*time.Second, store.calls[0].CacheInfo().Timeout)
}

0 comments on commit d2b58cd

Please sign in to comment.