Skip to content

Commit ee35c36

Browse files
author
abushwang
committed
fix fscache not cleanup
Signed-off-by: abushwang <[email protected]>
1 parent 9ef808e commit ee35c36

File tree

9 files changed

+75
-26
lines changed

9 files changed

+75
-26
lines changed

cache/cache.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
214214
return &reader{
215215
ReaderAt: bytes.NewReader(b.(*bytes.Buffer).Bytes()),
216216
closeFunc: func() error {
217-
done()
217+
done(false)
218218
return nil
219219
},
220220
}, nil
@@ -225,7 +225,7 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
225225
return &reader{
226226
ReaderAt: f.(*os.File),
227227
closeFunc: func() error {
228-
done() // file will be closed when it's evicted from the cache
228+
done(false) // file will be closed when it's evicted from the cache
229229
return nil
230230
},
231231
}, nil
@@ -257,7 +257,7 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
257257
ReaderAt: file,
258258
closeFunc: func() error {
259259
_, done, added := dc.fileCache.Add(key, file)
260-
defer done() // Release it immediately. Cleaned up on eviction.
260+
defer done(false) // Release it immediately. Cleaned up on eviction.
261261
if !added {
262262
return file.Close() // file already exists in the cache. close it.
263263
}
@@ -323,7 +323,7 @@ func (dc *directoryCache) Add(key string, opts ...Option) (Writer, error) {
323323
dc.putBuffer(b) // already exists in the cache. abort it.
324324
}
325325
commit := func() error {
326-
defer done()
326+
defer done(false)
327327
defer w.Close()
328328
n, err := w.Write(cached.(*bytes.Buffer).Bytes())
329329
if err != nil || n != cached.(*bytes.Buffer).Len() {

fs/fs.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1229,7 +1229,9 @@ func (fs *filesystem) Unmount(ctx context.Context, mountpoint string) error {
12291229
// If the mountpoint is an id-mapped layer, it is pointing to the
12301230
// underlying layer, so we cannot call done on it.
12311231
if !isIDMappedDir(mountpoint) {
1232-
l.Done()
1232+
if err := l.Close(); err != nil {
1233+
log.G(ctx).WithError(err).Warn("failed to release resources of the layer")
1234+
}
12331235
}
12341236
fs.layerMu.Unlock()
12351237
fs.metricsController.Remove(mountpoint)

fs/fs_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,4 +106,5 @@ func (l *breakableLayer) Refresh(ctx context.Context, hosts []docker.RegistryHos
106106
}
107107
return nil
108108
}
109-
func (l *breakableLayer) Done() {}
109+
func (l *breakableLayer) Done() {}
110+
func (l *breakableLayer) Close() error { return nil }

fs/layer/layer.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ type Layer interface {
104104
// Done releases the reference to this layer. The resources related to this layer will be
105105
// discarded sooner or later. Queries after calling this function won't be serviced.
106106
Done()
107+
108+
// Close is the same as Done, but evicts resources related to this layer immediately.
109+
// This is used for cleaning up resources on unmount.
110+
Close() error
107111
}
108112

109113
// Info is the current status of a layer.
@@ -247,7 +251,7 @@ func (r *Resolver) Resolve(ctx context.Context, hosts []docker.RegistryHost, ref
247251
return &layerRef{l, done}, nil
248252
}
249253
// Cached layer is invalid
250-
done()
254+
done(true)
251255
r.layerCacheMu.Lock()
252256
r.layerCache.Remove(name)
253257
r.layerCacheMu.Unlock()
@@ -262,7 +266,7 @@ func (r *Resolver) Resolve(ctx context.Context, hosts []docker.RegistryHost, ref
262266
}
263267
defer func() {
264268
if retErr != nil {
265-
blobR.done()
269+
blobR.done(true)
266270
}
267271
}()
268272

@@ -364,7 +368,7 @@ func (r *Resolver) resolveBlob(ctx context.Context, hosts []docker.RegistryHost,
364368
return &blobRef{blob, done}, nil
365369
}
366370
// invalid blob. discard this.
367-
done()
371+
done(true)
368372
r.blobCacheMu.Lock()
369373
r.blobCache.Remove(name)
370374
r.blobCacheMu.Unlock()
@@ -454,7 +458,12 @@ func (l *layer) Refresh(ctx context.Context, hosts []docker.RegistryHost, refspe
454458
}
455459

456460
func (l *layerRef) Done() {
457-
l.done()
461+
l.done(false)
462+
}
463+
464+
func (l *layerRef) Close() error {
465+
l.done(true)
466+
return nil
458467
}
459468

460469
func (l *layer) RootNode(baseInode uint32, idMapper idtools.IDMap) (fusefs.InodeEmbedder, error) {
@@ -482,7 +491,7 @@ func (l *layer) close() error {
482491
if l.bgResolver != nil {
483492
l.bgResolver.Close()
484493
}
485-
defer l.blob.done() // Close reader first, then close the blob
494+
defer l.blob.done(true) // Close reader first, then close the blob
486495
return l.r.Close()
487496
}
488497

@@ -511,15 +520,15 @@ func getDisableXAttrAnnotation(desc ocispec.Descriptor) bool {
511520
// to this blob will be discarded.
512521
type blobRef struct {
513522
remote.Blob
514-
done func()
523+
done func(bool)
515524
}
516525

517526
// layerRef is a reference to the layer in the cache. Calling `Done` or `done` decreases the
518527
// reference counter of this blob in the underlying cache. When nobody refers to the layer in the
519528
// cache, resources bound to this layer will be discarded.
520529
type layerRef struct {
521530
*layer
522-
done func()
531+
done func(bool)
523532
}
524533

525534
type readerAtFunc func([]byte, int64) (int, error)

fs/reader/reader.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ func (gr *reader) Close() (retErr error) {
128128
return nil
129129
}
130130
gr.closed = true
131+
if gr.spanManager != nil {
132+
gr.spanManager.Close()
133+
}
131134
if err := gr.r.Close(); err != nil {
132135
retErr = errors.Join(retErr, err)
133136
}

fs/span-manager/span_manager.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"fmt"
2424
"io"
2525
"runtime"
26+
"sync"
2627

2728
"github.com/awslabs/soci-snapshotter/cache"
2829
"github.com/awslabs/soci-snapshotter/util/ioutils"
@@ -49,6 +50,7 @@ type SpanManager struct {
4950
spans []*span
5051
ztoc *ztoc.Ztoc
5152
maxSpanVerificationFailureRetries int
53+
closeOnce sync.Once
5254
}
5355

5456
type spanInfo struct {
@@ -430,6 +432,12 @@ func (m *SpanManager) verifySpanContents(compressedData []byte, spanID compressi
430432

431433
// Close closes both the underlying zinfo data and blob cache.
432434
func (m *SpanManager) Close() {
433-
m.zinfo.Close()
434-
m.cache.Close()
435+
m.closeOnce.Do(func() {
436+
if m.zinfo != nil {
437+
m.zinfo.Close()
438+
}
439+
if m.cache != nil {
440+
m.cache.Close()
441+
}
442+
})
435443
}

snapshot/snapshot.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1117,6 +1117,26 @@ func (o *snapshotter) restoreRemoteSnapshot(ctx context.Context) error {
11171117
return err
11181118
}
11191119
for _, info := range task {
1120+
if err := func() error {
1121+
ctx, t, err := o.ms.TransactionContext(ctx, false)
1122+
if err != nil {
1123+
return err
1124+
}
1125+
defer t.Rollback()
1126+
id, _, _, err := storage.GetInfo(ctx, info.Name)
1127+
if err != nil {
1128+
return err
1129+
}
1130+
if err := os.Mkdir(filepath.Join(o.root, "snapshots", id), 0700); err != nil && !os.IsExist(err) {
1131+
return err
1132+
}
1133+
if err := os.Mkdir(o.upperPath(id), 0755); err != nil && !os.IsExist(err) {
1134+
return err
1135+
}
1136+
return nil
1137+
}(); err != nil {
1138+
return fmt.Errorf("failed to create remote snapshot directory: %s: %w", info.Name, err)
1139+
}
11201140
ns, ok := info.Labels[source.TargetNamespace]
11211141
if !ok {
11221142
return ErrNoNamespace

util/lrucache/lrucache.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func New(maxEntries int) *Cache {
6767
// Get retrieves the specified object from the cache and increments the reference counter of the
6868
// target content. Client must call `done` callback to decrease the reference count when the value
6969
// will no longer be used.
70-
func (c *Cache) Get(key string) (value interface{}, done func(), ok bool) {
70+
func (c *Cache) Get(key string) (value interface{}, done func(bool), ok bool) {
7171
c.mu.Lock()
7272
defer c.mu.Unlock()
7373
o, ok := c.cache.Get(key)
@@ -83,7 +83,7 @@ func (c *Cache) Get(key string) (value interface{}, done func(), ok bool) {
8383
// If the specified content already exists in the cache, this sets `added` to false and returns
8484
// "already cached" content (i.e. doesn't replace the content with the new one). Client must call
8585
// `done` callback to decrease the counter when the value will no longer be used.
86-
func (c *Cache) Add(key string, value interface{}) (cachedValue interface{}, done func(), added bool) {
86+
func (c *Cache) Add(key string, value interface{}) (cachedValue interface{}, done func(bool), added bool) {
8787
c.mu.Lock()
8888
defer c.mu.Unlock()
8989
if o, ok := c.cache.Get(key); ok {
@@ -110,12 +110,18 @@ func (c *Cache) Remove(key string) {
110110
c.cache.Remove(key)
111111
}
112112

113-
func (c *Cache) decreaseOnceFunc(rc *refCounter) func() {
113+
func (c *Cache) decreaseOnceFunc(rc *refCounter) func(bool) {
114114
var once sync.Once
115-
return func() {
115+
return func(evict bool) {
116116
c.mu.Lock()
117117
defer c.mu.Unlock()
118118
once.Do(func() { rc.dec() })
119+
if evict {
120+
rc.finalize()
121+
if cached, ok := c.cache.Get(rc.key); ok && cached == rc {
122+
c.cache.Remove(rc.key)
123+
}
124+
}
119125
}
120126
}
121127

util/lrucache/lrucache_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,13 +97,13 @@ func TestRemove(t *testing.T) {
9797
return
9898
}
9999

100-
done1()
100+
done1(false)
101101
if len(evicted) != 0 {
102102
t.Errorf("no content must be evicted until all reference are discarded")
103103
return
104104
}
105105

106-
done12()
106+
done12(false)
107107
if len(evicted) != 1 {
108108
t.Errorf("content must be evicted")
109109
return
@@ -138,7 +138,7 @@ func TestEviction(t *testing.T) {
138138
return
139139
}
140140

141-
done1()
141+
done1(false)
142142
if len(evicted) != 1 {
143143
t.Errorf("1 content must be evicted")
144144
return
@@ -148,15 +148,15 @@ func TestEviction(t *testing.T) {
148148
return
149149
}
150150

151-
done2() // effective
152-
done2() // ignored
153-
done2() // ignored
151+
done2(false) // effective
152+
done2(false) // ignored
153+
done2(false) // ignored
154154
if len(evicted) != 1 {
155155
t.Errorf("only 1 content must be evicted")
156156
return
157157
}
158158

159-
done22()
159+
done22(false)
160160
if len(evicted) != 2 {
161161
t.Errorf("2 contents must be evicted")
162162
return

0 commit comments

Comments
 (0)