@@ -268,40 +268,33 @@ static int in_kafka_init(struct flb_input_instance *ins,
268268 return -1 ;
269269 }
270270
271+ /* Retrieve SASL mechanism if configured */
272+ conf = flb_input_get_property ("rdkafka.sasl.mechanism" , ins );
273+ if (conf ) {
274+ ctx -> sasl_mechanism = flb_sds_create (conf );
275+ flb_plg_info (ins , "SASL mechanism configured: %s" , ctx -> sasl_mechanism );
276+
271277#ifdef FLB_HAVE_AWS_MSK_IAM
272- /*
273- * When MSK IAM auth is enabled, default the required
274- * security settings so users don't need to specify them.
275- */
276- if (ctx -> aws_msk_iam && ctx -> aws_msk_iam_cluster_arn ) {
277- conf = flb_input_get_property ("rdkafka.security.protocol" , ins );
278- if (!conf ) {
279- flb_input_set_property (ins , "rdkafka.security.protocol" , "SASL_SSL" );
280- }
281-
282- conf = flb_input_get_property ("rdkafka.sasl.mechanism" , ins );
283- if (!conf ) {
278+ /* Check if using aws_msk_iam as SASL mechanism */
279+ if (strcasecmp (conf , "aws_msk_iam" ) == 0 ) {
280+ /* Mark that user explicitly requested AWS MSK IAM */
281+ ctx -> aws_msk_iam = FLB_TRUE ;
282+
283+ /* Set SASL mechanism to OAUTHBEARER for librdkafka */
284284 flb_input_set_property (ins , "rdkafka.sasl.mechanism" , "OAUTHBEARER" );
285+ flb_sds_destroy (ctx -> sasl_mechanism );
285286 ctx -> sasl_mechanism = flb_sds_create ("OAUTHBEARER" );
287+
288+ /* Ensure security protocol is set */
289+ conf = flb_input_get_property ("rdkafka.security.protocol" , ins );
290+ if (!conf ) {
291+ flb_input_set_property (ins , "rdkafka.security.protocol" , "SASL_SSL" );
292+ }
293+
294+ flb_plg_info (ins , "AWS MSK IAM authentication enabled via rdkafka.sasl.mechanism" );
286295 }
287- else {
288- ctx -> sasl_mechanism = flb_sds_create (conf );
289- flb_plg_info (ins , "SASL mechanism configured: %s" , ctx -> sasl_mechanism );
290- }
291- }
292- else {
293296#endif
294-
295- /* Retrieve SASL mechanism if configured */
296- conf = flb_input_get_property ("rdkafka.sasl.mechanism" , ins );
297- if (conf ) {
298- ctx -> sasl_mechanism = flb_sds_create (conf );
299- flb_plg_info (ins , "SASL mechanism configured: %s" , ctx -> sasl_mechanism );
300- }
301-
302- #ifdef FLB_HAVE_AWS_MSK_IAM
303297 }
304- #endif
305298
306299 kafka_conf = flb_kafka_conf_create (& ctx -> kafka , & ins -> properties , 1 );
307300 if (!kafka_conf ) {
@@ -351,25 +344,45 @@ static int in_kafka_init(struct flb_input_instance *ins,
351344 flb_kafka_opaque_set (ctx -> opaque , ctx , NULL );
352345 rd_kafka_conf_set_opaque (kafka_conf , ctx -> opaque );
353346
347+ /*
348+ * Enable SASL queue for all OAUTHBEARER configurations.
349+ * This allows librdkafka to handle OAuth token refresh in a background thread,
350+ * which is essential for idle connections or when poll intervals are large.
351+ * This benefits all OAUTHBEARER methods: AWS IAM, OIDC, custom OAuth, etc.
352+ */
353+ if (ctx -> sasl_mechanism && strcasecmp (ctx -> sasl_mechanism , "OAUTHBEARER" ) == 0 ) {
354+ rd_kafka_conf_enable_sasl_queue (kafka_conf , 1 );
355+ flb_plg_debug (ins , "SASL queue enabled for OAUTHBEARER mechanism" );
356+ }
357+
354358#ifdef FLB_HAVE_AWS_MSK_IAM
355- if (ctx -> aws_msk_iam && ctx -> aws_msk_iam_cluster_arn && ctx -> sasl_mechanism &&
359+ /* Only register MSK IAM if user explicitly requested it via rdkafka.sasl.mechanism=aws_msk_iam */
360+ if (ctx -> aws_msk_iam && ctx -> sasl_mechanism &&
356361 strcasecmp (ctx -> sasl_mechanism , "OAUTHBEARER" ) == 0 ) {
357- flb_plg_info (ins , "registering MSK IAM authentication with cluster ARN: %s" ,
358- ctx -> aws_msk_iam_cluster_arn );
359- ctx -> msk_iam = flb_aws_msk_iam_register_oauth_cb (config ,
360- kafka_conf ,
361- ctx -> aws_msk_iam_cluster_arn ,
362- ctx -> opaque );
363- if (!ctx -> msk_iam ) {
364- flb_plg_error (ins , "failed to setup MSK IAM authentication" );
365- }
366- else {
367- res = rd_kafka_conf_set (kafka_conf , "sasl.oauthbearer.config" ,
368- "principal=admin" , errstr , sizeof (errstr ));
369- if (res != RD_KAFKA_CONF_OK ) {
370- flb_plg_error (ins ,
371- "failed to set sasl.oauthbearer.config: %s" ,
372- errstr );
362+ /* Check if brokers are configured for MSK IAM */
363+ if (ctx -> kafka .brokers &&
364+ (strstr (ctx -> kafka .brokers , ".kafka." ) || strstr (ctx -> kafka .brokers , ".kafka-serverless." )) &&
365+ strstr (ctx -> kafka .brokers , ".amazonaws.com" )) {
366+
367+ /* Register MSK IAM OAuth callback - pass brokers string directly */
368+ flb_plg_info (ins , "registering AWS MSK IAM authentication OAuth callback" );
369+ ctx -> msk_iam = flb_aws_msk_iam_register_oauth_cb (config ,
370+ kafka_conf ,
371+ ctx -> opaque ,
372+ ctx -> kafka .brokers );
373+
374+ if (!ctx -> msk_iam ) {
375+ flb_plg_error (ins , "failed to setup MSK IAM authentication OAuth callback" );
376+ goto init_error ;
377+ }
378+ else {
379+ res = rd_kafka_conf_set (kafka_conf , "sasl.oauthbearer.config" ,
380+ "principal=admin" , errstr , sizeof (errstr ));
381+ if (res != RD_KAFKA_CONF_OK ) {
382+ flb_plg_error (ins ,
383+ "failed to set sasl.oauthbearer.config: %s" ,
384+ errstr );
385+ }
373386 }
374387 }
375388 }
@@ -380,9 +393,36 @@ static int in_kafka_init(struct flb_input_instance *ins,
380393 /* Create Kafka consumer handle */
381394 if (!ctx -> kafka .rk ) {
382395 flb_plg_error (ins , "Failed to create new consumer: %s" , errstr );
396+ /* rd_kafka_new() did NOT take ownership on failure; kafka_conf is
397+ * still valid and will be destroyed by init_error cleanup path. */
383398 goto init_error ;
384399 }
385400
401+ /* rd_kafka_new() takes ownership of kafka_conf on success */
402+ kafka_conf = NULL ;
403+
404+ /*
405+ * Enable SASL background callbacks for all OAUTHBEARER configurations.
406+ * This ensures OAuth tokens are refreshed automatically even when:
407+ * - Poll intervals are large
408+ * - Topics have no messages
409+ * - Collector is paused
410+ * This benefits all OAUTHBEARER methods: AWS IAM, OIDC, custom OAuth, etc.
411+ */
412+ if (ctx -> sasl_mechanism && strcasecmp (ctx -> sasl_mechanism , "OAUTHBEARER" ) == 0 ) {
413+ rd_kafka_error_t * error ;
414+ error = rd_kafka_sasl_background_callbacks_enable (ctx -> kafka .rk );
415+ if (error ) {
416+ flb_plg_warn (ins , "failed to enable SASL background callbacks: %s. "
417+ "OAuth tokens may not refresh during idle periods." ,
418+ rd_kafka_error_string (error ));
419+ rd_kafka_error_destroy (error );
420+ }
421+ else {
422+ flb_plg_info (ins , "OAUTHBEARER: SASL background callbacks enabled" );
423+ }
424+ }
425+
386426 /* Trigger initial token refresh for OAUTHBEARER */
387427 rd_kafka_poll (ctx -> kafka .rk , 0 );
388428
@@ -449,15 +489,23 @@ static int in_kafka_init(struct flb_input_instance *ins,
449489 }
450490 if (ctx -> kafka .rk ) {
451491 rd_kafka_consumer_close (ctx -> kafka .rk );
492+ /* rd_kafka_destroy also destroys the conf that was passed to rd_kafka_new */
452493 rd_kafka_destroy (ctx -> kafka .rk );
453494 }
495+ else if (kafka_conf ) {
496+ /* If rd_kafka was never created, we need to destroy conf manually */
497+ rd_kafka_conf_destroy (kafka_conf );
498+ }
454499 if (ctx -> opaque ) {
455500 flb_kafka_opaque_destroy (ctx -> opaque );
456501 }
457- else if (kafka_conf ) {
458- /* conf is already destroyed when rd_kafka is initialized */
459- rd_kafka_conf_destroy (kafka_conf );
502+
503+ #ifdef FLB_HAVE_AWS_MSK_IAM
504+ if (ctx -> msk_iam ) {
505+ flb_aws_msk_iam_destroy (ctx -> msk_iam );
460506 }
507+ #endif
508+
461509 flb_sds_destroy (ctx -> sasl_mechanism );
462510 flb_free (ctx );
463511
@@ -571,19 +619,6 @@ static struct flb_config_map config_map[] = {
571619 "Rely on kafka auto-commit and commit messages in batches"
572620 },
573621
574- #ifdef FLB_HAVE_AWS_MSK_IAM
575- {
576- FLB_CONFIG_MAP_STR , "aws_msk_iam_cluster_arn" , (char * )NULL ,
577- 0 , FLB_TRUE , offsetof(struct flb_in_kafka_config , aws_msk_iam_cluster_arn ),
578- "ARN of the MSK cluster when using AWS IAM authentication"
579- },
580- {
581- FLB_CONFIG_MAP_BOOL , "aws_msk_iam" , "false" ,
582- 0 , FLB_TRUE , offsetof(struct flb_in_kafka_config , aws_msk_iam ),
583- "Enable AWS MSK IAM authentication"
584- },
585- #endif
586-
587622 /* EOF */
588623 {0 }
589624};
0 commit comments