diff --git a/.chloggen/telemetrygen-add-exporter-per-worker.yaml b/.chloggen/telemetrygen-add-exporter-per-worker.yaml new file mode 100644 index 000000000000..6b591d9e9fd1 --- /dev/null +++ b/.chloggen/telemetrygen-add-exporter-per-worker.yaml @@ -0,0 +1,25 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: cmd/telemetrygen + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Use exporter per worker for better metrics throughput + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26709] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] \ No newline at end of file diff --git a/cmd/telemetrygen/README.md b/cmd/telemetrygen/README.md index 6c329d305f39..f13788b27e95 100644 --- a/cmd/telemetrygen/README.md +++ b/cmd/telemetrygen/README.md @@ -28,28 +28,44 @@ Check the [`go install` reference](https://go.dev/ref/mod#go-install) to install First, you'll need an OpenTelemetry Collector to receive the telemetry data. Follow the project's instructions for a detailed setting up guide. The following configuration file should be sufficient: +config.yaml: ```yaml receivers: otlp: protocols: grpc: - endpoint: localhost:4317 + endpoint: 0.0.0.0:4317 processors: + batch: exporters: debug: + verbosity: detailed service: pipelines: + logs: + receivers: [otlp] + processors: [batch] + exporters: [debug] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [debug] traces: - receivers: - - otlp - processors: [] - exporters: - - debug + receivers: [otlp] + processors: [batch] + exporters: [debug] ``` +Starting OpenTelemetry collector via docker: +``` +docker run -p 4317:4317 -v $(pwd)/config.yaml:/etc/otelcol-contrib/config.yaml ghcr.io/open-telemetry/opentelemetry-collector-releases/opentelemetry-collector-contrib:0.86.0 +``` + +Other options for running the collector are documented here https://opentelemetry.io/docs/collector/getting-started/ + Once the OpenTelemetry Collector instance is up and running, run `telemetrygen` for your desired telemetry: ### Traces @@ -65,3 +81,15 @@ telemetrygen traces --otlp-insecure --traces 1 ``` Check `telemetrygen traces --help` for all the options. + +### Logs + +```console +telemetrygen logs --duration 5s --otlp-insecure +``` + +### Metrics + +```console +telemetrygen metrics --duration 5s --otlp-insecure +``` \ No newline at end of file diff --git a/cmd/telemetrygen/internal/metrics/metrics.go b/cmd/telemetrygen/internal/metrics/metrics.go index a67271715f43..0061d64b3695 100644 --- a/cmd/telemetrygen/internal/metrics/metrics.go +++ b/cmd/telemetrygen/internal/metrics/metrics.go @@ -29,26 +29,19 @@ func Start(cfg *Config) error { } logger.Info("starting the metrics generator with configuration", zap.Any("config", cfg)) - var exp sdkmetric.Exporter - if cfg.UseHTTP { - logger.Info("starting HTTP exporter") - exp, err = otlpmetrichttp.New(context.Background(), httpExporterOptions(cfg)...) - } else { - logger.Info("starting gRPC exporter") - exp, err = otlpmetricgrpc.New(context.Background(), grpcExporterOptions(cfg)...) - } - - if err != nil { - return fmt.Errorf("failed to obtain OTLP exporter: %w", err) - } - defer func() { - logger.Info("stopping the exporter") - if tempError := exp.Shutdown(context.Background()); tempError != nil { - logger.Error("failed to stop the exporter", zap.Error(tempError)) + expFunc := func() (sdkmetric.Exporter, error) { + var exp sdkmetric.Exporter + if cfg.UseHTTP { + logger.Info("starting HTTP exporter") + exp, err = otlpmetrichttp.New(context.Background(), httpExporterOptions(cfg)...) + } else { + logger.Info("starting gRPC exporter") + exp, err = otlpmetricgrpc.New(context.Background(), grpcExporterOptions(cfg)...) } - }() + return exp, err + } - if err = Run(cfg, exp, logger); err != nil { + if err = Run(cfg, expFunc, logger); err != nil { logger.Error("failed to stop the exporter", zap.Error(err)) return err } @@ -57,7 +50,7 @@ func Start(cfg *Config) error { } // Run executes the test scenario. -func Run(c *Config, exp sdkmetric.Exporter, logger *zap.Logger) error { +func Run(c *Config, exp func() (sdkmetric.Exporter, error), logger *zap.Logger) error { if c.TotalDuration > 0 { c.NumMetrics = 0 } else if c.NumMetrics <= 0 { diff --git a/cmd/telemetrygen/internal/metrics/worker.go b/cmd/telemetrygen/internal/metrics/worker.go index de430b5f5b1c..a939bc85817e 100644 --- a/cmd/telemetrygen/internal/metrics/worker.go +++ b/cmd/telemetrygen/internal/metrics/worker.go @@ -28,9 +28,22 @@ type worker struct { index int // worker index } -func (w worker) simulateMetrics(res *resource.Resource, exporter sdkmetric.Exporter, signalAttrs []attribute.KeyValue) { +func (w worker) simulateMetrics(res *resource.Resource, exporterFunc func() (sdkmetric.Exporter, error), signalAttrs []attribute.KeyValue) { limiter := rate.NewLimiter(w.limitPerSecond, 1) + exporter, err := exporterFunc() + if err != nil { + w.logger.Error("failed to create the exporter", zap.Error(err)) + return + } + + defer func() { + w.logger.Info("stopping the exporter") + if tempError := exporter.Shutdown(context.Background()); tempError != nil { + w.logger.Error("failed to stop the exporter", zap.Error(tempError)) + } + }() + var i int64 for w.running.Load() { var metrics []metricdata.Metrics diff --git a/cmd/telemetrygen/internal/metrics/worker_test.go b/cmd/telemetrygen/internal/metrics/worker_test.go index c6a85e53ae30..99f454d19dcc 100644 --- a/cmd/telemetrygen/internal/metrics/worker_test.go +++ b/cmd/telemetrygen/internal/metrics/worker_test.go @@ -51,6 +51,7 @@ func (m *mockExporter) Shutdown(_ context.Context) error { } func TestFixedNumberOfMetrics(t *testing.T) { + // arrange cfg := &Config{ Config: common.Config{ WorkerCount: 1, @@ -58,20 +59,22 @@ func TestFixedNumberOfMetrics(t *testing.T) { NumMetrics: 5, MetricType: metricTypeSum, } + m := &mockExporter{} + expFunc := func() (sdkmetric.Exporter, error) { + return m, nil + } - exp := &mockExporter{} - - // test + // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, exp, logger)) - + require.NoError(t, Run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) - // verify - require.Len(t, exp.rms, 5) + // assert + require.Len(t, m.rms, 5) } func TestRateOfMetrics(t *testing.T) { + // arrange cfg := &Config{ Config: common.Config{ Rate: 10, @@ -80,19 +83,23 @@ func TestRateOfMetrics(t *testing.T) { }, MetricType: metricTypeSum, } - exp := &mockExporter{} + m := &mockExporter{} + expFunc := func() (sdkmetric.Exporter, error) { + return m, nil + } - // test - require.NoError(t, Run(cfg, exp, zap.NewNop())) + // act + require.NoError(t, Run(cfg, expFunc, zap.NewNop())) - // verify + // assert // the minimum acceptable number of metrics for the rate of 10/sec for half a second - assert.True(t, len(exp.rms) >= 6, "there should have been more than 6 metrics, had %d", len(exp.rms)) + assert.True(t, len(m.rms) >= 6, "there should have been more than 6 metrics, had %d", len(m.rms)) // the maximum acceptable number of metrics for the rate of 10/sec for half a second - assert.True(t, len(exp.rms) <= 20, "there should have been less than 20 metrics, had %d", len(exp.rms)) + assert.True(t, len(m.rms) <= 20, "there should have been less than 20 metrics, had %d", len(m.rms)) } func TestUnthrottled(t *testing.T) { + // arrange cfg := &Config{ Config: common.Config{ TotalDuration: 1 * time.Second, @@ -100,31 +107,38 @@ func TestUnthrottled(t *testing.T) { }, MetricType: metricTypeSum, } - exp := &mockExporter{} + m := &mockExporter{} + expFunc := func() (sdkmetric.Exporter, error) { + return m, nil + } - // test + // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, exp, logger)) + require.NoError(t, Run(cfg, expFunc, logger)) - assert.True(t, len(exp.rms) > 100, "there should have been more than 100 metrics, had %d", len(exp.rms)) + // assert + assert.True(t, len(m.rms) > 100, "there should have been more than 100 metrics, had %d", len(m.rms)) } func TestSumNoTelemetryAttrs(t *testing.T) { // arrange qty := 2 cfg := configWithNoAttributes(metricTypeSum, qty) - exp := &mockExporter{} + m := &mockExporter{} + expFunc := func() (sdkmetric.Exporter, error) { + return m, nil + } // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, exp, logger)) + require.NoError(t, Run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) // asserts - require.Len(t, exp.rms, qty) + require.Len(t, m.rms, qty) - rms := exp.rms + rms := m.rms for i := 0; i < qty; i++ { ms := rms[i].ScopeMetrics[0].Metrics[0] // @note update when telemetrygen allow other metric types @@ -137,18 +151,21 @@ func TestGaugeNoTelemetryAttrs(t *testing.T) { // arrange qty := 2 cfg := configWithNoAttributes(metricTypeGauge, qty) - exp := &mockExporter{} + m := &mockExporter{} + expFunc := func() (sdkmetric.Exporter, error) { + return m, nil + } // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, exp, logger)) + require.NoError(t, Run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) // asserts - require.Len(t, exp.rms, qty) + require.Len(t, m.rms, qty) - rms := exp.rms + rms := m.rms for i := 0; i < qty; i++ { ms := rms[i].ScopeMetrics[0].Metrics[0] // @note update when telemetrygen allow other metric types @@ -161,18 +178,21 @@ func TestSumSingleTelemetryAttr(t *testing.T) { // arrange qty := 2 cfg := configWithOneAttribute(metricTypeSum, qty) - exp := &mockExporter{} + m := &mockExporter{} + expFunc := func() (sdkmetric.Exporter, error) { + return m, nil + } // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, exp, logger)) + require.NoError(t, Run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) // asserts - require.Len(t, exp.rms, qty) + require.Len(t, m.rms, qty) - rms := exp.rms + rms := m.rms for i := 0; i < qty; i++ { ms := rms[i].ScopeMetrics[0].Metrics[0] // @note update when telemetrygen allow other metric types @@ -187,18 +207,21 @@ func TestGaugeSingleTelemetryAttr(t *testing.T) { // arrange qty := 2 cfg := configWithOneAttribute(metricTypeGauge, qty) - exp := &mockExporter{} + m := &mockExporter{} + expFunc := func() (sdkmetric.Exporter, error) { + return m, nil + } // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, exp, logger)) + require.NoError(t, Run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) // asserts - require.Len(t, exp.rms, qty) + require.Len(t, m.rms, qty) - rms := exp.rms + rms := m.rms for i := 0; i < qty; i++ { ms := rms[i].ScopeMetrics[0].Metrics[0] // @note update when telemetrygen allow other metric types @@ -213,18 +236,21 @@ func TestSumMultipleTelemetryAttr(t *testing.T) { // arrange qty := 2 cfg := configWithMultipleAttributes(metricTypeSum, qty) - exp := &mockExporter{} + m := &mockExporter{} + expFunc := func() (sdkmetric.Exporter, error) { + return m, nil + } // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, exp, logger)) + require.NoError(t, Run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) // asserts - require.Len(t, exp.rms, qty) + require.Len(t, m.rms, qty) - rms := exp.rms + rms := m.rms var actualValue attribute.Value for i := 0; i < qty; i++ { ms := rms[i].ScopeMetrics[0].Metrics[0] @@ -242,18 +268,21 @@ func TestGaugeMultipleTelemetryAttr(t *testing.T) { // arrange qty := 2 cfg := configWithMultipleAttributes(metricTypeGauge, qty) - exp := &mockExporter{} + m := &mockExporter{} + expFunc := func() (sdkmetric.Exporter, error) { + return m, nil + } // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, exp, logger)) + require.NoError(t, Run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) // asserts - require.Len(t, exp.rms, qty) + require.Len(t, m.rms, qty) - rms := exp.rms + rms := m.rms var actualValue attribute.Value for i := 0; i < qty; i++ { ms := rms[i].ScopeMetrics[0].Metrics[0]