From f83b7683b0872d07461063bceb0fef3102c3f6ea Mon Sep 17 00:00:00 2001 From: Mirko Lazarevic Date: Mon, 1 Dec 2025 12:16:19 +0100 Subject: [PATCH 1/6] ml: ensure context is registered for REGEX type This fix ensures that when the buffer is flushed, the record will have proper timestamp and metadata instead of just the "log" field. Signed-off-by: Mirko Lazarevic --- src/multiline/flb_ml.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/multiline/flb_ml.c b/src/multiline/flb_ml.c index 3050286cd2c..4981f87af09 100644 --- a/src/multiline/flb_ml.c +++ b/src/multiline/flb_ml.c @@ -268,6 +268,11 @@ static int package_content(struct flb_ml_stream *mst, if (ret == FLB_MULTILINE_TRUNCATED) { truncated = FLB_TRUE; } + + if (stream_group->mp_sbuf.size == 0) { + flb_ml_register_context(stream_group, tm, full_map); + } + processed = FLB_TRUE; } else if (type == FLB_ML_ENDSWITH) { From ac0ab6a92efe368d0e4d48572390a20ca14e207f Mon Sep 17 00:00:00 2001 From: Mirko Lazarevic Date: Tue, 2 Dec 2025 23:25:25 +0100 Subject: [PATCH 2/6] tests: Add unit tests Signed-off-by: Mirko Lazarevic --- tests/internal/multiline.c | 270 +++++++++++++++++++++++++++++++++++++ 1 file changed, 270 insertions(+) diff --git a/tests/internal/multiline.c b/tests/internal/multiline.c index 4a9ef50df87..bf1c74001a9 100644 --- a/tests/internal/multiline.c +++ b/tests/internal/multiline.c @@ -1643,6 +1643,275 @@ static void test_buffer_limit_disabled() flb_config_exit(config); } +/* + * Helper struct to track flush results with metadata + */ +struct metadata_result { + int current_record; + int total_expected; + char *key; + int records_with_full_metadata; /* Count of records with full metadata */ + int records_with_log_only; /* Count of records with only 'log' field */ +}; + +/* + * Callback that verifies metadata preservation + * + * Before the fix: continuation lines would have only 1 field (log) + * After the fix: all lines should have multiple fields (time, stream, log, file, etc.) + */ +static int flush_callback_metadata_check(struct flb_ml_parser *parser, + struct flb_ml_stream *mst, + void *data, char *buf_data, size_t buf_size) +{ + int ret; + int field_count; + size_t off = 0; + msgpack_unpacked result; + msgpack_object *map; + msgpack_object key; + struct flb_time tm; + struct metadata_result *res = data; + + /* Unpack the record */ + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, buf_data, buf_size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + msgpack_unpacked_destroy(&result); + return -1; + } + + /* Extract timestamp and map */ + flb_time_pop_from_msgpack(&tm, &result, &map); + + /* Count fields in the map */ + field_count = map->via.map.size; + fprintf(stdout, "[Record %d] Fields: %d, Timestamp: %lu.%lu\n", + res->current_record, field_count, + (unsigned long)tm.tm.tv_sec, (unsigned long)tm.tm.tv_nsec); + + + /* Print field names */ + fprintf(stdout, " Field names: "); + for (int i = 0; i < field_count; i++) { + key = map->via.map.ptr[i].key; + if (key.type == MSGPACK_OBJECT_STR) { + fprintf(stdout, "%.*s", (int)key.via.str.size, key.via.str.ptr); + if (i < field_count - 1) { + fprintf(stdout, ", "); + } + } + } + fprintf(stdout, "\n"); + + /* Track metadata presence */ + /* 4 fields: time, stream, log, file*/ + if (field_count == 4) { + res->records_with_full_metadata++; + } else { + res->records_with_log_only++; + fprintf(stdout, " WARNING: Record has only 'log' field (missing metadata)\n"); + } + + res->current_record++; + msgpack_unpacked_destroy(&result); + + return 0; +} + +static int append_log_to_multiline_processor(struct flb_ml *ml, uint64_t stream_id, + struct flb_time *tm, const char *log_content, + const char *stream_name, const char *file_path) +{ + int ret; + int log_len; + size_t off = 0; + msgpack_sbuffer mp_sbuf; + msgpack_packer mp_pck; + msgpack_unpacked result; + msgpack_object root; + msgpack_object *map; + + log_len = strlen(log_content); + + /* initialize buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + /* Array: [timestamp, map] */ + msgpack_pack_array(&mp_pck, 2); + flb_time_append_to_msgpack(tm, &mp_pck, 0); + + /* Map with 4 fields: time (string), stream, log, file */ + msgpack_pack_map(&mp_pck, 4); + + /* time field */ + msgpack_pack_str(&mp_pck, 4); + msgpack_pack_str_body(&mp_pck, "time", 4); + msgpack_pack_str(&mp_pck, 24); + msgpack_pack_str_body(&mp_pck, "2025-12-01T17:33:44+00:00", 24); + + /* stream field */ + msgpack_pack_str(&mp_pck, 6); + msgpack_pack_str_body(&mp_pck, "stream", 6); + msgpack_pack_str(&mp_pck, strlen(stream_name)); + msgpack_pack_str_body(&mp_pck, stream_name, strlen(stream_name)); + + /* log field */ + msgpack_pack_str(&mp_pck, 3); + msgpack_pack_str_body(&mp_pck, "log", 3); + msgpack_pack_str(&mp_pck, log_len); + msgpack_pack_str_body(&mp_pck, log_content, log_len); + + /* file field */ + msgpack_pack_str(&mp_pck, 4); + msgpack_pack_str_body(&mp_pck, "file", 4); + msgpack_pack_str(&mp_pck, strlen(file_path)); + msgpack_pack_str_body(&mp_pck, file_path, strlen(file_path)); + + /* Unpack and lookup the content map */ + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, mp_sbuf.data, mp_sbuf.size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&mp_sbuf); + return -1; + } + + root = result.data; + map = &root.via.array.ptr[1]; + + /* Send to multiline processor */ + ret = flb_ml_append_object(ml, stream_id, tm, NULL, map); + + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&mp_sbuf); + + return ret; +} + + +/* + * Test issue 10576: Metadata preservation when lines are flushed + * --------------------- + * - https://github.com/fluent/fluent-bit/issues/10576 + * + * Input pattern just as an example: + * - Line 1: "non-iso timestamp Likely to fail" (matches continuation) + * - Line 2: "Likely to fail" (matches continuation) + * - Line 3: "[iso timestamp] should be ok" (matches start_state) + * - Line 4: "non-iso timestamp Likely to fail" (matches continuation) + * - Line 5: "non-iso timestamp Likely to fail" (matches continuation) + * + * Before fix: Lines 1, 2, 4, 5 would have only {"log": "..."} (missing metadata) + * After fix: All lines should have {"time": "...", "stream": "...", "log": "...", "file": "..."} + */ +static void test_issue_10567_metadata_preservation() +{ + int ret; + int i; + uint64_t stream_id; + struct flb_config *config; + struct flb_time tm; + struct flb_ml *ml; + struct flb_ml_parser *mlp; + struct flb_ml_parser_ins *mlp_i; + struct metadata_result res = {0}; + + /* Test input - mix of start_state and continuation lines */ + const char *test_lines[] = { + "Mon Dec 1 17:33:44 UTC 2025 Likely to fail", /* continuation (no [timestamp]) */ + "Mon Dec 1 17:33:49 UTC 2025 Likely to fail", /* continuation */ + "[2025-12-01T17:33:54.551Z] should be ok", /* start_state */ + "Mon Dec 1 17:33:59 UTC 2025 Likely to fail", /* continuation */ + "Mon Dec 1 17:34:04 UTC 2025 Likely to fail", /* continuation */ + "[2025-12-01T17:34:09.555Z] should be ok", /* start_state */ + }; + + int num_lines = sizeof(test_lines) / sizeof(test_lines[0]); + + /* Initialize */ + config = flb_config_init(); + TEST_CHECK(config != NULL); + + /* Create custom multiline parser */ + mlp = flb_ml_parser_create(config, + "parser_10576", /* name */ + FLB_ML_REGEX, /* type */ + NULL, /* match_str */ + FLB_FALSE, /* negate */ + 1000, /* flush_ms */ + "log", /* key_content */ + NULL, /* key_group */ + NULL, /* key_pattern */ + NULL, /* parser */ + NULL); /* parser_name */ + TEST_CHECK(mlp != NULL); + + /* start_state - matches [YYYY-MM-DDTHH:MM:SS.sssZ] format */ + ret = flb_ml_rule_create(mlp, "start_state", + "/^\\[\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z\\]/", + "cont", NULL); + TEST_CHECK(ret == 0); + + /* cont - matches lines NOT starting with [timestamp] */ + ret = flb_ml_rule_create(mlp, "cont", + "/^(?!\\[\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z\\])/", + "cont", NULL); + TEST_CHECK(ret == 0); + + ret = flb_ml_parser_init(mlp); + TEST_CHECK(ret == 0); + + /* Create ML context */ + ml = flb_ml_create(config, "test-metadata"); + TEST_CHECK(ml != NULL); + + mlp_i = flb_ml_parser_instance_create(ml, "parser_10576"); + TEST_CHECK(mlp_i != NULL); + + flb_ml_parser_instance_set(mlp_i, "key_content", "log"); + + /* Initialize result tracking */ + res.key = "log"; + res.total_expected = num_lines; + + /* Create stream */ + ret = flb_ml_stream_create(ml, "test-stream", -1, + flush_callback_metadata_check, + (void *)&res, &stream_id); + TEST_CHECK(ret == 0); + + /* Send each line and flush immediately after */ + for (i = 0; i < num_lines; i++) { + flb_time_get(&tm); + + fprintf(stdout, "Input[%d]: %s\n", i, test_lines[i]); + + ret = append_log_to_multiline_processor(ml, stream_id, &tm, test_lines[i], + "stdout", "/var/log/test.log"); + TEST_CHECK(ret == 0); + + /* Note: Flush after each line to simulate slow log arrival */ + flb_ml_flush_pending_now(ml); + } + + /* Final flush to ensure nothing is left */ + flb_ml_flush_pending_now(ml); + + /* + * After the fix, ALL records should have full metadata. + */ + TEST_CHECK(res.records_with_log_only == 0); + TEST_CHECK(res.records_with_full_metadata == res.current_record); + TEST_CHECK(res.current_record == num_lines); + + /* Cleanup */ + flb_ml_destroy(ml); + flb_config_exit(config); +} + + TEST_LIST = { /* Normal features tests */ { "parser_docker", test_parser_docker}, @@ -1663,5 +1932,6 @@ TEST_LIST = { { "issue_4034" , test_issue_4034}, { "issue_4949" , test_issue_4949}, { "issue_5504" , test_issue_5504}, + { "issue_10576" , test_issue_10567_metadata_preservation }, { 0 } }; From 128f9e480ea1eff0f79bda2908bb796e37800c6b Mon Sep 17 00:00:00 2001 From: Mirko Lazarevic Date: Wed, 3 Dec 2025 00:00:28 +0100 Subject: [PATCH 3/6] tests: improve unit tests Signed-off-by: Mirko Lazarevic --- tests/internal/multiline.c | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/tests/internal/multiline.c b/tests/internal/multiline.c index bf1c74001a9..d14167a6623 100644 --- a/tests/internal/multiline.c +++ b/tests/internal/multiline.c @@ -1648,8 +1648,6 @@ static void test_buffer_limit_disabled() */ struct metadata_result { int current_record; - int total_expected; - char *key; int records_with_full_metadata; /* Count of records with full metadata */ int records_with_log_only; /* Count of records with only 'log' field */ }; @@ -1684,6 +1682,9 @@ static int flush_callback_metadata_check(struct flb_ml_parser *parser, /* Extract timestamp and map */ flb_time_pop_from_msgpack(&tm, &result, &map); + /* Verify timestamp is not zero */ + TEST_CHECK(flb_time_to_nanosec(&tm) != 0L); + /* Count fields in the map */ field_count = map->via.map.size; fprintf(stdout, "[Record %d] Fields: %d, Timestamp: %lu.%lu\n", @@ -1705,8 +1706,7 @@ static int flush_callback_metadata_check(struct flb_ml_parser *parser, fprintf(stdout, "\n"); /* Track metadata presence */ - /* 4 fields: time, stream, log, file*/ - if (field_count == 4) { + if (field_count > 1) { res->records_with_full_metadata++; } else { res->records_with_log_only++; @@ -1748,8 +1748,8 @@ static int append_log_to_multiline_processor(struct flb_ml *ml, uint64_t stream_ /* time field */ msgpack_pack_str(&mp_pck, 4); msgpack_pack_str_body(&mp_pck, "time", 4); - msgpack_pack_str(&mp_pck, 24); - msgpack_pack_str_body(&mp_pck, "2025-12-01T17:33:44+00:00", 24); + msgpack_pack_str(&mp_pck, 25); + msgpack_pack_str_body(&mp_pck, "2025-12-01T17:33:44+00:00", 25); /* stream field */ msgpack_pack_str(&mp_pck, 6); @@ -1783,6 +1783,7 @@ static int append_log_to_multiline_processor(struct flb_ml *ml, uint64_t stream_ /* Send to multiline processor */ ret = flb_ml_append_object(ml, stream_id, tm, NULL, map); + TEST_CHECK(ret == FLB_MULTILINE_OK); msgpack_unpacked_destroy(&result); msgpack_sbuffer_destroy(&mp_sbuf); @@ -1806,7 +1807,7 @@ static int append_log_to_multiline_processor(struct flb_ml *ml, uint64_t stream_ * Before fix: Lines 1, 2, 4, 5 would have only {"log": "..."} (missing metadata) * After fix: All lines should have {"time": "...", "stream": "...", "log": "...", "file": "..."} */ -static void test_issue_10567_metadata_preservation() +static void test_issue_10576() { int ret; int i; @@ -1872,10 +1873,6 @@ static void test_issue_10567_metadata_preservation() flb_ml_parser_instance_set(mlp_i, "key_content", "log"); - /* Initialize result tracking */ - res.key = "log"; - res.total_expected = num_lines; - /* Create stream */ ret = flb_ml_stream_create(ml, "test-stream", -1, flush_callback_metadata_check, @@ -1932,6 +1929,6 @@ TEST_LIST = { { "issue_4034" , test_issue_4034}, { "issue_4949" , test_issue_4949}, { "issue_5504" , test_issue_5504}, - { "issue_10576" , test_issue_10567_metadata_preservation }, + { "issue_10576" , test_issue_10576}, { 0 } }; From 2a27cbbce5a6e2f2744d9ddec093287f65cefb59 Mon Sep 17 00:00:00 2001 From: Mirko Lazarevic Date: Wed, 3 Dec 2025 22:20:08 +0100 Subject: [PATCH 4/6] ml: handle TRUNCATED return case Addresses PR comments and adds correspoinding unit tests Signed-off-by: Mirko Lazarevic --- src/multiline/flb_ml.c | 4 +- tests/internal/multiline.c | 278 +++++++++++++++++++++++++++++-------- 2 files changed, 219 insertions(+), 63 deletions(-) diff --git a/src/multiline/flb_ml.c b/src/multiline/flb_ml.c index 4981f87af09..e73bd7c90d9 100644 --- a/src/multiline/flb_ml.c +++ b/src/multiline/flb_ml.c @@ -269,7 +269,7 @@ static int package_content(struct flb_ml_stream *mst, truncated = FLB_TRUE; } - if (stream_group->mp_sbuf.size == 0) { + if (!truncated && stream_group->mp_sbuf.size == 0) { flb_ml_register_context(stream_group, tm, full_map); } @@ -346,7 +346,7 @@ static int package_content(struct flb_ml_stream *mst, processed = FLB_TRUE; } - if (processed && metadata != NULL) { + if (!truncated && processed && metadata != NULL) { msgpack_pack_object(&stream_group->mp_md_pck, *metadata); } diff --git a/tests/internal/multiline.c b/tests/internal/multiline.c index d14167a6623..a1108531861 100644 --- a/tests/internal/multiline.c +++ b/tests/internal/multiline.c @@ -1644,32 +1644,47 @@ static void test_buffer_limit_disabled() } /* - * Helper struct to track flush results with metadata + * Unit tests for issue 10576: Metadata preservation in multiline filter + * https://github.com/fluent/fluent-bit/issues/10576 + * + */ + +/* + * Helper structure to track records with metadata verification */ struct metadata_result { int current_record; - int records_with_full_metadata; /* Count of records with full metadata */ - int records_with_log_only; /* Count of records with only 'log' field */ + int records_with_full_metadata; /* Count of records with metadata */ + int records_missing_metadata; /* Count of records with missing metadata */ + + /* Track metadata values for each record (for detailed verification) */ + char record_streams[10][32]; /* stream value for each record */ + char record_files[10][64]; /* file value for each record */ }; /* * Callback that verifies metadata preservation * * Before the fix: continuation lines would have only 1 field (log) - * After the fix: all lines should have multiple fields (time, stream, log, file, etc.) + * After the fix: all lines should have multiple fields (stream, log, file, etc.) */ static int flush_callback_metadata_check(struct flb_ml_parser *parser, - struct flb_ml_stream *mst, - void *data, char *buf_data, size_t buf_size) + struct flb_ml_stream *mst, + void *data, char *buf_data, size_t buf_size) { int ret; + int i; int field_count; size_t off = 0; msgpack_unpacked result; msgpack_object *map; - msgpack_object key; + msgpack_object key, val; struct flb_time tm; struct metadata_result *res = data; + int has_stream = 0; + int has_file = 0; + + fprintf(stdout, "\n%s----- MULTILINE FLUSH -----%s\n", ANSI_YELLOW, ANSI_RESET); /* Unpack the record */ msgpack_unpacked_init(&result); @@ -1685,32 +1700,47 @@ static int flush_callback_metadata_check(struct flb_ml_parser *parser, /* Verify timestamp is not zero */ TEST_CHECK(flb_time_to_nanosec(&tm) != 0L); - /* Count fields in the map */ + /* Count fields and check for stream/file */ field_count = map->via.map.size; - fprintf(stdout, "[Record %d] Fields: %d, Timestamp: %lu.%lu\n", - res->current_record, field_count, - (unsigned long)tm.tm.tv_sec, (unsigned long)tm.tm.tv_nsec); - - /* Print field names */ - fprintf(stdout, " Field names: "); - for (int i = 0; i < field_count; i++) { + for (i = 0; i < field_count; i++) { key = map->via.map.ptr[i].key; - if (key.type == MSGPACK_OBJECT_STR) { - fprintf(stdout, "%.*s", (int)key.via.str.size, key.via.str.ptr); - if (i < field_count - 1) { - fprintf(stdout, ", "); + val = map->via.map.ptr[i].val; + + if (key.type == MSGPACK_OBJECT_STR && val.type == MSGPACK_OBJECT_STR) { + if (key.via.str.size == 6 && strncmp(key.via.str.ptr, "stream", 6) == 0) { + has_stream = 1; + if (res->current_record < 10) { + size_t copy_len = val.via.str.size < 31 ? val.via.str.size : 31; + strncpy(res->record_streams[res->current_record], + val.via.str.ptr, copy_len); + res->record_streams[res->current_record][copy_len] = '\0'; + } + } + if (key.via.str.size == 4 && strncmp(key.via.str.ptr, "file", 4) == 0) { + has_file = 1; + if (res->current_record < 10) { + size_t copy_len = val.via.str.size < 63 ? val.via.str.size : 63; + strncpy(res->record_files[res->current_record], + val.via.str.ptr, copy_len); + res->record_files[res->current_record][copy_len] = '\0'; + } } } } - fprintf(stdout, "\n"); - /* Track metadata presence */ - if (field_count > 1) { + fprintf(stdout, "[Record %d] Fields: %d, stream=%s, file=%s\n", + res->current_record, field_count, + res->record_streams[res->current_record], + res->record_files[res->current_record]); + + /* Track metadata */ + if (has_stream && has_file) { res->records_with_full_metadata++; } else { - res->records_with_log_only++; - fprintf(stdout, " WARNING: Record has only 'log' field (missing metadata)\n"); + res->records_missing_metadata++; + fprintf(stdout, " WARNING: Record %d missing metadata (stream=%d, file=%d)\n", + res->current_record, has_stream, has_file); } res->current_record++; @@ -1719,12 +1749,14 @@ static int flush_callback_metadata_check(struct flb_ml_parser *parser, return 0; } -static int append_log_to_multiline_processor(struct flb_ml *ml, uint64_t stream_id, - struct flb_time *tm, const char *log_content, - const char *stream_name, const char *file_path) +/* + * Helper funciton to append log with custom stream/file metadata + */ +static int append_log_with_metadata(struct flb_ml *ml, uint64_t stream_id, + struct flb_time *tm, const char *log_content, + const char *stream_name, const char *file_path) { int ret; - int log_len; size_t off = 0; msgpack_sbuffer mp_sbuf; msgpack_packer mp_pck; @@ -1732,9 +1764,6 @@ static int append_log_to_multiline_processor(struct flb_ml *ml, uint64_t stream_ msgpack_object root; msgpack_object *map; - log_len = strlen(log_content); - - /* initialize buffers */ msgpack_sbuffer_init(&mp_sbuf); msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); @@ -1742,14 +1771,8 @@ static int append_log_to_multiline_processor(struct flb_ml *ml, uint64_t stream_ msgpack_pack_array(&mp_pck, 2); flb_time_append_to_msgpack(tm, &mp_pck, 0); - /* Map with 4 fields: time (string), stream, log, file */ - msgpack_pack_map(&mp_pck, 4); - - /* time field */ - msgpack_pack_str(&mp_pck, 4); - msgpack_pack_str_body(&mp_pck, "time", 4); - msgpack_pack_str(&mp_pck, 25); - msgpack_pack_str_body(&mp_pck, "2025-12-01T17:33:44+00:00", 25); + /* Map with 3 fields: stream, log, file */ + msgpack_pack_map(&mp_pck, 3); /* stream field */ msgpack_pack_str(&mp_pck, 6); @@ -1760,8 +1783,8 @@ static int append_log_to_multiline_processor(struct flb_ml *ml, uint64_t stream_ /* log field */ msgpack_pack_str(&mp_pck, 3); msgpack_pack_str_body(&mp_pck, "log", 3); - msgpack_pack_str(&mp_pck, log_len); - msgpack_pack_str_body(&mp_pck, log_content, log_len); + msgpack_pack_str(&mp_pck, strlen(log_content)); + msgpack_pack_str_body(&mp_pck, log_content, strlen(log_content)); /* file field */ msgpack_pack_str(&mp_pck, 4); @@ -1783,7 +1806,6 @@ static int append_log_to_multiline_processor(struct flb_ml *ml, uint64_t stream_ /* Send to multiline processor */ ret = flb_ml_append_object(ml, stream_id, tm, NULL, map); - TEST_CHECK(ret == FLB_MULTILINE_OK); msgpack_unpacked_destroy(&result); msgpack_sbuffer_destroy(&mp_sbuf); @@ -1791,21 +1813,15 @@ static int append_log_to_multiline_processor(struct flb_ml *ml, uint64_t stream_ return ret; } - /* * Test issue 10576: Metadata preservation when lines are flushed * --------------------- - * - https://github.com/fluent/fluent-bit/issues/10576 + * https://github.com/fluent/fluent-bit/issues/10576 * - * Input pattern just as an example: - * - Line 1: "non-iso timestamp Likely to fail" (matches continuation) - * - Line 2: "Likely to fail" (matches continuation) - * - Line 3: "[iso timestamp] should be ok" (matches start_state) - * - Line 4: "non-iso timestamp Likely to fail" (matches continuation) - * - Line 5: "non-iso timestamp Likely to fail" (matches continuation) + * Scenario: Simulate slow log arrival by flushing after each line. * - * Before fix: Lines 1, 2, 4, 5 would have only {"log": "..."} (missing metadata) - * After fix: All lines should have {"time": "...", "stream": "...", "log": "...", "file": "..."} + * Before fix: Continuation lines would have only {"log": "..."} (missing metadata) + * After fix: All lines should have {"stream": "...", "log": "...", "file": "..."} */ static void test_issue_10576() { @@ -1879,27 +1895,25 @@ static void test_issue_10576() (void *)&res, &stream_id); TEST_CHECK(ret == 0); - /* Send each line and flush immediately after */ + /* Send each line and flush immediately after (simulate slow log arrival) */ for (i = 0; i < num_lines; i++) { flb_time_get(&tm); fprintf(stdout, "Input[%d]: %s\n", i, test_lines[i]); - ret = append_log_to_multiline_processor(ml, stream_id, &tm, test_lines[i], - "stdout", "/var/log/test.log"); - TEST_CHECK(ret == 0); + ret = append_log_with_metadata(ml, stream_id, &tm, test_lines[i], + "stdout", "/var/log/test.log"); + TEST_CHECK(ret == FLB_MULTILINE_OK); - /* Note: Flush after each line to simulate slow log arrival */ + /* Flush after each line to simulate slow log arrival */ flb_ml_flush_pending_now(ml); } /* Final flush to ensure nothing is left */ flb_ml_flush_pending_now(ml); - /* - * After the fix, ALL records should have full metadata. - */ - TEST_CHECK(res.records_with_log_only == 0); + /* Assertions: ALL records should have full metadata */ + TEST_CHECK(res.records_missing_metadata == 0); TEST_CHECK(res.records_with_full_metadata == res.current_record); TEST_CHECK(res.current_record == num_lines); @@ -1908,6 +1922,147 @@ static void test_issue_10576() flb_config_exit(config); } +/* + * Test issue 10576: Verify context is NOT registered after truncation + * --------------------- + * https://github.com/fluent/fluent-bit/issues/10576 + * + * Steps: + * 1. Set a small buffer_limit (80 bytes) + * 2. Send a start_state line with metadata (stream=stdout, file=app1.log) + * 3. Send a long continuation line to trigger truncation + * 4. Send a new start_state with DIFFERENT metadata (stream=stderr, file=app2.log) + * 5. Verify each record has its OWN correct metadata + * + * Result: + * - Record 0 (truncated): stream=stdout, file=/var/log/app1.log + * - Record 1 (new start): stream=stderr, file=/var/log/app2.log + */ +static void test_issue_truncation_10576() +{ + int ret; + uint64_t stream_id; + struct flb_config *config; + struct flb_ml *ml; + struct flb_ml_parser *mlp; + struct flb_ml_parser_ins *mlp_i; + struct flb_time tm; + struct metadata_result res = {0}; + + /* a long string that will cause truncation */ + char long_line[200]; + memset(long_line, 'X', sizeof(long_line) - 1); + long_line[sizeof(long_line) - 1] = '\0'; + + config = flb_config_init(); + TEST_CHECK(config != NULL); + + if (config->multiline_buffer_limit) { + flb_free(config->multiline_buffer_limit); + } + config->multiline_buffer_limit = flb_strdup("80"); + + /* Create ML context */ + ml = flb_ml_create(config, "truncation-context-test"); + TEST_CHECK(ml != NULL); + + /* Create custom multiline parser */ + mlp = flb_ml_parser_create(config, + "truncation_parser_10576", /* name */ + FLB_ML_REGEX, /* type */ + NULL, /* match_str */ + FLB_FALSE, /* negate */ + 1000, /* flush_ms */ + "log", /* key_content */ + NULL, /* key_group */ + NULL, /* key_pattern */ + NULL, /* parser */ + NULL); /* parser_name */ + TEST_CHECK(mlp != NULL); + + /* start_state - matches [timestamp] format */ + ret = flb_ml_rule_create(mlp, "start_state", + "/^\\[\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z\\]/", + "cont", NULL); + TEST_CHECK(ret == 0); + + /* cont - matches lines NOT starting with [timestamp] */ + ret = flb_ml_rule_create(mlp, "cont", + "/^(?!\\[\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z\\])/", + "cont", NULL); + TEST_CHECK(ret == 0); + + ret = flb_ml_parser_init(mlp); + TEST_CHECK(ret == 0); + + mlp_i = flb_ml_parser_instance_create(ml, "truncation_parser_10576"); + TEST_CHECK(mlp_i != NULL); + + flb_ml_parser_instance_set(mlp_i, "key_content", "log"); + + /* Create stream */ + ret = flb_ml_stream_create(ml, "test-stream", -1, + flush_callback_metadata_check, + (void *)&res, &stream_id); + TEST_CHECK(ret == 0); + + flb_time_get(&tm); + + /* Append first line. It will match the start_state */ + ret = append_log_with_metadata(ml, stream_id, &tm, + "[2025-12-01T17:33:54.551Z] First line", + "stdout", "/var/log/app1.log"); + TEST_CHECK(ret == FLB_MULTILINE_OK); + + /* + * Append the second line. This will match the 'cont' state and concatenate. + * The concatenation will exceed the limit and correctly trigger truncation. + */ + ret = append_log_with_metadata(ml, stream_id, &tm, + long_line, + "stdout", "/var/log/app1.log"); + TEST_CHECK(ret == FLB_MULTILINE_TRUNCATED); + + + /* Append new line with new start_state with DIFFERENT metadata */ + ret = append_log_with_metadata(ml, stream_id, &tm, + "[2025-12-01T17:34:00.000Z] Second line", + "stderr", "/var/log/app2.log"); + TEST_CHECK(ret == FLB_MULTILINE_OK); + + /* Flush to get the second record */ + flb_ml_flush_pending_now(ml); + + /* Assertions */ + TEST_CHECK(res.current_record == 2); + TEST_CHECK(res.records_with_full_metadata == 2); + TEST_CHECK(res.records_missing_metadata == 0); + + /* Verify that each record has correct metadata */ + fprintf(stdout, "\n=== Metadata Verification ===\n"); + fprintf(stdout, "Record 0: stream='%s', file='%s'\n", + res.record_streams[0], res.record_files[0]); + fprintf(stdout, "Record 1: stream='%s', file='%s'\n", + res.record_streams[1], res.record_files[1]); + + /* Record 0: first group's metadata */ + TEST_CHECK(strcmp(res.record_streams[0], "stdout") == 0); + TEST_CHECK(strcmp(res.record_files[0], "/var/log/app1.log") == 0); + + /* Record 1: second group's metadata (NOT inherited from first) */ + TEST_CHECK(strcmp(res.record_streams[1], "stderr") == 0); + TEST_CHECK(strcmp(res.record_files[1], "/var/log/app2.log") == 0); + + if (strcmp(res.record_streams[1], "stderr") == 0 && + strcmp(res.record_files[1], "/var/log/app2.log") == 0) { + fprintf(stdout, "\nPASS: Second record has its own metadata!\n"); + } else { + fprintf(stdout, "\nFAIL: Second record inherited metadata from first group!\n"); + } + + flb_ml_destroy(ml); + flb_config_exit(config); +} TEST_LIST = { /* Normal features tests */ @@ -1930,5 +2085,6 @@ TEST_LIST = { { "issue_4949" , test_issue_4949}, { "issue_5504" , test_issue_5504}, { "issue_10576" , test_issue_10576}, + { "issue_truncation_10576", test_issue_truncation_10576 }, { 0 } }; From 1d8954fba76893d0c7f1f604915125569ef33fa0 Mon Sep 17 00:00:00 2001 From: Mirko Lazarevic Date: Thu, 4 Dec 2025 09:25:00 +0100 Subject: [PATCH 5/6] tests: conding style fix Signed-off-by: Mirko Lazarevic --- tests/internal/multiline.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/internal/multiline.c b/tests/internal/multiline.c index a1108531861..9690eb3abec 100644 --- a/tests/internal/multiline.c +++ b/tests/internal/multiline.c @@ -1737,7 +1737,8 @@ static int flush_callback_metadata_check(struct flb_ml_parser *parser, /* Track metadata */ if (has_stream && has_file) { res->records_with_full_metadata++; - } else { + } + else { res->records_missing_metadata++; fprintf(stdout, " WARNING: Record %d missing metadata (stream=%d, file=%d)\n", res->current_record, has_stream, has_file); @@ -2056,7 +2057,8 @@ static void test_issue_truncation_10576() if (strcmp(res.record_streams[1], "stderr") == 0 && strcmp(res.record_files[1], "/var/log/app2.log") == 0) { fprintf(stdout, "\nPASS: Second record has its own metadata!\n"); - } else { + } + else { fprintf(stdout, "\nFAIL: Second record inherited metadata from first group!\n"); } From 79afb584007bc16dd59edc91f99247fd1e584ca7 Mon Sep 17 00:00:00 2001 From: Mirko Lazarevic Date: Thu, 4 Dec 2025 10:39:32 +0100 Subject: [PATCH 6/6] tests: fix typo Signed-off-by: Mirko Lazarevic --- tests/internal/multiline.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/internal/multiline.c b/tests/internal/multiline.c index 9690eb3abec..e0223dde41f 100644 --- a/tests/internal/multiline.c +++ b/tests/internal/multiline.c @@ -1751,7 +1751,7 @@ static int flush_callback_metadata_check(struct flb_ml_parser *parser, } /* - * Helper funciton to append log with custom stream/file metadata + * Helper function to append log with custom stream/file metadata */ static int append_log_with_metadata(struct flb_ml *ml, uint64_t stream_id, struct flb_time *tm, const char *log_content,