fix: removing the logic of collecting metric per each node (#26)
* fix: removing the logic of collecting metric per each node
silenceqi authored Dec 24, 2024
1 parent 53be218 commit 0b24918
Showing 2 changed files with 21 additions and 55 deletions.
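
In short: the removed code requested node stats once per node, walking each node's publish host (and bailed out of collection whenever discovery was enabled but no node metadata was cached), while the new code asks one active host for every node's stats in a single `client.GetNodesStats("", host,"")` call and iterates the per-node sections of the response. A minimal standalone sketch of that single-request pattern, using only the standard library against a hypothetical local Elasticsearch endpoint (this is not the framework's elastic client):

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// nodesStatsResponse mirrors the top-level shape of the Elasticsearch
// _nodes/stats response: stats for every node, keyed by node ID, in one payload.
type nodesStatsResponse struct {
	Nodes map[string]json.RawMessage `json:"nodes"`
}

func main() {
	client := &http.Client{Timeout: 10 * time.Second}

	// One request to a single reachable host returns stats for all nodes,
	// replacing the per-node request loop that this commit removes.
	// http://localhost:9200 is a hypothetical endpoint for illustration.
	resp, err := client.Get("http://localhost:9200/_nodes/stats")
	if err != nil {
		fmt.Println("error on get node stats:", err)
		return
	}
	defer resp.Body.Close()

	var stats nodesStatsResponse
	if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
		fmt.Println("decode error:", err)
		return
	}

	// Iterate the per-node sections, as the new code does with stats.Nodes.
	for nodeID, raw := range stats.Nodes {
		fmt.Printf("node %s: %d bytes of stats\n", nodeID, len(raw))
	}
}
```

Collapsing the per-node loop into one request means a single HTTP round-trip per cluster per collection cycle, which is also why the relocated size warning below simply points large clusters at the dedicated agent.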
1 change: 1 addition & 0 deletions docs/content.en/docs/release-notes/_index.md
@@ -21,6 +21,7 @@ Information about release notes of INFINI Framework is provided here.
 - Update elastic metadata safely (#20)
 - Fixed the issue that the metadata does not take effect immediately after the cluster changes to available (#23)
 - Enable skipping to the next file with multiple gaps #22
+- Removing the logic of collecting metric per each node (#26)
 
 
 ### Improvements
75 changes: 20 additions & 55 deletions modules/metrics/elastic/elasticsearch.go
@@ -269,12 +269,6 @@ func (m *ElasticsearchMetric) InitialCollectTask(k string, v *elastic.ElasticsearchMetadata)
             if err != nil {
                 log.Debug(v.Config.Name, " get shards info error: ", err)
             }
-            if v.Config.Discovery.Enabled && v.Nodes == nil {
-                if global.Env().IsDebug {
-                    log.Debugf("elasticsearch: %v - %v, no nodes info was found, skip nodes metrics collect", k, v.Config.Name)
-                }
-                return
-            }
             shardInfos := map[string]map[string]interface{}{}
             indexInfos := map[string]map[string]bool{}
             for _, item := range shards {
@@ -300,61 +294,29 @@
                 shardInfos[item.NodeID]["shards"] = append(shardInfos[item.NodeID]["shards"].([]interface{}), item)
                 indexInfos[item.NodeID][item.Index] = true
             }
-            if len(shardInfos) > 50 || len(shards) > 5000 {
-                log.Warnf("cluster [%v] has over 50 nodes or 5000 shards. use the agent for metrics collection: https://github.com/infinilabs/agent.", v.Config.Name)
-            }
-
-            //get node stats per each node
-            if v.Nodes != nil {
-                for nodeID, y := range *v.Nodes {
-                    //get node level stats
-                    nodeHost := y.GetHttpPublishHost()
-
-                    var host string
-                    //published host is not a valid host
-                    if elastic.IsHostDead(nodeHost) {
-                        host = v.GetActivePreferredHost(nodeHost)
-                    } else {
-                        //the node is online
-                        if elastic.IsHostAvailable(nodeHost) {
-                            host = nodeHost
-                        } else {
-                            //host not dead and is not available, skip collecting
-                            log.Debugf("host [%v] is not available, skip metrics collecting", nodeHost)
-                            continue
-                        }
-                    }
-
-                    log.Debugf("collect nodes stats, endpoint: %s", host)
-                    stats := client.GetNodesStats(nodeID, host,"")
-
-                    log.Trace(y.GetHttpPublishHost(), " => ", host, stats.ErrorObject)
-
-                    if stats.ErrorObject != nil {
-                        log.Errorf("get node stats of %s error: %v", y.Name, stats.ErrorObject)
-                        continue
-                    }
-                    if _, ok := shardInfos[nodeID]; ok {
-                        shardInfos[nodeID]["indices_count"] = len(indexInfos[nodeID])
-                    }
-                    m.SaveNodeStats(v, nodeID, stats.Nodes[nodeID], shardInfos[nodeID])
-                }
-            } else {
-                host := v.GetActiveHost()
-                //published host is not a valid host
-                if host != "" && !elastic.IsHostDead(host) && elastic.IsHostAvailable(host) {
-                    //host not dead and is not available, skip collecting
-                    stats := client.GetNodesStats("", host,"")
-                    if stats.ErrorObject != nil {
-                        log.Errorf("error on get node stats: %v %v", host, stats.ErrorObject)
-                    } else {
-                        for nodeID, nodeStats := range stats.Nodes {
-                            if _, ok := shardInfos[nodeID]; ok {
-                                shardInfos[nodeID]["indices_count"] = len(indexInfos[nodeID])
-                            }
-                            m.SaveNodeStats(v, nodeID, nodeStats, shardInfos[nodeID])
-                        }
-                    }
-                } else {
-                    log.Debugf("host [%v] is not available, skip metrics collecting", host)
-                }
-            }
+            host := v.GetActiveHost()
+            //published host is not a valid host
+            if host != "" && !elastic.IsHostDead(host) && elastic.IsHostAvailable(host) {
+                //host not dead and is not available, skip collecting
+                stats := client.GetNodesStats("", host,"")
+                if stats.ErrorObject != nil {
+                    log.Errorf("error on get node stats: %v %v", host, stats.ErrorObject)
+                } else {
+                    for nodeID, nodeStats := range stats.Nodes {
+                        if _, ok := shardInfos[nodeID]; ok {
+                            shardInfos[nodeID]["indices_count"] = len(indexInfos[nodeID])
+                        }
+                        m.SaveNodeStats(v, nodeID, nodeStats, shardInfos[nodeID])
+                    }
+                }
+            } else {
+                log.Debugf("host [%v] is not available, skip metrics collecting", host)
+            }
 
         },
     }
     taskID := task.RegisterScheduleTask(nodeStatsMetricTask)
@@ -385,6 +347,9 @@ func (m *ElasticsearchMetric) InitialCollectTask(k string, v *elastic.ElasticsearchMetadata)
                 log.Debug(v.Config.Name, " get shards info error: ", err)
                 //return true
             }
+            if (v.Health != nil && v.Health.NumberOfNodes > 50) || len(shards) > 5000 {
+                log.Warnf("cluster [%v] has over 50 nodes or 5000 shards. use the agent for metrics collection: https://github.com/infinilabs/agent.", v.Config.Name)
+            }
             indexStats, err := client.GetStats()
             if err != nil {
                 log.Error(v.Config.Name, " get indices stats error: ", err)
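
The warning that moved here also changes its trigger: rather than counting the node IDs accumulated in shardInfos, it reads the node count from cluster health, with a nil check because health may not have been fetched yet. A hedged sketch of just that guard, under hypothetical stand-in types:

```go
package metrics

// ClusterHealth is a hypothetical stand-in for the framework's cluster
// health struct; only the field the guard reads is modeled here.
type ClusterHealth struct {
	NumberOfNodes int
}

// shouldRecommendAgent reports whether a cluster is large enough that metric
// collection should be delegated to the external agent. health may be nil
// when cluster health has not been fetched yet, so check it before reading
// NumberOfNodes.
func shouldRecommendAgent(health *ClusterHealth, shardCount int) bool {
	return (health != nil && health.NumberOfNodes > 50) || shardCount > 5000
}
```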
