Skip to content

Commit

Permalink
Updated loadbalancing test to verify routing_key changes
Browse files Browse the repository at this point in the history
  • Loading branch information
madaraszg-tulip committed Dec 10, 2024
1 parent beaa50b commit 5ad5662
Showing 1 changed file with 210 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -1,27 +1,237 @@
package loadbalancing_test

import (
"context"
"fmt"
"net"
"testing"
"time"

"github.com/grafana/alloy/internal/component/otelcol"
otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config"
"github.com/grafana/alloy/internal/component/otelcol/exporter/loadbalancing"
"github.com/grafana/alloy/internal/runtime/componenttest"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/internal/util"
"github.com/grafana/alloy/syntax"
"github.com/grafana/dskit/backoff"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
"google.golang.org/grpc"
)

func getPtrToUint(v uint16) *uint16 {
res := &v
return res
}

// Test performs a basic integration test which runs the otelcol.exporter.loadbalancing
// component and ensures that it can pass data to an OTLP gRPC server.
func Test(t *testing.T) {
traceCh := make(chan ptrace.Traces)
tracesServer := makeTracesServer(t, traceCh)

ctx := componenttest.TestContext(t)
l := util.TestLogger(t)

ctrl, err := componenttest.NewControllerFromID(l, "otelcol.exporter.loadbalancing")
require.NoError(t, err)

cfgTemplate := `
routing_key = "%s"
resolver {
static {
hostnames = ["%s"]
}
}
protocol {
otlp {
client {
compression = "none"
tls {
insecure = true
insecure_skip_verify = true
}
}
}
}
debug_metrics {
disable_high_cardinality_metrics = true
}
`

cfg := fmt.Sprintf(cfgTemplate, "traceID", tracesServer)
var args loadbalancing.Arguments
require.NoError(t, syntax.Unmarshal([]byte(cfg), &args))
require.Equal(t, args.DebugMetricsConfig().DisableHighCardinalityMetrics, true)

go func() {
err := ctrl.Run(ctx, args)
require.NoError(t, err)
}()

require.NoError(t, ctrl.WaitRunning(time.Second), "component never started")
require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything")

// Send traces in the background to our exporter.
go func() {
exports := ctrl.Exports().(otelcol.ConsumerExports)

bo := backoff.New(ctx, backoff.Config{
MinBackoff: 10 * time.Millisecond,
MaxBackoff: 100 * time.Millisecond,
})
for bo.Ongoing() {
err := exports.Input.ConsumeTraces(ctx, createTestTraces())
if err != nil {
level.Error(l).Log("msg", "failed to send traces", "err", err)
bo.Wait()
continue
}

return
}
}()

// Wait for our exporter to finish and pass data to our rpc server.
select {
case <-time.After(time.Second):
require.FailNow(t, "failed waiting for traces")
case tr := <-traceCh:
require.Equal(t, 1, tr.SpanCount())
}

// Update the config to disable traces export
cfg = fmt.Sprintf(cfgTemplate, "metric", tracesServer)
require.NoError(t, syntax.Unmarshal([]byte(cfg), &args))
ctrl.Update(args)

// Send traces in the background to our exporter.
go func() {
exports := ctrl.Exports().(otelcol.ConsumerExports)

bo := backoff.New(ctx, backoff.Config{
MaxRetries: 3,
MinBackoff: 10 * time.Millisecond,
MaxBackoff: 100 * time.Millisecond,
})
for bo.Ongoing() {
err := exports.Input.ConsumeTraces(ctx, createTestTraces())
require.ErrorContains(t, err, "telemetry type is not supported")
if err != nil {
level.Error(l).Log("msg", "failed to send traces", "err", err)
bo.Wait()
continue
}

return
}
}()

// Wait for our exporter to finish and pass data to our rpc server.
// no error here, as we we expect to fail sending in the first place
select {
case <-traceCh:
require.FailNow(t, "no traces expected here")
case <-time.After(time.Second):
}

// Re-run the test with reenabled traces export
cfg = fmt.Sprintf(cfgTemplate, "traceID", tracesServer)
require.NoError(t, syntax.Unmarshal([]byte(cfg), &args))
ctrl.Update(args)

// Send traces in the background to our exporter.
go func() {
exports := ctrl.Exports().(otelcol.ConsumerExports)

bo := backoff.New(ctx, backoff.Config{
MinBackoff: 10 * time.Millisecond,
MaxBackoff: 100 * time.Millisecond,
})
for bo.Ongoing() {
err := exports.Input.ConsumeTraces(ctx, createTestTraces())
if err != nil {
level.Error(l).Log("msg", "failed to send traces", "err", err)
bo.Wait()
continue
}

return
}
}()

// Wait for our exporter to finish and pass data to our rpc server.
select {
case <-time.After(time.Second):
require.FailNow(t, "failed waiting for traces")
case tr := <-traceCh:
require.Equal(t, 1, tr.SpanCount())
}
}

// makeTracesServer returns a host:port which will accept traces over insecure
// gRPC.
func makeTracesServer(t *testing.T, ch chan ptrace.Traces) string {
t.Helper()

lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)

srv := grpc.NewServer()
ptraceotlp.RegisterGRPCServer(srv, &mockTracesReceiver{ch: ch})

go func() {
err := srv.Serve(lis)
require.NoError(t, err)
}()
t.Cleanup(srv.Stop)

return lis.Addr().String()
}

type mockTracesReceiver struct {
ptraceotlp.UnimplementedGRPCServer
ch chan ptrace.Traces
}

var _ ptraceotlp.GRPCServer = (*mockTracesReceiver)(nil)

func (ms *mockTracesReceiver) Export(_ context.Context, req ptraceotlp.ExportRequest) (ptraceotlp.ExportResponse, error) {
ms.ch <- req.Traces()
return ptraceotlp.NewExportResponse(), nil
}

func createTestTraces() ptrace.Traces {
// Matches format from the protobuf definition:
// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto
bb := `{
"resource_spans": [{
"scope_spans": [{
"spans": [{
"name": "TestSpan"
}]
}]
}]
}`

decoder := &ptrace.JSONUnmarshaler{}
data, err := decoder.UnmarshalTraces([]byte(bb))
if err != nil {
panic(err)
}
return data
}

func TestConfigConversion(t *testing.T) {
var (
defaultRetrySettings = configretry.NewDefaultBackOffConfig()
Expand Down

0 comments on commit 5ad5662

Please sign in to comment.