Skip to content

Commit 2ce33db

Browse files
committed
in_emitter: Add backpressure limitation for filesystem storage
Signed-off-by: Hiroshi Hatake <[email protected]>
1 parent 3a501b2 commit 2ce33db

File tree

1 file changed

+24
-0
lines changed

1 file changed

+24
-0
lines changed

plugins/in_emitter/emitter.c

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <fluent-bit/flb_sds.h>
2626
#include <fluent-bit/flb_scheduler.h>
2727
#include <fluent-bit/flb_ring_buffer.h>
28+
#include <fluent-bit/flb_storage.h>
2829

2930
#include <sys/types.h>
3031
#include <sys/stat.h>
@@ -318,6 +319,29 @@ static int cb_emitter_init(struct flb_input_instance *in,
318319
return -1;
319320
}
320321

322+
/*
323+
* The emitter is used internally by filters such as rewrite_tag. When the
324+
* downstream outputs experience backpressure, the emitter needs to pause
325+
* its upstream senders to avoid holding an arbitrary number of "up"
326+
* chunks in memory. Without pausing on the filesystem storage limit, the
327+
* emitter can continue to accumulate in-memory chunks (for example, in a
328+
* rewrite_tag pipeline) even though storage.max_chunks_up intends to cap
329+
* usage. Enable pausing on the storage chunks limit by default when
330+
* filesystem storage is in use so the configured storage.max_chunks_up
331+
* limit is honored.
332+
*/
333+
if (in->storage_type == FLB_STORAGE_FS &&
334+
in->storage_pause_on_chunks_overlimit == FLB_FALSE) {
335+
in->storage_pause_on_chunks_overlimit = FLB_TRUE;
336+
flb_plg_debug(in, "enable pause on storage chunks overlimit for emitter");
337+
}
338+
else if (in->storage_type != FLB_STORAGE_FS &&
339+
in->storage_pause_on_chunks_overlimit == FLB_TRUE) {
340+
flb_plg_debug(in,
341+
"storage.pause_on_chunks_overlimit reset: storage.type is not filesystem");
342+
in->storage_pause_on_chunks_overlimit = FLB_FALSE;
343+
}
344+
321345
if (in->is_threaded == FLB_TRUE && ctx->ring_buffer_size == 0) {
322346
ctx->ring_buffer_size = DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY;
323347
flb_plg_debug(in, "threaded: enable emitter ring buffer (size=%u)",

0 commit comments

Comments
 (0)