Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework Scope metrics according to Prometheus conventions. #1615

Merged
merged 2 commits into from
Jun 30, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
2 changes: 1 addition & 1 deletion app/multitenant/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ func timeRequestStatus(method string, metric *prometheus.SummaryVec, toStatusCod
startTime := time.Now()
err := f()
duration := time.Now().Sub(startTime)
metric.WithLabelValues(method, toStatusCode(err)).Observe(float64(duration.Nanoseconds()))
metric.WithLabelValues(method, toStatusCode(err)).Observe(duration.Seconds())
return err
}
47 changes: 26 additions & 21 deletions app/multitenant/dynamo_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,57 +33,62 @@ const (
memcacheExpiration = 15 // seconds
memcacheUpdateInterval = 1 * time.Minute
natsTimeout = 10 * time.Second
hitLabel = "hit"
missLabel = "miss"
)

var (
dynamoRequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "scope",
Name: "dynamo_request_duration_nanoseconds",
Help: "Time spent doing DynamoDB requests.",
Name: "dynamo_request_duration_seconds",
Help: "Time in seconds spent doing DynamoDB requests.",
}, []string{"method", "status_code"})
dynamoConsumedCapacity = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "scope",
Name: "dynamo_consumed_capacity",
Help: "The capacity units consumed by operation.",
Name: "dynamo_consumed_capacity_total",
Help: "Total count of capacity units consumed per operation.",
}, []string{"method"})
dynamoValueSize = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "scope",
Name: "dynamo_value_size_bytes",
Help: "Size of data read / written from dynamodb.",
Name: "dynamo_value_size_bytes_total",
Help: "Total size of data read / written from DynamoDB in bytes.",
}, []string{"method"})

inProcessCacheCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
inProcessCacheRequests = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "scope",
Name: "in_process_cache",
Help: "Reports fetches that hit in-process cache.",
}, []string{"result"})
Name: "in_process_cache_requests_total",
Help: "Total count of reports requested from the in-process cache.",
})

inProcessCacheHits = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "scope",
Name: "in_process_cache_hits_total",
Help: "Total count of reports found in the in-process cache.",
})

reportSize = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "scope",
Name: "report_size_bytes",
Help: "Compressed size of reports received.",
Name: "report_size_bytes_total",
Help: "Total compressed size of reports received in bytes.",
})

s3RequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "scope",
Name: "s3_request_duration_nanoseconds",
Help: "Time spent doing S3 requests.",
Name: "s3_request_duration_seconds",
Help: "Time in seconds spent doing S3 requests.",
}, []string{"method", "status_code"})

natsRequests = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "scope",
Name: "nats_requests",
Help: "Number of NATS requests.",
Name: "nats_requests_total",
Help: "Total count of NATS requests.",
}, []string{"method", "status_code"})
)

func init() {
prometheus.MustRegister(dynamoRequestDuration)
prometheus.MustRegister(dynamoConsumedCapacity)
prometheus.MustRegister(dynamoValueSize)
prometheus.MustRegister(inProcessCacheCounter)
prometheus.MustRegister(inProcessCacheRequests)
prometheus.MustRegister(inProcessCacheHits)
prometheus.MustRegister(reportSize)
prometheus.MustRegister(s3RequestDuration)
prometheus.MustRegister(natsRequests)
Expand Down Expand Up @@ -580,8 +585,8 @@ func (c inProcessStore) FetchReports(keys []string) ([]report.Report, []string,
missing = append(missing, key)
}
}
inProcessCacheCounter.WithLabelValues(hitLabel).Add(float64(len(found)))
inProcessCacheCounter.WithLabelValues(missLabel).Add(float64(len(missing)))
inProcessCacheHits.Add(float64(len(found)))
inProcessCacheRequests.Add(float64(len(keys)))
return found, missing, nil
}

Expand Down
25 changes: 16 additions & 9 deletions app/multitenant/memcache_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,28 @@ import (
)

var (
memcacheCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
memcacheRequests = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "scope",
Name: "memcache",
Help: "Reports that missed our in-memory cache but went to our memcache",
}, []string{"result"})
Name: "memcache_requests_total",
Help: "Total count of reports requested from memcache that were not found in our in-memory cache.",
})

memcacheHits = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "scope",
Name: "memcache_hits_total",
Help: "Total count of reports found in memcache that were not found in our in-memory cache.",
})

memcacheRequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "scope",
Name: "memcache_request_duration_nanoseconds",
Help: "Time spent doing memcache requests.",
Name: "memcache_request_duration_seconds",
Help: "Total time spent in seconds doing memcache requests.",
}, []string{"method", "status_code"})
)

