148 changes: 135 additions & 13 deletions plugins/in_kubernetes_events/kubernetes_events.c
@@ -746,32 +746,119 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
     size_t token_size = 0;
     char *token_start = 0;
     char *token_end = NULL;
+    char *search_start;
+    size_t remaining;
+    flb_sds_t working_buffer = NULL;
+
+    /*
+     * Prepend any buffered incomplete data from previous chunks.
+     * HTTP chunked encoding can split JSON objects across chunk boundaries,
+     * so we need to buffer incomplete data until we find a complete JSON line.
+     */
+    if (ctx->chunk_buffer != NULL) {
+        size_t buffer_len = flb_sds_len(ctx->chunk_buffer);
+        flb_plg_debug(ctx->ins, "prepending %zu bytes from chunk_buffer to %zu new bytes",
+                      buffer_len, c->resp.payload_size);
+        working_buffer = flb_sds_cat(ctx->chunk_buffer, c->resp.payload, c->resp.payload_size);
+        if (!working_buffer) {
+            flb_plg_error(ctx->ins, "failed to concatenate chunk buffer");
+            flb_sds_destroy(ctx->chunk_buffer);
+            ctx->chunk_buffer = NULL;
+            return -1;
+        }
+        /*
+         * flb_sds_cat modifies and returns the first argument, so working_buffer
+         * IS ctx->chunk_buffer (reallocated). Clear our reference to it.
+         */
+        ctx->chunk_buffer = NULL;
+        token_start = working_buffer;
+    }
+    else {
+        flb_plg_debug(ctx->ins, "processing %zu bytes from new chunk", c->resp.payload_size);
+        token_start = c->resp.payload;
+    }
 
-    token_start = c->resp.payload;
-    token_end = strpbrk(token_start, JSON_ARRAY_DELIM);
-    while ( token_end != NULL && ret == 0 ) {
+    search_start = token_start;
+    token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
+
+    while (token_end != NULL && ret == 0) {
         token_size = token_end - token_start;
+
+        /* Skip empty lines */
+        if (token_size == 0) {
+            token_start = token_end + 1;
+            search_start = token_start;
+            token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
+            continue;
+        }
+
         ret = flb_pack_json(token_start, token_size, &buf_data, &buf_size, &root_type, &consumed);
-        if (ret == -1) {
-            flb_plg_debug(ctx->ins, "could not process payload, incomplete or bad formed JSON: %s",
-                          c->resp.payload);
+        if (ret == 0) {
+            /* Successfully parsed JSON */
+            flb_plg_debug(ctx->ins, "successfully parsed JSON event (%zu bytes)", token_size);
+            ret = process_watched_event(ctx, buf_data, buf_size);
+            flb_free(buf_data);
+            buf_data = NULL;
+
+            token_start = token_end + 1;
+            search_start = token_start;
+            token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
         }
         else {
-            *bytes_consumed += token_size + 1;
-            ret = process_watched_event(ctx, buf_data, buf_size);
+            /* JSON parse failed - this line is incomplete, don't advance */
+            flb_plg_debug(ctx->ins, "JSON parse failed for %zu bytes at offset %ld - will buffer",
+                          token_size, token_start - (working_buffer ? working_buffer : c->resp.payload));
+            break;
         }
+    }
 
-        flb_free(buf_data);
-        buf_data = NULL;
-        token_start = token_end+1;
-        token_end = strpbrk(token_start, JSON_ARRAY_DELIM);
-    }
+    /*
+     * Calculate remaining unparsed data.
+     * If we broke out of the loop due to a parse failure or no newline found,
+     * buffer the remaining data for the next chunk.
+     */
+    if (working_buffer) {
+        remaining = flb_sds_len(working_buffer) - (token_start - working_buffer);
+    }
+    else {
+        remaining = c->resp.payload_size - (token_start - c->resp.payload);
+    }
+
+    if (remaining > 0) {
+        /* We have unparsed data - buffer it for the next chunk */
+        flb_plg_debug(ctx->ins, "buffering %zu bytes of incomplete JSON data for next chunk", remaining);
+        ctx->chunk_buffer = flb_sds_create_len(token_start, remaining);
+        if (!ctx->chunk_buffer) {
+            flb_plg_error(ctx->ins, "failed to create chunk buffer");
+            if (working_buffer) {
+                flb_sds_destroy(working_buffer);
+            }
+            if (buf_data) {
+                flb_free(buf_data);
+            }
+            return -1;
+        }
+    }
+    else {
+        flb_plg_debug(ctx->ins, "all data processed, no buffering needed");
+    }
 
+    /*
+     * At this point we've either parsed all complete lines and/or buffered
+     * any remaining tail into ctx->chunk_buffer, so we no longer need any
+     * bytes from this HTTP payload. Tell the HTTP client that the whole
+     * payload has been consumed to avoid duplicates.
+     */
+    *bytes_consumed = c->resp.payload_size;
+
+    if (working_buffer) {
+        flb_sds_destroy(working_buffer);
+    }
+
+    if (buf_data) {
+        flb_free(buf_data);
+    }
+
     return ret;
 }
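For intuition, the rewritten loop above is the classic carry-buffer pattern for newline-delimited streams: prepend the leftover tail to the incoming chunk, emit every complete line, and carry the unterminated tail forward. Below is a minimal standalone sketch of that pattern in plain C, using malloc/free instead of flb_sds_t; feed_chunk, on_line, and print_line are illustrative names rather than Fluent Bit APIs, and allocation failures are not handled:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static char  *carry = NULL;   /* tail with no newline yet */
static size_t carry_len = 0;

static void feed_chunk(const char *data, size_t len,
                       void (*on_line)(const char *, size_t))
{
    size_t total = carry_len + len;
    size_t start = 0;
    size_t i;
    char *buf;

    /* prepend the carried tail to the new chunk */
    buf = malloc(total);
    if (carry_len > 0) {
        memcpy(buf, carry, carry_len);
    }
    memcpy(buf + carry_len, data, len);
    free(carry);
    carry = NULL;
    carry_len = 0;

    for (i = 0; i < total; i++) {
        if (buf[i] == '\n') {
            if (i > start) {              /* skip empty lines */
                on_line(buf + start, i - start);
            }
            start = i + 1;
        }
    }

    /* anything after the last newline becomes the new carry */
    if (start < total) {
        carry_len = total - start;
        carry = malloc(carry_len);
        memcpy(carry, buf + start, carry_len);
    }
    free(buf);
}

static void print_line(const char *s, size_t n)
{
    printf("line: %.*s\n", (int) n, s);
}

int main(void)
{
    /* one JSON object split across two chunks, as chunked encoding may do */
    const char *c1 = "{\"type\":\"ADD";
    const char *c2 = "ED\"}\n";
    feed_chunk(c1, strlen(c1), print_line);
    feed_chunk(c2, strlen(c2), print_line); /* prints: line: {"type":"ADDED"} */
    return 0;
}

The plugin version differs mainly in bookkeeping: the tail persists in ctx->chunk_buffer between HTTP chunks, and *bytes_consumed reports the whole payload once the tail is buffered, so the HTTP client never replays those bytes.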

@@ -889,6 +976,13 @@ static int check_and_init_stream(struct k8s_events *ctx)
         flb_upstream_conn_release(ctx->current_connection);
         ctx->current_connection = NULL;
     }
+
+    /* Clear any buffered incomplete data on failure */
+    if (ctx->chunk_buffer) {
+        flb_sds_destroy(ctx->chunk_buffer);
+        ctx->chunk_buffer = NULL;
+    }
+
     return FLB_FALSE;
 }

@@ -927,6 +1021,28 @@ static int k8s_events_collect(struct flb_input_instance *ins,
     }
     else if (ret == FLB_HTTP_OK) {
         flb_plg_info(ins, "kubernetes stream closed by api server. Reconnect will happen on next interval.");
+
+        /*
+         * If there's buffered data when the stream closes, try to process it.
+         * This handles the case where the last chunk doesn't end with a newline.
+         */
+        if (ctx->chunk_buffer && flb_sds_len(ctx->chunk_buffer) > 0) {
+            int buf_ret;
+            int root_type;
+            size_t consumed = 0;
+            char *buf_data = NULL;
+            size_t buf_size;
+
+            buf_ret = flb_pack_json(ctx->chunk_buffer, flb_sds_len(ctx->chunk_buffer),
+                                    &buf_data, &buf_size, &root_type, &consumed);
+            if (buf_ret == 0) {
+                process_watched_event(ctx, buf_data, buf_size);
+            }
+
+            if (buf_data) {
+                flb_free(buf_data);
+            }
+        }
     }
     else {
         flb_plg_warn(ins, "events watch failure, http_status=%d payload=%s",
@@ -938,6 +1054,12 @@ static int k8s_events_collect(struct flb_input_instance *ins,
         flb_upstream_conn_release(ctx->current_connection);
         ctx->streaming_client = NULL;
         ctx->current_connection = NULL;
+
+        /* Clear any buffered incomplete data when stream closes */
+        if (ctx->chunk_buffer) {
+            flb_sds_destroy(ctx->chunk_buffer);
+            ctx->chunk_buffer = NULL;
+        }
     }
 
     pthread_mutex_unlock(&ctx->lock);
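The flush in the FLB_HTTP_OK branch above covers the one case the in-loop buffering cannot: a stream that ends without a trailing newline. In terms of the feed_chunk sketch shown after the first hunk, it is an explicit end-of-stream flush; the same caveats apply (illustrative name, a sketch rather than the plugin's actual API):

/* End-of-stream flush for the feed_chunk sketch: hand any carried tail
 * to the callback even though no '\n' ever arrived for it. */
static void feed_eof(void (*on_line)(const char *, size_t))
{
    if (carry_len > 0) {
        on_line(carry, carry_len);
        free(carry);
        carry = NULL;
        carry_len = 0;
    }
}

If that final parse still fails, the tail was genuinely truncated, and it is dropped along with the rest of the stream state when chunk_buffer is cleared.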
3 changes: 3 additions & 0 deletions plugins/in_kubernetes_events/kubernetes_events.h
@@ -85,6 +85,9 @@ struct k8s_events {
     struct flb_connection *current_connection;
     struct flb_http_client *streaming_client;
 
+    /* Buffer for incomplete JSON data from chunked responses */
+    flb_sds_t chunk_buffer;
+
     /* limit for event queries */
     int limit_request;
     /* last highest seen resource_version */
7 changes: 7 additions & 0 deletions plugins/in_kubernetes_events/kubernetes_events_conf.c
@@ -158,6 +158,9 @@ struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins)
     pthread_mutexattr_init(&attr);
     pthread_mutex_init(&ctx->lock, &attr);
 
+    /* Initialize buffer for incomplete chunk data */
+    ctx->chunk_buffer = NULL;
+
     /* Load the config map */
     ret = flb_input_config_map_set(ins, (void *) ctx);
     if (ret == -1) {
@@ -289,6 +292,10 @@ void k8s_events_conf_destroy(struct k8s_events *ctx)
         flb_ra_destroy(ctx->ra_resource_version);
     }
 
+    if (ctx->chunk_buffer) {
+        flb_sds_destroy(ctx->chunk_buffer);
+    }
+
     if(ctx->streaming_client) {
         flb_http_client_destroy(ctx->streaming_client);
     }
46 changes: 46 additions & 0 deletions tests/runtime/in_kubernetes_events.c
@@ -444,10 +444,56 @@ void flb_test_events_with_chunkedrecv()
     test_ctx_destroy(ctx);
 }
 
+/* Test with smaller chunks - splits a single event into 3 chunks */
+void flb_test_events_with_3chunks()
+{
+    struct flb_lib_out_cb cb_data;
+    struct test_ctx *ctx;
+    int trys;
+    int ret;
+    int num;
+    const char *filename = "eventlist_v1_with_lastTimestamp";
+    const char *stream_filename = "watch_v1_with_lastTimestamp";
+
+    clear_output_num();
+
+    /* Use 400 byte chunks to split the 1176-byte JSON into 3 chunks */
+    struct test_k8s_server_ctx *k8s_server = initialize_mock_k8s_api(
+        filename, stream_filename, 400
+    );
+
+    cb_data.cb = cb_check_result_json;
+    cb_data.data = (void *) k8s_server;
+
+    ctx = test_ctx_create(&cb_data);
+    if (!TEST_CHECK(ctx != NULL)) {
+        TEST_MSG("test_ctx_create failed");
+        exit(EXIT_FAILURE);
+    }
+
+    ret = flb_start(ctx->flb);
+    TEST_CHECK(ret == 0);
+
+    /* wait for the records to be flushed */
+    for (trys = 0; trys < 5 && get_output_num() <= 1; trys++) {
+        flb_time_msleep(1000);
+    }
+
+    num = get_output_num();
+    if (!TEST_CHECK(num >= 2)) {
+        TEST_MSG("2 output records expected, found %d", num);
+    }
+
+    mock_k8s_api_destroy(k8s_server);
+    test_ctx_destroy(ctx);
+}
+
 TEST_LIST = {
     {"events_v1_with_lastTimestamp", flb_test_events_v1_with_lastTimestamp},
     {"events_v1_with_creationTimestamp", flb_test_events_v1_with_creationTimestamp},
     //{"events_v1_with_chunkedrecv", flb_test_events_with_chunkedrecv},
+    {"events_v1_with_3chunks", flb_test_events_with_3chunks},
     {NULL, NULL}
 };
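The new test asks the mock API server for 400-byte chunks so that the ~1176-byte event JSON arrives split in three. For reference, splitting a payload into fixed-size HTTP/1.1 chunks amounts to the framing sketched below: each chunk is a hex size line, the data, and a CRLF, with a zero-length chunk terminating the stream. This assumes a connected socket descriptor, and send_chunked is an illustrative name, not the actual test helper:

#include <stdio.h>
#include <unistd.h>

/* Frame `payload` as HTTP/1.1 chunked transfer encoding, at most
 * `chunk_size` payload bytes per chunk. Sketch only: short writes and
 * write() errors are not handled. */
static void send_chunked(int fd, const char *payload, size_t len,
                         size_t chunk_size)
{
    char head[32];
    size_t off = 0;
    size_t n;
    int hlen;

    while (off < len) {
        n = (len - off < chunk_size) ? len - off : chunk_size;
        hlen = snprintf(head, sizeof(head), "%zx\r\n", n);
        write(fd, head, hlen);         /* chunk size in hex, then CRLF */
        write(fd, payload + off, n);   /* chunk data */
        write(fd, "\r\n", 2);          /* chunk trailer CRLF */
        off += n;
    }
    write(fd, "0\r\n\r\n", 5);         /* final zero-length chunk */
}

With len = 1176 and chunk_size = 400 this produces 400-, 400-, and 376-byte chunks, cutting the JSON object mid-line twice, which is exactly the boundary condition process_http_chunk has to stitch back together.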
