Skip to content

Commit

Permalink
Change fileexporter to use the new SharedComponents because of the fi…
Browse files Browse the repository at this point in the history
…le (#3201)

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored May 18, 2021
1 parent 613df75 commit 1096792
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 66 deletions.
6 changes: 6 additions & 0 deletions exporter/fileexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package fileexporter

import (
"errors"

"go.opentelemetry.io/collector/config"
)

Expand All @@ -30,5 +32,9 @@ var _ config.Exporter = (*Config)(nil)

// Validate checks if the exporter configuration is valid
func (cfg *Config) Validate() error {
if cfg.Path == "" {
return errors.New("path must be non-empty")
}

return nil
}
3 changes: 1 addition & 2 deletions exporter/fileexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ func TestLoadConfig(t *testing.T) {
factory := NewFactory()
factories.Exporters[typeStr] = factory
cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)

require.NoError(t, err)
require.EqualError(t, err, "exporter \"file\" has invalid configuration: path must be non-empty")
require.NotNil(t, cfg)

e0 := cfg.Exporters[config.NewID(typeStr)]
Expand Down
47 changes: 17 additions & 30 deletions exporter/fileexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ package fileexporter

import (
"context"
"os"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/internal/sharedcomponent"
)

const (
Expand All @@ -46,52 +46,39 @@ func createDefaultConfig() config.Exporter {

func createTracesExporter(
_ context.Context,
_ component.ExporterCreateParams,
params component.ExporterCreateParams,
cfg config.Exporter,
) (component.TracesExporter, error) {
return createExporter(cfg)
fe := exporters.GetOrAdd(cfg, func() component.Component {
return &fileExporter{path: cfg.(*Config).Path}
})
return exporterhelper.NewTracesExporter(cfg, params.Logger, fe.Unwrap().(*fileExporter).ConsumeTraces)
}

func createMetricsExporter(
_ context.Context,
_ component.ExporterCreateParams,
params component.ExporterCreateParams,
cfg config.Exporter,
) (component.MetricsExporter, error) {
return createExporter(cfg)
fe := exporters.GetOrAdd(cfg, func() component.Component {
return &fileExporter{path: cfg.(*Config).Path}
})
return exporterhelper.NewMetricsExporter(cfg, params.Logger, fe.Unwrap().(*fileExporter).ConsumeMetrics)
}

func createLogsExporter(
_ context.Context,
_ component.ExporterCreateParams,
params component.ExporterCreateParams,
cfg config.Exporter,
) (component.LogsExporter, error) {
return createExporter(cfg)
}

func createExporter(config config.Exporter) (*fileExporter, error) {
cfg := config.(*Config)

// There must be one exporter for metrics, traces, and logs. We maintain a
// map of exporters per config.

// Check to see if there is already a exporter for this config.
exporter, ok := exporters[cfg]

if !ok {
file, err := os.OpenFile(cfg.Path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return nil, err
}
exporter = &fileExporter{file: file}

// Remember the receiver in the map
exporters[cfg] = exporter
}
return exporter, nil
fe := exporters.GetOrAdd(cfg, func() component.Component {
return &fileExporter{path: cfg.(*Config).Path}
})
return exporterhelper.NewLogsExporter(cfg, params.Logger, fe.Unwrap().(*fileExporter).ConsumeLogs)
}

// This is the map of already created File exporters for particular configurations.
// We maintain this map because the Factory is asked trace and metric receivers separately
// when it gets CreateTracesReceiver() and CreateMetricsReceiver() but they must not
// create separate objects, they must use one Receiver object per configuration.
var exporters = map[*Config]*fileExporter{}
var exporters = sharedcomponent.NewSharedComponents()
13 changes: 6 additions & 7 deletions exporter/fileexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func TestCreateMetricsExporter(t *testing.T) {
context.Background(),
component.ExporterCreateParams{Logger: zap.NewNop()},
cfg)
assert.Error(t, err)
require.Nil(t, exp)
assert.NoError(t, err)
require.NotNil(t, exp)
}

func TestCreateTracesExporter(t *testing.T) {
Expand All @@ -48,17 +48,16 @@ func TestCreateTracesExporter(t *testing.T) {
context.Background(),
component.ExporterCreateParams{Logger: zap.NewNop()},
cfg)
assert.Error(t, err)
require.Nil(t, exp)
assert.NoError(t, err)
require.NotNil(t, exp)
}

func TestCreateLogsExporter(t *testing.T) {
cfg := createDefaultConfig()

exp, err := createLogsExporter(
context.Background(),
component.ExporterCreateParams{Logger: zap.NewNop()},
cfg)
assert.Error(t, err)
require.Nil(t, exp)
assert.NoError(t, err)
require.NotNil(t, exp)
}
6 changes: 5 additions & 1 deletion exporter/fileexporter/file_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package fileexporter
import (
"context"
"io"
"os"
"sync"

"github.com/gogo/protobuf/jsonpb"
Expand All @@ -34,6 +35,7 @@ var marshaler = &jsonpb.Marshaler{}
// fileExporter is the implementation of file exporter that writes telemetry data to a file
// in Protobuf-JSON format.
type fileExporter struct {
path string
file io.WriteCloser
mutex sync.Mutex
}
Expand Down Expand Up @@ -68,7 +70,9 @@ func exportMessageAsLine(e *fileExporter, message proto.Message) error {
}

func (e *fileExporter) Start(context.Context, component.Host) error {
return nil
var err error
e.file, err = os.OpenFile(e.path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
return err
}

// Shutdown stops the exporter and is invoked during shutdown.
Expand Down
40 changes: 28 additions & 12 deletions exporter/fileexporter/file_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
package fileexporter

import (
"bytes"
"context"
"io/ioutil"
"os"
"testing"

"github.com/gogo/protobuf/jsonpb"
Expand All @@ -31,8 +34,7 @@ import (
)

func TestFileTracesExporter(t *testing.T) {
mf := &testutil.LimitedWriter{}
fe := &fileExporter{file: mf}
fe := &fileExporter{path: tempFileName(t)}
require.NotNil(t, fe)

td := testdata.GenerateTracesTwoSpansSameResource()
Expand All @@ -42,7 +44,9 @@ func TestFileTracesExporter(t *testing.T) {

var unmarshaler = &jsonpb.Unmarshaler{}
got := &collectortrace.ExportTraceServiceRequest{}
assert.NoError(t, unmarshaler.Unmarshal(mf, got))
buf, err := ioutil.ReadFile(fe.path)
assert.NoError(t, err)
assert.NoError(t, unmarshaler.Unmarshal(bytes.NewReader(buf), got))
assert.EqualValues(t, internal.TracesToOtlp(td.InternalRep()), got)
}

Expand All @@ -54,14 +58,13 @@ func TestFileTracesExporterError(t *testing.T) {
require.NotNil(t, fe)

td := testdata.GenerateTracesTwoSpansSameResource()
assert.NoError(t, fe.Start(context.Background(), componenttest.NewNopHost()))
// Cannot call Start since we inject directly the WriterCloser.
assert.Error(t, fe.ConsumeTraces(context.Background(), td))
assert.NoError(t, fe.Shutdown(context.Background()))
}

func TestFileMetricsExporter(t *testing.T) {
mf := &testutil.LimitedWriter{}
fe := &fileExporter{file: mf}
fe := &fileExporter{path: tempFileName(t)}
require.NotNil(t, fe)

md := testdata.GenerateMetricsTwoMetrics()
Expand All @@ -71,7 +74,9 @@ func TestFileMetricsExporter(t *testing.T) {

var unmarshaler = &jsonpb.Unmarshaler{}
got := &collectormetrics.ExportMetricsServiceRequest{}
assert.NoError(t, unmarshaler.Unmarshal(mf, got))
buf, err := ioutil.ReadFile(fe.path)
assert.NoError(t, err)
assert.NoError(t, unmarshaler.Unmarshal(bytes.NewReader(buf), got))
assert.EqualValues(t, internal.MetricsToOtlp(md.InternalRep()), got)
}

Expand All @@ -83,14 +88,13 @@ func TestFileMetricsExporterError(t *testing.T) {
require.NotNil(t, fe)

md := testdata.GenerateMetricsTwoMetrics()
assert.NoError(t, fe.Start(context.Background(), componenttest.NewNopHost()))
// Cannot call Start since we inject directly the WriterCloser.
assert.Error(t, fe.ConsumeMetrics(context.Background(), md))
assert.NoError(t, fe.Shutdown(context.Background()))
}

func TestFileLogsExporter(t *testing.T) {
mf := &testutil.LimitedWriter{}
fe := &fileExporter{file: mf}
fe := &fileExporter{path: tempFileName(t)}
require.NotNil(t, fe)

otlp := testdata.GenerateLogsTwoLogRecordsSameResource()
Expand All @@ -100,7 +104,9 @@ func TestFileLogsExporter(t *testing.T) {

var unmarshaler = &jsonpb.Unmarshaler{}
got := &collectorlogs.ExportLogsServiceRequest{}
assert.NoError(t, unmarshaler.Unmarshal(mf, got))
buf, err := ioutil.ReadFile(fe.path)
assert.NoError(t, err)
assert.NoError(t, unmarshaler.Unmarshal(bytes.NewReader(buf), got))
assert.EqualValues(t, internal.LogsToOtlp(otlp.InternalRep()), got)
}

Expand All @@ -112,7 +118,17 @@ func TestFileLogsExporterErrors(t *testing.T) {
require.NotNil(t, fe)

otlp := testdata.GenerateLogsTwoLogRecordsSameResource()
assert.NoError(t, fe.Start(context.Background(), componenttest.NewNopHost()))
// Cannot call Start since we inject directly the WriterCloser.
assert.Error(t, fe.ConsumeLogs(context.Background(), otlp))
assert.NoError(t, fe.Shutdown(context.Background()))
}

// tempFileName provides a temporary file name for testing.
func tempFileName(t *testing.T) string {
tmpfile, err := ioutil.TempFile("", "*.json")
require.NoError(t, err)
require.NoError(t, tmpfile.Close())
socket := tmpfile.Name()
require.NoError(t, os.Remove(socket))
return socket
}
30 changes: 16 additions & 14 deletions service/defaultcomponents/default_exporters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,9 @@ func verifyExporterLifecycle(t *testing.T, factory component.ExporterFactory, ge
BuildInfo: component.DefaultBuildInfo(),
}

if getConfigFn == nil {
getConfigFn = factory.CreateDefaultConfig
cfg := factory.CreateDefaultConfig()
if getConfigFn != nil {
cfg = getConfigFn()
}

createFns := []createExporterFn{
Expand All @@ -178,19 +179,20 @@ func verifyExporterLifecycle(t *testing.T, factory component.ExporterFactory, ge
wrapCreateMetricsExp(factory),
}

for _, createFn := range createFns {
firstExp, err := createFn(ctx, expCreateParams, getConfigFn())
if errors.Is(err, componenterror.ErrDataTypeIsNotSupported) {
continue
for i := 0; i < 2; i++ {
var exps []component.Exporter
for _, createFn := range createFns {
exp, err := createFn(ctx, expCreateParams, cfg)
if errors.Is(err, componenterror.ErrDataTypeIsNotSupported) {
continue
}
require.NoError(t, err)
require.NoError(t, exp.Start(ctx, host))
exps = append(exps, exp)
}
for _, exp := range exps {
assert.NoError(t, exp.Shutdown(ctx))
}
require.NoError(t, err)
require.NoError(t, firstExp.Start(ctx, host))
require.NoError(t, firstExp.Shutdown(ctx))

secondExp, err := createFn(ctx, expCreateParams, getConfigFn())
require.NoError(t, err)
require.NoError(t, secondExp.Start(ctx, host))
require.NoError(t, secondExp.Shutdown(ctx))
}
}

Expand Down

0 comments on commit 1096792

Please sign in to comment.