Skip to content

Commit f968720

Browse files
authored
Merge pull request #144 from osela/msgpackeventstream-issue
Fix MessagePackEventStream issue
2 parents 8a75c3d + aeebcc0 commit f968720

File tree

2 files changed

+53
-9
lines changed

2 files changed

+53
-9
lines changed

lib/fluent/plugin/filter_kubernetes_metadata.rb

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -304,17 +304,19 @@ def filter_stream_from_files(tag, es)
304304

305305
match_data = tag.match(@tag_to_kubernetes_name_regexp_compiled)
306306
batch_miss_cache = {}
307-
if match_data
308-
container_id = match_data['docker_id']
309-
metadata = {
310-
'docker' => {
311-
'container_id' => container_id
312-
},
313-
'kubernetes' => get_metadata_for_record(match_data, container_id, create_time_from_record(es.first[1]), batch_miss_cache)
314-
}
315-
end
307+
metadata = nil
316308

317309
es.each do |time, record|
310+
if match_data && metadata.nil?
311+
container_id = match_data['docker_id']
312+
metadata = {
313+
'docker' => {
314+
'container_id' => container_id
315+
},
316+
'kubernetes' => get_metadata_for_record(match_data, container_id, create_time_from_record(record), batch_miss_cache)
317+
}
318+
end
319+
318320
record = record.merge(Marshal.load(Marshal.dump(metadata))) if metadata
319321
new_es.add(time, record)
320322
end

test/plugin/test_filter_kubernetes_metadata.rb

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -773,5 +773,47 @@ def emit_with_tag(tag, msg={}, config='
773773
assert_equal(expected_kube_metadata, filtered[0])
774774
end
775775
end
776+
777+
test 'processes all events when reading from MessagePackEventStream' do
778+
VCR.use_cassette('kubernetes_docker_metadata') do
779+
entries = [[@time, {'time'=>'2015-05-08T09:22:01Z'}], [@time, {'time'=>'2015-05-08T09:22:01Z'}]]
780+
array_stream = Fluent::ArrayEventStream.new(entries)
781+
msgpack_stream = Fluent::MessagePackEventStream.new(array_stream.to_msgpack_stream)
782+
783+
d = create_driver('
784+
kubernetes_url https://localhost:8443
785+
watch false
786+
cache_size 1
787+
')
788+
d.run {
789+
d.feed(DEFAULT_TAG, msgpack_stream)
790+
}
791+
filtered = d.filtered.map{|e| e.last}
792+
793+
expected_kube_metadata = {
794+
'time'=>'2015-05-08T09:22:01Z',
795+
'docker' => {
796+
'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459'
797+
},
798+
'kubernetes' => {
799+
'host' => 'jimmi-redhat.localnet',
800+
'pod_name' => 'fabric8-console-controller-98rqc',
801+
'container_name' => 'fabric8-console-container',
802+
'container_image' => 'fabric8/hawtio-kubernetes:latest',
803+
'container_image_id' => 'docker://b2bd1a24a68356b2f30128e6e28e672c1ef92df0d9ec01ec0c7faea5d77d2303',
804+
'namespace_name' => 'default',
805+
'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c',
806+
'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d',
807+
'master_url' => 'https://localhost:8443',
808+
'labels' => {
809+
'component' => 'fabric8Console'
810+
}
811+
}
812+
}
813+
814+
assert_equal(expected_kube_metadata, filtered[0])
815+
assert_equal(expected_kube_metadata, filtered[1])
816+
end
817+
end
776818
end
777819
end

0 commit comments

Comments
 (0)