Skip to content

Conversation

@ShelbyZ
Copy link

@ShelbyZ ShelbyZ commented Dec 12, 2025

  • Bring over simple_aggregation from golang plugin versions
  • Add unit/runtime tests

firehose

          [OUTPUT]
            Name                kinesis_firehose
            Match               application.*
            region              ${AWS_REGION}
            delivery_stream     {{ .Values.firehoseStreamName }}
            retry_limit         3
            workers             1
            simple_aggregation  On

kinesis

          [OUTPUT]
            Name                kinesis_streams
            Match               application.*
            region              ${AWS_REGION}
            stream              {{ .Values.dataStreamName }}
            auto_retry_requests On
            simple_aggregation  On

Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
  • [WIP] Debug log output from testing the change
  • [WIP] Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • [N/A] Run local packaging test showing all targets (including any new ones) build.
  • [N/A] Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

fluent/fluent-bit-docs#2299

Backporting

  • [N/A] Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features
    • Optional "simple_aggregation" mode for Kinesis Streams and Firehose outputs (default: off) to combine multiple records into single API requests, with newline/time/log-key handling and automatic flush on buffer-full.
    • Shared AWS in-memory aggregation support added for consistent aggregation across AWS outputs.
  • Tests
    • New unit and runtime tests covering aggregation behavior, edge cases, compression, time/log key variants, and large/boundary workloads.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link

coderabbitai bot commented Dec 12, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds a new AWS in-memory aggregation API and implementation, and integrates an optional simple_aggregation path into Kinesis Streams and Firehose plugins with buffer management, event processing, finalization, send-on-full retry, and accompanying unit and runtime tests.

Changes

Cohort / File(s) Summary
New public aggregation API
include/fluent-bit/aws/flb_aws_aggregation.h
Adds struct flb_aws_agg_buffer and public APIs: flb_aws_aggregation_init, destroy, reset, add, finalize, and process_event with documented return codes and buffer semantics.
Aggregation implementation & build
src/aws/flb_aws_aggregation.c, src/aws/CMakeLists.txt
Implements in-memory aggregation (msgpack→JSON conversion, size checks, time key injection, newline handling), exportable APIs, and adds flb_aws_aggregation.c to flb-aws build.
Firehose plugin integration
plugins/out_kinesis_firehose/firehose.h, plugins/out_kinesis_firehose/firehose.c, plugins/out_kinesis_firehose/firehose_api.c
Adds agg_buf and agg_buf_initialized to flush struct, simple_aggregation flag to context/config, threads ctx into new_flush_buffer, initializes/destroys aggregation buffer, integrates simple aggregation processing, finalize/send/reset and buffer-full retry logic.
Kinesis Streams plugin integration
plugins/out_kinesis_streams/kinesis.h, plugins/out_kinesis_streams/kinesis.c, plugins/out_kinesis_streams/kinesis_api.c
Mirrors Firehose changes: include aggregation header, add agg fields to flush, simple_aggregation flag to ctx, update new_flush_buffer signature, add aggregation processing, finalize/send/reset and cleanup.
Unit tests
tests/internal/CMakeLists.txt, tests/internal/aws_aggregation.c
Adds unit tests for aggregation API covering init/destroy, add/finalize behavior, boundary conditions, resets, large-scale additions, and NULL/edge-case handling; adds test source to CMake.
Runtime tests
tests/runtime/out_firehose.c, tests/runtime/out_kinesis.c
Adds multiple runtime tests for Firehose (9) and Kinesis (4) exercising simple aggregation with time/log keys, compression, many records, error cases, and formatting (note: some duplicated test definitions in Kinesis tests).

Sequence Diagram(s)

