Skip to content

Commit 14a2c76

Browse files
committed
Attempt to fetch failed block from FallbackStorage
1 parent 688a30b commit 14a2c76

File tree

1 file changed

+18
-3
lines changed

1 file changed

+18
-3
lines changed

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ import scala.util.{Failure, Success}
3232
import io.netty.util.internal.OutOfDirectMemoryError
3333
import org.roaringbitmap.RoaringBitmap
3434

35-
import org.apache.spark.{MapOutputTracker, SparkException, TaskContext}
35+
import org.apache.spark.{MapOutputTracker, SparkEnv, SparkException, TaskContext}
3636
import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID
3737
import org.apache.spark.errors.SparkCoreErrors
38-
import org.apache.spark.internal.Logging
38+
import org.apache.spark.internal.{config, Logging}
3939
import org.apache.spark.internal.LogKeys._
4040
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
4141
import org.apache.spark.network.shuffle._
@@ -973,14 +973,29 @@ final class ShuffleBlockFetcherIterator(
973973
}
974974

975975
case FailureFetchResult(blockId, mapIndex, address, e) =>
976+
var error = e
976977
var errorMsg: String = null
977978
if (e.isInstanceOf[OutOfDirectMemoryError]) {
978979
val logMessage = log"Block ${MDC(BLOCK_ID, blockId)} fetch failed after " +
979980
log"${MDC(MAX_ATTEMPTS, maxAttemptsOnNettyOOM)} retries due to Netty OOM"
980981
logError(logMessage)
981982
errorMsg = logMessage.message
983+
} else if (
984+
SparkEnv.get.conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
985+
try {
986+
val buf = FallbackStorage.read(SparkEnv.get.conf, blockId)
987+
results.put(SuccessFetchResult(blockId, mapIndex, address, buf.size(), buf,
988+
isNetworkReqDone = false))
989+
result = null
990+
error = null
991+
} catch {
992+
case t: Throwable =>
993+
logInfo(s"Failed to read block from fallback storage: $blockId", t)
994+
}
995+
}
996+
if (error != null) {
997+
throwFetchFailedException(blockId, mapIndex, address, error, Some(errorMsg))
982998
}
983-
throwFetchFailedException(blockId, mapIndex, address, e, Some(errorMsg))
984999

9851000
case DeferFetchRequestResult(request) =>
9861001
val address = request.address

0 commit comments

Comments
 (0)