
Commit 78f7b15

Disable peer decommissioning when shuffle max disk size is 0
1 parent 0ba9a9b commit 78f7b15

3 files changed: +12 additions, -2 deletions


core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 2 additions & 1 deletion
@@ -632,7 +632,8 @@ package object config {
         "shuffle blocks. Rejecting remote shuffle blocks means that an executor will not receive " +
         "any shuffle migrations, and if there are no other executors available for migration " +
         "then shuffle blocks will be lost unless " +
-        s"${STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH.key} is configured.")
+        s"${STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH.key} is configured. " +
+        s"Set to 0 to migrate to fallback storage only.")
       .version("3.2.0")
       .bytesConf(ByteUnit.BYTE)
       .createOptional
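
The new doc line ("Set to 0 to migrate to fallback storage only.") describes the behavior wired up in BlockManagerDecommissioner below. As a hedged sketch of how a job could opt in, the string keys here are assumptions inferred from the config constant names appearing in this diff; verify them against the Spark configuration docs for your version.

import org.apache.spark.SparkConf

// Sketch only: keys below are assumed from the constant names in this commit.
val conf = new SparkConf()
  // keep shuffle-block migration on during decommissioning
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
  // 0 disk budget: executors reject all remote shuffle blocks, so ...
  .set("spark.storage.decommission.shuffleBlocks.maxDiskSize", "0")
  // ... everything is migrated to the fallback path instead (hypothetical URI)
  .set("spark.storage.decommission.fallbackStorage.path", "hdfs://namenode/spark-fallback/")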

core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala

Lines changed: 9 additions & 1 deletion
@@ -28,6 +28,7 @@ import org.apache.spark._
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.LogKeys._
+import org.apache.spark.internal.config.STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE
 import org.apache.spark.shuffle.ShuffleBlockInfo
 import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
 import org.apache.spark.util.{ThreadUtils, Utils}

@@ -319,8 +320,15 @@ private[storage] class BlockManagerDecommissioner(
       log"${MDC(TOTAL, localShuffles.size)} local shuffles are added. " +
       log"In total, ${MDC(NUM_REMAINED, remainedShuffles)} shuffles are remained.")

+    // migrate to fallback storage only if
+    // STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH is set and
+    // STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE is 0
+    val fallbackOnly = conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined &&
+      conf.get(STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE).contains(0)
+
     // Update the threads doing migrations
-    val livePeerSet = bm.getPeers(false).toSet
+    val livePeerSet = if (fallbackOnly) Set(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID)
+      else bm.getPeers(false).toSet
     val currentPeerSet = migrationPeers.keys.toSet
     val deadPeers = currentPeerSet.diff(livePeerSet)
     // Randomize the orders of the peers to avoid hotspot nodes.
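
For readers skimming the hunk above: the change short-circuits peer discovery when the operator has asked for fallback-only migration. A minimal standalone sketch of that decision, using illustrative names rather than the real Spark types:

// Illustrative sketch, not the Spark API: pick migration targets the way the
// decommissioner now does. `fallbackPeer` stands in for
// FallbackStorage.FALLBACK_BLOCK_MANAGER_ID and `livePeers` for bm.getPeers(false).
def selectMigrationPeers[Peer](
    fallbackPathConfigured: Boolean,   // STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH defined?
    maxShuffleDiskSize: Option[Long],  // STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE
    fallbackPeer: Peer,
    livePeers: => Set[Peer]): Set[Peer] = {
  // Only an explicit 0 triggers fallback-only mode; an unset value keeps the old behavior.
  val fallbackOnly = fallbackPathConfigured && maxShuffleDiskSize.contains(0L)
  if (fallbackOnly) Set(fallbackPeer) else livePeers
}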

core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -210,6 +210,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
     val conf = new SparkConf(false)
       .set("spark.app.id", "testId")
       .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
+      .set(STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE, 0L) // migrate to fallback storage only
       .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
         Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
     val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
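
The test pins the value to 0L explicitly because the new guard uses Option.contains: leaving the config unset does not switch the decommissioner into fallback-only mode. A quick plain-Scala illustration of that distinction:

val unset: Option[Long]   = None
val zero: Option[Long]    = Some(0L)
val nonzero: Option[Long] = Some(512L * 1024 * 1024)

assert(!unset.contains(0L))    // unset   -> migrate to live executor peers as before
assert(zero.contains(0L))      // 0       -> migrate to fallback storage only
assert(!nonzero.contains(0L))  // nonzero -> migrate to live executor peers as before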
