Skip to content

Commit 7fd60fe

Browse files
committed
out_kinesis_firehose: Add simple_aggregation operation
- Add simple_aggregation config parameter and implementation to plugin Signed-off-by: Shelby Hagman <[email protected]>
1 parent 093fb61 commit 7fd60fe

File tree

3 files changed

+201
-4
lines changed

3 files changed

+201
-4
lines changed

plugins/out_kinesis_firehose/firehose.c

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -314,10 +314,10 @@ static int cb_firehose_init(struct flb_output_instance *ins,
314314
return -1;
315315
}
316316

317-
struct flush *new_flush_buffer()
317+
struct flush *new_flush_buffer(struct flb_firehose *ctx)
318318
{
319319
struct flush *buf;
320-
320+
int ret;
321321

322322
buf = flb_calloc(1, sizeof(struct flush));
323323
if (!buf) {
@@ -341,6 +341,18 @@ struct flush *new_flush_buffer()
341341
}
342342
buf->events_capacity = MAX_EVENTS_PER_PUT;
343343

344+
/* Initialize aggregation buffer if simple_aggregation is enabled */
345+
buf->agg_buf_initialized = FLB_FALSE;
346+
if (ctx->simple_aggregation) {
347+
ret = flb_aws_aggregation_init(&buf->agg_buf, MAX_EVENT_SIZE);
348+
if (ret < 0) {
349+
flb_plg_error(ctx->ins, "Failed to initialize aggregation buffer");
350+
flush_destroy(buf);
351+
return NULL;
352+
}
353+
buf->agg_buf_initialized = FLB_TRUE;
354+
}
355+
344356
return buf;
345357
}
346358

