Skip to content

Commit 11f8c36

Browse files
committed
FallbackStorage retries FileNotFoundExceptions
1 parent 688a30b commit 11f8c36

File tree

3 files changed

+150
-8
lines changed

3 files changed

+150
-8
lines changed

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -614,6 +614,26 @@ package object config {
614614
.checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.")
615615
.createOptional
616616

617+
private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY =
618+
ConfigBuilder("spark.storage.decommission.fallbackStorage.replicationDelay")
619+
.doc("The maximum expected delay for files written by one executor to become " +
620+
"available to other executors.")
621+
.version("4.1.0")
622+
.timeConf(TimeUnit.MILLISECONDS)
623+
.checkValue(_ > 0, "Value must be positive.")
624+
.createOptional
625+
626+
private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_WAIT =
627+
ConfigBuilder("spark.storage.decommission.fallbackStorage.replicationWait")
628+
.doc(
629+
"When an executor cannot find a file in the fallback storage it waits " +
630+
"this amount of time before attempting to open the file again, " +
631+
f"while not exceeding ${STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY.key}.")
632+
.version("4.1.0")
633+
.timeConf(TimeUnit.MILLISECONDS)
634+
.checkValue(_ > 0, "Value must be positive.")
635+
.createWithDefaultString("1s")
636+
617637
private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP =
618638
ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp")
619639
.doc("If true, Spark cleans up its fallback storage data during shutting down.")

core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,27 @@
1717

1818
package org.apache.spark.storage
1919

20-
import java.io.DataInputStream
20+
import java.io.{DataInputStream, FileNotFoundException}
2121
import java.nio.ByteBuffer
2222

23+
import scala.annotation.tailrec
2324
import scala.concurrent.Future
2425
import scala.reflect.ClassTag
2526

2627
import org.apache.hadoop.conf.Configuration
27-
import org.apache.hadoop.fs.{FileSystem, Path}
28+
import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
2829

2930
import org.apache.spark.{SparkConf, SparkException}
3031
import org.apache.spark.deploy.SparkHadoopUtil
3132
import org.apache.spark.internal.Logging
3233
import org.apache.spark.internal.LogKeys._
33-
import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
34+
import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH, STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY, STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_WAIT}
3435
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
3536
import org.apache.spark.network.util.JavaUtils
3637
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
3738
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
3839
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
39-
import org.apache.spark.util.Utils
40+
import org.apache.spark.util.{Clock, SystemClock, Utils}
4041

