@@ -189,12 +189,356 @@ void flb_test_firehose_nonsense_error(void)
189189}
190190
191191
192+ void flb_test_firehose_simple_aggregation (void )
193+ {
194+ int ret ;
195+ flb_ctx_t * ctx ;
196+ int in_ffd ;
197+ int out_ffd ;
198+
199+ setenv ("FLB_FIREHOSE_PLUGIN_UNDER_TEST" , "true" , 1 );
200+
201+ ctx = flb_create ();
202+
203+ in_ffd = flb_input (ctx , (char * ) "lib" , NULL );
204+ TEST_CHECK (in_ffd >= 0 );
205+ flb_input_set (ctx , in_ffd , "tag" , "test" , NULL );
206+
207+ out_ffd = flb_output (ctx , (char * ) "kinesis_firehose" , NULL );
208+ TEST_CHECK (out_ffd >= 0 );
209+ flb_output_set (ctx , out_ffd , "match" , "*" , NULL );
210+ flb_output_set (ctx , out_ffd , "region" , "us-west-2" , NULL );
211+ flb_output_set (ctx , out_ffd , "delivery_stream" , "fluent" , NULL );
212+ flb_output_set (ctx , out_ffd , "simple_aggregation" , "On" , NULL );
213+ flb_output_set (ctx , out_ffd , "Retry_Limit" , "1" , NULL );
214+
215+ ret = flb_start (ctx );
216+ TEST_CHECK (ret == 0 );
217+
218+ /* Push multiple small records */
219+ flb_lib_push (ctx , in_ffd , (char * ) "[1, {\"message\":\"test1\"}]" , 25 );
220+ flb_lib_push (ctx , in_ffd , (char * ) "[1, {\"message\":\"test2\"}]" , 25 );
221+ flb_lib_push (ctx , in_ffd , (char * ) "[1, {\"message\":\"test3\"}]" , 25 );
222+
223+ sleep (2 );
224+ flb_stop (ctx );
225+ flb_destroy (ctx );
226+ }
227+
228+ void flb_test_firehose_aggregation_with_time_key (void )
229+ {
230+ int ret ;
231+ flb_ctx_t * ctx ;
232+ int in_ffd ;
233+ int out_ffd ;
234+
235+ setenv ("FLB_FIREHOSE_PLUGIN_UNDER_TEST" , "true" , 1 );
236+
237+ ctx = flb_create ();
238+
239+ in_ffd = flb_input (ctx , (char * ) "lib" , NULL );
240+ TEST_CHECK (in_ffd >= 0 );
241+ flb_input_set (ctx , in_ffd , "tag" , "test" , NULL );
242+
243+ out_ffd = flb_output (ctx , (char * ) "kinesis_firehose" , NULL );
244+ TEST_CHECK (out_ffd >= 0 );
245+ flb_output_set (ctx , out_ffd , "match" , "*" , NULL );
246+ flb_output_set (ctx , out_ffd , "region" , "us-west-2" , NULL );
247+ flb_output_set (ctx , out_ffd , "delivery_stream" , "fluent" , NULL );
248+ flb_output_set (ctx , out_ffd , "simple_aggregation" , "On" , NULL );
249+ flb_output_set (ctx , out_ffd , "time_key" , "timestamp" , NULL );
250+ flb_output_set (ctx , out_ffd , "Retry_Limit" , "1" , NULL );
251+
252+ ret = flb_start (ctx );
253+ TEST_CHECK (ret == 0 );
254+
255+ /* Push records with time_key enabled */
256+ flb_lib_push (ctx , in_ffd , (char * ) "[1, {\"message\":\"with_time1\"}]" , 30 );
257+ flb_lib_push (ctx , in_ffd , (char * ) "[1, {\"message\":\"with_time2\"}]" , 30 );
258+
259+ sleep (2 );
260+ flb_stop (ctx );
261+ flb_destroy (ctx );
262+ }
263+
264+ void flb_test_firehose_aggregation_with_log_key (void )
265+ {
266+ int ret ;
267+ flb_ctx_t * ctx ;
268+ int in_ffd ;
269+ int out_ffd ;
270+ const char * record = "[1, {\"message\":\"with_log_key\"}]" ;
271+
272+ setenv ("FLB_FIREHOSE_PLUGIN_UNDER_TEST" , "true" , 1 );
273+
274+ ctx = flb_create ();
275+
276+ in_ffd = flb_input (ctx , (char * ) "lib" , NULL );
277+ TEST_CHECK (in_ffd >= 0 );
278+ flb_input_set (ctx , in_ffd , "tag" , "test" , NULL );
279+
280+ out_ffd = flb_output (ctx , (char * ) "kinesis_firehose" , NULL );
281+ TEST_CHECK (out_ffd >= 0 );
282+ flb_output_set (ctx , out_ffd , "match" , "*" , NULL );
283+ flb_output_set (ctx , out_ffd , "region" , "us-west-2" , NULL );
284+ flb_output_set (ctx , out_ffd , "delivery_stream" , "fluent" , NULL );
285+ flb_output_set (ctx , out_ffd , "simple_aggregation" , "On" , NULL );
286+ flb_output_set (ctx , out_ffd , "log_key" , "log" , NULL );
287+ flb_output_set (ctx , out_ffd , "Retry_Limit" , "1" , NULL );
288+
289+ ret = flb_start (ctx );
290+ TEST_CHECK (ret == 0 );
291+
292+ /* Push records with log_key enabled */
293+ flb_lib_push (ctx , in_ffd , (char * ) record , strlen (record ));
294+
295+ sleep (2 );
296+ flb_stop (ctx );
297+ flb_destroy (ctx );
298+ }
299+
300+ void flb_test_firehose_aggregation_many_records (void )
301+ {
302+ int ret ;
303+ flb_ctx_t * ctx ;
304+ int in_ffd ;
305+ int out_ffd ;
306+ int i ;
307+ char record [100 ];
308+
309+ setenv ("FLB_FIREHOSE_PLUGIN_UNDER_TEST" , "true" , 1 );
310+
311+ ctx = flb_create ();
312+
313+ in_ffd = flb_input (ctx , (char * ) "lib" , NULL );
314+ TEST_CHECK (in_ffd >= 0 );
315+ flb_input_set (ctx , in_ffd , "tag" , "test" , NULL );
316+
317+ out_ffd = flb_output (ctx , (char * ) "kinesis_firehose" , NULL );
318+ TEST_CHECK (out_ffd >= 0 );
319+ flb_output_set (ctx , out_ffd , "match" , "*" , NULL );
320+ flb_output_set (ctx , out_ffd , "region" , "us-west-2" , NULL );
321+ flb_output_set (ctx , out_ffd , "delivery_stream" , "fluent" , NULL );
322+ flb_output_set (ctx , out_ffd , "simple_aggregation" , "On" , NULL );
323+ flb_output_set (ctx , out_ffd , "Retry_Limit" , "1" , NULL );
324+
325+ ret = flb_start (ctx );
326+ TEST_CHECK (ret == 0 );
327+
328+ /* Push many small records to test aggregation efficiency */
329+ for (i = 0 ; i < 50 ; i ++ ) {
330+ ret = snprintf (record , sizeof (record ), "[1, {\"id\":%d,\"msg\":\"test\"}]" , i );
331+ TEST_CHECK (ret < sizeof (record ));
332+ flb_lib_push (ctx , in_ffd , record , strlen (record ));
333+ }
334+
335+ sleep (3 );
336+ flb_stop (ctx );
337+ flb_destroy (ctx );
338+ }
339+
340+ void flb_test_firehose_aggregation_with_compression (void )
341+ {
342+ int ret ;
343+ flb_ctx_t * ctx ;
344+ int in_ffd ;
345+ int out_ffd ;
346+ const char * record1 = "[1, {\"message\":\"compressed1\"}]" ;
347+ const char * record2 = "[1, {\"message\":\"compressed2\"}]" ;
348+
349+ setenv ("FLB_FIREHOSE_PLUGIN_UNDER_TEST" , "true" , 1 );
350+
351+ ctx = flb_create ();
352+
353+ in_ffd = flb_input (ctx , (char * ) "lib" , NULL );
354+ TEST_CHECK (in_ffd >= 0 );
355+ flb_input_set (ctx , in_ffd , "tag" , "test" , NULL );
356+
357+ out_ffd = flb_output (ctx , (char * ) "kinesis_firehose" , NULL );
358+ TEST_CHECK (out_ffd >= 0 );
359+ flb_output_set (ctx , out_ffd , "match" , "*" , NULL );
360+ flb_output_set (ctx , out_ffd , "region" , "us-west-2" , NULL );
361+ flb_output_set (ctx , out_ffd , "delivery_stream" , "fluent" , NULL );
362+ flb_output_set (ctx , out_ffd , "simple_aggregation" , "On" , NULL );
363+ flb_output_set (ctx , out_ffd , "compression" , "gzip" , NULL );
364+ flb_output_set (ctx , out_ffd , "Retry_Limit" , "1" , NULL );
365+
366+ ret = flb_start (ctx );
367+ TEST_CHECK (ret == 0 );
368+
369+ /* Push records with compression enabled */
370+ flb_lib_push (ctx , in_ffd , (char * ) record1 , strlen (record1 ));
371+ flb_lib_push (ctx , in_ffd , (char * ) record2 , strlen (record2 ));
372+
373+ sleep (2 );
374+ flb_stop (ctx );
375+ flb_destroy (ctx );
376+ }
377+
378+ void flb_test_firehose_aggregation_combined_params (void )
379+ {
380+ int ret ;
381+ flb_ctx_t * ctx ;
382+ int in_ffd ;
383+ int out_ffd ;
384+ const char * record = "[1, {\"message\":\"combined_test\"}]" ;
385+
386+ setenv ("FLB_FIREHOSE_PLUGIN_UNDER_TEST" , "true" , 1 );
387+
388+ ctx = flb_create ();
389+
390+ in_ffd = flb_input (ctx , (char * ) "lib" , NULL );
391+ TEST_CHECK (in_ffd >= 0 );
392+ flb_input_set (ctx , in_ffd , "tag" , "test" , NULL );
393+
394+ out_ffd = flb_output (ctx , (char * ) "kinesis_firehose" , NULL );
395+ TEST_CHECK (out_ffd >= 0 );
396+ flb_output_set (ctx , out_ffd , "match" , "*" , NULL );
397+ flb_output_set (ctx , out_ffd , "region" , "us-west-2" , NULL );
398+ flb_output_set (ctx , out_ffd , "delivery_stream" , "fluent" , NULL );
399+ flb_output_set (ctx , out_ffd , "simple_aggregation" , "On" , NULL );
400+ flb_output_set (ctx , out_ffd , "time_key" , "timestamp" , NULL );
401+ flb_output_set (ctx , out_ffd , "time_key_format" , "%Y-%m-%d %H:%M:%S" , NULL );
402+ flb_output_set (ctx , out_ffd , "compression" , "gzip" , NULL );
403+ flb_output_set (ctx , out_ffd , "Retry_Limit" , "1" , NULL );
404+
405+ ret = flb_start (ctx );
406+ TEST_CHECK (ret == 0 );
407+
408+ /* Test with all features combined */
409+ flb_lib_push (ctx , in_ffd , (char * ) record , strlen (record ));
410+
411+ sleep (2 );
412+ flb_stop (ctx );
413+ flb_destroy (ctx );
414+ }
415+
416+ void flb_test_firehose_aggregation_empty_records (void )
417+ {
418+ int ret ;
419+ flb_ctx_t * ctx ;
420+ int in_ffd ;
421+ int out_ffd ;
422+
423+ setenv ("FLB_FIREHOSE_PLUGIN_UNDER_TEST" , "true" , 1 );
424+
425+ ctx = flb_create ();
426+
427+ in_ffd = flb_input (ctx , (char * ) "lib" , NULL );
428+ TEST_CHECK (in_ffd >= 0 );
429+ flb_input_set (ctx , in_ffd , "tag" , "test" , NULL );
430+
431+ out_ffd = flb_output (ctx , (char * ) "kinesis_firehose" , NULL );
432+ TEST_CHECK (out_ffd >= 0 );
433+ flb_output_set (ctx , out_ffd , "match" , "*" , NULL );
434+ flb_output_set (ctx , out_ffd , "region" , "us-west-2" , NULL );
435+ flb_output_set (ctx , out_ffd , "delivery_stream" , "fluent" , NULL );
436+ flb_output_set (ctx , out_ffd , "simple_aggregation" , "On" , NULL );
437+ flb_output_set (ctx , out_ffd , "Retry_Limit" , "1" , NULL );
438+
439+ ret = flb_start (ctx );
440+ TEST_CHECK (ret == 0 );
441+
442+ /* Push empty and minimal records */
443+ flb_lib_push (ctx , in_ffd , (char * ) "[1, {}]" , 7 );
444+ flb_lib_push (ctx , in_ffd , (char * ) "[1, {\"a\":\"\"}]" , 13 );
445+
446+ sleep (2 );
447+ flb_stop (ctx );
448+ flb_destroy (ctx );
449+ }
450+
451+ void flb_test_firehose_aggregation_error_handling (void )
452+ {
453+ int ret ;
454+ flb_ctx_t * ctx ;
455+ int in_ffd ;
456+ int out_ffd ;
457+ const char * record1 = "[1, {\"message\":\"error_test1\"}]" ;
458+ const char * record2 = "[1, {\"message\":\"error_test2\"}]" ;
459+
460+ setenv ("FLB_FIREHOSE_PLUGIN_UNDER_TEST" , "true" , 1 );
461+ setenv ("TEST_PUT_RECORD_BATCH_ERROR" , ERROR_THROUGHPUT , 1 );
462+
463+ ctx = flb_create ();
464+
465+ in_ffd = flb_input (ctx , (char * ) "lib" , NULL );
466+ TEST_CHECK (in_ffd >= 0 );
467+ flb_input_set (ctx , in_ffd , "tag" , "test" , NULL );
468+
469+ out_ffd = flb_output (ctx , (char * ) "kinesis_firehose" , NULL );
470+ TEST_CHECK (out_ffd >= 0 );
471+ flb_output_set (ctx , out_ffd , "match" , "*" , NULL );
472+ flb_output_set (ctx , out_ffd , "region" , "us-west-2" , NULL );
473+ flb_output_set (ctx , out_ffd , "delivery_stream" , "fluent" , NULL );
474+ flb_output_set (ctx , out_ffd , "simple_aggregation" , "On" , NULL );
475+ flb_output_set (ctx , out_ffd , "Retry_Limit" , "1" , NULL );
476+
477+ ret = flb_start (ctx );
478+ TEST_CHECK (ret == 0 );
479+
480+ /* Test error handling with aggregation enabled */
481+ flb_lib_push (ctx , in_ffd , (char * ) record1 , strlen (record1 ));
482+ flb_lib_push (ctx , in_ffd , (char * ) record2 , strlen (record2 ));
483+
484+ sleep (2 );
485+ flb_stop (ctx );
486+ flb_destroy (ctx );
487+ unsetenv ("TEST_PUT_RECORD_BATCH_ERROR" );
488+ }
489+
490+ void flb_test_firehose_aggregation_custom_time_format (void )
491+ {
492+ int ret ;
493+ flb_ctx_t * ctx ;
494+ int in_ffd ;
495+ int out_ffd ;
496+ const char * record = "[1, {\"message\":\"unix_time\"}]" ;
497+
498+ setenv ("FLB_FIREHOSE_PLUGIN_UNDER_TEST" , "true" , 1 );
499+
500+ ctx = flb_create ();
501+
502+ in_ffd = flb_input (ctx , (char * ) "lib" , NULL );
503+ TEST_CHECK (in_ffd >= 0 );
504+ flb_input_set (ctx , in_ffd , "tag" , "test" , NULL );
505+
506+ out_ffd = flb_output (ctx , (char * ) "kinesis_firehose" , NULL );
507+ TEST_CHECK (out_ffd >= 0 );
508+ flb_output_set (ctx , out_ffd , "match" , "*" , NULL );
509+ flb_output_set (ctx , out_ffd , "region" , "us-west-2" , NULL );
510+ flb_output_set (ctx , out_ffd , "delivery_stream" , "fluent" , NULL );
511+ flb_output_set (ctx , out_ffd , "simple_aggregation" , "On" , NULL );
512+ flb_output_set (ctx , out_ffd , "time_key" , "ts" , NULL );
513+ flb_output_set (ctx , out_ffd , "time_key_format" , "%s" , NULL );
514+ flb_output_set (ctx , out_ffd , "Retry_Limit" , "1" , NULL );
515+
516+ ret = flb_start (ctx );
517+ TEST_CHECK (ret == 0 );
518+
519+ /* Test with Unix timestamp format */
520+ flb_lib_push (ctx , in_ffd , (char * ) record , strlen (record ));
521+
522+ sleep (2 );
523+ flb_stop (ctx );
524+ flb_destroy (ctx );
525+ }
526+
192527/* Test list */
193528TEST_LIST = {
194529 {"success" , flb_test_firehose_success },
195530 {"partial_success" , flb_test_firehose_partial_success },
196531 {"throughput_error" , flb_test_firehose_throughput_error },
197532 {"unknown_error" , flb_test_firehose_error_unknown },
198533 {"nonsense_error" , flb_test_firehose_nonsense_error },
534+ {"simple_aggregation" , flb_test_firehose_simple_aggregation },
535+ {"aggregation_with_time_key" , flb_test_firehose_aggregation_with_time_key },
536+ {"aggregation_with_log_key" , flb_test_firehose_aggregation_with_log_key },
537+ {"aggregation_many_records" , flb_test_firehose_aggregation_many_records },
538+ {"aggregation_with_compression" , flb_test_firehose_aggregation_with_compression },
539+ {"aggregation_combined_params" , flb_test_firehose_aggregation_combined_params },
540+ {"aggregation_empty_records" , flb_test_firehose_aggregation_empty_records },
541+ {"aggregation_error_handling" , flb_test_firehose_aggregation_error_handling },
542+ {"aggregation_custom_time_format" , flb_test_firehose_aggregation_custom_time_format },
199543 {NULL , NULL }
200544};
0 commit comments