@@ -58,37 +58,33 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
5858 return NULL ;
5959 }
6060
61+ /* Retrieve SASL mechanism if configured */
62+ tmp = flb_output_get_property ("rdkafka.sasl.mechanism" , ins );
63+ if (tmp ) {
64+ ctx -> sasl_mechanism = flb_sds_create (tmp );
65+ flb_plg_info (ins , "SASL mechanism configured: %s" , ctx -> sasl_mechanism );
66+
6167#ifdef FLB_HAVE_AWS_MSK_IAM
62- /*
63- * When MSK IAM auth is enabled, default the required
64- * security settings so users don't need to specify them.
65- */
66- if (ctx -> aws_msk_iam && ctx -> aws_msk_iam_cluster_arn ) {
67- tmp = flb_output_get_property ("rdkafka.security.protocol" , ins );
68- if (!tmp ) {
69- flb_output_set_property (ins , "rdkafka.security.protocol" , "SASL_SSL" );
70- }
71-
72- tmp = flb_output_get_property ("rdkafka.sasl.mechanism" , ins );
73- if (!tmp ) {
68+ /* Check if using aws_msk_iam as SASL mechanism */
69+ if (strcasecmp (tmp , "aws_msk_iam" ) == 0 ) {
70+ /* Mark that user explicitly requested AWS MSK IAM */
71+ ctx -> aws_msk_iam = FLB_TRUE ;
72+
73+ /* Set SASL mechanism to OAUTHBEARER for librdkafka */
7474 flb_output_set_property (ins , "rdkafka.sasl.mechanism" , "OAUTHBEARER" );
75+ flb_sds_destroy (ctx -> sasl_mechanism );
7576 ctx -> sasl_mechanism = flb_sds_create ("OAUTHBEARER" );
77+
78+ /* Ensure security protocol is set */
79+ tmp = flb_output_get_property ("rdkafka.security.protocol" , ins );
80+ if (!tmp ) {
81+ flb_output_set_property (ins , "rdkafka.security.protocol" , "SASL_SSL" );
82+ }
83+
84+ flb_plg_info (ins , "AWS MSK IAM authentication enabled via rdkafka.sasl.mechanism" );
7685 }
77- else {
78- ctx -> sasl_mechanism = flb_sds_create (tmp );
79- }
80- }
81- else {
8286#endif
83- /* Retrieve SASL mechanism if configured */
84- tmp = flb_output_get_property ("rdkafka.sasl.mechanism" , ins );
85- if (tmp ) {
86- ctx -> sasl_mechanism = flb_sds_create (tmp );
87- }
88-
89- #ifdef FLB_HAVE_AWS_MSK_IAM
9087 }
91- #endif
9288
9389 /* rdkafka config context */
9490 ctx -> conf = flb_kafka_conf_create (& ctx -> kafka , & ins -> properties , 0 );
@@ -210,18 +206,38 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
210206 flb_kafka_opaque_set (ctx -> opaque , ctx , NULL );
211207 rd_kafka_conf_set_opaque (ctx -> conf , ctx -> opaque );
212208
209+ /*
210+ * Enable SASL queue for all OAUTHBEARER configurations.
211+ * This allows librdkafka to handle OAuth token refresh in a background thread,
212+ * which is essential for idle connections where rd_kafka_poll() is not called.
213+ * This benefits all OAUTHBEARER methods: AWS IAM, OIDC, custom OAuth, etc.
214+ */
215+ if (ctx -> sasl_mechanism && strcasecmp (ctx -> sasl_mechanism , "OAUTHBEARER" ) == 0 ) {
216+ rd_kafka_conf_enable_sasl_queue (ctx -> conf , 1 );
217+ flb_plg_debug (ins , "SASL queue enabled for OAUTHBEARER mechanism" );
218+ }
219+
213220#ifdef FLB_HAVE_AWS_MSK_IAM
214- if (ctx -> aws_msk_iam && ctx -> aws_msk_iam_cluster_arn && ctx -> sasl_mechanism &&
221+ /* Only register MSK IAM if user explicitly requested it via rdkafka.sasl.mechanism=aws_msk_iam */
222+ if (ctx -> aws_msk_iam && ctx -> sasl_mechanism &&
215223 strcasecmp (ctx -> sasl_mechanism , "OAUTHBEARER" ) == 0 ) {
216-
217- ctx -> msk_iam = flb_aws_msk_iam_register_oauth_cb (config ,
218- ctx -> conf ,
219- ctx -> aws_msk_iam_cluster_arn ,
220- ctx -> opaque );
221- if (!ctx -> msk_iam ) {
222- flb_plg_error (ctx -> ins , "failed to setup MSK IAM authentication" );
223- }
224- else {
224+ /* Check if brokers are configured for MSK IAM */
225+ if (ctx -> kafka .brokers &&
226+ (strstr (ctx -> kafka .brokers , ".kafka." ) || strstr (ctx -> kafka .brokers , ".kafka-serverless." )) &&
227+ strstr (ctx -> kafka .brokers , ".amazonaws.com" )) {
228+
229+ /* Register MSK IAM OAuth callback - pass brokers string directly */
230+ flb_plg_info (ins , "registering AWS MSK IAM authentication OAuth callback" );
231+ ctx -> msk_iam = flb_aws_msk_iam_register_oauth_cb (config ,
232+ ctx -> conf ,
233+ ctx -> opaque ,
234+ ctx -> kafka .brokers );
235+ if (!ctx -> msk_iam ) {
236+ flb_plg_error (ctx -> ins , "failed to setup MSK IAM authentication OAuth callback" );
237+ flb_out_kafka_destroy (ctx );
238+ return NULL ;
239+ }
240+
225241 res = rd_kafka_conf_set (ctx -> conf , "sasl.oauthbearer.config" ,
226242 "principal=admin" , errstr , sizeof (errstr ));
227243 if (res != RD_KAFKA_CONF_OK ) {
@@ -236,13 +252,38 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
236252 /* Kafka Producer */
237253 ctx -> kafka .rk = rd_kafka_new (RD_KAFKA_PRODUCER , ctx -> conf ,
238254 errstr , sizeof (errstr ));
255+
239256 if (!ctx -> kafka .rk ) {
240257 flb_plg_error (ctx -> ins , "failed to create producer: %s" ,
241258 errstr );
259+ /* rd_kafka_new() did NOT take ownership on failure; ctx->conf is
260+ * still valid and will be destroyed by flb_out_kafka_destroy(). */
242261 flb_out_kafka_destroy (ctx );
243262 return NULL ;
244263 }
245264
265+ /* rd_kafka_new() takes ownership of ctx->conf on success */
266+ ctx -> conf = NULL ;
267+
268+ /*
269+ * Enable SASL background callbacks for all OAUTHBEARER configurations.
270+ * This ensures OAuth tokens are refreshed automatically even on idle connections.
271+ * This benefits all OAUTHBEARER methods: AWS IAM, OIDC, custom OAuth, etc.
272+ */
273+ if (ctx -> sasl_mechanism && strcasecmp (ctx -> sasl_mechanism , "OAUTHBEARER" ) == 0 ) {
274+ rd_kafka_error_t * error ;
275+ error = rd_kafka_sasl_background_callbacks_enable (ctx -> kafka .rk );
276+ if (error ) {
277+ flb_plg_warn (ctx -> ins , "failed to enable SASL background callbacks: %s. "
278+ "OAuth tokens may not refresh on idle connections." ,
279+ rd_kafka_error_string (error ));
280+ rd_kafka_error_destroy (error );
281+ }
282+ else {
283+ flb_plg_info (ctx -> ins , "OAUTHBEARER: SASL background callbacks enabled" );
284+ }
285+ }
286+
246287#ifdef FLB_HAVE_AVRO_ENCODER
247288 /* Config AVRO */
248289 tmp = flb_output_get_property ("schema_str" , ins );
@@ -301,8 +342,13 @@ int flb_out_kafka_destroy(struct flb_out_kafka *ctx)
301342 flb_kafka_topic_destroy_all (ctx );
302343
303344 if (ctx -> kafka .rk ) {
345+ /* rd_kafka_destroy also destroys the conf that was passed to rd_kafka_new */
304346 rd_kafka_destroy (ctx -> kafka .rk );
305347 }
348+ else if (ctx -> conf ) {
349+ /* If rd_kafka was never created, we need to destroy conf manually */
350+ rd_kafka_conf_destroy (ctx -> conf );
351+ }
306352
307353 if (ctx -> opaque ) {
308354 flb_kafka_opaque_destroy (ctx -> opaque );
0 commit comments