Skip to content

Commit

Permalink
Add Cassandra OTEL exporter
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay committed Mar 24, 2020
1 parent 87c466d commit ec033b0
Show file tree
Hide file tree
Showing 14 changed files with 467 additions and 69 deletions.
27 changes: 27 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"

"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)

// Config holds configuration of Jaeger Cassandra exporter/storage.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
cassandra.Options `mapstructure:",squash"`
}
86 changes: 86 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"path"
"testing"
"time"

"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/cmd/flags"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)

func TestDefaultConfig(t *testing.T) {
factory := &Factory{Options: func() *cassandra.Options {
v, _ := jConfig.Viperize(DefaultOptions().AddFlags)
opts := DefaultOptions()
opts.InitFromViper(v)
return opts
}}
defaultCfg := factory.CreateDefaultConfig().(*Config)
assert.Equal(t, []string{"127.0.0.1"}, defaultCfg.Options.GetPrimary().Servers)
assert.Equal(t, []string{"127.0.0.1"}, defaultCfg.Options.Primary.Servers)
assert.Equal(t, 2, defaultCfg.Primary.ConnectionsPerHost)
assert.Equal(t, "jaeger_v1_test", defaultCfg.Primary.Keyspace)
assert.Equal(t, 3, defaultCfg.Primary.MaxRetryAttempts)
assert.Equal(t, 4, defaultCfg.Primary.ProtoVersion)
assert.Equal(t, time.Minute, defaultCfg.Primary.ReconnectInterval)
assert.Equal(t, time.Hour*12, defaultCfg.SpanStoreWriteCacheTTL)
assert.Equal(t, true, defaultCfg.Index.Tags)
assert.Equal(t, true, defaultCfg.Index.Logs)
assert.Equal(t, true, defaultCfg.Index.ProcessTags)
}

func TestLoadConfigAndFlags(t *testing.T) {
factories, err := config.ExampleComponents()
require.NoError(t, err)

v, c := jConfig.Viperize(DefaultOptions().AddFlags, flags.AddConfigFileFlag)
err = c.ParseFlags([]string{"--cassandra.servers=bar", "--cassandra.port=9000", "--config-file=./testdata/jaeger-config.yaml"})
require.NoError(t, err)

err = flags.TryLoadConfigFile(v)
require.NoError(t, err)

factory := &Factory{Options: func() *cassandra.Options {
opts := DefaultOptions()
opts.InitFromViper(v)
require.Equal(t, []string{"bar"}, opts.GetPrimary().Servers)
return opts
}}

factories.Exporters[TypeStr] = factory
colConfig, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, colConfig)

cfg := colConfig.Exporters[TypeStr].(*Config)
assert.Equal(t, TypeStr, cfg.Name())
assert.Equal(t, []string{"first", "second"}, cfg.Primary.Servers)
assert.Equal(t, 9000, cfg.Primary.Port)
assert.Equal(t, false, cfg.Index.Tags)
assert.Equal(t, "my-keyspace", cfg.Primary.Keyspace)
assert.Equal(t, false, cfg.Index.Tags)
assert.Equal(t, true, cfg.Index.Logs)
assert.Equal(t, "user", cfg.Primary.Authenticator.Basic.Username)
assert.Equal(t, "pass", cfg.Primary.Authenticator.Basic.Password)
assert.Equal(t, time.Second*12, cfg.SpanStoreWriteCacheTTL)
}
16 changes: 16 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package cassandra implements Jaeger Cassandra storage as OpenTelemetry exporter.
package cassandra
77 changes: 77 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"context"
"io"

"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumererror"
"github.com/open-telemetry/opentelemetry-collector/exporter"
"github.com/open-telemetry/opentelemetry-collector/exporter/exporterhelper"
jaegertranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace/jaeger"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
spanstoreInterface "github.com/jaegertracing/jaeger/storage/spanstore"
)

// New creates Cassandra exporter/storage
func New(config *Config, log *zap.Logger) (exporter.TraceExporter, error) {
fac := cassandra.NewFactory()
fac.InitFromOptions(&config.Options)

err := fac.Initialize(metrics.NullFactory, log)
if err != nil {
return nil, err
}
spanWriter, err := fac.CreateSpanWriter()
if err != nil {
return nil, err
}
storage := storage{Writer: spanWriter}
return exporterhelper.NewTraceExporter(
config,
storage.store,
exporterhelper.WithShutdown(func() error {
closer := spanWriter.(io.Closer)
closer.Close()
return nil
}))
}

type storage struct {
Writer spanstoreInterface.Writer
}

