Skip to content

Commit

Permalink
Add authentication support to kafka
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay committed Sep 16, 2020
1 parent 471c4a6 commit 9994700
Show file tree
Hide file tree
Showing 17 changed files with 308 additions and 4 deletions.
17 changes: 17 additions & 0 deletions exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,23 @@ The following settings can be optionally configured:
- `otlp_proto`: the payload is serialized to `ExportTraceServiceRequest`.
- `jaeger_proto`: the payload is serialized to a single Jaeger proto `Span`.
- `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`.
- `authentication`
- `type` (default = none): The authentication type. Supported types are `plain_text`, `tls`, `kerberos`.
- `plain_text`
- `username`: The username to use.
- `password`: The password to use
- `tls`
- `ca_file` path to the CA cert. For a client this verifies the server certificate. Should
only be used if `insecure` is set to true.
- `cert_file` path to the TLS cert to use for TLS required connections. Should
only be used if `insecure` is set to true.
- `key_file` path to the TLS key to use for TLS required connections. Should
only be used if `insecure` is set to true.
- `insecure` (default = false): Disable verifying the server's certificate chain and host
name (`InsecureSkipVerify` in the tls config)
- `server_name_override`: ServerName indicates the name of the server requested by the client
in order to support virtual hosting.
>>>>>>> Add authentication support to kafka
- `metadata`
- `full` (default = true): Whether to maintain a full set of metadata.
When disabled the client does not make the initial request to broker at the startup.
Expand Down
74 changes: 74 additions & 0 deletions exporter/kafkaexporter/authentication.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafkaexporter

import (
"fmt"
"strings"

"github.com/Shopify/sarama"

"go.opentelemetry.io/collector/config/configtls"
)

// ConfigureAuthentication configures authentication in sarama.Config.
func ConfigureAuthentication(config Authentication, saramaConfig *sarama.Config) error {
switch AuthType(strings.TrimSpace(string(config.Type))) {
case AuthTypeNone, "":
return nil
case AuthTypeTLS:
return configureTLS(config.TLS, saramaConfig)
case AuthTypeKerberos:
configureKerberos(config.Kerberos, saramaConfig)
return nil
case AuthTypePlaintext:
configurePlaintext(config.PlainText, saramaConfig)
return nil
default:
return fmt.Errorf("unknown/unsupported authentication method %v to kafka cluster", config.Type)
}
}

func configurePlaintext(config PlainTextConfig, saramaConfig *sarama.Config) {
saramaConfig.Net.SASL.Enable = true
saramaConfig.Net.SASL.User = config.Username
saramaConfig.Net.SASL.Password = config.Password
}

func configureTLS(config configtls.TLSClientSetting, saramaConfig *sarama.Config) error {
tlsConfig, err := config.LoadTLSConfig()
if err != nil {
return fmt.Errorf("error loading tls config: %w", err)
}
saramaConfig.Net.TLS.Enable = true
saramaConfig.Net.TLS.Config = tlsConfig
return nil
}

func configureKerberos(config KerberosConfig, saramaConfig *sarama.Config) {
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
saramaConfig.Net.SASL.Enable = true
if config.UseKeyTab {
saramaConfig.Net.SASL.GSSAPI.KeyTabPath = config.KeyTabPath
saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_KEYTAB_AUTH
} else {
saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_USER_AUTH
saramaConfig.Net.SASL.GSSAPI.Password = config.Password
}
saramaConfig.Net.SASL.GSSAPI.KerberosConfigPath = config.ConfigPath
saramaConfig.Net.SASL.GSSAPI.Username = config.Username
saramaConfig.Net.SASL.GSSAPI.Realm = config.Realm
saramaConfig.Net.SASL.GSSAPI.ServiceName = config.ServiceName
}
97 changes: 97 additions & 0 deletions exporter/kafkaexporter/authentication_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafkaexporter

import (
"testing"

"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/config/configtls"
)

