diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index cd2fc265223a..248c0c82cc9b 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -7,6 +7,7 @@ import ( "io" "net" "strings" + "sync/atomic" "time" "github.com/armon/go-metrics" @@ -519,8 +520,12 @@ type queryFn func(memdb.WatchSet, *state.Store) error // blockingQuery is used to process a potentially blocking query operation. func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error { var timeout *time.Timer + var queriesBlocking uint64 var queryTimeout time.Duration + // Instrument all queries run + metrics.IncrCounter([]string{"rpc", "query"}, 1) + minQueryIndex := queryOpts.GetMinQueryIndex() // Fast path right to the non-blocking query. if minQueryIndex == 0 { @@ -542,10 +547,20 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s timeout = time.NewTimer(queryTimeout) defer timeout.Stop() + // instrument blockingQueries + // atomic inc our server's count of in-flight blockingQueries and store the new value + queriesBlocking = atomic.AddUint64(&s.queriesBlocking, 1) + // atomic dec when we return from blockingQuery() + defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0)) + // set the gauge directly to the new value of s.queriesBlocking + metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(queriesBlocking)) + RUN_QUERY: + // Setup blocking loop // Update the query metadata. s.setQueryMeta(queryMeta) + // Validate // If the read must be consistent we verify that we are still the leader. if queryOpts.GetRequireConsistent() { if err := s.consistentRead(); err != nil { @@ -553,8 +568,7 @@ RUN_QUERY: } } - // Run the query. - metrics.IncrCounter([]string{"rpc", "query"}, 1) + // Run query // Operate on a consistent set of state.
This makes sure that the // abandon channel goes with the state that the caller is using to @@ -571,7 +585,7 @@ RUN_QUERY: ws.Add(state.AbandonCh()) } - // Block up to the timeout if we didn't see anything fresh. + // Execute the queryFn err := fn(ws, state) // Note we check queryOpts.MinQueryIndex is greater than zero to determine if // blocking was requested by client, NOT meta.Index since the state function @@ -584,6 +598,7 @@ RUN_QUERY: if err == nil && queryMeta.GetIndex() < 1 { queryMeta.SetIndex(1) } + // block up to the timeout if we don't see anything fresh. if err == nil && minQueryIndex > 0 && queryMeta.GetIndex() <= minQueryIndex { if expired := ws.Watch(timeout.C); !expired { // If a restore may have woken us up then bail out from @@ -594,6 +609,7 @@ RUN_QUERY: select { case <-state.AbandonCh(): default: + // loop back and look for an update again goto RUN_QUERY } } diff --git a/agent/consul/server.go b/agent/consul/server.go index 332fa9aa8b96..4b592ac94499 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -110,6 +110,12 @@ var ( // Server is Consul server which manages the service discovery, // health checking, DC forwarding, Raft, and multiple Serf pools. type Server struct { + // queriesBlocking is a counter that we incr and decr atomically in + // rpc calls to provide telemetry on how many blocking queries are running. + // We interact with queriesBlocking atomically, do not move without ensuring it is + // correctly 64-bit aligned in the struct layout + queriesBlocking uint64 + // aclConfig is the configuration for the ACL system aclConfig *acl.Config diff --git a/website/source/docs/agent/telemetry.html.md b/website/source/docs/agent/telemetry.html.md index 7bb420eb3e84..2f0766d90d07 100644 --- a/website/source/docs/agent/telemetry.html.md +++ b/website/source/docs/agent/telemetry.html.md @@ -765,10 +765,16 @@ These metrics are used to monitor the health of the Consul servers.
`consul.rpc.query` - This increments when a server sends a (potentially blocking) RPC query. + This increments when a server receives a new RPC query request (blocking or non-blocking), indicating the rate of new query calls. See consul.rpc.queries_blocking for the current number of in-flight blocking RPC calls. This metric changed in 1.7.0 to only increment on the start of a query. The rate of queries will appear lower, but is more accurate. queries counter + + `consul.rpc.queries_blocking` + This shows the current number of in-flight blocking queries the server is handling. + queries + gauge + `consul.rpc.cross-dc` This increments when a server sends a (potentially blocking) cross datacenter RPC query. diff --git a/website/source/docs/upgrade-specific.html.md b/website/source/docs/upgrade-specific.html.md index 6c83337e5aee..5aaf834b48a0 100644 --- a/website/source/docs/upgrade-specific.html.md +++ b/website/source/docs/upgrade-specific.html.md @@ -35,6 +35,13 @@ users, both the datacenter and the services namespace will be present. For examp PTR record would previously have contained `web.service.consul`, it will now be `web.service.dc1.consul` in OSS or `web.service.ns1.dc1.consul` for Enterprise. +### Telemetry: semantics of `consul.rpc.query` changed, see `consul.rpc.queries_blocking` + +Consul has changed the semantics of query counts in its [telemetry](/docs/agent/telemetry.html#metrics-reference). +`consul.rpc.query` now only increments on the *start* of a query (blocking or non-blocking), whereas before it would +measure when blocking queries polled for more data. The gauge `consul.rpc.queries_blocking` has been added +to more precisely capture the view of *active* blocking queries. + ## Consul 1.6.0 #### Removal of Deprecated Features