aws_msk_iam: add AWS MSK IAM authentication support #11270
base: master
Note: Other AI code review bot(s) detected
CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.
Walkthrough
API and config surfaces changed for AWS MSK IAM: the MSK IAM registration signature removed the cluster ARN and now accepts brokers and region, MSK IAM gained a TLS-backed persistent provider and a mutex-protected token lifecycle with region auto-detection, and Kafka plugins drive MSK IAM via
Changes
Sequence Diagram(s)
sequenceDiagram
autonumber
participant FB as Fluent Bit
participant LR as librdkafka
participant MSK as flb_aws_msk_iam
participant AWS as AWS Credentials Provider
Note over FB: Initialization
FB->>LR: create rd_kafka_conf()
FB->>FB: read `rdkafka.sasl.mechanism` == aws_msk_iam
FB->>MSK: flb_aws_msk_iam_register_oauth_cb(config,kconf,opaque,brokers,region)
MSK->>MSK: extract_region_from_broker(brokers) / init TLS / create provider / init mutex
MSK-->>FB: return msk_iam_handle
Note over LR,MSK: Background token refresh
LR->>MSK: oauthbearer_token_refresh_cb(request)
MSK->>MSK: lock mutex
MSK->>AWS: provider->refresh_credentials()
AWS-->>MSK: credentials
MSK->>MSK: build_payload(host, credentials)
MSK-->>LR: rd_kafka_oauthbearer_set_token(token, lifetime=300s)
MSK->>MSK: unlock mutex
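The refresh step in the diagram hands librdkafka a token with a 300-second lifetime. A minimal sketch of the expiry arithmetic, assuming the expiry is passed as an absolute wall-clock time in milliseconds (which is how `rd_kafka_oauthbearer_set_token` documents its `md_lifetime_ms` argument). The helper name `token_expiry_ms` and the macro are hypothetical and do not come from the PR:

```c
#include <stdint.h>
#include <time.h>

/* Token lifetime taken from the diagram: 300 seconds (5 minutes). */
#define DEMO_TOKEN_LIFETIME_SEC 300

/*
 * Hypothetical helper: turn "now" (seconds since the epoch) into the
 * absolute expiry timestamp in milliseconds since the epoch.
 */
static int64_t token_expiry_ms(time_t now_sec)
{
    return ((int64_t) now_sec + DEMO_TOKEN_LIFETIME_SEC) * 1000;
}
```

In a real callback the argument would be `time(NULL)`; taking it as a parameter keeps the arithmetic easy to test.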
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 2
🧹 Nitpick comments (1)
plugins/out_kafka/kafka_config.c (1)
74-82: Consider checking return values from `flb_output_set_property`.
The calls to `flb_output_set_property()` at lines 74 and 81 don't check return values. While unlikely to fail in practice, property setting can fail on allocation errors. For robustness:
- flb_output_set_property(ins, "rdkafka.sasl.mechanism", "OAUTHBEARER");
+ if (flb_output_set_property(ins, "rdkafka.sasl.mechanism", "OAUTHBEARER") < 0) {
+     flb_plg_error(ins, "failed to set OAUTHBEARER SASL mechanism");
+     flb_sds_destroy(ctx->sasl_mechanism);
+     flb_free(ctx);
+     return NULL;
+ }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
- include/fluent-bit/aws/flb_aws_msk_iam.h (1 hunks)
- plugins/in_kafka/in_kafka.c (4 hunks)
- plugins/in_kafka/in_kafka.h (1 hunks)
- plugins/out_kafka/kafka.c (0 hunks)
- plugins/out_kafka/kafka_config.c (4 hunks)
- plugins/out_kafka/kafka_config.h (1 hunks)
- src/aws/flb_aws_credentials_ec2.c (1 hunks)
- src/aws/flb_aws_credentials_profile.c (1 hunks)
- src/aws/flb_aws_credentials_sts.c (2 hunks)
- src/aws/flb_aws_msk_iam.c (12 hunks)
- src/flb_kafka.c (1 hunks)
💤 Files with no reviewable changes (1)
- plugins/out_kafka/kafka.c
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.
Applied to files:
- src/flb_kafka.c
- src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.
Applied to files:
src/aws/flb_aws_msk_iam.c
🧬 Code graph analysis (3)
plugins/in_kafka/in_kafka.c (4)
src/flb_input.c (2)
- flb_input_get_property (776-780)
- flb_input_set_property (557-774)
src/flb_sds.c (2)
- flb_sds_create (78-90)
- flb_sds_destroy (389-399)
src/aws/flb_aws_msk_iam.c (2)
- flb_aws_msk_iam_register_oauth_cb (628-761)
- flb_aws_msk_iam_destroy (764-786)
src/flb_kafka.c (1)
- flb_kafka_opaque_destroy (233-240)
plugins/out_kafka/kafka_config.c (2)
src/flb_output.c (2)
- flb_output_get_property (1108-1111)
- flb_output_set_property (843-1068)
src/aws/flb_aws_msk_iam.c (1)
- flb_aws_msk_iam_register_oauth_cb (628-761)
include/fluent-bit/aws/flb_aws_msk_iam.h (1)
src/aws/flb_aws_msk_iam.c (1)
- flb_aws_msk_iam_register_oauth_cb (628-761)
🪛 Cppcheck (2.18.0)
src/aws/flb_aws_credentials_sts.c
[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.
(normalCheckLevelMaxBranches)
[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.
(toomanyconfigs)
src/flb_kafka.c
[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.
(toomanyconfigs)
src/aws/flb_aws_credentials_ec2.c
[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.
(normalCheckLevelMaxBranches)
[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.
(toomanyconfigs)
plugins/in_kafka/in_kafka.c
[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.
(normalCheckLevelMaxBranches)
[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.
(toomanyconfigs)
src/aws/flb_aws_credentials_profile.c
[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.
(normalCheckLevelMaxBranches)
[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.
(toomanyconfigs)
plugins/out_kafka/kafka_config.c
[information] 61-61: Include file
(missingIncludeSystem)
[information] 62-62: Include file
(missingIncludeSystem)
[information] 63-63: Include file
(missingIncludeSystem)
[information] 65-65: Include file
(missingIncludeSystem)
[information] 66-66: Include file
(missingIncludeSystem)
[information] 67-67: Include file
(missingIncludeSystem)
[information] 68-68: Include file
(missingIncludeSystem)
[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.
(normalCheckLevelMaxBranches)
[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.
(toomanyconfigs)
src/aws/flb_aws_msk_iam.c
[information] 31-31: Include file
(missingIncludeSystem)
[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.
(normalCheckLevelMaxBranches)
[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.
(toomanyconfigs)
[information] 49-49: Include file
(missingIncludeSystem)
[information] 50-50: Include file
(missingIncludeSystem)
[information] 51-51: Include file
(missingIncludeSystem)
[information] 52-52: Include file
(missingIncludeSystem)
[information] 53-53: Include file
(missingIncludeSystem)
[information] 54-54: Include file
(missingIncludeSystem)
[information] 40-40: Include file
(missingIncludeSystem)
[information] 41-41: Include file
(missingIncludeSystem)
[information] 137-137: Include file
(missingIncludeSystem)
[information] 138-138: Include file
(missingIncludeSystem)
[information] 139-139: Include file
(missingIncludeSystem)
[information] 140-140: Include file
(missingIncludeSystem)
🔇 Additional comments (24)
src/aws/flb_aws_credentials_ec2.c (1)
128-139: Whitespace-only change acknowledged.
This is a minor formatting change adding a blank line after the debug log. No functional impact.
src/aws/flb_aws_credentials_sts.c (1)
173-186: Formatting improvements enhance code consistency.
The added blank lines after debug statements in `refresh_fn_sts` and `refresh_fn_eks` align these functions with the existing style in `init_fn_sts`, `init_fn_eks`, `get_credentials_fn_sts`, and `get_credentials_fn_eks`. This improves readability by separating logging statements from control flow logic.
Also applies to: 478-489
src/aws/flb_aws_credentials_profile.c (1)
664-674: LGTM! Log level adjustment is appropriate.
Downgrading the log level for a missing credentials file (ENOENT) to debug is correct. A missing `~/.aws/credentials` file is not an error when other credential sources (EC2/ECS metadata, config file with `credential_process`, environment variables) may be available. This aligns with the similar handling for the config file at line 623.
src/flb_kafka.c (1)
96-100: LGTM! Critical bug fix for proper resource cleanup.
Using `rd_kafka_conf_destroy()` is the correct way to clean up a `rd_kafka_conf_t *` object allocated by `rd_kafka_conf_new()`. The previous use of `flb_free()` would have caused memory corruption or leaks, since librdkafka's configuration objects have internal structures that require proper destruction.
plugins/in_kafka/in_kafka.h (1)
57-60: LGTM! Clean API simplification.
The explicit `aws_msk_iam` flag replaces the removed `aws_msk_iam_cluster_arn` field, aligning with the PR's shift to broker-based region detection. The flag clearly indicates user intent via `rdkafka.sasl.mechanism=aws_msk_iam`, making the opt-in explicit rather than implicit.
plugins/out_kafka/kafka_config.h (1)
128-137: LGTM! Consistent with in_kafka changes.
The `aws_msk_iam` flag and `sasl_mechanism` field additions mirror the `in_kafka.h` structure, maintaining consistency across Kafka input and output plugins. The explicit flag design is cleaner than the previous ARN-based approach.
include/fluent-bit/aws/flb_aws_msk_iam.h (1)
37-49: LGTM! API simplification with clear documentation.
The updated signature removes `cluster_arn` in favor of `brokers`, enabling automatic region extraction from broker addresses. This simplifies user configuration: users no longer need to provide the cluster ARN. The documentation clearly explains each parameter's purpose.
plugins/out_kafka/kafka_config.c (4)
209-218: LGTM! Essential for OAuth token refresh on idle connections.
Enabling the SASL queue before producer creation is correct. This allows librdkafka's background thread to handle OAuth token refresh even when `rd_kafka_poll()` isn't called frequently, preventing authentication failures on idle connections.
220-250: LGTM! Well-structured MSK IAM registration with proper validation.
Good defensive checks: explicit flag, OAUTHBEARER mechanism, and MSK broker patterns must all be present. The callback registration failure correctly triggers cleanup and return.
One observation: the `sasl.oauthbearer.config` error at lines 243-247 only logs but doesn't fail. This is likely fine since it's a secondary configuration, but verify this doesn't cause issues with librdkafka's OAUTHBEARER validation.
253-285: LGTM! Correct ownership semantics and background callback handling.
Setting `ctx->conf = NULL` after successful `rd_kafka_new()` correctly reflects ownership transfer: librdkafka now owns the configuration. The SASL background callback enabling is done post-creation as required, and the non-fatal warning on failure is appropriate (graceful degradation).
344-351: LGTM! Proper two-path cleanup for configuration ownership.
The conditional cleanup correctly handles both scenarios:
- Producer created: `rd_kafka_destroy()` handles both producer and configuration
- Producer creation failed: `ctx->conf` is still valid and needs explicit `rd_kafka_conf_destroy()`
This pairs correctly with the `ctx->conf = NULL` assignment after successful `rd_kafka_new()`.
plugins/in_kafka/in_kafka.c (6)
339-345: LGTM!
Opaque context creation and configuration follows proper error handling patterns.
347-356: LGTM!
Enabling SASL queue for OAUTHBEARER is the correct approach for background token refresh. The comment clearly explains the benefit for all OAUTHBEARER methods.
358-389: LGTM with minor observation.
The MSK IAM OAuth callback registration logic is well-structured. The broker pattern validation (`".kafka."` or `".kafka-serverless."` with `".amazonaws.com"`) appropriately identifies MSK endpoints.
Note: The `sasl.oauthbearer.config` setting failure at lines 379-385 only logs an error but doesn't fail initialization. This appears intentional since the `principal=admin` value is primarily for librdkafka's internal validation and the actual authentication uses the generated token.
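The broker pattern check described above can be sketched as a pair of substring tests. This is an illustration only: the function name `looks_like_msk_broker` is hypothetical, and the PR's actual implementation may differ in detail.

```c
#include <string.h>

/*
 * Sketch of the pattern check: a broker string counts as an MSK endpoint
 * when it contains ".amazonaws.com" and either ".kafka." or
 * ".kafka-serverless.". Returns 1 on match, 0 otherwise.
 * The function name is hypothetical.
 */
static int looks_like_msk_broker(const char *brokers)
{
    if (brokers == NULL) {
        return 0;
    }
    if (strstr(brokers, ".amazonaws.com") == NULL) {
        return 0;
    }
    return strstr(brokers, ".kafka.") != NULL ||
           strstr(brokers, ".kafka-serverless.") != NULL;
}
```

A substring test like this is deliberately loose; it only decides whether to register the MSK IAM callback, not whether the endpoint is reachable.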
391-424: LGTM!
Excellent ownership semantics handling:
- `kafka_conf = NULL` after successful `rd_kafka_new()` correctly prevents double-free
- Clear comments document the ownership transfer
- Graceful degradation when SASL background callbacks fail (warn but continue)
The SASL background callback enablement ensures tokens refresh even during idle periods.
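The ownership rule praised here (the configuration belongs to the caller until client creation succeeds, then to the client) can be illustrated with stand-in types. Everything named `demo_*` below is hypothetical and only mimics the behavior of `rd_kafka_new()` and the two-path cleanup; it is not the plugin's code.

```c
#include <stdlib.h>

/* Stand-in types that only mimic the ownership rules discussed above. */
struct demo_conf   { int dummy; };
struct demo_client { struct demo_conf *conf; };

struct demo_ctx {
    struct demo_conf   *conf;    /* owned by us until a client takes it */
    struct demo_client *client;  /* owns its conf once created */
};

/* Like rd_kafka_new(): on success the client takes ownership of conf. */
static struct demo_client *demo_client_new(struct demo_conf *conf, int fail)
{
    struct demo_client *c;

    if (fail) {
        return NULL;             /* conf still belongs to the caller */
    }
    c = malloc(sizeof(*c));
    if (c != NULL) {
        c->conf = conf;
    }
    return c;
}

static void demo_client_destroy(struct demo_client *c)
{
    free(c->conf);
    free(c);
}

/* Returns 0 on success, -1 on failure; ctx is left ready for demo_cleanup(). */
static int demo_init(struct demo_ctx *ctx, int fail)
{
    ctx->client = NULL;
    ctx->conf = calloc(1, sizeof(*ctx->conf));
    if (ctx->conf == NULL) {
        return -1;
    }
    ctx->client = demo_client_new(ctx->conf, fail);
    if (ctx->client == NULL) {
        return -1;               /* ctx->conf stays set for cleanup */
    }
    ctx->conf = NULL;            /* ownership moved to the client */
    return 0;
}

/* Two-path cleanup: exactly one branch frees the conf. */
static void demo_cleanup(struct demo_ctx *ctx)
{
    if (ctx->client != NULL) {
        demo_client_destroy(ctx->client);
        ctx->client = NULL;
    }
    else if (ctx->conf != NULL) {
        free(ctx->conf);
        ctx->conf = NULL;
    }
}
```

Clearing the pointer at the moment of transfer is what lets the cleanup path stay a simple either/or instead of tracking a separate "who owns conf" flag.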
486-512: LGTM!
Comprehensive error cleanup path:
- Correctly handles the mutual exclusivity between `kafka.rk` (owns conf) and standalone `kafka_conf`
- MSK IAM resources cleaned up under proper compile guard
- Proper cleanup order prevents use-after-free
549-559: LGTM!
Exit cleanup mirrors the error path cleanup correctly. The cleanup order (MSK IAM → opaque → sasl_mechanism) is appropriate.
src/aws/flb_aws_msk_iam.c (7)
42-55: LGTM!
Well-designed structure with proper thread safety considerations. The 5-minute token lifetime aligns with AWS MSK IAM standards, and the mutex protects credential provider access during concurrent refresh callbacks.
214-268: LGTM!
Clean function signature with explicit credential passing (rather than fetching internally). Input validation is thorough with informative error messages.
432-478: LGTM!
The presigned URL construction and Base64 URL encoding are implemented correctly:
- User-Agent parameter added for identification
- Base64 URL encoding properly converts `+` → `-`, `/` → `_`, and strips padding
- Memory cleanup is thorough in both success and error paths
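The Base64 URL conversion mentioned in the list above can be sketched as an in-place pass over an already Base64-encoded string. The function name `base64_to_base64url` is hypothetical; the PR's own helper may be structured differently.

```c
#include <stddef.h>
#include <string.h>

/*
 * Convert a NUL-terminated standard Base64 string to the URL-safe variant
 * in place: '+' becomes '-', '/' becomes '_', and trailing '=' padding is
 * removed. Returns the new length. The function name is hypothetical.
 */
static size_t base64_to_base64url(char *s)
{
    size_t i;
    size_t len = strlen(s);

    for (i = 0; i < len; i++) {
        if (s[i] == '+') {
            s[i] = '-';
        }
        else if (s[i] == '/') {
            s[i] = '_';
        }
    }
    /* Padding only ever appears at the end of the string. */
    while (len > 0 && s[len - 1] == '=') {
        s[--len] = '\0';
    }
    return len;
}
```

Because the URL-safe form is never longer than the input, the conversion needs no extra allocation.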
555-612: LGTM!
Excellent concurrency handling:
- Mutex protects credential provider access during refresh/get_credentials
- Lock held for minimal duration (released immediately after getting credentials)
- Credentials destroyed after use (security best practice)
The explicit vtable calls (`provider_vtable->refresh`, `provider_vtable->get_credentials`) assume the provider is valid, which is guaranteed by the context lifecycle.
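A minimal sketch of the locking pattern described above, assuming a stand-in provider: only the credential fetch runs under the mutex, the token is built after the lock is released, and the local credential copy is cleared afterwards. All names (`iam_demo_ctx`, `fetch_credentials`, `refresh_token`) and the placeholder key are hypothetical.

```c
#include <pthread.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-ins for the credentials and the MSK IAM context. */
struct demo_creds {
    char access_key[32];
};

struct iam_demo_ctx {
    pthread_mutex_t lock;
    int provider_calls;          /* counts accesses to the fake provider */
};

/* Only the provider access happens while the lock is held. */
static int fetch_credentials(struct iam_demo_ctx *ctx, struct demo_creds *out)
{
    pthread_mutex_lock(&ctx->lock);
    ctx->provider_calls++;
    snprintf(out->access_key, sizeof(out->access_key), "%s", "AKIDEXAMPLE");
    pthread_mutex_unlock(&ctx->lock);
    return 0;
}

/* Returns 0 on success, -1 on failure or if the token does not fit. */
static int refresh_token(struct iam_demo_ctx *ctx, char *token, size_t token_size)
{
    struct demo_creds creds;
    int n;

    if (fetch_credentials(ctx, &creds) != 0) {
        return -1;
    }

    /* Token construction runs without the lock held. */
    n = snprintf(token, token_size, "token-for-%s", creds.access_key);

    /* Clear the local credential copy once the token is built. */
    memset(&creds, 0, sizeof(creds));

    return (n < 0 || (size_t) n >= token_size) ? -1 : 0;
}
```

Keeping the critical section this small means a slow signing step in one callback cannot stall another thread that only needs the provider.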
763-785: LGTM!
Proper cleanup order respects dependencies:
- Provider (uses TLS)
- TLS
- Region string
- Mutex
- Context struct
The unconditional `pthread_mutex_destroy` is safe since a context only reaches this function if initialization succeeded (including mutex init).
519-551: LGTM!
Host buffer sizing is safe: the maximum formatted string length (~63 characters) is well within the 256-byte buffer, especially given the 32-character region length limit enforced in `extract_region_from_broker`.
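The sizing argument can be checked with a small sketch. The `kafka.<region>.amazonaws.com` and `kafka-serverless.<region>.amazonaws.com` host layouts, the function name `build_msk_host`, and the macro names are assumptions made for illustration; only the 256-byte buffer and the 32-character region limit come from the comment above.

```c
#include <stdio.h>
#include <string.h>

#define HOST_BUF_SIZE  256   /* buffer size quoted in the review */
#define MAX_REGION_LEN 32    /* region length limit quoted in the review */

/*
 * Build the host name for a region into buf.
 * Returns 0 on success, -1 if the region is empty or too long, or if the
 * formatted result would not fit into buf.
 */
static int build_msk_host(char *buf, size_t size, const char *region,
                          int serverless)
{
    int n;

    if (region == NULL || region[0] == '\0' ||
        strlen(region) > MAX_REGION_LEN) {
        return -1;
    }
    n = snprintf(buf, size, "%s.%s.amazonaws.com",
                 serverless ? "kafka-serverless" : "kafka", region);
    if (n < 0 || (size_t) n >= size) {
        return -1;               /* output was truncated */
    }
    return 0;
}
```

Under these assumptions the longest possible result is 16 + 1 + 32 + 14 = 63 characters, which matches the figure in the review; the `snprintf` return check still guards against a caller passing a smaller buffer.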
728-738: Verify provider initialization sequence order.
The code calls `sync()` before `init()`, but Fluent Bit's documented AWS credential provider lifecycle calls `init()` at startup before periodic `sync()` calls. Confirm whether this reversed sequence is intentional for MSK IAM OAuth callback initialization or if `init()` should be called first.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Still failing our linter:
b486cb7 to 2264d52
Actionable comments posted: 0
♻️ Duplicate comments (3)
src/aws/flb_aws_msk_iam.c (2)
137-212: Verify bounds check for VPC endpoint detection.
Line 180: The condition `if (p >= broker + 5 && ...)` uses pointer comparison. While likely correct, using the offset comparison `if (p - broker >= 5 && ...)` would be clearer and more portable, explicitly checking there are at least 5 bytes before `p` before accessing `p - 5`.
Apply this diff for clarity:
  /* Check for VPC endpoint format: .vpce.amazonaws.com */
- if (p >= broker + 5 && strncmp(p - 5, ".vpce", 5) == 0) {
+ if (p - broker >= 5 && strncmp(p - 5, ".vpce", 5) == 0) {
      /* For VPC endpoints, region ends at .vpce */
      end = p - 5;
  }
Consider adding unit tests for:
- VPC endpoint format (`vpce-xxx.kafka.region.vpce.amazonaws.com`)
- Brokers with/without ports
- Edge cases near 32-character region limit
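To make the suggested test cases concrete, here is a simplified, self-contained sketch of region extraction that uses the offset-style bounds check. It is not the PR's `extract_region_from_broker`: the name `extract_region`, the return convention, and the backward scan are assumptions for illustration.

```c
#include <stddef.h>
#include <string.h>

#define MAX_REGION_LEN 32

/*
 * Simplified sketch: copy the label that precedes ".amazonaws.com"
 * (or ".vpce.amazonaws.com") into out. Returns 0 on success, -1 otherwise.
 */
static int extract_region(const char *broker, char *out, size_t out_size)
{
    const char *p;
    const char *end;
    const char *start;
    size_t len;

    if (broker == NULL || out == NULL) {
        return -1;
    }
    p = strstr(broker, ".amazonaws.com");
    if (p == NULL) {
        return -1;
    }
    end = p;

    /* Offset check: at least 5 bytes must precede p before reading p - 5. */
    if (p - broker >= 5 && strncmp(p - 5, ".vpce", 5) == 0) {
        end = p - 5;
    }

    /* Walk back to the character right after the previous dot. */
    start = end;
    while (start > broker && *(start - 1) != '.') {
        start--;
    }

    len = (size_t) (end - start);
    if (len == 0 || len > MAX_REGION_LEN || len >= out_size) {
        return -1;
    }
    memcpy(out, start, len);
    out[len] = '\0';
    return 0;
}
```

Because the search stops at `.amazonaws.com`, a trailing `:port` never reaches the region label, which covers the "with/without ports" case from the list.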
701-738: Verify TLS ownership to prevent potential double-free.
Lines 702-711 create `ctx->cred_tls`, which is then passed to `flb_standard_chain_provider_create` at line 714. If the provider stores this pointer internally (e.g., in `provider->cred_tls`), then the destroy path at lines 771-776 may cause a double-free: `flb_aws_provider_destroy(ctx->provider)` would free the TLS handle, and then `flb_tls_destroy(ctx->cred_tls)` would attempt to free it again.
Run the following script to check if the provider stores the TLS pointer:
#!/bin/bash
# Check if flb_standard_chain_provider_create stores the cred_tls pointer
ast-grep --pattern $'flb_standard_chain_provider_create($$$) {
  $$$
  $PROVIDER->cred_tls = $TLS;
  $$$
}'
# Also check the provider destroy function
rg -A 10 "flb_aws_provider_destroy|flb_standard_chain_provider_destroy" --type c
plugins/in_kafka/in_kafka.c (1)
271-297: Add NULL checks after `flb_sds_create` calls.
Both `flb_sds_create` calls (lines 274 and 286) can fail and return NULL, but there are no checks. If allocation fails, `ctx->sasl_mechanism` will be NULL, causing crashes in subsequent `strcasecmp` calls at lines 279, 353, 361, and 412.
Apply this diff to add proper NULL checks:
  conf = flb_input_get_property("rdkafka.sasl.mechanism", ins);
  if (conf) {
      ctx->sasl_mechanism = flb_sds_create(conf);
+     if (!ctx->sasl_mechanism) {
+         flb_plg_error(ins, "failed to allocate SASL mechanism string");
+         flb_free(ctx);
+         return -1;
+     }
      flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism);
  #ifdef FLB_HAVE_AWS_MSK_IAM
      /* Check if using aws_msk_iam as SASL mechanism */
      if (strcasecmp(conf, "aws_msk_iam") == 0) {
          /* Mark that user explicitly requested AWS MSK IAM */
          ctx->aws_msk_iam = FLB_TRUE;
          /* Set SASL mechanism to OAUTHBEARER for librdkafka */
          flb_input_set_property(ins, "rdkafka.sasl.mechanism", "OAUTHBEARER");
          flb_sds_destroy(ctx->sasl_mechanism);
          ctx->sasl_mechanism = flb_sds_create("OAUTHBEARER");
+         if (!ctx->sasl_mechanism) {
+             flb_plg_error(ins, "failed to allocate SASL mechanism string");
+             flb_free(ctx);
+             return -1;
+         }
🧹 Nitpick comments (4)
src/aws/flb_aws_credentials_sts.c (1)
178-178: Trailing whitespace detected.
Lines 178 and 483 contain trailing whitespace/spaces on otherwise blank lines. While not a functional issue, this may cause linter warnings or be flagged in CI.
- (blank line with trailing spaces)
+ (empty line)
Also applies to: 483-483
src/aws/flb_aws_credentials_ec2.c (1)
133-133: Trailing whitespace on blank line.
Same pattern as in other credential provider files: line 133 has trailing whitespace. Consider removing for consistency with project style.
plugins/out_kafka/kafka_config.c (1)
220-250: Consider making `sasl.oauthbearer.config` failure fatal.
The OAuth callback registration correctly validates broker patterns for MSK. However, if `rd_kafka_conf_set` fails for `sasl.oauthbearer.config` (lines 241-247), execution continues. Since this configuration is part of the OAUTHBEARER setup, a failure here may cause authentication issues at runtime.
  res = rd_kafka_conf_set(ctx->conf, "sasl.oauthbearer.config",
                          "principal=admin", errstr, sizeof(errstr));
  if (res != RD_KAFKA_CONF_OK) {
      flb_plg_error(ctx->ins, "failed to set sasl.oauthbearer.config: %s", errstr);
+     flb_out_kafka_destroy(ctx);
+     return NULL;
  }
plugins/in_kafka/in_kafka.c (1)
347-356: Consider checking `rd_kafka_conf_enable_sasl_queue` return value.
While unlikely to fail, `rd_kafka_conf_enable_sasl_queue` returns an error code. Adding a check would make error handling more complete. (Verify this against your librdkafka headers before applying: upstream `rdkafka.h` declares this function as returning `void`, in which case there is nothing to check.)
  if (ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {
-     rd_kafka_conf_enable_sasl_queue(kafka_conf, 1);
+     res = rd_kafka_conf_enable_sasl_queue(kafka_conf, 1);
+     if (res != RD_KAFKA_CONF_OK) {
+         flb_plg_warn(ins, "Failed to enable SASL queue (non-critical)");
+     }
      flb_plg_debug(ins, "SASL queue enabled for OAUTHBEARER mechanism");
  }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
- include/fluent-bit/aws/flb_aws_msk_iam.h (1 hunks)
- plugins/in_kafka/in_kafka.c (4 hunks)
- plugins/in_kafka/in_kafka.h (1 hunks)
- plugins/out_kafka/kafka.c (0 hunks)
- plugins/out_kafka/kafka_config.c (4 hunks)
- plugins/out_kafka/kafka_config.h (1 hunks)
- src/aws/flb_aws_credentials_ec2.c (1 hunks)
- src/aws/flb_aws_credentials_profile.c (1 hunks)
- src/aws/flb_aws_credentials_sts.c (2 hunks)
- src/aws/flb_aws_msk_iam.c (12 hunks)
- src/flb_kafka.c (1 hunks)
💤 Files with no reviewable changes (1)
- plugins/out_kafka/kafka.c
🚧 Files skipped from review as they are similar to previous changes (1)
- include/fluent-bit/aws/flb_aws_msk_iam.h
🧰 Additional context used
🧬 Code graph analysis (3)
plugins/out_kafka/kafka_config.c (3)
src/flb_output.c (2)
- flb_output_get_property (1108-1111)
- flb_output_set_property (843-1068)
src/flb_sds.c (2)
- flb_sds_create (78-90)
- flb_sds_destroy (389-399)
src/aws/flb_aws_msk_iam.c (1)
- flb_aws_msk_iam_register_oauth_cb (628-761)
src/aws/flb_aws_msk_iam.c (1)
src/tls/flb_tls.c (1)
- flb_tls_create (183-232)
plugins/in_kafka/in_kafka.c (4)
src/flb_input.c (2)
- flb_input_get_property (776-780)
- flb_input_set_property (557-774)
src/flb_sds.c (2)
- flb_sds_create (78-90)
- flb_sds_destroy (389-399)
src/aws/flb_aws_msk_iam.c (2)
- flb_aws_msk_iam_register_oauth_cb (628-761)
- flb_aws_msk_iam_destroy (764-786)
src/flb_kafka.c (1)
- flb_kafka_opaque_destroy (233-240)
🪛 Cppcheck (2.18.0)
plugins/out_kafka/kafka_config.c
[information] 61-63, 65-68: Include file (missingIncludeSystem)
[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches. (normalCheckLevelMaxBranches)
[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information. (toomanyconfigs)
src/aws/flb_aws_msk_iam.c
[information] 31, 40-41, 49-54, 137-140: Include file (missingIncludeSystem)
[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches. (normalCheckLevelMaxBranches)
[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information. (toomanyconfigs)
plugins/in_kafka/in_kafka.c
[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches. (normalCheckLevelMaxBranches)
[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information. (toomanyconfigs)
src/flb_kafka.c
[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information. (toomanyconfigs)
src/aws/flb_aws_credentials_ec2.c
[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches. (normalCheckLevelMaxBranches)
[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information. (toomanyconfigs)
src/aws/flb_aws_credentials_profile.c
[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches. (normalCheckLevelMaxBranches)
[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information. (toomanyconfigs)
src/aws/flb_aws_credentials_sts.c
[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches. (normalCheckLevelMaxBranches)
[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information. (toomanyconfigs)
🔇 Additional comments (16)
src/aws/flb_aws_credentials_profile.c (1)
664-674: LGTM! Appropriate log level adjustment for missing credentials file.
Downgrading the log level to `AWS_CREDS_DEBUG` for `ENOENT` is correct: a missing credentials file is an expected scenario when using alternative credential sources (environment variables, EC2 IMDS, EKS, etc.). This aligns with the handling in `get_shared_config_credentials` at line 623.
plugins/out_kafka/kafka_config.h (1)
128-131: LGTM! Clean struct refactoring for MSK IAM configuration.
Replacing the `aws_msk_iam_cluster_arn` field with a simple `aws_msk_iam` flag is a good simplification. Per the PR objectives, region and cluster type are now auto-extracted from broker hostnames, making the explicit cluster ARN unnecessary.
src/flb_kafka.c (1)
96-100: LGTM! Critical fix for proper librdkafka configuration cleanup.
Using `rd_kafka_conf_destroy()` instead of `flb_free()` is the correct approach. The `rd_kafka_conf_t` object allocated by `rd_kafka_conf_new()` has internal structures that must be properly released by librdkafka's destructor, not by a simple memory free.
plugins/in_kafka/in_kafka.h (1)
57-60: LGTM! Consistent with out_kafka configuration structure.
The `aws_msk_iam` flag addition mirrors the changes in `plugins/out_kafka/kafka_config.h`, ensuring both input and output Kafka plugins have aligned MSK IAM configuration approaches.
plugins/out_kafka/kafka_config.c (4)
61-87: LGTM! Well-structured SASL mechanism handling for MSK IAM.
The logic correctly:
- Captures the user-configured SASL mechanism
- Detects the `aws_msk_iam` alias and converts it to `OAUTHBEARER`
- Sets appropriate defaults for `security.protocol`
- Properly manages the `sasl_mechanism` SDS string lifecycle
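The alias handling described above can be sketched in isolation. This is a minimal illustration and not the plugin's code: `struct sasl_choice` and `normalize_sasl_mechanism()` are hypothetical names, and the case-insensitive comparison is an assumption.

```c
#include <stdbool.h>
#include <stddef.h>
#include <strings.h>

/* Hypothetical result type for illustration; not a Fluent Bit structure. */
struct sasl_choice {
    const char *mechanism;  /* value to pass on as sasl.mechanism */
    bool aws_msk_iam;       /* true when the user asked for the alias */
};

/*
 * Map the user-facing "aws_msk_iam" alias onto OAUTHBEARER and remember
 * that the alias was requested. Any other value passes through unchanged.
 * The case-insensitive match is an assumption of this sketch.
 */
static struct sasl_choice normalize_sasl_mechanism(const char *configured)
{
    struct sasl_choice out = { configured, false };

    if (configured != NULL && strcasecmp(configured, "aws_msk_iam") == 0) {
        out.mechanism = "OAUTHBEARER";
        out.aws_msk_iam = true;
    }
    return out;
}
```

Keeping the flag separate from the mechanism string lets later code distinguish a plain OAUTHBEARER setup from one that should register the MSK IAM callback.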
209-218: LGTM! Universal OAUTHBEARER enhancement.
Enabling the SASL queue for all OAUTHBEARER configurations is a good design choice. This ensures token refresh works correctly on idle connections regardless of the OAuth provider (AWS IAM, OIDC, custom, etc.).
252-286: LGTM! Correct ownership semantics for librdkafka configuration.
The ownership handling is well-documented and correct:
- On `rd_kafka_new()` success: `ctx->conf = NULL` prevents double-free since librdkafka now owns it
- On failure: `ctx->conf` remains valid for cleanup in `flb_out_kafka_destroy()`
The degraded handling for SASL background callback failures (warning instead of fatal) is reasonable: the connection may still function, though with potential token refresh issues on idle connections.
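The ownership rule above (the client takes the configuration only on success, so the caller clears its pointer afterwards) can be modelled without librdkafka. The sketch below uses hypothetical mock types in place of `rd_kafka_conf_t` and `rd_kafka_t`; it illustrates the pattern and is not the plugin's implementation.

```c
#include <stdlib.h>

/* Hypothetical stand-ins for rd_kafka_conf_t and rd_kafka_t. */
struct mock_conf { int unused; };
struct mock_client { struct mock_conf *conf; };

struct plugin_ctx {
    struct mock_conf *conf;      /* ours until a client takes it over */
    struct mock_client *client;
};

/* Mimics a constructor that takes ownership of conf only on success. */
static struct mock_client *mock_client_new(struct mock_conf *conf, int fail)
{
    struct mock_client *c;

    if (fail) {
        return NULL;
    }
    c = malloc(sizeof(*c));
    if (c != NULL) {
        c->conf = conf;
    }
    return c;
}

static int ctx_create_client(struct plugin_ctx *ctx, int fail)
{
    ctx->client = mock_client_new(ctx->conf, fail);
    if (ctx->client == NULL) {
        return -1;               /* ctx->conf is still ours to destroy */
    }
    ctx->conf = NULL;            /* ownership moved to the client */
    return 0;
}

/* Releases everything; returns how many times a conf was freed. */
static int ctx_destroy(struct plugin_ctx *ctx)
{
    int released = 0;

    if (ctx->client != NULL) {
        free(ctx->client->conf); /* the client frees the conf it owns */
        released++;
        free(ctx->client);
        ctx->client = NULL;
    }
    if (ctx->conf != NULL) {     /* only reached if no client took it */
        free(ctx->conf);
        released++;
        ctx->conf = NULL;
    }
    return released;
}
```

In both the success and the failure path `ctx_destroy()` frees the configuration exactly once, which is the property the review comment is checking for.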
344-351: LGTM! Proper cleanup handling for all failure scenarios.
The destroy logic correctly handles both cases:
- When `rd_kafka_new()` succeeded: `rd_kafka_destroy()` handles the configuration
- When `rd_kafka_new()` failed: manual `rd_kafka_conf_destroy()` is needed
Since `ctx->conf` is set to `NULL` after successful producer creation (line 266), there's no risk of double-free.
plugins/in_kafka/in_kafka.c (4)
358-389: LGTM: MSK IAM registration logic is sound.
The conditional registration based on explicit user request, SASL mechanism, and broker patterns is appropriate. Error handling properly cleans up on failure.
391-424: LGTM: Ownership semantics and SASL background callbacks.
The ownership transfer of `kafka_conf` is properly handled: preserved on failure, nulled on success. SASL background callback enablement is correctly implemented with appropriate error handling.
486-507: LGTM: Error cleanup path is comprehensive.
The cleanup logic properly handles all resources: kafka_topics, kafka.rk, kafka_conf (conditional), opaque, and msk_iam. The conditional destruction of `kafka_conf` based on whether `rd_kafka_new` succeeded is correct.
549-553: LGTM: Exit cleanup is correct.
MSK IAM cleanup in the exit path is properly implemented and consistent with the init_error cleanup.
src/aws/flb_aws_msk_iam.c (4)
31-55: LGTM: Struct and include updates support new lifecycle management.
The added includes (TLS, pthread) and struct fields (is_serverless, cred_tls, provider, lock) properly support the enhanced credential provider lifecycle and concurrency control.
214-512: LGTM: Payload generation refactored with proper credential handling.
The updated signature accepts credentials directly, separating concerns. Validation, error handling, and cleanup are comprehensive. The User-Agent addition is a useful enhancement.
514-625: LGTM: OAuth callback with proper concurrency control.
The mutex protection for credential provider access is critical and correctly implemented. The lock is held only during credential operations, not payload generation. Error paths properly unlock the mutex, and credentials are destroyed after use.
740-761: LGTM: Mutex initialization and OAuth callback registration.
Mutex initialization includes proper error handling and cleanup. The opaque context setup and OAuth callback registration are correctly implemented.
Actionable comments posted: 0
♻️ Duplicate comments (2)
src/aws/flb_aws_msk_iam.c (1)
627-760: Verify TLS ownership to avoid potential double-free in MSK IAM destroy.
`flb_aws_msk_iam_register_oauth_cb()` creates `ctx->cred_tls` and passes it into `flb_standard_chain_provider_create()`, then `flb_aws_msk_iam_destroy()` calls:

    if (ctx->provider) {
        flb_aws_provider_destroy(ctx->provider);
    }
    if (ctx->cred_tls) {
        flb_tls_destroy(ctx->cred_tls);
    }

Depending on how `flb_standard_chain_provider_create()` and `flb_aws_provider_destroy()` are implemented, the provider may already own and destroy `cred_tls`. If so, calling `flb_tls_destroy(ctx->cred_tls)` would double-free the same TLS handle.
This concern mirrors an earlier review comment on the same area; please re-confirm the ownership contract and adjust either the provider or the MSK IAM destroy path so TLS is released exactly once (e.g., by having the provider own TLS and dropping the explicit `flb_tls_destroy`, or vice versa).

    #!/bin/bash
    # Inspect AWS provider/TLS ownership to confirm whether cred_tls is freed by the provider.
    rg -n "struct flb_aws_provider" src/aws include -n -C3 || true
    rg -n "flb_standard_chain_provider_create" src/aws include -n -C5 || true
    rg -n "cred_tls" src/aws include -n -C5 || true

Also applies to: 763-785
plugins/in_kafka/in_kafka.c (1)
271-276: Handle `flb_sds_create(conf)` OOM before logging/using `ctx->sasl_mechanism`.
`flb_sds_create(conf)` can return NULL, but `ctx->sasl_mechanism` is immediately formatted with `%s` (Line 275) and later used in comparisons. On OOM this is undefined behavior and can crash the process.
Consider failing init (or at least skipping SASL-specific logic) on allocation failure:

     conf = flb_input_get_property("rdkafka.sasl.mechanism", ins);
     if (conf) {
    -    ctx->sasl_mechanism = flb_sds_create(conf);
    -    flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism);
    +    ctx->sasl_mechanism = flb_sds_create(conf);
    +    if (!ctx->sasl_mechanism) {
    +        flb_plg_error(ins, "failed to allocate SASL mechanism string");
    +        goto init_error;
    +    }
    +    flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism);
🧹 Nitpick comments (3)
plugins/in_kafka/in_kafka.c (1)
365-393: Log when `aws_msk_iam` was requested but brokers don’t look like MSK.
If `ctx->aws_msk_iam` is true but `ctx->kafka.brokers` is unset or doesn’t contain the expected `.kafka.`/`.kafka-serverless.` and `.amazonaws.com` substrings, MSK IAM is silently skipped. That can be confusing for users who set `rdkafka.sasl.mechanism=aws_msk_iam` but get no IAM callback.
Consider adding an explicit warning in the “else” case to make this visible:

     #ifdef FLB_HAVE_AWS_MSK_IAM
    -    if (ctx->aws_msk_iam && ctx->sasl_mechanism &&
    -        strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {
    -        /* Check if brokers are configured for MSK IAM */
    -        if (ctx->kafka.brokers &&
    -            (strstr(ctx->kafka.brokers, ".kafka.") || strstr(ctx->kafka.brokers, ".kafka-serverless.")) &&
    -            strstr(ctx->kafka.brokers, ".amazonaws.com")) {
    +    if (ctx->aws_msk_iam && ctx->sasl_mechanism &&
    +        strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {
    +        /* Check if brokers are configured for MSK IAM */
    +        if (ctx->kafka.brokers &&
    +            (strstr(ctx->kafka.brokers, ".kafka.") ||
    +             strstr(ctx->kafka.brokers, ".kafka-serverless.")) &&
    +            strstr(ctx->kafka.brokers, ".amazonaws.com")) {
                 ...
    -        }
    +        }
    +        else {
    +            flb_plg_warn(ins,
    +                         "aws_msk_iam requested but brokers do not look like MSK; "
    +                         "skipping MSK IAM OAuth callback registration");
    +        }
         }
     #endif

src/aws/flb_aws_msk_iam.c (2)
42-55: Region extraction helper looks safe; add focused tests for broker variants.
`extract_region_from_broker()` now:
- Strips the port,
- Ensures `.amazonaws.com` lies within the host portion,
- Safely detects `.vpce.amazonaws.com` with the `p - broker >= 5` guard, and
- Bounds region length to `(0, 32]`.
The pointer arithmetic and length checks look correct, but this logic is subtle and critical. It would benefit from unit tests covering at least:
- Standard MSK: `b-1.example.c1.kafka.us-east-1.amazonaws.com[:9098]`
- Serverless: `boot-xxx.c1.kafka-serverless.eu-west-1.amazonaws.com[:9098]`
- VPC endpoint: `vpce-xxx.kafka.ap-southeast-1.vpce.amazonaws.com[:9098]`
- Malformed inputs: missing region, missing `.amazonaws.com`, or trailing junk after `.amazonaws.com`.
Also applies to: 137-212
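The extraction steps listed above can be sketched as a standalone helper that such tests could target. This is a simplified illustration, not the PR's `extract_region_from_broker()`: the name `region_from_broker()`, its signature, and the choice to require the host to end exactly in `.amazonaws.com` are assumptions, and it handles a single `host[:port]` entry, not a comma-separated broker list. It checks only the shape of the hostname and does not verify that the extracted label is a real AWS region.

```c
#include <stddef.h>
#include <string.h>

#define REGION_MAX 32

/*
 * Copy the label that precedes ".amazonaws.com" (or ".vpce.amazonaws.com")
 * out of one broker address. Returns 0 on success and -1 if the host does
 * not have the expected shape or the output buffer is too small.
 */
static int region_from_broker(const char *broker, char *out, size_t out_size)
{
    static const char suffix[] = ".amazonaws.com";
    static const char vpce[] = ".vpce";
    const size_t suffix_len = sizeof(suffix) - 1;
    const size_t vpce_len = sizeof(vpce) - 1;
    const char *colon;
    const char *start;
    const char *end;             /* one past the last region character */
    size_t host_len;
    size_t len;

    if (broker == NULL || out == NULL) {
        return -1;
    }

    /* Strip the optional ":port" part. */
    colon = strchr(broker, ':');
    host_len = colon ? (size_t) (colon - broker) : strlen(broker);

    /* The host must end in the suffix, which also rejects trailing junk. */
    if (host_len <= suffix_len ||
        memcmp(broker + host_len - suffix_len, suffix, suffix_len) != 0) {
        return -1;
    }
    end = broker + host_len - suffix_len;

    /* Skip a ".vpce" label; check the offset before looking backwards. */
    if ((size_t) (end - broker) >= vpce_len &&
        memcmp(end - vpce_len, vpce, vpce_len) == 0) {
        end -= vpce_len;
    }

    /* Walk back to the previous dot, testing the bound before each read. */
    start = end;
    while (start > broker && *(start - 1) != '.') {
        start--;
    }

    len = (size_t) (end - start);
    if (len == 0 || len > REGION_MAX || len >= out_size) {
        return -1;
    }
    memcpy(out, start, len);
    out[len] = '\0';
    return 0;
}
```

With this shape, the standard, serverless, and VPC endpoint hostnames from the list yield `us-east-1`, `eu-west-1`, and `ap-southeast-1` respectively, while a host without the suffix or with text after it is rejected.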
514-625: OAuth refresh callback concurrency fix is sound; consider minor defensive checks.
The callback now:
- Validates `opaque` and `msk_iam_ctx`,
- Builds the MSK host based on `config->is_serverless`,
- Locks `config->lock` around `provider->refresh()` and `provider->get_credentials()`,
- Unlocks before building the payload and interacting with librdkafka, and
- Always destroys `creds` on all paths.
This is a good fix for concurrent access to the shared provider. As a minor hardening step, you could also guard against a null `config->provider` (e.g., if future refactors allow partial teardown) before dereferencing it inside the locked section, returning token failure early if it’s missing.
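The locking discipline and the suggested null check can be sketched with plain pthreads. All types and names below (`demo_creds`, `demo_provider`, `demo_ctx`, `demo_fetch_creds()`) are hypothetical stand-ins, not the Fluent Bit provider API; the increment merely represents a refresh, and the heap copy represents the credentials returned to the caller.

```c
#include <pthread.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical types for illustration; these are not Fluent Bit structures. */
struct demo_creds {
    int generation;                  /* stands in for key, secret, token */
};

struct demo_provider {
    int generation;                  /* bumped on every refresh */
};

struct demo_ctx {
    struct demo_provider *provider;  /* shared across refresh callbacks */
    pthread_mutex_t lock;            /* guards every use of provider */
};

/*
 * Refresh the shared provider and return a private copy of its credentials.
 * The lock covers only the provider access; the caller builds the payload
 * and talks to the Kafka client after the lock has been released.
 * Returns NULL if the provider is missing or the allocation fails.
 */
static struct demo_creds *demo_fetch_creds(struct demo_ctx *ctx)
{
    struct demo_creds *copy = NULL;

    if (ctx == NULL) {
        return NULL;
    }

    pthread_mutex_lock(&ctx->lock);
    if (ctx->provider != NULL) {          /* defensive NULL check */
        ctx->provider->generation++;      /* stands in for refresh() */
        copy = malloc(sizeof(*copy));     /* stands in for get_credentials() */
        if (copy != NULL) {
            copy->generation = ctx->provider->generation;
        }
    }
    pthread_mutex_unlock(&ctx->lock);     /* one unlock covers every path */

    return copy;                          /* caller frees after use */
}
```

Funnelling every path through a single unlock avoids the class of bug where an early error return leaves the mutex held.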
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
plugins/in_kafka/in_kafka.c (4 hunks)
src/aws/flb_aws_msk_iam.c (12 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.
Applied to files:
src/aws/flb_aws_msk_iam.c
🧬 Code graph analysis (1)
plugins/in_kafka/in_kafka.c (4)
- src/flb_input.c (1): flb_input_get_property (776-780)
- src/flb_sds.c (2): flb_sds_create (78-90), flb_sds_destroy (389-399)
- src/aws/flb_aws_msk_iam.c (1): flb_aws_msk_iam_register_oauth_cb (628-761)
- src/flb_kafka.c (1): flb_kafka_opaque_destroy (233-240)
🪛 Cppcheck (2.18.0)
src/aws/flb_aws_msk_iam.c
[information] 31, 40-41, 49-54, 137-140: Include file (missingIncludeSystem)
[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches. (normalCheckLevelMaxBranches)
[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information. (toomanyconfigs)
plugins/in_kafka/in_kafka.c
[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches. (normalCheckLevelMaxBranches)
[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information. (toomanyconfigs)
🔇 Additional comments (5)
plugins/in_kafka/in_kafka.c (4)
279-297: aws_msk_iam branch looks correct; minor allocation path nuance only.
The aws_msk_iam handling (flagging explicit use, flipping mechanism to OAUTHBEARER, defaulting `security.protocol` to SASL_SSL) is consistent and the second `flb_sds_create("OAUTHBEARER")` is now correctly NULL-checked. The early `return -1` on OAUTHBEARER SDS OOM is safe since no additional resources have been created yet beyond `ctx`.
No further changes needed here once the initial `ctx->sasl_mechanism` OOM is guarded.
352-362: SASL queue enablement for OAUTHBEARER is well-scoped.
Enabling the SASL queue only when `ctx->sasl_mechanism == "OAUTHBEARER"` confines the behavior to OAuth flows and avoids surprising other mechanisms. The debug log is also useful for troubleshooting.
417-429: SASL background callbacks enablement is robustly guarded.
The background callbacks are only enabled for the OAUTHBEARER mechanism, the error path logs a clear warning and destroys the returned `rd_kafka_error_t *`, and the success path logs at info. This is a solid integration of librdkafka’s background refresh with good observability.
491-503: Kafka conf / MSK IAM cleanup paths are consistent with ownership semantics.
The init error path correctly distinguishes:
- `rd_kafka_destroy(ctx->kafka.rk)` when rk exists (conf owned by rk),
- `rd_kafka_conf_destroy(kafka_conf)` when rk was never created, and
- `flb_aws_msk_iam_destroy(ctx->msk_iam)` when MSK IAM was partially initialized.
This avoids leaking `kafka_conf` and MSK IAM resources without risking double-destroy.
Also applies to: 508-512
src/aws/flb_aws_msk_iam.c (1)
214-512: MSK IAM payload builder refactor is thorough and cleanup-safe.
The refactored `build_msk_iam_payload()`:
- Validates `config->region`, `host`, and key fields on `creds` early with clear error logs.
- Builds SigV4 credential, query, canonical request, string-to-sign, and signature with `goto error` on any failure.
- Correctly frees all intermediate SDS values and the presigned URL in the error path (including `empty_payload_hex` and `key`), and nulls `empty_payload_hex` on the success path to avoid double destroy.
- Generates a base64 payload, then converts to URL-safe base64 and strips padding in place.
This looks correct and memory-safe with no obvious leaks or double-frees.
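The last step in the list above, converting standard base64 to the URL-safe alphabet and stripping padding in place, can be sketched as follows. This is an illustration of the technique, not the code in `build_msk_iam_payload()`; `base64_to_urlsafe()` is a hypothetical helper name. It works from an explicit length, so it does not rely on the encoder having NUL-terminated the buffer.

```c
#include <stddef.h>

/*
 * Rewrite a standard base64 buffer in place: '+' becomes '-', '/' becomes
 * '_', and trailing '=' padding is dropped. Returns the new length; the
 * caller is responsible for terminating or truncating the buffer there.
 */
static size_t base64_to_urlsafe(char *buf, size_t len)
{
    size_t i;

    for (i = 0; i < len; i++) {
        if (buf[i] == '+') {
            buf[i] = '-';
        }
        else if (buf[i] == '/') {
            buf[i] = '_';
        }
    }
    /* Padding only ever appears at the end, so trim from the tail. */
    while (len > 0 && buf[len - 1] == '=') {
        len--;
    }
    return len;
}
```

A caller holding a C string would typically follow up with `buf[new_len] = '\0'` before passing the token on.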
a94d17e to 095b814
@cosmo0920 should be clean now?
Yes, it's clean now:
You'll need to rebase or update your merge commit
Implements the core AWS MSK IAM authentication mechanism including:
- OAuth callback mechanism for token generation and refresh
- Token lifecycle management and expiration handling
- Integration with AWS credential providers
- SASL/OAUTHBEARER protocol support for librdkafka
This provides the foundation for AWS MSK IAM authentication support in Fluent Bit's Kafka plugins.
Signed-off-by: Arbin <[email protected]>

Enhance EC2 credential provider to better support MSK IAM authentication by improving credential refresh behavior and lifecycle management.
Signed-off-by: Arbin <[email protected]>

Enhance profile credential provider to better support MSK IAM authentication by improving credential refresh behavior and lifecycle management.
Signed-off-by: Arbin <[email protected]>

Enhance STS credential provider to better support MSK IAM authentication by improving credential refresh behavior and lifecycle management.
Signed-off-by: Arbin <[email protected]>

Update Kafka core functionality to support AWS MSK IAM authentication, including necessary configuration and lifecycle improvements.
Signed-off-by: Arbin <[email protected]>

Enable AWS MSK IAM authentication in the Kafka input plugin:
- Add AWS MSK IAM configuration options
- Integrate with OAuth callback mechanism
- Support automatic credential refresh
- Add TLS configuration for secure connections
Signed-off-by: Arbin <[email protected]>

Enable AWS MSK IAM authentication in the Kafka output plugin:
- Add AWS MSK IAM configuration options
- Integrate with OAuth callback mechanism
- Support automatic credential refresh
- Add TLS configuration for secure connections
Signed-off-by: Arbin <[email protected]>

Add NULL checks after flb_sds_create() when allocating SASL mechanism strings to prevent crashes on allocation failure. This covers both the initial SASL mechanism configuration and the AWS MSK IAM OAUTHBEARER conversion.
Signed-off-by: Arbin <[email protected]>

Replace pointer comparison with offset comparison in VPC endpoint detection to improve safety and clarity. Changes 'p >= broker + 5' to 'p - broker >= 5' to properly check offset within string bounds before accessing p - 5.
Signed-off-by: Arbin <[email protected]>

- Remove is_serverless detection logic
- Use actual broker hostname instead of constructed host
- Fix memory leak in error cleanup path
- Add broker_host field to store actual hostname
- Update function signature to accept optional region parameter
This aligns with official AWS MSK IAM signers behavior where the signature Host must match the TLS SNI/actual connection host.
Signed-off-by: Arbin <[email protected]>

- Add aws_region configuration field
- Remove hostname pattern check for MSK IAM registration
- Pass aws_region to MSK IAM registration function
- Support PrivateLink and custom DNS scenarios
Signed-off-by: Arbin <[email protected]>

- Add aws_region configuration field
- Remove hostname pattern check for MSK IAM registration
- Pass aws_region to MSK IAM registration function
- Support PrivateLink and custom DNS scenarios
Signed-off-by: Arbin <[email protected]>

- Add comprehensive MSK IAM configuration examples
- Cover Standard MSK, Serverless, PrivateLink scenarios
- Document aws_region parameter usage
- Add troubleshooting guide and IAM permissions
- Update README with detailed usage instructions
Signed-off-by: Arbin <[email protected]>

Remove service_host from struct flb_aws_msk_iam and construct it dynamically in OAuth callback. This eliminates data redundancy since service_host can be derived from region. Also clean up unused struct flb_msk_iam_cb definition and use flb_sds_len() instead of strlen() for consistency.
Signed-off-by: Arbin <[email protected]>

Fix critical security issue and improve code quality:
1. Fix potential buffer overread in extract_region_from_broker():
   - Changed iteration from 'start = end - 1' to 'start = end'
   - Check boundary before reading: while (start > broker && *(start - 1) != '.')
   - Eliminates undefined behavior when broker string is malformed
2. Avoid implicit NUL-termination in base64 URL encoding:
   - Replace pointer-based iteration with length-based for loop
   - Remove dependency on flb_base64_encode() NUL-termination behavior
   - Remove unused variable declaration
These changes address security concerns and improve code maintainability.
Signed-off-by: Arbin <[email protected]>
Signed-off-by: Arbin <[email protected]>
Signed-off-by: Arbin <[email protected]>
4e43d23 to 1bde90a
@patrick-stephens rebased.
Actionable comments posted: 0
🧹 Nitpick comments (2)
examples/kafka_filter/README.md (1)
73-101: Consider adding `DescribeGroup` permission for consumer groups.
The IAM policy example is helpful, but for consumers that use consumer groups, `kafka-cluster:DescribeGroup` may also be required depending on the consumer group operations. Consider adding a note about this.
🔎 Suggested addition
 **Note:** Adjust permissions based on your use case:
 - Consumers need: `Connect`, `DescribeCluster`, `ReadData`
+- Consumer groups may also need: `DescribeGroup`
 - Producers need: `Connect`, `WriteData`
src/aws/flb_aws_msk_iam.c (1)
770-793: Mutex destruction should check for initialization.
If `pthread_mutex_init` fails during registration (lines 748-755), the context is freed without the mutex ever being initialized. However, in `flb_aws_msk_iam_destroy`, `pthread_mutex_destroy` is called unconditionally (line 790). While this specific path won't reach `destroy` since registration returns NULL on mutex init failure, the code would be more defensive if it tracked mutex initialization state.
This is a minor robustness concern since current flow prevents calling destroy on partially initialized context.
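One way to make the destroy path defensive, as the nitpick suggests, is to record whether the mutex was ever initialized. The sketch below is illustrative only: `struct msk_iam_ctx`, `lock_initialized`, `msk_iam_ctx_create` and `msk_iam_ctx_destroy` are hypothetical names, not the fields or functions of `struct flb_aws_msk_iam` in this PR.

```c
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical stand-in for the MSK IAM context; only the fields needed here. */
struct msk_iam_ctx {
    pthread_mutex_t lock;
    int lock_initialized;   /* 1 only after pthread_mutex_init() succeeded */
};

/* Allocate a context; returns NULL on any failure without leaking. */
static struct msk_iam_ctx *msk_iam_ctx_create(void)
{
    struct msk_iam_ctx *ctx = calloc(1, sizeof(*ctx));

    if (ctx == NULL) {
        return NULL;
    }
    if (pthread_mutex_init(&ctx->lock, NULL) != 0) {
        free(ctx);
        return NULL;
    }
    ctx->lock_initialized = 1;
    return ctx;
}

/* Safe to call on NULL or on a context whose mutex was never initialized. */
static void msk_iam_ctx_destroy(struct msk_iam_ctx *ctx)
{
    if (ctx == NULL) {
        return;
    }
    if (ctx->lock_initialized) {
        pthread_mutex_destroy(&ctx->lock);
        ctx->lock_initialized = 0;
    }
    free(ctx);
}
```

With a flag like this, a future error path that calls the destroy function on a partially built context would not pass an uninitialized mutex to `pthread_mutex_destroy()`.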
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
- examples/kafka_filter/README.md (1 hunks)
- examples/kafka_filter/kafka_msk_iam.conf (1 hunks)
- include/fluent-bit/aws/flb_aws_msk_iam.h (1 hunks)
- plugins/in_kafka/in_kafka.c (5 hunks)
- plugins/in_kafka/in_kafka.h (1 hunks)
- plugins/out_kafka/kafka.c (1 hunks)
- plugins/out_kafka/kafka_config.c (4 hunks)
- plugins/out_kafka/kafka_config.h (1 hunks)
- src/aws/flb_aws_credentials_ec2.c (1 hunks)
- src/aws/flb_aws_credentials_profile.c (1 hunks)
- src/aws/flb_aws_credentials_sts.c (2 hunks)
- src/aws/flb_aws_msk_iam.c (12 hunks)
- src/flb_kafka.c (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- src/aws/flb_aws_credentials_ec2.c
- src/aws/flb_aws_credentials_sts.c
- src/aws/flb_aws_credentials_profile.c
- include/fluent-bit/aws/flb_aws_msk_iam.h
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2025-12-17T18:45:02.283Z
Learnt from: edsiper
Repo: fluent/fluent-bit PR: 11286
File: src/flb_http_client.c:1657-1715
Timestamp: 2025-12-17T18:45:02.283Z
Learning: In the Fluent Bit OAuth2 implementation (src/flb_oauth2.c), the function flb_oauth2_get_access_token returns a pointer to the cached token stored in the OAuth2 context (ctx->access_token), not a new allocation. The OAuth2 context manages the token lifecycle, so callers must not free the returned token pointer.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.
Applied to files:
src/aws/flb_aws_msk_iam.c
src/flb_kafka.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.
Applied to files:
src/aws/flb_aws_msk_iam.c
🧬 Code graph analysis (3)
plugins/out_kafka/kafka_config.c (3)
src/flb_output.c (2)
flb_output_get_property(1108-1111)
flb_output_set_property(843-1068)
src/flb_sds.c (2)
flb_sds_create(78-90)
flb_sds_destroy(389-399)
src/aws/flb_aws_msk_iam.c (1)
flb_aws_msk_iam_register_oauth_cb(628-768)
plugins/in_kafka/in_kafka.c (3)
include/fluent-bit/flb_mem.h (1)
flb_free(126-128)
src/aws/flb_aws_msk_iam.c (2)
flb_aws_msk_iam_register_oauth_cb(628-768)
flb_aws_msk_iam_destroy(771-793)
src/flb_kafka.c (1)
flb_kafka_opaque_destroy(233-240)
src/aws/flb_aws_msk_iam.c (4)
src/aws/flb_aws_credentials.c (2)
flb_aws_credentials_destroy(752-767)
flb_aws_provider_destroy(769-791)
src/tls/flb_tls.c (2)
flb_tls_create(183-232)
flb_tls_destroy(258-277)
src/aws/flb_aws_util.c (1)
flb_aws_client_generator(280-283)
src/flb_kafka.c (1)
flb_kafka_opaque_set(242-256)
🔇 Additional comments (23)
src/flb_kafka.c (1)
96-100: Correct fix for rdkafka configuration cleanup.
Using `rd_kafka_conf_destroy()` is the proper way to release an `rd_kafka_conf_t` object. The previous use of `flb_free()` would have caused memory leaks and undefined behavior since librdkafka allocates internal structures.
examples/kafka_filter/kafka_msk_iam.conf (1)
1-141: Comprehensive example configuration for MSK IAM authentication.
This example file effectively demonstrates the four main MSK deployment scenarios with clear comments explaining when region auto-detection works versus when `aws_region` must be explicitly set. The notes section provides valuable operational guidance on credentials, IAM permissions, and security protocol defaults.
plugins/out_kafka/kafka_config.h (1)
128-138: Consistent MSK IAM configuration fields with input plugin.
The `aws_msk_iam` flag and `aws_region` pointer mirror the changes in `in_kafka.h`, maintaining consistency between input and output Kafka plugins for MSK IAM configuration.
plugins/out_kafka/kafka.c (1)
673-681: Config map entry for `aws_region` is correctly defined.
The configuration map entry properly exposes the `aws_region` parameter with a clear description explaining when auto-detection works and when explicit configuration is required.
plugins/out_kafka/kafka_config.c (6)
61-87: MSK IAM opt-in via SASL mechanism is well-implemented.
The approach of intercepting `aws_msk_iam` as a SASL mechanism and converting it to `OAUTHBEARER` internally is clean. This provides a simple user interface while properly integrating with librdkafka's OAuth machinery. The automatic `SASL_SSL` defaulting is a sensible convenience.
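The alias handling described in this comment can be pictured with a small, library-free sketch. It is not the plugin's code: `struct sasl_choice` and `resolve_sasl` are hypothetical names, and matching `aws_msk_iam` case-insensitively is an assumption made here for illustration.

```c
#include <stddef.h>
#include <strings.h>   /* strcasecmp (POSIX) */

/* Hypothetical result of resolving the user's SASL settings. */
struct sasl_choice {
    int aws_msk_iam;                /* 1 if the user asked for aws_msk_iam */
    const char *mechanism;          /* value to hand to librdkafka */
    const char *security_protocol;  /* NULL means "not configured" */
};

/*
 * Map the convenience value "aws_msk_iam" to OAUTHBEARER and default the
 * security protocol to SASL_SSL only when the user did not set one.
 * Any other mechanism is passed through untouched.
 */
static struct sasl_choice resolve_sasl(const char *mechanism,
                                       const char *security_protocol)
{
    struct sasl_choice out = { 0, mechanism, security_protocol };

    /* Assumption: the alias is matched case-insensitively. */
    if (mechanism != NULL && strcasecmp(mechanism, "aws_msk_iam") == 0) {
        out.aws_msk_iam = 1;
        out.mechanism = "OAUTHBEARER";
        if (security_protocol == NULL) {
            out.security_protocol = "SASL_SSL";
        }
    }
    return out;
}
```

Keeping the user's intent in a separate flag is what lets a plain `OAUTHBEARER` configuration (for example OIDC) pass through without having the MSK IAM callback registered.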
209-218: SASL queue enablement for OAUTHBEARER is correctly placed.
Enabling the SASL queue before `rd_kafka_new()` is essential for background OAuth token refresh to function properly on idle connections. This benefits all OAUTHBEARER users, not just MSK IAM.
220-251: MSK IAM OAuth callback registration is correctly guarded.
The double-check on `ctx->aws_msk_iam` and `OAUTHBEARER` mechanism ensures the callback is only registered when explicitly requested. The `principal=admin` in `sasl.oauthbearer.config` is a placeholder required by librdkafka; the actual identity comes from the AWS credentials used during token generation.
255-268: Correct ownership handling for rdkafka configuration.
Setting `ctx->conf = NULL` after successful `rd_kafka_new()` prevents double-free since librdkafka takes ownership of the configuration. The comment accurately documents this behavior for future maintainers.
346-353: Cleanup logic correctly handles partial initialization failures.
The conditional destruction of `ctx->conf` only when `rd_kafka` was never successfully created prevents both memory leaks and double-frees. This properly complements the ownership transfer logic earlier in the file.
270-287: SASL background callbacks enable automatic token refresh on idle connections.
The `rd_kafka_sasl_background_callbacks_enable()` call automatically forwards the SASL queue to librdkafka's background thread, ensuring OAuth tokens are refreshed even when `rd_kafka_poll()` isn't being called regularly. Without it, an application with a custom SASL OAUTHBEARER refresh callback must call `rd_kafka_poll()` before it can connect to brokers; with background callbacks enabled, refresh callbacks are triggered automatically on the librdkafka background thread. Treating failures as warnings is appropriate since token refresh can still work via normal polling paths.
plugins/in_kafka/in_kafka.h (1)
57-64: Verify aws_region cleanup pattern and type consistency.
The header changes are correct, but aws_region requires clarification. While aws_region is properly defined as a char* field and receives its value from the config map, it is not freed in the destroy function. However, since FLB_CONFIG_MAP_STR fields with set_property=FLB_TRUE are assigned directly from the config system's owned flb_sds_t values (not duplicated), aws_region holds a borrowed reference. This is acceptable if intentional, but it creates an inconsistency: sasl_mechanism (also a string config field) is declared as flb_sds_t and explicitly freed, while aws_region is char* and not freed. For consistency with the plugin's string handling patterns and similar plugins like azure_kusto, consider either declaring aws_region as flb_sds_t (and freeing it), or document that it's a borrowed reference.
plugins/in_kafka/in_kafka.c (7)
271-307: LGTM! Properly handles MSK IAM mechanism switching and NULL checks.
The SASL mechanism retrieval, NULL check after `flb_sds_create`, and mechanism switching from `aws_msk_iam` to `OAUTHBEARER` are correctly implemented. Auto-defaulting `security.protocol` to `SASL_SSL` when not configured is a sensible UX improvement.
357-366: LGTM! Enabling SASL queue for OAUTHBEARER is correct.
This enables librdkafka's internal SASL handling queue, which is essential for background OAuth token refresh. The `strcasecmp` check properly handles case-insensitive matching.
368-400: Non-fatal config set result is silently ignored.
Lines 386-392 set `sasl.oauthbearer.config` but only log an error without failing initialization if the operation fails. If this configuration is optional/fallback, this is acceptable. Otherwise, consider failing initialization.
    res = rd_kafka_conf_set(kafka_conf, "sasl.oauthbearer.config",
                            "principal=admin", errstr, sizeof(errstr));
    if (res != RD_KAFKA_CONF_OK) {
        flb_plg_error(ins, "failed to set sasl.oauthbearer.config: %s", errstr);
        // Should this goto init_error?
    }
407-435: LGTM! Proper Kafka conf ownership handling and SASL background callbacks.
Setting `kafka_conf = NULL` after successful `rd_kafka_new` correctly reflects that librdkafka has taken ownership. The SASL background callbacks are enabled with appropriate warning (not error) on failure since authentication may still work via polling.
497-521: LGTM! Cleanup paths are properly ordered.
The cleanup logic correctly:
- Destroys `kafka_topics` if created
- Closes and destroys `kafka.rk` if created (which also destroys the conf it owns)
- Only destroys `kafka_conf` manually if `rd_kafka_new` was never reached
- Destroys MSK IAM context and opaque in correct order
560-568: LGTM! Exit cleanup properly destroys MSK IAM resources.
The MSK IAM context is destroyed before the opaque context, maintaining correct teardown order and matching the initialization order.
614-622: LGTM! aws_region config map entry is well documented.
The description clearly explains auto-detection behavior and when explicit configuration is required (PrivateLink/custom DNS scenarios).
src/aws/flb_aws_msk_iam.c (5)
42-54: LGTM! Well-structured MSK IAM context with concurrency protection.
The struct properly encapsulates TLS context, AWS provider, region, and mutex for thread-safe credential access. Including `flb_config` reference enables proper lifecycle management.
136-203: LGTM! Region extraction handles multiple MSK endpoint formats safely.
The function correctly handles:
- MSK Standard brokers
- MSK Serverless brokers
- VPC Endpoint format (`.vpce.amazonaws.com`)
- Port stripping
The OOB read fix from the past review is correctly applied at line 186: `while (start > broker && *(start - 1) != '.')`.
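To make the bounds-checked backward walk concrete, here is a simplified, self-contained sketch of region extraction. It is not the PR's `extract_region_from_broker()`: the function name `region_from_broker`, its signature, and the exact VPC endpoint hostname shape used in the usage note are assumptions for illustration. Only the loop condition is taken from the review comment above.

```c
#include <stddef.h>
#include <string.h>

/*
 * Copy the region label of an MSK-style broker address into `out`.
 * Returns 0 on success, -1 if no region is found or `out` is too small.
 */
static int region_from_broker(const char *broker, char *out, size_t out_size)
{
    static const char vpce[] = ".vpce.amazonaws.com";
    static const char aws[] = ".amazonaws.com";
    const size_t vpce_len = sizeof(vpce) - 1;
    const size_t aws_len = sizeof(aws) - 1;
    const char *colon;
    const char *start;
    const char *end;
    size_t host_len;
    size_t len;

    if (broker == NULL || out == NULL || out_size == 0) {
        return -1;
    }

    /* Strip an optional ":port" suffix. */
    colon = strrchr(broker, ':');
    host_len = colon ? (size_t) (colon - broker) : strlen(broker);

    /* Locate the end of the region label; test the longer suffix first. */
    if (host_len > vpce_len &&
        memcmp(broker + host_len - vpce_len, vpce, vpce_len) == 0) {
        end = broker + host_len - vpce_len;
    }
    else if (host_len > aws_len &&
             memcmp(broker + host_len - aws_len, aws, aws_len) == 0) {
        end = broker + host_len - aws_len;
    }
    else {
        return -1;
    }

    /* Walk back to the preceding '.', checking the bound before each read. */
    start = end;
    while (start > broker && *(start - 1) != '.') {
        start--;
    }

    len = (size_t) (end - start);
    if (len == 0 || len >= out_size) {
        return -1;
    }
    memcpy(out, start, len);
    out[len] = '\0';
    return 0;
}
```

Under these assumptions, `b-1.example.kafka.us-east-1.amazonaws.com:9098` yields `us-east-1`, and a custom DNS name with no `amazonaws.com` suffix returns -1, which is the case where an explicit `aws_region` would be needed.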
205-499: LGTM! Robust SigV4 payload generation with comprehensive cleanup.
The `build_msk_iam_payload` function:
- Validates all inputs before processing
- Properly constructs the canonical request and string-to-sign
- Handles session token when present
- Uses Base64URL encoding with padding removal
- Has complete cleanup in both success and error paths
- Sets `empty_payload_hex = NULL` after destroy (line 342) to prevent double-free
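The Base64URL step with padding removal can be done with a length-based loop that never relies on NUL termination, in the spirit of the commit that replaced pointer-based iteration. The helper below is a minimal sketch with a hypothetical name (`base64_to_base64url`); it is not the code in `build_msk_iam_payload`.

```c
#include <stddef.h>

/*
 * Convert `len` bytes of standard Base64 text to unpadded Base64URL in place.
 * Returns the new length. The buffer does not need to be NUL-terminated.
 */
static size_t base64_to_base64url(char *buf, size_t len)
{
    size_t i;

    /* Swap the two characters that differ between the alphabets. */
    for (i = 0; i < len; i++) {
        if (buf[i] == '+') {
            buf[i] = '-';
        }
        else if (buf[i] == '/') {
            buf[i] = '_';
        }
    }

    /* Drop trailing '=' padding by shrinking the logical length. */
    while (len > 0 && buf[len - 1] == '=') {
        len--;
    }
    return len;
}
```

The caller is expected to use the returned length (and write its own terminator if it needs a C string), so the result does not depend on what the encoder left after the last byte.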
501-625: LGTM! Thread-safe OAuth callback with proper credential lifecycle.
The callback correctly:
- Validates opaque context before use
- Uses mutex to protect provider access (addressed in past review)
- Unlocks immediately after obtaining credentials to minimize lock contention
- Destroys credentials after use (lines 589 and 612)
- Uses consistent service-level hostname (`kafka.{region}.amazonaws.com`) for all brokers
The 5-minute token lifetime with librdkafka's 80% refresh threshold ensures tokens refresh at ~4 minutes, preventing expiration on idle connections.
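The timing in the last sentence is simple arithmetic, sketched below. The constant and function names are hypothetical, and the 80% threshold is taken from the review comment rather than verified against librdkafka here. The absolute expiry is computed because `rd_kafka_oauthbearer_set_token()` expects the lifetime as milliseconds since the epoch.

```c
#include <stdint.h>

#define TOKEN_LIFETIME_MS 300000   /* 5 minutes, per the review comment */

/* Absolute expiry (ms since epoch) for a token issued at now_ms. */
static int64_t token_expiry_ms(int64_t now_ms)
{
    return now_ms + TOKEN_LIFETIME_MS;
}

/* Point at which a refresh at 80% of the lifetime would be due. */
static int64_t token_refresh_at_ms(int64_t now_ms)
{
    return now_ms + ((int64_t) TOKEN_LIFETIME_MS * 8) / 10;
}
```

For a token issued at time 0, the refresh point is 240000 ms (4 minutes) and the expiry is 300000 ms, leaving a one-minute margin.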
720-755: No double-free issue exists with `ctx->cred_tls`.
The TLS context passed to `flb_standard_chain_provider_create` is not stored in the provider's `cred_tls` field. The provider struct is allocated with `flb_calloc()`, which zeros memory, leaving `provider->cred_tls` as NULL. Since `flb_aws_provider_destroy()` only frees `provider->cred_tls` if it's set, the passed TLS pointer is freed only once by the MSK IAM context itself. Ownership remains exclusively with the caller throughout the provider's lifetime.
Summary
Add comprehensive AWS MSK IAM authentication support with simplified configuration and fix OAuth token expiration on idle connections. This PR automatically extracts region and cluster type information from broker addresses, provides explicit opt-in for MSK IAM, enhances OAUTHBEARER token refresh for all OAuth methods, and enables automatic background token refresh to prevent authentication failures on idle connections.
Changes
Key Features
Explicit MSK IAM Opt-in
- Enabled via `rdkafka.sasl.mechanism=aws_msk_iam`
- `aws_msk_iam` flag to track user intent
Simplified Configuration
- Removed `cluster_arn` parameter
- Only `rdkafka.sasl.mechanism=aws_msk_iam` is required
- Converts to `OAUTHBEARER` internally and registers OAuth callback
Automatic Region Extraction
Automatic Cluster Type Detection
Universal OAUTHBEARER Enhancements
OAuth Token Lifetime Management
- `rd_kafka_poll()`
TLS Support for AWS Credentials
Technical Details
Explicit MSK IAM Activation:
Configuration Simplification:
- `rdkafka.sasl.mechanism=aws_msk_iam`
- Converts to `OAUTHBEARER` and registers OAuth callback
- Defaults `rdkafka.security.protocol=SASL_SSL` (if not configured)
Region Extraction Logic:
- Broker address (e.g. `b-1.example.kafka.us-east-1.amazonaws.com`)
- `*.kafka.<region>.amazonaws.com`
- `*.kafka-serverless.<region>.amazonaws.com`
Cluster Type Detection:
- `.kafka-serverless.` to determine cluster type
- (`kafka` or `kafka-serverless`)
Universal OAUTHBEARER Background Processing:
Modified Files
AWS MSK IAM Core (2 files)
- `include/fluent-bit/aws/flb_aws_msk_iam.h` - Updated function signature (removed cluster_arn parameter)
- `src/aws/flb_aws_msk_iam.c` - Refactored region extraction and cluster type detection logic
Kafka Input Plugin (2 files)
- `plugins/in_kafka/in_kafka.h` - Added `aws_msk_iam` flag, removed deprecated fields
- `plugins/in_kafka/in_kafka.c` - Added explicit MSK IAM activation, universal OAUTHBEARER support
Kafka Output Plugin (3 files)
- `plugins/out_kafka/kafka_config.h` - Added `aws_msk_iam` flag, removed deprecated fields
- `plugins/out_kafka/kafka_config.c` - Added explicit MSK IAM activation, universal OAUTHBEARER support
- `plugins/out_kafka/kafka.c` - Removed deprecated configuration mapping
AWS Credentials & TLS Support (4 files)
- `src/aws/flb_aws_credentials_ec2.c` - Enhanced TLS support for EC2 metadata credential fetching
- `src/aws/flb_aws_credentials_profile.c` - Enhanced TLS support for profile credential fetching
- `src/aws/flb_aws_credentials_sts.c` - Enhanced TLS support for STS credential fetching
- `src/flb_kafka.c` - Core Kafka integration improvements
Total: 11 files modified
Configuration
Simple AWS MSK IAM Setup:
[INPUT]
    Name kafka
    Brokers b-1.example.kafka.us-east-1.amazonaws.com:9098
    rdkafka.sasl.mechanism aws_msk_iam
No `cluster_arn` or additional AWS-specific parameters needed!
Supported Configurations
This PR ensures compatibility with multiple OAuth scenarios:
1. AWS MSK IAM (Fluent Bit convenience syntax)
[INPUT]
    Name kafka
    Brokers b-1.my-cluster.kafka.us-east-1.amazonaws.com:9098
    rdkafka.sasl.mechanism aws_msk_iam
2. librdkafka OIDC (unaffected by MSK IAM)
[INPUT]
    Name kafka
    Brokers b-1.my-cluster.kafka.us-east-1.amazonaws.com:9098
    rdkafka.sasl.mechanism OAUTHBEARER
    rdkafka.sasl.oauthbearer.method oidc
    rdkafka.sasl.oauthbearer.client.id my_client_id
    rdkafka.sasl.oauthbearer.client.secret my_secret
    rdkafka.sasl.oauthbearer.token.endpoint.url https://auth.example.com/token
3. librdkafka AWS method (unaffected by MSK IAM)
[INPUT]
    Name kafka
    Brokers b-1.my-cluster.kafka.us-east-1.amazonaws.com:9098
    rdkafka.sasl.mechanism OAUTHBEARER
    rdkafka.sasl.oauthbearer.method aws
All configurations benefit from automatic background token refresh!
Design for Extensibility
This PR establishes a clean, extensible pattern for adding cloud provider IAM authentication:
1. Layered Configuration Approach
2. Explicit Opt-in Pattern
3. Benefits of This Design
4. Future Extensions
This architecture makes it straightforward to add:
Each can be added with the same explicit opt-in pattern without affecting existing functionality.
OAuth Token Expiration Fix
Problem Statement:
After prolonged idle periods (5+ minutes), Kafka outputs experienced authentication failures:
Root Cause:
librdkafka's OAuth token refresh mechanism relies on `rd_kafka_poll()` being called regularly. For idle connections, `rd_kafka_poll()` is only called when producing messages. This is documented in librdkafka issue #3871:
Timeline without background callbacks:
Solution: Background Callbacks
librdkafka v1.9.0+ provides `rd_kafka_sasl_background_callbacks_enable()` specifically for this use case:
Timeline with background callbacks:
Benefits:
- (`rd_kafka_poll()` not required)
TLS Support
This PR includes proper TLS support for AWS credential fetching:
Features:
Usage:
Testing
Packaging
Set `ok-package-test` label to test for all targets (requires maintainer to do)
Documentation
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
New Features
Bug Fixes
Configuration Changes
Documentation