Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ package org.apache.spark.sql.sedona_sql.UDT
import org.apache.sedona.common.raster.serde.Serde
import org.apache.spark.sql.types.{BinaryType, DataType, UserDefinedType}
import org.geotools.coverage.grid.GridCoverage2D
import org.json4s.JsonDSL._
import org.json4s.JsonAST.JValue

class RasterUDT extends UserDefinedType[GridCoverage2D] {
override def sqlType: DataType = BinaryType
Expand All @@ -41,6 +43,13 @@ class RasterUDT extends UserDefinedType[GridCoverage2D] {

override def userClass: Class[GridCoverage2D] = classOf[GridCoverage2D]

override private[sql] def jsonValue: JValue = {
super.jsonValue mapField {
case ("class", _) => "class" -> this.getClass.getName.stripSuffix("$")
case other: Any => other
}
}

override def equals(other: Any): Boolean = other match {
case _: UserDefinedType[_] => other.isInstanceOf[RasterUDT]
case _ => false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@
package org.apache.sedona.sql

import org.apache.spark.sql.sedona_sql.UDT.RasterUDT
import org.scalatest.funspec.AnyFunSpec
import org.apache.spark.sql.types.{DataType, StructField, StructType}
import org.junit.rules.TemporaryFolder
import org.scalatest.BeforeAndAfter

class RasterUDTSuite extends TestBaseScala with BeforeAndAfter {

var tempFolder: TemporaryFolder = new TemporaryFolder

class RasterUDTSuite extends AnyFunSpec {
describe("RasterUDT Test") {
it("Case object and new instance should be equals") {
assert(RasterUDT == RasterUDT)
Expand All @@ -35,5 +40,54 @@ class RasterUDTSuite extends AnyFunSpec {
val udt = new RasterUDT
assert(udt.hashCode() == RasterUDT.hashCode())
}

it("Should be able to render and parse JSON schema with RasterUDT") {
// This reproduces the Delta write bug (#2608):
// Delta and Parquet serialize the schema to JSON, then deserialize it.
// Without a jsonValue override, the class name includes a '$' suffix
// from the Scala case object, causing ClassNotFoundException on read.
val rasterDf = sparkSession.sql("SELECT RS_MakeEmptyRaster(1, 10, 10, 0, 0, 1) as raster")
assert(
DataType
.fromJson(rasterDf.schema.json)
.asInstanceOf[StructType]
.equals(rasterDf.schema))
}

it("Should write and read raster DataFrame in Parquet format") {
// Parquet also serializes schema as JSON, triggering the same bug as Delta.
tempFolder.create()
val rasterDf = sparkSession.sql("SELECT RS_MakeEmptyRaster(1, 10, 10, 0, 0, 1) as raster")

rasterDf.write.parquet(tempFolder.getRoot.getPath + "/raster_parquet")

val readDf =
sparkSession.read.parquet(tempFolder.getRoot.getPath + "/raster_parquet")
assert(readDf.schema.fields(0).dataType.isInstanceOf[RasterUDT])
assert(readDf.count() == 1)
}

it("RS_Union_Aggr output should write and read Parquet successfully") {
// Users reported (#2608) that RS_Union_Aggr output can be written to Delta/Parquet
// while RS_MakeEmptyRaster output cannot. RS_Union_Aggr uses ExpressionEncoder
// which resolves the UDT via UDTRegistration (class name without '$' suffix),
// whereas InferredExpression-based functions use the case object singleton
// whose getClass.getName includes '$'.
tempFolder.create()
val rasterDf = sparkSession.sql("""SELECT RS_Union_Aggr(raster) as raster FROM (
| SELECT RS_MakeEmptyRaster(1, 10, 10, 0, 0, 1) as raster
|)""".stripMargin)

rasterDf.write.parquet(tempFolder.getRoot.getPath + "/union_aggr_parquet")

val readDf =
sparkSession.read.parquet(tempFolder.getRoot.getPath + "/union_aggr_parquet")
assert(readDf.schema.fields(0).dataType.isInstanceOf[RasterUDT])
assert(readDf.count() == 1)
}
}

after {
tempFolder.delete()
}
}
Loading