func init() {
prometheus.MustRegister(memcacheCounter)
prometheus.MustRegister(memcacheRequests)
prometheus.MustRegister(memcacheHits)
prometheus.MustRegister(memcacheRequestDuration)
}

Expand Down Expand Up @@ -158,8 +165,8 @@ func (c *MemcacheClient) FetchReports(keys []string) ([]report.Report, []string,
}
}

memcacheCounter.WithLabelValues(hitLabel).Add(float64(len(reports)))
memcacheCounter.WithLabelValues(missLabel).Add(float64(len(missing)))
memcacheHits.Add(float64(len(reports)))
memcacheRequests.Add(float64(len(keys)))
return reports, missing, nil
}

Expand Down
107 changes: 51 additions & 56 deletions app/multitenant/sqs_control_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ var (
rpcTimeout = time.Minute
sqsRequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "scope",
Name: "sqs_request_duration_nanoseconds",
Help: "Time spent doing SQS requests.",
Name: "sqs_request_duration_seconds",
Help: "Time in seconds spent doing SQS requests.",
}, []string{"method", "status_code"})
)

Expand Down Expand Up @@ -92,16 +92,17 @@ func (cr *sqsControlRouter) getResponseQueueURL() *string {

func (cr *sqsControlRouter) getOrCreateQueue(name string) (*string, error) {
// CreateQueue creates a queue or if it already exists, returns url of said queue
start := time.Now()
createQueueRes, err := cr.service.CreateQueue(&sqs.CreateQueueInput{
QueueName: aws.String(name),
var createQueueRes *sqs.CreateQueueOutput
var err error
err = timeRequestStatus("CreateQueue", sqsRequestDuration, nil, func() error {
createQueueRes, err = cr.service.CreateQueue(&sqs.CreateQueueInput{
QueueName: aws.String(name),
})
return err
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("CreateQueue", "500").Observe(float64(duration.Nanoseconds()))
return nil, err
}
sqsRequestDuration.WithLabelValues("CreateQueue", "200").Observe(float64(duration.Nanoseconds()))
return createQueueRes.QueueUrl, nil
}

Expand All @@ -124,18 +125,19 @@ func (cr *sqsControlRouter) loop() {
}

for {
start := time.Now()
res, err := cr.service.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: responseQueueURL,
WaitTimeSeconds: longPollTime,
var res *sqs.ReceiveMessageOutput
var err error
err = timeRequestStatus("ReceiveMessage", sqsRequestDuration, nil, func() error {
res, err = cr.service.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: responseQueueURL,
WaitTimeSeconds: longPollTime,
})
return err
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("ReceiveMessage", "500").Observe(float64(duration.Nanoseconds()))
log.Errorf("Error receiving message from %s: %v", *responseQueueURL, err)
continue
}
sqsRequestDuration.WithLabelValues("ReceiveMessage", "200").Observe(float64(duration.Nanoseconds()))

if len(res.Messages) == 0 {
continue
Expand All @@ -155,18 +157,13 @@ func (cr *sqsControlRouter) deleteMessages(queueURL *string, messages []*sqs.Mes
Id: message.MessageId,
})
}
start := time.Now()
_, err := cr.service.DeleteMessageBatch(&sqs.DeleteMessageBatchInput{
QueueUrl: queueURL,
Entries: entries,
return timeRequestStatus("DeleteMessageBatch", sqsRequestDuration, nil, func() error {
_, err := cr.service.DeleteMessageBatch(&sqs.DeleteMessageBatchInput{
QueueUrl: queueURL,
Entries: entries,
})
return err
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("DeleteMessageBatch", "500").Observe(float64(duration.Nanoseconds()))
} else {
sqsRequestDuration.WithLabelValues("DeleteMessageBatch", "200").Observe(float64(duration.Nanoseconds()))
}
return err
}

func (cr *sqsControlRouter) handleResponses(res *sqs.ReceiveMessageOutput) {
Expand Down Expand Up @@ -195,18 +192,14 @@ func (cr *sqsControlRouter) sendMessage(queueURL *string, message interface{}) e
return err
}
log.Infof("sendMessage to %s: %s", *queueURL, buf.String())
start := time.Now()
_, err := cr.service.SendMessage(&sqs.SendMessageInput{
QueueUrl: queueURL,
MessageBody: aws.String(buf.String()),

return timeRequestStatus("SendMessage", sqsRequestDuration, nil, func() error {
_, err := cr.service.SendMessage(&sqs.SendMessageInput{
QueueUrl: queueURL,
MessageBody: aws.String(buf.String()),
})
return err
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("SendMessage", "500").Observe(float64(duration.Nanoseconds()))
} else {
sqsRequestDuration.WithLabelValues("SendMessage", "200").Observe(float64(duration.Nanoseconds()))
}
return err
}

func (cr *sqsControlRouter) Handle(ctx context.Context, probeID string, req xfer.Request) (xfer.Response, error) {
Expand All @@ -222,17 +215,17 @@ func (cr *sqsControlRouter) Handle(ctx context.Context, probeID string, req xfer
return xfer.Response{}, fmt.Errorf("No SQS queue yet!")
}

probeQueueName := fmt.Sprintf("%sprobe-%s-%s", cr.prefix, userID, probeID)
start := time.Now()
probeQueueURL, err := cr.service.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String(probeQueueName),
var probeQueueURL *sqs.GetQueueUrlOutput
err = timeRequestStatus("GetQueueUrl", sqsRequestDuration, nil, func() error {
probeQueueName := fmt.Sprintf("%sprobe-%s-%s", cr.prefix, userID, probeID)
probeQueueURL, err = cr.service.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String(probeQueueName),
})
return err
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("GetQueueUrl", "500").Observe(float64(duration.Nanoseconds()))
return xfer.Response{}, err
}
sqsRequestDuration.WithLabelValues("GetQueueUrl", "200").Observe(float64(duration.Nanoseconds()))

// Add a response channel before we send the request, to prevent races
id := fmt.Sprintf("request-%s-%d", userID, rand.Int63())
Expand All @@ -247,12 +240,13 @@ func (cr *sqsControlRouter) Handle(ctx context.Context, probeID string, req xfer
}()

// Next, send the request to that queue
if err := cr.sendMessage(probeQueueURL.QueueUrl, sqsRequestMessage{
ID: id,
Request: req,
ResponseQueueURL: *responseQueueURL,
if err := timeRequestStatus("SendMessage", sqsRequestDuration, nil, func() error {
return cr.sendMessage(probeQueueURL.QueueUrl, sqsRequestMessage{
ID: id,
Request: req,
ResponseQueueURL: *responseQueueURL,
})
}); err != nil {
sqsRequestDuration.WithLabelValues("GetQueueUrl", "500").Observe(float64(duration.Nanoseconds()))
return xfer.Response{}, err
}

Expand Down Expand Up @@ -327,18 +321,19 @@ func (pw *probeWorker) loop() {
default:
}

start := time.Now()
res, err := pw.router.service.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: pw.requestQueueURL,
WaitTimeSeconds: longPollTime,
var res *sqs.ReceiveMessageOutput
var err error
err = timeRequestStatus("ReceiveMessage", sqsRequestDuration, nil, func() error {
res, err = pw.router.service.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: pw.requestQueueURL,
WaitTimeSeconds: longPollTime,
})
return err
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("ReceiveMessage", "500").Observe(float64(duration.Nanoseconds()))
log.Errorf("Error recieving message: %v", err)
continue
}
sqsRequestDuration.WithLabelValues("ReceiveMessage", "200").Observe(float64(duration.Nanoseconds()))

if len(res.Messages) == 0 {
continue
Expand Down
2 changes: 1 addition & 1 deletion common/middleware/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (i Instrument) Wrap(next http.Handler) http.Handler {
status = strconv.Itoa(interceptor.statusCode)
took = time.Since(begin)
)
i.Duration.WithLabelValues(r.Method, route, status, isWS).Observe(float64(took.Nanoseconds()))
i.Duration.WithLabelValues(r.Method, route, status, isWS).Observe(took.Seconds())
})
}

Expand Down
6 changes: 3 additions & 3 deletions probe/endpoint/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ var SpyDuration = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: "scope",
Subsystem: "probe",
Name: "spy_time_nanoseconds",
Help: "Total time spent spying on active connections.",
Name: "spy_duration_seconds",
Help: "Time in seconds spent spying on active connections.",
MaxAge: 10 * time.Second, // like statsd
},
[]string{},
Expand Down Expand Up @@ -100,7 +100,7 @@ func (t *fourTuple) reverse() {
// Report implements Reporter.
func (r *Reporter) Report() (report.Report, error) {
defer func(begin time.Time) {
SpyDuration.WithLabelValues().Observe(float64(time.Since(begin)))
SpyDuration.WithLabelValues().Observe(time.Since(begin).Seconds())
}(time.Now())

hostNodeID := report.MakeHostNodeID(r.hostID)
Expand Down
4 changes: 2 additions & 2 deletions prog/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import (
var (
requestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "scope",
Name: "request_duration_nanoseconds",
Help: "Time spent serving HTTP requests.",
Name: "request_duration_seconds",
Help: "Time in seconds spent serving HTTP requests.",
}, []string{"method", "route", "status_code", "ws"})
)

Expand Down