Skip to content

Commit 158fb26

Browse files
committed
Add simple_aggregation to kinesis/firehose plugins
- Bring over simple_aggregation from the Golang plugin versions
- Add unit/runtime tests
1 parent c06c124 commit 158fb26

File tree

13 files changed

+1717
-17
lines changed

13 files changed

+1717
-17
lines changed
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* Fluent Bit
4+
* ==========
5+
* Copyright (C) 2015-2024 The Fluent Bit Authors
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
#ifndef FLB_AWS_AGGREGATION_H
21+
#define FLB_AWS_AGGREGATION_H
22+
23+
#include <fluent-bit/flb_output_plugin.h>
24+
#include <fluent-bit/flb_time.h>
25+
26+
/*
 * Aggregation buffer structure: the state passed to every
 * flb_aws_aggregation_* function declared in this header.
 */
27+
struct flb_aws_agg_buffer {
28+
char *agg_buf; /* aggregated records buffer; finalize leaves its output here */
29+
size_t agg_buf_size; /* total size of aggregation buffer */
30+
size_t agg_buf_offset; /* current offset in aggregation buffer */
31+
};
32+
33+
/*
 * Initialize an aggregation buffer.
 *
 * buf             - aggregation buffer to initialize
 * max_record_size - NOTE(review): presumably the capacity to allocate for
 *                   the buffer; confirm against the implementation
 *
 * Returns:
 *    0 = success
 *   -1 = error
 */
38+
int flb_aws_aggregation_init(struct flb_aws_agg_buffer *buf, size_t max_record_size);
39+
40+
/*
 * Destroy an aggregation buffer.
 *
 * NOTE(review): presumably releases what flb_aws_aggregation_init() set up
 * inside *buf and not the struct itself — confirm against the implementation.
 */
41+
void flb_aws_aggregation_destroy(struct flb_aws_agg_buffer *buf);
42+
43+
/*
 * Try to add event data to the aggregation buffer.
 *
 * buf             - aggregation buffer to add to
 * data, data_len  - event bytes and their length
 * max_record_size - NOTE(review): presumably the upper bound on the size of
 *                   the aggregated record; confirm against the implementation
 *
 * Returns:
 *   0 = success, event added to aggregation buffer
 *   1 = buffer full, caller should finalize and retry
 */
48+
int flb_aws_aggregation_add(struct flb_aws_agg_buffer *buf,
49+
const char *data, size_t data_len,
50+
size_t max_record_size);
51+
52+
/*
 * Finalize the aggregated record.
 *
 * buf               - aggregation buffer to finalize
 * add_final_newline - NOTE(review): presumably requests a trailing newline
 *                     when non-zero; confirm against the implementation
 * out_size          - receives the size of the finalized output
 *
 * Returns:
 *    0 = success
 *   -1 = error (no data to finalize)
 *
 * Output is written to buf->agg_buf and the size is returned via the
 * out_size parameter.
 */
59+
int flb_aws_aggregation_finalize(struct flb_aws_agg_buffer *buf,
60+
int add_final_newline,
61+
size_t *out_size);
62+
63+
/*
 * Reset the aggregation buffer for reuse.
 *
 * NOTE(review): presumably discards the aggregated data while keeping the
 * buffer usable for further flb_aws_aggregation_add() calls — confirm
 * against the implementation.
 */
64+
void flb_aws_aggregation_reset(struct flb_aws_agg_buffer *buf);
65+
66+
/*
 * Process an event with simple aggregation.
 *
 * Converts msgpack to JSON, optionally adds log_key and time_key,
 * then adds the result to the aggregation buffer.
 *
 * Parameters (NOTE(review): roles below are inferred from the names —
 * confirm against the implementation):
 *   agg_buf          aggregation buffer receiving the record
 *   tmp_buf, tmp_buf_size, tmp_buf_offset
 *                    caller-owned scratch buffer, its capacity and the
 *                    current offset into it
 *   obj, tms         msgpack record and its timestamp
 *   config, ins      Fluent Bit configuration and output plugin instance
 *   stream_name      destination stream name
 *   log_key, time_key, time_key_format
 *                    optional key settings applied to the record
 *   max_event_size   size limit for a single event
 *
 * Returns:
 *   -1 = failure, record not added
 *    0 = success, record added
 *    1 = buffer full, caller should finalize and retry
 *    2 = record could not be processed, discard it
 */
