Skip to content

Commit

Permalink
Report memory usage by application (#7966)
Browse files Browse the repository at this point in the history
Changes the license reporting to not only show the total managed memory but also show a breakdown by application.
  • Loading branch information
pebrc authored Jul 26, 2024
1 parent 87e8d3f commit 704f386
Show file tree
Hide file tree
Showing 8 changed files with 282 additions and 83 deletions.
46 changes: 37 additions & 9 deletions docs/operating-eck/licensing.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,23 @@ The operator periodically writes the total amount of Elastic resources under man
----
> kubectl -n elastic-system get configmap elastic-licensing -o json | jq .data
{
"apm_memory": "0.50GiB",
"apm_memory_bytes": "536870912",
"eck_license_expiry_date": "2025-01-01T00:59:59+01:00",
"eck_license_level": "enterprise",
"eck_license_expiry_date": "2022-01-01T00:59:59+01:00",
"elasticsearch_memory": "18.00GiB",
"elasticsearch_memory_bytes": "19327352832",
"enterprise_resource_units": "1",
"max_enterprise_resource_units": "10",
"timestamp": "2020-01-03T23:38:20Z",
"total_managed_memory": "64GiB",
"total_managed_memory_bytes": "68719476736"
"enterprise_search_memory": "4.00GiB",
"enterprise_search_memory_bytes": "4294967296",
"kibana_memory": "1.00GiB",
"kibana_memory_bytes": "1073741824",
"logstash_memory": "2.00GiB",
"logstash_memory_bytes": "2147483648",
"max_enterprise_resource_units": "250",
"timestamp": "2024-07-26T12:40:42+02:00",
"total_managed_memory": "25.50GiB",
"total_managed_memory_bytes": "27380416512"
}
----

Expand All @@ -120,12 +130,30 @@ If the operator metrics endpoint is enabled with the `--metrics-port` flag (chec
[source,shell]
----
> curl "$ECK_METRICS_ENDPOINT" | grep elastic_licensing
# HELP elastic_licensing_enterprise_resource_units_max Maximum number of enterprise resource units available
# TYPE elastic_licensing_enterprise_resource_units_max gauge
elastic_licensing_enterprise_resource_units_max{license_level="enterprise"} 250
# HELP elastic_licensing_enterprise_resource_units_total Total enterprise resource units used
# TYPE elastic_licensing_enterprise_resource_units_total gauge
elastic_licensing_enterprise_resource_units_total{license_level="basic"} 6
# HELP elastic_licensing_memory_gigabytes_total Total memory used in GB
# TYPE elastic_licensing_memory_gigabytes_total gauge
elastic_licensing_memory_gigabytes_total{license_level="basic"} 357.01915648
elastic_licensing_enterprise_resource_units_total{license_level="enterprise"} 1
# HELP elastic_licensing_memory_gibibytes_apm Memory used by APM server in GiB
# TYPE elastic_licensing_memory_gibibytes_apm gauge
elastic_licensing_memory_gibibytes_apm{license_level="enterprise"} 0.5
# HELP elastic_licensing_memory_gibibytes_elasticsearch Memory used by Elasticsearch in GiB
# TYPE elastic_licensing_memory_gibibytes_elasticsearch gauge
elastic_licensing_memory_gibibytes_elasticsearch{license_level="enterprise"} 18
# HELP elastic_licensing_memory_gibibytes_enterprise_search Memory used by Enterprise Search in GiB
# TYPE elastic_licensing_memory_gibibytes_enterprise_search gauge
elastic_licensing_memory_gibibytes_enterprise_search{license_level="enterprise"} 4
# HELP elastic_licensing_memory_gibibytes_kibana Memory used by Kibana in GiB
# TYPE elastic_licensing_memory_gibibytes_kibana gauge
elastic_licensing_memory_gibibytes_kibana{license_level="enterprise"} 1
# HELP elastic_licensing_memory_gibibytes_logstash Memory used by Logstash in GiB
# TYPE elastic_licensing_memory_gibibytes_logstash gauge
elastic_licensing_memory_gibibytes_logstash{license_level="enterprise"} 2
# HELP elastic_licensing_memory_gibibytes_total Total memory used in GiB
# TYPE elastic_licensing_memory_gibibytes_total gauge
elastic_licensing_memory_gibibytes_total{license_level="enterprise"} 25.5
----

NOTE: Logstash resources managed by ECK will be counted towards ERU usage for informational purposes. Billable consumption depends on license terms on a per customer basis (See link:https://www.elastic.co/agreements/global/self-managed[Self Managed Subscription Agreement])
58 changes: 29 additions & 29 deletions pkg/license/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ import (
ulog "github.com/elastic/cloud-on-k8s/v2/pkg/utils/log"
)

// Aggregator aggregates the total of resources of all Elastic managed components
type Aggregator struct {
// aggregator aggregates the total of resources of all Elastic managed components
type aggregator struct {
client k8s.Client
}

type aggregate func(ctx context.Context) (resource.Quantity, error)
type aggregate func(ctx context.Context) (managedMemory, error)

// AggregateMemory aggregates the total memory of all Elastic managed components
func (a Aggregator) AggregateMemory(ctx context.Context) (resource.Quantity, error) {
var totalMemory resource.Quantity
// aggregateMemory aggregates the total memory of all Elastic managed components
func (a aggregator) aggregateMemory(ctx context.Context) (memoryUsage, error) {
usage := newMemoryUsage()

for _, f := range []aggregate{
a.aggregateElasticsearchMemory,
Expand All @@ -50,19 +50,19 @@ func (a Aggregator) AggregateMemory(ctx context.Context) (resource.Quantity, err
} {
memory, err := f(ctx)
if err != nil {
return resource.Quantity{}, err
return memoryUsage{}, err
}
totalMemory.Add(memory)
usage.add(memory)
}

return totalMemory, nil
return usage, nil
}

func (a Aggregator) aggregateElasticsearchMemory(ctx context.Context) (resource.Quantity, error) {
func (a aggregator) aggregateElasticsearchMemory(ctx context.Context) (managedMemory, error) {
var esList esv1.ElasticsearchList
err := a.client.List(context.Background(), &esList)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Elasticsearch memory")
return managedMemory{}, errors.Wrap(err, "failed to aggregate Elasticsearch memory")
}

var total resource.Quantity
Expand All @@ -75,7 +75,7 @@ func (a Aggregator) aggregateElasticsearchMemory(ctx context.Context) (resource.
nodespec.DefaultMemoryLimits,
)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Elasticsearch memory")
return managedMemory{}, errors.Wrap(err, "failed to aggregate Elasticsearch memory")
}

total.Add(multiply(mem, nodeSet.Count))
Expand All @@ -84,14 +84,14 @@ func (a Aggregator) aggregateElasticsearchMemory(ctx context.Context) (resource.
}
}

return total, nil
return managedMemory{total, elasticsearchKey}, nil
}

func (a Aggregator) aggregateEnterpriseSearchMemory(ctx context.Context) (resource.Quantity, error) {
func (a aggregator) aggregateEnterpriseSearchMemory(ctx context.Context) (managedMemory, error) {
var entList entv1.EnterpriseSearchList
err := a.client.List(context.Background(), &entList)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Enterprise Search memory")
return managedMemory{}, errors.Wrap(err, "failed to aggregate Enterprise Search memory")
}

var total resource.Quantity
Expand All @@ -103,22 +103,22 @@ func (a Aggregator) aggregateEnterpriseSearchMemory(ctx context.Context) (resour
enterprisesearch.DefaultMemoryLimits,
)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Enterprise Search memory")
return managedMemory{}, errors.Wrap(err, "failed to aggregate Enterprise Search memory")
}

total.Add(multiply(mem, ent.Spec.Count))
ulog.FromContext(ctx).V(1).Info("Collecting", "namespace", ent.Namespace, "ent_name", ent.Name,
"memory", mem.String(), "count", ent.Spec.Count)
}

return total, nil
return managedMemory{total, entSearchKey}, nil
}

func (a Aggregator) aggregateKibanaMemory(ctx context.Context) (resource.Quantity, error) {
func (a aggregator) aggregateKibanaMemory(ctx context.Context) (managedMemory, error) {
var kbList kbv1.KibanaList
err := a.client.List(context.Background(), &kbList)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Kibana memory")
return managedMemory{}, errors.Wrap(err, "failed to aggregate Kibana memory")
}

var total resource.Quantity
Expand All @@ -130,22 +130,22 @@ func (a Aggregator) aggregateKibanaMemory(ctx context.Context) (resource.Quantit
kibana.DefaultMemoryLimits,
)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Kibana memory")
return managedMemory{}, errors.Wrap(err, "failed to aggregate Kibana memory")
}

total.Add(multiply(mem, kb.Spec.Count))
ulog.FromContext(ctx).V(1).Info("Collecting", "namespace", kb.Namespace, "kibana_name", kb.Name,
"memory", mem.String(), "count", kb.Spec.Count)
}

return total, nil
return managedMemory{total, kibanaKey}, nil
}

