Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@
# increase in cache size may lead to an increase in heap usage.
series-id-set-cache-size = 100

# The size in bytes of the buffer used for tar streaming operations,
# such as renaming and copying tar files during backups.
# The default value is 1MB (1048576 bytes). Change this only if backups are having
# performance issues and you understand that this value is a heuristic that may need to be tweaked.
# tar-stream-buffer-size = 1048576
Comment on lines +170 to +174
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the tar buffer size is only used in the backup code paths. If this is correct, I wonder if backup-stream-buffer-size would be a better name.


###
### [coordinator]
###
Expand Down
31 changes: 21 additions & 10 deletions pkg/tar/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
// possibly writing each file to a tar writer stream. By default StreamFile is used, which will result in all files
// being written. A custom writeFunc can be passed so that each file may be written, modified+written, or skipped
// depending on the custom logic.
func Stream(w io.Writer, dir, relativePath string, writeFunc func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error) error {
func Stream(w io.Writer, dir, relativePath string, bufSize uint64, writeFunc func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error) error {
tw := tar.NewWriter(w)
defer tw.Close()

Expand All @@ -41,30 +41,36 @@ func Stream(w io.Writer, dir, relativePath string, writeFunc func(f os.FileInfo,
return err
}

return writeFunc(f, filepath.Join(relativePath, subDir), path, tw)
return writeFunc(f, filepath.Join(relativePath, subDir), path, tw, bufSize)
})
}

// Generates a filtering function for Stream that checks an incoming file, and only writes the file to the stream if
// its mod time is later than since. Example: to tar only files newer than a certain datetime, use
// tar.Stream(w, dir, relativePath, SinceFilterTarFile(datetime))
func SinceFilterTarFile(since time.Time) func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
return func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
func SinceFilterTarFile(since time.Time) func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error {
return func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error {
if f.ModTime().After(since) {
return StreamFile(f, shardRelativePath, fullPath, tw)
return StreamFile(f, shardRelativePath, fullPath, tw, bufSize)
}
return nil
}
}

// stream a single file to tw, extending the header name using the shardRelativePath
func StreamFile(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
return StreamRenameFile(f, f.Name(), shardRelativePath, fullPath, tw)
func StreamFile(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error {
return StreamRenameFile(f, f.Name(), shardRelativePath, fullPath, tw, bufSize)
}

// StreamRenameFile streams a single file to tw, using tarHeaderFileName instead of the actual filename,
// e.g., when we want to write a *.tmp file using the original file's non-tmp name.
func StreamRenameFile(f os.FileInfo, tarHeaderFileName, relativePath, fullPath string, tw *tar.Writer) error {
func StreamRenameFile(f os.FileInfo, tarHeaderFileName, relativePath, fullPath string, tw *tar.Writer, bufSize uint64) error {
// We NEVER want this to be 0 (or less). This is just a safety harness.
// influxd will panic if we pass a 0 size buffer into CopyBuffer.
if bufSize <= 0 {
bufSize = 1024 * 1024 // 1MB
}
buf := make([]byte, bufSize)
h, err := tar.FileInfoHeader(f, f.Name())
if err != nil {
return err
Expand All @@ -86,9 +92,14 @@ func StreamRenameFile(f os.FileInfo, tarHeaderFileName, relativePath, fullPath s

defer fr.Close()

_, err = io.CopyN(tw, fr, h.Size)
bytesWritten, err := io.CopyBuffer(tw, fr, buf)
if err != nil {
return err
} else if bytesWritten != h.Size {
return fmt.Errorf("StreamRenameFile: Error while copying buffer, expected %d bytes but wrote %d.", h.Size, bytesWritten)
}

return err
return nil
}

// Restore reads a tar archive from r and extracts all of its files into dir,
Expand Down
46 changes: 46 additions & 0 deletions pkg/tar/stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package tar_test

import (
"archive/tar"
"bytes"
"io"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"

pkgtar "github.com/influxdata/influxdb/pkg/tar"
)

// TestStreamRenameWithBufSize verifies that StreamRenameFile writes the file
// under the renamed header and copies the content intact when an explicit
// buffer size is supplied.
func TestStreamRenameWithBufSize(t *testing.T) {
	dir := t.TempDir()
	testFile := filepath.Join(dir, "test2.txt.tar")
	testData := []byte("test data for buffer size")

	testFileRename := "test.txt.tar"

	require.NoError(t, os.WriteFile(testFile, testData, 0644))

	var buf bytes.Buffer
	bufSize := uint64(1024 * 1024)

	tw := tar.NewWriter(&buf)

	// os.Stat is sufficient here: StreamRenameFile opens the file itself via
	// fullPath, so only the FileInfo is needed. This also avoids leaking an
	// open file handle (the previous os.Open call was never closed).
	info, err := os.Stat(testFile)
	require.NoError(t, err, "error stat testFile")

	require.NoError(t, pkgtar.StreamRenameFile(info, testFileRename, "", testFile, tw, bufSize))
	require.NoError(t, tw.Close())

	// Read the archive back and confirm the renamed header and the payload.
	tr := tar.NewReader(&buf)
	hdr, err := tr.Next()
	require.NoError(t, err)
	require.Equal(t, testFileRename, hdr.Name)

	content, err := io.ReadAll(tr)
	require.NoError(t, err)
	require.Equal(t, testData, content)
}
15 changes: 15 additions & 0 deletions tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ const (

// MaxTSMFileSize is the maximum size of TSM files.
MaxTSMFileSize = uint32(2048 * 1024 * 1024) // 2GB

// DefaultTarStreamBufferSize is the default window size to use during tar file streaming.
// This impacts backups and should only be modified if backups are having performance issues.
DefaultTarStreamBufferSize = uint64(1024 * 1024) // 1MB
)

var SingleGenerationReasonText string = SingleGenerationReason()
Expand Down Expand Up @@ -186,6 +190,11 @@ type Config struct {
// been found to be problematic in some cases. It may help users who have
// slow disks.
TSMWillNeed bool `toml:"tsm-use-madv-willneed"`

// TarStreamBufferSize is the size in bytes of the buffer used for tar streaming
// operations, such as renaming and copying tar files during backups.
// The default value is 1MB. Change this only if backups are having performance issues.
TarStreamBufferSize uint64 `toml:"tar-stream-buffer-size"`
}

// NewConfig returns the default configuration for tsdb.
Expand Down Expand Up @@ -217,6 +226,8 @@ func NewConfig() Config {

TraceLoggingEnabled: false,
TSMWillNeed: false,

TarStreamBufferSize: DefaultTarStreamBufferSize,
}
}

Expand Down Expand Up @@ -266,6 +277,10 @@ func (c *Config) Validate() error {
return fmt.Errorf("unrecognized index %s", c.Index)
}

if c.TarStreamBufferSize <= 0 {
return fmt.Errorf("tar-stream-buffer-size cannot be zero and must be non-negative")
}

return nil
}

Expand Down
21 changes: 13 additions & 8 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ type Engine struct {

// muDigest ensures only one goroutine can generate a digest at a time.
muDigest sync.RWMutex

// TarStreamBufferSize is the size used for our buffer windows within the tar package.
TarStreamBufferSize uint64
}

// NewEngine returns a new instance of Engine.
Expand Down Expand Up @@ -286,6 +289,8 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
optimizedCompactionLimiter: opt.OptimizedCompactionLimiter,
Scheduler: newScheduler(stats, opt.CompactionLimiter.Capacity()),
seriesIDSets: opt.SeriesIDSets,

TarStreamBufferSize: opt.Config.TarStreamBufferSize,
}

// Feature flag to enable per-series type checking, by default this is off and
Expand Down Expand Up @@ -994,13 +999,13 @@ func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error {
}
}()

return intar.Stream(w, path, basePath, intar.SinceFilterTarFile(since))
return intar.Stream(w, path, basePath, e.TarStreamBufferSize, intar.SinceFilterTarFile(since))
}