sequenceDiagram
    actor Input
    participant Plugin as Kinesis/Firehose Plugin
    participant AggBuf as Aggregation Buffer
    participant AWS as AWS Service

    Input->>Plugin: deliver event(s)
    alt simple_aggregation disabled
        Plugin->>AWS: send batched records immediately
    else simple_aggregation enabled
        loop per event
            Plugin->>AggBuf: flb_aws_aggregation_process_event(...)
            alt FLB_AWS_AGG_OK
                AggBuf->>AggBuf: append JSON + newline
            else FLB_AWS_AGG_FULL
                AggBuf-->>Plugin: buffer full (signal)
                Plugin->>AggBuf: flb_aws_aggregation_finalize(add_final_newline?)
                Plugin->>AWS: send aggregated record (encode/compress as configured)
                Plugin->>AggBuf: flb_aws_aggregation_reset()
                Plugin->>AggBuf: retry current event
            else FLB_AWS_AGG_DISCARD/ERROR
                Plugin->>Plugin: drop/log and continue
            end
        end
        Plugin->>AggBuf: flb_aws_aggregation_finalize(end of batch)
        Plugin->>AWS: send final aggregated record
        Plugin->>AggBuf: flb_aws_aggregation_reset()
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Focus review on:
    • src/aws/flb_aws_aggregation.c — buffer allocation, bounds checks, return-code semantics, and thread-safety assumptions.
    • Integration in plugins/*_api.c — retry-on-full flow, finalize/send/reset semantics, compression/base64 paths.
    • Changes to new_flush_buffer signatures and flush teardown for resource leaks.
    • Tests: duplicated entries and correctness in tests/runtime/out_kinesis.c.

Suggested reviewers

  • edsiper
  • koleini
  • fujimotos

Poem

🐰 I nibble bytes and stitch them tight,
In a burrowed buffer snug by night.
When full I shout, "Finalize — then go!"
Pack records, send, and reset — hop, hop, ho! 🥕✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 53.33% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main change: adding simple_aggregation feature to kinesis/firehose plugins. It is concise, clear, and directly reflects the core purpose of the changeset.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

📜 Recent review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 28b1d54 and 72d6eb4.

📒 Files selected for processing (13)
  • include/fluent-bit/aws/flb_aws_aggregation.h (1 hunks)
  • plugins/out_kinesis_firehose/firehose.c (4 hunks)
  • plugins/out_kinesis_firehose/firehose.h (3 hunks)
  • plugins/out_kinesis_firehose/firehose_api.c (7 hunks)
  • plugins/out_kinesis_streams/kinesis.c (4 hunks)
  • plugins/out_kinesis_streams/kinesis.h (3 hunks)
  • plugins/out_kinesis_streams/kinesis_api.c (9 hunks)
  • src/aws/CMakeLists.txt (1 hunks)
  • src/aws/flb_aws_aggregation.c (1 hunks)
  • tests/internal/CMakeLists.txt (1 hunks)
  • tests/internal/aws_aggregation.c (1 hunks)
  • tests/runtime/out_firehose.c (1 hunks)
  • tests/runtime/out_kinesis.c (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/aws/flb_aws_aggregation.c
  • include/fluent-bit/aws/flb_aws_aggregation.h
🧰 Additional context used
🧠 Learnings (9)
📚 Learning: 2025-09-04T12:35:36.904Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10825
File: plugins/out_s3/s3.c:3275-3282
Timestamp: 2025-09-04T12:35:36.904Z
Learning: The out_s3 plugin intentionally uses a simple numeric comparison for retry_limit (chunk->failures >= ctx->ins->retry_limit) rather than the standard Fluent Bit pattern that checks for FLB_OUT_RETRY_UNLIMITED (-1). The maintainer wants to keep this current behavior for consistency within the plugin.

Applied to files:

  • plugins/out_kinesis_streams/kinesis_api.c
📚 Learning: 2025-08-29T06:25:02.561Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:7-7
Timestamp: 2025-08-29T06:25:02.561Z
Learning: In Fluent Bit, ZSTD (zstandard) compression library is bundled directly in the source tree at `lib/zstd-1.5.7` and is built unconditionally as a static library. Unlike optional external dependencies, ZSTD does not use conditional compilation guards like `FLB_HAVE_ZSTD` and is always available. Headers like `<fluent-bit/flb_zstd.h>` can be included directly without guards.

Applied to files:

  • plugins/out_kinesis_streams/kinesis_api.c
  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.

Applied to files:

  • plugins/out_kinesis_streams/kinesis_api.c
  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:24:55.855Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:52-56
Timestamp: 2025-08-29T06:24:55.855Z
Learning: ZSTD compression is always available in Fluent Bit and does not require conditional compilation guards. Unlike Arrow/Parquet which use #ifdef FLB_HAVE_ARROW guards, ZSTD is built unconditionally with flb_zstd.c included directly in src/CMakeLists.txt and a bundled ZSTD library at lib/zstd-1.5.7/.

Applied to files:

  • plugins/out_kinesis_streams/kinesis_api.c
  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.

Applied to files:

  • plugins/out_kinesis_streams/kinesis_api.c
  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-11-21T06:23:29.770Z
Learnt from: cosmo0920
Repo: fluent/fluent-bit PR: 11171
File: include/fluent-bit/flb_lib.h:52-53
Timestamp: 2025-11-21T06:23:29.770Z
Learning: In Fluent Bit core (fluent/fluent-bit repository), function descriptions/documentation are not required for newly added functions in header files.

Applied to files:

  • plugins/out_kinesis_firehose/firehose.h
📚 Learning: 2025-08-29T06:24:26.170Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:39-42
Timestamp: 2025-08-29T06:24:26.170Z
Learning: In Fluent Bit, ZSTD compression support is enabled by default and does not require conditional compilation guards (like #ifdef FLB_HAVE_ZSTD) around ZSTD-related code declarations and implementations.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components, ZSTD support is always available and doesn't need build-time conditionals.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:24:44.797Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:26-26
Timestamp: 2025-08-29T06:24:44.797Z
Learning: In Fluent Bit, ZSTD support is always available and enabled by default. The build system automatically detects and uses either the system libzstd library or builds the bundled ZSTD version. Unlike other optional dependencies like Arrow which use conditional compilation guards (e.g., FLB_HAVE_ARROW), ZSTD does not require conditional includes or build flags.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
🧬 Code graph analysis (6)
plugins/out_kinesis_streams/kinesis_api.c (2)
src/aws/flb_aws_aggregation.c (4)
  • flb_aws_aggregation_process_event (113-234)
  • flb_aws_aggregation_finalize (79-100)
  • flb_aws_aggregation_reset (102-107)
  • flb_aws_aggregation_destroy (48-56)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
plugins/out_kinesis_streams/kinesis.c (3)
plugins/out_kinesis_firehose/firehose.c (1)
  • new_flush_buffer (317-357)
src/aws/flb_aws_aggregation.c (1)
  • flb_aws_aggregation_init (30-46)
plugins/out_kinesis_streams/kinesis_api.c (1)
  • kinesis_flush_destroy (1109-1121)
plugins/out_kinesis_firehose/firehose.c (3)
plugins/out_kinesis_streams/kinesis.c (1)
  • new_flush_buffer (311-354)
src/aws/flb_aws_aggregation.c (1)
  • flb_aws_aggregation_init (30-46)
plugins/out_kinesis_firehose/firehose_api.c (1)
  • flush_destroy (1122-1134)
tests/internal/aws_aggregation.c (1)
src/aws/flb_aws_aggregation.c (5)
  • flb_aws_aggregation_init (30-46)
  • flb_aws_aggregation_destroy (48-56)
  • flb_aws_aggregation_add (58-77)
  • flb_aws_aggregation_finalize (79-100)
  • flb_aws_aggregation_reset (102-107)
tests/runtime/out_kinesis.c (2)
plugins/out_s3/s3.c (1)
  • setenv (57-60)
src/flb_lib.c (9)
  • flb_create (143-225)
  • flb_input (266-276)
  • flb_input_set (305-335)
  • flb_output (279-289)
  • flb_output_set (520-551)
  • flb_start (983-994)
  • flb_lib_push (843-870)
  • flb_stop (1011-1055)
  • flb_destroy (228-263)
plugins/out_kinesis_firehose/firehose_api.c (1)
src/aws/flb_aws_aggregation.c (4)
  • flb_aws_aggregation_process_event (113-234)
  • flb_aws_aggregation_finalize (79-100)
  • flb_aws_aggregation_reset (102-107)
  • flb_aws_aggregation_destroy (48-56)
🔇 Additional comments (29)
src/aws/CMakeLists.txt (1)

18-18: LGTM!

The addition of the aggregation source file to the build list is correct and follows the existing pattern in the CMakeLists.txt.

plugins/out_kinesis_firehose/firehose.h (3)

29-29: LGTM!

Including the AWS aggregation header is necessary to use the flb_aws_agg_buffer type.


60-62: LGTM!

The aggregation buffer fields are correctly added to the flush structure, enabling per-flush aggregation state management. The agg_buf_initialized flag ensures proper cleanup in the destroy path.


102-102: LGTM!

The simple_aggregation flag in the context enables runtime control of aggregation behavior per plugin instance.

tests/internal/CMakeLists.txt (1)

141-141: LGTM!

The aggregation test file is correctly added to the FLB_AWS test list, enabling unit test coverage for the new aggregation API.

plugins/out_kinesis_firehose/firehose.c (3)

317-320: LGTM!

The function signature is correctly updated to accept the context parameter needed for checking the simple_aggregation flag. The ret variable is properly used to capture the initialization result.


344-354: LGTM!

The aggregation buffer initialization logic is correct:

  • Conditionally initializes only when simple_aggregation is enabled
  • Properly handles initialization failure by cleaning up and returning NULL
  • Sets the agg_buf_initialized flag for proper lifecycle management in the destroy path

The error handling prevents resource leaks by calling flush_destroy on failure.


524-529: LGTM!

The configuration entry for simple_aggregation is well-structured with:

  • Appropriate default value (false)
  • Clear, informative help text explaining the purpose and benefits
  • Correct offset binding to the context structure
plugins/out_kinesis_streams/kinesis.h (1)

29-29: LGTM!

The header additions mirror the Firehose plugin structure, ensuring consistent aggregation support across both Kinesis plugins. The aggregation buffer fields and configuration flag follow the same pattern.

Also applies to: 60-62, 100-100

tests/internal/aws_aggregation.c (1)

1-544: LGTM!

This is an excellent, comprehensive test suite for the AWS aggregation API. The 17 test cases provide thorough coverage including:

  • Lifecycle management: initialization, destruction, reset, and reuse
  • Data operations: single and multiple record additions with content verification
  • Capacity handling: buffer full detection with both large single writes and multiple small writes
  • Finalization modes: testing both Firehose (with newline) and Kinesis (without newline) modes
  • Edge cases: NULL parameters, empty buffers, boundary conditions, exact-size fills
  • Robustness: double finalize, add-after-finalize, alternating patterns, very small buffers
  • Scale: large aggregations (1000 records)

The tests follow the cutest framework pattern used throughout Fluent Bit and use realistic JSON test data. This level of coverage significantly reduces the risk of regressions.

plugins/out_kinesis_streams/kinesis.c (3)

311-314: LGTM!

The function signature correctly includes the context parameter for aggregation checks, plus tag parameters needed for Kinesis partition key generation. The ret variable properly captures initialization results.


341-351: LGTM!

The aggregation buffer initialization follows the same pattern as the Firehose plugin, ensuring consistent behavior across both implementations. Error handling correctly prevents leaks by calling kinesis_flush_destroy on failure.


518-523: LGTM!

The configuration entry matches the Firehose implementation, providing consistent documentation and behavior across both Kinesis plugins.

plugins/out_kinesis_streams/kinesis_api.c (6)

39-39: LGTM!

The aggregation header include and forward declaration are necessary for the aggregation integration. The forward declaration prevents ordering issues between static functions.

Also applies to: 55-56


209-230: LGTM!

The process_event_simple_aggregation function is a clean delegation to the shared aggregation API (flb_aws_aggregation_process_event), properly passing all required parameters including the Kinesis-specific stream name and MAX_EVENT_SIZE limit.


421-483: LGTM!

The send_aggregated_record function correctly:

  • Finalizes without a trailing newline (appropriate for Kinesis Streams, unlike Firehose which needs one)
  • Base64 encodes the aggregated data as required by the Kinesis PutRecords API
  • Handles buffer sizing and reallocation for the base64-encoded output
  • Creates a single event record containing the aggregated data
  • Sends via the existing send_log_events path
  • Resets the aggregation buffer for reuse

The error handling returns 0 when finalize fails (empty buffer), which is safe given the guard check before calling this function.


573-609: LGTM!

The aggregation path in add_event implements a sensible retry mechanism:

  1. Attempts to add the event to the aggregation buffer
  2. On buffer-full (ret == 1), checks if the buffer is empty (line 583)
  3. If empty, the record is too large to aggregate and is discarded (lines 584-587)
  4. If not empty, sends the accumulated data and retries adding the current record (lines 591-596)
  5. On retry with an empty buffer, an oversized record will be caught by the guard and discarded

This prevents infinite loops while ensuring that accumulated data is sent before discarding problematic records. The guard at line 583 is critical for detecting records that exceed the maximum aggregation size.


759-764: LGTM!

The end-of-batch handling correctly routes to either send_aggregated_record or send_log_events based on the aggregation mode, ensuring any remaining buffered data is sent before completing the flush cycle.


1112-1114: LGTM!

The aggregation buffer cleanup in kinesis_flush_destroy properly checks the initialization flag and destroys the buffer, preventing resource leaks.

plugins/out_kinesis_firehose/firehose_api.c (6)

41-41: LGTM: Clean integration of aggregation API.

The include and forward declaration properly integrate the new aggregation functionality into the Firehose plugin.

Also applies to: 57-58


148-169: LGTM: Clean wrapper for shared aggregation API.

The helper properly adapts Firehose-specific context to the shared aggregation API, forwarding all necessary parameters.


452-456: Buffer size check protects against overflow.

The check ensures the aggregated record fits in tmp_buf before copying. If it doesn't fit, the record is discarded with appropriate logging. This is a reasonable defensive measure, though ideally the aggregation buffer size should be configured to prevent this scenario.


579-594: Previous critical bug successfully fixed.

The aggregation retry logic now correctly checks if the buffer is empty (line 581) before attempting to send (line 589). This fixes the previous bug where the check happened after send_aggregated_record called flb_aws_aggregation_reset, causing records to be incorrectly discarded.

The logic now properly handles:

  • Records too large for the aggregation buffer (discard immediately)
  • Buffer full with data (send aggregated data, then retry current record)

✅ Addresses the critical issue raised in previous review


769-776: LGTM: Proper end-of-batch handling for aggregated data.

The logic correctly sends any remaining aggregated data when the batch completes, ensuring no data is left in the buffer.


1125-1127: LGTM: Proper resource cleanup.

The aggregation buffer is correctly destroyed when initialized, preventing resource leaks.

tests/runtime/out_firehose.c (2)

192-525: Comprehensive test coverage for aggregation feature.

The nine new test functions thoroughly exercise the Firehose aggregation functionality across various configurations:

  • Basic aggregation with multiple records
  • Integration with time_key and log_key features
  • Compression combined with aggregation
  • Edge cases (empty records, many records)
  • Error handling scenarios

The tests follow consistent patterns and include proper setup/teardown. The buffer overflow check at line 331 in aggregation_many_records is good defensive programming.


534-542: LGTM: Test registry properly updated.

All nine aggregation tests are correctly registered with descriptive names.

tests/runtime/out_kinesis.c (2)

298-444: Good test coverage for Kinesis aggregation.

The four new test functions provide solid coverage of Kinesis aggregation functionality, including integration with time_key and log_key features, and stress-testing with many small records. The tests follow the same patterns as the Firehose tests, ensuring consistency across the test suite.


456-459: LGTM: Test registry properly updated.

All four Kinesis aggregation tests are correctly registered with consistent naming.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
plugins/out_kinesis_firehose/firehose_api.c (1)

386-480: Consider adding a guard for events array.

The aggregated record finalization and sending logic is well-structured. The compression and base64 encoding paths are handled correctly, and memory management appears sound.

One minor consideration: at line 461, the code assumes buf->events is allocated. While this should always be true when called via process_and_send_records, a defensive check could prevent potential crashes in edge cases.

Optional defensive check:

+    /* Ensure events array is allocated */
+    if (buf->events == NULL) {
+        flb_plg_error(ctx->ins, "Events buffer not initialized");
+        flb_aws_aggregation_reset(&buf->agg_buf);
+        return -1;
+    }
+
     /* Create event record */
     event = &buf->events[0];
