diff --git a/internal/e2e/instrumentation.go b/internal/e2e/instrumentation.go new file mode 100644 index 0000000..878fcbb --- /dev/null +++ b/internal/e2e/instrumentation.go @@ -0,0 +1,115 @@ +package e2e + +import ( + "context" + "log" + "path/filepath" + "time" + + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" +) + +const ( + receivingContainer = "receiver" + testedContainer = "sut" + generatingContainer = "generator" + port = 17016 + collectorRunningPeriod = 35 * time.Second + samplesCount = 10 +) + +func runReceivingSolarWindsOTELCollector( + ctx context.Context, + networkName string, +) (testcontainers.Container, error) { + configPath, err := filepath.Abs(filepath.Join(".", "testdata", "receiving_collector.yaml")) + if err != nil { + return nil, err + } + + container, err := runSolarWindsOTELCollector(ctx, networkName, receivingContainer, configPath) + return container, err +} + +func runTestedSolarWindsOTELCollector( + ctx context.Context, + networkName string, +) (testcontainers.Container, error) { + configPath, err := filepath.Abs(filepath.Join(".", "testdata", "emitting_collector.yaml")) + if err != nil { + return nil, err + } + + container, err := runSolarWindsOTELCollector(ctx, networkName, testedContainer, configPath) + return container, err +} + +func runSolarWindsOTELCollector( + ctx context.Context, + networkName string, + containerName string, + configPath string, +) (testcontainers.Container, error) { + lc := new(logConsumer) + lc.Prefix = containerName + req := testcontainers.ContainerRequest{ + Image: "solarwinds-otel-collector:latest", + LogConsumerCfg: &testcontainers.LogConsumerConfig{ + Consumers: []testcontainers.LogConsumer{lc}, + }, + Files: []testcontainers.ContainerFile{ + { + HostFilePath: configPath, + ContainerFilePath: "/opt/default-config.yaml", + FileMode: 0o440, + }, + }, + WaitingFor: wait.ForLog("Everything is ready. Begin running and processing data."), + Networks: []string{networkName}, + Name: containerName, + } + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + + return container, err +} + +func runGeneratorContainer( + ctx context.Context, + networkName string, + cmd []string, +) (testcontainers.Container, error) { + containerName := generatingContainer + + lc := new(logConsumer) + lc.Prefix = containerName + + req := testcontainers.ContainerRequest{ + Image: "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen:latest", + LogConsumerCfg: &testcontainers.LogConsumerConfig{ + Consumers: []testcontainers.LogConsumer{lc}, + }, + Networks: []string{networkName}, + Name: containerName, + Cmd: cmd, + } + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + + return container, err +} + +type logConsumer struct { + Prefix string +} + +func (lc *logConsumer) Accept(l testcontainers.Log) { + log.Printf("***%s: %s", lc.Prefix, string(l.Content)) +} diff --git a/internal/e2e/integration_test.go b/internal/e2e/integration_test.go index e268ff0..8d0b5ec 100644 --- a/internal/e2e/integration_test.go +++ b/internal/e2e/integration_test.go @@ -4,7 +4,6 @@ import ( "context" "io" "log" - "path/filepath" "strconv" "strings" "testing" @@ -18,16 +17,6 @@ import ( "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/network" - "github.com/testcontainers/testcontainers-go/wait" -) - -const ( - receivingContainer = "receiver" - testedContainer = "sut" - generatingContainer = "generator" - port = 17016 - collectorRunningPeriod = 35 * time.Second - samplesCount = 10 ) func TestMetricStream(t *testing.T) { @@ -58,37 +47,45 @@ func TestMetricStream(t *testing.T) { testcontainers.CleanupContainer(t, gContainer) <-time.After(collectorRunningPeriod) - log.Println("***: evaluation in progress") evaluateMetricsStream(t, ctx, rContainer, samplesCount) } -func evaluateMetricsStream( - t *testing.T, - ctx context.Context, - container testcontainers.Container, - expectedCount int, -) { - // Obtain result from container. - lines, err := loadResultFile(ctx, container, "/tmp/result.json") +func TestTracesStream(t *testing.T) { + ctx := context.Background() + + net, err := network.New(ctx) require.NoError(t, err) + testcontainers.CleanupNetwork(t, net) - ms := pmetric.NewMetrics() - jum := new(pmetric.JSONUnmarshaler) - for _, line := range lines { - m, err := jum.UnmarshalMetrics([]byte(line)) - if err != nil { - continue - } + rContainer, err := runReceivingSolarWindsOTELCollector(ctx, net.Name) + require.NoError(t, err) + testcontainers.CleanupContainer(t, rContainer) - require.Equal(t, m.ResourceMetrics().Len(), 1, "it must contain exactly one resource metric") - evaluateResourceAttributes(t, m.ResourceMetrics().At(0).Resource().Attributes()) - m.ResourceMetrics().MoveAndAppendTo(ms.ResourceMetrics()) + eContainer, err := runTestedSolarWindsOTELCollector(ctx, net.Name) + require.NoError(t, err) + testcontainers.CleanupContainer(t, eContainer) + + cmd := []string{ + "traces", + "--traces", strconv.Itoa(samplesCount), + "--otlp-insecure", + "--otlp-endpoint", "sut:17016", + "--otlp-attributes", "resource.attributes.testing_attribute=\"testing_value\"", } - require.Equal(t, ms.MetricCount(), expectedCount) + + gContainer, err := runGeneratorContainer(ctx, net.Name, cmd) + require.NoError(t, err) + testcontainers.CleanupContainer(t, gContainer) + + <-time.After(collectorRunningPeriod) + + // Traces coming in couples. + expectedTracesCount := samplesCount * 2 + evaluateTracesStream(t, ctx, rContainer, expectedTracesCount) } -func TestTracesStream(t *testing.T) { +func TestLogsStream(t *testing.T) { ctx := context.Background() net, err := network.New(ctx) @@ -104,8 +101,8 @@ func TestTracesStream(t *testing.T) { testcontainers.CleanupContainer(t, eContainer) cmd := []string{ - "traces", - "--traces", strconv.Itoa(samplesCount), + "logs", + "--logs", strconv.Itoa(samplesCount), "--otlp-insecure", "--otlp-endpoint", "sut:17016", "--otlp-attributes", "resource.attributes.testing_attribute=\"testing_value\"", @@ -116,10 +113,50 @@ func TestTracesStream(t *testing.T) { testcontainers.CleanupContainer(t, gContainer) <-time.After(collectorRunningPeriod) - log.Println("***: evaluation in progress") - expectedTracesCount := samplesCount * 2 - evaluateTracesStream(t, ctx, rContainer, expectedTracesCount) + evaluateLogsStream(t, ctx, rContainer, samplesCount) +} + +func evaluateMetricsStream( + t *testing.T, + ctx context.Context, + container testcontainers.Container, + expectedCount int, +) { + // Obtain result from container. + lines, err := loadResultFile(ctx, container, "/tmp/result.json") + require.NoError(t, err) + + gms := pmetric.NewMetrics() + hbms := pmetric.NewMetrics() + jum := new(pmetric.JSONUnmarshaler) + for _, line := range lines { + m, err := jum.UnmarshalMetrics([]byte(line)) + if err != nil || m.ResourceMetrics().Len() == 0 { + continue + } + + if m.ResourceMetrics().At(0).ScopeMetrics().Len() == 0 || + m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().Len() == 0 { + continue + } + + heartbeatMetricName := "sw.otelcol.uptime" + generatedMetricName := "gen" + metricName := m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Name() + + switch metricName { + case generatedMetricName: + evaluateResourceAttributes(t, m.ResourceMetrics().At(0).Resource().Attributes()) + m.ResourceMetrics().MoveAndAppendTo(gms.ResourceMetrics()) + case heartbeatMetricName: + m.ResourceMetrics().MoveAndAppendTo(hbms.ResourceMetrics()) + default: + continue + } + } + require.Equal(t, gms.MetricCount(), expectedCount) + evaluateHeartbeetMetrics(t, hbms) } func evaluateTracesStream( @@ -157,40 +194,6 @@ func evaluateTracesStream( require.Equal(t, expectedCount, trs.SpanCount()) } -func TestLogsStream(t *testing.T) { - ctx := context.Background() - - net, err := network.New(ctx) - require.NoError(t, err) - testcontainers.CleanupNetwork(t, net) - - rContainer, err := runReceivingSolarWindsOTELCollector(ctx, net.Name) - require.NoError(t, err) - testcontainers.CleanupContainer(t, rContainer) - - eContainer, err := runTestedSolarWindsOTELCollector(ctx, net.Name) - require.NoError(t, err) - testcontainers.CleanupContainer(t, eContainer) - - cmd := []string{ - "logs", - "--logs", strconv.Itoa(samplesCount), - "--otlp-insecure", - "--otlp-endpoint", "sut:17016", - "--otlp-attributes", "resource.attributes.testing_attribute=\"testing_value\"", - } - - gContainer, err := runGeneratorContainer(ctx, net.Name, cmd) - require.NoError(t, err) - testcontainers.CleanupContainer(t, gContainer) - - <-time.After(collectorRunningPeriod) - log.Println("***: evaluation in progress") - - expectedLogsCount := 10 - evaluateLogsStream(t, ctx, rContainer, expectedLogsCount) -} - func evaluateLogsStream( t *testing.T, ctx context.Context, @@ -246,110 +249,6 @@ func evaluateResourceAttributes( require.Equal(t, val.AsString(), "testing_value", "testing attribute value must be the same") } -func runReceivingSolarWindsOTELCollector( - ctx context.Context, - networkName string, -) (testcontainers.Container, error) { - containerName := receivingContainer - - configPath, err := filepath.Abs(filepath.Join(".", "testdata", "receiving_collector.yaml")) - if err != nil { - return nil, err - } - - lc := new(MyLogConsumer) - lc.Prefix = containerName - req := testcontainers.ContainerRequest{ - Image: "solarwinds-otel-collector:latest", - LogConsumerCfg: &testcontainers.LogConsumerConfig{ - Consumers: []testcontainers.LogConsumer{lc}, - }, - Files: []testcontainers.ContainerFile{ - { - HostFilePath: configPath, - ContainerFilePath: "/opt/default-config.yaml", - FileMode: 0o440, - }, - }, - WaitingFor: wait.ForLog("Everything is ready. Begin running and processing data."), - Networks: []string{networkName}, - Name: containerName, - } - - container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: req, - Started: true, - }) - - return container, err -} - -func runTestedSolarWindsOTELCollector( - ctx context.Context, - networkName string, -) (testcontainers.Container, error) { - containerName := testedContainer - - configPath, err := filepath.Abs(filepath.Join(".", "testdata", "emitting_collector.yaml")) - if err != nil { - return nil, err - } - - lc := new(MyLogConsumer) - lc.Prefix = containerName - req := testcontainers.ContainerRequest{ - Image: "solarwinds-otel-collector:latest", - LogConsumerCfg: &testcontainers.LogConsumerConfig{ - Consumers: []testcontainers.LogConsumer{lc}, - }, - Files: []testcontainers.ContainerFile{ - { - HostFilePath: configPath, - ContainerFilePath: "/opt/default-config.yaml", - FileMode: 0o440, - }, - }, - WaitingFor: wait.ForLog("Everything is ready. Begin running and processing data."), - Networks: []string{networkName}, - Name: containerName, - } - - container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: req, - Started: true, - }) - - return container, err -} - -func runGeneratorContainer( - ctx context.Context, - networkName string, - cmd []string, -) (testcontainers.Container, error) { - containerName := generatingContainer - - lc := new(MyLogConsumer) - lc.Prefix = containerName - - req := testcontainers.ContainerRequest{ - Image: "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen:latest", - LogConsumerCfg: &testcontainers.LogConsumerConfig{ - Consumers: []testcontainers.LogConsumer{lc}, - }, - Networks: []string{networkName}, - Name: containerName, - Cmd: cmd, - } - - container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: req, - Started: true, - }) - - return container, err -} - func loadResultFile( ctx context.Context, container testcontainers.Container, @@ -365,15 +264,7 @@ func loadResultFile( return make([]string, 0), err } - log.Print("*** raw content:\n" + string(content) + "\n") + log.Print("*** raw result content:\n" + string(content) + "\n") lines := strings.Split(string(content), "\n") return lines, nil } - -type MyLogConsumer struct { - Prefix string -} - -func (lc *MyLogConsumer) Accept(l testcontainers.Log) { - log.Printf("***%s: %s", lc.Prefix, string(l.Content)) -}