Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions lib/fluent/plugin/out_s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def configure(conf)

begin
buffer_type = @buffer_config[:@type]
@compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(buffer_type: buffer_type, log: log)
@compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(buffer_type: buffer_type, buffer_compressed_type: @buffer.compress, log: log)
rescue => e
log.warn "'#{@store_as}' not supported. Use 'text' instead: error = #{e.message}"
@compressor = TextCompressor.new
Expand Down Expand Up @@ -600,6 +600,7 @@ class Compressor
# Base compressor setup. Records the buffer plugin type (e.g. 'file' or
# 'memory'), the buffer-level compression setting (e.g. :gzip) so
# subclasses can pass already-compressed chunks through untouched, and
# the plugin logger.
def initialize(opts = {})
  super()
  @buffer_type, @buffer_compressed_type, @log =
    opts.values_at(:buffer_type, :buffer_compressed_type, :log)
end

Expand Down Expand Up @@ -642,11 +643,18 @@ def content_type
end

# Compress +chunk+ into the open file handle +tmp+ as gzip data.
#
# When the buffer plugin already gzip-compressed its chunks
# (buffer_compressed_type == :gzip), stream the stored compressed bytes
# straight through instead of re-compressing them. Otherwise wrap +tmp+
# in a Zlib::GzipWriter and compress on the fly.
def compress(chunk, tmp)
  if @buffer_compressed_type == :gzip
    chunk.write_to(tmp, compressed: @buffer_compressed_type)
    return
  end

  w = Zlib::GzipWriter.new(tmp)
  chunk.write_to(w)
  w.finish
ensure
  # Single guarded finish: skip it on the passthrough path (w never
  # assigned) and swallow the "already finished" error after success.
  # The unguarded duplicate `w.finish rescue nil` left alongside the
  # guarded form was diff residue and is removed.
  w.finish rescue nil if w
end
end

Expand Down
5 changes: 5 additions & 0 deletions lib/fluent/plugin/s3_compressor_gzip_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ def content_type
end

def compress(chunk, tmp)
if @buffer_compressed_type == :gzip
chunk.write_to(tmp, compressed: @buffer_compressed_type)
return
end

chunk_is_file = @buffer_type == 'file'
path = if chunk_is_file
chunk.path
Expand Down
5 changes: 5 additions & 0 deletions lib/fluent/plugin/s3_compressor_zstd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ def content_type
end

def compress(chunk, tmp)
if @buffer_compressed_type == :zstd
chunk.write_to(tmp, compressed: @buffer_compressed_type)
return
end

compressed = Zstd.compress(chunk.read, level: @compress_config.level)
tmp.write(compressed)
rescue => e
Expand Down
120 changes: 120 additions & 0 deletions test/test_out_s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
require 'fluent/test/helpers'
require 'fluent/test/log'
require 'fluent/test/driver/output'
require 'fluent/version'
require 'aws-sdk-s3'
require 'fluent/plugin/out_s3'

Expand Down Expand Up @@ -495,6 +496,125 @@ def test_write_with_zstd
FileUtils.rm_f(s3_local_file_path)
end

# When the buffer itself compresses chunks with gzip, the S3 output must
# upload those bytes as-is; the GzipCompressor must not re-compress.
def test_no_compress_by_gzip_with_compressed_buffer
  setup_mocks(true)
  local_path = "/tmp/s3-test.gz"
  setup_s3_object_mocks(s3_local_file_path: local_path)

  d = create_time_sliced_driver(CONFIG_TIME_SLICE + <<~EOF)
    <buffer tag,time>
    @type memory
    compress gzip
    timekey 3600
    timekey_use_utc true
    </buffer>
  EOF

  # Re-compression would go through Zlib::GzipWriter — forbid it.
  dont_allow(Zlib::GzipWriter).new

  time = event_time("2011-01-02 13:14:15 UTC")
  d.run(default_tag: "test") do
    d.feed(time, {"a"=>1})
    d.feed(time, {"a"=>2})
  end

  # The object may hold several concatenated gzip members; decode each
  # member and rewind past the bytes the reader over-consumed.
  decompressed = ""
  File.open(local_path, "rb") do |io|
    until io.eof?
      reader = Zlib::GzipReader.new(io)
      decompressed << reader.read
      leftover = reader.unused
      reader.finish
      io.pos -= leftover.length if leftover
    end
  end

  expected = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] +
             %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n]
  assert_equal expected, decompressed
  FileUtils.rm_f(local_path)
end

# With a gzip-compressed buffer, the gzip_command compressor must neither
# shell out to gzip nor fall back to Zlib::GzipWriter — the stored bytes
# go to S3 unchanged.
def test_no_compress_by_gzip_command_with_compressed_buffer
  setup_mocks(true)
  local_path = "/tmp/s3-test.gz"
  setup_s3_object_mocks(s3_local_file_path: local_path)

  d = create_time_sliced_driver(CONFIG_TIME_SLICE + <<~EOF)
    store_as gzip_command
    <buffer tag,time>
    @type memory
    compress gzip
    timekey 3600
    timekey_use_utc true
    </buffer>
  EOF

  # Forbid both compression paths of GzipCommandCompressor.
  dont_allow(Kernel).system
  dont_allow(Zlib::GzipWriter).new

  time = event_time("2011-01-02 13:14:15 UTC")
  d.run(default_tag: "test") do
    d.feed(time, {"a"=>1})
    d.feed(time, {"a"=>2})
  end

  # Decode every concatenated gzip member, rewinding past the bytes the
  # reader over-consumed after each one.
  decompressed = ""
  File.open(local_path, "rb") do |io|
    until io.eof?
      reader = Zlib::GzipReader.new(io)
      decompressed << reader.read
      leftover = reader.unused
      reader.finish
      io.pos -= leftover.length if leftover
    end
  end

  expected = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] +
             %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n]
  assert_equal expected, decompressed
  FileUtils.rm_f(local_path)
end

# With a zstd-compressed buffer, the zstd store_as must upload the stored
# bytes directly; Zstd.compress must not run a second time.
def test_no_compress_by_zstd_with_compressed_buffer
  pend "This test require Fluentd 1.19.0 or later which supports zstd buffer compressed." if Gem::Version.new(Fluent::VERSION) < Gem::Version.new("1.19.0")

  setup_mocks(true)
  local_path = "/tmp/s3-test.zst"
  expected_s3path = "log/events/ts=20110102-13/events_0-#{Socket.gethostname}.zst"
  setup_s3_object_mocks(s3_local_file_path: local_path, s3path: expected_s3path)

  d = create_time_sliced_driver(CONFIG_TIME_SLICE + <<~EOF)
    store_as zstd
    <buffer tag,time>
    @type memory
    compress zstd
    timekey 3600
    timekey_use_utc true
    </buffer>
  EOF

  # Re-compression would call Zstd.compress — forbid it.
  dont_allow(Zstd).compress

  time = event_time("2011-01-02 13:14:15 UTC")
  d.run(default_tag: "test") do
    d.feed(time, {"a"=>1})
    d.feed(time, {"a"=>2})
  end

  expected = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] +
             %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n]
  assert_equal expected, Zstd.decompress(File.binread(local_path))
  FileUtils.rm_f(local_path)
end

class MockResponse
attr_reader :data

Expand Down