-
Notifications
You must be signed in to change notification settings - Fork 3.7k
feat: Update Stream to accept a bufSize for tar CopyBuffer window #26964
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
devanbenz
wants to merge
13
commits into
master-1.x
Choose a base branch
from
db/6518/backup-writer
base: master-1.x
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+101
−18
Open
Changes from 8 commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
fcd3cae
feat: Update Stream to accept a bufSize for tar CopyBuffer window
devanbenz 9082c02
feat: Use a config value for setting tar stream buffer size
devanbenz ec4c30d
feat: Add value to sample config
devanbenz 40383be
feat: Use BufferSize instead of BufferWindow
devanbenz 36cde6d
feat: Adds test for for Stream
devanbenz d167345
feat: Use StreamRenameFile for our buffer testing
devanbenz 22da726
feat: formatting in config example toml
devanbenz 29d999b
feat: Add error message regarding bytesWritten for CopyBuffer
devanbenz 820a9f5
feat: Update bytesWritten handling
devanbenz 4d495d5
feat: Add method name to error
devanbenz 6783a6c
feat: I used a zero size buffer and this caused influxd to crash. Add…
devanbenz 08e88b9
feat: Create Validate() error for config
devanbenz 53efb45
feat: Update Validator error to include value
devanbenz File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,7 +16,7 @@ import ( | |
| // possibly writing each file to a tar writer stream. By default StreamFile is used, which will result in all files | ||
| // being written. A custom writeFunc can be passed so that each file may be written, modified+written, or skipped | ||
| // depending on the custom logic. | ||
| func Stream(w io.Writer, dir, relativePath string, writeFunc func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error) error { | ||
| func Stream(w io.Writer, dir, relativePath string, bufSize uint64, writeFunc func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error) error { | ||
| tw := tar.NewWriter(w) | ||
| defer tw.Close() | ||
|
|
||
|
|
@@ -41,30 +41,31 @@ func Stream(w io.Writer, dir, relativePath string, writeFunc func(f os.FileInfo, | |
| return err | ||
| } | ||
|
|
||
| return writeFunc(f, filepath.Join(relativePath, subDir), path, tw) | ||
| return writeFunc(f, filepath.Join(relativePath, subDir), path, tw, bufSize) | ||
| }) | ||
| } | ||
|
|
||
| // Generates a filtering function for Stream that checks an incoming file, and only writes the file to the stream if | ||
| // its mod time is later than since. Example: to tar only files newer than a certain datetime, use | ||
| // tar.Stream(w, dir, relativePath, SinceFilterTarFile(datetime)) | ||
| func SinceFilterTarFile(since time.Time) func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error { | ||
| return func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error { | ||
| func SinceFilterTarFile(since time.Time) func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error { | ||
| return func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error { | ||
| if f.ModTime().After(since) { | ||
| return StreamFile(f, shardRelativePath, fullPath, tw) | ||
| return StreamFile(f, shardRelativePath, fullPath, tw, bufSize) | ||
| } | ||
| return nil | ||
| } | ||
| } | ||
|
|
||
| // stream a single file to tw, extending the header name using the shardRelativePath | ||
| func StreamFile(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error { | ||
| return StreamRenameFile(f, f.Name(), shardRelativePath, fullPath, tw) | ||
| func StreamFile(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error { | ||
| return StreamRenameFile(f, f.Name(), shardRelativePath, fullPath, tw, bufSize) | ||
| } | ||
|
|
||
| // / Stream a single file to tw, using tarHeaderFileName instead of the actual filename | ||
| // e.g., when we want to write a *.tmp file using the original file's non-tmp name. | ||
| func StreamRenameFile(f os.FileInfo, tarHeaderFileName, relativePath, fullPath string, tw *tar.Writer) error { | ||
| func StreamRenameFile(f os.FileInfo, tarHeaderFileName, relativePath, fullPath string, tw *tar.Writer, bufSize uint64) error { | ||
| buf := make([]byte, bufSize) | ||
| h, err := tar.FileInfoHeader(f, f.Name()) | ||
| if err != nil { | ||
| return err | ||
|
|
@@ -86,7 +87,10 @@ func StreamRenameFile(f os.FileInfo, tarHeaderFileName, relativePath, fullPath s | |
|
|
||
| defer fr.Close() | ||
|
|
||
| _, err = io.CopyN(tw, fr, h.Size) | ||
| bytesWritten, err := io.CopyBuffer(tw, fr, buf) | ||
| if bytesWritten != h.Size { | ||
| return fmt.Errorf("error while copying buffer, expected %d bytes but wrote %d", h.Size, bytesWritten) | ||
|
||
| } | ||
gwossum marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| return err | ||
| } | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| package tar_test | ||
|
|
||
| import ( | ||
| "archive/tar" | ||
| "bytes" | ||
| "io" | ||
| "os" | ||
| "path/filepath" | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/require" | ||
|
|
||
| pkgtar "github.com/influxdata/influxdb/pkg/tar" | ||
| ) | ||
|
|
||
| func TestStreamRenameWithBufSize(t *testing.T) { | ||
| dir := t.TempDir() | ||
| testFile := filepath.Join(dir, "test2.txt.tar") | ||
| testData := []byte("test data for buffer size") | ||
|
|
||
| testFileRename := "test.txt.tar" | ||
|
|
||
| require.NoError(t, os.WriteFile(testFile, testData, 0644)) | ||
|
|
||
| var buf bytes.Buffer | ||
| bufSize := uint64(1024 * 1024) | ||
|
|
||
| tw := tar.NewWriter(&buf) | ||
|
|
||
| f, err := os.Open(testFile) | ||
| require.NoError(t, err, "error opening testFile") | ||
| info, err := f.Stat() | ||
| require.NoError(t, err, "error stat testFile") | ||
|
|
||
| require.NoError(t, pkgtar.StreamRenameFile(info, testFileRename, "", testFile, tw, bufSize)) | ||
| require.NoError(t, tw.Close()) | ||
|
|
||
| tr := tar.NewReader(&buf) | ||
| hdr, err := tr.Next() | ||
| require.NoError(t, err) | ||
| require.Equal(t, testFileRename, hdr.Name) | ||
|
|
||
| content, err := io.ReadAll(tr) | ||
| require.NoError(t, err) | ||
| require.Equal(t, testData, content) | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like the tar buffer size is only used in the backup code paths. If this is correct, I wonder if
backup-stream-buffer-sizewould be a better name.