diff --git a/plugins/in_tail/tail.h b/plugins/in_tail/tail.h index a1c7784707d..dfb241a93df 100644 --- a/plugins/in_tail/tail.h +++ b/plugins/in_tail/tail.h @@ -33,6 +33,9 @@ #define FLB_TAIL_STATIC 0 /* Data is being consumed through read(2) */ #define FLB_TAIL_EVENT 1 /* Data is being consumed through inotify */ +/* Database */ +#define FLB_TAIL_DB_ID_NONE 0 /* File not in database or deleted */ + /* Config */ #define FLB_TAIL_CHUNK "32768" /* buffer chunk = 32KB */ #define FLB_TAIL_REFRESH 60 /* refresh every 60 seconds */ diff --git a/plugins/in_tail/tail_db.c b/plugins/in_tail/tail_db.c index a8190e121d0..a95c4241cda 100644 --- a/plugins/in_tail/tail_db.c +++ b/plugins/in_tail/tail_db.c @@ -25,11 +25,13 @@ #include "tail_sql.h" #include "tail_file.h" -struct query_status { - int id; - int rows; - int64_t offset; -}; +/* Callback to detect if a query returned any rows */ +static int cb_column_exists(void *data, int argc, char **argv, char **cols) +{ + int *found = (int *)data; + *found = 1; + return 0; +} /* Open or create database required by tail plugin */ struct flb_sqldb *flb_tail_db_open(const char *path, @@ -38,6 +40,7 @@ struct flb_sqldb *flb_tail_db_open(const char *path, struct flb_config *config) { int ret; + int column_found; char tmp[64]; struct flb_sqldb *db; @@ -55,6 +58,54 @@ struct flb_sqldb *flb_tail_db_open(const char *path, return NULL; } + /* Check if 'skip' column exists (migration for older databases) */ + column_found = 0; + ret = flb_sqldb_query(db, + "SELECT 1 FROM pragma_table_info('in_tail_files') " + "WHERE name='skip';", + cb_column_exists, &column_found); + if (ret != FLB_OK) { + flb_plg_error(ctx->ins, "db: could not query table info for 'skip' column"); + flb_sqldb_close(db); + return NULL; + } + if (column_found == 0) { + flb_plg_debug(ctx->ins, "db: migrating database, adding 'skip' column"); + ret = flb_sqldb_query(db, + "ALTER TABLE in_tail_files " + "ADD COLUMN skip INTEGER DEFAULT 0;", + NULL, NULL); + if (ret != FLB_OK) { 
+ flb_plg_error(ctx->ins, "db: could not add 'skip' column"); + flb_sqldb_close(db); + return NULL; + } + } + + /* Check if 'anchor' column exists (migration for older databases) */ + column_found = 0; + ret = flb_sqldb_query(db, + "SELECT 1 FROM pragma_table_info('in_tail_files') " + "WHERE name='anchor';", + cb_column_exists, &column_found); + if (ret != FLB_OK) { + flb_plg_error(ctx->ins, "db: could not query table info for 'anchor' column"); + flb_sqldb_close(db); + return NULL; + } + if (column_found == 0) { + flb_plg_debug(ctx->ins, "db: migrating database, adding 'anchor' column"); + ret = flb_sqldb_query(db, + "ALTER TABLE in_tail_files " + "ADD COLUMN anchor INTEGER DEFAULT 0;", + NULL, NULL); + if (ret != FLB_OK) { + flb_plg_error(ctx->ins, "db: could not add 'anchor' column"); + flb_sqldb_close(db); + return NULL; + } + } + if (ctx->db_sync >= 0) { snprintf(tmp, sizeof(tmp) - 1, SQL_PRAGMA_SYNC, ctx->db_sync); @@ -130,7 +181,8 @@ static int flb_tail_db_file_delete_by_id(struct flb_tail_config *ctx, */ static int db_file_exists(struct flb_tail_file *file, struct flb_tail_config *ctx, - uint64_t *id, uint64_t *inode, off_t *offset) + uint64_t *id, uint64_t *inode, + int64_t *offset, uint64_t *skip, int64_t *anchor) { int ret; int exists = FLB_FALSE; @@ -149,6 +201,8 @@ static int db_file_exists(struct flb_tail_file *file, /* name: column 1 */ name = sqlite3_column_text(ctx->stmt_get_file, 1); if (ctx->compare_filename && name == NULL) { + sqlite3_clear_bindings(ctx->stmt_get_file); + sqlite3_reset(ctx->stmt_get_file); flb_plg_error(ctx->ins, "db: error getting name: id=%"PRIu64, *id); return -1; } @@ -159,12 +213,18 @@ static int db_file_exists(struct flb_tail_file *file, /* inode: column 3 */ *inode = sqlite3_column_int64(ctx->stmt_get_file, 3); + /* skip: column 6 */ + *skip = sqlite3_column_int64(ctx->stmt_get_file, 6); + + /* anchor: column 7 */ + *anchor = sqlite3_column_int64(ctx->stmt_get_file, 7); + /* Checking if the file's name and inode match 
exactly */ if (ctx->compare_filename) { if (flb_tail_target_file_name_cmp((char *) name, file) != 0) { exists = FLB_FALSE; flb_plg_debug(ctx->ins, "db: exists stale file from database:" - " id=%"PRIu64" inode=%"PRIu64" offset=%"PRIu64 + " id=%"PRIu64" inode=%"PRIu64" offset=%"PRId64 " name=%s file_inode=%"PRIu64" file_name=%s", *id, *inode, *offset, name, file->inode, file->name); @@ -199,6 +259,8 @@ static int db_file_insert(struct flb_tail_file *file, struct flb_tail_config *ct sqlite3_bind_int64(ctx->stmt_insert_file, 2, file->offset); sqlite3_bind_int64(ctx->stmt_insert_file, 3, file->inode); sqlite3_bind_int64(ctx->stmt_insert_file, 4, created); + sqlite3_bind_int64(ctx->stmt_insert_file, 5, file->skip_bytes); + sqlite3_bind_int64(ctx->stmt_insert_file, 6, file->anchor_offset); /* Run the insert */ ret = sqlite3_step(ctx->stmt_insert_file); @@ -258,11 +320,13 @@ int flb_tail_db_file_set(struct flb_tail_file *file, { int ret; uint64_t id = 0; - off_t offset = 0; + int64_t offset = 0; + uint64_t skip = 0; + int64_t anchor = 0; uint64_t inode = 0; /* Check if the file exists */ - ret = db_file_exists(file, ctx, &id, &inode, &offset); + ret = db_file_exists(file, ctx, &id, &inode, &offset, &skip, &anchor); if (ret == -1) { flb_plg_error(ctx->ins, "cannot execute query to check inode: %" PRIu64, file->inode); @@ -281,6 +345,18 @@ int flb_tail_db_file_set(struct flb_tail_file *file, else { file->db_id = id; file->offset = offset; + file->skip_bytes = skip; + file->anchor_offset = anchor; + + /* Initialize skipping mode if needed */ + if (file->skip_bytes > 0) { + file->exclude_bytes = file->skip_bytes; + file->skipping_mode = FLB_TRUE; + } + else { + file->exclude_bytes = 0; + file->skipping_mode = FLB_FALSE; + } } return 0; @@ -294,7 +370,9 @@ int flb_tail_db_file_offset(struct flb_tail_file *file, /* Bind parameters */ sqlite3_bind_int64(ctx->stmt_offset, 1, file->offset); - sqlite3_bind_int64(ctx->stmt_offset, 2, file->db_id); + 
sqlite3_bind_int64(ctx->stmt_offset, 2, file->skip_bytes); + sqlite3_bind_int64(ctx->stmt_offset, 3, file->anchor_offset); + sqlite3_bind_int64(ctx->stmt_offset, 4, file->db_id); ret = sqlite3_step(ctx->stmt_offset); @@ -363,6 +441,7 @@ int flb_tail_db_file_delete(struct flb_tail_file *file, } flb_plg_debug(ctx->ins, "db: file deleted from database: %s", file->name); + file->db_id = FLB_TAIL_DB_ID_NONE; return 0; } diff --git a/plugins/in_tail/tail_file.c b/plugins/in_tail/tail_file.c index 1624bd94c7a..b834945473f 100644 --- a/plugins/in_tail/tail_file.c +++ b/plugins/in_tail/tail_file.c @@ -44,6 +44,7 @@ #include "tail_dockermode.h" #include "tail_multiline.h" #include "tail_scan.h" +#include <inttypes.h> #ifdef FLB_SYSTEM_WINDOWS #include "win32.h" #endif @@ -1027,6 +1028,8 @@ static int set_file_position(struct flb_tail_config *ctx, struct flb_tail_file *file) { int64_t ret; + int64_t seek_pos; + int has_db_position; #ifdef FLB_HAVE_SQLDB /* @@ -1036,12 +1039,37 @@ static int set_file_position(struct flb_tail_config *ctx, if (ctx->db) { ret = flb_tail_db_file_set(file, ctx); if (ret == 0) { - if (file->offset > 0) { - ret = lseek(file->fd, file->offset, SEEK_SET); + + /* + * Determine seek position based on file type and DB state: + * + * - Gzip files with anchor/skip info: use anchor_offset + * - Normal files or gzip migration fallback: use offset + * - No DB position + read_from_head=off: seek to EOF + */ + seek_pos = file->offset; + has_db_position = (file->offset > 0); + + /* Override for gzip files with proper anchor tracking */ + if (file->decompression_context != NULL && + (file->anchor_offset > 0 || file->skip_bytes > 0)) { + seek_pos = file->anchor_offset; + has_db_position = FLB_TRUE; + } + + if (has_db_position) { + ret = lseek(file->fd, seek_pos, SEEK_SET); if (ret == -1) { flb_errno(); return -1; } + file->offset = ret; + + /* Initialize skip state for gzip resume */ + if (file->decompression_context != NULL && file->skip_bytes > 0) { + file->exclude_bytes = 
file->skip_bytes; + file->skipping_mode = FLB_TRUE; + } } else if (ctx->read_from_head == FLB_FALSE) { ret = lseek(file->fd, 0, SEEK_END); @@ -1050,8 +1078,23 @@ static int set_file_position(struct flb_tail_config *ctx, return -1; } file->offset = ret; + file->anchor_offset = ret; flb_tail_db_file_offset(file, ctx); } + + if (file->decompression_context == NULL) { + file->stream_offset = file->offset; + } + else { + /* + * For single-member gzip, stream_offset = skip_bytes is correct. + * For multi-member gzip, skip_bytes only tracks bytes within the + * current member (reset at member boundaries), so stream_offset + * may not reflect the total decompressed bytes from prior members. + */ + file->stream_offset = file->skip_bytes; + } + return 0; } } @@ -1084,6 +1127,15 @@ static int set_file_position(struct flb_tail_config *ctx, if (file->decompression_context == NULL) { file->stream_offset = ret; } + else { + /* + * Compressed file without DB: no persistent state available. + * Initialize skip-related fields to 0 for code consistency. + */ + file->anchor_offset = file->offset; + file->exclude_bytes = 0; + file->stream_offset = 0; + } return 0; } @@ -1274,11 +1326,17 @@ int flb_tail_file_append(char *path, struct stat *st, int mode, file->dmode_lastline = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 
20000 : 0); file->dmode_firstline = false; #ifdef FLB_HAVE_SQLDB - file->db_id = 0; + file->db_id = FLB_TAIL_DB_ID_NONE; #endif file->skip_next = FLB_FALSE; file->skip_warn = FLB_FALSE; + /* Initialize gzip resume fields */ + file->anchor_offset = 0; + file->skip_bytes = 0; + file->exclude_bytes = 0; + file->skipping_mode = FLB_FALSE; + /* Multiline core mode */ if (ctx->ml_ctx) { /* @@ -1448,6 +1506,47 @@ void flb_tail_file_remove(struct flb_tail_file *file) flb_plg_debug(ctx->ins, "inode=%"PRIu64" removing file name %s", file->inode, file->name); + if (file->buf_len > 0) { + if (file->decompression_context == NULL) { + /* + * If there is data in the buffer, it means it was not processed. + * We must rewind the offset to ensure this data is re-read on restart. + */ + int64_t old_offset = file->offset; + + if (file->offset > file->buf_len) { + file->offset -= file->buf_len; + } + else { + file->offset = 0; + } + + flb_plg_debug(ctx->ins, "inode=%"PRIu64" rewind offset for %s: " + "old=%jd new=%jd (buf_len=%zu)", + file->inode, file->name, + (intmax_t)old_offset, (intmax_t)file->offset, + file->buf_len); + +#ifdef FLB_HAVE_SQLDB + if (ctx->db && file->db_id > FLB_TAIL_DB_ID_NONE) { + flb_tail_db_file_offset(file, ctx); + } +#endif + } + else { + flb_plg_debug(ctx->ins, + "inode=%"PRIu64" file=%s: buffered data (%zu bytes) " + "remains on exit for gzip input; cannot rewind raw " + "offset safely with streaming decompression", + file->inode, file->name, file->buf_len); +#ifdef FLB_HAVE_SQLDB + if (ctx->db && file->db_id > FLB_TAIL_DB_ID_NONE) { + flb_tail_db_file_offset(file, ctx); + } +#endif + } + } + if (file->decompression_context != NULL) { flb_decompression_context_destroy(file->decompression_context); } @@ -1566,6 +1665,10 @@ static int adjust_counters(struct flb_tail_config *ctx, struct flb_tail_file *fi file->inode, file->name, size_delta); file->offset = offset; file->buf_len = 0; + file->anchor_offset = offset; + file->skip_bytes = 0; + file->exclude_bytes 
= 0; + file->skipping_mode = FLB_FALSE; /* Update offset in the database file */ #ifdef FLB_HAVE_SQLDB @@ -1593,6 +1696,7 @@ int flb_tail_file_chunk(struct flb_tail_file *file) uint8_t *read_buffer; size_t read_size; size_t size; + size_t remain; char *tmp; int ret; int lines; @@ -1765,6 +1869,28 @@ int flb_tail_file_chunk(struct flb_tail_file *file) return FLB_TAIL_ERROR; } + if (file->skipping_mode == FLB_TRUE && decompressed_data_length > 0) { + flb_plg_debug(ctx->ins, + "Skipping: anchor=%jd offset=%jd " + "exclude=%zu decompressed=%zu", + (intmax_t)file->anchor_offset, + (intmax_t)file->offset, + file->exclude_bytes, decompressed_data_length); + if (file->exclude_bytes >= decompressed_data_length) { + file->exclude_bytes -= decompressed_data_length; + decompressed_data_length = 0; + } + else { + remain = decompressed_data_length - file->exclude_bytes; + memmove(&file->buf_data[file->buf_len], + &file->buf_data[file->buf_len + file->exclude_bytes], + remain); + decompressed_data_length = remain; + file->exclude_bytes = 0; + file->skipping_mode = FLB_FALSE; + } + } + stream_data_length = decompressed_data_length; } } @@ -1804,6 +1930,29 @@ int flb_tail_file_chunk(struct flb_tail_file *file) file->buf_len -= processed_bytes; file->buf_data[file->buf_len] = '\0'; + if (file->decompression_context) { + file->skip_bytes += processed_bytes; + + /* + * Gzip member boundary: update anchor when we complete a member + * and all decompressed data is consumed. 
+ */ + if (file->decompression_context->state == + FLB_DECOMPRESSOR_STATE_EXPECTING_HEADER && + file->decompression_context->input_buffer_length == 0 && + file->buf_len == 0) { + flb_plg_debug(file->config->ins, + "Gzip member completed: updating anchor " + "from %jd to %jd, resetting skip from " + "%"PRIu64" to 0", + (intmax_t)file->anchor_offset, + (intmax_t)file->offset, + file->skip_bytes); + file->anchor_offset = file->offset; + file->skip_bytes = 0; + } + } + #ifdef FLB_HAVE_SQLDB if (file->config->db) { flb_tail_db_file_offset(file, file->config); diff --git a/plugins/in_tail/tail_file_internal.h b/plugins/in_tail/tail_file_internal.h index bca35ed67cf..2d9dfc770c2 100644 --- a/plugins/in_tail/tail_file_internal.h +++ b/plugins/in_tail/tail_file_internal.h @@ -43,6 +43,10 @@ struct flb_tail_file { int64_t size; int64_t offset; /* this represents the raw file offset, not the input data offset (see stream_offset) */ + int64_t anchor_offset; /* compressed: file offset at member start */ + uint64_t skip_bytes; /* compressed: decompressed bytes to skip */ + uint64_t exclude_bytes; /* compressed: runtime countdown during skip */ + int skipping_mode; /* compressed: skipping previously read data */ int64_t last_line; uint64_t dev_id; uint64_t inode; diff --git a/plugins/in_tail/tail_fs_inotify.c b/plugins/in_tail/tail_fs_inotify.c index eee9babd343..7552b562ae8 100644 --- a/plugins/in_tail/tail_fs_inotify.c +++ b/plugins/in_tail/tail_fs_inotify.c @@ -267,6 +267,10 @@ static int tail_fs_event(struct flb_input_instance *ins, file->inode, file->name, size_delta); file->offset = offset; file->buf_len = 0; + file->anchor_offset = offset; + file->skip_bytes = 0; + file->exclude_bytes = 0; + file->skipping_mode = FLB_FALSE; /* Update offset in the database file */ #ifdef FLB_HAVE_SQLDB diff --git a/plugins/in_tail/tail_fs_stat.c b/plugins/in_tail/tail_fs_stat.c index 36eead51464..8f662d6f735 100644 --- a/plugins/in_tail/tail_fs_stat.c +++ b/plugins/in_tail/tail_fs_stat.c 
@@ -136,6 +136,10 @@ static int tail_fs_check(struct flb_input_instance *ins, file->name, size_delta); file->offset = offset; file->buf_len = 0; + file->anchor_offset = offset; + file->skip_bytes = 0; + file->exclude_bytes = 0; + file->skipping_mode = FLB_FALSE; memcpy(&fst->st, &st, sizeof(struct stat)); #ifdef FLB_HAVE_SQLDB diff --git a/plugins/in_tail/tail_sql.h b/plugins/in_tail/tail_sql.h index e157450e353..dca5952df14 100644 --- a/plugins/in_tail/tail_sql.h +++ b/plugins/in_tail/tail_sql.h @@ -34,21 +34,23 @@ " offset INTEGER," \ " inode INTEGER," \ " created INTEGER," \ - " rotated INTEGER DEFAULT 0" \ + " rotated INTEGER DEFAULT 0," \ + " skip INTEGER DEFAULT 0," \ + " anchor INTEGER DEFAULT 0" \ ");" #define SQL_GET_FILE \ "SELECT * from in_tail_files WHERE inode=@inode order by id desc;" -#define SQL_INSERT_FILE \ - "INSERT INTO in_tail_files (name, offset, inode, created)" \ - " VALUES (@name, @offset, @inode, @created);" +#define SQL_INSERT_FILE \ + "INSERT INTO in_tail_files (name, offset, inode, created, skip, anchor)" \ + " VALUES (@name, @offset, @inode, @created, @skip, @anchor);" #define SQL_ROTATE_FILE \ "UPDATE in_tail_files set name=@name,rotated=1 WHERE id=@id;" -#define SQL_UPDATE_OFFSET \ - "UPDATE in_tail_files set offset=@offset WHERE id=@id;" +#define SQL_UPDATE_OFFSET \ + "UPDATE in_tail_files set offset=@offset, skip=@skip, anchor=@anchor WHERE id=@id;" #define SQL_DELETE_FILE \ "DELETE FROM in_tail_files WHERE id=@id;" diff --git a/tests/runtime/in_tail.c b/tests/runtime/in_tail.c index 2b1ebb5228a..197208383b9 100644 --- a/tests/runtime/in_tail.c +++ b/tests/runtime/in_tail.c @@ -34,6 +34,10 @@ Approach for this tests is basing on filter_kubernetes tests #include #include #include +#include <fluent-bit/flb_gzip.h> +#ifdef FLB_HAVE_SQLDB +#include <sqlite3.h> +#endif #include "flb_tests_runtime.h" #define NEW_LINE "\n" @@ -267,10 +271,193 @@ static ssize_t write_msg(struct test_tail_ctx *ctx, char *msg, size_t msg_len) return w_byte; } +/* + * Write raw data to file 
with optional newline. + * If msg is NULL, only writes a newline (useful for completing incomplete lines). + * If add_newline is FLB_FALSE, data remains in tail buffer as incomplete line. + */ +static ssize_t write_raw(struct test_tail_ctx *ctx, char *msg, size_t msg_len, + int add_newline) +{ + int i; + ssize_t w_byte = 0; + + for (i = 0; i < ctx->fd_num; i++) { + if (msg != NULL && msg_len > 0) { + w_byte = write(ctx->fds[i], msg, msg_len); + if (!TEST_CHECK(w_byte == msg_len)) { + TEST_MSG("write failed ret=%ld", w_byte); + return -1; + } + } + if (add_newline) { + w_byte = write(ctx->fds[i], NEW_LINE, strlen(NEW_LINE)); + if (!TEST_CHECK(w_byte == strlen(NEW_LINE))) { + TEST_MSG("write newline failed ret=%ld", w_byte); + return -1; + } + } + fsync(ctx->fds[i]); + } + return w_byte; +} + #define DPATH FLB_TESTS_DATA_PATH "/data/tail" #define MAX_LINES 32 +/* Gzip helpers */ +static int create_gzip_file(const char *path, const char *data, size_t len) +{ + int ret; + void *gz_data; + size_t gz_len; + FILE *fp; + + ret = flb_gzip_compress((void *)data, len, &gz_data, &gz_len); + if (ret != 0) { + return -1; + } + + fp = fopen(path, "wb"); + if (!fp) { + flb_free(gz_data); + return -1; + } + + if (fwrite(gz_data, 1, gz_len, fp) != gz_len) { + fclose(fp); + flb_free(gz_data); + return -1; + } + fclose(fp); + printf("Created gzip file %s size=%lu\n", path, (unsigned long)gz_len); + flb_free(gz_data); + + return 0; +} + +static int append_gzip_file(const char *path, const char *data, size_t len) +{ + int ret; + void *gz_data; + size_t gz_len; + FILE *fp; + + ret = flb_gzip_compress((void *)data, len, &gz_data, &gz_len); + if (ret != 0) { + return -1; + } + + fp = fopen(path, "ab"); + if (!fp) { + flb_free(gz_data); + return -1; + } + + if (fwrite(gz_data, 1, gz_len, fp) != gz_len) { + fclose(fp); + flb_free(gz_data); + return -1; + } + fclose(fp); + flb_free(gz_data); + + return 0; +} + +struct test_ctx { + int count; + int found_line2; +}; + +static int 
cb_check_gzip_resume(void *record, size_t size, void *data) +{ + struct test_ctx *ctx = data; + msgpack_unpacked result; + msgpack_object root; + msgpack_object key; + msgpack_object val; + msgpack_object v; + size_t off = 0; + int i; + + msgpack_unpacked_init(&result); + while (msgpack_unpack_next(&result, record, size, &off) == MSGPACK_UNPACK_SUCCESS) { + root = result.data; + if (root.type == MSGPACK_OBJECT_ARRAY && root.via.array.size == 2) { + ctx->count++; + + /* Check content for "line2" */ + val = root.via.array.ptr[1]; /* map */ + if (val.type == MSGPACK_OBJECT_MAP) { + for (i = 0; i < val.via.map.size; i++) { + key = val.via.map.ptr[i].key; + v = val.via.map.ptr[i].val; + if (key.type == MSGPACK_OBJECT_STR && + key.via.str.size == 3 && + memcmp(key.via.str.ptr, "log", 3) == 0) { + if (v.type == MSGPACK_OBJECT_STR) { + if (v.via.str.size >= 5 && + memcmp(v.via.str.ptr, "line2", 5) == 0) { + ctx->found_line2 = 1; + } + } + } + } + } + } + } + msgpack_unpacked_destroy(&result); + + flb_free(record); + return 0; +} + +#ifdef FLB_HAVE_SQLDB +/* + * Helper function to get current file offset from tail DB + */ +static int64_t get_db_offset(const char *db_path, const char *file_name) +{ + sqlite3 *db; + sqlite3_stmt *stmt; + int rc; + int64_t offset = -1; + char like_pattern[256]; + const char *query = "SELECT offset FROM in_tail_files WHERE name LIKE ? LIMIT 1;"; + + rc = sqlite3_open_v2(db_path, &db, SQLITE_OPEN_READONLY, NULL); + if (rc != SQLITE_OK) { + return -1; + } + + /* + * Table name is 'in_tail_files' + * We need to find the offset for the specific file. + * Since the test uses a specific filename, we can query by name. 
+ */ + rc = sqlite3_prepare_v2(db, query, -1, &stmt, NULL); + if (rc != SQLITE_OK) { + sqlite3_close(db); + return -1; + } + + snprintf(like_pattern, sizeof(like_pattern), "%%%s%%", file_name); + sqlite3_bind_text(stmt, 1, like_pattern, -1, SQLITE_TRANSIENT); + + rc = sqlite3_step(stmt); + if (rc == SQLITE_ROW) { + offset = sqlite3_column_int64(stmt, 0); + } + + sqlite3_finalize(stmt); + sqlite3_close(db); + + return offset; +} +#endif /* FLB_HAVE_SQLDB */ + int64_t result_time; struct tail_test_result { const char *target; @@ -311,6 +498,37 @@ void wait_with_timeout(uint32_t timeout_ms, struct tail_test_result *result, int } } +/* + * Wait until output count reaches expected value or timeout. + * Returns the final count. + */ +static int wait_for_count_with_timeout(int *count_ptr, int expected, uint32_t timeout_ms) +{ + struct flb_time start_time; + struct flb_time end_time; + struct flb_time diff_time; + uint64_t elapsed_time_flb = 0; + + flb_time_get(&start_time); + + while (1) { + if (*count_ptr >= expected) { + return *count_ptr; + } + + flb_time_msleep(50); + flb_time_get(&end_time); + flb_time_diff(&end_time, &start_time, &diff_time); + elapsed_time_flb = flb_time_to_nanosec(&diff_time) / 1000000; + + if (elapsed_time_flb > timeout_ms) { + break; + } + } + + return *count_ptr; +} + void wait_num_with_timeout(uint32_t timeout_ms, int *output_num) { struct flb_time start_time; @@ -2649,6 +2867,552 @@ void flb_test_db_compare_filename() test_tail_ctx_destroy(ctx); unlink(db); } + +/* + * Test: flb_test_db_offset_rewind_on_shutdown + * + * This test verifies that unprocessed buffered data is not lost on shutdown. + * When Fluent Bit shuts down with data still in the buffer, the offset should + * be rewound so that the data is re-read on restart. + * + * Scenario: + * 1. Start Fluent Bit with DB enabled + * 2. Write initial data and wait for it to be processed + * 3. Write additional data and immediately stop (before flush) + * 4. Restart Fluent Bit + * 5. 
Verify that the data written before shutdown is re-read + */ + +void flb_test_db_offset_rewind_on_shutdown() +{ + struct flb_lib_out_cb cb_data; + struct test_tail_ctx *ctx; + char *file[] = {"test_offset_rewind.log"}; + char *db = "test_offset_rewind.db"; + char *msg_init = "initial message"; + char *msg_before_shutdown = "message before shutdown"; + int i; + int ret; + int num; + int unused; + + unlink(file[0]); + unlink(db); + + clear_output_num(); + + cb_data.cb = cb_count_msgpack; + cb_data.data = &unused; + + /* First run: write initial data */ + ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char *), FLB_TRUE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + /* Set debug log level to see offset rewind messages */ + flb_service_set(ctx->flb, "Log_Level", "debug", NULL); + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "path", file[0], + "read_from_head", "true", + "db", db, + "db.sync", "full", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Write initial message */ + ret = write_msg(ctx, msg_init, strlen(msg_init)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + unlink(file[0]); + unlink(db); + exit(EXIT_FAILURE); + } + + /* Wait for data to be processed */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num == 1)) { + TEST_MSG("initial message not received. expect=1 got=%d", num); + } + + /* + * Wait for tail to READ the file into buffer (advancing the DB offset). + * Instead of sleeping, we poll the DB until the offset increases. + * The offset should advance by the length of msg_before_shutdown. 
+ */ + int64_t start_offset = -1; + int64_t current_offset = -1; + int attempts = 0; + + /* Get initial offset (should be after the first message) */ + while (start_offset == -1 && attempts < 20) { + start_offset = get_db_offset(db, file[0]); + if (start_offset != -1) { + break; + } + flb_time_msleep(100); + attempts++; + } + + if (!TEST_CHECK(start_offset >= 0)) { + TEST_MSG("failed to get initial db offset"); + test_tail_ctx_destroy(ctx); + unlink(file[0]); + unlink(db); + exit(EXIT_FAILURE); + } + + /* + * Write message WITHOUT newline - this will remain in the tail buffer + * as an incomplete line, which is the scenario we want to test. + * The tail plugin processes complete lines (ending with \n), so + * data without newline stays in buf_len until more data arrives. + */ + ret = write_raw(ctx, msg_before_shutdown, strlen(msg_before_shutdown), FLB_FALSE); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + unlink(file[0]); + unlink(db); + exit(EXIT_FAILURE); + } + + /* Poll loop */ + attempts = 0; + while (attempts < 20) { /* Max 2 seconds wait */ + current_offset = get_db_offset(db, file[0]); + if (current_offset > start_offset) { + break; + } + flb_time_msleep(100); + attempts++; + } + + if (!TEST_CHECK(current_offset > start_offset)) { + TEST_MSG("DB offset did not advance. start=%ld current=%ld", + start_offset, current_offset); + test_tail_ctx_destroy(ctx); + unlink(file[0]); + unlink(db); + exit(EXIT_FAILURE); + } + + /* Close file descriptors before stopping */ + if (ctx->fds != NULL) { + for (i = 0; i < ctx->fd_num; i++) { + close(ctx->fds[i]); + } + flb_free(ctx->fds); + ctx->fds = NULL; + } + + /* Stop immediately - simulating abrupt shutdown */ + flb_stop(ctx->flb); + flb_destroy(ctx->flb); + flb_free(ctx); + + /* Second run: restart and verify data is re-read */ + clear_output_num(); + num = get_output_num(); + if (!TEST_CHECK(num == 0)) { + TEST_MSG("output count not cleared. 
expect=0 got=%d", num); + } + + cb_data.cb = cb_count_msgpack; + cb_data.data = &unused; + + ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char *), FLB_FALSE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed on restart"); + unlink(file[0]); + unlink(db); + exit(EXIT_FAILURE); + } + + /* Set debug log level to see offset rewind messages */ + flb_service_set(ctx->flb, "Log_Level", "debug", NULL); + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "path", file[0], + "read_from_head", "true", + "db", db, + "db.sync", "full", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + NULL); + TEST_CHECK(ret == 0); + + /* Restart the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* + * Write a newline to complete the incomplete line from before shutdown. + * This simulates the scenario where more data arrives after restart, + * completing the previously incomplete line. + */ + ret = write_raw(ctx, NULL, 0, FLB_TRUE); + if (!TEST_CHECK(ret > 0)) { + TEST_MSG("write newline failed"); + } + + /* Wait for data to be processed */ + flb_time_msleep(500); + + num = get_output_num(); + /* + * After restart, we expect to receive the message that was written + * before shutdown. If the offset rewind fix works correctly, + * msg_before_shutdown should be re-read. + * We expect at least 1 message (msg_before_shutdown). + */ + if (!TEST_CHECK(num == 1)) { + TEST_MSG("data loss detected after restart. 
expect==1 got=%d", num); + } + + test_tail_ctx_destroy(ctx); + unlink(file[0]); + unlink(db); +} + +/* Test case for Gzip resume data loss regression */ +void flb_test_db_gzip_resume_loss() +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct test_ctx t_ctx = {0}; + struct flb_lib_out_cb cb; + char *log_file = "test_gzip_resume.log.gz"; + char *db_file = "test_gzip_resume.db"; + const char *content1 = "line1\nline2"; + const char *content2 = "\nline3\n"; + + cb.cb = cb_check_gzip_resume; + cb.data = &t_ctx; + + unlink(log_file); + unlink(db_file); + + /* 1. Create Gzip file with incomplete line at end */ + TEST_CHECK(create_gzip_file(log_file, content1, strlen(content1)) == 0); + + /* 2. Start Fluent Bit */ + ctx = flb_create(); + flb_service_set(ctx, "Flush", "0.5", "Grace", "1", NULL); + + in_ffd = flb_input(ctx, "tail", NULL); + flb_input_set(ctx, in_ffd, + "path", log_file, + "read_from_head", "true", + "db", db_file, + "db.sync", "full", + NULL); + + out_ffd = flb_output(ctx, "lib", &cb); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Wait for output count to reach 1 */ + wait_for_count_with_timeout(&t_ctx.count, 1, 2000); + + flb_stop(ctx); + flb_destroy(ctx); + + TEST_CHECK(t_ctx.count == 1); /* Only line1 */ + + /* 3. 
Restart Fluent Bit */ + TEST_CHECK(append_gzip_file(log_file, content2, strlen(content2)) == 0); + + t_ctx.count = 0; + t_ctx.found_line2 = 0; + + ctx = flb_create(); + flb_service_set(ctx, "Flush", "0.5", "Grace", "1", NULL); + + in_ffd = flb_input(ctx, "tail", NULL); + flb_input_set(ctx, in_ffd, + "path", log_file, + "db", db_file, + "db.sync", "full", + NULL); + + out_ffd = flb_output(ctx, "lib", &cb); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Wait for output count to reach 2 (line2 + line3) */ + wait_for_count_with_timeout(&t_ctx.count, 2, 2000); + + flb_stop(ctx); + flb_destroy(ctx); + + TEST_CHECK(t_ctx.found_line2 == 1); + TEST_CHECK(t_ctx.count == 2); + + unlink(log_file); + unlink(db_file); +} + + +void flb_test_db_gzip_inotify_append() +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct test_ctx t_ctx = {0}; + struct flb_lib_out_cb cb; + char *log_file = "test_gzip_inotify.log.gz"; + char *db_file = "test_gzip_inotify.db"; + const char *content1 = "line1\n"; + const char *content2 = "line2\n"; + + cb.cb = cb_check_gzip_resume; /* Reusing callback as it counts lines */ + cb.data = &t_ctx; + + unlink(log_file); + unlink(db_file); + + /* 1. Create initial Gzip file */ + TEST_CHECK(create_gzip_file(log_file, content1, strlen(content1)) == 0); + + /* 2. 
Start Fluent Bit */ + ctx = flb_create(); + flb_service_set(ctx, "Flush", "0.5", "Grace", "1", NULL); + + in_ffd = flb_input(ctx, "tail", NULL); + flb_input_set(ctx, in_ffd, + "path", log_file, + "read_from_head", "true", + /* explicit refresh_interval to be sure, though inotify is default */ + "refresh_interval", "1", + "db", db_file, + "db.sync", "full", + NULL); + + out_ffd = flb_output(ctx, "lib", &cb); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Wait for initial read */ + wait_for_count_with_timeout(&t_ctx.count, 1, 2000); + TEST_CHECK(t_ctx.count == 1); + + /* 3. Append to Gzip file while running (Simulate Inotify Event) */ + TEST_CHECK(append_gzip_file(log_file, content2, strlen(content2)) == 0); + + /* Wait for inotify/refresh and processing */ + wait_for_count_with_timeout(&t_ctx.count, 2, 2000); + + /* 4. Verify total count */ + TEST_CHECK(t_ctx.count == 2); + /* Verify line2 was actually processed */ + TEST_CHECK(t_ctx.found_line2 == 1); + + flb_stop(ctx); + flb_destroy(ctx); + unlink(log_file); + unlink(db_file); +} + + +void flb_test_db_gzip_rotation() +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct test_ctx t_ctx = {0}; + struct flb_lib_out_cb cb; + char *log_file = "test_gzip_rotate.log.gz"; + char *rot_file = "test_gzip_rotate.log.gz.1"; + char *db_file = "test_gzip_rotate.db"; + const char *content1 = "line1\n"; + const char *content2 = "line2\n"; + const char *content3 = "line3_new\n"; + const char *content4 = "line4_old\n"; + + cb.cb = cb_check_gzip_resume; + cb.data = &t_ctx; + + unlink(log_file); + unlink(rot_file); + unlink(db_file); + + /* 1. Create initial Gzip file */ + TEST_CHECK(create_gzip_file(log_file, content1, strlen(content1)) == 0); + + /* 2. 
Start Fluent Bit */ + ctx = flb_create(); + flb_service_set(ctx, "Flush", "0.5", "Grace", "1", NULL); + + in_ffd = flb_input(ctx, "tail", NULL); + flb_input_set(ctx, in_ffd, + "path", log_file, + "read_from_head", "true", + "refresh_interval", "1", + "rotate_wait", "5", + "db", db_file, + "db.sync", "full", + NULL); + + out_ffd = flb_output(ctx, "lib", &cb); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Wait for initial read */ + wait_for_count_with_timeout(&t_ctx.count, 1, 2000); + TEST_CHECK(t_ctx.count == 1); + + /* 3. Rotate file: Rename .gz -> .gz.1 */ + ret = rename(log_file, rot_file); + TEST_CHECK(ret == 0); + + /* 4. Create NEW file with same name immediately */ + TEST_CHECK(create_gzip_file(log_file, content2, strlen(content2)) == 0); + + /* Wait for rotation detection and new file processing */ + wait_for_count_with_timeout(&t_ctx.count, 2, 2000); + + /* 5. Append to BOTH files within rotate_wait window */ + /* 5a. Append to new file */ + TEST_CHECK(append_gzip_file(log_file, content3, strlen(content3)) == 0); + + /* 5b. Append to OLD file (rotated) - should still be monitored */ + TEST_CHECK(append_gzip_file(rot_file, content4, strlen(content4)) == 0); + + /* Wait for processing */ + wait_for_count_with_timeout(&t_ctx.count, 4, 2000); + + /* 6. Verify total count */ + TEST_CHECK(t_ctx.count == 4); + + flb_stop(ctx); + flb_destroy(ctx); + unlink(log_file); + unlink(rot_file); + unlink(db_file); +} + + +void flb_test_db_gzip_multi_resume() +{ + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct test_ctx t_ctx = {0}; + struct flb_lib_out_cb cb; + char *log_file = "test_gzip_multi.log.gz"; + char *db_file = "test_gzip_multi.db"; + const char *content1 = "line1\n"; + const char *content2 = "line2\n"; + const char *content3 = "line3\n"; + + cb.cb = cb_check_gzip_resume; + cb.data = &t_ctx; + + unlink(log_file); + unlink(db_file); + + /* 1. 
Create file with Line1 */ + TEST_CHECK(create_gzip_file(log_file, content1, strlen(content1)) == 0); + + /* 2. Start (Run 1) */ + ctx = flb_create(); + flb_service_set(ctx, "Flush", "0.5", "Grace", "1", NULL); + in_ffd = flb_input(ctx, "tail", NULL); + flb_input_set(ctx, in_ffd, + "path", log_file, + "read_from_head", "true", + "db", db_file, + "db.sync", "full", + NULL); + + out_ffd = flb_output(ctx, "lib", &cb); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + + flb_start(ctx); + wait_for_count_with_timeout(&t_ctx.count, 1, 2000); + flb_stop(ctx); + flb_destroy(ctx); + + TEST_CHECK(t_ctx.count == 1); /* Processed Line1 */ + t_ctx.count = 0; + + /* 3. Restart (Run 2) -> Should SKIP Line1 and process Line2 */ + TEST_CHECK(append_gzip_file(log_file, content2, strlen(content2)) == 0); + + ctx = flb_create(); + flb_service_set(ctx, "Flush", "0.5", "Grace", "1", NULL); + in_ffd = flb_input(ctx, "tail", NULL); + flb_input_set(ctx, in_ffd, + "path", log_file, + "read_from_head", "true", + "db", db_file, + "db.sync", "full", + NULL); + + out_ffd = flb_output(ctx, "lib", &cb); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + + flb_start(ctx); + wait_for_count_with_timeout(&t_ctx.count, 1, 2000); + flb_stop(ctx); + flb_destroy(ctx); + + TEST_CHECK(t_ctx.count == 1); /* Should process ONLY line2 */ + t_ctx.count = 0; + + /* 4. 
Restart (Run 3) -> Should SKIP Line1+Line2 and process Line3 */ + TEST_CHECK(append_gzip_file(log_file, content3, strlen(content3)) == 0); + + ctx = flb_create(); + flb_service_set(ctx, "Flush", "0.5", "Grace", "1", NULL); + in_ffd = flb_input(ctx, "tail", NULL); + flb_input_set(ctx, in_ffd, + "path", log_file, + "read_from_head", "true", + "db", db_file, + "db.sync", "full", + NULL); + + out_ffd = flb_output(ctx, "lib", &cb); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + + flb_start(ctx); + wait_for_count_with_timeout(&t_ctx.count, 1, 2000); + flb_stop(ctx); + flb_destroy(ctx); + + TEST_CHECK(t_ctx.count == 1); + + unlink(log_file); + unlink(db_file); +} + #endif /* FLB_HAVE_SQLDB */ /* Test list */ @@ -2680,6 +3444,11 @@ TEST_LIST = { {"db", flb_test_db}, {"db_delete_stale_file", flb_test_db_delete_stale_file}, {"db_compare_filename", flb_test_db_compare_filename}, + {"db_offset_rewind_on_shutdown", flb_test_db_offset_rewind_on_shutdown}, + {"db_gzip_resume_loss", flb_test_db_gzip_resume_loss }, + {"db_gzip_inotify_append", flb_test_db_gzip_inotify_append }, + {"db_gzip_rotation", flb_test_db_gzip_rotation }, + {"db_gzip_multi_resume", flb_test_db_gzip_multi_resume }, #endif #ifdef FLB_HAVE_UNICODE_ENCODER