Skip to content

Commit 7e561b6

Browse files
committed
refactor: add 3 chunks test, modify buffer logic
Signed-off-by: Jesse Awan <[email protected]>
1 parent e523339 commit 7e561b6

File tree

2 files changed

+85
-5
lines changed

2 files changed

+85
-5
lines changed

plugins/in_kubernetes_events/kubernetes_events.c

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -759,9 +759,14 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
759759
working_buffer = flb_sds_cat(ctx->chunk_buffer, c->resp.payload, c->resp.payload_size);
760760
if (!working_buffer) {
761761
flb_plg_error(ctx->ins, "failed to concatenate chunk buffer");
762+
flb_sds_destroy(ctx->chunk_buffer);
763+
ctx->chunk_buffer = NULL;
762764
return -1;
763765
}
764-
/* Clear the old buffer since we've concatenated it */
766+
/*
767+
* flb_sds_cat modifies and returns the first argument, so working_buffer
768+
* IS ctx->chunk_buffer (reallocated). Clear our reference to it.
769+
*/
765770
ctx->chunk_buffer = NULL;
766771
token_start = working_buffer;
767772
}
@@ -788,10 +793,11 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
788793
flb_plg_debug(ctx->ins, "could not process payload, incomplete or bad formed JSON");
789794
}
790795
else {
791-
if (working_buffer) {
792-
*bytes_consumed += token_size + 1 - flb_sds_len(ctx->chunk_buffer ? ctx->chunk_buffer : "");
793-
}
794-
else {
796+
/*
797+
* For non-buffered data, track consumed bytes.
798+
* For buffered data, we'll mark everything consumed after the loop.
799+
*/
800+
if (!working_buffer) {
795801
*bytes_consumed += token_size + 1;
796802
}
797803
ret = process_watched_event(ctx, buf_data, buf_size);
@@ -804,6 +810,12 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
804810
search_start = token_start;
805811
token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
806812
}
813+
814+
/*
815+
* Always consume all bytes from the current chunk since we've examined them all.
816+
* Even if we buffer the data, we've still "consumed" it from the HTTP payload.
817+
*/
818+
*bytes_consumed = c->resp.payload_size;
807819

808820
/*
809821
* If there's remaining data without a newline delimiter, it means the JSON
@@ -997,6 +1009,28 @@ static int k8s_events_collect(struct flb_input_instance *ins,
9971009
}
9981010
else if (ret == FLB_HTTP_OK) {
9991011
flb_plg_info(ins, "kubernetes stream closed by api server. Reconnect will happen on next interval.");
1012+
1013+
/*
1014+
* If there's buffered data when stream closes, try to process it.
1015+
* This handles the case where the last chunk doesn't end with a newline.
1016+
*/
1017+
if (ctx->chunk_buffer && flb_sds_len(ctx->chunk_buffer) > 0) {
1018+
int buf_ret;
1019+
int root_type;
1020+
size_t consumed = 0;
1021+
char *buf_data = NULL;
1022+
size_t buf_size;
1023+
1024+
buf_ret = flb_pack_json(ctx->chunk_buffer, flb_sds_len(ctx->chunk_buffer),
1025+
&buf_data, &buf_size, &root_type, &consumed);
1026+
if (buf_ret == 0) {
1027+
process_watched_event(ctx, buf_data, buf_size);
1028+
}
1029+
1030+
if (buf_data) {
1031+
flb_free(buf_data);
1032+
}
1033+
}
10001034
}
10011035
else {
10021036
flb_plg_warn(ins, "events watch failure, http_status=%d payload=%s",

tests/runtime/in_kubernetes_events.c

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,10 +444,56 @@ void flb_test_events_with_chunkedrecv()
444444
test_ctx_destroy(ctx);
445445
}
446446

447+
/* Test with smaller chunks - splits single event into 3 chunks */
448+
void flb_test_events_with_3chunks()
449+
{
450+
struct flb_lib_out_cb cb_data;
451+
struct test_ctx *ctx;
452+
int trys;
453+
454+
int ret;
455+
int num;
456+
const char *filename = "eventlist_v1_with_lastTimestamp";
457+
const char *stream_filename = "watch_v1_with_lastTimestamp";
458+
459+
clear_output_num();
460+
461+
/* Use 400 byte chunks to split 1176-byte JSON into 3 chunks */
462+
struct test_k8s_server_ctx* k8s_server = initialize_mock_k8s_api(
463+
filename, stream_filename, 400
464+
);
465+
466+
cb_data.cb = cb_check_result_json;
467+
cb_data.data = (void *)k8s_server;
468+
469+
ctx = test_ctx_create(&cb_data);
470+
if (!TEST_CHECK(ctx != NULL)) {
471+
TEST_MSG("test_ctx_create failed");
472+
exit(EXIT_FAILURE);
473+
}
474+
475+
ret = flb_start(ctx->flb);
476+
TEST_CHECK(ret == 0);
477+
478+
/* waiting to flush */
479+
for (trys = 0; trys < 5 && get_output_num() <= 1; trys++) {
480+
flb_time_msleep(1000);
481+
}
482+
483+
num = get_output_num();
484+
if (!TEST_CHECK(num >= 2)) {
485+
TEST_MSG("2 output records are expected found %d", num);
486+
}
487+
488+
mock_k8s_api_destroy(k8s_server);
489+
test_ctx_destroy(ctx);
490+
}
491+
447492
TEST_LIST = {
448493
{"events_v1_with_lastTimestamp", flb_test_events_v1_with_lastTimestamp},
449494
{"events_v1_with_creationTimestamp", flb_test_events_v1_with_creationTimestamp},
450495
{"events_v1_with_chunkedrecv", flb_test_events_with_chunkedrecv},
496+
{"events_v1_with_3chunks", flb_test_events_with_3chunks},
451497
{NULL, NULL}
452498
};
453499

0 commit comments

Comments
 (0)