diff --git a/Makefile b/Makefile index b20efeeddbf..2c2fbdd8b41 100644 --- a/Makefile +++ b/Makefile @@ -54,11 +54,13 @@ all: checklicense impi lint misspell test otelcol .PHONY: testbed-runtests testbed-runtests: otelcol + cd ./testbed/correctness && ./runtests.sh cd ./testbed/tests && ./runtests.sh .PHONY: testbed-listtests testbed-listtests: - TESTBED_CONFIG=local.yaml $(GOTEST) -v ./testbed/tests --test.list '.*'|head -n -1 + TESTBED_CONFIG=inprocess.yaml $(GOTEST) -v ./testbed/correctness --test.list '.*'|head -n 10 + TESTBED_CONFIG=local.yaml $(GOTEST) -v ./testbed/tests --test.list '.*'|head -n 20 .PHONY: test test: diff --git a/service/service.go b/service/service.go index 7029daa1b01..2da897deda9 100644 --- a/service/service.go +++ b/service/service.go @@ -228,6 +228,15 @@ func (app *Application) RegisterZPages(mux *http.ServeMux, pathPrefix string) { mux.HandleFunc(path.Join(pathPrefix, extensionzPath), app.handleExtensionzRequest) } +func (app *Application) SignalTestComplete() { + defer func() { + if r := recover(); r != nil { + app.logger.Info("stopTestChan already closed") + } + }() + close(app.stopTestChan) +} + func (app *Application) init() error { l, err := newLogger() if err != nil { diff --git a/service/service_test.go b/service/service_test.go index 8677dda62d1..77026c2a48d 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -34,6 +34,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/internal/version" "go.opentelemetry.io/collector/service/defaultcomponents" "go.opentelemetry.io/collector/testutils" ) @@ -74,6 +75,47 @@ func TestApplication_Start(t *testing.T) { assert.Equal(t, Closed, <-app.GetStateChannel()) } +func TestApplication_StartAsGoRoutine(t *testing.T) { + factories, err := defaultcomponents.Components() + require.NoError(t, err) + + params := Parameters{ + ApplicationStartInfo: ApplicationStartInfo{ + ExeName: "otelcol", + LongName: "InProcess Collector", + Version: version.Version, + GitHash: version.GitHash, + }, + ConfigFactory: func(v *viper.Viper, factories config.Factories) (*configmodels.Config, error) { + return constructMimumalOpConfig(t, factories), nil + }, + Factories: factories, + } + app, err := New(params) + require.NoError(t, err) + app.Command().SetArgs([]string{ + "--metrics-level=NONE", + }) + + appDone := make(chan struct{}) + go func() { + defer close(appDone) + appErr := app.Start() + if appErr != nil { + err = appErr + } + }() + + assert.Equal(t, Starting, <-app.GetStateChannel()) + assert.Equal(t, Running, <-app.GetStateChannel()) + + app.SignalTestComplete() + app.SignalTestComplete() + <-appDone + assert.Equal(t, Closing, <-app.GetStateChannel()) + assert.Equal(t, Closed, <-app.GetStateChannel()) +} + // isAppAvailable checks if the healthcheck server at the given endpoint is // returning `available`. 
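The `recover` in `SignalTestComplete` above exists so that closing `stopTestChan` twice (which `TestApplication_StartAsGoRoutine` does on purpose) cannot panic the test. A minimal standalone sketch of that close-once idiom; the `closeOnce` name and the program around it are illustrative, not part of this change:

```go
package main

import "fmt"

// closeOnce closes ch; if ch is already closed, the deferred recover turns the
// resulting panic into a logged no-op, mirroring SignalTestComplete above.
func closeOnce(ch chan struct{}) {
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("channel already closed")
		}
	}()
	close(ch)
}

func main() {
	done := make(chan struct{})
	closeOnce(done) // closes the channel
	closeOnce(done) // recovered: prints "channel already closed"
	<-done          // receiving from a closed channel returns immediately
}
```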
 func isAppAvailable(t *testing.T, healthCheckEndPoint string) bool {
@@ -412,3 +454,32 @@ func TestApplication_GetExporters(t *testing.T) {
 	close(app.stopTestChan)
 	<-appDone
 }
+
+func constructMimumalOpConfig(t *testing.T, factories config.Factories) *configmodels.Config {
+	configStr := `
+receivers:
+  otlp:
+exporters:
+  logging:
+processors:
+  batch:
+
+extensions:
+
+service:
+  extensions:
+  pipelines:
+    traces:
+      receivers: [otlp]
+      processors: [batch]
+      exporters: [logging]
+`
+	v := viper.NewWithOptions(viper.KeyDelimiter("::"))
+	v.SetConfigType("yaml")
+	v.ReadConfig(strings.NewReader(configStr))
+	cfg, err := config.Load(v, factories)
+	assert.NoError(t, err)
+	err = config.ValidateConfig(cfg, zap.NewNop())
+	assert.NoError(t, err)
+	return cfg
+}
diff --git a/testbed/README.md b/testbed/README.md
index 6770948c948..9e6a51d4065 100644
--- a/testbed/README.md
+++ b/testbed/README.md
@@ -1,4 +1,41 @@
 # OpenTelemetry Collector Testbed
-Testbed is a controlled environment and tools for conducting performance tests for the Otel Collector,
-including reproducible short-term benchmarks,long-running stability tests and maximum load stress tests.
+Testbed is a controlled environment and set of tools for conducting end-to-end tests for the Otel Collector,
+including reproducible short-term benchmarks, correctness tests, long-running stability tests and
+maximum load stress tests.
+
+## Usage
+
+For each type of test that should have a summary report, create a new directory and a test suite function
+that uses `*testing.M`. This function should delegate all functionality to `testbed.DoTestMain`, supplying
+a global instance of `testbed.TestResultsSummary` to it.
+
+Each test case within the suite should create a `testbed.TestCase` and supply implementations of each of the
+interfaces the `NewTestCase` function takes as parameters.
+
+## Pluggable Test Components
+
+* `DataProvider` - Generates test data to send to the receiver under test.
+  * `PerfTestDataProvider` - Implementation of `DataProvider` for use in performance tests. Tracing IDs are based on the incremented batch and data items counters.
+  * `GoldenDataProvider` - Implementation of `DataProvider` for use in correctness tests. Provides data from the "Golden" dataset generated using pairwise combinatorial testing techniques.
+* `DataSender` - Sends data to the collector instance under test.
+  * `JaegerGRPCDataSender` - Implementation of `DataSender` which sends to the `jaeger` receiver.
+  * `OCTraceDataSender` - Implementation of `DataSender` which sends to the `opencensus` receiver.
+  * `OCMetricsDataSender` - Implementation of `DataSender` which sends to the `opencensus` receiver.
+  * `OTLPTraceDataSender` - Implementation of `DataSender` which sends to the `otlp` receiver.
+  * `OTLPMetricsDataSender` - Implementation of `DataSender` which sends to the `otlp` receiver.
+  * `ZipkinDataSender` - Implementation of `DataSender` which sends to the `zipkin` receiver.
+* `DataReceiver` - Receives data from the collector instance under test and stores it for use in test assertions.
+  * `OCDataReceiver` - Implementation of `DataReceiver` which receives data from the `opencensus` exporter.
+  * `JaegerDataReceiver` - Implementation of `DataReceiver` which receives data from the `jaeger` exporter.
+  * `OTLPDataReceiver` - Implementation of `DataReceiver` which receives data from the `otlp` exporter.
+  * `ZipkinDataReceiver` - Implementation of `DataReceiver` which receives data from the `zipkin` exporter.
+* `OtelcolRunner` - Configures, starts and stops one or more instances of otelcol, which are the subject of the tests being executed.
+  * `ChildProcess` - Implementation of `OtelcolRunner` that runs a single otelcol as a child process on the same machine as the test executor.
+  * `InProcessCollector` - Implementation of `OtelcolRunner` that runs a single otelcol as a goroutine within the same process as the test executor.
+* `TestCaseValidator` - Validates and reports on test results.
+  * `PerfTestValidator` - Implementation of `TestCaseValidator` for test suites using `PerformanceResults` for summarizing results.
+  * `CorrectnessTestValidator` - Implementation of `TestCaseValidator` for test suites using `CorrectnessResults` for summarizing results.
+* `TestResultsSummary` - Records itemized test case results plus a summary of one category of testing.
+  * `PerformanceResults` - Implementation of `TestResultsSummary` with fields suitable for reporting performance test results.
+  * `CorrectnessResults` - Implementation of `TestResultsSummary` with fields suitable for reporting data translation correctness test results.
diff --git a/testbed/correctness/.gitignore b/testbed/correctness/.gitignore
new file mode 100644
index 00000000000..a61c5ef81e8
--- /dev/null
+++ b/testbed/correctness/.gitignore
@@ -0,0 +1,2 @@
+results/*
+!results/BASELINE.md
diff --git a/testbed/correctness/correctness_test.go b/testbed/correctness/correctness_test.go
new file mode 100644
index 00000000000..9e95b4bf4c3
--- /dev/null
+++ b/testbed/correctness/correctness_test.go
@@ -0,0 +1,231 @@
+// Copyright 2020, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
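The correctness suite that this file introduces follows the `TestMain` wiring described in the README's Usage section above. As a condensed, illustrative sketch of just that wiring (the package name and `results` variable are invented here; `testbed.DoTestMain` and `testbed.CorrectnessResults` are the testbed APIs referenced throughout this change):

```go
package correctness_example // illustrative name only

import (
	"testing"

	"go.opentelemetry.io/collector/testbed/testbed"
)

// A single summary instance is shared by every test in the suite so that
// DoTestMain can initialize it before the tests run and save it afterwards.
var results testbed.TestResultsSummary = &testbed.CorrectnessResults{}

func TestMain(m *testing.M) {
	testbed.DoTestMain(m, results)
}
```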
+ +package correctness + +import ( + "bufio" + "fmt" + "os" + "path" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/service/defaultcomponents" + "go.opentelemetry.io/collector/testbed/testbed" +) + +type PipelineDef struct { + receiver string + exporter string + testName string + dataSender testbed.DataSender + dataReceiver testbed.DataReceiver + resourceSpec testbed.ResourceSpec +} + +var correctnessResults testbed.TestResultsSummary = &testbed.CorrectnessResults{} + +func TestMain(m *testing.M) { + testbed.DoTestMain(m, correctnessResults) +} + +func TestTracingGoldenData(t *testing.T) { + tests, err := loadPictOutputPipelineDefs("testdata/generated_pict_pairs_traces_pipeline.txt") + assert.NoError(t, err) + processors := map[string]string{ + "batch": ` + batch: + send_batch_size: 1024 +`, + } + for _, test := range tests { + test.testName = fmt.Sprintf("%s-%s", test.receiver, test.exporter) + test.dataSender = constructSender(t, test.receiver) + test.dataReceiver = constructReceiver(t, test.exporter) + t.Run(test.testName, func(t *testing.T) { + testWithTracingGoldenDataset(t, test.dataSender, test.dataReceiver, test.resourceSpec, processors) + }) + } +} + +func constructSender(t *testing.T, receiver string) testbed.DataSender { + var sender testbed.DataSender + switch receiver { + case "otlp": + sender = testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)) + case "opencensus": + sender = testbed.NewOCTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)) + case "jaeger": + sender = testbed.NewJaegerGRPCDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)) + case "zipkin": + sender = testbed.NewZipkinDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)) + default: + t.Errorf("unknown receiver type: %s", receiver) + } + return sender +} + +func constructReceiver(t *testing.T, exporter string) testbed.DataReceiver { + var receiver testbed.DataReceiver + switch exporter { + case "otlp": + receiver = testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)) + case "opencensus": + receiver = testbed.NewOCDataReceiver(testbed.GetAvailablePort(t)) + case "jaeger": + receiver = testbed.NewJaegerDataReceiver(testbed.GetAvailablePort(t)) + case "zipkin": + receiver = testbed.NewZipkinDataReceiver(testbed.GetAvailablePort(t)) + default: + t.Errorf("unknown exporter type: %s", exporter) + } + return receiver +} + +func loadPictOutputPipelineDefs(fileName string) ([]PipelineDef, error) { + file, err := os.Open(filepath.Clean(fileName)) + if err != nil { + return nil, err + } + defer func() { + cerr := file.Close() + if err == nil { + err = cerr + } + }() + + defs := make([]PipelineDef, 0) + scanner := bufio.NewScanner(file) + for scanner.Scan() { + s := strings.Split(scanner.Text(), "\t") + if "Receiver" == s[0] { + continue + } + + var aDef PipelineDef + aDef.receiver, aDef.exporter = s[0], s[1] + defs = append(defs, aDef) + } + + return defs, err +} + +func testWithTracingGoldenDataset( + t *testing.T, + sender testbed.DataSender, + receiver testbed.DataReceiver, + resourceSpec testbed.ResourceSpec, + processors map[string]string, +) { + resultDir, err := filepath.Abs(path.Join("results", t.Name())) + require.NoError(t, err) + + dataProvider := testbed.NewGoldenDataProvider( + "../../internal/goldendataset/testdata/generated_pict_pairs_traces.txt", + "../../internal/goldendataset/testdata/generated_pict_pairs_spans.txt", + 
161803) + factories, err := defaultcomponents.Components() + assert.NoError(t, err) + runner := testbed.NewInProcessCollector(factories, sender.GetCollectorPort()) + validator := testbed.NewCorrectTestValidator(dataProvider) + config := createConfigYaml(t, sender, receiver, resultDir, processors) + configCleanup, cfgErr := runner.PrepareConfig(config) + assert.NoError(t, cfgErr) + defer configCleanup() + tc := testbed.NewTestCase( + t, + dataProvider, + sender, + receiver, + runner, + validator, + correctnessResults, + ) + defer tc.Stop() + + tc.SetResourceLimits(resourceSpec) + tc.EnableRecording() + tc.StartBackend() + tc.StartAgent("--metrics-level=NONE") + + tc.StartLoad(testbed.LoadOptions{ + DataItemsPerSecond: 1024, + ItemsPerBatch: 1, + }) + + duration := time.Second + tc.Sleep(duration) + + tc.StopLoad() + + tc.WaitForN(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, + duration, "all data items received") + + tc.StopAgent() + + tc.ValidateData() +} + +func createConfigYaml(t *testing.T, sender testbed.DataSender, receiver testbed.DataReceiver, resultDir string, + processors map[string]string) string { + + // Prepare extra processor config section and comma-separated list of extra processor + // names to use in corresponding "processors" settings. + processorsSections := "" + processorsList := "" + if len(processors) > 0 { + first := true + for name, cfg := range processors { + processorsSections += cfg + "\n" + if !first { + processorsList += "," + } + processorsList += name + first = false + } + } + + format := ` +receivers:%v +exporters:%v +processors: + %s + +extensions: + +service: + extensions: + pipelines: + traces: + receivers: [%v] + processors: [%s] + exporters: [%v] +` + + return fmt.Sprintf( + format, + sender.GenConfigYAMLStr(), + receiver.GenConfigYAMLStr(), + processorsSections, + sender.ProtocolName(), + processorsList, + receiver.ProtocolName(), + ) +} diff --git a/testbed/correctness/inprocess.yaml b/testbed/correctness/inprocess.yaml new file mode 100644 index 00000000000..9b7b7d8e50e --- /dev/null +++ b/testbed/correctness/inprocess.yaml @@ -0,0 +1 @@ +agent: in-process diff --git a/testbed/correctness/runtests.sh b/testbed/correctness/runtests.sh new file mode 100755 index 00000000000..f07064a1d56 --- /dev/null +++ b/testbed/correctness/runtests.sh @@ -0,0 +1,21 @@ +#!/bin/bash + +set -e + +SED="sed" + +PASS_COLOR=$(printf "\033[32mPASS\033[0m") +FAIL_COLOR=$(printf "\033[31mFAIL\033[0m") +TEST_COLORIZE="${SED} 's/PASS/${PASS_COLOR}/' | ${SED} 's/FAIL/${FAIL_COLOR}/'" +echo ${TEST_ARGS} +mkdir -p results +TESTBED_CONFIG=inprocess.yaml go test -v ${TEST_ARGS} 2>&1 | tee results/testoutput.log | bash -c "${TEST_COLORIZE}" + +testStatus=${PIPESTATUS[0]} + +mkdir -p results/junit +go-junit-report < results/testoutput.log > results/junit/results.xml + +bash -c "cat results/CORRECTNESSRESULTS.md | ${TEST_COLORIZE}" + +exit ${testStatus} diff --git a/testbed/correctness/testdata/generated_pict_pairs_traces_pipeline.txt b/testbed/correctness/testdata/generated_pict_pairs_traces_pipeline.txt new file mode 100644 index 00000000000..af31f5aa370 --- /dev/null +++ b/testbed/correctness/testdata/generated_pict_pairs_traces_pipeline.txt @@ -0,0 +1,17 @@ +Receiver Exporter +otlp jaeger +zipkin opencensus +otlp opencensus +jaeger opencensus +opencensus jaeger +zipkin otlp +jaeger jaeger +opencensus opencensus +otlp zipkin +jaeger zipkin +opencensus zipkin +zipkin jaeger +otlp otlp +jaeger otlp +opencensus otlp +zipkin zipkin diff 
--git a/testbed/correctness/testdata/pict_input_traces_pipeline.txt b/testbed/correctness/testdata/pict_input_traces_pipeline.txt new file mode 100644 index 00000000000..05efad8fdea --- /dev/null +++ b/testbed/correctness/testdata/pict_input_traces_pipeline.txt @@ -0,0 +1,2 @@ +Receiver: jaeger, opencensus, otlp, zipkin +Exporter: jaeger, opencensus, otlp, zipkin diff --git a/testbed/testbed/child_process.go b/testbed/testbed/child_process.go index 0896d4794d1..c5868e8b58b 100644 --- a/testbed/testbed/child_process.go +++ b/testbed/testbed/child_process.go @@ -18,9 +18,12 @@ import ( "errors" "fmt" "io" + "io/ioutil" "log" "os" "os/exec" + "path" + "path/filepath" "sync" "sync/atomic" "syscall" @@ -55,12 +58,15 @@ func (rs *ResourceSpec) isSpecified() bool { return rs != nil && (rs.ExpectedMaxCPU != 0 || rs.ExpectedMaxRAM != 0) } -// childProcess is a child process that can be monitored and the output -// of which will be written to a log file. -type childProcess struct { +// ChildProcess implements the OtelcolRunner interface as a child process on the same machine executing +//the test. The process can be monitored and the output of which will be written to a log file. +type ChildProcess struct { // Descriptive name of the process name string + // Config file name + configFileName string + // Command to execute cmd *exec.Cmd @@ -107,7 +113,7 @@ type childProcess struct { ramMiBMax uint32 } -type startParams struct { +type StartParams struct { name string logFilePath string cmd string @@ -122,6 +128,35 @@ type ResourceConsumption struct { RAMMiBMax uint32 } +func (cp *ChildProcess) PrepareConfig(configStr string) (configCleanup func(), err error) { + configCleanup = func() { + // NoOp + } + var file *os.File + file, err = ioutil.TempFile("", "agent*.yaml") + if err != nil { + log.Printf("%s", err) + return configCleanup, err + } + + defer func() { + errClose := file.Close() + if errClose != nil { + log.Printf("%s", errClose) + } + }() + + if _, err = file.WriteString(configStr); err != nil { + log.Printf("%s", err) + return configCleanup, err + } + cp.configFileName = file.Name() + configCleanup = func() { + os.Remove(cp.configFileName) + } + return configCleanup, err +} + // start a child process. // // Parameters: @@ -130,7 +165,7 @@ type ResourceConsumption struct { // the process to. // cmd is the executable to run. // cmdArgs is the command line arguments to pass to the process. -func (cp *childProcess) start(params startParams) error { +func (cp *ChildProcess) Start(params StartParams) (receiverAddr string, err error) { cp.name = params.name cp.doneSignal = make(chan struct{}) @@ -139,29 +174,42 @@ func (cp *childProcess) start(params startParams) error { log.Printf("Starting %s (%s)", cp.name, params.cmd) // Prepare log file - logFile, err := os.Create(params.logFilePath) + var logFile *os.File + logFile, err = os.Create(params.logFilePath) if err != nil { - return fmt.Errorf("cannot create %s: %s", params.logFilePath, err.Error()) + return receiverAddr, fmt.Errorf("cannot create %s: %s", params.logFilePath, err.Error()) } log.Printf("Writing %s log to %s", cp.name, params.logFilePath) // Prepare to start the process. // #nosec - cp.cmd = exec.Command(params.cmd, params.cmdArgs...) 
+ args := params.cmdArgs + if !containsConfig(args) { + if cp.configFileName == "" { + configFile := path.Join("testdata", "agent-config.yaml") + cp.configFileName, err = filepath.Abs(configFile) + if err != nil { + return receiverAddr, err + } + } + args = append(args, "--config") + args = append(args, cp.configFileName) + } + cp.cmd = exec.Command(params.cmd, args...) // Capture standard output and standard error. stdoutIn, err := cp.cmd.StdoutPipe() if err != nil { - return fmt.Errorf("cannot capture stdout of %s: %s", params.cmd, err.Error()) + return receiverAddr, fmt.Errorf("cannot capture stdout of %s: %s", params.cmd, err.Error()) } stderrIn, err := cp.cmd.StderrPipe() if err != nil { - return fmt.Errorf("cannot capture stderr of %s: %s", params.cmd, err.Error()) + return receiverAddr, fmt.Errorf("cannot capture stderr of %s: %s", params.cmd, err.Error()) } // Start the process. - if err := cp.cmd.Start(); err != nil { - return fmt.Errorf("cannot start executable at %s: %s", params.cmd, err.Error()) + if err = cp.cmd.Start(); err != nil { + return receiverAddr, fmt.Errorf("cannot start executable at %s: %s", params.cmd, err.Error()) } cp.startTime = time.Now() @@ -182,10 +230,14 @@ func (cp *childProcess) start(params startParams) error { cp.outputWG.Done() }() - return nil + receiverAddr = fmt.Sprintf("%s:%d", DefaultHost, 0) + return receiverAddr, err } -func (cp *childProcess) stop() { +func (cp *ChildProcess) Stop() (stopped bool, err error) { + if !cp.isStarted || cp.isStopped { + return false, nil + } cp.stopOnce.Do(func() { if !cp.isStarted { @@ -201,7 +253,7 @@ func (cp *childProcess) stop() { close(cp.doneSignal) // Gracefully signal process to stop. - if err := cp.cmd.Process.Signal(syscall.SIGTERM); err != nil { + if err = cp.cmd.Process.Signal(syscall.SIGTERM); err != nil { log.Printf("Cannot send SIGTEM: %s", err.Error()) } @@ -217,7 +269,7 @@ func (cp *childProcess) stop() { // Time is out. Kill the process. log.Printf("%s pid=%d is not responding to SIGTERM. Sending SIGKILL to kill forcedly.", cp.name, cp.cmd.Process.Pid) - if err := cp.cmd.Process.Signal(syscall.SIGKILL); err != nil { + if err = cp.cmd.Process.Signal(syscall.SIGKILL); err != nil { log.Printf("Cannot send SIGKILL: %s", err.Error()) } case <-finished: @@ -229,7 +281,7 @@ func (cp *childProcess) stop() { cp.outputWG.Wait() // Wait for process to terminate - err := cp.cmd.Wait() + err = cp.cmd.Wait() // Let goroutine know process is finished. close(finished) @@ -244,9 +296,11 @@ func (cp *childProcess) stop() { log.Printf("%s execution failed: %s", cp.name, err.Error()) } }) + stopped = true + return stopped, err } -func (cp *childProcess) watchResourceConsumption() error { +func (cp *ChildProcess) WatchResourceConsumption() error { if !cp.resourceSpec.isSpecified() { // Resource monitoring is not enabled. 
return nil @@ -280,7 +334,7 @@ func (cp *childProcess) watchResourceConsumption() error { cp.fetchCPUUsage() if err := cp.checkAllowedResourceUsage(); err != nil { - cp.stop() + cp.Stop() return err } @@ -291,7 +345,11 @@ func (cp *childProcess) watchResourceConsumption() error { } } -func (cp *childProcess) fetchRAMUsage() { +func (cp *ChildProcess) GetProcessMon() *process.Process { + return cp.processMon +} + +func (cp *ChildProcess) fetchRAMUsage() { // Get process memory and CPU times mi, err := cp.processMon.MemoryInfo() if err != nil { @@ -314,7 +372,7 @@ func (cp *childProcess) fetchRAMUsage() { atomic.StoreUint32(&cp.ramMiBCur, ramMiBCur) } -func (cp *childProcess) fetchCPUUsage() { +func (cp *ChildProcess) fetchCPUUsage() { times, err := cp.processMon.Times() if err != nil { log.Printf("cannot get process times for %d: %s", @@ -343,7 +401,7 @@ func (cp *childProcess) fetchCPUUsage() { atomic.StoreUint32(&cp.cpuPercentX1000Cur, curCPUPercentageX1000) } -func (cp *childProcess) checkAllowedResourceUsage() error { +func (cp *ChildProcess) checkAllowedResourceUsage() error { // Check if current CPU usage exceeds expected. var errMsg string if cp.resourceSpec.ExpectedMaxCPU != 0 && cp.cpuPercentX1000Cur/1000 > cp.resourceSpec.ExpectedMaxCPU { @@ -367,7 +425,7 @@ func (cp *childProcess) checkAllowedResourceUsage() error { } // GetResourceConsumption returns resource consumption as a string -func (cp *childProcess) GetResourceConsumption() string { +func (cp *ChildProcess) GetResourceConsumption() string { if !cp.resourceSpec.isSpecified() { // Monitoring is not enabled. return "" @@ -381,7 +439,7 @@ func (cp *childProcess) GetResourceConsumption() string { } // GetTotalConsumption returns total resource consumption since start of process -func (cp *childProcess) GetTotalConsumption() *ResourceConsumption { +func (cp *ChildProcess) GetTotalConsumption() *ResourceConsumption { rc := &ResourceConsumption{} if cp.processMon != nil { @@ -403,3 +461,12 @@ func (cp *childProcess) GetTotalConsumption() *ResourceConsumption { return rc } + +func containsConfig(s []string) bool { + for _, a := range s { + if a == "--config" { + return true + } + } + return false +} diff --git a/testbed/testbed/data_providers.go b/testbed/testbed/data_providers.go new file mode 100644 index 00000000000..6d2f4bed0f2 --- /dev/null +++ b/testbed/testbed/data_providers.go @@ -0,0 +1,380 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package testbed + +import ( + "encoding/binary" + "encoding/hex" + "fmt" + "io" + "log" + "math/rand" + "strconv" + "sync/atomic" + "time" + + metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" + resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" + tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" + "github.com/golang/protobuf/ptypes/timestamp" + + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/internal/data" + otlptrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/trace/v1" + "go.opentelemetry.io/collector/internal/goldendataset" + "go.opentelemetry.io/collector/translator/internaldata" +) + +//DataProvider defines the interface for generators of test data used to drive various end-to-end tests. +type DataProvider interface { + //SetLoadGeneratorCounters supplies pointers to LoadGenerator counters. + //The data provider implementation should increment these as it generates data. + SetLoadGeneratorCounters(batchesGenerated *uint64, dataItemsGenerated *uint64) + //GenerateTraces returns an internal Traces instance with an OTLP ResourceSpans slice populated with test data. + GenerateTraces() (pdata.Traces, bool) + //GenerateTracesOld returns a slice of OpenCensus Span instances populated with test data. + GenerateTracesOld() ([]*tracepb.Span, bool) + //GenerateMetrics returns an internal MetricData instance with an OTLP ResourceMetrics slice of test data. + GenerateMetrics() (data.MetricData, bool) + //GenerateMetricsOld returns a slice of OpenCensus Metric instances populated with test data. + GenerateMetricsOld() ([]*metricspb.Metric, bool) + //GetGeneratedSpan returns the generated Span matching the provided traceId and spanId or else nil if no match found. + GetGeneratedSpan(traceID []byte, spanID []byte) *otlptrace.Span +} + +//PerfTestDataProvider in an implementation of the DataProvider for use in performance tests. +//Tracing IDs are based on the incremented batch and data items counters. +type PerfTestDataProvider struct { + options LoadOptions + batchesGenerated *uint64 + dataItemsGenerated *uint64 +} + +//NewPerfTestDataProvider creates an instance of PerfTestDataProvider which generates test data based on the sizes +//specified in the supplied LoadOptions. +func NewPerfTestDataProvider(options LoadOptions) *PerfTestDataProvider { + return &PerfTestDataProvider{ + options: options, + } +} + +func (dp *PerfTestDataProvider) SetLoadGeneratorCounters(batchesGenerated *uint64, dataItemsGenerated *uint64) { + dp.batchesGenerated = batchesGenerated + dp.dataItemsGenerated = dataItemsGenerated +} + +func (dp *PerfTestDataProvider) GenerateTracesOld() ([]*tracepb.Span, bool) { + + var spans []*tracepb.Span + traceID := atomic.AddUint64(dp.batchesGenerated, 1) + for i := 0; i < dp.options.ItemsPerBatch; i++ { + + startTime := time.Now() + + spanID := atomic.AddUint64(dp.dataItemsGenerated, 1) + + // Create a span. 
+ span := &tracepb.Span{ + TraceId: GenerateSequentialTraceID(traceID), + SpanId: GenerateSequentialSpanID(spanID), + Name: &tracepb.TruncatableString{Value: "load-generator-span"}, + Kind: tracepb.Span_CLIENT, + Attributes: &tracepb.Span_Attributes{ + AttributeMap: map[string]*tracepb.AttributeValue{ + "load_generator.span_seq_num": { + Value: &tracepb.AttributeValue_IntValue{IntValue: int64(spanID)}, + }, + "load_generator.trace_seq_num": { + Value: &tracepb.AttributeValue_IntValue{IntValue: int64(traceID)}, + }, + }, + }, + StartTime: timeToTimestamp(startTime), + EndTime: timeToTimestamp(startTime.Add(time.Duration(time.Millisecond))), + } + + // Append attributes. + for k, v := range dp.options.Attributes { + span.Attributes.AttributeMap[k] = &tracepb.AttributeValue{ + Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: v}}, + } + } + + spans = append(spans, span) + } + return spans, false +} + +func (dp *PerfTestDataProvider) GenerateTraces() (pdata.Traces, bool) { + + traceData := pdata.NewTraces() + traceData.ResourceSpans().Resize(1) + ilss := traceData.ResourceSpans().At(0).InstrumentationLibrarySpans() + ilss.Resize(1) + spans := ilss.At(0).Spans() + spans.Resize(dp.options.ItemsPerBatch) + + traceID := atomic.AddUint64(dp.batchesGenerated, 1) + for i := 0; i < dp.options.ItemsPerBatch; i++ { + + startTime := time.Now() + endTime := startTime.Add(time.Duration(time.Millisecond)) + + spanID := atomic.AddUint64(dp.dataItemsGenerated, 1) + + span := spans.At(i) + + // Create a span. + span.SetTraceID(GenerateSequentialTraceID(traceID)) + span.SetSpanID(GenerateSequentialSpanID(spanID)) + span.SetName("load-generator-span") + span.SetKind(pdata.SpanKindCLIENT) + attrs := span.Attributes() + attrs.UpsertInt("load_generator.span_seq_num", int64(spanID)) + attrs.UpsertInt("load_generator.trace_seq_num", int64(traceID)) + // Additional attributes. + for k, v := range dp.options.Attributes { + attrs.UpsertString(k, v) + } + span.SetStartTime(pdata.TimestampUnixNano(uint64(startTime.UnixNano()))) + span.SetEndTime(pdata.TimestampUnixNano(uint64(endTime.UnixNano()))) + } + return traceData, false +} + +func GenerateSequentialTraceID(id uint64) []byte { + var traceID [16]byte + binary.PutUvarint(traceID[:], id) + return traceID[:] +} + +func GenerateSequentialSpanID(id uint64) []byte { + var spanID [8]byte + binary.PutUvarint(spanID[:], id) + return spanID[:] +} + +func (dp *PerfTestDataProvider) GenerateMetricsOld() ([]*metricspb.Metric, bool) { + + resource := &resourcepb.Resource{ + Labels: dp.options.Attributes, + } + + // Generate 7 data points per metric. + const dataPointsPerMetric = 7 + + var metrics []*metricspb.Metric + for i := 0; i < dp.options.ItemsPerBatch; i++ { + + metric := &metricspb.Metric{ + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "load_generator_" + strconv.Itoa(i), + Description: "Load Generator Counter #" + strconv.Itoa(i), + Unit: "", + Type: metricspb.MetricDescriptor_GAUGE_INT64, + LabelKeys: []*metricspb.LabelKey{ + {Key: "item_index"}, + {Key: "batch_index"}, + }, + }, + Resource: resource, + } + + batchIndex := atomic.AddUint64(dp.batchesGenerated, 1) + + // Generate data points for the metric. We generate timeseries each containing + // a single data points. This is the most typical payload composition since + // monitoring libraries typically generated one data point at a time. 
+		for j := 0; j < dataPointsPerMetric; j++ {
+			timeseries := &metricspb.TimeSeries{}
+
+			startTime := time.Now()
+			value := atomic.AddUint64(dp.dataItemsGenerated, 1)
+
+			// Create a data point.
+			point := &metricspb.Point{
+				Timestamp: timeToTimestamp(startTime),
+				Value:     &metricspb.Point_Int64Value{Int64Value: int64(value)},
+			}
+			timeseries.Points = append(timeseries.Points, point)
+			timeseries.LabelValues = []*metricspb.LabelValue{
+				{Value: "item_" + strconv.Itoa(j)},
+				{Value: "batch_" + strconv.Itoa(int(batchIndex))},
+			}
+
+			metric.Timeseries = append(metric.Timeseries, timeseries)
+		}
+
+		metrics = append(metrics, metric)
+	}
+	return metrics, false
+}
+
+func (dp *PerfTestDataProvider) GenerateMetrics() (data.MetricData, bool) {
+
+	// Generate 7 data points per metric.
+	const dataPointsPerMetric = 7
+
+	metricData := data.NewMetricData()
+	metricData.ResourceMetrics().Resize(1)
+	metricData.ResourceMetrics().At(0).InstrumentationLibraryMetrics().Resize(1)
+	if dp.options.Attributes != nil {
+		attrs := metricData.ResourceMetrics().At(0).Resource().Attributes()
+		attrs.InitEmptyWithCapacity(len(dp.options.Attributes))
+		for k, v := range dp.options.Attributes {
+			attrs.UpsertString(k, v)
+		}
+	}
+	metrics := metricData.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics()
+	metrics.Resize(dp.options.ItemsPerBatch)
+
+	for i := 0; i < dp.options.ItemsPerBatch; i++ {
+		metric := metrics.At(i)
+		metricDescriptor := metric.MetricDescriptor()
+		metricDescriptor.InitEmpty()
+		metricDescriptor.SetName("load_generator_" + strconv.Itoa(i))
+		metricDescriptor.SetDescription("Load Generator Counter #" + strconv.Itoa(i))
+		metricDescriptor.SetType(pdata.MetricTypeInt64)
+
+		batchIndex := atomic.AddUint64(dp.batchesGenerated, 1)
+
+		// Generate data points for the metric.
+		metric.Int64DataPoints().Resize(dataPointsPerMetric)
+		for j := 0; j < dataPointsPerMetric; j++ {
+			dataPoint := metric.Int64DataPoints().At(j)
+			dataPoint.SetStartTime(pdata.TimestampUnixNano(uint64(time.Now().UnixNano())))
+			value := atomic.AddUint64(dp.dataItemsGenerated, 1)
+			dataPoint.SetValue(int64(value))
+			dataPoint.LabelsMap().InitFromMap(map[string]string{
+				"item_index":  "item_" + strconv.Itoa(j),
+				"batch_index": "batch_" + strconv.Itoa(int(batchIndex)),
+			})
+		}
+	}
+	return metricData, false
+}
+
+func (dp *PerfTestDataProvider) GetGeneratedSpan(traceID []byte, spanID []byte) *otlptrace.Span {
+	// function not supported for this data provider
+	return nil
+}
+
+// timeToTimestamp converts a time.Time to a timestamp.Timestamp pointer.
+func timeToTimestamp(t time.Time) *timestamp.Timestamp {
+	if t.IsZero() {
+		return nil
+	}
+	nanoTime := t.UnixNano()
+	return &timestamp.Timestamp{
+		Seconds: nanoTime / 1e9,
+		Nanos:   int32(nanoTime % 1e9),
+	}
+}
+
+//GoldenDataProvider is an implementation of DataProvider for use in correctness tests.
+//Provides data from the "Golden" dataset generated using pairwise combinatorial testing techniques.
+type GoldenDataProvider struct {
+	tracePairsFile     string
+	spanPairsFile      string
+	random             io.Reader
+	batchesGenerated   *uint64
+	dataItemsGenerated *uint64
+	resourceSpans      []*otlptrace.ResourceSpans
+	spansIndex         int
+	spansMap           map[string]*otlptrace.Span
+}
+
+//NewGoldenDataProvider creates a new instance of GoldenDataProvider which generates test data based
+//on the pairwise combinations specified in the tracePairsFile and spanPairsFile input variables.
+//The supplied randomSeed is used to initialize the random number generator used in generating tracing IDs.
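As the comment above notes, the generator is seeded with a fixed value (161803 in the correctness tests), so the "random" tracing IDs come out identical on every run and can later be matched up by the validator. A standalone sketch of that property; the variable names here are illustrative, not testbed API:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"math/rand"
)

func main() {
	// Two readers created from the same seed produce the same byte stream,
	// which is what makes golden-dataset runs reproducible.
	r1 := rand.New(rand.NewSource(161803))
	r2 := rand.New(rand.NewSource(161803))
	id1 := make([]byte, 16)
	id2 := make([]byte, 16)
	_, _ = io.ReadFull(r1, id1)
	_, _ = io.ReadFull(r2, id2)
	fmt.Println(bytes.Equal(id1, id2)) // prints: true
}
```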
+func NewGoldenDataProvider(tracePairsFile string, spanPairsFile string, randomSeed int64) *GoldenDataProvider { + return &GoldenDataProvider{ + tracePairsFile: tracePairsFile, + spanPairsFile: spanPairsFile, + random: io.Reader(rand.New(rand.NewSource(randomSeed))), + } +} + +func (dp *GoldenDataProvider) SetLoadGeneratorCounters(batchesGenerated *uint64, dataItemsGenerated *uint64) { + dp.batchesGenerated = batchesGenerated + dp.dataItemsGenerated = dataItemsGenerated +} + +func (dp *GoldenDataProvider) GenerateTraces() (pdata.Traces, bool) { + if dp.resourceSpans == nil { + var err error + dp.resourceSpans, err = goldendataset.GenerateResourceSpans(dp.tracePairsFile, dp.spanPairsFile, dp.random) + if err != nil { + log.Printf("cannot generate traces: %s", err) + dp.resourceSpans = make([]*otlptrace.ResourceSpans, 0) + } + } + atomic.AddUint64(dp.batchesGenerated, 1) + if dp.spansIndex >= len(dp.resourceSpans) { + return pdata.TracesFromOtlp(make([]*otlptrace.ResourceSpans, 0)), true + } + resourceSpans := make([]*otlptrace.ResourceSpans, 1) + resourceSpans[0] = dp.resourceSpans[dp.spansIndex] + dp.spansIndex++ + spanCount := uint64(0) + for _, libSpans := range resourceSpans[0].InstrumentationLibrarySpans { + spanCount += uint64(len(libSpans.Spans)) + } + atomic.AddUint64(dp.dataItemsGenerated, spanCount) + return pdata.TracesFromOtlp(resourceSpans), false +} + +func (dp *GoldenDataProvider) GenerateTracesOld() ([]*tracepb.Span, bool) { + traces, done := dp.GenerateTraces() + spans := make([]*tracepb.Span, 0, traces.SpanCount()) + traceDatas := internaldata.TraceDataToOC(traces) + for _, traceData := range traceDatas { + spans = append(spans, traceData.Spans...) + } + return spans, done +} + +func (dp *GoldenDataProvider) GenerateMetrics() (data.MetricData, bool) { + return data.MetricData{}, true +} + +func (dp *GoldenDataProvider) GenerateMetricsOld() ([]*metricspb.Metric, bool) { + return make([]*metricspb.Metric, 0), true +} + +func (dp *GoldenDataProvider) GetGeneratedSpan(traceID []byte, spanID []byte) *otlptrace.Span { + if dp.spansMap == nil { + dp.spansMap = populateSpansMap(dp.resourceSpans) + } + key := traceIDAndSpanIDToString(traceID, spanID) + return dp.spansMap[key] +} + +func populateSpansMap(resourceSpansList []*otlptrace.ResourceSpans) map[string]*otlptrace.Span { + spansMap := make(map[string]*otlptrace.Span) + for _, resourceSpans := range resourceSpansList { + for _, libSpans := range resourceSpans.InstrumentationLibrarySpans { + for _, span := range libSpans.Spans { + key := traceIDAndSpanIDToString(span.TraceId, span.SpanId) + spansMap[key] = span + } + } + } + return spansMap +} + +func traceIDAndSpanIDToString(traceID []byte, spanID []byte) string { + return fmt.Sprintf("%s-%s", hex.EncodeToString(traceID), hex.EncodeToString(spanID)) +} diff --git a/testbed/testbed/load_generator.go b/testbed/testbed/load_generator.go index 9bdfe76312d..f80e57b1c7e 100644 --- a/testbed/testbed/load_generator.go +++ b/testbed/testbed/load_generator.go @@ -15,28 +15,23 @@ package testbed import ( - "encoding/binary" "fmt" "log" - "strconv" "sync" "sync/atomic" "time" - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" - tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" - "github.com/golang/protobuf/ptypes/timestamp" "go.opentelemetry.io/collector/consumer/consumerdata" - "go.opentelemetry.io/collector/consumer/pdata" - 
"go.opentelemetry.io/collector/internal/data" ) // LoadGenerator is a simple load generator. type LoadGenerator struct { sender DataSender + dataProvider DataProvider + // Number of batches of data items sent. batchesSent uint64 @@ -68,14 +63,15 @@ type LoadOptions struct { } // NewLoadGenerator creates a load generator that sends data using specified sender. -func NewLoadGenerator(sender DataSender) (*LoadGenerator, error) { +func NewLoadGenerator(dataProvider DataProvider, sender DataSender) (*LoadGenerator, error) { if sender == nil { return nil, fmt.Errorf("cannot create load generator without DataSender") } lg := &LoadGenerator{ - stopSignal: make(chan struct{}), - sender: sender, + stopSignal: make(chan struct{}), + sender: sender, + dataProvider: dataProvider, } return lg, nil @@ -140,6 +136,8 @@ func (lg *LoadGenerator) generate() { return } + lg.dataProvider.SetLoadGeneratorCounters(&lg.batchesSent, &lg.dataItemsSent) + err := lg.sender.Start() if err != nil { log.Printf("Cannot start sender: %v", err) @@ -173,50 +171,12 @@ func (lg *LoadGenerator) generate() { lg.sender.Flush() } -func (lg *LoadGenerator) generateTraceOld() { - - traceSender := lg.sender.(TraceDataSenderOld) - - var spans []*tracepb.Span - traceID := atomic.AddUint64(&lg.batchesSent, 1) - for i := 0; i < lg.options.ItemsPerBatch; i++ { - - startTime := time.Now() - - spanID := atomic.AddUint64(&lg.dataItemsSent, 1) - - // Create a span. - span := &tracepb.Span{ - TraceId: GenerateTraceID(traceID), - SpanId: GenerateSpanID(spanID), - Name: &tracepb.TruncatableString{Value: "load-generator-span"}, - Kind: tracepb.Span_CLIENT, - Attributes: &tracepb.Span_Attributes{ - AttributeMap: map[string]*tracepb.AttributeValue{ - "load_generator.span_seq_num": { - Value: &tracepb.AttributeValue_IntValue{IntValue: int64(spanID)}, - }, - "load_generator.trace_seq_num": { - Value: &tracepb.AttributeValue_IntValue{IntValue: int64(traceID)}, - }, - }, - }, - StartTime: timeToTimestamp(startTime), - EndTime: timeToTimestamp(startTime.Add(time.Duration(time.Millisecond))), - } - - // Append attributes. - for k, v := range lg.options.Attributes { - span.Attributes.AttributeMap[k] = &tracepb.AttributeValue{ - Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: v}}, - } - } - - spans = append(spans, span) - } +func (lg *LoadGenerator) generateTrace() { + traceSender := lg.sender.(TraceDataSender) - traceData := consumerdata.TraceData{ - Spans: spans, + traceData, done := lg.dataProvider.GenerateTraces() + if done { + return } err := traceSender.SendSpans(traceData) @@ -228,40 +188,15 @@ func (lg *LoadGenerator) generateTraceOld() { } } -func (lg *LoadGenerator) generateTrace() { - traceSender := lg.sender.(TraceDataSender) +func (lg *LoadGenerator) generateTraceOld() { + traceSender := lg.sender.(TraceDataSenderOld) - traceData := pdata.NewTraces() - traceData.ResourceSpans().Resize(1) - ilss := traceData.ResourceSpans().At(0).InstrumentationLibrarySpans() - ilss.Resize(1) - spans := ilss.At(0).Spans() - spans.Resize(lg.options.ItemsPerBatch) - - traceID := atomic.AddUint64(&lg.batchesSent, 1) - for i := 0; i < lg.options.ItemsPerBatch; i++ { - - startTime := time.Now() - endTime := startTime.Add(time.Duration(time.Millisecond)) - - spanID := atomic.AddUint64(&lg.dataItemsSent, 1) - - span := spans.At(i) - - // Create a span. 
- span.SetTraceID(GenerateTraceID(traceID)) - span.SetSpanID(GenerateSpanID(spanID)) - span.SetName("load-generator-span") - span.SetKind(pdata.SpanKindCLIENT) - attrs := span.Attributes() - attrs.UpsertInt("load_generator.span_seq_num", int64(spanID)) - attrs.UpsertInt("load_generator.trace_seq_num", int64(traceID)) - // Additional attributes. - for k, v := range lg.options.Attributes { - attrs.UpsertString(k, v) - } - span.SetStartTime(pdata.TimestampUnixNano(uint64(startTime.UnixNano()))) - span.SetEndTime(pdata.TimestampUnixNano(uint64(endTime.UnixNano()))) + spans, done := lg.dataProvider.GenerateTracesOld() + if done { + return + } + traceData := consumerdata.TraceData{ + Spans: spans, } err := traceSender.SendSpans(traceData) @@ -272,77 +207,13 @@ func (lg *LoadGenerator) generateTrace() { log.Printf("Cannot send traces: %v", err) } } -func GenerateTraceID(id uint64) []byte { - var traceID [16]byte - binary.PutUvarint(traceID[:], id) - return traceID[:] -} - -func GenerateSpanID(id uint64) []byte { - var spanID [8]byte - binary.PutUvarint(spanID[:], id) - return spanID[:] -} - -func (lg *LoadGenerator) generateMetricsOld() { - - metricSender := lg.sender.(MetricDataSenderOld) - - resource := &resourcepb.Resource{ - Labels: lg.options.Attributes, - } - // Generate 7 data points per metric. - const dataPointsPerMetric = 7 - - var metrics []*metricspb.Metric - for i := 0; i < lg.options.ItemsPerBatch; i++ { - - metric := &metricspb.Metric{ - MetricDescriptor: &metricspb.MetricDescriptor{ - Name: "load_generator_" + strconv.Itoa(i), - Description: "Load Generator Counter #" + strconv.Itoa(i), - Unit: "", - Type: metricspb.MetricDescriptor_GAUGE_INT64, - LabelKeys: []*metricspb.LabelKey{ - {Key: "item_index"}, - {Key: "batch_index"}, - }, - }, - Resource: resource, - } - - batchIndex := atomic.AddUint64(&lg.batchesSent, 1) - - // Generate data points for the metric. We generate timeseries each containing - // a single data points. This is the most typical payload composition since - // monitoring libraries typically generated one data point at a time. - for j := 0; j < dataPointsPerMetric; j++ { - timeseries := &metricspb.TimeSeries{} - - startTime := time.Now() - value := atomic.AddUint64(&lg.dataItemsSent, 1) - - // Create a data point. - point := &metricspb.Point{ - Timestamp: timeToTimestamp(startTime), - Value: &metricspb.Point_Int64Value{Int64Value: int64(value)}, - } - timeseries.Points = append(timeseries.Points, point) - timeseries.LabelValues = []*metricspb.LabelValue{ - {Value: "item_" + strconv.Itoa(j)}, - {Value: "batch_" + strconv.Itoa(int(batchIndex))}, - } - - metric.Timeseries = append(metric.Timeseries, timeseries) - } - - metrics = append(metrics, metric) - } +func (lg *LoadGenerator) generateMetrics() { + metricSender := lg.sender.(MetricDataSender) - metricData := consumerdata.MetricsData{ - Resource: resource, - Metrics: metrics, + metricData, done := lg.dataProvider.GenerateMetrics() + if done { + return } err := metricSender.SendMetrics(metricData) @@ -354,52 +225,22 @@ func (lg *LoadGenerator) generateMetricsOld() { } } -func (lg *LoadGenerator) generateMetrics() { - - metricSender := lg.sender.(MetricDataSender) +func (lg *LoadGenerator) generateMetricsOld() { + metricSender := lg.sender.(MetricDataSenderOld) - // Generate 7 data points per metric. 
- const dataPointsPerMetric = 7 - - metricData := data.NewMetricData() - metricData.ResourceMetrics().Resize(1) - metricData.ResourceMetrics().At(0).InstrumentationLibraryMetrics().Resize(1) - if lg.options.Attributes != nil { - attrs := metricData.ResourceMetrics().At(0).Resource().Attributes() - attrs.InitEmptyWithCapacity(len(lg.options.Attributes)) - for k, v := range lg.options.Attributes { - attrs.UpsertString(k, v) - } + resource := &resourcepb.Resource{ + Labels: lg.options.Attributes, } - metrics := metricData.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics() - metrics.Resize(lg.options.ItemsPerBatch) - - for i := 0; i < lg.options.ItemsPerBatch; i++ { - metric := metrics.At(i) - metricDescriptor := metric.MetricDescriptor() - metricDescriptor.InitEmpty() - metricDescriptor.SetName("load_generator_" + strconv.Itoa(i)) - metricDescriptor.SetDescription("Load Generator Counter #" + strconv.Itoa(i)) - metricDescriptor.SetType(pdata.MetricTypeInt64) - - batchIndex := atomic.AddUint64(&lg.batchesSent, 1) - - // Generate data points for the metric. - metric.Int64DataPoints().Resize(dataPointsPerMetric) - for j := 0; j < dataPointsPerMetric; j++ { - dataPoint := metric.Int64DataPoints().At(j) - dataPoint.SetStartTime(pdata.TimestampUnixNano(uint64(time.Now().UnixNano()))) - value := atomic.AddUint64(&lg.dataItemsSent, 1) - dataPoint.SetValue(int64(value)) - dataPoint.LabelsMap().InitFromMap(map[string]string{ - "item_index": "item_" + strconv.Itoa(j), - "batch_index": "batch_" + strconv.Itoa(int(batchIndex)), - }) - } + metrics, done := lg.dataProvider.GenerateMetricsOld() + if done { + return + } + metricData := consumerdata.MetricsData{ + Resource: resource, + Metrics: metrics, } err := metricSender.SendMetrics(metricData) - if err == nil { lg.prevErr = nil } else if lg.prevErr == nil || lg.prevErr.Error() != err.Error() { @@ -407,15 +248,3 @@ func (lg *LoadGenerator) generateMetrics() { log.Printf("Cannot send metrics: %v", err) } } - -// timeToTimestamp converts a time.Time to a timestamp.Timestamp pointer. 
-func timeToTimestamp(t time.Time) *timestamp.Timestamp { - if t.IsZero() { - return nil - } - nanoTime := t.UnixNano() - return ×tamp.Timestamp{ - Seconds: nanoTime / 1e9, - Nanos: int32(nanoTime % 1e9), - } -} diff --git a/testbed/testbed/mock_backend_test.go b/testbed/testbed/mock_backend_test.go index a21538fb9a6..2239cb2341f 100644 --- a/testbed/testbed/mock_backend_test.go +++ b/testbed/testbed/mock_backend_test.go @@ -32,12 +32,12 @@ func TestGeneratorAndBackend(t *testing.T) { { "Jaeger-JaegerGRPC", NewJaegerDataReceiver(port), - NewJaegerGRPCDataSender(port), + NewJaegerGRPCDataSender(DefaultHost, port), }, { "Zipkin-Zipkin", NewZipkinDataReceiver(port), - NewZipkinDataSender(port), + NewZipkinDataSender(DefaultHost, port), }, } @@ -52,7 +52,9 @@ func TestGeneratorAndBackend(t *testing.T) { defer mb.Stop() - lg, err := NewLoadGenerator(test.sender) + options := LoadOptions{DataItemsPerSecond: 10000, ItemsPerBatch: 10} + dataProvider := NewPerfTestDataProvider(options) + lg, err := NewLoadGenerator(dataProvider, test.sender) require.NoError(t, err, "Cannot start load generator") assert.EqualValues(t, 0, lg.dataItemsSent) diff --git a/testbed/testbed/otelcol_runner.go b/testbed/testbed/otelcol_runner.go new file mode 100644 index 00000000000..db16c0cff26 --- /dev/null +++ b/testbed/testbed/otelcol_runner.go @@ -0,0 +1,178 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testbed + +import ( + "fmt" + "strings" + + "github.com/shirou/gopsutil/process" + "github.com/spf13/viper" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/internal/version" + "go.opentelemetry.io/collector/service" +) + +// OtelcolRunner defines the interface for configuring, starting and stopping one or more instances of +// otelcol which will be the subject of testing being executed. +type OtelcolRunner interface { + // PrepareConfig stores the provided YAML-based otelcol configuration file in the format needed by the otelcol + // instance(s) this runner manages. If successful, it returns the cleanup config function to be executed after + // the test is executed. + PrepareConfig(configStr string) (configCleanup func(), err error) + // Starts the otelcol instance(s) if not already running which is the subject of the test to be run. + // It returns the host:port of the data receiver to post test data to. + Start(args StartParams) (receiverAddr string, err error) + // Stops the otelcol instance(s) which are the subject of the test just run if applicable. Returns whether + // the instance was actually stopped or not. + Stop() (stopped bool, err error) + // WatchResourceConsumption toggles on the monitoring of resource consumpution by the otelcol instance under test. + WatchResourceConsumption() error + // GetProcessMon returns the Process being used to monitor resource consumption. 
+ GetProcessMon() *process.Process + // GetTotalConsumption returns the data collected by the process monitor. + GetTotalConsumption() *ResourceConsumption + // GetResourceConsumption returns the data collected by the process monitor as a display string. + GetResourceConsumption() string +} + +// InProcessCollector implements the OtelcolRunner interfaces running a single otelcol as a go routine within the +// same process as the test executor. +type InProcessCollector struct { + logger *zap.Logger + factories config.Factories + receiverPort int + config *configmodels.Config + svc *service.Application + appDone chan struct{} + stopped bool +} + +// NewInProcessCollector crewtes a new InProcessCollector using the supplied component factories. +func NewInProcessCollector(factories config.Factories, receiverPort int) *InProcessCollector { + return &InProcessCollector{ + factories: factories, + receiverPort: receiverPort, + } +} + +func (ipp *InProcessCollector) PrepareConfig(configStr string) (configCleanup func(), err error) { + configCleanup = func() { + // NoOp + } + var logger *zap.Logger + logger, err = configureLogger() + if err != nil { + return configCleanup, err + } + ipp.logger = logger + v := viper.NewWithOptions(viper.KeyDelimiter("::")) + v.SetConfigType("yaml") + v.ReadConfig(strings.NewReader(configStr)) + cfg, err := config.Load(v, ipp.factories) + if err != nil { + return configCleanup, err + } + err = config.ValidateConfig(cfg, zap.NewNop()) + if err != nil { + return configCleanup, err + } + ipp.config = cfg + return configCleanup, err +} + +func (ipp *InProcessCollector) Start(args StartParams) (receiverAddr string, err error) { + params := service.Parameters{ + ApplicationStartInfo: service.ApplicationStartInfo{ + ExeName: "otelcol", + LongName: "InProcess Collector", + Version: version.Version, + GitHash: version.GitHash, + }, + ConfigFactory: func(v *viper.Viper, factories config.Factories) (*configmodels.Config, error) { + return ipp.config, nil + }, + Factories: ipp.factories, + } + ipp.svc, err = service.New(params) + if err != nil { + return receiverAddr, err + } + ipp.svc.Command().SetArgs(args.cmdArgs) + + ipp.appDone = make(chan struct{}) + go func() { + defer close(ipp.appDone) + appErr := ipp.svc.Start() + if appErr != nil { + err = appErr + } + }() + + for state := range ipp.svc.GetStateChannel() { + switch state { + case service.Starting: + // NoOp + case service.Running: + receiverAddr = fmt.Sprintf("%s:%d", DefaultHost, ipp.receiverPort) + return receiverAddr, err + default: + err = fmt.Errorf("unable to start, otelcol state is %d", state) + } + } + return receiverAddr, err +} + +func (ipp *InProcessCollector) Stop() (stopped bool, err error) { + if !ipp.stopped { + ipp.stopped = true + ipp.svc.SignalTestComplete() + } + <-ipp.appDone + stopped = ipp.stopped + return stopped, err +} + +func (ipp *InProcessCollector) WatchResourceConsumption() error { + return nil +} + +func (ipp *InProcessCollector) GetProcessMon() *process.Process { + return nil +} + +func (ipp *InProcessCollector) GetTotalConsumption() *ResourceConsumption { + return &ResourceConsumption{ + CPUPercentAvg: 0, + CPUPercentMax: 0, + RAMMiBAvg: 0, + RAMMiBMax: 0, + } +} + +func (ipp *InProcessCollector) GetResourceConsumption() string { + return "" +} + +func configureLogger() (*zap.Logger, error) { + conf := zap.NewDevelopmentConfig() + conf.Level.SetLevel(zapcore.InfoLevel) + logger, err := conf.Build() + return logger, err +} diff --git a/testbed/testbed/otelcol_runner_test.go 
b/testbed/testbed/otelcol_runner_test.go new file mode 100644 index 00000000000..46ae8510ddd --- /dev/null +++ b/testbed/testbed/otelcol_runner_test.go @@ -0,0 +1,69 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testbed + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/service/defaultcomponents" +) + +func TestNewInProcessPipeline(t *testing.T) { + factories, err := defaultcomponents.Components() + assert.NoError(t, err) + sender := NewOTLPTraceDataSender(DefaultHost, GetAvailablePort(t)) + receiver := NewOTLPDataReceiver(DefaultOTLPPort) + runner := NewInProcessCollector(factories, sender.Port) + + format := ` +receivers:%v +exporters:%v +processors: + batch: + queued_retry: + +extensions: + +service: + extensions: + pipelines: + traces: + receivers: [%v] + processors: [batch, queued_retry] + exporters: [%v] +` + config := fmt.Sprintf( + format, + sender.GenConfigYAMLStr(), + receiver.GenConfigYAMLStr(), + sender.ProtocolName(), + receiver.ProtocolName(), + ) + configCleanup, cfgErr := runner.PrepareConfig(config) + defer configCleanup() + assert.NoError(t, cfgErr) + assert.NotNil(t, configCleanup) + assert.NotNil(t, runner.config) + args := StartParams{} + defer runner.Stop() + endpoint, startErr := runner.Start(args) + assert.NoError(t, startErr) + receiverAddr := fmt.Sprintf("%s:%d", DefaultHost, sender.Port) + assert.Equal(t, receiverAddr, endpoint) + assert.NotNil(t, runner.svc) +} diff --git a/testbed/testbed/receivers.go b/testbed/testbed/receivers.go index a4484d4fce5..28cab168dbd 100644 --- a/testbed/testbed/receivers.go +++ b/testbed/testbed/receivers.go @@ -52,6 +52,8 @@ type DataReceiverBase struct { Port int } +const DefaultHost = "localhost" + func (mb *DataReceiverBase) ReportFatalError(err error) { log.Printf("Fatal error reported: %v", err) } diff --git a/testbed/testbed/results.go b/testbed/testbed/results.go index 4bf2ff07d71..b0c58249037 100644 --- a/testbed/testbed/results.go +++ b/testbed/testbed/results.go @@ -23,16 +23,27 @@ import ( "time" ) -type Results struct { +// TestResultsSummary defines the interface to record results of one category of testing. +type TestResultsSummary interface { + // Create and open the file and write headers. + Init(resultsDir string) + // Add results for one test. + Add(testName string, result interface{}) + // Save the total results and close the file. + Save() +} + +// PerformanceResults implements the TestResultsSummary interface with fields suitable for reporting +// performance test results. +type PerformanceResults struct { resultsDir string resultsFile *os.File - perTestResults []*TestResult + perTestResults []*PerformanceTestResult totalDuration time.Duration } -var results = Results{} - -type TestResult struct { +// PerformanceTestResult reports the results of a single performance test. 
+type PerformanceTestResult struct { testName string result string duration time.Duration @@ -45,16 +56,12 @@ type TestResult struct { errorCause string } -func (r *Results) Init(resultsDir string) { +func (r *PerformanceResults) Init(resultsDir string) { r.resultsDir = resultsDir - r.perTestResults = []*TestResult{} + r.perTestResults = []*PerformanceTestResult{} - // Create results directory if it does not exist - if _, err := os.Stat(r.resultsDir); os.IsNotExist(err) { - os.Mkdir(r.resultsDir, os.FileMode(0755)) - } - - // Create results file + // Create resultsSummary file + os.MkdirAll(resultsDir, os.FileMode(0755)) var err error r.resultsFile, err = os.Create(path.Join(r.resultsDir, "TESTRESULTS.md")) if err != nil { @@ -63,34 +70,157 @@ func (r *Results) Init(resultsDir string) { // Write the header _, _ = io.WriteString(r.resultsFile, - "# Test Results\n"+ + "# Test PerformanceResults\n"+ fmt.Sprintf("Started: %s\n\n", time.Now().Format(time.RFC1123Z))+ "Test |Result|Duration|CPU Avg%|CPU Max%|RAM Avg MiB|RAM Max MiB|Sent Items|Received Items|\n"+ "----------------------------------------|------|-------:|-------:|-------:|----------:|----------:|---------:|-------------:|\n") } // Save the total results and close the file. -func (r *Results) Save() { +func (r *PerformanceResults) Save() { _, _ = io.WriteString(r.resultsFile, fmt.Sprintf("\nTotal duration: %.0fs\n", r.totalDuration.Seconds())) r.resultsFile.Close() } // Add results for one test. -func (r *Results) Add(testName string, result *TestResult) { +func (r *PerformanceResults) Add(testName string, result interface{}) { + testResult, ok := result.(*PerformanceTestResult) + if !ok { + return + } _, _ = io.WriteString(r.resultsFile, fmt.Sprintf("%-40s|%-6s|%7.0fs|%8.1f|%8.1f|%11d|%11d|%10d|%14d|%s\n", - result.testName, - result.result, - result.duration.Seconds(), - result.cpuPercentageAvg, - result.cpuPercentageMax, - result.ramMibAvg, - result.ramMibMax, - result.sentSpanCount, - result.receivedSpanCount, - result.errorCause, + testResult.testName, + testResult.result, + testResult.duration.Seconds(), + testResult.cpuPercentageAvg, + testResult.cpuPercentageMax, + testResult.ramMibAvg, + testResult.ramMibMax, + testResult.sentSpanCount, + testResult.receivedSpanCount, + testResult.errorCause, + ), + ) + r.totalDuration = r.totalDuration + testResult.duration +} + +// CorrectnessResults implements the TestResultsSummary interface with fields suitable for reporting data translation +// correctness test results. +type CorrectnessResults struct { + resultsDir string + resultsFile *os.File + perTestResults []*CorrectnessTestResult + totalAssertionFailures uint64 + totalDuration time.Duration +} + +// CorrectnessTestResult reports the results of a single correctness test. 
+type CorrectnessTestResult struct { + testName string + result string + duration time.Duration + sentSpanCount uint64 + receivedSpanCount uint64 + assertionFailureCount uint64 + assertionFailures []*AssertionFailure +} + +type AssertionFailure struct { + typeName string + dataComboName string + fieldPath string + expectedValue interface{} + actualValue interface{} + sumCount int +} + +func (af AssertionFailure) String() string { + return fmt.Sprintf("%s/%s e=%#v a=%#v ", af.dataComboName, af.fieldPath, af.expectedValue, af.actualValue) +} + +func (r *CorrectnessResults) Init(resultsDir string) { + r.resultsDir = resultsDir + r.perTestResults = []*CorrectnessTestResult{} + + // Create resultsSummary file + os.MkdirAll(resultsDir, os.FileMode(0755)) + var err error + r.resultsFile, err = os.Create(path.Join(r.resultsDir, "CORRECTNESSRESULTS.md")) + if err != nil { + log.Fatalf(err.Error()) + } + + // Write the header + _, _ = io.WriteString(r.resultsFile, + "# Test Results\n"+ + fmt.Sprintf("Started: %s\n\n", time.Now().Format(time.RFC1123Z))+ + "Test |Result|Duration|Sent Items|Received Items|Failure Count|Failures\n"+ + "----------------------------------------|------|-------:|---------:|-------------:|------------:|--------\n") +} + +func (r *CorrectnessResults) Add(testName string, result interface{}) { + testResult, ok := result.(*CorrectnessTestResult) + if !ok { + return + } + consolidated := consolidateAssertionFailures(testResult.assertionFailures) + failuresStr := "" + for _, af := range consolidated { + failuresStr = fmt.Sprintf("%s%s,%#v!=%#v,count=%d; ", failuresStr, af.fieldPath, af.expectedValue, + af.actualValue, af.sumCount) + } + _, _ = io.WriteString(r.resultsFile, + fmt.Sprintf("%-40s|%-6s|%7.0fs|%10d|%14d|%13d|%s\n", + testResult.testName, + testResult.result, + testResult.duration.Seconds(), + testResult.sentSpanCount, + testResult.receivedSpanCount, + testResult.assertionFailureCount, + failuresStr, ), ) - r.totalDuration = r.totalDuration + result.duration + r.perTestResults = append(r.perTestResults, testResult) + r.totalAssertionFailures = r.totalAssertionFailures + testResult.assertionFailureCount + r.totalDuration = r.totalDuration + testResult.duration +} + +func (r *CorrectnessResults) Save() { + _, _ = io.WriteString(r.resultsFile, + fmt.Sprintf("\nTotal assertion failures: %d\n", r.totalAssertionFailures)) + _, _ = io.WriteString(r.resultsFile, + fmt.Sprintf("\nTotal duration: %.0fs\n", r.totalDuration.Seconds())) + r.resultsFile.Close() +} + +func exists(path string) (bool, error) { + _, err := os.Stat(path) + if err == nil { + return true, nil + } + if os.IsNotExist(err) { + return false, nil + } + return true, err +} + +func consolidateAssertionFailures(failures []*AssertionFailure) map[string]*AssertionFailure { + afMap := make(map[string]*AssertionFailure) + for _, f := range failures { + summary := afMap[f.fieldPath] + if summary == nil { + summary = &AssertionFailure{ + typeName: f.typeName, + dataComboName: f.dataComboName + "...", + fieldPath: f.fieldPath, + expectedValue: f.expectedValue, + actualValue: f.actualValue, + } + afMap[f.fieldPath] = summary + } + summary.sumCount++ + } + return afMap } diff --git a/testbed/testbed/senders.go b/testbed/testbed/senders.go index a6ea5e36080..5a092885932 100644 --- a/testbed/testbed/senders.go +++ b/testbed/testbed/senders.go @@ -73,13 +73,17 @@ type MetricDataSenderOld interface { // DataSenderOverTraceExporter partially implements TraceDataSender via a TraceExporter. 
type DataSenderOverTraceExporterOld struct { exporter component.TraceExporterOld + Host string Port int } // NewDataSenderOverExporter creates a new sender that will send // to the specified port after Start is called. -func NewDataSenderOverExporterOld(port int) *DataSenderOverTraceExporterOld { - return &DataSenderOverTraceExporterOld{Port: port} +func NewDataSenderOverExporterOld(host string, port int) *DataSenderOverTraceExporterOld { + return &DataSenderOverTraceExporterOld{ + Host: host, + Port: port, + } } func (ds *DataSenderOverTraceExporterOld) SendSpans(traces consumerdata.TraceData) error { @@ -111,13 +115,17 @@ type MetricDataSender interface { // DataSenderOverTraceExporter partially implements TraceDataSender via a TraceExporter. type DataSenderOverTraceExporter struct { exporter component.TraceExporter + Host string Port int } // NewDataSenderOverExporter creates a new sender that will send // to the specified port after Start is called. -func NewDataSenderOverExporter(port int) *DataSenderOverTraceExporter { - return &DataSenderOverTraceExporter{Port: port} +func NewDataSenderOverExporter(host string, port int) *DataSenderOverTraceExporter { + return &DataSenderOverTraceExporter{ + Host: host, + Port: port, + } } func (ds *DataSenderOverTraceExporter) SendSpans(traces pdata.Traces) error { @@ -142,15 +150,18 @@ var _ TraceDataSender = (*JaegerGRPCDataSender)(nil) // NewJaegerGRPCDataSender creates a new Jaeger protocol sender that will send // to the specified port after Start is called. -func NewJaegerGRPCDataSender(port int) *JaegerGRPCDataSender { - return &JaegerGRPCDataSender{DataSenderOverTraceExporter{Port: port}} +func NewJaegerGRPCDataSender(host string, port int) *JaegerGRPCDataSender { + return &JaegerGRPCDataSender{DataSenderOverTraceExporter{ + Host: host, + Port: port, + }} } func (je *JaegerGRPCDataSender) Start() error { cfg := &jaegerexporter.Config{ // Use standard endpoint for Jaeger. GRPCClientSettings: configgrpc.GRPCClientSettings{ - Endpoint: fmt.Sprintf("localhost:%d", je.Port), + Endpoint: fmt.Sprintf("%s:%d", je.Host, je.Port), TLSSetting: configtls.TLSClientSetting{ Insecure: true, }, @@ -184,7 +195,7 @@ func (je *JaegerGRPCDataSender) GenConfigYAMLStr() string { jaeger: protocols: grpc: - endpoint: "localhost:%d" + endpoint: "%s:%d" thrift_tchannel: endpoint: "localhost:8372" thrift_compact: @@ -192,7 +203,7 @@ func (je *JaegerGRPCDataSender) GenConfigYAMLStr() string { thrift_binary: endpoint: "localhost:8374" thrift_http: - endpoint: "localhost:8375"`, je.Port) + endpoint: "localhost:8375"`, je.Host, je.Port) } func (je *JaegerGRPCDataSender) ProtocolName() string { @@ -209,14 +220,17 @@ var _ TraceDataSenderOld = (*OCTraceDataSender)(nil) // NewOCTraceDataSender creates a new OCTraceDataSender that will send // to the specified port after Start is called. 
-func NewOCTraceDataSender(port int) *OCTraceDataSender { - return &OCTraceDataSender{DataSenderOverTraceExporterOld{Port: port}} +func NewOCTraceDataSender(host string, port int) *OCTraceDataSender { + return &OCTraceDataSender{DataSenderOverTraceExporterOld{ + Host: host, + Port: port, + }} } func (ote *OCTraceDataSender) Start() error { cfg := &opencensusexporter.Config{ GRPCClientSettings: configgrpc.GRPCClientSettings{ - Endpoint: fmt.Sprintf("localhost:%d", ote.Port), + Endpoint: fmt.Sprintf("%s:%d", ote.Host, ote.Port), TLSSetting: configtls.TLSClientSetting{ Insecure: true, }, @@ -238,7 +252,7 @@ func (ote *OCTraceDataSender) GenConfigYAMLStr() string { // Note that this generates a receiver config for agent. return fmt.Sprintf(` opencensus: - endpoint: "localhost:%d"`, ote.Port) + endpoint: "%s:%d"`, ote.Host, ote.Port) } func (ote *OCTraceDataSender) ProtocolName() string { @@ -248,6 +262,7 @@ func (ote *OCTraceDataSender) ProtocolName() string { // OCMetricsDataSender implements MetricDataSender for OpenCensus metrics protocol. type OCMetricsDataSender struct { exporter component.MetricsExporterOld + host string port int } @@ -256,14 +271,17 @@ var _ MetricDataSenderOld = (*OCMetricsDataSender)(nil) // NewOCMetricDataSender creates a new OpenCensus metric protocol sender that will send // to the specified port after Start is called. -func NewOCMetricDataSender(port int) *OCMetricsDataSender { - return &OCMetricsDataSender{port: port} +func NewOCMetricDataSender(host string, port int) *OCMetricsDataSender { + return &OCMetricsDataSender{ + host: host, + port: port, + } } func (ome *OCMetricsDataSender) Start() error { cfg := &opencensusexporter.Config{ GRPCClientSettings: configgrpc.GRPCClientSettings{ - Endpoint: fmt.Sprintf("localhost:%d", ome.port), + Endpoint: fmt.Sprintf("%s:%d", ome.host, ome.port), TLSSetting: configtls.TLSClientSetting{ Insecure: true, }, @@ -292,7 +310,7 @@ func (ome *OCMetricsDataSender) GenConfigYAMLStr() string { // Note that this generates a receiver config for agent. return fmt.Sprintf(` opencensus: - endpoint: "localhost:%d"`, ome.port) + endpoint: "%s:%d"`, ome.host, ome.port) } func (ome *OCMetricsDataSender) GetCollectorPort() int { @@ -313,14 +331,17 @@ var _ TraceDataSender = (*OTLPTraceDataSender)(nil) // NewOTLPTraceDataSender creates a new OTLPTraceDataSender that will send // to the specified port after Start is called. -func NewOTLPTraceDataSender(port int) *OTLPTraceDataSender { - return &OTLPTraceDataSender{DataSenderOverTraceExporter{Port: port}} +func NewOTLPTraceDataSender(host string, port int) *OTLPTraceDataSender { + return &OTLPTraceDataSender{DataSenderOverTraceExporter{ + Host: host, + Port: port, + }} } func (ote *OTLPTraceDataSender) Start() error { cfg := &otlpexporter.Config{ GRPCClientSettings: configgrpc.GRPCClientSettings{ - Endpoint: fmt.Sprintf("localhost:%d", ote.Port), + Endpoint: fmt.Sprintf("%s:%d", ote.Host, ote.Port), TLSSetting: configtls.TLSClientSetting{ Insecure: true, }, @@ -343,7 +364,7 @@ func (ote *OTLPTraceDataSender) GenConfigYAMLStr() string { // Note that this generates a receiver config for agent. return fmt.Sprintf(` otlp: - endpoint: "localhost:%d"`, ote.Port) + endpoint: "%s:%d"`, ote.Host, ote.Port) } func (ote *OTLPTraceDataSender) ProtocolName() string { @@ -353,6 +374,7 @@ func (ote *OTLPTraceDataSender) ProtocolName() string { // OTLPMetricsDataSender implements MetricDataSender for OpenCensus metrics protocol. 
type OTLPMetricsDataSender struct { exporter component.MetricsExporter + host string port int } @@ -361,14 +383,17 @@ var _ MetricDataSender = (*OTLPMetricsDataSender)(nil) // NewOTLPMetricDataSender creates a new OpenCensus metric protocol sender that will send // to the specified port after Start is called. -func NewOTLPMetricDataSender(port int) *OTLPMetricsDataSender { - return &OTLPMetricsDataSender{port: port} +func NewOTLPMetricDataSender(host string, port int) *OTLPMetricsDataSender { + return &OTLPMetricsDataSender{ + host: host, + port: port, + } } func (ome *OTLPMetricsDataSender) Start() error { cfg := &otlpexporter.Config{ GRPCClientSettings: configgrpc.GRPCClientSettings{ - Endpoint: fmt.Sprintf("localhost:%d", ome.port), + Endpoint: fmt.Sprintf("%s:%d", ome.host, ome.port), TLSSetting: configtls.TLSClientSetting{ Insecure: true, }, @@ -398,7 +423,7 @@ func (ome *OTLPMetricsDataSender) GenConfigYAMLStr() string { // Note that this generates a receiver config for agent. return fmt.Sprintf(` otlp: - endpoint: "localhost:%d"`, ome.port) + endpoint: "%s:%d"`, ome.host, ome.port) } func (ome *OTLPMetricsDataSender) GetCollectorPort() int { @@ -419,8 +444,11 @@ var _ TraceDataSenderOld = (*ZipkinDataSender)(nil) // NewZipkinDataSender creates a new Zipkin protocol sender that will send // to the specified port after Start is called. -func NewZipkinDataSender(port int) *ZipkinDataSender { - return &ZipkinDataSender{DataSenderOverTraceExporterOld{Port: port}} +func NewZipkinDataSender(host string, port int) *ZipkinDataSender { + return &ZipkinDataSender{DataSenderOverTraceExporterOld{ + Host: host, + Port: port, + }} } func (zs *ZipkinDataSender) Start() error { @@ -445,7 +473,7 @@ func (zs *ZipkinDataSender) Start() error { func (zs *ZipkinDataSender) GenConfigYAMLStr() string { return fmt.Sprintf(` zipkin: - endpoint: localhost:%d`, zs.Port) + endpoint: %s:%d`, zs.Host, zs.Port) } func (zs *ZipkinDataSender) ProtocolName() string { diff --git a/testbed/testbed/test_bed.go b/testbed/testbed/test_bed.go index 12040f51309..3141933f3f8 100644 --- a/testbed/testbed/test_bed.go +++ b/testbed/testbed/test_bed.go @@ -117,7 +117,7 @@ func LoadConfig() error { return nil } -func Start() error { +func Start(resultsSummary TestResultsSummary) error { // Load the test bed config first. err := LoadConfig() @@ -134,20 +134,20 @@ func Start() error { if err != nil { log.Fatalf(err.Error()) } - results.Init(dir) + resultsSummary.Init(dir) return err } -func SaveResults() { - results.Save() +func SaveResults(resultsSummary TestResultsSummary) { + resultsSummary.Save() } // DoTestMain is intended to be run from TestMain somewhere in the test suit. // This enables the testbed. -func DoTestMain(m *testing.M) { +func DoTestMain(m *testing.M, resultsSummary TestResultsSummary) { // Load the test bed config first. - err := Start() + err := Start(resultsSummary) if err == ErrSkipTests { // Test bed config is not loaded because the tests are globally skipped. @@ -156,7 +156,7 @@ func DoTestMain(m *testing.M) { res := m.Run() - SaveResults() + SaveResults(resultsSummary) // Now run all tests. os.Exit(res) diff --git a/testbed/testbed/test_case.go b/testbed/testbed/test_case.go index ad92b39bc2a..d8ca601159c 100644 --- a/testbed/testbed/test_case.go +++ b/testbed/testbed/test_case.go @@ -24,7 +24,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -48,13 +47,14 @@ type TestCase struct { resourceSpec ResourceSpec // Agent process. 
- agentProc childProcess + agentProc OtelcolRunner Sender DataSender Receiver DataReceiver LoadGenerator *LoadGenerator MockBackend *MockBackend + validator TestCaseValidator startTime time.Time @@ -70,6 +70,8 @@ type TestCase struct { doneSignal chan struct{} errorCause string + + resultsSummary TestResultsSummary } const mibibyte = 1024 * 1024 @@ -78,8 +80,12 @@ const testcaseDurationVar = "TESTCASE_DURATION" // NewTestCase creates a new TestCase. It expects agent-config.yaml in the specified directory. func NewTestCase( t *testing.T, + dataProvider DataProvider, sender DataSender, receiver DataReceiver, + agentProc OtelcolRunner, + validator TestCaseValidator, + resultsSummary TestResultsSummary, opts ...TestCaseOption, ) *TestCase { tc := TestCase{} @@ -90,6 +96,9 @@ func NewTestCase( tc.startTime = time.Now() tc.Sender = sender tc.Receiver = receiver + tc.agentProc = agentProc + tc.validator = validator + tc.resultsSummary = resultsSummary // Get requested test case duration from env variable. duration := os.Getenv(testcaseDurationVar) @@ -119,17 +128,7 @@ func NewTestCase( tc.resourceSpec.ResourceCheckPeriod = tc.Duration } - configFile := tc.agentConfigFile - if configFile == "" { - // Use the default config file. - configFile = path.Join("testdata", "agent-config.yaml") - } - - // Ensure that the config file is an absolute path. - tc.agentConfigFile, err = filepath.Abs(configFile) - require.NoError(t, err, "Cannot resolve filename") - - tc.LoadGenerator, err = NewLoadGenerator(sender) + tc.LoadGenerator, err = NewLoadGenerator(dataProvider, sender) require.NoError(t, err, "Cannot create generator") tc.MockBackend = NewMockBackend(tc.composeTestResultFileName("backend.log"), receiver) @@ -165,11 +164,13 @@ func (tc *TestCase) SetResourceLimits(resourceSpec ResourceSpec) { // StartAgent starts the agent and redirects its standard output and standard error // to "agent.log" file located in the test directory. func (tc *TestCase) StartAgent(args ...string) { - args = append(args, "--config") - args = append(args, tc.agentConfigFile) + if tc.agentConfigFile != "" { + args = append(args, "--config") + args = append(args, tc.agentConfigFile) + } logFileName := tc.composeTestResultFileName("agent.log") - err := tc.agentProc.start(startParams{ + _, err := tc.agentProc.Start(StartParams{ name: "Agent", logFilePath: logFileName, cmd: testBedConfig.Agent, @@ -184,7 +185,7 @@ func (tc *TestCase) StartAgent(args ...string) { // Start watching resource consumption. go func() { - err := tc.agentProc.watchResourceConsumption() + err := tc.agentProc.WatchResourceConsumption() if err != nil { tc.indicateError(err) } @@ -201,7 +202,7 @@ func (tc *TestCase) StartAgent(args ...string) { // StopAgent stops agent process. 
func (tc *TestCase) StopAgent() { - tc.agentProc.stop() + tc.agentProc.Stop() } // StartLoad starts the load generator and redirects its standard output and standard error @@ -233,7 +234,7 @@ func (tc *TestCase) EnableRecording() { // AgentMemoryInfo returns raw memory info struct about the agent // as returned by github.com/shirou/gopsutil/process func (tc *TestCase) AgentMemoryInfo() (uint32, uint32, error) { - stat, err := tc.agentProc.processMon.MemoryInfo() + stat, err := tc.agentProc.GetProcessMon().MemoryInfo() if err != nil { return 0, 0, err } @@ -255,35 +256,11 @@ func (tc *TestCase) Stop() { } // Report test results - - rc := tc.agentProc.GetTotalConsumption() - - var result string - if tc.t.Failed() { - result = "FAIL" - } else { - result = "PASS" - } - - // Remove "Test" prefix from test name. - testName := tc.t.Name()[4:] - - results.Add(tc.t.Name(), &TestResult{ - testName: testName, - result: result, - receivedSpanCount: tc.MockBackend.DataItemsReceived(), - sentSpanCount: tc.LoadGenerator.DataItemsSent(), - duration: time.Since(tc.startTime), - cpuPercentageAvg: rc.CPUPercentAvg, - cpuPercentageMax: rc.CPUPercentMax, - ramMibAvg: rc.RAMMiBAvg, - ramMibMax: rc.RAMMiBMax, - errorCause: tc.errorCause, - }) + tc.validator.RecordResults(tc) } -// ValidateData validates data by comparing the number of items sent by load generator -// and number of items received by mock backend. +// ValidateData validates data received by mock backend against what was generated and sent to the collector +// instance(s) under test by the LoadGenerator. func (tc *TestCase) ValidateData() { select { case <-tc.ErrorSignal: @@ -292,10 +269,7 @@ func (tc *TestCase) ValidateData() { default: } - if assert.EqualValues(tc.t, tc.LoadGenerator.DataItemsSent(), tc.MockBackend.DataItemsReceived(), - "Received and sent counters do not match.") { - log.Printf("Sent and received data matches.") - } + tc.validator.Validate(tc) } // Sleep for specified duration or until error is signaled. diff --git a/testbed/testbed/validator.go b/testbed/testbed/validator.go new file mode 100644 index 00000000000..baba86b3d78 --- /dev/null +++ b/testbed/testbed/validator.go @@ -0,0 +1,316 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testbed + +import ( + "encoding/hex" + "log" + "time" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/consumer/pdata" + otlptrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/trace/v1" + "go.opentelemetry.io/collector/translator/internaldata" +) + +// TestCaseValidator defines the interface for validating and reporting test results. +type TestCaseValidator interface { + // Validate executes validation routines and test assertions. + Validate(tc *TestCase) + // RecordResults updates the TestResultsSummary for the test suite with results of a single test. 
+ RecordResults(tc *TestCase) +} + +// PerfTestValidator implements TestCaseValidator for test suites using PerformanceResults for summarizing results. +type PerfTestValidator struct { +} + +func (v *PerfTestValidator) Validate(tc *TestCase) { + if assert.EqualValues(tc.t, tc.LoadGenerator.DataItemsSent(), tc.MockBackend.DataItemsReceived(), + "Received and sent counters do not match.") { + log.Printf("Sent and received data matches.") + } +} + +func (v *PerfTestValidator) RecordResults(tc *TestCase) { + rc := tc.agentProc.GetTotalConsumption() + + var result string + if tc.t.Failed() { + result = "FAIL" + } else { + result = "PASS" + } + + // Remove "Test" prefix from test name. + testName := tc.t.Name()[4:] + + tc.resultsSummary.Add(tc.t.Name(), &PerformanceTestResult{ + testName: testName, + result: result, + receivedSpanCount: tc.MockBackend.DataItemsReceived(), + sentSpanCount: tc.LoadGenerator.DataItemsSent(), + duration: time.Since(tc.startTime), + cpuPercentageAvg: rc.CPUPercentAvg, + cpuPercentageMax: rc.CPUPercentMax, + ramMibAvg: rc.RAMMiBAvg, + ramMibMax: rc.RAMMiBMax, + errorCause: tc.errorCause, + }) +} + +// CorrectnessTestValidator implements TestCaseValidator for test suites using CorrectnessResults for summarizing results. +type CorrectnessTestValidator struct { + dataProvider DataProvider + assertionFailures []*AssertionFailure +} + +func NewCorrectTestValidator(provider DataProvider) *CorrectnessTestValidator { + return &CorrectnessTestValidator{ + dataProvider: provider, + assertionFailures: make([]*AssertionFailure, 0), + } +} + +func (v *CorrectnessTestValidator) Validate(tc *TestCase) { + if assert.EqualValues(tc.t, tc.LoadGenerator.DataItemsSent(), tc.MockBackend.DataItemsReceived(), + "Received and sent counters do not match.") { + log.Printf("Sent and received data counters match.") + } + if len(tc.MockBackend.ReceivedTraces) > 0 { + v.assertSentRecdTracingDataEqual(tc.MockBackend.ReceivedTraces) + } + if len(tc.MockBackend.ReceivedTracesOld) > 0 { + tracesList := make([]pdata.Traces, 0, len(tc.MockBackend.ReceivedTracesOld)) + for _, td := range tc.MockBackend.ReceivedTracesOld { + tracesList = append(tracesList, internaldata.OCToTraceData(td)) + } + v.assertSentRecdTracingDataEqual(tracesList) + } + // TODO enable once identified problems are fixed + //assert.EqualValues(tc.t, 0, len(v.assertionFailures), "There are span data mismatches.") +} + +func (v *CorrectnessTestValidator) RecordResults(tc *TestCase) { + var result string + if tc.t.Failed() { + result = "FAIL" + } else { + result = "PASS" + } + + // Remove "Test" prefix from test name. 
+ testName := tc.t.Name()[4:] + tc.resultsSummary.Add(tc.t.Name(), &CorrectnessTestResult{ + testName: testName, + result: result, + duration: time.Since(tc.startTime), + receivedSpanCount: tc.MockBackend.DataItemsReceived(), + sentSpanCount: tc.LoadGenerator.DataItemsSent(), + assertionFailureCount: uint64(len(v.assertionFailures)), + assertionFailures: v.assertionFailures, + }) +} + +func (v *CorrectnessTestValidator) assertSentRecdTracingDataEqual(tracesList []pdata.Traces) { + for _, td := range tracesList { + resourceSpansList := pdata.TracesToOtlp(td) + for _, rs := range resourceSpansList { + for _, ils := range rs.InstrumentationLibrarySpans { + for _, recdSpan := range ils.Spans { + sentSpan := v.dataProvider.GetGeneratedSpan(recdSpan.TraceId, recdSpan.SpanId) + v.diffSpan(sentSpan, recdSpan) + } + } + } + + } +} + +func (v *CorrectnessTestValidator) diffSpan(sentSpan *otlptrace.Span, recdSpan *otlptrace.Span) { + if sentSpan == nil { + af := &AssertionFailure{ + typeName: "Span", + dataComboName: recdSpan.Name, + } + v.assertionFailures = append(v.assertionFailures, af) + return + } + if hex.EncodeToString(sentSpan.TraceId) != hex.EncodeToString(recdSpan.TraceId) { + af := &AssertionFailure{ + typeName: "Span", + dataComboName: sentSpan.Name, + fieldPath: "TraceId", + expectedValue: hex.EncodeToString(sentSpan.TraceId), + actualValue: hex.EncodeToString(recdSpan.TraceId), + } + v.assertionFailures = append(v.assertionFailures, af) + } + if hex.EncodeToString(sentSpan.SpanId) != hex.EncodeToString(recdSpan.SpanId) { + af := &AssertionFailure{ + typeName: "Span", + dataComboName: sentSpan.Name, + fieldPath: "SpanId", + expectedValue: hex.EncodeToString(sentSpan.SpanId), + actualValue: hex.EncodeToString(recdSpan.SpanId), + } + v.assertionFailures = append(v.assertionFailures, af) + } + if sentSpan.TraceState != recdSpan.TraceState { + af := &AssertionFailure{ + typeName: "Span", + dataComboName: sentSpan.Name, + fieldPath: "TraceState", + expectedValue: sentSpan.TraceState, + actualValue: recdSpan.TraceState, + } + v.assertionFailures = append(v.assertionFailures, af) + } + if hex.EncodeToString(sentSpan.ParentSpanId) != hex.EncodeToString(recdSpan.ParentSpanId) { + af := &AssertionFailure{ + typeName: "Span", + dataComboName: sentSpan.Name, + fieldPath: "ParentSpanId", + expectedValue: hex.EncodeToString(sentSpan.ParentSpanId), + actualValue: hex.EncodeToString(recdSpan.ParentSpanId), + } + v.assertionFailures = append(v.assertionFailures, af) + } + if sentSpan.Name != recdSpan.Name { + af := &AssertionFailure{ + typeName: "Span", + dataComboName: sentSpan.Name, + fieldPath: "Name", + expectedValue: sentSpan.Name, + actualValue: recdSpan.Name, + } + v.assertionFailures = append(v.assertionFailures, af) + } + if sentSpan.Kind != recdSpan.Kind { + af := &AssertionFailure{ + typeName: "Span", + dataComboName: sentSpan.Name, + fieldPath: "Kind", + expectedValue: sentSpan.Kind, + actualValue: recdSpan.Kind, + } + v.assertionFailures = append(v.assertionFailures, af) + } + if sentSpan.StartTimeUnixNano != recdSpan.StartTimeUnixNano { + af := &AssertionFailure{ + typeName: "Span", + dataComboName: sentSpan.Name, + fieldPath: "StartTimeUnixNano", + expectedValue: sentSpan.StartTimeUnixNano, + actualValue: recdSpan.StartTimeUnixNano, + } + v.assertionFailures = append(v.assertionFailures, af) + } + if sentSpan.EndTimeUnixNano != recdSpan.EndTimeUnixNano { + af := &AssertionFailure{ + typeName: "Span", + dataComboName: sentSpan.Name, + fieldPath: "EndTimeUnixNano", + expectedValue: 
sentSpan.EndTimeUnixNano, + actualValue: recdSpan.EndTimeUnixNano, + } + v.assertionFailures = append(v.assertionFailures, af) + } + if len(sentSpan.Attributes) != len(recdSpan.Attributes) { + af := &AssertionFailure{ + typeName: "Span", + dataComboName: sentSpan.Name, + fieldPath: "Attributes", + expectedValue: len(sentSpan.Attributes), + actualValue: len(recdSpan.Attributes), + } + v.assertionFailures = append(v.assertionFailures, af) + } + //TODO compare keys and values of attributes + if sentSpan.DroppedAttributesCount != recdSpan.DroppedAttributesCount { + af := &AssertionFailure{ + typeName: "Span", + dataComboName: sentSpan.Name, + fieldPath: "DroppedAttributesCount", + expectedValue: sentSpan.DroppedAttributesCount, + actualValue: recdSpan.DroppedAttributesCount, + } + v.assertionFailures = append(v.assertionFailures, af) + } + if len(sentSpan.Events) != len(recdSpan.Events) { + af := &AssertionFailure{ + typeName: "Span", + dataComboName: sentSpan.Name, + fieldPath: "Events", + expectedValue: len(sentSpan.Events), + actualValue: len(recdSpan.Events), + } + v.assertionFailures = append(v.assertionFailures, af) + } + //TODO compare contents of events + if sentSpan.DroppedEventsCount != recdSpan.DroppedEventsCount { + af := &AssertionFailure{ + typeName: "Span", + dataComboName: sentSpan.Name, + fieldPath: "DroppedEventsCount", + expectedValue: sentSpan.DroppedEventsCount, + actualValue: recdSpan.DroppedEventsCount, + } + v.assertionFailures = append(v.assertionFailures, af) + } + if len(sentSpan.Links) != len(recdSpan.Links) { + af := &AssertionFailure{ + typeName: "Span", + dataComboName: sentSpan.Name, + fieldPath: "Links", + expectedValue: len(sentSpan.Links), + actualValue: len(recdSpan.Links), + } + v.assertionFailures = append(v.assertionFailures, af) + } + //TODO compare contents of links + if sentSpan.DroppedLinksCount != recdSpan.DroppedLinksCount { + af := &AssertionFailure{ + typeName: "Span", + dataComboName: sentSpan.Name, + fieldPath: "DroppedLinksCount", + expectedValue: sentSpan.DroppedLinksCount, + actualValue: recdSpan.DroppedLinksCount, + } + v.assertionFailures = append(v.assertionFailures, af) + } + if sentSpan.Status != nil && recdSpan.Status != nil { + if sentSpan.Status.Code != recdSpan.Status.Code { + af := &AssertionFailure{ + typeName: "Span", + dataComboName: sentSpan.Name, + fieldPath: "Status.Code", + expectedValue: sentSpan.Status.Code, + actualValue: recdSpan.Status.Code, + } + v.assertionFailures = append(v.assertionFailures, af) + } + } else if (sentSpan.Status != nil && recdSpan.Status == nil) || (sentSpan.Status == nil && recdSpan.Status != nil) { + af := &AssertionFailure{ + typeName: "Span", + dataComboName: sentSpan.Name, + fieldPath: "Status", + expectedValue: sentSpan.Status, + actualValue: recdSpan.Status, + } + v.assertionFailures = append(v.assertionFailures, af) + } +} diff --git a/testbed/tests/e2e_test.go b/testbed/tests/e2e_test.go index 123996f1c65..76c84877ff4 100644 --- a/testbed/tests/e2e_test.go +++ b/testbed/tests/e2e_test.go @@ -29,10 +29,16 @@ import ( ) func TestIdleMode(t *testing.T) { + options := testbed.LoadOptions{DataItemsPerSecond: 10000, ItemsPerBatch: 10} + dataProvider := testbed.NewPerfTestDataProvider(options) tc := testbed.NewTestCase( t, - testbed.NewJaegerGRPCDataSender(testbed.DefaultJaegerPort), + dataProvider, + testbed.NewJaegerGRPCDataSender(testbed.DefaultHost, testbed.DefaultJaegerPort), testbed.NewOCDataReceiver(testbed.DefaultOCPort), + &testbed.ChildProcess{}, + &testbed.PerfTestValidator{}, + 
performanceResultsSummary, ) defer tc.Stop() @@ -52,11 +58,17 @@ func TestBallastMemory(t *testing.T) { {1000, 100}, } + options := testbed.LoadOptions{DataItemsPerSecond: 10000, ItemsPerBatch: 10} + dataProvider := testbed.NewPerfTestDataProvider(options) for _, test := range tests { tc := testbed.NewTestCase( t, - testbed.NewJaegerGRPCDataSender(testbed.DefaultJaegerPort), + dataProvider, + testbed.NewJaegerGRPCDataSender(testbed.DefaultHost, testbed.DefaultJaegerPort), testbed.NewOCDataReceiver(testbed.DefaultOCPort), + &testbed.ChildProcess{}, + &testbed.PerfTestValidator{}, + performanceResultsSummary, testbed.WithSkipResults(), ) tc.SetResourceLimits(testbed.ResourceSpec{ExpectedMaxRAM: test.maxRSS}) diff --git a/testbed/tests/metric_test.go b/testbed/tests/metric_test.go index d1a6ceba324..f74a27ff681 100644 --- a/testbed/tests/metric_test.go +++ b/testbed/tests/metric_test.go @@ -24,10 +24,16 @@ import ( ) func TestMetricNoBackend10kDPSOpenCensus(t *testing.T) { + options := testbed.LoadOptions{DataItemsPerSecond: 10000, ItemsPerBatch: 10} + dataProvider := testbed.NewPerfTestDataProvider(options) tc := testbed.NewTestCase( t, - testbed.NewOCMetricDataSender(55678), + dataProvider, + testbed.NewOCMetricDataSender(testbed.DefaultHost, 55678), testbed.NewOCDataReceiver(testbed.DefaultOCPort), + &testbed.ChildProcess{}, + &testbed.PerfTestValidator{}, + performanceResultsSummary, ) defer tc.Stop() @@ -48,7 +54,7 @@ func TestMetric10kDPS(t *testing.T) { }{ { "OpenCensus", - testbed.NewOCMetricDataSender(testbed.GetAvailablePort(t)), + testbed.NewOCMetricDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), testbed.NewOCDataReceiver(testbed.GetAvailablePort(t)), testbed.ResourceSpec{ ExpectedMaxCPU: 50, @@ -57,7 +63,7 @@ func TestMetric10kDPS(t *testing.T) { }, { "OTLP", - testbed.NewOTLPMetricDataSender(testbed.GetAvailablePort(t)), + testbed.NewOTLPMetricDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)), testbed.ResourceSpec{ ExpectedMaxCPU: 50, diff --git a/testbed/tests/resource_processor_test.go b/testbed/tests/resource_processor_test.go index 964cc5df994..a293b5a8126 100644 --- a/testbed/tests/resource_processor_test.go +++ b/testbed/tests/resource_processor_test.go @@ -16,7 +16,6 @@ package tests import ( "encoding/json" - "os" "path" "path/filepath" "testing" @@ -272,7 +271,7 @@ func getMetricDataFromJSON(t *testing.T, rmString string) data.MetricData { } func TestMetricResourceProcessor(t *testing.T) { - sender := testbed.NewOTLPMetricDataSender(testbed.GetAvailablePort(t)) + sender := testbed.NewOTLPMetricDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)) receiver := testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)) tests := getResourceProcessorTestCases(t) @@ -282,15 +281,26 @@ func TestMetricResourceProcessor(t *testing.T) { resultDir, err := filepath.Abs(path.Join("results", t.Name())) require.NoError(t, err) + agentProc := &testbed.ChildProcess{} processors := map[string]string{ "resource": test.resourceProcessorConfig, } - configFile := createConfigFile(t, sender, receiver, resultDir, processors) - defer os.Remove(configFile) - - require.NotEmpty(t, configFile, "Cannot create config file") - - tc := testbed.NewTestCase(t, sender, receiver, testbed.WithConfigFile(configFile)) + configStr := createConfigYaml(t, sender, receiver, resultDir, processors) + configCleanup, err := agentProc.PrepareConfig(configStr) + require.NoError(t, err) + defer configCleanup() + + options := 
testbed.LoadOptions{DataItemsPerSecond: 10000, ItemsPerBatch: 10} + dataProvider := testbed.NewPerfTestDataProvider(options) + tc := testbed.NewTestCase( + t, + dataProvider, + sender, + receiver, + agentProc, + &testbed.PerfTestValidator{}, + performanceResultsSummary, + ) defer tc.Stop() tc.StartBackend() diff --git a/testbed/tests/scenarios.go b/testbed/tests/scenarios.go index 24465fba691..908ad0ac51b 100644 --- a/testbed/tests/scenarios.go +++ b/testbed/tests/scenarios.go @@ -19,9 +19,7 @@ package tests import ( "fmt" - "io/ioutil" "math/rand" - "os" "path" "path/filepath" "testing" @@ -32,9 +30,11 @@ import ( "go.opentelemetry.io/collector/testbed/testbed" ) -// createConfigFile creates a collector config file that corresponds to the +var performanceResultsSummary testbed.TestResultsSummary = &testbed.PerformanceResults{} + +// createConfigYaml creates a collector config file that corresponds to the // sender and receiver used in the test and returns the config file name. -func createConfigFile( +func createConfigYaml( t *testing.T, sender testbed.DataSender, // Sender to send test data. receiver testbed.DataReceiver, // Receiver to receive test data. @@ -98,7 +98,7 @@ service: ` // Put corresponding elements into the config template to generate the final config. - config := fmt.Sprintf( + return fmt.Sprintf( format, sender.GenConfigYAMLStr(), receiver.GenConfigYAMLStr(), @@ -109,28 +109,6 @@ service: processorsList, receiver.ProtocolName(), ) - - // Write the config string to a temporary file. - file, err := ioutil.TempFile("", "agent*.yaml") - if err != nil { - t.Error(err) - return "" - } - - defer func() { - errClose := file.Close() - if errClose != nil { - t.Error(err) - } - }() - - if _, err = file.WriteString(config); err != nil { - t.Error(err) - return "" - } - - // Return config file name. - return file.Name() } // Run 10k data items/sec test using specified sender and receiver protocols. 
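
The temp-file bookkeeping that `createConfigFile` used to do now sits behind `OtelcolRunner.PrepareConfig`: the in-process runner parses the YAML string directly, while a child-process runner still has to materialize it on disk for the `--config` flag. The `ChildProcess` implementation is not part of this diff, so the following is only a sketch of what a file-backed `PrepareConfig` could look like; the `exampleChildProcess` type and its `configFileName` field are assumptions, not code from this change.

```go
package testbedexample

import (
	"io/ioutil"
	"os"
)

// exampleChildProcess stands in for the real testbed.ChildProcess, whose
// implementation is not shown in this diff.
type exampleChildProcess struct {
	configFileName string
}

// PrepareConfig writes the YAML produced by createConfigYaml to a temporary
// file and returns a cleanup function that removes it, mirroring the temp-file
// handling removed from createConfigFile. Illustrative sketch only.
func (cp *exampleChildProcess) PrepareConfig(configStr string) (configCleanup func(), err error) {
	configCleanup = func() {
		// NoOp until a file has actually been created.
	}

	file, err := ioutil.TempFile("", "agent*.yaml")
	if err != nil {
		return configCleanup, err
	}
	defer file.Close()

	if _, err = file.WriteString(configStr); err != nil {
		return configCleanup, err
	}

	cp.configFileName = file.Name()
	configCleanup = func() { os.Remove(cp.configFileName) }
	return configCleanup, nil
}
```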
@@ -144,21 +122,33 @@ func Scenario10kItemsPerSecond( resultDir, err := filepath.Abs(path.Join("results", t.Name())) require.NoError(t, err) - configFile := createConfigFile(t, sender, receiver, resultDir, processors) - defer os.Remove(configFile) - require.NotEmpty(t, configFile, "Cannot create config file") + options := testbed.LoadOptions{ + DataItemsPerSecond: 10000, + ItemsPerBatch: 100, + } + agentProc := &testbed.ChildProcess{} + configStr := createConfigYaml(t, sender, receiver, resultDir, processors) + configCleanup, err := agentProc.PrepareConfig(configStr) + require.NoError(t, err) + defer configCleanup() - tc := testbed.NewTestCase(t, sender, receiver, testbed.WithConfigFile(configFile)) + dataProvider := testbed.NewPerfTestDataProvider(options) + tc := testbed.NewTestCase( + t, + dataProvider, + sender, + receiver, + agentProc, + &testbed.PerfTestValidator{}, + performanceResultsSummary, + ) defer tc.Stop() tc.SetResourceLimits(resourceSpec) tc.StartBackend() tc.StartAgent() - tc.StartLoad(testbed.LoadOptions{ - DataItemsPerSecond: 10000, - ItemsPerBatch: 100, - }) + tc.StartLoad(options) tc.Sleep(tc.Duration) @@ -193,12 +183,18 @@ func genRandByteString(len int) string { func Scenario1kSPSWithAttrs(t *testing.T, args []string, tests []TestCase, opts ...testbed.TestCaseOption) { for i := range tests { test := tests[i] + options := constructLoadOptions(test) + t.Run(fmt.Sprintf("%d*%dbytes", test.attrCount, test.attrSizeByte), func(t *testing.T) { tc := testbed.NewTestCase( t, - testbed.NewJaegerGRPCDataSender(testbed.DefaultJaegerPort), + testbed.NewPerfTestDataProvider(options), + testbed.NewJaegerGRPCDataSender(testbed.DefaultHost, testbed.DefaultJaegerPort), testbed.NewOCDataReceiver(testbed.DefaultOCPort), + &testbed.ChildProcess{}, + &testbed.PerfTestValidator{}, + performanceResultsSummary, opts..., ) defer tc.Stop() @@ -211,15 +207,6 @@ func Scenario1kSPSWithAttrs(t *testing.T, args []string, tests []TestCase, opts tc.StartBackend() tc.StartAgent(args...) - options := testbed.LoadOptions{DataItemsPerSecond: 1000} - options.Attributes = make(map[string]string) - - // Generate attributes. 
- for i := 0; i < test.attrCount; i++ { - attrName := genRandByteString(rand.Intn(199) + 1) - options.Attributes[attrName] = genRandByteString(rand.Intn(test.attrSizeByte*2-1) + 1) - } - tc.StartLoad(options) tc.Sleep(tc.Duration) tc.StopLoad() @@ -250,15 +237,22 @@ func ScenarioTestTraceNoBackend10kSPS(t *testing.T, sender testbed.DataSender, r resultDir, err := filepath.Abs(path.Join("results", t.Name())) require.NoError(t, err) - configFile := createConfigFile(t, sender, receiver, resultDir, configuration.Processor) - defer os.Remove(configFile) - require.NotEmpty(t, configFile, "Cannot create config file") + options := testbed.LoadOptions{DataItemsPerSecond: 10000, ItemsPerBatch: 10} + agentProc := &testbed.ChildProcess{} + configStr := createConfigYaml(t, sender, receiver, resultDir, configuration.Processor) + configCleanup, err := agentProc.PrepareConfig(configStr) + require.NoError(t, err) + defer configCleanup() + dataProvider := testbed.NewPerfTestDataProvider(options) tc := testbed.NewTestCase( t, + dataProvider, sender, receiver, - testbed.WithConfigFile(configFile), + agentProc, + &testbed.PerfTestValidator{}, + performanceResultsSummary, ) defer tc.Stop() @@ -266,10 +260,22 @@ func ScenarioTestTraceNoBackend10kSPS(t *testing.T, sender testbed.DataSender, r tc.SetResourceLimits(resourceSpec) tc.StartAgent() - tc.StartLoad(testbed.LoadOptions{DataItemsPerSecond: 10000, ItemsPerBatch: 10}) + tc.StartLoad(options) tc.Sleep(tc.Duration) rss, _, _ := tc.AgentMemoryInfo() assert.True(t, rss > configuration.ExpectedMinFinalRAM) } + +func constructLoadOptions(test TestCase) testbed.LoadOptions { + options := testbed.LoadOptions{DataItemsPerSecond: 1000} + options.Attributes = make(map[string]string) + + // Generate attributes. + for i := 0; i < test.attrCount; i++ { + attrName := genRandByteString(rand.Intn(199) + 1) + options.Attributes[attrName] = genRandByteString(rand.Intn(test.attrSizeByte*2-1) + 1) + } + return options +} diff --git a/testbed/tests/trace_test.go b/testbed/tests/trace_test.go index c39662615b6..4044dcf03a2 100644 --- a/testbed/tests/trace_test.go +++ b/testbed/tests/trace_test.go @@ -22,7 +22,6 @@ package tests import ( "fmt" - "os" "path" "path/filepath" "testing" @@ -39,7 +38,7 @@ import ( // TestMain is used to initiate setup, execution and tear down of testbed. 
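
The performance suite's `TestMain` below now hands `performanceResultsSummary` to `DoTestMain`. For comparison, a correctness suite would supply a `CorrectnessResults` summary and a `CorrectnessTestValidator` instead. The sketch below is illustrative only: the package, test name, config layout, and the use of the perf data provider are assumptions; a real correctness suite would plug in the golden data set provider, whose constructor is not shown in this diff.

```go
package correctness

import (
	"fmt"
	"testing"

	"go.opentelemetry.io/collector/service/defaultcomponents"
	"go.opentelemetry.io/collector/testbed/testbed"
)

// Suite-wide summary handed to DoTestMain, analogous to performanceResultsSummary.
var correctnessResults testbed.TestResultsSummary = &testbed.CorrectnessResults{}

func TestMain(m *testing.M) {
	testbed.DoTestMain(m, correctnessResults)
}

func TestTracingCorrectness(t *testing.T) {
	factories, err := defaultcomponents.Components()
	if err != nil {
		t.Fatal(err)
	}

	sender := testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t))
	receiver := testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t))
	runner := testbed.NewInProcessCollector(factories, sender.Port)

	// Minimal pipeline assembled from the sender/receiver config templates.
	config := fmt.Sprintf(`
receivers:%v
exporters:%v
processors:
  batch:

extensions:

service:
  extensions:
  pipelines:
    traces:
      receivers: [%v]
      processors: [batch]
      exporters: [%v]
`, sender.GenConfigYAMLStr(), receiver.GenConfigYAMLStr(), sender.ProtocolName(), receiver.ProtocolName())

	configCleanup, err := runner.PrepareConfig(config)
	if err != nil {
		t.Fatal(err)
	}
	defer configCleanup()

	// The perf provider is used here only because its constructor appears in
	// this diff; a correctness suite would normally use the golden dataset provider.
	options := testbed.LoadOptions{DataItemsPerSecond: 1000, ItemsPerBatch: 10}
	dataProvider := testbed.NewPerfTestDataProvider(options)

	tc := testbed.NewTestCase(
		t,
		dataProvider,
		sender,
		receiver,
		runner,
		testbed.NewCorrectTestValidator(dataProvider),
		correctnessResults,
	)
	defer tc.Stop()

	tc.StartBackend()
	// --metrics-level=NONE mirrors the in-process service test elsewhere in this change.
	tc.StartAgent("--metrics-level=NONE")
	tc.StartLoad(options)
	tc.Sleep(tc.Duration)
	tc.StopLoad()
	tc.StopAgent()
	tc.ValidateData()
}
```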
func TestMain(m *testing.M) { - testbed.DoTestMain(m) + testbed.DoTestMain(m, performanceResultsSummary) } func TestTrace10kSPS(t *testing.T) { @@ -51,7 +50,7 @@ func TestTrace10kSPS(t *testing.T) { }{ { "JaegerGRPC", - testbed.NewJaegerGRPCDataSender(testbed.GetAvailablePort(t)), + testbed.NewJaegerGRPCDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), testbed.NewJaegerDataReceiver(testbed.GetAvailablePort(t)), testbed.ResourceSpec{ ExpectedMaxCPU: 40, @@ -60,7 +59,7 @@ func TestTrace10kSPS(t *testing.T) { }, { "OpenCensus", - testbed.NewOCTraceDataSender(testbed.GetAvailablePort(t)), + testbed.NewOCTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), testbed.NewOCDataReceiver(testbed.GetAvailablePort(t)), testbed.ResourceSpec{ ExpectedMaxCPU: 39, @@ -69,7 +68,7 @@ func TestTrace10kSPS(t *testing.T) { }, { "OTLP", - testbed.NewOTLPTraceDataSender(testbed.GetAvailablePort(t)), + testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)), testbed.ResourceSpec{ ExpectedMaxCPU: 20, @@ -78,7 +77,7 @@ func TestTrace10kSPS(t *testing.T) { }, { "Zipkin", - testbed.NewZipkinDataSender(testbed.GetAvailablePort(t)), + testbed.NewZipkinDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), testbed.NewZipkinDataReceiver(testbed.GetAvailablePort(t)), testbed.ResourceSpec{ ExpectedMaxCPU: 60, @@ -149,7 +148,7 @@ func TestTraceNoBackend10kSPS(t *testing.T) { }{ { "JaegerGRPC", - testbed.NewJaegerGRPCDataSender(testbed.DefaultJaegerPort), + testbed.NewJaegerGRPCDataSender(testbed.DefaultHost, testbed.DefaultJaegerPort), testbed.NewOCDataReceiver(testbed.DefaultOCPort), testbed.ResourceSpec{ ExpectedMaxCPU: 60, @@ -159,7 +158,7 @@ func TestTraceNoBackend10kSPS(t *testing.T) { }, { "Zipkin", - testbed.NewZipkinDataSender(testbed.DefaultZipkinAddressPort), + testbed.NewZipkinDataSender(testbed.DefaultHost, testbed.DefaultZipkinAddressPort), testbed.NewOCDataReceiver(testbed.DefaultOCPort), testbed.ResourceSpec{ ExpectedMaxCPU: 80, @@ -317,8 +316,8 @@ func verifySingleSpan( td.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1) spans := td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans() spans.Resize(1) - spans.At(0).SetTraceID(testbed.GenerateTraceID(1)) - spans.At(0).SetSpanID(testbed.GenerateSpanID(1)) + spans.At(0).SetTraceID(testbed.GenerateSequentialTraceID(1)) + spans.At(0).SetSpanID(testbed.GenerateSequentialSpanID(1)) spans.At(0).SetName(spanName) if sender, ok := tc.Sender.(testbed.TraceDataSender); ok { @@ -368,12 +367,12 @@ func TestTraceAttributesProcessor(t *testing.T) { }{ { "JaegerGRPC", - testbed.NewJaegerGRPCDataSender(testbed.GetAvailablePort(t)), + testbed.NewJaegerGRPCDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), testbed.NewJaegerDataReceiver(testbed.GetAvailablePort(t)), }, { "OTLP", - testbed.NewOTLPTraceDataSender(testbed.GetAvailablePort(t)), + testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)), }, } @@ -404,12 +403,23 @@ func TestTraceAttributesProcessor(t *testing.T) { `, } - configFile := createConfigFile(t, test.sender, test.receiver, resultDir, processors) - defer os.Remove(configFile) - - require.NotEmpty(t, configFile, "Cannot create config file") + agentProc := &testbed.ChildProcess{} + configStr := createConfigYaml(t, test.sender, test.receiver, resultDir, processors) + configCleanup, err := agentProc.PrepareConfig(configStr) + 
require.NoError(t, err) + defer configCleanup() - tc := testbed.NewTestCase(t, test.sender, test.receiver, testbed.WithConfigFile(configFile)) + options := testbed.LoadOptions{DataItemsPerSecond: 10000, ItemsPerBatch: 10} + dataProvider := testbed.NewPerfTestDataProvider(options) + tc := testbed.NewTestCase( + t, + dataProvider, + test.sender, + test.receiver, + agentProc, + &testbed.PerfTestValidator{}, + performanceResultsSummary, + ) defer tc.Stop() tc.StartBackend()