Skip to content

Commit

Permalink
Move some content from correctness_test.go to utils.go (#1497)
Browse files Browse the repository at this point in the history
* Move some content from correctness_test.go to utils.go

This change makes these functions/types available from the metrics
package, where they will be needed to address issue #652.

* Add comments for exported types and fcns

* Address PR comments

* Fix lint
  • Loading branch information
pmcollins authored Aug 5, 2020
1 parent 51395f9 commit 56a22ad
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 134 deletions.
141 changes: 7 additions & 134 deletions testbed/correctness/correctness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,24 @@
package correctness

import (
"bufio"
"fmt"
"os"
"path"
"path/filepath"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/service/defaultcomponents"
"go.opentelemetry.io/collector/testbed/testbed"
)

type PipelineDef struct {
receiver string
exporter string
testName string
dataSender testbed.DataSender
dataReceiver testbed.DataReceiver
resourceSpec testbed.ResourceSpec
}

var correctnessResults testbed.TestResultsSummary = &testbed.CorrectnessResults{}

func TestMain(m *testing.M) {
testbed.DoTestMain(m, correctnessResults)
}

func TestTracingGoldenData(t *testing.T) {
tests, err := loadPictOutputPipelineDefs("testdata/generated_pict_pairs_traces_pipeline.txt")
tests, err := LoadPictOutputPipelineDefs("testdata/generated_pict_pairs_traces_pipeline.txt")
assert.NoError(t, err)
processors := map[string]string{
"batch": `
Expand All @@ -56,87 +41,22 @@ func TestTracingGoldenData(t *testing.T) {
`,
}
for _, test := range tests {
test.testName = fmt.Sprintf("%s-%s", test.receiver, test.exporter)
test.dataSender = constructSender(t, test.receiver)
test.dataReceiver = constructReceiver(t, test.exporter)
t.Run(test.testName, func(t *testing.T) {
testWithTracingGoldenDataset(t, test.dataSender, test.dataReceiver, test.resourceSpec, processors)
test.TestName = fmt.Sprintf("%s-%s", test.Receiver, test.Exporter)
test.DataSender = ConstructTraceSender(t, test.Receiver)
test.DataReceiver = ConstructReceiver(t, test.Exporter)
t.Run(test.TestName, func(t *testing.T) {
testWithTracingGoldenDataset(t, test.DataSender, test.DataReceiver, test.ResourceSpec, processors)
})
}
}

func constructSender(t *testing.T, receiver string) testbed.DataSender {
var sender testbed.DataSender
switch receiver {
case "otlp":
sender = testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t))
case "opencensus":
sender = testbed.NewOCTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t))
case "jaeger":
sender = testbed.NewJaegerGRPCDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t))
case "zipkin":
sender = testbed.NewZipkinDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t))
default:
t.Errorf("unknown receiver type: %s", receiver)
}
return sender
}

func constructReceiver(t *testing.T, exporter string) testbed.DataReceiver {
var receiver testbed.DataReceiver
switch exporter {
case "otlp":
receiver = testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t))
case "opencensus":
receiver = testbed.NewOCDataReceiver(testbed.GetAvailablePort(t))
case "jaeger":
receiver = testbed.NewJaegerDataReceiver(testbed.GetAvailablePort(t))
case "zipkin":
receiver = testbed.NewZipkinDataReceiver(testbed.GetAvailablePort(t))
default:
t.Errorf("unknown exporter type: %s", exporter)
}
return receiver
}

func loadPictOutputPipelineDefs(fileName string) ([]PipelineDef, error) {
file, err := os.Open(filepath.Clean(fileName))
if err != nil {
return nil, err
}
defer func() {
cerr := file.Close()
if err == nil {
err = cerr
}
}()

defs := make([]PipelineDef, 0)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
s := strings.Split(scanner.Text(), "\t")
if "Receiver" == s[0] {
continue
}

var aDef PipelineDef
aDef.receiver, aDef.exporter = s[0], s[1]
defs = append(defs, aDef)
}

return defs, err
}

