3838#include <string.h>
3939#include <time.h>
4040
41- /* Lightweight config with credential caching to prevent principal changes */
41+ /* Lightweight config - provider manages credential caching and refresh internally */
4242struct flb_aws_msk_iam {
4343 struct flb_config * flb_config ;
4444 flb_sds_t region ;
4545 flb_sds_t cluster_arn ;
4646 struct flb_tls * cred_tls ; /* TLS instance for AWS credentials (STS) */
4747 struct flb_aws_provider * provider ; /* AWS credentials provider (created once, reused) */
48-
49- /* Credential caching to maintain consistent principal during re-authentication */
50- struct flb_aws_credentials * cached_creds ; /* Cached AWS credentials */
51- time_t creds_expiration ; /* Credential expiration time */
52- pthread_mutex_t creds_lock ; /* Thread-safe access to cached credentials */
5348};
5449
5550/* Utility functions - same as before */
@@ -170,150 +165,8 @@ static char *extract_region(const char *arn)
170165 return out ;
171166}
172167
173- /*
174- * Duplicate AWS credentials structure
175- * Returns NULL on failure
176- */
177- static struct flb_aws_credentials * duplicate_credentials (struct flb_aws_credentials * src )
178- {
179- struct flb_aws_credentials * dst ;
180-
181- if (!src ) {
182- return NULL ;
183- }
184-
185- dst = flb_calloc (1 , sizeof (struct flb_aws_credentials ));
186- if (!dst ) {
187- return NULL ;
188- }
189-
190- if (src -> access_key_id ) {
191- dst -> access_key_id = flb_sds_create (src -> access_key_id );
192- if (!dst -> access_key_id ) {
193- flb_free (dst );
194- return NULL ;
195- }
196- }
197-
198- if (src -> secret_access_key ) {
199- dst -> secret_access_key = flb_sds_create (src -> secret_access_key );
200- if (!dst -> secret_access_key ) {
201- if (dst -> access_key_id ) {
202- flb_sds_destroy (dst -> access_key_id );
203- }
204- flb_free (dst );
205- return NULL ;
206- }
207- }
208-
209- if (src -> session_token ) {
210- dst -> session_token = flb_sds_create (src -> session_token );
211- if (!dst -> session_token ) {
212- if (dst -> access_key_id ) {
213- flb_sds_destroy (dst -> access_key_id );
214- }
215- if (dst -> secret_access_key ) {
216- flb_sds_destroy (dst -> secret_access_key );
217- }
218- flb_free (dst );
219- return NULL ;
220- }
221- }
222-
223- return dst ;
224- }
225-
226- /*
227- * Get cached credentials or refresh if expired
228- * This function ensures the same AWS temporary credentials (with the same session ID)
229- * are reused across multiple token refreshes, preventing "principal change" errors.
230- *
231- * Returns a COPY of credentials that the caller must destroy.
232- * Returns NULL on failure.
233- */
234- static struct flb_aws_credentials * get_cached_or_refresh_credentials (
235- struct flb_aws_msk_iam * config , time_t * expiration )
236- {
237- time_t now ;
238- struct flb_aws_credentials * creds = NULL ;
239- struct flb_aws_credentials * creds_copy = NULL ;
240- int needs_refresh = FLB_FALSE ;
241-
242- now = time (NULL );
243-
244- pthread_mutex_lock (& config -> creds_lock );
245-
246- /* Check if cached credentials are still valid */
247- if (config -> cached_creds &&
248- config -> creds_expiration > now + FLB_AWS_REFRESH_WINDOW ) {
249- /* Credentials are still valid, return a copy */
250- creds_copy = duplicate_credentials (config -> cached_creds );
251- if (expiration ) {
252- * expiration = config -> creds_expiration ;
253- }
254- pthread_mutex_unlock (& config -> creds_lock );
255-
256- if (creds_copy ) {
257- flb_info ("[aws_msk_iam] reusing cached AWS credentials (valid until %ld, %ld seconds remaining)" ,
258- config -> creds_expiration , config -> creds_expiration - now );
259- }
260- return creds_copy ;
261- }
262-
263- needs_refresh = FLB_TRUE ;
264- pthread_mutex_unlock (& config -> creds_lock );
265-
266- /* Credentials expired or don't exist, need to refresh */
267- if (needs_refresh ) {
268- flb_info ("[aws_msk_iam] AWS credentials expired or not cached, fetching new credentials" );
269-
270- /* Get new credentials using the long-lived provider */
271- creds = config -> provider -> provider_vtable -> get_credentials (config -> provider );
272- if (!creds ) {
273- flb_error ("[aws_msk_iam] failed to get AWS credentials from provider" );
274- return NULL ;
275- }
276-
277- /* Update cache with new credentials */
278- pthread_mutex_lock (& config -> creds_lock );
279-
280- if (config -> cached_creds ) {
281- flb_aws_credentials_destroy (config -> cached_creds );
282- config -> cached_creds = NULL ;
283- }
284-
285- config -> cached_creds = duplicate_credentials (creds );
286- if (!config -> cached_creds ) {
287- pthread_mutex_unlock (& config -> creds_lock );
288- flb_error ("[aws_msk_iam] failed to cache credentials" );
289- flb_aws_credentials_destroy (creds );
290- return NULL ;
291- }
292-
293- /*
294- * Set expiration time. AWS temporary credentials typically last 1 hour.
295- * We use a conservative estimate if we can't determine the exact expiration.
296- */
297- config -> creds_expiration = now + 3600 ; /* Default: 1 hour */
298-
299- if (expiration ) {
300- * expiration = config -> creds_expiration ;
301- }
302-
303- pthread_mutex_unlock (& config -> creds_lock );
304-
305- flb_info ("[aws_msk_iam] successfully cached new AWS credentials (valid until %ld, %ld seconds remaining)" ,
306- config -> creds_expiration , config -> creds_expiration - now );
307-
308- /* Return the credentials (caller owns them) */
309- return creds ;
310- }
311-
312- return NULL ;
313- }
314-
315- /* Payload generator using cached credentials to maintain consistent principal */
316- static flb_sds_t build_msk_iam_payload_with_creds (struct flb_aws_msk_iam * config ,
168+ /* Payload generator - builds MSK IAM authentication payload */
169+ static flb_sds_t build_msk_iam_payload (struct flb_aws_msk_iam * config ,
317170 const char * host ,
318171 struct flb_aws_credentials * creds )
319172{
@@ -785,18 +638,18 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk,
785638 flb_info ("[aws_msk_iam] requesting MSK IAM payload for region: %s, host: %s" , config -> region , host );
786639
787640 /*
788- * CRITICAL FIX: Use cached credentials to maintain consistent principal
789- * This prevents "Cannot change principals during re-authentication" errors
641+ * Get credentials from provider. The provider handles caching and expiration internally.
642+ * The provider automatically manages credential refresh when needed.
790643 */
791- creds = get_cached_or_refresh_credentials (config , NULL );
644+ creds = config -> provider -> provider_vtable -> get_credentials (config -> provider );
792645 if (!creds ) {
793- flb_error ("[aws_msk_iam] failed to get AWS credentials (cached or refreshed) " );
646+ flb_error ("[aws_msk_iam] failed to get AWS credentials from provider " );
794647 rd_kafka_oauthbearer_set_token_failure (rk , "credential retrieval failed" );
795648 return ;
796649 }
797650
798- /* Generate payload using cached credentials */
799- payload = build_msk_iam_payload_with_creds (config , host , creds );
651+ /* Generate payload using credentials from provider */
652+ payload = build_msk_iam_payload (config , host , creds );
800653 if (!payload ) {
801654 flb_error ("[aws_msk_iam] failed to generate MSK IAM payload" );
802655 flb_aws_credentials_destroy (creds );
@@ -908,22 +761,6 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con
908761
909762 flb_info ("[aws_msk_iam] TLS instance created for AWS credentials" );
910763
911- /* Initialize credential caching fields */
912- ctx -> cached_creds = NULL ;
913- ctx -> creds_expiration = 0 ;
914-
915- /* Initialize mutex for thread-safe credential access */
916- if (pthread_mutex_init (& ctx -> creds_lock , NULL ) != 0 ) {
917- flb_error ("[aws_msk_iam] failed to initialize credentials mutex" );
918- flb_tls_destroy (ctx -> cred_tls );
919- flb_sds_destroy (ctx -> region );
920- flb_sds_destroy (ctx -> cluster_arn );
921- flb_free (ctx );
922- return NULL ;
923- }
924-
925- flb_info ("[aws_msk_iam] Credential cache initialized with mutex protection" );
926-
927764 /* Create AWS provider once - will be reused for credential refresh */
928765 ctx -> provider = flb_standard_chain_provider_create (config ,
929766 ctx -> cred_tls ,
@@ -934,7 +771,6 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con
934771 NULL ); /* profile */
935772 if (!ctx -> provider ) {
936773 flb_error ("[aws_msk_iam] failed to create AWS credentials provider" );
937- pthread_mutex_destroy (& ctx -> creds_lock );
938774 flb_tls_destroy (ctx -> cred_tls );
939775 flb_sds_destroy (ctx -> region );
940776 flb_sds_destroy (ctx -> cluster_arn );
@@ -946,7 +782,6 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con
946782 if (ctx -> provider -> provider_vtable -> init (ctx -> provider ) != 0 ) {
947783 flb_error ("[aws_msk_iam] failed to initialize AWS credentials provider" );
948784 flb_aws_provider_destroy (ctx -> provider );
949- pthread_mutex_destroy (& ctx -> creds_lock );
950785 flb_tls_destroy (ctx -> cred_tls );
951786 flb_sds_destroy (ctx -> region );
952787 flb_sds_destroy (ctx -> cluster_arn );
@@ -966,7 +801,7 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con
966801 return ctx ;
967802}
968803
969- /* Destroy MSK IAM config - includes cached credentials cleanup */
804+ /* Destroy MSK IAM config */
970805void flb_aws_msk_iam_destroy (struct flb_aws_msk_iam * ctx )
971806{
972807 if (!ctx ) {
@@ -975,20 +810,11 @@ void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx)
975810
976811 flb_info ("[aws_msk_iam] destroying MSK IAM config" );
977812
978- /* Clean up cached credentials */
979- if (ctx -> cached_creds ) {
980- flb_aws_credentials_destroy (ctx -> cached_creds );
981- ctx -> cached_creds = NULL ;
982- }
983-
984- /* Destroy AWS provider */
813+ /* Destroy AWS provider (provider manages its own credential caching) */
985814 if (ctx -> provider ) {
986815 flb_aws_provider_destroy (ctx -> provider );
987816 }
988817
989- /* Destroy mutex */
990- pthread_mutex_destroy (& ctx -> creds_lock );
991-
992818 /* Clean up TLS instance */
993819 if (ctx -> cred_tls ) {
994820 flb_tls_destroy (ctx -> cred_tls );
@@ -1003,5 +829,5 @@ void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx)
1003829 }
1004830 flb_free (ctx );
1005831
1006- flb_info ("[aws_msk_iam] MSK IAM config destroyed, cached credentials and provider cleared " );
832+ flb_info ("[aws_msk_iam] MSK IAM config destroyed" );
1007833}
0 commit comments