4142
/**
4243
* A fallback storage used by storage decommissioners.
@@ -155,6 +156,61 @@ private[spark] object FallbackStorage extends Logging {
155156
FALLBACK_BLOCK_MANAGER_ID, blockId, StorageLevel.DISK_ONLY, memSize = 0, dataLength)
156157
}
157158

159+
/**
160+
* Open the file, retry a FileNotFoundException for waitMs milliseconds, unless this would
161+
* exceed the deadline. In the latter case, rethrow the exception.
162+
*/
163+
@tailrec
164+
private def open(
165+
filesystem: FileSystem,
166+
path: Path,
167+
deadlineTs: Long,
168+
waitMs: Long,
169+
clock: Clock): FSDataInputStream = {
170+
try {
171+
filesystem.open(path)
172+
} catch {
173+
case fnf: FileNotFoundException =>
174+
val waitTillTs = clock.getTimeMillis() + waitMs
175+
if (waitTillTs <= deadlineTs) {
176+
logInfo(f"File not found, waiting ${waitMs / 1000}s: $path")
177+
clock.waitTillTime(waitTillTs)
178+
open(filesystem, path, deadlineTs, waitMs, clock)
179+
} else {
180+
throw fnf
181+
}
182+
}
183+
}
184+
185+
/**
186+
* Open the file and retry FileNotFoundExceptions according to
187+
* STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY and
188+
* STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_WAIT
189+
*/
190+
// Visible for testing
191+
private[spark] def open(
192+
conf: SparkConf,
193+
filesystem: FileSystem,
194+
path: Path,
195+
clock: Clock = new SystemClock()): FSDataInputStream = {
196+
val replicationDelay = conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY)
197+
if (replicationDelay.isDefined) {
198+
val replicationDeadline = clock.getTimeMillis() + replicationDelay.get
199+
val replicationWaitMs = conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_WAIT)
200+
try {
201+
open(filesystem, path, replicationDeadline, replicationWaitMs, clock)
202+
} catch {
203+
case fnf: FileNotFoundException =>
204+
logInfo(
205+
"File not found, exceeded expected replication delay " +
206+
f"of ${replicationDelay.get}s: $path")
207+
throw fnf
208+
}
209+
} else {
210+
filesystem.open(path)
211+
}
212+
}
213+
158214
/**
159215
* Read a ManagedBuffer.
160216
*/
@@ -180,7 +236,7 @@ private[spark] object FallbackStorage extends Logging {
180236
val indexFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name")
181237
val start = startReduceId * 8L
182238
val end = endReduceId * 8L
183-
Utils.tryWithResource(fallbackFileSystem.open(indexFile)) { inputStream =>
239+
Utils.tryWithResource(open(conf, fallbackFileSystem, indexFile)) { inputStream =>
184240
Utils.tryWithResource(new DataInputStream(inputStream)) { index =>
185241
index.skip(start)
186242
val offset = index.readLong()
@@ -193,7 +249,7 @@ private[spark] object FallbackStorage extends Logging {
193249
logDebug(s"To byte array $size")
194250
val array = new Array[Byte](size.toInt)
195251
val startTimeNs = System.nanoTime()
196-
Utils.tryWithResource(fallbackFileSystem.open(dataFile)) { f =>
252+
Utils.tryWithResource(open(conf, fallbackFileSystem, dataFile)) { f =>
197253
f.seek(offset)
198254
f.readFully(array)
199255
logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")

core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,21 @@
1616
*/
1717
package org.apache.spark.storage
1818

19-
import java.io.{DataOutputStream, File, FileOutputStream, InputStream, IOException}
19+
import java.io.{DataOutputStream, File, FileNotFoundException, FileOutputStream, InputStream, IOException}
2020
import java.nio.file.Files
2121

2222
import scala.concurrent.duration._
2323
import scala.util.Random
2424

2525
import org.apache.hadoop.conf.Configuration
26-
import org.apache.hadoop.fs.{FSDataInputStream, LocalFileSystem, Path, PositionedReadable, Seekable}
26+
import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, LocalFileSystem, Path, PositionedReadable, Seekable}
2727
import org.mockito.{ArgumentMatchers => mc}
2828
import org.mockito.Mockito.{mock, never, verify, when}
2929
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
3030

3131
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
3232
import org.apache.spark.LocalSparkContext.withSpark
33+
import org.apache.spark.deploy.SparkHadoopUtil
3334
import org.apache.spark.internal.config._
3435
import org.apache.spark.io.CompressionCodec
3536
import org.apache.spark.launcher.SparkLauncher.{EXECUTOR_MEMORY, SPARK_MASTER}
@@ -39,6 +40,7 @@ import org.apache.spark.scheduler.ExecutorDecommissionInfo
3940
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
4041
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
4142
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
43+
import org.apache.spark.util.Clock
4244
import org.apache.spark.util.Utils.tryWithResource
4345

4446
class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
@@ -334,7 +336,44 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
334336
}
335337
}
336338
}
339+
340+
Seq(0, 1, 3, 6).foreach { replicationSeconds =>
341+
test(s"Consider replication delay - ${replicationSeconds}s") {
342+
val replicationMs = replicationSeconds * 1000;
343+
val delay = 5 // max allowed replication (in seconds)
344+
val wait = 2 // time between open file attempts (in seconds)
345+
val conf = getSparkConf()
346+
.set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY.key, s"${delay}s")
347+
.set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_WAIT.key, s"${wait}s")
348+
349+
val filesystem = FileSystem.get(SparkHadoopUtil.get.newConfiguration(conf))
350+
val path = new Path(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get, "file")
351+
val startMs = 123000000L * 1000L // arbitrary system time
352+
val clock = new DelayedActionClock(replicationMs, startMs)(filesystem.create(path).close())
353+
354+
if (replicationSeconds <= delay) {
355+
// expect open to succeed
356+
val in = FallbackStorage.open(conf, filesystem, path, clock)
357+
assert(in != null)
358+
359+
// how many waits are expected to observe replication
360+
val expectedWaits = Math.ceil(replicationSeconds.toFloat / wait).toInt
361+
assert(clock.timeMs == startMs + expectedWaits * wait * 1000)
362+
assert(clock.waited == expectedWaits)
363+
in.close()
364+
} else {
365+
// expect open to fail
366+
assertThrows[FileNotFoundException](FallbackStorage.open(conf, filesystem, path, clock))
367+
368+
// how many waits are expected to observe delay
369+
val expectedWaits = delay / wait
370+
assert(clock.timeMs == startMs + expectedWaits * wait * 1000)
371+
assert(clock.waited == expectedWaits)
372+
}
373+
}
374+
}
337375
}
376+
338377
class ReadPartialInputStream(val in: FSDataInputStream) extends InputStream
339378
with Seekable with PositionedReadable {
340379
override def read: Int = in.read
@@ -378,3 +417,30 @@ class ReadPartialFileSystem extends LocalFileSystem {
378417
new FSDataInputStream(new ReadPartialInputStream(stream))
379418
}
380419
}
420+
421+
class DelayedActionClock(delayMs: Long, startTimeMs: Long)(action: => Unit) extends Clock {
422+
var timeMs: Long = startTimeMs
423+
var waited: Int = 0
424+
var triggered: Boolean = false
425+
426+
if (delayMs == 0) trigger()
427+
428+
private def trigger(): Unit = {
429+
if (!triggered) {
430+
triggered = true
431+
action
432+
}
433+
}
434+
435+
override def getTimeMillis(): Long = timeMs
436+
override def nanoTime(): Long = timeMs * 1000000
437+
override def waitTillTime(targetTime: Long): Long = {
438+
waited += 1
439+
if (targetTime >= startTimeMs + delayMs) {
440+
timeMs = startTimeMs + delayMs
441+
trigger()
442+
}
443+
timeMs = targetTime
444+
targetTime
445+
}
446+
}

0 commit comments

Comments
 (0)