Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions include/fluent-bit/aws/flb_aws_aggregation.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
 * ==========
 * Copyright (C) 2015-2025 The Fluent Bit Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef FLB_AWS_AGGREGATION_H
#define FLB_AWS_AGGREGATION_H

#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_time.h>

/* msgpack_object is referenced below; include msgpack explicitly so this
 * header is self-contained instead of relying on transitive includes. */
#include <msgpack.h>

/*
 * Aggregation buffer: accumulates multiple serialized records into a
 * single buffer so they can be sent as one API call.
 */
struct flb_aws_agg_buffer {
    char *agg_buf;          /* aggregated records buffer */
    size_t agg_buf_size;    /* total size of aggregation buffer */
    size_t agg_buf_offset;  /* current offset in aggregation buffer */
};

/* Initialize aggregation buffer; allocates storage sized for max_record_size.
 * Returns:
 *  0 = success
 * -1 = error
 */
int flb_aws_aggregation_init(struct flb_aws_agg_buffer *buf, size_t max_record_size);

/* Destroy aggregation buffer and release its storage. */
void flb_aws_aggregation_destroy(struct flb_aws_agg_buffer *buf);

/* Try to add event data to aggregation buffer
 * Returns:
 *  0 = success, event added to aggregation buffer
 *  1 = buffer full, caller should finalize and retry
 */
int flb_aws_aggregation_add(struct flb_aws_agg_buffer *buf,
                            const char *data, size_t data_len,
                            size_t max_record_size);

/* Finalize aggregated record
 * Returns:
 *  0 = success
 * -1 = error (no data to finalize)
 *
 * Output is written to buf->agg_buf and the size is returned via out_size parameter
 */
int flb_aws_aggregation_finalize(struct flb_aws_agg_buffer *buf,
                                 int add_final_newline,
                                 size_t *out_size);

/* Reset aggregation buffer for reuse (rewinds the offset; keeps storage). */
void flb_aws_aggregation_reset(struct flb_aws_agg_buffer *buf);

/* Process event with simple aggregation
 * Converts msgpack to JSON, optionally adds log_key and time_key,
 * then adds to aggregation buffer
 *
 * Returns:
 * -1 = failure, record not added
 *  0 = success, record added
 *  1 = buffer full, caller should finalize and retry
 *  2 = record could not be processed, discard it
 */
int flb_aws_aggregation_process_event(struct flb_aws_agg_buffer *agg_buf,
                                      char *tmp_buf,
                                      size_t tmp_buf_size,
                                      size_t *tmp_buf_offset,
                                      const msgpack_object *obj,
                                      struct flb_time *tms,
                                      struct flb_config *config,
                                      struct flb_output_instance *ins,
                                      const char *stream_name,
                                      const char *log_key,
                                      const char *time_key,
                                      const char *time_key_format,
                                      size_t max_event_size);

#endif /* FLB_AWS_AGGREGATION_H */
25 changes: 22 additions & 3 deletions plugins/out_kinesis_firehose/firehose.c
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,10 @@ static int cb_firehose_init(struct flb_output_instance *ins,
return -1;
}

struct flush *new_flush_buffer()
struct flush *new_flush_buffer(struct flb_firehose *ctx)
{
struct flush *buf;

int ret;

buf = flb_calloc(1, sizeof(struct flush));
if (!buf) {
Expand All @@ -341,6 +341,18 @@ struct flush *new_flush_buffer()
}
buf->events_capacity = MAX_EVENTS_PER_PUT;

/* Initialize aggregation buffer if simple_aggregation is enabled */
buf->agg_buf_initialized = FLB_FALSE;
if (ctx->simple_aggregation) {
ret = flb_aws_aggregation_init(&buf->agg_buf, MAX_EVENT_SIZE);
if (ret < 0) {
flb_plg_error(ctx->ins, "Failed to initialize aggregation buffer");
flush_destroy(buf);
return NULL;
}
buf->agg_buf_initialized = FLB_TRUE;
}

return buf;
}