func (a Aggregator) aggregateLogstashMemory(ctx context.Context) (resource.Quantity, error) {
func (a aggregator) aggregateLogstashMemory(ctx context.Context) (managedMemory, error) {
var lsList lsv1alpha1.LogstashList
err := a.client.List(context.Background(), &lsList)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Logstash memory")
return managedMemory{}, errors.Wrap(err, "failed to aggregate Logstash memory")
}

var total resource.Quantity
Expand All @@ -157,22 +157,22 @@ func (a Aggregator) aggregateLogstashMemory(ctx context.Context) (resource.Quant
logstash.DefaultMemoryLimit,
)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Logstash memory")
return managedMemory{}, errors.Wrap(err, "failed to aggregate Logstash memory")
}

total.Add(multiply(mem, ls.Spec.Count))
ulog.FromContext(ctx).V(1).Info("Collecting", "namespace", ls.Namespace, "logstash_name", ls.Name,
"memory", mem.String(), "count", ls.Spec.Count)
}

return total, nil
return managedMemory{total, logstashKey}, nil
}

func (a Aggregator) aggregateApmServerMemory(ctx context.Context) (resource.Quantity, error) {
func (a aggregator) aggregateApmServerMemory(ctx context.Context) (managedMemory, error) {
var asList apmv1.ApmServerList
err := a.client.List(context.Background(), &asList)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate APM Server memory")
return managedMemory{}, errors.Wrap(err, "failed to aggregate APM Server memory")
}

var total resource.Quantity
Expand All @@ -184,15 +184,15 @@ func (a Aggregator) aggregateApmServerMemory(ctx context.Context) (resource.Quan
apmserver.DefaultMemoryLimits,
)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate APM Server memory")
return managedMemory{}, errors.Wrap(err, "failed to aggregate APM Server memory")
}