func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
return func(fi os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error {
return func(fi os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error {
if !strings.HasSuffix(fi.Name(), ".tsm") {
return intar.StreamFile(fi, shardRelativePath, fullPath, tw)
return intar.StreamFile(fi, shardRelativePath, fullPath, tw, e.TarStreamBufferSize)
}

f, err := os.Open(fullPath)
Expand All @@ -1014,7 +1019,7 @@ func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo

// Grab the tombstone file if one exists.
if ts := r.TombstoneStats(); ts.TombstoneExists {
return intar.StreamFile(fi, shardRelativePath, filepath.Base(ts.Path), tw)
return intar.StreamFile(fi, shardRelativePath, filepath.Base(ts.Path), tw, e.TarStreamBufferSize)
}

min, max := r.TimeRange()
Expand Down Expand Up @@ -1042,7 +1047,7 @@ func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo

// the TSM file is 100% inside the range, so we can just write it without scanning each block
if min >= start.UnixNano() && max <= end.UnixNano() {
if err := intar.StreamFile(fi, shardRelativePath, fullPath, tw); err != nil {
if err := intar.StreamFile(fi, shardRelativePath, fullPath, tw, bufSize); err != nil {
return err
}
}
Expand All @@ -1062,7 +1067,7 @@ func (e *Engine) Export(w io.Writer, basePath string, start time.Time, end time.
}
}()

return intar.Stream(w, path, basePath, e.timeStampFilterTarFile(start, end))
return intar.Stream(w, path, basePath, e.TarStreamBufferSize, e.timeStampFilterTarFile(start, end))
}

func (e *Engine) filterFileToBackup(r *TSMReader, fi os.FileInfo, shardRelativePath, fullPath string, start, end int64, tw *tar.Writer) error {
Expand Down Expand Up @@ -1117,7 +1122,7 @@ func (e *Engine) filterFileToBackup(r *TSMReader, fi os.FileInfo, shardRelativeP
return err
}

return intar.StreamRenameFile(tmpFi, fi.Name(), shardRelativePath, path, tw)
return intar.StreamRenameFile(tmpFi, fi.Name(), shardRelativePath, path, tw, e.TarStreamBufferSize)
}

// Restore reads a tar archive generated by Backup().
Expand Down