Add simple_aggregation to kinesis/firehose plugins #11284
base: master

Conversation
Note: CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough: Adds a new AWS in-memory aggregation API and implementation, and integrates an optional simple_aggregation path into the Kinesis Streams and Firehose plugins, with buffer management, event processing, finalization, send-on-full retry, and accompanying unit and runtime tests.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    actor Input
    participant Plugin as Kinesis/Firehose Plugin
    participant AggBuf as Aggregation Buffer
    participant AWS as AWS Service
    Input->>Plugin: deliver event(s)
    alt simple_aggregation disabled
        Plugin->>AWS: send batched records immediately
    else simple_aggregation enabled
        loop per event
            Plugin->>AggBuf: flb_aws_aggregation_process_event(...)
            alt FLB_AWS_AGG_OK
                AggBuf->>AggBuf: append JSON + newline
            else FLB_AWS_AGG_FULL
                AggBuf-->>Plugin: buffer full (signal)
                Plugin->>AggBuf: flb_aws_aggregation_finalize(add_final_newline?)
                Plugin->>AWS: send aggregated record (encode/compress as configured)
                Plugin->>AggBuf: flb_aws_aggregation_reset()
                Plugin->>AggBuf: retry current event
            else FLB_AWS_AGG_DISCARD/ERROR
                Plugin->>Plugin: drop/log and continue
            end
        end
        Plugin->>AggBuf: flb_aws_aggregation_finalize(end of batch)
        Plugin->>AWS: send final aggregated record
        Plugin->>AggBuf: flb_aws_aggregation_reset()
    end
```
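The send-on-full retry path in the diagram can be sketched in a few lines of C. This is a hypothetical stand-in: the names `AGG_OK`/`AGG_FULL`/`AGG_DISCARD`, `agg_add`, and `agg_send_and_reset` are illustrative, not the actual `flb_aws_aggregation` API.

```c
#include <assert.h>
#include <string.h>

/* Minimal stand-in for the aggregation buffer (hypothetical; the real
 * struct lives in flb_aws_aggregation.h). */
struct agg_buf {
    char   data[32];   /* deliberately tiny to force the "full" path */
    size_t len;
};

enum { AGG_OK = 0, AGG_FULL = 1, AGG_DISCARD = 2 };

/* Append one record plus a newline, or report "full" without touching
 * the buffer, mirroring the diagram's process-event step. */
static int agg_add(struct agg_buf *b, const char *rec, size_t rec_len)
{
    if (rec_len + 1 > sizeof(b->data)) {
        return AGG_DISCARD;            /* single record can never fit */
    }
    if (b->len + rec_len + 1 > sizeof(b->data)) {
        return AGG_FULL;               /* caller must flush and retry */
    }
    memcpy(b->data + b->len, rec, rec_len);
    b->data[b->len + rec_len] = '\n';
    b->len += rec_len + 1;
    return AGG_OK;
}

/* "Send" = count a flush and reset, standing in for the AWS call. */
static int flushes = 0;
static void agg_send_and_reset(struct agg_buf *b)
{
    flushes++;
    b->len = 0;
}

/* Drive four 10-byte records through the diagram's loop; returns the
 * number of aggregated sends. */
int run_demo(void)
{
    struct agg_buf b = { .len = 0 };
    const char *recs[] = { "0123456789", "0123456789",
                           "0123456789", "0123456789" };
    size_t i;
    for (i = 0; i < 4; i++) {
        int ret = agg_add(&b, recs[i], strlen(recs[i]));
        if (ret == AGG_FULL) {
            agg_send_and_reset(&b);                       /* flush */
            ret = agg_add(&b, recs[i], strlen(recs[i]));  /* retry */
        }
        if (ret == AGG_DISCARD) {
            continue;                  /* drop oversized record */
        }
        assert(ret == AGG_OK);
    }
    if (b.len > 0) {
        agg_send_and_reset(&b);        /* end-of-batch flush */
    }
    return flushes;
}
```

With a 32-byte buffer and 11 bytes per record (10 + newline), the third record triggers the mid-batch flush and the remainder goes out in the end-of-batch flush.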
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks: ❌ 1 failed check (warning), ✅ 2 passed
Actionable comments posted: 0
🧹 Nitpick comments (2)
plugins/out_kinesis_firehose/firehose_api.c (1)
386-480: Consider adding a guard for the events array. The aggregated record finalization and sending logic is well-structured. The compression and base64 encoding paths are handled correctly, and memory management appears sound.
One minor consideration: at line 461, the code assumes buf->events is allocated. While this should always be true when called via process_and_send_records, a defensive check could prevent potential crashes in edge cases. Optional defensive check:

```diff
+    /* Ensure events array is allocated */
+    if (buf->events == NULL) {
+        flb_plg_error(ctx->ins, "Events buffer not initialized");
+        flb_aws_aggregation_reset(&buf->agg_buf);
+        return -1;
+    }
+
     /* Create event record */
     event = &buf->events[0];
```
src/aws/flb_aws_aggregation.c (1)
113-233: LGTM: Event processing logic is correct. The flb_aws_aggregation_process_event function properly handles:
- msgpack-to-JSON conversion
- log_key extraction (removing enclosing quotes)
- Size validation at multiple stages
- time_key injection with custom formatting
- Newline appending before aggregation
The function correctly returns different codes for different scenarios (buffer full, record too large, error), enabling proper caller handling.
Optional: Consider future refactoring opportunity.
The event processing logic in flb_aws_aggregation_process_event shares significant code with the non-aggregation process_event function in kinesis_api.c (msgpack conversion, log_key handling, time_key injection). While the current duplication is acceptable for clarity and the aggregation/non-aggregation paths diverge at the end, a future refactor could extract common logic into shared helper functions.
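The log_key quote-stripping step mentioned above (removing the enclosing quotes from a serialized JSON string value) can be illustrated with a small hypothetical helper; this is not the plugin's actual code.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* If the serialized value is a JSON string ("..."), drop the enclosing
 * quotes so only the raw message text is aggregated. Returns the new
 * length; `out` must have room for len + 1 bytes. Hypothetical helper. */
static size_t strip_enclosing_quotes(const char *json, size_t len,
                                     char *out)
{
    if (len >= 2 && json[0] == '"' && json[len - 1] == '"') {
        memcpy(out, json + 1, len - 2);
        out[len - 2] = '\0';
        return len - 2;
    }
    /* Non-string JSON values (numbers, objects) pass through as-is. */
    memcpy(out, json, len);
    out[len] = '\0';
    return len;
}
```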
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
- include/fluent-bit/aws/flb_aws_aggregation.h (1 hunks)
- plugins/out_kinesis_firehose/firehose.c (4 hunks)
- plugins/out_kinesis_firehose/firehose.h (3 hunks)
- plugins/out_kinesis_firehose/firehose_api.c (7 hunks)
- plugins/out_kinesis_streams/kinesis.c (4 hunks)
- plugins/out_kinesis_streams/kinesis.h (3 hunks)
- plugins/out_kinesis_streams/kinesis_api.c (9 hunks)
- src/aws/CMakeLists.txt (1 hunks)
- src/aws/flb_aws_aggregation.c (1 hunks)
- tests/internal/CMakeLists.txt (1 hunks)
- tests/internal/aws_aggregation.c (1 hunks)
- tests/runtime/out_firehose.c (1 hunks)
- tests/runtime/out_kinesis.c (2 hunks)
🧰 Additional context used
🧠 Learnings (8)
📚 Learning: 2025-11-21T06:23:29.770Z
Learnt from: cosmo0920
Repo: fluent/fluent-bit PR: 11171
File: include/fluent-bit/flb_lib.h:52-53
Timestamp: 2025-11-21T06:23:29.770Z
Learning: In Fluent Bit core (fluent/fluent-bit repository), function descriptions/documentation are not required for newly added functions in header files.
Applied to files:
include/fluent-bit/aws/flb_aws_aggregation.h
📚 Learning: 2025-08-29T06:25:02.561Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:7-7
Timestamp: 2025-08-29T06:25:02.561Z
Learning: In Fluent Bit, ZSTD (zstandard) compression library is bundled directly in the source tree at `lib/zstd-1.5.7` and is built unconditionally as a static library. Unlike optional external dependencies, ZSTD does not use conditional compilation guards like `FLB_HAVE_ZSTD` and is always available. Headers like `<fluent-bit/flb_zstd.h>` can be included directly without guards.
Applied to files:
plugins/out_kinesis_streams/kinesis_api.c
plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.
Applied to files:
plugins/out_kinesis_streams/kinesis_api.c
plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:24:55.855Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:52-56
Timestamp: 2025-08-29T06:24:55.855Z
Learning: ZSTD compression is always available in Fluent Bit and does not require conditional compilation guards. Unlike Arrow/Parquet which use #ifdef FLB_HAVE_ARROW guards, ZSTD is built unconditionally with flb_zstd.c included directly in src/CMakeLists.txt and a bundled ZSTD library at lib/zstd-1.5.7/.
Applied to files:
plugins/out_kinesis_streams/kinesis_api.c
plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:24:26.170Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:39-42
Timestamp: 2025-08-29T06:24:26.170Z
Learning: In Fluent Bit, ZSTD compression support is enabled by default and does not require conditional compilation guards (like #ifdef FLB_HAVE_ZSTD) around ZSTD-related code declarations and implementations.
Applied to files:
plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components, ZSTD support is always available and doesn't need build-time conditionals.
Applied to files:
plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:24:44.797Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:26-26
Timestamp: 2025-08-29T06:24:44.797Z
Learning: In Fluent Bit, ZSTD support is always available and enabled by default. The build system automatically detects and uses either the system libzstd library or builds the bundled ZSTD version. Unlike other optional dependencies like Arrow which use conditional compilation guards (e.g., FLB_HAVE_ARROW), ZSTD does not require conditional includes or build flags.
Applied to files:
plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.
Applied to files:
plugins/out_kinesis_firehose/firehose_api.c
🧬 Code graph analysis (7)
tests/internal/aws_aggregation.c (1)
  src/aws/flb_aws_aggregation.c (5)
    flb_aws_aggregation_init (30-46), flb_aws_aggregation_destroy (48-56), flb_aws_aggregation_add (58-77), flb_aws_aggregation_finalize (79-100), flb_aws_aggregation_reset (102-107)
include/fluent-bit/aws/flb_aws_aggregation.h (1)
  src/aws/flb_aws_aggregation.c (6)
    flb_aws_aggregation_init (30-46), flb_aws_aggregation_destroy (48-56), flb_aws_aggregation_add (58-77), flb_aws_aggregation_finalize (79-100), flb_aws_aggregation_reset (102-107), flb_aws_aggregation_process_event (113-233)
plugins/out_kinesis_firehose/firehose.c (3)
  plugins/out_kinesis_streams/kinesis.c (1)
    new_flush_buffer (311-354)
  src/aws/flb_aws_aggregation.c (1)
    flb_aws_aggregation_init (30-46)
  plugins/out_kinesis_firehose/firehose_api.c (1)
    flush_destroy (1114-1126)
tests/runtime/out_firehose.c (2)
  src/flb_lib.c (9)
    flb_create (143-225), flb_input (266-276), flb_input_set (305-335), flb_output (279-289), flb_output_set (520-551), flb_start (983-994), flb_lib_push (843-870), flb_stop (1011-1055), flb_destroy (228-263)
  tests/runtime/out_kinesis.c (1)
    flb_test_firehose_success (13-45)
src/aws/flb_aws_aggregation.c (4)
  include/fluent-bit/flb_mem.h (1)
    flb_free (126-128)
  src/flb_pack.c (1)
    flb_msgpack_to_json (1011-1024)
  include/fluent-bit/flb_compat.h (1)
    gmtime_r (75-81)
  src/aws/flb_aws_util.c (1)
    flb_aws_strftime_precision (1240-1326)
plugins/out_kinesis_streams/kinesis_api.c (2)
  src/aws/flb_aws_aggregation.c (4)
    flb_aws_aggregation_process_event (113-233), flb_aws_aggregation_finalize (79-100), flb_aws_aggregation_reset (102-107), flb_aws_aggregation_destroy (48-56)
  include/fluent-bit/flb_mem.h (1)
    flb_free (126-128)
plugins/out_kinesis_firehose/firehose_api.c (3)
  plugins/out_kinesis_streams/kinesis_api.c (4)
    send_log_events (486-555), process_event_simple_aggregation (213-230), send_aggregated_record (422-483), reset_flush_buf (414-419)
  src/aws/flb_aws_aggregation.c (4)
    flb_aws_aggregation_process_event (113-233), flb_aws_aggregation_finalize (79-100), flb_aws_aggregation_reset (102-107), flb_aws_aggregation_destroy (48-56)
  include/fluent-bit/flb_mem.h (1)
    flb_free (126-128)
🔇 Additional comments (35)
src/aws/CMakeLists.txt (1)
18-18: LGTM! The aggregation source file is correctly added to the AWS library build. The unconditional inclusion is appropriate since aggregation is a core feature of the kinesis/firehose plugins.
tests/internal/CMakeLists.txt (1)
141-141: LGTM! The aggregation unit test is correctly added under the FLB_AWS conditional, following the established pattern for AWS-related tests.
plugins/out_kinesis_firehose/firehose.h (3)
29-29: LGTM! The aggregation header include is correctly placed with other Fluent Bit headers.
60-62: LGTM! The aggregation buffer and initialization flag are appropriately added to the flush structure. The agg_buf_initialized flag properly tracks whether cleanup is needed in flush_destroy.
102-102: LGTM! The simple_aggregation configuration option is correctly added to the context structure.
plugins/out_kinesis_streams/kinesis.h (3)
29-29: LGTM! The aggregation header include is correctly placed, consistent with the firehose plugin.
60-62: LGTM! The aggregation buffer fields are consistent with the firehose plugin structure.
100-100: LGTM! The simple_aggregation configuration option is correctly added to the Kinesis context structure.
tests/internal/aws_aggregation.c (10)
1-27: LGTM! Well-structured test file with appropriate includes and constant definition. The MAX_RECORD_SIZE matches the expected usage in production code.
29-47: LGTM! Good lifecycle test verifying init sets expected values and destroy properly cleans up state.
49-107: LGTM! Comprehensive tests for single and multiple record additions with proper content verification via memcmp.
109-163: LGTM! Good boundary testing for buffer-full scenarios, including both oversized single records and gradual fill to capacity.
165-200: LGTM! Properly tests both Kinesis Streams mode (no newline) and Firehose mode (with newline) finalization paths.
202-254: LGTM! Good coverage of edge cases: empty buffer finalization and reset/reuse cycles.
256-322: LGTM! Excellent coverage of large-scale additions and NULL parameter handling. The NULL tests ensure robustness against invalid inputs.
324-404: LGTM! Good boundary condition tests including exact capacity fill and multiple reset cycles.
406-522: LGTM! Useful edge case tests covering tiny buffers, double finalize, add-after-finalize, and alternating patterns. These tests verify the API behaves correctly in non-standard usage scenarios.
524-544: LGTM! Comprehensive test list covering all aggregation API functions and edge cases.
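The two finalization modes exercised by these tests reduce to a conditional newline append. A hypothetical mirror of the behavior (the real function is flb_aws_aggregation_finalize, whose exact signature and return values may differ):

```c
#include <assert.h>
#include <string.h>

struct agg {
    char   data[64];
    size_t len;
};

/* Hypothetical mirror of finalization: optionally append a trailing
 * newline before the buffer is handed to the sender. Returns -1 when
 * there is nothing to send. */
static int finalize(struct agg *b, int add_final_newline)
{
    if (b->len == 0) {
        return -1;                 /* empty buffer: nothing to send */
    }
    if (add_final_newline && b->len < sizeof(b->data)) {
        b->data[b->len++] = '\n';  /* Firehose-style */
    }
    return 0;                      /* Kinesis-style: unchanged */
}
```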
plugins/out_kinesis_firehose/firehose_api.c (6)
41-41: LGTM! The aggregation header is correctly included alongside other AWS headers.
57-58: LGTM! Forward declaration is necessary since send_aggregated_record calls send_log_events.
148-169: LGTM! Clean wrapper function that delegates to the shared aggregation implementation. Parameters are correctly passed, including MAX_EVENT_SIZE as the limit.
571-600: LGTM! The simple aggregation path is correctly integrated into add_event. The retry logic properly handles buffer-full conditions by sending the aggregated record and retrying.
761-767: LGTM! The final send logic correctly branches between aggregation mode and normal mode for any remaining data.
1114-1125: LGTM! Proper cleanup of the aggregation buffer when the agg_buf_initialized flag is set. This prevents memory leaks and double-free issues.
plugins/out_kinesis_streams/kinesis_api.c (5)
55-56: LGTM: Forward declaration added for function ordering. The forward declaration is necessary since send_aggregated_record (defined at line 422) calls send_log_events before its definition.
209-230: LGTM: Clean delegation to the shared aggregation API. The wrapper appropriately delegates to the shared flb_aws_aggregation_process_event implementation, passing all required context from the Kinesis-specific structures.
573-601: LGTM: Aggregation path correctly handles the buffer-full scenario. The retry logic properly handles the buffer-full case (ret == 1) by sending the accumulated aggregated record and retrying the current event. The mechanism prevents infinite loops since oversized individual records return 2 (discard), which breaks the retry cycle.
751-756: LGTM: Proper end-of-batch handling and resource cleanup. The end-of-batch logic correctly routes through the aggregation path when enabled, and the cleanup path properly destroys the aggregation buffer when initialized.
Also applies to: 1104-1106
421-483: Buffer sizing concern does not reflect actual constant values. The buffer size constants are appropriately defined with sufficient margin. PUT_RECORDS_PAYLOAD_SIZE is 5,242,880 bytes (~5.24 MB), while the maximum base64-encoded aggregated record is approximately 1.4 MB (MAX_EVENT_SIZE of 1,048,556 bytes with 33% base64 overhead). The check at line 456 is defensive programming but will not realistically trigger under normal conditions, as the buffer is 3.76x larger than the maximum possible encoded output. Likely an incorrect or invalid review comment.
plugins/out_kinesis_firehose/firehose.c (1)
317-354: LGTM: Firehose aggregation initialization consistent with Kinesis. The changes properly integrate aggregation support:
- Function signature updated to accept context for conditional initialization
- Aggregation buffer initialized when simple_aggregation is enabled
- Proper error handling with buffer destruction on initialization failure
- Configuration option added with appropriate defaults and description
The implementation mirrors the Kinesis Streams changes, ensuring consistency across both AWS output plugins.
Also applies to: 371-371, 524-529
tests/runtime/out_firehose.c (1)
192-524: LGTM: Comprehensive test coverage for Firehose aggregation. The nine new tests cover a wide range of scenarios:
- Basic aggregation functionality
- Integration with time_key and log_key features
- Volume testing with many records
- Interaction with compression
- Combined parameter configurations
- Edge cases (empty records, error handling)
- Custom time formats
All tests follow the established testing patterns and are properly registered.
Also applies to: 533-541
plugins/out_kinesis_streams/kinesis.c (1)
311-351: LGTM: Kinesis Streams aggregation initialization. The implementation is consistent with the Firehose changes:
- Context-aware buffer initialization
- Proper error handling with cleanup on failure
- Configuration option matching Firehose conventions
Also applies to: 368-368, 518-523
include/fluent-bit/aws/flb_aws_aggregation.h (1)
1-90: LGTM: Well-designed aggregation API. The header defines a clean, focused API:
- Simple buffer structure with clear fields
- Lifecycle management functions (init/destroy/reset)
- Data manipulation functions (add/finalize)
- High-level event processing function
- Return value semantics are clearly documented
Based on learnings, function descriptions are not required for Fluent Bit headers.
tests/runtime/out_kinesis.c (1)
298-443: LGTM: Adequate test coverage for Kinesis Streams aggregation. The four new tests cover key scenarios:
- Basic aggregation with multiple small records
- Integration with time_key and log_key features
- Volume testing with 50 records to validate aggregation efficiency
Tests follow established patterns and are properly registered. Coverage aligns with Firehose testing approach.
Also applies to: 455-458
src/aws/flb_aws_aggregation.c (1)
30-107: LGTM: Core aggregation functions are well-implemented. The buffer lifecycle and manipulation functions demonstrate:
- Proper null pointer checks
- Correct memory management with flb_malloc/flb_free
- Appropriate return codes matching the API documentation
- Buffer overflow prevention in flb_aws_aggregation_add
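A hypothetical mirror of that lifecycle, using plain malloc/free in place of flb_malloc/flb_free, shows the NULL-check and reset-versus-destroy semantics described above (illustrative only, not the actual implementation):

```c
#include <assert.h>
#include <stdlib.h>

struct agg_buf {
    char  *data;
    size_t size;
    size_t len;
};

/* Init with explicit NULL/size checks; returns -1 on failure. */
static int agg_init(struct agg_buf *b, size_t size)
{
    if (b == NULL || size == 0) {
        return -1;
    }
    b->data = malloc(size);
    if (b->data == NULL) {
        return -1;
    }
    b->size = size;
    b->len = 0;
    return 0;
}

/* Reset keeps the allocation and discards buffered content. */
static void agg_reset(struct agg_buf *b)
{
    if (b != NULL) {
        b->len = 0;
    }
}

/* Destroy frees the allocation; safe to call with NULL. */
static void agg_destroy(struct agg_buf *b)
{
    if (b != NULL && b->data != NULL) {
        free(b->data);
        b->data = NULL;
        b->size = 0;
        b->len = 0;
    }
}
```

The reset/destroy split is what lets the send-on-full path reuse one allocation across many aggregated sends within a flush.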
💡 Codex Review
Here are some automated review suggestions for this pull request.
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
Force-pushed: 158fb26 to 8376cf0
The implementation concept is good, but our commit message linter reports an error, and we usually separate commits for this type of PR, like:

- aws: aws_aggregation: Implement aggregation operation
- out_kinesis_firehose: Add simple_aggregation operation
- out_kinesis_stream: Add simple_aggregation operation
- tests: internal: aws_aggregation
- out_kinesis_firehose: tests: Add a test case of simple_aggregation operation
- out_kinesis_stream: tests: Add a test case of simple_aggregation operation
Plus, compilation errors are also happening. Could you take a look on this?
Force-pushed: 8376cf0 to dc1eb6b
Should be updated in the last force push. The build failure was a weird refactoring bug that removed a struct value that was not needed.
Actionable comments posted: 2
🧹 Nitpick comments (3)
tests/runtime/out_kinesis.c (2)
298-443: Consider adding assertions to verify aggregation behavior. The new aggregation tests successfully set up the plugin configuration and push records, but they rely solely on the test mode mocking and don't include explicit assertions to verify that:
- Records were actually aggregated into fewer API calls
- The aggregated payload format is correct
- Buffer boundaries are respected
While the sleep-and-succeed pattern is consistent with existing tests in this file, consider whether additional test infrastructure could validate the aggregation behavior more thoroughly.
435-438: Verify buffer size for formatted records. The loop uses a 100-byte buffer with snprintf to format records. While this should be sufficient for the test pattern [1, {"id":%d,"msg":"test"}], consider checking the snprintf return value for explicit truncation detection. 🔎 Suggested improvement:

```diff
 for (i = 0; i < 50; i++) {
-    snprintf(record, sizeof(record), "[1, {\"id\":%d,\"msg\":\"test\"}]", i);
+    ret = snprintf(record, sizeof(record), "[1, {\"id\":%d,\"msg\":\"test\"}]", i);
+    if (ret >= sizeof(record)) {
+        flb_plg_warn(ctx->ins, "Record truncated in test");
+    }
     flb_lib_push(ctx, in_ffd, record, strlen(record));
 }
```
src/aws/flb_aws_aggregation.c (1)
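The suggested check relies on C99 snprintf semantics: the return value is the length that would have been written (excluding the terminating NUL), so a return value greater than or equal to the buffer size indicates truncation. A self-contained illustration:

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Format a test record; returns 1 if it fit, 0 if truncated. */
static int format_record(char *buf, size_t size, int id)
{
    int ret = snprintf(buf, size, "[1, {\"id\":%d,\"msg\":\"test\"}]", id);
    /* Negative means an encoding error; >= size means truncation. */
    return ret >= 0 && (size_t) ret < size;
}
```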
179-183: Handle early return after time formatting failure. At lines 179-182, when flb_aws_strftime_precision returns 0 (formatting failed), the code logs an error and frees out_buf:

```c
if (len == 0) {
    flb_plg_error(ins, "Failed to add time_key %s to record, %s",
                  time_key, stream_name);
    flb_free(out_buf);
}
else {
    /* time_key_ptr manipulation ... */
}
```
When len == 0, the code frees out_buf and falls through; when len > 0, it enters the else block. Control then continues to the size check at line 204, which is safe because written has not been modified when len == 0. However, there is no explicit return or discard after a time formatting failure: the record will be added without the time_key, which may not be the intended behavior.
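The formatting step under discussion can be approximated with plain gmtime_r/strftime (the plugin actually uses flb_aws_strftime_precision, which adds sub-second handling); on failure the formatter returns 0, and per the comment above the caller would ideally skip time_key injection explicitly rather than fall through:

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <time.h>

/* Format an epoch timestamp with strftime; returns the formatted
 * length, or 0 on failure (buffer too small or conversion error). */
static size_t format_time_key(char *out, size_t size,
                              const char *fmt, time_t ts)
{
    struct tm tm_buf;
    if (gmtime_r(&ts, &tm_buf) == NULL) {
        return 0;
    }
    return strftime(out, size, fmt, &tm_buf);
}
```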
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
- include/fluent-bit/aws/flb_aws_aggregation.h (1 hunks)
- plugins/out_kinesis_firehose/firehose.c (4 hunks)
- plugins/out_kinesis_firehose/firehose.h (3 hunks)
- plugins/out_kinesis_firehose/firehose_api.c (7 hunks)
- plugins/out_kinesis_streams/kinesis.c (4 hunks)
- plugins/out_kinesis_streams/kinesis.h (3 hunks)
- plugins/out_kinesis_streams/kinesis_api.c (9 hunks)
- src/aws/CMakeLists.txt (1 hunks)
- src/aws/flb_aws_aggregation.c (1 hunks)
- tests/internal/CMakeLists.txt (1 hunks)
- tests/internal/aws_aggregation.c (1 hunks)
- tests/runtime/out_firehose.c (1 hunks)
- tests/runtime/out_kinesis.c (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- plugins/out_kinesis_streams/kinesis.h
🧰 Additional context used
🧠 Learnings (9)
📚 Learning: 2025-08-29T06:25:02.561Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:7-7
Timestamp: 2025-08-29T06:25:02.561Z
Learning: In Fluent Bit, ZSTD (zstandard) compression library is bundled directly in the source tree at `lib/zstd-1.5.7` and is built unconditionally as a static library. Unlike optional external dependencies, ZSTD does not use conditional compilation guards like `FLB_HAVE_ZSTD` and is always available. Headers like `<fluent-bit/flb_zstd.h>` can be included directly without guards.
Applied to files:
plugins/out_kinesis_firehose/firehose_api.c
plugins/out_kinesis_streams/kinesis_api.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.
Applied to files:
plugins/out_kinesis_firehose/firehose_api.c
plugins/out_kinesis_streams/kinesis_api.c
📚 Learning: 2025-08-29T06:24:26.170Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:39-42
Timestamp: 2025-08-29T06:24:26.170Z
Learning: In Fluent Bit, ZSTD compression support is enabled by default and does not require conditional compilation guards (like #ifdef FLB_HAVE_ZSTD) around ZSTD-related code declarations and implementations.
Applied to files:
plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:24:55.855Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:52-56
Timestamp: 2025-08-29T06:24:55.855Z
Learning: ZSTD compression is always available in Fluent Bit and does not require conditional compilation guards. Unlike Arrow/Parquet which use #ifdef FLB_HAVE_ARROW guards, ZSTD is built unconditionally with flb_zstd.c included directly in src/CMakeLists.txt and a bundled ZSTD library at lib/zstd-1.5.7/.
Applied to files:
plugins/out_kinesis_firehose/firehose_api.c
plugins/out_kinesis_streams/kinesis_api.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components, ZSTD support is always available and doesn't need build-time conditionals.
Applied to files:
plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:24:44.797Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:26-26
Timestamp: 2025-08-29T06:24:44.797Z
Learning: In Fluent Bit, ZSTD support is always available and enabled by default. The build system automatically detects and uses either the system libzstd library or builds the bundled ZSTD version. Unlike other optional dependencies like Arrow which use conditional compilation guards (e.g., FLB_HAVE_ARROW), ZSTD does not require conditional includes or build flags.
Applied to files:
plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.
Applied to files:
plugins/out_kinesis_firehose/firehose_api.c
plugins/out_kinesis_streams/kinesis_api.c
📚 Learning: 2025-09-04T12:35:36.904Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10825
File: plugins/out_s3/s3.c:3275-3282
Timestamp: 2025-09-04T12:35:36.904Z
Learning: The out_s3 plugin intentionally uses a simple numeric comparison for retry_limit (chunk->failures >= ctx->ins->retry_limit) rather than the standard Fluent Bit pattern that checks for FLB_OUT_RETRY_UNLIMITED (-1). The maintainer wants to keep this current behavior for consistency within the plugin.
Applied to files:
plugins/out_kinesis_streams/kinesis_api.c
📚 Learning: 2025-11-21T06:23:29.770Z
Learnt from: cosmo0920
Repo: fluent/fluent-bit PR: 11171
File: include/fluent-bit/flb_lib.h:52-53
Timestamp: 2025-11-21T06:23:29.770Z
Learning: In Fluent Bit core (fluent/fluent-bit repository), function descriptions/documentation are not required for newly added functions in header files.
Applied to files:
plugins/out_kinesis_firehose/firehose.h
🧬 Code graph analysis (8)
plugins/out_kinesis_firehose/firehose_api.c (1)
  src/aws/flb_aws_aggregation.c (4)
    flb_aws_aggregation_process_event (113-233), flb_aws_aggregation_finalize (79-100), flb_aws_aggregation_reset (102-107), flb_aws_aggregation_destroy (48-56)
plugins/out_kinesis_streams/kinesis_api.c (2)
  src/aws/flb_aws_aggregation.c (4)
    flb_aws_aggregation_process_event (113-233), flb_aws_aggregation_finalize (79-100), flb_aws_aggregation_reset (102-107), flb_aws_aggregation_destroy (48-56)
  include/fluent-bit/flb_mem.h (1)
    flb_free (126-128)
tests/runtime/out_kinesis.c (1)
  src/flb_lib.c (9)
    flb_create (143-225), flb_input (266-276), flb_input_set (305-335), flb_output (279-289), flb_output_set (520-551), flb_start (983-994), flb_lib_push (843-870), flb_stop (1011-1055), flb_destroy (228-263)
plugins/out_kinesis_streams/kinesis.c (2)
  plugins/out_kinesis_firehose/firehose.c (1)
    new_flush_buffer (317-357)
  src/aws/flb_aws_aggregation.c (1)
    flb_aws_aggregation_init (30-46)
tests/internal/aws_aggregation.c (1)
  src/aws/flb_aws_aggregation.c (5)
    flb_aws_aggregation_init (30-46), flb_aws_aggregation_destroy (48-56), flb_aws_aggregation_add (58-77), flb_aws_aggregation_finalize (79-100), flb_aws_aggregation_reset (102-107)
include/fluent-bit/aws/flb_aws_aggregation.h (1)
  src/aws/flb_aws_aggregation.c (6)
    flb_aws_aggregation_init (30-46), flb_aws_aggregation_destroy (48-56), flb_aws_aggregation_add (58-77), flb_aws_aggregation_finalize (79-100), flb_aws_aggregation_reset (102-107), flb_aws_aggregation_process_event (113-233)
src/aws/flb_aws_aggregation.c (4)
  include/fluent-bit/flb_mem.h (1)
    flb_free (126-128)
  src/flb_pack.c (1)
    flb_msgpack_to_json (1011-1024)
  include/fluent-bit/flb_compat.h (1)
    gmtime_r (75-81)
  src/aws/flb_aws_util.c (1)
    flb_aws_strftime_precision (1240-1326)
plugins/out_kinesis_firehose/firehose.c (3)
  plugins/out_kinesis_streams/kinesis.c (1)
    new_flush_buffer (311-354)
  src/aws/flb_aws_aggregation.c (1)
    flb_aws_aggregation_init (30-46)
  plugins/out_kinesis_firehose/firehose_api.c (1)
    flush_destroy (1121-1133)
🔇 Additional comments (23)
src/aws/CMakeLists.txt (1)
18-18: LGTM! The new aggregation source file is correctly added to the AWS library build list, following the existing pattern for unconditional AWS components.
tests/internal/CMakeLists.txt (1)
141-141: LGTM! The aggregation unit test is correctly added within the FLB_AWS conditional block, following the established pattern for AWS test files.
plugins/out_kinesis_firehose/firehose.h (1)
29-29: LGTM! The header changes are well-structured:
- New include for the aggregation API.
- agg_buf and agg_buf_initialized fields in struct flush for per-flush aggregation state.
- simple_aggregation flag in struct flb_firehose for runtime configuration.
The agg_buf_initialized flag pattern correctly mirrors the Kinesis Streams implementation.
Also applies to: 60-63, 102-102
tests/internal/aws_aggregation.c (1)
1-544: Excellent test coverage! The test suite comprehensively covers the AWS aggregation API:
- Lifecycle management (init/destroy/reset)
- Single and multiple record additions
- Buffer full detection and boundary conditions
- Both Firehose (with newline) and Kinesis (without newline) finalization modes
- Edge cases (NULL params, empty buffer, tiny buffer, exact boundaries)
- Reuse patterns (reset cycles, add after finalize, double finalize)
This thorough coverage will help catch regressions in the aggregation module.
plugins/out_kinesis_firehose/firehose_api.c (3)
148-169: LGTM! The process_event_simple_aggregation function is a clean wrapper around the shared flb_aws_aggregation_process_event implementation, correctly passing the Firehose-specific parameters including MAX_EVENT_SIZE and the delivery stream name.
386-480: Aggregation send logic is well-structured. The send_aggregated_record function correctly handles:
- Finalization with add_final_newline=1 for Firehose
- Compression and base64 encoding paths
- Proper memory management for both paths
- Creating a single event record for the aggregated payload
- Buffer reset after sending
768-774: LGTM! Correct finalization and cleanup:
- Lines 768-774: Properly routes to send_aggregated_record or send_log_events based on the simple_aggregation flag.
- Lines 1124-1126: Correctly checks agg_buf_initialized before calling destroy, preventing double-free or use-after-free issues.
Also applies to: 1124-1126
plugins/out_kinesis_streams/kinesis.c (2)
311-354: LGTM! The aggregation initialization in new_flush_buffer is correctly implemented:
- Accepts a context parameter for config access
- Initializes agg_buf_initialized to FLB_FALSE before attempting init
- Properly handles init failure with cleanup
- Sets agg_buf_initialized to FLB_TRUE only on success
- Mirrors the Firehose implementation pattern for consistency
518-523: LGTM! The simple_aggregation config map entry is correctly defined with:
- Boolean type with a "false" default
- Proper offset binding to struct flb_kinesis
- Clear description of the feature's purpose
plugins/out_kinesis_firehose/firehose.c (2)
317-357: LGTM: Aggregation buffer initialization is well-guarded. The changes properly integrate aggregation support:
- The ctx parameter enables access to the simple_aggregation flag
- Initialization failure correctly cleans up and returns NULL
- The agg_buf_initialized flag tracks state for proper cleanup
The integration follows the same pattern used in the Kinesis Streams plugin (see plugins/out_kinesis_streams/kinesis.c:310-353), ensuring consistency across both plugins.
524-529: Config option well-documented. The simple_aggregation configuration properly:
- Defaults to false (opt-in behavior)
- Includes a clear description of benefits
- Uses a boolean type for simple on/off control
- Aligns with the similar option in kinesis_streams
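For reference, enabling the new option in a classic-mode Fluent Bit configuration would look roughly like this (the region and delivery stream name are placeholders):

```
[OUTPUT]
    Name                kinesis_firehose
    Match               *
    region              us-east-1
    delivery_stream     my-stream
    simple_aggregation  true
```

With the option omitted or set to false, the plugin keeps its existing one-record-per-event behavior.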
tests/runtime/out_firehose.c (1)
192-524: Good coverage of edge cases with aggregation. The Firehose aggregation tests are more comprehensive than the Kinesis tests, covering:
- Compression combined with aggregation (line 339)
- Empty/minimal records (line 415)
- Error handling during aggregation (line 450)
- Custom time formats (line 489)
This broader test coverage helps validate that the aggregation feature works correctly across various configuration combinations.
plugins/out_kinesis_streams/kinesis_api.c (5)
55-56: Forward declaration enables clean separation. The forward declaration of send_log_events allows the new send_aggregated_record function to call it without reordering the entire file.
209-230: Effective wrapper delegates to shared implementation. The process_event_simple_aggregation wrapper correctly forwards all parameters to the shared flb_aws_aggregation_process_event API, avoiding code duplication between the Kinesis Streams and Firehose plugins.
573-608: Aggregation retry logic addresses a previous concern. The guard at lines 584-590 checks whether the aggregation buffer is empty after attempting to send, preventing the infinite loop flagged in the previous review. When ret == 0 && buf->agg_buf.agg_buf_offset == 0, the code discards the unprocessable record.
However, as noted in the previous comment, there may be an issue with checking agg_buf_offset after flb_aws_aggregation_reset has already been called. This needs verification to ensure the guard works as intended.
758-763: End-of-batch logic correctly handles both modes. The conditional at lines 758-763 properly routes the final send through send_aggregated_record when aggregation is enabled, otherwise using the standard send_log_events path. This ensures any buffered aggregated records are flushed at the end of processing.
1111-1113: Aggregation buffer cleanup is properly guarded. The destruction logic at lines 1111-1113 checks agg_buf_initialized before calling flb_aws_aggregation_destroy, preventing double-free or use of uninitialized buffers.
include/fluent-bit/aws/flb_aws_aggregation.h (1)
20-90: Well-designed aggregation API with clear documentation. The header provides a clean public API for AWS aggregation:
- Clear structure definition with descriptive field names
- All functions documented with return value semantics
- Return codes are consistent and well-explained (0=success, -1=error, 1=buffer full, 2=discard)
- Include guards properly defined
The API design allows both Kinesis Streams and Firehose plugins to share the same aggregation logic, reducing code duplication.
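To make the contract concrete, here is a minimal, self-contained C model of the buffer described above. The struct layout and function signatures are simplified stand-ins for illustration only, not the actual fluent-bit API; only the return-code semantics (0 = success, -1 = error, 1 = buffer full) are taken from the review.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Illustrative model of the aggregation buffer contract (not fluent-bit code). */
struct agg_buf {
    char  *data;
    size_t size;    /* allocated capacity, i.e. max_record_size */
    size_t offset;  /* bytes used so far */
};

static int agg_init(struct agg_buf *b, size_t max_record_size)
{
    if (b == NULL) {
        return -1;                       /* invalid argument */
    }
    b->data = malloc(max_record_size);
    if (b->data == NULL) {
        return -1;                       /* allocation failure */
    }
    b->size = max_record_size;
    b->offset = 0;
    return 0;
}

static int agg_add(struct agg_buf *b, const char *rec, size_t len)
{
    if (b == NULL || b->data == NULL || rec == NULL) {
        return -1;
    }
    if (b->offset + len > b->size) {
        return 1;                        /* full: caller finalizes, sends, retries */
    }
    memcpy(b->data + b->offset, rec, len);
    b->offset += len;
    return 0;
}

static int agg_finalize(struct agg_buf *b, int add_final_newline, size_t *out_size)
{
    if (b == NULL || b->offset == 0) {
        return -1;                       /* nothing to finalize */
    }
    if (add_final_newline && b->offset < b->size) {
        b->data[b->offset++] = '\n';     /* Firehose wants a trailing newline */
    }
    *out_size = b->offset;
    return 0;
}

static void agg_reset(struct agg_buf *b)   { b->offset = 0; }
static void agg_destroy(struct agg_buf *b) { free(b->data); b->data = NULL; b->size = 0; b->offset = 0; }
```

The same init/add/finalize/reset/destroy cycle is what the unit tests in tests/internal/aws_aggregation.c exercise against the real implementation.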
src/aws/flb_aws_aggregation.c (5)
30-46: LGTM: Initialization properly validates and allocates. The flb_aws_aggregation_init function:
- Validates the buffer pointer
- Allocates the requested size
- Initializes all fields (size and offset)
- Returns appropriate error codes
48-56: LGTM: Destruction safely handles all cases. The flb_aws_aggregation_destroy function:
- Checks both buf and buf->agg_buf before freeing
- Nulls the pointer after freeing
- Resets size and offset to maintain consistency
58-77: LGTM: Bounds checking prevents buffer overflow. The flb_aws_aggregation_add function correctly:
- Validates all input parameters
- Checks whether adding data would exceed max_record_size before copying
- Returns 1 (buffer full) when the limit would be exceeded, allowing the caller to finalize and retry
- Uses a safe memcpy after validation
79-100: LGTM: Finalization handles both Kinesis and Firehose cases. The flb_aws_aggregation_finalize function:
- Returns -1 when there is no data to finalize (empty buffer)
- The add_final_newline parameter allows Firehose (which needs it) and Kinesis Streams (which doesn't) to use the same API
- Checks buffer size before adding the final newline
113-233: Shared event processing logic eliminates duplication. The flb_aws_aggregation_process_event function consolidates the msgpack-to-JSON conversion, log_key extraction, time_key injection, and aggregation buffer management that was previously duplicated between the Kinesis and Firehose plugins. The function correctly:
- Converts msgpack to JSON
- Discards empty messages (return 2)
- Handles log_key by stripping enclosing quotes
- Checks size limits before and after time_key addition
- Appends newline
- Attempts to add to aggregation buffer
- Returns appropriate codes: 0=success, 1=buffer full, 2=discard, -1=error
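The caller-side handling of those return codes can be sketched as follows. This is a simplified, runnable model; the simulated buffer, add_event, and flush_agg are hypothetical stand-ins for the plugin's process_event_simple_aggregation and send_aggregated_record helpers, not the real functions.

```c
#include <string.h>

enum { AGG_OK = 0, AGG_ERR = -1, AGG_FULL = 1, AGG_DISCARD = 2 };

/* Tiny simulated buffer so the loop is runnable. */
static char   agg[16];
static size_t agg_off;
static int    sends;

static int add_event(const char *json)
{
    size_t len = strlen(json);
    if (len == 0)                    return AGG_DISCARD;  /* empty message */
    if (len > sizeof(agg))           return AGG_DISCARD;  /* can never fit */
    if (agg_off + len > sizeof(agg)) return AGG_FULL;     /* send first, then retry */
    memcpy(agg + agg_off, json, len);
    agg_off += len;
    return AGG_OK;
}

static void flush_agg(void)
{
    if (agg_off > 0) {
        sends++;        /* stand-in for sending the aggregated record */
        agg_off = 0;    /* stand-in for resetting the aggregation buffer */
    }
}

/* Process a NULL-terminated list of JSON lines; returns 0 on success. */
static int process_events(const char **events)
{
    int i, ret;
    for (i = 0; events[i] != NULL; i++) {
        ret = add_event(events[i]);
        if (ret == AGG_FULL) {
            flush_agg();                /* send what we have... */
            ret = add_event(events[i]); /* ...then retry the same event once */
        }
        if (ret == AGG_ERR) {
            return -1;
        }
        /* AGG_OK and AGG_DISCARD both fall through to the next event */
    }
    flush_agg();                        /* end-of-batch flush */
    return 0;
}
```

The single retry after a flush is the key design point: a "full" result is not an error, it just means the current event belongs in the next aggregated record.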
/* Finalize and send aggregated record (Kinesis Streams version - no final newline) */
static int send_aggregated_record(struct flb_kinesis *ctx, struct flush *buf) {
    int ret;
    size_t agg_size;
    size_t b64_len;
    struct kinesis_event *event;

    /* Finalize without final newline (Kinesis Streams doesn't need it) */
    ret = flb_aws_aggregation_finalize(&buf->agg_buf, 0, &agg_size);
    if (ret < 0) {
        return 0;
    }

    /* Base64 encode the aggregated record */
    size_t size = (agg_size * 1.5) + 4;
    if (buf->event_buf == NULL || buf->event_buf_size < size) {
        flb_free(buf->event_buf);
        buf->event_buf = flb_malloc(size);
        buf->event_buf_size = size;
        if (buf->event_buf == NULL) {
            flb_errno();
            return -1;
        }
    }

    ret = flb_base64_encode((unsigned char *) buf->event_buf, size, &b64_len,
                            (unsigned char *) buf->agg_buf.agg_buf, agg_size);
    if (ret != 0) {
        flb_errno();
        return -1;
    }
    agg_size = b64_len;

    /* Copy to tmp_buf */
    if (buf->tmp_buf_size < agg_size) {
        flb_plg_error(ctx->ins, "Aggregated record too large for buffer");
        flb_aws_aggregation_reset(&buf->agg_buf);
        return 0;
    }

    memcpy(buf->tmp_buf, buf->event_buf, agg_size);

    /* Create event record */
    event = &buf->events[0];
    event->json = buf->tmp_buf;
    event->len = agg_size;
    event->timestamp.tv_sec = 0;
    event->timestamp.tv_nsec = 0;
    buf->event_index = 1;

    /* Calculate data_size for the payload */
    buf->data_size = PUT_RECORDS_HEADER_LEN + PUT_RECORDS_FOOTER_LEN;
    buf->data_size += strlen(ctx->stream_name);
    buf->data_size += agg_size + PUT_RECORDS_PER_RECORD_LEN;

    /* Send the aggregated record */
    ret = send_log_events(ctx, buf);

    /* Reset aggregation buffer */
    flb_aws_aggregation_reset(&buf->agg_buf);

    return ret;
}
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check how agg_buf_offset is used throughout the codebase
echo "=== Checking agg_buf_offset usage in kinesis_api.c ==="
rg -n -C3 'agg_buf\.agg_buf_offset' plugins/out_kinesis_streams/kinesis_api.c
echo -e "\n=== Checking aggregation reset calls ==="
rg -n -C3 'flb_aws_aggregation_reset' plugins/out_kinesis_streams/
echo -e "\n=== Checking similar pattern in firehose_api.c ==="
rg -n -C3 'agg_buf\.agg_buf_offset == 0' plugins/out_kinesis_firehose/
Repository: fluent/fluent-bit
Length of output: 2492
🏁 Script executed:
#!/bin/bash
# Get the function containing line 584 to understand the caller context
echo "=== Context around line 584 (caller of send_aggregated_record) ==="
sed -n '560,600p' plugins/out_kinesis_streams/kinesis_api.c
echo -e "\n=== Check add_record_to_buffer function to see the loop ==="
rg -n -B5 'send_aggregated_record' plugins/out_kinesis_streams/kinesis_api.c | head -40
Repository: fluent/fluent-bit
Length of output: 2420
🏁 Script executed:
#!/bin/bash
# Get the full send_aggregated_record function to see all return paths
echo "=== Full send_aggregated_record function ==="
sed -n '421,483p' plugins/out_kinesis_streams/kinesis_api.c
echo -e "\n=== Check what process_event_simple_aggregation returns ==="
rg -n -A20 'static int process_event_simple_aggregation' plugins/out_kinesis_streams/kinesis_api.c | head -50
echo -e "\n=== Check the retry loop after send_aggregated_record ==="
sed -n '577,615p' plugins/out_kinesis_streams/kinesis_api.c
Repository: fluent/fluent-bit
Length of output: 4607
🏁 Script executed:
#!/bin/bash
# Check if there's a goto loop back to retry_add_event_agg
echo "=== Checking for the retry loop structure ==="
sed -n '575,625p' plugins/out_kinesis_streams/kinesis_api.c
echo -e "\n=== Check reset_flush_buf behavior ==="
rg -n -A10 'static void reset_flush_buf' plugins/out_kinesis_streams/kinesis_api.c
Repository: fluent/fluent-bit
Length of output: 2287
🏁 Script executed:
#!/bin/bash
# Check send_log_events return values and what they mean
echo "=== send_log_events function signature and early returns ==="
rg -n -A30 'static int send_log_events' plugins/out_kinesis_streams/kinesis_api.c | head -50
echo -e "\n=== Check early return at line 492-500 mentioned in review ==="
sed -n '486,505p' plugins/out_kinesis_streams/kinesis_api.c
Repository: fluent/fluent-bit
Length of output: 2497
🏁 Script executed:
#!/bin/bash
# Check the full send_log_events function to see all return paths
echo "=== Full send_log_events function returns ==="
rg -n 'return' plugins/out_kinesis_streams/kinesis_api.c | grep -A1 -B1 'send_log_events' | tail -20
echo -e "\n=== Check end of send_log_events function ==="
sed -n '486,570p' plugins/out_kinesis_streams/kinesis_api.c | tail -40
Repository: fluent/fluent-bit
Length of output: 1350
🏁 Script executed:
#!/bin/bash
# Check if there are any error paths in send_aggregated_record that return without resetting
echo "=== Tracing all return paths in send_aggregated_record ==="
sed -n '421,483p' plugins/out_kinesis_streams/kinesis_api.c | grep -n 'return'
echo -e "\n=== Check what process_event_simple_aggregation returns ==="
rg -n -B2 -A10 'process_event_simple_aggregation.*ret' plugins/out_kinesis_streams/kinesis_api.c | grep -A15 'ret = 1'
Repository: fluent/fluent-bit
Length of output: 289
🏁 Script executed:
#!/bin/bash
# Let's get the complete picture of what happens after send_aggregated_record returns
echo "=== Full context of the retry loop and guard ==="
sed -n '575,605p' plugins/out_kinesis_streams/kinesis_api.c
echo -e "\n=== Check what flb_aws_aggregation_reset does (in external lib, but check if it's called again) ==="
rg -n 'flb_aws_aggregation_reset' plugins/out_kinesis_streams/kinesis_api.c
echo -e "\n=== Verify: does reset_flush_buf reset agg_buf? ==="
sed -n '414,420p' plugins/out_kinesis_streams/kinesis_api.c
Repository: fluent/fluent-bit
Length of output: 1762
🏁 Script executed:
#!/bin/bash
# Let's verify the exact sequence: what happens when guard triggers vs normal case
echo "=== Guard scenario (line 584-590) ==="
sed -n '584,595p' plugins/out_kinesis_streams/kinesis_api.c
echo -e "\n=== Check if the current record has already been sent when guard triggers ==="
echo "In send_aggregated_record line 477: ret = send_log_events(ctx, buf);"
echo "send_log_events processes buf->events and sends them"
echo "Then at line 480: flb_aws_aggregation_reset(&buf->agg_buf);"
echo "This resets the aggregation buffer AFTER sending"
echo ""
echo "So if guard triggers after successful send, the data is already sent."
echo "Let's verify if reset_flush_buf at line 590 is correct behavior:"
sed -n '588,595p' plugins/out_kinesis_streams/kinesis_api.c
Repository: fluent/fluent-bit
Length of output: 1299
🏁 Script executed:
#!/bin/bash
# Check the final end to see if there's another loop at the end of process_and_send
echo "=== Check where send_aggregated_record is called from the end processing ==="
sed -n '750,770p' plugins/out_kinesis_streams/kinesis_api.c
echo -e "\n=== Summary of all return paths in send_aggregated_record ==="
echo "Line 432: return 0 (finalize failed) - NO reset before return"
echo "Line 457-458: return 0 (buffer too large) - WITH reset before return"
echo "Line 482: return ret (normal) - WITH reset before return (line 480)"
echo ""
echo "So two of three return paths DO reset agg_buf_offset to 0"
echo ""
echo "=== Verify the guard condition logic ==="
sed -n '584,591p' plugins/out_kinesis_streams/kinesis_api.c
Repository: fluent/fluent-bit
Length of output: 1368
Fix guard logic to properly detect infinite loop conditions.
The guard at line 584 checks ret == 0 && buf->agg_buf.agg_buf_offset == 0 to detect infinite loops, but the condition is unreliable. When send_aggregated_record succeeds (line 477), it resets the aggregation buffer at line 480 before returning. This causes the guard to trigger for all successful sends, not just infinite loop cases. The guard then logs "No data was aggregated" when in fact the data was successfully aggregated and sent. While data is not lost (it was sent before reset), the misleading warning obscures the actual intent and could hide genuine infinite loop conditions. The guard needs to distinguish between "nothing to aggregate" and "successfully sent and reset," perhaps by checking the state before aggregation finalize or by tracking whether data was actually sent.
🤖 Prompt for AI Agents
In plugins/out_kinesis_streams/kinesis_api.c around lines 421-483 (and the guard
at ~584), the guard that logs "No data was aggregated" misfires because
send_aggregated_record resets agg_buf before return; record the aggregation
state before finalizing or return a distinct status so the caller can
distinguish "nothing to aggregate" from "successfully sent then reset."
Specifically: capture pre_finalize_offset = buf->agg_buf.agg_buf_offset (or add
a return code for "sent") before calling
flb_aws_aggregation_finalize/send_aggregated_record, and change the guard to
check pre_finalize_offset == 0 (or check the sent return code) when deciding to
log the warning; leave error paths unchanged.
dc1eb6b to 28b1d54
- Add helper methods to support aggregating records for aws outputs Signed-off-by: Shelby Hagman <[email protected]>
- Adds testing for aws_aggregation Signed-off-by: Shelby Hagman <[email protected]>
- Add simple_aggregation config parameter and implementation to plugin Signed-off-by: Shelby Hagman <[email protected]>
- Adds tests to out_firehose Signed-off-by: Shelby Hagman <[email protected]>
- Add simple_aggregation config parameter and implementation to plugin Signed-off-by: Shelby Hagman <[email protected]>
- Adds tests to out_kinesis Signed-off-by: Shelby Hagman <[email protected]>
28b1d54 to 72d6eb4
firehose
kinesis
Enter [N/A] in the box, if an item is not applicable to your change.
Testing
Before we can approve your change, please submit the following in a comment:
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
ok-package-test label to test for all targets (requires maintainer to do).
Documentation
fluent/fluent-bit-docs#2299
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit