diff --git a/lib/redsnapper.rb b/lib/redsnapper.rb index 839946a..87ba8a2 100644 --- a/lib/redsnapper.rb +++ b/lib/redsnapper.rb @@ -1,4 +1,4 @@ -require 'thread/pool' +require 'thread' require 'open3' require 'set' require 'date' @@ -32,7 +32,38 @@ def <=>(other) def initialize(archive, options = {}) @archive = archive @options = options - @thread_pool = Thread.pool(options[:thread_pool_size] || THREAD_POOL_DEFAULT_SIZE) + @tpsize = options[:thread_pool_size] || THREAD_POOL_DEFAULT_SIZE + @work_qs = (0...@tpsize).map do + Queue.new + end + @thread_pool = (0...@tpsize).map do |i| + Thread.new do + chunk = @work_qs[i].pop() + + # The following gross hack works around this gross bug in BSD tar that tarsnap inherits: + # https://github.com/Tarsnap/tarsnap/issues/329 + chunk.map! { |file| file.gsub(/([#{Regexp.escape(GLOB_CHARS)}])/) { |m| "\\#{m}" } } + command = [ TARSNAP, '-xpf', @archive, '--null', '-T', '-', *@options[:tarsnap_options] ] + Open3.popen3(*command) do |stin, _, err| + chunk.each do |file| + stin.write file + stin.putc 0 + end + stin.flush + stin.close + + while line = err.gets + next if line.end_with?(NOT_OLDER_ERROR) + if line == EXIT_ERROR + @error = true + next + end + @@output_mutex.synchronize { warn line.chomp } + end + end + end + end + @error = false end @@ -82,7 +113,7 @@ def files_to_extract end def file_groups - groups = (1..@thread_pool.max).map { Group.new } + groups = (1..@tpsize).map { Group.new } files_to_extract.sort { |a, b| b.last[:size] <=> a.last[:size] }.each do |name, props| # If the previous batch of files had an entry with the same size and date, @@ -98,24 +129,11 @@ def file_groups end def run - file_groups.each do |chunk| - @thread_pool.process do - chunk.map! { |file| file.gsub(/([#{Regexp.escape(GLOB_CHARS)}])/) { |m| "\\#{m}" } } - command = [ TARSNAP, '-xvf', @archive, *(@options[:tarsnap_options] + chunk) ] - Open3.popen3(*command) do |_, _, err| - while line = err.gets - next if line.end_with?(NOT_OLDER_ERROR) - if line == EXIT_ERROR - @error = true - next - end - @@output_mutex.synchronize { warn line.chomp } - end - end - end + file_groups.each_with_index do |chunk, idx| + @work_qs[idx].push chunk end - @thread_pool.shutdown + @thread_pool.map(&:join) @@output_mutex.synchronize { warn EXIT_ERROR } if @error end end