Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion plugins/out_azure_kusto/azure_kusto.c
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con
struct azure_kusto_file *chunk;
struct mk_list *tmp;
struct mk_list *head;
struct mk_list *f_tmp;
struct mk_list *f_head;
struct flb_fstore_file *fsf;
struct flb_fstore_stream *fs_stream;
Expand All @@ -427,7 +428,7 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con
continue;
}

mk_list_foreach_safe(f_head, tmp, &fs_stream->files) {
mk_list_foreach_safe(f_head, f_tmp, &fs_stream->files) {
fsf = mk_list_entry(f_head, struct flb_fstore_file, _head);
chunk = fsf->data;

Expand Down
117 changes: 117 additions & 0 deletions tests/runtime/out_azure_kusto.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 700
#endif

/* Fluent Bit
* ==========
* Copyright (C) 2019-2022 The Fluent Bit Authors
Expand All @@ -20,8 +24,30 @@

#include <fluent-bit.h>
#include "flb_tests_runtime.h"
#include <unistd.h>
#include <sys/stat.h>
#include <ftw.h>
#include <limits.h>
#include <errno.h>

/* Test data */

static int flb_kusto_unlink_cb(const char *fpath, const struct stat *sb, int typeflag, struct FTW *ftwbuf)
{
return remove(fpath);
}

static void flb_kusto_rm_rf(const char *path)
{
struct stat st;

if (stat(path, &st) != 0) {
return;
}

nftw(path, flb_kusto_unlink_cb, 64, FTW_DEPTH | FTW_PHYS);
}

#include "data/common/json_invalid.h" /* JSON_INVALID */

/* Test functions */
Expand All @@ -30,6 +56,7 @@ void flb_test_azure_kusto_managed_identity_system(void);
void flb_test_azure_kusto_managed_identity_user(void);
void flb_test_azure_kusto_service_principal(void);
void flb_test_azure_kusto_workload_identity(void);
void flb_test_azure_kusto_buffering_backlog(void);

/* Test list */
TEST_LIST = {
Expand All @@ -38,6 +65,7 @@ TEST_LIST = {
{"managed_identity_user", flb_test_azure_kusto_managed_identity_user},
{"service_principal", flb_test_azure_kusto_service_principal},
{"workload_identity", flb_test_azure_kusto_workload_identity},
{"buffering_backlog", flb_test_azure_kusto_buffering_backlog},
{NULL, NULL}
};

Expand Down Expand Up @@ -210,4 +238,93 @@ void flb_test_azure_kusto_workload_identity(void)

flb_stop(ctx);
flb_destroy(ctx);
}

/* Regression: exercise buffering-enabled backlog processing on restart to validate nested mk_list_foreach_safe fix */
void flb_test_azure_kusto_buffering_backlog(void)
{
int i;
int ret;
int bytes;
char sample[] = "{\"k\":\"v\"}";
size_t sample_size = sizeof(sample) - 1;
char buffer_dir[PATH_MAX];
pid_t pid;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

pid = getpid();
snprintf(buffer_dir, sizeof(buffer_dir), "/tmp/flb-kusto-test-%d", (int) pid);

/* Ensure a clean buffer directory before starting */
flb_kusto_rm_rf(buffer_dir);
ret = mkdir(buffer_dir, 0700);
if (ret != 0) {
perror("mkdir(buffer_dir)");
TEST_CHECK(ret == 0);
return;
}

/* First run: enable buffering and write data to disk */
ctx = flb_create();
flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "error", NULL);

in_ffd = flb_input(ctx, (char *) "lib", NULL);
TEST_CHECK(in_ffd >= 0);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

out_ffd = flb_output(ctx, (char *) "azure_kusto", NULL);
TEST_CHECK(out_ffd >= 0);
flb_output_set(ctx, out_ffd, "match", "test", NULL);
flb_output_set(ctx, out_ffd, "auth_type", "managed_identity", NULL);
flb_output_set(ctx, out_ffd, "client_id", "system", NULL);
flb_output_set(ctx, out_ffd, "ingestion_endpoint", "https://ingest-CLUSTER.kusto.windows.net", NULL);
flb_output_set(ctx, out_ffd, "database_name", "telemetrydb", NULL);
flb_output_set(ctx, out_ffd, "table_name", "logs", NULL);
flb_output_set(ctx, out_ffd, "buffering_enabled", "true", NULL);
flb_output_set(ctx, out_ffd, "buffer_dir", buffer_dir, NULL);

ret = flb_start(ctx);
TEST_CHECK(ret == 0);

for (i = 0; i < 5; i++) {
bytes = flb_lib_push(ctx, in_ffd, sample, sample_size);
TEST_CHECK(bytes == (int) sample_size);
}

sleep(1); /* allow flush to write buffered chunks */

flb_stop(ctx);
flb_destroy(ctx);

/* Second run: restart to process backlog from buffer_dir */
ctx = flb_create();
flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "error", NULL);

in_ffd = flb_input(ctx, (char *) "lib", NULL);
TEST_CHECK(in_ffd >= 0);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

out_ffd = flb_output(ctx, (char *) "azure_kusto", NULL);
TEST_CHECK(out_ffd >= 0);
flb_output_set(ctx, out_ffd, "match", "test", NULL);
flb_output_set(ctx, out_ffd, "auth_type", "managed_identity", NULL);
flb_output_set(ctx, out_ffd, "client_id", "system", NULL);
flb_output_set(ctx, out_ffd, "ingestion_endpoint", "https://ingest-CLUSTER.kusto.windows.net", NULL);
flb_output_set(ctx, out_ffd, "database_name", "telemetrydb", NULL);
flb_output_set(ctx, out_ffd, "table_name", "logs", NULL);
flb_output_set(ctx, out_ffd, "buffering_enabled", "true", NULL);
flb_output_set(ctx, out_ffd, "buffer_dir", buffer_dir, NULL);

ret = flb_start(ctx);
TEST_CHECK(ret == 0);

sleep(1); /* ingest_all_chunks runs on startup for buffered backlog */

flb_stop(ctx);
flb_destroy(ctx);

/* Cleanup buffer directory after test */
flb_kusto_rm_rf(buffer_dir);
}