src/aws/flb_aws_aggregation.c (1)

113-233: LGTM: Event processing logic is correct.

The flb_aws_aggregation_process_event function properly handles:

  • msgpack-to-JSON conversion
  • log_key extraction (removing enclosing quotes)
  • Size validation at multiple stages
  • time_key injection with custom formatting
  • Newline appending before aggregation

The function correctly returns different codes for different scenarios (buffer full, record too large, error), enabling proper caller handling.

Optional: Consider future refactoring opportunity.

The event processing logic in flb_aws_aggregation_process_event shares significant code with the non-aggregation process_event function in kinesis_api.c (msgpack conversion, log_key handling, time_key injection). While the current duplication is acceptable for clarity and the aggregation/non-aggregation paths diverge at the end, a future refactor could extract common logic into shared helper functions.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c06c124 and 158fb26.

📒 Files selected for processing (13)
  • include/fluent-bit/aws/flb_aws_aggregation.h (1 hunks)
  • plugins/out_kinesis_firehose/firehose.c (4 hunks)
  • plugins/out_kinesis_firehose/firehose.h (3 hunks)
  • plugins/out_kinesis_firehose/firehose_api.c (7 hunks)
  • plugins/out_kinesis_streams/kinesis.c (4 hunks)
  • plugins/out_kinesis_streams/kinesis.h (3 hunks)
  • plugins/out_kinesis_streams/kinesis_api.c (9 hunks)
  • src/aws/CMakeLists.txt (1 hunks)
  • src/aws/flb_aws_aggregation.c (1 hunks)
  • tests/internal/CMakeLists.txt (1 hunks)
  • tests/internal/aws_aggregation.c (1 hunks)
  • tests/runtime/out_firehose.c (1 hunks)
  • tests/runtime/out_kinesis.c (2 hunks)
🧰 Additional context used
🧠 Learnings (8)
📚 Learning: 2025-11-21T06:23:29.770Z
Learnt from: cosmo0920
Repo: fluent/fluent-bit PR: 11171
File: include/fluent-bit/flb_lib.h:52-53
Timestamp: 2025-11-21T06:23:29.770Z
Learning: In Fluent Bit core (fluent/fluent-bit repository), function descriptions/documentation are not required for newly added functions in header files.

Applied to files:

  • include/fluent-bit/aws/flb_aws_aggregation.h
📚 Learning: 2025-08-29T06:25:02.561Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:7-7
Timestamp: 2025-08-29T06:25:02.561Z
Learning: In Fluent Bit, ZSTD (zstandard) compression library is bundled directly in the source tree at `lib/zstd-1.5.7` and is built unconditionally as a static library. Unlike optional external dependencies, ZSTD does not use conditional compilation guards like `FLB_HAVE_ZSTD` and is always available. Headers like `<fluent-bit/flb_zstd.h>` can be included directly without guards.

Applied to files:

  • plugins/out_kinesis_streams/kinesis_api.c
  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.

Applied to files:

  • plugins/out_kinesis_streams/kinesis_api.c
  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:24:55.855Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:52-56
Timestamp: 2025-08-29T06:24:55.855Z
Learning: ZSTD compression is always available in Fluent Bit and does not require conditional compilation guards. Unlike Arrow/Parquet which use #ifdef FLB_HAVE_ARROW guards, ZSTD is built unconditionally with flb_zstd.c included directly in src/CMakeLists.txt and a bundled ZSTD library at lib/zstd-1.5.7/.

