Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: move default node collector port to avoid conflicts and add setting in config #1618

Merged
merged 15 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/config/crd/bases/odigos.io_collectorsgroups.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,19 @@ spec:
spec:
description: CollectorsGroupSpec defines the desired state of Collector
properties:
collectorOwnMetricsPort:
description: |-
The port to use for exposing the collector's own metrics as a prometheus endpoint.
This can be used to resolve conflicting ports when a collector is using the host network.
format: int32
type: integer
role:
enum:
- CLUSTER_GATEWAY
- NODE_COLLECTOR
type: string
required:
- collectorOwnMetricsPort
- role
type: object
status:
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions api/odigos/v1alpha1/collectorsgroup_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ const (
// CollectorsGroupSpec defines the desired state of Collector
type CollectorsGroupSpec struct {
Role CollectorsGroupRole `json:"role"`

// The port to use for exposing the collector's own metrics as a prometheus endpoint.
blumamir marked this conversation as resolved.
Show resolved Hide resolved
// This can be used to resolve conflicting ports when a collector is using the host network.
CollectorOwnMetricsPort int32 `json:"collectorOwnMetricsPort"`
}

// CollectorsGroupStatus defines the observed state of Collector
Expand Down
27 changes: 3 additions & 24 deletions autoscaler/controllers/collectorsgroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
"github.com/odigos-io/odigos/autoscaler/controllers/datacollection"
"github.com/odigos-io/odigos/autoscaler/controllers/gateway"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -32,28 +31,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

type onlyCreatePredicate struct {
predicate.Funcs
}

func (i *onlyCreatePredicate) Create(e event.CreateEvent) bool {
return true
}

func (i *onlyCreatePredicate) Update(e event.UpdateEvent) bool {
return false
}

func (i *onlyCreatePredicate) Delete(e event.DeleteEvent) bool {
return false
}

func (i *onlyCreatePredicate) Generic(e event.GenericEvent) bool {
return false
}

var _ predicate.Predicate = &onlyCreatePredicate{}

// CollectorsGroupReconciler reconciles a CollectorsGroup object
type CollectorsGroupReconciler struct {
client.Client
Expand Down Expand Up @@ -102,6 +79,8 @@ func (r *CollectorsGroupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
func (r *CollectorsGroupReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&odigosv1.CollectorsGroup{}).
WithEventFilter(&onlyCreatePredicate{}).
blumamir marked this conversation as resolved.
Show resolved Hide resolved
// we assume everything in the collectorsgroup spec is the configuration for the collectors to generate.
// thus, we need to monitor any change to the spec which is what the generation field is for.
WithEventFilter(&predicate.GenerationChangedPredicate{}).
Complete(r)
}
16 changes: 8 additions & 8 deletions autoscaler/controllers/datacollection/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func createConfigMap(desired *v1.ConfigMap, ctx context.Context, c client.Client

func getDesiredConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, processors []*odigosv1.Processor,
datacollection *odigosv1.CollectorsGroup, scheme *runtime.Scheme, setTracesLoadBalancer bool, disableNameProcessor bool) (*v1.ConfigMap, error) {
cmData, err := calculateConfigMapData(apps, dests, processors, setTracesLoadBalancer, disableNameProcessor)
cmData, err := calculateConfigMapData(datacollection, apps, dests, processors, setTracesLoadBalancer, disableNameProcessor)
if err != nil {
return nil, err
}
Expand All @@ -124,16 +124,16 @@ func getDesiredConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odig
return &desired, nil
}

func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, processors []*odigosv1.Processor,
func calculateConfigMapData(collectorsGroup *odigosv1.CollectorsGroup, apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, processors []*odigosv1.Processor,
setTracesLoadBalancer bool, disableNameProcessor bool) (string, error) {

ownMetricsPort := collectorsGroup.Spec.CollectorOwnMetricsPort

empty := struct{}{}

processorsCfg, tracesProcessors, metricsProcessors, logsProcessors, errs := config.GetCrdProcessorsConfigMap(commonconf.ToProcessorConfigurerArray(processors))
if errs != nil {
for name, err := range errs {
log.Log.V(0).Info(err.Error(), "processor", name)
}
for name, err := range errs {
log.Log.V(0).Info(err.Error(), "processor", name)
}

if !disableNameProcessor {
Expand Down Expand Up @@ -214,7 +214,7 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o
"scrape_interval": "10s",
"static_configs": []config.GenericMap{
{
"targets": []string{"127.0.0.1:8888"},
"targets": []string{fmt.Sprintf("127.0.0.1:%d", ownMetricsPort)},
},
},
"metric_relabel_configs": []config.GenericMap{
Expand Down Expand Up @@ -247,7 +247,7 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o
Extensions: []string{"health_check"},
Telemetry: config.Telemetry{
Metrics: config.GenericMap{
"address": "0.0.0.0:8888",
"address": fmt.Sprintf("0.0.0.0:%d", ownMetricsPort),
},
Resource: map[string]*string{
// The collector add "otelcol" as a service name, so we need to remove it
Expand Down
22 changes: 12 additions & 10 deletions autoscaler/controllers/gateway/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ const (
)

var (
errNoPipelineConfigured = errors.New("no pipeline was configured, cannot add self telemetry pipeline")
errNoPipelineConfigured = errors.New("no pipeline was configured, cannot add self telemetry pipeline")
errNoReceiversConfigured = errors.New("no receivers were configured, cannot add self telemetry pipeline")
errNoExportersConfigured = errors.New("no exporters were configured, cannot add self telemetry pipeline")
)

func addSelfTelemetryPipeline(c *config.Config) error {
func addSelfTelemetryPipeline(c *config.Config, ownTelemetryPort int32) error {
if c.Service.Pipelines == nil {
return errNoPipelineConfigured
}
Expand All @@ -47,18 +47,18 @@ func addSelfTelemetryPipeline(c *config.Config) error {
"config": config.GenericMap{
"scrape_configs": []config.GenericMap{
{
"job_name": "otelcol",
"job_name": "otelcol",
"scrape_interval": "10s",
"static_configs": []config.GenericMap{
{
"targets": []string{"127.0.0.1:8888"},
"targets": []string{fmt.Sprintf("127.0.0.1:%d", ownTelemetryPort)},
},
},
"metric_relabel_configs": []config.GenericMap{
{
"source_labels": []string{"__name__"},
"regex": "(.*odigos.*|^otelcol_processor_accepted.*|^otelcol_exporter_sent.*)",
"action": "keep",
"regex": "(.*odigos.*|^otelcol_processor_accepted.*|^otelcol_exporter_sent.*)",
"action": "keep",
},
},
},
Expand Down Expand Up @@ -91,13 +91,13 @@ func addSelfTelemetryPipeline(c *config.Config) error {
},
}
c.Service.Pipelines["metrics/otelcol"] = config.Pipeline{
Receivers: []string{"prometheus/self-metrics"},
Receivers: []string{"prometheus/self-metrics"},
Processors: []string{"resource/pod-name"},
Exporters: []string{"otlp/odigos-own-telemetry-ui"},
Exporters: []string{"otlp/odigos-own-telemetry-ui"},
}

c.Service.Telemetry.Metrics = config.GenericMap{
"address": "0.0.0.0:8888",
"address": fmt.Sprintf("0.0.0.0:%d", ownTelemetryPort),
}

for pipelineName, pipeline := range c.Service.Pipelines {
Expand Down Expand Up @@ -126,7 +126,9 @@ func syncConfigMap(dests *odigosv1.DestinationList, allProcessors *odigosv1.Proc
common.ToExporterConfigurerArray(dests),
common.ToProcessorConfigurerArray(processors),
memoryLimiterConfiguration,
addSelfTelemetryPipeline,
func(c *config.Config) error {
return addSelfTelemetryPipeline(c, gateway.Spec.CollectorOwnMetricsPort)
},
)
if err != nil {
logger.Error(err, "Failed to calculate config")
Expand Down
14 changes: 8 additions & 6 deletions autoscaler/controllers/gateway/configmap_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package gateway

import (
"fmt"
"testing"

"github.com/odigos-io/odigos/common/config"
"github.com/odigos-io/odigos/k8sutils/pkg/consts"
"github.com/stretchr/testify/assert"
)

Expand All @@ -12,7 +14,7 @@ func TestAddSelfTelemetryPipeline(t *testing.T) {
cases := []struct {
name string
cfg *config.Config
err error
err error
}{
{
name: "no pipeline",
Expand Down Expand Up @@ -66,7 +68,7 @@ func TestAddSelfTelemetryPipeline(t *testing.T) {
},
Processors: config.GenericMap{
"memory_limiter": config.GenericMap{
"check_interval": "1s",
"check_interval": "1s",
},
"resource/odigos-version": config.GenericMap{
"attributes": []config.GenericMap{
Expand Down Expand Up @@ -98,7 +100,7 @@ func TestAddSelfTelemetryPipeline(t *testing.T) {
},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
c := tc.cfg
Expand All @@ -115,15 +117,15 @@ func TestAddSelfTelemetryPipeline(t *testing.T) {
assert.Equal(t, []string{"prometheus"}, c.Service.Pipelines["metrics/otelcol"].Receivers)
assert.Equal(t, []string{"resource/pod-name"}, c.Service.Pipelines["metrics/otelcol"].Processors)
assert.Equal(t, []string{"otlp/ui"}, c.Service.Pipelines["metrics/otelcol"].Exporters)
assert.Equal(t, "0.0.0.0:8888", c.Service.Telemetry.Metrics["address"])
assert.Equal(t, fmt.Sprintf("0.0.0.0:%d", consts.OdigosNodeCollectorOwnTelemetryPortDefault), c.Service.Telemetry.Metrics["address"])
for pipelineName, pipeline := range c.Service.Pipelines {
if pipelineName == "metrics/otelcol" {
assert.NotContains(t, pipeline.Processors, "odigostrafficmetrics")
} else {
assert.Equal(t, pipeline.Processors[len(pipeline.Processors) - 1], "odigostrafficmetrics")
assert.Equal(t, pipeline.Processors[len(pipeline.Processors)-1], "odigostrafficmetrics")
}

}
})
}
}
}
6 changes: 3 additions & 3 deletions autoscaler/controllers/gateway/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func syncService(gateway *odigosv1.CollectorsGroup, ctx context.Context, c clien
}

result, err := controllerutil.CreateOrPatch(ctx, c, gatewaySvc, func() error {
updateGatewaySvc(gatewaySvc)
updateGatewaySvc(gatewaySvc, gateway)
return nil
})

Expand All @@ -67,7 +67,7 @@ func syncService(gateway *odigosv1.CollectorsGroup, ctx context.Context, c clien
return gatewaySvc, nil
}

func updateGatewaySvc(svc *v1.Service) {
func updateGatewaySvc(svc *v1.Service, collectorsGroup *odigosv1.CollectorsGroup) {
svc.Spec.Ports = []v1.ServicePort{
{
Name: "otlp",
Expand All @@ -83,7 +83,7 @@ func updateGatewaySvc(svc *v1.Service) {
},
{
Name: "metrics",
Port: 8888,
Port: collectorsGroup.Spec.CollectorOwnMetricsPort,
},
}

Expand Down
7 changes: 7 additions & 0 deletions common/odigos_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ package common

type ProfileName string

type CollectorNodeConfiguration struct {
// The port to use for exposing the collector's own metrics as a prometheus endpoint.
// This can be used to resolve conflicting ports when a collector is using the host network.
CollectorOwnMetricsPort int32 `json:"collectorOwnMetricsPort,omitempty"`
}

type CollectorGatewayConfiguration struct {
// RequestMemoryMiB is the memory request for the cluster gateway collector deployment.
// it will be embedded in the deployment as a resource request of the form "memory: <value>Mi"
Expand Down Expand Up @@ -38,6 +44,7 @@ type OdigosConfiguration struct {
AutoscalerImage string `json:"autoscalerImage,omitempty"`
DefaultSDKs map[ProgrammingLanguage]OtelSdk `json:"defaultSDKs,omitempty"`
CollectorGateway *CollectorGatewayConfiguration `json:"collectorGateway,omitempty"`
CollectorNode *CollectorNodeConfiguration `json:"collectorNode,omitempty"`
Profiles []ProfileName `json:"profiles,omitempty"`

// this is internal currently, and is not exposed on the CLI / helm
Expand Down
6 changes: 6 additions & 0 deletions helm/odigos/templates/odigos-config-cm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ data:
goMemLimitMiB: {{ . }}
{{- end }}
{{- end }}
{{- if .Values.collectorNode }}
collectorNode:
{{- with .Values.collectorNode.collectorOwnMetricsPort }}
collectorOwnMetricsPort: {{ . }}
{{- end }}
{{- end }}
instrumentorImage: {{ .Values.instrumentor.image.repository }}
telemetryEnabled: {{ .Values.telemetry.enabled }}
openshiftEnabled: {{ .Values.openshift.enabled }}
Expand Down
5 changes: 5 additions & 0 deletions helm/odigos/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ collectorGateway:
# if not specified, it will be set to 80% of the hard limit of the memory limiter.
goMemLimitMiB: 340

collectorNode:
# The port to use for exposing the collector's own metrics as a prometheus endpoint.
# This can be used to resolve conflicting ports when a collector is using the host network.
collectorOwnMetricsPort: 55682

autoscaler:
image:
repository: keyval/odigos-autoscaler
Expand Down
7 changes: 4 additions & 3 deletions k8sutils/pkg/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ const (
)

const (
OdigosNodeCollectorDaemonSetName = "odigos-data-collection"
OdigosNodeCollectorConfigMapName = OdigosNodeCollectorDaemonSetName
OdigosNodeCollectorCollectorGroupName = OdigosNodeCollectorDaemonSetName
OdigosNodeCollectorDaemonSetName = "odigos-data-collection"
OdigosNodeCollectorConfigMapName = OdigosNodeCollectorDaemonSetName
OdigosNodeCollectorCollectorGroupName = OdigosNodeCollectorDaemonSetName
OdigosNodeCollectorOwnTelemetryPortDefault = int32(55682)

OdigosNodeCollectorConfigMapKey = "conf" // this key is different than the cluster collector value. not sure why
)
15 changes: 12 additions & 3 deletions k8sutils/pkg/utils/collectorgroup_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,24 @@ package utils

import (
"context"

odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

func CreateCollectorGroup(ctx context.Context, c client.Client, collectorGroup *odigosv1.CollectorsGroup) error {
log.FromContext(ctx).Info("Creating collector group", "collectorGroupName", collectorGroup.Name)
return c.Create(ctx, collectorGroup)
func ApplyCollectorGroup(ctx context.Context, c client.Client, collectorGroup *odigosv1.CollectorsGroup) error {
logger := log.FromContext(ctx)
logger.Info("Applying collector group", "collectorGroupName", collectorGroup.Name)

err := c.Patch(ctx, collectorGroup, client.Apply, client.ForceOwnership, client.FieldOwner("scheduler"))
if err != nil {
logger.Error(err, "Failed to apply collector group")
return err
}
RonFed marked this conversation as resolved.
Show resolved Hide resolved

return nil
}

func GetCollectorGroup(ctx context.Context, c client.Client, namespace string, collectorGroupName string) (*odigosv1.CollectorsGroup, error) {
Expand Down
Loading
Loading