@@ -196,7 +196,8 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite {
196196 checksumEnabled : Boolean = true ,
197197 checksumAlgorithm : String = " ADLER32" ,
198198 shuffleMetrics : Option [ShuffleReadMetricsReporter ] = None ,
199- doBatchFetch : Boolean = false ): ShuffleBlockFetcherIterator = {
199+ doBatchFetch : Boolean = false ,
200+ fallbackStorage : Option [FallbackStorage ] = None ): ShuffleBlockFetcherIterator = {
200201 val tContext = taskContext.getOrElse(TaskContext .empty())
201202 new ShuffleBlockFetcherIterator (
202203 tContext,
@@ -222,7 +223,8 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite {
222223 checksumEnabled,
223224 checksumAlgorithm,
224225 shuffleMetrics.getOrElse(tContext.taskMetrics().createTempShuffleReadMetrics()),
225- doBatchFetch)
226+ doBatchFetch,
227+ fallbackStorage)
226228 }
227229 // scalastyle:on argcount
228230
@@ -1127,6 +1129,54 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite {
11271129 assert(e.getMessage.contains(" fetch failed after 10 retries due to Netty OOM" ))
11281130 }
11291131
1132+ test(" SPARK-XXXXX: missing blocks attempts to read from fallback storage" ) {
1133+ val blockManager = createMockBlockManager()
1134+
1135+ configureMockTransfer(Map .empty)
1136+ val remoteBmId = BlockManagerId (" test-remote-client-1" , " test-remote-host" , 2 )
1137+ val blockId = ShuffleBlockId (0 , 0 , 0 )
1138+ val blocksByAddress = Map [BlockManagerId , Seq [(BlockId , Long , Int )]](
1139+ (remoteBmId, Seq ((blockId, 1L , 0 )))
1140+ )
1141+
1142+ // iterator with no FallbackStorage cannot find the block
1143+ {
1144+ val iterator = createShuffleBlockIteratorWithDefaults(blocksByAddress = blocksByAddress)
1145+ val e = intercept[FetchFailedException ] {
1146+ iterator.next()
1147+ }
1148+ assert(e.getCause != null )
1149+ assert(e.getCause.isInstanceOf [BlockNotFoundException ])
1150+ assert(e.getCause.getMessage.contains(" Block shuffle_0_0_0 not found" ))
1151+ }
1152+
1153+ // iterator with FallbackStorage that does not store the block cannot find it either
1154+ val fallbackStorage = mock(classOf [FallbackStorage ])
1155+
1156+ {
1157+ when(fallbackStorage.read(ShuffleBlockId (0 , 0 , 1 ))).thenReturn(new TestManagedBuffer (127 ))
1158+ val iterator = createShuffleBlockIteratorWithDefaults(blocksByAddress = blocksByAddress,
1159+ fallbackStorage = Some (fallbackStorage))
1160+ val e = intercept[FetchFailedException ] {
1161+ iterator.next()
1162+ }
1163+ assert(e.getCause != null )
1164+ assert(e.getCause.isInstanceOf [BlockNotFoundException ])
1165+ assert(e.getCause.getMessage.contains(" Block shuffle_0_0_0 not found" ))
1166+ }
1167+
1168+ // iterator with FallbackStorage that stores the block can find it
1169+ {
1170+ when(fallbackStorage.read(ShuffleBlockId (0 , 0 , 0 ))).thenReturn(new TestManagedBuffer (127 ))
1171+ val iterator = createShuffleBlockIteratorWithDefaults(blocksByAddress = blocksByAddress,
1172+ fallbackStorage = Some (fallbackStorage))
1173+ assert(iterator.hasNext)
1174+ val (id, _) = iterator.next()
1175+ assert(id === ShuffleBlockId (0 , 0 , 0 ))
1176+ assert(! iterator.hasNext)
1177+ }
1178+ }
1179+
11301180 /**
11311181 * Prepares the transfer to trigger success for all the blocks present in blockChunks. It will
11321182 * trigger failure of block which is not part of blockChunks.
0 commit comments