130 changes: 120 additions & 10 deletions plugins/in_kubernetes_events/kubernetes_events.c
@@ -746,32 +746,107 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
size_t token_size = 0;
char *token_start = 0;
char *token_end = NULL;
char *search_start;
size_t remaining;
flb_sds_t working_buffer = NULL;

/*
* Prepend any buffered incomplete data from previous chunks.
* HTTP chunked encoding can split JSON objects across chunk boundaries,
* so we need to buffer incomplete data until we find a complete JSON line.
*/
if (ctx->chunk_buffer != NULL) {
working_buffer = flb_sds_cat(ctx->chunk_buffer, c->resp.payload, c->resp.payload_size);
if (!working_buffer) {
flb_plg_error(ctx->ins, "failed to concatenate chunk buffer");
flb_sds_destroy(ctx->chunk_buffer);
ctx->chunk_buffer = NULL;
return -1;
}
/*
* flb_sds_cat modifies and returns the first argument, so working_buffer
* IS ctx->chunk_buffer (reallocated). Clear our reference to it.
*/
ctx->chunk_buffer = NULL;
token_start = working_buffer;
}
else {
token_start = c->resp.payload;
}

search_start = token_start;
token_end = strpbrk(search_start, JSON_ARRAY_DELIM);

while (token_end != NULL && ret == 0) {
token_size = token_end - token_start;

/* Skip empty lines */
if (token_size == 0) {
token_start = token_end + 1;
search_start = token_start;
token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
continue;
}

ret = flb_pack_json(token_start, token_size, &buf_data, &buf_size, &root_type, &consumed);
if (ret == -1) {
flb_plg_debug(ctx->ins, "could not process payload, incomplete or bad formed JSON: %s",
c->resp.payload);
flb_plg_debug(ctx->ins, "could not process payload, incomplete or bad formed JSON");
}
else {
/*
* For non-buffered data, track consumed bytes.
* For buffered data, we'll mark everything consumed after the loop.
*/
if (!working_buffer) {
*bytes_consumed += token_size + 1;
}
ret = process_watched_event(ctx, buf_data, buf_size);
}

if (buf_data) {
flb_free(buf_data);
buf_data = NULL;
}

token_start = token_end + 1;
search_start = token_start;
token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
}

/*
* Always consume all bytes from the current chunk since we've examined them all.
* Even if we buffer the data, we've still "consumed" it from the HTTP payload.
*/
*bytes_consumed = c->resp.payload_size;

/*
* If there's remaining data without a newline delimiter, it means the JSON
* object is incomplete (split across chunk boundaries). Buffer it for next chunk.
*/
if (working_buffer) {
remaining = flb_sds_len(working_buffer) - (token_start - working_buffer);
}
else {
remaining = c->resp.payload_size - (token_start - c->resp.payload);
}

if (remaining > 0 && ret == 0) {
ctx->chunk_buffer = flb_sds_create_len(token_start, remaining);
if (!ctx->chunk_buffer) {
flb_plg_error(ctx->ins, "failed to create chunk buffer");
ret = -1;
}
else {
flb_plg_trace(ctx->ins, "buffering %zu bytes of incomplete JSON data", remaining);
}
}

if (working_buffer) {
flb_sds_destroy(working_buffer);
}

if (buf_data) {
flb_free(buf_data);
}

return ret;
}
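
For orientation, here is a minimal, dependency-free sketch of the carry-buffer pattern this hunk implements. It is not Fluent Bit code: feed(), finish() and emit() are illustrative names, plain malloc/printf stand in for flb_sds_t and the event pipeline, and a bare '\n' stands in for strpbrk() over JSON_ARRAY_DELIM. feed() scans a chunk for newline-delimited records, emits the complete ones, and carries an unterminated tail into the next call; finish() mirrors the end-of-stream flush added to k8s_events_collect() below.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static char *carry = NULL;      /* incomplete record held between chunks */
static size_t carry_len = 0;

static void emit(const char *rec, size_t len)
{
    printf("record: %.*s\n", (int) len, rec);
}

static int feed(const char *chunk, size_t size)
{
    char *buf;
    char *start;
    char *nl;
    size_t total = carry_len + size;
    size_t rest;

    /* prepend the carried tail, as process_http_chunk() does with flb_sds_cat() */
    buf = malloc(total + 1);
    if (!buf) {
        return -1;
    }
    if (carry_len > 0) {
        memcpy(buf, carry, carry_len);
    }
    memcpy(buf + carry_len, chunk, size);
    free(carry);
    carry = NULL;
    carry_len = 0;

    start = buf;
    while ((nl = memchr(start, '\n', total - (start - buf))) != NULL) {
        if (nl > start) {
            emit(start, nl - start);   /* complete record */
        }
        start = nl + 1;                /* skip delimiter (and empty lines) */
    }

    /* no trailing delimiter: buffer the tail for the next chunk */
    rest = total - (start - buf);
    if (rest > 0) {
        carry = malloc(rest);
        if (!carry) {
            free(buf);
            return -1;
        }
        memcpy(carry, start, rest);
        carry_len = rest;
    }
    free(buf);
    return 0;
}

static void finish(void)
{
    /* stream closed: flush a final record that lacked a trailing newline */
    if (carry_len > 0) {
        emit(carry, carry_len);
    }
    free(carry);
    carry = NULL;
    carry_len = 0;
}

int main(void)
{
    /* two records split across three arbitrary chunk boundaries */
    const char *chunks[] = {
        "{\"type\":\"ADDED\",\"obj",
        "ect\":{}}\n{\"type\":\"MOD",
        "IFIED\",\"object\":{}}"
    };
    size_t i;

    for (i = 0; i < sizeof(chunks) / sizeof(chunks[0]); i++) {
        feed(chunks[i], strlen(chunks[i]));
    }
    finish();   /* emits the second record, which had no trailing newline */
    return 0;
}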

@@ -889,6 +964,13 @@ static int check_and_init_stream(struct k8s_events *ctx)
flb_upstream_conn_release(ctx->current_connection);
ctx->current_connection = NULL;
}

/* Clear any buffered incomplete data on failure */
if (ctx->chunk_buffer) {
flb_sds_destroy(ctx->chunk_buffer);
ctx->chunk_buffer = NULL;
}

return FLB_FALSE;
}
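
One design note on the cleanup above (inferred from the diff, not stated by the author): dropping chunk_buffer whenever a stream is torn down prevents a tail carried from one watch connection from being glued onto the first chunk of the next connection, which would otherwise yield a corrupt record.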

@@ -927,6 +1009,28 @@ static int k8s_events_collect(struct flb_input_instance *ins,
}
else if (ret == FLB_HTTP_OK) {
flb_plg_info(ins, "kubernetes stream closed by api server. Reconnect will happen on next interval.");

/*
* If there's buffered data when stream closes, try to process it.
* This handles the case where the last chunk doesn't end with a newline.
*/
if (ctx->chunk_buffer && flb_sds_len(ctx->chunk_buffer) > 0) {
int buf_ret;
int root_type;
size_t consumed = 0;
char *buf_data = NULL;
size_t buf_size;

buf_ret = flb_pack_json(ctx->chunk_buffer, flb_sds_len(ctx->chunk_buffer),
&buf_data, &buf_size, &root_type, &consumed);
if (buf_ret == 0) {
process_watched_event(ctx, buf_data, buf_size);
}

if (buf_data) {
flb_free(buf_data);
}
}
}
else {
flb_plg_warn(ins, "events watch failure, http_status=%d payload=%s",
@@ -938,6 +1042,12 @@
flb_upstream_conn_release(ctx->current_connection);
ctx->streaming_client = NULL;
ctx->current_connection = NULL;

/* Clear any buffered incomplete data when stream closes */
if (ctx->chunk_buffer) {
flb_sds_destroy(ctx->chunk_buffer);
ctx->chunk_buffer = NULL;
}
}

pthread_mutex_unlock(&ctx->lock);
3 changes: 3 additions & 0 deletions plugins/in_kubernetes_events/kubernetes_events.h
@@ -85,6 +85,9 @@ struct k8s_events {
struct flb_connection *current_connection;
struct flb_http_client *streaming_client;

/* Buffer for incomplete JSON data from chunked responses */
flb_sds_t chunk_buffer;

/* limit for event queries */
int limit_request;
/* last highest seen resource_version */
7 changes: 7 additions & 0 deletions plugins/in_kubernetes_events/kubernetes_events_conf.c
@@ -158,6 +158,9 @@ struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins)
pthread_mutexattr_init(&attr);
pthread_mutex_init(&ctx->lock, &attr);

/* Initialize buffer for incomplete chunk data */
ctx->chunk_buffer = NULL;

/* Load the config map */
ret = flb_input_config_map_set(ins, (void *) ctx);
if (ret == -1) {
@@ -289,6 +292,10 @@ void k8s_events_conf_destroy(struct k8s_events *ctx)
flb_ra_destroy(ctx->ra_resource_version);
}

if (ctx->chunk_buffer) {
flb_sds_destroy(ctx->chunk_buffer);
}

if(ctx->streaming_client) {
flb_http_client_destroy(ctx->streaming_client);
}
46 changes: 46 additions & 0 deletions tests/runtime/in_kubernetes_events.c
@@ -444,10 +444,56 @@ void flb_test_events_with_chunkedrecv()
test_ctx_destroy(ctx);
}

/* Test with smaller chunks - splits single event into 3 chunks */
void flb_test_events_with_3chunks()
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;
int trys;

int ret;
int num;
const char *filename = "eventlist_v1_with_lastTimestamp";
const char *stream_filename = "watch_v1_with_lastTimestamp";

clear_output_num();

/* Use 400 byte chunks to split 1176-byte JSON into 3 chunks */
struct test_k8s_server_ctx* k8s_server = initialize_mock_k8s_api(
filename, stream_filename, 400
);

cb_data.cb = cb_check_result_json;
cb_data.data = (void *)k8s_server;

ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}

ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

/* waiting to flush */
for (trys = 0; trys < 5 && get_output_num() <= 1; trys++) {
flb_time_msleep(1000);
}

num = get_output_num();
if (!TEST_CHECK(num >= 2)) {
TEST_MSG("2 output records are expected found %d", num);
}

mock_k8s_api_destroy(k8s_server);
test_ctx_destroy(ctx);
}

TEST_LIST = {
{"events_v1_with_lastTimestamp", flb_test_events_v1_with_lastTimestamp},
{"events_v1_with_creationTimestamp", flb_test_events_v1_with_creationTimestamp},
//{"events_v1_with_chunkedrecv", flb_test_events_with_chunkedrecv},
{"events_v1_with_3chunks", flb_test_events_with_3chunks},
{NULL, NULL}
};
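
The new case pins a single chunk size (400 bytes). A broader check — hypothetical, not part of this PR — would sweep every possible split point of the same payload and assert the record count never changes. A self-contained sketch with no Fluent Bit or test-framework dependencies; it counts delimited records rather than parsing JSON, and all names are illustrative:

#include <assert.h>
#include <stdio.h>
#include <string.h>

#define PAYLOAD "{\"type\":\"ADDED\",\"object\":{}}\n" \
                "{\"type\":\"MODIFIED\",\"object\":{}}\n"

/* count newline-terminated records, carrying partial data across chunks */
static int count_records(const char *chunks[], int n)
{
    char carry[256];
    size_t carry_len = 0;
    size_t len;
    int records = 0;
    int i;
    size_t j;

    for (i = 0; i < n; i++) {
        len = strlen(chunks[i]);
        for (j = 0; j < len; j++) {
            if (chunks[i][j] == '\n') {
                if (carry_len > 0) {
                    records++;          /* a complete record was assembled */
                }
                carry_len = 0;
            }
            else if (carry_len < sizeof(carry)) {
                carry[carry_len++] = chunks[i][j];
            }
        }
    }
    return records;
}

int main(void)
{
    char left[sizeof(PAYLOAD)];
    const char *parts[2];
    size_t split;

    /* split the payload at every offset; the result must not depend on it */
    for (split = 1; split < strlen(PAYLOAD); split++) {
        memcpy(left, PAYLOAD, split);
        left[split] = '\0';
        parts[0] = left;
        parts[1] = PAYLOAD + split;
        assert(count_records(parts, 2) == 2);
    }
    printf("all %zu split points produce 2 records\n", strlen(PAYLOAD) - 1);
    return 0;
}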