From 979ddea27dcb05165896e58f8c3ee5c2eb759e47 Mon Sep 17 00:00:00 2001 From: Marcel Birkner Date: Thu, 2 Nov 2023 17:06:07 +0100 Subject: [PATCH] [cmd/telemetrygen] Use exporter per worker for better metrics throughput (#27201) Adding a feature - Use exporter per worker for better metrics throughput Initially when adding more workers in the telemetrygen config when running "metrics" it did not increase the metrics throughput since all workers used the same exporter. By creating one exporter per worker we can now increase the number of metrics being send to the backend. Fixes https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/26709 - Units tests pass - Ran local load tests with different configurations ## Before code change Generate metrics: ``` telemetrygen metrics \ --metric-type Sum \ --duration "60s" \ --rate "0" \ --workers "10" \ --otlp-http=false \ --otlp-endpoint \ --otlp-attributes "service.name"=\"telemetrygen\" ``` Output: ``` metrics generated {"worker": 8, "metrics": 139} metrics generated {"worker": 0, "metrics": 139} metrics generated {"worker": 9, "metrics": 141} metrics generated {"worker": 4, "metrics": 140} metrics generated {"worker": 2, "metrics": 140} metrics generated {"worker": 3, "metrics": 140} metrics generated {"worker": 7, "metrics": 140} metrics generated {"worker": 5, "metrics": 140} metrics generated {"worker": 1, "metrics": 140} metrics generated {"worker": 6, "metrics": 140} ``` ## After code change ``` telemetrygen metrics \ --metric-type Sum \ --duration "60s" \ --rate "0" \ --workers "10" \ --otlp-http=false \ --otlp-endpoint \ --otlp-attributes "service.name"=\"telemetrygen\" ``` Output: ``` metrics generated {"worker": 6, "metrics": 1292} metrics generated {"worker": 3, "metrics": 1277} metrics generated {"worker": 5, "metrics": 1272} metrics generated {"worker": 8, "metrics": 1251} metrics generated {"worker": 9, "metrics": 1241} metrics generated {"worker": 4, "metrics": 1227} metrics generated {"worker": 0, "metrics": 1212} metrics generated {"worker": 2, "metrics": 1201} metrics generated {"worker": 1, "metrics": 1333} metrics generated {"worker": 7, "metrics": 1363} ``` By adding more workers you can now export more metrics and use `telemetrygen` better for load testing use cases. With the code change I can now utilize my CPU better for load tests. When adding 200 workers to the above config the CPU usage can go above 80%. Before that CPU usage would be around 1% with 200 workers. ![image](https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/558256/66727e5f-6b0a-44a3-8436-7e6985d6a01c) --------- Co-authored-by: Alex Boten --- .../telemetrygen-add-exporter-per-worker.yaml | 25 ++++ cmd/telemetrygen/README.md | 40 ++++++- cmd/telemetrygen/internal/metrics/metrics.go | 31 ++--- cmd/telemetrygen/internal/metrics/worker.go | 15 ++- .../internal/metrics/worker_test.go | 111 +++++++++++------- 5 files changed, 155 insertions(+), 67 deletions(-) create mode 100644 .chloggen/telemetrygen-add-exporter-per-worker.yaml diff --git a/.chloggen/telemetrygen-add-exporter-per-worker.yaml b/.chloggen/telemetrygen-add-exporter-per-worker.yaml new file mode 100644 index 000000000000..6b591d9e9fd1 --- /dev/null +++ b/.chloggen/telemetrygen-add-exporter-per-worker.yaml @@ -0,0 +1,25 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: cmd/telemetrygen + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Use exporter per worker for better metrics throughput + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26709] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] \ No newline at end of file diff --git a/cmd/telemetrygen/README.md b/cmd/telemetrygen/README.md index 6c329d305f39..f13788b27e95 100644 --- a/cmd/telemetrygen/README.md +++ b/cmd/telemetrygen/README.md @@ -28,28 +28,44 @@ Check the [`go install` reference](https://go.dev/ref/mod#go-install) to install First, you'll need an OpenTelemetry Collector to receive the telemetry data. Follow the project's instructions for a detailed setting up guide. The following configuration file should be sufficient: +config.yaml: ```yaml receivers: otlp: protocols: grpc: - endpoint: localhost:4317 + endpoint: 0.0.0.0:4317 processors: + batch: exporters: debug: + verbosity: detailed service: pipelines: + logs: + receivers: [otlp] + processors: [batch] + exporters: [debug] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [debug] traces: - receivers: - - otlp - processors: [] - exporters: - - debug + receivers: [otlp] + processors: [batch] + exporters: [debug] ``` +Starting OpenTelemetry collector via docker: +``` +docker run -p 4317:4317 -v $(pwd)/config.yaml:/etc/otelcol-contrib/config.yaml ghcr.io/open-telemetry/opentelemetry-collector-releases/opentelemetry-collector-contrib:0.86.0 +``` + +Other options for running the collector are documented here https://opentelemetry.io/docs/collector/getting-started/ + Once the OpenTelemetry Collector instance is up and running, run `telemetrygen` for your desired telemetry: ### Traces @@ -65,3 +81,15 @@ telemetrygen traces --otlp-insecure --traces 1 ``` Check `telemetrygen traces --help` for all the options. + +### Logs + +```console +telemetrygen logs --duration 5s --otlp-insecure +``` + +### Metrics + +```console +telemetrygen metrics --duration 5s --otlp-insecure +``` \ No newline at end of file diff --git a/cmd/telemetrygen/internal/metrics/metrics.go b/cmd/telemetrygen/internal/metrics/metrics.go index a67271715f43..0061d64b3695 100644 --- a/cmd/telemetrygen/internal/metrics/metrics.go +++ b/cmd/telemetrygen/internal/metrics/metrics.go @@ -29,26 +29,19 @@ func Start(cfg *Config) error { } logger.Info("starting the metrics generator with configuration", zap.Any("config", cfg)) - var exp sdkmetric.Exporter - if cfg.UseHTTP { - logger.Info("starting HTTP exporter") - exp, err = otlpmetrichttp.New(context.Background(), httpExporterOptions(cfg)...) - } else { - logger.Info("starting gRPC exporter") - exp, err = otlpmetricgrpc.New(context.Background(), grpcExporterOptions(cfg)...) - } - - if err != nil { - return fmt.Errorf("failed to obtain OTLP exporter: %w", err) - } - defer func() { - logger.Info("stopping the exporter") - if tempError := exp.Shutdown(context.Background()); tempError != nil { - logger.Error("failed to stop the exporter", zap.Error(tempError)) + expFunc := func() (sdkmetric.Exporter, error) { + var exp sdkmetric.Exporter + if cfg.UseHTTP { + logger.Info("starting HTTP exporter") + exp, err = otlpmetrichttp.New(context.Background(), httpExporterOptions(cfg)...) + } else { + logger.Info("starting gRPC exporter") + exp, err = otlpmetricgrpc.New(context.Background(), grpcExporterOptions(cfg)...) } - }() + return exp, err + } - if err = Run(cfg, exp, logger); err != nil { + if err = Run(cfg, expFunc, logger); err != nil { logger.Error("failed to stop the exporter", zap.Error(err)) return err } @@ -57,7 +50,7 @@ func Start(cfg *Config) error { } // Run executes the test scenario. -func Run(c *Config, exp sdkmetric.Exporter, logger *zap.Logger) error { +func Run(c *Config, exp func() (sdkmetric.Exporter, error), logger *zap.Logger) error { if c.TotalDuration > 0 { c.NumMetrics = 0 } else if c.NumMetrics <= 0 { diff --git a/cmd/telemetrygen/internal/metrics/worker.go b/cmd/telemetrygen/internal/metrics/worker.go index de430b5f5b1c..a939bc85817e 100644 --- a/cmd/telemetrygen/internal/metrics/worker.go +++ b/cmd/telemetrygen/internal/metrics/worker.go @@ -28,9 +28,22 @@ type worker struct { index int // worker index } -func (w worker) simulateMetrics(res *resource.Resource, exporter sdkmetric.Exporter, signalAttrs []attribute.KeyValue) { +func (w worker) simulateMetrics(res *resource.Resource, exporterFunc func() (sdkmetric.Exporter, error), signalAttrs []attribute.KeyValue) { limiter := rate.NewLimiter(w.limitPerSecond, 1) + exporter, err := exporterFunc() + if err != nil { + w.logger.Error("failed to create the exporter", zap.Error(err)) + return + } + + defer func() { + w.logger.Info("stopping the exporter") + if tempError := exporter.Shutdown(context.Background()); tempError != nil { + w.logger.Error("failed to stop the exporter", zap.Error(tempError)) + } + }() + var i int64 for w.running.Load() { var metrics []metricdata.Metrics diff --git a/cmd/telemetrygen/internal/metrics/worker_test.go b/cmd/telemetrygen/internal/metrics/worker_test.go index c6a85e53ae30..99f454d19dcc 100644 --- a/cmd/telemetrygen/internal/metrics/worker_test.go +++ b/cmd/telemetrygen/internal/metrics/worker_test.go @@ -51,6 +51,7 @@ func (m *mockExporter) Shutdown(_ context.Context) error { } func TestFixedNumberOfMetrics(t *testing.T) { + // arrange cfg := &Config{ Config: common.Config{ WorkerCount: 1, @@ -58,20 +59,22 @@ func TestFixedNumberOfMetrics(t *testing.T) { NumMetrics: 5, MetricType: metricTypeSum, } + m := &mockExporter{} + expFunc := func() (sdkmetric.Exporter, error) { + return m, nil + } - exp := &mockExporter{} - - // test + // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, exp, logger)) - + require.NoError(t, Run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) - // verify - require.Len(t, exp.rms, 5) + // assert + require.Len(t, m.rms, 5) } func TestRateOfMetrics(t *testing.T) { + // arrange cfg := &Config{ Config: common.Config{ Rate: 10, @@ -80,19 +83,23 @@ func TestRateOfMetrics(t *testing.T) { }, MetricType: metricTypeSum, } - exp := &mockExporter{} + m := &mockExporter{} + expFunc := func() (sdkmetric.Exporter, error) { + return m, nil + } - // test - require.NoError(t, Run(cfg, exp, zap.NewNop())) + // act + require.NoError(t, Run(cfg, expFunc, zap.NewNop())) - // verify + // assert // the minimum acceptable number of metrics for the rate of 10/sec for half a second - assert.True(t, len(exp.rms) >= 6, "there should have been more than 6 metrics, had %d", len(exp.rms)) + assert.True(t, len(m.rms) >= 6, "there should have been more than 6 metrics, had %d", len(m.rms)) // the maximum acceptable number of metrics for the rate of 10/sec for half a second - assert.True(t, len(exp.rms) <= 20, "there should have been less than 20 metrics, had %d", len(exp.rms)) + assert.True(t, len(m.rms) <= 20, "there should have been less than 20 metrics, had %d", len(m.rms)) } func TestUnthrottled(t *testing.T) { + // arrange cfg := &Config{ Config: common.Config{ TotalDuration: 1 * time.Second, @@ -100,31 +107,38 @@ func TestUnthrottled(t *testing.T) { }, MetricType: metricTypeSum, } - exp := &mockExporter{} + m := &mockExporter{} + expFunc := func() (sdkmetric.Exporter, error) { + return m, nil + } - // test + // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, exp, logger)) + require.NoError(t, Run(cfg, expFunc, logger)) - assert.True(t, len(exp.rms) > 100, "there should have been more than 100 metrics, had %d", len(exp.rms)) + // assert + assert.True(t, len(m.rms) > 100, "there should have been more than 100 metrics, had %d", len(m.rms)) } func TestSumNoTelemetryAttrs(t *testing.T) { // arrange qty := 2 cfg := configWithNoAttributes(metricTypeSum, qty) - exp := &mockExporter{} + m := &mockExporter{} + expFunc := func() (sdkmetric.Exporter, error) { + return m, nil + } // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, exp, logger)) + require.NoError(t, Run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) // asserts - require.Len(t, exp.rms, qty) + require.Len(t, m.rms, qty) - rms := exp.rms + rms := m.rms for i := 0; i < qty; i++ { ms := rms[i].ScopeMetrics[0].Metrics[0] // @note update when telemetrygen allow other metric types @@ -137,18 +151,21 @@ func TestGaugeNoTelemetryAttrs(t *testing.T) { // arrange qty := 2 cfg := configWithNoAttributes(metricTypeGauge, qty) - exp := &mockExporter{} + m := &mockExporter{} + expFunc := func() (sdkmetric.Exporter, error) { + return m, nil + } // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, exp, logger)) + require.NoError(t, Run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) // asserts - require.Len(t, exp.rms, qty) + require.Len(t, m.rms, qty) - rms := exp.rms + rms := m.rms for i := 0; i < qty; i++ { ms := rms[i].ScopeMetrics[0].Metrics[0] // @note update when telemetrygen allow other metric types @@ -161,18 +178,21 @@ func TestSumSingleTelemetryAttr(t *testing.T) { // arrange qty := 2 cfg := configWithOneAttribute(metricTypeSum, qty) - exp := &mockExporter{} + m := &mockExporter{} + expFunc := func() (sdkmetric.Exporter, error) { + return m, nil + } // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, exp, logger)) + require.NoError(t, Run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) // asserts - require.Len(t, exp.rms, qty) + require.Len(t, m.rms, qty) - rms := exp.rms + rms := m.rms for i := 0; i < qty; i++ { ms := rms[i].ScopeMetrics[0].Metrics[0] // @note update when telemetrygen allow other metric types @@ -187,18 +207,21 @@ func TestGaugeSingleTelemetryAttr(t *testing.T) { // arrange qty := 2 cfg := configWithOneAttribute(metricTypeGauge, qty) - exp := &mockExporter{} + m := &mockExporter{} + expFunc := func() (sdkmetric.Exporter, error) { + return m, nil + } // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, exp, logger)) + require.NoError(t, Run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) // asserts - require.Len(t, exp.rms, qty) + require.Len(t, m.rms, qty) - rms := exp.rms + rms := m.rms for i := 0; i < qty; i++ { ms := rms[i].ScopeMetrics[0].Metrics[0] // @note update when telemetrygen allow other metric types @@ -213,18 +236,21 @@ func TestSumMultipleTelemetryAttr(t *testing.T) { // arrange qty := 2 cfg := configWithMultipleAttributes(metricTypeSum, qty) - exp := &mockExporter{} + m := &mockExporter{} + expFunc := func() (sdkmetric.Exporter, error) { + return m, nil + } // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, exp, logger)) + require.NoError(t, Run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) // asserts - require.Len(t, exp.rms, qty) + require.Len(t, m.rms, qty) - rms := exp.rms + rms := m.rms var actualValue attribute.Value for i := 0; i < qty; i++ { ms := rms[i].ScopeMetrics[0].Metrics[0] @@ -242,18 +268,21 @@ func TestGaugeMultipleTelemetryAttr(t *testing.T) { // arrange qty := 2 cfg := configWithMultipleAttributes(metricTypeGauge, qty) - exp := &mockExporter{} + m := &mockExporter{} + expFunc := func() (sdkmetric.Exporter, error) { + return m, nil + } // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, exp, logger)) + require.NoError(t, Run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) // asserts - require.Len(t, exp.rms, qty) + require.Len(t, m.rms, qty) - rms := exp.rms + rms := m.rms var actualValue attribute.Value for i := 0; i < qty; i++ { ms := rms[i].ScopeMetrics[0].Metrics[0]