
Commit e376da9

feat: implement hardlinking for cache files to reduce storage usage
Implement hardlink management for the Stargz Snapshotter cache to reduce storage usage by deduplicating identical content chunks.

Implementation:
- Store canonical files in the {root}/hardlinks/ directory
- Use nlink-based garbage collection (remove when nlink == 1)
- Track chunk digests with an LRU cache for efficient lookups
- Create hardlinks from canonical files to cache locations

Configuration:
- Add HardlinkRoot option for the hardlink storage directory

API:
- Enroll(): Register canonical files by digest
- Get(): Retrieve canonical file paths
- Add(): Create hardlinks to target locations

Signed-off-by: ChengyuZhu6 <[email protected]>
1 parent 60de78b commit e376da9
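Note: the hardlink package itself is not part of the excerpt below, so the following is only a rough usage sketch of the Manager API named in the commit message (New, Enroll, Get, Add). Argument meanings are inferred from the call sites in cache/cache.go; the root path, keys, and digest are illustrative placeholders, not values from this commit.

package main

import (
	"fmt"

	"github.com/containerd/stargz-snapshotter/hardlink"
)

func main() {
	// Canonical files live under {root}/hardlinks/ and are reclaimed by
	// nlink-based GC once nothing links to them anymore (nlink == 1).
	hlm, err := hardlink.New("/var/lib/example/hardlinkroot") // illustrative root
	if err != nil {
		panic(err)
	}

	const digest = "sha256:0123abcd" // illustrative chunk digest
	// Enroll registers an already-committed cache file as the canonical copy for a digest.
	// Argument order mirrors the call site in cache.go: digest, file path, cache dir, cache key.
	if err := hlm.Enroll(digest, "/cache/ab/abcdkey", "/cache", "abcdkey"); err != nil {
		panic(err)
	}

	// Add hardlinks the canonical file into another cache location instead of writing a second copy.
	if err := hlm.Add("otherkey", digest, "/cache/ot/otherkey"); err != nil {
		fmt.Println("no canonical file yet; caller falls back to a normal write:", err)
	}

	// Get resolves the canonical file path for a digest, if one is tracked.
	if path, ok := hlm.Get("otherkey", digest, false); ok {
		fmt.Println("deduplicated chunk lives at:", path)
	}
}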

File tree

10 files changed: +771 −91 lines changed


cache/cache.go

Lines changed: 159 additions & 72 deletions
@@ -18,13 +18,14 @@ package cache
 
 import (
 	"bytes"
-	"errors"
 	"fmt"
 	"io"
 	"os"
 	"path/filepath"
 	"sync"
 
+	"github.com/containerd/log"
+	"github.com/containerd/stargz-snapshotter/hardlink"
 	"github.com/containerd/stargz-snapshotter/util/cacheutil"
 	"github.com/containerd/stargz-snapshotter/util/namedmutex"
 	"golang.org/x/sys/unix"
@@ -33,6 +34,10 @@ import (
 const (
 	defaultMaxLRUCacheEntry = 10
 	defaultMaxCacheFds      = 10
+
+	// cache key namespaces to avoid collisions between raw keys and digests
+	cacheKeyPrefixRaw    = "raw:"
+	cacheKeyPrefixDigest = "digest:"
 )
 
 type DirectoryCacheConfig struct {
@@ -65,6 +70,9 @@ type DirectoryCacheConfig struct {
 
 	// FadvDontNeed forcefully clean fscache pagecache for saving memory.
 	FadvDontNeed bool
+
+	// HardlinkManager provides hardlink support when non-nil.
+	HardlinkManager *hardlink.Manager
 }
 
 // TODO: contents validation.
@@ -103,6 +111,7 @@ type Writer interface {
 type cacheOpt struct {
 	direct      bool
 	passThrough bool
+	chunkDigest string
 }
 
 type Option func(o *cacheOpt) *cacheOpt
@@ -127,6 +136,14 @@ func PassThrough() Option {
 	}
 }
 
+// ChunkDigest option allows specifying a chunk digest for the cache
+func ChunkDigest(digest string) Option {
+	return func(o *cacheOpt) *cacheOpt {
+		o.chunkDigest = digest
+		return o
+	}
+}
+
 func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache, error) {
 	if !filepath.IsAbs(directory) {
 		return nil, fmt.Errorf("dir cache path must be an absolute path; got %q", directory)
@@ -178,8 +195,10 @@ func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache
 		bufPool:      bufPool,
 		direct:       config.Direct,
 		fadvDontNeed: config.FadvDontNeed,
+		syncAdd:      config.SyncAdd,
+		hlManager:    config.HardlinkManager,
 	}
-	dc.syncAdd = config.SyncAdd
+
 	return dc, nil
 }
 
@@ -199,6 +218,8 @@ type directoryCache struct {
 
 	closed   bool
 	closedMu sync.Mutex
+
+	hlManager *hardlink.Manager
 }
 
 func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
@@ -211,9 +232,12 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
 		opt = o(opt)
 	}
 
+	// Try to get from memory cache
 	if !dc.direct && !opt.direct {
-		// Get data from memory
-		if b, done, ok := dc.cache.Get(key); ok {
+		// Try memory cache for digest or key, with namespaced keys to avoid collisions
+		cacheKey := getCacheKey(dc.hlManager, key, opt.chunkDigest)
+
+		if b, done, ok := dc.cache.Get(cacheKey); ok {
 			return &reader{
 				ReaderAt: bytes.NewReader(b.(*bytes.Buffer).Bytes()),
 				closeFunc: func() error {
@@ -223,8 +247,8 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
 			}, nil
 		}
 
-		// Get data from disk. If the file is already opened, use it.
-		if f, done, ok := dc.fileCache.Get(key); ok {
+		// Get data from file cache for digest or key
+		if f, done, ok := dc.fileCache.Get(cacheKey); ok {
 			return &reader{
 				ReaderAt: f.(*os.File),
 				closeFunc: func() error {
@@ -235,10 +259,20 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
 		}
 	}
 
+	// First try regular file path
+	filepath := buildCachePath(dc.directory, key)
+
+	if shouldUseDigestCacheKey(dc.hlManager, opt.chunkDigest) {
+		if digestPath, exists := dc.hlManager.Get(key, opt.chunkDigest, opt.direct); exists {
+			log.L.Debugf("Using existing file for digest %q instead of key %q", opt.chunkDigest, key)
+			filepath = digestPath
+		}
+	}
+
 	// Open the cache file and read the target region
 	// TODO: If the target cache is write-in-progress, should we wait for the completion
 	// or simply report the cache miss?
-	file, err := os.Open(dc.cachePath(key))
+	file, err := os.Open(filepath)
 	if err != nil {
 		return nil, fmt.Errorf("failed to open blob file for %q: %w", key, err)
 	}
@@ -273,7 +307,9 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
 	return &reader{
 		ReaderAt: file,
 		closeFunc: func() error {
-			_, done, added := dc.fileCache.Add(key, file)
+			cacheKey := getCacheKey(dc.hlManager, key, opt.chunkDigest)
+
+			_, done, added := dc.fileCache.Add(cacheKey, file)
 			defer done() // Release it immediately. Cleaned up on eviction.
 			if !added {
 				return file.Close() // file already exists in the cache. close it.
@@ -293,88 +329,73 @@ func (dc *directoryCache) Add(key string, opts ...Option) (Writer, error) {
 		opt = o(opt)
 	}
 
-	wip, err := dc.wipFile(key)
+	// If hardlink manager exists and digest is provided, check if a hardlink can be created
+	if dc.hlManager != nil && opt.chunkDigest != "" {
+		keyPath := buildCachePath(dc.directory, key)
+
+		err := dc.hlManager.Add(key, opt.chunkDigest, keyPath)
+		if err == nil {
+			return &writer{
+				WriteCloser: nopWriteCloser(io.Discard),
+				commitFunc:  func() error { return nil },
+				abortFunc:   func() error { return nil },
+			}, nil
+		}
+	}
+
+	// Create temporary file
+	w, err := wipFile(dc.wipDirectory, key)
 	if err != nil {
 		return nil, err
 	}
-	w := &writer{
-		WriteCloser: wip,
+
+	// Create writer
+	writer := &writer{
+		WriteCloser: w,
 		commitFunc: func() error {
 			if dc.isClosed() {
 				return fmt.Errorf("cache is already closed")
 			}
-			// Commit the cache contents
-			c := dc.cachePath(key)
-			if err := os.MkdirAll(filepath.Dir(c), os.ModePerm); err != nil {
-				var errs []error
-				if err := os.Remove(wip.Name()); err != nil {
-					errs = append(errs, err)
-				}
-				errs = append(errs, fmt.Errorf("failed to create cache directory %q: %w", c, err))
-				return errors.Join(errs...)
+
+			// Commit file
+			targetPath := buildCachePath(dc.directory, key)
+			if err := os.MkdirAll(filepath.Dir(targetPath), 0700); err != nil {
+				return fmt.Errorf("failed to create cache directory: %w", err)
 			}
 
 			if dc.fadvDontNeed {
-				if err := dropFilePageCache(wip); err != nil {
+				if err := dropFilePageCache(w); err != nil {
 					fmt.Printf("Warning: failed to drop page cache: %v\n", err)
 				}
 			}
 
-			return os.Rename(wip.Name(), c)
+			if err := os.Rename(w.Name(), targetPath); err != nil {
+				return fmt.Errorf("failed to commit cache file: %w", err)
+			}
+
+			if shouldUseDigestCacheKey(dc.hlManager, opt.chunkDigest) {
+				if err := dc.hlManager.Enroll(opt.chunkDigest, targetPath, dc.directory, key); err != nil {
+					return fmt.Errorf("failed to register digest file: %w", err)
+				}
+			}
+
+			return nil
 		},
 		abortFunc: func() error {
-			return os.Remove(wip.Name())
+			return os.Remove(w.Name())
 		},
 	}
 
 	// If "direct" option is specified, do not cache the passed data on memory.
 	// This option is useful for preventing memory cache from being polluted by data
 	// that won't be accessed immediately.
 	if dc.direct || opt.direct {
-		return w, nil
+		return writer, nil
 	}
 
+	// Create memory cache
 	b := dc.bufPool.Get().(*bytes.Buffer)
-	memW := &writer{
-		WriteCloser: nopWriteCloser(io.Writer(b)),
-		commitFunc: func() error {
-			if dc.isClosed() {
-				w.Close()
-				return fmt.Errorf("cache is already closed")
-			}
-			cached, done, added := dc.cache.Add(key, b)
-			if !added {
-				dc.putBuffer(b) // already exists in the cache. abort it.
-			}
-			commit := func() error {
-				defer done()
-				defer w.Close()
-				n, err := w.Write(cached.(*bytes.Buffer).Bytes())
-				if err != nil || n != cached.(*bytes.Buffer).Len() {
-					w.Abort()
-					return err
-				}
-				return w.Commit()
-			}
-			if dc.syncAdd {
-				return commit()
-			}
-			go func() {
-				if err := commit(); err != nil {
-					fmt.Println("failed to commit to file:", err)
-				}
-			}()
-			return nil
-		},
-		abortFunc: func() error {
-			defer w.Close()
-			defer w.Abort()
-			dc.putBuffer(b) // abort it.
-			return nil
-		},
-	}
-
-	return memW, nil
+	return dc.wrapMemoryWriter(b, writer, key)
 }
 
 func (dc *directoryCache) putBuffer(b *bytes.Buffer) {
@@ -399,14 +420,6 @@ func (dc *directoryCache) isClosed() bool {
 	return closed
 }
 
-func (dc *directoryCache) cachePath(key string) string {
-	return filepath.Join(dc.directory, key[:2], key)
-}
-
-func (dc *directoryCache) wipFile(key string) (*os.File, error) {
-	return os.CreateTemp(dc.wipDirectory, key+"-*")
-}
-
 func NewMemoryCache() BlobCache {
 	return &MemoryCache{
 		Membuf: map[string]*bytes.Buffer{},
@@ -495,3 +508,77 @@ func dropFilePageCache(file *os.File) error {
 	}
 	return nil
 }
+
+// wrapMemoryWriter wraps a writer with memory caching
+func (dc *directoryCache) wrapMemoryWriter(b *bytes.Buffer, w *writer, key string) (Writer, error) {
+	return &writer{
+		WriteCloser: nopWriteCloser(b),
+		commitFunc: func() error {
+			if dc.isClosed() {
+				w.Close()
+				return fmt.Errorf("cache is already closed")
+			}
+
+			cached, done, added := dc.cache.Add(key, b)
+			if !added {
+				dc.putBuffer(b)
+			}
+
+			commit := func() error {
+				defer done()
+				defer w.Close()
+
+				n, err := w.Write(cached.(*bytes.Buffer).Bytes())
+				if err != nil || n != cached.(*bytes.Buffer).Len() {
+					w.Abort()
+					return err
+				}
+				return w.Commit()
+			}
+
+			if dc.syncAdd {
+				return commit()
+			}
+
+			go func() {
+				if err := commit(); err != nil {
+					log.L.Infof("failed to commit to file: %v", err)
+				}
+			}()
+			return nil
+		},
+		abortFunc: func() error {
+			defer w.Close()
+			defer w.Abort()
+			dc.putBuffer(b)
+			return nil
+		},
+	}, nil
+}
+
+// shouldUseDigestCacheKey determines whether to use the digest as the cache key.
+// Returns true only if the hardlink manager exists, is enabled, and chunkDigest is not empty.
+func shouldUseDigestCacheKey(hlManager *hardlink.Manager, chunkDigest string) bool {
+	return hlManager != nil && chunkDigest != ""
+}
+
+// getCacheKey returns the appropriate cache key based on whether digest-based caching should be used.
+// If digest-based caching is enabled, it returns a key with the digest prefix, otherwise with the raw key prefix.
+func getCacheKey(hlManager *hardlink.Manager, key string, chunkDigest string) string {
+	if shouldUseDigestCacheKey(hlManager, chunkDigest) {
+		return cacheKeyPrefixDigest + chunkDigest
+	}
+	return cacheKeyPrefixRaw + key
+}
+
+func buildCachePath(directory string, key string) string {
+	return filepath.Join(directory, key[:2], key)
+}
+
+// WipFile creates a temporary file in the given directory with the given key pattern
+func wipFile(wipDirectory string, key string) (*os.File, error) {
+	if err := os.MkdirAll(wipDirectory, 0700); err != nil {
+		return nil, fmt.Errorf("failed to create wip directory: %w", err)
+	}
+	return os.CreateTemp(wipDirectory, key+"-*")
+}
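For context, the sketch below shows how a caller of this package might exercise the new ChunkDigest option; it is not taken from this commit. It assumes the existing BlobCache, Writer, and Reader interfaces (Add, Get, Write, Commit, Abort, Close) and uses placeholder key and digest values. When the digest is already enrolled with the hardlink manager, Add returns a discard writer and the data is served from the hardlinked canonical file.

package example

import (
	"github.com/containerd/stargz-snapshotter/cache"
)

// writeAndReadChunk is a sketch only: error handling is simplified and the
// key/digest values are placeholders, not part of this commit.
func writeAndReadChunk(bc cache.BlobCache, key, chunkDigest string, data []byte) error {
	// With ChunkDigest set, the directory cache keys its LRU entries by
	// "digest:<digest>" and can hardlink instead of rewriting identical chunks.
	w, err := bc.Add(key, cache.ChunkDigest(chunkDigest))
	if err != nil {
		return err
	}
	if _, err := w.Write(data); err != nil {
		w.Abort()
		w.Close()
		return err
	}
	if err := w.Commit(); err != nil {
		w.Close()
		return err
	}
	w.Close()

	// The same digest on Get resolves to the canonical file when one exists.
	r, err := bc.Get(key, cache.ChunkDigest(chunkDigest))
	if err != nil {
		return err
	}
	defer r.Close()
	buf := make([]byte, len(data))
	_, err = r.ReadAt(buf, 0)
	return err
}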

cmd/containerd-stargz-grpc/main.go

Lines changed: 13 additions & 0 deletions
@@ -36,7 +36,9 @@ import (
 	"github.com/containerd/containerd/v2/pkg/sys"
 	"github.com/containerd/log"
 	"github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/fsopts"
+	"github.com/containerd/stargz-snapshotter/fs"
 	"github.com/containerd/stargz-snapshotter/fusemanager"
+	"github.com/containerd/stargz-snapshotter/hardlink"
 	"github.com/containerd/stargz-snapshotter/service"
 	"github.com/containerd/stargz-snapshotter/service/keychain/keychainconfig"
 	snbase "github.com/containerd/stargz-snapshotter/snapshot"
@@ -135,6 +137,14 @@ func main() {
 		log.G(ctx).WithError(err).Fatalf("snapshotter is not supported")
 	}
 
+	var hlm *hardlink.Manager
+	if config.HardlinkRoot != "" {
+		hlm, err = hardlink.New(config.HardlinkRoot)
+		if err != nil {
+			log.G(ctx).WithError(err).Fatalf("failed to init hardlink manager")
+		}
+	}
+
 	// Create a gRPC server
 	rpc := grpc.NewServer()
 
@@ -252,6 +262,9 @@ func main() {
 	if err != nil {
 		log.G(ctx).WithError(err).Fatalf("failed to configure fs config")
 	}
+	if hlm != nil {
+		fsOpts = append(fsOpts, fs.WithHardlinkManager(hlm))
+	}
 
 	rs, err = service.NewStargzSnapshotterService(ctx, *rootDir, &config.Config,
 		service.WithCredsFuncs(credsFuncs...), service.WithFilesystemOptions(fsOpts...))
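For readers skimming the diff, here is a condensed sketch of the wiring above: the manager is only created when HardlinkRoot is configured, and it reaches the filesystem layer through fs.WithHardlinkManager. Config loading and the snapshotter service setup are elided, the helper name is invented for illustration, and the fs.Option type is assumed to be the one accepted by WithFilesystemOptions.

package example

import (
	"fmt"

	"github.com/containerd/stargz-snapshotter/fs"
	"github.com/containerd/stargz-snapshotter/hardlink"
)

// buildHardlinkFSOpts is a hypothetical helper mirroring the main.go changes above.
func buildHardlinkFSOpts(hardlinkRoot string) ([]fs.Option, error) {
	var fsOpts []fs.Option
	if hardlinkRoot == "" {
		return fsOpts, nil // hardlinking stays disabled without a configured root
	}
	hlm, err := hardlink.New(hardlinkRoot)
	if err != nil {
		return nil, fmt.Errorf("failed to init hardlink manager: %w", err)
	}
	fsOpts = append(fsOpts, fs.WithHardlinkManager(hlm))
	return fsOpts, nil
}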
