diff --git a/pkg/lib/filesystem/ignore/ignore.go b/pkg/lib/filesystem/ignore/ignore.go index 1217de6e..009fcef4 100644 --- a/pkg/lib/filesystem/ignore/ignore.go +++ b/pkg/lib/filesystem/ignore/ignore.go @@ -21,6 +21,7 @@ import ( "os" "path/filepath" "strings" + "sync" kfutils "github.com/kitops-ml/kitops/pkg/lib/repo/util" @@ -63,9 +64,12 @@ func New(kitIgnorePaths []string, kitfile *artifact.KitFile, extraLayers ...stri type ignorePaths struct { ignoreFileMatcher *patternmatcher.PatternMatcher layers []string + mu sync.Mutex } func (pm *ignorePaths) Matches(path, layerPath string) (bool, error) { + pm.mu.Lock() + defer pm.mu.Unlock() path = cleanPath(path) layerPath = cleanPath(layerPath) ignoreFileMatches, err := pm.ignoreFileMatcher.MatchesOrParentMatches(path) diff --git a/pkg/lib/filesystem/local-storage.go b/pkg/lib/filesystem/local-storage.go index d55e1a24..d7e742a1 100644 --- a/pkg/lib/filesystem/local-storage.go +++ b/pkg/lib/filesystem/local-storage.go @@ -23,6 +23,8 @@ import ( "errors" "fmt" "os" + "sort" + "time" "github.com/kitops-ml/kitops/pkg/artifact" "github.com/kitops-ml/kitops/pkg/lib/constants" @@ -32,6 +34,8 @@ import ( "github.com/kitops-ml/kitops/pkg/lib/repo/local" "github.com/kitops-ml/kitops/pkg/lib/repo/util" "github.com/kitops-ml/kitops/pkg/output" + "github.com/vbauerster/mpb/v8" + "golang.org/x/sync/errgroup" "github.com/opencontainers/go-digest" specs "github.com/opencontainers/image-spec/specs-go" @@ -136,63 +140,129 @@ func saveConfig(ctx context.Context, localRepo local.LocalRepo, kitfile *artifac } func saveKitfileLayers(ctx context.Context, localRepo local.LocalRepo, kitfile *artifact.KitFile, ignore ignore.Paths, opts *SaveModelOptions) (layers []ocispec.Descriptor, diffIDs []digest.Digest, err error) { + + var ( + eg, egCtx = errgroup.WithContext(ctx) + ) + + type layerResult struct { + layer ocispec.Descriptor + diffID digest.Digest + index int + } + + results := make(chan layerResult, 100) + currentIndex := 0 + progress := mpb.New( + mpb.WithWidth(60), + mpb.WithRefreshRate(150*time.Millisecond), + ) + if kitfile.Model != nil { if kitfile.Model.Path != "" && !util.IsModelKitReference(kitfile.Model.Path) { - mediaType := mediatype.New(opts.ModelFormat, mediatype.ModelBaseType, opts.LayerFormat, opts.Compression) - layer, layerInfo, err := saveContentLayer(ctx, localRepo, kitfile.Model.Path, mediaType, ignore) - if err != nil { - return nil, nil, err - } - layers = append(layers, layer) - diffIDs = append(diffIDs, digest.FromString(layerInfo.DiffId)) - kitfile.Model.LayerInfo = layerInfo + currentIndexCopy := currentIndex + currentIndex++ + eg.Go(func() error { + mediaType := mediatype.New(opts.ModelFormat, mediatype.ModelBaseType, opts.LayerFormat, opts.Compression) + layer, layerInfo, err := saveContentLayer(egCtx, localRepo, kitfile.Model.Path, mediaType, ignore, progress) + if err != nil { + return err + } + kitfile.Model.LayerInfo = layerInfo + results <- layerResult{layer: layer, diffID: digest.FromString(layerInfo.DiffId), index: currentIndexCopy} + return nil + }) } for idx, part := range kitfile.Model.Parts { - mediaType := mediatype.New(opts.ModelFormat, mediatype.ModelPartBaseType, opts.LayerFormat, opts.Compression) - layer, layerInfo, err := saveContentLayer(ctx, localRepo, part.Path, mediaType, ignore) - if err != nil { - return nil, nil, err - } - layers = append(layers, layer) - diffIDs = append(diffIDs, digest.FromString(layerInfo.DiffId)) - kitfile.Model.Parts[idx].LayerInfo = layerInfo + currentIndexCopy := currentIndex + currentIndex++ + index := idx + path := part.Path + eg.Go(func() error { + mediaType := mediatype.New(opts.ModelFormat, mediatype.ModelPartBaseType, opts.LayerFormat, opts.Compression) + layer, layerInfo, err := saveContentLayer(egCtx, localRepo, path, mediaType, ignore, progress) + if err != nil { + return err + } + kitfile.Model.Parts[index].LayerInfo = layerInfo + results <- layerResult{layer: layer, diffID: digest.FromString(layerInfo.DiffId), index: currentIndexCopy} + return nil + }) } } for idx, code := range kitfile.Code { - mediaType := mediatype.New(opts.ModelFormat, mediatype.CodeBaseType, opts.LayerFormat, opts.Compression) - layer, layerInfo, err := saveContentLayer(ctx, localRepo, code.Path, mediaType, ignore) - if err != nil { - return nil, nil, err - } - layers = append(layers, layer) - diffIDs = append(diffIDs, digest.FromString(layerInfo.DiffId)) - kitfile.Code[idx].LayerInfo = layerInfo + currentIndexCopy := currentIndex + currentIndex++ + index := idx + path := code.Path + eg.Go(func() error { + mediaType := mediatype.New(opts.ModelFormat, mediatype.CodeBaseType, opts.LayerFormat, opts.Compression) + layer, layerInfo, err := saveContentLayer(egCtx, localRepo, path, mediaType, ignore, progress) + if err != nil { + return err + } + kitfile.Code[index].LayerInfo = layerInfo + results <- layerResult{layer: layer, diffID: digest.FromString(layerInfo.DiffId), index: currentIndexCopy} + return nil + }) } for idx, dataset := range kitfile.DataSets { - mediaType := mediatype.New(opts.ModelFormat, mediatype.DatasetBaseType, opts.LayerFormat, opts.Compression) - layer, layerInfo, err := saveContentLayer(ctx, localRepo, dataset.Path, mediaType, ignore) - if err != nil { - return nil, nil, err - } - layers = append(layers, layer) - diffIDs = append(diffIDs, digest.FromString(layerInfo.DiffId)) - kitfile.DataSets[idx].LayerInfo = layerInfo + currentIndexCopy := currentIndex + currentIndex++ + index := idx + path := dataset.Path + eg.Go(func() error { + mediaType := mediatype.New(opts.ModelFormat, mediatype.DatasetBaseType, opts.LayerFormat, opts.Compression) + layer, layerInfo, err := saveContentLayer(egCtx, localRepo, path, mediaType, ignore, progress) + if err != nil { + return err + } + kitfile.DataSets[index].LayerInfo = layerInfo + results <- layerResult{layer: layer, diffID: digest.FromString(layerInfo.DiffId), index: currentIndexCopy} + return nil + }) } for idx, docs := range kitfile.Docs { - mediaType := mediatype.New(opts.ModelFormat, mediatype.DocsBaseType, opts.LayerFormat, opts.Compression) - layer, layerInfo, err := saveContentLayer(ctx, localRepo, docs.Path, mediaType, ignore) - if err != nil { - return nil, nil, err - } - layers = append(layers, layer) - diffIDs = append(diffIDs, digest.FromString(layerInfo.DiffId)) - kitfile.Docs[idx].LayerInfo = layerInfo + currentIndexCopy := currentIndex + currentIndex++ + index := idx + path := docs.Path + eg.Go(func() error { + mediaType := mediatype.New(opts.ModelFormat, mediatype.DocsBaseType, opts.LayerFormat, opts.Compression) + layer, layerInfo, err := saveContentLayer(egCtx, localRepo, path, mediaType, ignore, progress) + if err != nil { + return err + } + kitfile.Docs[index].LayerInfo = layerInfo + results <- layerResult{layer: layer, diffID: digest.FromString(layerInfo.DiffId), index: currentIndexCopy} + return nil + }) + } + + if err = eg.Wait(); err != nil { + close(results) + return nil, nil, err + } + + resultsStore := []layerResult{} + close(results) + for j := range results { + resultsStore = append(resultsStore, j) + } + + sort.SliceStable(resultsStore, func(i, j int) bool { + return resultsStore[i].index < resultsStore[j].index + }) + + for j := range resultsStore { + layers = append(layers, resultsStore[j].layer) + diffIDs = append(diffIDs, resultsStore[j].diffID) } - return layers, diffIDs, nil + return layers, diffIDs, err } -func saveContentLayer(ctx context.Context, localRepo local.LocalRepo, path string, mediaType mediatype.MediaType, ignore ignore.Paths) (ocispec.Descriptor, *artifact.LayerInfo, error) { +func saveContentLayer(ctx context.Context, localRepo local.LocalRepo, path string, mediaType mediatype.MediaType, ignore ignore.Paths, progress *mpb.Progress) (ocispec.Descriptor, *artifact.LayerInfo, error) { // We want to store a gzipped tar file in store, but to do so we need a descriptor, so we have to compress // to a temporary file. Ideally, we can also add this to the internal store by moving the file to avoid // copying if possible. @@ -200,7 +270,7 @@ func saveContentLayer(ctx context.Context, localRepo local.LocalRepo, path strin // TODO: Add support for ModelPack's "raw" layer type return ocispec.DescriptorEmptyJSON, nil, fmt.Errorf("Only tar-formatted layers are currently supported") } - tempPath, desc, info, err := packLayerToTar(path, mediaType, ignore) + tempPath, desc, info, err := packLayerToTar(path, mediaType, ignore, progress) if err != nil { return ocispec.DescriptorEmptyJSON, nil, err } diff --git a/pkg/lib/filesystem/tar.go b/pkg/lib/filesystem/tar.go index 06a13c17..5108f7f7 100644 --- a/pkg/lib/filesystem/tar.go +++ b/pkg/lib/filesystem/tar.go @@ -32,6 +32,7 @@ import ( "github.com/kitops-ml/kitops/pkg/lib/filesystem/cache" "github.com/kitops-ml/kitops/pkg/lib/filesystem/ignore" "github.com/kitops-ml/kitops/pkg/output" + "github.com/vbauerster/mpb/v8" "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -41,7 +42,7 @@ import ( // a descriptor (including hash) for the compressed file, the layer is saved to a temporary file // on disk and must be moved to an appropriate location. It is the responsibility of the caller // to clean up the temporary file when it is no longer needed. -func packLayerToTar(path string, mediaType mediatype.MediaType, ignore ignore.Paths) (tempFilePath string, desc ocispec.Descriptor, layerInfo *artifact.LayerInfo, err error) { +func packLayerToTar(path string, mediaType mediatype.MediaType, ignore ignore.Paths, progress *mpb.Progress) (tempFilePath string, desc ocispec.Descriptor, layerInfo *artifact.LayerInfo, err error) { // Clean path to ensure consistent format (./path vs path/ vs path) path = filepath.Clean(path) @@ -92,7 +93,7 @@ func packLayerToTar(path string, mediaType mediatype.MediaType, ignore ignore.Pa default: return "", ocispec.DescriptorEmptyJSON, nil, fmt.Errorf("Unsupported compression format: %s", mediaType.Compression()) } - progressTarWriter, plog := output.TarProgress(totalSize, tarWriter) + progressTarWriter, plog := output.TarProgress(totalSize, tarWriter, progress) if err := writeLayerToTar(path, ignore, progressTarWriter, plog); err != nil { // Don't care about these errors since we'll be deleting the file anyways diff --git a/pkg/lib/filesystem/unpack/core.go b/pkg/lib/filesystem/unpack/core.go index a45d3756..d39970d1 100644 --- a/pkg/lib/filesystem/unpack/core.go +++ b/pkg/lib/filesystem/unpack/core.go @@ -27,6 +27,7 @@ import ( "path/filepath" "slices" "strings" + "time" "github.com/kitops-ml/kitops/pkg/artifact" "github.com/kitops-ml/kitops/pkg/lib/constants" @@ -34,6 +35,8 @@ import ( "github.com/kitops-ml/kitops/pkg/lib/filesystem" "github.com/kitops-ml/kitops/pkg/lib/repo/util" "github.com/kitops-ml/kitops/pkg/output" + "github.com/vbauerster/mpb/v8" + "golang.org/x/sync/errgroup" modelspecv1 "github.com/modelpack/model-spec/specs-go/v1" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -67,6 +70,9 @@ func UnpackModelKit(ctx context.Context, opts *UnpackOptions) error { } func unpackRecursive(ctx context.Context, opts *UnpackOptions, visitedRefs []string) error { + + eg, egCtx := errgroup.WithContext(ctx) + if len(visitedRefs) > constants.MaxModelRefChain { return fmt.Errorf("reached maximum number of model references: [%s]", strings.Join(visitedRefs, "=>")) } @@ -119,7 +125,13 @@ func unpackRecursive(ctx context.Context, opts *UnpackOptions, visitedRefs []str // We need to support older ModelKits (that were packed without diffIDs and digest // in the config) for now, so we need to continue using the old structure. var modelPartIdx, codeIdx, datasetIdx, docsIdx int - for _, layerDesc := range manifest.Layers { + + progress := mpb.New( + mpb.WithWidth(60), + mpb.WithRefreshRate(150*time.Millisecond), + ) + for index := range manifest.Layers { + layerDesc := manifest.Layers[index] // This variable supports older-format tar layers (that don't include the // layer path). For current ModelKits, this will be empty var relPath string @@ -207,10 +219,21 @@ func unpackRecursive(ctx context.Context, opts *UnpackOptions, visitedRefs []str } } + layerDescCopy := layerDesc + relPathCopy := relPath + compressionCopy := mediaType.Compression() + // TODO: handle DiffIDs when unpacking layers - if err := unpackLayer(ctx, store, layerDesc, relPath, opts.Overwrite, opts.IgnoreExisting, mediaType.Compression()); err != nil { - return fmt.Errorf("failed to unpack: %w", err) - } + eg.Go(func() error { + if err := unpackLayer(egCtx, store, layerDescCopy, relPathCopy, opts.Overwrite, opts.IgnoreExisting, compressionCopy, progress); err != nil { + return fmt.Errorf("failed to unpack: %w", err) + } + return nil + }) + + } + if err = eg.Wait(); err != nil { + return err } output.Debugf("Unpacked %d model part layers", modelPartIdx) output.Debugf("Unpacked %d code layers", codeIdx) @@ -294,13 +317,13 @@ func unpackConfig(config *artifact.KitFile, unpackDir string, overwrite bool) er return nil } -func unpackLayer(ctx context.Context, store content.Storage, desc ocispec.Descriptor, unpackPath string, overwrite, ignoreExisting bool, compression mediatype.CompressionType) error { +func unpackLayer(ctx context.Context, store content.Storage, desc ocispec.Descriptor, unpackPath string, overwrite, ignoreExisting bool, compression mediatype.CompressionType, progress *mpb.Progress) error { rc, err := store.Fetch(ctx, desc) if err != nil { return fmt.Errorf("failed get layer %s: %w", desc.Digest, err) } var logger *output.ProgressLogger - rc, logger = output.WrapUnpackReadCloser(desc.Size, rc) + rc, logger = output.WrapUnpackReadCloser(desc.Size, rc, progress) defer rc.Close() var cr io.ReadCloser diff --git a/pkg/output/progress.go b/pkg/output/progress.go index 3a76bf5f..7d1b14ca 100644 --- a/pkg/output/progress.go +++ b/pkg/output/progress.go @@ -138,15 +138,11 @@ func WrapTarget(wrap oras.Target) (oras.Target, *ProgressLogger) { }, &ProgressLogger{p} } -func WrapUnpackReadCloser(size int64, rc io.ReadCloser) (io.ReadCloser, *ProgressLogger) { +func WrapUnpackReadCloser(size int64, rc io.ReadCloser, p *mpb.Progress) (io.ReadCloser, *ProgressLogger) { if !progressEnabled { return rc, &ProgressLogger{stdout} } - p := mpb.New( - mpb.WithWidth(60), - mpb.WithRefreshRate(150*time.Millisecond), - ) bar := p.New(size, barStyle(), mpb.PrependDecorators( @@ -187,15 +183,11 @@ func (t *ProgressTar) Close() error { return nil } -func TarProgress(total int64, tw *tar.Writer) (*ProgressTar, *ProgressLogger) { +func TarProgress(total int64, tw *tar.Writer, p *mpb.Progress) (*ProgressTar, *ProgressLogger) { if !progressEnabled || total == 0 { return &ProgressTar{tw: tw}, &ProgressLogger{stdout} } - p := mpb.New( - mpb.WithWidth(60), - mpb.WithRefreshRate(150*time.Millisecond), - ) bar := p.New(total, barStyle(), mpb.PrependDecorators(