diff --git a/cache/cache.go b/cache/cache.go index d4f798b9e..18602523b 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -18,13 +18,14 @@ package cache import ( "bytes" - "errors" "fmt" "io" "os" "path/filepath" "sync" + "github.com/containerd/log" + "github.com/containerd/stargz-snapshotter/hardlink" "github.com/containerd/stargz-snapshotter/util/cacheutil" "github.com/containerd/stargz-snapshotter/util/namedmutex" "golang.org/x/sys/unix" @@ -33,6 +34,10 @@ import ( const ( defaultMaxLRUCacheEntry = 10 defaultMaxCacheFds = 10 + + // cache key namespaces to avoid collisions between raw keys and digests + cacheKeyPrefixRaw = "raw:" + cacheKeyPrefixDigest = "digest:" ) type DirectoryCacheConfig struct { @@ -65,6 +70,9 @@ type DirectoryCacheConfig struct { // FadvDontNeed forcefully clean fscache pagecache for saving memory. FadvDontNeed bool + + // HardlinkManager provides hardlink support when non-nil. + HardlinkManager *hardlink.Manager } // TODO: contents validation. @@ -103,6 +111,7 @@ type Writer interface { type cacheOpt struct { direct bool passThrough bool + chunkDigest string } type Option func(o *cacheOpt) *cacheOpt @@ -127,6 +136,14 @@ func PassThrough() Option { } } +// ChunkDigest option allows specifying a chunk digest for the cache +func ChunkDigest(digest string) Option { + return func(o *cacheOpt) *cacheOpt { + o.chunkDigest = digest + return o + } +} + func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache, error) { if !filepath.IsAbs(directory) { return nil, fmt.Errorf("dir cache path must be an absolute path; got %q", directory) @@ -178,8 +195,10 @@ func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache bufPool: bufPool, direct: config.Direct, fadvDontNeed: config.FadvDontNeed, + syncAdd: config.SyncAdd, + hlManager: config.HardlinkManager, } - dc.syncAdd = config.SyncAdd + return dc, nil } @@ -199,6 +218,8 @@ type directoryCache struct { closed bool closedMu sync.Mutex + + hlManager *hardlink.Manager } func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) { @@ -211,9 +232,12 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) { opt = o(opt) } + // Try to get from memory cache if !dc.direct && !opt.direct { - // Get data from memory - if b, done, ok := dc.cache.Get(key); ok { + // Try memory cache for digest or key, with namespaced keys to avoid collisions + cacheKey := getCacheKey(dc.hlManager, key, opt.chunkDigest) + + if b, done, ok := dc.cache.Get(cacheKey); ok { return &reader{ ReaderAt: bytes.NewReader(b.(*bytes.Buffer).Bytes()), closeFunc: func() error { @@ -223,8 +247,8 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) { }, nil } - // Get data from disk. If the file is already opened, use it. - if f, done, ok := dc.fileCache.Get(key); ok { + // Get data from file cache for digest or key + if f, done, ok := dc.fileCache.Get(cacheKey); ok { return &reader{ ReaderAt: f.(*os.File), closeFunc: func() error { @@ -235,10 +259,20 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) { } } + // First try regular file path + filepath := buildCachePath(dc.directory, key) + + if shouldUseDigestCacheKey(dc.hlManager, opt.chunkDigest) { + if digestPath, exists := dc.hlManager.Get(key, opt.chunkDigest, opt.direct); exists { + log.L.Debugf("Using existing file for digest %q instead of key %q", opt.chunkDigest, key) + filepath = digestPath + } + } + // Open the cache file and read the target region // TODO: If the target cache is write-in-progress, should we wait for the completion // or simply report the cache miss? - file, err := os.Open(dc.cachePath(key)) + file, err := os.Open(filepath) if err != nil { return nil, fmt.Errorf("failed to open blob file for %q: %w", key, err) } @@ -273,7 +307,9 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) { return &reader{ ReaderAt: file, closeFunc: func() error { - _, done, added := dc.fileCache.Add(key, file) + cacheKey := getCacheKey(dc.hlManager, key, opt.chunkDigest) + + _, done, added := dc.fileCache.Add(cacheKey, file) defer done() // Release it immediately. Cleaned up on eviction. if !added { return file.Close() // file already exists in the cache. close it. @@ -293,37 +329,60 @@ func (dc *directoryCache) Add(key string, opts ...Option) (Writer, error) { opt = o(opt) } - wip, err := dc.wipFile(key) + // If hardlink manager exists and digest is provided, check if a hardlink can be created + if dc.hlManager != nil && opt.chunkDigest != "" { + keyPath := buildCachePath(dc.directory, key) + + err := dc.hlManager.Add(key, opt.chunkDigest, keyPath) + if err == nil { + return &writer{ + WriteCloser: nopWriteCloser(io.Discard), + commitFunc: func() error { return nil }, + abortFunc: func() error { return nil }, + }, nil + } + } + + // Create temporary file + w, err := wipFile(dc.wipDirectory, key) if err != nil { return nil, err } - w := &writer{ - WriteCloser: wip, + + // Create writer + writer := &writer{ + WriteCloser: w, commitFunc: func() error { if dc.isClosed() { return fmt.Errorf("cache is already closed") } - // Commit the cache contents - c := dc.cachePath(key) - if err := os.MkdirAll(filepath.Dir(c), os.ModePerm); err != nil { - var errs []error - if err := os.Remove(wip.Name()); err != nil { - errs = append(errs, err) - } - errs = append(errs, fmt.Errorf("failed to create cache directory %q: %w", c, err)) - return errors.Join(errs...) + + // Commit file + targetPath := buildCachePath(dc.directory, key) + if err := os.MkdirAll(filepath.Dir(targetPath), 0700); err != nil { + return fmt.Errorf("failed to create cache directory: %w", err) } if dc.fadvDontNeed { - if err := dropFilePageCache(wip); err != nil { + if err := dropFilePageCache(w); err != nil { fmt.Printf("Warning: failed to drop page cache: %v\n", err) } } - return os.Rename(wip.Name(), c) + if err := os.Rename(w.Name(), targetPath); err != nil { + return fmt.Errorf("failed to commit cache file: %w", err) + } + + if shouldUseDigestCacheKey(dc.hlManager, opt.chunkDigest) { + if err := dc.hlManager.Enroll(opt.chunkDigest, targetPath, dc.directory, key); err != nil { + return fmt.Errorf("failed to register digest file: %w", err) + } + } + + return nil }, abortFunc: func() error { - return os.Remove(wip.Name()) + return os.Remove(w.Name()) }, } @@ -331,50 +390,12 @@ func (dc *directoryCache) Add(key string, opts ...Option) (Writer, error) { // This option is useful for preventing memory cache from being polluted by data // that won't be accessed immediately. if dc.direct || opt.direct { - return w, nil + return writer, nil } + // Create memory cache b := dc.bufPool.Get().(*bytes.Buffer) - memW := &writer{ - WriteCloser: nopWriteCloser(io.Writer(b)), - commitFunc: func() error { - if dc.isClosed() { - w.Close() - return fmt.Errorf("cache is already closed") - } - cached, done, added := dc.cache.Add(key, b) - if !added { - dc.putBuffer(b) // already exists in the cache. abort it. - } - commit := func() error { - defer done() - defer w.Close() - n, err := w.Write(cached.(*bytes.Buffer).Bytes()) - if err != nil || n != cached.(*bytes.Buffer).Len() { - w.Abort() - return err - } - return w.Commit() - } - if dc.syncAdd { - return commit() - } - go func() { - if err := commit(); err != nil { - fmt.Println("failed to commit to file:", err) - } - }() - return nil - }, - abortFunc: func() error { - defer w.Close() - defer w.Abort() - dc.putBuffer(b) // abort it. - return nil - }, - } - - return memW, nil + return dc.wrapMemoryWriter(b, writer, key) } func (dc *directoryCache) putBuffer(b *bytes.Buffer) { @@ -399,14 +420,6 @@ func (dc *directoryCache) isClosed() bool { return closed } -func (dc *directoryCache) cachePath(key string) string { - return filepath.Join(dc.directory, key[:2], key) -} - -func (dc *directoryCache) wipFile(key string) (*os.File, error) { - return os.CreateTemp(dc.wipDirectory, key+"-*") -} - func NewMemoryCache() BlobCache { return &MemoryCache{ Membuf: map[string]*bytes.Buffer{}, @@ -495,3 +508,77 @@ func dropFilePageCache(file *os.File) error { } return nil } + +// wrapMemoryWriter wraps a writer with memory caching +func (dc *directoryCache) wrapMemoryWriter(b *bytes.Buffer, w *writer, key string) (Writer, error) { + return &writer{ + WriteCloser: nopWriteCloser(b), + commitFunc: func() error { + if dc.isClosed() { + w.Close() + return fmt.Errorf("cache is already closed") + } + + cached, done, added := dc.cache.Add(key, b) + if !added { + dc.putBuffer(b) + } + + commit := func() error { + defer done() + defer w.Close() + + n, err := w.Write(cached.(*bytes.Buffer).Bytes()) + if err != nil || n != cached.(*bytes.Buffer).Len() { + w.Abort() + return err + } + return w.Commit() + } + + if dc.syncAdd { + return commit() + } + + go func() { + if err := commit(); err != nil { + log.L.Infof("failed to commit to file: %v", err) + } + }() + return nil + }, + abortFunc: func() error { + defer w.Close() + defer w.Abort() + dc.putBuffer(b) + return nil + }, + }, nil +} + +// shouldUseDigestCacheKey determines whether to use the digest as the cache key. +// Returns true only if the hardlink manager exists, is enabled, and chunkDigest is not empty. +func shouldUseDigestCacheKey(hlManager *hardlink.Manager, chunkDigest string) bool { + return hlManager != nil && chunkDigest != "" +} + +// getCacheKey returns the appropriate cache key based on whether digest-based caching should be used. +// If digest-based caching is enabled, it returns a key with the digest prefix, otherwise with the raw key prefix. +func getCacheKey(hlManager *hardlink.Manager, key string, chunkDigest string) string { + if shouldUseDigestCacheKey(hlManager, chunkDigest) { + return cacheKeyPrefixDigest + chunkDigest + } + return cacheKeyPrefixRaw + key +} + +func buildCachePath(directory string, key string) string { + return filepath.Join(directory, key[:2], key) +} + +// WipFile creates a temporary file in the given directory with the given key pattern +func wipFile(wipDirectory string, key string) (*os.File, error) { + if err := os.MkdirAll(wipDirectory, 0700); err != nil { + return nil, fmt.Errorf("failed to create wip directory: %w", err) + } + return os.CreateTemp(wipDirectory, key+"-*") +} diff --git a/cmd/containerd-stargz-grpc/main.go b/cmd/containerd-stargz-grpc/main.go index 436dca938..c46a03791 100644 --- a/cmd/containerd-stargz-grpc/main.go +++ b/cmd/containerd-stargz-grpc/main.go @@ -36,7 +36,9 @@ import ( "github.com/containerd/containerd/v2/pkg/sys" "github.com/containerd/log" "github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/fsopts" + "github.com/containerd/stargz-snapshotter/fs" "github.com/containerd/stargz-snapshotter/fusemanager" + "github.com/containerd/stargz-snapshotter/hardlink" "github.com/containerd/stargz-snapshotter/service" "github.com/containerd/stargz-snapshotter/service/keychain/keychainconfig" snbase "github.com/containerd/stargz-snapshotter/snapshot" @@ -135,6 +137,14 @@ func main() { log.G(ctx).WithError(err).Fatalf("snapshotter is not supported") } + var hlm *hardlink.Manager + if config.HardlinkRoot != "" { + hlm, err = hardlink.New(config.HardlinkRoot) + if err != nil { + log.G(ctx).WithError(err).Fatalf("failed to init hardlink manager") + } + } + // Create a gRPC server rpc := grpc.NewServer() @@ -252,6 +262,9 @@ func main() { if err != nil { log.G(ctx).WithError(err).Fatalf("failed to configure fs config") } + if hlm != nil { + fsOpts = append(fsOpts, fs.WithHardlinkManager(hlm)) + } rs, err = service.NewStargzSnapshotterService(ctx, *rootDir, &config.Config, service.WithCredsFuncs(credsFuncs...), service.WithFilesystemOptions(fsOpts...)) diff --git a/docs/hardlink.md b/docs/hardlink.md new file mode 100644 index 000000000..223a526f6 --- /dev/null +++ b/docs/hardlink.md @@ -0,0 +1,69 @@ +# Enabling and Using Hardlink in Stargz Snapshotter + +The `stargz-snapshotter` provides a hardlink feature to optimize storage by reducing redundancy and improving access times. This guide will walk you through enabling and using hardlinks in your setup. + +## Overview + +Hardlinking allows multiple references to the same file data without duplicating the data itself. This is particularly useful in environments where storage efficiency and performance are critical. The hardlink feature in stargz-snapshotter works by: + +- Tracking file chunks by their content digest +- Creating hardlinks between identical chunks across different container layers + +## Prerequisites + +- Ensure you have `stargz-snapshotter` installed and configured. +- Familiarity with the configuration files and environment where `stargz-snapshotter` is deployed. +- A filesystem that supports hardlinks (most Linux filesystems do). + +## Enabling Hardlinking + +To enable hardlinking, you need to modify the configuration file of `stargz-snapshotter`. + +1. **Locate the Configuration File**: The configuration file is typically named `config.toml` or similar, depending on your setup. + +2. **Modify the Configuration**: + - Open the configuration file in a text editor. + - Locate the `DirectoryCacheConfig` section. + - Set the `hardlink_root` option to a directory path where canonical hardlink files will be stored. + + Example: + ```toml + [directory_cache] + hardlink_root = "/var/lib/containerd-stargz-grpc" + ``` + +3. **How It Works**: + - When `hardlink_root` is configured, the snapshotter will automatically create a `hardlinks/` subdirectory under this path. + - For each unique chunk (identified by digest), a canonical file is stored in the `hardlinks/` directory. + - All cache files with the same digest are hardlinked to this canonical file, sharing the same inode. + - When a canonical file is no longer referenced (nlink == 1), it will be automatically cleaned up during cache eviction. + - This approach provides clear semantics and efficient garbage collection based on reference counting. + +## Using Hardlinking + +Once hardlinking is enabled and the manager is initialized, `stargz-snapshotter` will automatically manage hardlinks for cached files. Here's how it works: + +1. **Cache Initialization**: When the cache is initialized, the system checks if hardlinking is enabled and uses the global hardlink manager. It will: + - Create a hardlink directory structure under the configured root + - Use in-memory LRU-backed mappings (no on-disk persistence) + +2. **Adding Files to Cache**: When a file is added to the cache: + - The system calculates and uses the file's content digest + - It checks if a file with the same digest already exists + - If found, it creates a hardlink rather than duplicating the content + - It maps the cache key to the digest for future lookups + +3. **Accessing Cached Files**: When accessing a cached file: + - The system first checks for the cache key/digest in memory + - It then retrieves the hardlinked file path if available + - If the underlying file is missing, the mapping is removed on-the-fly and the operation falls back to creating a new file + +## Technical Details + +The hardlink feature works through several components: + +- **HardlinkManager**: Centralized service that manages digest tracking and hardlink creation +- **LRU-backed Digest Mapping**: In-memory LRU cache mapping digest → file path +- **Reverse Indexes**: Plain maps for key → digest and digest → keys, pruned on eviction + +When prefetching or downloading content, the chunk digest is passed to the cache system via the `ChunkDigest` option, which enables the hardlink manager to track and link identical content. diff --git a/fs/config/config.go b/fs/config/config.go index 9b6962f74..ecbce23cb 100644 --- a/fs/config/config.go +++ b/fs/config/config.go @@ -142,6 +142,12 @@ type DirectoryCacheConfig struct { // FadvDontNeed forcefully clean fscache pagecache for saving memory. Default is false. FadvDontNeed bool `toml:"fadv_dontneed" json:"fadv_dontneed"` + + // HardlinkRoot specifies the root directory for storing canonical hardlink files. + // When set, hardlinking will be enabled to reduce disk usage by sharing identical chunks. + // The hardlink manager will create a "hardlinks" subdirectory under this root. + // Leave empty to disable hardlinking. + HardlinkRoot string `toml:"hardlink_root" json:"hardlink_root"` } // FuseConfig is configuration for FUSE fs. diff --git a/fs/fs.go b/fs/fs.go index c55d57002..ef6dd5a83 100644 --- a/fs/fs.go +++ b/fs/fs.go @@ -53,6 +53,7 @@ import ( layermetrics "github.com/containerd/stargz-snapshotter/fs/metrics/layer" "github.com/containerd/stargz-snapshotter/fs/remote" "github.com/containerd/stargz-snapshotter/fs/source" + "github.com/containerd/stargz-snapshotter/hardlink" "github.com/containerd/stargz-snapshotter/metadata" memorymetadata "github.com/containerd/stargz-snapshotter/metadata/memory" "github.com/containerd/stargz-snapshotter/snapshot" @@ -86,6 +87,7 @@ type options struct { metricsLogLevel *log.Level overlayOpaqueType layer.OverlayOpaqueType additionalDecompressors func(context.Context, source.RegistryHosts, reference.Spec, ocispec.Descriptor) []metadata.Decompressor + hlManager *hardlink.Manager } func WithGetSources(s source.GetSources) Option { @@ -127,6 +129,12 @@ func WithAdditionalDecompressors(d func(context.Context, source.RegistryHosts, r } } +func WithHardlinkManager(hlManager *hardlink.Manager) Option { + return func(opts *options) { + opts.hlManager = hlManager + } +} + func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.FileSystem, err error) { var fsOpts options for _, o := range opts { @@ -159,7 +167,7 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F }) } tm := task.NewBackgroundTaskManager(maxConcurrency, 5*time.Second) - r, err := layer.NewResolver(root, tm, cfg, fsOpts.resolveHandlers, metadataStore, fsOpts.overlayOpaqueType, fsOpts.additionalDecompressors) + r, err := layer.NewResolver(root, tm, cfg, fsOpts.resolveHandlers, metadataStore, fsOpts.overlayOpaqueType, fsOpts.additionalDecompressors, fsOpts.hlManager) if err != nil { return nil, fmt.Errorf("failed to setup resolver: %w", err) } diff --git a/fs/layer/layer.go b/fs/layer/layer.go index 0397f2e9e..e6a859b19 100644 --- a/fs/layer/layer.go +++ b/fs/layer/layer.go @@ -42,6 +42,7 @@ import ( "github.com/containerd/stargz-snapshotter/fs/reader" "github.com/containerd/stargz-snapshotter/fs/remote" "github.com/containerd/stargz-snapshotter/fs/source" + "github.com/containerd/stargz-snapshotter/hardlink" "github.com/containerd/stargz-snapshotter/metadata" "github.com/containerd/stargz-snapshotter/task" "github.com/containerd/stargz-snapshotter/util/cacheutil" @@ -141,10 +142,11 @@ type Resolver struct { metadataStore metadata.Store overlayOpaqueType OverlayOpaqueType additionalDecompressors func(context.Context, source.RegistryHosts, reference.Spec, ocispec.Descriptor) []metadata.Decompressor + hlManager *hardlink.Manager } // NewResolver returns a new layer resolver. -func NewResolver(root string, backgroundTaskManager *task.BackgroundTaskManager, cfg config.Config, resolveHandlers map[string]remote.Handler, metadataStore metadata.Store, overlayOpaqueType OverlayOpaqueType, additionalDecompressors func(context.Context, source.RegistryHosts, reference.Spec, ocispec.Descriptor) []metadata.Decompressor) (*Resolver, error) { +func NewResolver(root string, backgroundTaskManager *task.BackgroundTaskManager, cfg config.Config, resolveHandlers map[string]remote.Handler, metadataStore metadata.Store, overlayOpaqueType OverlayOpaqueType, additionalDecompressors func(context.Context, source.RegistryHosts, reference.Spec, ocispec.Descriptor) []metadata.Decompressor, hlManager *hardlink.Manager) (*Resolver, error) { resolveResultEntryTTL := time.Duration(cfg.ResolveResultEntryTTLSec) * time.Second if resolveResultEntryTTL == 0 { resolveResultEntryTTL = defaultResolveResultEntryTTLSec * time.Second @@ -193,10 +195,11 @@ func NewResolver(root string, backgroundTaskManager *task.BackgroundTaskManager, metadataStore: metadataStore, overlayOpaqueType: overlayOpaqueType, additionalDecompressors: additionalDecompressors, + hlManager: hlManager, }, nil } -func newCache(root string, cacheType string, cfg config.Config) (cache.BlobCache, error) { +func newCache(root string, cacheType string, cfg config.Config, hlManager *hardlink.Manager) (cache.BlobCache, error) { if cacheType == memoryCacheType { return cache.NewMemoryCache(), nil } @@ -235,12 +238,13 @@ func newCache(root string, cacheType string, cfg config.Config) (cache.BlobCache return cache.NewDirectoryCache( cachePath, cache.DirectoryCacheConfig{ - SyncAdd: dcc.SyncAdd, - DataCache: dCache, - FdCache: fCache, - BufPool: bufPool, - Direct: dcc.Direct, - FadvDontNeed: dcc.FadvDontNeed, + HardlinkManager: hlManager, + SyncAdd: dcc.SyncAdd, + DataCache: dCache, + FdCache: fCache, + BufPool: bufPool, + Direct: dcc.Direct, + FadvDontNeed: dcc.FadvDontNeed, }, ) } @@ -285,7 +289,7 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs } }() - fsCache, err := newCache(filepath.Join(r.rootDir, "fscache"), r.config.FSCacheType, r.config) + fsCache, err := newCache(filepath.Join(r.rootDir, "fscache"), r.config.FSCacheType, r.config, r.hlManager) if err != nil { return nil, fmt.Errorf("failed to create fs cache: %w", err) } @@ -367,7 +371,7 @@ func (r *Resolver) resolveBlob(ctx context.Context, hosts source.RegistryHosts, r.blobCacheMu.Unlock() } - httpCache, err := newCache(filepath.Join(r.rootDir, "httpcache"), r.config.HTTPCacheType, r.config) + httpCache, err := newCache(filepath.Join(r.rootDir, "httpcache"), r.config.HTTPCacheType, r.config, r.hlManager) if err != nil { return nil, fmt.Errorf("failed to create http cache: %w", err) } diff --git a/fs/reader/reader.go b/fs/reader/reader.go index 6362b5b5d..dfc30f95c 100644 --- a/fs/reader/reader.go +++ b/fs/reader/reader.go @@ -238,7 +238,7 @@ func (vr *VerifiableReader) readAndCache(id uint32, fr io.Reader, chunkOffset, c // Check if it already exists in the cache cacheID := genID(id, chunkOffset, chunkSize) - if r, err := gr.cache.Get(cacheID); err == nil { + if r, err := gr.cache.Get(cacheID, cache.ChunkDigest(chunkDigest)); err == nil { r.Close() return nil } @@ -248,6 +248,7 @@ func (vr *VerifiableReader) readAndCache(id uint32, fr io.Reader, chunkOffset, c if _, err := br.Peek(int(chunkSize)); err != nil { return fmt.Errorf("cacheWithReader.peek: %v", err) } + opts = append(opts, cache.ChunkDigest(chunkDigest)) w, err := gr.cache.Add(cacheID, opts...) if err != nil { return err @@ -443,7 +444,7 @@ func (sf *file) ReadAt(p []byte, offset int64) (int, error) { ) // Check if the content exists in the cache - if r, err := sf.gr.cache.Get(id); err == nil { + if r, err := sf.gr.cache.Get(id, cache.ChunkDigest(chunkDigestStr)); err == nil { n, err := r.ReadAt(p[nr:int64(nr)+expectedSize], lowerDiscard) if (err == nil || err == io.EOF) && int64(n) == expectedSize { nr += n @@ -634,13 +635,11 @@ type batchWorkerArgs struct { } func (sf *file) prefetchEntireFile(entireCacheID string, chunks []chunkData, totalSize int64, bufferSize int64, workerCount int) error { - w, err := sf.gr.cache.Add(entireCacheID) if err != nil { return fmt.Errorf("failed to create cache writer: %w", err) } defer w.Close() - batchCount := (totalSize + bufferSize - 1) / bufferSize for batchIdx := int64(0); batchIdx < batchCount; batchIdx++ { @@ -760,7 +759,7 @@ func (sf *file) processBatchChunks(args *batchWorkerArgs) error { bufStart := args.buffer[chunk.bufferPos : chunk.bufferPos+chunk.size] id := genID(sf.id, chunk.offset, chunk.size) - if r, err := sf.gr.cache.Get(id); err == nil { + if r, err := sf.gr.cache.Get(id, cache.ChunkDigest(chunk.digestStr)); err == nil { n, err := r.ReadAt(bufStart, 0) r.Close() if err == nil || err == io.EOF { @@ -804,8 +803,8 @@ func (gr *reader) verifyOneChunk(entryID uint32, ip []byte, chunkDigestStr strin return nil } -func (gr *reader) cacheData(ip []byte, cacheID string) { - if w, err := gr.cache.Add(cacheID); err == nil { +func (gr *reader) cacheData(ip []byte, cacheID string, chunkDigestStr string) { + if w, err := gr.cache.Add(cacheID, cache.ChunkDigest(chunkDigestStr)); err == nil { if cn, err := w.Write(ip); err != nil || cn != len(ip) { w.Abort() } else { @@ -819,7 +818,7 @@ func (gr *reader) verifyAndCache(entryID uint32, ip []byte, chunkDigestStr strin if err := gr.verifyOneChunk(entryID, ip, chunkDigestStr); err != nil { return err } - gr.cacheData(ip, cacheID) + gr.cacheData(ip, cacheID, chunkDigestStr) return nil } diff --git a/hardlink/hardlink.go b/hardlink/hardlink.go new file mode 100644 index 000000000..73de89112 --- /dev/null +++ b/hardlink/hardlink.go @@ -0,0 +1,218 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package hardlink + +import ( + "crypto/sha256" + "fmt" + "os" + "path/filepath" + "sync" + + "golang.org/x/sys/unix" + + "github.com/containerd/stargz-snapshotter/util/cacheutil" +) + +type ChunkDigestMapping struct { + Digest string `json:"digest"` + Keys []string `json:"keys"` +} + +type Manager struct { + mu sync.RWMutex + hardlinkRoot string + digestToKeys map[string]*ChunkDigestMapping + keyToDigest map[string]string + digestToFile *cacheutil.LRUCache +} + +func New(root string) (*Manager, error) { + hardlinkRoot := filepath.Join(root, "hardlinks") + if err := os.MkdirAll(hardlinkRoot, 0700); err != nil { + return nil, fmt.Errorf("failed to create hardlink directory: %w", err) + } + + hm := &Manager{ + hardlinkRoot: hardlinkRoot, + digestToKeys: make(map[string]*ChunkDigestMapping), + keyToDigest: make(map[string]string), + } + + hm.digestToFile = cacheutil.NewLRUCache(100000) + hm.digestToFile.OnEvicted = func(d string, v interface{}) { + hm.mu.Lock() + defer hm.mu.Unlock() + if mapping, ok := hm.digestToKeys[d]; ok { + for _, k := range mapping.Keys { + delete(hm.keyToDigest, k) + } + delete(hm.digestToKeys, d) + } + if canonicalPath, ok := v.(string); ok { + hm.cleanup(canonicalPath) + } + } + + return hm, nil +} + +func (hm *Manager) cleanup(canonicalPath string) { + stat, err := os.Stat(canonicalPath) + if err != nil { + return + } + + if sys, ok := stat.Sys().(*unix.Stat_t); ok { + if sys.Nlink == 1 { + _ = os.Remove(canonicalPath) + } + } +} + +func (hm *Manager) getLink(chunkdigest string) (string, bool) { + v, done, ok := hm.digestToFile.Get(chunkdigest) + if !ok { + return "", false + } + defer done() + canonicalPath, _ := v.(string) + if _, err := os.Stat(canonicalPath); err != nil { + hm.digestToFile.Remove(chunkdigest) + return "", false + } + return canonicalPath, true +} + +func (hm *Manager) Enroll(chunkdigest string, filePath string, directory string, key string) error { + if _, err := os.Stat(filePath); err != nil { + return fmt.Errorf("file does not exist at %q", filePath) + } + + canonicalPath := filepath.Join(hm.hardlinkRoot, chunkdigest) + if _, err := os.Stat(canonicalPath); os.IsNotExist(err) { + if err := os.Link(filePath, canonicalPath); err != nil { + return fmt.Errorf("failed to create canonical hardlink: %w", err) + } + } + + _, done, _ := hm.digestToFile.Add(chunkdigest, canonicalPath) + done() + + if directory != "" && key != "" { + internalKey := hm.generateInternalKey(directory, key) + if err := hm.mapKeyToDigest(internalKey, chunkdigest); err != nil { + return fmt.Errorf("failed to map key to digest: %w", err) + } + } + + return nil +} + +func (hm *Manager) mapKeyToDigest(key string, chunkdigest string) error { + _, done, ok := hm.digestToFile.Get(chunkdigest) + if !ok { + return fmt.Errorf("digest %q is not registered", chunkdigest) + } + done() + + hm.mu.Lock() + defer hm.mu.Unlock() + + hm.removeKeyFromOldDigest(key) + hm.addKeyToDigest(key, chunkdigest) + return nil +} + +func (hm *Manager) removeKeyFromOldDigest(key string) { + oldDigest, exists := hm.keyToDigest[key] + if !exists { + return + } + + mapping, ok := hm.digestToKeys[oldDigest] + if !ok { + return + } + + for i, k := range mapping.Keys { + if k == key { + mapping.Keys = append(mapping.Keys[:i], mapping.Keys[i+1:]...) + break + } + } + + if len(mapping.Keys) == 0 { + delete(hm.digestToKeys, oldDigest) + } +} + +func (hm *Manager) addKeyToDigest(key, chunkdigest string) { + mapping, exists := hm.digestToKeys[chunkdigest] + if !exists { + mapping = &ChunkDigestMapping{ + Digest: chunkdigest, + Keys: []string{}, + } + hm.digestToKeys[chunkdigest] = mapping + } + mapping.Keys = append(mapping.Keys, key) + hm.keyToDigest[key] = chunkdigest +} + +func (hm *Manager) createLink(chunkdigest string, targetPath string) error { + digestPath, exists := hm.getLink(chunkdigest) + if !exists { + return fmt.Errorf("no existing file found for digest %q", chunkdigest) + } + + if digestPath == targetPath { + return fmt.Errorf("source and target paths are the same") + } + + if err := os.MkdirAll(filepath.Dir(targetPath), 0700); err != nil { + return fmt.Errorf("failed to create directory for hardlink: %w", err) + } + + _ = os.Remove(targetPath) + + if err := os.Link(digestPath, targetPath); err != nil { + return fmt.Errorf("failed to create hardlink: %w", err) + } + + return nil +} + +func (hm *Manager) generateInternalKey(directory, key string) string { + internalKey := sha256.Sum256([]byte(fmt.Sprintf("%s-%s", directory, key))) + return fmt.Sprintf("%x", internalKey) +} + +func (hm *Manager) Get(key string, chunkDigest string, direct bool) (string, bool) { + if hm == nil || chunkDigest == "" { + return "", false + } + return hm.getLink(chunkDigest) +} + +func (hm *Manager) Add(key string, chunkDigest string, targetPath string) error { + if err := hm.createLink(chunkDigest, targetPath); err != nil { + return err + } + internalKey := hm.generateInternalKey(filepath.Dir(filepath.Dir(targetPath)), key) + return hm.mapKeyToDigest(internalKey, chunkDigest) +} diff --git a/hardlink/hardlink_test.go b/hardlink/hardlink_test.go new file mode 100644 index 000000000..b488af09e --- /dev/null +++ b/hardlink/hardlink_test.go @@ -0,0 +1,275 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package hardlink + +import ( + "crypto/sha256" + "fmt" + "os" + "path/filepath" + "testing" + "time" +) + +func setupTestEnvironment(t *testing.T) (string, string, string, *Manager) { + t.Helper() + tmpDir := t.TempDir() + sourceDir := filepath.Join(tmpDir, "source") + if err := os.MkdirAll(sourceDir, 0700); err != nil { + t.Fatalf("failed to create source dir: %v", err) + } + + sourceFile := filepath.Join(sourceDir, "test.txt") + content := []byte("test content") + if err := os.WriteFile(sourceFile, content, 0600); err != nil { + t.Fatalf("failed to create source file: %v", err) + } + + hlm, err := New(tmpDir) + if err != nil { + t.Fatalf("failed to create hardlink manager: %v", err) + } + + return tmpDir, sourceDir, sourceFile, hlm +} + +// Helper function to generate a digest for testing +func generateTestDigest(content string) string { + hash := sha256.Sum256([]byte(content)) + return fmt.Sprintf("sha256:%x", hash[:]) +} + +func TestManager_RegisterAndGetDigest(t *testing.T) { + tmpDir, _, sourceFile, hlm := setupTestEnvironment(t) + testDigest := generateTestDigest("test content") + + t.Run("RegisterAndRetrieveDigest", func(t *testing.T) { + if err := hlm.Enroll(testDigest, sourceFile, "", ""); err != nil { + t.Fatalf("Enroll failed: %v", err) + } + canonicalPath, exists := hlm.Get("test-key", testDigest, false) + if !exists { + t.Fatal("digest should exist") + } + expectedCanonicalPath := filepath.Join(tmpDir, "hardlinks", testDigest) + if canonicalPath != expectedCanonicalPath { + t.Fatalf("expected canonical path %q, got %q", expectedCanonicalPath, canonicalPath) + } + content, err := os.ReadFile(canonicalPath) + if err != nil { + t.Fatalf("failed to read canonical file: %v", err) + } + if string(content) != "test content" { + t.Fatalf("canonical file has wrong content: %q", string(content)) + } + }) + + t.Run("NonExistentFile", func(t *testing.T) { + nonExistentFile := filepath.Join(t.TempDir(), "nonexistent.txt") + nonexistentDigest := "sha256:nonexistent" + err := hlm.Enroll(nonexistentDigest, nonExistentFile, "", "") + if err == nil { + t.Fatal("should fail with nonexistent file") + } + }) + + t.Run("NonExistentDigest", func(t *testing.T) { + nonexistentDigest := "sha256:nonexistent" + _, exists := hlm.Get("test-key", nonexistentDigest, false) + if exists { + t.Fatal("should not find nonexistent digest") + } + }) +} + +func TestManager_Add(t *testing.T) { + tmpDir, _, sourceFile, hlm := setupTestEnvironment(t) + testDigest := generateTestDigest("test content") + + t.Run("AddAndRetrieve", func(t *testing.T) { + if err := hlm.Enroll(testDigest, sourceFile, "", ""); err != nil { + t.Fatalf("Enroll failed: %v", err) + } + + targetPath := filepath.Join(tmpDir, "cache", "layer1", "chunk1") + if err := hlm.Add("chunk1", testDigest, targetPath); err != nil { + t.Fatalf("Add failed: %v", err) + } + + if _, err := os.Stat(targetPath); err != nil { + t.Fatalf("hardlink file should exist: %v", err) + } + + content, err := os.ReadFile(targetPath) + if err != nil { + t.Fatalf("failed to read hardlink file: %v", err) + } + if string(content) != "test content" { + t.Fatalf("hardlink file has wrong content: %q", string(content)) + } + }) + + t.Run("NonExistentDigest", func(t *testing.T) { + nonexistentDigest := "sha256:nonexistent" + targetPath := filepath.Join(tmpDir, "cache", "layer2", "chunk1") + err := hlm.Add("chunk1", nonexistentDigest, targetPath) + if err == nil { + t.Fatal("should fail with nonexistent digest") + } + }) + + t.Run("Remapping", func(t *testing.T) { + anotherFile := filepath.Join(t.TempDir(), "another.txt") + anotherContent := []byte("different content") + if err := os.WriteFile(anotherFile, anotherContent, 0600); err != nil { + t.Fatalf("failed to create another file: %v", err) + } + anotherDigest := generateTestDigest("different content") + if err := hlm.Enroll(anotherDigest, anotherFile, "", ""); err != nil { + t.Fatalf("Enroll failed for another digest: %v", err) + } + + targetPath1 := filepath.Join(tmpDir, "cache", "layer3", "chunk1") + if err := hlm.Add("chunk1", testDigest, targetPath1); err != nil { + t.Fatalf("Add failed for first digest: %v", err) + } + + targetPath2 := filepath.Join(tmpDir, "cache", "layer4", "chunk1") + if err := hlm.Add("chunk1", anotherDigest, targetPath2); err != nil { + t.Fatalf("Add failed for remapping: %v", err) + } + + canonicalPath, exists := hlm.Get("chunk1", anotherDigest, false) + if !exists { + t.Fatal("second digest should exist") + } + expectedCanonicalPath := filepath.Join(tmpDir, "hardlinks", anotherDigest) + if canonicalPath != expectedCanonicalPath { + t.Fatalf("expected canonical path %q, got %q", expectedCanonicalPath, canonicalPath) + } + }) +} + +func TestManager_CreateLink(t *testing.T) { + tmpDir, _, sourceFile, hlm := setupTestEnvironment(t) + testDigest := generateTestDigest("test content") + + t.Run("SuccessfulHardlink", func(t *testing.T) { + if err := hlm.Enroll(testDigest, sourceFile, "", ""); err != nil { + t.Fatalf("Enroll failed: %v", err) + } + targetPath := filepath.Join(tmpDir, "hardlink-target.txt") + if err := hlm.Add("hardlink-key", testDigest, targetPath); err != nil { + t.Fatalf("Add should succeed: %v", err) + } + canonicalPath := filepath.Join(tmpDir, "hardlinks", testDigest) + canonicalStat, err := os.Stat(canonicalPath) + if err != nil { + t.Fatalf("failed to stat canonical: %v", err) + } + targetStat, err := os.Stat(targetPath) + if err != nil { + t.Fatalf("failed to stat target: %v", err) + } + if !os.SameFile(canonicalStat, targetStat) { + t.Fatal("files should reference the same inode") + } + }) + + t.Run("SamePathHardlink", func(t *testing.T) { + if err := hlm.Enroll(testDigest, sourceFile, "", ""); err != nil { + t.Fatalf("Enroll failed: %v", err) + } + canonicalPath := filepath.Join(tmpDir, "hardlinks", testDigest) + if err := hlm.Add("same-path-key", testDigest, canonicalPath); err == nil { + t.Fatal("Add to canonical path should return error") + } + }) +} + +func TestManager_InMemoryState(t *testing.T) { + tmpDir, _, sourceFile, hlm := setupTestEnvironment(t) + testDigest := generateTestDigest("test content") + + t.Run("StateAvailable", func(t *testing.T) { + if err := hlm.Enroll(testDigest, sourceFile, "", ""); err != nil { + t.Fatalf("Enroll failed: %v", err) + } + targetPath := filepath.Join(tmpDir, "cache", "layer5", "chunk1") + if err := hlm.Add("persist-test", testDigest, targetPath); err != nil { + t.Fatalf("Add failed: %v", err) + } + canonicalPath, exists := hlm.Get("persist-test", testDigest, false) + if !exists { + t.Fatal("digest mapping should exist") + } + expectedCanonicalPath := filepath.Join(tmpDir, "hardlinks", testDigest) + if canonicalPath != expectedCanonicalPath { + t.Fatalf("expected canonical path %q, got %q", expectedCanonicalPath, canonicalPath) + } + }) +} + +func TestManager_Concurrent(t *testing.T) { + tmpDir, _, sourceFile, hlm := setupTestEnvironment(t) + testDigest := generateTestDigest("test content") + if err := hlm.Enroll(testDigest, sourceFile, "", ""); err != nil { + t.Fatalf("Enroll failed: %v", err) + } + done := make(chan bool) + timeout := time.After(10 * time.Second) + go func() { + for i := 0; i < 100; i++ { + hlm.Get("test-key", testDigest, false) + } + done <- true + }() + go func() { + for i := 0; i < 100; i++ { + targetPath := filepath.Join(tmpDir, "cache", fmt.Sprintf("layer%d", i), "chunk1") + _ = hlm.Add(fmt.Sprintf("test-key-%d", i), testDigest, targetPath) + } + done <- true + }() + for i := 0; i < 2; i++ { + select { + case <-done: + continue + case <-timeout: + t.Fatal("concurrent test timed out") + } + } +} + +func TestManager_UtilityFunctions(t *testing.T) { + _, _, _, hlm := setupTestEnvironment(t) + + t.Run("Get_NilManager", func(t *testing.T) { + var nilManager *Manager + _, exists := nilManager.Get("key", "digest", false) + if exists { + t.Fatal("nil manager should return false") + } + }) + + t.Run("Get_EmptyDigest", func(t *testing.T) { + _, exists := hlm.Get("key", "", false) + if exists { + t.Fatal("empty digest should return false") + } + }) +} diff --git a/store/manager.go b/store/manager.go index b6c75f7d9..5afc91138 100644 --- a/store/manager.go +++ b/store/manager.go @@ -62,6 +62,7 @@ func NewLayerManager(ctx context.Context, root string, hosts source.RegistryHost func(ctx context.Context, hosts source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor) []metadata.Decompressor { return []metadata.Decompressor{esgzexternaltoc.NewRemoteDecompressor(ctx, hosts, refspec, desc)} }, + nil, ) if err != nil { return nil, fmt.Errorf("failed to setup resolver: %w", err)