Skip to content

Commit 9d0c3bc

Browse files
author
abushwang
committed
fix fscache not cleanup
Signed-off-by: abushwang <[email protected]>
1 parent 9ef808e commit 9d0c3bc

File tree

6 files changed

+56
-16
lines changed

6 files changed

+56
-16
lines changed

cache/cache.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
214214
return &reader{
215215
ReaderAt: bytes.NewReader(b.(*bytes.Buffer).Bytes()),
216216
closeFunc: func() error {
217-
done()
217+
done(false)
218218
return nil
219219
},
220220
}, nil
@@ -225,7 +225,7 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
225225
return &reader{
226226
ReaderAt: f.(*os.File),
227227
closeFunc: func() error {
228-
done() // file will be closed when it's evicted from the cache
228+
done(false) // file will be closed when it's evicted from the cache
229229
return nil
230230
},
231231
}, nil
@@ -257,7 +257,7 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
257257
ReaderAt: file,
258258
closeFunc: func() error {
259259
_, done, added := dc.fileCache.Add(key, file)
260-
defer done() // Release it immediately. Cleaned up on eviction.
260+
defer done(false) // Release it immediately. Cleaned up on eviction.
261261
if !added {
262262
return file.Close() // file already exists in the cache. close it.
263263
}
@@ -323,7 +323,7 @@ func (dc *directoryCache) Add(key string, opts ...Option) (Writer, error) {
323323
dc.putBuffer(b) // already exists in the cache. abort it.
324324
}
325325
commit := func() error {
326-
defer done()
326+
defer done(false)
327327
defer w.Close()
328328
n, err := w.Write(cached.(*bytes.Buffer).Bytes())
329329
if err != nil || n != cached.(*bytes.Buffer).Len() {

fs/fs.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1229,7 +1229,9 @@ func (fs *filesystem) Unmount(ctx context.Context, mountpoint string) error {
12291229
// If the mountpoint is an id-mapped layer, it is pointing to the
12301230
// underlying layer, so we cannot call done on it.
12311231
if !isIDMappedDir(mountpoint) {
1232-
l.Done()
1232+
if err := l.Close(); err != nil {
1233+
log.G(ctx).WithError(err).Warn("failed to release resources of the layer")
1234+
}
12331235
}
12341236
fs.layerMu.Unlock()
12351237
fs.metricsController.Remove(mountpoint)

fs/layer/layer.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ type Layer interface {
104104
// Done releases the reference to this layer. The resources related to this layer will be
105105
// discarded sooner or later. Queries after calling this function won't be serviced.
106106
Done()
107+
108+
// Close is the same as Done, but evicts resources related to this layer immediately.
109+
// This is used for cleaning up resources on unmount.
110+
Close() error
107111
}
108112

109113
// Info is the current status of a layer.
@@ -247,7 +251,7 @@ func (r *Resolver) Resolve(ctx context.Context, hosts []docker.RegistryHost, ref
247251
return &layerRef{l, done}, nil
248252
}
249253
// Cached layer is invalid
250-
done()
254+
done(true)
251255
r.layerCacheMu.Lock()
252256
r.layerCache.Remove(name)
253257
r.layerCacheMu.Unlock()
@@ -262,7 +266,7 @@ func (r *Resolver) Resolve(ctx context.Context, hosts []docker.RegistryHost, ref
262266
}
263267
defer func() {
264268
if retErr != nil {
265-
blobR.done()
269+
blobR.done(true)
266270
}
267271
}()
268272

@@ -364,7 +368,7 @@ func (r *Resolver) resolveBlob(ctx context.Context, hosts []docker.RegistryHost,
364368
return &blobRef{blob, done}, nil
365369
}
366370
// invalid blob. discard this.
367-
done()
371+
done(true)
368372
r.blobCacheMu.Lock()
369373
r.blobCache.Remove(name)
370374
r.blobCacheMu.Unlock()
@@ -454,7 +458,12 @@ func (l *layer) Refresh(ctx context.Context, hosts []docker.RegistryHost, refspe
454458
}
455459

456460
func (l *layerRef) Done() {
457-
l.done()
461+
l.done(false)
462+
}
463+
464+
func (l *layerRef) Close() error {
465+
l.done(true)
466+
return nil
458467
}
459468

460469
func (l *layer) RootNode(baseInode uint32, idMapper idtools.IDMap) (fusefs.InodeEmbedder, error) {
@@ -482,7 +491,7 @@ func (l *layer) close() error {
482491
if l.bgResolver != nil {
483492
l.bgResolver.Close()
484493
}
485-
defer l.blob.done() // Close reader first, then close the blob
494+
defer l.blob.done(true) // Close reader first, then close the blob
486495
return l.r.Close()
487496
}
488497

@@ -511,15 +520,15 @@ func getDisableXAttrAnnotation(desc ocispec.Descriptor) bool {
511520
// to this blob will be discarded.
512521
type blobRef struct {
513522
remote.Blob
514-
done func()
523+
done func(bool)
515524
}
516525

517526
// layerRef is a reference to the layer in the cache. Calling `Done` or `done` decreases the
518527
// reference counter of this blob in the underlying cache. When nobody refers to the layer in the
519528
// cache, resources bound to this layer will be discarded.
520529
type layerRef struct {
521530
*layer
522-
done func()
531+
done func(bool)
523532
}
524533

525534
type readerAtFunc func([]byte, int64) (int, error)

fs/reader/reader.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ func (gr *reader) Close() (retErr error) {
128128
return nil
129129
}
130130
gr.closed = true
131+
if gr.spanManager != nil {
132+
gr.spanManager.Close()
133+
}
131134
if err := gr.r.Close(); err != nil {
132135
retErr = errors.Join(retErr, err)
133136
}

snapshot/snapshot.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1117,6 +1117,26 @@ func (o *snapshotter) restoreRemoteSnapshot(ctx context.Context) error {
11171117
return err
11181118
}
11191119
for _, info := range task {
1120+
if err := func() error {
1121+
ctx, t, err := o.ms.TransactionContext(ctx, false)
1122+
if err != nil {
1123+
return err
1124+
}
1125+
defer t.Rollback()
1126+
id, _, _, err := storage.GetInfo(ctx, info.Name)
1127+
if err != nil {
1128+
return err
1129+
}
1130+
if err := os.Mkdir(filepath.Join(o.root, "snapshots", id), 0700); err != nil && !os.IsExist(err) {
1131+
return err
1132+
}
1133+
if err := os.Mkdir(o.upperPath(id), 0755); err != nil && !os.IsExist(err) {
1134+
return err
1135+
}
1136+
return nil
1137+
}(); err != nil {
1138+
return fmt.Errorf("failed to create remote snapshot directory: %s: %w", info.Name, err)
1139+
}
11201140
ns, ok := info.Labels[source.TargetNamespace]
11211141
if !ok {
11221142
return ErrNoNamespace

util/lrucache/lrucache.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func New(maxEntries int) *Cache {
6767
// Get retrieves the specified object from the cache and increments the reference counter of the
6868
// target content. Client must call `done` callback to decrease the reference count when the value
6969
// will no longer be used.
70-
func (c *Cache) Get(key string) (value interface{}, done func(), ok bool) {
70+
func (c *Cache) Get(key string) (value interface{}, done func(bool), ok bool) {
7171
c.mu.Lock()
7272
defer c.mu.Unlock()
7373
o, ok := c.cache.Get(key)
@@ -83,7 +83,7 @@ func (c *Cache) Get(key string) (value interface{}, done func(), ok bool) {
8383
// If the specified content already exists in the cache, this sets `added` to false and returns
8484
// "already cached" content (i.e. doesn't replace the content with the new one). Client must call
8585
// `done` callback to decrease the counter when the value will no longer be used.
86-
func (c *Cache) Add(key string, value interface{}) (cachedValue interface{}, done func(), added bool) {
86+
func (c *Cache) Add(key string, value interface{}) (cachedValue interface{}, done func(bool), added bool) {
8787
c.mu.Lock()
8888
defer c.mu.Unlock()
8989
if o, ok := c.cache.Get(key); ok {
@@ -110,12 +110,18 @@ func (c *Cache) Remove(key string) {
110110
c.cache.Remove(key)
111111
}
112112

113-
func (c *Cache) decreaseOnceFunc(rc *refCounter) func() {
113+
func (c *Cache) decreaseOnceFunc(rc *refCounter) func(bool) {
114114
var once sync.Once
115-
return func() {
115+
return func(evict bool) {
116116
c.mu.Lock()
117117
defer c.mu.Unlock()
118118
once.Do(func() { rc.dec() })
119+
if evict {
120+
rc.finalize()
121+
if cached, ok := c.cache.Get(rc.key); ok && cached == rc {
122+
c.cache.Remove(rc.key)
123+
}
124+
}
119125
}
120126
}
121127

0 commit comments

Comments
 (0)