Skip to content

Commit

Permalink
rpc: measure blocking queries (#7224)
Browse files Browse the repository at this point in the history
* agent: measure blocking queries

* agent.rpc: update docs to mention we only record blocking queries

* agent.rpc: make go fmt happy

* agent.rpc: fix non-atomic read and decrement with bitwise xor of uint64 0

* agent.rpc: clarify review question

* agent.rpc: today I learned that one must declare all variables before interacting with goto labels

* Update agent/consul/server.go

agent.rpc: more precise comment on `Server.queriesBlocking`

Co-Authored-By: Paul Banks <banks@banksco.de>

* Update website/source/docs/agent/telemetry.html.md

agent.rpc: improve queries_blocking description

Co-Authored-By: Paul Banks <banks@banksco.de>

* agent.rpc: fix some bugs found in review

* add a note about the updated counter behavior to telemetry.md

* docs: add upgrade-specific note on consul.rpc.quer{y,ies_blocking} behavior

Co-authored-by: Paul Banks <banks@banksco.de>
  • Loading branch information
mkcp and banks authored Feb 10, 2020
1 parent b42735f commit 55f19a9
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 4 deletions.
22 changes: 19 additions & 3 deletions agent/consul/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net"
"strings"
"sync/atomic"
"time"

"github.com/armon/go-metrics"
Expand Down Expand Up @@ -519,8 +520,12 @@ type queryFn func(memdb.WatchSet, *state.Store) error
// blockingQuery is used to process a potentially blocking query operation.
func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error {
var timeout *time.Timer
var queriesBlocking uint64
var queryTimeout time.Duration

// Instrument all queries run
metrics.IncrCounter([]string{"rpc", "query"}, 1)

minQueryIndex := queryOpts.GetMinQueryIndex()
// Fast path right to the non-blocking query.
if minQueryIndex == 0 {
Expand All @@ -542,19 +547,28 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
timeout = time.NewTimer(queryTimeout)
defer timeout.Stop()

// instrument blockingQueries
// atomic inc our server's count of in-flight blockingQueries and store the new value
queriesBlocking = atomic.AddUint64(&s.queriesBlocking, 1)
// atomic dec when we return from blockingQuery()
defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0))
// set the gauge directly to the new value of s.blockingQueries
metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(queriesBlocking))

RUN_QUERY:
// Setup blocking loop
// Update the query metadata.
s.setQueryMeta(queryMeta)

// Validate
// If the read must be consistent we verify that we are still the leader.
if queryOpts.GetRequireConsistent() {
if err := s.consistentRead(); err != nil {
return err
}
}

// Run the query.
metrics.IncrCounter([]string{"rpc", "query"}, 1)
// Run query

// Operate on a consistent set of state. This makes sure that the
// abandon channel goes with the state that the caller is using to
Expand All @@ -571,7 +585,7 @@ RUN_QUERY:
ws.Add(state.AbandonCh())
}

// Block up to the timeout if we didn't see anything fresh.
// Execute the queryFn
err := fn(ws, state)
// Note we check queryOpts.MinQueryIndex is greater than zero to determine if
// blocking was requested by client, NOT meta.Index since the state function
Expand All @@ -584,6 +598,7 @@ RUN_QUERY:
if err == nil && queryMeta.GetIndex() < 1 {
queryMeta.SetIndex(1)
}
// block up to the timeout if we don't see anything fresh.
if err == nil && minQueryIndex > 0 && queryMeta.GetIndex() <= minQueryIndex {
if expired := ws.Watch(timeout.C); !expired {
// If a restore may have woken us up then bail out from
Expand All @@ -594,6 +609,7 @@ RUN_QUERY:
select {
case <-state.AbandonCh():
default:
// loop back and look for an update again
goto RUN_QUERY
}
}
Expand Down
6 changes: 6 additions & 0 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ var (
// Server is Consul server which manages the service discovery,
// health checking, DC forwarding, Raft, and multiple Serf pools.
type Server struct {
// queriesBlocking is a counter that we incr and decr atomically in
// rpc calls to provide telemetry on how many blocking queries are running.
// We interact with queriesBlocking atomically, do not move without ensuring it is
// correctly 64-byte aligned in the struct layout
queriesBlocking uint64

// aclConfig is the configuration for the ACL system
aclConfig *acl.Config

Expand Down
8 changes: 7 additions & 1 deletion website/source/docs/agent/telemetry.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -765,10 +765,16 @@ These metrics are used to monitor the health of the Consul servers.
</tr>
<tr>
<td>`consul.rpc.query`</td>
<td>This increments when a server sends a (potentially blocking) RPC query.</td>
<td>This increments when a server receives a new blocking RPC request, indicating the rate of new blocking query calls. See consul.rpc.queries_blocking for the current number of in-flight blocking RPC calls. This metric changed in 1.7.0 to only increment on the the start of a query. The rate of queries will appear lower, but is more accurate.</td>
<td>queries</td>
<td>counter</td>
</tr>
<tr>
<td>`consul.rpc.queries_blocking`</td>
<td>This shows the current number of in-flight blocking queries the server is handling.</td>
<td>queries</td>
<td>gauge</td>
</tr>
<tr>
<td>`consul.rpc.cross-dc`</td>
<td>This increments when a server sends a (potentially blocking) cross datacenter RPC query.</td>
Expand Down
7 changes: 7 additions & 0 deletions website/source/docs/upgrade-specific.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ users, both the datacenter and the services namespace will be present. For examp
PTR record would previously have contained `web.service.consul`, it will now be `web.service.dc1.consul`
in OSS or `web.service.ns1.dc1.consul` for Enterprise.

### Telemetry: semantics of `consul.rpc.query` changed, see `consul.rpc.queries_blocking`

Consul has changed the semantics of query counts in its [telemetry](/docs/agent/telemetry.html#metrics-reference).
`consul.rpc.query` now only increments on the *start* of a query (blocking or non-blocking), whereas before it would
measure when blocking queries polled for more data. The gauge `consul.rpc.queries_blocking` has been added for a more
to more precisely capture the view of *active* blocking queries.

## Consul 1.6.0

#### Removal of Deprecated Features
Expand Down

0 comments on commit 55f19a9

Please sign in to comment.