Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/elasticsearch] Add reliability tests #31848

Merged
merged 13 commits into from
Apr 15, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
results/*
1 change: 1 addition & 0 deletions exporter/elasticsearchexporter/integrationtest/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../../Makefile.Common
280 changes: 280 additions & 0 deletions exporter/elasticsearchexporter/integrationtest/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package integrationtest

import (
"context"
"errors"
"fmt"
"os"
"sync"
"testing"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/filestorage"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
"github.com/shirou/gopsutil/v3/process"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/provider/fileprovider"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/debugexporter"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/otelcol"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"golang.org/x/sync/errgroup"
)

// createConfigYaml creates a yaml config for an otel collector for testing.
func createConfigYaml(
t testing.TB,
sender testbed.DataSender,
receiver testbed.DataReceiver,
processors map[string]string,
extensions map[string]string,
pipelineType string,
debug bool,
) string {
t.Helper()

processorSection, processorList := createConfigSection(processors)
extensionSection, extensionList := createConfigSection(extensions)
debugVerbosity := "basic"
if debug {
debugVerbosity = "detailed"
}

format := `
receivers:%v
exporters:%v
debug:
verbosity: %s

processors:
%s

extensions:
%s

service:
telemetry:
metrics:
address: 127.0.0.1:%d
extensions: [%s]
pipelines:
%s:
receivers: [%v]
processors: [%s]
exporters: [%v]
`

return fmt.Sprintf(
format,
sender.GenConfigYAMLStr(),
receiver.GenConfigYAMLStr(),
debugVerbosity,
processorSection,
extensionSection,
testutil.GetAvailablePort(t),
extensionList,
pipelineType,
sender.ProtocolName(),
processorList,
receiver.ProtocolName(),
)
}

func createConfigSection(m map[string]string) (sections, list string) {
if len(m) > 0 {
first := true
for name, cfg := range m {
sections += cfg + "\n"
if !first {
list += ","
}
list += name
first = false
}
}
return
}

// recreatableOtelCol creates an otel collector that can be used to simulate
// a crash of the collector. It implements the testbed.OtelColRunner interface.
type recreatableOtelCol struct {
tempDir string
factories otelcol.Factories
settings otelcol.CollectorSettings
configStr string
errGrp errgroup.Group
cancel context.CancelFunc

mu sync.Mutex
col *otelcol.Collector
}

func newRecreatableOtelCol(t testing.TB) *recreatableOtelCol {
var (
err error
factories otelcol.Factories
)
factories.Receivers, err = receiver.MakeFactoryMap(
otlpreceiver.NewFactory(),
)
require.NoError(t, err)
factories.Extensions, err = extension.MakeFactoryMap(
filestorage.NewFactory(),
)
require.NoError(t, err)
factories.Processors, err = processor.MakeFactoryMap()
require.NoError(t, err)
factories.Exporters, err = exporter.MakeFactoryMap(
elasticsearchexporter.NewFactory(),
debugexporter.NewFactory(),
)
require.NoError(t, err)
return &recreatableOtelCol{
tempDir: t.TempDir(),
factories: factories,
}
}

func (c *recreatableOtelCol) PrepareConfig(configStr string) (func(), error) {
configCleanup := func() {
// NoOp
}
c.configStr = configStr
return configCleanup, nil
}

func (c *recreatableOtelCol) Start(_ testbed.StartParams) error {
var err error

confFile, err := os.CreateTemp(c.tempDir, "conf-")
if err != nil {
return err
}

if _, err = confFile.Write([]byte(c.configStr)); err != nil {
os.Remove(confFile.Name())
return err
}

fmp := fileprovider.NewWithSettings(confmap.ProviderSettings{})
cfgProviderSettings := otelcol.ConfigProviderSettings{
ResolverSettings: confmap.ResolverSettings{
URIs: []string{confFile.Name()},
Providers: map[string]confmap.Provider{fmp.Scheme(): fmp},
},
}

c.settings = otelcol.CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: func() (otelcol.Factories, error) { return c.factories, nil },
ConfigProviderSettings: cfgProviderSettings,
SkipSettingGRPCLogger: true,
}

c.mu.Lock()
defer c.mu.Unlock()

c.col, err = otelcol.NewCollector(c.settings)
if err != nil {
return err
}

return c.run()
}

func (c *recreatableOtelCol) Stop() (bool, error) {
c.mu.Lock()
defer c.mu.Unlock()

if c.col != nil {
c.col.Shutdown()
c.col = nil
}

if err := c.errGrp.Wait(); err != nil {
return false, err
}
return true, nil
}

func (c *recreatableOtelCol) Restart(graceful bool, shutdownFor time.Duration) error {
c.mu.Lock()
defer c.mu.Unlock()

if c.col == nil {
return nil
}

c.col.Shutdown()
if !graceful {
c.cancel()
}
err := c.errGrp.Wait()
if err != nil {
return fmt.Errorf("failed to stop old collector: %w", err)
}

c.col, err = otelcol.NewCollector(c.settings)
if err != nil {
return fmt.Errorf("failed to create new collector: %w", err)
}

time.Sleep(shutdownFor)
return c.run()
}

func (c *recreatableOtelCol) WatchResourceConsumption() error {
return nil
}

func (c *recreatableOtelCol) GetProcessMon() *process.Process {
return nil
}

func (c *recreatableOtelCol) GetTotalConsumption() *testbed.ResourceConsumption {
return &testbed.ResourceConsumption{
CPUPercentAvg: 0,
CPUPercentMax: 0,
RAMMiBAvg: 0,
RAMMiBMax: 0,
}
}

func (c *recreatableOtelCol) GetResourceConsumption() string {
return ""
}

func (c *recreatableOtelCol) run() error {
var ctx context.Context
ctx, c.cancel = context.WithCancel(context.Background())

col := c.col
c.errGrp.Go(func() error {
// Ignore context canceled errors
if err := col.Run(ctx); !errors.Is(err, context.Canceled) {
return err
}
return nil
})

for {
switch state := col.GetState(); state {
case otelcol.StateStarting:
time.Sleep(time.Second)
case otelcol.StateRunning:
return nil
default:
return fmt.Errorf("unable to start, otelcol state is %d", state)
}
}
}
22 changes: 22 additions & 0 deletions exporter/elasticsearchexporter/integrationtest/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package integrationtest

import (
"os"
"strconv"
"testing"

"github.com/stretchr/testify/require"
)

func getDebugFlag(t testing.TB) bool {
raw := os.Getenv("DEBUG")
if raw == "" {
return false
}
debug, err := strconv.ParseBool(raw)
require.NoError(t, err, "debug flag parsing failed")
return debug
}
Loading
Loading