Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/lib/filesystem/ignore/ignore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"

kfutils "github.com/kitops-ml/kitops/pkg/lib/repo/util"

Expand Down Expand Up @@ -63,9 +64,12 @@ func New(kitIgnorePaths []string, kitfile *artifact.KitFile, extraLayers ...stri
type ignorePaths struct {
ignoreFileMatcher *patternmatcher.PatternMatcher
layers []string
mu sync.Mutex
}

func (pm *ignorePaths) Matches(path, layerPath string) (bool, error) {
pm.mu.Lock()
defer pm.mu.Unlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

saveContentLayer -> packLayerToTar -> writeLayerToTar
writeLayerToTar calls ignore.Matches` for each file With the mutex, all file traversal becomes serialized. Parallel implementation may be SLOWER than sequential due to lock contention.

path = cleanPath(path)
layerPath = cleanPath(layerPath)
ignoreFileMatches, err := pm.ignoreFileMatcher.MatchesOrParentMatches(path)
Expand Down
156 changes: 113 additions & 43 deletions pkg/lib/filesystem/local-storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"errors"
"fmt"
"os"
"sort"
"time"

"github.com/kitops-ml/kitops/pkg/artifact"
"github.com/kitops-ml/kitops/pkg/lib/constants"
Expand All @@ -32,6 +34,8 @@ import (
"github.com/kitops-ml/kitops/pkg/lib/repo/local"
"github.com/kitops-ml/kitops/pkg/lib/repo/util"
"github.com/kitops-ml/kitops/pkg/output"
"github.com/vbauerster/mpb/v8"
"golang.org/x/sync/errgroup"

"github.com/opencontainers/go-digest"
specs "github.com/opencontainers/image-spec/specs-go"
Expand Down Expand Up @@ -136,71 +140,137 @@ func saveConfig(ctx context.Context, localRepo local.LocalRepo, kitfile *artifac
}

func saveKitfileLayers(ctx context.Context, localRepo local.LocalRepo, kitfile *artifact.KitFile, ignore ignore.Paths, opts *SaveModelOptions) (layers []ocispec.Descriptor, diffIDs []digest.Digest, err error) {

var (
eg, egCtx = errgroup.WithContext(ctx)
)

type layerResult struct {
layer ocispec.Descriptor
diffID digest.Digest
index int
}

results := make(chan layerResult, 100)
currentIndex := 0
progress := mpb.New(
mpb.WithWidth(60),
mpb.WithRefreshRate(150*time.Millisecond),
)

if kitfile.Model != nil {
if kitfile.Model.Path != "" && !util.IsModelKitReference(kitfile.Model.Path) {
mediaType := mediatype.New(opts.ModelFormat, mediatype.ModelBaseType, opts.LayerFormat, opts.Compression)
layer, layerInfo, err := saveContentLayer(ctx, localRepo, kitfile.Model.Path, mediaType, ignore)
if err != nil {
return nil, nil, err
}
layers = append(layers, layer)
diffIDs = append(diffIDs, digest.FromString(layerInfo.DiffId))
kitfile.Model.LayerInfo = layerInfo
currentIndexCopy := currentIndex
currentIndex++
eg.Go(func() error {
mediaType := mediatype.New(opts.ModelFormat, mediatype.ModelBaseType, opts.LayerFormat, opts.Compression)
layer, layerInfo, err := saveContentLayer(egCtx, localRepo, kitfile.Model.Path, mediaType, ignore, progress)
if err != nil {
return err
}
kitfile.Model.LayerInfo = layerInfo
results <- layerResult{layer: layer, diffID: digest.FromString(layerInfo.DiffId), index: currentIndexCopy}
return nil
})
}
for idx, part := range kitfile.Model.Parts {
mediaType := mediatype.New(opts.ModelFormat, mediatype.ModelPartBaseType, opts.LayerFormat, opts.Compression)
layer, layerInfo, err := saveContentLayer(ctx, localRepo, part.Path, mediaType, ignore)
if err != nil {
return nil, nil, err
}
layers = append(layers, layer)
diffIDs = append(diffIDs, digest.FromString(layerInfo.DiffId))
kitfile.Model.Parts[idx].LayerInfo = layerInfo
currentIndexCopy := currentIndex
currentIndex++
index := idx
path := part.Path
eg.Go(func() error {
mediaType := mediatype.New(opts.ModelFormat, mediatype.ModelPartBaseType, opts.LayerFormat, opts.Compression)
layer, layerInfo, err := saveContentLayer(egCtx, localRepo, path, mediaType, ignore, progress)
if err != nil {
return err
}
kitfile.Model.Parts[index].LayerInfo = layerInfo
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is introducing concurrent writes to shared Kitfile fields without synchronization, causing potential data races and corruption. The same pattern is repeated on other layers too.

results <- layerResult{layer: layer, diffID: digest.FromString(layerInfo.DiffId), index: currentIndexCopy}
return nil
})
}
}
for idx, code := range kitfile.Code {
mediaType := mediatype.New(opts.ModelFormat, mediatype.CodeBaseType, opts.LayerFormat, opts.Compression)
layer, layerInfo, err := saveContentLayer(ctx, localRepo, code.Path, mediaType, ignore)
if err != nil {
return nil, nil, err
}
layers = append(layers, layer)
diffIDs = append(diffIDs, digest.FromString(layerInfo.DiffId))
kitfile.Code[idx].LayerInfo = layerInfo
currentIndexCopy := currentIndex
currentIndex++
index := idx
path := code.Path
eg.Go(func() error {
mediaType := mediatype.New(opts.ModelFormat, mediatype.CodeBaseType, opts.LayerFormat, opts.Compression)
layer, layerInfo, err := saveContentLayer(egCtx, localRepo, path, mediaType, ignore, progress)
if err != nil {
return err
}
kitfile.Code[index].LayerInfo = layerInfo
results <- layerResult{layer: layer, diffID: digest.FromString(layerInfo.DiffId), index: currentIndexCopy}
return nil
})
}
for idx, dataset := range kitfile.DataSets {
mediaType := mediatype.New(opts.ModelFormat, mediatype.DatasetBaseType, opts.LayerFormat, opts.Compression)
layer, layerInfo, err := saveContentLayer(ctx, localRepo, dataset.Path, mediaType, ignore)
if err != nil {
return nil, nil, err
}
layers = append(layers, layer)
diffIDs = append(diffIDs, digest.FromString(layerInfo.DiffId))
kitfile.DataSets[idx].LayerInfo = layerInfo
currentIndexCopy := currentIndex
currentIndex++
index := idx
path := dataset.Path
eg.Go(func() error {
mediaType := mediatype.New(opts.ModelFormat, mediatype.DatasetBaseType, opts.LayerFormat, opts.Compression)
layer, layerInfo, err := saveContentLayer(egCtx, localRepo, path, mediaType, ignore, progress)
if err != nil {
return err
}
kitfile.DataSets[index].LayerInfo = layerInfo
results <- layerResult{layer: layer, diffID: digest.FromString(layerInfo.DiffId), index: currentIndexCopy}
return nil
})
}
for idx, docs := range kitfile.Docs {
mediaType := mediatype.New(opts.ModelFormat, mediatype.DocsBaseType, opts.LayerFormat, opts.Compression)
layer, layerInfo, err := saveContentLayer(ctx, localRepo, docs.Path, mediaType, ignore)
if err != nil {
return nil, nil, err
}
layers = append(layers, layer)
diffIDs = append(diffIDs, digest.FromString(layerInfo.DiffId))
kitfile.Docs[idx].LayerInfo = layerInfo
currentIndexCopy := currentIndex
currentIndex++
index := idx
path := docs.Path
eg.Go(func() error {
mediaType := mediatype.New(opts.ModelFormat, mediatype.DocsBaseType, opts.LayerFormat, opts.Compression)
layer, layerInfo, err := saveContentLayer(egCtx, localRepo, path, mediaType, ignore, progress)
if err != nil {
return err
}
kitfile.Docs[index].LayerInfo = layerInfo
results <- layerResult{layer: layer, diffID: digest.FromString(layerInfo.DiffId), index: currentIndexCopy}
return nil
})
}

if err = eg.Wait(); err != nil {
close(results)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This closes the results channel and returns. However, some goroutines might still be running and will panic when trying to send to the closed channel.

return nil, nil, err
}

resultsStore := []layerResult{}
close(results)
for j := range results {
resultsStore = append(resultsStore, j)
}

sort.SliceStable(resultsStore, func(i, j int) bool {
return resultsStore[i].index < resultsStore[j].index
})

for j := range resultsStore {
layers = append(layers, resultsStore[j].layer)
diffIDs = append(diffIDs, resultsStore[j].diffID)
}

return layers, diffIDs, nil
return layers, diffIDs, err
}

func saveContentLayer(ctx context.Context, localRepo local.LocalRepo, path string, mediaType mediatype.MediaType, ignore ignore.Paths) (ocispec.Descriptor, *artifact.LayerInfo, error) {
func saveContentLayer(ctx context.Context, localRepo local.LocalRepo, path string, mediaType mediatype.MediaType, ignore ignore.Paths, progress *mpb.Progress) (ocispec.Descriptor, *artifact.LayerInfo, error) {
// We want to store a gzipped tar file in store, but to do so we need a descriptor, so we have to compress
// to a temporary file. Ideally, we can also add this to the internal store by moving the file to avoid
// copying if possible.
if mediaType.Format() != mediatype.TarFormat {
// TODO: Add support for ModelPack's "raw" layer type
return ocispec.DescriptorEmptyJSON, nil, fmt.Errorf("Only tar-formatted layers are currently supported")
}
tempPath, desc, info, err := packLayerToTar(path, mediaType, ignore)
tempPath, desc, info, err := packLayerToTar(path, mediaType, ignore, progress)
if err != nil {
return ocispec.DescriptorEmptyJSON, nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/lib/filesystem/tar.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/kitops-ml/kitops/pkg/lib/filesystem/cache"
"github.com/kitops-ml/kitops/pkg/lib/filesystem/ignore"
"github.com/kitops-ml/kitops/pkg/output"
"github.com/vbauerster/mpb/v8"

"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
Expand All @@ -41,7 +42,7 @@ import (
// a descriptor (including hash) for the compressed file, the layer is saved to a temporary file
// on disk and must be moved to an appropriate location. It is the responsibility of the caller
// to clean up the temporary file when it is no longer needed.
func packLayerToTar(path string, mediaType mediatype.MediaType, ignore ignore.Paths) (tempFilePath string, desc ocispec.Descriptor, layerInfo *artifact.LayerInfo, err error) {
func packLayerToTar(path string, mediaType mediatype.MediaType, ignore ignore.Paths, progress *mpb.Progress) (tempFilePath string, desc ocispec.Descriptor, layerInfo *artifact.LayerInfo, err error) {
// Clean path to ensure consistent format (./path vs path/ vs path)
path = filepath.Clean(path)

Expand Down Expand Up @@ -92,7 +93,7 @@ func packLayerToTar(path string, mediaType mediatype.MediaType, ignore ignore.Pa
default:
return "", ocispec.DescriptorEmptyJSON, nil, fmt.Errorf("Unsupported compression format: %s", mediaType.Compression())
}
progressTarWriter, plog := output.TarProgress(totalSize, tarWriter)
progressTarWriter, plog := output.TarProgress(totalSize, tarWriter, progress)

if err := writeLayerToTar(path, ignore, progressTarWriter, plog); err != nil {
// Don't care about these errors since we'll be deleting the file anyways
Expand Down
35 changes: 29 additions & 6 deletions pkg/lib/filesystem/unpack/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@ import (
"path/filepath"
"slices"
"strings"
"time"

"github.com/kitops-ml/kitops/pkg/artifact"
"github.com/kitops-ml/kitops/pkg/lib/constants"
"github.com/kitops-ml/kitops/pkg/lib/constants/mediatype"
"github.com/kitops-ml/kitops/pkg/lib/filesystem"
"github.com/kitops-ml/kitops/pkg/lib/repo/util"
"github.com/kitops-ml/kitops/pkg/output"
"github.com/vbauerster/mpb/v8"
"golang.org/x/sync/errgroup"

modelspecv1 "github.com/modelpack/model-spec/specs-go/v1"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -67,6 +70,9 @@ func UnpackModelKit(ctx context.Context, opts *UnpackOptions) error {
}

func unpackRecursive(ctx context.Context, opts *UnpackOptions, visitedRefs []string) error {

eg, egCtx := errgroup.WithContext(ctx)

if len(visitedRefs) > constants.MaxModelRefChain {
return fmt.Errorf("reached maximum number of model references: [%s]", strings.Join(visitedRefs, "=>"))
}
Expand Down Expand Up @@ -119,7 +125,13 @@ func unpackRecursive(ctx context.Context, opts *UnpackOptions, visitedRefs []str
// We need to support older ModelKits (that were packed without diffIDs and digest
// in the config) for now, so we need to continue using the old structure.
var modelPartIdx, codeIdx, datasetIdx, docsIdx int
for _, layerDesc := range manifest.Layers {

progress := mpb.New(
mpb.WithWidth(60),
mpb.WithRefreshRate(150*time.Millisecond),
)
for index := range manifest.Layers {
layerDesc := manifest.Layers[index]
// This variable supports older-format tar layers (that don't include the
// layer path). For current ModelKits, this will be empty
var relPath string
Expand Down Expand Up @@ -207,10 +219,21 @@ func unpackRecursive(ctx context.Context, opts *UnpackOptions, visitedRefs []str
}
}

layerDescCopy := layerDesc
relPathCopy := relPath
compressionCopy := mediaType.Compression()

// TODO: handle DiffIDs when unpacking layers
if err := unpackLayer(ctx, store, layerDesc, relPath, opts.Overwrite, opts.IgnoreExisting, mediaType.Compression()); err != nil {
return fmt.Errorf("failed to unpack: %w", err)
}
eg.Go(func() error {
if err := unpackLayer(egCtx, store, layerDescCopy, relPathCopy, opts.Overwrite, opts.IgnoreExisting, compressionCopy, progress); err != nil {
return fmt.Errorf("failed to unpack: %w", err)
}
return nil
})

}
if err = eg.Wait(); err != nil {
return err
}
output.Debugf("Unpacked %d model part layers", modelPartIdx)
output.Debugf("Unpacked %d code layers", codeIdx)
Expand Down Expand Up @@ -294,13 +317,13 @@ func unpackConfig(config *artifact.KitFile, unpackDir string, overwrite bool) er
return nil
}

func unpackLayer(ctx context.Context, store content.Storage, desc ocispec.Descriptor, unpackPath string, overwrite, ignoreExisting bool, compression mediatype.CompressionType) error {
func unpackLayer(ctx context.Context, store content.Storage, desc ocispec.Descriptor, unpackPath string, overwrite, ignoreExisting bool, compression mediatype.CompressionType, progress *mpb.Progress) error {
rc, err := store.Fetch(ctx, desc)
if err != nil {
return fmt.Errorf("failed get layer %s: %w", desc.Digest, err)
}
var logger *output.ProgressLogger
rc, logger = output.WrapUnpackReadCloser(desc.Size, rc)
rc, logger = output.WrapUnpackReadCloser(desc.Size, rc, progress)
defer rc.Close()

var cr io.ReadCloser
Expand Down
12 changes: 2 additions & 10 deletions pkg/output/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,11 @@ func WrapTarget(wrap oras.Target) (oras.Target, *ProgressLogger) {
}, &ProgressLogger{p}
}

func WrapUnpackReadCloser(size int64, rc io.ReadCloser) (io.ReadCloser, *ProgressLogger) {
func WrapUnpackReadCloser(size int64, rc io.ReadCloser, p *mpb.Progress) (io.ReadCloser, *ProgressLogger) {
if !progressEnabled {
return rc, &ProgressLogger{stdout}
}

p := mpb.New(
mpb.WithWidth(60),
mpb.WithRefreshRate(150*time.Millisecond),
)
bar := p.New(size,
barStyle(),
mpb.PrependDecorators(
Expand Down Expand Up @@ -187,15 +183,11 @@ func (t *ProgressTar) Close() error {
return nil
}

func TarProgress(total int64, tw *tar.Writer) (*ProgressTar, *ProgressLogger) {
func TarProgress(total int64, tw *tar.Writer, p *mpb.Progress) (*ProgressTar, *ProgressLogger) {
if !progressEnabled || total == 0 {
return &ProgressTar{tw: tw}, &ProgressLogger{stdout}
}

p := mpb.New(
mpb.WithWidth(60),
mpb.WithRefreshRate(150*time.Millisecond),
)
bar := p.New(total,
barStyle(),
mpb.PrependDecorators(
Expand Down