@@ -200,7 +200,12 @@ def create_pull_thread(conf)
200200 tag = generate_tag resource_name
201201 while thread_current_running?
202202 log . debug "Going to pull #{ resource_name } "
203- response = @client . public_send "get_#{ resource_name } " , options
203+ begin
204+ response = @client . public_send "get_#{ resource_name } " , options
205+ rescue Kubeclient ::ResourceNotFoundError , NoMethodError
206+ log . error "resource '#{ resource_name } ' not found. Stopped pulling it"
207+ break
208+ end
204209 now = Fluent ::Engine . now
205210 es = Fluent ::MultiEventStream . new
206211
@@ -243,22 +248,27 @@ def create_watcher_thread(conf)
243248
244249 thread_create :"watch_#{ resource_name } " do
245250 while thread_current_running?
246- @client . public_send ( "watch_#{ resource_name } " , options ) . tap do |watcher |
247- tag = generate_tag "#{ resource_name } "
248- begin
249- watcher . each do |entity |
250- begin
251- entity = JSON . parse ( entity )
252- router . emit tag , Fluent ::Engine . now , entity
253- options [ :resource_version ] = entity [ 'object' ] [ 'metadata' ] [ 'resourceVersion' ]
254- @storage . put resource_name , entity [ 'object' ] [ 'metadata' ] [ 'resourceVersion' ]
255- rescue => e
256- log . info "Got exception #{ e } parsing entity #{ entity } . Resetting watcher."
251+ begin
252+ @client . public_send ( "watch_#{ resource_name } " , options ) . tap do |watcher |
253+ tag = generate_tag "#{ resource_name } "
254+ begin
255+ watcher . each do |entity |
256+ begin
257+ entity = JSON . parse ( entity )
258+ router . emit tag , Fluent ::Engine . now , entity
259+ options [ :resource_version ] = entity [ 'object' ] [ 'metadata' ] [ 'resourceVersion' ]
260+ @storage . put resource_name , entity [ 'object' ] [ 'metadata' ] [ 'resourceVersion' ]
261+ rescue => e
262+ log . info "Got exception #{ e } parsing entity #{ entity } . Resetting watcher."
263+ end
257264 end
265+ rescue => e
266+ log . info "Got exception #{ e } . Resetting watcher."
258267 end
259- rescue => e
260- log . info "Got exception #{ e } . Resetting watcher."
261268 end
269+ rescue Kubeclient ::ResourceNotFoundError , NoMethodError
270+ log . error "resource '#{ resource_name } ' not found. Stopped watching it"
271+ break
262272 end
263273 end
264274 end
0 commit comments