// Store stores data into storage
func (s *storage) store(ctx context.Context, td consumerdata.TraceData) (droppedSpans int, err error) {
protoBatch, err := jaegertranslator.OCProtoToJaegerProto(td)
if err != nil {
return len(td.Spans), consumererror.Permanent(err)
}
dropped := 0
for _, span := range protoBatch.Spans {
span.Process = protoBatch.Process
err := s.Writer.WriteSpan(span)
// TODO should we wrap errors as we go and return?
if err != nil {
dropped++
}
}
return dropped, nil
}
75 changes: 75 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"fmt"

"github.com/open-telemetry/opentelemetry-collector/config/configerror"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/exporter"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)

const (
// TypeStr defines type of the Cassandra exporter.
TypeStr = "jaeger_cassandra"
)

// OptionsFactory returns initialized cassandra.OptionsFactory structure.
type OptionsFactory func() *cassandra.Options

// DefaultOptions creates Cassandra options supported by this exporter.
func DefaultOptions() *cassandra.Options {
return cassandra.NewOptions("cassandra")
}

// Factory is the factory for Jaeger Cassandra exporter.
type Factory struct {
Options OptionsFactory
}

// Type gets the type of exporter.
func (Factory) Type() string {
return TypeStr
}

// CreateDefaultConfig returns default configuration of Factory.
func (f Factory) CreateDefaultConfig() configmodels.Exporter {
opts := f.Options()
return &Config{
Options: *opts,
ExporterSettings: configmodels.ExporterSettings{
TypeVal: TypeStr,
NameVal: TypeStr,
},
}
}

// CreateTraceExporter creates Jaeger Cassandra trace exporter.
func (Factory) CreateTraceExporter(log *zap.Logger, cfg configmodels.Exporter) (exporter.TraceExporter, error) {
config, ok := cfg.(*Config)
if !ok {
return nil, fmt.Errorf("could not cast configuration to %s", TypeStr)
}
return New(config, log)
}

// CreateMetricsExporter is not implemented.
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (exporter.MetricsExporter, error) {
return nil, configerror.ErrDataTypeIsNotSupported
}
66 changes: 66 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"testing"

"github.com/open-telemetry/opentelemetry-collector/config/configcheck"
"github.com/open-telemetry/opentelemetry-collector/config/configerror"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

jConfig "github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)

func TestCreateTraceExporter(t *testing.T) {
v, _ := jConfig.Viperize(DefaultOptions().AddFlags)
opts := DefaultOptions()
opts.InitFromViper(v)
factory := Factory{Options: func() *cassandra.Options {
return opts
}}
exporter, err := factory.CreateTraceExporter(zap.NewNop(), factory.CreateDefaultConfig())
require.Nil(t, exporter)
assert.EqualError(t, err, "gocql: unable to create session: control: unable to connect to initial hosts: dial tcp 127.0.0.1:9042: connect: connection refused")
}

func TestCreateTraceExporter_NilConfig(t *testing.T) {
factory := Factory{}
exporter, err := factory.CreateTraceExporter(zap.NewNop(), nil)
require.Nil(t, exporter)
assert.EqualError(t, err, "could not cast configuration to jaeger_cassandra")
}

func TestCreateDefaultConfig(t *testing.T) {
factory := Factory{Options: DefaultOptions}
cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, configcheck.ValidateConfig(cfg))
}

func TestCreateMetricsExporter(t *testing.T) {
f := Factory{Options: DefaultOptions}
mReceiver, err := f.CreateMetricsExporter(zap.NewNop(), f.CreateDefaultConfig())
assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported)
assert.Nil(t, mReceiver)
}

func TestType(t *testing.T) {
factory := Factory{}
assert.Equal(t, TypeStr, factory.Type())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
receivers:
examplereceiver:

processors:
exampleprocessor:

exporters:
jaeger_cassandra:
servers: "first,second"
index:
tags: false
username: user
password: pass
span_store_write_cache_ttl: 12s

service:
pipelines:
traces:
receivers: [examplereceiver]
processors: [exampleprocessor]
exporters: [jaeger_cassandra]
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
cassandra:
keyspace: my-keyspace
8 changes: 7 additions & 1 deletion cmd/opentelemetry-collector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,10 @@ replace k8s.io/client-go => k8s.io/client-go v0.0.0-20190620085101-78d2af792bab

replace github.com/jaegertracing/jaeger => ./../../

require github.com/open-telemetry/opentelemetry-collector v0.2.8-0.20200318211436-c7a11d6181c1 // indirect
require (
github.com/jaegertracing/jaeger v1.17.0
github.com/open-telemetry/opentelemetry-collector v0.2.8-0.20200318211436-c7a11d6181c1
github.com/stretchr/testify v1.5.0
github.com/uber/jaeger-lib v2.2.0+incompatible
go.uber.org/zap v1.13.0
)
Loading

0 comments on commit ec033b0

Please sign in to comment.