Applied to files:

  • plugins/out_kinesis_streams/kinesis_api.c
  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:24:26.170Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:39-42
Timestamp: 2025-08-29T06:24:26.170Z
Learning: In Fluent Bit, ZSTD compression support is enabled by default and does not require conditional compilation guards (like #ifdef FLB_HAVE_ZSTD) around ZSTD-related code declarations and implementations.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components, ZSTD support is always available and doesn't need build-time conditionals.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:24:44.797Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:26-26
Timestamp: 2025-08-29T06:24:44.797Z
Learning: In Fluent Bit, ZSTD support is always available and enabled by default. The build system automatically detects and uses either the system libzstd library or builds the bundled ZSTD version. Unlike other optional dependencies like Arrow which use conditional compilation guards (e.g., FLB_HAVE_ARROW), ZSTD does not require conditional includes or build flags.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
🧬 Code graph analysis (7)
tests/internal/aws_aggregation.c (1)
src/aws/flb_aws_aggregation.c (5)
  • flb_aws_aggregation_init (30-46)
  • flb_aws_aggregation_destroy (48-56)
  • flb_aws_aggregation_add (58-77)
  • flb_aws_aggregation_finalize (79-100)
  • flb_aws_aggregation_reset (102-107)
include/fluent-bit/aws/flb_aws_aggregation.h (1)
src/aws/flb_aws_aggregation.c (6)
  • flb_aws_aggregation_init (30-46)
  • flb_aws_aggregation_destroy (48-56)
  • flb_aws_aggregation_add (58-77)
  • flb_aws_aggregation_finalize (79-100)
  • flb_aws_aggregation_reset (102-107)
  • flb_aws_aggregation_process_event (113-233)
plugins/out_kinesis_firehose/firehose.c (3)
plugins/out_kinesis_streams/kinesis.c (1)
  • new_flush_buffer (311-354)
src/aws/flb_aws_aggregation.c (1)
  • flb_aws_aggregation_init (30-46)
plugins/out_kinesis_firehose/firehose_api.c (1)
  • flush_destroy (1114-1126)
tests/runtime/out_firehose.c (2)
src/flb_lib.c (9)
  • flb_create (143-225)
  • flb_input (266-276)
  • flb_input_set (305-335)
  • flb_output (279-289)
  • flb_output_set (520-551)
  • flb_start (983-994)
  • flb_lib_push (843-870)
  • flb_stop (1011-1055)
  • flb_destroy (228-263)
tests/runtime/out_kinesis.c (1)
  • flb_test_firehose_success (13-45)
src/aws/flb_aws_aggregation.c (4)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
src/flb_pack.c (1)
  • flb_msgpack_to_json (1011-1024)
include/fluent-bit/flb_compat.h (1)
  • gmtime_r (75-81)
src/aws/flb_aws_util.c (1)
  • flb_aws_strftime_precision (1240-1326)
plugins/out_kinesis_streams/kinesis_api.c (2)
src/aws/flb_aws_aggregation.c (4)
  • flb_aws_aggregation_process_event (113-233)
  • flb_aws_aggregation_finalize (79-100)
  • flb_aws_aggregation_reset (102-107)
  • flb_aws_aggregation_destroy (48-56)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
plugins/out_kinesis_firehose/firehose_api.c (3)
plugins/out_kinesis_streams/kinesis_api.c (4)
  • send_log_events (486-555)
  • process_event_simple_aggregation (213-230)
  • send_aggregated_record (422-483)
  • reset_flush_buf (414-419)
src/aws/flb_aws_aggregation.c (4)
  • flb_aws_aggregation_process_event (113-233)
  • flb_aws_aggregation_finalize (79-100)
  • flb_aws_aggregation_reset (102-107)
  • flb_aws_aggregation_destroy (48-56)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
🔇 Additional comments (35)
src/aws/CMakeLists.txt (1)

18-18: LGTM!

The aggregation source file is correctly added to the AWS library build. The unconditional inclusion is appropriate since aggregation is a core feature of the kinesis/firehose plugins.

tests/internal/CMakeLists.txt (1)

141-141: LGTM!

The aggregation unit test is correctly added under the FLB_AWS conditional, following the established pattern for AWS-related tests.

plugins/out_kinesis_firehose/firehose.h (3)

29-29: LGTM!

The aggregation header include is correctly placed with other Fluent Bit headers.


60-62: LGTM!

The aggregation buffer and initialization flag are appropriately added to the flush structure. The agg_buf_initialized flag properly tracks whether cleanup is needed in flush_destroy.


102-102: LGTM!

The simple_aggregation configuration option is correctly added to the context structure.

plugins/out_kinesis_streams/kinesis.h (3)

29-29: LGTM!

The aggregation header include is correctly placed, consistent with the firehose plugin.


60-62: LGTM!

The aggregation buffer fields are consistent with the firehose plugin structure.


100-100: LGTM!

The simple_aggregation configuration option is correctly added to the Kinesis context structure.

tests/internal/aws_aggregation.c (10)

1-27: LGTM!

Well-structured test file with appropriate includes and constant definition. The MAX_RECORD_SIZE matches the expected usage in production code.


29-47: LGTM!

Good lifecycle test verifying init sets expected values and destroy properly cleans up state.


49-107: LGTM!

Comprehensive tests for single and multiple record additions with proper content verification via memcmp.


109-163: LGTM!

Good boundary testing for buffer-full scenarios, including both oversized single records and gradual fill to capacity.


165-200: LGTM!

Properly tests both Kinesis Streams mode (no newline) and Firehose mode (with newline) finalization paths.


202-254: LGTM!

Good coverage of edge cases: empty buffer finalization and reset/reuse cycles.


256-322: LGTM!

Excellent coverage of large-scale additions and NULL parameter handling. The NULL tests ensure robustness against invalid inputs.


324-404: LGTM!

Good boundary condition tests including exact capacity fill and multiple reset cycles.


406-522: LGTM!

Useful edge case tests covering tiny buffers, double finalize, add-after-finalize, and alternating patterns. These tests verify the API behaves correctly in non-standard usage scenarios.


524-544: LGTM!

Comprehensive test list covering all aggregation API functions and edge cases.

plugins/out_kinesis_firehose/firehose_api.c (6)

41-41: LGTM!

The aggregation header is correctly included alongside other AWS headers.


57-58: LGTM!

Forward declaration is necessary since send_aggregated_record calls send_log_events.


148-169: LGTM!

Clean wrapper function that delegates to the shared aggregation implementation. Parameters are correctly passed including MAX_EVENT_SIZE as the limit.


571-600: LGTM!

The simple aggregation path is correctly integrated into add_event. The retry logic properly handles buffer-full conditions by sending the aggregated record and retrying.


761-767: LGTM!

The final send logic correctly branches between aggregation mode and normal mode for any remaining data.


1114-1125: LGTM!

Proper cleanup of the aggregation buffer when agg_buf_initialized flag is set. This prevents memory leaks and double-free issues.

plugins/out_kinesis_streams/kinesis_api.c (5)

55-56: LGTM: Forward declaration added for function ordering.

The forward declaration is necessary since send_aggregated_record (defined at line 422) calls send_log_events before its definition.


209-230: LGTM: Clean delegation to shared aggregation API.

The wrapper appropriately delegates to the shared flb_aws_aggregation_process_event implementation, passing all required context from the Kinesis-specific structures.


573-601: LGTM: Aggregation path correctly handles buffer-full scenario.

The retry logic properly handles the buffer-full case (ret == 1) by sending the accumulated aggregated record and retrying the current event. The mechanism prevents infinite loops since oversized individual records return 2 (discard), which breaks the retry cycle.


751-756: LGTM: Proper end-of-batch handling and resource cleanup.

The end-of-batch logic correctly routes through the aggregation path when enabled, and the cleanup path properly destroys the aggregation buffer when initialized.

Also applies to: 1104-1106


421-483: Buffer sizing concern does not reflect actual constant values.

The buffer size constants are appropriately defined with sufficient margin. PUT_RECORDS_PAYLOAD_SIZE is 5,242,880 bytes (~5.24 MB), while the maximum base64-encoded aggregated record is approximately 1.4 MB (MAX_EVENT_SIZE of 1,048,556 bytes with 33% base64 overhead). The check at line 456 is defensive programming but will not realistically trigger under normal conditions, as the buffer is 3.76x larger than the maximum possible encoded output.

Likely an incorrect or invalid review comment.

plugins/out_kinesis_firehose/firehose.c (1)

317-354: LGTM: Firehose aggregation initialization consistent with Kinesis.

The changes properly integrate aggregation support:

  • Function signature updated to accept context for conditional initialization
  • Aggregation buffer initialized when simple_aggregation is enabled
  • Proper error handling with buffer destruction on initialization failure
  • Configuration option added with appropriate defaults and description

The implementation mirrors the Kinesis Streams changes, ensuring consistency across both AWS output plugins.

Also applies to: 371-371, 524-529

tests/runtime/out_firehose.c (1)

192-524: LGTM: Comprehensive test coverage for Firehose aggregation.

The nine new tests cover a wide range of scenarios:

  • Basic aggregation functionality
  • Integration with time_key and log_key features
  • Volume testing with many records
  • Interaction with compression
  • Combined parameter configurations
  • Edge cases (empty records, error handling)
  • Custom time formats

All tests follow the established testing patterns and are properly registered.

Also applies to: 533-541

plugins/out_kinesis_streams/kinesis.c (1)

311-351: LGTM: Kinesis Streams aggregation initialization.

The implementation is consistent with the Firehose changes:

  • Context-aware buffer initialization
  • Proper error handling with cleanup on failure
  • Configuration option matching Firehose conventions

Also applies to: 368-368, 518-523

include/fluent-bit/aws/flb_aws_aggregation.h (1)

1-90: LGTM: Well-designed aggregation API.

The header defines a clean, focused API:

  • Simple buffer structure with clear fields
  • Lifecycle management functions (init/destroy/reset)
  • Data manipulation functions (add/finalize)
  • High-level event processing function
  • Return value semantics are clearly documented

Based on learnings, function descriptions are not required for Fluent Bit headers.

tests/runtime/out_kinesis.c (1)

298-443: LGTM: Adequate test coverage for Kinesis Streams aggregation.

The four new tests cover key scenarios:

  • Basic aggregation with multiple small records
  • Integration with time_key and log_key features
  • Volume testing with 50 records to validate aggregation efficiency

Tests follow established patterns and are properly registered. Coverage aligns with Firehose testing approach.

Also applies to: 455-458

src/aws/flb_aws_aggregation.c (1)

30-107: LGTM: Core aggregation functions are well-implemented.

The buffer lifecycle and manipulation functions demonstrate:

  • Proper null pointer checks
  • Correct memory management with flb_malloc/flb_free
  • Appropriate return codes matching the API documentation
  • Buffer overflow prevention in flb_aws_aggregation_add

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Copy link
Contributor

@cosmo0920 cosmo0920 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation concept is good but our commit message linter complains error and we usually separate commits for this type of PR like:

  • aws: aws_aggregation: Implement aggregation operation
  • out_kinesis_firehose: Add simple_aggregation operation
  • out_kinesis_stream: Add simple_aggregation operation
  • tests: internal: aws_aggregation
  • out_kinesis_firehose: tests: Add a test case of simple_aggregation operation
  • out_kinesis_stream: tests: Add a test case of Add simple_aggregation operation

Plus, compilation errors are also happening. Could you take a look on this?

@ShelbyZ
Copy link
Author

ShelbyZ commented Dec 18, 2025

The implementation concept is good but our commit message linter complains error and we usually separate commits for this type of PR like:

  • aws: aws_aggregation: Implement aggregation operation
  • out_kinesis_firehose: Add simple_aggregation operation
  • out_kinesis_stream: Add simple_aggregation operation
  • tests: internal: aws_aggregation
  • out_kinesis_firehose: tests: Add a test case of simple_aggregation operation
  • out_kinesis_stream: tests: Add a test case of Add simple_aggregation operation

Plus, compilation errors are also happening. Could you take a look on this?

Should be updated in the last force push. The build failure was some weird refactoring bug that removed the struct value which not needed

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (3)
tests/runtime/out_kinesis.c (2)

298-443: Consider adding assertions to verify aggregation behavior.

The new aggregation tests successfully set up the plugin configuration and push records, but they rely solely on the test mode mocking and don't include explicit assertions to verify that:

  • Records were actually aggregated into fewer API calls
  • The aggregated payload format is correct
  • Buffer boundaries are respected

While the sleep-and-succeed pattern is consistent with existing tests in this file, consider whether additional test infrastructure could validate the aggregation behavior more thoroughly.


435-438: Verify buffer size for formatted records.

The loop uses a 100-byte buffer with snprintf to format records. While this should be sufficient for the test pattern [1, {"id":%d,"msg":"test"}], consider using sizeof in the snprintf call for explicit bounds checking:

🔎 Suggested improvement:
     for (i = 0; i < 50; i++) {
-        snprintf(record, sizeof(record), "[1, {\"id\":%d,\"msg\":\"test\"}]", i);
+        ret = snprintf(record, sizeof(record), "[1, {\"id\":%d,\"msg\":\"test\"}]", i);
+        if (ret >= sizeof(record)) {
+            flb_plg_warn(ctx->ins, "Record truncated in test");
+        }
         flb_lib_push(ctx, in_ffd, record, strlen(record));
     }
src/aws/flb_aws_aggregation.c (1)

179-183: Handle early return after time formatting failure.

At lines 179-182, when flb_aws_strftime_precision returns 0 (formatting failed), the code logs an error and frees out_buf, but then falls through to the else block at line 184 which will try to use out_buf.

Looking more carefully, when len == 0, the code frees out_buf at line 182, then at line 184 there's an else block that won't execute because we're in the if (len == 0) branch. So the control flow continues to line 204.

Actually, I misread this. Let me recheck:

if (len == 0) {
    flb_plg_error(ins, "Failed to add time_key %s to record, %s",
                  time_key, stream_name);
    flb_free(out_buf);
}
else {
    // time_key_ptr manipulation
    ...
}

This is correct - when len == 0, it frees out_buf and falls through. When len > 0, it goes into the else block. The code then continues to line 204 for the size check. This is safe because written hasn't been modified when len == 0, so the size check still works.

However, there's no explicit return or discard after time formatting failure. The record will be added without the time_key, which may not be the intended behavior.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 158fb26 and dc1eb6b.

📒 Files selected for processing (13)
  • include/fluent-bit/aws/flb_aws_aggregation.h (1 hunks)
  • plugins/out_kinesis_firehose/firehose.c (4 hunks)
  • plugins/out_kinesis_firehose/firehose.h (3 hunks)
  • plugins/out_kinesis_firehose/firehose_api.c (7 hunks)
  • plugins/out_kinesis_streams/kinesis.c (4 hunks)
  • plugins/out_kinesis_streams/kinesis.h (3 hunks)
  • plugins/out_kinesis_streams/kinesis_api.c (9 hunks)
  • src/aws/CMakeLists.txt (1 hunks)
  • src/aws/flb_aws_aggregation.c (1 hunks)
  • tests/internal/CMakeLists.txt (1 hunks)
  • tests/internal/aws_aggregation.c (1 hunks)
  • tests/runtime/out_firehose.c (1 hunks)
  • tests/runtime/out_kinesis.c (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • plugins/out_kinesis_streams/kinesis.h
🧰 Additional context used
🧠 Learnings (9)
📚 Learning: 2025-08-29T06:25:02.561Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:7-7
Timestamp: 2025-08-29T06:25:02.561Z
Learning: In Fluent Bit, ZSTD (zstandard) compression library is bundled directly in the source tree at `lib/zstd-1.5.7` and is built unconditionally as a static library. Unlike optional external dependencies, ZSTD does not use conditional compilation guards like `FLB_HAVE_ZSTD` and is always available. Headers like `<fluent-bit/flb_zstd.h>` can be included directly without guards.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
  • plugins/out_kinesis_streams/kinesis_api.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
  • plugins/out_kinesis_streams/kinesis_api.c
📚 Learning: 2025-08-29T06:24:26.170Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:39-42
Timestamp: 2025-08-29T06:24:26.170Z
Learning: In Fluent Bit, ZSTD compression support is enabled by default and does not require conditional compilation guards (like #ifdef FLB_HAVE_ZSTD) around ZSTD-related code declarations and implementations.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:24:55.855Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:52-56
Timestamp: 2025-08-29T06:24:55.855Z
Learning: ZSTD compression is always available in Fluent Bit and does not require conditional compilation guards. Unlike Arrow/Parquet which use #ifdef FLB_HAVE_ARROW guards, ZSTD is built unconditionally with flb_zstd.c included directly in src/CMakeLists.txt and a bundled ZSTD library at lib/zstd-1.5.7/.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
  • plugins/out_kinesis_streams/kinesis_api.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components, ZSTD support is always available and doesn't need build-time conditionals.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-29T06:24:44.797Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:26-26
Timestamp: 2025-08-29T06:24:44.797Z
Learning: In Fluent Bit, ZSTD support is always available and enabled by default. The build system automatically detects and uses either the system libzstd library or builds the bundled ZSTD version. Unlike other optional dependencies like Arrow which use conditional compilation guards (e.g., FLB_HAVE_ARROW), ZSTD does not require conditional includes or build flags.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.

Applied to files:

  • plugins/out_kinesis_firehose/firehose_api.c
  • plugins/out_kinesis_streams/kinesis_api.c
📚 Learning: 2025-09-04T12:35:36.904Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10825
File: plugins/out_s3/s3.c:3275-3282
Timestamp: 2025-09-04T12:35:36.904Z
Learning: The out_s3 plugin intentionally uses a simple numeric comparison for retry_limit (chunk->failures >= ctx->ins->retry_limit) rather than the standard Fluent Bit pattern that checks for FLB_OUT_RETRY_UNLIMITED (-1). The maintainer wants to keep this current behavior for consistency within the plugin.

Applied to files:

  • plugins/out_kinesis_streams/kinesis_api.c
📚 Learning: 2025-11-21T06:23:29.770Z
Learnt from: cosmo0920
Repo: fluent/fluent-bit PR: 11171
File: include/fluent-bit/flb_lib.h:52-53
Timestamp: 2025-11-21T06:23:29.770Z
Learning: In Fluent Bit core (fluent/fluent-bit repository), function descriptions/documentation are not required for newly added functions in header files.

Applied to files:

  • plugins/out_kinesis_firehose/firehose.h
🧬 Code graph analysis (8)
plugins/out_kinesis_firehose/firehose_api.c (1)
src/aws/flb_aws_aggregation.c (4)
  • flb_aws_aggregation_process_event (113-233)
  • flb_aws_aggregation_finalize (79-100)
  • flb_aws_aggregation_reset (102-107)
  • flb_aws_aggregation_destroy (48-56)
plugins/out_kinesis_streams/kinesis_api.c (2)
src/aws/flb_aws_aggregation.c (4)
  • flb_aws_aggregation_process_event (113-233)
  • flb_aws_aggregation_finalize (79-100)
  • flb_aws_aggregation_reset (102-107)
  • flb_aws_aggregation_destroy (48-56)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
tests/runtime/out_kinesis.c (1)
src/flb_lib.c (9)
  • flb_create (143-225)
  • flb_input (266-276)
  • flb_input_set (305-335)
  • flb_output (279-289)
  • flb_output_set (520-551)
  • flb_start (983-994)
  • flb_lib_push (843-870)
  • flb_stop (1011-1055)
  • flb_destroy (228-263)
plugins/out_kinesis_streams/kinesis.c (2)
plugins/out_kinesis_firehose/firehose.c (1)
  • new_flush_buffer (317-357)
src/aws/flb_aws_aggregation.c (1)
  • flb_aws_aggregation_init (30-46)
tests/internal/aws_aggregation.c (1)
src/aws/flb_aws_aggregation.c (5)
  • flb_aws_aggregation_init (30-46)
  • flb_aws_aggregation_destroy (48-56)
  • flb_aws_aggregation_add (58-77)
  • flb_aws_aggregation_finalize (79-100)
  • flb_aws_aggregation_reset (102-107)
include/fluent-bit/aws/flb_aws_aggregation.h (1)
src/aws/flb_aws_aggregation.c (6)
  • flb_aws_aggregation_init (30-46)
  • flb_aws_aggregation_destroy (48-56)
  • flb_aws_aggregation_add (58-77)
  • flb_aws_aggregation_finalize (79-100)
  • flb_aws_aggregation_reset (102-107)
  • flb_aws_aggregation_process_event (113-233)
src/aws/flb_aws_aggregation.c (4)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
src/flb_pack.c (1)
  • flb_msgpack_to_json (1011-1024)
include/fluent-bit/flb_compat.h (1)
  • gmtime_r (75-81)
src/aws/flb_aws_util.c (1)
  • flb_aws_strftime_precision (1240-1326)
plugins/out_kinesis_firehose/firehose.c (3)
plugins/out_kinesis_streams/kinesis.c (1)
  • new_flush_buffer (311-354)
src/aws/flb_aws_aggregation.c (1)
  • flb_aws_aggregation_init (30-46)
plugins/out_kinesis_firehose/firehose_api.c (1)
  • flush_destroy (1121-1133)
🔇 Additional comments (23)
src/aws/CMakeLists.txt (1)

18-18: LGTM!

The new aggregation source file is correctly added to the AWS library build list, following the existing pattern for unconditional AWS components.

tests/internal/CMakeLists.txt (1)

141-141: LGTM!

The aggregation unit test is correctly added within the FLB_AWS conditional block, following the established pattern for AWS test files.

plugins/out_kinesis_firehose/firehose.h (1)

29-29: LGTM!

The header changes are well-structured:

  • New include for the aggregation API.
  • agg_buf and agg_buf_initialized fields in struct flush for per-flush aggregation state.
  • simple_aggregation flag in struct flb_firehose for runtime configuration.

The agg_buf_initialized flag pattern correctly mirrors the Kinesis Streams implementation.

Also applies to: 60-63, 102-102

tests/internal/aws_aggregation.c (1)

1-544: Excellent test coverage!

The test suite comprehensively covers the AWS aggregation API:

  • Lifecycle management (init/destroy/reset)
  • Single and multiple record additions
  • Buffer full detection and boundary conditions
  • Both Firehose (with newline) and Kinesis (without newline) finalization modes
  • Edge cases (NULL params, empty buffer, tiny buffer, exact boundaries)
  • Reuse patterns (reset cycles, add after finalize, double finalize)

This thorough coverage will help catch regressions in the aggregation module.

plugins/out_kinesis_firehose/firehose_api.c (3)

148-169: LGTM!

The process_event_simple_aggregation function is a clean wrapper around the shared flb_aws_aggregation_process_event implementation, correctly passing the Firehose-specific parameters including MAX_EVENT_SIZE and the delivery stream name.


386-480: Aggregation send logic is well-structured.

The send_aggregated_record function correctly handles:

  • Finalization with add_final_newline=1 for Firehose
  • Compression and base64 encoding paths
  • Proper memory management for both paths
  • Creating a single event record for the aggregated payload
  • Buffer reset after sending

768-774: LGTM!

Correct finalization and cleanup:

  • Lines 768-774: Properly routes to send_aggregated_record or send_log_events based on simple_aggregation flag.
  • Lines 1124-1126: Correctly checks agg_buf_initialized before calling destroy, preventing double-free or use-after-free issues.

Also applies to: 1124-1126

plugins/out_kinesis_streams/kinesis.c (2)

311-354: LGTM!

The aggregation initialization in new_flush_buffer is correctly implemented:

  • Accepts context parameter for config access
  • Initializes agg_buf_initialized to FLB_FALSE before attempting init
  • Properly handles init failure with cleanup
  • Sets agg_buf_initialized to FLB_TRUE only on success
  • Mirrors the Firehose implementation pattern for consistency

518-523: LGTM!

The simple_aggregation config map entry is correctly defined with:

  • Boolean type with "false" default
  • Proper offset binding to struct flb_kinesis
  • Clear description of the feature's purpose
plugins/out_kinesis_firehose/firehose.c (2)

317-357: LGTM: Aggregation buffer initialization is well-guarded.

The changes properly integrate aggregation support:

  • The ctx parameter enables access to the simple_aggregation flag
  • Initialization failure correctly cleans up and returns NULL
  • The agg_buf_initialized flag tracks state for proper cleanup

The integration follows the same pattern used in the Kinesis Streams plugin (see plugins/out_kinesis_streams/kinesis.c:310-353), ensuring consistency across both plugins.


524-529: Config option well-documented.

The simple_aggregation configuration properly:

  • Defaults to false (opt-in behavior)
  • Includes clear description of benefits
  • Uses boolean type for simple on/off control
  • Aligns with the similar option in kinesis_streams
tests/runtime/out_firehose.c (1)

192-524: Good coverage of edge cases with aggregation.

The Firehose aggregation tests are more comprehensive than the Kinesis tests, covering:

  • Compression combined with aggregation (line 339)
  • Empty/minimal records (line 415)
  • Error handling during aggregation (line 450)
  • Custom time formats (line 489)

This broader test coverage helps validate the aggregation feature works correctly across various configuration combinations.

plugins/out_kinesis_streams/kinesis_api.c (5)

55-56: Forward declaration enables clean separation.

The forward declaration of send_log_events allows the new send_aggregated_record function to call it without reordering the entire file.


209-230: Effective wrapper delegates to shared implementation.

The process_event_simple_aggregation wrapper correctly forwards all parameters to the shared flb_aws_aggregation_process_event API, avoiding code duplication between Kinesis Streams and Firehose plugins.


573-608: Aggregation retry logic addresses previous concern.

The guard at lines 584-590 checks if the aggregation buffer is empty after attempting to send, preventing the infinite loop flagged in the previous review. When ret == 0 && buf->agg_buf.agg_buf_offset == 0, the code discards the unprocessable record.

However, as noted in the previous comment, there may be an issue with checking agg_buf_offset after flb_aws_aggregation_reset has already been called. This needs verification to ensure the guard works as intended.


758-763: End-of-batch logic correctly handles both modes.

The conditional at lines 758-763 properly routes the final send through send_aggregated_record when aggregation is enabled, otherwise using the standard send_log_events path. This ensures any buffered aggregated records are flushed at the end of processing.


1111-1113: Aggregation buffer cleanup is properly guarded.

The destruction logic at lines 1111-1113 checks agg_buf_initialized before calling flb_aws_aggregation_destroy, preventing double-free or use of uninitialized buffers.

include/fluent-bit/aws/flb_aws_aggregation.h (1)

20-90: Well-designed aggregation API with clear documentation.

The header provides a clean public API for AWS aggregation:

  • Clear structure definition with descriptive field names
  • All functions documented with return value semantics
  • Return codes are consistent and well-explained (0=success, -1=error, 1=buffer full, 2=discard)
  • Include guards properly defined

The API design allows both Kinesis Streams and Firehose plugins to share the same aggregation logic, reducing code duplication.

src/aws/flb_aws_aggregation.c (5)

30-46: LGTM: Initialization properly validates and allocates.

The flb_aws_aggregation_init function:

  • Validates the buffer pointer
  • Allocates the requested size
  • Initializes all fields (size and offset)
  • Returns appropriate error codes

48-56: LGTM: Destruction safely handles all cases.

The flb_aws_aggregation_destroy function:

  • Checks both buf and buf->agg_buf before freeing
  • Nulls the pointer after freeing
  • Resets size and offset to maintain consistency

58-77: LGTM: Bounds checking prevents buffer overflow.

The flb_aws_aggregation_add function correctly:

  • Validates all input parameters
  • Checks if adding data would exceed max_record_size before copying
  • Returns 1 (buffer full) when limit would be exceeded, allowing caller to finalize and retry
  • Uses safe memcpy after validation

79-100: LGTM: Finalization handles both Kinesis and Firehose cases.

The flb_aws_aggregation_finalize function:

  • Returns -1 when there's no data to finalize (empty buffer)
  • The add_final_newline parameter allows Firehose (which needs it) and Kinesis Streams (which doesn't) to use the same API
  • Checks buffer size before adding the final newline

113-233: Shared event processing logic eliminates duplication.

The flb_aws_aggregation_process_event function consolidates the msgpack-to-JSON conversion, log_key extraction, time_key injection, and aggregation buffer management that was previously duplicated between Kinesis and Firehose plugins.

The function correctly:

  • Converts msgpack to JSON
  • Discards empty messages (return 2)
  • Handles log_key by stripping enclosing quotes
  • Checks size limits before and after time_key addition
  • Appends newline
  • Attempts to add to aggregation buffer
  • Returns appropriate codes: 0=success, 1=buffer full, 2=discard, -1=error

Comment on lines 421 to 483
/* Finalize and send aggregated record (Kinesis Streams version - no final newline) */
static int send_aggregated_record(struct flb_kinesis *ctx, struct flush *buf) {
int ret;
size_t agg_size;
size_t b64_len;
struct kinesis_event *event;

/* Finalize without final newline (Kinesis Streams doesn't need it) */
ret = flb_aws_aggregation_finalize(&buf->agg_buf, 0, &agg_size);
if (ret < 0) {
return 0;
}

/* Base64 encode the aggregated record */
size_t size = (agg_size * 1.5) + 4;
if (buf->event_buf == NULL || buf->event_buf_size < size) {
flb_free(buf->event_buf);
buf->event_buf = flb_malloc(size);
buf->event_buf_size = size;
if (buf->event_buf == NULL) {
flb_errno();
return -1;
}
}

ret = flb_base64_encode((unsigned char *) buf->event_buf, size, &b64_len,
(unsigned char *) buf->agg_buf.agg_buf, agg_size);
if (ret != 0) {
flb_errno();
return -1;
}
agg_size = b64_len;

/* Copy to tmp_buf */
if (buf->tmp_buf_size < agg_size) {
flb_plg_error(ctx->ins, "Aggregated record too large for buffer");
flb_aws_aggregation_reset(&buf->agg_buf);
return 0;
}

memcpy(buf->tmp_buf, buf->event_buf, agg_size);

/* Create event record */
event = &buf->events[0];
event->json = buf->tmp_buf;
event->len = agg_size;
event->timestamp.tv_sec = 0;
event->timestamp.tv_nsec = 0;
buf->event_index = 1;

/* Calculate data_size for the payload */
buf->data_size = PUT_RECORDS_HEADER_LEN + PUT_RECORDS_FOOTER_LEN;
buf->data_size += strlen(ctx->stream_name);
buf->data_size += agg_size + PUT_RECORDS_PER_RECORD_LEN;

/* Send the aggregated record */
ret = send_log_events(ctx, buf);

/* Reset aggregation buffer */
flb_aws_aggregation_reset(&buf->agg_buf);

return ret;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check how agg_buf_offset is used throughout the codebase
echo "=== Checking agg_buf_offset usage in kinesis_api.c ==="
rg -n -C3 'agg_buf\.agg_buf_offset' plugins/out_kinesis_streams/kinesis_api.c

echo -e "\n=== Checking aggregation reset calls ==="
rg -n -C3 'flb_aws_aggregation_reset' plugins/out_kinesis_streams/

echo -e "\n=== Checking similar pattern in firehose_api.c ==="
rg -n -C3 'agg_buf\.agg_buf_offset == 0' plugins/out_kinesis_firehose/

Repository: fluent/fluent-bit

Length of output: 2492


🏁 Script executed:

#!/bin/bash
# Get the function containing line 584 to understand the caller context
echo "=== Context around line 584 (caller of send_aggregated_record) ==="
sed -n '560,600p' plugins/out_kinesis_streams/kinesis_api.c

echo -e "\n=== Check add_record_to_buffer function to see the loop ==="
rg -n -B5 'send_aggregated_record' plugins/out_kinesis_streams/kinesis_api.c | head -40

Repository: fluent/fluent-bit

Length of output: 2420


🏁 Script executed:

#!/bin/bash
# Get the full send_aggregated_record function to see all return paths
echo "=== Full send_aggregated_record function ==="
sed -n '421,483p' plugins/out_kinesis_streams/kinesis_api.c

echo -e "\n=== Check what process_event_simple_aggregation returns ==="
rg -n -A20 'static int process_event_simple_aggregation' plugins/out_kinesis_streams/kinesis_api.c | head -50

echo -e "\n=== Check the retry loop after send_aggregated_record ==="
sed -n '577,615p' plugins/out_kinesis_streams/kinesis_api.c

Repository: fluent/fluent-bit

Length of output: 4607


🏁 Script executed:

#!/bin/bash
# Check if there's a goto loop back to retry_add_event_agg
echo "=== Checking for the retry loop structure ==="
sed -n '575,625p' plugins/out_kinesis_streams/kinesis_api.c

echo -e "\n=== Check reset_flush_buf behavior ==="
rg -n -A10 'static void reset_flush_buf' plugins/out_kinesis_streams/kinesis_api.c

Repository: fluent/fluent-bit

Length of output: 2287


🏁 Script executed:

#!/bin/bash
# Check send_log_events return values and what they mean
echo "=== send_log_events function signature and early returns ==="
rg -n -A30 'static int send_log_events' plugins/out_kinesis_streams/kinesis_api.c | head -50

echo -e "\n=== Check early return at line 492-500 mentioned in review ==="
sed -n '486,505p' plugins/out_kinesis_streams/kinesis_api.c

Repository: fluent/fluent-bit

Length of output: 2497


🏁 Script executed:

#!/bin/bash
# Check the full send_log_events function to see all return paths
echo "=== Full send_log_events function returns ==="
rg -n 'return' plugins/out_kinesis_streams/kinesis_api.c | grep -A1 -B1 'send_log_events' | tail -20

echo -e "\n=== Check end of send_log_events function ==="
sed -n '486,570p' plugins/out_kinesis_streams/kinesis_api.c | tail -40

Repository: fluent/fluent-bit

Length of output: 1350


🏁 Script executed:

#!/bin/bash
# Check if there are any error paths in send_aggregated_record that return without resetting
echo "=== Tracing all return paths in send_aggregated_record ==="
sed -n '421,483p' plugins/out_kinesis_streams/kinesis_api.c | grep -n 'return'

echo -e "\n=== Check what process_event_simple_aggregation returns ==="
rg -n -B2 -A10 'process_event_simple_aggregation.*ret' plugins/out_kinesis_streams/kinesis_api.c | grep -A15 'ret = 1'

Repository: fluent/fluent-bit

Length of output: 289


🏁 Script executed:

#!/bin/bash
# Let's get the complete picture of what happens after send_aggregated_record returns
echo "=== Full context of the retry loop and guard ==="
sed -n '575,605p' plugins/out_kinesis_streams/kinesis_api.c

echo -e "\n=== Check what flb_aws_aggregation_reset does (in external lib, but check if it's called again) ==="
rg -n 'flb_aws_aggregation_reset' plugins/out_kinesis_streams/kinesis_api.c

echo -e "\n=== Verify: does reset_flush_buf reset agg_buf? ==="
sed -n '414,420p' plugins/out_kinesis_streams/kinesis_api.c

Repository: fluent/fluent-bit

Length of output: 1762


🏁 Script executed:

#!/bin/bash
# Let's verify the exact sequence: what happens when guard triggers vs normal case
echo "=== Guard scenario (line 584-590) ==="
sed -n '584,595p' plugins/out_kinesis_streams/kinesis_api.c

echo -e "\n=== Check if the current record has already been sent when guard triggers ==="
echo "In send_aggregated_record line 477: ret = send_log_events(ctx, buf);"
echo "send_log_events processes buf->events and sends them"
echo "Then at line 480: flb_aws_aggregation_reset(&buf->agg_buf);"
echo "This resets the aggregation buffer AFTER sending"
echo ""
echo "So if guard triggers after successful send, the data is already sent."
echo "Let's verify if reset_flush_buf at line 590 is correct behavior:"
sed -n '588,595p' plugins/out_kinesis_streams/kinesis_api.c

Repository: fluent/fluent-bit

Length of output: 1299


🏁 Script executed:

#!/bin/bash
# Check the final end to see if there's another loop at the end of process_and_send
echo "=== Check where send_aggregated_record is called from the end processing ==="
sed -n '750,770p' plugins/out_kinesis_streams/kinesis_api.c

echo -e "\n=== Summary of all return paths in send_aggregated_record ==="
echo "Line 432: return 0 (finalize failed) - NO reset before return"
echo "Line 457-458: return 0 (buffer too large) - WITH reset before return"  
echo "Line 482: return ret (normal) - WITH reset before return (line 480)"
echo ""
echo "So two of three return paths DO reset agg_buf_offset to 0"
echo ""
echo "=== Verify the guard condition logic ==="
sed -n '584,591p' plugins/out_kinesis_streams/kinesis_api.c

Repository: fluent/fluent-bit

Length of output: 1368


Fix guard logic to properly detect infinite loop conditions.

The guard at line 584 checks ret == 0 && buf->agg_buf.agg_buf_offset == 0 to detect infinite loops, but the condition is unreliable. When send_aggregated_record succeeds (line 477), it resets the aggregation buffer at line 480 before returning. This causes the guard to trigger for all successful sends, not just infinite loop cases. The guard then logs "No data was aggregated" when in fact the data was successfully aggregated and sent. While data is not lost (it was sent before reset), the misleading warning obscures the actual intent and could hide genuine infinite loop conditions. The guard needs to distinguish between "nothing to aggregate" and "successfully sent and reset," perhaps by checking the state before aggregation finalize or by tracking whether data was actually sent.

🤖 Prompt for AI Agents
In plugins/out_kinesis_streams/kinesis_api.c around lines 421-483 (and the guard
at ~584), the guard that logs "No data was aggregated" misfires because
send_aggregated_record resets agg_buf before return; record the aggregation
state before finalizing or return a distinct status so the caller can
distinguish "nothing to aggregate" from "successfully sent then reset."
Specifically: capture pre_finalize_offset = buf->agg_buf.agg_buf_offset (or add
a return code for "sent") before calling
flb_aws_aggregation_finalize/send_aggregated_record, and change the guard to
check pre_finalize_offset == 0 (or check the sent return code) when deciding to
log the warning; leave error paths unchanged.

- Add helper methods to support aggregating records for aws outputs

Signed-off-by: Shelby Hagman <[email protected]>
- Adds testing for aws_aggregation

Signed-off-by: Shelby Hagman <[email protected]>
- Add simple_aggregation config parameter and implementation to plugin

Signed-off-by: Shelby Hagman <[email protected]>
- Adds tests to out_firehose

Signed-off-by: Shelby Hagman <[email protected]>
- Add simple_aggregation config parameter and implementation to plugin

Signed-off-by: Shelby Hagman <[email protected]>
- Adds tests to out_kinesis

Signed-off-by: Shelby Hagman <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants