diff --git a/CHANGELOG.md b/CHANGELOG.md index 67c439c07..80cef351e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,12 +4,18 @@ ### Release Notes +Kapacitor now exposes more internal metrics for determining the performance of a given task. +The internal stats now include a measurement `nodes` that contains an averaged execution time for the node, tagged by the task, node, task type and kind of node (e.g. window vs union). +These stats are also available in the DOT output of the Kapacitor show command. + + ### Features - [#236](https://github.com/influxdata/kapacitor/issues/236): Implement batched group by - [#231](https://github.com/influxdata/kapacitor/pull/231): Add ShiftNode so values can be shifted in time for joining/comparisons. - [#190](https://github.com/influxdata/kapacitor/issues/190): BREAKING: Deadman's switch now triggers off emitted counts and is grouped by to original grouping of the data. The breaking change is that the 'collected' stat is no longer output for `.stats` and has been replaced by `emitted`. - [#145](https://github.com/influxdata/kapacitor/issues/145): The InfluxDB Out Node now writes data to InfluxDB in buffers. +- [#215](https://github.com/influxdata/kapacitor/issues/215): Add performance metrics to nodes for average execution times and node throughput values. ### Bugfixes diff --git a/batch.go b/batch.go index bfcf50587..eee35f23b 100644 --- a/batch.go +++ b/batch.go @@ -96,7 +96,7 @@ func (s *SourceBatchNode) Queries(start, stop time.Time) [][]string { // Do not add the source batch node to the dot output // since its not really an edge. 
-func (s *SourceBatchNode) edot(*bytes.Buffer, time.Duration) {} +func (s *SourceBatchNode) edot(*bytes.Buffer) {} func (s *SourceBatchNode) collectedCount() (count int64) { for _, child := range s.children { diff --git a/cmd/kapacitord/run/server.go b/cmd/kapacitord/run/server.go index ddefbd3c5..953b24879 100644 --- a/cmd/kapacitord/run/server.go +++ b/cmd/kapacitord/run/server.go @@ -2,7 +2,6 @@ package run import ( "errors" - "expvar" "fmt" "io/ioutil" "log" @@ -45,27 +44,6 @@ import ( const clusterIDFilename = "cluster.id" const serverIDFilename = "server.id" -var ( - //Published vars - cidVar = &expvar.String{} - - sidVar = &expvar.String{} - - hostVar = &expvar.String{} - - productVar = &expvar.String{} - - versionVar = &expvar.String{} -) - -func init() { - expvar.Publish(kapacitor.ClusterIDVarName, cidVar) - expvar.Publish(kapacitor.ServerIDVarName, sidVar) - expvar.Publish(kapacitor.HostVarName, hostVar) - expvar.Publish(kapacitor.ProductVarName, productVar) - expvar.Publish(kapacitor.VersionVarName, versionVar) -} - // BuildInfo represents the build details for the server code. type BuildInfo struct { Version string @@ -414,11 +392,11 @@ func (s *Server) Open() error { } // Set published vars - cidVar.Set(s.ClusterID) - sidVar.Set(s.ServerID) - hostVar.Set(s.hostname) - productVar.Set(kapacitor.Product) - versionVar.Set(s.buildInfo.Version) + kapacitor.ClusterIDVar.Set(s.ClusterID) + kapacitor.ServerIDVar.Set(s.ServerID) + kapacitor.HostVar.Set(s.hostname) + kapacitor.ProductVar.Set(kapacitor.Product) + kapacitor.VersionVar.Set(s.buildInfo.Version) s.Logger.Printf("I! ClusterID: %s ServerID: %s", s.ClusterID, s.ServerID) // Start profiling, if set. 
diff --git a/edge.go b/edge.go index 44ba49029..088338716 100644 --- a/edge.go +++ b/edge.go @@ -2,12 +2,12 @@ package kapacitor import ( "errors" - "expvar" "fmt" "log" "strconv" "sync" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" ) @@ -36,6 +36,7 @@ type Edge struct { logger *log.Logger aborted chan struct{} + statsKey string statMap *expvar.Map groupMu sync.RWMutex groupStats map[models.GroupID]*edgeStat @@ -48,10 +49,11 @@ func newEdge(taskName, parentName, childName string, t pipeline.EdgeType, logSer "child": childName, "type": t.String(), } - sm := NewStatistics("edges", tags) + key, sm := NewStatistics("edges", tags) sm.Add(statCollected, 0) sm.Add(statEmitted, 0) e := &Edge{ + statsKey: key, statMap: sm, aborted: make(chan struct{}), groupStats: make(map[models.GroupID]*edgeStat), @@ -125,6 +127,7 @@ func (e *Edge) Close() { if e.reduce != nil { close(e.reduce) } + DeleteStatistics(e.statsKey) } // Abort all next and collect calls. diff --git a/expvar/expvar.go b/expvar/expvar.go new file mode 100644 index 000000000..230448ce0 --- /dev/null +++ b/expvar/expvar.go @@ -0,0 +1,245 @@ +// This package is a fork of the golang expvar expvar.Var types. +// Adding extra support for deleting and accessing raw typed values. +package expvar + +import ( + "bytes" + "expvar" + "fmt" + "math" + "sort" + "strconv" + "sync" + "sync/atomic" +) + +// Int is a 64-bit integer variable that satisfies the expvar.Var interface. +type Int struct { + i int64 +} + +func (v *Int) String() string { + return strconv.FormatInt(v.Get(), 10) +} + +func (v *Int) Add(delta int64) { + atomic.AddInt64(&v.i, delta) +} + +func (v *Int) Set(value int64) { + atomic.StoreInt64(&v.i, value) +} + +func (v *Int) Get() int64 { + return atomic.LoadInt64(&v.i) +} + +// Float is a 64-bit float variable that satisfies the expvar.Var interface. 
+type Float struct { + f uint64 +} + +func (v *Float) String() string { + return strconv.FormatFloat(v.Get(), 'g', -1, 64) +} + +func (v *Float) Get() float64 { + return math.Float64frombits(atomic.LoadUint64(&v.f)) +} + +// Add adds delta to v. +func (v *Float) Add(delta float64) { + for { + cur := atomic.LoadUint64(&v.f) + curVal := math.Float64frombits(cur) + nxtVal := curVal + delta + nxt := math.Float64bits(nxtVal) + if atomic.CompareAndSwapUint64(&v.f, cur, nxt) { + return + } + } +} + +// Set sets v to value. +func (v *Float) Set(value float64) { + atomic.StoreUint64(&v.f, math.Float64bits(value)) +} + +// MaxFloat is a 64-bit float variable that satisfies the expvar.Var interface. +// When setting a value it will only be set if it is greater than the current value. +type MaxFloat struct { + f uint64 +} + +func (v *MaxFloat) String() string { + return strconv.FormatFloat(v.Get(), 'g', -1, 64) +} + +func (v *MaxFloat) Get() float64 { + return math.Float64frombits(atomic.LoadUint64(&v.f)) +} + +// Set sets v to value. +func (v *MaxFloat) Set(value float64) { + nxtBits := math.Float64bits(value) + for { + curBits := atomic.LoadUint64(&v.f) + cur := math.Float64frombits(curBits) + if value > cur { + if atomic.CompareAndSwapUint64(&v.f, curBits, nxtBits) { + return + } + } else { + return + } + } +} + +// Map is a string-to-expvar.Var map variable that satisfies the expvar.Var interface. +type Map struct { + mu sync.RWMutex + m map[string]expvar.Var + keys []string // sorted +} + +func (v *Map) String() string { + v.mu.RLock() + defer v.mu.RUnlock() + var b bytes.Buffer + fmt.Fprintf(&b, "{") + first := true + v.doLocked(func(kv expvar.KeyValue) { + if !first { + fmt.Fprintf(&b, ", ") + } + fmt.Fprintf(&b, "%q: %v", kv.Key, kv.Value) + first = false + }) + fmt.Fprintf(&b, "}") + return b.String() +} + +func (v *Map) Init() *Map { + v.m = make(map[string]expvar.Var) + return v +} + +// updateKeys updates the sorted list of keys in v.keys. 
+// must be called with v.mu held. +func (v *Map) updateKeys() { + if len(v.m) == len(v.keys) { + // No new key. + return + } + v.keys = v.keys[:0] + for k := range v.m { + v.keys = append(v.keys, k) + } + sort.Strings(v.keys) +} + +func (v *Map) Get(key string) expvar.Var { + v.mu.RLock() + defer v.mu.RUnlock() + return v.m[key] +} + +func (v *Map) Set(key string, av expvar.Var) { + v.mu.Lock() + defer v.mu.Unlock() + v.m[key] = av + v.updateKeys() +} + +func (v *Map) Delete(key string) { + v.mu.Lock() + defer v.mu.Unlock() + delete(v.m, key) + v.updateKeys() +} + +func (v *Map) Add(key string, delta int64) { + v.mu.RLock() + av, ok := v.m[key] + v.mu.RUnlock() + if !ok { + // check again under the write lock + v.mu.Lock() + av, ok = v.m[key] + if !ok { + av = new(Int) + v.m[key] = av + v.updateKeys() + } + v.mu.Unlock() + } + + // Add to Int; ignore otherwise. + if iv, ok := av.(*Int); ok { + iv.Add(delta) + } +} + +// AddFloat adds delta to the *Float value stored under the given map key. +func (v *Map) AddFloat(key string, delta float64) { + v.mu.RLock() + av, ok := v.m[key] + v.mu.RUnlock() + if !ok { + // check again under the write lock + v.mu.Lock() + av, ok = v.m[key] + if !ok { + av = new(Float) + v.m[key] = av + v.updateKeys() + } + v.mu.Unlock() + } + + // Add to Float; ignore otherwise. + if iv, ok := av.(*Float); ok { + iv.Add(delta) + } +} + +// Do calls f for each entry in the map. +// The map is locked during the iteration, +// but existing entries may be concurrently updated. +func (v *Map) Do(f func(expvar.KeyValue)) { + v.mu.RLock() + defer v.mu.RUnlock() + v.doLocked(f) +} + +// doLocked calls f for each entry in the map. +// v.mu must be held for reads. +func (v *Map) doLocked(f func(expvar.KeyValue)) { + for _, k := range v.keys { + f(expvar.KeyValue{k, v.m[k]}) + } +} + +// String is a string variable, and satisfies the expvar.Var interface. 
+type String struct { + mu sync.RWMutex + s string +} + +func (v *String) String() string { + v.mu.RLock() + defer v.mu.RUnlock() + return strconv.Quote(v.s) +} + +func (v *String) Set(value string) { + v.mu.Lock() + defer v.mu.Unlock() + v.s = value +} + +func (v *String) Get() string { + v.mu.RLock() + defer v.mu.RUnlock() + return v.s +} diff --git a/global_stats.go b/global_stats.go index 3927843e1..9e1c22cfb 100644 --- a/global_stats.go +++ b/global_stats.go @@ -2,11 +2,11 @@ package kapacitor import ( "expvar" + "fmt" "runtime" - "strconv" - "sync" "time" + kexpvar "github.com/influxdata/kapacitor/expvar" "github.com/twinj/uuid" ) @@ -30,9 +30,18 @@ const ( var ( // Global expvars - NumTasks = &expvar.Int{} - NumEnabledTasks = &expvar.Int{} - NumSubscriptions = &expvar.Int{} + NumTasksVar = &kexpvar.Int{} + NumEnabledTasksVar = &kexpvar.Int{} + NumSubscriptionsVar = &kexpvar.Int{} + + ClusterIDVar = &kexpvar.String{} + ServerIDVar = &kexpvar.String{} + HostVar = &kexpvar.String{} + ProductVar = &kexpvar.String{} + VersionVar = &kexpvar.String{} + + // All internal stats are added as sub-maps to this top level map. 
+ stats *kexpvar.Map ) var ( @@ -41,78 +50,65 @@ var ( func init() { startTime = time.Now().UTC() - expvar.Publish(NumTasksVarName, NumTasks) - expvar.Publish(NumEnabledTasksVarName, NumEnabledTasks) - expvar.Publish(NumSubscriptionsVarName, NumSubscriptions) -} -// Gets an exported var and returns its unquoted string contents -func GetStringVar(name string) string { - s, err := strconv.Unquote(expvar.Get(name).String()) - if err != nil { - panic(err) - } - return s -} + expvar.Publish(NumTasksVarName, NumTasksVar) + expvar.Publish(NumEnabledTasksVarName, NumEnabledTasksVar) + expvar.Publish(NumSubscriptionsVarName, NumSubscriptionsVar) -// Gets an exported var and returns its int value -func GetIntVar(name string) int64 { - i, err := strconv.ParseInt(expvar.Get(name).String(), 10, 64) - if err != nil { - panic(err) - } - return i -} + expvar.Publish(ClusterIDVarName, ClusterIDVar) + expvar.Publish(ServerIDVarName, ServerIDVar) + expvar.Publish(HostVarName, HostVar) + expvar.Publish(ProductVarName, ProductVar) + expvar.Publish(VersionVarName, VersionVar) -// Gets an exported var and returns its float value -func GetFloatVar(name string) float64 { - f, err := strconv.ParseFloat(expvar.Get(name).String(), 64) - if err != nil { - panic(err) - } - return f + // Initialize the global stats map + stats = &kexpvar.Map{} + stats.Init() + expvar.Publish(Product, stats) } func Uptime() time.Duration { return time.Now().Sub(startTime) } -var expvarMu sync.Mutex - // NewStatistics creates an expvar-based map. Within there "name" is the Measurement name, "tags" are the tags, // and values are placed at the key "values". // The "values" map is returned so that statistics can be set. 
-func NewStatistics(name string, tags map[string]string) *expvar.Map { - expvarMu.Lock() - defer expvarMu.Unlock() - +func NewStatistics(name string, tags map[string]string) (string, *kexpvar.Map) { key := uuid.NewV4().String() - m := &expvar.Map{} + m := &kexpvar.Map{} m.Init() - expvar.Publish(key, m) // Set the name - nameVar := &expvar.String{} + nameVar := &kexpvar.String{} nameVar.Set(name) m.Set("name", nameVar) // Set the tags - tagsVar := &expvar.Map{} + tagsVar := &kexpvar.Map{} tagsVar.Init() for k, v := range tags { - value := &expvar.String{} + value := &kexpvar.String{} value.Set(v) tagsVar.Set(k, value) } m.Set("tags", tagsVar) // Create and set the values entry used for actual stats. - statMap := &expvar.Map{} + statMap := &kexpvar.Map{} statMap.Init() m.Set("values", statMap) - return statMap + // Set new statsMap on the top level map. + stats.Set(key, m) + + return key, statMap +} + +// Remove a statistics map. +func DeleteStatistics(key string) { + stats.Delete(key) } type StatsData struct { @@ -132,83 +128,68 @@ func GetStatsData() ([]StatsData, error) { allData = append(allData, globalData) + // Get all global statistics expvar.Do(func(kv expvar.KeyValue) { - var f interface{} - var err error switch v := kv.Value.(type) { - case *expvar.Float: - f, err = strconv.ParseFloat(v.String(), 64) - if err == nil { - globalData.Values[kv.Key] = f - } - case *expvar.Int: - f, err = strconv.ParseInt(v.String(), 10, 64) - if err == nil { - globalData.Values[kv.Key] = f - } - case *expvar.Map: - data := StatsData{ - Tags: make(map[string]string), - Values: make(map[string]interface{}), + case *kexpvar.Int: + globalData.Values[kv.Key] = v.Get() + case *kexpvar.Float: + globalData.Values[kv.Key] = v.Get() + case *kexpvar.Map: + if kv.Key != Product { + panic("unexpected published top level expvar.Map with key " + kv.Key) } + } + }) + // Get all other specific statistics + stats.Do(func(kv expvar.KeyValue) { + v := kv.Value.(*kexpvar.Map) + + data := 
StatsData{ + Tags: make(map[string]string), + Values: make(map[string]interface{}), + } - v.Do(func(subKV expvar.KeyValue) { - switch subKV.Key { - case "name": - // straight to string name. - u, err := strconv.Unquote(subKV.Value.String()) - if err != nil { - return + v.Do(func(subKV expvar.KeyValue) { + switch subKV.Key { + case "name": + data.Name = subKV.Value.(*kexpvar.String).Get() + case "tags": + // string-string tags map. + n := subKV.Value.(*kexpvar.Map) + n.Do(func(t expvar.KeyValue) { + data.Tags[t.Key] = t.Value.(*kexpvar.String).Get() + }) + case "values": + // string-interface map. + n := subKV.Value.(*kexpvar.Map) + n.Do(func(kv expvar.KeyValue) { + switch v := kv.Value.(type) { + case *kexpvar.Int: + data.Values[kv.Key] = v.Get() + case *kexpvar.Float: + data.Values[kv.Key] = v.Get() + case *kexpvar.MaxFloat: + data.Values[kv.Key] = v.Get() + default: + panic(fmt.Sprintf("unknown expvar.Var type for stats %T", kv.Value)) } - data.Name = u - case "tags": - // string-string tags map. - n := subKV.Value.(*expvar.Map) - n.Do(func(t expvar.KeyValue) { - u, err := strconv.Unquote(t.Value.String()) - if err != nil { - return - } - data.Tags[t.Key] = u - }) - case "values": - // string-interface map. 
- n := subKV.Value.(*expvar.Map) - n.Do(func(kv expvar.KeyValue) { - var f interface{} - var err error - switch v := kv.Value.(type) { - case *expvar.Float: - f, err = strconv.ParseFloat(v.String(), 64) - if err != nil { - return - } - case *expvar.Int: - f, err = strconv.ParseInt(v.String(), 10, 64) - if err != nil { - return - } - default: - return - } - data.Values[kv.Key] = f - }) - } - }) - - // If no field data, don't include it in the results - if len(data.Values) == 0 { - return + }) } + }) - allData = append(allData, data) + // If no field data, don't include it in the results + if len(data.Values) == 0 { + return } + + allData = append(allData, data) }) // Add uptime to globalData globalData.Values[UptimeVarName] = Uptime().Seconds() - // Add Go memstats. + // Add Go runtime stats. data := StatsData{ Name: "runtime", } diff --git a/join.go b/join.go index 67de8605b..cb8551fe9 100644 --- a/join.go +++ b/join.go @@ -99,22 +99,6 @@ func (j *JoinNode) runJoin([]byte) error { return nil } -func (j *JoinNode) nodeExecTime() time.Duration { - j.mu.RLock() - defer j.mu.RUnlock() - sum := 0.0 - total := 0.0 - for _, group := range j.groups { - avg, count := group.timer.AverageTime() - sum += float64(avg) * float64(count) - total += float64(count) - } - if total == 0 { - return 0 - } - return time.Duration(sum / total) -} - // safely get the group for the point or create one if it doesn't exist. 
func (j *JoinNode) getGroup(p models.PointInterface) *group { j.mu.Lock() @@ -151,7 +135,7 @@ func newGroup(i int, j *JoinNode) *group { head: make([]time.Time, i), j: j, points: make(chan srcPoint), - timer: j.et.tm.TimingService.NewTimer(), + timer: j.et.tm.TimingService.NewTimer(j.avgExecVar), } } diff --git a/node.go b/node.go index db9e0acd1..caac4f5f4 100644 --- a/node.go +++ b/node.go @@ -8,11 +8,16 @@ import ( "sync" "time" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" "github.com/influxdata/kapacitor/timer" ) +const ( + averageExecTimeVarName = "avg_execution_time" +) + // A node that can be in an executor. type Node interface { pipeline.Node @@ -40,12 +45,10 @@ type Node interface { abortParentEdges() // executing dot - edot(buf *bytes.Buffer, execTime time.Duration) + edot(buf *bytes.Buffer) nodeStatsByGroup() map[models.GroupID]nodeStats - nodeExecTime() time.Duration - collectedCount() int64 } @@ -65,6 +68,9 @@ type node struct { outs []*Edge logger *log.Logger timer timer.Timer + statsKey string + statMap *expvar.Map + avgExecVar *expvar.MaxFloat } func (n *node) addParentEdge(e *Edge) { @@ -78,7 +84,16 @@ func (n *node) abortParentEdges() { } func (n *node) start(snapshot []byte) { - n.timer = n.et.tm.TimingService.NewTimer() + tags := map[string]string{ + "task": n.et.Task.Name, + "node": n.Name(), + "type": n.et.Task.Type.String(), + "kind": n.Desc(), + } + n.statsKey, n.statMap = NewStatistics("nodes", tags) + n.avgExecVar = &expvar.MaxFloat{} + n.statMap.Set(averageExecTimeVarName, n.avgExecVar) + n.timer = n.et.tm.TimingService.NewTimer(n.avgExecVar) n.errCh = make(chan error, 1) go func() { var err error @@ -108,7 +123,7 @@ func (n *node) stop() { if n.stopF != nil { n.stopF() } - + DeleteStatistics(n.statsKey) } // no-op snapshot @@ -167,17 +182,12 @@ func (n *node) closeChildEdges() { } } -func (n *node) nodeExecTime() time.Duration { - execTime, _ := 
n.timer.AverageTime() - return execTime -} - -func (n *node) edot(buf *bytes.Buffer, execTime time.Duration) { +func (n *node) edot(buf *bytes.Buffer) { buf.Write([]byte( fmt.Sprintf("\n%s [label=\"%s %v\"];\n", n.Name(), n.Name(), - execTime, + time.Duration(n.avgExecVar.Get()), ), )) for i, c := range n.children { diff --git a/services/influxdb/service.go b/services/influxdb/service.go index 8069b966c..682828bd5 100644 --- a/services/influxdb/service.go +++ b/services/influxdb/service.go @@ -284,7 +284,7 @@ func (s *Service) linkSubscriptions() error { } } - kapacitor.NumSubscriptions.Set(numSubscriptions) + kapacitor.NumSubscriptionsVar.Set(numSubscriptions) return nil } diff --git a/services/reporting/service.go b/services/reporting/service.go index dcdb72768..45810c022 100644 --- a/services/reporting/service.go +++ b/services/reporting/service.go @@ -47,10 +47,10 @@ func (s *Service) Open() error { } // Populate published vars - s.clusterID = kapacitor.GetStringVar(kapacitor.ClusterIDVarName) - s.serverID = kapacitor.GetStringVar(kapacitor.ServerIDVarName) - s.hostname = kapacitor.GetStringVar(kapacitor.HostVarName) - s.version = kapacitor.GetStringVar(kapacitor.VersionVarName) + s.clusterID = kapacitor.ClusterIDVar.Get() + s.serverID = kapacitor.ServerIDVar.Get() + s.hostname = kapacitor.HostVar.Get() + s.version = kapacitor.VersionVar.Get() s.product = kapacitor.Product // Populate anonymous tags @@ -104,9 +104,9 @@ func (s *Service) sendUsageReport() error { // Add values data.Values[kapacitor.ClusterIDVarName] = s.clusterID data.Values[kapacitor.ServerIDVarName] = s.serverID - data.Values[kapacitor.NumTasksVarName] = kapacitor.GetIntVar(kapacitor.NumTasksVarName) - data.Values[kapacitor.NumEnabledTasksVarName] = kapacitor.GetIntVar(kapacitor.NumEnabledTasksVarName) - data.Values[kapacitor.NumSubscriptionsVarName] = kapacitor.GetIntVar(kapacitor.NumSubscriptionsVarName) + data.Values[kapacitor.NumTasksVarName] = kapacitor.NumTasksVar.Get() + 
data.Values[kapacitor.NumEnabledTasksVarName] = kapacitor.NumEnabledTasksVar.Get() + data.Values[kapacitor.NumSubscriptionsVarName] = kapacitor.NumSubscriptionsVar.Get() data.Values[kapacitor.UptimeVarName] = kapacitor.Uptime().Seconds() usage := client.Usage{ diff --git a/services/stats/service.go b/services/stats/service.go index 0f04ed120..44bb86338 100644 --- a/services/stats/service.go +++ b/services/stats/service.go @@ -29,6 +29,7 @@ import ( "time" "github.com/influxdata/kapacitor" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/timer" ) @@ -133,6 +134,6 @@ func (s *Service) reportStats() { } } -func (s *Service) NewTimer() timer.Timer { - return timer.New(s.timingSampleRate, s.timingMovingAvgSize) +func (s *Service) NewTimer(avgVar *expvar.MaxFloat) timer.Timer { + return timer.New(s.timingSampleRate, s.timingMovingAvgSize, avgVar) } diff --git a/services/task_store/service.go b/services/task_store/service.go index eb60ec547..e4acf9a76 100644 --- a/services/task_store/service.go +++ b/services/task_store/service.go @@ -177,8 +177,8 @@ func (ts *Service) Open() error { } // Set expvars - kapacitor.NumTasks.Set(numTasks) - kapacitor.NumEnabledTasks.Set(numEnabledTasks) + kapacitor.NumTasksVar.Set(numTasks) + kapacitor.NumEnabledTasksVar.Set(numEnabledTasks) return nil } @@ -484,7 +484,7 @@ func (ts *Service) Save(task *rawTask) error { return err } if !exists { - kapacitor.NumTasks.Add(1) + kapacitor.NumTasksVar.Add(1) } return nil }) @@ -500,7 +500,7 @@ func (ts *Service) Delete(name string) error { exists := tb.Get([]byte(name)) != nil if exists { tb.Delete([]byte(name)) - kapacitor.NumTasks.Add(-1) + kapacitor.NumTasksVar.Add(-1) } } eb := tx.Bucket(enabledBucket) @@ -573,7 +573,7 @@ func (ts *Service) Enable(name string) error { return err } if !enabled { - kapacitor.NumEnabledTasks.Add(1) + kapacitor.NumEnabledTasksVar.Add(1) } return nil }) @@ -654,7 +654,7 @@ func (ts *Service) 
Disable(name string) error { if err != nil { return err } - kapacitor.NumEnabledTasks.Add(-1) + kapacitor.NumEnabledTasksVar.Add(-1) } return nil }) diff --git a/task.go b/task.go index 0496c6b2f..15fad599d 100644 --- a/task.go +++ b/task.go @@ -310,7 +310,7 @@ func (et *ExecutingTask) EDot() []byte { )) et.walk(func(n Node) error { - n.edot(&buf, n.nodeExecTime()) + n.edot(&buf) return nil }) buf.Write([]byte("}")) diff --git a/task_master.go b/task_master.go index e210b1f10..c32a67b69 100644 --- a/task_master.go +++ b/task_master.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" "github.com/influxdata/kapacitor/services/httpd" @@ -91,7 +92,7 @@ type TaskMaster struct { Alert(title, text string) error } TimingService interface { - NewTimer() timer.Timer + NewTimer(*expvar.MaxFloat) timer.Timer } LogService LogService @@ -482,6 +483,6 @@ func (tm *TaskMaster) SnapshotTask(name string) (*TaskSnapshot, error) { type noOpTimingService struct{} -func (noOpTimingService) NewTimer() timer.Timer { +func (noOpTimingService) NewTimer(*expvar.MaxFloat) timer.Timer { return timer.NewNoOp() } diff --git a/timer/noop.go b/timer/noop.go index a8697cae2..d6f0935e6 100644 --- a/timer/noop.go +++ b/timer/noop.go @@ -1,15 +1,12 @@ package timer -import "time" - type noop struct { } -func (noop) Start() {} -func (noop) Pause() {} -func (noop) Resume() {} -func (noop) Stop() {} -func (noop) AverageTime() (time.Duration, int) { return 0, 1 } +func (noop) Start() {} +func (noop) Pause() {} +func (noop) Resume() {} +func (noop) Stop() {} func NewNoOp() Timer { return noop{} diff --git a/timer/timer.go b/timer/timer.go index 3cbe6d2d7..0e4ef5b13 100644 --- a/timer/timer.go +++ b/timer/timer.go @@ -2,8 +2,9 @@ package timer import ( "math/rand" - "sync" "time" + + "github.com/influxdata/kapacitor/expvar" ) type Timer interface { @@ -19,9 +20,6 @@ type Timer interface { // 
Stop the timer. // Timer must be started. Stop() - // Return average of timings and number of - // timings used to compute the average. - AverageTime() (time.Duration, int) } type timerState int @@ -40,12 +38,15 @@ type timer struct { current time.Duration avg *movavg state timerState + + avgVar *expvar.MaxFloat } -func New(sampleRate float64, movingAverageSize int) Timer { +func New(sampleRate float64, movingAverageSize int, avgVar *expvar.MaxFloat) Timer { return &timer{ sampleRate: sampleRate, avg: newMovAvg(movingAverageSize), + avgVar: avgVar, } } @@ -85,16 +86,10 @@ func (t *timer) Stop() { return } t.current += time.Now().Sub(t.start) - t.avg.update(float64(t.current)) + avg := t.avg.update(float64(t.current)) t.current = 0 t.state = Stopped -} - -// Return the average time in nanoseconds and -// the number of timings used to compute the average. -func (t *timer) AverageTime() (time.Duration, int) { - avg, count := t.avg.average() - return time.Duration(avg), count + t.avgVar.Set(avg) } // Maintains a moving average of values @@ -104,7 +99,6 @@ type movavg struct { idx int count int avg float64 - mu sync.RWMutex } func newMovAvg(size int) *movavg { @@ -115,30 +109,17 @@ func newMovAvg(size int) *movavg { } } -func (m *movavg) update(value float64) { - m.mu.Lock() - defer m.mu.Unlock() +func (m *movavg) update(value float64) float64 { + m.count++ n := float64(m.count) - if n == 0 { - m.count = 1 - m.avg = value - return - } - m.avg += (value - m.avg) / n m.idx = (m.idx + 1) % m.size - if m.count == m.size { + if m.count == m.size+1 { old := m.history[m.idx] m.avg = (n*m.avg - old) / (n - 1) - } else { - m.count++ + m.count-- } m.history[m.idx] = value -} - -func (m *movavg) average() (float64, int) { - m.mu.RLock() - defer m.mu.RUnlock() - return m.avg, m.count + return m.avg } diff --git a/timer/timer_test.go b/timer/timer_test.go new file mode 100644 index 000000000..95241b4f5 --- /dev/null +++ b/timer/timer_test.go @@ -0,0 +1,25 @@ +package timer + +import 
( + "math" + "testing" +) + +func TestMovAvg(t *testing.T) { + count := 100 + ma := newMovAvg(count) + for i := 0; i < count; i++ { + avg := ma.update(1) + if got, exp := avg, 1.0; got != exp { + t.Fatalf("unexpected movavg: got %f, exp %f", got, exp) + } + } + c := float64(count) + for i := 0; i < count; i++ { + avg := ma.update(2) + f := float64(i + 1) + if got, exp := avg, ((c-f)+2.0*f)/c; math.Abs(got-exp) > 1e-8 { + t.Fatalf("unexpected movavg i: %d got %f, exp %f", i, got, exp) + } + } +} diff --git a/union.go b/union.go index 05cabffca..8fe46754c 100644 --- a/union.go +++ b/union.go @@ -2,7 +2,6 @@ package kapacitor import ( "log" - "time" "github.com/influxdata/kapacitor/pipeline" "github.com/influxdata/kapacitor/timer" @@ -33,7 +32,7 @@ func (u *UnionNode) runUnion([]byte) error { } errors := make(chan error, len(u.ins)) for _, in := range u.ins { - t := u.et.tm.TimingService.NewTimer() + t := u.et.tm.TimingService.NewTimer(u.avgExecVar) u.timers = append(u.timers, t) go func(e *Edge, t timer.Timer) { switch u.Wants() { @@ -76,17 +75,3 @@ func (u *UnionNode) runUnion([]byte) error { } return nil } - -func (u *UnionNode) nodeExecTime() time.Duration { - sum := 0.0 - total := 0.0 - for _, t := range u.timers { - avg, count := t.AverageTime() - sum += float64(avg) * float64(count) - total += float64(count) - } - if total == 0 { - return 0 - } - return time.Duration(sum / total) -}