Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions plugins/in_tail/tail.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
#define FLB_TAIL_STATIC 0 /* Data is being consumed through read(2) */
#define FLB_TAIL_EVENT 1 /* Data is being consumed through inotify */

/* Database */
#define FLB_TAIL_DB_ID_NONE 0 /* File not in database or deleted */

/* Config */
#define FLB_TAIL_CHUNK "32768" /* buffer chunk = 32KB */
#define FLB_TAIL_REFRESH 60 /* refresh every 60 seconds */
Expand Down
99 changes: 89 additions & 10 deletions plugins/in_tail/tail_db.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
#include "tail_sql.h"
#include "tail_file.h"

struct query_status {
int id;
int rows;
int64_t offset;
};
/* Callback to detect if a query returned any rows */
static int cb_column_exists(void *data, int argc, char **argv, char **cols)
{
int *found = (int *)data;
*found = 1;
return 0;
}

/* Open or create database required by tail plugin */
struct flb_sqldb *flb_tail_db_open(const char *path,
Expand All @@ -38,6 +40,7 @@ struct flb_sqldb *flb_tail_db_open(const char *path,
struct flb_config *config)
{
int ret;
int column_found;
char tmp[64];
struct flb_sqldb *db;

Expand All @@ -55,6 +58,54 @@ struct flb_sqldb *flb_tail_db_open(const char *path,
return NULL;
}

/* Check if 'skip' column exists (migration for older databases) */
column_found = 0;
ret = flb_sqldb_query(db,
"SELECT 1 FROM pragma_table_info('in_tail_files') "
"WHERE name='skip';",
cb_column_exists, &column_found);
if (ret != FLB_OK) {
flb_plg_error(ctx->ins, "db: could not query table info for 'skip' column");
flb_sqldb_close(db);
return NULL;
}
if (column_found == 0) {
flb_plg_debug(ctx->ins, "db: migrating database, adding 'skip' column");
ret = flb_sqldb_query(db,
"ALTER TABLE in_tail_files "
"ADD COLUMN skip INTEGER DEFAULT 0;",
NULL, NULL);
if (ret != FLB_OK) {
flb_plg_error(ctx->ins, "db: could not add 'skip' column");
flb_sqldb_close(db);
return NULL;
}
}

/* Check if 'anchor' column exists (migration for older databases) */
column_found = 0;
ret = flb_sqldb_query(db,
"SELECT 1 FROM pragma_table_info('in_tail_files') "
"WHERE name='anchor';",
cb_column_exists, &column_found);
if (ret != FLB_OK) {
flb_plg_error(ctx->ins, "db: could not query table info for 'anchor' column");
flb_sqldb_close(db);
return NULL;
}
if (column_found == 0) {
flb_plg_debug(ctx->ins, "db: migrating database, adding 'anchor' column");
ret = flb_sqldb_query(db,
"ALTER TABLE in_tail_files "
"ADD COLUMN anchor INTEGER DEFAULT 0;",
NULL, NULL);
if (ret != FLB_OK) {
flb_plg_error(ctx->ins, "db: could not add 'anchor' column");
flb_sqldb_close(db);
return NULL;
}
}

if (ctx->db_sync >= 0) {
snprintf(tmp, sizeof(tmp) - 1, SQL_PRAGMA_SYNC,
ctx->db_sync);
Expand Down Expand Up @@ -130,7 +181,8 @@ static int flb_tail_db_file_delete_by_id(struct flb_tail_config *ctx,
*/
static int db_file_exists(struct flb_tail_file *file,
struct flb_tail_config *ctx,
uint64_t *id, uint64_t *inode, off_t *offset)
uint64_t *id, uint64_t *inode,
int64_t *offset, uint64_t *skip, int64_t *anchor)
{
int ret;
int exists = FLB_FALSE;
Expand All @@ -149,6 +201,8 @@ static int db_file_exists(struct flb_tail_file *file,
/* name: column 1 */
name = sqlite3_column_text(ctx->stmt_get_file, 1);
if (ctx->compare_filename && name == NULL) {
sqlite3_clear_bindings(ctx->stmt_get_file);
sqlite3_reset(ctx->stmt_get_file);
flb_plg_error(ctx->ins, "db: error getting name: id=%"PRIu64, *id);
return -1;
}
Expand All @@ -159,12 +213,18 @@ static int db_file_exists(struct flb_tail_file *file,
/* inode: column 3 */
*inode = sqlite3_column_int64(ctx->stmt_get_file, 3);

/* skip: column 6 */
*skip = sqlite3_column_int64(ctx->stmt_get_file, 6);

/* anchor: column 7 */
*anchor = sqlite3_column_int64(ctx->stmt_get_file, 7);

/* Checking if the file's name and inode match exactly */
if (ctx->compare_filename) {
if (flb_tail_target_file_name_cmp((char *) name, file) != 0) {
exists = FLB_FALSE;
flb_plg_debug(ctx->ins, "db: exists stale file from database:"
" id=%"PRIu64" inode=%"PRIu64" offset=%"PRIu64
" id=%"PRIu64" inode=%"PRIu64" offset=%"PRId64
" name=%s file_inode=%"PRIu64" file_name=%s",
*id, *inode, *offset, name, file->inode,
file->name);
Expand Down Expand Up @@ -199,6 +259,8 @@ static int db_file_insert(struct flb_tail_file *file, struct flb_tail_config *ct
sqlite3_bind_int64(ctx->stmt_insert_file, 2, file->offset);
sqlite3_bind_int64(ctx->stmt_insert_file, 3, file->inode);
sqlite3_bind_int64(ctx->stmt_insert_file, 4, created);
sqlite3_bind_int64(ctx->stmt_insert_file, 5, file->skip_bytes);
sqlite3_bind_int64(ctx->stmt_insert_file, 6, file->anchor_offset);

/* Run the insert */
ret = sqlite3_step(ctx->stmt_insert_file);
Expand Down Expand Up @@ -258,11 +320,13 @@ int flb_tail_db_file_set(struct flb_tail_file *file,
{
int ret;
uint64_t id = 0;
off_t offset = 0;
int64_t offset = 0;
uint64_t skip = 0;
int64_t anchor = 0;
uint64_t inode = 0;

/* Check if the file exists */
ret = db_file_exists(file, ctx, &id, &inode, &offset);
ret = db_file_exists(file, ctx, &id, &inode, &offset, &skip, &anchor);
if (ret == -1) {
flb_plg_error(ctx->ins, "cannot execute query to check inode: %" PRIu64,
file->inode);
Expand All @@ -281,6 +345,18 @@ int flb_tail_db_file_set(struct flb_tail_file *file,
else {
file->db_id = id;
file->offset = offset;
file->skip_bytes = skip;
file->anchor_offset = anchor;

/* Initialize skipping mode if needed */
if (file->skip_bytes > 0) {
file->exclude_bytes = file->skip_bytes;
file->skipping_mode = FLB_TRUE;
}
else {
file->exclude_bytes = 0;
file->skipping_mode = FLB_FALSE;
}
}

return 0;
Expand All @@ -294,7 +370,9 @@ int flb_tail_db_file_offset(struct flb_tail_file *file,

/* Bind parameters */
sqlite3_bind_int64(ctx->stmt_offset, 1, file->offset);
sqlite3_bind_int64(ctx->stmt_offset, 2, file->db_id);
sqlite3_bind_int64(ctx->stmt_offset, 2, file->skip_bytes);
sqlite3_bind_int64(ctx->stmt_offset, 3, file->anchor_offset);
sqlite3_bind_int64(ctx->stmt_offset, 4, file->db_id);

ret = sqlite3_step(ctx->stmt_offset);

Expand Down Expand Up @@ -363,6 +441,7 @@ int flb_tail_db_file_delete(struct flb_tail_file *file,
}

flb_plg_debug(ctx->ins, "db: file deleted from database: %s", file->name);
file->db_id = FLB_TAIL_DB_ID_NONE;
return 0;
}

Expand Down
Loading
Loading