aws_msk_iam: AWS MSK IAM full endpoints support#11274
aws_msk_iam: AWS MSK IAM full endpoints support#11274kalavt wants to merge 14 commits intofluent:masterfrom
Conversation
Implements the core AWS MSK IAM authentication mechanism including: - OAuth callback mechanism for token generation and refresh - Token lifecycle management and expiration handling - Integration with AWS credential providers - SASL/OAUTHBEARER protocol support for librdkafka This provides the foundation for AWS MSK IAM authentication support in Fluent Bit's Kafka plugins. Signed-off-by: Arbin <arbin.cheng@coins.ph>
Enhance EC2 credential provider to better support MSK IAM authentication by improving credential refresh behavior and lifecycle management. Signed-off-by: Arbin <arbin.cheng@coins.ph>
Enhance profile credential provider to better support MSK IAM authentication by improving credential refresh behavior and lifecycle management. Signed-off-by: Arbin <arbin.cheng@coins.ph>
Enhance STS credential provider to better support MSK IAM authentication by improving credential refresh behavior and lifecycle management. Signed-off-by: Arbin <arbin.cheng@coins.ph>
Update Kafka core functionality to support AWS MSK IAM authentication, including necessary configuration and lifecycle improvements. Signed-off-by: Arbin <arbin.cheng@coins.ph>
Enable AWS MSK IAM authentication in the Kafka input plugin: - Add AWS MSK IAM configuration options - Integrate with OAuth callback mechanism - Support automatic credential refresh - Add TLS configuration for secure connections Signed-off-by: Arbin <arbin.cheng@coins.ph>
Enable AWS MSK IAM authentication in the Kafka output plugin: - Add AWS MSK IAM configuration options - Integrate with OAuth callback mechanism - Support automatic credential refresh - Add TLS configuration for secure connections Signed-off-by: Arbin <arbin.cheng@coins.ph>
Add NULL checks after flb_sds_create() when allocating SASL mechanism strings to prevent crashes on allocation failure. This covers both the initial SASL mechanism configuration and the AWS MSK IAM OAUTHBEARER conversion. Signed-off-by: Arbin <arbin.cheng@coins.ph>
Replace pointer comparison with offset comparison in VPC endpoint detection to improve safety and clarity. Changes 'p >= broker + 5' to 'p - broker >= 5' to properly check offset within string bounds before accessing p - 5. Signed-off-by: Arbin <arbin.cheng@coins.ph>
- Remove is_serverless detection logic - Use actual broker hostname instead of constructed host - Fix memory leak in error cleanup path - Add broker_host field to store actual hostname - Update function signature to accept optional region parameter This aligns with official AWS MSK IAM signers behavior where the signature Host must match the TLS SNI/actual connection host. Signed-off-by: Arbin <arbin.cheng@coins.ph>
- Add aws_region configuration field - Remove hostname pattern check for MSK IAM registration - Pass aws_region to MSK IAM registration function - Support PrivateLink and custom DNS scenarios Signed-off-by: Arbin <arbin.cheng@coins.ph>
- Add aws_region configuration field - Remove hostname pattern check for MSK IAM registration - Pass aws_region to MSK IAM registration function - Support PrivateLink and custom DNS scenarios Signed-off-by: Arbin <arbin.cheng@coins.ph>
- Add comprehensive MSK IAM configuration examples - Cover Standard MSK, Serverless, PrivateLink scenarios - Document aws_region parameter usage - Add troubleshooting guide and IAM permissions - Update README with detailed usage instructions Signed-off-by: Arbin <arbin.cheng@coins.ph>
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. WalkthroughRefactors AWS MSK IAM support to use broker host(s) + region (instead of cluster ARN), adds broker-region auto-detection, TLS-backed credential provider with mutex protection, fixed 5-minute OAuth token TTL and background refresh, integrates OAUTHBEARER handling in Kafka plugins, and adds example docs/configs for MSK scenarios. Changes
Sequence Diagram(s)sequenceDiagram
participant FB as Fluent Bit
participant Plugin as Kafka Plugin (in/out)
participant rdk as librdkafka
participant MSK as MSK IAM Handler
participant Prov as AWS Credential Provider
participant Broker as MSK Broker
FB->>Plugin: init (brokers[, region])
Plugin->>MSK: register OAuth callback (brokers, region)
MSK->>MSK: parse first broker_host, detect/derive region
MSK->>Prov: create TLS-enabled provider
Plugin->>rdk: set sasl.mechanism=OAUTHBEARER, enable SASL queue/background callbacks
rect rgb(220,235,255)
Plugin->>MSK: request initial token
MSK->>MSK: lock provider
MSK->>Prov: get credentials
Prov-->>MSK: creds
MSK->>MSK: build signed payload (SigV4)
MSK->>rdk: provide OAuth bearer token (5m TTL)
MSK->>MSK: unlock provider
end
rect rgb(220,255,220)
rdk->>MSK: background token refresh callback
MSK->>MSK: lock provider
MSK->>Prov: refresh credentials
Prov-->>MSK: creds
MSK->>MSK: rebuild payload
MSK->>rdk: update bearer token
MSK->>MSK: unlock provider
end
rdk->>Broker: authenticate with bearer token
Broker-->>rdk: auth success
rdk-->>Plugin: connection ready
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes
Suggested reviewers
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (5)
examples/kafka_filter/kafka_msk_iam.conf (1)
5-141: Comprehensive MSK IAM example configThe four scenarios (standard, PrivateLink/custom DNS, Serverless, VPC endpoint) correctly demonstrate
rdkafka.sasl.mechanism aws_msk_iamand whenaws_regionmust be set. The tagging andMatchusage are sensible. You might optionally add a brief note at the top suggesting users enable only the scenario they’re testing to avoid running all four pipelines at once, but functionally this example looks solid.include/fluent-bit/aws/flb_aws_msk_iam.h (1)
39-51: Header/API update matches broker/region-based MSK IAM flowThe updated
flb_aws_msk_iam_register_oauth_cbsignature and parameter docs correctly reflect the new behavior: using the Kafka opaque to hold MSK IAM context, taking brokers for region extraction, and accepting an optional explicitregion. The prototype matches the implementation inflb_aws_msk_iam.c.Since this is a public header, make sure any out-of-tree callers are updated or the change is mentioned in release notes.
plugins/out_kafka/kafka_config.c (1)
220-251: Condition on line 222-223 is redundant.Since
ctx->aws_msk_iamis only set toFLB_TRUEwhen the mechanism isaws_msk_iam(which is then translated toOAUTHBEARER), checking bothctx->aws_msk_iamandstrcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0is redundant. However, this is a defensive check and not harmful.The condition could be simplified, but the current defensive approach is acceptable:
- if (ctx->aws_msk_iam && ctx->sasl_mechanism && - strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) { + if (ctx->aws_msk_iam) {plugins/in_kafka/in_kafka.c (1)
368-400: Condition on line 370-371 is redundant (same as out_kafka).Same defensive but redundant check pattern as out_kafka.
src/aws/flb_aws_msk_iam.c (1)
551-589: Consider reducing lock duration for better concurrency.The current implementation holds the mutex during
refresh()andget_credentials(). While this is correct for thread safety, therefresh()call may involve network I/O to AWS metadata endpoints, potentially blocking other threads.Consider a pattern where the lock is held only during
get_credentials()after a separate refresh cycle, or using a try-refresh approach that doesn't block on network I/O:/* * CRITICAL CONCURRENCY FIX: - * Lock the credential provider to prevent race conditions. - * The librdkafka refresh callback executes in its internal thread context, - * while Fluent Bit may access the same provider from other threads. - * Without synchronization, concurrent refresh/get_credentials calls can - * corrupt provider state and cause authentication failures. + * Lock the credential provider during access. + * Note: refresh() may involve network I/O. If this becomes a bottleneck, + * consider implementing async refresh with cached credentials fallback. */ pthread_mutex_lock(&config->lock);This is acceptable for now but worth monitoring in high-concurrency scenarios.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
examples/kafka_filter/README.md(1 hunks)examples/kafka_filter/kafka_msk_iam.conf(1 hunks)include/fluent-bit/aws/flb_aws_msk_iam.h(1 hunks)plugins/in_kafka/in_kafka.c(5 hunks)plugins/in_kafka/in_kafka.h(1 hunks)plugins/out_kafka/kafka.c(1 hunks)plugins/out_kafka/kafka_config.c(4 hunks)plugins/out_kafka/kafka_config.h(1 hunks)src/aws/flb_aws_credentials_ec2.c(1 hunks)src/aws/flb_aws_credentials_profile.c(1 hunks)src/aws/flb_aws_credentials_sts.c(2 hunks)src/aws/flb_aws_msk_iam.c(11 hunks)src/flb_kafka.c(1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.
Applied to files:
src/aws/flb_aws_msk_iam.c
🧬 Code graph analysis (1)
include/fluent-bit/aws/flb_aws_msk_iam.h (1)
src/aws/flb_aws_msk_iam.c (1)
flb_aws_msk_iam_register_oauth_cb(634-801)
🔇 Additional comments (29)
src/aws/flb_aws_credentials_ec2.c (1)
128-139: Improves code consistency through formatting alignment.The blank line inserted after the debug log statement in
refresh_fn_ec2aligns its formatting with the similarinit_fn_ec2function, improving code consistency and readability without affecting functional behavior.src/aws/flb_aws_credentials_sts.c (2)
177-178: Formatting change - no review needed.
482-483: Formatting change - no review needed.src/flb_kafka.c (1)
96-100: Correct destructor forrd_kafka_conf_tin error pathSwitching from
flb_free(kafka_cfg)tord_kafka_conf_destroy(kafka_cfg)correctly uses librdkafka’s API and avoids undefined behavior on init failure. Allgoto errpaths occur beforekafka->brokersis assigned, so there’s no new leak.Please double-check other rd_kafka config lifecycles to ensure
rd_kafka_conf_destroyis consistently used wherever ard_kafka_conf_tis discarded without being passed tord_kafka_new().src/aws/flb_aws_credentials_profile.c (1)
664-673: Downgrade ENOENT on shared credentials file to debug-only logLowering the ENOENT case to
AWS_CREDS_DEBUGmatchesget_shared_config_credentials()behavior and avoids duplicate error-level logging when the credentials file is simply absent. The function still returns-1, so callers can treat missing credentials as a failure when appropriate.examples/kafka_filter/README.md (1)
31-132: MSK IAM README aligns with newaws_regionsemanticsThe README’s MSK IAM sections look consistent with the implementation: enabling IAM via
rdkafka.sasl.mechanism aws_msk_iam, auto-detecting region from standard broker hostnames, and requiringaws_regionfor PrivateLink/custom DNS are all clearly documented. The parameter table matches the introducedaws_regionoption.plugins/in_kafka/in_kafka.h (1)
57-61: Consistent MSK IAM fields for Kafka inputAdding
aws_msk_iamandaws_regionalongsidemsk_iamunderFLB_HAVE_AWS_MSK_IAMcleanly exposes the same knobs as the output path and keeps AWS-specific state encapsulated in the Kafka config struct.plugins/out_kafka/kafka_config.h (1)
128-132: Output Kafka MSK IAM fields align with new configuration modelThe added
aws_msk_iamflag andaws_regionpointer, alongsidemsk_iam, give the output plugin the same MSK IAM state as the input side and match the newaws_regionconfig option. This layout will integrate cleanly with the updatedflb_aws_msk_iam_register_oauth_cbcallsites.plugins/out_kafka/kafka.c (1)
673-681:aws_regionconfig option for Kafka output correctly wired into config mapThe new
aws_regionentry is properly gated onFLB_HAVE_AWS_MSK_IAM, usesFLB_CONFIG_MAP_STRwith an offset toaws_region, and its description matches the intended behavior (auto-detect for standard endpoints, required for custom DNS/PrivateLink). This aligns with the README and the updated MSK IAM registration API.Please confirm that
flb_out_kafka_create()(or equivalent init path) passesctx->aws_regiontogether with the configured brokers intoflb_aws_msk_iam_register_oauth_cbso the new option is actually honored.plugins/out_kafka/kafka_config.c (5)
61-87: LGTM! SASL mechanism handling is well-structured.The logic correctly:
- Retrieves and stores the SASL mechanism early
- Translates
aws_msk_iamtoOAUTHBEARERfor librdkafka- Sets a sensible default security protocol (
SASL_SSL)- Properly destroys and recreates
ctx->sasl_mechanismafter translation
209-218: LGTM! SASL queue enablement for background token refresh.Enabling the SASL queue for OAUTHBEARER is essential for background token refresh on idle connections. The check for
ctx->sasl_mechanismbeforestrcasecmpprevents null pointer dereference.
254-268: LGTM! Correct ownership semantics for rd_kafka_conf.Setting
ctx->conf = NULLafter successfulrd_kafka_new()correctly reflects that librdkafka now owns the configuration. The comment on failure path clarifies thatctx->confremains valid for cleanup.
270-287: LGTM! SASL background callbacks enabled after producer creation.Enabling background callbacks post-creation is the correct order. The warning (not error) on failure is appropriate since token refresh will still work via
rd_kafka_poll().
346-353: LGTM! Proper configuration cleanup in destroy path.The destroy function now correctly handles the case where
rd_kafka_new()failed andctx->confwas not transferred. This prevents a memory leak.plugins/in_kafka/in_kafka.c (5)
271-307: LGTM! SASL mechanism handling with proper error checking.Unlike out_kafka, this implementation properly checks if
flb_sds_createfails and returns early. The logic correctly translatesaws_msk_iamtoOAUTHBEARERand sets the security protocol default.
357-366: LGTM! SASL queue enabled for OAUTHBEARER.Consistent with out_kafka implementation.
406-435: LGTM! Correct ownership handling and background callback setup.Setting
kafka_conf = NULLafter successfulrd_kafka_new()correctly reflects ownership transfer. Background callbacks are properly enabled with appropriate warning on failure.
497-521: LGTM! Comprehensive error path cleanup.The error path properly handles:
- Topic partition list destruction
- Kafka handle destruction (which destroys the conf it owns)
- Manual conf destruction if
rd_kafka_new()never took ownership- MSK IAM context destruction
- Opaque and SASL mechanism cleanup
614-622: LGTM! aws_region configuration documented clearly.The config map entry clearly documents:
- Auto-detection from standard MSK endpoints
- Requirement for custom DNS/PrivateLink scenarios
src/aws/flb_aws_msk_iam.c (10)
42-55: LGTM! Well-documented token lifetime and struct additions.The 5-minute token lifetime matches AWS SDK implementations. The struct cleanly encapsulates all MSK IAM state including the mutex for thread safety.
214-268: LGTM! Clean input validation in payload generation.The function properly validates:
- Region is set and non-empty
- Host is provided
- Credentials are complete (access key and secret key)
The credential parameter change (receiving creds directly instead of creating provider internally) improves separation of concerns.
296-329: LGTM! Query string construction is correct.The canonical query string follows AWS SigV4 requirements with properly encoded parameters and optional session token handling.
432-478: LGTM! Proper Base64 URL encoding with padding removal.The conversion from standard Base64 to Base64 URL encoding (replacing
+with-and/with_) and padding removal follows the MSK IAM protocol requirements.
480-512: LGTM! Comprehensive cleanup in both success and error paths.All allocated resources are properly freed in both the success path (lines 480-493) and error path (lines 496-511). Setting
empty_payload_hex = NULLafter early destruction prevents double-free.
600-618: LGTM! Token lifetime and credential cleanup are correct.The 5-minute token lifetime constant is used correctly. Credentials are properly destroyed after use, preventing memory leaks.
633-719: LGTM! Robust broker parsing and region handling.The registration function:
- Properly extracts the first broker from a comma-separated list
- Strips the port from the hostname
- Supports both explicit region configuration and auto-detection
- Has clear error messages guiding users to set
aws_regionwhen auto-detection failsThe cleanup on error paths is thorough.
803-829: LGTM! Comprehensive destroy function.The destroy function properly cleans up all resources in reverse allocation order:
- Provider
- TLS context
- Region string
- Broker host string
- Mutex
- Context itself
137-212: Remove this concern—the edge case cannot occur in practice.The proposed scenario (broker string with both
b-prefix and.vpce.amazonaws.com) is not possible because AWS uses distinct naming conventions for different endpoint types. Standard MSK brokers (b-Xformat) do not include.vpcein the hostname, and VPC endpoints (which do include.vpce) always start withvpce-, neverb-. The function's VPC endpoint detection logic at line 180 is safe and only activates for VPC endpoint hostnames.
738-788: TLS and provider initialization order is correct.The sync() → init() → async() sequence is the standard pattern used across Fluent Bit's AWS output plugins (Elasticsearch, BigQuery, Kinesis Firehose). The sync() call sets the provider to synchronous mode before initialization, init() initializes the provider chain, and async() returns it to asynchronous operation. This matches the established pattern in the codebase.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
52b19c7 to
367efef
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/aws/flb_aws_msk_iam.c (1)
551-558: CRITICAL: OAuth token still signed with first broker hostname only.The implementation still generates tokens signed with
config->broker_host(the first broker from the comma-separated list, set once during registration at lines 669-679). This is the same issue flagged in the previous review comment.Impact: When librdkafka connects to brokers other than the first one (normal in multi-broker clusters after metadata refresh or leader changes), the SigV4
Hostheader in the OAuth token won't match the actual broker hostname, causing authentication failures.Root cause: librdkafka's OAUTHBEARER implementation uses a single global token across all broker connections, but MSK IAM requires the Host header in the SigV4 signature to match the actual broker being connected to.
This is a fundamental design limitation that cannot be fixed with the current approach of generating a single token for all brokers.
Recommended solutions:
Per-broker tokens (requires librdkafka changes): This would require librdkafka to support per-broker OAUTHBEARER tokens, which is not currently available.
Use connection interceptor: Implement a librdkafka interceptor that can modify the authentication per connection. This is complex but possible.
Document limitation: If fixing is not feasible, clearly document that MSK IAM only works reliably with single-broker configurations or when all brokers share the same hostname pattern.
Investigate librdkafka per-broker auth: Check if newer librdkafka versions support per-broker authentication callbacks.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
examples/kafka_filter/README.md(1 hunks)examples/kafka_filter/kafka_msk_iam.conf(1 hunks)include/fluent-bit/aws/flb_aws_msk_iam.h(1 hunks)plugins/in_kafka/in_kafka.c(5 hunks)plugins/in_kafka/in_kafka.h(1 hunks)plugins/out_kafka/kafka.c(1 hunks)plugins/out_kafka/kafka_config.c(4 hunks)plugins/out_kafka/kafka_config.h(1 hunks)src/aws/flb_aws_msk_iam.c(11 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- plugins/out_kafka/kafka.c
- examples/kafka_filter/kafka_msk_iam.conf
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.
Applied to files:
src/aws/flb_aws_msk_iam.c
🧬 Code graph analysis (3)
plugins/in_kafka/in_kafka.c (2)
include/fluent-bit/flb_mem.h (1)
flb_free(126-128)src/aws/flb_aws_msk_iam.c (2)
flb_aws_msk_iam_register_oauth_cb(634-801)flb_aws_msk_iam_destroy(804-830)
src/aws/flb_aws_msk_iam.c (3)
src/aws/flb_aws_credentials.c (2)
flb_standard_chain_provider_create(269-327)flb_aws_provider_destroy(769-791)src/tls/flb_tls.c (2)
flb_tls_create(183-232)flb_tls_destroy(258-277)src/flb_kafka.c (1)
flb_kafka_opaque_set(242-256)
include/fluent-bit/aws/flb_aws_msk_iam.h (1)
src/aws/flb_aws_msk_iam.c (1)
flb_aws_msk_iam_register_oauth_cb(634-801)
🔇 Additional comments (24)
include/fluent-bit/aws/flb_aws_msk_iam.h (1)
47-51: LGTM! API signature updated to support broker-based region detection.The function signature change from
cluster_arntobrokersandregionparameters aligns with the PR's goal of supporting custom DNS/PrivateLink scenarios. The documentation clearly indicates region auto-detection behavior.src/aws/flb_aws_msk_iam.c (8)
42-46: LGTM! Token lifetime constant follows AWS industry standard.The 5-minute token lifetime aligns with AWS SDK implementations and is appropriate for OAuth token refresh patterns.
48-55: LGTM! Enhanced struct supports TLS-based credentials and thread safety.The addition of
cred_tls,provider, andlockfields enables proper credential lifecycle management and concurrent access protection.
561-589: LGTM! Proper mutex protection prevents credential provider race conditions.The mutex lock correctly protects the credential provider's refresh and get_credentials operations, preventing corruption from concurrent access by librdkafka's internal threads and Fluent Bit's threads.
600-607: LGTM! Fixed token lifetime with automatic refresh.The 5-minute token lifetime with librdkafka's automatic 80% refresh (4 minutes) ensures tokens never expire on idle connections.
497-510: LGTM! Comprehensive error cleanup prevents memory leaks.All allocated resources are properly cleaned up in the error path, addressing the memory leak mentioned in the PR objectives.
737-788: LGTM! Proper TLS and provider lifecycle with comprehensive error handling.The TLS instance and AWS credentials provider are correctly initialized with proper cleanup in all error paths. The mutex initialization ensures thread-safe credential access.
803-830: LGTM! Complete resource cleanup in destroy function.All resources (provider, TLS, region, broker_host, mutex) are properly destroyed, preventing memory leaks.
137-212: Region extraction logic correctly handles all documented MSK endpoint formats.The implementation properly handles Standard, Serverless, and VPC Endpoint formats with comprehensive edge case handling including port removal, format detection, and region length validation. The logic aligns with the documented endpoint format specifications.
plugins/in_kafka/in_kafka.h (1)
58-61: LGTM! Struct updated to support region-aware MSK IAM configuration.The addition of
aws_msk_iamflag andaws_regionfield, along with removal of deprecatedaws_msk_iam_cluster_arn, aligns with the new broker-based authentication approach.plugins/out_kafka/kafka_config.h (1)
129-132: LGTM! Output plugin struct updated consistently with input plugin.The struct changes match the input plugin's configuration surface for MSK IAM authentication.
plugins/out_kafka/kafka_config.c (6)
61-87: LGTM! Robust SASL mechanism handling with MSK IAM activation.The code properly:
- Retrieves and stores SASL mechanism early
- Detects
aws_msk_iamand switches to OAUTHBEARER- Ensures SASL_SSL security protocol is set
- Includes allocation failure checks for the mechanism string
209-218: LGTM! SASL queue enables background token refresh for all OAUTHBEARER methods.Enabling the SASL queue allows librdkafka to handle OAuth token refresh in a background thread, which is essential for idle connections. This benefits all OAUTHBEARER implementations, not just MSK IAM.
220-252: LGTM! Proper MSK IAM OAuth callback registration with error handling.The code correctly:
- Validates broker configuration is present before registering
- Passes broker list and region to the OAuth callback
- Handles registration failure by destroying context
- Configures
sasl.oauthbearer.configafter successful registration
255-268: LGTM! Clear ownership semantics for librdkafka configuration.The comments correctly document that
rd_kafka_new()takes ownership ofctx->confon success but not on failure. Settingctx->conf = NULLafter success prevents double-free.
270-287: LGTM! SASL background callbacks prevent token expiry on idle connections.Enabling background callbacks ensures OAuth tokens are automatically refreshed even when the connection is idle, preventing authentication failures. The error handling is appropriate (warn but continue).
350-353: LGTM! Proper cleanup when rd_kafka creation fails.If
rd_kafka_new()fails, the conf is still valid and must be destroyed manually. This prevents memory leaks in error paths.plugins/in_kafka/in_kafka.c (7)
271-307: LGTM! Consistent SASL mechanism handling with output plugin.The input plugin properly:
- Retrieves SASL mechanism early with allocation checks
- Detects and activates MSK IAM authentication
- Switches to OAUTHBEARER and ensures SASL_SSL protocol
- Includes comprehensive error handling
357-366: LGTM! SASL queue enabled for background token refresh.Consistent with the output plugin, enabling the SASL queue allows background token refresh for all OAUTHBEARER mechanisms.
368-400: LGTM! MSK IAM OAuth callback registration matches output plugin.The registration logic is consistent with the output plugin, including:
- Validation of broker configuration
- Proper error handling and cleanup
sasl.oauthbearer.configsetup
402-413: LGTM! Ownership semantics documented consistently.The comments and code correctly handle
rd_kafka_new()ownership transfer, matching the output plugin implementation.
415-435: LGTM! SASL background callbacks enabled for token refresh.Consistent with the output plugin, this ensures tokens refresh automatically on idle connections.
503-518: LGTM! Comprehensive error path cleanup.The init_error path properly:
- Destroys kafka_conf when rd_kafka wasn't created
- Cleans up MSK IAM resources
- Destroys opaque context and SASL mechanism string
614-622: LGTM! Configuration surface includes aws_region for custom DNS scenarios.The config map entry properly documents that
aws_regionis optional (auto-detected from standard MSK endpoints) but required for custom DNS/PrivateLink.
Remove service_host from struct flb_aws_msk_iam and construct it dynamically in OAuth callback. This eliminates data redundancy since service_host can be derived from region. Also clean up unused struct flb_msk_iam_cb definition and use flb_sds_len() instead of strlen() for consistency. Signed-off-by: Arbin <arbin.cheng@coins.ph>
There was a problem hiding this comment.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
include/fluent-bit/aws/flb_aws_msk_iam.h(1 hunks)src/aws/flb_aws_msk_iam.c(12 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.
Applied to files:
src/aws/flb_aws_msk_iam.c
🧬 Code graph analysis (2)
src/aws/flb_aws_msk_iam.c (3)
src/aws/flb_aws_credentials.c (3)
flb_aws_credentials_destroy(752-767)flb_standard_chain_provider_create(269-327)flb_aws_provider_destroy(769-791)src/tls/flb_tls.c (2)
flb_tls_create(183-232)flb_tls_destroy(258-277)src/flb_kafka.c (1)
flb_kafka_opaque_set(242-256)
include/fluent-bit/aws/flb_aws_msk_iam.h (1)
src/aws/flb_aws_msk_iam.c (1)
flb_aws_msk_iam_register_oauth_cb(627-767)
🔇 Additional comments (7)
include/fluent-bit/aws/flb_aws_msk_iam.h (1)
33-45: LGTM! API signature updated to support broker-based region detection.The updated signature replacing
cluster_arnwithbrokersandregionparameters aligns with the PR's goal of supporting PrivateLink aliases and custom DNS names. The documentation clearly explains the region auto-detection behavior.src/aws/flb_aws_msk_iam.c (6)
31-31: LGTM! Appropriate includes and standard token lifetime.The TLS and pthread includes support the new credential provider lifecycle and mutex protection. The 5-minute OAuth token lifetime is an industry standard and matches AWS SDK implementations.
Also applies to: 40-46
48-54: LGTM! Struct properly extended for TLS-backed credential lifecycle.The additions of
cred_tls,provider, andlockenable proper credential provider lifecycle management with thread-safe access from librdkafka's callback threads.
136-211: LGTM! Robust region extraction supporting multiple MSK endpoint formats.The function correctly handles standard MSK, MSK Serverless, and VPC Endpoint formats. The VPC endpoint special case (
.vpce.amazonaws.com) is properly handled, and the sanity check on region length (max 32 chars) prevents buffer issues.
495-510: LGTM! Comprehensive error path cleanup preventing leaks.The error path properly checks and frees all possible allocations including
credential,credential_enc,canonical,hexhash,string_to_sign,hexsig,query,action_enc,presigned_url,key,payload,session_token_enc, andempty_payload_hex. The defensive NULL assignments after strategicflb_sds_destroycalls (lines 351, 399) prevent double-free issues. This addresses the memory leak fix mentioned in the PR objectives.
626-767: LGTM! Excellent error handling with complete cleanup on all failure paths.The registration function properly validates inputs, extracts region (user-provided or auto-detected), and initializes the TLS-backed credential provider with mutex protection. Every error path correctly cleans up all previously allocated resources in reverse order:
- Lines 668-669, 680-683: Cleanup after region extraction failures
- Lines 698-699: Cleanup after context allocation failure
- Lines 714-716: Cleanup after TLS creation failure
- Lines 728-731: Cleanup after provider creation failure
- Lines 738-742: Cleanup after provider initialization failure
- Lines 749-753: Cleanup after mutex initialization failure
This addresses the memory leak fixes mentioned in the PR objectives.
769-792: LGTM! Proper cleanup order with appropriate NULL checks.The destroy function correctly cleans up resources in reverse order: provider, TLS context, region string, mutex, and finally the context itself. The NULL checks for
provider,cred_tls, andregionhandle partial initialization scenarios gracefully. The mutex is always initialized if the context exists (registration only returns non-NULL after successful mutex init), makingpthread_mutex_destroysafe to call.
| /* | ||
| * Construct service-level hostname for signing (kafka.{region}.amazonaws.com). | ||
| * This approach solves the multi-broker authentication issue since librdkafka's | ||
| * OAuth callback doesn't provide per-broker context. Using a consistent service | ||
| * hostname works for all brokers and supports PrivateLink/Custom DNS scenarios. | ||
| */ | ||
| snprintf(host, sizeof(host), "kafka.%s.amazonaws.com", config->region); | ||
|
|
||
| flb_debug("[aws_msk_iam] OAuth token refresh callback triggered for host: %s", host); | ||
|
|
||
| /* | ||
| * CRITICAL CONCURRENCY FIX: | ||
| * Lock the credential provider to prevent race conditions. | ||
| * The librdkafka refresh callback executes in its internal thread context, | ||
| * while Fluent Bit may access the same provider from other threads. | ||
| * Without synchronization, concurrent refresh/get_credentials calls can | ||
| * corrupt provider state and cause authentication failures. | ||
| */ | ||
| pthread_mutex_lock(&config->lock); | ||
|
|
||
| /* Refresh credentials */ | ||
| if (config->provider->provider_vtable->refresh(config->provider) < 0) { | ||
| pthread_mutex_unlock(&config->lock); | ||
| flb_warn("[aws_msk_iam] credential refresh failed, will retry on next callback"); | ||
| rd_kafka_oauthbearer_set_token_failure(rk, "credential refresh failed"); | ||
| return; | ||
| } | ||
| else { | ||
| snprintf(host, sizeof(host), "kafka.%s.amazonaws.com", config->region); | ||
| flb_info("[aws_msk_iam] Regular MSK cluster, using generic endpoint: %s", host); | ||
|
|
||
| /* Get credentials */ | ||
| creds = config->provider->provider_vtable->get_credentials(config->provider); | ||
| if (!creds) { | ||
| pthread_mutex_unlock(&config->lock); | ||
| flb_error("[aws_msk_iam] failed to get AWS credentials from provider"); | ||
| rd_kafka_oauthbearer_set_token_failure(rk, "credential retrieval failed"); | ||
| return; | ||
| } | ||
|
|
||
| flb_info("[aws_msk_iam] requesting MSK IAM payload for region: %s, host: %s", config->region, host); | ||
| /* Unlock immediately after getting credentials - no need to hold lock during payload generation */ | ||
| pthread_mutex_unlock(&config->lock); | ||
|
|
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
AWS MSK IAM authentication SigV4 signing hostname requirements broker endpoint
💡 Result:
Short answer — when using MSK IAM (SigV4) the signed hostname must be the actual broker DNS name (the broker FQDN returned by GetBootstrapBrokers / list-nodes), and that same name must match the broker TLS certificate CN/SAN. You cannot sign using localhost, an IP, or a custom domain that doesn’t match the broker certificate (or sit behind an NLB that rewrites the hostname) because the SigV4 payload includes the host and the broker will validate it. [1][2][3][4]
Key points:
- The IAM auth payload’s "host" must be the broker address (e.g. b-1...kafka..amazonaws.com) used for SigV4 signing. The aws-msk-iam-auth client builds a SigV4 JSON that contains that host. [3]
- MSK brokers present TLS certs that use the AWS-generated broker DNS names; TLS hostname validation requires the client name to match the cert CN/SAN. If you use a different hostname (custom domain / NLB / localhost) the TLS or IAM signature check will fail. [4][1]
- If you need a custom name you must ensure the custom name resolves to the broker IPs and matches the broker certs (or reconfigure to use a solution that preserves broker hostnames); otherwise IAM SigV4 authentication will be rejected. [4][2]
- Practical implication: clients must sign requests with the broker FQDN returned by GetBootstrapBrokers (or list-nodes endpoints) and use that same hostname for the TLS connection. [1][3]
Sources: AWS MSK bootstrap brokers docs; aws-msk-iam-auth GitHub; AWS blog + community answers showing NLB/custom-host issues. [1][3][4][2]
The service-level hostname approach will fail AWS MSK IAM authentication.
SigV4 signing must use the actual broker FQDN (e.g., b-1.<cluster>.<id>.kafka.<region>.amazonaws.com) returned by GetBootstrapBrokers, not a service-level hostname like kafka.{region}.amazonaws.com. The SigV4 payload includes the hostname, and AWS MSK brokers validate that it matches the actual broker endpoint and the TLS certificate CN/SAN. Using a service-level hostname will cause IAM authentication to be rejected by the broker.
The librdkafka OAuth callback receives the actual broker hostname as a parameter, which should be used directly for SigV4 signing instead of constructing a generic service-level hostname. Update the code to pass the actual broker endpoint to the credential provider for proper SigV4 signature generation.
This PR fixes MSK IAM authentication for AWS MSK clusters accessed through PrivateLink aliases or custom DNS names. The core issue was that the authentication logic required broker hostnames to match specific patterns (
.kafka.and.amazonaws.com), which failed for custom DNS configurations.Key Changes:
aws_regionconfiguration parameter - Allows manual region specification for custom DNS scenarios.amazonaws.comin broker addressesbroker_hostin error pathsSupported Scenarios:
b-1.xxx.kafka.region.amazonaws.com)boot-xxx.kafka-serverless.region.amazonaws.com)vpce-xxx.kafka.region.vpce.amazonaws.com)my-alias.internal.company.com)kafka.prod.example.com)Related Issues
Fixes authentication failures when using AWS MSK with PrivateLink or custom DNS configurations.
Testing
Example configuration file for the change
examples/kafka_filter/kafka_msk_iam.conffor comprehensive examples covering all scenariosDebug log output from testing the change
Attached Valgrind output that shows no leaks or memory corruption was found
183d3ee09broker_hostConfiguration Example:
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
ok-package-testlabel to test for all targetsDocumentation
Documentation required for this feature
examples/kafka_filter/README.mdDoc PR: Not required - documentation included in this PR
Backporting
Implementation Details
Commits:
aws_msk_iam: use actual broker hostname for signing- Core fix for SigV4 signingout_kafka: add aws_region parameter for MSK IAM auth- Output plugin supportin_kafka: add aws_region parameter for MSK IAM auth- Input plugin supportexamples: add MSK IAM auth configuration examples- DocumentationTechnical Approach:
kafka.<region>.amazonaws.comaws_regionparameter)Aligns with AWS Best Practices:
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
Documentation
New Features
Bug Fixes
✏️ Tip: You can customize this high-level summary in your review settings.