diff --git a/src/multiline/flb_ml.c b/src/multiline/flb_ml.c
index 3050286cd2c..e73bd7c90d9 100644
--- a/src/multiline/flb_ml.c
+++ b/src/multiline/flb_ml.c
@@ -268,6 +268,11 @@ static int package_content(struct flb_ml_stream *mst,
         if (ret == FLB_MULTILINE_TRUNCATED) {
             truncated = FLB_TRUE;
         }
+
+        if (!truncated && stream_group->mp_sbuf.size == 0) {
+            flb_ml_register_context(stream_group, tm, full_map);
+        }
+
         processed = FLB_TRUE;
     }
     else if (type == FLB_ML_ENDSWITH) {
@@ -341,7 +346,7 @@ static int package_content(struct flb_ml_stream *mst,
         processed = FLB_TRUE;
     }
 
-    if (processed && metadata != NULL) {
+    if (!truncated && processed && metadata != NULL) {
         msgpack_pack_object(&stream_group->mp_md_pck, *metadata);
     }
 
diff --git a/tests/internal/multiline.c b/tests/internal/multiline.c
index 4a9ef50df87..e0223dde41f 100644
--- a/tests/internal/multiline.c
+++ b/tests/internal/multiline.c
@@ -1643,6 +1643,471 @@ static void test_buffer_limit_disabled()
     flb_config_exit(config);
 }
 
+/*
+ * Unit tests for issue 10576: Metadata preservation in multiline filter
+ * https://github.com/fluent/fluent-bit/issues/10576
+ *
+ */
+
+/* Capacity of the per-record capture arrays in struct metadata_result */
+#define MD_MAX_RECORDS  10
+#define MD_STREAM_SIZE  32
+#define MD_FILE_SIZE    64
+
+/*
+ * Helper structure to track records with metadata verification
+ */
+struct metadata_result {
+    int current_record;             /* Number of records flushed so far */
+    int records_with_full_metadata; /* Count of records with metadata */
+    int records_missing_metadata;   /* Count of records with missing metadata */
+
+    /* Track metadata values for each record (for detailed verification) */
+    char record_streams[MD_MAX_RECORDS][MD_STREAM_SIZE]; /* stream value for each record */
+    char record_files[MD_MAX_RECORDS][MD_FILE_SIZE];     /* file value for each record */
+};
+
+/*
+ * Copy a msgpack string value into a fixed-size buffer, truncating it if
+ * needed. The destination is always NUL-terminated; dst_size must be > 0.
+ */
+static void md_copy_str(char *dst, size_t dst_size, msgpack_object *val)
+{
+    size_t len;
+
+    len = val->via.str.size;
+    if (len > dst_size - 1) {
+        len = dst_size - 1;
+    }
+
+    memcpy(dst, val->via.str.ptr, len);
+    dst[len] = '\0';
+}
+
+/*
+ * Callback that verifies metadata preservation
+ *
+ * Before the fix: continuation lines would have only 1 field (log)
+ * After the fix: all lines should have multiple fields (stream, log, file, etc.)
+ *
+ * 'data' must point to a struct metadata_result. The stream/file values of
+ * the first MD_MAX_RECORDS records are captured; later records are only
+ * counted. Returns 0 on success or -1 if the buffer cannot be decoded.
+ */
+static int flush_callback_metadata_check(struct flb_ml_parser *parser,
+                                         struct flb_ml_stream *mst,
+                                         void *data, char *buf_data, size_t buf_size)
+{
+    int ret;
+    int i;
+    int field_count;
+    int slot = -1;
+    size_t off = 0;
+    msgpack_unpacked result;
+    msgpack_object *map = NULL;
+    msgpack_object key, val;
+    struct flb_time tm;
+    struct metadata_result *res = data;
+    int has_stream = 0;
+    int has_file = 0;
+    const char *stream_str = "";
+    const char *file_str = "";
+
+    fprintf(stdout, "\n%s----- MULTILINE FLUSH -----%s\n", ANSI_YELLOW, ANSI_RESET);
+
+    /* Unpack the record */
+    msgpack_unpacked_init(&result);
+    ret = msgpack_unpack_next(&result, buf_data, buf_size, &off);
+    if (ret != MSGPACK_UNPACK_SUCCESS) {
+        msgpack_unpacked_destroy(&result);
+        return -1;
+    }
+
+    /* Extract timestamp and map; bail out if the record is malformed */
+    ret = flb_time_pop_from_msgpack(&tm, &result, &map);
+    if (!TEST_CHECK(ret == 0 && map != NULL &&
+                    map->type == MSGPACK_OBJECT_MAP)) {
+        msgpack_unpacked_destroy(&result);
+        return -1;
+    }
+
+    /* Verify timestamp is not zero */
+    TEST_CHECK(flb_time_to_nanosec(&tm) != 0L);
+
+    /* Only the first MD_MAX_RECORDS records have a capture slot */
+    if (res->current_record >= 0 && res->current_record < MD_MAX_RECORDS) {
+        slot = res->current_record;
+    }
+
+    /* Count fields and check for stream/file */
+    field_count = map->via.map.size;
+
+    for (i = 0; i < field_count; i++) {
+        key = map->via.map.ptr[i].key;
+        val = map->via.map.ptr[i].val;
+
+        if (key.type != MSGPACK_OBJECT_STR || val.type != MSGPACK_OBJECT_STR) {
+            continue;
+        }
+
+        if (key.via.str.size == 6 && strncmp(key.via.str.ptr, "stream", 6) == 0) {
+            has_stream = 1;
+            if (slot >= 0) {
+                md_copy_str(res->record_streams[slot], MD_STREAM_SIZE, &val);
+            }
+        }
+        else if (key.via.str.size == 4 && strncmp(key.via.str.ptr, "file", 4) == 0) {
+            has_file = 1;
+            if (slot >= 0) {
+                md_copy_str(res->record_files[slot], MD_FILE_SIZE, &val);
+            }
+        }
+    }
+
+    /* Records beyond the capture capacity are printed with empty values */
+    if (slot >= 0) {
+        stream_str = res->record_streams[slot];
+        file_str = res->record_files[slot];
+    }
+
+    fprintf(stdout, "[Record %d] Fields: %d, stream=%s, file=%s\n",
+            res->current_record, field_count, stream_str, file_str);
+
+    /* Track metadata */
+    if (has_stream && has_file) {
+        res->records_with_full_metadata++;
+    }
+    else {
+        res->records_missing_metadata++;
+        fprintf(stdout, " WARNING: Record %d missing metadata (stream=%d, file=%d)\n",
+                res->current_record, has_stream, has_file);
+    }
+
+    res->current_record++;
+    msgpack_unpacked_destroy(&result);
+
+    return 0;
+}
+
+/*
+ * Helper function to append log with custom stream/file metadata
+ *
+ * Packs [timestamp, {stream, log, file}] and hands the map to 'ml'. Returns
+ * the flb_ml_append_object() result, or -1 if the record cannot be unpacked.
+ */
+static int append_log_with_metadata(struct flb_ml *ml, uint64_t stream_id,
+                                    struct flb_time *tm, const char *log_content,
+                                    const char *stream_name, const char *file_path)
+{
+    int ret;
+    size_t off = 0;
+    msgpack_sbuffer mp_sbuf;
+    msgpack_packer mp_pck;
+    msgpack_unpacked result;
+    msgpack_object root;
+    msgpack_object *map;
+
+    msgpack_sbuffer_init(&mp_sbuf);
+    msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+    /* Array: [timestamp, map] */
+    msgpack_pack_array(&mp_pck, 2);
+    flb_time_append_to_msgpack(tm, &mp_pck, 0);
+
+    /* Map with 3 fields: stream, log, file */
+    msgpack_pack_map(&mp_pck, 3);
+
+    /* stream field */
+    msgpack_pack_str(&mp_pck, 6);
+    msgpack_pack_str_body(&mp_pck, "stream", 6);
+    msgpack_pack_str(&mp_pck, strlen(stream_name));
+    msgpack_pack_str_body(&mp_pck, stream_name, strlen(stream_name));
+
+    /* log field */
+    msgpack_pack_str(&mp_pck, 3);
+    msgpack_pack_str_body(&mp_pck, "log", 3);
+    msgpack_pack_str(&mp_pck, strlen(log_content));
+    msgpack_pack_str_body(&mp_pck, log_content, strlen(log_content));
+
+    /* file field */
+    msgpack_pack_str(&mp_pck, 4);
+    msgpack_pack_str_body(&mp_pck, "file", 4);
+    msgpack_pack_str(&mp_pck, strlen(file_path));
+    msgpack_pack_str_body(&mp_pck, file_path, strlen(file_path));
+
+    /* Unpack and lookup the content map */
+    msgpack_unpacked_init(&result);
+    ret = msgpack_unpack_next(&result, mp_sbuf.data, mp_sbuf.size, &off);
+    if (ret != MSGPACK_UNPACK_SUCCESS) {
+        msgpack_unpacked_destroy(&result);
+        msgpack_sbuffer_destroy(&mp_sbuf);
+        return -1;
+    }
+
+    root = result.data;
+    map = &root.via.array.ptr[1];
+
+    /* Send to multiline processor */
+    ret = flb_ml_append_object(ml, stream_id, tm, NULL, map);
+
+    msgpack_unpacked_destroy(&result);
+    msgpack_sbuffer_destroy(&mp_sbuf);
+
+    return ret;
+}
+
+/*
+ * Test issue 10576: Metadata preservation when lines are flushed
+ * ---------------------
+ * https://github.com/fluent/fluent-bit/issues/10576
+ *
+ * Scenario: Simulate slow log arrival by flushing after each line.
+ *
+ * Before fix: Continuation lines would have only {"log": "..."} (missing metadata)
+ * After fix: All lines should have {"stream": "...", "log": "...", "file": "..."}
+ */
+static void test_issue_10576()
+{
+    int ret;
+    int i;
+    uint64_t stream_id;
+    struct flb_config *config;
+    struct flb_time tm;
+    struct flb_ml *ml;
+    struct flb_ml_parser *mlp;
+    struct flb_ml_parser_ins *mlp_i;
+    struct metadata_result res = {0};
+
+    /* Test input - mix of start_state and continuation lines */
+    const char *test_lines[] = {
+        "Mon Dec 1 17:33:44 UTC 2025 Likely to fail", /* continuation (no [timestamp]) */
+        "Mon Dec 1 17:33:49 UTC 2025 Likely to fail", /* continuation */
+        "[2025-12-01T17:33:54.551Z] should be ok", /* start_state */
+        "Mon Dec 1 17:33:59 UTC 2025 Likely to fail", /* continuation */
+        "Mon Dec 1 17:34:04 UTC 2025 Likely to fail", /* continuation */
+        "[2025-12-01T17:34:09.555Z] should be ok", /* start_state */
+    };
+
+    int num_lines = sizeof(test_lines) / sizeof(test_lines[0]);
+
+    /* Initialize */
+    config = flb_config_init();
+    TEST_CHECK(config != NULL);
+
+    /* Create custom multiline parser */
+    mlp = flb_ml_parser_create(config,
+                               "parser_10576", /* name */
+                               FLB_ML_REGEX, /* type */
+                               NULL, /* match_str */
+                               FLB_FALSE, /* negate */
+                               1000, /* flush_ms */
+                               "log", /* key_content */
+                               NULL, /* key_group */
+                               NULL, /* key_pattern */
+                               NULL, /* parser */
+                               NULL); /* parser_name */
+    TEST_CHECK(mlp != NULL);
+
+    /* start_state - matches [YYYY-MM-DDTHH:MM:SS.sssZ] format */
+    ret = flb_ml_rule_create(mlp, "start_state",
+                             "/^\\[\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z\\]/",
+                             "cont", NULL);
+    TEST_CHECK(ret == 0);
+
+    /* cont - matches lines NOT starting with [timestamp] */
+    ret = flb_ml_rule_create(mlp, "cont",
+                             "/^(?!\\[\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z\\])/",
+                             "cont", NULL);
+    TEST_CHECK(ret == 0);
+
+    ret = flb_ml_parser_init(mlp);
+    TEST_CHECK(ret == 0);
+
+    /* Create ML context */
+    ml = flb_ml_create(config, "test-metadata");
+    TEST_CHECK(ml != NULL);
+
+    mlp_i = flb_ml_parser_instance_create(ml, "parser_10576");
+    TEST_CHECK(mlp_i != NULL);
+
+    flb_ml_parser_instance_set(mlp_i, "key_content", "log");
+
+    /* Create stream */
+    ret = flb_ml_stream_create(ml, "test-stream", -1,
+                               flush_callback_metadata_check,
+                               (void *)&res, &stream_id);
+    TEST_CHECK(ret == 0);
+
+    /* Send each line and flush immediately after (simulate slow log arrival) */
+    for (i = 0; i < num_lines; i++) {
+        flb_time_get(&tm);
+
+        fprintf(stdout, "Input[%d]: %s\n", i, test_lines[i]);
+
+        ret = append_log_with_metadata(ml, stream_id, &tm, test_lines[i],
+                                       "stdout", "/var/log/test.log");
+        TEST_CHECK(ret == FLB_MULTILINE_OK);
+
+        /* Flush after each line to simulate slow log arrival */
+        flb_ml_flush_pending_now(ml);
+    }
+
+    /* Final flush to ensure nothing is left */
+    flb_ml_flush_pending_now(ml);
+
+    /* Assertions: ALL records should have full metadata */
+    TEST_CHECK(res.records_missing_metadata == 0);
+    TEST_CHECK(res.records_with_full_metadata == res.current_record);
+    TEST_CHECK(res.current_record == num_lines);
+
+    /* Cleanup */
+    flb_ml_destroy(ml);
+    flb_config_exit(config);
+}
+
+/*
+ * Test issue 10576: Verify context is NOT registered after truncation
+ * ---------------------
+ * https://github.com/fluent/fluent-bit/issues/10576
+ *
+ * Steps:
+ * 1. Set a small buffer_limit (80 bytes)
+ * 2. Send a start_state line with metadata (stream=stdout, file=app1.log)
+ * 3. Send a long continuation line to trigger truncation
+ * 4. Send a new start_state with DIFFERENT metadata (stream=stderr, file=app2.log)
+ * 5. Verify each record has its OWN correct metadata
+ *
+ * Result:
+ * - Record 0 (truncated): stream=stdout, file=/var/log/app1.log
+ * - Record 1 (new start): stream=stderr, file=/var/log/app2.log
+ */
+static void test_issue_truncation_10576()
+{
+    int ret;
+    uint64_t stream_id;
+    struct flb_config *config;
+    struct flb_ml *ml;
+    struct flb_ml_parser *mlp;
+    struct flb_ml_parser_ins *mlp_i;
+    struct flb_time tm;
+    struct metadata_result res = {0};
+
+    /* a long string that will cause truncation */
+    char long_line[200];
+    memset(long_line, 'X', sizeof(long_line) - 1);
+    long_line[sizeof(long_line) - 1] = '\0';
+
+    config = flb_config_init();
+    TEST_CHECK(config != NULL);
+
+    if (config->multiline_buffer_limit) {
+        flb_free(config->multiline_buffer_limit);
+    }
+    config->multiline_buffer_limit = flb_strdup("80");
+
+    /* Create ML context */
+    ml = flb_ml_create(config, "truncation-context-test");
+    TEST_CHECK(ml != NULL);
+
+    /* Create custom multiline parser */
+    mlp = flb_ml_parser_create(config,
+                               "truncation_parser_10576", /* name */
+                               FLB_ML_REGEX, /* type */
+                               NULL, /* match_str */
+                               FLB_FALSE, /* negate */
+                               1000, /* flush_ms */
+                               "log", /* key_content */
+                               NULL, /* key_group */
+                               NULL, /* key_pattern */
+                               NULL, /* parser */
+                               NULL); /* parser_name */
+    TEST_CHECK(mlp != NULL);
+
+    /* start_state - matches [timestamp] format */
+    ret = flb_ml_rule_create(mlp, "start_state",
+                             "/^\\[\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z\\]/",
+                             "cont", NULL);
+    TEST_CHECK(ret == 0);
+
+    /* cont - matches lines NOT starting with [timestamp] */
+    ret = flb_ml_rule_create(mlp, "cont",
+                             "/^(?!\\[\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z\\])/",
+                             "cont", NULL);
+    TEST_CHECK(ret == 0);
+
+    ret = flb_ml_parser_init(mlp);
+    TEST_CHECK(ret == 0);
+
+    mlp_i = flb_ml_parser_instance_create(ml, "truncation_parser_10576");
+    TEST_CHECK(mlp_i != NULL);
+
+    flb_ml_parser_instance_set(mlp_i, "key_content", "log");
+
+    /* Create stream */
+    ret = flb_ml_stream_create(ml, "test-stream", -1,
+                               flush_callback_metadata_check,
+                               (void *)&res, &stream_id);
+    TEST_CHECK(ret == 0);
+
+    flb_time_get(&tm);
+
+    /* Append first line. It will match the start_state */
+    ret = append_log_with_metadata(ml, stream_id, &tm,
+                                   "[2025-12-01T17:33:54.551Z] First line",
+                                   "stdout", "/var/log/app1.log");
+    TEST_CHECK(ret == FLB_MULTILINE_OK);
+
+    /*
+     * Append the second line. This will match the 'cont' state and concatenate.
+     * The concatenation will exceed the limit and correctly trigger truncation.
+     */
+    ret = append_log_with_metadata(ml, stream_id, &tm,
+                                   long_line,
+                                   "stdout", "/var/log/app1.log");
+    TEST_CHECK(ret == FLB_MULTILINE_TRUNCATED);
+
+
+    /* Append new line with new start_state with DIFFERENT metadata */
+    ret = append_log_with_metadata(ml, stream_id, &tm,
+                                   "[2025-12-01T17:34:00.000Z] Second line",
+                                   "stderr", "/var/log/app2.log");
+    TEST_CHECK(ret == FLB_MULTILINE_OK);
+
+    /* Flush to get the second record */
+    flb_ml_flush_pending_now(ml);
+
+    /* Assertions */
+    TEST_CHECK(res.current_record == 2);
+    TEST_CHECK(res.records_with_full_metadata == 2);
+    TEST_CHECK(res.records_missing_metadata == 0);
+
+    /* Verify that each record has correct metadata */
+    fprintf(stdout, "\n=== Metadata Verification ===\n");
+    fprintf(stdout, "Record 0: stream='%s', file='%s'\n",
+            res.record_streams[0], res.record_files[0]);
+    fprintf(stdout, "Record 1: stream='%s', file='%s'\n",
+            res.record_streams[1], res.record_files[1]);
+
+    /* Record 0: first group's metadata */
+    TEST_CHECK(strcmp(res.record_streams[0], "stdout") == 0);
+    TEST_CHECK(strcmp(res.record_files[0], "/var/log/app1.log") == 0);
+
+    /* Record 1: second group's metadata (NOT inherited from first) */
+    TEST_CHECK(strcmp(res.record_streams[1], "stderr") == 0);
+    TEST_CHECK(strcmp(res.record_files[1], "/var/log/app2.log") == 0);
+
+    if (strcmp(res.record_streams[1], "stderr") == 0 &&
+        strcmp(res.record_files[1], "/var/log/app2.log") == 0) {
+        fprintf(stdout, "\nPASS: Second record has its own metadata!\n");
+    }
+    else {
+        fprintf(stdout, "\nFAIL: Second record inherited metadata from first group!\n");
+    }
+
+    flb_ml_destroy(ml);
+    flb_config_exit(config);
+}
+
 TEST_LIST = {
     /* Normal features tests */
     { "parser_docker", test_parser_docker},
@@ -1663,5 +2128,7 @@ TEST_LIST = {
     { "issue_4034" , test_issue_4034},
     { "issue_4949" , test_issue_4949},
     { "issue_5504" , test_issue_5504},
+    { "issue_10576" , test_issue_10576},
+    { "issue_truncation_10576", test_issue_truncation_10576 },
     { 0 }
 };