Skip to content

Commit

Permalink
expvar refactored to allow getting and deleting
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Feb 22, 2016
1 parent c39d89f commit 0aab1d7
Show file tree
Hide file tree
Showing 18 changed files with 439 additions and 242 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@

### Release Notes

Kapacitor now exposes more internal metrics for determining the performance of a given task.
The internal stats now includes a measurement `node` that contains an averaged execution time for the node, tagged by the task, node, task type and kind of node (i.e. window vs union).
These stats are also available in the DOT output of the Kapacitor show command.


### Features
- [#236](https://github.com/influxdata/kapacitor/issues/236): Implement batched group by
- [#231](https://github.com/influxdata/kapacitor/pull/231): Add ShiftNode so values can be shifted in time for joining/comparisons.
- [#190](https://github.com/influxdata/kapacitor/issues/190): BREAKING: Deadman's switch now triggers off emitted counts and is grouped by to original grouping of the data.
The breaking change is that the 'collected' stat is no longer output for `.stats` and has been replaced by `emitted`.
- [#145](https://github.com/influxdata/kapacitor/issues/145): The InfluxDB Out Node now writes data to InfluxDB in buffers.
- [#215](https://github.com/influxdata/kapacitor/issues/215): Add performance metrics to nodes for average execution times and node throughput values.


### Bugfixes
Expand Down
2 changes: 1 addition & 1 deletion batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (s *SourceBatchNode) Queries(start, stop time.Time) [][]string {

// Do not add the source batch node to the dot output
// since its not really an edge.
func (s *SourceBatchNode) edot(*bytes.Buffer, time.Duration) {}
func (s *SourceBatchNode) edot(*bytes.Buffer) {}

func (s *SourceBatchNode) collectedCount() (count int64) {
for _, child := range s.children {
Expand Down
32 changes: 5 additions & 27 deletions cmd/kapacitord/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package run

import (
"errors"
"expvar"
"fmt"
"io/ioutil"
"log"
Expand Down Expand Up @@ -45,27 +44,6 @@ import (
const clusterIDFilename = "cluster.id"
const serverIDFilename = "server.id"

var (
//Published vars
cidVar = &expvar.String{}

sidVar = &expvar.String{}

hostVar = &expvar.String{}

productVar = &expvar.String{}

versionVar = &expvar.String{}
)

func init() {
expvar.Publish(kapacitor.ClusterIDVarName, cidVar)
expvar.Publish(kapacitor.ServerIDVarName, sidVar)
expvar.Publish(kapacitor.HostVarName, hostVar)
expvar.Publish(kapacitor.ProductVarName, productVar)
expvar.Publish(kapacitor.VersionVarName, versionVar)
}

// BuildInfo represents the build details for the server code.
type BuildInfo struct {
Version string
Expand Down Expand Up @@ -414,11 +392,11 @@ func (s *Server) Open() error {
}

// Set published vars
cidVar.Set(s.ClusterID)
sidVar.Set(s.ServerID)
hostVar.Set(s.hostname)
productVar.Set(kapacitor.Product)
versionVar.Set(s.buildInfo.Version)
kapacitor.ClusterIDVar.Set(s.ClusterID)
kapacitor.ServerIDVar.Set(s.ServerID)
kapacitor.HostVar.Set(s.hostname)
kapacitor.ProductVar.Set(kapacitor.Product)
kapacitor.VersionVar.Set(s.buildInfo.Version)
s.Logger.Printf("I! ClusterID: %s ServerID: %s", s.ClusterID, s.ServerID)

// Start profiling, if set.
Expand Down
7 changes: 5 additions & 2 deletions edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package kapacitor

import (
"errors"
"expvar"
"fmt"
"log"
"strconv"
"sync"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)
Expand Down Expand Up @@ -36,6 +36,7 @@ type Edge struct {

logger *log.Logger
aborted chan struct{}
statsKey string
statMap *expvar.Map
groupMu sync.RWMutex
groupStats map[models.GroupID]*edgeStat
Expand All @@ -48,10 +49,11 @@ func newEdge(taskName, parentName, childName string, t pipeline.EdgeType, logSer
"child": childName,
"type": t.String(),
}
sm := NewStatistics("edges", tags)
key, sm := NewStatistics("edges", tags)
sm.Add(statCollected, 0)
sm.Add(statEmitted, 0)
e := &Edge{
statsKey: key,
statMap: sm,
aborted: make(chan struct{}),
groupStats: make(map[models.GroupID]*edgeStat),
Expand Down Expand Up @@ -125,6 +127,7 @@ func (e *Edge) Close() {
if e.reduce != nil {
close(e.reduce)
}
DeleteStatistics(e.statsKey)
}

// Abort all next and collect calls.
Expand Down
245 changes: 245 additions & 0 deletions expvar/expvar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
// This package is a fork of the golang expvar expvar.Var types.
// Adding extra support for deleting and accessing raw typed values.
package expvar

import (
"bytes"
"expvar"
"fmt"
"math"
"sort"
"strconv"
"sync"
"sync/atomic"
)

// Int is a 64-bit integer variable that satisfies the expvar.Var interface.
type Int struct {
i int64
}

func (v *Int) String() string {
return strconv.FormatInt(v.Get(), 10)
}

func (v *Int) Add(delta int64) {
atomic.AddInt64(&v.i, delta)
}

func (v *Int) Set(value int64) {
atomic.StoreInt64(&v.i, value)
}

func (v *Int) Get() int64 {
return atomic.LoadInt64(&v.i)
}

// Float is a 64-bit float variable that satisfies the expvar.Var interface.
type Float struct {
f uint64
}

func (v *Float) String() string {
return strconv.FormatFloat(v.Get(), 'g', -1, 64)
}

func (v *Float) Get() float64 {
return math.Float64frombits(atomic.LoadUint64(&v.f))
}

// Add adds delta to v.
func (v *Float) Add(delta float64) {
for {
cur := atomic.LoadUint64(&v.f)
curVal := math.Float64frombits(cur)
nxtVal := curVal + delta
nxt := math.Float64bits(nxtVal)
if atomic.CompareAndSwapUint64(&v.f, cur, nxt) {
return
}
}
}

// Set sets v to value.
func (v *Float) Set(value float64) {
atomic.StoreUint64(&v.f, math.Float64bits(value))
}

// MaxFloat is a 64-bit float variable that satisfies the expvar.Var interface.
// When setting a value it will only be set if it is greater than the current value.
type MaxFloat struct {
f uint64
}

func (v *MaxFloat) String() string {
return strconv.FormatFloat(v.Get(), 'g', -1, 64)
}

func (v *MaxFloat) Get() float64 {
return math.Float64frombits(atomic.LoadUint64(&v.f))
}

// Set sets v to value.
func (v *MaxFloat) Set(value float64) {
nxtBits := math.Float64bits(value)
for {
curBits := atomic.LoadUint64(&v.f)
cur := math.Float64frombits(curBits)
if value > cur {
if atomic.CompareAndSwapUint64(&v.f, curBits, nxtBits) {
return
}
} else {
return
}
}
}

// Map is a string-to-expvar.Var map variable that satisfies the expvar.Var interface.
type Map struct {
mu sync.RWMutex
m map[string]expvar.Var
keys []string // sorted
}

func (v *Map) String() string {
v.mu.RLock()
defer v.mu.RUnlock()
var b bytes.Buffer
fmt.Fprintf(&b, "{")
first := true
v.doLocked(func(kv expvar.KeyValue) {
if !first {
fmt.Fprintf(&b, ", ")
}
fmt.Fprintf(&b, "%q: %v", kv.Key, kv.Value)
first = false
})
fmt.Fprintf(&b, "}")
return b.String()
}

func (v *Map) Init() *Map {
v.m = make(map[string]expvar.Var)
return v
}

// updateKeys updates the sorted list of keys in v.keys.
// must be called with v.mu held.
func (v *Map) updateKeys() {
if len(v.m) == len(v.keys) {
// No new key.
return
}
v.keys = v.keys[:0]
for k := range v.m {
v.keys = append(v.keys, k)
}
sort.Strings(v.keys)
}

func (v *Map) Get(key string) expvar.Var {
v.mu.RLock()
defer v.mu.RUnlock()
return v.m[key]
}

func (v *Map) Set(key string, av expvar.Var) {
v.mu.Lock()
defer v.mu.Unlock()
v.m[key] = av
v.updateKeys()
}

func (v *Map) Delete(key string) {
v.mu.Lock()
defer v.mu.Unlock()
delete(v.m, key)
v.updateKeys()
}

func (v *Map) Add(key string, delta int64) {
v.mu.RLock()
av, ok := v.m[key]
v.mu.RUnlock()
if !ok {
// check again under the write lock
v.mu.Lock()
av, ok = v.m[key]
if !ok {
av = new(Int)
v.m[key] = av
v.updateKeys()
}
v.mu.Unlock()
}

// Add to Int; ignore otherwise.
if iv, ok := av.(*Int); ok {
iv.Add(delta)
}
}

// AddFloat adds delta to the *Float value stored under the given map key.
func (v *Map) AddFloat(key string, delta float64) {
v.mu.RLock()
av, ok := v.m[key]
v.mu.RUnlock()
if !ok {
// check again under the write lock
v.mu.Lock()
av, ok = v.m[key]
if !ok {
av = new(Float)
v.m[key] = av
v.updateKeys()
}
v.mu.Unlock()
}

// Add to Float; ignore otherwise.
if iv, ok := av.(*Float); ok {
iv.Add(delta)
}
}

// Do calls f for each entry in the map.
// The map is locked during the iteration,
// but existing entries may be concurrently updated.
func (v *Map) Do(f func(expvar.KeyValue)) {
v.mu.RLock()
defer v.mu.RUnlock()
v.doLocked(f)
}

// doLocked calls f for each entry in the map.
// v.mu must be held for reads.
func (v *Map) doLocked(f func(expvar.KeyValue)) {
for _, k := range v.keys {
f(expvar.KeyValue{k, v.m[k]})
}
}

// String is a string variable, and satisfies the expvar.Var interface.
type String struct {
mu sync.RWMutex
s string
}

func (v *String) String() string {
v.mu.RLock()
defer v.mu.RUnlock()
return strconv.Quote(v.s)
}

func (v *String) Set(value string) {
v.mu.Lock()
defer v.mu.Unlock()
v.s = value
}

func (v *String) Get() string {
v.mu.RLock()
defer v.mu.RUnlock()
return v.s
}
Loading

0 comments on commit 0aab1d7

Please sign in to comment.