Skip to content
Open
9 changes: 9 additions & 0 deletions internal/hopsfsmount/Dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,10 @@ func (dir *DirINode) Remove(ctx context.Context, req *fuse.RemoveRequest) error
err := dir.FileSystem.getDFSConnector().Remove(path)
if err == nil {
dir.removeChildInode(Remove, req.Name)
// Invalidate staging file cache for the removed path
if StagingFileCache != nil {
StagingFileCache.Remove(path)
}
logger.Info("Removed path", logger.Fields{Operation: Remove, Path: path})
} else {
logger.Warn("Failed to remove path", logger.Fields{Operation: Remove, Path: path, Error: err})
Expand Down Expand Up @@ -379,6 +383,11 @@ func (srcParent *DirINode) renameInt(operationName, oldName, newName string, dst
return err
}

// Transfer staging file cache entry from old path to new path if it exists
if StagingFileCache != nil {
StagingFileCache.Rename(oldPath, newPath)
}

// disconnect src inode
if srcInode != nil {
srcParent.removeChildInode(Rename, oldName)
Expand Down
110 changes: 90 additions & 20 deletions internal/hopsfsmount/File.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (

// Lock weight ordering (must be followed to avoid deadlocks):
//
// dataMutex (weight 3) - serializes I/O operations (read/write/truncate/flush)
// fileMutex (weight 2) - protects file metadata (Attrs) and serializes Open requests
// fileHandleMutex (weight 1) - protects activeHandles and fileProxy lifecycle
// dataMutex (weight 3) - serializes I/O operations (read/write/truncate/flush)
// fileMutex (weight 2) - protects file metadata (Attrs) and serializes Open requests
// fileHandleMutex (weight 1) - protects activeHandles and fileProxy lifecycle
//
// Rule: A goroutine holding a lock can only acquire locks with LOWER weight.
// Note: dataMutex and fileMutex are independent (never nested with each other).
Expand Down Expand Up @@ -150,6 +150,15 @@ func (file *FileINode) RemoveHandle(handle *FileHandle) {
// Called with fileHandleMutex held by RemoveHandle()
func (file *FileINode) closeStaging() {
if file.fileProxy != nil { // if not already closed
// If caching is enabled and this is a LocalRWFileProxy, add to cache before closing
if lrwfp, ok := file.fileProxy.(*LocalRWFileProxy); ok && StagingFileCache != nil {
stat, statErr := lrwfp.localFile.Stat()
localPath := lrwfp.localFile.Name()
if statErr == nil && localPath != "" {
StagingFileCache.Put(file.AbsolutePath(), localPath, stat.Size(), file.Attrs.Mtime)
}
}

err := file.fileProxy.Close()
if err != nil {
logger.Error("Failed to close staging file", file.logInfo(logger.Fields{Operation: Close, Error: err}))
Expand Down Expand Up @@ -276,6 +285,19 @@ func (file *FileINode) flushAttempt(operation string) error {
file.logInfo(logger.Fields{Operation: operation, Bytes: written}))

file.Attrs.Size = written

// Stat the file to get the server-assigned mtime after upload
// This is needed for cache validation on subsequent reads
if StagingFileCache != nil {
upstreamInfo, err := hdfsAccessor.Stat(file.AbsolutePath())
if err != nil {
logger.Warn("Failed to stat file after upload, mtime may be stale", file.logInfo(logger.Fields{Operation: operation, Error: err}))
} else {
file.Attrs.Mtime = upstreamInfo.Mtime
file.Attrs.Size = upstreamInfo.Size
}
}

return nil
}

Expand Down Expand Up @@ -386,51 +408,89 @@ func (file *FileINode) createStagingFile(operation string, existsInDFS bool) (*o
logger.Info("Created an empty file in DFS", file.logInfo(logger.Fields{Operation: operation}))
w.Close()
} else {
// Request to write to existing file
_, err := hdfsAccessor.Stat(absPath)
// Request to write to existing file - stat to verify it exists and get metadata
upstreamInfo, err := hdfsAccessor.Stat(absPath)
if err != nil {
logger.Error("Failed to stat file in DFS", file.logInfo(logger.Fields{Operation: operation, Error: err}))
return nil, syscall.ENOENT
}

// Check if we have a valid cached staging file for this path
if StagingFileCache != nil {
if localFile, ok := StagingFileCache.Get(absPath, int64(upstreamInfo.Size), upstreamInfo.Mtime); ok {
logger.Debug("Using cached staging file", file.logInfo(logger.Fields{Operation: operation}))
return localFile, nil
}
}
}

stagingFile, err := ioutil.TempFile(StagingDir, "stage")
if err != nil {
logger.Error("Failed to create staging file", file.logInfo(logger.Fields{Operation: operation, Error: err}))
return nil, err
}
os.Remove(stagingFile.Name())
// When caching is enabled, we keep the file on disk for potential reuse.
if StagingFileCache == nil {
os.Remove(stagingFile.Name())
}
logger.Info("Created staging file", file.logInfo(logger.Fields{Operation: operation, TmpFile: stagingFile.Name()}))

if existsInDFS {
if err := file.downloadToStaging(stagingFile, operation); err != nil {
if err := file.downloadToStaging(stagingFile); err != nil {
return nil, err
}
}
return stagingFile, nil
}

func (file *FileINode) downloadToStaging(stagingFile *os.File, operation string) error {
func (file *FileINode) downloadToStaging(stagingFile *os.File) error {
hdfsAccessor := file.FileSystem.getDFSConnector()
absPath := file.AbsolutePath()

reader, err := hdfsAccessor.OpenRead(absPath)
if err != nil {
logger.Error("Failed to open file in DFS", file.logInfo(logger.Fields{Operation: operation, Error: err}))
// TODO remove the staging file if there are no more active handles
logger.Error("Failed to open file in DFS", file.logInfo(logger.Fields{Error: err}))
return err
}

nc, err := io.Copy(stagingFile, reader)
if err != nil {
logger.Error("Failed to copy content to staging file", file.logInfo(logger.Fields{Operation: operation, Error: err}))
logger.Error("Failed to copy content to staging file", file.logInfo(logger.Fields{Error: err}))
return err
}
reader.Close()
logger.Info(fmt.Sprintf("Downloaded a copy to stating dir. %d bytes copied", nc), file.logInfo(logger.Fields{Operation: operation}))
logger.Info(fmt.Sprintf("Downloaded a copy to staging dir. %d bytes copied", nc), file.logInfo(nil))
return nil
}

// createStagingFileForRead creates a staging file in StagingDir, downloads the
// file's content from DFS into it, rewinds it, and wraps it in a
// LocalRWFileProxy ready for reading.
// It is used when opening an existing file for reading and caching is enabled.
// Returns nil on any failure; the error is logged and the temporary file is
// closed and removed so no staging garbage is left behind.
func (file *FileINode) createStagingFileForRead() *LocalRWFileProxy {
	stagingFile, err := os.CreateTemp(StagingDir, "stage")
	if err != nil {
		logger.Error("Failed to create staging file for read", file.logInfo(logger.Fields{Error: err}))
		return nil
	}

	// discard closes and removes the temp file; shared by all failure paths.
	discard := func() {
		stagingFile.Close()
		os.Remove(stagingFile.Name())
	}

	if err := file.downloadToStaging(stagingFile); err != nil {
		discard()
		return nil
	}

	// downloadToStaging leaves the offset at EOF; rewind so readers start at 0.
	if _, err := stagingFile.Seek(0, io.SeekStart); err != nil {
		discard()
		return nil
	}

	logger.Info("Downloaded file to cache", file.logInfo(logger.Fields{TmpFile: stagingFile.Name()}))
	// TODO: Should it be in RO mode?
	return &LocalRWFileProxy{localFile: stagingFile, file: file}
}

// Creates new file handle
// Lock order: fileHandleMutex (1) alone
// Called from Open which holds fileMutex (2), so order is: fileMutex (2) → fileHandleMutex (1)
Expand Down Expand Up @@ -463,14 +523,24 @@ func (file *FileINode) NewFileHandle(existsInDFS bool, flags fuse.OpenFlags) (*F
fh.File.fileProxy = file.fileProxy
logger.Info("Opened file, Returning existing handle", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags}))
} else {
// we alway open the file in RO mode. when the client writes to the file
// then we upgrade the handle. However, if the file is already opened in
// in RW state then we use the existing RW handle
reader, err := file.FileSystem.getDFSConnector().OpenRead(file.AbsolutePath())
if err != nil {
logger.Warn("Opening file failed", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags, Error: err}))
return nil, err
} else {
absPath := file.AbsolutePath()
hdfsAccessor := file.FileSystem.getDFSConnector()

// Try to get from cache or download to cache
if StagingFileCache != nil {
fh.File.fileProxy = StagingFileCache.GetOrLoad(file, hdfsAccessor)
}

// If no local file proxy was set, open from HopsFS in RO mode
if fh.File.fileProxy == nil {
reader, err := hdfsAccessor.OpenRead(absPath)
if err != nil {
logger.Warn(
"Opening file failed",
fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags, Error: err}),
)
return nil, err
}
fh.File.fileProxy = &RemoteROFileProxy{hdfsReader: reader, file: file}
logger.Info("Opened file, RO handle", fh.logInfo(logger.Fields{Operation: operation, Flags: fh.fileFlags}))
}
Expand Down
Loading