55
66module Fluent ::Plugin
77 class KubernetesObjectsInput < Fluent ::Plugin ::Input
8- VERSION = '1.1.0 ' . freeze
8+ VERSION = '1.1.1 ' . freeze
99
1010 Fluent ::Plugin . register_input ( 'kubernetes_objects' , self )
1111
@@ -72,6 +72,9 @@ class KubernetesObjectsInput < Fluent::Plugin::Input
7272
7373 desc 'A selector to restrict the list of returned objects by fields.'
7474 config_param :field_selector , :string , default : nil
75+
76+ desc 'The interval at which the objects will be watched.'
77+ config_param :interval , :time , default : 15 * 60
7578 end
7679
7780 config_section :storage do
@@ -140,7 +143,7 @@ def initialize_client
140143 if @bearer_token_file . nil? && File . exist? ( secret_token_file )
141144 @bearer_token_file = secret_token_file
142145 end
143- end
146+ end
144147
145148 ssl_options = {
146149 client_cert : @client_cert && OpenSSL ::X509 ::Certificate . new ( File . read ( @client_cert ) ) ,
@@ -174,10 +177,12 @@ def start_watchers
174177 o = o . to_h . dup
175178 o [ :as ] = :raw
176179 resource_name = o . delete ( :resource_name )
180+ watch_interval = o . delete ( :interval )
181+
177182 version = @storage . get ( resource_name )
178183 o [ :resource_version ] = version if version
179184 @client . public_send ( "watch_#{ resource_name } " , o ) . tap do |watcher |
180- create_watcher_thread resource_name , watcher
185+ create_watcher_thread resource_name , watcher , watch_interval
181186 end
182187 end
183188 end
@@ -209,7 +214,7 @@ def create_pull_thread(conf)
209214 -> ( item ) { item [ 'metadata' ] . update requestResourceVersion : resource_version }
210215 else
211216 -> ( item ) { }
212- end
217+ end
213218
214219 # result['items'] might be nil due to https://github.com/kubernetes/kubernetes/issues/13096
215220 items = result [ 'items' ] . to_a
@@ -222,14 +227,17 @@ def create_pull_thread(conf)
222227 end
223228 end
224229
225- def create_watcher_thread ( object_name , watcher )
230+ def create_watcher_thread ( object_name , watcher , interval )
226231 thread_create ( :"watch_#{ object_name } " ) do
227232 tag = generate_tag "#{ object_name } .watch"
228- watcher . each do |entity |
229- log . trace { "Received new object from watching #{ object_name } " }
230- entity = JSON . parse ( entity )
231- router . emit tag , Fluent ::Engine . now , entity
232- @storage . put object_name , entity [ 'object' ] [ 'metadata' ] [ 'resourceVersion' ]
233+ while thread_current_running?
234+ watcher . each do |entity |
235+ log . trace { "Received new object from watching #{ object_name } " }
236+ entity = JSON . parse ( entity )
237+ router . emit tag , Fluent ::Engine . now , entity
238+ @storage . put object_name , entity [ 'object' ] [ 'metadata' ] [ 'resourceVersion' ]
239+ sleep ( interval )
240+ end
233241 end
234242 end
235243 end
0 commit comments