Skip to content

Commit

Permalink
Add SASL&mTLS authentication support for Kafka in Promtail (#4663)
Browse files Browse the repository at this point in the history
* Add xdg-go module

* Add mTLS, SASL/PLAIN, SASL/SCRAM authentication support

* docs: Add authentication configuration

* Add test tools for kafka authentication

* Add test stacks for SSL, SASL/PLAIN, SASL/SCRAM, SASL over TLS authentication

* Fix shelllint
  • Loading branch information
taisho6339 authored Nov 9, 2021
1 parent 9762b31 commit 1557dab
Show file tree
Hide file tree
Showing 62 changed files with 6,082 additions and 1 deletion.
26 changes: 26 additions & 0 deletions clients/cmd/promtail/promtail-kafka-sasl-plain.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
server:
http_listen_port: 9080
grpc_listen_port: 0

clients:
- url: http://localhost:3100/loki/api/v1/push

scrape_configs:
- job_name: kafka-sasl-plain
kafka:
use_incoming_timestamp: false
brokers:
- localhost:29092
authentication:
type: sasl
sasl_config:
mechanism: PLAIN
user: kafkaadmin
password: kafkaadmin-pass
use_tls: false
group_id: kafka_group
topics:
- foo
- ^promtail.*
labels:
job: kafka-sasl-plain
26 changes: 26 additions & 0 deletions clients/cmd/promtail/promtail-kafka-sasl-scram.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
server:
http_listen_port: 9080
grpc_listen_port: 0

clients:
- url: http://localhost:3100/loki/api/v1/push

scrape_configs:
- job_name: kafka-sasl-plain
kafka:
use_incoming_timestamp: false
brokers:
- localhost:29092
authentication:
type: sasl
sasl_config:
mechanism: SCRAM-SHA-512
user: kafkaadmin
password: kafkaadmin-pass
use_tls: false
group_id: kafka_group
topics:
- foo
- ^promtail.*
labels:
job: kafka-sasl-plain
28 changes: 28 additions & 0 deletions clients/cmd/promtail/promtail-kafka-sasl-ssl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
server:
http_listen_port: 9080
grpc_listen_port: 0

clients:
- url: http://localhost:3100/loki/api/v1/push

scrape_configs:
- job_name: kafka-sasl-plain
kafka:
use_incoming_timestamp: false
brokers:
- localhost:29092
authentication:
type: sasl
sasl_config:
mechanism: PLAIN
user: kafkaadmin
password: kafkaadmin-pass
use_tls: true
ca_file: ../../../tools/kafka/secrets/promtail-kafka-ca.pem
insecure_skip_verify: true
group_id: kafka_group
topics:
- foo
- ^promtail.*
labels:
job: kafka-sasl-plain
27 changes: 27 additions & 0 deletions clients/cmd/promtail/promtail-kafka-ssl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
server:
http_listen_port: 9080
grpc_listen_port: 0

clients:
- url: http://localhost:3100/loki/api/v1/push

scrape_configs:
- job_name: kafka-mtls
kafka:
use_incoming_timestamp: false
brokers:
- localhost:29092
authentication:
type: ssl
tls_config:
ca_file: ../../../tools/kafka/secrets/promtail-kafka-ca.pem
cert_file: ../../../tools/kafka/secrets/kafka.consumer.keystore.cer.pem
key_file: ../../../tools/kafka/secrets/kafka.consumer.keystore.key.pem
server_name: localhost
insecure_skip_verify: true
group_id: kafka_mtls_group
topics:
- foo
- ^promtail.*
labels:
job: kafka-mtls
49 changes: 49 additions & 0 deletions clients/pkg/promtail/scrapeconfig/scrapeconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"reflect"
"time"

"github.com/Shopify/sarama"
"github.com/grafana/dskit/flagext"

promconfig "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery"
Expand Down Expand Up @@ -244,6 +247,52 @@ type KafkaTargetConfig struct {

// Rebalancing strategy to use. (e.g sticky, roundrobin or range)
Assignor string `yaml:"assignor"`

// Authentication strategy with Kafka brokers
Authentication KafkaAuthentication `yaml:"authentication"`
}

// KafkaAuthenticationType specifies method to authenticate with Kafka brokers
type KafkaAuthenticationType string

const (
// KafkaAuthenticationTypeNone represents using no authentication
KafkaAuthenticationTypeNone = "none"
// KafkaAuthenticationTypeSSL represents using SSL/TLS to authenticate
KafkaAuthenticationTypeSSL = "ssl"
// KafkaAuthenticationTypeSASL represents using SASL to authenticate
KafkaAuthenticationTypeSASL = "sasl"
)

// KafkaAuthentication describe the configuration for authentication with Kafka brokers
type KafkaAuthentication struct {
// Type is authentication type
// Possible values: none, sasl and ssl (defaults to none).
Type KafkaAuthenticationType `yaml:"type"`

// TLSConfig is used for TLS encryption and authentication with Kafka brokers
TLSConfig promconfig.TLSConfig `yaml:"tls_config,omitempty"`

// SASLConfig is used for SASL authentication with Kafka brokers
SASLConfig KafkaSASLConfig `yaml:"sasl_config,omitempty"`
}

// KafkaSASLConfig describe the SASL configuration for authentication with Kafka brokers
type KafkaSASLConfig struct {
// SASL mechanism. Supports PLAIN, SCRAM-SHA-256 and SCRAM-SHA-512
Mechanism sarama.SASLMechanism `yaml:"mechanism"`

// SASL Username
User string `yaml:"user"`

// SASL Password for the User
Password flagext.Secret `yaml:"password"`

// UseTLS sets whether TLS is used with SASL
UseTLS bool `yaml:"use_tls"`

// TLSConfig is used for SASL over TLS. It is used only when UseTLS is true
TLSConfig promconfig.TLSConfig `yaml:",inline"`
}

// GcplogTargetConfig describes a scrape config to pull logs from any pubsub topic.
Expand Down
69 changes: 69 additions & 0 deletions clients/pkg/promtail/targets/kafka/authentication.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package kafka

import (
"crypto/sha256"
"crypto/sha512"
"crypto/tls"
"crypto/x509"
"os"

promconfig "github.com/prometheus/common/config"
"github.com/xdg-go/scram"
)

func createTLSConfig(cfg promconfig.TLSConfig) (*tls.Config, error) {
tc := &tls.Config{
InsecureSkipVerify: cfg.InsecureSkipVerify,
ServerName: cfg.ServerName,
}
// load ca cert
if len(cfg.CAFile) > 0 {
caCert, err := os.ReadFile(cfg.CAFile)
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tc.RootCAs = caCertPool
}
// load client cert
if len(cfg.CertFile) > 0 && len(cfg.KeyFile) > 0 {
cert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
if err != nil {
return nil, err
}
tc.Certificates = []tls.Certificate{cert}
}
return tc, nil
}

// copied from https://github.com/Shopify/sarama/blob/44627b731c60bb90efe25573e7ef2b3f8df3fa23/examples/sasl_scram_client/scram_client.go
var (
SHA256 scram.HashGeneratorFcn = sha256.New
SHA512 scram.HashGeneratorFcn = sha512.New
)

// XDGSCRAMClient implements sarama.SCRAMClient
type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}

