Skip to content

Commit c30cfab

Browse files
committed
in_kubernetes_events: buffer incomplete JSON across HTTP chunks
Fixes #11252 Problem: The kubernetes_events plugin was failing with 'bad formed JSON' errors when HTTP chunked transfer encoding split JSON event objects across chunk boundaries. Root Cause Analysis: - Kubernetes watch API sends newline-delimited JSON over HTTP/1.1 chunked transfer encoding - HTTP chunks are arbitrarily sized (commonly 1000-4000 bytes) - A single JSON object can be split mid-object across chunks: Chunk 1 (1000 bytes): {"type":"ADDED","object":{"name":"po Chunk 2 (176 bytes): d-123","spec":{...}}} - The HTTP client correctly decodes chunks and returns FLB_HTTP_CHUNK_AVAILABLE after each chunk - However, the plugin was trying to parse incomplete JSON from each chunk, causing parse errors Solution: Implement application-layer buffering in the kubernetes_events plugin: 1. Added chunk_buffer field to k8s_events context to buffer incomplete data across HTTP chunks 2. Modified process_http_chunk() to: - Prepend any buffered data from previous chunks - Parse only complete JSON objects (delimited by newlines) - Buffer remaining incomplete data for the next chunk - This follows standard network programming practice: the application layer handles message boundaries 3. Clear buffer when stream closes or on connection errors Why not fix in HTTP client: - HTTP client returns FLB_HTTP_CHUNK_AVAILABLE by design after each chunk is decoded - HTTP layer shouldn't know about JSON or application protocols - Similar to how TCP delivers arbitrary segments while the application layer reassembles them into complete messages - This maintains separation of concerns and doesn't break other plugins Testing: - Re-enabled events_v1_with_chunkedrecv test which simulates exactly this scenario (1176-byte JSON split into 1000+176 byte chunks) Signed-off-by: Jesse Awan <[email protected]>
1 parent c88c545 commit c30cfab

File tree

4 files changed

+97
-11
lines changed

4 files changed

+97
-11
lines changed

plugins/in_kubernetes_events/kubernetes_events.c

Lines changed: 86 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -746,32 +746,95 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
746746
size_t token_size = 0;
747747
char *token_start = 0;
748748
char *token_end = NULL;
749+
char *search_start;
750+
size_t remaining;
751+
flb_sds_t working_buffer = NULL;
752+
753+
/*
754+
* Prepend any buffered incomplete data from previous chunks.
755+
* HTTP chunked encoding can split JSON objects across chunk boundaries,
756+
* so we need to buffer incomplete data until we find a complete JSON line.
757+
*/
758+
if (ctx->chunk_buffer != NULL) {
759+
working_buffer = flb_sds_cat(ctx->chunk_buffer, c->resp.payload, c->resp.payload_size);
760+
if (!working_buffer) {
761+
flb_plg_error(ctx->ins, "failed to concatenate chunk buffer");
762+
return -1;
763+
}
764+
/* Clear the old buffer since we've concatenated it */
765+
ctx->chunk_buffer = NULL;
766+
token_start = working_buffer;
767+
}
768+
else {
769+
token_start = c->resp.payload;
770+
}
749771

