diff --git a/lib/fluent/plugin/out_s3.rb b/lib/fluent/plugin/out_s3.rb index c631572..4bb90fb 100644 --- a/lib/fluent/plugin/out_s3.rb +++ b/lib/fluent/plugin/out_s3.rb @@ -203,7 +203,7 @@ def configure(conf) begin buffer_type = @buffer_config[:@type] - @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(buffer_type: buffer_type, log: log) + @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(buffer_type: buffer_type, buffer_compressed_type: @buffer.compress, log: log) rescue => e log.warn "'#{@store_as}' not supported. Use 'text' instead: error = #{e.message}" @compressor = TextCompressor.new @@ -600,6 +600,7 @@ class Compressor def initialize(opts = {}) super() @buffer_type = opts[:buffer_type] + @buffer_compressed_type = opts[:buffer_compressed_type] @log = opts[:log] end @@ -642,11 +643,18 @@ def content_type end def compress(chunk, tmp) + if @buffer_compressed_type == :gzip + chunk.write_to(tmp, compressed: @buffer_compressed_type) + return + end + w = Zlib::GzipWriter.new(tmp) chunk.write_to(w) w.finish ensure - w.finish rescue nil + if w + w.finish rescue nil + end end end diff --git a/lib/fluent/plugin/s3_compressor_gzip_command.rb b/lib/fluent/plugin/s3_compressor_gzip_command.rb index 1fbc04b..fafd8c3 100644 --- a/lib/fluent/plugin/s3_compressor_gzip_command.rb +++ b/lib/fluent/plugin/s3_compressor_gzip_command.rb @@ -19,6 +19,11 @@ def content_type end def compress(chunk, tmp) + if @buffer_compressed_type == :gzip + chunk.write_to(tmp, compressed: @buffer_compressed_type) + return + end + chunk_is_file = @buffer_type == 'file' path = if chunk_is_file chunk.path diff --git a/lib/fluent/plugin/s3_compressor_zstd.rb b/lib/fluent/plugin/s3_compressor_zstd.rb index dc48519..03a8d43 100644 --- a/lib/fluent/plugin/s3_compressor_zstd.rb +++ b/lib/fluent/plugin/s3_compressor_zstd.rb @@ -25,6 +25,11 @@ def content_type end def compress(chunk, tmp) + if @buffer_compressed_type == :zstd + chunk.write_to(tmp, compressed: @buffer_compressed_type) + return + end + compressed = Zstd.compress(chunk.read, level: @compress_config.level) tmp.write(compressed) rescue => e diff --git a/test/test_out_s3.rb b/test/test_out_s3.rb index fbb2945..6d9bf9b 100644 --- a/test/test_out_s3.rb +++ b/test/test_out_s3.rb @@ -2,6 +2,7 @@ require 'fluent/test/helpers' require 'fluent/test/log' require 'fluent/test/driver/output' +require 'fluent/version' require 'aws-sdk-s3' require 'fluent/plugin/out_s3' @@ -495,6 +496,125 @@ def test_write_with_zstd FileUtils.rm_f(s3_local_file_path) end + def test_no_compress_by_gzip_with_compressed_buffer + setup_mocks(true) + s3_local_file_path = "/tmp/s3-test.gz" + setup_s3_object_mocks(s3_local_file_path: s3_local_file_path) + + d = create_time_sliced_driver(CONFIG_TIME_SLICE + <<~EOF) + + @type memory + compress gzip + timekey 3600 + timekey_use_utc true + + EOF + + # GzipCompressor should not use Zlib::GzipWriter + dont_allow(Zlib::GzipWriter).new + + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: "test") do + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + end + + data = "" + File.open(s3_local_file_path, "rb") do |f| + until f.eof? + gz = Zlib::GzipReader.new(f) + data << gz.read + + unused = gz.unused + gz.finish + f.pos -= unused.length if unused + end + end + assert_equal %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n], + data + FileUtils.rm_f(s3_local_file_path) + end + + def test_no_compress_by_gzip_command_with_compressed_buffer + setup_mocks(true) + s3_local_file_path = "/tmp/s3-test.gz" + setup_s3_object_mocks(s3_local_file_path: s3_local_file_path) + + d = create_time_sliced_driver(CONFIG_TIME_SLICE + <<~EOF) + store_as gzip_command + + @type memory + compress gzip + timekey 3600 + timekey_use_utc true + + EOF + + # GzipCommandCompressor should not use Kernel.system and Zlib::GzipWriter + dont_allow(Kernel).system + dont_allow(Zlib::GzipWriter).new + + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: "test") do + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + end + + data = "" + File.open(s3_local_file_path, "rb") do |f| + until f.eof? + gz = Zlib::GzipReader.new(f) + data << gz.read + + unused = gz.unused + gz.finish + f.pos -= unused.length if unused + end + end + assert_equal %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n], + data + FileUtils.rm_f(s3_local_file_path) + end + + def test_no_compress_by_zstd_with_compressed_buffer + pend "This test require Fluentd 1.19.0 or later which supports zstd buffer compressed." if Gem::Version.new(Fluent::VERSION) < Gem::Version.new('1.19.0') + + setup_mocks(true) + s3_local_file_path = "/tmp/s3-test.zst" + expected_s3path = "log/events/ts=20110102-13/events_0-#{Socket.gethostname}.zst" + setup_s3_object_mocks(s3_local_file_path: s3_local_file_path, s3path: expected_s3path) + + d = create_time_sliced_driver(CONFIG_TIME_SLICE + <<~EOF) + store_as zstd + + @type memory + compress zstd + timekey 3600 + timekey_use_utc true + + EOF + + # ZstdCompressor should not use Zstd.compress + dont_allow(Zstd).compress + + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: "test") do + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + end + + File.open(s3_local_file_path, 'rb') do |file| + compressed_data = file.read + uncompressed_data = Zstd.decompress(compressed_data) + expected_data = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n] + assert_equal expected_data, uncompressed_data + end + FileUtils.rm_f(s3_local_file_path) + end + class MockResponse attr_reader :data