Skip to content

Commit 5c373e7

Browse files
committed
out_kinesis_streams: Add simple_aggregation operation
Signed-off-by: Shelby Hagman <[email protected]>
1 parent 5daf324 commit 5c373e7

File tree

3 files changed

+168
-13
lines changed

3 files changed

+168
-13
lines changed

plugins/out_kinesis_streams/kinesis.c

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -308,10 +308,10 @@ static int cb_kinesis_init(struct flb_output_instance *ins,
308308
return -1;
309309
}
310310

311-
static struct flush *new_flush_buffer(const char *tag, int tag_len)
311+
static struct flush *new_flush_buffer(struct flb_kinesis *ctx, const char *tag, int tag_len)
312312
{
313313
struct flush *buf;
314-
314+
int ret;
315315

316316
buf = flb_calloc(1, sizeof(struct flush));
317317
if (!buf) {
@@ -338,6 +338,18 @@ static struct flush *new_flush_buffer(const char *tag, int tag_len)
338338
buf->tag = tag;
339339
buf->tag_len = tag_len;
340340

341+
/* Initialize aggregation buffer if simple_aggregation is enabled */
342+
buf->agg_buf_initialized = FLB_FALSE;
343+
if (ctx->simple_aggregation) {
344+
ret = flb_aws_aggregation_init(&buf->agg_buf, MAX_EVENT_SIZE);
345+
if (ret < 0) {
346+
flb_plg_error(ctx->ins, "Failed to initialize aggregation buffer");
347+
kinesis_flush_destroy(buf);
348+
return NULL;
349+
}
350+
buf->agg_buf_initialized = FLB_TRUE;
351+
}
352+
341353
return buf;
342354
}
343355

@@ -353,7 +365,7 @@ static void cb_kinesis_flush(struct flb_event_chunk *event_chunk,
353365
(void) i_ins;
354366
(void) config;
355367

356-
buf = new_flush_buffer(event_chunk->tag, flb_sds_len(event_chunk->tag));
368+
buf = new_flush_buffer(ctx, event_chunk->tag, flb_sds_len(event_chunk->tag));
357369
if (!buf) {
358370
flb_plg_error(ctx->ins, "Failed to construct flush buffer");
359371
FLB_OUTPUT_RETURN(FLB_RETRY);
@@ -503,6 +515,13 @@ static struct flb_config_map config_map[] = {
503515
"$HOME/.aws/ directory."
504516
},
505517

518+
{
519+
FLB_CONFIG_MAP_BOOL, "simple_aggregation", "false",
520+
0, FLB_TRUE, offsetof(struct flb_kinesis, simple_aggregation),
521+
"Enable simple aggregation to combine multiple records into single API calls. "
522+
"This reduces the number of requests and can improve throughput."
523+
},
524+
506525
/* EOF */
507526
{0}
508527
};

plugins/out_kinesis_streams/kinesis.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <fluent-bit/flb_http_client.h>
2727
#include <fluent-bit/flb_aws_util.h>
2828
#include <fluent-bit/flb_signv4.h>
29+
#include <fluent-bit/aws/flb_aws_aggregation.h>
2930

3031
#define DEFAULT_TIME_KEY_FORMAT "%Y-%m-%dT%H:%M:%S"
3132

@@ -56,6 +57,10 @@ struct flush {
5657
char *event_buf;
5758
size_t event_buf_size;
5859

60+
/* aggregation buffer for simple_aggregation mode */
61+
struct flb_aws_agg_buffer agg_buf;
62+
int agg_buf_initialized;
63+
5964
int records_sent;
6065
int records_processed;
6166

@@ -92,6 +97,7 @@ struct flb_kinesis {
9297
const char *log_key;
9398
const char *external_id;
9499
int retry_requests;
100+
int simple_aggregation;
95101
char *sts_endpoint;
96102
int custom_endpoint;
97103
uint16_t port;

plugins/out_kinesis_streams/kinesis_api.c

Lines changed: 140 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include <fluent-bit/flb_http_client.h>
3737
#include <fluent-bit/flb_utils.h>
3838
#include <fluent-bit/flb_base64.h>
39+
#include <fluent-bit/aws/flb_aws_aggregation.h>
3940

4041
#include <monkey/mk_core.h>
4142
#include <msgpack.h>
@@ -51,6 +52,9 @@
5152

5253
#define ERR_CODE_EXCEEDED_THROUGHPUT "ProvisionedThroughputExceededException"
5354

55+
/* Forward declarations */
56+
static int send_log_events(struct flb_kinesis *ctx, struct flush *buf);
57+
5458
static struct flb_aws_header put_records_target_header = {
5559
.key = "X-Amz-Target",
5660
.key_len = 12,
@@ -202,6 +206,29 @@ static int end_put_payload(struct flb_kinesis *ctx, struct flush *buf,
202206
}
203207

204208

209+
/*
210+
* Process event with simple aggregation (Kinesis Streams version)
211+
* Uses shared aggregation implementation
212+
*/
213+
static int process_event_simple_aggregation(struct flb_kinesis *ctx, struct flush *buf,
214+
const msgpack_object *obj, struct flb_time *tms,
215+
struct flb_config *config)
216+
{
217+
return flb_aws_aggregation_process_event(&buf->agg_buf,
218+
buf->tmp_buf,
219+
buf->tmp_buf_size,
220+
&buf->tmp_buf_offset,
221+
obj,
222+
tms,
223+
config,
224+
ctx->ins,
225+
ctx->stream_name,
226+
ctx->log_key,
227+
ctx->time_key,
228+
ctx->time_key_format,
229+
MAX_EVENT_SIZE);
230+
}
231+
205232
/*
206233
* Processes the msgpack object
207234
* -1 = failure, record not added
@@ -391,6 +418,70 @@ static void reset_flush_buf(struct flb_kinesis *ctx, struct flush *buf) {
391418
buf->data_size += strlen(ctx->stream_name);
392419
}
393420

421+
/* Finalize and send aggregated record (Kinesis Streams version - no final newline) */
422+
static int send_aggregated_record(struct flb_kinesis *ctx, struct flush *buf) {
423+
int ret;
424+
size_t agg_size;
425+
size_t b64_len;
426+
struct kinesis_event *event;
427+
428+
/* Finalize without final newline (Kinesis Streams doesn't need it) */
429+
ret = flb_aws_aggregation_finalize(&buf->agg_buf, 0, &agg_size);
430+
if (ret < 0) {
431+
return 0;
432+
}
433+
434+
/* Base64 encode the aggregated record */
435+
size_t size = (agg_size * 1.5) + 4;
436+
if (buf->event_buf == NULL || buf->event_buf_size < size) {
437+
flb_free(buf->event_buf);
438+
buf->event_buf = flb_malloc(size);
439+
buf->event_buf_size = size;
440+
if (buf->event_buf == NULL) {
441+
flb_errno();
442+
return -1;
443+
}
444+
}
445+
446+
ret = flb_base64_encode((unsigned char *) buf->event_buf, size, &b64_len,
447+
(unsigned char *) buf->agg_buf.agg_buf, agg_size);
448+
if (ret != 0) {
449+
flb_errno();
450+
return -1;
451+
}
452+
agg_size = b64_len;
453+
454+
/* Copy to tmp_buf */
455+
if (buf->tmp_buf_size < agg_size) {
456+
flb_plg_error(ctx->ins, "Aggregated record too large for buffer");
457+
flb_aws_aggregation_reset(&buf->agg_buf);
458+
return 0;
459+
}
460+
461+
memcpy(buf->tmp_buf, buf->event_buf, agg_size);
462+
463+
/* Create event record */
464+
event = &buf->events[0];
465+
event->json = buf->tmp_buf;
466+
event->len = agg_size;
467+
event->timestamp.tv_sec = 0;
468+
event->timestamp.tv_nsec = 0;
469+
buf->event_index = 1;
470+
471+
/* Calculate data_size for the payload */
472+
buf->data_size = PUT_RECORDS_HEADER_LEN + PUT_RECORDS_FOOTER_LEN;
473+
buf->data_size += strlen(ctx->stream_name);
474+
buf->data_size += agg_size + PUT_RECORDS_PER_RECORD_LEN;
475+
476+
/* Send the aggregated record */
477+
ret = send_log_events(ctx, buf);
478+
479+
/* Reset aggregation buffer */
480+
flb_aws_aggregation_reset(&buf->agg_buf);
481+
482+
return ret;
483+
}
484+
394485
/* constructs a put payload, and then sends */
395486
static int send_log_events(struct flb_kinesis *ctx, struct flush *buf) {
396487
int ret;
@@ -476,10 +567,47 @@ static int add_event(struct flb_kinesis *ctx, struct flush *buf,
476567
size_t event_bytes = 0;
477568

478569
if (buf->event_index == 0) {
479-
/* init */
480570
reset_flush_buf(ctx, buf);
481571
}
482572

573+
/* Use simple aggregation if enabled */
574+
if (ctx->simple_aggregation) {
575+
retry_add_event_agg:
576+
retry_add = FLB_FALSE;
577+
ret = process_event_simple_aggregation(ctx, buf, obj, tms, config);
578+
if (ret < 0) {
579+
return -1;
580+
}
581+
else if (ret == 1) {
582+
/* Buffer full - send aggregated record and retry */
583+
ret = send_aggregated_record(ctx, buf);
584+
if (ret == 0 && buf->agg_buf.agg_buf_offset == 0) {
585+
/* No data was aggregated - discard this record to avoid infinite loop */
586+
flb_plg_warn(ctx->ins, "Discarding unprocessable record (aggregation buffer empty), %s",
587+
ctx->stream_name);
588+
reset_flush_buf(ctx, buf);
589+
return 0;
590+
}
591+
reset_flush_buf(ctx, buf);
592+
if (ret < 0) {
593+
return -1;
594+
}
595+
retry_add = FLB_TRUE;
596+
}
597+
else if (ret == 2) {
598+
flb_plg_warn(ctx->ins, "Discarding large or unprocessable record, %s",
599+
ctx->stream_name);
600+
return 0;
601+
}
602+
603+
if (retry_add == FLB_TRUE) {
604+
goto retry_add_event_agg;
605+
}
606+
607+
return 0;
608+
}
609+
610+
/* Normal processing without aggregation */
483611
retry_add_event:
484612
retry_add = FLB_FALSE;
485613
ret = process_event(ctx, buf, obj, tms, config);
@@ -488,16 +616,13 @@ static int add_event(struct flb_kinesis *ctx, struct flush *buf,
488616
}
489617
else if (ret == 1) {
490618
if (buf->event_index <= 0) {
491-
/* somehow the record was larger than our entire request buffer */
492619
flb_plg_warn(ctx->ins, "Discarding massive log record, %s",
493620
ctx->stream_name);
494-
return 0; /* discard this record and return to caller */
621+
return 0;
495622
}
496-
/* send logs and then retry the add */
497623
retry_add = FLB_TRUE;
498624
goto send;
499625
} else if (ret == 2) {
500-
/* discard this record and return to caller */
501626
flb_plg_warn(ctx->ins, "Discarding large or unprocessable record, %s",
502627
ctx->stream_name);
503628
return 0;
@@ -508,17 +633,14 @@ static int add_event(struct flb_kinesis *ctx, struct flush *buf,
508633

509634
if ((buf->data_size + event_bytes) > PUT_RECORDS_PAYLOAD_SIZE) {
510635
if (buf->event_index <= 0) {
511-
/* somehow the record was larger than our entire request buffer */
512636
flb_plg_warn(ctx->ins, "[size=%zu] Discarding massive log record, %s",
513637
event_bytes, ctx->stream_name);
514-
return 0; /* discard this record and return to caller */
638+
return 0;
515639
}
516-
/* do not send this event */
517640
retry_add = FLB_TRUE;
518641
goto send;
519642
}
520643

521-
/* send is not needed yet, return to caller */
522644
buf->data_size += event_bytes;
523645
buf->event_index++;
524646

@@ -633,7 +755,12 @@ int process_and_send_to_kinesis(struct flb_kinesis *ctx, struct flush *buf,
633755
flb_log_event_decoder_destroy(&log_decoder);
634756

635757
/* send any remaining events */
636-
ret = send_log_events(ctx, buf);
758+
if (ctx->simple_aggregation) {
759+
ret = send_aggregated_record(ctx, buf);
760+
}
761+
else {
762+
ret = send_log_events(ctx, buf);
763+
}
637764
reset_flush_buf(ctx, buf);
638765
if (ret < 0) {
639766
return -1;
@@ -981,6 +1108,9 @@ int put_records(struct flb_kinesis *ctx, struct flush *buf,
9811108
void kinesis_flush_destroy(struct flush *buf)
9821109
{
9831110
if (buf) {
1111+
if (buf->agg_buf_initialized) {
1112+
flb_aws_aggregation_destroy(&buf->agg_buf);
1113+
}
9841114
flb_free(buf->tmp_buf);
9851115
flb_free(buf->out_buf);
9861116
flb_free(buf->events);

0 commit comments

Comments
 (0)