total.Add(multiply(mem, as.Spec.Count))
ulog.FromContext(ctx).V(1).Info("Collecting", "namespace", as.Namespace, "as_name", as.Name,
"memory", mem.String(), "count", as.Spec.Count)
}

return total, nil
return managedMemory{total, apmKey}, nil
}

// containerMemLimits reads the container memory limits from the resource specification with fallback
Expand Down
15 changes: 12 additions & 3 deletions pkg/license/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,20 @@ func TestMemFromNodeOpts(t *testing.T) {
func TestAggregator(t *testing.T) {
objects := readObjects(t, "testdata/stack.yaml")
client := k8s.NewFakeClient(objects...)
aggregator := Aggregator{client: client}
aggregator := aggregator{client: client}

val, err := aggregator.AggregateMemory(context.Background())
val, err := aggregator.aggregateMemory(context.Background())
require.NoError(t, err)
require.Equal(t, 329.9073486328125, inGiB(val))
for k, v := range map[string]float64{
elasticsearchKey: 294.0,
kibanaKey: 5.9073486328125,
apmKey: 2.0,
entSearchKey: 24.0,
logstashKey: 4.0,
} {
require.Equal(t, v, val.appUsage[k].inGiB(), k)
}
require.Equal(t, 329.9073486328125, val.totalMemory.inGiB(), "total")
}

func readObjects(t *testing.T, filePath string) []client.Object {
Expand Down
74 changes: 62 additions & 12 deletions pkg/license/license.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,29 +37,75 @@ const (
Type = "elastic-usage"
// GiB represents the number of bytes for 1 GiB
GiB = 1024 * 1024 * 1024

elasticsearchKey = "elasticsearch"
kibanaKey = "kibana"
apmKey = "apm"
entSearchKey = "enterprise_search"
logstashKey = "logstash"
totalKey = "total_managed"
)

type managedMemory struct {
resource.Quantity
label string
}

func newManagedMemory(binarySI int64, label string) managedMemory {
return managedMemory{
Quantity: *resource.NewQuantity(binarySI, resource.BinarySI),
label: label,
}
}

func (mm managedMemory) inGiB() float64 {
return inGiB(mm.Quantity)
}

func (mm managedMemory) intoMap(m map[string]string) {
m[mm.label+"_memory"] = fmt.Sprintf("%0.2fGiB", inGiB(mm.Quantity))
m[mm.label+"_memory_bytes"] = fmt.Sprintf("%d", mm.Quantity.Value())
}

type memoryUsage struct {
appUsage map[string]managedMemory
totalMemory managedMemory
}

func newMemoryUsage() memoryUsage {
return memoryUsage{
appUsage: map[string]managedMemory{},
totalMemory: managedMemory{label: totalKey},
}
}

func (mu *memoryUsage) add(memory managedMemory) {
mu.appUsage[memory.label] = memory
mu.totalMemory.Add(memory.Quantity)
}

// LicensingInfo represents information about the operator license including the total memory of all Elastic managed
// components
type LicensingInfo struct {
memoryUsage
Timestamp string
EckLicenseLevel string
EckLicenseExpiryDate *time.Time
TotalManagedMemoryGiB float64
TotalManagedMemoryBytes int64
MaxEnterpriseResourceUnits int64
EnterpriseResourceUnits int64
}

// toMap transforms a LicensingInfo to a map of string, in order to fill in the data of a config map
func (li LicensingInfo) toMap() map[string]string {
m := map[string]string{
"timestamp": li.Timestamp,
"eck_license_level": li.EckLicenseLevel,
"total_managed_memory": fmt.Sprintf("%0.2fGiB", li.TotalManagedMemoryGiB),
"total_managed_memory_bytes": fmt.Sprintf("%d", li.TotalManagedMemoryBytes),
"enterprise_resource_units": strconv.FormatInt(li.EnterpriseResourceUnits, 10),
"timestamp": li.Timestamp,
"eck_license_level": li.EckLicenseLevel,
"enterprise_resource_units": strconv.FormatInt(li.EnterpriseResourceUnits, 10),
}
for _, v := range li.appUsage {
v.intoMap(m)
}
li.totalMemory.intoMap(m)

if li.MaxEnterpriseResourceUnits > 0 {
m["max_enterprise_resource_units"] = strconv.FormatInt(li.MaxEnterpriseResourceUnits, 10)
Expand All @@ -74,7 +120,12 @@ func (li LicensingInfo) toMap() map[string]string {

func (li LicensingInfo) ReportAsMetrics() {
labels := prometheus.Labels{metrics.LicenseLevelLabel: li.EckLicenseLevel}
metrics.LicensingTotalMemoryGauge.With(labels).Set(li.TotalManagedMemoryGiB)
metrics.LicensingTotalMemoryGauge.With(labels).Set(li.totalMemory.inGiB())
metrics.LicensingESMemoryGauge.With(labels).Set(li.appUsage[elasticsearchKey].inGiB())
metrics.LicensingKBMemoryGauge.With(labels).Set(li.appUsage[kibanaKey].inGiB())
metrics.LicensingAPMMemoryGauge.With(labels).Set(li.appUsage[apmKey].inGiB())
metrics.LicensingEntSearchMemoryGauge.With(labels).Set(li.appUsage[entSearchKey].inGiB())
metrics.LicensingLogstashMemoryGauge.With(labels).Set(li.appUsage[logstashKey].inGiB())
metrics.LicensingTotalERUGauge.With(labels).Set(float64(li.EnterpriseResourceUnits))

if li.MaxEnterpriseResourceUnits > 0 {
Expand All @@ -89,19 +140,18 @@ type LicensingResolver struct {
}

// ToInfo returns licensing information given the total memory of all Elastic managed components
func (r LicensingResolver) ToInfo(ctx context.Context, totalMemory resource.Quantity) (LicensingInfo, error) {
func (r LicensingResolver) ToInfo(ctx context.Context, memoryUsage memoryUsage) (LicensingInfo, error) {
operatorLicense, err := r.getOperatorLicense(ctx)
if err != nil {
return LicensingInfo{}, err
}

licensingInfo := LicensingInfo{
memoryUsage: memoryUsage,
Timestamp: time.Now().Format(time.RFC3339),
EckLicenseLevel: r.getOperatorLicenseLevel(operatorLicense),
EckLicenseExpiryDate: r.getOperatorLicenseExpiry(operatorLicense),
TotalManagedMemoryGiB: inGiB(totalMemory),
TotalManagedMemoryBytes: totalMemory.Value(),
EnterpriseResourceUnits: inEnterpriseResourceUnits(totalMemory),
EnterpriseResourceUnits: inEnterpriseResourceUnits(memoryUsage.totalMemory.Quantity),
}

// include the max ERUs only for a non trial/basic license
Expand Down
Loading

0 comments on commit 704f386

Please sign in to comment.