diff --git a/plugins/in_emitter/emitter.c b/plugins/in_emitter/emitter.c index 5d1bd44e876..92803b69e78 100644 --- a/plugins/in_emitter/emitter.c +++ b/plugins/in_emitter/emitter.c @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -297,6 +298,7 @@ static int cb_emitter_init(struct flb_input_instance *in, { struct flb_sched *scheduler; struct flb_emitter *ctx; + char *pause_prop = NULL; int ret; scheduler = flb_sched_ctx_get(); @@ -318,6 +320,26 @@ static int cb_emitter_init(struct flb_input_instance *in, return -1; } + /* + * The emitter is used internally by filters such as rewrite_tag. When the + * downstream outputs experience backpressure, the emitter needs to pause + * its upstream senders to avoid holding an arbitrary number of "up" + * chunks in memory. Without pausing on the filesystem storage limit, the + * emitter can continue to accumulate in-memory chunks (for example, in a + * rewrite_tag pipeline) even though storage.max_chunks_up intends to cap + * usage. Enable pausing on the storage chunks limit by default when + * filesystem storage is in use so the configured storage.max_chunks_up + * limit is honored. + */ + pause_prop = flb_input_get_property("storage.pause_on_chunks_overlimit", in); + if (pause_prop == NULL) { + if (in->storage_type == FLB_STORAGE_FS && + in->storage_pause_on_chunks_overlimit == FLB_FALSE) { + in->storage_pause_on_chunks_overlimit = FLB_TRUE; + flb_plg_debug(in, "enable pause on storage chunks overlimit for emitter"); + } + } + if (in->is_threaded == FLB_TRUE && ctx->ring_buffer_size == 0) { ctx->ring_buffer_size = DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY; flb_plg_debug(in, "threaded: enable emitter ring buffer (size=%u)",