diff --git a/plugins/in_kubernetes_events/kubernetes_events.c b/plugins/in_kubernetes_events/kubernetes_events.c
index 79f3b16e676..c09981239d3 100644
--- a/plugins/in_kubernetes_events/kubernetes_events.c
+++ b/plugins/in_kubernetes_events/kubernetes_events.c
@@ -746,32 +746,119 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
     size_t token_size = 0;
     char *token_start = 0;
     char *token_end = NULL;
+    char *search_start;
+    size_t remaining;
+    flb_sds_t working_buffer = NULL;
+
+    /*
+     * Prepend any buffered incomplete data from previous chunks.
+     * HTTP chunked encoding can split JSON objects across chunk boundaries,
+     * so we need to buffer incomplete data until we find a complete JSON line.
+     */
+    if (ctx->chunk_buffer != NULL) {
+        size_t buffer_len = flb_sds_len(ctx->chunk_buffer);
+        flb_plg_debug(ctx->ins, "prepending %zu bytes from chunk_buffer to %zu new bytes",
+                      buffer_len, c->resp.payload_size);
+        working_buffer = flb_sds_cat(ctx->chunk_buffer, c->resp.payload, c->resp.payload_size);
+        if (!working_buffer) {
+            flb_plg_error(ctx->ins, "failed to concatenate chunk buffer");
+            flb_sds_destroy(ctx->chunk_buffer);
+            ctx->chunk_buffer = NULL;
+            return -1;
+        }
+        /*
+         * flb_sds_cat modifies and returns its first argument, so working_buffer
+         * IS ctx->chunk_buffer (possibly reallocated). Clear our reference to it.
+         */
+        ctx->chunk_buffer = NULL;
+        token_start = working_buffer;
+    }
+    else {
+        flb_plg_debug(ctx->ins, "processing %zu bytes from new chunk", c->resp.payload_size);
+        token_start = c->resp.payload;
+    }
 
-    token_start = c->resp.payload;
-    token_end = strpbrk(token_start, JSON_ARRAY_DELIM);
-    while ( token_end != NULL && ret == 0 ) {
+    search_start = token_start;
+    token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
+
+    while (token_end != NULL && ret == 0) {
         token_size = token_end - token_start;
+
+        /* Skip empty lines */
+        if (token_size == 0) {
+            token_start = token_end + 1;
+            search_start = token_start;
+            token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
+            continue;
+        }
+
         ret = flb_pack_json(token_start, token_size, &buf_data, &buf_size, &root_type, &consumed);
-        if (ret == -1) {
-            flb_plg_debug(ctx->ins, "could not process payload, incomplete or bad formed JSON: %s",
-                          c->resp.payload);
+        if (ret == 0) {
+            /* Successfully parsed JSON */
+            flb_plg_debug(ctx->ins, "successfully parsed JSON event (%zu bytes)", token_size);
+            ret = process_watched_event(ctx, buf_data, buf_size);
+            flb_free(buf_data);
+            buf_data = NULL;
+
+            token_start = token_end + 1;
+            search_start = token_start;
+            token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
         }
         else {
-            *bytes_consumed += token_size + 1;
-            ret = process_watched_event(ctx, buf_data, buf_size);
+            /* JSON parse failed - the line is incomplete, so do not advance */
+            flb_plg_debug(ctx->ins, "JSON parse failed for %zu bytes at offset %ld - will buffer",
+                          token_size,
+                          (long) (token_start - (working_buffer ? working_buffer : c->resp.payload)));
+            break;
         }
+    }
+
+    /*
+     * Calculate the remaining unparsed data. If we broke out of the loop
+     * due to a parse failure, or because no newline was found, buffer the
+     * remaining data for the next chunk.
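+     * In both cases token_start still points at the first byte of the
+     * incomplete line, so the tail length is the total data length minus
+     * the offset of token_start.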
+     */
+    if (working_buffer) {
+        remaining = flb_sds_len(working_buffer) - (token_start - working_buffer);
+    }
+    else {
+        remaining = c->resp.payload_size - (token_start - c->resp.payload);
+    }
 
-        flb_free(buf_data);
-        if (buf_data) {
-            buf_data = NULL;
+    if (remaining > 0) {
+        /* We have unparsed data - buffer it for the next chunk */
+        flb_plg_debug(ctx->ins, "buffering %zu bytes of incomplete JSON data for next chunk", remaining);
+        ctx->chunk_buffer = flb_sds_create_len(token_start, remaining);
+        if (!ctx->chunk_buffer) {
+            flb_plg_error(ctx->ins, "failed to create chunk buffer");
+            if (working_buffer) {
+                flb_sds_destroy(working_buffer);
+            }
+            if (buf_data) {
+                flb_free(buf_data);
+            }
+            return -1;
         }
-        token_start = token_end+1;
-        token_end = strpbrk(token_start, JSON_ARRAY_DELIM);
     }
+    else {
+        flb_plg_debug(ctx->ins, "all data processed, no buffering needed");
+    }
+
+    /*
+     * At this point we have parsed every complete line and buffered any
+     * remaining tail into ctx->chunk_buffer, so we no longer need any
+     * bytes from this HTTP payload. Tell the HTTP client that the whole
+     * payload has been consumed to avoid duplicates.
+     */
+    *bytes_consumed = c->resp.payload_size;
 
+    if (working_buffer) {
+        flb_sds_destroy(working_buffer);
+    }
+
     if (buf_data) {
         flb_free(buf_data);
     }
+
     return ret;
 }
 
@@ -889,6 +976,13 @@ static int check_and_init_stream(struct k8s_events *ctx)
         flb_upstream_conn_release(ctx->current_connection);
         ctx->current_connection = NULL;
     }
+
+    /* Clear any buffered incomplete data on failure */
+    if (ctx->chunk_buffer) {
+        flb_sds_destroy(ctx->chunk_buffer);
+        ctx->chunk_buffer = NULL;
+    }
+
     return FLB_FALSE;
 }
 
@@ -927,6 +1021,28 @@ static int k8s_events_collect(struct flb_input_instance *ins,
     }
     else if (ret == FLB_HTTP_OK) {
         flb_plg_info(ins, "kubernetes stream closed by api server. Reconnect will happen on next interval.");
+
+        /*
+         * If there is buffered data when the stream closes, try to process it.
+         * This handles the case where the last chunk doesn't end with a newline.
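+         * If the tail still fails to parse as complete JSON it is dropped
+         * along with the buffer below, since no more data will arrive on
+         * this connection.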
+         */
+        if (ctx->chunk_buffer && flb_sds_len(ctx->chunk_buffer) > 0) {
+            int buf_ret;
+            int root_type;
+            size_t consumed = 0;
+            char *buf_data = NULL;
+            size_t buf_size;
+
+            buf_ret = flb_pack_json(ctx->chunk_buffer, flb_sds_len(ctx->chunk_buffer),
+                                    &buf_data, &buf_size, &root_type, &consumed);
+            if (buf_ret == 0) {
+                process_watched_event(ctx, buf_data, buf_size);
+            }
+
+            if (buf_data) {
+                flb_free(buf_data);
+            }
+        }
     }
     else {
         flb_plg_warn(ins, "events watch failure, http_status=%d payload=%s",
@@ -938,6 +1054,12 @@ static int k8s_events_collect(struct flb_input_instance *ins,
         flb_upstream_conn_release(ctx->current_connection);
         ctx->streaming_client = NULL;
         ctx->current_connection = NULL;
+
+        /* Clear any buffered incomplete data when the stream closes */
+        if (ctx->chunk_buffer) {
+            flb_sds_destroy(ctx->chunk_buffer);
+            ctx->chunk_buffer = NULL;
+        }
     }
 
     pthread_mutex_unlock(&ctx->lock);
diff --git a/plugins/in_kubernetes_events/kubernetes_events.h b/plugins/in_kubernetes_events/kubernetes_events.h
index 1309406349f..43b136e0807 100644
--- a/plugins/in_kubernetes_events/kubernetes_events.h
+++ b/plugins/in_kubernetes_events/kubernetes_events.h
@@ -85,6 +85,9 @@ struct k8s_events {
     struct flb_connection *current_connection;
     struct flb_http_client *streaming_client;
 
+    /* Buffer for incomplete JSON data from chunked responses */
+    flb_sds_t chunk_buffer;
+
     /* limit for event queries */
     int limit_request;
     /* last highest seen resource_version */
diff --git a/plugins/in_kubernetes_events/kubernetes_events_conf.c b/plugins/in_kubernetes_events/kubernetes_events_conf.c
index e84872f70e7..067d84a265e 100644
--- a/plugins/in_kubernetes_events/kubernetes_events_conf.c
+++ b/plugins/in_kubernetes_events/kubernetes_events_conf.c
@@ -158,6 +158,9 @@ struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins)
     pthread_mutexattr_init(&attr);
     pthread_mutex_init(&ctx->lock, &attr);
 
+    /* Initialize the buffer for incomplete chunk data */
+    ctx->chunk_buffer = NULL;
+
     /* Load the config map */
     ret = flb_input_config_map_set(ins, (void *) ctx);
     if (ret == -1) {
@@ -289,6 +292,10 @@ void k8s_events_conf_destroy(struct k8s_events *ctx)
         flb_ra_destroy(ctx->ra_resource_version);
     }
 
+    if (ctx->chunk_buffer) {
+        flb_sds_destroy(ctx->chunk_buffer);
+    }
+
     if(ctx->streaming_client) {
         flb_http_client_destroy(ctx->streaming_client);
     }
diff --git a/tests/runtime/in_kubernetes_events.c b/tests/runtime/in_kubernetes_events.c
index d17707057f2..784bf2b140b 100644
--- a/tests/runtime/in_kubernetes_events.c
+++ b/tests/runtime/in_kubernetes_events.c
@@ -444,10 +444,56 @@ void flb_test_events_with_chunkedrecv()
     test_ctx_destroy(ctx);
 }
 
+/* Test with smaller chunks - splits a single event across 3 chunks */
+void flb_test_events_with_3chunks()
+{
+    struct flb_lib_out_cb cb_data;
+    struct test_ctx *ctx;
+    int trys;
+
+    int ret;
+    int num;
+    const char *filename = "eventlist_v1_with_lastTimestamp";
+    const char *stream_filename = "watch_v1_with_lastTimestamp";
+
+    clear_output_num();
+
+    /* Use 400-byte chunks to split the 1176-byte JSON into 3 chunks */
+    struct test_k8s_server_ctx* k8s_server = initialize_mock_k8s_api(
+        filename, stream_filename, 400
+    );
+
+    cb_data.cb = cb_check_result_json;
+    cb_data.data = (void *)k8s_server;
+
+    ctx = test_ctx_create(&cb_data);
+    if (!TEST_CHECK(ctx != NULL)) {
+        TEST_MSG("test_ctx_create failed");
+        exit(EXIT_FAILURE);
+    }
+
+    ret = flb_start(ctx->flb);
+    TEST_CHECK(ret == 0);
+
+    /* waiting to flush */
+    for (trys = 0; trys < 5 && get_output_num() <= 1; trys++) {
+        flb_time_msleep(1000);
+    }
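+
+    /*
+     * Each 400-byte chunk carries only a fragment of the event JSON, so
+     * receiving the expected records confirms that fragments split across
+     * chunk boundaries were reassembled via ctx->chunk_buffer.
+     */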
+    num = get_output_num();
+    if (!TEST_CHECK(num >= 2)) {
+        TEST_MSG("at least 2 output records are expected, found %d", num);
+    }
+
+    mock_k8s_api_destroy(k8s_server);
+    test_ctx_destroy(ctx);
+}
+
 TEST_LIST = {
     {"events_v1_with_lastTimestamp", flb_test_events_v1_with_lastTimestamp},
     {"events_v1_with_creationTimestamp", flb_test_events_v1_with_creationTimestamp},
     //{"events_v1_with_chunkedrecv", flb_test_events_with_chunkedrecv},
+    {"events_v1_with_3chunks", flb_test_events_with_3chunks},
     {NULL, NULL}
 };
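
Note: the following stand-alone sketch models the tail-buffering technique this patch applies, using only the C standard library rather than the Fluent Bit APIs. The feed() helper, the pending pointer, and the sample payloads are invented for illustration; in the plugin the same roles are played by process_http_chunk(), ctx->chunk_buffer, and flb_pack_json(). Each call keeps the unterminated tail of its payload and prepends it to the next call's data before scanning for newline-delimited JSON.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static char *pending = NULL;                 /* plays the role of ctx->chunk_buffer */

static void feed(const char *payload)
{
    size_t plen = strlen(payload);
    size_t blen = pending ? strlen(pending) : 0;
    char *work = malloc(blen + plen + 1);    /* buffered tail + new payload */
    char *line;
    char *nl;

    if (!work) {
        return;
    }
    memcpy(work, pending ? pending : "", blen);
    memcpy(work + blen, payload, plen + 1);
    free(pending);
    pending = NULL;

    line = work;
    while ((nl = strchr(line, '\n')) != NULL) {  /* a complete line is available */
        *nl = '\0';
        if (*line != '\0') {
            printf("complete event: %s\n", line); /* stands in for parse + process */
        }
        line = nl + 1;
    }
    if (*line != '\0') {
        pending = strdup(line);              /* buffer the incomplete tail */
    }
    free(work);
}

int main(void)
{
    feed("{\"type\":\"ADDED\",\"object\"");  /* chunk 1: JSON split mid-object */
    feed(":{\"kind\":\"Event\"}}\n");        /* chunk 2: completes the line */
    free(pending);
    return 0;
}

The first call buffers the whole fragment because no newline was seen; the second call prepends it, finds the newline, and emits one reassembled event, which is exactly the behavior the 3-chunk runtime test above exercises.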