@@ -18,13 +18,14 @@ package cache
1818
1919import (
2020 "bytes"
21- "errors"
2221 "fmt"
2322 "io"
2423 "os"
2524 "path/filepath"
2625 "sync"
2726
27+ "github.com/containerd/log"
28+ "github.com/containerd/stargz-snapshotter/hardlink"
2829 "github.com/containerd/stargz-snapshotter/util/cacheutil"
2930 "github.com/containerd/stargz-snapshotter/util/namedmutex"
3031 "golang.org/x/sys/unix"
@@ -33,6 +34,10 @@ import (
// Default limits for the directory cache.
// NOTE(review): the code that consumes these defaults is outside this view;
// presumably they bound the in-memory LRU entries and the cached open file
// descriptors respectively — confirm against NewDirectoryCache.
const (
	defaultMaxLRUCacheEntry = 10
	defaultMaxCacheFds      = 10
)

// Namespaces prepended to in-process cache keys so that an entry addressed by
// its raw key can never collide with an entry addressed by its chunk digest.
const (
	cacheKeyPrefixRaw    = "raw:"
	cacheKeyPrefixDigest = "digest:"
)
3742
3843type DirectoryCacheConfig struct {
@@ -65,6 +70,9 @@ type DirectoryCacheConfig struct {
6570
6671 // FadvDontNeed forcefully clean fscache pagecache for saving memory.
6772 FadvDontNeed bool
73+
74+ // EnableHardlink enables hardlinking of cache files to reduce memory usage
75+ EnableHardlink bool
6876}
6977
7078// TODO: contents validation.
@@ -103,6 +111,7 @@ type Writer interface {
// cacheOpt holds the per-call settings that Option functions apply to a single
// Get or Add call.
type cacheOpt struct {
	// direct, when true, makes the call bypass the in-memory cache so that
	// data which won't be accessed again soon does not pollute it.
	direct bool

	// passThrough is presumably set by the PassThrough option.
	// NOTE(review): its consumer is not visible in this chunk — confirm.
	passThrough bool

	// chunkDigest is the digest supplied through the ChunkDigest option. When
	// it is non-empty and the hardlink manager is enabled, cache lookups are
	// keyed by this digest instead of the raw key.
	chunkDigest string
}
107116
108117type Option func (o * cacheOpt ) * cacheOpt
@@ -127,6 +136,14 @@ func PassThrough() Option {
127136 }
128137}
129138
139+ // ChunkDigest option allows specifying a chunk digest for the cache
140+ func ChunkDigest (digest string ) Option {
141+ return func (o * cacheOpt ) * cacheOpt {
142+ o .chunkDigest = digest
143+ return o
144+ }
145+ }
146+
130147func NewDirectoryCache (directory string , config DirectoryCacheConfig ) (BlobCache , error ) {
131148 if ! filepath .IsAbs (directory ) {
132149 return nil , fmt .Errorf ("dir cache path must be an absolute path; got %q" , directory )
@@ -178,8 +195,14 @@ func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache
178195 bufPool : bufPool ,
179196 direct : config .Direct ,
180197 fadvDontNeed : config .FadvDontNeed ,
198+ syncAdd : config .SyncAdd ,
199+ }
200+
201+ // Initialize hardlink manager if enabled
202+ if config .EnableHardlink {
203+ dc .hlManager = hardlink .GetGlobalManager ()
181204 }
182- dc . syncAdd = config . SyncAdd
205+
183206 return dc , nil
184207}
185208
@@ -199,6 +222,8 @@ type directoryCache struct {
199222
200223 closed bool
201224 closedMu sync.Mutex
225+
226+ hlManager * hardlink.HardlinkManager
202227}
203228
204229func (dc * directoryCache ) Get (key string , opts ... Option ) (Reader , error ) {
@@ -211,9 +236,15 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
211236 opt = o (opt )
212237 }
213238
239+ // Try to get from memory cache
214240 if ! dc .direct && ! opt .direct {
215- // Get data from memory
216- if b , done , ok := dc .cache .Get (key ); ok {
241+ // Try memory cache for digest or key, with namespaced keys to avoid collisions
242+ cacheKey := cacheKeyPrefixRaw + key
243+ if shouldUseDigestCacheKey (dc .hlManager , opt .chunkDigest ) {
244+ cacheKey = cacheKeyPrefixDigest + opt .chunkDigest
245+ }
246+
247+ if b , done , ok := dc .cache .Get (cacheKey ); ok {
217248 return & reader {
218249 ReaderAt : bytes .NewReader (b .(* bytes.Buffer ).Bytes ()),
219250 closeFunc : func () error {
@@ -223,8 +254,8 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
223254 }, nil
224255 }
225256
226- // Get data from disk. If the file is already opened, use it.
227- if f , done , ok := dc .fileCache .Get (key ); ok {
257+ // Get data from file cache for digest or key
258+ if f , done , ok := dc .fileCache .Get (cacheKey ); ok {
228259 return & reader {
229260 ReaderAt : f .(* os.File ),
230261 closeFunc : func () error {
@@ -235,10 +266,20 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
235266 }
236267 }
237268
269+ // First try regular file path
270+ filepath := buildCachePath (dc .directory , key )
271+
272+ if shouldUseDigestCacheKey (dc .hlManager , opt .chunkDigest ) {
273+ if digestPath , exists := dc .hlManager .ProcessCacheGet (key , opt .chunkDigest , opt .direct ); exists {
274+ log .L .Debugf ("Using existing file for digest %q instead of key %q" , opt .chunkDigest , key )
275+ filepath = digestPath
276+ }
277+ }
278+
238279 // Open the cache file and read the target region
239280 // TODO: If the target cache is write-in-progress, should we wait for the completion
240281 // or simply report the cache miss?
241- file , err := os .Open (dc . cachePath ( key ) )
282+ file , err := os .Open (filepath )
242283 if err != nil {
243284 return nil , fmt .Errorf ("failed to open blob file for %q: %w" , key , err )
244285 }
@@ -273,7 +314,12 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
273314 return & reader {
274315 ReaderAt : file ,
275316 closeFunc : func () error {
276- _ , done , added := dc .fileCache .Add (key , file )
317+ cacheKey := cacheKeyPrefixRaw + key
318+ if shouldUseDigestCacheKey (dc .hlManager , opt .chunkDigest ) {
319+ cacheKey = cacheKeyPrefixDigest + opt .chunkDigest
320+ }
321+
322+ _ , done , added := dc .fileCache .Add (cacheKey , file )
277323 defer done () // Release it immediately. Cleaned up on eviction.
278324 if ! added {
279325 return file .Close () // file already exists in the cache. close it.
@@ -293,88 +339,78 @@ func (dc *directoryCache) Add(key string, opts ...Option) (Writer, error) {
293339 opt = o (opt )
294340 }
295341
296- wip , err := dc .wipFile (key )
342+ // If hardlink manager exists and digest is provided, check if a hardlink can be created
343+ if dc .hlManager != nil && opt .chunkDigest != "" {
344+ keyPath := buildCachePath (dc .directory , key )
345+
346+ err := dc .hlManager .ProcessCacheAdd (key , opt .chunkDigest , keyPath )
347+ if err == nil {
348+ return & writer {
349+ WriteCloser : nopWriteCloser (io .Discard ),
350+ commitFunc : func () error { return nil },
351+ abortFunc : func () error { return nil },
352+ }, nil
353+ }
354+ }
355+
356+ // Create temporary file
357+ w , err := wipFile (dc .wipDirectory , key )
297358 if err != nil {
298359 return nil , err
299360 }
300- w := & writer {
301- WriteCloser : wip ,
361+
362+ // Create writer
363+ writer := & writer {
364+ WriteCloser : w ,
302365 commitFunc : func () error {
303366 if dc .isClosed () {
304367 return fmt .Errorf ("cache is already closed" )
305368 }
306- // Commit the cache contents
307- c := dc .cachePath (key )
308- if err := os .MkdirAll (filepath .Dir (c ), os .ModePerm ); err != nil {
309- var errs []error
310- if err := os .Remove (wip .Name ()); err != nil {
311- errs = append (errs , err )
312- }
313- errs = append (errs , fmt .Errorf ("failed to create cache directory %q: %w" , c , err ))
314- return errors .Join (errs ... )
369+
370+ // Commit file
371+ targetPath := buildCachePath (dc .directory , key )
372+ if err := os .MkdirAll (filepath .Dir (targetPath ), 0700 ); err != nil {
373+ return fmt .Errorf ("failed to create cache directory: %w" , err )
315374 }
316375
317376 if dc .fadvDontNeed {
318- if err := dropFilePageCache (wip ); err != nil {
377+ if err := dropFilePageCache (w ); err != nil {
319378 fmt .Printf ("Warning: failed to drop page cache: %v\n " , err )
320379 }
321380 }
322381
323- return os .Rename (wip .Name (), c )
382+ if err := os .Rename (w .Name (), targetPath ); err != nil {
383+ return fmt .Errorf ("failed to commit cache file: %w" , err )
384+ }
385+
386+ if shouldUseDigestCacheKey (dc .hlManager , opt .chunkDigest ) {
387+ if err := dc .hlManager .RegisterDigestFile (opt .chunkDigest , targetPath ); err != nil {
388+ return fmt .Errorf ("failed to register digest file: %w" , err )
389+ }
390+
391+ internalKey := dc .hlManager .GenerateInternalKey (dc .directory , key )
392+ if err := dc .hlManager .MapKeyToDigest (internalKey , opt .chunkDigest ); err != nil {
393+ return fmt .Errorf ("failed to map key to digest: %w" , err )
394+ }
395+ }
396+
397+ return nil
324398 },
325399 abortFunc : func () error {
326- return os .Remove (wip .Name ())
400+ return os .Remove (w .Name ())
327401 },
328402 }
329403
330404 // If "direct" option is specified, do not cache the passed data on memory.
331405 // This option is useful for preventing memory cache from being polluted by data
332406 // that won't be accessed immediately.
333407 if dc .direct || opt .direct {
334- return w , nil
408+ return writer , nil
335409 }
336410
411+ // Create memory cache
337412 b := dc .bufPool .Get ().(* bytes.Buffer )
338- memW := & writer {
339- WriteCloser : nopWriteCloser (io .Writer (b )),
340- commitFunc : func () error {
341- if dc .isClosed () {
342- w .Close ()
343- return fmt .Errorf ("cache is already closed" )
344- }
345- cached , done , added := dc .cache .Add (key , b )
346- if ! added {
347- dc .putBuffer (b ) // already exists in the cache. abort it.
348- }
349- commit := func () error {
350- defer done ()
351- defer w .Close ()
352- n , err := w .Write (cached .(* bytes.Buffer ).Bytes ())
353- if err != nil || n != cached .(* bytes.Buffer ).Len () {
354- w .Abort ()
355- return err
356- }
357- return w .Commit ()
358- }
359- if dc .syncAdd {
360- return commit ()
361- }
362- go func () {
363- if err := commit (); err != nil {
364- fmt .Println ("failed to commit to file:" , err )
365- }
366- }()
367- return nil
368- },
369- abortFunc : func () error {
370- defer w .Close ()
371- defer w .Abort ()
372- dc .putBuffer (b ) // abort it.
373- return nil
374- },
375- }
376-
377- return memW , nil
413+ return dc .wrapMemoryWriter (b , writer , key )
378414}
379415
380416func (dc * directoryCache ) putBuffer (b * bytes.Buffer ) {
@@ -399,14 +435,6 @@ func (dc *directoryCache) isClosed() bool {
399435 return closed
400436}
401437
402- func (dc * directoryCache ) cachePath (key string ) string {
403- return filepath .Join (dc .directory , key [:2 ], key )
404- }
405-
406- func (dc * directoryCache ) wipFile (key string ) (* os.File , error ) {
407- return os .CreateTemp (dc .wipDirectory , key + "-*" )
408- }
409-
410438func NewMemoryCache () BlobCache {
411439 return & MemoryCache {
412440 Membuf : map [string ]* bytes.Buffer {},
@@ -495,3 +523,68 @@ func dropFilePageCache(file *os.File) error {
495523 }
496524 return nil
497525}
526+
527+ // wrapMemoryWriter wraps a writer with memory caching
528+ func (dc * directoryCache ) wrapMemoryWriter (b * bytes.Buffer , w * writer , key string ) (Writer , error ) {
529+ return & writer {
530+ WriteCloser : nopWriteCloser (b ),
531+ commitFunc : func () error {
532+ if dc .isClosed () {
533+ w .Close ()
534+ return fmt .Errorf ("cache is already closed" )
535+ }
536+
537+ cached , done , added := dc .cache .Add (key , b )
538+ if ! added {
539+ dc .putBuffer (b )
540+ }
541+
542+ commit := func () error {
543+ defer done ()
544+ defer w .Close ()
545+
546+ n , err := w .Write (cached .(* bytes.Buffer ).Bytes ())
547+ if err != nil || n != cached .(* bytes.Buffer ).Len () {
548+ w .Abort ()
549+ return err
550+ }
551+ return w .Commit ()
552+ }
553+
554+ if dc .syncAdd {
555+ return commit ()
556+ }
557+
558+ go func () {
559+ if err := commit (); err != nil {
560+ log .L .Infof ("failed to commit to file: %v" , err )
561+ }
562+ }()
563+ return nil
564+ },
565+ abortFunc : func () error {
566+ defer w .Close ()
567+ defer w .Abort ()
568+ dc .putBuffer (b )
569+ return nil
570+ },
571+ }, nil
572+ }
573+
574+ // shouldUseDigestCacheKey determines whether to use the digest as the cache key.
575+ // Returns true only if the hardlink manager exists, is enabled, and chunkDigest is not empty.
576+ func shouldUseDigestCacheKey (hlManager * hardlink.HardlinkManager , chunkDigest string ) bool {
577+ return hlManager != nil && hlManager .IsEnabled () && chunkDigest != ""
578+ }
579+
// buildCachePath returns the on-disk location of the cache entry for key under
// directory. Entries are sharded into subdirectories named after the first two
// bytes of the key, i.e. <directory>/<key[:2]>/<key>.
//
// Keys shorter than two bytes use the whole key as the shard name instead of
// panicking on an out-of-range slice. An empty key yields directory itself,
// because filepath.Join ignores empty elements.
func buildCachePath(directory string, key string) string {
	shard := key
	if len(key) > 2 {
		shard = key[:2]
	}
	return filepath.Join(directory, shard, key)
}
583+
// wipFile creates a work-in-progress temporary file for key inside
// wipDirectory, creating the directory (mode 0700) first if it does not exist.
// The file name starts with key followed by "-" and a random suffix chosen by
// os.CreateTemp. The caller owns the returned file.
func wipFile(wipDirectory string, key string) (*os.File, error) {
	err := os.MkdirAll(wipDirectory, 0700)
	if err != nil {
		return nil, fmt.Errorf("failed to create wip directory: %w", err)
	}

	pattern := key + "-*"
	return os.CreateTemp(wipDirectory, pattern)
}
0 commit comments