Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Kafka SASL OAUTH token refreshing #2834

Merged
merged 13 commits into from
Sep 26, 2024
51 changes: 35 additions & 16 deletions etc/kapacitor/kapacitor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ default-retention-policy = ""
# Password for basic user authorization when using meta API. meta-username must also be set.
# meta-password = "kapapass"

# Shared secret for JWT bearer token authentication when using meta API.
# Shared secret for JWT bearer token authentication when using meta API.
# If this is set, then the `meta-username` and `meta-password` settings are ignored.
# This should match the `[meta] internal-shared-secret` setting on the meta nodes.
# meta-internal-shared-secret = "MyVoiceIsMyPassport"
Expand Down Expand Up @@ -573,27 +573,46 @@ default-retention-policy = ""
# Use SSL but skip chain & host verification
insecure-skip-verify = false
## Optional SASL Config
# sasl_username = "kafka"
# sasl_password = "secret"
# sasl-username = "kafka"
# sasl-password = "secret"
## Arbitrary key value string pairs to pass as a TOML table. For example:
# {logicalCluster = "cluster-042", poolId = "pool-027"}
# sasl-extensions = {}
## Optional SASL:
## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
## (defaults to PLAIN)
# sasl_mechanism = ""
# sasl-mechanism = ""
## used if sasl_mechanism is GSSAPI
# sasl_gssapi_service_name = ""
# sasl-gssapi-service-name = ""
# ## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH
# sasl_gssapi_auth_type = "KRB5_USER_AUTH"
# sasl_gssapi_kerberos_config_path = "/"
# sasl_gssapi_realm = "realm"
# sasl_gssapi_key_tab_path = ""
# sasl_gssapi_disable_pafxfast = false
## Access token used if sasl_mechanism is OAUTHBEARER
# sasl_access_token = ""
## Arbitrary key value string pairs to pass as a TOML table. For example:
# {logicalCluster = "cluster-042", poolId = "pool-027"}
# sasl_extensions = {}
# sasl-gssapi-auth-type = "KRB5_USER_AUTH"
# sasl-gssapi-kerberos-config-path = "/"
# sasl-gssapi-realm = "realm"
# sasl-gssapi-key-tab-path = ""
# sasl-gssapi-disable-pafxfast = false
## Options if sasl-mechanism is OAUTHBEARER
## The service name to use when authenticating with SASL/OAUTH.
# ## One of: "" or custom, auth0, azuread
# sasl-oauth-service = ""
## The client ID to use when authenticating with SASL/OAUTH.
# sasl-oauth-client-id = ""
## The client secret to use when authenticating with SASL/OAUTH.
# sasl-oauth-client-secret = ""
## The token URL to use when sasl-oauth-service is custom or auth0. Leave empty otherwise.
# sasl-oauth-token-url = ""
## The margin for the token's expiration time.
# sasl-oauth-token-expiry-margin = "10s"
## Optional scopes to use when authenticating with SASL/OAUTH.
# sasl-oauth-scopes = ""
## Tenant ID for the AzureAD service.
# sasl-oauth-tenant-id = ""
## The optional params for SASL/OAUTH. e.g. audience for AUTH0
[kafka.sasl-oauth-parameters]
# audience = ""
## Static OAUTH token. Use this instead of other OAUTH params.
# sasl-access-token = ""
## SASL protocol version. When connecting to Azure EventHub set to 0.
# sasl_version = 1
# sasl-version = 1