func testWithTracingGoldenDataset(
t *testing.T,
sender testbed.DataSender,
receiver testbed.DataReceiver,
resourceSpec testbed.ResourceSpec,
processors map[string]string,
) {
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
require.NoError(t, err)

dataProvider := testbed.NewGoldenDataProvider(
"../../internal/goldendataset/testdata/generated_pict_pairs_traces.txt",
"../../internal/goldendataset/testdata/generated_pict_pairs_spans.txt",
Expand All @@ -145,7 +65,7 @@ func testWithTracingGoldenDataset(
assert.NoError(t, err)
runner := testbed.NewInProcessCollector(factories, sender.GetCollectorPort())
validator := testbed.NewCorrectTestValidator(dataProvider)
config := createConfigYaml(t, sender, receiver, resultDir, processors)
config := CreateConfigYaml(sender, receiver, processors, "traces")
configCleanup, cfgErr := runner.PrepareConfig(config)
assert.NoError(t, cfgErr)
defer configCleanup()
Expand Down Expand Up @@ -182,50 +102,3 @@ func testWithTracingGoldenDataset(

tc.ValidateData()
}

func createConfigYaml(t *testing.T, sender testbed.DataSender, receiver testbed.DataReceiver, resultDir string,
processors map[string]string) string {

// Prepare extra processor config section and comma-separated list of extra processor
// names to use in corresponding "processors" settings.
processorsSections := ""
processorsList := ""
if len(processors) > 0 {
first := true
for name, cfg := range processors {
processorsSections += cfg + "\n"
if !first {
processorsList += ","
}
processorsList += name
first = false
}
}

format := `
receivers:%v
exporters:%v
processors:
%s
extensions:
service:
extensions:
pipelines:
traces:
receivers: [%v]
processors: [%s]
exporters: [%v]
`

return fmt.Sprintf(
format,
sender.GenConfigYAMLStr(),
receiver.GenConfigYAMLStr(),
processorsSections,
sender.ProtocolName(),
processorsList,
receiver.ProtocolName(),
)
}
163 changes: 163 additions & 0 deletions testbed/correctness/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package correctness

import (
"bufio"
"fmt"
"os"
"path/filepath"
"strings"
"testing"

"go.opentelemetry.io/collector/testbed/testbed"
)

// CreateConfigYaml creates a yaml config for an otel collector given a testbed sender, testbed receiver, any
// processors, and a pipeline type. A collector created from the resulting yaml string should be able to talk
// the specified sender and receiver.
func CreateConfigYaml(
sender testbed.DataSender,
receiver testbed.DataReceiver,
processors map[string]string,
pipelineType string,
) string {

// Prepare extra processor config section and comma-separated list of extra processor
// names to use in corresponding "processors" settings.
processorsSections := ""
processorsList := ""
if len(processors) > 0 {
first := true
for name, cfg := range processors {
processorsSections += cfg + "\n"
if !first {
processorsList += ","
}
processorsList += name
first = false
}
}

format := `
receivers:%v
exporters:%v
processors:
%s
extensions:
service:
extensions:
pipelines:
%s:
receivers: [%v]
processors: [%s]
exporters: [%v]
`

return fmt.Sprintf(
format,
sender.GenConfigYAMLStr(),
receiver.GenConfigYAMLStr(),
processorsSections,
pipelineType,
sender.ProtocolName(),
processorsList,
receiver.ProtocolName(),
)
}

// PipelineDef holds the information necessary to run a single testbed configuration.
type PipelineDef struct {
Receiver string
Exporter string
TestName string
DataSender testbed.DataSender
DataReceiver testbed.DataReceiver
ResourceSpec testbed.ResourceSpec
}

// LoadPictOutputPipelineDefs generates a slice of PipelineDefs from the passed-in generated PICT file. The
// result should be a set of PipelineDefs that covers all possible pipeline configurations.
func LoadPictOutputPipelineDefs(fileName string) ([]PipelineDef, error) {
file, err := os.Open(filepath.Clean(fileName))
if err != nil {
return nil, err
}
defer func() {
cerr := file.Close()
if err == nil {
err = cerr
}
}()

defs := make([]PipelineDef, 0)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
s := strings.Split(scanner.Text(), "\t")
if "Receiver" == s[0] {
continue
}

var aDef PipelineDef
aDef.Receiver, aDef.Exporter = s[0], s[1]
defs = append(defs, aDef)
}

return defs, err
}

// ConstructTraceSender creates a testbed trace sender from the passed-in trace sender identifier.
func ConstructTraceSender(t *testing.T, receiver string) testbed.DataSender {
var sender testbed.DataSender
switch receiver {
case "otlp":
sender = testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t))
case "opencensus":
sender = testbed.NewOCTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t))
case "jaeger":
sender = testbed.NewJaegerGRPCDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t))
case "zipkin":
sender = testbed.NewZipkinDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t))
default:
t.Errorf("unknown receiver type: %s", receiver)
}
return sender
}

// ConstructMetricsSender creates a testbed metrics sender from the passed-in metrics sender identifier.
func ConstructMetricsSender(t *testing.T, receiver string) testbed.DataSender {
var sender testbed.DataSender
switch receiver {
case "otlp":
sender = testbed.NewOTLPMetricDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t))
case "opencensus":
sender = testbed.NewOCMetricDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t))
// will be uncommented in a subsequent PR
// case "prometheus":
// sender = testbed.NewPrometheusDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t))
default:
t.Errorf("unknown receiver type: %s", receiver)
}
return sender
}

// ConstructReceiver creates a testbed receiver from the passed-in recevier identifier.
func ConstructReceiver(t *testing.T, exporter string) testbed.DataReceiver {
var receiver testbed.DataReceiver
switch exporter {
case "otlp":
receiver = testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t))
case "opencensus":
receiver = testbed.NewOCDataReceiver(testbed.GetAvailablePort(t))
case "jaeger":
receiver = testbed.NewJaegerDataReceiver(testbed.GetAvailablePort(t))
case "zipkin":
receiver = testbed.NewZipkinDataReceiver(testbed.GetAvailablePort(t))
// will be uncommented in a subsequent PR
// case "prometheus":
// receiver = testbed.NewPrometheusDataReceiver(testbed.GetAvailablePort(t))
default:
t.Errorf("unknown exporter type: %s", exporter)
}
return receiver
}

0 comments on commit 56a22ad

Please sign in to comment.