Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions plugins/in_emitter/emitter.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_scheduler.h>
#include <fluent-bit/flb_ring_buffer.h>
#include <fluent-bit/flb_storage.h>

#include <sys/types.h>
#include <sys/stat.h>
Expand Down Expand Up @@ -297,6 +298,7 @@ static int cb_emitter_init(struct flb_input_instance *in,
{
struct flb_sched *scheduler;
struct flb_emitter *ctx;
char *pause_prop = NULL;
int ret;

scheduler = flb_sched_ctx_get();
Expand All @@ -318,6 +320,26 @@ static int cb_emitter_init(struct flb_input_instance *in,
return -1;
}

/*
* The emitter is used internally by filters such as rewrite_tag. When the
* downstream outputs experience backpressure, the emitter needs to pause
* its upstream senders to avoid holding an arbitrary number of "up"
* chunks in memory. Without pausing on the filesystem storage limit, the
* emitter can continue to accumulate in-memory chunks (for example, in a
* rewrite_tag pipeline) even though storage.max_chunks_up intends to cap
* usage. Enable pausing on the storage chunks limit by default when
* filesystem storage is in use so the configured storage.max_chunks_up
* limit is honored.
*/
pause_prop = flb_input_get_property("storage.pause_on_chunks_overlimit", in);
if (pause_prop == NULL) {
if (in->storage_type == FLB_STORAGE_FS &&
in->storage_pause_on_chunks_overlimit == FLB_FALSE) {
in->storage_pause_on_chunks_overlimit = FLB_TRUE;
flb_plg_debug(in, "enable pause on storage chunks overlimit for emitter");
}
}

if (in->is_threaded == FLB_TRUE && ctx->ring_buffer_size == 0) {
ctx->ring_buffer_size = DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY;
flb_plg_debug(in, "threaded: enable emitter ring buffer (size=%u)",
Expand Down
Loading