Skip to content

Commit

Permalink
stats: allow statistics maps to quietly fade away when the object the…
Browse files Browse the repository at this point in the history
…y refer to is closed

As per influxdata#5784, there is a risk that a long running influxd process will accumulate a lot of statistics
that describe objects that have already been closed.

This is a concern primarily because of the amount of IO and CPU involved with continuing to publish the last will and testament of an object long since departed.

This series address the issue by pinching some code from Kapacitor - thanks @nathanielc - and using
it as a top level name space for influx statistics objects. The maps are reference counted. One reference is allocated to the described object, another to the monitor.

When the described object is closed, it yells DeleteStatistics! at the influxdb package which takes note of the request, but otherwise ignores it. When the monitor is doing its thing, it quietly peeks at the statistics map's reference count and if it detects that its number is up (well, down really), it yells DeleteStatistics! at the influxdb package who takes notice of it this time. This deferral of the grim reaper is required so that the monitor has time to tidy up the paper work and confirm that the undertaker's cheque has not bounced.

Signed-off-by: Jon Seymour <jon@wildducktheories.com>
  • Loading branch information
jonseymour committed Feb 25, 2016
1 parent a2f2443 commit 5233c12
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 7 deletions.
50 changes: 48 additions & 2 deletions influxvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,20 @@ package influxdb

import (
"expvar"
"strconv"
"sync"

expvar2 "github.com/influxdata/influxdb/stats/expvar"
)

var expvarMu sync.Mutex
var root *expvar2.Map // a name space for all statistics maps

func init() {
root = &expvar2.Map{}
root.Init()
expvar.Publish("influx", root)
}

// NewStatistics returns an expvar-based map with the given key. Within that map
// is another map. Within there "name" is the Measurement name, "tags" are the tags,
Expand All @@ -16,8 +26,10 @@ func NewStatistics(key, name string, tags map[string]string) *expvar.Map {

// Add expvar for this service.
var v expvar.Var
if v = expvar.Get(key); v == nil {
v = expvar.NewMap(key)
if v = root.Get(key); v == nil {
v = &expvar.Map{}
v.(*expvar.Map).Init()
root.Set(key, v)
}
m := v.(*expvar.Map)

Expand All @@ -41,5 +53,39 @@ func NewStatistics(key, name string, tags map[string]string) *expvar.Map {
statMap.Init()
m.Set("values", statMap)

// Create a reference counter that we can use to release the map.
referencesVar := &expvar.Int{}
referencesVar.Set(2) // once for the site of usage, once for the monitor
m.Set("references", referencesVar)
return statMap
}

// Iterate over all the statistics maps.
func DoStatistics(fn func(expvar.KeyValue)) {
root.Do(fn)
}

// Used to deregister a statistic when it is no longer needed.
func DeleteStatistics(key string) {
expvarMu.Lock()
defer expvarMu.Unlock()

v := root.Get(key)
if v == nil {
return
}

m := v.(*expvar.Map)

if countVar := m.Get("references"); countVar == nil {
root.Delete(key) // if this happens, start over
return
} else {
countVar.(*expvar.Int).Add(-1)
countText := countVar.String()
count, _ := strconv.ParseInt(countText, 10, 32)
if count <= 0 {
root.Delete(key)
}
}
}
63 changes: 63 additions & 0 deletions influxvar_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package influxdb_test

import (
"expvar"
"testing"

"github.com/influxdata/influxdb"
)

func TestEmptyStatistics(t *testing.T) {
found := make([]expvar.KeyValue, 0)
influxdb.DoStatistics(func(kv expvar.KeyValue) {
found = append(found, kv)
})

if length := len(found); length != 0 {
t.Fatalf("non empty initial state. got %d, expected: %d", length, 0)
}
}

// Test that we can create one statistic and that it disappears after it is deleted twice.
func TestOneStatistic(t *testing.T) {

foo := influxdb.NewStatistics("foo", "bar", map[string]string{"tag": "T"})

found := make([]expvar.KeyValue, 0)
influxdb.DoStatistics(func(kv expvar.KeyValue) {
found = append(found, kv)
})

if len(found) != 1 {
t.Fatalf("enumeration error after do. length of slice: got %d, expected %d", len(found), 1)
}
if m, ok := found[0].Value.(*expvar.Map); !ok {
t.Fatalf("value of found object got: %v, expected: a map", found[0].Value)
} else {
if fooActual := m.Get("values"); fooActual != foo {
t.Fatalf("failed to obtain expected map. got: %v, expected: %v", fooActual, foo)
}
}

influxdb.DeleteStatistics("foo")

found = make([]expvar.KeyValue, 0)
influxdb.DoStatistics(func(kv expvar.KeyValue) {
found = append(found, kv)
})

if length := len(found); length != 1 {
t.Fatalf("failed to find expected number of objects. got: %d, expected: 1", length)
}

influxdb.DeleteStatistics("foo")

found = make([]expvar.KeyValue, 0)
influxdb.DoStatistics(func(kv expvar.KeyValue) {
found = append(found, kv)
})

if length := len(found); length != 0 {
t.Fatalf("failed to find expected number of objects. got: %d, expected: 0", length)
}
}
12 changes: 7 additions & 5 deletions monitor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,7 @@ func (m *Monitor) DeregisterDiagnosticsClient(name string) {
func (m *Monitor) Statistics(tags map[string]string) ([]*Statistic, error) {
var statistics []*Statistic

expvar.Do(func(kv expvar.KeyValue) {
// Skip built-in expvar stats.
if kv.Key == "memstats" || kv.Key == "cmdline" {
return
}
influxdb.DoStatistics(func(kv expvar.KeyValue) {

statistic := &Statistic{
Tags: make(map[string]string),
Expand Down Expand Up @@ -200,6 +196,12 @@ func (m *Monitor) Statistics(tags map[string]string) ([]*Statistic, error) {
}
statistic.Values[kv.Key] = f
})
case "references":
s := subKV.Value.(*expvar.Int).String()
count, _ := strconv.ParseInt(s, 10, 32)
if count < 2 {
influxdb.DeleteStatistics(kv.Key)
}
}
})

Expand Down

0 comments on commit 5233c12

Please sign in to comment.