diff --git a/internal/hopsfsmount/Dir.go b/internal/hopsfsmount/Dir.go
index 5581d77..a40aa30 100644
--- a/internal/hopsfsmount/Dir.go
+++ b/internal/hopsfsmount/Dir.go
@@ -341,6 +341,10 @@ func (dir *DirINode) Remove(ctx context.Context, req *fuse.RemoveRequest) error
 	err := dir.FileSystem.getDFSConnector().Remove(path)
 	if err == nil {
 		dir.removeChildInode(Remove, req.Name)
+		// Invalidate staging file cache for the removed path
+		if StagingFileCache != nil {
+			StagingFileCache.Remove(path)
+		}
 		logger.Info("Removed path", logger.Fields{Operation: Remove, Path: path})
 	} else {
 		logger.Warn("Failed to remove path", logger.Fields{Operation: Remove, Path: path, Error: err})
@@ -379,6 +383,11 @@ func (srcParent *DirINode) renameInt(operationName, oldName, newName string, dst
 		return err
 	}
 
+	// Transfer staging file cache entry from old path to new path if it exists
+	if StagingFileCache != nil {
+		StagingFileCache.Rename(oldPath, newPath)
+	}
+
 	// disconnect src inode
 	if srcInode != nil {
 		srcParent.removeChildInode(Rename, oldName)
diff --git a/internal/hopsfsmount/File.go b/internal/hopsfsmount/File.go
index e1c89e1..ff9d176 100644
--- a/internal/hopsfsmount/File.go
+++ b/internal/hopsfsmount/File.go
@@ -24,9 +24,9 @@ import (
 // Lock weight ordering (must be followed to avoid deadlocks):
 //
-//	dataMutex (weight 3) - serializes I/O operations (read/write/truncate/flush)
-//	fileMutex (weight 2) - protects file metadata (Attrs) and serializes Open requests
-//	fileHandleMutex (weight 1) - protects activeHandles and fileProxy lifecycle
+// dataMutex (weight 3) - serializes I/O operations (read/write/truncate/flush)
+// fileMutex (weight 2) - protects file metadata (Attrs) and serializes Open requests
+// fileHandleMutex (weight 1) - protects activeHandles and fileProxy lifecycle
 //
 // Rule: A goroutine holding a lock can only acquire locks with LOWER weight.
 // Note: dataMutex and fileMutex are independent (never nested with each other).
 
@@ -150,6 +150,15 @@ func (file *FileINode) RemoveHandle(handle *FileHandle) {
 // Called with fileHandleMutex held by RemoveHandle()
 func (file *FileINode) closeStaging() {
 	if file.fileProxy != nil { // if not already closed
+		// If caching is enabled and this is a LocalRWFileProxy, add to cache before closing
+		if lrwfp, ok := file.fileProxy.(*LocalRWFileProxy); ok && StagingFileCache != nil {
+			stat, statErr := lrwfp.localFile.Stat()
+			localPath := lrwfp.localFile.Name()
+			if statErr == nil && localPath != "" {
+				StagingFileCache.Put(file.AbsolutePath(), localPath, stat.Size(), file.Attrs.Mtime)
+			}
+		}
+
 		err := file.fileProxy.Close()
 		if err != nil {
 			logger.Error("Failed to close staging file", file.logInfo(logger.Fields{Operation: Close, Error: err}))
@@ -276,6 +285,19 @@ func (file *FileINode) flushAttempt(operation string) error {
 		file.logInfo(logger.Fields{Operation: operation, Bytes: written}))
 	file.Attrs.Size = written
+
+	// Stat the file to get the server-assigned mtime after upload.
+	// This is needed for cache validation on subsequent reads.
+	if StagingFileCache != nil {
+		upstreamInfo, err := hdfsAccessor.Stat(file.AbsolutePath())
+		if err != nil {
+			logger.Warn("Failed to stat file after upload, mtime may be stale", file.logInfo(logger.Fields{Operation: operation, Error: err}))
+		} else {
+			file.Attrs.Mtime = upstreamInfo.Mtime
+			file.Attrs.Size = upstreamInfo.Size
+		}
+	}
+
 	return nil
 }
@@ -386,12 +408,20 @@ func (file *FileINode) createStagingFile(operation string, existsInDFS bool) (*o
 		logger.Info("Created an empty file in DFS", file.logInfo(logger.Fields{Operation: operation}))
 		w.Close()
 	} else {
-		// Request to write to existing file
-		_, err := hdfsAccessor.Stat(absPath)
+		// Request to write to existing file - stat to verify it exists and get metadata
+		upstreamInfo, err := hdfsAccessor.Stat(absPath)
 		if err != nil {
 			logger.Error("Failed to stat file in DFS", file.logInfo(logger.Fields{Operation: operation, Error: err}))
 			return nil, syscall.ENOENT
 		}
+
+		// Check if we have a valid cached staging file for this path
+		if StagingFileCache != nil {
+			if localFile, ok := StagingFileCache.Get(absPath, int64(upstreamInfo.Size), upstreamInfo.Mtime); ok {
+				logger.Debug("Using cached staging file", file.logInfo(logger.Fields{Operation: operation}))
+				return localFile, nil
+			}
+		}
 	}
 
 	stagingFile, err := ioutil.TempFile(StagingDir, "stage")
@@ -399,38 +429,68 @@ func (file *FileINode) createStagingFile(operation string, existsInDFS bool) (*o
 		logger.Error("Failed to create staging file", file.logInfo(logger.Fields{Operation: operation, Error: err}))
 		return nil, err
 	}
-	os.Remove(stagingFile.Name())
+	// When caching is enabled, we keep the file on disk for potential reuse.
+	if StagingFileCache == nil {
+		os.Remove(stagingFile.Name())
+	}
 	logger.Info("Created staging file", file.logInfo(logger.Fields{Operation: operation, TmpFile: stagingFile.Name()}))
 
 	if existsInDFS {
-		if err := file.downloadToStaging(stagingFile, operation); err != nil {
+		if err := file.downloadToStaging(stagingFile); err != nil {
 			return nil, err
 		}
 	}
 	return stagingFile, nil
 }
 
-func (file *FileINode) downloadToStaging(stagingFile *os.File, operation string) error {
+func (file *FileINode) downloadToStaging(stagingFile *os.File) error {
 	hdfsAccessor := file.FileSystem.getDFSConnector()
 	absPath := file.AbsolutePath()
 
 	reader, err := hdfsAccessor.OpenRead(absPath)
 	if err != nil {
-		logger.Error("Failed to open file in DFS", file.logInfo(logger.Fields{Operation: operation, Error: err}))
-		// TODO remove the staging file if there are no more active handles
+		logger.Error("Failed to open file in DFS", file.logInfo(logger.Fields{Error: err}))
 		return err
 	}
 	nc, err := io.Copy(stagingFile, reader)
 	if err != nil {
-		logger.Error("Failed to copy content to staging file", file.logInfo(logger.Fields{Operation: operation, Error: err}))
+		logger.Error("Failed to copy content to staging file", file.logInfo(logger.Fields{Error: err}))
 		return err
 	}
 	reader.Close()
-	logger.Info(fmt.Sprintf("Downloaded a copy to stating dir. %d bytes copied", nc), file.logInfo(logger.Fields{Operation: operation}))
+	logger.Info(fmt.Sprintf("Downloaded a copy to staging dir. %d bytes copied", nc), file.logInfo(nil))
 	return nil
 }
 
+// createStagingFileForRead creates a staging file, downloads content from DFS, and returns a FileProxy.
+// This is used when opening an existing file for reading and caching is enabled.
+// Returns a LocalRWFileProxy if successful, or nil on failure.
+func (file *FileINode) createStagingFileForRead() *LocalRWFileProxy {
+	stagingFile, err := os.CreateTemp(StagingDir, "stage")
+	if err != nil {
+		logger.Error("Failed to create staging file for read", file.logInfo(logger.Fields{Error: err}))
+		return nil
+	}
+
+	if err := file.downloadToStaging(stagingFile); err != nil {
+		stagingFile.Close()
+		os.Remove(stagingFile.Name())
+		return nil
+	}
+
+	// Seek back to start for reading
+	if _, err := stagingFile.Seek(0, 0); err != nil {
+		stagingFile.Close()
+		os.Remove(stagingFile.Name())
+		return nil
+	}
+
+	logger.Info("Downloaded file to cache", file.logInfo(logger.Fields{TmpFile: stagingFile.Name()}))
+	// TODO: Should it be in RO mode?
+	return &LocalRWFileProxy{localFile: stagingFile, file: file}
+}
+
 // Creates new file handle
 // Lock order: fileHandleMutex (1) alone
 // Called from Open which holds fileMutex (2), so order is: fileMutex (2) → fileHandleMutex (1)
@@ -463,14 +523,24 @@ func (file *FileINode) NewFileHandle(existsInDFS bool, flags fuse.OpenFlags) (*F
 		fh.File.fileProxy = file.fileProxy
 		logger.Info("Opened file, Returning existing handle", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags}))
 	} else {
-		// we alway open the file in RO mode. when the client writes to the file
-		// then we upgrade the handle. However, if the file is already opened in
-		// in RW state then we use the existing RW handle
-		reader, err := file.FileSystem.getDFSConnector().OpenRead(file.AbsolutePath())
-		if err != nil {
-			logger.Warn("Opening file failed", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags, Error: err}))
-			return nil, err
-		} else {
+		absPath := file.AbsolutePath()
+		hdfsAccessor := file.FileSystem.getDFSConnector()
+
+		// Try to get from cache or download to cache
+		if StagingFileCache != nil {
+			fh.File.fileProxy = StagingFileCache.GetOrLoad(file, hdfsAccessor)
+		}
+
+		// If no local file proxy was set, open from HopsFS in RO mode
+		if fh.File.fileProxy == nil {
+			reader, err := hdfsAccessor.OpenRead(absPath)
+			if err != nil {
+				logger.Warn(
+					"Opening file failed",
+					fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags, Error: err}),
+				)
+				return nil, err
+			}
 			fh.File.fileProxy = &RemoteROFileProxy{hdfsReader: reader, file: file}
 			logger.Info("Opened file, RO handle", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags}))
 		}
diff --git a/internal/hopsfsmount/LocalCache.go b/internal/hopsfsmount/LocalCache.go
new file mode 100644
index 0000000..cfcb8a6
--- /dev/null
+++ b/internal/hopsfsmount/LocalCache.go
@@ -0,0 +1,303 @@
+// Copyright (c) Hopsworks AB. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for details.
+
+package hopsfsmount
+
+import (
+	"container/list"
+	"fmt"
+	"os"
+	"sync"
+	"time"
+
+	"hopsworks.ai/hopsfsmount/internal/hopsfsmount/logger"
+)
+
+// LocalCache manages cached staging files using LRU eviction.
+// When files are written and closed, their local staging copies are kept
+// in this cache for faster reopening instead of downloading from DFS again.
+type LocalCache struct {
+	mu         sync.Mutex
+	maxEntries int
+	entries    map[string]*CacheEntry
+	lruList    *list.List // front = most recently used, back = least recently used
+}
+
+// CacheEntry represents a cached staging file
+type CacheEntry struct {
+	hdfsPath   string
+	localPath  string
+	size       int64
+	mtime      time.Time // modification time when cached, used to detect upstream changes
+	lruElement *list.Element
+}
+
+// StagingFileCache is the local cache instance, initialized in config.go if caching is enabled
+var StagingFileCache *LocalCache
+
+// NewLocalCache creates a new cache with the given maximum number of entries.
+// When the cache is full, the least recently used entry is evicted.
+func NewLocalCache(maxEntries int) *LocalCache {
+	return &LocalCache{
+		maxEntries: maxEntries,
+		entries:    make(map[string]*CacheEntry),
+		lruList:    list.New(),
+	}
+}
+
+// Get retrieves a cached file for the given HDFS path.
+// The upstreamSize and upstreamMtime parameters are the current metadata from HopsFS,
+// used to validate that the cached file hasn't been modified by another client.
+// Returns an open file handle and true if found and valid, or (nil, false) if not cached or stale.
+// If the cache entry is stale (metadata mismatch) or the file can't be opened, it is automatically removed.
+// Moves the entry to the front of the LRU list on successful access.
+func (c *LocalCache) Get(hdfsPath string, upstreamSize int64, upstreamMtime time.Time) (*os.File, bool) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	entry, ok := c.entries[hdfsPath]
+	if !ok {
+		logger.Debug("Cache miss for staging file", logger.Fields{
+			Path: hdfsPath,
+		})
+		return nil, false
+	}
+
+	// Validate cache entry against upstream metadata.
+	// If size or mtime differs, the file was modified by another client.
+	if entry.size != upstreamSize || !entry.mtime.Equal(upstreamMtime) {
+		logger.Debug(fmt.Sprintf("Cached staging file is stale, invalidating. cached[size=%d, mtime=%v] upstream[size=%d, mtime=%v]",
+			entry.size, entry.mtime, upstreamSize, upstreamMtime), logger.Fields{
+			Path:    hdfsPath,
+			TmpFile: entry.localPath,
+		})
+		c.removeEntry(hdfsPath)
+		return nil, false
+	}
+
+	// Try to open the cached file
+	localFile, err := os.OpenFile(entry.localPath, os.O_RDWR, 0600)
+	if err != nil {
+		logger.Warn("Failed to open cached staging file, removing from cache", logger.Fields{
+			Path:    hdfsPath,
+			TmpFile: entry.localPath,
+			Error:   err,
+		})
+		c.removeEntry(hdfsPath)
+		return nil, false
+	}
+
+	// Move to front of LRU list (most recently used)
+	c.lruList.MoveToFront(entry.lruElement)
+
+	logger.Debug("Cache hit for staging file", logger.Fields{
+		Path:    hdfsPath,
+		TmpFile: entry.localPath,
+	})
+
+	return localFile, true
+}
+
+// Put adds a staging file to the cache. If the cache is full, the least
+// recently used entry is evicted first. If an entry already exists for
+// this path, it is updated and moved to the front of the LRU list.
+// The mtime parameter should be the modification time from HopsFS, used
+// to detect if the file was modified by another client.
+func (c *LocalCache) Put(hdfsPath string, localPath string, size int64, mtime time.Time) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	// Check if entry already exists
+	if existing, ok := c.entries[hdfsPath]; ok {
+		// Update existing entry
+		existing.localPath = localPath
+		existing.size = size
+		existing.mtime = mtime
+		c.lruList.MoveToFront(existing.lruElement)
+		logger.Debug("Updated existing cache entry", logger.Fields{
+			Path:     hdfsPath,
+			TmpFile:  localPath,
+			FileSize: size,
+		})
+		return
+	}
+
+	// Evict oldest entries if cache is full
+	for len(c.entries) >= c.maxEntries {
+		c.evictOldest()
+	}
+
+	entry := &CacheEntry{
+		hdfsPath:  hdfsPath,
+		localPath: localPath,
+		size:      size,
+		mtime:     mtime,
+	}
+	entry.lruElement = c.lruList.PushFront(entry)
+	c.entries[hdfsPath] = entry
+
+	logger.Debug("Added staging file to cache", logger.Fields{
+		Path:     hdfsPath,
+		TmpFile:  localPath,
+		FileSize: size,
+		Entries:  len(c.entries),
+	})
+}
+
+// Remove explicitly removes an entry from the cache.
+// This should be called when a file is deleted in DFS.
+func (c *LocalCache) Remove(hdfsPath string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	c.removeEntry(hdfsPath)
+}
+
+// Rename transfers a cache entry from oldPath to newPath.
+// If the entry doesn't exist for oldPath, this is a no-op.
+// If an entry already exists for newPath, it is replaced.
+func (c *LocalCache) Rename(oldPath, newPath string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	entry, ok := c.entries[oldPath]
+	if !ok {
+		// No cache entry for old path, nothing to transfer
+		logger.Debug("Cache rename: no entry for old path", logger.Fields{
+			From: oldPath,
+			To:   newPath,
+		})
+		return
+	}
+
+	// Remove any existing entry at the new path
+	if _, exists := c.entries[newPath]; exists {
+		c.removeEntry(newPath)
+	}
+
+	// Update the entry's hdfsPath and move to new key
+	delete(c.entries, oldPath)
+	entry.hdfsPath = newPath
+	c.entries[newPath] = entry
+
+	// Move to front of LRU (most recently used)
+	c.lruList.MoveToFront(entry.lruElement)
+
+	logger.Debug("Cache entry renamed", logger.Fields{
+		From:    oldPath,
+		To:      newPath,
+		TmpFile: entry.localPath,
+	})
+}
+
+// removeEntry removes an entry without locking (internal use only)
+func (c *LocalCache) removeEntry(hdfsPath string) {
+	entry, ok := c.entries[hdfsPath]
+	if !ok {
+		return
+	}
+
+	// Remove from LRU list
+	c.lruList.Remove(entry.lruElement)
+
+	// Delete local file
+	if err := os.Remove(entry.localPath); err != nil && !os.IsNotExist(err) {
+		logger.Warn("Failed to remove cached staging file", logger.Fields{
+			Path:    hdfsPath,
+			TmpFile: entry.localPath,
+			Error:   err,
+		})
+	}
+
+	// Remove from map
+	delete(c.entries, hdfsPath)
+
+	logger.Debug("Removed staging file from cache", logger.Fields{
+		Path:    hdfsPath,
+		TmpFile: entry.localPath,
+	})
+}
+
+// evictOldest removes the least recently used entry from the cache.
+// Must be called with mutex held.
+func (c *LocalCache) evictOldest() {
+	oldest := c.lruList.Back()
+	if oldest == nil {
+		return
+	}
+
+	entry := oldest.Value.(*CacheEntry)
+	c.removeEntry(entry.hdfsPath)
+}
+
+// Clear removes all entries from the cache.
+// This should be called during shutdown.
+func (c *LocalCache) Clear() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	for hdfsPath := range c.entries {
+		c.removeEntry(hdfsPath)
+	}
+
+	logger.Debug("Cleared staging file cache", nil)
+}
+
+// Size returns the current number of entries in the cache.
+func (c *LocalCache) Size() int {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return len(c.entries)
+}
+
+// ShouldCache returns true if a file should be downloaded to the local cache.
+// Checks disk space and file size limits.
+func (c *LocalCache) ShouldCache(file *FileINode) bool {
+	// Check if file exceeds max cacheable size
+	if LocalCacheMaxFileSize > 0 && int64(file.Attrs.Size) > LocalCacheMaxFileSize {
+		logger.Debug("File too large for caching", logger.Fields{
+			Path:     file.AbsolutePath(),
+			FileSize: file.Attrs.Size,
+		})
+		return false
+	}
+
+	if err := file.checkDiskSpace(); err != nil {
+		logger.Warn("Not enough disk space for caching", logger.Fields{
+			Path:  file.AbsolutePath(),
+			Error: err,
+		})
+		return false
+	}
+	return true
+}
+
+// GetOrLoad tries to get a file from the cache, or downloads it to the cache if not found.
+// Returns a FileProxy if successful (either from cache or freshly downloaded), or nil if caching is not possible.
+func (c *LocalCache) GetOrLoad(file *FileINode, hdfsAccessor HdfsAccessor) FileProxy {
+	absPath := file.AbsolutePath()
+
+	upstreamInfo, err := hdfsAccessor.Stat(absPath)
+	if err != nil {
+		logger.Warn("Failed to stat file for cache validation, skipping cache", logger.Fields{
+			Path:  absPath,
+			Error: err,
+		})
+		return nil
+	}
+
+	// Update file.Attrs with upstream metadata so closeStaging can use the correct mtime for caching
+	file.Attrs.Size = upstreamInfo.Size
+	file.Attrs.Mtime = upstreamInfo.Mtime
+
+	if cachedFile, ok := c.Get(absPath, int64(upstreamInfo.Size), upstreamInfo.Mtime); ok {
+		return &LocalRWFileProxy{localFile: cachedFile, file: file}
+	}
+
+	if !c.ShouldCache(file) {
+		return nil
+	}
+
+	// Download to staging file
+	return file.createStagingFileForRead()
+}
diff --git a/internal/hopsfsmount/config.go b/internal/hopsfsmount/config.go
index 43351fe..6b987b5 100644
--- a/internal/hopsfsmount/config.go
+++ b/internal/hopsfsmount/config.go
@@ -43,6 +43,8 @@ var FallBackUser = "root"
 var FallBackGroup = "root"
 var UserUmask string = ""
 var Umask os.FileMode
+var LocalCacheMaxEntries int = 50
+var LocalCacheMaxFileSize int64 = 10 * 1024 * 1024 // 10 MB default
 
 func ParseArgsAndInitLogger(retryPolicy *RetryPolicy) {
 	flag.BoolVar(&LazyMount, "lazy", false, "Allows to mount HopsFS filesystem before HopsFS is available")
@@ -72,6 +74,8 @@ func ParseArgsAndInitLogger(retryPolicy *RetryPolicy) {
 	flag.StringVar(&FallBackUser, "fallBackUser", "root", "Local user name if the DFS user is not found on the local file system")
 	flag.StringVar(&FallBackGroup, "fallBackGroup", "root", "Local group name if the DFS group is not found on the local file system.")
 	flag.StringVar(&UserUmask, "umask", "", "Umask for the file system. Must be a 4 digit octal number.")
+	flag.IntVar(&LocalCacheMaxEntries, "localCacheSize", 50, "Max staging files to cache locally (0 to disable)")
+	flag.Int64Var(&LocalCacheMaxFileSize, "localCacheMaxFileSize", 10*1024*1024, "Max file size in bytes to cache locally (default: 10MB)")
 	flag.Usage = usage
 	flag.Parse()
 
@@ -112,6 +116,14 @@ func ParseArgsAndInitLogger(retryPolicy *RetryPolicy) {
 	}
 	logger.Info(fmt.Sprintf("Using umask: %o", Umask), nil)
 
+	// Initialize staging file cache if enabled
+	if LocalCacheMaxEntries > 0 {
+		StagingFileCache = NewLocalCache(LocalCacheMaxEntries)
+		logger.Info(fmt.Sprintf("Staging file cache enabled with max %d entries", LocalCacheMaxEntries), nil)
+	} else {
+		logger.Info("Staging file cache disabled", nil)
+	}
+
 	logger.Info(fmt.Sprintf("Staging dir is:%s, Using TLS: %v, RetryAttempts: %d, LogFile: %s", StagingDir, Tls, retryPolicy.MaxAttempts, LogFile), nil)
 	logger.Info(fmt.Sprintf("hopsfs-mount: current head GITCommit: %s Built time: %s Built by: %s ", GITCOMMIT, BUILDTIME, HOSTNAME), nil)
 }
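
Reviewer note (not part of the patch): the sketch below illustrates the LRU and staleness semantics that LocalCache.go introduces, written as a test that would sit next to LocalCache.go in package hopsfsmount. The test name, paths, and sizes are made up for illustration, and it assumes the package's logger works with its default configuration.

// localcache_sketch_test.go — a minimal sketch of the intended cache behavior.
package hopsfsmount

import (
	"os"
	"testing"
	"time"
)

func TestLocalCacheSketch(t *testing.T) {
	c := NewLocalCache(2) // capacity 2, so a third Put forces LRU eviction

	// mk creates a throwaway local file standing in for a staging copy.
	mk := func(pattern string) string {
		f, err := os.CreateTemp(t.TempDir(), pattern)
		if err != nil {
			t.Fatal(err)
		}
		f.Close()
		return f.Name()
	}

	now := time.Now()
	c.Put("/a", mk("a"), 1, now)
	c.Put("/b", mk("b"), 2, now)

	// Matching size and mtime is a hit; the entry moves to the LRU front.
	if f, ok := c.Get("/a", 1, now); !ok {
		t.Fatal("expected cache hit for /a")
	} else {
		f.Close()
	}

	// A third entry evicts the least recently used one (/b).
	c.Put("/c", mk("c"), 3, now)
	if _, ok := c.Get("/b", 2, now); ok {
		t.Fatal("expected /b to be evicted")
	}

	// A size or mtime mismatch means another client changed the file:
	// the stale entry is dropped and the caller falls back to DFS.
	if _, ok := c.Get("/a", 1, now.Add(time.Second)); ok {
		t.Fatal("expected stale /a entry to be invalidated")
	}

	// Rename keeps the cached bytes reachable under the new key.
	c.Rename("/c", "/d")
	if f, ok := c.Get("/d", 3, now); !ok {
		t.Fatal("expected renamed entry under /d")
	} else {
		f.Close()
	}
}

Validating on size plus the server-assigned mtime is a cheap freshness check: if another client rewrites the file in HopsFS, either the length or the mtime changes, so the stale staging copy is discarded and the next open downloads from DFS again. Per the config.go hunk above, the whole cache can be disabled with -localCacheSize 0.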