@@ -295,6 +295,153 @@ void flb_test_kinesis_invalid_port(void)
295295 flb_destroy (ctx );
296296}
297297
298+ void flb_test_kinesis_simple_aggregation (void )
299+ {
300+ int ret ;
301+ flb_ctx_t * ctx ;
302+ int in_ffd ;
303+ int out_ffd ;
304+
305+ setenv ("FLB_KINESIS_PLUGIN_UNDER_TEST" , "true" , 1 );
306+
307+ ctx = flb_create ();
308+
309+ in_ffd = flb_input (ctx , (char * ) "lib" , NULL );
310+ TEST_CHECK (in_ffd >= 0 );
311+ flb_input_set (ctx , in_ffd , "tag" , "test" , NULL );
312+
313+ out_ffd = flb_output (ctx , (char * ) "kinesis_streams" , NULL );
314+ TEST_CHECK (out_ffd >= 0 );
315+ flb_output_set (ctx , out_ffd , "match" , "*" , NULL );
316+ flb_output_set (ctx , out_ffd , "region" , "us-west-2" , NULL );
317+ flb_output_set (ctx , out_ffd , "stream" , "fluent" , NULL );
318+ flb_output_set (ctx , out_ffd , "simple_aggregation" , "On" , NULL );
319+ flb_output_set (ctx , out_ffd , "Retry_Limit" , "1" , NULL );
320+
321+ ret = flb_start (ctx );
322+ TEST_CHECK (ret == 0 );
323+
324+ /* Push multiple small records */
325+ flb_lib_push (ctx , in_ffd , (char * ) "[1, {\"message\":\"test1\"}]" , 25 );
326+ flb_lib_push (ctx , in_ffd , (char * ) "[1, {\"message\":\"test2\"}]" , 25 );
327+ flb_lib_push (ctx , in_ffd , (char * ) "[1, {\"message\":\"test3\"}]" , 25 );
328+
329+ sleep (2 );
330+ flb_stop (ctx );
331+ flb_destroy (ctx );
332+ }
333+
334+ void flb_test_kinesis_aggregation_with_time_key (void )
335+ {
336+ int ret ;
337+ flb_ctx_t * ctx ;
338+ int in_ffd ;
339+ int out_ffd ;
340+
341+ setenv ("FLB_KINESIS_PLUGIN_UNDER_TEST" , "true" , 1 );
342+
343+ ctx = flb_create ();
344+
345+ in_ffd = flb_input (ctx , (char * ) "lib" , NULL );
346+ TEST_CHECK (in_ffd >= 0 );
347+ flb_input_set (ctx , in_ffd , "tag" , "test" , NULL );
348+
349+ out_ffd = flb_output (ctx , (char * ) "kinesis_streams" , NULL );
350+ TEST_CHECK (out_ffd >= 0 );
351+ flb_output_set (ctx , out_ffd , "match" , "*" , NULL );
352+ flb_output_set (ctx , out_ffd , "region" , "us-west-2" , NULL );
353+ flb_output_set (ctx , out_ffd , "stream" , "fluent" , NULL );
354+ flb_output_set (ctx , out_ffd , "simple_aggregation" , "On" , NULL );
355+ flb_output_set (ctx , out_ffd , "time_key" , "timestamp" , NULL );
356+ flb_output_set (ctx , out_ffd , "Retry_Limit" , "1" , NULL );
357+
358+ ret = flb_start (ctx );
359+ TEST_CHECK (ret == 0 );
360+
361+ /* Push records with time_key enabled */
362+ flb_lib_push (ctx , in_ffd , (char * ) "[1, {\"message\":\"with_time1\"}]" , 30 );
363+ flb_lib_push (ctx , in_ffd , (char * ) "[1, {\"message\":\"with_time2\"}]" , 30 );
364+
365+ sleep (2 );
366+ flb_stop (ctx );
367+ flb_destroy (ctx );
368+ }
369+
370+ void flb_test_kinesis_aggregation_with_log_key (void )
371+ {
372+ int ret ;
373+ flb_ctx_t * ctx ;
374+ int in_ffd ;
375+ int out_ffd ;
376+ const char * record = "[1, {\"message\":\"with_log_key\"}]" ;
377+
378+ setenv ("FLB_KINESIS_PLUGIN_UNDER_TEST" , "true" , 1 );
379+
380+ ctx = flb_create ();
381+
382+ in_ffd = flb_input (ctx , (char * ) "lib" , NULL );
383+ TEST_CHECK (in_ffd >= 0 );
384+ flb_input_set (ctx , in_ffd , "tag" , "test" , NULL );
385+
386+ out_ffd = flb_output (ctx , (char * ) "kinesis_streams" , NULL );
387+ TEST_CHECK (out_ffd >= 0 );
388+ flb_output_set (ctx , out_ffd , "match" , "*" , NULL );
389+ flb_output_set (ctx , out_ffd , "region" , "us-west-2" , NULL );
390+ flb_output_set (ctx , out_ffd , "stream" , "fluent" , NULL );
391+ flb_output_set (ctx , out_ffd , "simple_aggregation" , "On" , NULL );
392+ flb_output_set (ctx , out_ffd , "log_key" , "log" , NULL );
393+ flb_output_set (ctx , out_ffd , "Retry_Limit" , "1" , NULL );
394+
395+ ret = flb_start (ctx );
396+ TEST_CHECK (ret == 0 );
397+
398+ /* Push records with log_key enabled */
399+ flb_lib_push (ctx , in_ffd , (char * ) record , strlen (record ));
400+
401+ sleep (2 );
402+ flb_stop (ctx );
403+ flb_destroy (ctx );
404+ }
405+
406+ void flb_test_kinesis_aggregation_many_records (void )
407+ {
408+ int ret ;
409+ flb_ctx_t * ctx ;
410+ int in_ffd ;
411+ int out_ffd ;
412+ int i ;
413+ char record [100 ];
414+
415+ setenv ("FLB_KINESIS_PLUGIN_UNDER_TEST" , "true" , 1 );
416+
417+ ctx = flb_create ();
418+
419+ in_ffd = flb_input (ctx , (char * ) "lib" , NULL );
420+ TEST_CHECK (in_ffd >= 0 );
421+ flb_input_set (ctx , in_ffd , "tag" , "test" , NULL );
422+
423+ out_ffd = flb_output (ctx , (char * ) "kinesis_streams" , NULL );
424+ TEST_CHECK (out_ffd >= 0 );
425+ flb_output_set (ctx , out_ffd , "match" , "*" , NULL );
426+ flb_output_set (ctx , out_ffd , "region" , "us-west-2" , NULL );
427+ flb_output_set (ctx , out_ffd , "stream" , "fluent" , NULL );
428+ flb_output_set (ctx , out_ffd , "simple_aggregation" , "On" , NULL );
429+ flb_output_set (ctx , out_ffd , "Retry_Limit" , "1" , NULL );
430+
431+ ret = flb_start (ctx );
432+ TEST_CHECK (ret == 0 );
433+
434+ /* Push many small records to test aggregation efficiency */
435+ for (i = 0 ; i < 50 ; i ++ ) {
436+ snprintf (record , sizeof (record ), "[1, {\"id\":%d,\"msg\":\"test\"}]" , i );
437+ flb_lib_push (ctx , in_ffd , record , strlen (record ));
438+ }
439+
440+ sleep (3 );
441+ flb_stop (ctx );
442+ flb_destroy (ctx );
443+ }
444+
298445/* Test list */
299446TEST_LIST = {
300447 {"success" , flb_test_firehose_success },
@@ -305,5 +452,9 @@ TEST_LIST = {
305452 {"default_port" , flb_test_kinesis_default_port },
306453 {"custom_port" , flb_test_kinesis_custom_port },
307454 {"invalid_port" , flb_test_kinesis_invalid_port },
455+ {"simple_aggregation" , flb_test_kinesis_simple_aggregation },
456+ {"aggregation_with_time_key" , flb_test_kinesis_aggregation_with_time_key },
457+ {"aggregation_with_log_key" , flb_test_kinesis_aggregation_with_log_key },
458+ {"aggregation_many_records" , flb_test_kinesis_aggregation_many_records },
308459 {NULL , NULL }
309460};
0 commit comments