Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce memory usage of elasticsearch.index metricset #16538

Merged
merged 12 commits into from
Mar 17, 2020
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix imports after PR was merged before rebase. {pull}16756[16756]
- Add dashboard for `redisenterprise` module. {pull}16752[16752]
- Dynamically choose a method for the system/service metricset to support older linux distros. {pull}16902[16902]
- Reduce memory usage in `elasticsearch/index` metricset. {issue}16503[16503] {pull}16538[16538]

*Packetbeat*

Expand Down
216 changes: 119 additions & 97 deletions metricbeat/module/elasticsearch/index/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,96 +27,117 @@ import (
"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
s "github.com/elastic/beats/v7/libbeat/common/schema"
c "github.com/elastic/beats/v7/libbeat/common/schema/mapstriface"
"github.com/elastic/beats/v7/metricbeat/helper/elastic"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/elasticsearch"
)

var (
// Based on https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java#L127-L203
xpackSchema = s.Schema{
"uuid": c.Str("uuid"),
"primaries": c.Dict("primaries", indexStatsSchema),
"total": c.Dict("total", indexStatsSchema),
}
errParse = errors.New("failure parsing Indices Stats Elasticsearch API response")
)

indexStatsSchema = s.Schema{
"docs": c.Dict("docs", s.Schema{
"count": c.Int("count"),
}),
"fielddata": c.Dict("fielddata", s.Schema{
"memory_size_in_bytes": c.Int("memory_size_in_bytes"),
"evictions": c.Int("evictions"),
}),
"indexing": c.Dict("indexing", s.Schema{
"index_total": c.Int("index_total"),
"index_time_in_millis": c.Int("index_time_in_millis"),
"throttle_time_in_millis": c.Int("throttle_time_in_millis"),
}),
"merges": c.Dict("merges", s.Schema{
"total_size_in_bytes": c.Int("total_size_in_bytes"),
}),
"query_cache": c.Dict("query_cache", cacheStatsSchema),
"request_cache": c.Dict("request_cache", cacheStatsSchema),
"search": c.Dict("search", s.Schema{
"query_total": c.Int("query_total"),
"query_time_in_millis": c.Int("query_time_in_millis"),
}),
"segments": c.Dict("segments", s.Schema{
"count": c.Int("count"),
"memory_in_bytes": c.Int("memory_in_bytes"),
"terms_memory_in_bytes": c.Int("terms_memory_in_bytes"),
"stored_fields_memory_in_bytes": c.Int("stored_fields_memory_in_bytes"),
"term_vectors_memory_in_bytes": c.Int("term_vectors_memory_in_bytes"),
"norms_memory_in_bytes": c.Int("norms_memory_in_bytes"),
"points_memory_in_bytes": c.Int("points_memory_in_bytes"),
"doc_values_memory_in_bytes": c.Int("doc_values_memory_in_bytes"),
"index_writer_memory_in_bytes": c.Int("index_writer_memory_in_bytes"),
"version_map_memory_in_bytes": c.Int("version_map_memory_in_bytes"),
"fixed_bit_set_memory_in_bytes": c.Int("fixed_bit_set_memory_in_bytes"),
}),
"store": c.Dict("store", s.Schema{
"size_in_bytes": c.Int("size_in_bytes"),
}),
"refresh": c.Dict("refresh", s.Schema{
"external_total_time_in_millis": c.Int("external_total_time_in_millis", s.Optional),
"total_time_in_millis": c.Int("total_time_in_millis"),
}),
}
// Based on https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java#L127-L203
type stats struct {
Indices map[string]index `json:"indices"`
}

