Commit a181fb7

Merge pull request #155 from josefkarasek/master_detect-metadata-source
deprecate use_journal - add lookup_from_k8s_field - detect input
2 parents 20f064b + 33b403f commit a181fb7

5 files changed

+1160
-108
lines changed

README.md

Lines changed: 13 additions & 1 deletion
@@ -44,21 +44,33 @@ This must used named capture groups for `container_name`, `pod_name` & `namespac
4444
* `watch` - set up a watch on pods on the API server for updates to metadata (default: `true`)
4545
* `de_dot` - replace dots in labels and annotations with configured `de_dot_separator`, required for ElasticSearch 2.x compatibility (default: `true`)
4646
* `de_dot_separator` - separator to use if `de_dot` is enabled (default: `_`)
47-
* `use_journal` - If false (default), messages are expected to be formatted and tagged as if read by the fluentd in\_tail plugin with wildcard filename. If true, messages are expected to be formatted as if read from the systemd journal. The `MESSAGE` field has the full message. The `CONTAINER_NAME` field has the encoded k8s metadata (see below). The `CONTAINER_ID_FULL` field has the full container uuid. This requires docker to use the `--log-driver=journald` log driver.
47+
* *DEPRECATED* `use_journal` - If false, messages are expected to be formatted and tagged as if read by the fluentd in\_tail plugin with wildcard filename. If true, messages are expected to be formatted as if read from the systemd journal. The `MESSAGE` field has the full message. The `CONTAINER_NAME` field has the encoded k8s metadata (see below). The `CONTAINER_ID_FULL` field has the full container uuid. This requires docker to use the `--log-driver=journald` log driver. If unset (the default), the plugin will use the `CONTAINER_NAME` and `CONTAINER_ID_FULL` fields
48+
if available; otherwise it will use the tag in the `tag_to_kubernetes_name_regexp` format.
4849
* `container_name_to_kubernetes_regexp` - The regular expression used to extract the k8s metadata encoded in the journal `CONTAINER_NAME` field (default: `'^(?<name_prefix>[^_]+)_(?<container_name>[^\._]+)(\.(?<container_hash>[^_]+))?_(?<pod_name>[^_]+)_(?<namespace>[^_]+)_[^_]+_[^_]+$'`
4950
* This corresponds to the definition [in the source](https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/dockertools/docker.go#L317)
5051
* `annotation_match` - Array of regular expressions matching annotation field names. Matched annotations are added to a log record.
5152
* `allow_orphans` - Modify the namespace and namespace id to the values of `orphaned_namespace_name` and `orphaned_namespace_id`
5253
when true (default: `true`)
5354
* `orphaned_namespace_name` - The namespace to associate with records where the namespace can not be determined (default: `.orphaned`)
5455
* `orphaned_namespace_id` - The namespace id to associate with records where the namespace can not be determined (default: `orphaned`)
56+
* `lookup_from_k8s_field` - If the field `kubernetes` is present, look up the metadata from the given subfields such as `kubernetes.namespace_name`, `kubernetes.pod_name`, etc. This allows you to avoid having to pass in metadata to look up in an explicitly formatted tag name or in an explicitly formatted `CONTAINER_NAME` value. For example, set `kubernetes.namespace_name`, `kubernetes.pod_name`, `kubernetes.container_name`, and `docker.container_id` in the record, and the filter will fill in the rest. (default: `true`)
5557

5658
**NOTE:** As of the release 2.1.x of this plugin, it no longer supports parsing the source message into JSON and attaching it to the
5759
payload. The following configuration options are removed:
5860

5961
* `merge_json_log`
6062
* `preserve_json_log`
6163

64+
**NOTE:** As of this release, `use_journal` is **DEPRECATED**. If this setting is not present, the plugin will
65+
attempt to figure out the source of the metadata fields from the following:
66+
- If `lookup_from_k8s_field true` (the default) and the following fields are present in the record:
67+
`docker.container_id`, `kubernetes.namespace_name`, `kubernetes.pod_name`, `kubernetes.container_name`,
68+
then the plugin will use those values as the source to look up the metadata
69+
- If `use_journal true`, or `use_journal` is unset, and the fields `CONTAINER_NAME` and `CONTAINER_ID_FULL` are present in the record,
70+
then the plugin will parse those values using `container_name_to_kubernetes_regexp` and use those as the source to lookup the metadata
71+
- Otherwise, if the tag matches `tag_to_kubernetes_name_regexp`, the plugin will parse the tag and use those values to
72+
look up the metadata
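
The detection order in the note above can be sketched in plain Ruby. This is a standalone illustration, not the plugin's code: `detect_source` and `K8S_FIELDS` are hypothetical names, and the sketch only reports which source would win. It ignores the case where `CONTAINER_NAME` is present but fails to match `container_name_to_kubernetes_regexp`. Going by the `filter_stream` hunk in this commit, the tag is parsed only when `use_journal` is not `true`, and the sketch assumes the same.

```ruby
# Hypothetical helper: reports which metadata source would be used for a record.
# It follows the order in the README note and is not part of the plugin.
K8S_FIELDS = %w[namespace_name pod_name container_name].freeze

def detect_source(record, tag_matches:, use_journal: nil, lookup_from_k8s_field: true)
  k8s = record['kubernetes']
  docker = record['docker']

  # 1. Explicit kubernetes.* and docker.container_id fields in the record.
  if lookup_from_k8s_field && k8s.is_a?(Hash) && docker.is_a?(Hash) &&
     K8S_FIELDS.all? { |f| k8s.key?(f) } && docker.key?('container_id')
    return :k8s_field
  end

  # 2. Journal fields; an unset (nil) use_journal behaves like true here.
  if use_journal != false &&
     record.key?('CONTAINER_NAME') && record.key?('CONTAINER_ID_FULL')
    return :journal
  end

  # 3. The tag, which is only consulted when use_journal is not true.
  return :tag if tag_matches && !use_journal

  nil
end
```

Calling `detect_source` with a record that carries both the `kubernetes`/`docker` fields and the journal fields returns `:k8s_field`, because the explicit fields take precedence.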
73+
6274
Reading from the JSON formatted log files with `in_tail` and wildcard filenames:
6375
```
6476
<source>

lib/fluent/plugin/filter_kubernetes_metadata.rb

Lines changed: 64 additions & 75 deletions
@@ -57,7 +57,7 @@ class KubernetesMetadataFilter < Fluent::Plugin::Filter
5757
# format:
5858
# CONTAINER_NAME=k8s_$containername.$containerhash_$podname_$namespacename_$poduuid_$rand32bitashex
5959
# CONTAINER_ID_FULL=dockeridassha256hexvalue
60-
config_param :use_journal, :bool, default: false
60+
config_param :use_journal, :bool, default: nil
6161
# Field 2 is the container_hash, field 5 is the pod_id, and field 6 is the pod_randhex
6262
# I would have included them as named groups, but you can't have named groups that are
6363
# non-capturing :P
@@ -71,6 +71,7 @@ class KubernetesMetadataFilter < Fluent::Plugin::Filter
7171
config_param :allow_orphans, :bool, default: true
7272
config_param :orphaned_namespace_name, :string, default: '.orphaned'
7373
config_param :orphaned_namespace_id, :string, default: 'orphaned'
74+
config_param :lookup_from_k8s_field, :bool, default: true
7475

7576
def fetch_pod_metadata(namespace_name, pod_name)
7677
log.trace("fetching pod metadata: #{namespace_name}/#{pod_name}") if log.trace?
@@ -243,13 +244,10 @@ def log.trace?
243244
namespace_thread.abort_on_exception = true
244245
end
245246
end
246-
if @use_journal
247-
log.debug "Will stream from the journal"
248-
self.class.class_eval { alias_method :filter_stream, :filter_stream_from_journal }
249-
else
250-
log.debug "Will stream from the files"
251-
self.class.class_eval { alias_method :filter_stream, :filter_stream_from_files }
252-
end
247+
@time_fields = []
248+
@time_fields.push('_SOURCE_REALTIME_TIMESTAMP', '__REALTIME_TIMESTAMP') if @use_journal || @use_journal.nil?
249+
@time_fields.push('time') unless @use_journal
250+
@time_fields.push('@timestamp') if @lookup_from_k8s_field
253251

254252
@annotations_regexps = []
255253
@annotation_match.each do |regexp|
@@ -262,102 +260,93 @@ def log.trace?
262260

263261
end
264262

265-
def get_metadata_for_record(match_data, container_id, create_time, batch_miss_cache)
266-
namespace_name = match_data['namespace']
267-
pod_name = match_data['pod_name']
268-
container_name = match_data['container_name']
263+
def get_metadata_for_record(namespace_name, pod_name, container_name, container_id, create_time, batch_miss_cache)
269264
metadata = {
270-
'container_name' => container_name,
271-
'namespace_name' => namespace_name,
272-
'pod_name' => pod_name
265+
'docker' => {'container_id' => container_id},
266+
'kubernetes' => {
267+
'container_name' => container_name,
268+
'namespace_name' => namespace_name,
269+
'pod_name' => pod_name
270+
}
273271
}
274272
if @kubernetes_url.present?
275273
pod_metadata = get_pod_metadata(container_id, namespace_name, pod_name, create_time, batch_miss_cache)
276274

277275
if (pod_metadata.include? 'containers') && (pod_metadata['containers'].include? container_id)
278-
metadata['container_image'] = pod_metadata['containers'][container_id]['image']
279-
metadata['container_image_id'] = pod_metadata['containers'][container_id]['image_id']
276+
metadata['kubernetes']['container_image'] = pod_metadata['containers'][container_id]['image']
277+
metadata['kubernetes']['container_image_id'] = pod_metadata['containers'][container_id]['image_id']
280278
end
281279

282-
metadata.merge!(pod_metadata) if pod_metadata
283-
metadata.delete('containers')
280+
metadata['kubernetes'].merge!(pod_metadata) if pod_metadata
281+
metadata['kubernetes'].delete('containers')
284282
end
285283
metadata
286284
end
287285

288-
def create_time_from_record(record)
289-
time = if @use_journal
290-
record['_SOURCE_REALTIME_TIMESTAMP'].nil? ? record['_SOURCE_REALTIME_TIMESTAMP'] : record['__REALTIME_TIMESTAMP']
291-
else
292-
record['time']
293-
end
294-
(time.nil? || time.chop.empty?) ? Time.now : Time.parse(time)
286+
def create_time_from_record(record, internal_time)
287+
time_key = @time_fields.detect{ |ii| record.has_key?(ii) }
288+
time = record[time_key]
289+
if time.nil? || time.chop.empty?
290+
return internal_time
291+
end
292+
if ['_SOURCE_REALTIME_TIMESTAMP', '__REALTIME_TIMESTAMP'].include?(time_key)
293+
timei= time.to_i
294+
return Time.at(timei / 1000000, timei % 1000000)
295+
end
296+
return Time.parse(time)
295297
end
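
As a reviewer's sketch of the new `create_time_from_record` logic: the first time field present in the record is used, journal timestamps are treated as microseconds since the Unix epoch, and any other value is parsed as a date string. `record_time` and `JOURNAL_FIELDS` are hypothetical names for this standalone version, and the empty-value check is simplified compared with the `chop.empty?` test in the hunk.

```ruby
require 'time'

JOURNAL_FIELDS = %w[_SOURCE_REALTIME_TIMESTAMP __REALTIME_TIMESTAMP].freeze

# Hypothetical standalone version of the lookup shown in the hunk above.
def record_time(record, time_fields, fallback)
  key = time_fields.find { |f| record.key?(f) }
  value = key && record[key]
  return fallback if value.nil? || value.to_s.strip.empty?

  if JOURNAL_FIELDS.include?(key)
    usec = value.to_i
    # Time.at(seconds, microseconds) keeps the sub-second part.
    return Time.at(usec / 1_000_000, usec % 1_000_000)
  end

  Time.parse(value)
end

fields = JOURNAL_FIELDS + %w[time @timestamp]
journal = record_time({ '__REALTIME_TIMESTAMP' => '1493844303123456' }, fields, Time.now)
puts journal.to_i   # whole seconds
puts journal.usec   # microsecond remainder
```

The fallback argument plays the role of the event's internal time, which the new code returns instead of `Time.now` when no usable field is found.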
296298

297299
def filter_stream(tag, es)
298-
es
299-
end
300-
301-
def filter_stream_from_files(tag, es)
302300
return es if (es.respond_to?(:empty?) && es.empty?) || !es.is_a?(Fluent::EventStream)
303301
new_es = Fluent::MultiEventStream.new
304-
305-
match_data = tag.match(@tag_to_kubernetes_name_regexp_compiled)
302+
tag_match_data = tag.match(@tag_to_kubernetes_name_regexp_compiled) unless @use_journal
303+
tag_metadata = nil
306304
batch_miss_cache = {}
307-
metadata = nil
308-
309305
es.each do |time, record|
310-
if match_data && metadata.nil?
311-
container_id = match_data['docker_id']
312-
metadata = {
313-
'docker' => {
314-
'container_id' => container_id
315-
},
316-
'kubernetes' => get_metadata_for_record(match_data, container_id, create_time_from_record(record), batch_miss_cache)
317-
}
306+
if tag_match_data && tag_metadata.nil?
307+
tag_metadata = get_metadata_for_record(tag_match_data['namespace'], tag_match_data['pod_name'], tag_match_data['container_name'],
308+
tag_match_data['docker_id'], create_time_from_record(record, time), batch_miss_cache)
309+
end
310+
metadata = Marshal.load(Marshal.dump(tag_metadata)) if tag_metadata
311+
if (@use_journal || @use_journal.nil?) &&
312+
(j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache))
313+
metadata = j_metadata
314+
end
315+
if @lookup_from_k8s_field && record.has_key?('kubernetes') && record.has_key?('docker') &&
316+
record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) &&
317+
record['kubernetes'].has_key?('namespace_name') &&
318+
record['kubernetes'].has_key?('pod_name') &&
319+
record['kubernetes'].has_key?('container_name') &&
320+
record['docker'].has_key?('container_id') &&
321+
(k_metadata = get_metadata_for_record(record['kubernetes']['namespace_name'], record['kubernetes']['pod_name'],
322+
record['kubernetes']['container_name'], record['docker']['container_id'],
323+
create_time_from_record(record, time), batch_miss_cache))
324+
metadata = k_metadata
318325
end
319326

320-
record = record.merge(Marshal.load(Marshal.dump(metadata))) if metadata
327+
record = record.merge(metadata) if metadata
321328
new_es.add(time, record)
322329
end
323330
dump_stats
324331
new_es
325332
end
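
The new `filter_stream` computes the tag metadata once per stream and deep-copies it for every record with `Marshal.load(Marshal.dump(...))`. Presumably this keeps a later change to one record's nested hashes from leaking into the others. The sketch below uses made-up sample data to show how that differs from a shallow `dup`.

```ruby
# Sample data is invented for illustration only.
tag_metadata = {
  'kubernetes' => { 'pod_name' => 'web-1', 'labels' => { 'app' => 'web' } }
}

shallow = tag_metadata.dup                        # copies the top-level hash only
deep = Marshal.load(Marshal.dump(tag_metadata))   # copies every nested hash

# Mutating the deep copy leaves the original untouched.
deep['kubernetes']['labels']['app'] = 'changed-in-deep'
puts tag_metadata['kubernetes']['labels']['app']

# Mutating the shallow copy's nested hash changes the original too,
# because both still share the same inner objects.
shallow['kubernetes']['labels']['app'] = 'changed-in-shallow'
puts tag_metadata['kubernetes']['labels']['app']
```

The Marshal round trip works here because the metadata consists of plain hashes and strings. Objects such as procs or IO handles cannot be marshalled.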
326333

327-
def filter_stream_from_journal(tag, es)
328-
return es if (es.respond_to?(:empty?) && es.empty?) || !es.is_a?(Fluent::EventStream)
329-
new_es = Fluent::MultiEventStream.new
330-
batch_miss_cache = {}
331-
es.each do |time, record|
332-
metadata = nil
333-
if record.has_key?('CONTAINER_NAME') && record.has_key?('CONTAINER_ID_FULL')
334-
metadata = record['CONTAINER_NAME'].match(@container_name_to_kubernetes_regexp_compiled) do |match_data|
335-
container_id = record['CONTAINER_ID_FULL']
336-
metadata = {
337-
'docker' => {
338-
'container_id' => container_id
339-
},
340-
'kubernetes' => get_metadata_for_record(match_data, container_id, create_time_from_record(record), batch_miss_cache)
341-
}
342-
343-
metadata
344-
end
345-
unless metadata
346-
log.debug "Error: could not match CONTAINER_NAME from record #{record}"
347-
@stats.bump(:container_name_match_failed)
348-
end
349-
elsif record.has_key?('CONTAINER_NAME') && record['CONTAINER_NAME'].start_with?('k8s_')
350-
log.debug "Error: no container name and id in record #{record}"
351-
@stats.bump(:container_name_id_missing)
334+
def get_metadata_for_journal_record(record, time, batch_miss_cache)
335+
metadata = nil
336+
if record.has_key?('CONTAINER_NAME') && record.has_key?('CONTAINER_ID_FULL')
337+
metadata = record['CONTAINER_NAME'].match(@container_name_to_kubernetes_regexp_compiled) do |match_data|
338+
get_metadata_for_record(match_data['namespace'], match_data['pod_name'], match_data['container_name'],
339+
record['CONTAINER_ID_FULL'], create_time_from_record(record, time), batch_miss_cache)
352340
end
353-
354-
record = record.merge(metadata) if metadata
355-
356-
new_es.add(time, record)
341+
unless metadata
342+
log.debug "Error: could not match CONTAINER_NAME from record #{record}"
343+
@stats.bump(:container_name_match_failed)
344+
end
345+
elsif record.has_key?('CONTAINER_NAME') && record['CONTAINER_NAME'].start_with?('k8s_')
346+
log.debug "Error: no container name and id in record #{record}"
347+
@stats.bump(:container_name_id_missing)
357348
end
358-
359-
dump_stats
360-
new_es
349+
metadata
361350
end
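
To illustrate what `get_metadata_for_journal_record` extracts, here is a minimal sketch that applies the default `container_name_to_kubernetes_regexp` from the README to a `CONTAINER_NAME` value. The sample value and the constant name `CONTAINER_NAME_RE` are made up for this example. The plugin itself matches against a compiled pattern built from its configuration.

```ruby
# Default pattern copied from the README's container_name_to_kubernetes_regexp option.
CONTAINER_NAME_RE = Regexp.new(
  '^(?<name_prefix>[^_]+)_(?<container_name>[^\._]+)(\.(?<container_hash>[^_]+))?' \
  '_(?<pod_name>[^_]+)_(?<namespace>[^_]+)_[^_]+_[^_]+$'
)

# Invented sample in the k8s_<container>.<hash>_<pod>_<namespace>_<uuid>_<rand> format.
sample = 'k8s_nginx.a1b2c3d4_web-7f9c_default_c76927af-f563-11e4-b32d-54ee7527188d_0a1b2c3d'

match = CONTAINER_NAME_RE.match(sample)
if match
  puts match['namespace']       # namespace name
  puts match['pod_name']        # pod name
  puts match['container_name']  # container name
end

# A value that does not follow the format yields nil. In the plugin this is the
# case that bumps the container_name_match_failed stat.
puts CONTAINER_NAME_RE.match('not-a-k8s-container').inspect
```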
362351

363352
def de_dot!(h)
