GC load sstable into memory #9762
Conversation
Downloading metadata locally should prefer spark.local.dir, with a fallback to the temp dir.
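A rough sketch of the directory-selection idea behind this comment, assuming spark.local.dir has been propagated into the Hadoop Configuration; the key lookup and helper name are assumptions for illustration, not the PR's code:

```scala
import org.apache.hadoop.conf.Configuration

// Sketch only: prefer spark.local.dir when it is set, otherwise fall back to the
// JVM temp dir. spark.local.dir may be a comma-separated list; take the first entry.
def localDownloadDir(conf: Configuration): String =
  Option(conf.get("spark.local.dir"))
    .map(_.split(",").head.trim)
    .filter(_.nonEmpty)
    .getOrElse(System.getProperty("java.io.tmpdir"))
```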
Pull request overview
This PR refactors SSTable reading to access files directly from remote storage (S3, Azure, GCS) instead of copying them to local temporary files first. The main change introduces a new HadoopBlockReadable class that implements the BlockReadable interface to read data directly from Hadoop FileSystem, eliminating the need for local file I/O during garbage collection operations.
Key changes:
- Introduced HadoopBlockReadable to read SSTable blocks directly from remote storage via Hadoop FileSystem
- Refactored SSTableReader to use the BlockReadable abstraction instead of requiring local files
- Removed local file copying logic from SSTableReader.forMetaRange(), SSTableReader.forRange(), LakeFSInputFormat, and LakeFSContext
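As a rough illustration of the direct-read approach just described, a positioned read against a Hadoop FileSystem might look like the sketch below. This is not the PR's actual HadoopBlockReadable: the project's BlockReadable and IndexedBytes types are replaced here with a plain class and Array[Byte] so the example stays self-contained.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: read a block of an SSTable straight from remote storage (S3/Azure/GCS via a
// Hadoop FileSystem) without first copying the file to a local temporary path.
class RemoteBlockReader(conf: Configuration, url: String) {
  private val path = new Path(url)
  private val fs: FileSystem = path.getFileSystem(conf)

  // Total file length, as reported by the remote filesystem.
  def length: Long = fs.getFileStatus(path).getLen

  // Positioned read of `size` bytes starting at `offset`.
  def readBlock(offset: Long, size: Long): Array[Byte] = {
    require(size >= 0 && size <= Int.MaxValue, s"block size $size out of range")
    val buf = new Array[Byte](size.toInt)
    val in = fs.open(path)
    try in.readFully(offset, buf, 0, size.toInt)
    finally in.close()
    buf
  }
}
```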
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.
Summary per file:
| File | Description |
|---|---|
| HadoopBlockReadable.scala | New class implementing BlockReadable interface for direct remote file access via Hadoop FileSystem |
| SSTableReader.scala | Refactored to use BlockReadable abstraction; removed local file copy logic from factory methods |
| LakeFSInputFormat.scala | Removed local file copying in EntryRecordReader, now uses HadoopBlockReadable directly |
| LakeFSContext.scala | Removed local file copying logic; added spark.local.dir configuration propagation |
| StorageUtils.scala | Added unused logger and whitespace formatting changes |
clients/spark/src/main/scala/io/treeverse/clients/StorageUtils.scala (outdated, resolved)
clients/spark/src/main/scala/io/treeverse/clients/LakeFSContext.scala (outdated, resolved)
def forMetaRange(
    configuration: Configuration,
    metaRangeURL: String,
    own: Boolean = true
Copilot AI, Dec 7, 2025
The own parameter is no longer used in this method. Since the file is now read directly via HadoopBlockReadable without copying to a local temporary file, this parameter should be removed to avoid confusion.
def forRange(
    configuration: Configuration,
    rangeURL: String,
    own: Boolean = true
Copilot AI, Dec 7, 2025
The own parameter is no longer used in this method. Since the file is now read directly via HadoopBlockReadable without copying to a local temporary file, this parameter should be removed to avoid confusion.
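Purely as an illustration of the cleanup both comments suggest, the factories could drop the flag entirely. The object and return type below are stand-ins, not the project's actual SSTableReader API:

```scala
import org.apache.hadoop.conf.Configuration

// Hypothetical post-cleanup signatures: with no local copy to own and delete,
// the `own` flag disappears from both factory methods.
object SSTableReaderSketch {
  final case class Reader(url: String) // stand-in for the real reader type

  def forMetaRange(configuration: Configuration, metaRangeURL: String): Reader =
    Reader(metaRangeURL)

  def forRange(configuration: Configuration, rangeURL: String): Reader =
    Reader(rangeURL)
}
```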
override def length: Long = fileLength

override def readBlock(offset: Long, size: Long): IndexedBytes = {
Copilot AI, Dec 7, 2025
Converting size from Long to Int could cause an overflow if the size exceeds Int.MaxValue (approximately 2GB). Consider adding validation to check if size is within the valid range for an Int and throw an appropriate exception if it exceeds the limit.
if (size > Int.MaxValue) {
  throw new IllegalArgumentException(s"Block size $size exceeds maximum supported size ${Int.MaxValue}")
}

Suggested change:

override def readBlock(offset: Long, size: Long): IndexedBytes = {
  if (size < 0 || size > Int.MaxValue) {
    throw new IllegalArgumentException(s"Block size $size is out of valid range [0, ${Int.MaxValue}]")
  }
this(new BlockReadableFile(new java.io.RandomAccessFile(file, "r")), companion, () => {
  if (own) {
    try {
      file.delete()
    } catch {
      case e: Exception =>
        LoggerFactory.getLogger(classOf[SSTableReader[Proto]].toString).warn(s"delete owned file ${file.getName} (keep going): $e")
    }
  }
})
Copilot AI, Dec 7, 2025
There's a potential resource leak in this auxiliary constructor. If an exception occurs after creating the RandomAccessFile but before the object is fully constructed, the file handle will not be closed. Consider wrapping the construction in a try-catch block to ensure proper cleanup:
def this(file: java.io.File, companion: GeneratedMessageCompanion[Proto], own: Boolean) = {
  val raf = new java.io.RandomAccessFile(file, "r")
  try {
    this(new BlockReadableFile(raf), companion, () => {
      if (own) {
        try {
          file.delete()
        } catch {
          case e: Exception =>
            LoggerFactory.getLogger(classOf[SSTableReader[Proto]].toString).warn(s"delete owned file ${file.getName} (keep going): $e")
        }
      }
    })
  } catch {
    case e: Throwable =>
      raf.close()
      throw e
  }
}

Suggested change:

val raf = new java.io.RandomAccessFile(file, "r")
try {
  this(new BlockReadableFile(raf), companion, () => {
    if (own) {
      try {
        file.delete()
      } catch {
        case e: Exception =>
          LoggerFactory.getLogger(classOf[SSTableReader[Proto]].toString).warn(s"delete owned file ${file.getName} (keep going): $e")
      }
    }
  })
} catch {
  case e: Throwable =>
    try { raf.close() } catch { case _: Throwable => () }
    throw e
}
Load the SSTable into memory instead of copying the file into a temporary directory.
Closes #9738