diff --git a/include/fluent-bit/aws/flb_aws_aggregation.h b/include/fluent-bit/aws/flb_aws_aggregation.h new file mode 100644 index 00000000000..c9a64f236f7 --- /dev/null +++ b/include/fluent-bit/aws/flb_aws_aggregation.h @@ -0,0 +1,90 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_AWS_AGGREGATION_H +#define FLB_AWS_AGGREGATION_H + +#include +#include + +/* Aggregation buffer structure */ +struct flb_aws_agg_buffer { + char *agg_buf; /* aggregated records buffer */ + size_t agg_buf_size; /* total size of aggregation buffer */ + size_t agg_buf_offset; /* current offset in aggregation buffer */ +}; + +/* Initialize aggregation buffer + * Returns: + * 0 = success + * -1 = error + */ +int flb_aws_aggregation_init(struct flb_aws_agg_buffer *buf, size_t max_record_size); + +/* Destroy aggregation buffer */ +void flb_aws_aggregation_destroy(struct flb_aws_agg_buffer *buf); + +/* Try to add event data to aggregation buffer + * Returns: + * 0 = success, event added to aggregation buffer + * 1 = buffer full, caller should finalize and retry + */ +int flb_aws_aggregation_add(struct flb_aws_agg_buffer *buf, + const char *data, size_t data_len, + size_t max_record_size); + +/* Finalize aggregated record + * Returns: + * 0 = success + * -1 = error (no data to finalize) + * + * Output is written to buf->agg_buf and the size is returned via out_size parameter + */ +int flb_aws_aggregation_finalize(struct flb_aws_agg_buffer *buf, + int add_final_newline, + size_t *out_size); + +/* Reset aggregation buffer for reuse */ +void flb_aws_aggregation_reset(struct flb_aws_agg_buffer *buf); + +/* Process event with simple aggregation + * Converts msgpack to JSON, optionally adds log_key and time_key, + * then adds to aggregation buffer + * + * Returns: + * -1 = failure, record not added + * 0 = success, record added + * 1 = buffer full, caller should finalize and retry + * 2 = record could not be processed, discard it + */ +int flb_aws_aggregation_process_event(struct flb_aws_agg_buffer *agg_buf, + char *tmp_buf, + size_t tmp_buf_size, + size_t *tmp_buf_offset, + const msgpack_object *obj, + struct flb_time *tms, + struct flb_config *config, + struct flb_output_instance *ins, + const char *stream_name, + const char *log_key, + const char *time_key, + const char *time_key_format, + size_t max_event_size); + +#endif diff --git a/plugins/out_kinesis_firehose/firehose.c b/plugins/out_kinesis_firehose/firehose.c index 0b998599f05..d068279e0c5 100644 --- a/plugins/out_kinesis_firehose/firehose.c +++ b/plugins/out_kinesis_firehose/firehose.c @@ -314,10 +314,10 @@ static int cb_firehose_init(struct flb_output_instance *ins, return -1; } -struct flush *new_flush_buffer() +struct flush *new_flush_buffer(struct flb_firehose *ctx) { struct flush *buf; - + int ret; buf = flb_calloc(1, sizeof(struct flush)); if (!buf) { @@ -341,6 +341,18 @@ struct flush *new_flush_buffer() } buf->events_capacity = MAX_EVENTS_PER_PUT; + /* Initialize aggregation buffer if simple_aggregation is enabled */ + buf->agg_buf_initialized = FLB_FALSE; + if (ctx->simple_aggregation) { + ret = flb_aws_aggregation_init(&buf->agg_buf, MAX_EVENT_SIZE); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to initialize aggregation buffer"); + flush_destroy(buf); + return NULL; + } + buf->agg_buf_initialized = FLB_TRUE; + } + return buf; } @@ -356,7 +368,7 @@ static void cb_firehose_flush(struct flb_event_chunk *event_chunk, (void) i_ins; (void) config; - buf = new_flush_buffer(); + buf = new_flush_buffer(ctx); if (!buf) { flb_plg_error(ctx->ins, "Failed to construct flush buffer"); FLB_OUTPUT_RETURN(FLB_RETRY); @@ -508,6 +520,13 @@ static struct flb_config_map config_map[] = { "AWS Profile name. AWS Profiles can be configured with AWS CLI and are usually stored in " "$HOME/.aws/ directory." }, + + { + FLB_CONFIG_MAP_BOOL, "simple_aggregation", "false", + 0, FLB_TRUE, offsetof(struct flb_firehose, simple_aggregation), + "Enable simple aggregation to combine multiple records into single API calls. " + "This reduces the number of requests and can improve throughput." + }, /* EOF */ {0} }; diff --git a/plugins/out_kinesis_firehose/firehose.h b/plugins/out_kinesis_firehose/firehose.h index 6d27bf783be..ac25701ef96 100644 --- a/plugins/out_kinesis_firehose/firehose.h +++ b/plugins/out_kinesis_firehose/firehose.h @@ -26,6 +26,7 @@ #include #include #include +#include #define DEFAULT_TIME_KEY_FORMAT "%Y-%m-%dT%H:%M:%S" @@ -56,6 +57,10 @@ struct flush { char *event_buf; size_t event_buf_size; + /* aggregation buffer for simple_aggregation mode */ + struct flb_aws_agg_buffer agg_buf; + int agg_buf_initialized; + int records_sent; int records_processed; }; @@ -94,6 +99,7 @@ struct flb_firehose { int custom_endpoint; int retry_requests; int compression; + int simple_aggregation; /* must be freed on shutdown if custom_endpoint is not set */ char *endpoint; diff --git a/plugins/out_kinesis_firehose/firehose_api.c b/plugins/out_kinesis_firehose/firehose_api.c index a87f2008d9f..0fd3d5c7c74 100644 --- a/plugins/out_kinesis_firehose/firehose_api.c +++ b/plugins/out_kinesis_firehose/firehose_api.c @@ -38,6 +38,7 @@ #include #include +#include #include #include @@ -53,6 +54,9 @@ #define ERR_CODE_SERVICE_UNAVAILABLE "ServiceUnavailableException" +/* Forward declarations */ +static int send_log_events(struct flb_firehose *ctx, struct flush *buf); + static struct flb_aws_header put_record_batch_header = { .key = "X-Amz-Target", .key_len = 12, @@ -141,6 +145,29 @@ static int end_put_payload(struct flb_firehose *ctx, struct flush *buf, } +/* + * Process event with simple aggregation (Firehose version) + * Uses shared aggregation implementation + */ +static int process_event_simple_aggregation(struct flb_firehose *ctx, struct flush *buf, + const msgpack_object *obj, struct flb_time *tms, + struct flb_config *config) +{ + return flb_aws_aggregation_process_event(&buf->agg_buf, + buf->tmp_buf, + buf->tmp_buf_size, + &buf->tmp_buf_offset, + obj, + tms, + config, + ctx->ins, + ctx->delivery_stream, + ctx->log_key, + ctx->time_key, + ctx->time_key_format, + MAX_EVENT_SIZE); +} + /* * Processes the msgpack object * -1 = failure, record not added @@ -356,6 +383,102 @@ static void reset_flush_buf(struct flb_firehose *ctx, struct flush *buf) { buf->data_size += strlen(ctx->delivery_stream); } +/* Finalize and send aggregated record */ +static int send_aggregated_record(struct flb_firehose *ctx, struct flush *buf) { + int ret; + size_t agg_size; + size_t b64_len; + struct firehose_event *event; + void *compressed_tmp_buf; + size_t compressed_size; + + /* Finalize the aggregated record */ + ret = flb_aws_aggregation_finalize(&buf->agg_buf, 1, &agg_size); + if (ret < 0) { + /* No data to finalize */ + return 0; + } + + /* Handle compression if enabled */ + if (ctx->compression != FLB_AWS_COMPRESS_NONE) { + ret = flb_aws_compression_b64_truncate_compress(ctx->compression, + MAX_B64_EVENT_SIZE, + buf->agg_buf.agg_buf, + agg_size, + &compressed_tmp_buf, + &compressed_size); + if (ret == -1) { + flb_plg_error(ctx->ins, "Unable to compress aggregated record, discarding, %s", + ctx->delivery_stream); + flb_aws_aggregation_reset(&buf->agg_buf); + return 0; + } + + /* Ensure event_buf is large enough */ + if (buf->event_buf == NULL || buf->event_buf_size < compressed_size) { + flb_free(buf->event_buf); + buf->event_buf = compressed_tmp_buf; + buf->event_buf_size = compressed_size; + compressed_tmp_buf = NULL; + } else { + memcpy(buf->event_buf, compressed_tmp_buf, compressed_size); + flb_free(compressed_tmp_buf); + } + agg_size = compressed_size; + } + else { + /* Base64 encode the aggregated record */ + size_t size = (agg_size * 1.5) + 4; + if (buf->event_buf == NULL || buf->event_buf_size < size) { + flb_free(buf->event_buf); + buf->event_buf = flb_malloc(size); + buf->event_buf_size = size; + if (buf->event_buf == NULL) { + flb_errno(); + return -1; + } + } + + ret = flb_base64_encode((unsigned char *) buf->event_buf, size, &b64_len, + (unsigned char *) buf->agg_buf.agg_buf, agg_size); + if (ret != 0) { + flb_errno(); + return -1; + } + agg_size = b64_len; + } + + /* Copy to tmp_buf for sending */ + if (buf->tmp_buf_size < agg_size) { + flb_plg_error(ctx->ins, "Aggregated record too large for buffer"); + flb_aws_aggregation_reset(&buf->agg_buf); + return 0; + } + + memcpy(buf->tmp_buf, buf->event_buf, agg_size); + + /* Create event record */ + event = &buf->events[0]; + event->json = buf->tmp_buf; + event->len = agg_size; + event->timestamp.tv_sec = 0; + event->timestamp.tv_nsec = 0; + buf->event_index = 1; + + /* Calculate data_size for the payload */ + buf->data_size = PUT_RECORD_BATCH_HEADER_LEN + PUT_RECORD_BATCH_FOOTER_LEN; + buf->data_size += strlen(ctx->delivery_stream); + buf->data_size += agg_size + PUT_RECORD_BATCH_PER_RECORD_LEN; + + /* Send the aggregated record */ + ret = send_log_events(ctx, buf); + + /* Reset aggregation buffer */ + flb_aws_aggregation_reset(&buf->agg_buf); + + return ret; +} + /* constructs a put payload, and then sends */ static int send_log_events(struct flb_firehose *ctx, struct flush *buf) { int ret; @@ -445,6 +568,46 @@ static int add_event(struct flb_firehose *ctx, struct flush *buf, reset_flush_buf(ctx, buf); } + /* Use simple aggregation if enabled */ + if (ctx->simple_aggregation) { +retry_add_event_agg: + retry_add = FLB_FALSE; + ret = process_event_simple_aggregation(ctx, buf, obj, tms, config); + if (ret < 0) { + return -1; + } + else if (ret == 1) { + /* Buffer full - check if buffer was empty before sending (record too large) */ + if (buf->agg_buf.agg_buf_offset == 0) { + flb_plg_warn(ctx->ins, "Discarding unprocessable record (too large for aggregation buffer), %s", + ctx->delivery_stream); + reset_flush_buf(ctx, buf); + return 0; + } + + /* Send aggregated record and retry */ + ret = send_aggregated_record(ctx, buf); + reset_flush_buf(ctx, buf); + if (ret < 0) { + return -1; + } + retry_add = FLB_TRUE; + } + else if (ret == 2) { + /* Discard this record */ + flb_plg_warn(ctx->ins, "Discarding large or unprocessable record, %s", + ctx->delivery_stream); + return 0; + } + + if (retry_add == FLB_TRUE) { + goto retry_add_event_agg; + } + + return 0; + } + + /* Normal processing without aggregation */ retry_add_event: retry_add = FLB_FALSE; ret = process_event(ctx, buf, obj, tms, config); @@ -603,7 +766,13 @@ int process_and_send_records(struct flb_firehose *ctx, struct flush *buf, flb_log_event_decoder_destroy(&log_decoder); /* send any remaining events */ - ret = send_log_events(ctx, buf); + if (ctx->simple_aggregation) { + /* Send any remaining aggregated data */ + ret = send_aggregated_record(ctx, buf); + } + else { + ret = send_log_events(ctx, buf); + } reset_flush_buf(ctx, buf); if (ret < 0) { @@ -953,6 +1122,9 @@ int put_record_batch(struct flb_firehose *ctx, struct flush *buf, void flush_destroy(struct flush *buf) { if (buf) { + if (buf->agg_buf_initialized) { + flb_aws_aggregation_destroy(&buf->agg_buf); + } flb_free(buf->tmp_buf); flb_free(buf->out_buf); flb_free(buf->events); diff --git a/plugins/out_kinesis_streams/kinesis.c b/plugins/out_kinesis_streams/kinesis.c index 03556752340..55a0cf41f53 100644 --- a/plugins/out_kinesis_streams/kinesis.c +++ b/plugins/out_kinesis_streams/kinesis.c @@ -308,10 +308,10 @@ static int cb_kinesis_init(struct flb_output_instance *ins, return -1; } -static struct flush *new_flush_buffer(const char *tag, int tag_len) +static struct flush *new_flush_buffer(struct flb_kinesis *ctx, const char *tag, int tag_len) { struct flush *buf; - + int ret; buf = flb_calloc(1, sizeof(struct flush)); if (!buf) { @@ -338,6 +338,18 @@ static struct flush *new_flush_buffer(const char *tag, int tag_len) buf->tag = tag; buf->tag_len = tag_len; + /* Initialize aggregation buffer if simple_aggregation is enabled */ + buf->agg_buf_initialized = FLB_FALSE; + if (ctx->simple_aggregation) { + ret = flb_aws_aggregation_init(&buf->agg_buf, MAX_EVENT_SIZE); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to initialize aggregation buffer"); + kinesis_flush_destroy(buf); + return NULL; + } + buf->agg_buf_initialized = FLB_TRUE; + } + return buf; } @@ -353,7 +365,7 @@ static void cb_kinesis_flush(struct flb_event_chunk *event_chunk, (void) i_ins; (void) config; - buf = new_flush_buffer(event_chunk->tag, flb_sds_len(event_chunk->tag)); + buf = new_flush_buffer(ctx, event_chunk->tag, flb_sds_len(event_chunk->tag)); if (!buf) { flb_plg_error(ctx->ins, "Failed to construct flush buffer"); FLB_OUTPUT_RETURN(FLB_RETRY); @@ -503,6 +515,13 @@ static struct flb_config_map config_map[] = { "$HOME/.aws/ directory." }, + { + FLB_CONFIG_MAP_BOOL, "simple_aggregation", "false", + 0, FLB_TRUE, offsetof(struct flb_kinesis, simple_aggregation), + "Enable simple aggregation to combine multiple records into single API calls. " + "This reduces the number of requests and can improve throughput." + }, + /* EOF */ {0} }; diff --git a/plugins/out_kinesis_streams/kinesis.h b/plugins/out_kinesis_streams/kinesis.h index a731d35841a..0faccf49bdf 100644 --- a/plugins/out_kinesis_streams/kinesis.h +++ b/plugins/out_kinesis_streams/kinesis.h @@ -26,6 +26,7 @@ #include #include #include +#include #define DEFAULT_TIME_KEY_FORMAT "%Y-%m-%dT%H:%M:%S" @@ -56,6 +57,10 @@ struct flush { char *event_buf; size_t event_buf_size; + /* aggregation buffer for simple_aggregation mode */ + struct flb_aws_agg_buffer agg_buf; + int agg_buf_initialized; + int records_sent; int records_processed; @@ -92,6 +97,7 @@ struct flb_kinesis { const char *log_key; const char *external_id; int retry_requests; + int simple_aggregation; char *sts_endpoint; int custom_endpoint; uint16_t port; diff --git a/plugins/out_kinesis_streams/kinesis_api.c b/plugins/out_kinesis_streams/kinesis_api.c index 3f6d0f939d8..e3e8b502694 100644 --- a/plugins/out_kinesis_streams/kinesis_api.c +++ b/plugins/out_kinesis_streams/kinesis_api.c @@ -36,6 +36,7 @@ #include #include #include +#include #include #include @@ -51,6 +52,9 @@ #define ERR_CODE_EXCEEDED_THROUGHPUT "ProvisionedThroughputExceededException" +/* Forward declarations */ +static int send_log_events(struct flb_kinesis *ctx, struct flush *buf); + static struct flb_aws_header put_records_target_header = { .key = "X-Amz-Target", .key_len = 12, @@ -202,6 +206,29 @@ static int end_put_payload(struct flb_kinesis *ctx, struct flush *buf, } +/* + * Process event with simple aggregation (Kinesis Streams version) + * Uses shared aggregation implementation + */ +static int process_event_simple_aggregation(struct flb_kinesis *ctx, struct flush *buf, + const msgpack_object *obj, struct flb_time *tms, + struct flb_config *config) +{ + return flb_aws_aggregation_process_event(&buf->agg_buf, + buf->tmp_buf, + buf->tmp_buf_size, + &buf->tmp_buf_offset, + obj, + tms, + config, + ctx->ins, + ctx->stream_name, + ctx->log_key, + ctx->time_key, + ctx->time_key_format, + MAX_EVENT_SIZE); +} + /* * Processes the msgpack object * -1 = failure, record not added @@ -391,6 +418,70 @@ static void reset_flush_buf(struct flb_kinesis *ctx, struct flush *buf) { buf->data_size += strlen(ctx->stream_name); } +/* Finalize and send aggregated record (Kinesis Streams version - no final newline) */ +static int send_aggregated_record(struct flb_kinesis *ctx, struct flush *buf) { + int ret; + size_t agg_size; + size_t b64_len; + struct kinesis_event *event; + + /* Finalize without final newline (Kinesis Streams doesn't need it) */ + ret = flb_aws_aggregation_finalize(&buf->agg_buf, 0, &agg_size); + if (ret < 0) { + return 0; + } + + /* Base64 encode the aggregated record */ + size_t size = (agg_size * 1.5) + 4; + if (buf->event_buf == NULL || buf->event_buf_size < size) { + flb_free(buf->event_buf); + buf->event_buf = flb_malloc(size); + buf->event_buf_size = size; + if (buf->event_buf == NULL) { + flb_errno(); + return -1; + } + } + + ret = flb_base64_encode((unsigned char *) buf->event_buf, size, &b64_len, + (unsigned char *) buf->agg_buf.agg_buf, agg_size); + if (ret != 0) { + flb_errno(); + return -1; + } + agg_size = b64_len; + + /* Copy to tmp_buf */ + if (buf->tmp_buf_size < agg_size) { + flb_plg_error(ctx->ins, "Aggregated record too large for buffer"); + flb_aws_aggregation_reset(&buf->agg_buf); + return 0; + } + + memcpy(buf->tmp_buf, buf->event_buf, agg_size); + + /* Create event record */ + event = &buf->events[0]; + event->json = buf->tmp_buf; + event->len = agg_size; + event->timestamp.tv_sec = 0; + event->timestamp.tv_nsec = 0; + buf->event_index = 1; + + /* Calculate data_size for the payload */ + buf->data_size = PUT_RECORDS_HEADER_LEN + PUT_RECORDS_FOOTER_LEN; + buf->data_size += strlen(ctx->stream_name); + buf->data_size += agg_size + PUT_RECORDS_PER_RECORD_LEN; + + /* Send the aggregated record */ + ret = send_log_events(ctx, buf); + + /* Reset aggregation buffer */ + flb_aws_aggregation_reset(&buf->agg_buf); + + return ret; +} + /* constructs a put payload, and then sends */ static int send_log_events(struct flb_kinesis *ctx, struct flush *buf) { int ret; @@ -476,10 +567,48 @@ static int add_event(struct flb_kinesis *ctx, struct flush *buf, size_t event_bytes = 0; if (buf->event_index == 0) { - /* init */ reset_flush_buf(ctx, buf); } + /* Use simple aggregation if enabled */ + if (ctx->simple_aggregation) { +retry_add_event_agg: + retry_add = FLB_FALSE; + ret = process_event_simple_aggregation(ctx, buf, obj, tms, config); + if (ret < 0) { + return -1; + } + else if (ret == 1) { + /* Buffer full - check if buffer was empty before sending (record too large) */ + if (buf->agg_buf.agg_buf_offset == 0) { + flb_plg_warn(ctx->ins, "Discarding unprocessable record (too large for aggregation buffer), %s", + ctx->stream_name); + reset_flush_buf(ctx, buf); + return 0; + } + + /* Send aggregated record and retry */ + ret = send_aggregated_record(ctx, buf); + reset_flush_buf(ctx, buf); + if (ret < 0) { + return -1; + } + retry_add = FLB_TRUE; + } + else if (ret == 2) { + flb_plg_warn(ctx->ins, "Discarding large or unprocessable record, %s", + ctx->stream_name); + return 0; + } + + if (retry_add == FLB_TRUE) { + goto retry_add_event_agg; + } + + return 0; + } + + /* Normal processing without aggregation */ retry_add_event: retry_add = FLB_FALSE; ret = process_event(ctx, buf, obj, tms, config); @@ -488,16 +617,13 @@ static int add_event(struct flb_kinesis *ctx, struct flush *buf, } else if (ret == 1) { if (buf->event_index <= 0) { - /* somehow the record was larger than our entire request buffer */ flb_plg_warn(ctx->ins, "Discarding massive log record, %s", ctx->stream_name); - return 0; /* discard this record and return to caller */ + return 0; } - /* send logs and then retry the add */ retry_add = FLB_TRUE; goto send; } else if (ret == 2) { - /* discard this record and return to caller */ flb_plg_warn(ctx->ins, "Discarding large or unprocessable record, %s", ctx->stream_name); return 0; @@ -508,17 +634,14 @@ static int add_event(struct flb_kinesis *ctx, struct flush *buf, if ((buf->data_size + event_bytes) > PUT_RECORDS_PAYLOAD_SIZE) { if (buf->event_index <= 0) { - /* somehow the record was larger than our entire request buffer */ flb_plg_warn(ctx->ins, "[size=%zu] Discarding massive log record, %s", event_bytes, ctx->stream_name); - return 0; /* discard this record and return to caller */ + return 0; } - /* do not send this event */ retry_add = FLB_TRUE; goto send; } - /* send is not needed yet, return to caller */ buf->data_size += event_bytes; buf->event_index++; @@ -633,7 +756,12 @@ int process_and_send_to_kinesis(struct flb_kinesis *ctx, struct flush *buf, flb_log_event_decoder_destroy(&log_decoder); /* send any remaining events */ - ret = send_log_events(ctx, buf); + if (ctx->simple_aggregation) { + ret = send_aggregated_record(ctx, buf); + } + else { + ret = send_log_events(ctx, buf); + } reset_flush_buf(ctx, buf); if (ret < 0) { return -1; @@ -981,6 +1109,9 @@ int put_records(struct flb_kinesis *ctx, struct flush *buf, void kinesis_flush_destroy(struct flush *buf) { if (buf) { + if (buf->agg_buf_initialized) { + flb_aws_aggregation_destroy(&buf->agg_buf); + } flb_free(buf->tmp_buf); flb_free(buf->out_buf); flb_free(buf->events); diff --git a/src/aws/CMakeLists.txt b/src/aws/CMakeLists.txt index 941e811b633..a8d1bdf7bbb 100644 --- a/src/aws/CMakeLists.txt +++ b/src/aws/CMakeLists.txt @@ -15,6 +15,7 @@ set(src "flb_aws_imds.c" "flb_aws_credentials_http.c" "flb_aws_credentials_profile.c" + "flb_aws_aggregation.c" ) message(STATUS "=== AWS Credentials ===") diff --git a/src/aws/flb_aws_aggregation.c b/src/aws/flb_aws_aggregation.c new file mode 100644 index 00000000000..c27b8d4dcd6 --- /dev/null +++ b/src/aws/flb_aws_aggregation.c @@ -0,0 +1,234 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include + +#include +#include + +int flb_aws_aggregation_init(struct flb_aws_agg_buffer *buf, size_t max_record_size) +{ + if (!buf) { + return -1; + } + + buf->agg_buf = flb_malloc(max_record_size); + if (!buf->agg_buf) { + flb_errno(); + return -1; + } + + buf->agg_buf_size = max_record_size; + buf->agg_buf_offset = 0; + + return 0; +} + +void flb_aws_aggregation_destroy(struct flb_aws_agg_buffer *buf) +{ + if (buf && buf->agg_buf) { + flb_free(buf->agg_buf); + buf->agg_buf = NULL; + buf->agg_buf_size = 0; + buf->agg_buf_offset = 0; + } +} + +int flb_aws_aggregation_add(struct flb_aws_agg_buffer *buf, + const char *data, size_t data_len, + size_t max_record_size) +{ + if (!buf || !data || data_len == 0) { + return -1; + } + + /* Check if adding this data would exceed the max record size */ + if (buf->agg_buf_offset + data_len > max_record_size) { + /* Buffer full, caller should finalize and retry */ + return 1; + } + + /* Add data to aggregation buffer */ + memcpy(buf->agg_buf + buf->agg_buf_offset, data, data_len); + buf->agg_buf_offset += data_len; + + return 0; +} + +int flb_aws_aggregation_finalize(struct flb_aws_agg_buffer *buf, + int add_final_newline, + size_t *out_size) +{ + if (!buf || !out_size) { + return -1; + } + + /* Check if there's any data to finalize */ + if (buf->agg_buf_offset == 0) { + return -1; + } + + /* Add final newline if requested (for Firehose) */ + if (add_final_newline && buf->agg_buf_offset < buf->agg_buf_size) { + buf->agg_buf[buf->agg_buf_offset] = '\n'; + buf->agg_buf_offset++; + } + + *out_size = buf->agg_buf_offset; + return 0; +} + +void flb_aws_aggregation_reset(struct flb_aws_agg_buffer *buf) +{ + if (buf) { + buf->agg_buf_offset = 0; + } +} + +/* + * Process event with simple aggregation + * Shared implementation for Kinesis Streams and Firehose + */ +int flb_aws_aggregation_process_event(struct flb_aws_agg_buffer *agg_buf, + char *tmp_buf, + size_t tmp_buf_size, + size_t *tmp_buf_offset, + const msgpack_object *obj, + struct flb_time *tms, + struct flb_config *config, + struct flb_output_instance *ins, + const char *stream_name, + const char *log_key, + const char *time_key, + const char *time_key_format, + size_t max_event_size) +{ + size_t written = 0; + int ret; + char *tmp_buf_ptr; + char *time_key_ptr; + struct tm time_stamp; + struct tm *tmp; + size_t len; + size_t tmp_size; + char *out_buf; + + tmp_buf_ptr = tmp_buf + *tmp_buf_offset; + ret = flb_msgpack_to_json(tmp_buf_ptr, + tmp_buf_size - *tmp_buf_offset, + obj, config->json_escape_unicode); + if (ret <= 0) { + return 1; + } + written = (size_t) ret; + + /* Discard empty messages */ + if (written <= 2) { + flb_plg_debug(ins, "Found empty log message, %s", stream_name); + return 2; + } + + if (log_key) { + written -= 2; + tmp_buf_ptr++; + (*tmp_buf_offset)++; + } + + if ((written + 1) >= max_event_size) { + flb_plg_warn(ins, "[size=%zu] Discarding record which is larger than " + "max size allowed, %s", written + 1, stream_name); + return 2; + } + + if (time_key) { + tmp = gmtime_r(&tms->tm.tv_sec, &time_stamp); + if (!tmp) { + flb_plg_error(ins, "Could not create time stamp for %lu unix " + "seconds, discarding record, %s", tms->tm.tv_sec, stream_name); + return 2; + } + + len = flb_aws_strftime_precision(&out_buf, time_key_format, tms); + tmp_size = (tmp_buf_size - *tmp_buf_offset) - written; + if (len > tmp_size) { + flb_free(out_buf); + return 1; + } + + if (len == 0) { + flb_plg_error(ins, "Failed to add time_key %s to record, %s", + time_key, stream_name); + flb_free(out_buf); + return 2; + } + else { + time_key_ptr = tmp_buf_ptr + written - 1; + memcpy(time_key_ptr, ",", 1); + time_key_ptr++; + memcpy(time_key_ptr, "\"", 1); + time_key_ptr++; + memcpy(time_key_ptr, time_key, strlen(time_key)); + time_key_ptr += strlen(time_key); + memcpy(time_key_ptr, "\":\"", 3); + time_key_ptr += 3; + + memcpy(time_key_ptr, out_buf, len); + flb_free(out_buf); + time_key_ptr += len; + memcpy(time_key_ptr, "\"}", 2); + time_key_ptr += 2; + written = (time_key_ptr - tmp_buf_ptr); + } + } + + if ((written + 1) >= max_event_size) { + flb_plg_warn(ins, "[size=%zu] Discarding record which is larger than " + "max size allowed, %s", written + 1, stream_name); + return 2; + } + + /* Append newline */ + tmp_size = (tmp_buf_size - *tmp_buf_offset) - written; + if (tmp_size <= 1) { + return 1; + } + + memcpy(tmp_buf_ptr + written, "\n", 1); + written++; + + /* Try to add to aggregation buffer */ + tmp_buf_ptr = tmp_buf + *tmp_buf_offset; + ret = flb_aws_aggregation_add(agg_buf, tmp_buf_ptr, written, max_event_size); + + if (ret == 1) { + return 1; + } + else if (ret < 0) { + flb_plg_error(ins, "Failed to add record to aggregation buffer"); + return -1; + } + + *tmp_buf_offset += written; + return 0; +} diff --git a/tests/internal/CMakeLists.txt b/tests/internal/CMakeLists.txt index 45d769b9c25..a9ab28649f0 100644 --- a/tests/internal/CMakeLists.txt +++ b/tests/internal/CMakeLists.txt @@ -138,6 +138,7 @@ if(FLB_AWS) ${UNIT_TESTS_FILES} aws_util.c aws_compress.c + aws_aggregation.c aws_credentials.c aws_credentials_ec2.c aws_credentials_sts.c diff --git a/tests/internal/aws_aggregation.c b/tests/internal/aws_aggregation.c new file mode 100644 index 00000000000..a29ca5ec83f --- /dev/null +++ b/tests/internal/aws_aggregation.c @@ -0,0 +1,544 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include "flb_tests_internal.h" + +#include + +#define MAX_RECORD_SIZE 1024000 + +/* Test: Initialize and destroy aggregation buffer */ +void test_aws_aggregation_init_destroy() +{ + struct flb_aws_agg_buffer buf; + int ret; + + /* Test successful initialization */ + ret = flb_aws_aggregation_init(&buf, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + TEST_CHECK(buf.agg_buf != NULL); + TEST_CHECK(buf.agg_buf_size == MAX_RECORD_SIZE); + TEST_CHECK(buf.agg_buf_offset == 0); + + /* Test destroy */ + flb_aws_aggregation_destroy(&buf); + TEST_CHECK(buf.agg_buf == NULL); + TEST_CHECK(buf.agg_buf_size == 0); + TEST_CHECK(buf.agg_buf_offset == 0); +} + +/* Test: Add single record to buffer */ +void test_aws_aggregation_add_single() +{ + struct flb_aws_agg_buffer buf; + int ret; + const char *data = "{\"message\":\"test\"}\n"; + size_t data_len = strlen(data); + + ret = flb_aws_aggregation_init(&buf, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + + /* Add single record */ + ret = flb_aws_aggregation_add(&buf, data, data_len, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + TEST_CHECK(buf.agg_buf_offset == data_len); + + /* Verify content */ + TEST_CHECK(memcmp(buf.agg_buf, data, data_len) == 0); + + flb_aws_aggregation_destroy(&buf); +} + +/* Test: Add multiple records to buffer */ +void test_aws_aggregation_add_multiple() +{ + struct flb_aws_agg_buffer buf; + int ret; + const char *data1 = "{\"message\":\"test1\"}\n"; + const char *data2 = "{\"message\":\"test2\"}\n"; + const char *data3 = "{\"message\":\"test3\"}\n"; + size_t len1 = strlen(data1); + size_t len2 = strlen(data2); + size_t len3 = strlen(data3); + + ret = flb_aws_aggregation_init(&buf, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + + /* Add first record */ + ret = flb_aws_aggregation_add(&buf, data1, len1, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + TEST_CHECK(buf.agg_buf_offset == len1); + + /* Add second record */ + ret = flb_aws_aggregation_add(&buf, data2, len2, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + TEST_CHECK(buf.agg_buf_offset == len1 + len2); + + /* Add third record */ + ret = flb_aws_aggregation_add(&buf, data3, len3, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + TEST_CHECK(buf.agg_buf_offset == len1 + len2 + len3); + + /* Verify all content is concatenated */ + TEST_CHECK(memcmp(buf.agg_buf, data1, len1) == 0); + TEST_CHECK(memcmp(buf.agg_buf + len1, data2, len2) == 0); + TEST_CHECK(memcmp(buf.agg_buf + len1 + len2, data3, len3) == 0); + + flb_aws_aggregation_destroy(&buf); +} + +/* Test: Buffer full detection */ +void test_aws_aggregation_buffer_full() +{ + struct flb_aws_agg_buffer buf; + int ret; + size_t small_size = 100; + char data[150]; + + /* Initialize with small buffer */ + ret = flb_aws_aggregation_init(&buf, small_size); + TEST_CHECK(ret == 0); + + /* Create data larger than buffer */ + memset(data, 'A', sizeof(data) - 1); + data[sizeof(data) - 1] = '\0'; + + /* Try to add data that exceeds buffer size */ + ret = flb_aws_aggregation_add(&buf, data, sizeof(data) - 1, small_size); + TEST_CHECK(ret == 1); /* Should return 1 (buffer full) */ + + flb_aws_aggregation_destroy(&buf); +} + +/* Test: Buffer full with multiple adds */ +void test_aws_aggregation_buffer_full_multiple() +{ + struct flb_aws_agg_buffer buf; + int ret; + size_t small_size = 100; + const char *data = "0123456789"; /* 10 bytes */ + size_t data_len = strlen(data); + int i; + + ret = flb_aws_aggregation_init(&buf, small_size); + TEST_CHECK(ret == 0); + + /* Add records until buffer is full */ + for (i = 0; i < 9; i++) { + ret = flb_aws_aggregation_add(&buf, data, data_len, small_size); + TEST_CHECK(ret == 0); /* Should succeed */ + } + + /* This should fill the buffer (90 bytes used) */ + TEST_CHECK(buf.agg_buf_offset == 90); + + /* Try to add one more (would be 100 bytes, at limit) */ + ret = flb_aws_aggregation_add(&buf, data, data_len, small_size); + TEST_CHECK(ret == 0); /* Should succeed, exactly at limit */ + + /* Now buffer is full, next add should fail */ + ret = flb_aws_aggregation_add(&buf, data, data_len, small_size); + TEST_CHECK(ret == 1); /* Should return 1 (buffer full) */ + + flb_aws_aggregation_destroy(&buf); +} + +/* Test: Finalize with both modes (with and without newline) */ +void test_aws_aggregation_finalize() +{ + struct flb_aws_agg_buffer buf; + int ret; + const char *data = "{\"message\":\"test\"}\n"; + size_t data_len = strlen(data); + size_t out_size; + + /* Test without newline (Kinesis Streams mode) */ + ret = flb_aws_aggregation_init(&buf, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + + ret = flb_aws_aggregation_add(&buf, data, data_len, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + + ret = flb_aws_aggregation_finalize(&buf, 0, &out_size); + TEST_CHECK(ret == 0); + TEST_CHECK(out_size == data_len); + + flb_aws_aggregation_destroy(&buf); + + /* Test with newline (Firehose mode) */ + ret = flb_aws_aggregation_init(&buf, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + + ret = flb_aws_aggregation_add(&buf, data, data_len, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + + ret = flb_aws_aggregation_finalize(&buf, 1, &out_size); + TEST_CHECK(ret == 0); + TEST_CHECK(out_size == data_len + 1); + TEST_CHECK(buf.agg_buf[data_len] == '\n'); + + flb_aws_aggregation_destroy(&buf); +} + +/* Test: Finalize empty buffer */ +void test_aws_aggregation_finalize_empty() +{ + struct flb_aws_agg_buffer buf; + int ret; + size_t out_size; + + ret = flb_aws_aggregation_init(&buf, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + + /* Try to finalize empty buffer */ + ret = flb_aws_aggregation_finalize(&buf, 1, &out_size); + TEST_CHECK(ret == -1); /* Should fail */ + + flb_aws_aggregation_destroy(&buf); +} + +/* Test: Reset and reuse buffer (complete cycle) */ +void test_aws_aggregation_reset_reuse() +{ + struct flb_aws_agg_buffer buf; + int ret; + const char *data1 = "{\"message\":\"test1\"}\n"; + const char *data2 = "{\"message\":\"test2\"}\n"; + size_t len1 = strlen(data1); + size_t len2 = strlen(data2); + size_t out_size; + + ret = flb_aws_aggregation_init(&buf, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + + /* First batch */ + ret = flb_aws_aggregation_add(&buf, data1, len1, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + + ret = flb_aws_aggregation_finalize(&buf, 1, &out_size); + TEST_CHECK(ret == 0); + + /* Reset for reuse */ + flb_aws_aggregation_reset(&buf); + TEST_CHECK(buf.agg_buf_offset == 0); + + /* Second batch */ + ret = flb_aws_aggregation_add(&buf, data2, len2, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + TEST_CHECK(buf.agg_buf_offset == len2); + + ret = flb_aws_aggregation_finalize(&buf, 1, &out_size); + TEST_CHECK(ret == 0); + TEST_CHECK(out_size == len2 + 1); + + flb_aws_aggregation_destroy(&buf); +} + +/* Test: Large aggregation (many small records) */ +void test_aws_aggregation_large() +{ + struct flb_aws_agg_buffer buf; + int ret; + const char *data = "{\"msg\":\"x\"}\n"; /* 12 bytes */ + size_t data_len = strlen(data); + int i; + int count = 1000; + + ret = flb_aws_aggregation_init(&buf, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + + /* Add 1000 small records */ + for (i = 0; i < count; i++) { + ret = flb_aws_aggregation_add(&buf, data, data_len, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + } + + TEST_CHECK(buf.agg_buf_offset == data_len * count); + + flb_aws_aggregation_destroy(&buf); +} + +/* Test: NULL parameter handling */ +void test_aws_aggregation_null_params() +{ + struct flb_aws_agg_buffer buf; + int ret; + size_t out_size; + + /* Test init with NULL */ + ret = flb_aws_aggregation_init(NULL, MAX_RECORD_SIZE); + TEST_CHECK(ret == -1); + + /* Initialize properly for other tests */ + ret = flb_aws_aggregation_init(&buf, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + + /* Test add with NULL buffer */ + ret = flb_aws_aggregation_add(NULL, "data", 4, MAX_RECORD_SIZE); + TEST_CHECK(ret == -1); + + /* Test add with NULL data */ + ret = flb_aws_aggregation_add(&buf, NULL, 4, MAX_RECORD_SIZE); + TEST_CHECK(ret == -1); + + /* Test add with zero length */ + ret = flb_aws_aggregation_add(&buf, "data", 0, MAX_RECORD_SIZE); + TEST_CHECK(ret == -1); + + /* Test finalize with NULL buffer */ + ret = flb_aws_aggregation_finalize(NULL, 1, &out_size); + TEST_CHECK(ret == -1); + + /* Test finalize with NULL out_size */ + ret = flb_aws_aggregation_finalize(&buf, 1, NULL); + TEST_CHECK(ret == -1); + + /* Test destroy with NULL (should not crash) */ + flb_aws_aggregation_destroy(NULL); + + /* Test reset with NULL (should not crash) */ + flb_aws_aggregation_reset(NULL); + + flb_aws_aggregation_destroy(&buf); +} + +/* Test: Exact boundary conditions */ +void test_aws_aggregation_boundary() +{ + struct flb_aws_agg_buffer buf; + int ret; + size_t exact_size = 50; + char data[50]; + size_t out_size; + + /* Initialize with exact size */ + ret = flb_aws_aggregation_init(&buf, exact_size); + TEST_CHECK(ret == 0); + + /* Fill exactly to the boundary */ + memset(data, 'X', exact_size); + ret = flb_aws_aggregation_add(&buf, data, exact_size, exact_size); + TEST_CHECK(ret == 0); + TEST_CHECK(buf.agg_buf_offset == exact_size); + + /* Try to add one more byte - should fail */ + ret = flb_aws_aggregation_add(&buf, "Y", 1, exact_size); + TEST_CHECK(ret == 1); + + /* Finalize without newline should work */ + ret = flb_aws_aggregation_finalize(&buf, 0, &out_size); + TEST_CHECK(ret == 0); + TEST_CHECK(out_size == exact_size); + + flb_aws_aggregation_destroy(&buf); +} + +/* Test: Finalize with newline at boundary */ +void test_aws_aggregation_finalize_boundary() +{ + struct flb_aws_agg_buffer buf; + int ret; + size_t size = 100; + char data[99]; + size_t out_size; + + ret = flb_aws_aggregation_init(&buf, size); + TEST_CHECK(ret == 0); + + /* Fill to size-1 to leave room for newline */ + memset(data, 'A', 99); + ret = flb_aws_aggregation_add(&buf, data, 99, size); + TEST_CHECK(ret == 0); + + /* Finalize with newline should work */ + ret = flb_aws_aggregation_finalize(&buf, 1, &out_size); + TEST_CHECK(ret == 0); + TEST_CHECK(out_size == 100); + TEST_CHECK(buf.agg_buf[99] == '\n'); + + flb_aws_aggregation_destroy(&buf); +} + +/* Test: Multiple reset cycles */ +void test_aws_aggregation_multiple_resets() +{ + struct flb_aws_agg_buffer buf; + int ret; + const char *data = "test_data\n"; + size_t data_len = strlen(data); + int i; + + ret = flb_aws_aggregation_init(&buf, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + + /* Perform multiple add/reset cycles */ + for (i = 0; i < 10; i++) { + ret = flb_aws_aggregation_add(&buf, data, data_len, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + TEST_CHECK(buf.agg_buf_offset == data_len); + + flb_aws_aggregation_reset(&buf); + TEST_CHECK(buf.agg_buf_offset == 0); + } + + flb_aws_aggregation_destroy(&buf); +} + +/* Test: Very small buffer size */ +void test_aws_aggregation_tiny_buffer() +{ + struct flb_aws_agg_buffer buf; + int ret; + const char *data = "AB"; + size_t out_size; + + /* Initialize with very small buffer */ + ret = flb_aws_aggregation_init(&buf, 10); + TEST_CHECK(ret == 0); + + /* Add small data */ + ret = flb_aws_aggregation_add(&buf, data, 2, 10); + TEST_CHECK(ret == 0); + + /* Add more small data */ + ret = flb_aws_aggregation_add(&buf, data, 2, 10); + TEST_CHECK(ret == 0); + + /* Should have 4 bytes */ + TEST_CHECK(buf.agg_buf_offset == 4); + + /* Finalize */ + ret = flb_aws_aggregation_finalize(&buf, 0, &out_size); + TEST_CHECK(ret == 0); + TEST_CHECK(out_size == 4); + + flb_aws_aggregation_destroy(&buf); +} + +/* Test: Sequential finalize without reset */ +void test_aws_aggregation_double_finalize() +{ + struct flb_aws_agg_buffer buf; + int ret; + const char *data = "test\n"; + size_t data_len = strlen(data); + size_t out_size1, out_size2; + + ret = flb_aws_aggregation_init(&buf, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + + ret = flb_aws_aggregation_add(&buf, data, data_len, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + + /* First finalize */ + ret = flb_aws_aggregation_finalize(&buf, 0, &out_size1); + TEST_CHECK(ret == 0); + TEST_CHECK(out_size1 == data_len); + + /* Second finalize without reset - should still work */ + ret = flb_aws_aggregation_finalize(&buf, 0, &out_size2); + TEST_CHECK(ret == 0); + TEST_CHECK(out_size2 == out_size1); + + flb_aws_aggregation_destroy(&buf); +} + +/* Test: Add after finalize without reset */ +void test_aws_aggregation_add_after_finalize() +{ + struct flb_aws_agg_buffer buf; + int ret; + const char *data1 = "first\n"; + const char *data2 = "second\n"; + size_t len1 = strlen(data1); + size_t len2 = strlen(data2); + size_t out_size; + + ret = flb_aws_aggregation_init(&buf, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + + /* Add first data */ + ret = flb_aws_aggregation_add(&buf, data1, len1, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + + /* Finalize */ + ret = flb_aws_aggregation_finalize(&buf, 0, &out_size); + TEST_CHECK(ret == 0); + + /* Add more data without reset - should append */ + ret = flb_aws_aggregation_add(&buf, data2, len2, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + TEST_CHECK(buf.agg_buf_offset == len1 + len2); + + flb_aws_aggregation_destroy(&buf); +} + +/* Test: Alternating add and finalize patterns */ +void test_aws_aggregation_alternating_pattern() +{ + struct flb_aws_agg_buffer buf; + int ret; + const char *data = "X"; + size_t out_size; + int i; + + ret = flb_aws_aggregation_init(&buf, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + + /* Add one byte, finalize, reset - repeat */ + for (i = 0; i < 5; i++) { + ret = flb_aws_aggregation_add(&buf, data, 1, MAX_RECORD_SIZE); + TEST_CHECK(ret == 0); + TEST_CHECK(buf.agg_buf_offset == 1); + + ret = flb_aws_aggregation_finalize(&buf, 0, &out_size); + TEST_CHECK(ret == 0); + TEST_CHECK(out_size == 1); + + flb_aws_aggregation_reset(&buf); + TEST_CHECK(buf.agg_buf_offset == 0); + } + + flb_aws_aggregation_destroy(&buf); +} + +/* Test list */ +TEST_LIST = { + {"aws_aggregation_init_destroy", test_aws_aggregation_init_destroy}, + {"aws_aggregation_add_single", test_aws_aggregation_add_single}, + {"aws_aggregation_add_multiple", test_aws_aggregation_add_multiple}, + {"aws_aggregation_buffer_full", test_aws_aggregation_buffer_full}, + {"aws_aggregation_buffer_full_multiple", test_aws_aggregation_buffer_full_multiple}, + {"aws_aggregation_finalize", test_aws_aggregation_finalize}, + {"aws_aggregation_finalize_empty", test_aws_aggregation_finalize_empty}, + {"aws_aggregation_reset_reuse", test_aws_aggregation_reset_reuse}, + {"aws_aggregation_large", test_aws_aggregation_large}, + {"aws_aggregation_null_params", test_aws_aggregation_null_params}, + {"aws_aggregation_boundary", test_aws_aggregation_boundary}, + {"aws_aggregation_finalize_boundary", test_aws_aggregation_finalize_boundary}, + {"aws_aggregation_multiple_resets", test_aws_aggregation_multiple_resets}, + {"aws_aggregation_tiny_buffer", test_aws_aggregation_tiny_buffer}, + {"aws_aggregation_double_finalize", test_aws_aggregation_double_finalize}, + {"aws_aggregation_add_after_finalize", test_aws_aggregation_add_after_finalize}, + {"aws_aggregation_alternating_pattern", test_aws_aggregation_alternating_pattern}, + {NULL, NULL} +}; diff --git a/tests/runtime/out_firehose.c b/tests/runtime/out_firehose.c index cead6b3cac1..e69637070c7 100644 --- a/tests/runtime/out_firehose.c +++ b/tests/runtime/out_firehose.c @@ -189,6 +189,341 @@ void flb_test_firehose_nonsense_error(void) } +void flb_test_firehose_simple_aggregation(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + setenv("FLB_FIREHOSE_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "kinesis_firehose", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "delivery_stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "simple_aggregation", "On", NULL); + flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Push multiple small records */ + flb_lib_push(ctx, in_ffd, (char *) "[1, {\"message\":\"test1\"}]", 25); + flb_lib_push(ctx, in_ffd, (char *) "[1, {\"message\":\"test2\"}]", 25); + flb_lib_push(ctx, in_ffd, (char *) "[1, {\"message\":\"test3\"}]", 25); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_firehose_aggregation_with_time_key(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + setenv("FLB_FIREHOSE_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "kinesis_firehose", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "delivery_stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "simple_aggregation", "On", NULL); + flb_output_set(ctx, out_ffd, "time_key", "timestamp", NULL); + flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Push records with time_key enabled */ + flb_lib_push(ctx, in_ffd, (char *) "[1, {\"message\":\"with_time1\"}]", 30); + flb_lib_push(ctx, in_ffd, (char *) "[1, {\"message\":\"with_time2\"}]", 30); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_firehose_aggregation_with_log_key(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + const char *record = "[1, {\"message\":\"with_log_key\"}]"; + + setenv("FLB_FIREHOSE_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "kinesis_firehose", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "delivery_stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "simple_aggregation", "On", NULL); + flb_output_set(ctx, out_ffd, "log_key", "log", NULL); + flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Push records with log_key enabled */ + flb_lib_push(ctx, in_ffd, (char *) record, strlen(record)); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_firehose_aggregation_many_records(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + int i; + char record[100]; + + setenv("FLB_FIREHOSE_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "kinesis_firehose", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "delivery_stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "simple_aggregation", "On", NULL); + flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Push many small records to test aggregation efficiency */ + for (i = 0; i < 50; i++) { + ret = snprintf(record, sizeof(record), "[1, {\"id\":%d,\"msg\":\"test\"}]", i); + TEST_CHECK(ret < sizeof(record)); + flb_lib_push(ctx, in_ffd, record, strlen(record)); + } + + sleep(3); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_firehose_aggregation_with_compression(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + const char *record1 = "[1, {\"message\":\"compressed1\"}]"; + const char *record2 = "[1, {\"message\":\"compressed2\"}]"; + + setenv("FLB_FIREHOSE_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "kinesis_firehose", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "delivery_stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "simple_aggregation", "On", NULL); + flb_output_set(ctx, out_ffd, "compression", "gzip", NULL); + flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Push records with compression enabled */ + flb_lib_push(ctx, in_ffd, (char *) record1, strlen(record1)); + flb_lib_push(ctx, in_ffd, (char *) record2, strlen(record2)); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_firehose_aggregation_combined_params(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + const char *record = "[1, {\"message\":\"combined_test\"}]"; + + setenv("FLB_FIREHOSE_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "kinesis_firehose", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "delivery_stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "simple_aggregation", "On", NULL); + flb_output_set(ctx, out_ffd, "time_key", "timestamp", NULL); + flb_output_set(ctx, out_ffd, "time_key_format", "%Y-%m-%d %H:%M:%S", NULL); + flb_output_set(ctx, out_ffd, "compression", "gzip", NULL); + flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Test with all features combined */ + flb_lib_push(ctx, in_ffd, (char *) record, strlen(record)); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_firehose_aggregation_empty_records(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + setenv("FLB_FIREHOSE_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "kinesis_firehose", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "delivery_stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "simple_aggregation", "On", NULL); + flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Push empty and minimal records */ + flb_lib_push(ctx, in_ffd, (char *) "[1, {}]", 7); + flb_lib_push(ctx, in_ffd, (char *) "[1, {\"a\":\"\"}]", 13); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_firehose_aggregation_error_handling(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + const char *record1 = "[1, {\"message\":\"error_test1\"}]"; + const char *record2 = "[1, {\"message\":\"error_test2\"}]"; + + setenv("FLB_FIREHOSE_PLUGIN_UNDER_TEST", "true", 1); + setenv("TEST_PUT_RECORD_BATCH_ERROR", ERROR_THROUGHPUT, 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "kinesis_firehose", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "delivery_stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "simple_aggregation", "On", NULL); + flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Test error handling with aggregation enabled */ + flb_lib_push(ctx, in_ffd, (char *) record1, strlen(record1)); + flb_lib_push(ctx, in_ffd, (char *) record2, strlen(record2)); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); + unsetenv("TEST_PUT_RECORD_BATCH_ERROR"); +} + +void flb_test_firehose_aggregation_custom_time_format(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + const char *record = "[1, {\"message\":\"unix_time\"}]"; + + setenv("FLB_FIREHOSE_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "kinesis_firehose", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "delivery_stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "simple_aggregation", "On", NULL); + flb_output_set(ctx, out_ffd, "time_key", "ts", NULL); + flb_output_set(ctx, out_ffd, "time_key_format", "%s", NULL); + flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Test with Unix timestamp format */ + flb_lib_push(ctx, in_ffd, (char *) record, strlen(record)); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + /* Test list */ TEST_LIST = { {"success", flb_test_firehose_success }, @@ -196,5 +531,14 @@ TEST_LIST = { {"throughput_error", flb_test_firehose_throughput_error }, {"unknown_error", flb_test_firehose_error_unknown }, {"nonsense_error", flb_test_firehose_nonsense_error }, + {"simple_aggregation", flb_test_firehose_simple_aggregation }, + {"aggregation_with_time_key", flb_test_firehose_aggregation_with_time_key }, + {"aggregation_with_log_key", flb_test_firehose_aggregation_with_log_key }, + {"aggregation_many_records", flb_test_firehose_aggregation_many_records }, + {"aggregation_with_compression", flb_test_firehose_aggregation_with_compression }, + {"aggregation_combined_params", flb_test_firehose_aggregation_combined_params }, + {"aggregation_empty_records", flb_test_firehose_aggregation_empty_records }, + {"aggregation_error_handling", flb_test_firehose_aggregation_error_handling }, + {"aggregation_custom_time_format", flb_test_firehose_aggregation_custom_time_format }, {NULL, NULL} }; diff --git a/tests/runtime/out_kinesis.c b/tests/runtime/out_kinesis.c index fa66a7efb19..1b6f74e8b24 100644 --- a/tests/runtime/out_kinesis.c +++ b/tests/runtime/out_kinesis.c @@ -295,6 +295,154 @@ void flb_test_kinesis_invalid_port(void) flb_destroy(ctx); } +void flb_test_kinesis_simple_aggregation(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + setenv("FLB_KINESIS_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "kinesis_streams", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "simple_aggregation", "On", NULL); + flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Push multiple small records */ + flb_lib_push(ctx, in_ffd, (char *) "[1, {\"message\":\"test1\"}]", 25); + flb_lib_push(ctx, in_ffd, (char *) "[1, {\"message\":\"test2\"}]", 25); + flb_lib_push(ctx, in_ffd, (char *) "[1, {\"message\":\"test3\"}]", 25); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_kinesis_aggregation_with_time_key(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + setenv("FLB_KINESIS_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "kinesis_streams", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "simple_aggregation", "On", NULL); + flb_output_set(ctx, out_ffd, "time_key", "timestamp", NULL); + flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Push records with time_key enabled */ + flb_lib_push(ctx, in_ffd, (char *) "[1, {\"message\":\"with_time1\"}]", 30); + flb_lib_push(ctx, in_ffd, (char *) "[1, {\"message\":\"with_time2\"}]", 30); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_kinesis_aggregation_with_log_key(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + const char *record = "[1, {\"message\":\"with_log_key\"}]"; + + setenv("FLB_KINESIS_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "kinesis_streams", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "simple_aggregation", "On", NULL); + flb_output_set(ctx, out_ffd, "log_key", "log", NULL); + flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Push records with log_key enabled */ + flb_lib_push(ctx, in_ffd, (char *) record, strlen(record)); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_kinesis_aggregation_many_records(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + int i; + char record[100]; + + setenv("FLB_KINESIS_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "kinesis_streams", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "simple_aggregation", "On", NULL); + flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Push many small records to test aggregation efficiency */ + for (i = 0; i < 50; i++) { + ret = snprintf(record, sizeof(record), "[1, {\"id\":%d,\"msg\":\"test\"}]", i); + TEST_CHECK(ret < sizeof(record)); + flb_lib_push(ctx, in_ffd, record, strlen(record)); + } + + sleep(3); + flb_stop(ctx); + flb_destroy(ctx); +} + /* Test list */ TEST_LIST = { {"success", flb_test_firehose_success }, @@ -305,5 +453,9 @@ TEST_LIST = { {"default_port", flb_test_kinesis_default_port }, {"custom_port", flb_test_kinesis_custom_port }, {"invalid_port", flb_test_kinesis_invalid_port }, + {"simple_aggregation", flb_test_kinesis_simple_aggregation }, + {"aggregation_with_time_key", flb_test_kinesis_aggregation_with_time_key }, + {"aggregation_with_log_key", flb_test_kinesis_aggregation_with_log_key }, + {"aggregation_many_records", flb_test_kinesis_aggregation_many_records }, {NULL, NULL} };