Expand All @@ -356,7 +368,7 @@ static void cb_firehose_flush(struct flb_event_chunk *event_chunk,
(void) i_ins;
(void) config;

buf = new_flush_buffer();
buf = new_flush_buffer(ctx);
if (!buf) {
flb_plg_error(ctx->ins, "Failed to construct flush buffer");
FLB_OUTPUT_RETURN(FLB_RETRY);
Expand Down Expand Up @@ -508,6 +520,13 @@ static struct flb_config_map config_map[] = {
"AWS Profile name. AWS Profiles can be configured with AWS CLI and are usually stored in "
"$HOME/.aws/ directory."
},

{
FLB_CONFIG_MAP_BOOL, "simple_aggregation", "false",
0, FLB_TRUE, offsetof(struct flb_firehose, simple_aggregation),
"Enable simple aggregation to combine multiple records into single API calls. "
"This reduces the number of requests and can improve throughput."
},
/* EOF */
{0}
};
Expand Down
6 changes: 6 additions & 0 deletions plugins/out_kinesis_firehose/firehose.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <fluent-bit/flb_http_client.h>
#include <fluent-bit/flb_aws_util.h>
#include <fluent-bit/flb_signv4.h>
#include <fluent-bit/aws/flb_aws_aggregation.h>

#define DEFAULT_TIME_KEY_FORMAT "%Y-%m-%dT%H:%M:%S"

Expand Down Expand Up @@ -56,6 +57,10 @@ struct flush {
char *event_buf;
size_t event_buf_size;

/* aggregation buffer for simple_aggregation mode */
struct flb_aws_agg_buffer agg_buf;
int agg_buf_initialized;

int records_sent;
int records_processed;
};
Expand Down Expand Up @@ -94,6 +99,7 @@ struct flb_firehose {
int custom_endpoint;
int retry_requests;
int compression;
int simple_aggregation;

/* must be freed on shutdown if custom_endpoint is not set */
char *endpoint;
Expand Down
174 changes: 173 additions & 1 deletion plugins/out_kinesis_firehose/firehose_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

#include <fluent-bit/flb_base64.h>
#include <fluent-bit/aws/flb_aws_compress.h>
#include <fluent-bit/aws/flb_aws_aggregation.h>

#include <monkey/mk_core.h>
#include <msgpack.h>
Expand All @@ -53,6 +54,9 @@

#define ERR_CODE_SERVICE_UNAVAILABLE "ServiceUnavailableException"

/* Forward declarations */
static int send_log_events(struct flb_firehose *ctx, struct flush *buf);

static struct flb_aws_header put_record_batch_header = {
.key = "X-Amz-Target",
.key_len = 12,
Expand Down Expand Up @@ -141,6 +145,29 @@ static int end_put_payload(struct flb_firehose *ctx, struct flush *buf,
}


/*
 * Firehose-specific wrapper over the shared AWS aggregation helper.
 * Forwards the event plus this plugin's aggregation settings (delivery
 * stream, log_key, time_key, time_key_format and the Firehose event size
 * cap) to flb_aws_aggregation_process_event(); return codes are those of
 * the shared helper (-1 error, 0 added, 1 buffer full, 2 discard).
 */
static int process_event_simple_aggregation(struct flb_firehose *ctx, struct flush *buf,
                                            const msgpack_object *obj, struct flb_time *tms,
                                            struct flb_config *config)
{
    int ret;

    ret = flb_aws_aggregation_process_event(&buf->agg_buf, buf->tmp_buf,
                                            buf->tmp_buf_size, &buf->tmp_buf_offset,
                                            obj, tms, config, ctx->ins,
                                            ctx->delivery_stream, ctx->log_key,
                                            ctx->time_key, ctx->time_key_format,
                                            MAX_EVENT_SIZE);

    return ret;
}

/*
* Processes the msgpack object
* -1 = failure, record not added
Expand Down Expand Up @@ -356,6 +383,102 @@ static void reset_flush_buf(struct flb_firehose *ctx, struct flush *buf) {
buf->data_size += strlen(ctx->delivery_stream);
}

/* Finalize and send the current aggregated record.
 *
 * Finalizes whatever has accumulated in buf->agg_buf, compresses or
 * base64-encodes it into buf->event_buf, stages it as a single Firehose
 * event in buf->events[0], then sends it via send_log_events().
 * The aggregation buffer is reset afterwards so it can be reused.
 *
 * Returns 0 on success or when there is nothing to send / the record had
 * to be discarded; -1 on allocation or encoding failure.
 */
static int send_aggregated_record(struct flb_firehose *ctx, struct flush *buf) {
    int ret;
    size_t agg_size;
    size_t b64_len;
    struct firehose_event *event;
    void *compressed_tmp_buf;
    size_t compressed_size;

    /* Finalize the aggregated record (appends trailing newline) */
    ret = flb_aws_aggregation_finalize(&buf->agg_buf, 1, &agg_size);
    if (ret < 0) {
        /* No data to finalize; nothing to send is not an error */
        return 0;
    }

    /* Handle compression if enabled */
    if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
        /* compress + base64 in one step; truncates to fit MAX_B64_EVENT_SIZE */
        ret = flb_aws_compression_b64_truncate_compress(ctx->compression,
                                                        MAX_B64_EVENT_SIZE,
                                                        buf->agg_buf.agg_buf,
                                                        agg_size,
                                                        &compressed_tmp_buf,
                                                        &compressed_size);
        if (ret == -1) {
            flb_plg_error(ctx->ins, "Unable to compress aggregated record, discarding, %s",
                          ctx->delivery_stream);
            flb_aws_aggregation_reset(&buf->agg_buf);
            return 0;
        }

        /* Ensure event_buf is large enough; adopt the compressed buffer
         * directly when the existing one is too small */
        if (buf->event_buf == NULL || buf->event_buf_size < compressed_size) {
            flb_free(buf->event_buf);
            buf->event_buf = compressed_tmp_buf;
            buf->event_buf_size = compressed_size;
            compressed_tmp_buf = NULL;
        } else {
            memcpy(buf->event_buf, compressed_tmp_buf, compressed_size);
            flb_free(compressed_tmp_buf);
        }
        agg_size = compressed_size;
    }
    else {
        /* Base64 encode the aggregated record.
         * Base64 output is 4/3 of input plus padding; 1.5x + 4 is a safe
         * upper bound. */
        size_t size = (agg_size * 1.5) + 4;
        if (buf->event_buf == NULL || buf->event_buf_size < size) {
            flb_free(buf->event_buf);
            buf->event_buf = flb_malloc(size);
            if (buf->event_buf == NULL) {
                /* Keep size consistent with the NULL pointer; otherwise a
                 * later call could skip reallocation and write through NULL */
                buf->event_buf_size = 0;
                flb_errno();
                return -1;
            }
            buf->event_buf_size = size;
        }

        ret = flb_base64_encode((unsigned char *) buf->event_buf, size, &b64_len,
                                (unsigned char *) buf->agg_buf.agg_buf, agg_size);
        if (ret != 0) {
            flb_errno();
            return -1;
        }
        agg_size = b64_len;
    }

    /* Copy to tmp_buf for sending */
    if (buf->tmp_buf_size < agg_size) {
        flb_plg_error(ctx->ins, "Aggregated record too large for buffer");
        flb_aws_aggregation_reset(&buf->agg_buf);
        return 0;
    }

    memcpy(buf->tmp_buf, buf->event_buf, agg_size);

    /* Stage as a single event record; timestamp is unused for Firehose */
    event = &buf->events[0];
    event->json = buf->tmp_buf;
    event->len = agg_size;
    event->timestamp.tv_sec = 0;
    event->timestamp.tv_nsec = 0;
    buf->event_index = 1;

    /* Calculate data_size for the payload */
    buf->data_size = PUT_RECORD_BATCH_HEADER_LEN + PUT_RECORD_BATCH_FOOTER_LEN;
    buf->data_size += strlen(ctx->delivery_stream);
    buf->data_size += agg_size + PUT_RECORD_BATCH_PER_RECORD_LEN;

    /* Send the aggregated record */
    ret = send_log_events(ctx, buf);

    /* Reset aggregation buffer */
    flb_aws_aggregation_reset(&buf->agg_buf);

    return ret;
}

/* constructs a put payload, and then sends */
static int send_log_events(struct flb_firehose *ctx, struct flush *buf) {
int ret;
Expand Down Expand Up @@ -445,6 +568,46 @@ static int add_event(struct flb_firehose *ctx, struct flush *buf,
reset_flush_buf(ctx, buf);
}

/* Use simple aggregation if enabled */
if (ctx->simple_aggregation) {
retry_add_event_agg:
retry_add = FLB_FALSE;
ret = process_event_simple_aggregation(ctx, buf, obj, tms, config);
if (ret < 0) {
return -1;
}
else if (ret == 1) {
/* Buffer full - check if buffer was empty before sending (record too large) */
if (buf->agg_buf.agg_buf_offset == 0) {
flb_plg_warn(ctx->ins, "Discarding unprocessable record (too large for aggregation buffer), %s",
ctx->delivery_stream);
reset_flush_buf(ctx, buf);
return 0;
}

/* Send aggregated record and retry */
ret = send_aggregated_record(ctx, buf);
reset_flush_buf(ctx, buf);
if (ret < 0) {
return -1;
}
retry_add = FLB_TRUE;
}
else if (ret == 2) {
/* Discard this record */
flb_plg_warn(ctx->ins, "Discarding large or unprocessable record, %s",
ctx->delivery_stream);
return 0;
}

if (retry_add == FLB_TRUE) {
goto retry_add_event_agg;
}

return 0;
}

/* Normal processing without aggregation */
retry_add_event:
retry_add = FLB_FALSE;
ret = process_event(ctx, buf, obj, tms, config);
Expand Down Expand Up @@ -603,7 +766,13 @@ int process_and_send_records(struct flb_firehose *ctx, struct flush *buf,
flb_log_event_decoder_destroy(&log_decoder);

/* send any remaining events */
ret = send_log_events(ctx, buf);
if (ctx->simple_aggregation) {
/* Send any remaining aggregated data */
ret = send_aggregated_record(ctx, buf);
}
else {
ret = send_log_events(ctx, buf);
}
reset_flush_buf(ctx, buf);

if (ret < 0) {
Expand Down Expand Up @@ -953,6 +1122,9 @@ int put_record_batch(struct flb_firehose *ctx, struct flush *buf,
void flush_destroy(struct flush *buf)
{
if (buf) {
if (buf->agg_buf_initialized) {
flb_aws_aggregation_destroy(&buf->agg_buf);
}
flb_free(buf->tmp_buf);
flb_free(buf->out_buf);
flb_free(buf->events);
Expand Down
Loading
Loading