Skip to content

Commit 6cbfaff

Browse files
authored
Merge pull request #147 from richm/issue-146
[master] test for eventstream has empty method and if class is EventStream
2 parents f968720 + a3b458e commit 6cbfaff

File tree

2 files changed

+28
-8
lines changed

2 files changed

+28
-8
lines changed

lib/fluent/plugin/filter_kubernetes_metadata.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ def filter_stream(tag, es)
299299
end
300300

301301
def filter_stream_from_files(tag, es)
302-
return es if es.nil? || es.empty?
302+
return es if (es.respond_to?(:empty?) && es.empty?) || !es.is_a?(Fluent::EventStream)
303303
new_es = Fluent::MultiEventStream.new
304304

305305
match_data = tag.match(@tag_to_kubernetes_name_regexp_compiled)
@@ -325,7 +325,7 @@ def filter_stream_from_files(tag, es)
325325
end
326326

327327
def filter_stream_from_journal(tag, es)
328-
return es if es.nil? || es.empty?
328+
return es if (es.respond_to?(:empty?) && es.empty?) || !es.is_a?(Fluent::EventStream)
329329
new_es = Fluent::MultiEventStream.new
330330
batch_miss_cache = {}
331331
es.each do |time, record|

test/plugin/test_filter_kubernetes_metadata.rb

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -160,20 +160,40 @@ def emit_with_tag(tag, msg={}, config='
160160

161161
test 'nil event stream from journal' do
162162
#not certain how this is possible but adding test to properly
163-
#guard against this condition we have seen
164-
163+
#guard against this condition we have seen - test for nil,
164+
#empty, no empty method, not an event stream
165165
plugin = create_driver.instance
166-
plugin.filter_stream_from_journal('tag', nil)
167-
plugin.filter_stream_from_journal('tag', Fluent::MultiEventStream.new)
166+
[nil, Fluent::MultiEventStream.new, 1, [1]].each do |es|
167+
assert_equal es, plugin.filter_stream_from_journal('tag', es)
168+
end
169+
# and make sure OneEventStream works
170+
ts = Time.now()
171+
rec = {"message"=>"hello"}
172+
es = Fluent::OneEventStream.new(ts, rec)
173+
newes = plugin.filter_stream_from_journal('tag', es)
174+
newes.each do |newts, newrec|
175+
assert_equal ts, newts
176+
assert_equal rec, newrec
177+
end
168178
end
169179

170180
test 'nil event stream from files' do
171181
#not certain how this is possible but adding test to properly
172182
#guard against this condition we have seen
173183

174184
plugin = create_driver.instance
175-
plugin.filter_stream_from_files('tag', nil)
176-
plugin.filter_stream_from_files('tag', Fluent::MultiEventStream.new)
185+
[nil, Fluent::MultiEventStream.new, 1, [1]].each do |es|
186+
assert_equal es, plugin.filter_stream_from_files('tag', es)
187+
end
188+
# and make sure OneEventStream works
189+
ts = Time.now()
190+
rec = {"message"=>"hello"}
191+
es = Fluent::OneEventStream.new(ts, rec)
192+
newes = plugin.filter_stream_from_journal('tag', es)
193+
newes.each do |newts, newrec|
194+
assert_equal ts, newts
195+
assert_equal rec, newrec
196+
end
177197
end
178198

179199
test 'inability to connect to the api server handles exception and doensnt block pipeline' do

0 commit comments

Comments
 (0)