This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

Write buffer metrics #960

Merged
merged 20 commits on Jan 8, 2020
3 changes: 3 additions & 0 deletions go/config/config.go
@@ -574,6 +574,9 @@ func (this *Configuration) postReadAdjustments() error {
if u.Path != "" {
return fmt.Errorf("If specified, HTTPAdvertise must not specify a path")
}
if this.InstanceWriteBufferSize <= 0 {
this.BufferInstanceWrites = false
}
}
return nil
}
42 changes: 41 additions & 1 deletion go/http/api.go
@@ -40,7 +40,7 @@ import (
"github.com/github/orchestrator/go/logic"
"github.com/github/orchestrator/go/metrics/query"
"github.com/github/orchestrator/go/process"
"github.com/github/orchestrator/go/raft"
orcraft "github.com/github/orchestrator/go/raft"
)

// APIResponseCode is an OK/ERROR response code
@@ -123,6 +123,7 @@ type HttpAPI struct {
var API HttpAPI = HttpAPI{}
var discoveryMetrics = collection.CreateOrReturnCollection("DISCOVERY_METRICS")
var queryMetrics = collection.CreateOrReturnCollection("BACKEND_WRITES")
var writeBufferMetrics = collection.CreateOrReturnCollection("WRITE_BUFFER")

func (this *HttpAPI) getInstanceKeyInternal(host string, port string, resolve bool) (inst.InstanceKey, error) {
var instanceKey *inst.InstanceKey
@@ -2390,6 +2391,43 @@ func (this *HttpAPI) BackendQueryMetricsAggregated(params martini.Params, r rend
r.JSON(http.StatusOK, aggregated)
}

// WriteBufferMetricsRaw returns the raw instance write buffer metrics
func (this *HttpAPI) WriteBufferMetricsRaw(params martini.Params, r render.Render, req *http.Request, user auth.User) {
seconds, err := strconv.Atoi(params["seconds"])
log.Debugf("WriteBufferMetricsRaw: seconds: %d", seconds)
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: "Unable to generate raw instance write buffer metrics"})
return
}

refTime := time.Now().Add(-time.Duration(seconds) * time.Second)
m, err := writeBufferMetrics.Since(refTime)
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: "Unable to return instance write buffermetrics"})
return
}

log.Debugf("WriteBufferMetricsRaw data: %+v", m)

r.JSON(http.StatusOK, m)
}

// WriteBufferMetricsAggregated provides aggregated instance write buffer metrics
func (this *HttpAPI) WriteBufferMetricsAggregated(params martini.Params, r render.Render, req *http.Request, user auth.User) {
seconds, err := strconv.Atoi(params["seconds"])
log.Debugf("WriteBufferMetricsAggregated: seconds: %d", seconds)
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: "Unable to aggregated instance write buffer metrics"})
return
}

refTime := time.Now().Add(-time.Duration(seconds) * time.Second)
aggregated := inst.AggregatedSince(writeBufferMetrics, refTime)
log.Debugf("WriteBufferMetricsAggregated data: %+v", aggregated)

r.JSON(http.StatusOK, aggregated)
}

// Agents provides complete list of registered agents (See https://github.com/github/orchestrator-agent)
func (this *HttpAPI) Agents(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
@@ -3778,6 +3816,8 @@ func (this *HttpAPI) RegisterRequests(m *martini.ClassicMartini) {
this.registerAPIRequest(m, "discovery-queue-metrics-aggregated/:seconds", this.DiscoveryQueueMetricsAggregated)
this.registerAPIRequest(m, "backend-query-metrics-raw/:seconds", this.BackendQueryMetricsRaw)
this.registerAPIRequest(m, "backend-query-metrics-aggregated/:seconds", this.BackendQueryMetricsAggregated)
this.registerAPIRequest(m, "write-buffer-metrics-raw/:seconds", this.WriteBufferMetricsRaw)
this.registerAPIRequest(m, "write-buffer-metrics-aggregated/:seconds", this.WriteBufferMetricsAggregated)

// Agents
this.registerAPIRequest(m, "agents", this.Agents)
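For context (not part of the diff): a minimal Go sketch of how the two new endpoints could be queried once deployed. The host and port are assumptions, and only a subset of the AggregatedWriteBufferMetric fields defined in go/inst/write_buffer.go is decoded here.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// aggregatedWriteBufferMetric mirrors a few fields of inst.AggregatedWriteBufferMetric.
type aggregatedWriteBufferMetric struct {
	CountInstances  int
	MeanInstances   float64
	P95WaitSeconds  float64
	P95WriteSeconds float64
}

func main() {
	// Aggregate write buffer metrics observed over the last 60 seconds.
	// The address is an assumption; adjust to your deployment.
	resp, err := http.Get("http://localhost:3000/api/write-buffer-metrics-aggregated/60")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var m aggregatedWriteBufferMetric
	if err := json.NewDecoder(resp.Body).Decode(&m); err != nil {
		panic(err)
	}
	fmt.Printf("flushed %d instances in total, p95 wait %.3fs, p95 write %.3fs\n",
		m.CountInstances, m.P95WaitSeconds, m.P95WriteSeconds)
}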
40 changes: 37 additions & 3 deletions go/inst/instance_dao.go
@@ -77,6 +77,8 @@ var readTopologyInstanceCounter = metrics.NewCounter()
var readInstanceCounter = metrics.NewCounter()
var writeInstanceCounter = metrics.NewCounter()
var backendWrites = collection.CreateOrReturnCollection("BACKEND_WRITES")
var writeBufferMetrics = collection.CreateOrReturnCollection("WRITE_BUFFER")
var writeBufferLatency = stopwatch.NewNamedStopwatch()

var emptyQuotesRegexp = regexp.MustCompile(`^""$`)

@@ -85,6 +87,8 @@ func init() {
metrics.Register("instance.read_topology", readTopologyInstanceCounter)
metrics.Register("instance.read", readInstanceCounter)
metrics.Register("instance.write", writeInstanceCounter)
writeBufferLatency.AddMany([]string{"wait", "write"})
writeBufferLatency.Start("wait")

go initializeInstanceDao()
}
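As an aside (not part of the diff): a minimal sketch of the named-stopwatch lifecycle used for these metrics, based only on the calls visible in this PR (NewNamedStopwatch, AddMany, Start, Stop, Elapsed, Reset); the exact stopwatch package API beyond these calls is an assumption.

sw := stopwatch.NewNamedStopwatch()
sw.AddMany([]string{"wait", "write"})

sw.Start("wait") // time spent waiting for the next flush
// ... wait for the flush trigger ...
sw.Stop("wait")

sw.Start("write") // time spent writing to the backend
// ... perform the bulk write ...
sw.Stop("write")

waitLatency := sw.Elapsed("wait")
writeLatency := sw.Elapsed("write")
_, _ = waitLatency, writeLatency

// reset both timers before the next flush cycle
sw.Reset("wait")
sw.Reset("write")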
@@ -2523,9 +2527,14 @@ var forceFlushInstanceWriteBuffer = make(chan bool)

func enqueueInstanceWrite(instance *Instance, instanceWasActuallyFound bool, lastError error) {
if len(instanceWriteBuffer) == config.Config.InstanceWriteBufferSize {
// Signal the "flushing" gorouting that there's work.
// Signal the "flushing" goroutine that there's work.
// We prefer doing all bulk flushes from one goroutine.
forceFlushInstanceWriteBuffer <- true
// Non-blocking send to avoid blocking goroutines on sending a flush:
// if the "flushing" goroutine cannot read from the channel, it is because a flush is already in progress.
select {
case forceFlushInstanceWriteBuffer <- true:
default:
}
}
instanceWriteBuffer <- instanceUpdateObject{instance, instanceWasActuallyFound, lastError}
}
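For readers unfamiliar with the idiom above, here is a self-contained sketch of a non-blocking channel send using select with a default case; this is generic Go, not code from this PR.

package main

import "fmt"

func main() {
	flushRequests := make(chan bool) // unbuffered, like forceFlushInstanceWriteBuffer
	// Non-blocking send: if no goroutine is ready to receive
	// (for example, a flush is already in progress), fall through
	// to the default case instead of blocking.
	select {
	case flushRequests <- true:
		fmt.Println("flush signalled")
	default:
		fmt.Println("flush already in progress; not blocking")
	}
}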
@@ -2535,11 +2544,24 @@ func flushInstanceWriteBuffer() {
var instances []*Instance
var lastseen []*Instance // instances to update with last_seen field

defer func() {
// reset stopwatches (TODO: .ResetAll())
writeBufferLatency.Reset("wait")
writeBufferLatency.Reset("write")
writeBufferLatency.Start("wait") // waiting for next flush
}()

writeBufferLatency.Stop("wait")

if len(instanceWriteBuffer) == 0 {
return
}

for i := 0; i < len(instanceWriteBuffer); i++ {
// There are `DiscoveryMaxConcurrency` goroutines trying to enqueue instances into the buffer.
// When one instance is flushed from the buffer, one discovery goroutine becomes ready to enqueue a new instance.
// This is why we flush all instances in the buffer, up to a maximum of `InstanceWriteBufferSize`.
// Otherwise we could flush far more instances than expected.
for i := 0; i < config.Config.InstanceWriteBufferSize && len(instanceWriteBuffer) > 0; i++ {
upd := <-instanceWriteBuffer
if upd.instanceWasActuallyFound && upd.lastError == nil {
lastseen = append(lastseen, upd.instance)
@@ -2548,6 +2570,9 @@ log.Debugf("flushInstanceWriteBuffer: will not update database_instance.last_seen due to error: %+v", upd.lastError)
log.Debugf("flushInstanceWriteBuffer: will not update database_instance.last_seen due to error: %+v", upd.lastError)
}
}

writeBufferLatency.Start("write")

// sort instances by instanceKey (table pk) to make locking predictable
sort.Sort(byInstanceKey(instances))
sort.Sort(byInstanceKey(lastseen))
@@ -2569,6 +2594,15 @@ if err != nil {
if err != nil {
log.Errorf("flushInstanceWriteBuffer: %v", err)
}

writeBufferLatency.Stop("write")

writeBufferMetrics.Append(&WriteBufferMetric{
Timestamp: time.Now(),
WaitLatency: writeBufferLatency.Elapsed("wait"),
WriteLatency: writeBufferLatency.Elapsed("write"),
Instances: len(lastseen) + len(instances),
})
}

// WriteInstance stores an instance in the orchestrator backend
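To make the control flow easier to follow (the driver loop itself is not in this diff, so its details are assumptions): flushInstanceWriteBuffer is expected to be called from a single goroutine that flushes either on a periodic ticker derived from InstanceFlushIntervalMilliseconds or when enqueueInstanceWrite signals forceFlushInstanceWriteBuffer. A hypothetical sketch of such a loop:

// Hypothetical driver loop; names match identifiers in instance_dao.go,
// but the real wiring may differ.
func instanceWriteBufferFlusherLoop() {
	ticker := time.NewTicker(
		time.Duration(config.Config.InstanceFlushIntervalMilliseconds) * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// periodic flush
			flushInstanceWriteBuffer()
		case <-forceFlushInstanceWriteBuffer:
			// a discovery goroutine found the buffer full
			flushInstanceWriteBuffer()
		}
	}
}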
133 changes: 133 additions & 0 deletions go/inst/write_buffer.go
@@ -0,0 +1,133 @@
/*
Copyright 2017 Simon J Mudd

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package inst

/*
write_buffer.go holds information about instance write buffer metrics and records
the time spent waiting before a flush plus the time spent writing to the backend.
*/
import (
"time"

"github.com/github/orchestrator/go/collection"
"github.com/github/orchestrator/go/config"

"github.com/montanaflynn/stats"
)

// WriteBufferMetric records metrics of backend writes that go through
// a sized channel. It allows us to compare the time spent waiting to
// execute the write against the time needed to run it; with a
// "sized channel" the wait time may be significant and is worth
// measuring.
type WriteBufferMetric struct {
Timestamp time.Time // time the metric was started
Instances int // number of flushed instances
WaitLatency time.Duration // waiting before flush
WriteLatency time.Duration // time writing to backend
}

// When returns the timestamp of the start of the recording
func (m WriteBufferMetric) When() time.Time {
return m.Timestamp
}

type AggregatedWriteBufferMetric struct {
InstanceWriteBufferSize int // config setting
InstanceFlushIntervalMilliseconds int // config setting
CountInstances int
MaxInstances float64
MeanInstances float64
MedianInstances float64
P95Instances float64
MaxWaitSeconds float64
MeanWaitSeconds float64
MedianWaitSeconds float64
P95WaitSeconds float64
MaxWriteSeconds float64
MeanWriteSeconds float64
MedianWriteSeconds float64
P95WriteSeconds float64
}

// AggregatedSince returns the aggregated write buffer metrics for the period
// given from the values provided.
func AggregatedSince(c *collection.Collection, t time.Time) AggregatedWriteBufferMetric {

// Initialise timing metrics
var instancesCounter []float64
var waitTimings []float64
var writeTimings []float64

// Retrieve values since the time specified
values, err := c.Since(t)
a := AggregatedWriteBufferMetric{
InstanceWriteBufferSize: config.Config.InstanceWriteBufferSize,
InstanceFlushIntervalMilliseconds: config.Config.InstanceFlushIntervalMilliseconds,
}
if err != nil {
return a // empty data
}

// generate the metrics
for _, v := range values {
instancesCounter = append(instancesCounter, float64(v.(*WriteBufferMetric).Instances))
waitTimings = append(waitTimings, v.(*WriteBufferMetric).WaitLatency.Seconds())
writeTimings = append(writeTimings, v.(*WriteBufferMetric).WriteLatency.Seconds())
a.CountInstances += v.(*WriteBufferMetric).Instances
}

// generate aggregate values
if s, err := stats.Max(stats.Float64Data(instancesCounter)); err == nil {
a.MaxInstances = s
}
if s, err := stats.Mean(stats.Float64Data(instancesCounter)); err == nil {
a.MeanInstances = s
}
if s, err := stats.Median(stats.Float64Data(instancesCounter)); err == nil {
a.MedianInstances = s
}
if s, err := stats.Percentile(stats.Float64Data(instancesCounter), 95); err == nil {
a.P95Instances = s
}
if s, err := stats.Max(stats.Float64Data(waitTimings)); err == nil {
a.MaxWaitSeconds = s
}
if s, err := stats.Mean(stats.Float64Data(waitTimings)); err == nil {
a.MeanWaitSeconds = s
}
if s, err := stats.Median(stats.Float64Data(waitTimings)); err == nil {
a.MedianWaitSeconds = s
}
if s, err := stats.Percentile(stats.Float64Data(waitTimings), 95); err == nil {
a.P95WaitSeconds = s
}
if s, err := stats.Max(stats.Float64Data(writeTimings)); err == nil {
a.MaxWriteSeconds = s
}
if s, err := stats.Mean(stats.Float64Data(writeTimings)); err == nil {
a.MeanWriteSeconds = s
}
if s, err := stats.Median(stats.Float64Data(writeTimings)); err == nil {
a.MedianWriteSeconds = s
}
if s, err := stats.Percentile(stats.Float64Data(writeTimings), 95); err == nil {
a.P95WriteSeconds = s
}

return a
}
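A short usage sketch (not part of the PR) of AggregatedSince, mirroring what the WriteBufferMetricsAggregated handler above does; the surrounding wiring and imports are assumed.

// Aggregate the write buffer metrics collected over the last 60 seconds.
refTime := time.Now().Add(-60 * time.Second)
aggregated := AggregatedSince(writeBufferMetrics, refTime)
fmt.Printf("flushed %d instances, p95 wait=%.3fs, p95 write=%.3fs\n",
	aggregated.CountInstances, aggregated.P95WaitSeconds, aggregated.P95WriteSeconds)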