Skip to content

Commit 8640940

Browse files
committed
in_tail: fix gzip data loss on resume and update DB schema
Previously, when tailing gzip files, there was no mechanism to persistently store the uncompressed position ('skip_bytes'). This meant that upon restart, the plugin could not correctly locate the reading position, identifying it as a rotation or new file case, potentially leading to data loss. To fix this, 'skip_bytes' is now stored in the database to persist the uncompressed offset. Additionally, 'exclude_bytes' is introduced to track runtime skipping without interfering with the persistent value. The SQLite schema has been updated to include 'anchor_offset' and 'skip_bytes' columns to support these features. Signed-off-by: jinyong.choi <[email protected]>
1 parent 9cc380f commit 8640940

File tree

6 files changed

+234
-26
lines changed

6 files changed

+234
-26
lines changed

plugins/in_tail/tail_db.c

Lines changed: 88 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@
2525
#include "tail_sql.h"
2626
#include "tail_file.h"
2727

28-
struct query_status {
29-
int id;
30-
int rows;
31-
int64_t offset;
32-
};
28+
/* Callback to detect if a query returned any rows */
29+
static int cb_column_exists(void *data, int argc, char **argv, char **cols)
30+
{
31+
int *found = (int *)data;
32+
*found = 1;
33+
return 0;
34+
}
3335

