Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 159 additions & 72 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ package cache

import (
"bytes"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sync"

"github.com/containerd/log"
"github.com/containerd/stargz-snapshotter/hardlink"
"github.com/containerd/stargz-snapshotter/util/cacheutil"
"github.com/containerd/stargz-snapshotter/util/namedmutex"
"golang.org/x/sys/unix"
Expand All @@ -33,6 +34,10 @@ import (
const (
defaultMaxLRUCacheEntry = 10
defaultMaxCacheFds = 10

// cache key namespaces to avoid collisions between raw keys and digests
cacheKeyPrefixRaw = "raw:"
cacheKeyPrefixDigest = "digest:"
)

type DirectoryCacheConfig struct {
Expand Down Expand Up @@ -65,6 +70,9 @@ type DirectoryCacheConfig struct {

// FadvDontNeed forcefully clean fscache pagecache for saving memory.
FadvDontNeed bool

// HardlinkManager provides hardlink support when non-nil.
HardlinkManager *hardlink.Manager
}

// TODO: contents validation.
Expand Down Expand Up @@ -103,6 +111,7 @@ type Writer interface {
type cacheOpt struct {
direct bool
passThrough bool
chunkDigest string
}

type Option func(o *cacheOpt) *cacheOpt
Expand All @@ -127,6 +136,14 @@ func PassThrough() Option {
}
}

// ChunkDigest option allows specifying a chunk digest for the cache
func ChunkDigest(digest string) Option {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use digest.Digest

return func(o *cacheOpt) *cacheOpt {
o.chunkDigest = digest
return o
}
}

func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache, error) {
if !filepath.IsAbs(directory) {
return nil, fmt.Errorf("dir cache path must be an absolute path; got %q", directory)
Expand Down Expand Up @@ -178,8 +195,10 @@ func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache
bufPool: bufPool,
direct: config.Direct,
fadvDontNeed: config.FadvDontNeed,
syncAdd: config.SyncAdd,
hlManager: config.HardlinkManager,
}
dc.syncAdd = config.SyncAdd

return dc, nil
}

Expand All @@ -199,6 +218,8 @@ type directoryCache struct {

closed bool
closedMu sync.Mutex

hlManager *hardlink.Manager
}

func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
Expand All @@ -211,9 +232,12 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
opt = o(opt)
}

// Try to get from memory cache
if !dc.direct && !opt.direct {
// Get data from memory
if b, done, ok := dc.cache.Get(key); ok {
// Try memory cache for digest or key, with namespaced keys to avoid collisions
cacheKey := getCacheKey(dc.hlManager, key, opt.chunkDigest)

if b, done, ok := dc.cache.Get(cacheKey); ok {
return &reader{
ReaderAt: bytes.NewReader(b.(*bytes.Buffer).Bytes()),
closeFunc: func() error {
Expand All @@ -223,8 +247,8 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
}, nil
}

// Get data from disk. If the file is already opened, use it.
if f, done, ok := dc.fileCache.Get(key); ok {
// Get data from file cache for digest or key
if f, done, ok := dc.fileCache.Get(cacheKey); ok {
return &reader{
ReaderAt: f.(*os.File),
closeFunc: func() error {
Expand All @@ -235,10 +259,20 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
}
}

// First try regular file path
filepath := buildCachePath(dc.directory, key)

if shouldUseDigestCacheKey(dc.hlManager, opt.chunkDigest) {
if digestPath, exists := dc.hlManager.Get(key, opt.chunkDigest, opt.direct); exists {
log.L.Debugf("Using existing file for digest %q instead of key %q", opt.chunkDigest, key)
filepath = digestPath
}
}

// Open the cache file and read the target region
// TODO: If the target cache is write-in-progress, should we wait for the completion
// or simply report the cache miss?
file, err := os.Open(dc.cachePath(key))
file, err := os.Open(filepath)
if err != nil {
return nil, fmt.Errorf("failed to open blob file for %q: %w", key, err)
}
Expand Down Expand Up @@ -273,7 +307,9 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
return &reader{
ReaderAt: file,
closeFunc: func() error {
_, done, added := dc.fileCache.Add(key, file)
cacheKey := getCacheKey(dc.hlManager, key, opt.chunkDigest)

_, done, added := dc.fileCache.Add(cacheKey, file)
defer done() // Release it immediately. Cleaned up on eviction.
if !added {
return file.Close() // file already exists in the cache. close it.
Expand All @@ -293,88 +329,73 @@ func (dc *directoryCache) Add(key string, opts ...Option) (Writer, error) {
opt = o(opt)
}

wip, err := dc.wipFile(key)
// If hardlink manager exists and digest is provided, check if a hardlink can be created
if dc.hlManager != nil && opt.chunkDigest != "" {
keyPath := buildCachePath(dc.directory, key)

err := dc.hlManager.Add(key, opt.chunkDigest, keyPath)
if err == nil {
return &writer{
WriteCloser: nopWriteCloser(io.Discard),
commitFunc: func() error { return nil },
abortFunc: func() error { return nil },
}, nil
}
}

// Create temporary file
w, err := wipFile(dc.wipDirectory, key)
if err != nil {
return nil, err
}
w := &writer{
WriteCloser: wip,

// Create writer
writer := &writer{
WriteCloser: w,
commitFunc: func() error {
if dc.isClosed() {
return fmt.Errorf("cache is already closed")
}
// Commit the cache contents
c := dc.cachePath(key)
if err := os.MkdirAll(filepath.Dir(c), os.ModePerm); err != nil {
var errs []error
if err := os.Remove(wip.Name()); err != nil {
errs = append(errs, err)
}
errs = append(errs, fmt.Errorf("failed to create cache directory %q: %w", c, err))
return errors.Join(errs...)

// Commit file
targetPath := buildCachePath(dc.directory, key)
if err := os.MkdirAll(filepath.Dir(targetPath), 0700); err != nil {
return fmt.Errorf("failed to create cache directory: %w", err)
}

if dc.fadvDontNeed {
if err := dropFilePageCache(wip); err != nil {
if err := dropFilePageCache(w); err != nil {
fmt.Printf("Warning: failed to drop page cache: %v\n", err)
}
}

return os.Rename(wip.Name(), c)
if err := os.Rename(w.Name(), targetPath); err != nil {
return fmt.Errorf("failed to commit cache file: %w", err)
}

if shouldUseDigestCacheKey(dc.hlManager, opt.chunkDigest) {
if err := dc.hlManager.Enroll(opt.chunkDigest, targetPath, dc.directory, key); err != nil {
return fmt.Errorf("failed to register digest file: %w", err)
}
}

return nil
},
abortFunc: func() error {
return os.Remove(wip.Name())
return os.Remove(w.Name())
},
}

// If "direct" option is specified, do not cache the passed data on memory.
// This option is useful for preventing memory cache from being polluted by data
// that won't be accessed immediately.
if dc.direct || opt.direct {
return w, nil
return writer, nil
}

// Create memory cache
b := dc.bufPool.Get().(*bytes.Buffer)
memW := &writer{
WriteCloser: nopWriteCloser(io.Writer(b)),
commitFunc: func() error {
if dc.isClosed() {
w.Close()
return fmt.Errorf("cache is already closed")
}
cached, done, added := dc.cache.Add(key, b)
if !added {
dc.putBuffer(b) // already exists in the cache. abort it.
}
commit := func() error {
defer done()
defer w.Close()
n, err := w.Write(cached.(*bytes.Buffer).Bytes())
if err != nil || n != cached.(*bytes.Buffer).Len() {
w.Abort()
return err
}
return w.Commit()
}
if dc.syncAdd {
return commit()
}
go func() {
if err := commit(); err != nil {
fmt.Println("failed to commit to file:", err)
}
}()
return nil
},
abortFunc: func() error {
defer w.Close()
defer w.Abort()
dc.putBuffer(b) // abort it.
return nil
},
}

return memW, nil
return dc.wrapMemoryWriter(b, writer, key)
}

func (dc *directoryCache) putBuffer(b *bytes.Buffer) {
Expand All @@ -399,14 +420,6 @@ func (dc *directoryCache) isClosed() bool {
return closed
}

func (dc *directoryCache) cachePath(key string) string {
return filepath.Join(dc.directory, key[:2], key)
}

func (dc *directoryCache) wipFile(key string) (*os.File, error) {
return os.CreateTemp(dc.wipDirectory, key+"-*")
}

func NewMemoryCache() BlobCache {
return &MemoryCache{
Membuf: map[string]*bytes.Buffer{},
Expand Down Expand Up @@ -495,3 +508,77 @@ func dropFilePageCache(file *os.File) error {
}
return nil
}

// wrapMemoryWriter wraps a writer with memory caching: data written by the
// caller accumulates in buffer b, and on Commit it is both inserted into the
// in-memory LRU cache under key and flushed to the underlying file writer w
// (synchronously when dc.syncAdd is set, otherwise in a background goroutine).
// On Abort the buffer is returned to the pool and w is aborted.
func (dc *directoryCache) wrapMemoryWriter(b *bytes.Buffer, w *writer, key string) (Writer, error) {
	return &writer{
		WriteCloser: nopWriteCloser(b),
		commitFunc: func() error {
			if dc.isClosed() {
				w.Close()
				return fmt.Errorf("cache is already closed")
			}

			// Insert the buffered bytes into the memory cache. If an entry
			// for key already exists, our buffer is returned to the pool and
			// the previously cached buffer is what gets flushed below.
			cached, done, added := dc.cache.Add(key, b)
			if !added {
				dc.putBuffer(b)
			}

			// commit flushes the cached bytes through the file-backed writer.
			// done() releases the cache entry reference; w is closed either way.
			commit := func() error {
				defer done()
				defer w.Close()

				n, err := w.Write(cached.(*bytes.Buffer).Bytes())
				if err != nil || n != cached.(*bytes.Buffer).Len() {
					// NOTE(review): on a short write with a nil error this
					// aborts the file but still returns nil, so the caller
					// sees success — confirm this is intended.
					w.Abort()
					return err
				}
				return w.Commit()
			}

			if dc.syncAdd {
				return commit()
			}

			// Async path: persist in the background. Failures are only
			// logged; the data is already being served from memory.
			go func() {
				if err := commit(); err != nil {
					log.L.Infof("failed to commit to file: %v", err)
				}
			}()
			return nil
		},
		abortFunc: func() error {
			defer w.Close()
			defer w.Abort()
			dc.putBuffer(b)
			return nil
		},
	}, nil
}

// shouldUseDigestCacheKey reports whether digest-based cache keys should be
// used: a hardlink manager must be configured and a non-empty chunk digest
// must have been supplied.
func shouldUseDigestCacheKey(hlManager *hardlink.Manager, chunkDigest string) bool {
	if hlManager == nil {
		return false
	}
	return chunkDigest != ""
}

// getCacheKey returns the namespaced key under which data for key/chunkDigest
// is stored in the in-memory and fd caches. When digest-based caching applies
// (hardlink manager present and digest non-empty) the digest is used;
// otherwise the raw key is used. The distinct prefixes keep the two key
// namespaces from colliding.
func getCacheKey(hlManager *hardlink.Manager, key string, chunkDigest string) string {
	if !shouldUseDigestCacheKey(hlManager, chunkDigest) {
		return cacheKeyPrefixRaw + key
	}
	return cacheKeyPrefixDigest + chunkDigest
}

func buildCachePath(directory string, key string) string {
return filepath.Join(directory, key[:2], key)
}

// WipFile creates a temporary file in the given directory with the given key pattern
func wipFile(wipDirectory string, key string) (*os.File, error) {
if err := os.MkdirAll(wipDirectory, 0700); err != nil {
return nil, fmt.Errorf("failed to create wip directory: %w", err)
}
return os.CreateTemp(wipDirectory, key+"-*")
}
13 changes: 13 additions & 0 deletions cmd/containerd-stargz-grpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ import (
"github.com/containerd/containerd/v2/pkg/sys"
"github.com/containerd/log"
"github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/fsopts"
"github.com/containerd/stargz-snapshotter/fs"
"github.com/containerd/stargz-snapshotter/fusemanager"
"github.com/containerd/stargz-snapshotter/hardlink"
"github.com/containerd/stargz-snapshotter/service"
"github.com/containerd/stargz-snapshotter/service/keychain/keychainconfig"
snbase "github.com/containerd/stargz-snapshotter/snapshot"
Expand Down Expand Up @@ -135,6 +137,14 @@ func main() {
log.G(ctx).WithError(err).Fatalf("snapshotter is not supported")
}

var hlm *hardlink.Manager
if config.HardlinkRoot != "" {
hlm, err = hardlink.New(config.HardlinkRoot)
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to init hardlink manager")
}
}

// Create a gRPC server
rpc := grpc.NewServer()

Expand Down Expand Up @@ -252,6 +262,9 @@ func main() {
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to configure fs config")
}
if hlm != nil {
fsOpts = append(fsOpts, fs.WithHardlinkManager(hlm))
}

rs, err = service.NewStargzSnapshotterService(ctx, *rootDir, &config.Config,
service.WithCredsFuncs(credsFuncs...), service.WithFilesystemOptions(fsOpts...))
Expand Down
Loading
Loading