
Commit becf538

Disable peer decommissioning when shuffle max disk size is 0
1 parent 688a30b

3 files changed: +12, -4 lines

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 2 additions & 1 deletion

@@ -627,7 +627,8 @@ package object config {
         "shuffle blocks. Rejecting remote shuffle blocks means that an executor will not receive " +
         "any shuffle migrations, and if there are no other executors available for migration " +
         "then shuffle blocks will be lost unless " +
-        s"${STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH.key} is configured.")
+        s"${STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH.key} is configured. " +
+        s"Set to 0 to migrate to fallback storage only.")
       .version("3.2.0")
       .bytesConf(ByteUnit.BYTE)
       .createOptional
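
For orientation, here is a minimal sketch of how a user could opt into the behavior this doc change describes: with a fallback storage path configured and the shuffle max disk size explicitly set to 0, shuffle blocks are migrated to fallback storage only. The surrounding decommission flags and the s3a path below are illustrative assumptions, not part of this commit.

import org.apache.spark.SparkConf

// Sketch: request fallback-only shuffle block migration during decommissioning.
// The fallback path is a hypothetical example.
val conf = new SparkConf()
  .set("spark.decommission.enabled", "true")
  .set("spark.storage.decommission.enabled", "true")
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
  // Explicit 0 plus a fallback path: executors stop migrating shuffle blocks to peer executors.
  .set("spark.storage.decommission.shuffleBlocks.maxDiskSize", "0")
  .set("spark.storage.decommission.fallbackStorage.path", "s3a://my-bucket/spark-fallback/")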

core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala

Lines changed: 9 additions & 1 deletion

@@ -28,6 +28,7 @@ import org.apache.spark._
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.LogKeys._
+import org.apache.spark.internal.config.STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE
 import org.apache.spark.shuffle.ShuffleBlockInfo
 import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
 import org.apache.spark.util.{ThreadUtils, Utils}

@@ -319,8 +320,15 @@ private[storage] class BlockManagerDecommissioner(
       log"${MDC(TOTAL, localShuffles.size)} local shuffles are added. " +
       log"In total, ${MDC(NUM_REMAINED, remainedShuffles)} shuffles are remained.")
 
+    // migrate to fallback storage only if
+    // STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH is set and
+    // STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE is 0
+    val fallbackOnly = conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined &&
+      conf.get(STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE).contains(0)
+
     // Update the threads doing migrations
-    val livePeerSet = bm.getPeers(false).toSet
+    val livePeerSet = if (fallbackOnly) Set(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID)
+      else bm.getPeers(false).toSet
     val currentPeerSet = migrationPeers.keys.toSet
     val deadPeers = currentPeerSet.diff(livePeerSet)
     // Randomize the orders of the peers to avoid hotspot nodes.
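
To make the new condition easy to follow in isolation, here is a standalone sketch (plain Scala, no Spark dependencies) of the fallbackOnly decision. The helper name fallbackOnly mirrors the patch; conf.get on the two optional entries is modeled as Option values, which is an assumption for illustration.

// Minimal sketch of the decision added above: an unset maxDiskSize (None) keeps the
// normal peer migration, while an explicit 0 together with a configured fallback path
// restricts the peer set to the fallback storage "block manager".
def fallbackOnly(fallbackPath: Option[String], maxDiskSize: Option[Long]): Boolean =
  fallbackPath.isDefined && maxDiskSize.contains(0L)

assert(!fallbackOnly(Some("s3a://bucket/fallback/"), None))     // size unset: use live peers
assert(!fallbackOnly(None, Some(0L)))                           // no fallback path configured
assert(fallbackOnly(Some("s3a://bucket/fallback/"), Some(0L)))  // fallback storage only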

core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala

Lines changed: 1 addition & 2 deletions

@@ -175,6 +175,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
     val conf = new SparkConf(false)
       .set("spark.app.id", "testId")
       .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
+      .set(STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE, 0L) // migrate to fallback storage only
       .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
         Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")

@@ -200,8 +201,6 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
       when(resolver.getDataFile(shuffleId, mapId)).thenReturn(dataFile)
     }
 
-    when(bm.getPeers(mc.any()))
-      .thenReturn(Seq(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID))
     val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
     when(bm.master).thenReturn(bmm)
     val blockTransferService = mock(classOf[BlockTransferService])