76+
int flb_aws_aggregation_process_event(struct flb_aws_agg_buffer *agg_buf,
77+
char *tmp_buf,
78+
size_t tmp_buf_size,
79+
size_t *tmp_buf_offset,
80+
const msgpack_object *obj,
81+
struct flb_time *tms,
82+
struct flb_config *config,
83+
struct flb_output_instance *ins,
84+
const char *stream_name,
85+
const char *log_key,
86+
const char *time_key,
87+
const char *time_key_format,
88+
size_t max_event_size);
89+
90+
#endif

plugins/out_kinesis_firehose/firehose.c

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -314,10 +314,10 @@ static int cb_firehose_init(struct flb_output_instance *ins,
314314
return -1;
315315
}
316316

317-
struct flush *new_flush_buffer()
317+
struct flush *new_flush_buffer(struct flb_firehose *ctx)
318318
{
319319
struct flush *buf;
320-
320+
int ret;
321321

322322
buf = flb_calloc(1, sizeof(struct flush));
323323
if (!buf) {
@@ -341,6 +341,18 @@ struct flush *new_flush_buffer()
341341
}
342342
buf->events_capacity = MAX_EVENTS_PER_PUT;
343343

344+
/* Initialize aggregation buffer if simple_aggregation is enabled */
345+
buf->agg_buf_initialized = FLB_FALSE;
346+
if (ctx->simple_aggregation) {
347+
ret = flb_aws_aggregation_init(&buf->agg_buf, MAX_EVENT_SIZE);
348+
if (ret < 0) {
349+
flb_plg_error(ctx->ins, "Failed to initialize aggregation buffer");
350+
flush_destroy(buf);
351+
return NULL;
352+
}
353+
buf->agg_buf_initialized = FLB_TRUE;
354+
}
355+
344356
return buf;
345357
}
346358

