diff --git a/lib/fluent/plugin/out_s3.rb b/lib/fluent/plugin/out_s3.rb
index c631572..4bb90fb 100644
--- a/lib/fluent/plugin/out_s3.rb
+++ b/lib/fluent/plugin/out_s3.rb
@@ -203,7 +203,7 @@ def configure(conf)
begin
buffer_type = @buffer_config[:@type]
- @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(buffer_type: buffer_type, log: log)
+ @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(buffer_type: buffer_type, buffer_compressed_type: @buffer.compress, log: log)
rescue => e
log.warn "'#{@store_as}' not supported. Use 'text' instead: error = #{e.message}"
@compressor = TextCompressor.new
@@ -600,6 +600,7 @@ class Compressor
def initialize(opts = {})
super()
@buffer_type = opts[:buffer_type]
+ @buffer_compressed_type = opts[:buffer_compressed_type]
@log = opts[:log]
end
@@ -642,11 +643,18 @@ def content_type
end
def compress(chunk, tmp)
+ if @buffer_compressed_type == :gzip
+ chunk.write_to(tmp, compressed: @buffer_compressed_type)
+ return
+ end
+
w = Zlib::GzipWriter.new(tmp)
chunk.write_to(w)
w.finish
ensure
- w.finish rescue nil
+ if w
+ w.finish rescue nil
+ end
end
end
diff --git a/lib/fluent/plugin/s3_compressor_gzip_command.rb b/lib/fluent/plugin/s3_compressor_gzip_command.rb
index 1fbc04b..fafd8c3 100644
--- a/lib/fluent/plugin/s3_compressor_gzip_command.rb
+++ b/lib/fluent/plugin/s3_compressor_gzip_command.rb
@@ -19,6 +19,11 @@ def content_type
end
def compress(chunk, tmp)
+ if @buffer_compressed_type == :gzip
+ chunk.write_to(tmp, compressed: @buffer_compressed_type)
+ return
+ end
+
chunk_is_file = @buffer_type == 'file'
path = if chunk_is_file
chunk.path
diff --git a/lib/fluent/plugin/s3_compressor_zstd.rb b/lib/fluent/plugin/s3_compressor_zstd.rb
index dc48519..03a8d43 100644
--- a/lib/fluent/plugin/s3_compressor_zstd.rb
+++ b/lib/fluent/plugin/s3_compressor_zstd.rb
@@ -25,6 +25,11 @@ def content_type
end
def compress(chunk, tmp)
+ if @buffer_compressed_type == :zstd
+ chunk.write_to(tmp, compressed: @buffer_compressed_type)
+ return
+ end
+
compressed = Zstd.compress(chunk.read, level: @compress_config.level)
tmp.write(compressed)
rescue => e
diff --git a/test/test_out_s3.rb b/test/test_out_s3.rb
index fbb2945..6d9bf9b 100644
--- a/test/test_out_s3.rb
+++ b/test/test_out_s3.rb
@@ -2,6 +2,7 @@
require 'fluent/test/helpers'
require 'fluent/test/log'
require 'fluent/test/driver/output'
+require 'fluent/version'
require 'aws-sdk-s3'
require 'fluent/plugin/out_s3'
@@ -495,6 +496,125 @@ def test_write_with_zstd
FileUtils.rm_f(s3_local_file_path)
end
+ def test_no_compress_by_gzip_with_compressed_buffer
+ setup_mocks(true)
+ s3_local_file_path = "/tmp/s3-test.gz"
+ setup_s3_object_mocks(s3_local_file_path: s3_local_file_path)
+
+ d = create_time_sliced_driver(CONFIG_TIME_SLICE + <<~EOF)
+
+ @type memory
+ compress gzip
+ timekey 3600
+ timekey_use_utc true
+
+ EOF
+
+ # GzipCompressor should not use Zlib::GzipWriter
+ dont_allow(Zlib::GzipWriter).new
+
+ time = event_time("2011-01-02 13:14:15 UTC")
+ d.run(default_tag: "test") do
+ d.feed(time, {"a"=>1})
+ d.feed(time, {"a"=>2})
+ end
+
+ data = ""
+ File.open(s3_local_file_path, "rb") do |f|
+ until f.eof?
+ gz = Zlib::GzipReader.new(f)
+ data << gz.read
+
+ unused = gz.unused
+ gz.finish
+ f.pos -= unused.length if unused
+ end
+ end
+ assert_equal %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] +
+ %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n],
+ data
+ FileUtils.rm_f(s3_local_file_path)
+ end
+
+ def test_no_compress_by_gzip_command_with_compressed_buffer
+ setup_mocks(true)
+ s3_local_file_path = "/tmp/s3-test.gz"
+ setup_s3_object_mocks(s3_local_file_path: s3_local_file_path)
+
+ d = create_time_sliced_driver(CONFIG_TIME_SLICE + <<~EOF)
+ store_as gzip_command
+
+ @type memory
+ compress gzip
+ timekey 3600
+ timekey_use_utc true
+
+ EOF
+
+ # GzipCommandCompressor should not use Kernel.system and Zlib::GzipWriter
+ dont_allow(Kernel).system
+ dont_allow(Zlib::GzipWriter).new
+
+ time = event_time("2011-01-02 13:14:15 UTC")
+ d.run(default_tag: "test") do
+ d.feed(time, {"a"=>1})
+ d.feed(time, {"a"=>2})
+ end
+
+ data = ""
+ File.open(s3_local_file_path, "rb") do |f|
+ until f.eof?
+ gz = Zlib::GzipReader.new(f)
+ data << gz.read
+
+ unused = gz.unused
+ gz.finish
+ f.pos -= unused.length if unused
+ end
+ end
+ assert_equal %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] +
+ %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n],
+ data
+ FileUtils.rm_f(s3_local_file_path)
+ end
+
+ def test_no_compress_by_zstd_with_compressed_buffer
+ pend "This test require Fluentd 1.19.0 or later which supports zstd buffer compressed." if Gem::Version.new(Fluent::VERSION) < Gem::Version.new('1.19.0')
+
+ setup_mocks(true)
+ s3_local_file_path = "/tmp/s3-test.zst"
+ expected_s3path = "log/events/ts=20110102-13/events_0-#{Socket.gethostname}.zst"
+ setup_s3_object_mocks(s3_local_file_path: s3_local_file_path, s3path: expected_s3path)
+
+ d = create_time_sliced_driver(CONFIG_TIME_SLICE + <<~EOF)
+ store_as zstd
+
+ @type memory
+ compress zstd
+ timekey 3600
+ timekey_use_utc true
+
+ EOF
+
+ # ZstdCompressor should not use Zstd.compress
+ dont_allow(Zstd).compress
+
+ time = event_time("2011-01-02 13:14:15 UTC")
+ d.run(default_tag: "test") do
+ d.feed(time, {"a"=>1})
+ d.feed(time, {"a"=>2})
+ end
+
+ File.open(s3_local_file_path, 'rb') do |file|
+ compressed_data = file.read
+ uncompressed_data = Zstd.decompress(compressed_data)
+ expected_data = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] +
+ %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n]
+ assert_equal expected_data, uncompressed_data
+ end
+ FileUtils.rm_f(s3_local_file_path)
+ end
+
class MockResponse
attr_reader :data