750-
token_start = c->resp.payload;
751-
token_end = strpbrk(token_start, JSON_ARRAY_DELIM);
752-
while ( token_end != NULL && ret == 0 ) {
772+
search_start = token_start;
773+
token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
774+
775+
while (token_end != NULL && ret == 0) {
753776
token_size = token_end - token_start;
777+
778+
/* Skip empty lines */
779+
if (token_size == 0) {
780+
token_start = token_end + 1;
781+
search_start = token_start;
782+
token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
783+
continue;
784+
}
785+
754786
ret = flb_pack_json(token_start, token_size, &buf_data, &buf_size, &root_type, &consumed);
755787
if (ret == -1) {
756-
flb_plg_debug(ctx->ins, "could not process payload, incomplete or bad formed JSON: %s",
757-
c->resp.payload);
788+
flb_plg_debug(ctx->ins, "could not process payload, incomplete or bad formed JSON");
758789
}
759790
else {
760-
*bytes_consumed += token_size + 1;
791+
if (working_buffer) {
792+
*bytes_consumed += token_size + 1 - flb_sds_len(ctx->chunk_buffer ? ctx->chunk_buffer : "");
793+
}
794+
else {
795+
*bytes_consumed += token_size + 1;
796+
}
761797
ret = process_watched_event(ctx, buf_data, buf_size);
762798
}
763799

764800
flb_free(buf_data);
765-
if (buf_data) {
766-
buf_data = NULL;
801+
buf_data = NULL;
802+
803+
token_start = token_end + 1;
804+
search_start = token_start;
805+
token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
806+
}
807+
808+
/*
809+
* If there's remaining data without a newline delimiter, it means the JSON
810+
* object is incomplete (split across chunk boundaries). Buffer it for next chunk.
811+
*/
812+
if (working_buffer) {
813+
remaining = flb_sds_len(working_buffer) - (token_start - working_buffer);
814+
}
815+
else {
816+
remaining = c->resp.payload_size - (token_start - c->resp.payload);
817+
}
818+
819+
if (remaining > 0 && ret == 0) {
820+
ctx->chunk_buffer = flb_sds_create_len(token_start, remaining);
821+
if (!ctx->chunk_buffer) {
822+
flb_plg_error(ctx->ins, "failed to create chunk buffer");
823+
ret = -1;
824+
}
825+
else {
826+
flb_plg_trace(ctx->ins, "buffering %zu bytes of incomplete JSON data", remaining);
767827
}
768-
token_start = token_end+1;
769-
token_end = strpbrk(token_start, JSON_ARRAY_DELIM);
770828
}
771829

830+
if (working_buffer) {
831+
flb_sds_destroy(working_buffer);
832+
}
833+
772834
if (buf_data) {
773835
flb_free(buf_data);
774836
}
837+
775838
return ret;
776839
}
777840

@@ -889,6 +952,13 @@ static int check_and_init_stream(struct k8s_events *ctx)
889952
flb_upstream_conn_release(ctx->current_connection);
890953
ctx->current_connection = NULL;
891954
}
955+
956+
/* Clear any buffered incomplete data on failure */
957+
if (ctx->chunk_buffer) {
958+
flb_sds_destroy(ctx->chunk_buffer);
959+
ctx->chunk_buffer = NULL;
960+
}
961+
892962
return FLB_FALSE;
893963
}
894964

@@ -938,6 +1008,12 @@ static int k8s_events_collect(struct flb_input_instance *ins,
9381008
flb_upstream_conn_release(ctx->current_connection);
9391009
ctx->streaming_client = NULL;
9401010
ctx->current_connection = NULL;
1011+
1012+
/* Clear any buffered incomplete data when stream closes */
1013+
if (ctx->chunk_buffer) {
1014+
flb_sds_destroy(ctx->chunk_buffer);
1015+
ctx->chunk_buffer = NULL;
1016+
}
9411017
}
9421018

9431019
pthread_mutex_unlock(&ctx->lock);

plugins/in_kubernetes_events/kubernetes_events.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ struct k8s_events {
8585
struct flb_connection *current_connection;
8686
struct flb_http_client *streaming_client;
8787

88+
/* Buffer for incomplete JSON data from chunked responses */
89+
flb_sds_t chunk_buffer;
90+
8891
/* limit for event queries */
8992
int limit_request;
9093
/* last highest seen resource_version */

plugins/in_kubernetes_events/kubernetes_events_conf.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,9 @@ struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins)
158158
pthread_mutexattr_init(&attr);
159159
pthread_mutex_init(&ctx->lock, &attr);
160160

161+
/* Initialize buffer for incomplete chunk data */
162+
ctx->chunk_buffer = NULL;
163+
161164
/* Load the config map */
162165
ret = flb_input_config_map_set(ins, (void *) ctx);
163166
if (ret == -1) {
@@ -289,6 +292,10 @@ void k8s_events_conf_destroy(struct k8s_events *ctx)
289292
flb_ra_destroy(ctx->ra_resource_version);
290293
}
291294

295+
if (ctx->chunk_buffer) {
296+
flb_sds_destroy(ctx->chunk_buffer);
297+
}
298+
292299
if(ctx->streaming_client) {
293300
flb_http_client_destroy(ctx->streaming_client);
294301
}

tests/runtime/in_kubernetes_events.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ void flb_test_events_with_chunkedrecv()
447447
TEST_LIST = {
448448
{"events_v1_with_lastTimestamp", flb_test_events_v1_with_lastTimestamp},
449449
{"events_v1_with_creationTimestamp", flb_test_events_v1_with_creationTimestamp},
450-
//{"events_v1_with_chunkedrecv", flb_test_events_with_chunkedrecv},
450+
{"events_v1_with_chunkedrecv", flb_test_events_with_chunkedrecv},
451451
{NULL, NULL}
452452
};
453453

0 commit comments

Comments
 (0)