func TestAuthentication(t *testing.T) {
saramaPlaintext := &sarama.Config{}
saramaPlaintext.Net.SASL.Enable = true
saramaPlaintext.Net.SASL.User = "jdoe"
saramaPlaintext.Net.SASL.Password = "pass"

saramaTLSCfg := &sarama.Config{}
saramaTLSCfg.Net.TLS.Enable = true
tlsClient := configtls.TLSClientSetting{}
tlscfg, err := tlsClient.LoadTLSConfig()
require.NoError(t, err)
saramaTLSCfg.Net.TLS.Config = tlscfg

saramaKerberosCfg := &sarama.Config{}
saramaKerberosCfg.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
saramaKerberosCfg.Net.SASL.Enable = true
saramaKerberosCfg.Net.SASL.GSSAPI.ServiceName = "foobar"
saramaKerberosCfg.Net.SASL.GSSAPI.AuthType = sarama.KRB5_USER_AUTH

saramaKerberosKeyTabCfg := &sarama.Config{}
saramaKerberosKeyTabCfg.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
saramaKerberosKeyTabCfg.Net.SASL.Enable = true
saramaKerberosKeyTabCfg.Net.SASL.GSSAPI.KeyTabPath = "/path"
saramaKerberosKeyTabCfg.Net.SASL.GSSAPI.AuthType = sarama.KRB5_KEYTAB_AUTH

tests := []struct {
auth Authentication
saramaConfig *sarama.Config
err string
}{
{
auth: Authentication{Type: AuthType("foo")},
err: "unknown/unsupported authentication method",
},
{
auth: Authentication{Type: AuthTypePlaintext, PlainText: PlainTextConfig{Username: "jdoe", Password: "pass"}},
saramaConfig: saramaPlaintext,
},
{
auth: Authentication{Type: AuthTypeTLS, TLS: configtls.TLSClientSetting{}},
saramaConfig: saramaTLSCfg,
},
{
auth: Authentication{Type: AuthTypeTLS, TLS: configtls.TLSClientSetting{
TLSSetting: configtls.TLSSetting{CAFile: "/doesnotexists"},
}},
saramaConfig: saramaTLSCfg,
err: "failed to load TLS config",
},
{
auth: Authentication{Type: AuthTypeKerberos, Kerberos: KerberosConfig{ServiceName: "foobar"}},
saramaConfig: saramaKerberosCfg,
},
{
auth: Authentication{Type: AuthTypeKerberos, Kerberos: KerberosConfig{UseKeyTab: true, KeyTabPath: "/path"}},
saramaConfig: saramaKerberosKeyTabCfg,
},
}
for _, test := range tests {
t.Run(string(test.auth.Type), func(t *testing.T) {
config := &sarama.Config{}
err := ConfigureAuthentication(test.auth, config)
if test.err != "" {
require.Error(t, err)
assert.Contains(t, err.Error(), test.err)
} else {
assert.Equal(t, test.saramaConfig, config)
}
})
}
}
44 changes: 42 additions & 2 deletions exporter/kafkaexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

Expand All @@ -41,8 +42,47 @@ type Config struct {
// Client, and shared by the Producer/Consumer.
Metadata Metadata `mapstructure:"metadata"`

// TODO authentication
// TODO batch settings
// Authentication defines used authentication mechanism.
Authentication Authentication `mapstructure:"authentication"`
}

// AuthType defines authentication type
type AuthType string

const (
// No authentication
AuthTypeNone AuthType = "none"
// Plain text authentication
AuthTypePlaintext AuthType = "plain_text"
// TLS authentication
AuthTypeTLS AuthType = "tls"
// Kerberos authentication
AuthTypeKerberos AuthType = "kerberos"
)

// Authentication defines authentication.
type Authentication struct {
Type AuthType `mapstructure:"type"`
PlainText PlainTextConfig `mapstructure:"plain_text"`
TLS configtls.TLSClientSetting `mapstructure:"tls"`
Kerberos KerberosConfig `mapstructure:"kerberos"`
}

// PlainTextConfig defines plaintext authentication.
type PlainTextConfig struct {
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
}

