1616# See the License for the specific language governing permissions and
1717# limitations under the License.
1818#
19+ require_relative 'kubernetes_metadata_stats'
1920module Fluent
2021 class KubernetesMetadataFilter < Fluent ::Filter
2122 K8_POD_CA_CERT = 'ca.crt'
@@ -57,6 +58,7 @@ class KubernetesMetadataFilter < Fluent::Filter
5758 :default => '^(?<name_prefix>[^_]+)_(?<container_name>[^\._]+)(\.(?<container_hash>[^_]+))?_(?<pod_name>[^_]+)_(?<namespace>[^_]+)_[^_]+_[^_]+$'
5859
5960 config_param :annotation_match , :array , default : [ ]
61+ config_param :stats_interval , :integer , default : 30
6062
6163 def syms_to_strs ( hsh )
6264 newhsh = { }
@@ -110,15 +112,63 @@ def parse_namespace_metadata(namespace_object)
110112 def get_pod_metadata ( namespace_name , pod_name )
111113 begin
112114 metadata = @client . get_pod ( pod_name , namespace_name )
113- return if !metadata
114- return parse_pod_metadata ( metadata )
115- rescue KubeException
115+ unless metadata
116+ @stats . bump ( :pod_cache_api_nil_not_found )
117+ else
118+ begin
119+ metadata = parse_pod_metadata ( metadata )
120+ @stats . bump ( :pod_cache_api_updates )
121+ return metadata
122+ rescue Exception => e
123+ log . debug ( e )
124+ @stats . bump ( :pod_cache_api_nil_bad_resp_payload )
125+ nil
126+ end
127+ end
128+ rescue KubeException => e
129+ @stats . bump ( :pod_cache_api_nil_error )
130+ log . debug "Exception encountered fetching pod metadata from Kubernetes API #{ @apiVersion } endpoint #{ @kubernetes_url } : #{ e . message } "
131+ nil
132+ end
133+ end
134+
135+ def dump_stats
136+ @curr_time = Time . now
137+ return if @curr_time . to_i - @prev_time . to_i < @stats_interval
138+ @prev_time = @curr_time
139+ @stats . set ( :pod_cache_size , @cache . count )
140+ @stats . set ( :namespace_cache_size , @namespace_cache . count )
141+ log . info ( @stats )
142+ end
143+
144+ def get_namespace_metadata ( namespace_name )
145+ begin
146+ metadata = @client . get_namespace ( namespace_name )
147+ unless metadata
148+ @stats . bump ( :namespace_cache_api_nil_not_found )
149+ else
150+ begin
151+ metadata = parse_namespace_metadata ( metadata )
152+ @stats . bump ( :namespace_cache_api_updates )
153+ return metadata
154+ rescue Exception => e
155+ log . debug ( e )
156+ @stats . bump ( :namespace_cache_api_nil_bad_resp_payload )
157+ nil
158+ end
159+ end
160+ rescue KubeException => kube_error
161+ @stats . bump ( :namespace_cache_api_nil_error )
162+ log . debug "Exception encountered fetching namespace metadata from Kubernetes API #{ @apiVersion } endpoint #{ @kubernetes_url } : #{ kube_error . message } "
116163 nil
117164 end
118165 end
119166
120167 def initialize
121168 super
169+ @stats = KubernetesMetadata ::Stats . new
170+ @prev_time = Time . now
171+
122172 end
123173
124174 def configure ( conf )
@@ -235,6 +285,7 @@ def get_metadata_for_record(namespace_name, pod_name, container_name)
235285
236286 this = self
237287 pod_metadata = @cache . getset ( cache_key ) {
288+ @stats . bump ( :pod_cache_miss )
238289 md = this . get_pod_metadata (
239290 namespace_name ,
240291 pod_name ,
@@ -245,14 +296,8 @@ def get_metadata_for_record(namespace_name, pod_name, container_name)
245296
246297 if @include_namespace_metadata
247298 namespace_metadata = @namespace_cache . getset ( namespace_name ) {
248- begin
249- namespace = @client . get_namespace ( namespace_name )
250- if namespace
251- parse_namespace_metadata ( namespace )
252- end
253- rescue KubeException
254- nil
255- end
299+ @stats . bump ( :namespace_cache_miss )
300+ get_namespace_metadata ( namespace_name )
256301 }
257302 metadata . merge! ( namespace_metadata ) if namespace_metadata
258303 end
@@ -289,7 +334,7 @@ def filter_stream_from_files(tag, es)
289334
290335 new_es . add ( time , record )
291336 }
292-
337+ dump_stats
293338 new_es
294339 end
295340
@@ -317,9 +362,11 @@ def filter_stream_from_journal(tag, es)
317362 end
318363 unless metadata
319364 log . debug "Error: could not match CONTAINER_NAME from record #{ record } "
365+ @stats . dump ( :container_name_match_failed )
320366 end
321367 elsif record . has_key? ( 'CONTAINER_NAME' ) && record [ 'CONTAINER_NAME' ] . start_with? ( 'k8s_' )
322368 log . debug "Error: no container name and id in record #{ record } "
369+ @stats . dump ( :container_name_id_missing )
323370 end
324371
325372 if metadata
@@ -329,6 +376,7 @@ def filter_stream_from_journal(tag, es)
329376 new_es . add ( time , record )
330377 }
331378
379+ dump_stats
332380 new_es
333381 end
334382
@@ -341,7 +389,9 @@ def merge_json_log(record)
341389 unless @preserve_json_log
342390 record . delete ( @merge_json_log_key )
343391 end
344- rescue JSON ::ParserError
392+ rescue JSON ::ParserError => e
393+ @stats . bump ( :merge_json_parse_errors )
394+ log . debug ( e )
345395 end
346396 end
347397 end
@@ -376,9 +426,7 @@ def start_watch
376426 watcher = @client . watch_pods ( resource_version )
377427 rescue Exception => e
378428 message = "Exception encountered fetching metadata from Kubernetes API endpoint: #{ e . message } "
379- if e . respond_to? ( :response )
380- message += " (#{ e . response } )"
381- end
429+ message += " (#{ e . response } )" if e . respond_to? ( :response )
382430
383431 raise Fluent ::ConfigError , message
384432 end
@@ -390,33 +438,50 @@ def start_watch
390438 cached = @cache [ cache_key ]
391439 if cached
392440 @cache [ cache_key ] = parse_pod_metadata ( notice . object )
441+ @stats . bump ( :pod_cache_watch_updates )
442+ else
443+ @stats . bump ( :pod_cache_watch_misses )
393444 end
394445 when 'DELETED'
395446 cache_key = "#{ notice . object [ 'metadata' ] [ 'namespace' ] } _#{ notice . object [ 'metadata' ] [ 'name' ] } "
396447 @cache . delete ( cache_key )
448+ @stats . bump ( :pod_cache_watch_deletes )
397449 else
398450 # Don't pay attention to creations, since the created pod may not
399451 # end up on this node.
452+ @stats . bump ( :pod_cache_watch_ignored )
400453 end
401454 end
402455 end
403456
404457 def start_namespace_watch
405- resource_version = @client . get_namespaces . resourceVersion
406- watcher = @client . watch_namespaces ( resource_version )
458+ begin
459+ resource_version = @client . get_namespaces . resourceVersion
460+ watcher = @client . watch_namespaces ( resource_version )
461+ rescue Exception => e
462+ message = "start_namespace_watch: Exception encountered setting up namespace watch from Kubernetes API #{ @apiVersion } endpoint #{ @kubernetes_url } : #{ e . message } "
463+ message += " (#{ e . response } )" if e . respond_to? ( :response )
464+ log . debug ( message )
465+ raise Fluent ::ConfigError , message
466+ end
407467 watcher . each do |notice |
408468 case notice . type
409469 when 'MODIFIED'
410470 cache_key = notice . object [ 'metadata' ] [ 'name' ]
411471 cached = @namespace_cache [ cache_key ]
412472 if cached
413473 @namespace_cache [ cache_key ] = parse_namespace_metadata ( notice . object )
474+ @stats . bump ( :namespace_cache_watch_updates )
475+ else
476+ @stats . bump ( :namespace_cache_watch_misses )
414477 end
415478 when 'DELETED'
416479 @namespace_cache . delete ( notice . object [ 'metadata' ] [ 'name' ] )
480+ @stats . bump ( :namespace_cache_watch_deletes )
417481 else
418482 # Don't pay attention to creations, since the created namespace may not
419483 # be used by any pod on this node.
484+ @stats . bump ( :namespace_cache_watch_ignored )
420485 end
421486 end
422487 end
0 commit comments