@@ -356,7 +368,7 @@ static void cb_firehose_flush(struct flb_event_chunk *event_chunk,
356368
(void) i_ins;
357369
(void) config;
358370

359-
buf = new_flush_buffer();
371+
buf = new_flush_buffer(ctx);
360372
if (!buf) {
361373
flb_plg_error(ctx->ins, "Failed to construct flush buffer");
362374
FLB_OUTPUT_RETURN(FLB_RETRY);
@@ -508,6 +520,13 @@ static struct flb_config_map config_map[] = {
508520
"AWS Profile name. AWS Profiles can be configured with AWS CLI and are usually stored in "
509521
"$HOME/.aws/ directory."
510522
},
523+
524+
{
525+
FLB_CONFIG_MAP_BOOL, "simple_aggregation", "false",
526+
0, FLB_TRUE, offsetof(struct flb_firehose, simple_aggregation),
527+
"Enable simple aggregation to combine multiple records into single API calls. "
528+
"This reduces the number of requests and can improve throughput."
529+
},
511530
/* EOF */
512531
{0}
513532
};

plugins/out_kinesis_firehose/firehose.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <fluent-bit/flb_http_client.h>
2727
#include <fluent-bit/flb_aws_util.h>
2828
#include <fluent-bit/flb_signv4.h>
29+
#include <fluent-bit/aws/flb_aws_aggregation.h>
2930

3031
#define DEFAULT_TIME_KEY_FORMAT "%Y-%m-%dT%H:%M:%S"
3132

@@ -56,6 +57,10 @@ struct flush {
5657
char *event_buf;
5758
size_t event_buf_size;
5859

60+
/* aggregation buffer for simple_aggregation mode */
61+
struct flb_aws_agg_buffer agg_buf;
62+
int agg_buf_initialized;
63+
5964
int records_sent;
6065
int records_processed;
6166
};
@@ -94,6 +99,7 @@ struct flb_firehose {
9499
int custom_endpoint;
95100
int retry_requests;
96101
int compression;
102+
int simple_aggregation;
97103

98104
/* must be freed on shutdown if custom_endpoint is not set */
99105
char *endpoint;

plugins/out_kinesis_firehose/firehose_api.c

Lines changed: 165 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
#include <fluent-bit/flb_base64.h>
4040
#include <fluent-bit/aws/flb_aws_compress.h>
41+
#include <fluent-bit/aws/flb_aws_aggregation.h>
4142

4243
#include <monkey/mk_core.h>
4344
#include <msgpack.h>
@@ -53,6 +54,9 @@
5354

5455
#define ERR_CODE_SERVICE_UNAVAILABLE "ServiceUnavailableException"
5556

57+
/* Forward declarations */
58+
static int send_log_events(struct flb_firehose *ctx, struct flush *buf);
59+
5660
static struct flb_aws_header put_record_batch_header = {
5761
.key = "X-Amz-Target",
5862
.key_len = 12,
@@ -141,6 +145,29 @@ static int end_put_payload(struct flb_firehose *ctx, struct flush *buf,
141145
}
142146

143147

148+
/*
149+
* Process event with simple aggregation (Firehose version)
150+
* Uses shared aggregation implementation
151+
*/
152+
static int process_event_simple_aggregation(struct flb_firehose *ctx, struct flush *buf,
153+
const msgpack_object *obj, struct flb_time *tms,
154+
struct flb_config *config)
155+
{
156+
return flb_aws_aggregation_process_event(&buf->agg_buf,
157+
buf->tmp_buf,
158+
buf->tmp_buf_size,
159+
&buf->tmp_buf_offset,
160+
obj,
161+
tms,
162+
config,
163+
ctx->ins,
164+
ctx->delivery_stream,
165+
ctx->log_key,
166+
ctx->time_key,
167+
ctx->time_key_format,
168+
MAX_EVENT_SIZE);
169+
}
170+
144171
/*
145172
* Processes the msgpack object
146173
* -1 = failure, record not added
@@ -356,6 +383,102 @@ static void reset_flush_buf(struct flb_firehose *ctx, struct flush *buf) {
356383
buf->data_size += strlen(ctx->delivery_stream);
357384
}
358385

386+
/* Finalize and send aggregated record */
387+
static int send_aggregated_record(struct flb_firehose *ctx, struct flush *buf) {
388+
int ret;
389+
size_t agg_size;
390+
size_t b64_len;
391+
struct firehose_event *event;
392+
void *compressed_tmp_buf;
393+
size_t compressed_size;
394+
395+
/* Finalize the aggregated record */
396+
ret = flb_aws_aggregation_finalize(&buf->agg_buf, 1, &agg_size);
397+
if (ret < 0) {
398+
/* No data to finalize */
399+
return 0;
400+
}
401+
402+
/* Handle compression if enabled */
403+
if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
404+
ret = flb_aws_compression_b64_truncate_compress(ctx->compression,
405+
MAX_B64_EVENT_SIZE,
406+
buf->agg_buf.agg_buf,
407+
agg_size,
408+
&compressed_tmp_buf,
409+
&compressed_size);
410+
if (ret == -1) {
411+
flb_plg_error(ctx->ins, "Unable to compress aggregated record, discarding, %s",
412+
ctx->delivery_stream);
413+
flb_aws_aggregation_reset(&buf->agg_buf);
414+
return 0;
415+
}
416+
417+
/* Ensure event_buf is large enough */
418+
if (buf->event_buf == NULL || buf->event_buf_size < compressed_size) {
419+
flb_free(buf->event_buf);
420+
buf->event_buf = compressed_tmp_buf;
421+
buf->event_buf_size = compressed_size;
422+
compressed_tmp_buf = NULL;
423+
} else {
424+
memcpy(buf->event_buf, compressed_tmp_buf, compressed_size);
425+
flb_free(compressed_tmp_buf);
426+
}
427+
agg_size = compressed_size;
428+
}
429+
else {
430+
/* Base64 encode the aggregated record */
431+
size_t size = (agg_size * 1.5) + 4;
432+
if (buf->event_buf == NULL || buf->event_buf_size < size) {
433+
flb_free(buf->event_buf);
434+
buf->event_buf = flb_malloc(size);
435+
buf->event_buf_size = size;
436+
if (buf->event_buf == NULL) {
437+
flb_errno();
438+
return -1;
439+
}
440+
}
441+
442+
ret = flb_base64_encode((unsigned char *) buf->event_buf, size, &b64_len,
443+
(unsigned char *) buf->agg_buf.agg_buf, agg_size);
444+
if (ret != 0) {
445+
flb_errno();
446+
return -1;
447+
}
448+
agg_size = b64_len;
449+
}
450+
451+
/* Copy to tmp_buf for sending */
452+
if (buf->tmp_buf_size < agg_size) {
453+
flb_plg_error(ctx->ins, "Aggregated record too large for buffer");
454+
flb_aws_aggregation_reset(&buf->agg_buf);
455+
return 0;
456+
}
457+
458+
memcpy(buf->tmp_buf, buf->event_buf, agg_size);
459+
460+
/* Create event record */
461+
event = &buf->events[0];
462+
event->json = buf->tmp_buf;
463+
event->len = agg_size;
464+
event->timestamp.tv_sec = 0;
465+
event->timestamp.tv_nsec = 0;
466+
buf->event_index = 1;
467+
468+
/* Calculate data_size for the payload */
469+
buf->data_size = PUT_RECORD_BATCH_HEADER_LEN + PUT_RECORD_BATCH_FOOTER_LEN;
470+
buf->data_size += strlen(ctx->delivery_stream);
471+
buf->data_size += agg_size + PUT_RECORD_BATCH_PER_RECORD_LEN;
472+
473+
/* Send the aggregated record */
474+
ret = send_log_events(ctx, buf);
475+
476+
/* Reset aggregation buffer */
477+
flb_aws_aggregation_reset(&buf->agg_buf);
478+
479+
return ret;
480+
}
481+
359482
/* constructs a put payload, and then sends */
360483
static int send_log_events(struct flb_firehose *ctx, struct flush *buf) {
361484
int ret;
@@ -445,6 +568,38 @@ static int add_event(struct flb_firehose *ctx, struct flush *buf,
445568
reset_flush_buf(ctx, buf);
446569
}
447570

571+
/* Use simple aggregation if enabled */
572+
if (ctx->simple_aggregation) {
573+
retry_add_event_agg:
574+
retry_add = FLB_FALSE;
575+
ret = process_event_simple_aggregation(ctx, buf, obj, tms, config);
576+
if (ret < 0) {
577+
return -1;
578+
}
579+
else if (ret == 1) {
580+
/* Buffer full - send aggregated record and retry */
581+
ret = send_aggregated_record(ctx, buf);
582+
reset_flush_buf(ctx, buf);
583+
if (ret < 0) {
584+
return -1;
585+
}
586+
retry_add = FLB_TRUE;
587+
}
588+
else if (ret == 2) {
589+
/* Discard this record */
590+
flb_plg_warn(ctx->ins, "Discarding large or unprocessable record, %s",
591+
ctx->delivery_stream);
592+
return 0;
593+
}
594+
595+
if (retry_add == FLB_TRUE) {
596+
goto retry_add_event_agg;
597+
}
598+
599+
return 0;
600+
}
601+
602+
/* Normal processing without aggregation */
448603
retry_add_event:
449604
retry_add = FLB_FALSE;
450605
ret = process_event(ctx, buf, obj, tms, config);
@@ -603,7 +758,13 @@ int process_and_send_records(struct flb_firehose *ctx, struct flush *buf,
603758
flb_log_event_decoder_destroy(&log_decoder);
604759

605760
/* send any remaining events */
606-
ret = send_log_events(ctx, buf);
761+
if (ctx->simple_aggregation) {
762+
/* Send any remaining aggregated data */
763+
ret = send_aggregated_record(ctx, buf);
764+
}
765+
else {
766+
ret = send_log_events(ctx, buf);
767+
}
607768
reset_flush_buf(ctx, buf);
608769

609770
if (ret < 0) {
@@ -953,6 +1114,9 @@ int put_record_batch(struct flb_firehose *ctx, struct flush *buf,
9531114
void flush_destroy(struct flush *buf)
9541115
{
9551116
if (buf) {
1117+
if (buf->agg_buf_initialized) {
1118+
flb_aws_aggregation_destroy(&buf->agg_buf);
1119+
}
9561120
flb_free(buf->tmp_buf);
9571121
flb_free(buf->out_buf);
9581122
flb_free(buf->events);

0 commit comments

Comments
 (0)