-
Notifications
You must be signed in to change notification settings - Fork 153
Add multi-threading for pack/unpack #1029
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
071d681
e40fe9d
fd121b9
33f1fd8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,8 @@ import ( | |
| "errors" | ||
| "fmt" | ||
| "os" | ||
| "sort" | ||
| "time" | ||
|
|
||
| "github.com/kitops-ml/kitops/pkg/artifact" | ||
| "github.com/kitops-ml/kitops/pkg/lib/constants" | ||
|
|
@@ -32,6 +34,8 @@ import ( | |
| "github.com/kitops-ml/kitops/pkg/lib/repo/local" | ||
| "github.com/kitops-ml/kitops/pkg/lib/repo/util" | ||
| "github.com/kitops-ml/kitops/pkg/output" | ||
| "github.com/vbauerster/mpb/v8" | ||
| "golang.org/x/sync/errgroup" | ||
|
|
||
| "github.com/opencontainers/go-digest" | ||
| specs "github.com/opencontainers/image-spec/specs-go" | ||
|
|
@@ -136,71 +140,137 @@ func saveConfig(ctx context.Context, localRepo local.LocalRepo, kitfile *artifac | |
| } | ||
|
|
||
| func saveKitfileLayers(ctx context.Context, localRepo local.LocalRepo, kitfile *artifact.KitFile, ignore ignore.Paths, opts *SaveModelOptions) (layers []ocispec.Descriptor, diffIDs []digest.Digest, err error) { | ||
|
|
||
| var ( | ||
| eg, egCtx = errgroup.WithContext(ctx) | ||
| ) | ||
|
|
||
| type layerResult struct { | ||
| layer ocispec.Descriptor | ||
| diffID digest.Digest | ||
| index int | ||
| } | ||
|
|
||
| results := make(chan layerResult, 100) | ||
| currentIndex := 0 | ||
| progress := mpb.New( | ||
| mpb.WithWidth(60), | ||
| mpb.WithRefreshRate(150*time.Millisecond), | ||
| ) | ||
|
|
||
| if kitfile.Model != nil { | ||
| if kitfile.Model.Path != "" && !util.IsModelKitReference(kitfile.Model.Path) { | ||
| mediaType := mediatype.New(opts.ModelFormat, mediatype.ModelBaseType, opts.LayerFormat, opts.Compression) | ||
| layer, layerInfo, err := saveContentLayer(ctx, localRepo, kitfile.Model.Path, mediaType, ignore) | ||
| if err != nil { | ||
| return nil, nil, err | ||
| } | ||
| layers = append(layers, layer) | ||
| diffIDs = append(diffIDs, digest.FromString(layerInfo.DiffId)) | ||
| kitfile.Model.LayerInfo = layerInfo | ||
| currentIndexCopy := currentIndex | ||
| currentIndex++ | ||
| eg.Go(func() error { | ||
| mediaType := mediatype.New(opts.ModelFormat, mediatype.ModelBaseType, opts.LayerFormat, opts.Compression) | ||
| layer, layerInfo, err := saveContentLayer(egCtx, localRepo, kitfile.Model.Path, mediaType, ignore, progress) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| kitfile.Model.LayerInfo = layerInfo | ||
| results <- layerResult{layer: layer, diffID: digest.FromString(layerInfo.DiffId), index: currentIndexCopy} | ||
| return nil | ||
| }) | ||
| } | ||
| for idx, part := range kitfile.Model.Parts { | ||
| mediaType := mediatype.New(opts.ModelFormat, mediatype.ModelPartBaseType, opts.LayerFormat, opts.Compression) | ||
| layer, layerInfo, err := saveContentLayer(ctx, localRepo, part.Path, mediaType, ignore) | ||
| if err != nil { | ||
| return nil, nil, err | ||
| } | ||
| layers = append(layers, layer) | ||
| diffIDs = append(diffIDs, digest.FromString(layerInfo.DiffId)) | ||
| kitfile.Model.Parts[idx].LayerInfo = layerInfo | ||
| currentIndexCopy := currentIndex | ||
| currentIndex++ | ||
| index := idx | ||
| path := part.Path | ||
| eg.Go(func() error { | ||
| mediaType := mediatype.New(opts.ModelFormat, mediatype.ModelPartBaseType, opts.LayerFormat, opts.Compression) | ||
| layer, layerInfo, err := saveContentLayer(egCtx, localRepo, path, mediaType, ignore, progress) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| kitfile.Model.Parts[index].LayerInfo = layerInfo | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is introducing concurrent writes to shared Kitfile fields without synchronization, causing potential data races and corruption. The same pattern is repeated on other layers too. |
||
| results <- layerResult{layer: layer, diffID: digest.FromString(layerInfo.DiffId), index: currentIndexCopy} | ||
| return nil | ||
| }) | ||
| } | ||
| } | ||
| for idx, code := range kitfile.Code { | ||
| mediaType := mediatype.New(opts.ModelFormat, mediatype.CodeBaseType, opts.LayerFormat, opts.Compression) | ||
| layer, layerInfo, err := saveContentLayer(ctx, localRepo, code.Path, mediaType, ignore) | ||
| if err != nil { | ||
| return nil, nil, err | ||
| } | ||
| layers = append(layers, layer) | ||
| diffIDs = append(diffIDs, digest.FromString(layerInfo.DiffId)) | ||
| kitfile.Code[idx].LayerInfo = layerInfo | ||
| currentIndexCopy := currentIndex | ||
| currentIndex++ | ||
| index := idx | ||
| path := code.Path | ||
| eg.Go(func() error { | ||
| mediaType := mediatype.New(opts.ModelFormat, mediatype.CodeBaseType, opts.LayerFormat, opts.Compression) | ||
| layer, layerInfo, err := saveContentLayer(egCtx, localRepo, path, mediaType, ignore, progress) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| kitfile.Code[index].LayerInfo = layerInfo | ||
| results <- layerResult{layer: layer, diffID: digest.FromString(layerInfo.DiffId), index: currentIndexCopy} | ||
| return nil | ||
| }) | ||
| } | ||
| for idx, dataset := range kitfile.DataSets { | ||
| mediaType := mediatype.New(opts.ModelFormat, mediatype.DatasetBaseType, opts.LayerFormat, opts.Compression) | ||
| layer, layerInfo, err := saveContentLayer(ctx, localRepo, dataset.Path, mediaType, ignore) | ||
| if err != nil { | ||
| return nil, nil, err | ||
| } | ||
| layers = append(layers, layer) | ||
| diffIDs = append(diffIDs, digest.FromString(layerInfo.DiffId)) | ||
| kitfile.DataSets[idx].LayerInfo = layerInfo | ||
| currentIndexCopy := currentIndex | ||
| currentIndex++ | ||
| index := idx | ||
| path := dataset.Path | ||
| eg.Go(func() error { | ||
| mediaType := mediatype.New(opts.ModelFormat, mediatype.DatasetBaseType, opts.LayerFormat, opts.Compression) | ||
| layer, layerInfo, err := saveContentLayer(egCtx, localRepo, path, mediaType, ignore, progress) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| kitfile.DataSets[index].LayerInfo = layerInfo | ||
| results <- layerResult{layer: layer, diffID: digest.FromString(layerInfo.DiffId), index: currentIndexCopy} | ||
| return nil | ||
| }) | ||
| } | ||
| for idx, docs := range kitfile.Docs { | ||
| mediaType := mediatype.New(opts.ModelFormat, mediatype.DocsBaseType, opts.LayerFormat, opts.Compression) | ||
| layer, layerInfo, err := saveContentLayer(ctx, localRepo, docs.Path, mediaType, ignore) | ||
| if err != nil { | ||
| return nil, nil, err | ||
| } | ||
| layers = append(layers, layer) | ||
| diffIDs = append(diffIDs, digest.FromString(layerInfo.DiffId)) | ||
| kitfile.Docs[idx].LayerInfo = layerInfo | ||
| currentIndexCopy := currentIndex | ||
| currentIndex++ | ||
| index := idx | ||
| path := docs.Path | ||
| eg.Go(func() error { | ||
| mediaType := mediatype.New(opts.ModelFormat, mediatype.DocsBaseType, opts.LayerFormat, opts.Compression) | ||
| layer, layerInfo, err := saveContentLayer(egCtx, localRepo, path, mediaType, ignore, progress) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| kitfile.Docs[index].LayerInfo = layerInfo | ||
| results <- layerResult{layer: layer, diffID: digest.FromString(layerInfo.DiffId), index: currentIndexCopy} | ||
| return nil | ||
| }) | ||
| } | ||
|
|
||
| if err = eg.Wait(); err != nil { | ||
| close(results) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This closes the results channel and returns. However, some goroutines might still be running and will panic when trying to send to the closed channel. |
||
| return nil, nil, err | ||
| } | ||
|
|
||
| resultsStore := []layerResult{} | ||
| close(results) | ||
| for j := range results { | ||
| resultsStore = append(resultsStore, j) | ||
| } | ||
|
|
||
| sort.SliceStable(resultsStore, func(i, j int) bool { | ||
| return resultsStore[i].index < resultsStore[j].index | ||
| }) | ||
|
|
||
| for j := range resultsStore { | ||
| layers = append(layers, resultsStore[j].layer) | ||
| diffIDs = append(diffIDs, resultsStore[j].diffID) | ||
| } | ||
|
|
||
| return layers, diffIDs, nil | ||
| return layers, diffIDs, err | ||
| } | ||
|
|
||
| func saveContentLayer(ctx context.Context, localRepo local.LocalRepo, path string, mediaType mediatype.MediaType, ignore ignore.Paths) (ocispec.Descriptor, *artifact.LayerInfo, error) { | ||
| func saveContentLayer(ctx context.Context, localRepo local.LocalRepo, path string, mediaType mediatype.MediaType, ignore ignore.Paths, progress *mpb.Progress) (ocispec.Descriptor, *artifact.LayerInfo, error) { | ||
| // We want to store a gzipped tar file in store, but to do so we need a descriptor, so we have to compress | ||
| // to a temporary file. Ideally, we can also add this to the internal store by moving the file to avoid | ||
| // copying if possible. | ||
| if mediaType.Format() != mediatype.TarFormat { | ||
| // TODO: Add support for ModelPack's "raw" layer type | ||
| return ocispec.DescriptorEmptyJSON, nil, fmt.Errorf("Only tar-formatted layers are currently supported") | ||
| } | ||
| tempPath, desc, info, err := packLayerToTar(path, mediaType, ignore) | ||
| tempPath, desc, info, err := packLayerToTar(path, mediaType, ignore, progress) | ||
| if err != nil { | ||
| return ocispec.DescriptorEmptyJSON, nil, err | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`saveContentLayer` -> `packLayerToTar` -> `writeLayerToTar`. `writeLayerToTar` calls `ignore.Matches` for each file. With the mutex, all file traversal becomes serialized. The parallel implementation may be SLOWER than sequential due to lock contention.