Skip to content

Commit

Permalink
feat: add memory limiter to gateway collector (#1086)
Browse files Browse the repository at this point in the history
  • Loading branch information
blumamir authored Mar 27, 2024
1 parent 65e23ce commit 53591fc
Show file tree
Hide file tree
Showing 13 changed files with 265 additions and 19 deletions.
30 changes: 30 additions & 0 deletions api/config/crd/bases/odigos.io_odigosconfigurations.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,36 @@ spec:
properties:
autoscalerImage:
type: string
collectorGateway:
properties:
goMemLimitMiB:
description: the GOMEMLIMIT environment variable value for the
collector gateway deployment. this is when go runtime will start
garbage collection. if not specified, it will be set to 80%
of the hard limit of the memory limiter.
type: integer
memoryLimiterLimitMiB:
description: this parameter sets the "limit_mib" parameter in
the memory limiter configuration for the collector gateway.
it is the hard limit after which a force garbage collection
will be performed. if not set, it will be 50Mi below the memory
request.
type: integer
memoryLimiterSpikeLimitMiB:
description: this parameter sets the "spike_limit_mib" parameter
in the memory limiter configuration for the collector gateway.
note that this is not the processor soft limit, but the diff
in Mib between the hard limit and the soft limit. if not set,
this will be set to 20% of the hard limit (so the soft limit
will be 80% of the hard limit).
type: integer
requestMemoryMiB:
description: 'RequestMemoryMiB is the memory request for the cluster
gateway collector deployment. it will be embedded in the deployment
as a resource request of the form "memory: <value>Mi" default
value is 500Mi'
type: integer
type: object
configVersion:
type: integer
defaultSDKs:
Expand Down
23 changes: 23 additions & 0 deletions api/odigos/v1alpha1/odigosconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,28 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type CollectorGatewayConfiguration struct {
// RequestMemoryMiB is the memory request for the cluster gateway collector deployment.
// it will be embedded in the deployment as a resource request of the form "memory: <value>Mi"
// default value is 500Mi
RequestMemoryMiB int `json:"requestMemoryMiB,omitempty"`

// this parameter sets the "limit_mib" parameter in the memory limiter configuration for the collector gateway.
// it is the hard limit after which a force garbage collection will be performed.
// if not set, it will be 50Mi below the memory request.
MemoryLimiterLimitMiB int `json:"memoryLimiterLimitMiB,omitempty"`

// this parameter sets the "spike_limit_mib" parameter in the memory limiter configuration for the collector gateway.
// note that this is not the processor soft limit, but the diff in Mib between the hard limit and the soft limit.
// if not set, this will be set to 20% of the hard limit (so the soft limit will be 80% of the hard limit).
MemoryLimiterSpikeLimitMiB int `json:"memoryLimiterSpikeLimitMiB,omitempty"`

// the GOMEMLIMIT environment variable value for the collector gateway deployment.
// this is when go runtime will start garbage collection.
// if not specified, it will be set to 80% of the hard limit of the memory limiter.
GoMemLimitMib int `json:"goMemLimitMiB,omitempty"`
}

// OdigosConfigurationSpec defines the desired state of OdigosConfiguration
type OdigosConfigurationSpec struct {
OdigosVersion string `json:"odigosVersion"`
Expand All @@ -20,6 +42,7 @@ type OdigosConfigurationSpec struct {
AutoscalerImage string `json:"autoscalerImage,omitempty"`
SupportedSDKs map[common.ProgrammingLanguage][]common.OtelSdk `json:"supportedSDKs,omitempty"`
DefaultSDKs map[common.ProgrammingLanguage]common.OtelSdk `json:"defaultSDKs,omitempty"`
CollectorGateway *CollectorGatewayConfiguration `json:"collectorGateway,omitempty"`
}

//+genclient
Expand Down
20 changes: 20 additions & 0 deletions api/odigos/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 13 additions & 7 deletions autoscaler/controllers/gateway/config/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
"github.com/keyval-dev/odigos/common"
)

const (
memoryLimiterProcessorName = "memory_limiter"
)

var availableConfigers = []Configer{&Middleware{}, &Honeycomb{}, &GrafanaCloudPrometheus{}, &GrafanaCloudTempo{}, &GrafanaCloudLoki{}, &Datadog{}, &NewRelic{}, &Logzio{}, &Prometheus{},
&Tempo{}, &Loki{}, &Jaeger{}, &GenericOTLP{}, &OTLPHttp{}, &Elasticsearch{}, &Quickwit{}, &Signoz{}, &Qryn{},
&OpsVerse{}, &Splunk{}, &Lightstep{}, &GoogleCloud{}, &GoogleCloudStorage{}, &Sentry{}, &AzureBlobStorage{},
Expand All @@ -20,8 +24,8 @@ type Configer interface {
ModifyConfig(dest *odigosv1.Destination, currentConfig *commonconf.Config)
}

func Calculate(dests *odigosv1.DestinationList, processors *odigosv1.ProcessorList) (string, error) {
currentConfig := getBasicConfig()
func Calculate(dests *odigosv1.DestinationList, processors *odigosv1.ProcessorList, memoryLimiterConfig commonconf.GenericMap) (string, error) {
currentConfig := getBasicConfig(memoryLimiterConfig)

configers, err := loadConfigers()
if err != nil {
Expand Down Expand Up @@ -53,7 +57,8 @@ func Calculate(dests *odigosv1.DestinationList, processors *odigosv1.ProcessorLi

// basic config common to all pipelines
pipeline.Receivers = []string{"otlp"}
pipeline.Processors = append([]string{"batch", "resource/odigos-version"}, pipeline.Processors...)
// memory limiter processor should be the first processor in the pipeline
pipeline.Processors = append([]string{memoryLimiterProcessorName, "batch", "resource/odigos-version"}, pipeline.Processors...)
currentConfig.Service.Pipelines[pipelineName] = pipeline
}

Expand All @@ -65,7 +70,7 @@ func Calculate(dests *odigosv1.DestinationList, processors *odigosv1.ProcessorLi
return string(data), nil
}

func getBasicConfig() *commonconf.Config {
func getBasicConfig(memoryLimiterConfig commonconf.GenericMap) *commonconf.Config {
empty := struct{}{}
return &commonconf.Config{
Receivers: commonconf.GenericMap{
Expand All @@ -80,12 +85,13 @@ func getBasicConfig() *commonconf.Config {
},
},
Processors: commonconf.GenericMap{
"batch": empty,
memoryLimiterProcessorName: memoryLimiterConfig,
"batch": empty,
"resource/odigos-version": commonconf.GenericMap{
"attributes": []commonconf.GenericMap{
{
"key": "odigos.version",
"value": "${ODIGOS_VERSION}",
"key": "odigos.version",
"value": "${ODIGOS_VERSION}",
"action": "upsert",
},
},
Expand Down
12 changes: 10 additions & 2 deletions autoscaler/controllers/gateway/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"reflect"

odigosv1 "github.com/keyval-dev/odigos/api/odigos/v1alpha1"
"github.com/keyval-dev/odigos/autoscaler/controllers/common"
"github.com/keyval-dev/odigos/autoscaler/controllers/gateway/config"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -19,9 +20,16 @@ const (
configKey = "collector-conf"
)

func syncConfigMap(dests *odigosv1.DestinationList, processors *odigosv1.ProcessorList, gateway *odigosv1.CollectorsGroup, ctx context.Context, c client.Client, scheme *runtime.Scheme) (string, error) {
func syncConfigMap(dests *odigosv1.DestinationList, processors *odigosv1.ProcessorList, gateway *odigosv1.CollectorsGroup, ctx context.Context, c client.Client, scheme *runtime.Scheme, memConfig *memoryConfigurations) (string, error) {
logger := log.FromContext(ctx)
desiredData, err := config.Calculate(dests, processors)

memoryLimiterConfiguration := common.GenericMap{
"check_interval": "1s",
"limit_mib": memConfig.memoryLimiterLimitMiB,
"spike_limit_mib": memConfig.memoryLimiterSpikeLimitMiB,
}

desiredData, err := config.Calculate(dests, processors, memoryLimiterConfiguration)
if err != nil {
logger.Error(err, "Failed to calculate config")
return "", err
Expand Down
21 changes: 17 additions & 4 deletions autoscaler/controllers/gateway/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
Expand All @@ -28,9 +29,9 @@ const (
)

func syncDeployment(dests *odigosv1.DestinationList, gateway *odigosv1.CollectorsGroup, configData string,
ctx context.Context, c client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string) (*appsv1.Deployment, error) {
ctx context.Context, c client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string, memConfig *memoryConfigurations) (*appsv1.Deployment, error) {
logger := log.FromContext(ctx)
desiredDeployment, err := getDesiredDeployment(dests, configData, gateway, scheme, imagePullSecrets, odigosVersion)
desiredDeployment, err := getDesiredDeployment(dests, configData, gateway, scheme, imagePullSecrets, odigosVersion, memConfig)
if err != nil {
logger.Error(err, "Failed to get desired deployment")
return nil, err
Expand Down Expand Up @@ -96,7 +97,10 @@ func patchDeployment(existing *appsv1.Deployment, desired *appsv1.Deployment, ct
}

func getDesiredDeployment(dests *odigosv1.DestinationList, configData string,
gateway *odigosv1.CollectorsGroup, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string) (*appsv1.Deployment, error) {
gateway *odigosv1.CollectorsGroup, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string, memConfig *memoryConfigurations) (*appsv1.Deployment, error) {

requestMemoryQuantity := resource.MustParse(fmt.Sprintf("%dMi", memConfig.memoryRequestMiB))

desiredDeployment := &appsv1.Deployment{
ObjectMeta: v1.ObjectMeta{
Name: gateway.Name,
Expand Down Expand Up @@ -153,6 +157,10 @@ func getDesiredDeployment(dests *odigosv1.DestinationList, configData string,
},
},
},
{
Name: "GOMEMLIMIT",
Value: fmt.Sprintf("%dMiB", memConfig.gomemlimitMiB),
},
},
SecurityContext: &corev1.SecurityContext{
RunAsUser: int64Ptr(10000),
Expand All @@ -179,14 +187,19 @@ func getDesiredDeployment(dests *odigosv1.DestinationList, configData string,
},
},
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceMemory: requestMemoryQuantity,
},
},
},
},
},
},
},
}

if imagePullSecrets != nil && len(imagePullSecrets) > 0 {
if len(imagePullSecrets) > 0 {
desiredDeployment.Spec.Template.Spec.ImagePullSecrets = []corev1.LocalObjectReference{}
for _, secret := range imagePullSecrets {
desiredDeployment.Spec.Template.Spec.ImagePullSecrets = append(desiredDeployment.Spec.Template.Spec.ImagePullSecrets, corev1.LocalObjectReference{Name: secret})
Expand Down
57 changes: 57 additions & 0 deletions autoscaler/controllers/gateway/memory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package gateway

import odigosv1 "github.com/keyval-dev/odigos/api/odigos/v1alpha1"

const (
defaultRequestMemoryMiB = 500

// this configures the processor limit_mib, which is the hard limit in MiB, afterwhich garbage collection will be forced.
// as recommended by the processor docs, if not set, this is set to 50MiB less than the memory limit of the collector
defaultMemoryLimiterLimitDiffMib = 50

// the soft limit will be set to 80% of the hard limit.
// this value is used to derive the "spike_limit_mib" parameter in the processor configuration if a value is not set
defaultMemoryLimiterSpikePercentage = 20.0

// the percentage out of the memory limiter hard limit, at which go runtime will start garbage collection.
// it is used to calculate the GOMEMLIMIT environment variable value.
defaultGoMemLimitPercentage = 80.0
)

type memoryConfigurations struct {
memoryRequestMiB int
memoryLimiterLimitMiB int
memoryLimiterSpikeLimitMiB int
gomemlimitMiB int
}

func getMemoryConfigurations(odigosConfig *odigosv1.OdigosConfiguration) *memoryConfigurations {

memoryRequestMiB := defaultRequestMemoryMiB
if odigosConfig.Spec.CollectorGateway != nil && odigosConfig.Spec.CollectorGateway.RequestMemoryMiB > 0 {
memoryRequestMiB = odigosConfig.Spec.CollectorGateway.RequestMemoryMiB
}

// the memory limiter hard limit is set as 50 MiB less than the memory request
memoryLimiterLimitMiB := memoryRequestMiB - defaultMemoryLimiterLimitDiffMib
if odigosConfig.Spec.CollectorGateway != nil && odigosConfig.Spec.CollectorGateway.MemoryLimiterLimitMiB > 0 {
memoryLimiterLimitMiB = odigosConfig.Spec.CollectorGateway.MemoryLimiterLimitMiB
}

memoryLimiterSpikeLimitMiB := memoryLimiterLimitMiB * defaultMemoryLimiterSpikePercentage / 100.0
if odigosConfig.Spec.CollectorGateway != nil && odigosConfig.Spec.CollectorGateway.MemoryLimiterSpikeLimitMiB > 0 {
memoryLimiterSpikeLimitMiB = odigosConfig.Spec.CollectorGateway.MemoryLimiterSpikeLimitMiB
}

gomemlimitMiB := int(memoryLimiterLimitMiB * defaultGoMemLimitPercentage / 100.0)
if odigosConfig.Spec.CollectorGateway != nil && odigosConfig.Spec.CollectorGateway.GoMemLimitMib != 0 {
gomemlimitMiB = odigosConfig.Spec.CollectorGateway.GoMemLimitMib
}

return &memoryConfigurations{
memoryRequestMiB: memoryRequestMiB,
memoryLimiterLimitMiB: memoryLimiterLimitMiB,
memoryLimiterSpikeLimitMiB: memoryLimiterSpikeLimitMiB,
gomemlimitMiB: gomemlimitMiB,
}
}
19 changes: 15 additions & 4 deletions autoscaler/controllers/gateway/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"

odigosv1 "github.com/keyval-dev/odigos/api/odigos/v1alpha1"
"github.com/keyval-dev/odigos/common/consts"
"github.com/keyval-dev/odigos/common/utils"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -55,16 +57,25 @@ func Sync(ctx context.Context, client client.Client, scheme *runtime.Scheme, ima
return err
}

return syncGateway(&dests, &processors, gatewayCollectorGroup, ctx, client, scheme, imagePullSecrets, odigosVersion)
odigosSystemNamespaceName := utils.GetCurrentNamespace()
var odigosConfig odigosv1.OdigosConfiguration
if err := client.Get(ctx, types.NamespacedName{Namespace: odigosSystemNamespaceName, Name: consts.DefaultOdigosConfigurationName}, &odigosConfig); err != nil {
logger.Error(err, "failed to get odigos config")
return err
}

return syncGateway(&dests, &processors, gatewayCollectorGroup, ctx, client, scheme, imagePullSecrets, odigosVersion, &odigosConfig)
}

func syncGateway(dests *odigosv1.DestinationList, processors *odigosv1.ProcessorList,
gateway *odigosv1.CollectorsGroup, ctx context.Context,
c client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string) error {
c client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string, odigosConfig *odigosv1.OdigosConfiguration) error {
logger := log.FromContext(ctx)
logger.V(0).Info("Syncing gateway")

configData, err := syncConfigMap(dests, processors, gateway, ctx, c, scheme)
memConfig := getMemoryConfigurations(odigosConfig)

configData, err := syncConfigMap(dests, processors, gateway, ctx, c, scheme, memConfig)
if err != nil {
logger.Error(err, "Failed to sync config map")
return err
Expand All @@ -76,7 +87,7 @@ func syncGateway(dests *odigosv1.DestinationList, processors *odigosv1.Processor
return err
}

dep, err := syncDeployment(dests, gateway, configData, ctx, c, scheme, imagePullSecrets, odigosVersion)
dep, err := syncDeployment(dests, gateway, configData, ctx, c, scheme, imagePullSecrets, odigosVersion, memConfig)
if err != nil {
logger.Error(err, "Failed to sync deployment")
return err
Expand Down
Loading

0 comments on commit 53591fc

Please sign in to comment.