// KerberosConfig defines kereros configuration.
type KerberosConfig struct {
ServiceName string `mapstructure:"service_name"`
Realm string `mapstructure:"realm"`
UseKeyTab bool `mapstructure:"use_keytab"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
ConfigPath string `mapstructure:"config_file"`
KeyTabPath string `mapstructure:"keytab_file"`
}

// Metadata defines configuration for retrieving metadata from the broker.
Expand Down
7 changes: 7 additions & 0 deletions exporter/kafkaexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ func TestLoadConfig(t *testing.T) {
Topic: "spans",
Encoding: "otlp_proto",
Brokers: []string{"foo:123", "bar:456"},
Authentication: Authentication{
Type: "plain_text",
PlainText: PlainTextConfig{
Username: "jdoe",
Password: "pass",
},
},
Metadata: Metadata{
Full: false,
Retry: MetadataRetry{
Expand Down
3 changes: 3 additions & 0 deletions exporter/kafkaexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ func createDefaultConfig() configmodels.Exporter {
Brokers: []string{defaultBroker},
Topic: defaultTopic,
Encoding: defaultEncoding,
Authentication: Authentication{
Type: AuthTypeNone,
},
Metadata: Metadata{
Full: defaultMetadataFull,
Retry: MetadataRetry{
Expand Down
1 change: 1 addition & 0 deletions exporter/kafkaexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestCreateDefaultConfig(t *testing.T) {
assert.NoError(t, configcheck.ValidateConfig(cfg))
assert.Equal(t, []string{defaultBroker}, cfg.Brokers)
assert.Equal(t, defaultTopic, cfg.Topic)
assert.Equal(t, AuthTypeNone, cfg.Authentication.Type)
}

func TestCreateTracesExporter(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ func newExporter(config Config, params component.ExporterCreateParams, marshalle
}
c.Version = version
}
if err := ConfigureAuthentication(config.Authentication, c); err != nil {
return nil, err
}
producer, err := sarama.NewSyncProducer(config.Brokers, c)
if err != nil {
return nil, err
Expand Down
17 changes: 16 additions & 1 deletion exporter/kafkaexporter/kafka_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,26 @@ func TestNewExporter_err_version(t *testing.T) {

func TestNewExporter_err_encoding(t *testing.T) {
c := Config{Encoding: "foo"}
exp, err := newExporter(c, component.ExporterCreateParams{}, map[string]Marshaller{})
exp, err := newExporter(c, component.ExporterCreateParams{}, defaultMarshallers())
assert.EqualError(t, err, errUnrecognizedEncoding.Error())
assert.Nil(t, exp)
}

func TestNewExporter_err_auth_type(t *testing.T) {
c := Config{
ProtocolVersion: "2.0.0",
Authentication: Authentication{Type: AuthType("foo")},
Encoding: defaultEncoding,
Metadata: Metadata{
Full: false,
},
}
exp, err := newExporter(c, component.ExporterCreateParams{}, defaultMarshallers())
assert.Error(t, err)
assert.Contains(t, err.Error(), "unknown/unsupported authentication method")
assert.Nil(t, exp)
}

func TestTraceDataPusher(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
Expand Down
5 changes: 5 additions & 0 deletions exporter/kafkaexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ exporters:
retry:
max: 15
timeout: 10s
authentication:
type: plain_text
plain_text:
username: jdoe
password: pass
sending_queue:
enabled: true
num_consumers: 2
Expand Down
16 changes: 16 additions & 0 deletions receiver/kafkareceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,22 @@ The following settings can be optionally configured:
- `zipkin_thrift`: the payload is deserialized into Zipkin Thrift spans.
- `group_id` (default = otel-collector): The consumer group that receiver will be consuming messages from
- `client_id` (default = otel-collector): The consumer client ID that receiver will use
- `authentication`
- `type` (default = none): The authentication type. Supported types are `plain_text`, `tls`, `kerberos`.
- `plain_text`
- `username`: The username to use.
- `password`: The password to use
- `tls`
- `ca_file` path to the CA cert. For a client this verifies the server certificate. Should
only be used if `insecure` is set to true.
- `cert_file` path to the TLS cert to use for TLS required connections. Should
only be used if `insecure` is set to true.
- `key_file` path to the TLS key to use for TLS required connections. Should
only be used if `insecure` is set to true.
- `insecure` (default = false): Disable verifying the server's certificate chain and host
name (`InsecureSkipVerify` in the tls config)
- `server_name_override`: ServerName indicates the name of the server requested by the client
in order to support virtual hosting.
- `metadata`
- `full` (default = true): Whether to maintain a full set of metadata.
When disabled the client does not make the initial request to broker at the startup.
Expand Down
2 changes: 1 addition & 1 deletion receiver/kafkareceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ type Config struct {
// Client, and shared by the Producer/Consumer.
Metadata kafkaexporter.Metadata `mapstructure:"metadata"`

// TODO authentication
Authentication kafkaexporter.Authentication `mapstructure:"authentication"`
}
11 changes: 11 additions & 0 deletions receiver/kafkareceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter/kafkaexporter"
)

Expand All @@ -49,6 +50,16 @@ func TestLoadConfig(t *testing.T) {
Brokers: []string{"foo:123", "bar:456"},
ClientID: "otel-collector",
GroupID: "otel-collector",
Authentication: kafkaexporter.Authentication{
Type: kafkaexporter.AuthTypeTLS,
TLS: configtls.TLSClientSetting{
TLSSetting: configtls.TLSSetting{
CAFile: "ca.pem",
CertFile: "cert.pem",
KeyFile: "key.pem",
},
},
},
Metadata: kafkaexporter.Metadata{
Full: true,
Retry: kafkaexporter.MetadataRetry{
Expand Down
Loading

0 comments on commit 9994700

Please sign in to comment.