3436
/* Open or create database required by tail plugin */
3537
struct flb_sqldb *flb_tail_db_open(const char *path,
@@ -38,6 +40,7 @@ struct flb_sqldb *flb_tail_db_open(const char *path,
3840
struct flb_config *config)
3941
{
4042
int ret;
43+
int column_found;
4144
char tmp[64];
4245
struct flb_sqldb *db;
4346

@@ -55,6 +58,54 @@ struct flb_sqldb *flb_tail_db_open(const char *path,
5558
return NULL;
5659
}
5760

61+
/* Check if 'skip' column exists (migration for older databases) */
62+
column_found = 0;
63+
ret = flb_sqldb_query(db,
64+
"SELECT 1 FROM pragma_table_info('in_tail_files') "
65+
"WHERE name='skip';",
66+
cb_column_exists, &column_found);
67+
if (ret != FLB_OK) {
68+
flb_plg_error(ctx->ins, "db: could not query table info for 'skip' column");
69+
flb_sqldb_close(db);
70+
return NULL;
71+
}
72+
if (column_found == 0) {
73+
flb_plg_debug(ctx->ins, "db: migrating database, adding 'skip' column");
74+
ret = flb_sqldb_query(db,
75+
"ALTER TABLE in_tail_files "
76+
"ADD COLUMN skip INTEGER DEFAULT 0;",
77+
NULL, NULL);
78+
if (ret != FLB_OK) {
79+
flb_plg_error(ctx->ins, "db: could not add 'skip' column");
80+
flb_sqldb_close(db);
81+
return NULL;
82+
}
83+
}
84+
85+
/* Check if 'anchor' column exists (migration for older databases) */
86+
column_found = 0;
87+
ret = flb_sqldb_query(db,
88+
"SELECT 1 FROM pragma_table_info('in_tail_files') "
89+
"WHERE name='anchor';",
90+
cb_column_exists, &column_found);
91+
if (ret != FLB_OK) {
92+
flb_plg_error(ctx->ins, "db: could not query table info for 'anchor' column");
93+
flb_sqldb_close(db);
94+
return NULL;
95+
}
96+
if (column_found == 0) {
97+
flb_plg_debug(ctx->ins, "db: migrating database, adding 'anchor' column");
98+
ret = flb_sqldb_query(db,
99+
"ALTER TABLE in_tail_files "
100+
"ADD COLUMN anchor INTEGER DEFAULT 0;",
101+
NULL, NULL);
102+
if (ret != FLB_OK) {
103+
flb_plg_error(ctx->ins, "db: could not add 'anchor' column");
104+
flb_sqldb_close(db);
105+
return NULL;
106+
}
107+
}
108+
58109
if (ctx->db_sync >= 0) {
59110
snprintf(tmp, sizeof(tmp) - 1, SQL_PRAGMA_SYNC,
60111
ctx->db_sync);
@@ -130,7 +181,8 @@ static int flb_tail_db_file_delete_by_id(struct flb_tail_config *ctx,
130181
*/
131182
static int db_file_exists(struct flb_tail_file *file,
132183
struct flb_tail_config *ctx,
133-
uint64_t *id, uint64_t *inode, off_t *offset)
184+
uint64_t *id, uint64_t *inode,
185+
int64_t *offset, uint64_t *skip, int64_t *anchor)
134186
{
135187
int ret;
136188
int exists = FLB_FALSE;
@@ -149,6 +201,8 @@ static int db_file_exists(struct flb_tail_file *file,
149201
/* name: column 1 */
150202
name = sqlite3_column_text(ctx->stmt_get_file, 1);
151203
if (ctx->compare_filename && name == NULL) {
204+
sqlite3_clear_bindings(ctx->stmt_get_file);
205+
sqlite3_reset(ctx->stmt_get_file);
152206
flb_plg_error(ctx->ins, "db: error getting name: id=%"PRIu64, *id);
153207
return -1;
154208
}
@@ -159,12 +213,18 @@ static int db_file_exists(struct flb_tail_file *file,
159213
/* inode: column 3 */
160214
*inode = sqlite3_column_int64(ctx->stmt_get_file, 3);
161215

216+
/* skip: column 6 */
217+
*skip = sqlite3_column_int64(ctx->stmt_get_file, 6);
218+
219+
/* anchor: column 7 */
220+
*anchor = sqlite3_column_int64(ctx->stmt_get_file, 7);
221+
162222
/* Checking if the file's name and inode match exactly */
163223
if (ctx->compare_filename) {
164224
if (flb_tail_target_file_name_cmp((char *) name, file) != 0) {
165225
exists = FLB_FALSE;
166226
flb_plg_debug(ctx->ins, "db: exists stale file from database:"
167-
" id=%"PRIu64" inode=%"PRIu64" offset=%"PRIu64
227+
" id=%"PRIu64" inode=%"PRIu64" offset=%"PRId64
168228
" name=%s file_inode=%"PRIu64" file_name=%s",
169229
*id, *inode, *offset, name, file->inode,
170230
file->name);
@@ -199,6 +259,8 @@ static int db_file_insert(struct flb_tail_file *file, struct flb_tail_config *ct
199259
sqlite3_bind_int64(ctx->stmt_insert_file, 2, file->offset);
200260
sqlite3_bind_int64(ctx->stmt_insert_file, 3, file->inode);
201261
sqlite3_bind_int64(ctx->stmt_insert_file, 4, created);
262+
sqlite3_bind_int64(ctx->stmt_insert_file, 5, file->skip_bytes);
263+
sqlite3_bind_int64(ctx->stmt_insert_file, 6, file->anchor_offset);
202264

203265
/* Run the insert */
204266
ret = sqlite3_step(ctx->stmt_insert_file);
@@ -258,11 +320,13 @@ int flb_tail_db_file_set(struct flb_tail_file *file,
258320
{
259321
int ret;
260322
uint64_t id = 0;
261-
off_t offset = 0;
323+
int64_t offset = 0;
324+
uint64_t skip = 0;
325+
int64_t anchor = 0;
262326
uint64_t inode = 0;
263327

264328
/* Check if the file exists */
265-
ret = db_file_exists(file, ctx, &id, &inode, &offset);
329+
ret = db_file_exists(file, ctx, &id, &inode, &offset, &skip, &anchor);
266330
if (ret == -1) {
267331
flb_plg_error(ctx->ins, "cannot execute query to check inode: %" PRIu64,
268332
file->inode);
@@ -281,6 +345,18 @@ int flb_tail_db_file_set(struct flb_tail_file *file,
281345
else {
282346
file->db_id = id;
283347
file->offset = offset;
348+
file->skip_bytes = skip;
349+
file->anchor_offset = anchor;
350+
351+
/* Initialize skipping mode if needed */
352+
if (file->skip_bytes > 0) {
353+
file->exclude_bytes = file->skip_bytes;
354+
file->skipping_mode = FLB_TRUE;
355+
}
356+
else {
357+
file->exclude_bytes = 0;
358+
file->skipping_mode = FLB_FALSE;
359+
}
284360
}
285361

286362
return 0;
@@ -294,7 +370,9 @@ int flb_tail_db_file_offset(struct flb_tail_file *file,
294370

295371
/* Bind parameters */
296372
sqlite3_bind_int64(ctx->stmt_offset, 1, file->offset);
297-
sqlite3_bind_int64(ctx->stmt_offset, 2, file->db_id);
373+
sqlite3_bind_int64(ctx->stmt_offset, 2, file->skip_bytes);
374+
sqlite3_bind_int64(ctx->stmt_offset, 3, file->anchor_offset);
375+
sqlite3_bind_int64(ctx->stmt_offset, 4, file->db_id);
298376

299377
ret = sqlite3_step(ctx->stmt_offset);
300378

plugins/in_tail/tail_file.c

Lines changed: 126 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
#include "tail_dockermode.h"
4545
#include "tail_multiline.h"
4646
#include "tail_scan.h"
47+
#include <fluent-bit/flb_compression.h>
4748

4849
#ifdef FLB_SYSTEM_WINDOWS
4950
#include "win32.h"
@@ -1036,12 +1037,39 @@ static int set_file_position(struct flb_tail_config *ctx,
10361037
if (ctx->db) {
10371038
ret = flb_tail_db_file_set(file, ctx);
10381039
if (ret == 0) {
1039-
if (file->offset > 0) {
1040-
ret = lseek(file->fd, file->offset, SEEK_SET);
1040+
int64_t seek_pos;
1041+
int has_db_position;
1042+
1043+
/*
1044+
* Determine seek position based on file type and DB state:
1045+
*
1046+
* - Gzip files with anchor/skip info: use anchor_offset
1047+
* - Normal files or gzip migration fallback: use offset
1048+
* - No DB position + read_from_head=off: seek to EOF
1049+
*/
1050+
seek_pos = file->offset;
1051+
has_db_position = (file->offset > 0);
1052+
1053+
/* Override for gzip files with proper anchor tracking */
1054+
if (file->decompression_context != NULL &&
1055+
(file->anchor_offset > 0 || file->skip_bytes > 0)) {
1056+
seek_pos = file->anchor_offset;
1057+
has_db_position = FLB_TRUE;
1058+
}
1059+
1060+
if (has_db_position) {
1061+
ret = lseek(file->fd, seek_pos, SEEK_SET);
10411062
if (ret == -1) {
10421063
flb_errno();
10431064
return -1;
10441065
}
1066+
file->offset = ret;
1067+
1068+
/* Initialize skip state for gzip resume */
1069+
if (file->decompression_context != NULL && file->skip_bytes > 0) {
1070+
file->exclude_bytes = file->skip_bytes;
1071+
file->skipping_mode = FLB_TRUE;
1072+
}
10451073
}
10461074
else if (ctx->read_from_head == FLB_FALSE) {
10471075
ret = lseek(file->fd, 0, SEEK_END);
@@ -1050,8 +1078,23 @@ static int set_file_position(struct flb_tail_config *ctx,
10501078
return -1;
10511079
}
10521080
file->offset = ret;
1081+
file->anchor_offset = ret;
10531082
flb_tail_db_file_offset(file, ctx);
10541083
}
1084+
1085+
if (file->decompression_context == NULL) {
1086+
file->stream_offset = file->offset;
1087+
}
1088+
else {
1089+
/*
1090+
* For single-member gzip, stream_offset = skip_bytes is correct.
1091+
* For multi-member gzip, skip_bytes only tracks bytes within the
1092+
* current member (reset at member boundaries), so stream_offset
1093+
* may not reflect the total decompressed bytes from prior members.
1094+
*/
1095+
file->stream_offset = file->skip_bytes;
1096+
}
1097+
10551098
return 0;
10561099
}
10571100
}
@@ -1084,6 +1127,15 @@ static int set_file_position(struct flb_tail_config *ctx,
10841127
if (file->decompression_context == NULL) {
10851128
file->stream_offset = ret;
10861129
}
1130+
else {
1131+
/*
1132+
* Compressed file without DB: no persistent state available.
1133+
* Initialize skip-related fields to 0 for code consistency.
1134+
*/
1135+
file->anchor_offset = file->offset;
1136+
file->exclude_bytes = 0;
1137+
file->stream_offset = 0;
1138+
}
10871139

10881140
return 0;
10891141
}
@@ -1279,6 +1331,12 @@ int flb_tail_file_append(char *path, struct stat *st, int mode,
12791331
file->skip_next = FLB_FALSE;
12801332
file->skip_warn = FLB_FALSE;
12811333

1334+
/* Initialize gzip resume fields */
1335+
file->anchor_offset = 0;
1336+
file->skip_bytes = 0;
1337+
file->exclude_bytes = 0;
1338+
file->skipping_mode = FLB_FALSE;
1339+
12821340
/* Multiline core mode */
12831341
if (ctx->ml_ctx) {
12841342
/*
@@ -1454,18 +1512,20 @@ void flb_tail_file_remove(struct flb_tail_file *file)
14541512
* If there is data in the buffer, it means it was not processed.
14551513
* We must rewind the offset to ensure this data is re-read on restart.
14561514
*/
1457-
off_t old_offset = file->offset;
1515+
int64_t old_offset = file->offset;
14581516

14591517
if (file->offset > file->buf_len) {
14601518
file->offset -= file->buf_len;
1461-
} else {
1519+
}
1520+
else {
14621521
file->offset = 0;
14631522
}
14641523

14651524
flb_plg_debug(ctx->ins, "inode=%"PRIu64" rewind offset for %s: "
1466-
"old=%"PRId64" new=%"PRId64" (buf_len=%lu)",
1467-
file->inode, file->name, old_offset, file->offset,
1468-
(unsigned long)file->buf_len);
1525+
"old=%jd new=%jd (buf_len=%zu)",
1526+
file->inode, file->name,
1527+
(intmax_t)old_offset, (intmax_t)file->offset,
1528+
file->buf_len);
14691529

14701530
#ifdef FLB_HAVE_SQLDB
14711531
if (ctx->db && file->db_id > FLB_TAIL_DB_ID_NONE) {
@@ -1474,9 +1534,16 @@ void flb_tail_file_remove(struct flb_tail_file *file)
14741534
#endif
14751535
}
14761536
else {
1477-
flb_plg_warn(ctx->ins, "inode=%"PRIu64" cannot rewind compressed file %s; "
1478-
"%lu decompressed bytes in buffer may be lost on restart",
1479-
file->inode, file->name, (unsigned long)file->buf_len);
1537+
flb_plg_debug(ctx->ins,
1538+
"inode=%"PRIu64" file=%s: buffered data (%zu bytes) "
1539+
"remains on exit for gzip input; cannot rewind raw "
1540+
"offset safely with streaming decompression",
1541+
file->inode, file->name, file->buf_len);
1542+
#ifdef FLB_HAVE_SQLDB
1543+
if (ctx->db && file->db_id > FLB_TAIL_DB_ID_NONE) {
1544+
flb_tail_db_file_offset(file, ctx);
1545+
}
1546+
#endif
14801547
}
14811548
}
14821549

@@ -1598,6 +1665,10 @@ static int adjust_counters(struct flb_tail_config *ctx, struct flb_tail_file *fi
15981665
file->inode, file->name, size_delta);
15991666
file->offset = offset;
16001667
file->buf_len = 0;
1668+
file->anchor_offset = offset;
1669+
file->skip_bytes = 0;
1670+
file->exclude_bytes = 0;
1671+
file->skipping_mode = FLB_FALSE;
16011672

16021673
/* Update offset in the database file */
16031674
#ifdef FLB_HAVE_SQLDB
@@ -1797,6 +1868,28 @@ int flb_tail_file_chunk(struct flb_tail_file *file)
17971868
return FLB_TAIL_ERROR;
17981869
}
17991870

1871+
if (file->skipping_mode == FLB_TRUE && decompressed_data_length > 0) {
1872+
flb_plg_debug(ctx->ins,
1873+
"Skipping: anchor=%jd offset=%jd "
1874+
"exclude=%zu decompressed=%zu",
1875+
(intmax_t)file->anchor_offset,
1876+
(intmax_t)file->offset,
1877+
file->exclude_bytes, decompressed_data_length);
1878+
if (file->exclude_bytes >= decompressed_data_length) {
1879+
file->exclude_bytes -= decompressed_data_length;
1880+
decompressed_data_length = 0;
1881+
}
1882+
else {
1883+
size_t remain = decompressed_data_length - file->exclude_bytes;
1884+
memmove(&file->buf_data[file->buf_len],
1885+
&file->buf_data[file->buf_len + file->exclude_bytes],
1886+
remain);
1887+
decompressed_data_length = remain;
1888+
file->exclude_bytes = 0;
1889+
file->skipping_mode = FLB_FALSE;
1890+
}
1891+
}
1892+
18001893
stream_data_length = decompressed_data_length;
18011894
}
18021895
}
@@ -1836,6 +1929,29 @@ int flb_tail_file_chunk(struct flb_tail_file *file)
18361929
file->buf_len -= processed_bytes;
18371930
file->buf_data[file->buf_len] = '\0';
18381931

1932+
if (file->decompression_context) {
1933+
file->skip_bytes += processed_bytes;
1934+
1935+
/*
1936+
* Gzip member boundary: update anchor when we complete a member
1937+
* and all decompressed data is consumed.
1938+
*/
1939+
if (file->decompression_context->state ==
1940+
FLB_DECOMPRESSOR_STATE_EXPECTING_HEADER &&
1941+
file->decompression_context->input_buffer_length == 0 &&
1942+
file->buf_len == 0) {
1943+
flb_plg_debug(file->config->ins,
1944+
"Gzip member completed: updating anchor "
1945+
"from %jd to %jd, resetting skip from "
1946+
"%"PRIu64" to 0",
1947+
(intmax_t)file->anchor_offset,
1948+
(intmax_t)file->offset,
1949+
file->skip_bytes);
1950+
file->anchor_offset = file->offset;
1951+
file->skip_bytes = 0;
1952+
}
1953+
}
1954+
18391955
#ifdef FLB_HAVE_SQLDB
18401956
if (file->config->db) {
18411957
flb_tail_db_file_offset(file, file->config);

0 commit comments

Comments
 (0)