Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: measure blocking queries #7224

Merged
merged 11 commits into from
Feb 10, 2020
22 changes: 19 additions & 3 deletions agent/consul/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net"
"strings"
"sync/atomic"
"time"

"github.com/armon/go-metrics"
Expand Down Expand Up @@ -519,8 +520,12 @@ type queryFn func(memdb.WatchSet, *state.Store) error
// blockingQuery is used to process a potentially blocking query operation.
func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error {
var timeout *time.Timer
var queriesBlocking uint64
var queryTimeout time.Duration

// Instrument all queries run
metrics.IncrCounter([]string{"rpc", "query"}, 1)

minQueryIndex := queryOpts.GetMinQueryIndex()
// Fast path right to the non-blocking query.
if minQueryIndex == 0 {
Expand All @@ -542,19 +547,28 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
timeout = time.NewTimer(queryTimeout)
defer timeout.Stop()

// instrument blockingQueries
// atomic inc our server's count of in-flight blockingQueries and store the new value
queriesBlocking = atomic.AddUint64(&s.queriesBlocking, 1)
// atomic dec when we return from blockingQuery()
defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0))
// set the gauge directly to the new value of s.blockingQueries
metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(queriesBlocking))

RUN_QUERY:
// Setup blocking loop
// Update the query metadata.
s.setQueryMeta(queryMeta)

// Validate
// If the read must be consistent we verify that we are still the leader.
if queryOpts.GetRequireConsistent() {
if err := s.consistentRead(); err != nil {
return err
}
}

// Run the query.
metrics.IncrCounter([]string{"rpc", "query"}, 1)
// Run query

// Operate on a consistent set of state. This makes sure that the
// abandon channel goes with the state that the caller is using to
Expand All @@ -571,7 +585,7 @@ RUN_QUERY:
ws.Add(state.AbandonCh())
}

// Block up to the timeout if we didn't see anything fresh.
// Execute the queryFn
err := fn(ws, state)
// Note we check queryOpts.MinQueryIndex is greater than zero to determine if
// blocking was requested by client, NOT meta.Index since the state function
Expand All @@ -584,6 +598,7 @@ RUN_QUERY:
if err == nil && queryMeta.GetIndex() < 1 {
queryMeta.SetIndex(1)
}
// block up to the timeout if we don't see anything fresh.
if err == nil && minQueryIndex > 0 && queryMeta.GetIndex() <= minQueryIndex {
if expired := ws.Watch(timeout.C); !expired {
// If a restore may have woken us up then bail out from
Expand All @@ -594,6 +609,7 @@ RUN_QUERY:
select {
case <-state.AbandonCh():
default:
// loop back and look for an update again
goto RUN_QUERY
}
}
Expand Down
6 changes: 6 additions & 0 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ var (
// Server is Consul server which manages the service discovery,
// health checking, DC forwarding, Raft, and multiple Serf pools.
type Server struct {
// queriesBlocking is a counter that we incr and decr atomically in
// rpc calls to provide telemetry on how many blocking queries are running.
// We interact with queriesBlocking atomically, do not move without ensuring it is
// correctly 64-byte aligned in the struct layout
queriesBlocking uint64

// aclConfig is the configuration for the ACL system
aclConfig *acl.Config

Expand Down
8 changes: 7 additions & 1 deletion website/source/docs/agent/telemetry.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -765,10 +765,16 @@ These metrics are used to monitor the health of the Consul servers.
</tr>
<tr>
<td>`consul.rpc.query`</td>
<td>This increments when a server sends a (potentially blocking) RPC query.</td>
<td>This increments when a server receives a new blocking RPC request, indicating the rate of new blocking query calls. See consul.rpc.queries_blocking for the current number of in-flight blocking RPC calls. This metric changed in 1.7.0 to only increment on the the start of a query. The rate of queries will appear lower, but is more accurate.</td>
<td>queries</td>
banks marked this conversation as resolved.
Show resolved Hide resolved
<td>counter</td>
</tr>
<tr>
<td>`consul.rpc.queries_blocking`</td>
<td>This shows the current number of in-flight blocking queries the server is handling.</td>
<td>queries</td>
<td>gauge</td>
</tr>
<tr>
<td>`consul.rpc.cross-dc`</td>
<td>This increments when a server sends a (potentially blocking) cross datacenter RPC query.</td>
Expand Down
7 changes: 7 additions & 0 deletions website/source/docs/upgrade-specific.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ users, both the datacenter and the services namespace will be present. For examp
PTR record would previously have contained `web.service.consul`, it will now be `web.service.dc1.consul`
in OSS or `web.service.ns1.dc1.consul` for Enterprise.

### Telemetry: semantics of `consul.rpc.query` changed, see `consul.rpc.queries_blocking`

Consul has changed the semantics of query counts in its [telemetry](/docs/agent/telemetry.html#metrics-reference).
`consul.rpc.query` now only increments on the *start* of a query (blocking or non-blocking), whereas before it would
measure when blocking queries polled for more data. The gauge `consul.rpc.queries_blocking` has been added for a more
to more precisely capture the view of *active* blocking queries.

## Consul 1.6.0

#### Removal of Deprecated Features
Expand Down