diff --git a/Makefile b/Makefile index fef17ab2fdb..cea651edde6 100644 --- a/Makefile +++ b/Makefile @@ -152,7 +152,7 @@ fmt: .PHONY: lint-gosec lint-gosec: - time gosec -quiet -exclude=G104,G107 ./... + time gosec -quiet -exclude=G104,G107 -exclude-dir=cmd/opentelemetry-collector ./... .PHONY: lint-staticcheck lint-staticcheck: @@ -177,6 +177,7 @@ lint: lint-staticcheck lint-gosec lint-otel .PHONY: lint-otel lint-otel: cd ${OTEL_COLLECTOR_DIR} && $(GOVET) ./... + cd ${OTEL_COLLECTOR_DIR} && time gosec -quiet -exclude=G104,G107 ./... .PHONY: go-lint go-lint: diff --git a/cmd/opentelemetry-collector/app/defaults/defaults.go b/cmd/opentelemetry-collector/app/defaults/defaults.go index edeb99abd0a..e6265fd3524 100644 --- a/cmd/opentelemetry-collector/app/defaults/defaults.go +++ b/cmd/opentelemetry-collector/app/defaults/defaults.go @@ -19,7 +19,9 @@ import ( "github.com/open-telemetry/opentelemetry-collector/defaults" "github.com/spf13/viper" + "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka" + storageCassandra "github.com/jaegertracing/jaeger/plugin/storage/cassandra" storageKafka "github.com/jaegertracing/jaeger/plugin/storage/kafka" ) @@ -30,8 +32,14 @@ func Components(v *viper.Viper) config.Factories { opts.InitFromViper(v) return opts }} + cassandraExp := cassandra.Factory{OptionsFactory: func() *storageCassandra.Options { + opts := cassandra.DefaultOptions() + opts.InitFromViper(v) + return opts + }} factories, _ := defaults.Components() factories.Exporters[kafkaExp.Type()] = kafkaExp + factories.Exporters[cassandraExp.Type()] = cassandraExp return factories } diff --git a/cmd/opentelemetry-collector/app/defaults/defaults_test.go b/cmd/opentelemetry-collector/app/defaults/defaults_test.go index 6b7670defa1..dc80ef565ef 100644 --- a/cmd/opentelemetry-collector/app/defaults/defaults_test.go +++ b/cmd/opentelemetry-collector/app/defaults/defaults_test.go @@ -19,16 +19,22 @@ import ( "github.com/magiconair/properties/assert" + "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka" jConfig "github.com/jaegertracing/jaeger/pkg/config" ) func TestComponents(t *testing.T) { - v, _ := jConfig.Viperize(kafka.DefaultOptions().AddFlags) + v, _ := jConfig.Viperize(kafka.DefaultOptions().AddFlags, cassandra.DefaultOptions().AddFlags) factories := Components(v) assert.Equal(t, "jaeger_kafka", factories.Exporters[kafka.TypeStr].Type()) + assert.Equal(t, "jaeger_cassandra", factories.Exporters[cassandra.TypeStr].Type()) kafkaFactory := factories.Exporters[kafka.TypeStr] kc := kafkaFactory.CreateDefaultConfig().(*kafka.Config) assert.Equal(t, []string{"127.0.0.1:9092"}, kc.Config.Brokers) + + cassandraFactory := factories.Exporters[cassandra.TypeStr] + cc := cassandraFactory.CreateDefaultConfig().(*cassandra.Config) + assert.Equal(t, []string{"127.0.0.1"}, cc.Options.GetPrimary().Servers) } diff --git a/cmd/opentelemetry-collector/app/exporter/cassandra/config.go b/cmd/opentelemetry-collector/app/exporter/cassandra/config.go new file mode 100644 index 00000000000..cc7c2c91285 --- /dev/null +++ b/cmd/opentelemetry-collector/app/exporter/cassandra/config.go @@ -0,0 +1,27 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cassandra + +import ( + "github.com/open-telemetry/opentelemetry-collector/config/configmodels" + + "github.com/jaegertracing/jaeger/plugin/storage/cassandra" +) + +// Config holds configuration of Jaeger Cassandra exporter/storage. +type Config struct { + configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + cassandra.Options `mapstructure:",squash"` +} diff --git a/cmd/opentelemetry-collector/app/exporter/cassandra/config_test.go b/cmd/opentelemetry-collector/app/exporter/cassandra/config_test.go new file mode 100644 index 00000000000..a8ec17fc015 --- /dev/null +++ b/cmd/opentelemetry-collector/app/exporter/cassandra/config_test.go @@ -0,0 +1,88 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cassandra + +import ( + "path" + "testing" + "time" + + "github.com/open-telemetry/opentelemetry-collector/config" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/jaegertracing/jaeger/cmd/flags" + jConfig "github.com/jaegertracing/jaeger/pkg/config" + "github.com/jaegertracing/jaeger/plugin/storage/cassandra" +) + +func TestDefaultConfig(t *testing.T) { + factory := &Factory{OptionsFactory: func() *cassandra.Options { + v, _ := jConfig.Viperize(DefaultOptions().AddFlags) + opts := DefaultOptions() + opts.InitFromViper(v) + return opts + }} + defaultCfg := factory.CreateDefaultConfig().(*Config) + assert.Equal(t, []string{"127.0.0.1"}, defaultCfg.Options.GetPrimary().Servers) + assert.Equal(t, []string{"127.0.0.1"}, defaultCfg.Options.Primary.Servers) + assert.Equal(t, 2, defaultCfg.Primary.ConnectionsPerHost) + assert.Equal(t, "jaeger_v1_test", defaultCfg.Primary.Keyspace) + assert.Equal(t, 3, defaultCfg.Primary.MaxRetryAttempts) + assert.Equal(t, 4, defaultCfg.Primary.ProtoVersion) + assert.Equal(t, time.Minute, defaultCfg.Primary.ReconnectInterval) + assert.Equal(t, time.Hour*12, defaultCfg.SpanStoreWriteCacheTTL) + assert.Equal(t, true, defaultCfg.Index.Tags) + assert.Equal(t, true, defaultCfg.Index.Logs) + assert.Equal(t, true, defaultCfg.Index.ProcessTags) +} + +func TestLoadConfigAndFlags(t *testing.T) { + factories, err := config.ExampleComponents() + require.NoError(t, err) + + v, c := jConfig.Viperize(DefaultOptions().AddFlags, flags.AddConfigFileFlag) + err = c.ParseFlags([]string{"--cassandra.servers=bar", "--cassandra.port=9000", "--config-file=./testdata/jaeger-config.yaml"}) + require.NoError(t, err) + + err = flags.TryLoadConfigFile(v) + require.NoError(t, err) + + factory := &Factory{OptionsFactory: func() *cassandra.Options { + opts := DefaultOptions() + opts.InitFromViper(v) + require.Equal(t, []string{"bar"}, opts.GetPrimary().Servers) + return opts + }} + + factories.Exporters[TypeStr] = factory + colConfig, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories) + require.NoError(t, err) + require.NotNil(t, colConfig) + + cfg := colConfig.Exporters[TypeStr].(*Config) + assert.Equal(t, TypeStr, cfg.Name()) + assert.Equal(t, []string{"first", "second"}, cfg.Primary.Servers) + assert.Equal(t, 9000, cfg.Primary.Port) + assert.Equal(t, false, cfg.Index.Tags) + assert.Equal(t, "my-keyspace", cfg.Primary.Keyspace) + assert.Equal(t, false, cfg.Index.Tags) + assert.Equal(t, true, cfg.Index.Logs) + assert.Equal(t, "user", cfg.Primary.Authenticator.Basic.Username) + assert.Equal(t, "pass", cfg.Primary.Authenticator.Basic.Password) + assert.Equal(t, time.Second*12, cfg.SpanStoreWriteCacheTTL) + assert.Equal(t, true, cfg.Primary.TLS.Enabled) + assert.Equal(t, "/foo/bar", cfg.Primary.TLS.CAPath) +} diff --git a/cmd/opentelemetry-collector/app/exporter/cassandra/doc.go b/cmd/opentelemetry-collector/app/exporter/cassandra/doc.go new file mode 100644 index 00000000000..6060721ee79 --- /dev/null +++ b/cmd/opentelemetry-collector/app/exporter/cassandra/doc.go @@ -0,0 +1,16 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package cassandra implements Jaeger Cassandra storage as OpenTelemetry exporter. +package cassandra diff --git a/cmd/opentelemetry-collector/app/exporter/cassandra/exporter.go b/cmd/opentelemetry-collector/app/exporter/cassandra/exporter.go new file mode 100644 index 00000000000..743130770bc --- /dev/null +++ b/cmd/opentelemetry-collector/app/exporter/cassandra/exporter.go @@ -0,0 +1,36 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cassandra + +import ( + "github.com/open-telemetry/opentelemetry-collector/exporter" + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" + + storageOtelExporter "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter" + "github.com/jaegertracing/jaeger/plugin/storage/cassandra" +) + +// New creates Cassandra exporter/storage +func New(config *Config, log *zap.Logger) (exporter.TraceExporter, error) { + f := cassandra.NewFactory() + f.InitFromOptions(&config.Options) + + err := f.Initialize(metrics.NullFactory, log) + if err != nil { + return nil, err + } + return storageOtelExporter.NewSpanWriterExporter(config, f) +} diff --git a/cmd/opentelemetry-collector/app/exporter/cassandra/factory.go b/cmd/opentelemetry-collector/app/exporter/cassandra/factory.go new file mode 100644 index 00000000000..91d3ec66e97 --- /dev/null +++ b/cmd/opentelemetry-collector/app/exporter/cassandra/factory.go @@ -0,0 +1,78 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cassandra + +import ( + "fmt" + + "github.com/open-telemetry/opentelemetry-collector/config/configerror" + "github.com/open-telemetry/opentelemetry-collector/config/configmodels" + "github.com/open-telemetry/opentelemetry-collector/exporter" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/plugin/storage/cassandra" +) + +const ( + // TypeStr defines type of the Cassandra exporter. + TypeStr = "jaeger_cassandra" +) + +// OptionsFactory returns initialized cassandra.OptionsFactory structure. +type OptionsFactory func() *cassandra.Options + +// DefaultOptions creates Cassandra options supported by this exporter. +func DefaultOptions() *cassandra.Options { + return cassandra.NewOptions("cassandra") +} + +// Factory is the factory for Jaeger Cassandra exporter. +type Factory struct { + OptionsFactory OptionsFactory +} + +// Type gets the type of exporter. +func (Factory) Type() string { + return TypeStr +} + +// CreateDefaultConfig returns default configuration of Factory. +// This function implements OTEL exporter.BaseFactory interface. +func (f Factory) CreateDefaultConfig() configmodels.Exporter { + opts := f.OptionsFactory() + return &Config{ + Options: *opts, + ExporterSettings: configmodels.ExporterSettings{ + TypeVal: TypeStr, + NameVal: TypeStr, + }, + } +} + +// CreateTraceExporter creates Jaeger Cassandra trace exporter. +// This function implements OTEL exporter.Factory interface. +func (Factory) CreateTraceExporter(log *zap.Logger, cfg configmodels.Exporter) (exporter.TraceExporter, error) { + config, ok := cfg.(*Config) + if !ok { + return nil, fmt.Errorf("could not cast configuration to %s", TypeStr) + } + return New(config, log) +} + +// CreateMetricsExporter is not implemented. +// This function implements OTEL exporter.Factory interface. +func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (exporter.MetricsExporter, error) { + return nil, configerror.ErrDataTypeIsNotSupported +} diff --git a/cmd/opentelemetry-collector/app/exporter/cassandra/factory_test.go b/cmd/opentelemetry-collector/app/exporter/cassandra/factory_test.go new file mode 100644 index 00000000000..edf5662f4a4 --- /dev/null +++ b/cmd/opentelemetry-collector/app/exporter/cassandra/factory_test.go @@ -0,0 +1,66 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cassandra + +import ( + "testing" + + "github.com/open-telemetry/opentelemetry-collector/config/configcheck" + "github.com/open-telemetry/opentelemetry-collector/config/configerror" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + jConfig "github.com/jaegertracing/jaeger/pkg/config" + "github.com/jaegertracing/jaeger/plugin/storage/cassandra" +) + +func TestCreateTraceExporter(t *testing.T) { + v, _ := jConfig.Viperize(DefaultOptions().AddFlags) + opts := DefaultOptions() + opts.InitFromViper(v) + factory := Factory{OptionsFactory: func() *cassandra.Options { + return opts + }} + exporter, err := factory.CreateTraceExporter(zap.NewNop(), factory.CreateDefaultConfig()) + require.Nil(t, exporter) + assert.EqualError(t, err, "gocql: unable to create session: control: unable to connect to initial hosts: dial tcp 127.0.0.1:9042: connect: connection refused") +} + +func TestCreateTraceExporter_NilConfig(t *testing.T) { + factory := Factory{} + exporter, err := factory.CreateTraceExporter(zap.NewNop(), nil) + require.Nil(t, exporter) + assert.EqualError(t, err, "could not cast configuration to jaeger_cassandra") +} + +func TestCreateDefaultConfig(t *testing.T) { + factory := Factory{OptionsFactory: DefaultOptions} + cfg := factory.CreateDefaultConfig() + assert.NotNil(t, cfg, "failed to create default config") + assert.NoError(t, configcheck.ValidateConfig(cfg)) +} + +func TestCreateMetricsExporter(t *testing.T) { + f := Factory{OptionsFactory: DefaultOptions} + mReceiver, err := f.CreateMetricsExporter(zap.NewNop(), f.CreateDefaultConfig()) + assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported) + assert.Nil(t, mReceiver) +} + +func TestType(t *testing.T) { + factory := Factory{} + assert.Equal(t, TypeStr, factory.Type()) +} diff --git a/cmd/opentelemetry-collector/app/exporter/cassandra/testdata/config.yaml b/cmd/opentelemetry-collector/app/exporter/cassandra/testdata/config.yaml new file mode 100644 index 00000000000..2440d5dbb5d --- /dev/null +++ b/cmd/opentelemetry-collector/app/exporter/cassandra/testdata/config.yaml @@ -0,0 +1,24 @@ +receivers: + examplereceiver: + +processors: + exampleprocessor: + +exporters: + jaeger_cassandra: + servers: "first,second" + index: + tags: false + username: user + password: pass + span_store_write_cache_ttl: 12s + tls: + enabled: true + ca: /foo/bar + +service: + pipelines: + traces: + receivers: [examplereceiver] + processors: [exampleprocessor] + exporters: [jaeger_cassandra] diff --git a/cmd/opentelemetry-collector/app/exporter/cassandra/testdata/jaeger-config.yaml b/cmd/opentelemetry-collector/app/exporter/cassandra/testdata/jaeger-config.yaml new file mode 100644 index 00000000000..5e1e16e5ce1 --- /dev/null +++ b/cmd/opentelemetry-collector/app/exporter/cassandra/testdata/jaeger-config.yaml @@ -0,0 +1,2 @@ +cassandra: + keyspace: my-keyspace diff --git a/cmd/opentelemetry-collector/app/exporter/kafka/exporter.go b/cmd/opentelemetry-collector/app/exporter/kafka/exporter.go index 19a34e799f8..be83b50766b 100644 --- a/cmd/opentelemetry-collector/app/exporter/kafka/exporter.go +++ b/cmd/opentelemetry-collector/app/exporter/kafka/exporter.go @@ -21,7 +21,6 @@ import ( storageOtelExporter "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter" "github.com/jaegertracing/jaeger/plugin/storage/kafka" - "github.com/jaegertracing/jaeger/storage" ) // New creates new Kafka exporter @@ -32,14 +31,5 @@ func New(config *Config, log *zap.Logger) (exporter.TraceExporter, error) { if err != nil { return nil, err } - return create(f, config) -} - -func create(factory storage.Factory, config *Config) (exporter.TraceExporter, error) { - // ignoring error for code coverage. Kafka factory never returns an error - spanWriter, err := factory.CreateSpanWriter() - if err != nil { - return nil, err - } - return storageOtelExporter.NewSpanWriterExporter(config, spanWriter) + return storageOtelExporter.NewSpanWriterExporter(config, f) } diff --git a/cmd/opentelemetry-collector/app/exporter/kafka/exporter_test.go b/cmd/opentelemetry-collector/app/exporter/kafka/exporter_test.go deleted file mode 100644 index 5b067348b15..00000000000 --- a/cmd/opentelemetry-collector/app/exporter/kafka/exporter_test.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright (c) 2020 The Jaeger Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kafka - -import ( - "errors" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/uber/jaeger-lib/metrics" - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/storage/dependencystore" - "github.com/jaegertracing/jaeger/storage/spanstore" -) - -func TestNew(t *testing.T) { - m := &mockProducerBuilder{} - c := &Config{} - exporter, err := create(m, c) - require.Nil(t, err) - assert.NotNil(t, exporter) - m = &mockProducerBuilder{err: errors.New("failed to create")} - exporter, err = create(m, c) - assert.Error(t, err, "failed to create") - assert.Nil(t, exporter) -} - -type mockProducerBuilder struct { - err error -} - -func (m mockProducerBuilder) CreateSpanWriter() (spanstore.Writer, error) { - return nil, m.err -} -func (mockProducerBuilder) CreateSpanReader() (spanstore.Reader, error) { - return nil, nil -} -func (mockProducerBuilder) CreateDependencyReader() (dependencystore.Reader, error) { - return nil, nil -} -func (mockProducerBuilder) Initialize(metrics.Factory, *zap.Logger) error { - return nil -} diff --git a/cmd/opentelemetry-collector/app/exporter/kafka/factory.go b/cmd/opentelemetry-collector/app/exporter/kafka/factory.go index 45092bb6acd..21cffc9a81c 100644 --- a/cmd/opentelemetry-collector/app/exporter/kafka/factory.go +++ b/cmd/opentelemetry-collector/app/exporter/kafka/factory.go @@ -48,6 +48,7 @@ func (Factory) Type() string { } // CreateDefaultConfig returns default configuration of Factory. +// This function implements OTEL exporter.BaseFactory interface. func (f Factory) CreateDefaultConfig() configmodels.Exporter { opts := f.OptionsFactory() return &Config{ @@ -60,6 +61,7 @@ func (f Factory) CreateDefaultConfig() configmodels.Exporter { } // CreateTraceExporter creates Jaeger Kafka trace exporter. +// This function implements OTEL exporter.Factory interface. func (Factory) CreateTraceExporter(log *zap.Logger, cfg configmodels.Exporter) (exporter.TraceExporter, error) { kafkaCfg, ok := cfg.(*Config) if !ok { @@ -69,6 +71,7 @@ func (Factory) CreateTraceExporter(log *zap.Logger, cfg configmodels.Exporter) ( } // CreateMetricsExporter is not implemented. +// This function implements OTEL exporter.Factory interface. func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (exporter.MetricsExporter, error) { return nil, configerror.ErrDataTypeIsNotSupported } diff --git a/cmd/opentelemetry-collector/app/exporter/span_writer_exporter.go b/cmd/opentelemetry-collector/app/exporter/span_writer_exporter.go index a8ba04fa714..e16c29f4571 100644 --- a/cmd/opentelemetry-collector/app/exporter/span_writer_exporter.go +++ b/cmd/opentelemetry-collector/app/exporter/span_writer_exporter.go @@ -26,11 +26,16 @@ import ( "github.com/open-telemetry/opentelemetry-collector/oterr" jaegertranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace/jaeger" + jaegerstorage "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/storage/spanstore" ) // NewSpanWriterExporter returns exporter.TraceExporter -func NewSpanWriterExporter(config configmodels.Exporter, spanWriter spanstore.Writer) (exporter.TraceExporter, error) { +func NewSpanWriterExporter(config configmodels.Exporter, factory jaegerstorage.Factory) (exporter.TraceExporter, error) { + spanWriter, err := factory.CreateSpanWriter() + if err != nil { + return nil, err + } storage := storage{Writer: spanWriter} return exporterhelper.NewTraceExporter( config, diff --git a/cmd/opentelemetry-collector/app/exporter/span_writer_exporter_test.go b/cmd/opentelemetry-collector/app/exporter/span_writer_exporter_test.go index ebe06a75a10..71a98bed855 100644 --- a/cmd/opentelemetry-collector/app/exporter/span_writer_exporter_test.go +++ b/cmd/opentelemetry-collector/app/exporter/span_writer_exporter_test.go @@ -24,21 +24,34 @@ import ( "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/spanstore" ) -func TestNew(t *testing.T) { - exporter, err := NewSpanWriterExporter(&configmodels.ExporterSettings{}, spanWriter{}) +func TestNew_closableWriter(t *testing.T) { + exporter, err := NewSpanWriterExporter(&configmodels.ExporterSettings{}, mockStorageFactory{spanWriter: spanWriter{}}) require.NoError(t, err) assert.NotNil(t, exporter) assert.Nil(t, exporter.Shutdown()) - exporter, err = NewSpanWriterExporter(&configmodels.ExporterSettings{}, noClosableWriter{}) +} + +func TestNew_noClosableWriter(t *testing.T) { + exporter, err := NewSpanWriterExporter(&configmodels.ExporterSettings{}, mockStorageFactory{spanWriter: noClosableWriter{}}) require.NoError(t, err) assert.NotNil(t, exporter) assert.Nil(t, exporter.Shutdown()) } +func TestNew_failedToCreateWriter(t *testing.T) { + exporter, err := NewSpanWriterExporter(&configmodels.ExporterSettings{}, mockStorageFactory{err: errors.New("failed to create writer"), spanWriter: spanWriter{}}) + require.Nil(t, exporter) + assert.Error(t, err, "failed to create writer") +} + func TestStore(t *testing.T) { traceID := []byte("0123456789abcdef") spanId := []byte("01234567") @@ -118,3 +131,21 @@ type noClosableWriter struct { func (noClosableWriter) WriteSpan(span *model.Span) error { return nil } + +type mockStorageFactory struct { + err error + spanWriter spanstore.Writer +} + +func (m mockStorageFactory) CreateSpanWriter() (spanstore.Writer, error) { + return m.spanWriter, m.err +} +func (mockStorageFactory) CreateSpanReader() (spanstore.Reader, error) { + return nil, nil +} +func (mockStorageFactory) CreateDependencyReader() (dependencystore.Reader, error) { + return nil, nil +} +func (mockStorageFactory) Initialize(metrics.Factory, *zap.Logger) error { + return nil +} diff --git a/cmd/opentelemetry-collector/go.sum b/cmd/opentelemetry-collector/go.sum index bc6534e838d..c84520a15c8 100644 --- a/cmd/opentelemetry-collector/go.sum +++ b/cmd/opentelemetry-collector/go.sum @@ -259,6 +259,7 @@ github.com/gobuffalo/packr/v2 v2.2.0/go.mod h1:CaAwI0GPIAv+5wKLtv8Afwl+Cm78K/I/V github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod h1:HhnNqWY95UYwwW3uSASeV7vtgYkT2t16hJgV3AEPUpw= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/gocql/gocql v0.0.0-20200226121155-e5c8c1f505c5/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY= +github.com/gocql/gocql v0.0.0-20200228163523-cd4b606dd2fb h1:H3tisfjQwq9FTyWqlKsZpgoYrsvn2pmTWvAiDHa5pho= github.com/gocql/gocql v0.0.0-20200228163523-cd4b606dd2fb/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY= github.com/gofrs/flock v0.0.0-20190320160742-5135e617513b/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gogo/googleapis v1.0.1-0.20180501115203-b23578765ee5/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= @@ -366,6 +367,7 @@ github.com/grpc-ecosystem/grpc-gateway v1.11.1 h1:/dBYI+n4xIL+Y9SKXQrjlKTmJJDwCS github.com/grpc-ecosystem/grpc-gateway v1.11.1/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.13.0 h1:sBDQoHXrOlfPobnKw69FIKa1wg9qsLLvvQ/Y19WtFgI= github.com/grpc-ecosystem/grpc-gateway v1.13.0/go.mod h1:8XEsbTttt/W+VvjtQhLACqCisSPWTxCZ7sBRjU6iH9c= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q= github.com/hashicorp/consul/api v1.2.0 h1:oPsuzLp2uk7I7rojPKuncWbZ+m5TMoD4Ivs+2Rkeh4Y= diff --git a/cmd/opentelemetry-collector/main.go b/cmd/opentelemetry-collector/main.go index edc159632d9..e38b52bcbbb 100644 --- a/cmd/opentelemetry-collector/main.go +++ b/cmd/opentelemetry-collector/main.go @@ -24,6 +24,7 @@ import ( jflags "github.com/jaegertracing/jaeger/cmd/flags" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/defaults" + "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka" jconfig "github.com/jaegertracing/jaeger/pkg/config" ) @@ -59,6 +60,7 @@ func main() { cmd, jflags.AddConfigFileFlag, kafka.DefaultOptions().AddFlags, + cassandra.DefaultOptions().AddFlags, ) // parse flags to propagate Jaeger config file flag value to viper diff --git a/pkg/cassandra/config/config.go b/pkg/cassandra/config/config.go index 1faf06a1b7e..2453ca97d30 100644 --- a/pkg/cassandra/config/config.go +++ b/pkg/cassandra/config/config.go @@ -29,35 +29,35 @@ import ( // Configuration describes the configuration properties needed to connect to a Cassandra cluster type Configuration struct { - Servers []string `validate:"nonzero"` - Keyspace string `validate:"nonzero"` - LocalDC string `yaml:"local_dc"` - ConnectionsPerHost int `validate:"min=1" yaml:"connections_per_host"` - Timeout time.Duration `validate:"min=500"` - ConnectTimeout time.Duration `yaml:"connect_timeout"` - ReconnectInterval time.Duration `validate:"min=500" yaml:"reconnect_interval"` - SocketKeepAlive time.Duration `validate:"min=0" yaml:"socket_keep_alive"` - MaxRetryAttempts int `validate:"min=0" yaml:"max_retry_attempt"` - ProtoVersion int `yaml:"proto_version"` - Consistency string `yaml:"consistency"` - DisableCompression bool `yaml:"disable-compression"` - Port int `yaml:"port"` - Authenticator Authenticator `yaml:"authenticator"` - DisableAutoDiscovery bool `yaml:"disable_auto_discovery"` - EnableDependenciesV2 bool `yaml:"enable_dependencies_v2"` - TLS tlscfg.Options + Servers []string `validate:"nonzero" mapstructure:"servers"` + Keyspace string `validate:"nonzero" mapstructure:"keyspace"` + LocalDC string `yaml:"local_dc" mapstructure:"local_dc"` + ConnectionsPerHost int `validate:"min=1" yaml:"connections_per_host" mapstructure:"connections_per_host"` + Timeout time.Duration `validate:"min=500" mapstructure:"-"` + ConnectTimeout time.Duration `yaml:"connect_timeout" mapstructure:"connection_timeout"` + ReconnectInterval time.Duration `validate:"min=500" yaml:"reconnect_interval" mapstructure:"reconnect_interval"` + SocketKeepAlive time.Duration `validate:"min=0" yaml:"socket_keep_alive" mapstructure:"socket_keep_alive"` + MaxRetryAttempts int `validate:"min=0" yaml:"max_retry_attempt" mapstructure:"max_retry_attempts"` + ProtoVersion int `yaml:"proto_version" mapstructure:"proto_version"` + Consistency string `yaml:"consistency" mapstructure:"consistency"` + DisableCompression bool `yaml:"disable-compression" mapstructure:"disable_compression"` + Port int `yaml:"port" mapstructure:"port"` + Authenticator Authenticator `yaml:"authenticator" mapstructure:",squash"` + DisableAutoDiscovery bool `yaml:"disable_auto_discovery" mapstructure:"-"` + EnableDependenciesV2 bool `yaml:"enable_dependencies_v2" mapstructure:"-"` + TLS tlscfg.Options `mapstructure:"tls"` } // Authenticator holds the authentication properties needed to connect to a Cassandra cluster type Authenticator struct { - Basic BasicAuthenticator `yaml:"basic"` + Basic BasicAuthenticator `yaml:"basic" mapstructure:",squash"` // TODO: add more auth types } // BasicAuthenticator holds the username and password for a password authenticator for a Cassandra cluster type BasicAuthenticator struct { - Username string `yaml:"username"` - Password string `yaml:"password"` + Username string `yaml:"username" mapstructure:"username"` + Password string `yaml:"password" mapstructure:"password"` } // ApplyDefaults copies settings from source unless its own value is non-zero. diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index f4778c08d94..6bd74959303 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -73,6 +73,15 @@ func (f *Factory) InitFromViper(v *viper.Viper) { } } +// InitFromOptions initializes factory from options. +func (f *Factory) InitFromOptions(o *Options) { + f.Options = o + f.primaryConfig = o.GetPrimary() + if cfg := f.Options.Get(archiveStorageConfig); cfg != nil { + f.archiveConfig = cfg // this is so stupid - see https://golang.org/doc/faq#nil_error + } +} + // Initialize implements storage.Factory func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { f.primaryMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra", Tags: nil}) @@ -141,8 +150,8 @@ func writerOptions(opts *Options) ([]cSpanStore.Option, error) { var tagFilters []dbmodel.TagFilter // drop all tag filters - if opts.DisableTagsIndex || opts.DisableProcessTagsIndex || opts.DisableLogsIndex { - tagFilters = append(tagFilters, dbmodel.NewTagFilterDropAll(opts.DisableTagsIndex, opts.DisableProcessTagsIndex, opts.DisableLogsIndex)) + if !opts.Index.Tags || !opts.Index.ProcessTags || !opts.Index.Logs { + tagFilters = append(tagFilters, dbmodel.NewTagFilterDropAll(!opts.Index.Tags, !opts.Index.ProcessTags, !opts.Index.Logs)) } // black/white list tag filters diff --git a/plugin/storage/cassandra/factory_test.go b/plugin/storage/cassandra/factory_test.go index 2b739a71b3a..2430bc9bf7e 100644 --- a/plugin/storage/cassandra/factory_test.go +++ b/plugin/storage/cassandra/factory_test.go @@ -181,3 +181,13 @@ func TestWriterOptions(t *testing.T) { options, _ = writerOptions(opts) assert.Len(t, options, 0) } + +func TestInitFromOptions(t *testing.T) { + f := NewFactory() + o := NewOptions("foo", archiveStorageConfig) + o.others[archiveStorageConfig].Enabled = true + f.InitFromOptions(o) + assert.Equal(t, o, f.Options) + assert.Equal(t, o.GetPrimary(), f.primaryConfig) + assert.Equal(t, o.Get(archiveStorageConfig), f.archiveConfig) +} diff --git a/plugin/storage/cassandra/options.go b/plugin/storage/cassandra/options.go index f0307f7ad7a..24592c22363 100644 --- a/plugin/storage/cassandra/options.go +++ b/plugin/storage/cassandra/options.go @@ -61,32 +61,38 @@ const ( // to bind them to command line flag and apply overlays, so that some configurations // (e.g. archive) may be underspecified and infer the rest of its parameters from primary. type Options struct { - primary *namespaceConfig - others map[string]*namespaceConfig - SpanStoreWriteCacheTTL time.Duration - tagIndexBlacklist string - tagIndexWhitelist string - DisableLogsIndex bool - DisableTagsIndex bool - DisableProcessTagsIndex bool + Primary namespaceConfig `mapstructure:",squash"` + others map[string]*namespaceConfig + SpanStoreWriteCacheTTL time.Duration `mapstructure:"span_store_write_cache_ttl"` + Index IndexConfig `mapstructure:"index"` +} + +// IndexConfig configures indexing. +// By default all indexing is enabled. +type IndexConfig struct { + Logs bool `mapstructure:"logs"` + Tags bool `mapstructure:"tags"` + ProcessTags bool `mapstructure:"process_tags"` + TagBlackList string `mapstructure:"tag_blacklist"` + TagWhiteList string `mapstructure:"tag_whitelist"` } // the Servers field in config.Configuration is a list, which we cannot represent with flags. // This struct adds a plain string field that can be bound to flags and is then parsed when // preparing the actual config.Configuration. type namespaceConfig struct { - config.Configuration - servers string - namespace string - primary bool - Enabled bool + config.Configuration `mapstructure:",squash"` + servers string + namespace string + primary bool + Enabled bool `mapstructure:"-"` } // NewOptions creates a new Options struct. func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { // TODO all default values should be defined via cobra flags options := &Options{ - primary: &namespaceConfig{ + Primary: namespaceConfig{ Configuration: config.Configuration{ MaxRetryAttempts: 3, Keyspace: "jaeger_v1_test", @@ -112,36 +118,36 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { // AddFlags adds flags for Options func (opt *Options) AddFlags(flagSet *flag.FlagSet) { - addFlags(flagSet, opt.primary) + addFlags(flagSet, opt.Primary) for _, cfg := range opt.others { - addFlags(flagSet, cfg) + addFlags(flagSet, *cfg) } - flagSet.Duration(opt.primary.namespace+suffixSpanStoreWriteCacheTTL, + flagSet.Duration(opt.Primary.namespace+suffixSpanStoreWriteCacheTTL, opt.SpanStoreWriteCacheTTL, "The duration to wait before rewriting an existing service or operation name") flagSet.String( - opt.primary.namespace+suffixIndexTagsBlacklist, - opt.tagIndexBlacklist, + opt.Primary.namespace+suffixIndexTagsBlacklist, + opt.Index.TagBlackList, "The comma-separated list of span tags to blacklist from being indexed. All other tags will be indexed. Mutually exclusive with the whitelist option.") flagSet.String( - opt.primary.namespace+suffixIndexTagsWhitelist, - opt.tagIndexWhitelist, + opt.Primary.namespace+suffixIndexTagsWhitelist, + opt.Index.TagWhiteList, "The comma-separated list of span tags to whitelist for being indexed. All other tags will not be indexed. Mutually exclusive with the blacklist option.") flagSet.Bool( - opt.primary.namespace+suffixIndexLogs, - !opt.DisableLogsIndex, + opt.Primary.namespace+suffixIndexLogs, + !opt.Index.Logs, "Controls log field indexing. Set to false to disable.") flagSet.Bool( - opt.primary.namespace+suffixIndexTags, - !opt.DisableTagsIndex, + opt.Primary.namespace+suffixIndexTags, + !opt.Index.Tags, "Controls tag indexing. Set to false to disable.") flagSet.Bool( - opt.primary.namespace+suffixIndexProcessTags, - !opt.DisableProcessTagsIndex, + opt.Primary.namespace+suffixIndexProcessTags, + !opt.Index.ProcessTags, "Controls process tag indexing. Set to false to disable.") } -func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { +func addFlags(flagSet *flag.FlagSet, nsConfig namespaceConfig) { var tlsFlagsConfig = tlsFlagsConfig(nsConfig.namespace) tlsFlagsConfig.AddFlags(flagSet) @@ -223,16 +229,16 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { // InitFromViper initializes Options with properties from viper func (opt *Options) InitFromViper(v *viper.Viper) { - opt.primary.initFromViper(v) + opt.Primary.initFromViper(v) for _, cfg := range opt.others { cfg.initFromViper(v) } - opt.SpanStoreWriteCacheTTL = v.GetDuration(opt.primary.namespace + suffixSpanStoreWriteCacheTTL) - opt.tagIndexBlacklist = stripWhiteSpace(v.GetString(opt.primary.namespace + suffixIndexTagsBlacklist)) - opt.tagIndexWhitelist = stripWhiteSpace(v.GetString(opt.primary.namespace + suffixIndexTagsWhitelist)) - opt.DisableTagsIndex = !v.GetBool(opt.primary.namespace + suffixIndexTags) - opt.DisableLogsIndex = !v.GetBool(opt.primary.namespace + suffixIndexLogs) - opt.DisableProcessTagsIndex = !v.GetBool(opt.primary.namespace + suffixIndexProcessTags) + opt.SpanStoreWriteCacheTTL = v.GetDuration(opt.Primary.namespace + suffixSpanStoreWriteCacheTTL) + opt.Index.TagBlackList = stripWhiteSpace(v.GetString(opt.Primary.namespace + suffixIndexTagsBlacklist)) + opt.Index.TagWhiteList = stripWhiteSpace(v.GetString(opt.Primary.namespace + suffixIndexTagsWhitelist)) + opt.Index.Tags = v.GetBool(opt.Primary.namespace + suffixIndexTags) + opt.Index.Logs = v.GetBool(opt.Primary.namespace + suffixIndexLogs) + opt.Index.ProcessTags = v.GetBool(opt.Primary.namespace + suffixIndexProcessTags) } func tlsFlagsConfig(namespace string) tlscfg.ClientFlagsConfig { @@ -273,8 +279,8 @@ func (cfg *namespaceConfig) initFromViper(v *viper.Viper) { // GetPrimary returns primary configuration. func (opt *Options) GetPrimary() *config.Configuration { - opt.primary.Servers = strings.Split(opt.primary.servers, ",") - return &opt.primary.Configuration + opt.Primary.Servers = strings.Split(opt.Primary.servers, ",") + return &opt.Primary.Configuration } // Get returns auxiliary named configuration. @@ -287,9 +293,9 @@ func (opt *Options) Get(namespace string) *config.Configuration { if !nsCfg.Enabled { return nil } - nsCfg.Configuration.ApplyDefaults(&opt.primary.Configuration) + nsCfg.Configuration.ApplyDefaults(&opt.Primary.Configuration) if nsCfg.servers == "" { - nsCfg.servers = opt.primary.servers + nsCfg.servers = opt.Primary.servers } nsCfg.Servers = strings.Split(nsCfg.servers, ",") return &nsCfg.Configuration @@ -297,8 +303,8 @@ func (opt *Options) Get(namespace string) *config.Configuration { // TagIndexBlacklist returns the list of blacklisted tags func (opt *Options) TagIndexBlacklist() []string { - if len(opt.tagIndexBlacklist) > 0 { - return strings.Split(opt.tagIndexBlacklist, ",") + if len(opt.Index.TagBlackList) > 0 { + return strings.Split(opt.Index.TagBlackList, ",") } return nil @@ -306,8 +312,8 @@ func (opt *Options) TagIndexBlacklist() []string { // TagIndexWhitelist returns the list of whitelisted tags func (opt *Options) TagIndexWhitelist() []string { - if len(opt.tagIndexWhitelist) > 0 { - return strings.Split(opt.tagIndexWhitelist, ",") + if len(opt.Index.TagWhiteList) > 0 { + return strings.Split(opt.Index.TagWhiteList, ",") } return nil diff --git a/plugin/storage/cassandra/options_test.go b/plugin/storage/cassandra/options_test.go index 1b5b164a1a9..148cee3700c 100644 --- a/plugin/storage/cassandra/options_test.go +++ b/plugin/storage/cassandra/options_test.go @@ -80,9 +80,9 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, false, primary.EnableDependenciesV2) assert.Equal(t, []string{"blerg", "blarg", "blorg"}, opts.TagIndexBlacklist()) assert.Equal(t, []string{"flerg", "flarg", "florg"}, opts.TagIndexWhitelist()) - assert.Equal(t, false, opts.DisableTagsIndex) - assert.Equal(t, true, opts.DisableProcessTagsIndex) - assert.Equal(t, false, opts.DisableLogsIndex) + assert.Equal(t, true, opts.Index.Tags) + assert.Equal(t, false, opts.Index.ProcessTags) + assert.Equal(t, true, opts.Index.Logs) aux := opts.Get("cas-aux") require.NotNil(t, aux)