@@ -356,7 +368,7 @@ static void cb_firehose_flush(struct flb_event_chunk *event_chunk,
356368
(void) i_ins;
357369
(void) config;
358370

359-
buf = new_flush_buffer();
371+
buf = new_flush_buffer(ctx);
360372
if (!buf) {
361373
flb_plg_error(ctx->ins, "Failed to construct flush buffer");
362374
FLB_OUTPUT_RETURN(FLB_RETRY);
@@ -508,6 +520,13 @@ static struct flb_config_map config_map[] = {
508520
"AWS Profile name. AWS Profiles can be configured with AWS CLI and are usually stored in "
509521
"$HOME/.aws/ directory."
510522
},
523+
524+
{
525+
FLB_CONFIG_MAP_BOOL, "simple_aggregation", "false",
526+
0, FLB_TRUE, offsetof(struct flb_firehose, simple_aggregation),
527+
"Enable simple aggregation to combine multiple records into single API calls. "
528+
"This reduces the number of requests and can improve throughput."
529+
},
511530
/* EOF */
512531
{0}
513532
};

plugins/out_kinesis_firehose/firehose.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <fluent-bit/flb_http_client.h>
2727
#include <fluent-bit/flb_aws_util.h>
2828
#include <fluent-bit/flb_signv4.h>
29+
#include <fluent-bit/aws/flb_aws_aggregation.h>
2930

3031
#define DEFAULT_TIME_KEY_FORMAT "%Y-%m-%dT%H:%M:%S"
3132

@@ -56,6 +57,10 @@ struct flush {
5657
char *event_buf;
5758
size_t event_buf_size;
5859

60+
/* aggregation buffer for simple_aggregation mode */
61+
struct flb_aws_agg_buffer agg_buf;
62+
int agg_buf_initialized;
63+
5964
int records_sent;
6065
int records_processed;
6166
};
@@ -94,6 +99,7 @@ struct flb_firehose {
9499
int custom_endpoint;
95100
int retry_requests;
96101
int compression;
102+
int simple_aggregation;
97103

98104
/* must be freed on shutdown if custom_endpoint is not set */
99105
char *endpoint;

plugins/out_kinesis_firehose/firehose_api.c

Lines changed: 173 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
#include <fluent-bit/flb_base64.h>
4040
#include <fluent-bit/aws/flb_aws_compress.h>
41+
#include <fluent-bit/aws/flb_aws_aggregation.h>
4142

4243
#include <monkey/mk_core.h>
4344
#include <msgpack.h>
@@ -53,6 +54,9 @@
5354

5455
#define ERR_CODE_SERVICE_UNAVAILABLE "ServiceUnavailableException"
5556

57+
/* Forward declaration: send_aggregated_record() calls send_log_events() before its definition */
58+
static int send_log_events(struct flb_firehose *ctx, struct flush *buf);
59+
5660
static struct flb_aws_header put_record_batch_header = {
5761
.key = "X-Amz-Target",
5862
.key_len = 12,
@@ -141,6 +145,29 @@ static int end_put_payload(struct flb_firehose *ctx, struct flush *buf,
141145
}
142146

143147

148+
/*
149+
* Process event with simple aggregation (Firehose version)
150+
* Uses shared aggregation implementation
151+
*/
152+
static int process_event_simple_aggregation(struct flb_firehose *ctx, struct flush *buf,
153+
const msgpack_object *obj, struct flb_time *tms,
154+
struct flb_config *config)
155+
{
156+
return flb_aws_aggregation_process_event(&buf->agg_buf,
157+
buf->tmp_buf,
158+
buf->tmp_buf_size,
159+
&buf->tmp_buf_offset,
160+
obj,
161+
tms,
162+
config,
163+
ctx->ins,
164+
ctx->delivery_stream,
165+
ctx->log_key,
166+
ctx->time_key,
167+
ctx->time_key_format,
168+
MAX_EVENT_SIZE);
169+
}
170+
144171
/*
145172
* Processes the msgpack object
146173
* -1 = failure, record not added
@@ -356,6 +383,102 @@ static void reset_flush_buf(struct flb_firehose *ctx, struct flush *buf) {
356383
buf->data_size += strlen(ctx->delivery_stream);
357384
}
358385

386+
/* Finalize and send aggregated record */
387+
static int send_aggregated_record(struct flb_firehose *ctx, struct flush *buf) {
388+
int ret;
389+
size_t agg_size;
390+
size_t b64_len;
391+
struct firehose_event *event;
392+
void *compressed_tmp_buf;
393+
size_t compressed_size;
394+
395+
/* Finalize the aggregated record */
396+
ret = flb_aws_aggregation_finalize(&buf->agg_buf, 1, &agg_size);
397+
if (ret < 0) {
398+
/* No data to finalize */
399+
return 0;
400+
}
401+
402+
/* Handle compression if enabled */
403+
if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
404+
ret = flb_aws_compression_b64_truncate_compress(ctx->compression,
405+
MAX_B64_EVENT_SIZE,
406+
buf->agg_buf.agg_buf,
407+
agg_size,
408+
&compressed_tmp_buf,
409+
&compressed_size);
410+
if (ret == -1) {
411+
flb_plg_error(ctx->ins, "Unable to compress aggregated record, discarding, %s",
412+
ctx->delivery_stream);
413+
flb_aws_aggregation_reset(&buf->agg_buf);
414+
return 0;
415+
}
416+
417+
/* Ensure event_buf is large enough */
418+
if (buf->event_buf == NULL || buf->event_buf_size < compressed_size) {
419+
flb_free(buf->event_buf);
420+
buf->event_buf = compressed_tmp_buf;
421+
buf->event_buf_size = compressed_size;
422+
compressed_tmp_buf = NULL;
423+
} else {
424+
memcpy(buf->event_buf, compressed_tmp_buf, compressed_size);
425+
flb_free(compressed_tmp_buf);
426+
}
427+
agg_size = compressed_size;
428+
}
429+
else {
430+
/* Base64 encode the aggregated record */
431+
size_t size = (agg_size * 1.5) + 4;
432+
if (buf->event_buf == NULL || buf->event_buf_size < size) {
433+
flb_free(buf->event_buf);
434+
buf->event_buf = flb_malloc(size);
435+
buf->event_buf_size = size;
436+
if (buf->event_buf == NULL) {
437+
flb_errno();
438+
return -1;
439+
}
440+
}
441+
442+
ret = flb_base64_encode((unsigned char *) buf->event_buf, size, &b64_len,
443+
(unsigned char *) buf->agg_buf.agg_buf, agg_size);
444+
if (ret != 0) {
445+
flb_errno();
446+
return -1;
447+
}
448+
agg_size = b64_len;
449+
}
450+
451+
/* Copy to tmp_buf for sending */
452+
if (buf->tmp_buf_size < agg_size) {
453+
flb_plg_error(ctx->ins, "Aggregated record too large for buffer");
454+
flb_aws_aggregation_reset(&buf->agg_buf);
455+
return 0;
456+
}
457+
458+
memcpy(buf->tmp_buf, buf->event_buf, agg_size);
459+
460+
/* Create event record */
461+
event = &buf->events[0];
462+
event->json = buf->tmp_buf;
463+
event->len = agg_size;
464+
event->timestamp.tv_sec = 0;
465+
event->timestamp.tv_nsec = 0;
466+
buf->event_index = 1;
467+
468+
/* Calculate data_size for the payload */
469+
buf->data_size = PUT_RECORD_BATCH_HEADER_LEN + PUT_RECORD_BATCH_FOOTER_LEN;
470+
buf->data_size += strlen(ctx->delivery_stream);
471+
buf->data_size += agg_size + PUT_RECORD_BATCH_PER_RECORD_LEN;
472+
473+
/* Send the aggregated record */
474+
ret = send_log_events(ctx, buf);
475+
476+
/* Reset aggregation buffer */
477+
flb_aws_aggregation_reset(&buf->agg_buf);
478+
479+
return ret;
480+
}
481+
359482
/* constructs a put payload, and then sends */
360483
static int send_log_events(struct flb_firehose *ctx, struct flush *buf) {
361484
int ret;
@@ -445,6 +568,46 @@ static int add_event(struct flb_firehose *ctx, struct flush *buf,
445568
reset_flush_buf(ctx, buf);
446569
}
447570

571+
/* Use simple aggregation if enabled */
572+
if (ctx->simple_aggregation) {
573+
retry_add_event_agg:
574+
retry_add = FLB_FALSE;
575+
ret = process_event_simple_aggregation(ctx, buf, obj, tms, config);
576+
if (ret < 0) {
577+
return -1;
578+
}
579+
else if (ret == 1) {
580+
/* Buffer full - check if buffer was empty before sending (record too large) */
581+
if (buf->agg_buf.agg_buf_offset == 0) {
582+
flb_plg_warn(ctx->ins, "Discarding unprocessable record (too large for aggregation buffer), %s",
583+
ctx->delivery_stream);
584+
reset_flush_buf(ctx, buf);
585+
return 0;
586+
}
587+
588+
/* Send aggregated record and retry */
589+
ret = send_aggregated_record(ctx, buf);
590+
reset_flush_buf(ctx, buf);
591+
if (ret < 0) {
592+
return -1;
593+
}
594+
retry_add = FLB_TRUE;
595+
}
596+
else if (ret == 2) {
597+
/* Discard this record */
598+
flb_plg_warn(ctx->ins, "Discarding large or unprocessable record, %s",
599+
ctx->delivery_stream);
600+
return 0;
601+
}
602+
603+
if (retry_add == FLB_TRUE) {
604+
goto retry_add_event_agg;
605+
}
606+
607+
return 0;
608+
}
609+
610+
/* Normal processing without aggregation */
448611
retry_add_event:
449612
retry_add = FLB_FALSE;
450613
ret = process_event(ctx, buf, obj, tms, config);
@@ -603,7 +766,13 @@ int process_and_send_records(struct flb_firehose *ctx, struct flush *buf,
603766
flb_log_event_decoder_destroy(&log_decoder);
604767

605768
/* send any remaining events */
606-
ret = send_log_events(ctx, buf);
769+
if (ctx->simple_aggregation) {
770+
/* Send any remaining aggregated data */
771+
ret = send_aggregated_record(ctx, buf);
772+
}
773+
else {
774+
ret = send_log_events(ctx, buf);
775+
}
607776
reset_flush_buf(ctx, buf);
608777

609778
if (ret < 0) {
@@ -953,6 +1122,9 @@ int put_record_batch(struct flb_firehose *ctx, struct flush *buf,
9531122
void flush_destroy(struct flush *buf)
9541123
{
9551124
if (buf) {
1125+
if (buf->agg_buf_initialized) {
1126+
flb_aws_aggregation_destroy(&buf->agg_buf);
1127+
}
9561128
flb_free(buf->tmp_buf);
9571129
flb_free(buf->out_buf);
9581130
flb_free(buf->events);

0 commit comments

Comments
 (0)