Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Complete diagnostics support #3969

Merged
merged 3 commits into from
Sep 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions monitor/go_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ func (g *goRuntime) Statistics() (map[string]interface{}, error) {
}, nil
}

// Diagnostics returns the statistics for the goRuntime type
func (g *goRuntime) Diagnostics() (map[string]interface{}, error) {
return nil, nil
func (g *goRuntime) Diagnostics() (*Diagnostic, error) {
diagnostics := map[string]interface{}{
"GOARCH": runtime.GOARCH,
"GOOS": runtime.GOOS,
"GOMAXPROCS": runtime.GOMAXPROCS(-1),
"version": runtime.Version(),
}

return DiagnosticFromMap(diagnostics), nil
}
26 changes: 26 additions & 0 deletions monitor/network.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package monitor

import (
"os"
)

// network captures network statistics and implements the monitor client interface
type network struct{}

// Statistics returns the statistics for the network type
func (n *network) Statistics() (map[string]interface{}, error) {
return nil, nil
}

func (n *network) Diagnostics() (*Diagnostic, error) {
h, err := os.Hostname()
if err != nil {
return nil, err
}

diagnostics := map[string]interface{}{
"hostname": h,
}

return DiagnosticFromMap(diagnostics), nil
}
216 changes: 167 additions & 49 deletions monitor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,64 @@ import (

"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/tsdb"
)

const leaderWaitTimeout = 30 * time.Second

// Client is the interface modules must implement if they wish to register with monitor.
type Client interface {
// StatsClient is the interface modules must implement if they wish to register with monitor.
type StatsClient interface {
// Statistics returns a map of keys to values. Each Value must be either int64 or float64.
// Statistical information is written to an InfluxDB system if enabled.
Statistics() (map[string]interface{}, error)
Diagnostics() (map[string]interface{}, error)
}

// DiagsClient is the interface modules implement if they register diags with monitor.
type DiagsClient interface {
Diagnostics() (*Diagnostic, error)
}

// The DiagsClientFunc type is an adapter to allow the use of
// ordinary functions as Diagnostis clients.
type DiagsClientFunc func() (*Diagnostic, error)

// Diagnostics calls f().
func (f DiagsClientFunc) Diagnostics() (*Diagnostic, error) {
return f()
}

// Diagnostic represents a table of diagnostic information. The first value
// is the name of the columns, the second is a slice of interface slices containing
// the values for each column, by row. This information is never written to an InfluxDB
// system and is display-only. An example showing, say, connections follows:
//
// source_ip source_port dest_ip dest_port
// 182.1.0.2 2890 127.0.0.1 38901
// 174.33.1.2 2924 127.0.0.1 38902
type Diagnostic struct {
Columns []string
Rows [][]interface{}
}

func NewDiagnostic(columns []string) *Diagnostic {
return &Diagnostic{
Columns: columns,
Rows: make([][]interface{}, 0),
}
}

func (d *Diagnostic) AddRow(r []interface{}) {
d.Rows = append(d.Rows, r)
}

// Monitor represents an instance of the monitor system.
type Monitor struct {
wg sync.WaitGroup
done chan struct{}
mu sync.Mutex
registrations []*clientWithMeta
wg sync.WaitGroup
done chan struct{}
mu sync.Mutex

statRegistrations []*clientWithMeta
diagRegistrations map[string]DiagsClient

storeEnabled bool
storeDatabase string
Expand All @@ -50,12 +92,13 @@ type Monitor struct {
// New returns a new instance of the monitor system.
func New(c Config) *Monitor {
return &Monitor{
done: make(chan struct{}),
registrations: make([]*clientWithMeta, 0),
storeEnabled: c.StoreEnabled,
storeDatabase: c.StoreDatabase,
storeInterval: time.Duration(c.StoreInterval),
Logger: log.New(os.Stderr, "[monitor] ", log.LstdFlags),
done: make(chan struct{}),
statRegistrations: make([]*clientWithMeta, 0),
diagRegistrations: make(map[string]DiagsClient),
storeEnabled: c.StoreEnabled,
storeDatabase: c.StoreDatabase,
storeInterval: time.Duration(c.StoreInterval),
Logger: log.New(os.Stderr, "[monitor] ", log.LstdFlags),
}
}

Expand All @@ -64,8 +107,12 @@ func New(c Config) *Monitor {
func (m *Monitor) Open() error {
m.Logger.Printf("Starting monitor system")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a nit, and I'm not sure how to get around this yet, for testing, you can't silence this as it will "Open" from the server before you can call "SetLogger" on it to null it out. I get a lot of chatter in my test output because of it. I would like to find a better way to do this.


// Self-register Go runtime statm.
m.Register("runtime", nil, &goRuntime{})
// Self-register various stats and diagnostics.
gr := &goRuntime{}
m.RegisterStatsClient("runtime", nil, gr)
m.RegisterDiagnosticsClient("runtime", gr)
m.RegisterDiagnosticsClient("network", &network{})
m.RegisterDiagnosticsClient("system", &system{})

// If enabled, record stats in a InfluxDB system.
if m.storeEnabled {
Expand All @@ -92,16 +139,49 @@ func (m *Monitor) SetLogger(l *log.Logger) {
}

// Register registers a client with the given name and tags.
func (m *Monitor) Register(name string, tags map[string]string, client Client) error {
func (m *Monitor) RegisterStatsClient(name string, tags map[string]string, client StatsClient) error {
m.mu.Lock()
defer m.mu.Unlock()

a := tags
if a == nil {
a = make(map[string]string)
}

// Get cluster-level metadata to supplement stats.
var clusterID string
var hostname string
var err error
if cID, err := m.MetaStore.ClusterID(); err != nil {
m.Logger.Printf("failed to determine cluster ID: %s", err)
} else {
clusterID = strconv.FormatUint(cID, 10)
}
nodeID := strconv.FormatUint(m.MetaStore.NodeID(), 10)
if hostname, err = os.Hostname(); err != nil {
m.Logger.Printf("failed to determine hostname: %s", err)
}
a["clusterID"] = clusterID
a["nodeID"] = nodeID
a["hostname"] = hostname

c := &clientWithMeta{
Client: client,
name: name,
tags: tags,
StatsClient: client,
name: name,
tags: a,
}
m.registrations = append(m.registrations, c)
m.Logger.Printf(`'%s:%v' registered for monitoring`, name, tags)

m.statRegistrations = append(m.statRegistrations, c)
m.Logger.Printf(`'%s:%v' registered for statistics monitoring`, name, tags)
return nil
}

// RegisterDiagnosticsClient registers a diagnostics client with the given name and tags.
func (m *Monitor) RegisterDiagnosticsClient(name string, client DiagsClient) error {
m.mu.Lock()
defer m.mu.Unlock()
m.diagRegistrations[name] = client
m.Logger.Printf(`'%s' registered for diagnostics monitoring`, name)
return nil
}

Expand All @@ -110,9 +190,9 @@ func (m *Monitor) Statistics() ([]*statistic, error) {
m.mu.Lock()
defer m.mu.Unlock()

statistics := make([]*statistic, 0, len(m.registrations))
for _, r := range m.registrations {
stats, err := r.Client.Statistics()
statistics := make([]*statistic, 0, len(m.statRegistrations))
for _, r := range m.statRegistrations {
stats, err := r.StatsClient.Statistics()
if err != nil {
continue
}
Expand All @@ -127,14 +207,24 @@ func (m *Monitor) Statistics() ([]*statistic, error) {
return statistics, nil
}

func (m *Monitor) Diagnostics() (map[string]*Diagnostic, error) {
m.mu.Lock()
defer m.mu.Unlock()

diags := make(map[string]*Diagnostic, len(m.diagRegistrations))
for k, v := range m.diagRegistrations {
d, err := v.Diagnostics()
if err != nil {
continue
}
diags[k] = d
}
return diags, nil
}

// storeStatistics writes the statistics to an InfluxDB system.
func (m *Monitor) storeStatistics() {
// XXX add tags such as local hostname and cluster ID
//a.Tags["clusterID"] = strconv.FormatUint(m.clusterID, 10)
//a.Tags["nodeID"] = strconv.FormatUint(m.nodeID, 10)
//a.Tags["hostname"] = m.hostname
defer m.wg.Done()

m.Logger.Printf("storing statistics in database '%s', interval %s",
m.storeDatabase, m.storeInterval)

Expand All @@ -154,7 +244,26 @@ func (m *Monitor) storeStatistics() {
for {
select {
case <-tick.C:
// Write stats here.
stats, err := m.Statistics()
if err != nil {
m.Logger.Printf("failed to retrieve registered statistics: %s", err)
continue
}

points := make(tsdb.Points, 0, len(stats))
for _, s := range stats {
points = append(points, tsdb.NewPoint(s.Name, s.Tags, s.Values, time.Now()))
}

err = m.PointsWriter.WritePoints(&cluster.WritePointsRequest{
Database: m.storeDatabase,
RetentionPolicy: "",
ConsistencyLevel: cluster.ConsistencyLevelOne,
Points: points,
})
if err != nil {
m.Logger.Printf("failed to store statistics: %s", err)
}
case <-m.done:
m.Logger.Printf("terminating storage of statistics")
return
Expand All @@ -170,16 +279,11 @@ type statistic struct {
Values map[string]interface{}
}

// newStatistic returns a new statistic object. It ensures that tags are always non-nil.
// newStatistic returns a new statistic object.
func newStatistic(name string, tags map[string]string, values map[string]interface{}) *statistic {
a := tags
if a == nil {
a = make(map[string]string)
}

return &statistic{
Name: name,
Tags: a,
Tags: tags,
Values: values,
}
}
Expand All @@ -206,24 +310,24 @@ func (s *statistic) valueNames() []string {

// clientWithMeta wraps a registered client with its associated name and tagm.
type clientWithMeta struct {
Client
StatsClient
name string
tags map[string]string
}

// MonitorClient wraps a *expvar.Map so that it implements the Client interface. It is for
// use by external packages that just record stats in an expvar.Map type.
type MonitorClient struct {
// StatsMonitorClient wraps a *expvar.Map so that it implements the StatsClient interface.
// It is for use by external packages that just record stats in an expvar.Map type.
type StatsMonitorClient struct {
ep *expvar.Map
}

// NewMonitorClient returns a new MonitorClient using the given expvar.Map.
func NewMonitorClient(ep *expvar.Map) *MonitorClient {
return &MonitorClient{ep: ep}
// NewStatsMonitorClient returns a new StatsMonitorClient using the given expvar.Map.
func NewStatsMonitorClient(ep *expvar.Map) *StatsMonitorClient {
return &StatsMonitorClient{ep: ep}
}

// Statistics implements the Client interface for a MonitorClient.
func (m MonitorClient) Statistics() (map[string]interface{}, error) {
// Statistics implements the Client interface for a StatsMonitorClient.
func (m StatsMonitorClient) Statistics() (map[string]interface{}, error) {
values := make(map[string]interface{})
m.ep.Do(func(kv expvar.KeyValue) {
var f interface{}
Expand All @@ -248,7 +352,21 @@ func (m MonitorClient) Statistics() (map[string]interface{}, error) {
return values, nil
}

// Diagnostics implements the Client interface for a MonitorClient.
func (m MonitorClient) Diagnostics() (map[string]interface{}, error) {
return nil, nil
// DiagnosticFromMap returns a Diagnostic from a map.
func DiagnosticFromMap(m map[string]interface{}) *Diagnostic {
// Display columns in deterministic order.
sortedKeys := make([]string, 0, len(m))
for k, _ := range m {
sortedKeys = append(sortedKeys, k)
}
sort.Strings(sortedKeys)

d := NewDiagnostic(sortedKeys)
row := make([]interface{}, len(sortedKeys))
for i, k := range sortedKeys {
row[i] = m[k]
}
d.AddRow(row)

return d
}
11 changes: 7 additions & 4 deletions monitor/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,24 @@ func Test_RegisterStats(t *testing.T) {
}

// Register a client without tags.
if err := monitor.Register("foo", nil, client); err != nil {
if err := monitor.RegisterStatsClient("foo", nil, client); err != nil {
t.Fatalf("failed to register client: %s", err.Error())
}
json := executeShowStatsJSON(t, executor)
if !strings.Contains(json, `{"name":"foo","columns":["bar","qux"],"values":[[1,2.4]]}]}`) {
if !strings.Contains(json, `"columns":["bar","qux"],"values":[[1,2.4]]`) || !strings.Contains(json, `"name":"foo"`) {
t.Fatalf("SHOW STATS response incorrect, got: %s\n", json)
}

// Register a client with tags.
if err := monitor.Register("baz", map[string]string{"proto": "tcp"}, client); err != nil {
if err := monitor.RegisterStatsClient("baz", map[string]string{"proto": "tcp"}, client); err != nil {
t.Fatalf("failed to register client: %s", err.Error())
}
json = executeShowStatsJSON(t, executor)
if !strings.Contains(json, `{"name":"baz","tags":{"proto":"tcp"},"columns":["bar","qux"],"values":[[1,2.4]]}]}`) {
if !strings.Contains(json, `"columns":["bar","qux"],"values":[[1,2.4]]`) ||
!strings.Contains(json, `"name":"baz"`) ||
!strings.Contains(json, `"proto":"tcp"`) {
t.Fatalf("SHOW STATS response incorrect, got: %s\n", json)

}
}

Expand Down
Loading