From 19d0ae1ab9e484ab4420b8f8901013506cb50800 Mon Sep 17 00:00:00 2001 From: Yosi Attias Date: Sat, 12 Mar 2016 19:45:49 +0200 Subject: [PATCH] Adding ExecutionStats to task for finding node stats and task stats. --- node.go | 28 ++++++++++++++++++++++++++++ services/task_store/service.go | 21 ++++++++++++++++----- task.go | 34 ++++++++++++++++++++++++++++++++++ task_master.go | 11 +++++++++++ 4 files changed, 89 insertions(+), 5 deletions(-) diff --git a/node.go b/node.go index ee4cab8977..8c9bcb262a 100644 --- a/node.go +++ b/node.go @@ -52,6 +52,10 @@ type Node interface { nodeStatsByGroup() map[models.GroupID]nodeStats collectedCount() int64 + + emittedCount() int64 + + stats() map[string]interface{} } //implementation of Node @@ -246,6 +250,30 @@ func (n *node) collectedCount() (count int64) { return } +func (n *node) emittedCount() (count int64) { + for _, out := range n.outs { + count += out.collectedCount() + } + return +} + +func (n *node) stats() map[string]interface{} { + stats := make(map[string]interface{}) + + n.statMap.Do(func(kv expvar.KeyValue) { + switch v := kv.Value.(type) { + case kexpvar.IntVar: + stats[kv.Key] = v.IntValue() + case kexpvar.FloatVar: + stats[kv.Key] = v.FloatValue() + default: + stats[kv.Key] = v.String() + } + }) + + return stats +} + // Statistics for a node type nodeStats struct { Fields models.Fields diff --git a/services/task_store/service.go b/services/task_store/service.go index 86bc426a78..91003585b2 100644 --- a/services/task_store/service.go +++ b/services/task_store/service.go @@ -49,6 +49,7 @@ type Service struct { StartTask(t *kapacitor.Task) (*kapacitor.ExecutingTask, error) StopTask(name string) error IsExecuting(name string) bool + ExecutionStats(name string) (kapacitor.ExecutionStats, error) ExecutingDot(name string, labels bool) string } @@ -684,11 +685,12 @@ func (ts *Service) Disable(name string) error { } type taskInfo struct { - Name string - Type kapacitor.TaskType - DBRPs []kapacitor.DBRP - Enabled bool - Executing bool + Name string + Type kapacitor.TaskType + DBRPs []kapacitor.DBRP + Enabled bool + Executing bool + ExecutionStats kapacitor.ExecutionStats } func (ts *Service) IsEnabled(name string) (e bool) { @@ -725,6 +727,15 @@ func (ts *Service) GetTaskInfo(tasks []string) ([]taskInfo, error) { Enabled: enabled, Executing: ts.TaskMaster.IsExecuting(t.Name), } + + if info.Executing { + executionStats, err := ts.TaskMaster.ExecutionStats(t.Name) + if err != nil { + return fmt.Errorf("failed to fetch execution stats. name: %s, err: %s", t.Name, err) + } + info.ExecutionStats = executionStats + } + taskInfos = append(taskInfos, info) return nil } diff --git a/task.go b/task.go index 12ac95913b..d7bd374009 100644 --- a/task.go +++ b/task.go @@ -301,6 +301,40 @@ func (et *ExecutingTask) registerOutput(name string, o Output) { et.outputs[name] = o } +type ExecutionStats struct { + TaskStats map[string]interface{} + NodeStats map[string]map[string]interface{} +} + +func (et *ExecutingTask) ExecutionStats() (ExecutionStats, error) { + executionStats := ExecutionStats{ + TaskStats: make(map[string]interface{}), + NodeStats: make(map[string]map[string]interface{}), + } + + // Fill the task stats + executionStats.TaskStats["throughput"] = et.getThroughput() + + // Fill the nodes stats + err := et.walk(func(node Node) error { + nodeStats := node.stats() + + // Add collected and emitted + nodeStats["collected"] = node.collectedCount() + nodeStats["emitted"] = node.emittedCount() + + executionStats.NodeStats[node.Name()] = nodeStats + + return nil + }) + + if err != nil { + return executionStats, err + } + + return executionStats, nil +} + // Return a graphviz .dot formatted byte array. // Label edges with relavant execution information. func (et *ExecutingTask) EDot(labels bool) []byte { diff --git a/task_master.go b/task_master.go index a852869d7c..8ea957cbe3 100644 --- a/task_master.go +++ b/task_master.go @@ -369,6 +369,17 @@ func (tm *TaskMaster) IsExecuting(name string) bool { return executing } +func (tm *TaskMaster) ExecutionStats(name string) (ExecutionStats, error) { + tm.mu.RLock() + defer tm.mu.RUnlock() + task, executing := tm.tasks[name] + if !executing { + return ExecutionStats{}, nil + } + + return task.ExecutionStats() +} + func (tm *TaskMaster) ExecutingDot(name string, labels bool) string { tm.mu.RLock() defer tm.mu.RUnlock()