func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}
72 changes: 72 additions & 0 deletions clients/pkg/promtail/targets/kafka/target_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func NewSyncer(
default:
return nil, fmt.Errorf("unrecognized consumer group partition assignor: %s", cfg.KafkaConfig.Assignor)
}
config, err = withAuthentication(*config, cfg.KafkaConfig.Authentication)
if err != nil {
return nil, fmt.Errorf("error setting up kafka authentication: %w", err)
}
client, err := sarama.NewClient(cfg.KafkaConfig.Brokers, config)
if err != nil {
return nil, fmt.Errorf("error creating kafka client: %w", err)
Expand Down Expand Up @@ -113,6 +117,74 @@ func NewSyncer(
return t, nil
}

func withAuthentication(cfg sarama.Config, authCfg scrapeconfig.KafkaAuthentication) (*sarama.Config, error) {
if len(authCfg.Type) == 0 || authCfg.Type == scrapeconfig.KafkaAuthenticationTypeNone {
return &cfg, nil
}

switch authCfg.Type {
case scrapeconfig.KafkaAuthenticationTypeSSL:
return withSSLAuthentication(cfg, authCfg)
case scrapeconfig.KafkaAuthenticationTypeSASL:
return withSASLAuthentication(cfg, authCfg)
default:
return nil, fmt.Errorf("unsupported authentication type %s", authCfg.Type)
}
}

func withSSLAuthentication(cfg sarama.Config, authCfg scrapeconfig.KafkaAuthentication) (*sarama.Config, error) {
cfg.Net.TLS.Enable = true
tc, err := createTLSConfig(authCfg.TLSConfig)
if err != nil {
return nil, err
}
cfg.Net.TLS.Config = tc
return &cfg, nil
}

func withSASLAuthentication(cfg sarama.Config, authCfg scrapeconfig.KafkaAuthentication) (*sarama.Config, error) {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.User = authCfg.SASLConfig.User
cfg.Net.SASL.Password = authCfg.SASLConfig.Password.Value
cfg.Net.SASL.Mechanism = authCfg.SASLConfig.Mechanism
if cfg.Net.SASL.Mechanism == "" {
cfg.Net.SASL.Mechanism = sarama.SASLTypePlaintext
}

supportedMechanism := []string{
sarama.SASLTypeSCRAMSHA512,
sarama.SASLTypeSCRAMSHA256,
sarama.SASLTypePlaintext,
}
if !util.StringSliceContains(supportedMechanism, string(authCfg.SASLConfig.Mechanism)) {
return nil, fmt.Errorf("error unsupported sasl mechanism: %s", authCfg.SASLConfig.Mechanism)
}

if cfg.Net.SASL.Mechanism == sarama.SASLTypeSCRAMSHA512 {
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{
HashGeneratorFcn: SHA512,
}
}
}
if cfg.Net.SASL.Mechanism == sarama.SASLTypeSCRAMSHA256 {
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{
HashGeneratorFcn: SHA256,
}
}
}
if authCfg.SASLConfig.UseTLS {
tc, err := createTLSConfig(authCfg.SASLConfig.TLSConfig)
if err != nil {
return nil, err
}
cfg.Net.TLS.Config = tc
cfg.Net.TLS.Enable = true
}
return &cfg, nil
}

func (ts *TargetSyncer) loop() {
topicChanged := make(chan []string)
ts.wg.Add(2)
Expand Down
Loading

0 comments on commit 1557dab

Please sign in to comment.