
Commit 4375159

Merge branch 'master' into predictor-support-for-writing-gtiff
2 parents: 7997582 + 6165638

File tree: 7 files changed, +64 -12 lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion

@@ -7,7 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased]

 ### Added
-- Add ZStd compression support for GTiff
+- Add ZStd compression support for GTiff [#3580](https://github.com/locationtech/geotrellis/pull/3580)
+- Do not depend on private Spark API, avoids sealing violation [#3586](https://github.com/locationtech/geotrellis/pull/3586)
 - Add predictor 2 (integer) and predictor 3 (float) support for writing compressed GTiff files [#3588](https://github.com/locationtech/geotrellis/pull/3588)

 ## [3.8.0] - 2025-04-23

raster/src/main/scala/geotrellis/raster/io/geotiff/compression/Compressor.scala

Lines changed: 2 additions & 3 deletions

@@ -28,12 +28,11 @@ trait Compressor extends Serializable {
     new Compressor {
       def wrapped: Compressor = Compressor.this

-      override def compress(bytes: Array[Byte], segmentIndex: Int): Array[Byte] = {
+      def compress(bytes: Array[Byte], segmentIndex: Int): Array[Byte] =
         wrapped.compress(predictor.encode(bytes, segmentIndex), segmentIndex = segmentIndex)
-      }

       /** Returns the decompressor that can decompress the segments compressed by this compressor */
-      override def createDecompressor(): Decompressor = wrapped.createDecompressor().withPredictorDecoding(predictor)
+      def createDecompressor(): Decompressor = wrapped.createDecompressor().withPredictorDecoding(predictor)
     }

 }
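
Note on the hunk above: the anonymous `new Compressor` supplies the first concrete implementations of the trait's abstract members, so the `override` modifiers were optional noise and dropping them does not change behaviour. Functionally, this wrapper runs `predictor.encode` over each raw segment before the underlying codec compresses it, and `createDecompressor` hands back a decompressor that applies the matching predictor decoding after decompression. A minimal, self-contained sketch of that decorator symmetry, using simplified stand-in traits (`SegmentCodec` and `SegmentPredictor` are illustrative names, not the GeoTrellis interfaces):

trait SegmentCodec {
  def compress(bytes: Array[Byte], segmentIndex: Int): Array[Byte]
  def decompress(bytes: Array[Byte], segmentIndex: Int): Array[Byte]
}

trait SegmentPredictor {
  def encode(bytes: Array[Byte], segmentIndex: Int): Array[Byte]
  def decode(bytes: Array[Byte], segmentIndex: Int): Array[Byte]
}

object PredictorWrapping {
  // Write path: predictor first, then codec; read path: codec first, then predictor.
  // Round trip holds whenever the codec and predictor are themselves lossless:
  //   wrap(c, p).decompress(wrap(c, p).compress(b, i), i) == b
  def wrap(codec: SegmentCodec, predictor: SegmentPredictor): SegmentCodec = new SegmentCodec {
    def compress(bytes: Array[Byte], segmentIndex: Int): Array[Byte] =
      codec.compress(predictor.encode(bytes, segmentIndex), segmentIndex)
    def decompress(bytes: Array[Byte], segmentIndex: Int): Array[Byte] =
      predictor.decode(codec.decompress(bytes, segmentIndex), segmentIndex)
  }
}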

raster/src/main/scala/geotrellis/raster/io/geotiff/compression/FloatingPointPredictor.scala

Lines changed: 1 addition & 1 deletion

@@ -67,7 +67,7 @@ object FloatingPointPredictor {
       bytes
     }

-    override def encode(bytes: Array[Byte], segmentIndex: Int): Array[Byte] = {
+    def encode(bytes: Array[Byte], segmentIndex: Int): Array[Byte] = {
       val rows = rowsInSegment(segmentIndex)
       val bytesPerSample = bandType.bytesPerSample
       val bytesPerRow = colsPerRow * bandCount * bytesPerSample
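
For context, TIFF predictor 3 (the floating-point predictor) reorders the bytes of each row's samples into byte planes and then applies byte-wise horizontal differencing, so the slowly varying high-order bytes become long runs of small deltas that compress well. A rough single-row sketch of that idea, assuming 4-byte big-endian samples (illustrative only; the GeoTrellis code above works per segment and accounts for rows, band count and bytes per sample):

object FloatPredictorSketch {
  val bytesPerSample = 4

  def encodeRow(row: Array[Byte]): Array[Byte] = {
    val n = row.length / bytesPerSample
    val planes = new Array[Byte](row.length)
    // 1. Split samples into byte planes: all byte-0s first, then all byte-1s, ...
    for (i <- 0 until n; b <- 0 until bytesPerSample)
      planes(b * n + i) = row(i * bytesPerSample + b)
    // 2. Horizontal byte differencing over the rearranged row (back to front).
    for (j <- planes.length - 1 until 0 by -1)
      planes(j) = (planes(j) - planes(j - 1)).toByte
    planes
  }

  def decodeRow(encoded: Array[Byte]): Array[Byte] = {
    val planes = encoded.clone()
    // Undo the differencing (front to back) ...
    for (j <- 1 until planes.length)
      planes(j) = (planes(j) + planes(j - 1)).toByte
    // ... then re-interleave the byte planes back into samples.
    val n = planes.length / bytesPerSample
    val row = new Array[Byte](planes.length)
    for (i <- 0 until n; b <- 0 until bytesPerSample)
      row(i * bytesPerSample + b) = planes(b * n + i)
    row
  }
}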

raster/src/main/scala/geotrellis/raster/io/geotiff/compression/HorizontalPredictor.scala

Lines changed: 2 additions & 2 deletions

@@ -82,10 +82,10 @@ object HorizontalPredictor {
     val code: Int = Predictor.PREDICTOR_HORIZONTAL
     val checkEndian = true

-    override def encode(bytes: Array[Byte], segmentIndex: Int): Array[Byte] =
+    def encode(bytes: Array[Byte], segmentIndex: Int): Array[Byte] =
       encodeFunc(bytes, segmentIndex)

-    override def decode(bytes: Array[Byte], segmentIndex: Int): Array[Byte] =
+    def decode(bytes: Array[Byte], segmentIndex: Int): Array[Byte] =
       decodeFunc(bytes, segmentIndex)
   }
 }
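
TIFF predictor 2 (horizontal differencing) replaces each sample with its delta from the neighbouring sample to the left, turning smoothly varying rasters into streams of small values that Deflate or ZStd compress far better. A toy single-band, 8-bit sketch of the encode/decode pair (illustrative; the implementation above dispatches encodeFunc/decodeFunc over the actual bytes-per-sample and band count):

object HorizontalPredictorSketch {
  def encodeRow(row: Array[Byte]): Array[Byte] = {
    val out = row.clone()
    // Replace each sample with its delta from the previous one (right to left).
    for (i <- out.length - 1 until 0 by -1)
      out(i) = (out(i) - out(i - 1)).toByte
    out
  }

  def decodeRow(deltas: Array[Byte]): Array[Byte] = {
    val out = deltas.clone()
    // A running sum restores the original samples.
    for (i <- 1 until out.length)
      out(i) = (out(i) + out(i - 1)).toByte
    out
  }
}

// A smoothly varying row becomes small deltas:
//   encodeRow(Array[Byte](100, 101, 103, 106)) == Array[Byte](100, 1, 2, 3)
// and decodeRow inverts it exactly.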

raster/src/main/scala/geotrellis/raster/io/geotiff/compression/Predictor.scala

Lines changed: 2 additions & 2 deletions

@@ -45,8 +45,8 @@ object Predictor {
       val code: Int = PREDICTOR_NONE
       val checkEndian = true

-      override def encode(bytes: Array[Byte], segmentIndex: Int): Array[Byte] = bytes
-      override def decode(bytes: Array[Byte], segmentIndex: Int): Array[Byte] = bytes
+      def encode(bytes: Array[Byte], segmentIndex: Int): Array[Byte] = bytes
+      def decode(bytes: Array[Byte], segmentIndex: Int): Array[Byte] = bytes
     }
     case Some(PREDICTOR_HORIZONTAL) =>
       HorizontalPredictor(tiffTags)
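
The `Predictor` object dispatches on the TIFF Predictor tag: 1 (or no tag) means no prediction, 2 means horizontal differencing, 3 means the floating-point predictor. A hedged sketch of that dispatch with illustrative names (the real code constructs the GeoTrellis predictor instances shown in the files above):

object PredictorSelectionSketch {
  // TIFF Predictor tag values (TIFF 6.0 spec plus the Adobe floating-point extension).
  val PredictorNone          = 1
  val PredictorHorizontal    = 2
  val PredictorFloatingPoint = 3

  sealed trait Kind
  case object NoPrediction  extends Kind
  case object Horizontal    extends Kind
  case object FloatingPoint extends Kind

  def predictorFor(tagValue: Option[Int]): Kind = tagValue match {
    case None | Some(PredictorNone)   => NoPrediction
    case Some(PredictorHorizontal)    => Horizontal
    case Some(PredictorFloatingPoint) => FloatingPoint
    case Some(other)                  => throw new IllegalArgumentException(s"Unsupported predictor code: $other")
  }
}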

spark/src/main/scala/geotrellis/spark/join/CartesianPartition.scala (new file)

Lines changed: 52 additions & 0 deletions

@@ -0,0 +1,52 @@
+package geotrellis.spark.join
+
+import org.apache.spark.Partition
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+
+import java.io.{IOException, ObjectOutputStream}
+import scala.util.control.NonFatal
+
+// https://github.com/apache/spark/blob/686d84453610e463df7df95395ce6ed36a6efacd/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala#L29
+private[join] class CartesianPartition(
+  idx: Int,
+  @transient private val rdd1: RDD[_],
+  @transient private val rdd2: RDD[_],
+  s1Index: Int,
+  s2Index: Int
+) extends Partition {
+
+  var s1 = rdd1.partitions(s1Index)
+  var s2 = rdd2.partitions(s2Index)
+  override val index: Int = idx
+
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream): Unit = CartesianPartition.tryOrIOException {
+    // Update the reference to parent split at the time of task serialization
+    s1 = rdd1.partitions(s1Index)
+    s2 = rdd2.partitions(s2Index)
+    oos.defaultWriteObject()
+  }
+}
+
+object CartesianPartition extends Logging {
+  /**
+   * Execute a block of code that returns a value, re-throwing any non-fatal uncaught
+   * exceptions as IOException. This is used when implementing Externalizable and Serializable's
+   * read and write methods, since Java's serializer will not report non-IOExceptions properly;
+   * see SPARK-4080 for more context.
+   */
+  // https://github.com/apache/spark/blob/686d84453610e463df7df95395ce6ed36a6efacd/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala#L35
+  private def tryOrIOException[T](block: => T): T = {
+    try {
+      block
+    } catch {
+      case e: IOException =>
+        logError("Exception encountered", e)
+        throw e
+      case NonFatal(e) =>
+        logError("Exception encountered", e)
+        throw new IOException(e)
+    }
+  }
+}
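
Spark keeps its own CartesianPartition private[spark], which is why GeoTrellis previously declared FilteredCartesianRDD inside the org.apache.spark.rdd package; that is the "sealing violation" the changelog entry refers to. Carrying a local copy under geotrellis.spark.join removes the dependency on Spark internals. A consumer RDD typically builds one such partition per pair of parent partitions, the same way Spark's CartesianRDD.getPartitions does; a sketch of that assembly (illustrative, not necessarily the exact FilteredCartesianRDD code):

package geotrellis.spark.join

import org.apache.spark.Partition
import org.apache.spark.rdd.RDD

object CartesianPartitionsSketch {
  // One partition per (left, right) pair, indexed row-major, mirroring
  // Spark's CartesianRDD.getPartitions.
  def cartesianPartitions(rdd1: RDD[_], rdd2: RDD[_]): Array[Partition] = {
    val numPartitionsInRdd2 = rdd2.partitions.length
    val array = new Array[Partition](rdd1.partitions.length * numPartitionsInRdd2)
    for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
      val idx = s1.index * numPartitionsInRdd2 + s2.index
      array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
    }
    array
  }
}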

spark/src/main/scala/org/apache/spark/rdd/FilteredCartesianRDD.scala renamed to spark/src/main/scala/geotrellis/spark/join/FilteredCartesianRDD.scala

Lines changed: 3 additions & 3 deletions

@@ -19,13 +19,13 @@
  *
  * 1. https://github.com/apache/spark/blob/2f8776ccad532fbed17381ff97d302007918b8d8/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
  */
-package org.apache.spark.rdd
+package geotrellis.spark.join

+import org.apache.spark._
+import org.apache.spark.rdd.RDD

 import scala.reflect.ClassTag

-import org.apache.spark._
-
 /** Performs a cartesian join of two RDDs using filter and refine pattern.
  *
  * During RDD declaration n*m partitions will be generated, one for each possible cartesian mapping.
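
The retained class doc describes the filter-and-refine pattern: a cheap test on per-element summaries prunes the n*m candidate pairs, and only the survivors pay for the exact, expensive comparison. A generic sketch of the pattern on plain collections (names and signature are illustrative; the actual FilteredCartesianRDD applies this per partition pair over RDDs):

object FilterRefineSketch {
  // filter: coarse test on cheap summaries; refine: exact test on the full values.
  def filterRefineJoin[A, B, SA, SB](left: Seq[A], right: Seq[B])
                                    (summarizeLeft: A => SA, summarizeRight: B => SB)
                                    (filter: (SA, SB) => Boolean)
                                    (refine: (A, B) => Boolean): Seq[(A, B)] =
    for {
      a <- left
      b <- right
      if filter(summarizeLeft(a), summarizeRight(b)) // prune most pairs cheaply
      if refine(a, b)                                // confirm on the full values
    } yield (a, b)
}

// For example, the summaries could be bounding boxes and `refine` an exact geometry
// intersection test, which is the typical spatial-join use of this pattern.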
