Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions lib/logstash/inputs/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,12 @@ def queue_event(msg, output_queue, channel=nil)
@codec.decode(msg) do |event|
decorate(event)
event.set("[@metadata][redis_channel]", channel) if !channel.nil?
puts "enqueueing event #{event.to_json} onto queue #{output_queue.object_id}" if msg['c'] || msg['p']
output_queue << event
puts "event #{event.to_json} successfully enqueued, queue: #{output_queue.object_id}" if msg['c'] || msg['p']
end
rescue => e # parse or event creation error
puts "couldn't queue event onto queue #{queue.object_id}"
@logger.error("Failed to create event", :message => msg, :exception => e, :backtrace => e.backtrace);
end
end
Expand Down Expand Up @@ -314,7 +317,9 @@ def channel_listener(output_queue)
end

on.message do |channel, message|
puts "received message #{message}"
queue_event(message, output_queue, channel)
puts "successfully enqueued message #{message}"
end

on.unsubscribe do |channel, count|
Expand All @@ -337,7 +342,9 @@ def pattern_channel_listener(output_queue)
end

on.pmessage do |pattern, channel, message|
puts "received message #{message}"
queue_event(message, output_queue, channel)
puts "successfully enqueued message #{message}"
end

on.punsubscribe do |channel, count|
Expand Down
52 changes: 40 additions & 12 deletions spec/inputs/redis_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def process(conf, event_count)
end

describe LogStash::Inputs::Redis do
let(:queue) { Queue.new }
#let(:queue) { Queue.new }

let(:data_type) { 'list' }
let(:batch_count) { 1 }
Expand Down Expand Up @@ -141,6 +141,8 @@ def process(conf, event_count)

context 'runtime for list data_type' do

let(:queue) { Queue.new }

before do
subject.register
allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true
Expand Down Expand Up @@ -314,7 +316,7 @@ def process(conf, event_count)

before { subject.register }

def run_it_thread(inst)
def run_it_thread(inst, queue)
Thread.new(inst) do |subj|
subj.run(queue)
end
Expand All @@ -324,16 +326,22 @@ def publish_thread(new_redis, prefix)
Thread.new(new_redis, prefix) do |r, p|
sleep 0.1
2.times do |i|
r.publish('foo', "#{p}#{i.next}")
puts "i is #{i}"
r.publish('foo', {"#{p}" => "#{i.next}"}.to_json)
end
end
end

def close_thread(inst, rt)
def close_thread(inst, rt, queue)
Thread.new(inst, rt) do |subj, runner|
# block for the messages
puts "close_thread: queue: #{queue.object_id}, queue.size #{queue.size}"
puts "popping first event"
e1 = queue.pop
puts "got event #{e1.to_hash}"
puts "popping second event"
e2 = queue.pop
puts "got event #{e2.to_hash}"
# put em back for the tests
queue.push(e1)
queue.push(e2)
Expand Down Expand Up @@ -366,22 +374,32 @@ def close_thread(inst, rt)

context 'real redis', :redis => true do
it 'calling the run method, adds events to the queue' do
queue = Queue.new
#simulate the input thread
rt = run_it_thread(subject)
puts "channel real redis: starting run thread queue: #{queue.object_id}, queue.size: #{queue.size}"
rt = run_it_thread(subject, queue)
#simulate the other system thread
puts "starting publish thread queue: #{queue.object_id}, queue.size: #{queue.size}"
publish_thread(subject.send(:new_redis_instance), 'c').join
puts "joined publish thread queue: #{queue.object_id}, queue.size: #{queue.size}"
#simulate the pipeline thread
close_thread(subject, rt).join
close_thread(subject, rt, queue).join
puts "joined close thread queue: #{queue.object_id}, queue.size: #{queue.size}"

expect(queue.size).to eq(2)
end
it 'events had redis_channel' do
queue = Queue.new
#simulate the input thread
rt = run_it_thread(subject)
puts "channel: events had redis_channel: starting run thread queue: #{queue.object_id}, queue.size: #{queue.size}"
rt = run_it_thread(subject, queue)
puts "starting publish thread queue: #{queue.object_id}, queue.size: #{queue.size}"
#simulate the other system thread
publish_thread(subject.send(:new_redis_instance), 'c').join
puts "joined publish thread queue: #{queue.object_id}, queue.size: #{queue.size}"
#simulate the pipeline thread
close_thread(subject, rt).join
close_thread(subject, rt, queue).join
puts "joined close thread queue: #{queue.object_id}, queue.size: #{queue.size}"
e1 = queue.pop
e2 = queue.pop
expect(e1.get('[@metadata][redis_channel]')).to eq('foo')
Expand All @@ -404,23 +422,33 @@ def close_thread(inst, rt)

context 'real redis', :redis => true do
it 'calling the run method, adds events to the queue' do
queue = Queue.new
#simulate the input thread
rt = run_it_thread(subject)
puts "pattern_channel: real redis: starting run thread, queue: #{queue.object_id}, queue.size: #{queue.size}"
rt = run_it_thread(subject, queue)
#simulate the other system thread
puts "starting publish thread queue: #{queue.object_id}, queue.size: #{queue.size}"
publish_thread(subject.send(:new_redis_instance), 'pc').join
puts "joined publish thread queue: #{queue.object_id}, queue.size: #{queue.size}"
#simulate the pipeline thread
close_thread(subject, rt).join
close_thread(subject, rt, queue).join
puts "joined close thread queue: #{queue.object_id}, queue.size: #{queue.size}"

expect(queue.size).to eq(2)
end

it 'events had redis_channel' do
queue = Queue.new
#simulate the input thread
rt = run_it_thread(subject)
puts "pattern_channel: redis_channel: starting run thread, queue: #{queue.object_id}, queue.size: #{queue.size}"
rt = run_it_thread(subject, queue)
#simulate the other system thread
puts "starting publish thread queue: #{queue.object_id}, queue.size: #{queue.size}"
publish_thread(subject.send(:new_redis_instance), 'pc').join
puts "joined publish thread queue: #{queue.object_id}, queue.size: #{queue.size}"
#simulate the pipeline thread
close_thread(subject, rt).join
close_thread(subject, rt, queue).join
puts "joined close thread queue: #{queue.object_id}, queue.size: #{queue.size}"
e1 = queue.pop
e2 = queue.pop
expect(e1.get('[@metadata][redis_channel]')).to eq('foo')
Expand Down