Skip to content

Commit fcd3cae

Browse files
committed
feat: Update Stream to accept a bufSize for tar CopyBuffer window
1 parent e30bad5 commit fcd3cae

File tree

2 files changed

+21
-17
lines changed

2 files changed

+21
-17
lines changed

pkg/tar/stream.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
// possibly writing each file to a tar writer stream. By default StreamFile is used, which will result in all files
1717
// being written. A custom writeFunc can be passed so that each file may be written, modified+written, or skipped
1818
// depending on the custom logic.
19-
func Stream(w io.Writer, dir, relativePath string, writeFunc func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error) error {
19+
func Stream(w io.Writer, dir, relativePath string, bufSize uint64, writeFunc func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error) error {
2020
tw := tar.NewWriter(w)
2121
defer tw.Close()
2222

@@ -41,30 +41,31 @@ func Stream(w io.Writer, dir, relativePath string, writeFunc func(f os.FileInfo,
4141
return err
4242
}
4343

44-
return writeFunc(f, filepath.Join(relativePath, subDir), path, tw)
44+
return writeFunc(f, filepath.Join(relativePath, subDir), path, tw, bufSize)
4545
})
4646
}
4747

4848
// SinceFilterTarFile generates a filtering function for Stream that checks an incoming file, and only writes the file to the stream if
4949
// its mod time is later than since. Example: to tar only files newer than a certain datetime, use
5050
// tar.Stream(w, dir, relativePath, bufSize, SinceFilterTarFile(datetime))
51-
func SinceFilterTarFile(since time.Time) func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
52-
return func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
51+
func SinceFilterTarFile(since time.Time) func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error {
52+
return func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error {
5353
if f.ModTime().After(since) {
54-
return StreamFile(f, shardRelativePath, fullPath, tw)
54+
return StreamFile(f, shardRelativePath, fullPath, tw, bufSize)
5555
}
5656
return nil
5757
}
5858
}
5959

6060
// StreamFile streams a single file to tw, extending the header name using the shardRelativePath.
61-
func StreamFile(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
62-
return StreamRenameFile(f, f.Name(), shardRelativePath, fullPath, tw)
61+
func StreamFile(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error {
62+
return StreamRenameFile(f, f.Name(), shardRelativePath, fullPath, tw, bufSize)
6363
}
6464

6565
// StreamRenameFile streams a single file to tw, using tarHeaderFileName instead of the actual filename,
6666
// e.g., when we want to write a *.tmp file using the original file's non-tmp name.
67-
func StreamRenameFile(f os.FileInfo, tarHeaderFileName, relativePath, fullPath string, tw *tar.Writer) error {
67+
func StreamRenameFile(f os.FileInfo, tarHeaderFileName, relativePath, fullPath string, tw *tar.Writer, bufSize uint64) error {
68+
buf := make([]byte, bufSize)
6869
h, err := tar.FileInfoHeader(f, f.Name())
6970
if err != nil {
7071
return err
@@ -86,7 +87,7 @@ func StreamRenameFile(f os.FileInfo, tarHeaderFileName, relativePath, fullPath s
8687

8788
defer fr.Close()
8889

89-
_, err = io.CopyN(tw, fr, h.Size)
90+
_, err = io.CopyBuffer(tw, fr, buf)
9091

9192
return err
9293
}

tsdb/engine/tsm1/engine.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,9 @@ type Engine struct {
227227

228228
// muDigest ensures only one goroutine can generate a digest at a time.
229229
muDigest sync.RWMutex
230+
231+
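// TarStreamBufferSize is the size, in bytes, of the copy buffer used when streaming files through the tar package.
// It must be non-zero: a buffer of this length is allocated and handed to io.CopyBuffer, which panics on an empty buffer.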
232+
TarStreamBufferSize uint64
230233
}
231234

232235
// NewEngine returns a new instance of Engine.
@@ -994,13 +997,13 @@ func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error {
994997
}
995998
}()
996999

997-
return intar.Stream(w, path, basePath, intar.SinceFilterTarFile(since))
1000+
return intar.Stream(w, path, basePath, e.TarStreamBufferSize, intar.SinceFilterTarFile(since))
9981001
}
9991002

1000-
func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
1001-
return func(fi os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
1003+
func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error {
1004+
return func(fi os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error {
10021005
if !strings.HasSuffix(fi.Name(), ".tsm") {
1003-
return intar.StreamFile(fi, shardRelativePath, fullPath, tw)
1006+
return intar.StreamFile(fi, shardRelativePath, fullPath, tw, e.TarStreamBufferSize)
10041007
}
10051008

10061009
f, err := os.Open(fullPath)
@@ -1014,7 +1017,7 @@ func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo
10141017

10151018
// Grab the tombstone file if one exists.
10161019
if ts := r.TombstoneStats(); ts.TombstoneExists {
1017-
return intar.StreamFile(fi, shardRelativePath, filepath.Base(ts.Path), tw)
1020+
return intar.StreamFile(fi, shardRelativePath, filepath.Base(ts.Path), tw, e.TarStreamBufferSize)
10181021
}
10191022

10201023
min, max := r.TimeRange()
@@ -1042,7 +1045,7 @@ func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo
10421045

10431046
// the TSM file is 100% inside the range, so we can just write it without scanning each block
10441047
if min >= start.UnixNano() && max <= end.UnixNano() {
1045-
if err := intar.StreamFile(fi, shardRelativePath, fullPath, tw); err != nil {
1048+
if err := intar.StreamFile(fi, shardRelativePath, fullPath, tw, bufSize); err != nil {
10461049
return err
10471050
}
10481051
}
@@ -1062,7 +1065,7 @@ func (e *Engine) Export(w io.Writer, basePath string, start time.Time, end time.
10621065
}
10631066
}()
10641067

1065-
return intar.Stream(w, path, basePath, e.timeStampFilterTarFile(start, end))
1068+
return intar.Stream(w, path, basePath, e.TarStreamBufferSize, e.timeStampFilterTarFile(start, end))
10661069
}
10671070

10681071
func (e *Engine) filterFileToBackup(r *TSMReader, fi os.FileInfo, shardRelativePath, fullPath string, start, end int64, tw *tar.Writer) error {
@@ -1117,7 +1120,7 @@ func (e *Engine) filterFileToBackup(r *TSMReader, fi os.FileInfo, shardRelativeP
11171120
return err
11181121
}
11191122

1120-
return intar.StreamRenameFile(tmpFi, fi.Name(), shardRelativePath, path, tw)
1123+
return intar.StreamRenameFile(tmpFi, fi.Name(), shardRelativePath, path, tw, e.TarStreamBufferSize)
11211124
}
11221125

11231126
// Restore reads a tar archive generated by Backup().

0 commit comments

Comments
 (0)