  *
  * 1. https://github.com/apache/spark/blob/2f8776ccad532fbed17381ff97d302007918b8d8/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
  */
-package org.apache.spark.rdd
+package geotrellis.spark.join

+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.log4s.getLogger

+import java.io.{IOException, ObjectOutputStream}
 import scala.reflect.ClassTag
+import scala.util.control.NonFatal

-import org.apache.spark._
+private class CartesianPartition(
+  idx: Int,
+  @transient private val rdd1: RDD[_],
+  @transient private val rdd2: RDD[_],
+  s1Index: Int,
+  s2Index: Int
+  ) extends Partition {
+
+  @transient private[this] lazy val logger = getLogger
+
+  var s1 = rdd1.partitions(s1Index)
+  var s2 = rdd2.partitions(s2Index)
+  override val index: Int = idx
+
+  private def tryOrIOException[T](block: => T): T = {
+    try {
+      block
+    } catch {
+      case e: IOException =>
+        logger.error(e)("Exception encountered")
+        throw e
+      case NonFatal(e) =>
+        logger.error(e)("Exception encountered")
+        throw new IOException(e)
+    }
+  }
+
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream): Unit = tryOrIOException {
+    // Update the reference to parent split at the time of task serialization
+    s1 = rdd1.partitions(s1Index)
+    s2 = rdd2.partitions(s2Index)
+    oos.defaultWriteObject()
+  }
+}
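The writeObject hook above re-resolves the parent splits at task-serialization time (a parent RDD may have been checkpointed, replacing its partitions), and tryOrIOException wraps serialization failures as IOException, which Java serialization expects. For context, below is a minimal sketch of how an RDD can consume these partitions, modeled on Spark's CartesianRDD linked in the header; the name SketchCartesianRDD and the method bodies are illustrative assumptions (placed in the same package so the package-private CartesianPartition is visible), not code from this commit.

import org.apache.spark._
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Sketch only: pairs every partition of rdd1 with every partition of rdd2.
private class SketchCartesianRDD[A: ClassTag, B: ClassTag](
    sc: SparkContext,
    var rdd1: RDD[A],
    var rdd2: RDD[B]
  ) extends RDD[(A, B)](sc, Nil) {

  private val numPartitionsInRdd2 = rdd2.partitions.length

  // One output partition per (rdd1 partition, rdd2 partition) pair; each
  // CartesianPartition stores only the parent indices, so tasks stay small.
  override def getPartitions: Array[Partition] = {
    val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
    for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
      val idx = s1.index * numPartitionsInRdd2 + s2.index
      array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
    }
    array
  }

  // Each output partition depends on exactly one partition of each parent.
  override def getDependencies: Seq[Dependency[_]] = List(
    new NarrowDependency(rdd1) {
      def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
    },
    new NarrowDependency(rdd2) {
      def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
    }
  )

  // Emit every pair drawn from the two parent partitions.
  override def compute(split: Partition, context: TaskContext): Iterator[(A, B)] = {
    val p = split.asInstanceOf[CartesianPartition]
    for (x <- rdd1.iterator(p.s1, context); y <- rdd2.iterator(p.s2, context))
      yield (x, y)
  }
}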

 /** Performs a cartesian join of two RDDs using filter and refine pattern.
  *