feat(kafka): Add support for SASL auth to Kafka #14487

Merged · 4 commits · Oct 15, 2024

Changes from all commits
10 changes: 10 additions & 0 deletions docs/sources/shared/configuration.md
@@ -793,6 +793,16 @@ kafka_config:
# CLI flag: -kafka.write-timeout
[write_timeout: <duration> | default = 10s]

+# The SASL username for authentication to Kafka using the PLAIN mechanism.
+# Both username and password must be set.
+# CLI flag: -kafka.sasl-username
+[sasl_username: <string> | default = ""]
+
+# The SASL password for authentication to Kafka using the PLAIN mechanism.
+# Both username and password must be set.
+# CLI flag: -kafka.sasl-password
+[sasl_password: <string> | default = ""]

# The consumer group used by the consumer to track the last consumed offset.
# The consumer group must be different for each ingester. If the configured
# consumer group contains the '<partition>' placeholder, it is replaced with
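For reference, a minimal kafka_config block using the new options might look like the sketch below — the address and topic values are placeholders, and per the docs above both SASL settings must be set for authentication to take effect:

kafka_config:
  address: broker-1.example.com:9092
  topic: loki-writes
  # PLAIN mechanism; both fields must be non-empty together.
  sasl_username: loki
  sasl_password: supersecret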
5 changes: 3 additions & 2 deletions pkg/distributor/distributor.go
@@ -47,6 +47,7 @@ import (
"github.com/grafana/loki/v3/pkg/ingester"
"github.com/grafana/loki/v3/pkg/ingester/client"
"github.com/grafana/loki/v3/pkg/kafka"
+kafka_client "github.com/grafana/loki/v3/pkg/kafka/client"
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log/logfmt"
@@ -234,11 +235,11 @@ func New(

var kafkaWriter KafkaProducer
if cfg.KafkaEnabled {
-kafkaClient, err := kafka.NewWriterClient(cfg.KafkaConfig, 20, logger, registerer)
+kafkaClient, err := kafka_client.NewWriterClient(cfg.KafkaConfig, 20, logger, registerer)
if err != nil {
return nil, fmt.Errorf("failed to start kafka client: %w", err)
}
-kafkaWriter = kafka.NewProducer(kafkaClient, cfg.KafkaConfig.ProducerMaxBufferedBytes,
+kafkaWriter = kafka_client.NewProducer(kafkaClient, cfg.KafkaConfig.ProducerMaxBufferedBytes,
prometheus.WrapRegistererWithPrefix("_kafka_", registerer))
}

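Condensed into a standalone helper, the wiring this hunk changes looks roughly like the following — newKafkaWriter is a hypothetical name, the concrete *kafka_client.Producer return type is assumed from the package, and the 20 in-flight requests and "_kafka_" metrics prefix are taken from the diff:

package distributor

import (
	"fmt"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/v3/pkg/kafka"
	kafka_client "github.com/grafana/loki/v3/pkg/kafka/client"
)

// newKafkaWriter is a hypothetical helper mirroring the construction order in
// New above: first the low-level franz-go client, then the buffering producer.
func newKafkaWriter(cfg kafka.Config, maxBufferedBytes int64, reg prometheus.Registerer) (*kafka_client.Producer, error) {
	kafkaClient, err := kafka_client.NewWriterClient(cfg, 20, log.NewNopLogger(), reg)
	if err != nil {
		return nil, fmt.Errorf("failed to start kafka client: %w", err)
	}
	// Producer metrics land under the "_kafka_" prefix, as in the diff.
	return kafka_client.NewProducer(kafkaClient, maxBufferedBytes,
		prometheus.WrapRegistererWithPrefix("_kafka_", reg)), nil
}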
2 changes: 1 addition & 1 deletion pkg/kafka/logger.go → pkg/kafka/client/logger.go
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: AGPL-3.0-only

-package kafka
+package client

import (
"github.com/go-kit/log"
pkg/kafka/reader_client.go → pkg/kafka/client/reader_client.go
@@ -1,19 +1,25 @@
// SPDX-License-Identifier: AGPL-3.0-only

-package kafka
+package client

import (
"context"
"fmt"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/plugin/kprom"

"github.com/grafana/loki/v3/pkg/kafka"
)

// NewReaderClient returns the kgo.Client that should be used by the Reader.
-func NewReaderClient(kafkaCfg Config, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt) (*kgo.Client, error) {
+func NewReaderClient(kafkaCfg kafka.Config, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt) (*kgo.Client, error) {
const fetchMaxBytes = 100_000_000

opts = append(opts, commonKafkaClientOptions(kafkaCfg, metrics, logger)...)
@@ -33,7 +39,7 @@ func NewReaderClient(kafkaCfg Config, metrics *kprom.Metrics, logger log.Logger,
return nil, errors.Wrap(err, "creating kafka client")
}
if kafkaCfg.AutoCreateTopicEnabled {
-kafkaCfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(logger)
+setDefaultNumberOfPartitionsForAutocreatedTopics(kafkaCfg, client, logger)
}
return client, nil
}
@@ -44,3 +50,29 @@ func NewReaderClientMetrics(component string, reg prometheus.Registerer) *kprom.
// Do not export the client ID, because we use it to specify options to the backend.
kprom.FetchAndProduceDetail(kprom.Batches, kprom.Records, kprom.CompressedBytes, kprom.UncompressedBytes))
}

+// setDefaultNumberOfPartitionsForAutocreatedTopics tries to set the num.partitions config option on brokers.
+// This is best-effort: if setting the option fails, the error is logged but not returned.
+func setDefaultNumberOfPartitionsForAutocreatedTopics(cfg kafka.Config, cl *kgo.Client, logger log.Logger) {
+if cfg.AutoCreateTopicDefaultPartitions <= 0 {
+return
+}
+
+// Note: this admin client doesn't get closed, because it wraps a kgo.Client owned by the caller.
+adm := kadm.NewClient(cl)
+
+defaultNumberOfPartitions := fmt.Sprintf("%d", cfg.AutoCreateTopicDefaultPartitions)
+_, err := adm.AlterBrokerConfigsState(context.Background(), []kadm.AlterConfig{
+{
+Op: kadm.SetConfig,
+Name: "num.partitions",
+Value: &defaultNumberOfPartitions,
+},
+})
+if err != nil {
+level.Error(logger).Log("msg", "failed to alter default number of partitions", "err", err)
+return
+}
+
+level.Info(logger).Log("msg", "configured Kafka-wide default number of partitions for auto-created topics (num.partitions)", "value", cfg.AutoCreateTopicDefaultPartitions)
+}
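To show how the pieces above fit together, here is a minimal, hypothetical sketch of building a SASL-authenticated reader with this package — the broker address, topic, and credentials are invented, and passing nil metrics mirrors the package tests rather than production wiring:

package main

import (
	"context"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/flagext"

	"github.com/grafana/loki/v3/pkg/kafka"
	"github.com/grafana/loki/v3/pkg/kafka/client"
)

func main() {
	// Invented values; real deployments populate these from kafka_config.
	cfg := kafka.Config{
		Address:      "broker-1.example.com:9092",
		Topic:        "loki-writes",
		SASLUsername: "loki",
		SASLPassword: flagext.SecretWithValue("supersecret"),
	}

	// nil metrics is accepted (the tests below rely on it); a nop logger keeps the sketch quiet.
	reader, err := client.NewReaderClient(cfg, nil, log.NewNopLogger())
	if err != nil {
		panic(err)
	}
	defer reader.Close()

	// Ping exercises the connection, so bad SASL credentials surface here.
	if err := reader.Ping(context.Background()); err != nil {
		panic(err)
	}
}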
104 changes: 104 additions & 0 deletions pkg/kafka/client/reader_client_test.go
@@ -0,0 +1,104 @@
package client

import (
"context"
"testing"

"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kfake"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"

"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/testkafka"
)

func TestNewReaderClient(t *testing.T) {
_, addr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test", kfake.EnableSASL(), kfake.Superuser("PLAIN", "user", "password"))

tests := []struct {
name string
config kafka.Config
wantErr bool
}{
{
name: "valid config",
config: kafka.Config{
Address: addr,
Topic: "abcd",
SASLUsername: "user",
SASLPassword: flagext.SecretWithValue("password"),
},
wantErr: false,
},
{
name: "wrong password",
config: kafka.Config{
Address: addr,
Topic: "abcd",
SASLUsername: "user",
SASLPassword: flagext.SecretWithValue("wrong wrong wrong"),
},
wantErr: true,
},
{
name: "wrong username",
config: kafka.Config{
Address: addr,
Topic: "abcd",
SASLUsername: "wrong wrong wrong",
SASLPassword: flagext.SecretWithValue("password"),
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
client, err := NewReaderClient(tt.config, nil, nil)
require.NoError(t, err)

err = client.Ping(context.Background())
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}

func TestSetDefaultNumberOfPartitionsForAutocreatedTopics(t *testing.T) {
cluster, err := kfake.NewCluster(kfake.NumBrokers(1))
require.NoError(t, err)
t.Cleanup(cluster.Close)

addrs := cluster.ListenAddrs()
require.Len(t, addrs, 1)

cfg := kafka.Config{
Address: addrs[0],
AutoCreateTopicDefaultPartitions: 100,
}

cluster.ControlKey(kmsg.AlterConfigs.Int16(), func(request kmsg.Request) (kmsg.Response, error, bool) {
r := request.(*kmsg.AlterConfigsRequest)

require.Len(t, r.Resources, 1)
res := r.Resources[0]
require.Equal(t, kmsg.ConfigResourceTypeBroker, res.ResourceType)
require.Len(t, res.Configs, 1)
cfg := res.Configs[0]
require.Equal(t, "num.partitions", cfg.Name)
require.NotNil(t, cfg.Value)
require.Equal(t, "100", *cfg.Value)

return &kmsg.AlterConfigsResponse{}, nil, true
})

client, err := kgo.NewClient(commonKafkaClientOptions(cfg, nil, log.NewNopLogger())...)
require.NoError(t, err)

setDefaultNumberOfPartitionsForAutocreatedTopics(cfg, client, log.NewNopLogger())
}
pkg/kafka/writer_client.go → pkg/kafka/client/writer_client.go
@@ -1,4 +1,4 @@
-package kafka
+package client

import (
"context"
@@ -13,20 +13,30 @@ import (
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/plugin/kotel"
"github.com/twmb/franz-go/plugin/kprom"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"

"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/util/constants"
)

+var (
+// writerRequestTimeoutOverhead is the overhead applied by the Writer to every Kafka timeout.
+// You can think about this overhead as an extra time for requests sitting in the client's buffer
+// before being sent on the wire and the actual time it takes to send it over the network and
+// start being processed by Kafka.
+writerRequestTimeoutOverhead = 2 * time.Second
+)

// NewWriterClient returns the kgo.Client that should be used by the Writer.
//
// The input prometheus.Registerer must be wrapped with a prefix (the names of metrics
// registered don't have a prefix).
-func NewWriterClient(kafkaCfg Config, maxInflightProduceRequests int, logger log.Logger, reg prometheus.Registerer) (*kgo.Client, error) {
+func NewWriterClient(kafkaCfg kafka.Config, maxInflightProduceRequests int, logger log.Logger, reg prometheus.Registerer) (*kgo.Client, error) {
// Do not export the client ID, because we use it to specify options to the backend.
metrics := kprom.NewMetrics(
"", // No prefix. We expect the input prometheus.Registered to be wrapped with a prefix.
@@ -42,7 +52,7 @@ func NewWriterClient(kafkaCfg Config, maxInflightProduceRequests int, logger log
kgo.RecordPartitioner(kgo.ManualPartitioner()),

// Set the upper bounds the size of a record batch.
-kgo.ProducerBatchMaxBytes(producerBatchMaxBytes),
+kgo.ProducerBatchMaxBytes(kafka.ProducerBatchMaxBytes),

// By default, the Kafka client allows 1 Produce in-flight request per broker. Disabling write idempotency
// (which we don't need), we can increase the max number of in-flight Produce requests per broker. A higher
@@ -81,10 +91,14 @@ func NewWriterClient(kafkaCfg Config, maxInflightProduceRequests int, logger log
kgo.MaxBufferedRecords(math.MaxInt), // Use a high value to set it as unlimited, because the client doesn't support "0 as unlimited".
kgo.MaxBufferedBytes(0),
)
+client, err := kgo.NewClient(opts...)
+if err != nil {
+return nil, err
+}
if kafkaCfg.AutoCreateTopicEnabled {
-kafkaCfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(logger)
+setDefaultNumberOfPartitionsForAutocreatedTopics(kafkaCfg, client, logger)
}
-return kgo.NewClient(opts...)
+return client, nil
}

type onlySampledTraces struct {
@@ -99,7 +113,7 @@ func (o onlySampledTraces) Inject(ctx context.Context, carrier propagation.TextM
o.TextMapPropagator.Inject(ctx, carrier)
}

-func commonKafkaClientOptions(cfg Config, metrics *kprom.Metrics, logger log.Logger) []kgo.Opt {
+func commonKafkaClientOptions(cfg kafka.Config, metrics *kprom.Metrics, logger log.Logger) []kgo.Opt {
opts := []kgo.Opt{
kgo.ClientID(cfg.ClientID),
kgo.SeedBrokers(cfg.Address),
@@ -139,6 +153,16 @@ func commonKafkaClientOptions(cfg Config, metrics *kprom.Metrics, logger log.Log
}),
}

+// SASL plain auth.
+if cfg.SASLUsername != "" && cfg.SASLPassword.String() != "" {
+opts = append(opts, kgo.SASL(plain.Plain(func(_ context.Context) (plain.Auth, error) {
+return plain.Auth{
+User: cfg.SASLUsername,
+Pass: cfg.SASLPassword.String(),
+}, nil
+})))
+}

if cfg.AutoCreateTopicEnabled {
opts = append(opts, kgo.AllowAutoTopicCreation())
}
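The SASL hook added to commonKafkaClientOptions can also be exercised directly against franz-go; below is a minimal sketch with a placeholder broker address and credentials (Loki derives the real values from kafka_config, enabling SASL only when both fields are set):

package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/sasl/plain"
)

func main() {
	// Placeholder broker and credentials.
	client, err := kgo.NewClient(
		kgo.SeedBrokers("broker-1.example.com:9092"),
		// The callback is invoked per connection, so rotating credentials are picked up.
		kgo.SASL(plain.Plain(func(_ context.Context) (plain.Auth, error) {
			return plain.Auth{User: "loki", Pass: "supersecret"}, nil
		})),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Ping fails fast if the brokers reject the PLAIN credentials.
	if err := client.Ping(context.Background()); err != nil {
		panic(err)
	}
}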
71 changes: 71 additions & 0 deletions pkg/kafka/client/writer_client_test.go
@@ -0,0 +1,71 @@
package client

import (
"context"
"testing"
"time"

"github.com/grafana/dskit/flagext"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kfake"

"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/testkafka"
)

func TestNewWriterClient(t *testing.T) {
_, addr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test", kfake.EnableSASL(), kfake.Superuser("PLAIN", "user", "password"))

tests := []struct {
name string
config kafka.Config
wantErr bool
}{
{
name: "valid config",
config: kafka.Config{
Address: addr,
Topic: "abcd",
WriteTimeout: time.Second,
SASLUsername: "user",
SASLPassword: flagext.SecretWithValue("password"),
},
wantErr: false,
},
{
name: "wrong password",
config: kafka.Config{
Address: addr,
Topic: "abcd",
WriteTimeout: time.Second,
SASLUsername: "user",
SASLPassword: flagext.SecretWithValue("wrong wrong wrong"),
},
wantErr: true,
},
{
name: "wrong username",
config: kafka.Config{
Address: addr,
Topic: "abcd",
WriteTimeout: time.Second,
SASLUsername: "wrong wrong wrong",
SASLPassword: flagext.SecretWithValue("password"),
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
client, err := NewWriterClient(tt.config, 10, nil, nil)
require.NoError(t, err)

err = client.Ping(context.Background())
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}