cacheStatsSchema = s.Schema{
"memory_size_in_bytes": c.Int("memory_size_in_bytes"),
"evictions": c.Int("evictions"),
"hit_count": c.Int("hit_count"),
"miss_count": c.Int("miss_count"),
}
)
type index struct {
UUID string `json:"uuid"`
Primaries indexStats `json:"primaries"`
Total indexStats `json:"total"`

func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error {
var indicesStruct IndicesStruct
if err := parseAPIResponse(content, &indicesStruct); err != nil {
return errors.Wrap(err, "failure parsing Indices Stats Elasticsearch API response")
}
Index string `json:"index"`
Created int64 `json:"created"`
Status string `json:"status"`
Shards shardStats `json:"shards"`
}

type indexStats struct {
Docs struct {
Count int `json:"count"`
} `json:"docs"`
FieldData struct {
MemorySizeInBytes int `json:"memory_size_in_bytes"`
Evictions int `json:"evictions"`
} `json:"fielddata"`
Indexing struct {
IndexTotal int `json:"index_total"`
IndexTimeInMillis int `json:"index_time_in_millis"`
ThrottleTimeInMillis int `json:"throttle_time_in_millis"`
} `json:"indexing"`
Merges struct {
TotalSizeInBytes int `json:"total_size_in_bytes"`
} `json:"merges"`
QueryCache cacheStats `json:"query_stats"`
RequestCache cacheStats `json:"request_cache"`
Search struct {
QueryTotal int `json:"query_total"`
QueryTimeInMillis int `json:"query_time_in_millis"`
} `json:"search"`
Segments struct {
Count int `json:"count"`
MemoryInBytes int `json:"memory_in_bytes"`
TermsMemoryInBytes int `json:"terms_memory_in_bytes"`
StoredFieldsMemoryInBytes int `json:"stored_fields_memory_in_bytes"`
TermVectorsMemoryInBytes int `json:"term_vectors_memory_in_bytes"`
NormsMemoryInBytes int `json:"norms_memory_in_bytes"`
PointsMemoryInBytes int `json:"points_memory_in_bytes"`
DocValuesMemoryInBytes int `json:"doc_values_memory_in_bytes"`
IndexWriterMemoryInBytes int `json:"index_writer_memory_in_bytes"`
VersionMapMemoryInBytes int `json:"version_map_memory_in_bytes"`
FixedBitSetMemoryInBytes int `json:"fixed_bit_set_memory_in_bytes"`
} `json:"segments"`
Store struct {
SizeInBytes int `json:"size_in_bytes"`
} `json:"store"`
Refresh struct {
ExternalTotalTimeInMillis int `json:"external_total_time_in_millis"`
TotalTimeInMillis int `json:"total_time_in_millis"`
} `json:"refresh"`
}

type cacheStats struct {
MemorySizeInBytes int `json:"memory_size_in_bytes"`
Evictions int `json:"evictions"`
HitCount int `json:"hit_count"`
MissCount int `json:"miss_count"`
}

type shardStats struct {
Total int `json:"total"`
Primaries int `json:"primaries"`
Replicas int `json:"replicas"`

ActiveTotal int `json:"active_total"`
ActivePrimaries int `json:"active_primaries"`
ActiveReplicas int `json:"active_replicas"`

UnassignedTotal int `json:"unassigned_total"`
UnassignedPrimaries int `json:"unassigned_primaries"`
UnassignedReplicas int `json:"unassigned_replicas"`

Initializing int `json:"initializing"`
Relocating int `json:"relocationg"`
}

func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error {
clusterStateMetrics := []string{"metadata", "routing_table"}
clusterState, err := elasticsearch.GetClusterState(m.HTTP, m.HTTP.GetURI(), clusterStateMetrics)
if err != nil {
return errors.Wrap(err, "failure retrieving cluster state from Elasticsearch")
}

var indicesStats stats
if err := parseAPIResponse(content, &indicesStats); err != nil {
return errors.Wrap(err, "failure parsing Indices Stats Elasticsearch API response")
}

var errs multierror.Errors
for name, index := range indicesStruct.Indices {
for name, idx := range indicesStats.Indices {
event := mb.Event{}
indexStats, err := xpackSchema.Apply(index)
if err != nil {
errs = append(errs, errors.Wrap(err, "failure applying index stats schema"))
continue
}
indexStats["index"] = name
idx.Index = name

err = addClusterStateFields(name, indexStats, clusterState)
err = addClusterStateFields(&idx, clusterState)
if err != nil {
errs = append(errs, errors.Wrap(err, "failure adding cluster state fields"))
continue
Expand All @@ -127,29 +148,33 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info,
"timestamp": common.Time(time.Now()),
"interval_ms": m.Module().Config().Period / time.Millisecond,
"type": "index_stats",
"index_stats": indexStats,
"index_stats": idx,
}

event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch)
r.Event(event)
}

return errs.Err()
jsoriano marked this conversation as resolved.
Show resolved Hide resolved
if errs != nil {
return errs.Err()
}

return nil
}

func parseAPIResponse(content []byte, indicesStruct *IndicesStruct) error {
return json.Unmarshal(content, indicesStruct)
func parseAPIResponse(content []byte, indicesStats *stats) error {
return json.Unmarshal(content, indicesStats)
}

// Fields added here are based on same fields being added by internal collection in
// https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java#L62-L124
func addClusterStateFields(indexName string, indexStats, clusterState common.MapStr) error {
indexMetadata, err := getClusterStateMetricForIndex(clusterState, indexName, "metadata")
func addClusterStateFields(idx *index, clusterState common.MapStr) error {
indexMetadata, err := getClusterStateMetricForIndex(clusterState, idx.Index, "metadata")
if err != nil {
return errors.Wrap(err, "failed to get index metadata from cluster state")
}

indexRoutingTable, err := getClusterStateMetricForIndex(clusterState, indexName, "routing_table")
indexRoutingTable, err := getClusterStateMetricForIndex(clusterState, idx.Index, "routing_table")
if err != nil {
return errors.Wrap(err, "failed to get index routing table from cluster state")
}
Expand All @@ -163,7 +188,7 @@ func addClusterStateFields(indexName string, indexStats, clusterState common.Map
if err != nil {
return errors.Wrap(err, "failed to get index creation time")
}
indexStats.Put("created", created)
idx.Created = created

// "index_stats.version.created", <--- don't think this is being used in the UI, so can we skip it?
// "index_stats.version.upgraded", <--- don't think this is being used in the UI, so can we skip it?
Expand All @@ -172,13 +197,13 @@ func addClusterStateFields(indexName string, indexStats, clusterState common.Map
if err != nil {
return errors.Wrap(err, "failed to get index status")
}
indexStats.Put("status", status)
idx.Status = status

shardStats, err := getIndexShardStats(shards)
if err != nil {
return errors.Wrap(err, "failed to get index shard stats")
}
indexStats.Put("shards", shardStats)
idx.Shards = *shardStats
return nil
}

Expand Down Expand Up @@ -241,7 +266,7 @@ func getIndexStatus(shards map[string]interface{}) (string, error) {
return "red", nil
}

func getIndexShardStats(shards common.MapStr) (common.MapStr, error) {
func getIndexShardStats(shards common.MapStr) (*shardStats, error) {
primaries := 0
replicas := 0

Expand Down Expand Up @@ -298,21 +323,18 @@ func getIndexShardStats(shards common.MapStr) (common.MapStr, error) {
}
}

return common.MapStr{
"total": primaries + replicas,
"primaries": primaries,
"replicas": replicas,

"active_total": activePrimaries + activeReplicas,
"active_primaries": activePrimaries,
"active_replicas": activeReplicas,

"unassigned_total": unassignedPrimaries + unassignedReplicas,
"unassigned_primaries": unassignedPrimaries,
"unassigned_replicas": unassignedReplicas,

"initializing": initializing,
"relocating": relocating,
return &shardStats{
Total: primaries + replicas,
Primaries: primaries,
Replicas: replicas,
ActiveTotal: activePrimaries + activeReplicas,
ActivePrimaries: activePrimaries,
ActiveReplicas: activeReplicas,
UnassignedTotal: unassignedPrimaries + unassignedReplicas,
UnassignedPrimaries: unassignedPrimaries,
UnassignedReplicas: unassignedReplicas,
Initializing: initializing,
Relocating: relocating,
}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/elasticsearch/index/data_xpack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ func BenchmarkParseAPIResponse(b *testing.B) {
content, err := ioutil.ReadFile("_meta/test/stats.800.bench.json")
require.NoError(b, err)

var indicesStruct IndicesStruct
var indicesStats stats

for i := 0; i < b.N; i++ {
err = parseAPIResponse(content, &indicesStruct)
err = parseAPIResponse(content, &indicesStats)
require.NoError(b, err)
}

Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func init() {

const (
statsMetrics = "docs,fielddata,indexing,merge,search,segments,store,refresh,query_cache,request_cache"
statsPath = "/_stats/" + statsMetrics
statsPath = "/_stats/" + statsMetrics + "?filter_path=indices"
)

// MetricSet type defines all fields of the MetricSet
Expand Down