Skip to content

Commit 3351d93

Browse files
committed
feat: implement hardlinking for cache files to reduce storage usage
This commit adds a hardlink system for the Stargz Snapshotter cache to optimize storage and improve performance. The system intelligently creates hardlinks between identical content chunks, significantly reducing disk space usage in environments where many containers use the same base layers.

Key changes:
- Add a new HardlinkManager that tracks files by chunk digest
- Enable hardlinking between chunk files with the same content
- Add the configuration option `EnableHardlink` to control the feature
- Add documentation on hardlink usage and configuration

The implementation includes:
- Chunk-level digest tracking for optimizing cache lookups
- A test suite for the hardlink functionality

Signed-off-by: ChengyuZhu6 <[email protected]>
1 parent 7d8fae5 commit 3351d93

File tree

8 files changed

+763
-85
lines changed

8 files changed

+763
-85
lines changed

cache/cache.go

Lines changed: 163 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@ package cache
1818

1919
import (
2020
"bytes"
21-
"errors"
2221
"fmt"
2322
"io"
2423
"os"
2524
"path/filepath"
2625
"sync"
2726

27+
"github.com/containerd/log"
28+
"github.com/containerd/stargz-snapshotter/hardlink"
2829
"github.com/containerd/stargz-snapshotter/util/cacheutil"
2930
"github.com/containerd/stargz-snapshotter/util/namedmutex"
3031
"golang.org/x/sys/unix"
@@ -33,6 +34,10 @@ import (
3334
const (
3435
defaultMaxLRUCacheEntry = 10
3536
defaultMaxCacheFds = 10
37+
38+
// cache key namespaces to avoid collisions between raw keys and digests
39+
cacheKeyPrefixRaw = "raw:"
40+
cacheKeyPrefixDigest = "digest:"
3641
)
3742

3843
type DirectoryCacheConfig struct {
@@ -65,6 +70,9 @@ type DirectoryCacheConfig struct {
6570

6671
// FadvDontNeed forcefully clean fscache pagecache for saving memory.
6772
FadvDontNeed bool
73+
74+
// EnableHardlink enables hardlinking of cache files to reduce storage usage
75+
EnableHardlink bool
6876
}
6977

7078
// TODO: contents validation.
@@ -103,6 +111,7 @@ type Writer interface {
103111
type cacheOpt struct {
104112
direct bool
105113
passThrough bool
114+
chunkDigest string
106115
}
107116

108117
type Option func(o *cacheOpt) *cacheOpt
@@ -127,6 +136,14 @@ func PassThrough() Option {
127136
}
128137
}
129138

139+
// ChunkDigest option allows specifying a chunk digest for the cache
140+
func ChunkDigest(digest string) Option {
141+
return func(o *cacheOpt) *cacheOpt {
142+
o.chunkDigest = digest
143+
return o
144+
}
145+
}
146+
130147
func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache, error) {
131148
if !filepath.IsAbs(directory) {
132149
return nil, fmt.Errorf("dir cache path must be an absolute path; got %q", directory)
@@ -178,8 +195,14 @@ func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache
178195
bufPool: bufPool,
179196
direct: config.Direct,
180197
fadvDontNeed: config.FadvDontNeed,
198+
syncAdd: config.SyncAdd,
199+
}
200+
201+
// Initialize hardlink manager if enabled
202+
if config.EnableHardlink {
203+
dc.hlManager = hardlink.GetGlobalManager()
181204
}
182-
dc.syncAdd = config.SyncAdd
205+
183206
return dc, nil
184207
}
185208

@@ -199,6 +222,8 @@ type directoryCache struct {
199222

200223
closed bool
201224
closedMu sync.Mutex
225+
226+
hlManager *hardlink.Manager
202227
}
203228

204229
func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
@@ -211,9 +236,12 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
211236
opt = o(opt)
212237
}
213238

239+
// Try to get from memory cache
214240
if !dc.direct && !opt.direct {
215-
// Get data from memory
216-
if b, done, ok := dc.cache.Get(key); ok {
241+
// Try memory cache for digest or key, with namespaced keys to avoid collisions
242+
cacheKey := getCacheKey(dc.hlManager, key, opt.chunkDigest)
243+
244+
if b, done, ok := dc.cache.Get(cacheKey); ok {
217245
return &reader{
218246
ReaderAt: bytes.NewReader(b.(*bytes.Buffer).Bytes()),
219247
closeFunc: func() error {
@@ -223,8 +251,8 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
223251
}, nil
224252
}
225253

226-
// Get data from disk. If the file is already opened, use it.
227-
if f, done, ok := dc.fileCache.Get(key); ok {
254+
// Get data from file cache for digest or key
255+
if f, done, ok := dc.fileCache.Get(cacheKey); ok {
228256
return &reader{
229257
ReaderAt: f.(*os.File),
230258
closeFunc: func() error {
@@ -235,10 +263,20 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
235263
}
236264
}
237265

266+
// First try regular file path
267+
filepath := buildCachePath(dc.directory, key)
268+
269+
if shouldUseDigestCacheKey(dc.hlManager, opt.chunkDigest) {
270+
if digestPath, exists := dc.hlManager.ProcessCacheGet(key, opt.chunkDigest, opt.direct); exists {
271+
log.L.Debugf("Using existing file for digest %q instead of key %q", opt.chunkDigest, key)
272+
filepath = digestPath
273+
}
274+
}
275+
238276
// Open the cache file and read the target region
239277
// TODO: If the target cache is write-in-progress, should we wait for the completion
240278
// or simply report the cache miss?
241-
file, err := os.Open(dc.cachePath(key))
279+
file, err := os.Open(filepath)
242280
if err != nil {
243281
return nil, fmt.Errorf("failed to open blob file for %q: %w", key, err)
244282
}
@@ -273,7 +311,9 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
273311
return &reader{
274312
ReaderAt: file,
275313
closeFunc: func() error {
276-
_, done, added := dc.fileCache.Add(key, file)
314+
cacheKey := getCacheKey(dc.hlManager, key, opt.chunkDigest)
315+
316+
_, done, added := dc.fileCache.Add(cacheKey, file)
277317
defer done() // Release it immediately. Cleaned up on eviction.
278318
if !added {
279319
return file.Close() // file already exists in the cache. close it.
@@ -293,88 +333,73 @@ func (dc *directoryCache) Add(key string, opts ...Option) (Writer, error) {
293333
opt = o(opt)
294334
}
295335

296-
wip, err := dc.wipFile(key)
336+
// If hardlink manager exists and digest is provided, check if a hardlink can be created
337+
if dc.hlManager != nil && opt.chunkDigest != "" {
338+
keyPath := buildCachePath(dc.directory, key)
339+
340+
err := dc.hlManager.ProcessCacheAdd(key, opt.chunkDigest, keyPath)
341+
if err == nil {
342+
return &writer{
343+
WriteCloser: nopWriteCloser(io.Discard),
344+
commitFunc: func() error { return nil },
345+
abortFunc: func() error { return nil },
346+
}, nil
347+
}
348+
}
349+
350+
// Create temporary file
351+
w, err := wipFile(dc.wipDirectory, key)
297352
if err != nil {
298353
return nil, err
299354
}
300-
w := &writer{
301-
WriteCloser: wip,
355+
356+
// Create writer
357+
writer := &writer{
358+
WriteCloser: w,
302359
commitFunc: func() error {
303360
if dc.isClosed() {
304361
return fmt.Errorf("cache is already closed")
305362
}
306-
// Commit the cache contents
307-
c := dc.cachePath(key)
308-
if err := os.MkdirAll(filepath.Dir(c), os.ModePerm); err != nil {
309-
var errs []error
310-
if err := os.Remove(wip.Name()); err != nil {
311-
errs = append(errs, err)
312-
}
313-
errs = append(errs, fmt.Errorf("failed to create cache directory %q: %w", c, err))
314-
return errors.Join(errs...)
363+
364+
// Commit file
365+
targetPath := buildCachePath(dc.directory, key)
366+
if err := os.MkdirAll(filepath.Dir(targetPath), 0700); err != nil {
367+
return fmt.Errorf("failed to create cache directory: %w", err)
315368
}
316369

317370
if dc.fadvDontNeed {
318-
if err := dropFilePageCache(wip); err != nil {
371+
if err := dropFilePageCache(w); err != nil {
319372
fmt.Printf("Warning: failed to drop page cache: %v\n", err)
320373
}
321374
}
322375

323-
return os.Rename(wip.Name(), c)
376+
if err := os.Rename(w.Name(), targetPath); err != nil {
377+
return fmt.Errorf("failed to commit cache file: %w", err)
378+
}
379+
380+
if shouldUseDigestCacheKey(dc.hlManager, opt.chunkDigest) {
381+
if err := dc.hlManager.RegisterDigestFile(opt.chunkDigest, targetPath, dc.directory, key); err != nil {
382+
return fmt.Errorf("failed to register digest file: %w", err)
383+
}
384+
}
385+
386+
return nil
324387
},
325388
abortFunc: func() error {
326-
return os.Remove(wip.Name())
389+
return os.Remove(w.Name())
327390
},
328391
}
329392

330393
// If "direct" option is specified, do not cache the passed data on memory.
331394
// This option is useful for preventing memory cache from being polluted by data
332395
// that won't be accessed immediately.
333396
if dc.direct || opt.direct {
334-
return w, nil
397+
return writer, nil
335398
}
336399

400+
// Create memory cache
337401
b := dc.bufPool.Get().(*bytes.Buffer)
338-
memW := &writer{
339-
WriteCloser: nopWriteCloser(io.Writer(b)),
340-
commitFunc: func() error {
341-
if dc.isClosed() {
342-
w.Close()
343-
return fmt.Errorf("cache is already closed")
344-
}
345-
cached, done, added := dc.cache.Add(key, b)
346-
if !added {
347-
dc.putBuffer(b) // already exists in the cache. abort it.
348-
}
349-
commit := func() error {
350-
defer done()
351-
defer w.Close()
352-
n, err := w.Write(cached.(*bytes.Buffer).Bytes())
353-
if err != nil || n != cached.(*bytes.Buffer).Len() {
354-
w.Abort()
355-
return err
356-
}
357-
return w.Commit()
358-
}
359-
if dc.syncAdd {
360-
return commit()
361-
}
362-
go func() {
363-
if err := commit(); err != nil {
364-
fmt.Println("failed to commit to file:", err)
365-
}
366-
}()
367-
return nil
368-
},
369-
abortFunc: func() error {
370-
defer w.Close()
371-
defer w.Abort()
372-
dc.putBuffer(b) // abort it.
373-
return nil
374-
},
375-
}
376-
377-
return memW, nil
402+
return dc.wrapMemoryWriter(b, writer, key)
378403
}
379404

380405
func (dc *directoryCache) putBuffer(b *bytes.Buffer) {
@@ -399,14 +424,6 @@ func (dc *directoryCache) isClosed() bool {
399424
return closed
400425
}
401426

402-
func (dc *directoryCache) cachePath(key string) string {
403-
return filepath.Join(dc.directory, key[:2], key)
404-
}
405-
406-
func (dc *directoryCache) wipFile(key string) (*os.File, error) {
407-
return os.CreateTemp(dc.wipDirectory, key+"-*")
408-
}
409-
410427
func NewMemoryCache() BlobCache {
411428
return &MemoryCache{
412429
Membuf: map[string]*bytes.Buffer{},
@@ -495,3 +512,77 @@ func dropFilePageCache(file *os.File) error {
495512
}
496513
return nil
497514
}
515+
516+
// wrapMemoryWriter wraps a writer with memory caching
517+
func (dc *directoryCache) wrapMemoryWriter(b *bytes.Buffer, w *writer, key string) (Writer, error) {
518+
return &writer{
519+
WriteCloser: nopWriteCloser(b),
520+
commitFunc: func() error {
521+
if dc.isClosed() {
522+
w.Close()
523+
return fmt.Errorf("cache is already closed")
524+
}
525+
526+
cached, done, added := dc.cache.Add(key, b)
527+
if !added {
528+
dc.putBuffer(b)
529+
}
530+
531+
commit := func() error {
532+
defer done()
533+
defer w.Close()
534+
535+
n, err := w.Write(cached.(*bytes.Buffer).Bytes())
536+
if err != nil || n != cached.(*bytes.Buffer).Len() {
537+
w.Abort()
538+
return err
539+
}
540+
return w.Commit()
541+
}
542+
543+
if dc.syncAdd {
544+
return commit()
545+
}
546+
547+
go func() {
548+
if err := commit(); err != nil {
549+
log.L.Infof("failed to commit to file: %v", err)
550+
}
551+
}()
552+
return nil
553+
},
554+
abortFunc: func() error {
555+
defer w.Close()
556+
defer w.Abort()
557+
dc.putBuffer(b)
558+
return nil
559+
},
560+
}, nil
561+
}
562+
563+
// shouldUseDigestCacheKey determines whether to use the digest as the cache key.
564+
// Returns true only if the hardlink manager exists, is enabled, and chunkDigest is not empty.
565+
func shouldUseDigestCacheKey(hlManager *hardlink.Manager, chunkDigest string) bool {
566+
return hlManager != nil && hlManager.IsEnabled() && chunkDigest != ""
567+
}
568+
569+
// getCacheKey returns the appropriate cache key based on whether digest-based caching should be used.
570+
// If digest-based caching is enabled, it returns a key with the digest prefix, otherwise with the raw key prefix.
571+
func getCacheKey(hlManager *hardlink.Manager, key string, chunkDigest string) string {
572+
if shouldUseDigestCacheKey(hlManager, chunkDigest) {
573+
return cacheKeyPrefixDigest + chunkDigest
574+
}
575+
return cacheKeyPrefixRaw + key
576+
}
577+
578+
// buildCachePath returns the on-disk location of the cache entry for key:
// directory/<shard>/<key>, where <shard> is the first two bytes of key.
//
// Keys shorter than two bytes use the whole key as the shard instead of
// panicking on an out-of-range slice. An empty key yields directory itself;
// callers are expected to pass non-empty keys.
func buildCachePath(directory string, key string) string {
	shard := key
	if len(key) > 2 {
		shard = key[:2]
	}
	return filepath.Join(directory, shard, key)
}
581+
582+
// wipFile creates a new work-in-progress file for key inside wipDirectory,
// creating the directory (mode 0700) first if it does not exist. The file
// name is key followed by "-" and a suffix chosen by os.CreateTemp. The
// caller owns the returned file.
func wipFile(wipDirectory string, key string) (*os.File, error) {
	mkErr := os.MkdirAll(wipDirectory, 0700)
	if mkErr != nil {
		return nil, fmt.Errorf("failed to create wip directory: %w", mkErr)
	}
	pattern := key + "-*"
	return os.CreateTemp(wipDirectory, pattern)
}

cmd/containerd-stargz-grpc/main.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/containerd/log"
3838
"github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/fsopts"
3939
"github.com/containerd/stargz-snapshotter/fusemanager"
40+
"github.com/containerd/stargz-snapshotter/hardlink"
4041
"github.com/containerd/stargz-snapshotter/service"
4142
"github.com/containerd/stargz-snapshotter/service/keychain/keychainconfig"
4243
snbase "github.com/containerd/stargz-snapshotter/snapshot"
@@ -135,6 +136,13 @@ func main() {
135136
log.G(ctx).WithError(err).Fatalf("snapshotter is not supported")
136137
}
137138

139+
// Initialize the global HardlinkManager and register it with the hardlink package
140+
hlm, err := hardlink.NewHardlinkManager()
141+
if err != nil {
142+
log.G(ctx).WithError(err).Fatalf("failed to init hardlink manager")
143+
}
144+
hardlink.SetGlobalManager(hlm)
145+
138146
// Create a gRPC server
139147
rpc := grpc.NewServer()
140148

0 commit comments

Comments
 (0)