From 8d9fdd2e6626828b770df56cbcc294c1ac15a20b Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Tue, 15 Oct 2024 11:34:58 +0100 Subject: [PATCH 1/4] feat(kafka): Add support for SASL auth to Kafka --- pkg/distributor/distributor.go | 5 +- pkg/kafka/{ => client}/logger.go | 2 +- pkg/kafka/{ => client}/reader_client.go | 38 ++++++++- pkg/kafka/client/reader_client_test.go | 104 ++++++++++++++++++++++++ pkg/kafka/{ => client}/writer_client.go | 36 ++++++-- pkg/kafka/client/writer_client_test.go | 71 ++++++++++++++++ pkg/kafka/config.go | 73 +++++------------ pkg/kafka/config_test.go | 54 ++++++------ pkg/kafka/partition/committer_test.go | 4 +- pkg/kafka/partition/reader.go | 5 +- pkg/kafka/partition/reader_test.go | 9 +- pkg/kafka/testkafka/cluster.go | 16 +++- vendor/modules.txt | 1 + 13 files changed, 314 insertions(+), 104 deletions(-) rename pkg/kafka/{ => client}/logger.go (98%) rename pkg/kafka/{ => client}/reader_client.go (52%) create mode 100644 pkg/kafka/client/reader_client_test.go rename pkg/kafka/{ => client}/writer_client.go (90%) create mode 100644 pkg/kafka/client/writer_client_test.go diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 30383bfcbbbd..6f69f0e02a84 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -47,6 +47,7 @@ import ( "github.com/grafana/loki/v3/pkg/ingester" "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/kafka" + kafka_client "github.com/grafana/loki/v3/pkg/kafka/client" "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log/logfmt" @@ -234,11 +235,11 @@ func New( var kafkaWriter KafkaProducer if cfg.KafkaEnabled { - kafkaClient, err := kafka.NewWriterClient(cfg.KafkaConfig, 20, logger, registerer) + kafkaClient, err := kafka_client.NewWriterClient(cfg.KafkaConfig, 20, logger, registerer) if err != nil { return nil, fmt.Errorf("failed to start kafka client: %w", err) } - kafkaWriter = kafka.NewProducer(kafkaClient, cfg.KafkaConfig.ProducerMaxBufferedBytes, + kafkaWriter = kafka_client.NewProducer(kafkaClient, cfg.KafkaConfig.ProducerMaxBufferedBytes, prometheus.WrapRegistererWithPrefix("_kafka_", registerer)) } diff --git a/pkg/kafka/logger.go b/pkg/kafka/client/logger.go similarity index 98% rename from pkg/kafka/logger.go rename to pkg/kafka/client/logger.go index e055094a4163..3be96839e120 100644 --- a/pkg/kafka/logger.go +++ b/pkg/kafka/client/logger.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-only -package kafka +package client import ( "github.com/go-kit/log" diff --git a/pkg/kafka/reader_client.go b/pkg/kafka/client/reader_client.go similarity index 52% rename from pkg/kafka/reader_client.go rename to pkg/kafka/client/reader_client.go index 9237686fee60..9cf333ec7272 100644 --- a/pkg/kafka/reader_client.go +++ b/pkg/kafka/client/reader_client.go @@ -1,19 +1,25 @@ // SPDX-License-Identifier: AGPL-3.0-only -package kafka +package client import ( + "context" + "fmt" "time" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/plugin/kprom" + + "github.com/grafana/loki/v3/pkg/kafka" ) // NewReaderClient returns the kgo.Client that should be used by the Reader. 
-func NewReaderClient(kafkaCfg Config, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt) (*kgo.Client, error) { +func NewReaderClient(kafkaCfg kafka.Config, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt) (*kgo.Client, error) { const fetchMaxBytes = 100_000_000 opts = append(opts, commonKafkaClientOptions(kafkaCfg, metrics, logger)...) @@ -33,7 +39,7 @@ func NewReaderClient(kafkaCfg Config, metrics *kprom.Metrics, logger log.Logger, return nil, errors.Wrap(err, "creating kafka client") } if kafkaCfg.AutoCreateTopicEnabled { - kafkaCfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(logger) + setDefaultNumberOfPartitionsForAutocreatedTopics(kafkaCfg, client, logger) } return client, nil } @@ -44,3 +50,29 @@ func NewReaderClientMetrics(component string, reg prometheus.Registerer) *kprom. // Do not export the client ID, because we use it to specify options to the backend. kprom.FetchAndProduceDetail(kprom.Batches, kprom.Records, kprom.CompressedBytes, kprom.UncompressedBytes)) } + +// setDefaultNumberOfPartitionsForAutocreatedTopics tries to set num.partitions config option on brokers. +// This is best-effort, if setting the option fails, error is logged, but not returned. +func setDefaultNumberOfPartitionsForAutocreatedTopics(cfg kafka.Config, cl *kgo.Client, logger log.Logger) { + if cfg.AutoCreateTopicDefaultPartitions <= 0 { + return + } + + adm := kadm.NewClient(cl) + defer adm.Close() + + defaultNumberOfPartitions := fmt.Sprintf("%d", cfg.AutoCreateTopicDefaultPartitions) + _, err := adm.AlterBrokerConfigsState(context.Background(), []kadm.AlterConfig{ + { + Op: kadm.SetConfig, + Name: "num.partitions", + Value: &defaultNumberOfPartitions, + }, + }) + if err != nil { + level.Error(logger).Log("msg", "failed to alter default number of partitions", "err", err) + return + } + + level.Info(logger).Log("msg", "configured Kafka-wide default number of partitions for auto-created topics (num.partitions)", "value", cfg.AutoCreateTopicDefaultPartitions) +} diff --git a/pkg/kafka/client/reader_client_test.go b/pkg/kafka/client/reader_client_test.go new file mode 100644 index 000000000000..90980ad0e912 --- /dev/null +++ b/pkg/kafka/client/reader_client_test.go @@ -0,0 +1,104 @@ +package client + +import ( + "context" + "testing" + + "github.com/go-kit/log" + "github.com/grafana/dskit/flagext" + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kfake" + "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/kmsg" + + "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/testkafka" +) + +func TestNewReaderClient(t *testing.T) { + _, addr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test", kfake.EnableSASL(), kfake.Superuser("PLAIN", "user", "password")) + + tests := []struct { + name string + config kafka.Config + wantErr bool + }{ + { + name: "valid config", + config: kafka.Config{ + Address: addr, + Topic: "abcd", + SASLUsername: "user", + SASLPassword: flagext.SecretWithValue("password"), + }, + wantErr: false, + }, + { + name: "wrong password", + config: kafka.Config{ + Address: addr, + Topic: "abcd", + SASLUsername: "user", + SASLPassword: flagext.SecretWithValue("wrong wrong wrong"), + }, + wantErr: true, + }, + { + name: "wrong username", + config: kafka.Config{ + Address: addr, + Topic: "abcd", + SASLUsername: "wrong wrong wrong", + SASLPassword: flagext.SecretWithValue("password"), + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client, 
err := NewReaderClient(tt.config, nil, nil) + require.NoError(t, err) + + err = client.Ping(context.Background()) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestSetDefaultNumberOfPartitionsForAutocreatedTopics(t *testing.T) { + cluster, err := kfake.NewCluster(kfake.NumBrokers(1)) + require.NoError(t, err) + t.Cleanup(cluster.Close) + + addrs := cluster.ListenAddrs() + require.Len(t, addrs, 1) + + cfg := kafka.Config{ + Address: addrs[0], + AutoCreateTopicDefaultPartitions: 100, + } + + cluster.ControlKey(kmsg.AlterConfigs.Int16(), func(request kmsg.Request) (kmsg.Response, error, bool) { + r := request.(*kmsg.AlterConfigsRequest) + + require.Len(t, r.Resources, 1) + res := r.Resources[0] + require.Equal(t, kmsg.ConfigResourceTypeBroker, res.ResourceType) + require.Len(t, res.Configs, 1) + cfg := res.Configs[0] + require.Equal(t, "num.partitions", cfg.Name) + require.NotNil(t, *cfg.Value) + require.Equal(t, "100", *cfg.Value) + + return &kmsg.AlterConfigsResponse{}, nil, true + }) + + client, err := kgo.NewClient(commonKafkaClientOptions(cfg, nil, log.NewNopLogger())...) + require.NoError(t, err) + + setDefaultNumberOfPartitionsForAutocreatedTopics(cfg, client, log.NewNopLogger()) +} diff --git a/pkg/kafka/writer_client.go b/pkg/kafka/client/writer_client.go similarity index 90% rename from pkg/kafka/writer_client.go rename to pkg/kafka/client/writer_client.go index 59fefda31d19..1493e17f5168 100644 --- a/pkg/kafka/writer_client.go +++ b/pkg/kafka/client/writer_client.go @@ -1,4 +1,4 @@ -package kafka +package client import ( "context" @@ -13,20 +13,30 @@ import ( "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" + "github.com/twmb/franz-go/pkg/sasl/plain" "github.com/twmb/franz-go/plugin/kotel" "github.com/twmb/franz-go/plugin/kprom" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" "go.uber.org/atomic" + "github.com/grafana/loki/v3/pkg/kafka" "github.com/grafana/loki/v3/pkg/util/constants" ) +var ( + // writerRequestTimeoutOverhead is the overhead applied by the Writer to every Kafka timeout. + // You can think about this overhead as an extra time for requests sitting in the client's buffer + // before being sent on the wire and the actual time it takes to send it over the network and + // start being processed by Kafka. + writerRequestTimeoutOverhead = 2 * time.Second +) + // NewWriterClient returns the kgo.Client that should be used by the Writer. // // The input prometheus.Registerer must be wrapped with a prefix (the names of metrics // registered don't have a prefix). -func NewWriterClient(kafkaCfg Config, maxInflightProduceRequests int, logger log.Logger, reg prometheus.Registerer) (*kgo.Client, error) { +func NewWriterClient(kafkaCfg kafka.Config, maxInflightProduceRequests int, logger log.Logger, reg prometheus.Registerer) (*kgo.Client, error) { // Do not export the client ID, because we use it to specify options to the backend. metrics := kprom.NewMetrics( "", // No prefix. We expect the input prometheus.Registered to be wrapped with a prefix. @@ -42,7 +52,7 @@ func NewWriterClient(kafkaCfg Config, maxInflightProduceRequests int, logger log kgo.RecordPartitioner(kgo.ManualPartitioner()), // Set the upper bounds the size of a record batch. - kgo.ProducerBatchMaxBytes(producerBatchMaxBytes), + kgo.ProducerBatchMaxBytes(kafka.ProducerBatchMaxBytes), // By default, the Kafka client allows 1 Produce in-flight request per broker. 
Disabling write idempotency // (which we don't need), we can increase the max number of in-flight Produce requests per broker. A higher @@ -81,10 +91,14 @@ func NewWriterClient(kafkaCfg Config, maxInflightProduceRequests int, logger log kgo.MaxBufferedRecords(math.MaxInt), // Use a high value to set it as unlimited, because the client doesn't support "0 as unlimited". kgo.MaxBufferedBytes(0), ) + client, err := kgo.NewClient(opts...) + if err != nil { + return nil, err + } if kafkaCfg.AutoCreateTopicEnabled { - kafkaCfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(logger) + setDefaultNumberOfPartitionsForAutocreatedTopics(kafkaCfg, client, logger) } - return kgo.NewClient(opts...) + return client, nil } type onlySampledTraces struct { @@ -99,7 +113,7 @@ func (o onlySampledTraces) Inject(ctx context.Context, carrier propagation.TextM o.TextMapPropagator.Inject(ctx, carrier) } -func commonKafkaClientOptions(cfg Config, metrics *kprom.Metrics, logger log.Logger) []kgo.Opt { +func commonKafkaClientOptions(cfg kafka.Config, metrics *kprom.Metrics, logger log.Logger) []kgo.Opt { opts := []kgo.Opt{ kgo.ClientID(cfg.ClientID), kgo.SeedBrokers(cfg.Address), @@ -139,6 +153,16 @@ func commonKafkaClientOptions(cfg Config, metrics *kprom.Metrics, logger log.Log }), } + // SASL plain auth. + if cfg.SASLUsername != "" && cfg.SASLPassword.String() != "" { + opts = append(opts, kgo.SASL(plain.Plain(func(_ context.Context) (plain.Auth, error) { + return plain.Auth{ + User: cfg.SASLUsername, + Pass: cfg.SASLPassword.String(), + }, nil + }))) + } + if cfg.AutoCreateTopicEnabled { opts = append(opts, kgo.AllowAutoTopicCreation()) } diff --git a/pkg/kafka/client/writer_client_test.go b/pkg/kafka/client/writer_client_test.go new file mode 100644 index 000000000000..4feb782ffe63 --- /dev/null +++ b/pkg/kafka/client/writer_client_test.go @@ -0,0 +1,71 @@ +package client + +import ( + "context" + "testing" + "time" + + "github.com/grafana/dskit/flagext" + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kfake" + + "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/testkafka" +) + +func TestNewWriterClient(t *testing.T) { + _, addr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test", kfake.EnableSASL(), kfake.Superuser("PLAIN", "user", "password")) + + tests := []struct { + name string + config kafka.Config + wantErr bool + }{ + { + name: "valid config", + config: kafka.Config{ + Address: addr, + Topic: "abcd", + WriteTimeout: time.Second, + SASLUsername: "user", + SASLPassword: flagext.SecretWithValue("password"), + }, + wantErr: false, + }, + { + name: "wrong password", + config: kafka.Config{ + Address: addr, + Topic: "abcd", + WriteTimeout: time.Second, + SASLUsername: "user", + SASLPassword: flagext.SecretWithValue("wrong wrong wrong"), + }, + wantErr: true, + }, + { + name: "wrong username", + config: kafka.Config{ + Address: addr, + Topic: "abcd", + WriteTimeout: time.Second, + SASLUsername: "wrong wrong wrong", + SASLPassword: flagext.SecretWithValue("password"), + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client, err := NewWriterClient(tt.config, 10, nil, nil) + require.NoError(t, err) + + err = client.Ping(context.Background()) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/pkg/kafka/config.go b/pkg/kafka/config.go index 13cfb618cfdb..09008bec9341 100644 --- a/pkg/kafka/config.go +++ b/pkg/kafka/config.go @@ -1,7 +1,6 
@@ package kafka import ( - "context" "errors" "flag" "fmt" @@ -9,10 +8,7 @@ import ( "strings" "time" - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/twmb/franz-go/pkg/kadm" - "github.com/twmb/franz-go/pkg/kgo" + "github.com/grafana/dskit/flagext" ) const ( @@ -21,29 +17,24 @@ const ( consumeFromEnd = "end" consumeFromTimestamp = "timestamp" - // writerRequestTimeoutOverhead is the overhead applied by the Writer to every Kafka timeout. - // You can think about this overhead as an extra time for requests sitting in the client's buffer - // before being sent on the wire and the actual time it takes to send it over the network and - // start being processed by Kafka. - writerRequestTimeoutOverhead = 2 * time.Second - - // producerBatchMaxBytes is the max allowed size of a batch of Kafka records. - producerBatchMaxBytes = 16_000_000 + // ProducerBatchMaxBytes is the max allowed size of a batch of Kafka records. + ProducerBatchMaxBytes = 16_000_000 // maxProducerRecordDataBytesLimit is the max allowed size of a single record data. Given we have a limit - // on the max batch size (producerBatchMaxBytes), a Kafka record data can't be bigger than the batch size + // on the max batch size (ProducerBatchMaxBytes), a Kafka record data can't be bigger than the batch size // minus some overhead required to serialise the batch and the record itself. We use 16KB as such overhead // in the worst case scenario, which is expected to be way above the actual one. - maxProducerRecordDataBytesLimit = producerBatchMaxBytes - 16384 + maxProducerRecordDataBytesLimit = ProducerBatchMaxBytes - 16384 minProducerRecordDataBytesLimit = 1024 * 1024 ) var ( - ErrMissingKafkaAddress = errors.New("the Kafka address has not been configured") - ErrMissingKafkaTopic = errors.New("the Kafka topic has not been configured") - ErrInconsistentConsumerLagAtStartup = errors.New("the target and max consumer lag at startup must be either both set to 0 or to a value greater than 0") - ErrInvalidMaxConsumerLagAtStartup = errors.New("the configured max consumer lag at startup must greater or equal than the configured target consumer lag") - ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit) + ErrMissingKafkaAddress = errors.New("the Kafka address has not been configured") + ErrMissingKafkaTopic = errors.New("the Kafka topic has not been configured") + ErrInconsistentConsumerLagAtStartup = errors.New("the target and max consumer lag at startup must be either both set to 0 or to a value greater than 0") + ErrInvalidMaxConsumerLagAtStartup = errors.New("the configured max consumer lag at startup must greater or equal than the configured target consumer lag") + ErrInconsistentSASLUsernameAndPassword = errors.New("both sasl username and password must be set") + ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit) ) // Config holds the generic config for the Kafka backend. 
@@ -54,6 +45,9 @@ type Config struct {
 	DialTimeout  time.Duration `yaml:"dial_timeout"`
 	WriteTimeout time.Duration `yaml:"write_timeout"`
 
+	SASLUsername string         `yaml:"sasl_username"`
+	SASLPassword flagext.Secret `yaml:"sasl_password"`
+
 	ConsumerGroup                     string        `yaml:"consumer_group"`
 	ConsumerGroupOffsetCommitInterval time.Duration `yaml:"consumer_group_offset_commit_interval"`
 
@@ -80,6 +74,9 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 	f.DurationVar(&cfg.DialTimeout, prefix+".dial-timeout", 2*time.Second, "The maximum time allowed to open a connection to a Kafka broker.")
 	f.DurationVar(&cfg.WriteTimeout, prefix+".write-timeout", 10*time.Second, "How long to wait for an incoming write request to be successfully committed to the Kafka backend.")
 
+	f.StringVar(&cfg.SASLUsername, prefix+".sasl-username", "", "The SASL username for authentication to Kafka using the PLAIN mechanism. Both username and password must be set.")
+	f.Var(&cfg.SASLPassword, prefix+".sasl-password", "The SASL password for authentication to Kafka using the PLAIN mechanism. Both username and password must be set.")
+
 	f.StringVar(&cfg.ConsumerGroup, prefix+".consumer-group", "", "The consumer group used by the consumer to track the last consumed offset. The consumer group must be different for each ingester. If the configured consumer group contains the '<partition>' placeholder, it is replaced with the actual partition ID owned by the ingester. When empty (recommended), Mimir uses the ingester instance ID to guarantee uniqueness.")
 	f.DurationVar(&cfg.ConsumerGroupOffsetCommitInterval, prefix+".consumer-group-offset-commit-interval", time.Second, "How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left.")
 
@@ -113,6 +110,10 @@ func (cfg *Config) Validate() error {
 		return ErrInvalidMaxConsumerLagAtStartup
 	}
 
+	if (cfg.SASLUsername == "") != (cfg.SASLPassword.String() == "") {
+		return ErrInconsistentSASLUsernameAndPassword
+	}
+
 	return nil
 }
 
@@ -124,35 +125,3 @@ func (cfg *Config) GetConsumerGroup(instanceID string, partitionID int32) string
 
 	return strings.ReplaceAll(cfg.ConsumerGroup, "<partition>", strconv.Itoa(int(partitionID)))
 }
-
-// SetDefaultNumberOfPartitionsForAutocreatedTopics tries to set num.partitions config option on brokers.
-// This is best-effort, if setting the option fails, error is logged, but not returned.
-func (cfg Config) SetDefaultNumberOfPartitionsForAutocreatedTopics(logger log.Logger) {
-	if cfg.AutoCreateTopicDefaultPartitions <= 0 {
-		return
-	}
-
-	cl, err := kgo.NewClient(commonKafkaClientOptions(cfg, nil, logger)...)
- if err != nil { - level.Error(logger).Log("msg", "failed to create kafka client", "err", err) - return - } - - adm := kadm.NewClient(cl) - defer adm.Close() - - defaultNumberOfPartitions := fmt.Sprintf("%d", cfg.AutoCreateTopicDefaultPartitions) - _, err = adm.AlterBrokerConfigsState(context.Background(), []kadm.AlterConfig{ - { - Op: kadm.SetConfig, - Name: "num.partitions", - Value: &defaultNumberOfPartitions, - }, - }) - if err != nil { - level.Error(logger).Log("msg", "failed to alter default number of partitions", "err", err) - return - } - - level.Info(logger).Log("msg", "configured Kafka-wide default number of partitions for auto-created topics (num.partitions)", "value", cfg.AutoCreateTopicDefaultPartitions) -} diff --git a/pkg/kafka/config_test.go b/pkg/kafka/config_test.go index 7c21e38fd141..87c456f42adc 100644 --- a/pkg/kafka/config_test.go +++ b/pkg/kafka/config_test.go @@ -3,39 +3,37 @@ package kafka import ( "testing" - "github.com/go-kit/log" + "github.com/grafana/dskit/flagext" "github.com/stretchr/testify/require" - "github.com/twmb/franz-go/pkg/kfake" - "github.com/twmb/franz-go/pkg/kmsg" ) -func TestSetDefaultNumberOfPartitionsForAutocreatedTopics(t *testing.T) { - cluster, err := kfake.NewCluster(kfake.NumBrokers(1)) - require.NoError(t, err) - t.Cleanup(cluster.Close) - - addrs := cluster.ListenAddrs() - require.Len(t, addrs, 1) - +func TestBothSASLParamsMustBeSet(t *testing.T) { cfg := Config{ - Address: addrs[0], - AutoCreateTopicDefaultPartitions: 100, + // Other required params + Address: "abcd", + Topic: "abcd", + ProducerMaxRecordSizeBytes: 1048576, } - cluster.ControlKey(kmsg.AlterConfigs.Int16(), func(request kmsg.Request) (kmsg.Response, error, bool) { - r := request.(*kmsg.AlterConfigsRequest) - - require.Len(t, r.Resources, 1) - res := r.Resources[0] - require.Equal(t, kmsg.ConfigResourceTypeBroker, res.ResourceType) - require.Len(t, res.Configs, 1) - cfg := res.Configs[0] - require.Equal(t, "num.partitions", cfg.Name) - require.NotNil(t, *cfg.Value) - require.Equal(t, "100", *cfg.Value) - - return &kmsg.AlterConfigsResponse{}, nil, true - }) + // No SASL params is valid + err := cfg.Validate() + require.NoError(t, err) - cfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(log.NewNopLogger()) + // Just username is invalid + cfg.SASLUsername = "abcd" + cfg.SASLPassword = flagext.Secret{} + err = cfg.Validate() + require.Error(t, err) + + // Just password is invalid + cfg.SASLUsername = "" + cfg.SASLPassword = flagext.SecretWithValue("abcd") + err = cfg.Validate() + require.Error(t, err) + + // Both username and password is valid + cfg.SASLUsername = "abcd" + cfg.SASLPassword = flagext.SecretWithValue("abcd") + err = cfg.Validate() + require.NoError(t, err) } diff --git a/pkg/kafka/partition/committer_test.go b/pkg/kafka/partition/committer_test.go index 9ef02f910e5d..1739986cd66c 100644 --- a/pkg/kafka/partition/committer_test.go +++ b/pkg/kafka/partition/committer_test.go @@ -14,7 +14,7 @@ import ( "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/client" "github.com/grafana/loki/v3/pkg/kafka/testkafka" ) @@ -24,7 +24,7 @@ func TestPartitionCommitter(t *testing.T) { topicName := "test-topic" _, kafkaCfg := testkafka.CreateCluster(t, numPartitions, topicName) - client, err := kafka.NewReaderClient(kafkaCfg, kprom.NewMetrics("foo"), log.NewNopLogger()) + client, err := client.NewReaderClient(kafkaCfg, kprom.NewMetrics("foo"), log.NewNopLogger()) require.NoError(t, err) 
// Create a Kafka admin client diff --git a/pkg/kafka/partition/reader.go b/pkg/kafka/partition/reader.go index 74f18b02057f..a6038962222c 100644 --- a/pkg/kafka/partition/reader.go +++ b/pkg/kafka/partition/reader.go @@ -22,6 +22,7 @@ import ( "github.com/twmb/franz-go/plugin/kprom" "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/client" ) var errWaitTargetLagDeadlineExceeded = errors.New("waiting for target lag deadline exceeded") @@ -94,7 +95,7 @@ func NewReader( // This method is called when the PartitionReader service starts. func (p *Reader) start(ctx context.Context) error { var err error - p.client, err = kafka.NewReaderClient(p.kafkaCfg, p.metrics.kprom, p.logger) + p.client, err = client.NewReaderClient(p.kafkaCfg, p.metrics.kprom, p.logger) if err != nil { return errors.Wrap(err, "creating kafka reader client") } @@ -535,7 +536,7 @@ func newReaderMetrics(reg prometheus.Registerer) readerMetrics { return readerMetrics{ receiveDelayWhenStarting: receiveDelay.WithLabelValues("starting"), receiveDelayWhenRunning: receiveDelay.WithLabelValues("running"), - kprom: kafka.NewReaderClientMetrics("partition-reader", reg), + kprom: client.NewReaderClientMetrics("partition-reader", reg), fetchWaitDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Name: "loki_ingest_storage_reader_records_batch_wait_duration_seconds", Help: "How long a consumer spent waiting for a batch of records from the Kafka client. If fetching is faster than processing, then this will be close to 0.", diff --git a/pkg/kafka/partition/reader_test.go b/pkg/kafka/partition/reader_test.go index 8d548c831241..38a929285b2c 100644 --- a/pkg/kafka/partition/reader_test.go +++ b/pkg/kafka/partition/reader_test.go @@ -17,6 +17,7 @@ import ( "github.com/twmb/franz-go/pkg/kgo" "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/client" "github.com/grafana/loki/v3/pkg/kafka/testkafka" "github.com/grafana/loki/v3/pkg/logproto" ) @@ -67,7 +68,7 @@ func TestPartitionReader_BasicFunctionality(t *testing.T) { partitionReader, err := NewReader(kafkaCfg, 0, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) - producer, err := kafka.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry()) + producer, err := client.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) err = services.StartAndAwaitRunning(context.Background(), partitionReader) @@ -121,7 +122,7 @@ func TestPartitionReader_ProcessCatchUpAtStartup(t *testing.T) { partitionReader, err := NewReader(kafkaCfg, 0, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) - producer, err := kafka.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry()) + producer, err := client.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) stream := logproto.Stream{ @@ -175,11 +176,11 @@ func TestPartitionReader_ProcessCommits(t *testing.T) { partitionID := int32(0) partitionReader, err := NewReader(kafkaCfg, partitionID, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) - producer, err := kafka.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry()) + producer, err := client.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) // Init the client: This 
usually happens in "start" but we want to manage our own lifecycle for this test.
-	partitionReader.client, err = kafka.NewReaderClient(kafkaCfg, nil, log.NewNopLogger(),
+	partitionReader.client, err = client.NewReaderClient(kafkaCfg, nil, log.NewNopLogger(),
 		kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{
 			kafkaCfg.Topic: {partitionID: kgo.NewOffset().AtStart()},
 		}),
diff --git a/pkg/kafka/testkafka/cluster.go b/pkg/kafka/testkafka/cluster.go
index cc5847c2bfd3..c70e3da4a71c 100644
--- a/pkg/kafka/testkafka/cluster.go
+++ b/pkg/kafka/testkafka/cluster.go
@@ -16,8 +16,8 @@ import (
 )
 
 // CreateCluster returns a fake Kafka cluster for unit testing.
-func CreateCluster(t testing.TB, numPartitions int32, topicName string) (*kfake.Cluster, kafka.Config) {
-	cluster, addr := CreateClusterWithoutCustomConsumerGroupsSupport(t, numPartitions, topicName)
+func CreateCluster(t testing.TB, numPartitions int32, topicName string, opts ...kfake.Opt) (*kfake.Cluster, kafka.Config) {
+	cluster, addr := CreateClusterWithoutCustomConsumerGroupsSupport(t, numPartitions, topicName, opts...)
 	addSupportForConsumerGroups(t, cluster, topicName, numPartitions)
 
 	return cluster, createTestKafkaConfig(addr, topicName)
@@ -34,8 +34,16 @@ func createTestKafkaConfig(clusterAddr, topicName string) kafka.Config {
 	return cfg
 }
 
-func CreateClusterWithoutCustomConsumerGroupsSupport(t testing.TB, numPartitions int32, topicName string) (*kfake.Cluster, string) {
-	cluster, err := kfake.NewCluster(kfake.NumBrokers(1), kfake.SeedTopics(numPartitions, topicName))
+func CreateClusterWithoutCustomConsumerGroupsSupport(t testing.TB, numPartitions int32, topicName string, opts ...kfake.Opt) (*kfake.Cluster, string) {
+	cfg := []kfake.Opt{
+		kfake.NumBrokers(1),
+		kfake.SeedTopics(numPartitions, topicName),
+	}
+
+	// Apply options.
+	cfg = append(cfg, opts...)
+
+	cluster, err := kfake.NewCluster(cfg...)
 	require.NoError(t, err)
 	t.Cleanup(cluster.Close)
 
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 49e7bb611899..8e8e074487f9 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1602,6 +1602,7 @@ github.com/twmb/franz-go/pkg/kgo
 github.com/twmb/franz-go/pkg/kgo/internal/sticky
 github.com/twmb/franz-go/pkg/kversion
 github.com/twmb/franz-go/pkg/sasl
+github.com/twmb/franz-go/pkg/sasl/plain
 # github.com/twmb/franz-go/pkg/kadm v1.13.0
 ## explicit; go 1.21
 github.com/twmb/franz-go/pkg/kadm

From cce73771bce422e3c42d778bd33e9cb86adb39c6 Mon Sep 17 00:00:00 2001
From: Ben Clive
Date: Tue, 15 Oct 2024 12:58:45 +0100
Subject: [PATCH 2/4] docs

---
 docs/sources/shared/configuration.md | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md
index 132de42b8107..a161232ab930 100644
--- a/docs/sources/shared/configuration.md
+++ b/docs/sources/shared/configuration.md
@@ -793,6 +793,16 @@ kafka_config:
   # CLI flag: -kafka.write-timeout
   [write_timeout: <duration> | default = 10s]
 
+  # The SASL username for authentication to Kafka using the PLAIN mechanism.
+  # Both username and password must be set.
+  # CLI flag: -kafka.sasl-username
+  [sasl_username: <string> | default = ""]
+
+  # The SASL password for authentication to Kafka using the PLAIN mechanism.
+  # Both username and password must be set.
+  # CLI flag: -kafka.sasl-password
+  [sasl_password: <string> | default = ""]
+
   # The consumer group used by the consumer to track the last consumed offset.
   # The consumer group must be different for each ingester.
If the configured # consumer group contains the '' placeholder, it is replaced with From 1fc62ffaef82452de75fa49f345bd69255f2dc7e Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Tue, 15 Oct 2024 13:23:20 +0100 Subject: [PATCH 3/4] vendor --- .../twmb/franz-go/pkg/sasl/plain/plain.go | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 vendor/github.com/twmb/franz-go/pkg/sasl/plain/plain.go diff --git a/vendor/github.com/twmb/franz-go/pkg/sasl/plain/plain.go b/vendor/github.com/twmb/franz-go/pkg/sasl/plain/plain.go new file mode 100644 index 000000000000..97a9369d1372 --- /dev/null +++ b/vendor/github.com/twmb/franz-go/pkg/sasl/plain/plain.go @@ -0,0 +1,60 @@ +// Package plain provides PLAIN sasl authentication as specified in RFC4616. +package plain + +import ( + "context" + "errors" + + "github.com/twmb/franz-go/pkg/sasl" +) + +// Auth contains information for authentication. +type Auth struct { + // Zid is an optional authorization ID to use in authenticating. + Zid string + + // User is username to use for authentication. + User string + + // Pass is the password to use for authentication. + Pass string + + _ struct{} // require explicit field initialization +} + +// AsMechanism returns a sasl mechanism that will use 'a' as credentials for +// all sasl sessions. +// +// This is a shortcut for using the Plain function and is useful when you do +// not need to live-rotate credentials. +func (a Auth) AsMechanism() sasl.Mechanism { + return Plain(func(context.Context) (Auth, error) { + return a, nil + }) +} + +// Plain returns a sasl mechanism that will call authFn whenever sasl +// authentication is needed. The returned Auth is used for a single session. +func Plain(authFn func(context.Context) (Auth, error)) sasl.Mechanism { + return plain(authFn) +} + +type plain func(context.Context) (Auth, error) + +func (plain) Name() string { return "PLAIN" } +func (fn plain) Authenticate(ctx context.Context, _ string) (sasl.Session, []byte, error) { + auth, err := fn(ctx) + if err != nil { + return nil, nil, err + } + if auth.User == "" || auth.Pass == "" { + return nil, nil, errors.New("PLAIN user and pass must be non-empty") + } + return session{}, []byte(auth.Zid + "\x00" + auth.User + "\x00" + auth.Pass), nil +} + +type session struct{} + +func (session) Challenge([]byte) (bool, []byte, error) { + return true, nil, nil +} From 83bc405c8512ba0908b4d30b14a81d484ea34ae9 Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Tue, 15 Oct 2024 16:32:09 +0100 Subject: [PATCH 4/4] fix tests --- pkg/kafka/client/reader_client.go | 2 +- pkg/kafka/partition/reader_test.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/kafka/client/reader_client.go b/pkg/kafka/client/reader_client.go index 9cf333ec7272..e8bbb2da8c86 100644 --- a/pkg/kafka/client/reader_client.go +++ b/pkg/kafka/client/reader_client.go @@ -58,8 +58,8 @@ func setDefaultNumberOfPartitionsForAutocreatedTopics(cfg kafka.Config, cl *kgo. 
return } + // Note: this client doesn't get closed because it is owned by the caller adm := kadm.NewClient(cl) - defer adm.Close() defaultNumberOfPartitions := fmt.Sprintf("%d", cfg.AutoCreateTopicDefaultPartitions) _, err := adm.AlterBrokerConfigsState(context.Background(), []kadm.AlterConfig{ diff --git a/pkg/kafka/partition/reader_test.go b/pkg/kafka/partition/reader_test.go index 38a929285b2c..dfd653de78e3 100644 --- a/pkg/kafka/partition/reader_test.go +++ b/pkg/kafka/partition/reader_test.go @@ -59,7 +59,7 @@ func (m *mockConsumer) Flush(ctx context.Context) error { } func TestPartitionReader_BasicFunctionality(t *testing.T) { - _, kafkaCfg := testkafka.CreateCluster(t, 1, "test-topic") + _, kafkaCfg := testkafka.CreateCluster(t, 1, "test") consumer := newMockConsumer() consumerFactory := func(_ Committer) (Consumer, error) { @@ -83,8 +83,8 @@ func TestPartitionReader_BasicFunctionality(t *testing.T) { require.NoError(t, err) require.Len(t, records, 1) - producer.ProduceSync(context.Background(), records...) - producer.ProduceSync(context.Background(), records...) + require.NoError(t, producer.ProduceSync(context.Background(), records...).FirstErr()) + require.NoError(t, producer.ProduceSync(context.Background(), records...).FirstErr()) // Wait for records to be processed assert.Eventually(t, func() bool {