[alerta]
# Configure Alerta.
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ require (
cloud.google.com/go/bigquery v1.50.0 // indirect
cloud.google.com/go/bigtable v1.10.1 // indirect
cloud.google.com/go/compute v1.19.1 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/compute/metadata v0.3.0 // indirect
cloud.google.com/go/iam v0.13.0 // indirect
cloud.google.com/go/longrunning v0.4.1 // indirect
collectd.org v0.3.0 // indirect
Expand Down Expand Up @@ -242,7 +242,7 @@ require (
golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/oauth2 v0.7.0 // indirect
golang.org/x/oauth2 v0.23.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.23.0 // indirect
golang.org/x/term v0.23.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ cloud.google.com/go/compute v1.19.1 h1:am86mquDUgjGNWxiGn+5PGLbmgiWXlE/yNWpIpNvu
cloud.google.com/go/compute v1.19.1/go.mod h1:6ylj3a05WF8leseCdIf77NK0g1ey+nj5IKd5/kvShxE=
cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY=
cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA=
cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc=
cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
cloud.google.com/go/datacatalog v1.13.0 h1:4H5IJiyUE0X6ShQBqgFFZvGGcrwGVndTwUSLP4c52gw=
cloud.google.com/go/datacatalog v1.13.0/go.mod h1:E4Rj9a5ZtAxcQJlEBTLgMTphfP11/lNaAshpoBgemX8=
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
Expand Down Expand Up @@ -1623,6 +1625,8 @@ golang.org/x/oauth2 v0.0.0-20210427180440-81ed05c6b58c/go.mod h1:KelEdhl1UZF7XfJ
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.7.0 h1:qe6s0zUXlPX80/dITx3440hWZ7GwMwgDDyrSGTPJG/g=
golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4=
golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs=
golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down
34 changes: 23 additions & 11 deletions services/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import (
)

const (
DefaultTimeout = 10 * time.Second
DefaultBatchSize = 100
DefaultBatchTimeout = 1 * time.Second
DefaultID = "default"
DefaultTimeout = 10 * time.Second
DefaultBatchSize = 100
DefaultBatchTimeout = 1 * time.Second
DefaultID = "default"
DefaultSASLOAUTHExpiryMargin = 10 * time.Second
)

type Config struct {
Expand Down Expand Up @@ -49,7 +50,7 @@ type Config struct {
}

func NewConfig() Config {
return Config{ID: DefaultID}
return Config{ID: DefaultID, SASLAuth: SASLAuth{SASLOAUTHExpiryMargin: DefaultSASLOAUTHExpiryMargin}}
}

func (c Config) Validate() error {
Expand All @@ -63,7 +64,7 @@ func (c Config) Validate() error {
if len(c.Brokers) == 0 {
return errors.New("no brokers specified, must provide at least one broker URL")
}
return nil
return c.SASLAuth.Validate()
}

func (c *Config) ApplyConditionalDefaults() {
Expand All @@ -78,17 +79,27 @@ func (c *Config) ApplyConditionalDefaults() {
}
}

type Closer interface {
Close()
}

type WriterConfig struct {
// additional resource to close
Closer Closer
Config *kafka.Config
}

type WriteTarget struct {
Topic string
PartitionById bool
PartitionAlgorithm string
}

func (c Config) writerConfig(diagnostic Diagnostic, target WriteTarget) (*kafka.Config, error) {
func (c Config) writerConfig(target WriteTarget) (*WriterConfig, error) {
cfg := kafka.NewConfig()

if target.Topic == "" {
return cfg, errors.New("topic must not be empty")
return &WriterConfig{nil, cfg}, errors.New("topic must not be empty")
}
var partitioner kafka.PartitionerConstructor
if target.PartitionById {
Expand All @@ -104,7 +115,7 @@ func (c Config) writerConfig(diagnostic Diagnostic, target WriteTarget) (*kafka.
case "fnv-1a":
partitioner = kafka.NewHashPartitioner
default:
return cfg, fmt.Errorf("invalid partition algorithm: %q", target.PartitionAlgorithm)
return &WriterConfig{nil, cfg}, fmt.Errorf("invalid partition algorithm: %q", target.PartitionAlgorithm)
}
cfg.Producer.Partitioner = partitioner
}
Expand Down Expand Up @@ -135,10 +146,11 @@ func (c Config) writerConfig(diagnostic Diagnostic, target WriteTarget) (*kafka.
cfg.Producer.Flush.Frequency = time.Duration(c.BatchTimeout)

// SASL
if err := c.SASLAuth.SetSASLConfig(cfg); err != nil {
if o, err := c.SASLAuth.SetSASLConfig(cfg); err != nil {
return nil, err
} else {
return &WriterConfig{o, cfg}, cfg.Validate()
}
return cfg, cfg.Validate()
}

type Configs []Config
Expand Down
Loading