diff --git a/.chloggen/proctelemetry-processmetrics.yaml b/.chloggen/proctelemetry-processmetrics.yaml new file mode 100644 index 00000000000..4ccab9f1cb0 --- /dev/null +++ b/.chloggen/proctelemetry-processmetrics.yaml @@ -0,0 +1,12 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: proctelemetry + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Instrument `proctelemetry.ProcessMetrics` metrics with otel-go" + +# One or more tracking issues or pull requests related to the change +issues: [6886] + diff --git a/service/internal/proctelemetry/process_telemetry.go b/service/internal/proctelemetry/process_telemetry.go index edb9a9d80ac..94f4bf8e97a 100644 --- a/service/internal/proctelemetry/process_telemetry.go +++ b/service/internal/proctelemetry/process_telemetry.go @@ -15,6 +15,7 @@ package proctelemetry // import "go.opentelemetry.io/collector/service/internal/proctelemetry" import ( + "context" "os" "runtime" "sync" @@ -23,6 +24,15 @@ import ( "github.com/shirou/gopsutil/v3/process" "go.opencensus.io/metric" "go.opencensus.io/stats" + otelmetric "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/unit" + "go.uber.org/multierr" +) + +const ( + scopeName = "go.opentelemetry.io/collector/service/process_telemetry" + processNameKey = "process_name" ) // processMetrics is a struct that contains views related to process metrics (cpu, mem, etc) @@ -38,6 +48,14 @@ type processMetrics struct { cpuSeconds *metric.Float64DerivedCumulative rssMemory *metric.Int64DerivedGauge + // otel metrics + otelProcessUptime instrument.Float64ObservableCounter + otelAllocMem instrument.Int64ObservableGauge + otelTotalAllocMem instrument.Int64ObservableCounter + otelSysMem 
instrument.Int64ObservableGauge + otelCPUSeconds instrument.Float64ObservableCounter + otelRSSMemory instrument.Int64ObservableGauge + // mu protects everything bellow. mu sync.Mutex lastMsRead time.Time @@ -46,19 +64,29 @@ type processMetrics struct { // RegisterProcessMetrics creates a new set of processMetrics (mem, cpu) that can be used to measure // basic information about this process. -func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64) error { +func RegisterProcessMetrics(ocRegistry *metric.Registry, mp otelmetric.MeterProvider, useOtel bool, ballastSizeBytes uint64) error { + var err error pm := &processMetrics{ startTimeUnixNano: time.Now().UnixNano(), ballastSizeBytes: ballastSizeBytes, ms: &runtime.MemStats{}, } - var err error + pm.proc, err = process.NewProcess(int32(os.Getpid())) if err != nil { return err } - pm.processUptime, err = registry.AddFloat64DerivedCumulative( + if useOtel { + return pm.recordWithOtel(mp.Meter(scopeName)) + } + return pm.recordWithOC(ocRegistry) +} + +func (pm *processMetrics) recordWithOC(ocRegistry *metric.Registry) error { + var err error + + pm.processUptime, err = ocRegistry.AddFloat64DerivedCumulative( "process/uptime", metric.WithDescription("Uptime of the process"), metric.WithUnit(stats.UnitSeconds)) @@ -69,7 +97,7 @@ func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64) return err } - pm.allocMem, err = registry.AddInt64DerivedGauge( + pm.allocMem, err = ocRegistry.AddInt64DerivedGauge( "process/runtime/heap_alloc_bytes", metric.WithDescription("Bytes of allocated heap objects (see 'go doc runtime.MemStats.HeapAlloc')"), metric.WithUnit(stats.UnitBytes)) @@ -80,7 +108,7 @@ func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64) return err } - pm.totalAllocMem, err = registry.AddInt64DerivedCumulative( + pm.totalAllocMem, err = ocRegistry.AddInt64DerivedCumulative( "process/runtime/total_alloc_bytes", metric.WithDescription("Cumulative 
bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc')"), metric.WithUnit(stats.UnitBytes)) @@ -91,7 +119,7 @@ func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64) return err } - pm.sysMem, err = registry.AddInt64DerivedGauge( + pm.sysMem, err = ocRegistry.AddInt64DerivedGauge( "process/runtime/total_sys_memory_bytes", metric.WithDescription("Total bytes of memory obtained from the OS (see 'go doc runtime.MemStats.Sys')"), metric.WithUnit(stats.UnitBytes)) @@ -102,7 +130,7 @@ func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64) return err } - pm.cpuSeconds, err = registry.AddFloat64DerivedCumulative( + pm.cpuSeconds, err = ocRegistry.AddFloat64DerivedCumulative( "process/cpu_seconds", metric.WithDescription("Total CPU user and system time in seconds"), metric.WithUnit(stats.UnitSeconds)) @@ -113,7 +141,7 @@ func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64) return err } - pm.rssMemory, err = registry.AddInt64DerivedGauge( + pm.rssMemory, err = ocRegistry.AddInt64DerivedGauge( "process/memory/rss", metric.WithDescription("Total physical memory (resident set size)"), metric.WithUnit(stats.UnitBytes)) @@ -127,6 +155,72 @@ func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64) return nil } +func (pm *processMetrics) recordWithOtel(meter otelmetric.Meter) error { + var errs, err error + + pm.otelProcessUptime, err = meter.Float64ObservableCounter( + "process_uptime", + instrument.WithDescription("Uptime of the process"), + instrument.WithUnit(unit.Unit("s")), + instrument.WithFloat64Callback(func(_ context.Context, o instrument.Float64Observer) error { + o.Observe(pm.updateProcessUptime()) + return nil + })) + errs = multierr.Append(errs, err) + + pm.otelAllocMem, err = meter.Int64ObservableGauge( + "process_runtime_heap_alloc_bytes", + instrument.WithDescription("Bytes of allocated heap objects (see 'go doc runtime.MemStats.HeapAlloc')"), + 
instrument.WithUnit(unit.Bytes), + instrument.WithInt64Callback(func(_ context.Context, o instrument.Int64Observer) error { + o.Observe(pm.updateAllocMem()) + return nil + })) + errs = multierr.Append(errs, err) + + pm.otelTotalAllocMem, err = meter.Int64ObservableCounter( + "process_runtime_total_alloc_bytes", + instrument.WithDescription("Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc')"), + instrument.WithUnit(unit.Bytes), + instrument.WithInt64Callback(func(_ context.Context, o instrument.Int64Observer) error { + o.Observe(pm.updateTotalAllocMem()) + return nil + })) + errs = multierr.Append(errs, err) + + pm.otelSysMem, err = meter.Int64ObservableGauge( + "process_runtime_total_sys_memory_bytes", + instrument.WithDescription("Total bytes of memory obtained from the OS (see 'go doc runtime.MemStats.Sys')"), + instrument.WithUnit(unit.Bytes), + instrument.WithInt64Callback(func(_ context.Context, o instrument.Int64Observer) error { + o.Observe(pm.updateSysMem()) + return nil + })) + errs = multierr.Append(errs, err) + + pm.otelCPUSeconds, err = meter.Float64ObservableCounter( + "process_cpu_seconds", + instrument.WithDescription("Total CPU user and system time in seconds"), + instrument.WithUnit(unit.Unit("s")), + instrument.WithFloat64Callback(func(_ context.Context, o instrument.Float64Observer) error { + o.Observe(pm.updateCPUSeconds()) + return nil + })) + errs = multierr.Append(errs, err) + + pm.otelRSSMemory, err = meter.Int64ObservableGauge( + "process_memory_rss", + instrument.WithDescription("Total physical memory (resident set size)"), + instrument.WithUnit(unit.Bytes), + instrument.WithInt64Callback(func(_ context.Context, o instrument.Int64Observer) error { + o.Observe(pm.updateRSSMemory()) + return nil + })) + errs = multierr.Append(errs, err) + + return errs +} + func (pm *processMetrics) updateProcessUptime() float64 { now := time.Now().UnixNano() return float64(now-pm.startTimeUnixNano) / 1e9 diff --git 
a/service/internal/proctelemetry/process_telemetry_test.go b/service/internal/proctelemetry/process_telemetry_test.go index 26435a78a9b..c24b933c0d6 100644 --- a/service/internal/proctelemetry/process_telemetry_test.go +++ b/service/internal/proctelemetry/process_telemetry_test.go @@ -15,15 +15,41 @@ package proctelemetry import ( + "context" + "net/http" + "net/http/httptest" + "strings" "testing" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opencensus.io/metric" "go.opencensus.io/metric/metricdata" + "go.opencensus.io/stats/view" + otelprom "go.opentelemetry.io/otel/exporters/prometheus" + otelmetric "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/internal/obsreportconfig" ) +type testTelemetry struct { + component.TelemetrySettings + views []*view.View + promHandler http.Handler + meterProvider *sdkmetric.MeterProvider + expectedMetrics []string +} + var expectedMetrics = []string{ // Changing a metric name is a breaking change. 
// Adding new metrics is ok as long it follows the conventions described at @@ -36,14 +62,98 @@ var expectedMetrics = []string{ "process/memory/rss", } -func TestProcessTelemetry(t *testing.T) { - registry := metric.NewRegistry() - require.NoError(t, RegisterProcessMetrics(registry, 0)) +var otelExpectedMetrics = []string{ + // Names as registered; OTel Go may add a `_total` suffix on export, so the test also looks each name up with `_total` appended + "process_uptime", + "process_runtime_heap_alloc_bytes", + "process_runtime_total_alloc_bytes", + "process_runtime_total_sys_memory_bytes", + "process_cpu_seconds", + "process_memory_rss", +} + +func setupTelemetry(t *testing.T) testTelemetry { + settings := testTelemetry{ + TelemetrySettings: componenttest.NewNopTelemetrySettings(), + expectedMetrics: otelExpectedMetrics, + } + settings.TelemetrySettings.MetricsLevel = configtelemetry.LevelNormal + + settings.views = obsreportconfig.AllViews(configtelemetry.LevelNormal) + err := view.Register(settings.views...) + require.NoError(t, err) + + promReg := prometheus.NewRegistry() + exporter, err := otelprom.New(otelprom.WithRegisterer(promReg), otelprom.WithoutUnits()) + require.NoError(t, err) + + settings.meterProvider = sdkmetric.NewMeterProvider( + sdkmetric.WithResource(resource.Empty()), + sdkmetric.WithReader(exporter), + ) + settings.TelemetrySettings.MeterProvider = settings.meterProvider + + settings.promHandler = promhttp.HandlerFor(promReg, promhttp.HandlerOpts{}) + + t.Cleanup(func() { assert.NoError(t, settings.meterProvider.Shutdown(context.Background())) }) + + return settings +} + +func fetchPrometheusMetrics(handler http.Handler) (map[string]*io_prometheus_client.MetricFamily, error) { + req, err := http.NewRequest("GET", "/metrics", nil) + if err != nil { + return nil, err + } + + rr := httptest.NewRecorder() + handler.ServeHTTP(rr, req) + + var parser expfmt.TextParser + return parser.TextToMetricFamilies(rr.Body) +} + +func TestOtelProcessTelemetry(t *testing.T) { + tel := setupTelemetry(t) + + require.NoError(t, RegisterProcessMetrics(nil, 
tel.MeterProvider, true, 0)) + + mp, err := fetchPrometheusMetrics(tel.promHandler) + require.NoError(t, err) + + for _, metricName := range tel.expectedMetrics { + metric, ok := mp[metricName] + if !ok { + withSuffix := metricName + "_total" + metric, ok = mp[withSuffix] + } + require.True(t, ok) + require.True(t, len(metric.Metric) == 1) + var metricValue float64 + if metric.GetType() == io_prometheus_client.MetricType_COUNTER { + metricValue = metric.Metric[0].GetCounter().GetValue() + } else { + metricValue = metric.Metric[0].GetGauge().GetValue() + } + if strings.HasPrefix(metricName, "process_uptime") || strings.HasPrefix(metricName, "process_cpu_seconds") { + // This likely will still be zero when running the test. + assert.GreaterOrEqual(t, metricValue, float64(0), metricName) + continue + } + + assert.Greater(t, metricValue, float64(0), metricName) + } +} + +func TestOCProcessTelemetry(t *testing.T) { + ocRegistry := metric.NewRegistry() + + require.NoError(t, RegisterProcessMetrics(ocRegistry, otelmetric.NewNoopMeterProvider(), false, 0)) // Check that the metrics are actually filled. <-time.After(200 * time.Millisecond) - metrics := registry.Read() + metrics := ocRegistry.Read() for _, metricName := range expectedMetrics { m := findMetric(metrics, metricName) @@ -62,22 +172,21 @@ func TestProcessTelemetry(t *testing.T) { if metricName == "process/uptime" || metricName == "process/cpu_seconds" { // This likely will still be zero when running the test. 
- assert.True(t, value >= 0, metricName) + assert.GreaterOrEqual(t, value, float64(0), metricName) continue } - assert.True(t, value > 0, metricName) + assert.Greater(t, value, float64(0), metricName) } } func TestProcessTelemetryFailToRegister(t *testing.T) { - for _, metricName := range expectedMetrics { t.Run(metricName, func(t *testing.T) { - registry := metric.NewRegistry() - _, err := registry.AddFloat64Gauge(metricName) + ocRegistry := metric.NewRegistry() + _, err := ocRegistry.AddFloat64Gauge(metricName) require.NoError(t, err) - assert.Error(t, RegisterProcessMetrics(registry, 0)) + assert.Error(t, RegisterProcessMetrics(ocRegistry, otelmetric.NewNoopMeterProvider(), false, 0)) }) } } diff --git a/service/service.go b/service/service.go index 5d96598a8e4..df4753efbf4 100644 --- a/service/service.go +++ b/service/service.go @@ -211,7 +211,7 @@ func (srv *Service) initExtensionsAndPipeline(ctx context.Context, set Settings, if cfg.Telemetry.Metrics.Level != configtelemetry.LevelNone && cfg.Telemetry.Metrics.Address != "" { // The process telemetry initialization requires the ballast size, which is available after the extensions are initialized. - if err = proctelemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, getBallastSize(srv.host)); err != nil { + if err = proctelemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, srv.telemetryInitializer.mp, obsreportconfig.UseOtelForInternalMetricsfeatureGate.IsEnabled(), getBallastSize(srv.host)); err != nil { return fmt.Errorf("failed to register process metrics: %w", err) } } diff --git a/service/service_test.go b/service/service_test.go index 7bfd1265ec5..2a2a309663a 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -269,7 +269,9 @@ func testCollectorStartHelper(t *testing.T, useOtel bool, tc ownMetricsTestCase) // Sleep for 1 second to ensure the http server is started. 
time.Sleep(1 * time.Second) assert.True(t, loggingHookCalled) - assertMetrics(t, metricsAddr, tc.expectedLabels) + if !useOtel { + assertMetrics(t, metricsAddr, tc.expectedLabels) + } assertZPages(t, zpagesAddr) require.NoError(t, srv.Shutdown(context.Background())) }