From 099d5fa8a32fca41bf615a4e8e8cb77b3edbe7b4 Mon Sep 17 00:00:00 2001 From: Hamid Date: Wed, 3 Dec 2025 11:38:07 +0100 Subject: [PATCH 01/16] add localcache --- internal/hopsfsmount/Dir.go | 9 ++ internal/hopsfsmount/File.go | 35 +++++- internal/hopsfsmount/config.go | 10 ++ internal/hopsfsmount/localcache.go | 184 +++++++++++++++++++++++++++++ 4 files changed, 234 insertions(+), 4 deletions(-) create mode 100644 internal/hopsfsmount/localcache.go diff --git a/internal/hopsfsmount/Dir.go b/internal/hopsfsmount/Dir.go index 5581d77..1b8a576 100644 --- a/internal/hopsfsmount/Dir.go +++ b/internal/hopsfsmount/Dir.go @@ -341,6 +341,10 @@ func (dir *DirINode) Remove(ctx context.Context, req *fuse.RemoveRequest) error err := dir.FileSystem.getDFSConnector().Remove(path) if err == nil { dir.removeChildInode(Remove, req.Name) + // Invalidate staging file cache for the removed path + if StagingFileCache != nil { + StagingFileCache.Remove(path) + } logger.Info("Removed path", logger.Fields{Operation: Remove, Path: path}) } else { logger.Warn("Failed to remove path", logger.Fields{Operation: Remove, Path: path, Error: err}) @@ -379,6 +383,11 @@ func (srcParent *DirINode) renameInt(operationName, oldName, newName string, dst return err } + // Invalidate staging file cache for old path (the cached content is now stale) + if StagingFileCache != nil { + StagingFileCache.Remove(oldPath) + } + // disconnect src inode if srcInode != nil { srcParent.removeChildInode(Rename, oldName) diff --git a/internal/hopsfsmount/File.go b/internal/hopsfsmount/File.go index e1c89e1..445b3c8 100644 --- a/internal/hopsfsmount/File.go +++ b/internal/hopsfsmount/File.go @@ -24,9 +24,9 @@ import ( // Lock weight ordering (must be followed to avoid deadlocks): // -// dataMutex (weight 3) - serializes I/O operations (read/write/truncate/flush) -// fileMutex (weight 2) - protects file metadata (Attrs) and serializes Open requests -// fileHandleMutex (weight 1) - protects activeHandles and fileProxy lifecycle +// dataMutex (weight 3) - serializes I/O operations (read/write/truncate/flush) +// fileMutex (weight 2) - protects file metadata (Attrs) and serializes Open requests +// fileHandleMutex (weight 1) - protects activeHandles and fileProxy lifecycle // // Rule: A goroutine holding a lock can only acquire locks with LOWER weight. // Note: dataMutex and fileMutex are independent (never nested with each other). @@ -150,6 +150,15 @@ func (file *FileINode) RemoveHandle(handle *FileHandle) { // Called with fileHandleMutex held by RemoveHandle() func (file *FileINode) closeStaging() { if file.fileProxy != nil { // if not already closed + // If caching is enabled and this is a LocalRWFileProxy, add to cache before closing + if lrwfp, ok := file.fileProxy.(*LocalRWFileProxy); ok && StagingFileCache != nil { + stat, statErr := lrwfp.localFile.Stat() + localPath := lrwfp.localFile.Name() + if statErr == nil && localPath != "" { + StagingFileCache.Put(file.AbsolutePath(), localPath, stat.Size()) + } + } + err := file.fileProxy.Close() if err != nil { logger.Error("Failed to close staging file", file.logInfo(logger.Fields{Operation: Close, Error: err})) @@ -394,12 +403,30 @@ func (file *FileINode) createStagingFile(operation string, existsInDFS bool) (*o } } + // Check if we have a cached staging file for this path + if StagingFileCache != nil { + if cachedPath, ok := StagingFileCache.Get(absPath); ok { + localFile, err := os.OpenFile(cachedPath, os.O_RDWR, 0600) + if err == nil { + logger.Info("Using cached staging file", file.logInfo(logger.Fields{Operation: operation, TmpFile: cachedPath})) + return localFile, nil + } + // Cache entry is invalid (file was deleted externally), remove it + logger.Warn("Cached staging file not accessible, removing from cache", file.logInfo(logger.Fields{Operation: operation, TmpFile: cachedPath, Error: err})) + StagingFileCache.Remove(absPath) + } + } + stagingFile, err := ioutil.TempFile(StagingDir, "stage") if err != nil { logger.Error("Failed to create staging file", file.logInfo(logger.Fields{Operation: operation, Error: err})) return nil, err } - os.Remove(stagingFile.Name()) + // Only unlink the staging file immediately if caching is disabled. + // When caching is enabled, we keep the file on disk for potential reuse. + if StagingFileCache == nil { + os.Remove(stagingFile.Name()) + } logger.Info("Created staging file", file.logInfo(logger.Fields{Operation: operation, TmpFile: stagingFile.Name()})) if existsInDFS { diff --git a/internal/hopsfsmount/config.go b/internal/hopsfsmount/config.go index 43351fe..eac3d4a 100644 --- a/internal/hopsfsmount/config.go +++ b/internal/hopsfsmount/config.go @@ -43,6 +43,7 @@ var FallBackUser = "root" var FallBackGroup = "root" var UserUmask string = "" var Umask os.FileMode +var LocalCacheMaxEntries int = 50 func ParseArgsAndInitLogger(retryPolicy *RetryPolicy) { flag.BoolVar(&LazyMount, "lazy", false, "Allows to mount HopsFS filesystem before HopsFS is available") @@ -72,6 +73,7 @@ func ParseArgsAndInitLogger(retryPolicy *RetryPolicy) { flag.StringVar(&FallBackUser, "fallBackUser", "root", "Local user name if the DFS user is not found on the local file system") flag.StringVar(&FallBackGroup, "fallBackGroup", "root", "Local group name if the DFS group is not found on the local file system.") flag.StringVar(&UserUmask, "umask", "", "Umask for the file system. Must be a 4 digit octal number.") + flag.IntVar(&LocalCacheMaxEntries, "localCacheSize", 50, "Max staging files to cache locally (0 to disable)") flag.Usage = usage flag.Parse() @@ -112,6 +114,14 @@ func ParseArgsAndInitLogger(retryPolicy *RetryPolicy) { } logger.Info(fmt.Sprintf("Using umask: %o", Umask), nil) + // Initialize staging file cache if enabled + if LocalCacheMaxEntries > 0 { + StagingFileCache = NewLocalCache(LocalCacheMaxEntries) + logger.Info(fmt.Sprintf("Staging file cache enabled with max %d entries", LocalCacheMaxEntries), nil) + } else { + logger.Info("Staging file cache disabled", nil) + } + logger.Info(fmt.Sprintf("Staging dir is:%s, Using TLS: %v, RetryAttempts: %d, LogFile: %s", StagingDir, Tls, retryPolicy.MaxAttempts, LogFile), nil) logger.Info(fmt.Sprintf("hopsfs-mount: current head GITCommit: %s Built time: %s Built by: %s ", GITCOMMIT, BUILDTIME, HOSTNAME), nil) } diff --git a/internal/hopsfsmount/localcache.go b/internal/hopsfsmount/localcache.go new file mode 100644 index 0000000..0ae9d40 --- /dev/null +++ b/internal/hopsfsmount/localcache.go @@ -0,0 +1,184 @@ +// Copyright (c) Hopsworks AB. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for details. + +package hopsfsmount + +import ( + "container/list" + "os" + "sync" + + "hopsworks.ai/hopsfsmount/internal/hopsfsmount/logger" +) + +// LocalCache manages cached staging files using LRU eviction. +// When files are written and closed, their local staging copies are kept +// in this cache for faster reopening instead of downloading from DFS again. +type LocalCache struct { + mu sync.Mutex + maxEntries int + entries map[string]*CacheEntry + lruList *list.List // front = most recently used, back = least recently used +} + +// CacheEntry represents a cached staging file +type CacheEntry struct { + hdfsPath string + localPath string + size int64 + lruElement *list.Element +} + +// Global cache instance, initialized in config.go if caching is enabled +var StagingFileCache *LocalCache + +// NewLocalCache creates a new cache with the given maximum number of entries. +// When the cache is full, the least recently used entry is evicted. +func NewLocalCache(maxEntries int) *LocalCache { + return &LocalCache{ + maxEntries: maxEntries, + entries: make(map[string]*CacheEntry), + lruList: list.New(), + } +} + +// Get retrieves a cached file path for the given HDFS path. +// Returns the local file path and true if found, or ("", false) if not cached. +// Moves the entry to the front of the LRU list on access. +func (c *LocalCache) Get(hdfsPath string) (string, bool) { + c.mu.Lock() + defer c.mu.Unlock() + + entry, ok := c.entries[hdfsPath] + if !ok { + return "", false + } + + // Move to front of LRU list (most recently used) + c.lruList.MoveToFront(entry.lruElement) + + logger.Debug("Cache hit for staging file", logger.Fields{ + Path: hdfsPath, + TmpFile: entry.localPath, + }) + + return entry.localPath, true +} + +// Put adds a staging file to the cache. If the cache is full, the least +// recently used entry is evicted first. If an entry already exists for +// this path, it is updated and moved to the front of the LRU list. +func (c *LocalCache) Put(hdfsPath string, localPath string, size int64) { + c.mu.Lock() + defer c.mu.Unlock() + + // Check if entry already exists + if existing, ok := c.entries[hdfsPath]; ok { + // Update existing entry + existing.localPath = localPath + existing.size = size + c.lruList.MoveToFront(existing.lruElement) + logger.Debug("Updated existing cache entry", logger.Fields{ + Path: hdfsPath, + TmpFile: localPath, + FileSize: size, + }) + return + } + + // Evict oldest entries if cache is full + for len(c.entries) >= c.maxEntries { + c.evictOldest() + } + + // Create new entry + entry := &CacheEntry{ + hdfsPath: hdfsPath, + localPath: localPath, + size: size, + } + entry.lruElement = c.lruList.PushFront(entry) + c.entries[hdfsPath] = entry + + logger.Info("Added staging file to cache", logger.Fields{ + Path: hdfsPath, + TmpFile: localPath, + FileSize: size, + Entries: len(c.entries), + }) +} + +// Remove explicitly removes an entry from the cache. +// This should be called when a file is deleted or renamed in DFS. +func (c *LocalCache) Remove(hdfsPath string) { + c.mu.Lock() + defer c.mu.Unlock() + + c.removeEntry(hdfsPath) +} + +// removeEntry removes an entry without locking (internal use only) +func (c *LocalCache) removeEntry(hdfsPath string) { + entry, ok := c.entries[hdfsPath] + if !ok { + return + } + + // Remove from LRU list + c.lruList.Remove(entry.lruElement) + + // Delete local file + if err := os.Remove(entry.localPath); err != nil && !os.IsNotExist(err) { + logger.Warn("Failed to remove cached staging file", logger.Fields{ + Path: hdfsPath, + TmpFile: entry.localPath, + Error: err, + }) + } + + // Remove from map + delete(c.entries, hdfsPath) + + logger.Debug("Removed staging file from cache", logger.Fields{ + Path: hdfsPath, + TmpFile: entry.localPath, + }) +} + +// evictOldest removes the least recently used entry from the cache. +// Must be called with mutex held. +func (c *LocalCache) evictOldest() { + oldest := c.lruList.Back() + if oldest == nil { + return + } + + entry := oldest.Value.(*CacheEntry) + logger.Info("Evicting oldest cache entry", logger.Fields{ + Path: entry.hdfsPath, + TmpFile: entry.localPath, + FileSize: entry.size, + }) + + c.removeEntry(entry.hdfsPath) +} + +// Clear removes all entries from the cache. +// This should be called during shutdown. +func (c *LocalCache) Clear() { + c.mu.Lock() + defer c.mu.Unlock() + + for hdfsPath := range c.entries { + c.removeEntry(hdfsPath) + } + + logger.Info("Cleared staging file cache", nil) +} + +// Size returns the current number of entries in the cache. +func (c *LocalCache) Size() int { + c.mu.Lock() + defer c.mu.Unlock() + return len(c.entries) +} From f274a0563395b36c06f12b3ac148afff6ff511f0 Mon Sep 17 00:00:00 2001 From: Hamid Date: Wed, 3 Dec 2025 11:56:45 +0100 Subject: [PATCH 02/16] add metadata check with upstream --- internal/hopsfsmount/File.go | 31 ++++++++++++++++-------------- internal/hopsfsmount/localcache.go | 29 ++++++++++++++++++++++++---- 2 files changed, 42 insertions(+), 18 deletions(-) diff --git a/internal/hopsfsmount/File.go b/internal/hopsfsmount/File.go index 445b3c8..74de16d 100644 --- a/internal/hopsfsmount/File.go +++ b/internal/hopsfsmount/File.go @@ -155,7 +155,9 @@ func (file *FileINode) closeStaging() { stat, statErr := lrwfp.localFile.Stat() localPath := lrwfp.localFile.Name() if statErr == nil && localPath != "" { - StagingFileCache.Put(file.AbsolutePath(), localPath, stat.Size()) + // Store with current mtime from Attrs (set after successful flush to DFS) + // This allows us to detect if the file was modified by another client + StagingFileCache.Put(file.AbsolutePath(), localPath, stat.Size(), file.Attrs.Mtime) } } @@ -395,25 +397,26 @@ func (file *FileINode) createStagingFile(operation string, existsInDFS bool) (*o logger.Info("Created an empty file in DFS", file.logInfo(logger.Fields{Operation: operation})) w.Close() } else { - // Request to write to existing file - _, err := hdfsAccessor.Stat(absPath) + // Request to write to existing file - stat to verify it exists and get metadata + upstreamInfo, err := hdfsAccessor.Stat(absPath) if err != nil { logger.Error("Failed to stat file in DFS", file.logInfo(logger.Fields{Operation: operation, Error: err})) return nil, syscall.ENOENT } - } - // Check if we have a cached staging file for this path - if StagingFileCache != nil { - if cachedPath, ok := StagingFileCache.Get(absPath); ok { - localFile, err := os.OpenFile(cachedPath, os.O_RDWR, 0600) - if err == nil { - logger.Info("Using cached staging file", file.logInfo(logger.Fields{Operation: operation, TmpFile: cachedPath})) - return localFile, nil + // Check if we have a valid cached staging file for this path + if StagingFileCache != nil { + if cachedPath, ok := StagingFileCache.Get(absPath, int64(upstreamInfo.Size), upstreamInfo.Mtime); ok { + // Cache is valid (metadata matches upstream), try to reuse it + localFile, err := os.OpenFile(cachedPath, os.O_RDWR, 0600) + if err == nil { + logger.Info("Using cached staging file", file.logInfo(logger.Fields{Operation: operation, TmpFile: cachedPath})) + return localFile, nil + } + // Local file was deleted externally, remove from cache + logger.Warn("Cached staging file not accessible, removing from cache", file.logInfo(logger.Fields{Operation: operation, TmpFile: cachedPath, Error: err})) + StagingFileCache.Remove(absPath) } - // Cache entry is invalid (file was deleted externally), remove it - logger.Warn("Cached staging file not accessible, removing from cache", file.logInfo(logger.Fields{Operation: operation, TmpFile: cachedPath, Error: err})) - StagingFileCache.Remove(absPath) } } diff --git a/internal/hopsfsmount/localcache.go b/internal/hopsfsmount/localcache.go index 0ae9d40..ef7424e 100644 --- a/internal/hopsfsmount/localcache.go +++ b/internal/hopsfsmount/localcache.go @@ -7,6 +7,7 @@ import ( "container/list" "os" "sync" + "time" "hopsworks.ai/hopsfsmount/internal/hopsfsmount/logger" ) @@ -26,6 +27,7 @@ type CacheEntry struct { hdfsPath string localPath string size int64 + mtime time.Time // modification time when cached, used to detect upstream changes lruElement *list.Element } @@ -43,9 +45,12 @@ func NewLocalCache(maxEntries int) *LocalCache { } // Get retrieves a cached file path for the given HDFS path. -// Returns the local file path and true if found, or ("", false) if not cached. -// Moves the entry to the front of the LRU list on access. -func (c *LocalCache) Get(hdfsPath string) (string, bool) { +// The upstreamSize and upstreamMtime parameters are the current metadata from HopsFS, +// used to validate that the cached file hasn't been modified by another client. +// Returns the local file path and true if found and valid, or ("", false) if not cached or stale. +// If the cache entry is stale (metadata mismatch), it is automatically removed. +// Moves the entry to the front of the LRU list on successful access. +func (c *LocalCache) Get(hdfsPath string, upstreamSize int64, upstreamMtime time.Time) (string, bool) { c.mu.Lock() defer c.mu.Unlock() @@ -54,6 +59,18 @@ func (c *LocalCache) Get(hdfsPath string) (string, bool) { return "", false } + // Validate cache entry against upstream metadata + // If size or mtime differs, the file was modified by another client + if entry.size != upstreamSize || !entry.mtime.Equal(upstreamMtime) { + logger.Info("Cached staging file is stale (upstream modified), invalidating", logger.Fields{ + Path: hdfsPath, + TmpFile: entry.localPath, + FileSize: upstreamSize, + }) + c.removeEntry(hdfsPath) + return "", false + } + // Move to front of LRU list (most recently used) c.lruList.MoveToFront(entry.lruElement) @@ -68,7 +85,9 @@ func (c *LocalCache) Get(hdfsPath string) (string, bool) { // Put adds a staging file to the cache. If the cache is full, the least // recently used entry is evicted first. If an entry already exists for // this path, it is updated and moved to the front of the LRU list. -func (c *LocalCache) Put(hdfsPath string, localPath string, size int64) { +// The mtime parameter should be the modification time from HopsFS, used +// to detect if the file was modified by another client. +func (c *LocalCache) Put(hdfsPath string, localPath string, size int64, mtime time.Time) { c.mu.Lock() defer c.mu.Unlock() @@ -77,6 +96,7 @@ func (c *LocalCache) Put(hdfsPath string, localPath string, size int64) { // Update existing entry existing.localPath = localPath existing.size = size + existing.mtime = mtime c.lruList.MoveToFront(existing.lruElement) logger.Debug("Updated existing cache entry", logger.Fields{ Path: hdfsPath, @@ -96,6 +116,7 @@ func (c *LocalCache) Put(hdfsPath string, localPath string, size int64) { hdfsPath: hdfsPath, localPath: localPath, size: size, + mtime: mtime, } entry.lruElement = c.lruList.PushFront(entry) c.entries[hdfsPath] = entry From ac75fff7c367765cf36990713d141d66f187ae56 Mon Sep 17 00:00:00 2001 From: Hamid Date: Wed, 3 Dec 2025 12:25:36 +0100 Subject: [PATCH 03/16] use local cache when opening a file --- internal/hopsfsmount/File.go | 42 +++++++++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/internal/hopsfsmount/File.go b/internal/hopsfsmount/File.go index 74de16d..f027092 100644 --- a/internal/hopsfsmount/File.go +++ b/internal/hopsfsmount/File.go @@ -493,14 +493,40 @@ func (file *FileINode) NewFileHandle(existsInDFS bool, flags fuse.OpenFlags) (*F fh.File.fileProxy = file.fileProxy logger.Info("Opened file, Returning existing handle", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags})) } else { - // we alway open the file in RO mode. when the client writes to the file - // then we upgrade the handle. However, if the file is already opened in - // in RW state then we use the existing RW handle - reader, err := file.FileSystem.getDFSConnector().OpenRead(file.AbsolutePath()) - if err != nil { - logger.Warn("Opening file failed", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags, Error: err})) - return nil, err - } else { + // Check if we have a valid cached staging file for this path + // This avoids reading from HopsFS if we have a local copy + absPath := file.AbsolutePath() + usedCache := false + + if StagingFileCache != nil { + // Stat the file to get current upstream metadata for cache validation + hdfsAccessor := file.FileSystem.getDFSConnector() + upstreamInfo, err := hdfsAccessor.Stat(absPath) + if err == nil { + if cachedPath, ok := StagingFileCache.Get(absPath, int64(upstreamInfo.Size), upstreamInfo.Mtime); ok { + // Try to open the cached file + localFile, err := os.OpenFile(cachedPath, os.O_RDWR, 0600) + if err == nil { + fh.File.fileProxy = &LocalRWFileProxy{localFile: localFile, file: file} + logger.Info("Opened file using cached staging file", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags, TmpFile: cachedPath})) + usedCache = true + } else { + // Local file was deleted externally, remove from cache + logger.Warn("Cached staging file not accessible, removing from cache", fh.logInfo(logger.Fields{Operation: operation, TmpFile: cachedPath, Error: err})) + StagingFileCache.Remove(absPath) + } + } + } + } + + if !usedCache { + // No valid cache, open from HopsFS in RO mode + // When the client writes to the file, we upgrade the handle + reader, err := file.FileSystem.getDFSConnector().OpenRead(absPath) + if err != nil { + logger.Warn("Opening file failed", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags, Error: err})) + return nil, err + } fh.File.fileProxy = &RemoteROFileProxy{hdfsReader: reader, file: file} logger.Info("Opened file, RO handle", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags})) } From ecb591e53b46706c17427c8c2daed267feb6a5e7 Mon Sep 17 00:00:00 2001 From: Hamid Date: Wed, 3 Dec 2025 12:34:33 +0100 Subject: [PATCH 04/16] change log level for cache hit --- internal/hopsfsmount/localcache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/hopsfsmount/localcache.go b/internal/hopsfsmount/localcache.go index ef7424e..ecedb5c 100644 --- a/internal/hopsfsmount/localcache.go +++ b/internal/hopsfsmount/localcache.go @@ -74,7 +74,7 @@ func (c *LocalCache) Get(hdfsPath string, upstreamSize int64, upstreamMtime time // Move to front of LRU list (most recently used) c.lruList.MoveToFront(entry.lruElement) - logger.Debug("Cache hit for staging file", logger.Fields{ + logger.Info("Cache hit for staging file", logger.Fields{ Path: hdfsPath, TmpFile: entry.localPath, }) From b57138f73ca75184d1aaacd4a0082e92a9e8e255 Mon Sep 17 00:00:00 2001 From: Hamid Date: Wed, 3 Dec 2025 13:07:46 +0100 Subject: [PATCH 05/16] update cache when reading files --- internal/hopsfsmount/File.go | 64 ++++++++++++++++++++++++++---- internal/hopsfsmount/localcache.go | 9 +++-- 2 files changed, 62 insertions(+), 11 deletions(-) diff --git a/internal/hopsfsmount/File.go b/internal/hopsfsmount/File.go index f027092..5f40518 100644 --- a/internal/hopsfsmount/File.go +++ b/internal/hopsfsmount/File.go @@ -461,6 +461,34 @@ func (file *FileINode) downloadToStaging(stagingFile *os.File, operation string) return nil } +// createStagingFileForRead creates a staging file and downloads content from DFS for read caching. +// This is used when opening an existing file for reading and caching is enabled. +// Unlike createStagingFile, this doesn't create or stat the file in DFS (caller already did that). +func (file *FileINode) createStagingFileForRead(operation string) (*os.File, error) { + stagingFile, err := ioutil.TempFile(StagingDir, "stage") + if err != nil { + logger.Error("Failed to create staging file for read", file.logInfo(logger.Fields{Operation: operation, Error: err})) + return nil, err + } + + logger.Info("Created staging file for read caching", file.logInfo(logger.Fields{Operation: operation, TmpFile: stagingFile.Name()})) + + if err := file.downloadToStaging(stagingFile, operation); err != nil { + stagingFile.Close() + os.Remove(stagingFile.Name()) + return nil, err + } + + // Seek back to start for reading + if _, err := stagingFile.Seek(0, 0); err != nil { + stagingFile.Close() + os.Remove(stagingFile.Name()) + return nil, err + } + + return stagingFile, nil +} + // Creates new file handle // Lock order: fileHandleMutex (1) alone // Called from Open which holds fileMutex (2), so order is: fileMutex (2) → fileHandleMutex (1) @@ -520,15 +548,35 @@ func (file *FileINode) NewFileHandle(existsInDFS bool, flags fuse.OpenFlags) (*F } if !usedCache { - // No valid cache, open from HopsFS in RO mode - // When the client writes to the file, we upgrade the handle - reader, err := file.FileSystem.getDFSConnector().OpenRead(absPath) - if err != nil { - logger.Warn("Opening file failed", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags, Error: err})) - return nil, err + // No valid cache - decide whether to download to local cache or stream from HopsFS + if StagingFileCache != nil { + // Caching enabled: download to staging file for future reuse + if err := file.checkDiskSpace(); err != nil { + // Not enough disk space, fall back to remote streaming + logger.Warn("Not enough disk space for caching, using remote read", fh.logInfo(logger.Fields{Operation: operation, Error: err})) + } else { + stagingFile, err := file.createStagingFileForRead(operation) + if err == nil { + fh.File.fileProxy = &LocalRWFileProxy{localFile: stagingFile, file: file} + logger.Info("Opened file, downloaded to cache", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags, TmpFile: stagingFile.Name()})) + usedCache = true + } else { + logger.Warn("Failed to create staging file for read cache, using remote read", fh.logInfo(logger.Fields{Operation: operation, Error: err})) + } + } + } + + if !usedCache { + // Caching disabled or failed, open from HopsFS in RO mode + // When the client writes to the file, we upgrade the handle + reader, err := file.FileSystem.getDFSConnector().OpenRead(absPath) + if err != nil { + logger.Warn("Opening file failed", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags, Error: err})) + return nil, err + } + fh.File.fileProxy = &RemoteROFileProxy{hdfsReader: reader, file: file} + logger.Info("Opened file, RO handle", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags})) } - fh.File.fileProxy = &RemoteROFileProxy{hdfsReader: reader, file: file} - logger.Info("Opened file, RO handle", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags})) } } } diff --git a/internal/hopsfsmount/localcache.go b/internal/hopsfsmount/localcache.go index ecedb5c..2bffc8d 100644 --- a/internal/hopsfsmount/localcache.go +++ b/internal/hopsfsmount/localcache.go @@ -63,9 +63,12 @@ func (c *LocalCache) Get(hdfsPath string, upstreamSize int64, upstreamMtime time // If size or mtime differs, the file was modified by another client if entry.size != upstreamSize || !entry.mtime.Equal(upstreamMtime) { logger.Info("Cached staging file is stale (upstream modified), invalidating", logger.Fields{ - Path: hdfsPath, - TmpFile: entry.localPath, - FileSize: upstreamSize, + Path: hdfsPath, + TmpFile: entry.localPath, + "cached_size": entry.size, + "cached_mtime": entry.mtime, + "upstream_size": upstreamSize, + "upstream_mtime": upstreamMtime, }) c.removeEntry(hdfsPath) return "", false From ae84316f9426fd5dd661993b3bf2a608fb232721 Mon Sep 17 00:00:00 2001 From: Hamid Date: Wed, 3 Dec 2025 13:12:51 +0100 Subject: [PATCH 06/16] more logs --- internal/hopsfsmount/localcache.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/internal/hopsfsmount/localcache.go b/internal/hopsfsmount/localcache.go index 2bffc8d..6e97fea 100644 --- a/internal/hopsfsmount/localcache.go +++ b/internal/hopsfsmount/localcache.go @@ -5,6 +5,7 @@ package hopsfsmount import ( "container/list" + "fmt" "os" "sync" "time" @@ -56,19 +57,19 @@ func (c *LocalCache) Get(hdfsPath string, upstreamSize int64, upstreamMtime time entry, ok := c.entries[hdfsPath] if !ok { + logger.Info("Cache miss for staging file", logger.Fields{ + Path: hdfsPath, + }) return "", false } // Validate cache entry against upstream metadata // If size or mtime differs, the file was modified by another client if entry.size != upstreamSize || !entry.mtime.Equal(upstreamMtime) { - logger.Info("Cached staging file is stale (upstream modified), invalidating", logger.Fields{ - Path: hdfsPath, - TmpFile: entry.localPath, - "cached_size": entry.size, - "cached_mtime": entry.mtime, - "upstream_size": upstreamSize, - "upstream_mtime": upstreamMtime, + logger.Info(fmt.Sprintf("Cached staging file is stale, invalidating. cached[size=%d, mtime=%v] upstream[size=%d, mtime=%v]", + entry.size, entry.mtime, upstreamSize, upstreamMtime), logger.Fields{ + Path: hdfsPath, + TmpFile: entry.localPath, }) c.removeEntry(hdfsPath) return "", false @@ -101,7 +102,7 @@ func (c *LocalCache) Put(hdfsPath string, localPath string, size int64, mtime ti existing.size = size existing.mtime = mtime c.lruList.MoveToFront(existing.lruElement) - logger.Debug("Updated existing cache entry", logger.Fields{ + logger.Info("Updated existing cache entry", logger.Fields{ Path: hdfsPath, TmpFile: localPath, FileSize: size, @@ -163,7 +164,7 @@ func (c *LocalCache) removeEntry(hdfsPath string) { // Remove from map delete(c.entries, hdfsPath) - logger.Debug("Removed staging file from cache", logger.Fields{ + logger.Info("Removed staging file from cache", logger.Fields{ Path: hdfsPath, TmpFile: entry.localPath, }) From 8a5f3b79a783706d4ce868ad7703c400899a2b59 Mon Sep 17 00:00:00 2001 From: Hamid Date: Wed, 3 Dec 2025 13:19:06 +0100 Subject: [PATCH 07/16] fix bugs --- internal/hopsfsmount/File.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/internal/hopsfsmount/File.go b/internal/hopsfsmount/File.go index 5f40518..f152c60 100644 --- a/internal/hopsfsmount/File.go +++ b/internal/hopsfsmount/File.go @@ -525,12 +525,20 @@ func (file *FileINode) NewFileHandle(existsInDFS bool, flags fuse.OpenFlags) (*F // This avoids reading from HopsFS if we have a local copy absPath := file.AbsolutePath() usedCache := false + statSucceeded := false if StagingFileCache != nil { // Stat the file to get current upstream metadata for cache validation hdfsAccessor := file.FileSystem.getDFSConnector() upstreamInfo, err := hdfsAccessor.Stat(absPath) - if err == nil { + if err != nil { + logger.Warn("Failed to stat file for cache validation, skipping cache", fh.logInfo(logger.Fields{Operation: operation, Error: err})) + } else { + statSucceeded = true + // Update file.Attrs with upstream metadata so closeStaging can use correct mtime for caching + file.Attrs.Size = upstreamInfo.Size + file.Attrs.Mtime = upstreamInfo.Mtime + if cachedPath, ok := StagingFileCache.Get(absPath, int64(upstreamInfo.Size), upstreamInfo.Mtime); ok { // Try to open the cached file localFile, err := os.OpenFile(cachedPath, os.O_RDWR, 0600) @@ -549,7 +557,8 @@ func (file *FileINode) NewFileHandle(existsInDFS bool, flags fuse.OpenFlags) (*F if !usedCache { // No valid cache - decide whether to download to local cache or stream from HopsFS - if StagingFileCache != nil { + // Only attempt caching if stat succeeded (we have valid metadata for cache validation later) + if StagingFileCache != nil && statSucceeded { // Caching enabled: download to staging file for future reuse if err := file.checkDiskSpace(); err != nil { // Not enough disk space, fall back to remote streaming From cb53bba33ab06c830247d7b24ac3dd1ecafd815a Mon Sep 17 00:00:00 2001 From: Hamid Date: Wed, 3 Dec 2025 13:25:07 +0100 Subject: [PATCH 08/16] add debugging logs --- internal/hopsfsmount/File.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/hopsfsmount/File.go b/internal/hopsfsmount/File.go index f152c60..b4363e6 100644 --- a/internal/hopsfsmount/File.go +++ b/internal/hopsfsmount/File.go @@ -527,6 +527,8 @@ func (file *FileINode) NewFileHandle(existsInDFS bool, flags fuse.OpenFlags) (*F usedCache := false statSucceeded := false + logger.Info(fmt.Sprintf("Checking cache for file, StagingFileCache=%v", StagingFileCache != nil), fh.logInfo(logger.Fields{Operation: operation})) + if StagingFileCache != nil { // Stat the file to get current upstream metadata for cache validation hdfsAccessor := file.FileSystem.getDFSConnector() From 292619d2edcbc52399fd5f31f579043880dfc161 Mon Sep 17 00:00:00 2001 From: Hamid Date: Wed, 3 Dec 2025 13:40:27 +0100 Subject: [PATCH 09/16] fix bug with direct write --- internal/hopsfsmount/File.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/internal/hopsfsmount/File.go b/internal/hopsfsmount/File.go index b4363e6..418d9e5 100644 --- a/internal/hopsfsmount/File.go +++ b/internal/hopsfsmount/File.go @@ -287,6 +287,17 @@ func (file *FileINode) flushAttempt(operation string) error { file.logInfo(logger.Fields{Operation: operation, Bytes: written})) file.Attrs.Size = written + + // Stat the file to get the server-assigned mtime after upload + // This is needed for cache validation on subsequent reads + upstreamInfo, err := hdfsAccessor.Stat(file.AbsolutePath()) + if err != nil { + logger.Warn("Failed to stat file after upload, mtime may be stale", file.logInfo(logger.Fields{Operation: operation, Error: err})) + } else { + file.Attrs.Mtime = upstreamInfo.Mtime + file.Attrs.Size = upstreamInfo.Size + } + return nil } From 428787ede7eb007f3ace99794858a9f7b0648d6d Mon Sep 17 00:00:00 2001 From: Hamid Date: Wed, 3 Dec 2025 13:58:14 +0100 Subject: [PATCH 10/16] fix bug with direct write --- internal/hopsfsmount/File.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/hopsfsmount/File.go b/internal/hopsfsmount/File.go index 418d9e5..7c86b55 100644 --- a/internal/hopsfsmount/File.go +++ b/internal/hopsfsmount/File.go @@ -84,7 +84,8 @@ func (file *FileINode) Attr(ctx context.Context, a *fuse.Attr) error { } // update the local cache (fileMutex already held) file.Attrs.Size = uint64(fileInfo.Size()) - file.Attrs.Mtime = fileInfo.ModTime() + // Truncate to second precision to match HopsFS precision for cache consistency + file.Attrs.Mtime = fileInfo.ModTime().Truncate(time.Second) return file.Attrs.ConvertAttrToFuse(a) } file.unlockFileHandles() @@ -293,6 +294,8 @@ func (file *FileINode) flushAttempt(operation string) error { upstreamInfo, err := hdfsAccessor.Stat(file.AbsolutePath()) if err != nil { logger.Warn("Failed to stat file after upload, mtime may be stale", file.logInfo(logger.Fields{Operation: operation, Error: err})) + // Truncate to second precision to match HopsFS precision + file.Attrs.Mtime = file.Attrs.Mtime.Truncate(time.Second) } else { file.Attrs.Mtime = upstreamInfo.Mtime file.Attrs.Size = upstreamInfo.Size From 1df975fd877b53fa9e93cd6f4f7a3790b2da9ec0 Mon Sep 17 00:00:00 2001 From: Hamid Date: Mon, 8 Dec 2025 10:50:51 +0100 Subject: [PATCH 11/16] fix some bugs --- internal/hopsfsmount/Dir.go | 4 +- internal/hopsfsmount/File.go | 3 +- .../{localcache.go => LocalCache.go} | 41 ++++++++++++++++++- 3 files changed, 42 insertions(+), 6 deletions(-) rename internal/hopsfsmount/{localcache.go => LocalCache.go} (83%) diff --git a/internal/hopsfsmount/Dir.go b/internal/hopsfsmount/Dir.go index 1b8a576..a40aa30 100644 --- a/internal/hopsfsmount/Dir.go +++ b/internal/hopsfsmount/Dir.go @@ -383,9 +383,9 @@ func (srcParent *DirINode) renameInt(operationName, oldName, newName string, dst return err } - // Invalidate staging file cache for old path (the cached content is now stale) + // Transfer staging file cache entry from old path to new path if it exists if StagingFileCache != nil { - StagingFileCache.Remove(oldPath) + StagingFileCache.Rename(oldPath, newPath) } // disconnect src inode diff --git a/internal/hopsfsmount/File.go b/internal/hopsfsmount/File.go index 7c86b55..e2b9ab2 100644 --- a/internal/hopsfsmount/File.go +++ b/internal/hopsfsmount/File.go @@ -84,8 +84,7 @@ func (file *FileINode) Attr(ctx context.Context, a *fuse.Attr) error { } // update the local cache (fileMutex already held) file.Attrs.Size = uint64(fileInfo.Size()) - // Truncate to second precision to match HopsFS precision for cache consistency - file.Attrs.Mtime = fileInfo.ModTime().Truncate(time.Second) + file.Attrs.Mtime = fileInfo.ModTime() return file.Attrs.ConvertAttrToFuse(a) } file.unlockFileHandles() diff --git a/internal/hopsfsmount/localcache.go b/internal/hopsfsmount/LocalCache.go similarity index 83% rename from internal/hopsfsmount/localcache.go rename to internal/hopsfsmount/LocalCache.go index 6e97fea..a4cb33e 100644 --- a/internal/hopsfsmount/localcache.go +++ b/internal/hopsfsmount/LocalCache.go @@ -32,7 +32,7 @@ type CacheEntry struct { lruElement *list.Element } -// Global cache instance, initialized in config.go if caching is enabled +// StagingFileCache Local cache instance, initialized in config.go if caching is enabled var StagingFileCache *LocalCache // NewLocalCache creates a new cache with the given maximum number of entries. @@ -134,7 +134,7 @@ func (c *LocalCache) Put(hdfsPath string, localPath string, size int64, mtime ti } // Remove explicitly removes an entry from the cache. -// This should be called when a file is deleted or renamed in DFS. +// This should be called when a file is deleted in DFS. func (c *LocalCache) Remove(hdfsPath string) { c.mu.Lock() defer c.mu.Unlock() @@ -142,6 +142,43 @@ func (c *LocalCache) Remove(hdfsPath string) { c.removeEntry(hdfsPath) } +// Rename transfers a cache entry from oldPath to newPath. +// If the entry doesn't exist for oldPath, this is a no-op. +// If an entry already exists for newPath, it is replaced. +func (c *LocalCache) Rename(oldPath, newPath string) { + c.mu.Lock() + defer c.mu.Unlock() + + entry, ok := c.entries[oldPath] + if !ok { + // No cache entry for old path, nothing to transfer + logger.Info("Cache rename: no entry for old path", logger.Fields{ + From: oldPath, + To: newPath, + }) + return + } + + // Remove any existing entry at the new path + if _, exists := c.entries[newPath]; exists { + c.removeEntry(newPath) + } + + // Update the entry's hdfsPath and move to new key + delete(c.entries, oldPath) + entry.hdfsPath = newPath + c.entries[newPath] = entry + + // Move to front of LRU (most recently used) + c.lruList.MoveToFront(entry.lruElement) + + logger.Info("Cache entry renamed", logger.Fields{ + From: oldPath, + To: newPath, + TmpFile: entry.localPath, + }) +} + // removeEntry removes an entry without locking (internal use only) func (c *LocalCache) removeEntry(hdfsPath string) { entry, ok := c.entries[hdfsPath] From 079c40e9c1e43a9f9eb80746c8cc34c1c12d308b Mon Sep 17 00:00:00 2001 From: Hamid Date: Mon, 8 Dec 2025 12:02:41 +0100 Subject: [PATCH 12/16] bug fix --- internal/hopsfsmount/File.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/internal/hopsfsmount/File.go b/internal/hopsfsmount/File.go index e2b9ab2..0895c6d 100644 --- a/internal/hopsfsmount/File.go +++ b/internal/hopsfsmount/File.go @@ -289,15 +289,15 @@ func (file *FileINode) flushAttempt(operation string) error { file.Attrs.Size = written // Stat the file to get the server-assigned mtime after upload - // This is needed for cache validation on subsequent reads - upstreamInfo, err := hdfsAccessor.Stat(file.AbsolutePath()) - if err != nil { - logger.Warn("Failed to stat file after upload, mtime may be stale", file.logInfo(logger.Fields{Operation: operation, Error: err})) - // Truncate to second precision to match HopsFS precision - file.Attrs.Mtime = file.Attrs.Mtime.Truncate(time.Second) - } else { - file.Attrs.Mtime = upstreamInfo.Mtime - file.Attrs.Size = upstreamInfo.Size + // This is only needed for cache validation on subsequent reads + if StagingFileCache != nil { + upstreamInfo, err := hdfsAccessor.Stat(file.AbsolutePath()) + if err != nil { + logger.Warn("Failed to stat file after upload, mtime may be stale", file.logInfo(logger.Fields{Operation: operation, Error: err})) + } else { + file.Attrs.Mtime = upstreamInfo.Mtime + file.Attrs.Size = upstreamInfo.Size + } } return nil @@ -438,7 +438,6 @@ func (file *FileINode) createStagingFile(operation string, existsInDFS bool) (*o logger.Error("Failed to create staging file", file.logInfo(logger.Fields{Operation: operation, Error: err})) return nil, err } - // Only unlink the staging file immediately if caching is disabled. // When caching is enabled, we keep the file on disk for potential reuse. if StagingFileCache == nil { os.Remove(stagingFile.Name()) From 3ba146d29925b02540c9f54ac4f392ebc8c2abb6 Mon Sep 17 00:00:00 2001 From: Hamid Date: Mon, 8 Dec 2025 16:19:49 +0100 Subject: [PATCH 13/16] more changes --- internal/hopsfsmount/File.go | 126 +++++++++-------------------- internal/hopsfsmount/LocalCache.go | 71 ++++++++++++++-- 2 files changed, 101 insertions(+), 96 deletions(-) diff --git a/internal/hopsfsmount/File.go b/internal/hopsfsmount/File.go index 0895c6d..712443e 100644 --- a/internal/hopsfsmount/File.go +++ b/internal/hopsfsmount/File.go @@ -419,16 +419,9 @@ func (file *FileINode) createStagingFile(operation string, existsInDFS bool) (*o // Check if we have a valid cached staging file for this path if StagingFileCache != nil { - if cachedPath, ok := StagingFileCache.Get(absPath, int64(upstreamInfo.Size), upstreamInfo.Mtime); ok { - // Cache is valid (metadata matches upstream), try to reuse it - localFile, err := os.OpenFile(cachedPath, os.O_RDWR, 0600) - if err == nil { - logger.Info("Using cached staging file", file.logInfo(logger.Fields{Operation: operation, TmpFile: cachedPath})) - return localFile, nil - } - // Local file was deleted externally, remove from cache - logger.Warn("Cached staging file not accessible, removing from cache", file.logInfo(logger.Fields{Operation: operation, TmpFile: cachedPath, Error: err})) - StagingFileCache.Remove(absPath) + if localFile, ok := StagingFileCache.Get(absPath, int64(upstreamInfo.Size), upstreamInfo.Mtime); ok { + logger.Info("Using cached staging file", file.logInfo(logger.Fields{Operation: operation})) + return localFile, nil } } } @@ -445,60 +438,59 @@ func (file *FileINode) createStagingFile(operation string, existsInDFS bool) (*o logger.Info("Created staging file", file.logInfo(logger.Fields{Operation: operation, TmpFile: stagingFile.Name()})) if existsInDFS { - if err := file.downloadToStaging(stagingFile, operation); err != nil { + if err := file.downloadToStaging(stagingFile); err != nil { return nil, err } } return stagingFile, nil } -func (file *FileINode) downloadToStaging(stagingFile *os.File, operation string) error { +func (file *FileINode) downloadToStaging(stagingFile *os.File) error { hdfsAccessor := file.FileSystem.getDFSConnector() absPath := file.AbsolutePath() reader, err := hdfsAccessor.OpenRead(absPath) if err != nil { - logger.Error("Failed to open file in DFS", file.logInfo(logger.Fields{Operation: operation, Error: err})) - // TODO remove the staging file if there are no more active handles + logger.Error("Failed to open file in DFS", file.logInfo(logger.Fields{Error: err})) return err } nc, err := io.Copy(stagingFile, reader) if err != nil { - logger.Error("Failed to copy content to staging file", file.logInfo(logger.Fields{Operation: operation, Error: err})) + logger.Error("Failed to copy content to staging file", file.logInfo(logger.Fields{Error: err})) return err } reader.Close() - logger.Info(fmt.Sprintf("Downloaded a copy to stating dir. %d bytes copied", nc), file.logInfo(logger.Fields{Operation: operation})) + logger.Info(fmt.Sprintf("Downloaded a copy to staging dir. %d bytes copied", nc), file.logInfo(nil)) return nil } -// createStagingFileForRead creates a staging file and downloads content from DFS for read caching. +// createStagingFileForRead creates a staging file, downloads content from DFS, and returns a FileProxy. // This is used when opening an existing file for reading and caching is enabled. -// Unlike createStagingFile, this doesn't create or stat the file in DFS (caller already did that). -func (file *FileINode) createStagingFileForRead(operation string) (*os.File, error) { - stagingFile, err := ioutil.TempFile(StagingDir, "stage") +// Returns a LocalRWFileProxy if successful, or nil if failed. +func (file *FileINode) createStagingFileForRead() *LocalRWFileProxy { + stagingFile, err := os.CreateTemp(StagingDir, "stage") if err != nil { - logger.Error("Failed to create staging file for read", file.logInfo(logger.Fields{Operation: operation, Error: err})) - return nil, err + logger.Error("Failed to create staging file for read", file.logInfo(logger.Fields{Error: err})) + return nil } - logger.Info("Created staging file for read caching", file.logInfo(logger.Fields{Operation: operation, TmpFile: stagingFile.Name()})) - - if err := file.downloadToStaging(stagingFile, operation); err != nil { + if err := file.downloadToStaging(stagingFile); err != nil { stagingFile.Close() os.Remove(stagingFile.Name()) - return nil, err + return nil } // Seek back to start for reading if _, err := stagingFile.Seek(0, 0); err != nil { stagingFile.Close() os.Remove(stagingFile.Name()) - return nil, err + return nil } - return stagingFile, nil + logger.Info("Downloaded file to cache", file.logInfo(logger.Fields{TmpFile: stagingFile.Name()})) + //TODO: Should it be in RO mode? + return &LocalRWFileProxy{localFile: stagingFile, file: file} } // Creates new file handle @@ -533,73 +525,29 @@ func (file *FileINode) NewFileHandle(existsInDFS bool, flags fuse.OpenFlags) (*F fh.File.fileProxy = file.fileProxy logger.Info("Opened file, Returning existing handle", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags})) } else { - // Check if we have a valid cached staging file for this path - // This avoids reading from HopsFS if we have a local copy absPath := file.AbsolutePath() - usedCache := false - statSucceeded := false - - logger.Info(fmt.Sprintf("Checking cache for file, StagingFileCache=%v", StagingFileCache != nil), fh.logInfo(logger.Fields{Operation: operation})) + hdfsAccessor := file.FileSystem.getDFSConnector() + // Try to get from cache or download to cache if StagingFileCache != nil { - // Stat the file to get current upstream metadata for cache validation - hdfsAccessor := file.FileSystem.getDFSConnector() - upstreamInfo, err := hdfsAccessor.Stat(absPath) - if err != nil { - logger.Warn("Failed to stat file for cache validation, skipping cache", fh.logInfo(logger.Fields{Operation: operation, Error: err})) - } else { - statSucceeded = true - // Update file.Attrs with upstream metadata so closeStaging can use correct mtime for caching - file.Attrs.Size = upstreamInfo.Size - file.Attrs.Mtime = upstreamInfo.Mtime - - if cachedPath, ok := StagingFileCache.Get(absPath, int64(upstreamInfo.Size), upstreamInfo.Mtime); ok { - // Try to open the cached file - localFile, err := os.OpenFile(cachedPath, os.O_RDWR, 0600) - if err == nil { - fh.File.fileProxy = &LocalRWFileProxy{localFile: localFile, file: file} - logger.Info("Opened file using cached staging file", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags, TmpFile: cachedPath})) - usedCache = true - } else { - // Local file was deleted externally, remove from cache - logger.Warn("Cached staging file not accessible, removing from cache", fh.logInfo(logger.Fields{Operation: operation, TmpFile: cachedPath, Error: err})) - StagingFileCache.Remove(absPath) - } - } - } + fh.File.fileProxy = StagingFileCache.GetOrLoad(file, hdfsAccessor) } - if !usedCache { - // No valid cache - decide whether to download to local cache or stream from HopsFS - // Only attempt caching if stat succeeded (we have valid metadata for cache validation later) - if StagingFileCache != nil && statSucceeded { - // Caching enabled: download to staging file for future reuse - if err := file.checkDiskSpace(); err != nil { - // Not enough disk space, fall back to remote streaming - logger.Warn("Not enough disk space for caching, using remote read", fh.logInfo(logger.Fields{Operation: operation, Error: err})) - } else { - stagingFile, err := file.createStagingFileForRead(operation) - if err == nil { - fh.File.fileProxy = &LocalRWFileProxy{localFile: stagingFile, file: file} - logger.Info("Opened file, downloaded to cache", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags, TmpFile: stagingFile.Name()})) - usedCache = true - } else { - logger.Warn("Failed to create staging file for read cache, using remote read", fh.logInfo(logger.Fields{Operation: operation, Error: err})) - } - } - } - - if !usedCache { - // Caching disabled or failed, open from HopsFS in RO mode - // When the client writes to the file, we upgrade the handle - reader, err := file.FileSystem.getDFSConnector().OpenRead(absPath) - if err != nil { - logger.Warn("Opening file failed", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags, Error: err})) - return nil, err - } - fh.File.fileProxy = &RemoteROFileProxy{hdfsReader: reader, file: file} - logger.Info("Opened file, RO handle", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags})) + // If no local file proxy was set, open from HopsFS in RO mode + if fh.File.fileProxy == nil { + reader, err := hdfsAccessor.OpenRead(absPath) + if err != nil { + logger.Warn( + "Opening file failed", + fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags, Error: err}), + ) + return nil, err } + fh.File.fileProxy = &RemoteROFileProxy{hdfsReader: reader, file: file} + logger.Info( + "Opened file, RO handle", + fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags}), + ) } } } diff --git a/internal/hopsfsmount/LocalCache.go b/internal/hopsfsmount/LocalCache.go index a4cb33e..0b02743 100644 --- a/internal/hopsfsmount/LocalCache.go +++ b/internal/hopsfsmount/LocalCache.go @@ -45,13 +45,13 @@ func NewLocalCache(maxEntries int) *LocalCache { } } -// Get retrieves a cached file path for the given HDFS path. +// Get retrieves a cached file for the given HDFS path. // The upstreamSize and upstreamMtime parameters are the current metadata from HopsFS, // used to validate that the cached file hasn't been modified by another client. -// Returns the local file path and true if found and valid, or ("", false) if not cached or stale. -// If the cache entry is stale (metadata mismatch), it is automatically removed. +// Returns an open file handle and true if found and valid, or (nil, false) if not cached or stale. +// If the cache entry is stale (metadata mismatch) or file can't be opened, it is automatically removed. // Moves the entry to the front of the LRU list on successful access. -func (c *LocalCache) Get(hdfsPath string, upstreamSize int64, upstreamMtime time.Time) (string, bool) { +func (c *LocalCache) Get(hdfsPath string, upstreamSize int64, upstreamMtime time.Time) (*os.File, bool) { c.mu.Lock() defer c.mu.Unlock() @@ -60,7 +60,7 @@ func (c *LocalCache) Get(hdfsPath string, upstreamSize int64, upstreamMtime time logger.Info("Cache miss for staging file", logger.Fields{ Path: hdfsPath, }) - return "", false + return nil, false } // Validate cache entry against upstream metadata @@ -72,7 +72,19 @@ func (c *LocalCache) Get(hdfsPath string, upstreamSize int64, upstreamMtime time TmpFile: entry.localPath, }) c.removeEntry(hdfsPath) - return "", false + return nil, false + } + + // Try to open the cached file + localFile, err := os.OpenFile(entry.localPath, os.O_RDWR, 0600) + if err != nil { + logger.Warn("Failed to open cached staging file, removing from cache", logger.Fields{ + Path: hdfsPath, + TmpFile: entry.localPath, + Error: err, + }) + c.removeEntry(hdfsPath) + return nil, false } // Move to front of LRU list (most recently used) @@ -83,7 +95,7 @@ func (c *LocalCache) Get(hdfsPath string, upstreamSize int64, upstreamMtime time TmpFile: entry.localPath, }) - return entry.localPath, true + return localFile, true } // Put adds a staging file to the cache. If the cache is full, the least @@ -244,3 +256,48 @@ func (c *LocalCache) Size() int { defer c.mu.Unlock() return len(c.entries) } + +// ShouldCache returns true if a file should be downloaded to the local cache. +// Returns true if caching should proceed, false otherwise. +// TODO: We need to consider more parameters like file size, etc. here! +func (c *LocalCache) ShouldCache(file *FileINode) bool { + + if err := file.checkDiskSpace(); err != nil { + logger.Warn("Not enough disk space for caching", logger.Fields{ + Path: file.AbsolutePath(), + Error: err, + }) + return false + } + return true +} + +// GetOrLoad tries to get a file from cache, or downloads it to cache if not found. +// Returns a FileProxy if successful (either from cache or freshly downloaded), or nil if caching is not possible. +func (c *LocalCache) GetOrLoad(file *FileINode, hdfsAccessor HdfsAccessor) FileProxy { + absPath := file.AbsolutePath() + + upstreamInfo, err := hdfsAccessor.Stat(absPath) + if err != nil { + logger.Warn("Failed to stat file for cache validation, skipping cache", logger.Fields{ + Path: absPath, + Error: err, + }) + return nil + } + + // Update file.Attrs with upstream metadata so closeStaging can use correct mtime for caching + file.Attrs.Size = upstreamInfo.Size + file.Attrs.Mtime = upstreamInfo.Mtime + + if cachedFile, ok := c.Get(absPath, int64(upstreamInfo.Size), upstreamInfo.Mtime); ok { + return &LocalRWFileProxy{localFile: cachedFile, file: file} + } + + if !c.ShouldCache(file) { + return nil + } + + // Download to staging file + return file.createStagingFileForRead() +} From 4c03daddda58b5cedacb9de8dacfe557f41732ec Mon Sep 17 00:00:00 2001 From: Hamid Date: Mon, 8 Dec 2025 16:38:52 +0100 Subject: [PATCH 14/16] add localCacheMaxFileSize to avoid downloading big files. --- internal/hopsfsmount/LocalCache.go | 11 +++++++++-- internal/hopsfsmount/config.go | 2 ++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/internal/hopsfsmount/LocalCache.go b/internal/hopsfsmount/LocalCache.go index 0b02743..b76a113 100644 --- a/internal/hopsfsmount/LocalCache.go +++ b/internal/hopsfsmount/LocalCache.go @@ -258,9 +258,16 @@ func (c *LocalCache) Size() int { } // ShouldCache returns true if a file should be downloaded to the local cache. -// Returns true if caching should proceed, false otherwise. -// TODO: We need to consider more parameters like file size, etc. here! +// Checks disk space and file size limits. func (c *LocalCache) ShouldCache(file *FileINode) bool { + // Check if file exceeds max cacheable size + if LocalCacheMaxFileSize > 0 && int64(file.Attrs.Size) > LocalCacheMaxFileSize { + logger.Info("File too large for caching", logger.Fields{ + Path: file.AbsolutePath(), + FileSize: file.Attrs.Size, + }) + return false + } if err := file.checkDiskSpace(); err != nil { logger.Warn("Not enough disk space for caching", logger.Fields{ diff --git a/internal/hopsfsmount/config.go b/internal/hopsfsmount/config.go index eac3d4a..6b987b5 100644 --- a/internal/hopsfsmount/config.go +++ b/internal/hopsfsmount/config.go @@ -44,6 +44,7 @@ var FallBackGroup = "root" var UserUmask string = "" var Umask os.FileMode var LocalCacheMaxEntries int = 50 +var LocalCacheMaxFileSize int64 = 10 * 1024 * 1024 // 10 MB default func ParseArgsAndInitLogger(retryPolicy *RetryPolicy) { flag.BoolVar(&LazyMount, "lazy", false, "Allows to mount HopsFS filesystem before HopsFS is available") @@ -74,6 +75,7 @@ func ParseArgsAndInitLogger(retryPolicy *RetryPolicy) { flag.StringVar(&FallBackGroup, "fallBackGroup", "root", "Local group name if the DFS group is not found on the local file system.") flag.StringVar(&UserUmask, "umask", "", "Umask for the file system. Must be a 4 digit octal number.") flag.IntVar(&LocalCacheMaxEntries, "localCacheSize", 50, "Max staging files to cache locally (0 to disable)") + flag.Int64Var(&LocalCacheMaxFileSize, "localCacheMaxFileSize", 10*1024*1024, "Max file size in bytes to cache locally (default: 10MB)") flag.Usage = usage flag.Parse() From ddbc496ce7377ce2a17cc555e831308ffee5447e Mon Sep 17 00:00:00 2001 From: Hamid Date: Wed, 10 Dec 2025 16:57:58 +0100 Subject: [PATCH 15/16] clean-up logs --- internal/hopsfsmount/File.go | 6 +----- internal/hopsfsmount/LocalCache.go | 26 ++++++++++---------------- 2 files changed, 11 insertions(+), 21 deletions(-) diff --git a/internal/hopsfsmount/File.go b/internal/hopsfsmount/File.go index 712443e..9d7559a 100644 --- a/internal/hopsfsmount/File.go +++ b/internal/hopsfsmount/File.go @@ -420,7 +420,7 @@ func (file *FileINode) createStagingFile(operation string, existsInDFS bool) (*o // Check if we have a valid cached staging file for this path if StagingFileCache != nil { if localFile, ok := StagingFileCache.Get(absPath, int64(upstreamInfo.Size), upstreamInfo.Mtime); ok { - logger.Info("Using cached staging file", file.logInfo(logger.Fields{Operation: operation})) + logger.Debug("Using cached staging file", file.logInfo(logger.Fields{Operation: operation})) return localFile, nil } } @@ -544,10 +544,6 @@ func (file *FileINode) NewFileHandle(existsInDFS bool, flags fuse.OpenFlags) (*F return nil, err } fh.File.fileProxy = &RemoteROFileProxy{hdfsReader: reader, file: file} - logger.Info( - "Opened file, RO handle", - fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags}), - ) } } } diff --git a/internal/hopsfsmount/LocalCache.go b/internal/hopsfsmount/LocalCache.go index b76a113..58aeadc 100644 --- a/internal/hopsfsmount/LocalCache.go +++ b/internal/hopsfsmount/LocalCache.go @@ -57,7 +57,7 @@ func (c *LocalCache) Get(hdfsPath string, upstreamSize int64, upstreamMtime time entry, ok := c.entries[hdfsPath] if !ok { - logger.Info("Cache miss for staging file", logger.Fields{ + logger.Debug("Cache miss for staging file", logger.Fields{ Path: hdfsPath, }) return nil, false @@ -66,7 +66,7 @@ func (c *LocalCache) Get(hdfsPath string, upstreamSize int64, upstreamMtime time // Validate cache entry against upstream metadata // If size or mtime differs, the file was modified by another client if entry.size != upstreamSize || !entry.mtime.Equal(upstreamMtime) { - logger.Info(fmt.Sprintf("Cached staging file is stale, invalidating. cached[size=%d, mtime=%v] upstream[size=%d, mtime=%v]", + logger.Debug(fmt.Sprintf("Cached staging file is stale, invalidating. cached[size=%d, mtime=%v] upstream[size=%d, mtime=%v]", entry.size, entry.mtime, upstreamSize, upstreamMtime), logger.Fields{ Path: hdfsPath, TmpFile: entry.localPath, @@ -90,7 +90,7 @@ func (c *LocalCache) Get(hdfsPath string, upstreamSize int64, upstreamMtime time // Move to front of LRU list (most recently used) c.lruList.MoveToFront(entry.lruElement) - logger.Info("Cache hit for staging file", logger.Fields{ + logger.Debug("Cache hit for staging file", logger.Fields{ Path: hdfsPath, TmpFile: entry.localPath, }) @@ -114,7 +114,7 @@ func (c *LocalCache) Put(hdfsPath string, localPath string, size int64, mtime ti existing.size = size existing.mtime = mtime c.lruList.MoveToFront(existing.lruElement) - logger.Info("Updated existing cache entry", logger.Fields{ + logger.Debug("Updated existing cache entry", logger.Fields{ Path: hdfsPath, TmpFile: localPath, FileSize: size, @@ -137,7 +137,7 @@ func (c *LocalCache) Put(hdfsPath string, localPath string, size int64, mtime ti entry.lruElement = c.lruList.PushFront(entry) c.entries[hdfsPath] = entry - logger.Info("Added staging file to cache", logger.Fields{ + logger.Debug("Added staging file to cache", logger.Fields{ Path: hdfsPath, TmpFile: localPath, FileSize: size, @@ -164,7 +164,7 @@ func (c *LocalCache) Rename(oldPath, newPath string) { entry, ok := c.entries[oldPath] if !ok { // No cache entry for old path, nothing to transfer - logger.Info("Cache rename: no entry for old path", logger.Fields{ + logger.Debug("Cache rename: no entry for old path", logger.Fields{ From: oldPath, To: newPath, }) @@ -184,7 +184,7 @@ func (c *LocalCache) Rename(oldPath, newPath string) { // Move to front of LRU (most recently used) c.lruList.MoveToFront(entry.lruElement) - logger.Info("Cache entry renamed", logger.Fields{ + logger.Debug("Cache entry renamed", logger.Fields{ From: oldPath, To: newPath, TmpFile: entry.localPath, @@ -213,7 +213,7 @@ func (c *LocalCache) removeEntry(hdfsPath string) { // Remove from map delete(c.entries, hdfsPath) - logger.Info("Removed staging file from cache", logger.Fields{ + logger.Debug("Removed staging file from cache", logger.Fields{ Path: hdfsPath, TmpFile: entry.localPath, }) @@ -228,12 +228,6 @@ func (c *LocalCache) evictOldest() { } entry := oldest.Value.(*CacheEntry) - logger.Info("Evicting oldest cache entry", logger.Fields{ - Path: entry.hdfsPath, - TmpFile: entry.localPath, - FileSize: entry.size, - }) - c.removeEntry(entry.hdfsPath) } @@ -247,7 +241,7 @@ func (c *LocalCache) Clear() { c.removeEntry(hdfsPath) } - logger.Info("Cleared staging file cache", nil) + logger.Debug("Cleared staging file cache", nil) } // Size returns the current number of entries in the cache. @@ -262,7 +256,7 @@ func (c *LocalCache) Size() int { func (c *LocalCache) ShouldCache(file *FileINode) bool { // Check if file exceeds max cacheable size if LocalCacheMaxFileSize > 0 && int64(file.Attrs.Size) > LocalCacheMaxFileSize { - logger.Info("File too large for caching", logger.Fields{ + logger.Debug("File too large for caching", logger.Fields{ Path: file.AbsolutePath(), FileSize: file.Attrs.Size, }) From 09062ddb9ce8ad1294b1feffeba495361d48848e Mon Sep 17 00:00:00 2001 From: Hamid Date: Wed, 10 Dec 2025 17:37:26 +0100 Subject: [PATCH 16/16] more cleanup --- internal/hopsfsmount/File.go | 5 ++--- internal/hopsfsmount/LocalCache.go | 1 - 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/internal/hopsfsmount/File.go b/internal/hopsfsmount/File.go index 9d7559a..ff9d176 100644 --- a/internal/hopsfsmount/File.go +++ b/internal/hopsfsmount/File.go @@ -155,8 +155,6 @@ func (file *FileINode) closeStaging() { stat, statErr := lrwfp.localFile.Stat() localPath := lrwfp.localFile.Name() if statErr == nil && localPath != "" { - // Store with current mtime from Attrs (set after successful flush to DFS) - // This allows us to detect if the file was modified by another client StagingFileCache.Put(file.AbsolutePath(), localPath, stat.Size(), file.Attrs.Mtime) } } @@ -289,7 +287,7 @@ func (file *FileINode) flushAttempt(operation string) error { file.Attrs.Size = written // Stat the file to get the server-assigned mtime after upload - // This is only needed for cache validation on subsequent reads + // This is needed for cache validation on subsequent reads if StagingFileCache != nil { upstreamInfo, err := hdfsAccessor.Stat(file.AbsolutePath()) if err != nil { @@ -544,6 +542,7 @@ func (file *FileINode) NewFileHandle(existsInDFS bool, flags fuse.OpenFlags) (*F return nil, err } fh.File.fileProxy = &RemoteROFileProxy{hdfsReader: reader, file: file} + logger.Info("Opened file, RO handle", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags})) } } } diff --git a/internal/hopsfsmount/LocalCache.go b/internal/hopsfsmount/LocalCache.go index 58aeadc..cfcb8a6 100644 --- a/internal/hopsfsmount/LocalCache.go +++ b/internal/hopsfsmount/LocalCache.go @@ -127,7 +127,6 @@ func (c *LocalCache) Put(hdfsPath string, localPath string, size int64, mtime ti c.evictOldest() } - // Create new entry entry := &CacheEntry{ hdfsPath: hdfsPath, localPath: localPath,