
Commit 5468585

Spark 3.4: Enhance FunctionRegistry to support more hash functions (#268)
Lead-authored-by: Xinyuan Yang [email protected]
Co-authored-by: Cheng Pan [email protected]
1 parent 821adcc commit 5468585
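
For context, the functions this commit registers are meant to be called straight from Spark SQL once the ClickHouse catalog's function registry is active, the same way the new test suite below invokes them. A minimal usage sketch (the SparkSession/catalog setup is assumed and not shown in this diff):

    // Assumes `spark` is a SparkSession configured with the ClickHouse catalog,
    // so the names added to StaticFunctionRegistry resolve as SQL functions.
    spark.sql("SELECT clickhouse_cityHash64('Apache Spark') AS hash_value").show()
    spark.sql("SELECT clickhouse_murmurHash3_64('ClickHouse', 'Yandex') AS hash_value").show()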

9 files changed: +362 -58 lines changed
Lines changed: 96 additions & 0 deletions
@@ -0,0 +1,96 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.clickhouse.cluster

import org.apache.spark.sql.clickhouse.TestUtils.om
import xenon.clickhouse.func._

import java.lang.{Long => JLong}

class ClickHouseClusterHashUDFSuite extends SparkClickHouseClusterTest {
  // only for query function names
  val dummyRegistry: CompositeFunctionRegistry = {
    val dynamicFunctionRegistry = new DynamicFunctionRegistry
    val xxHash64ShardFunc = new ClickHouseXxHash64Shard(Seq.empty)
    dynamicFunctionRegistry.register("ck_xx_hash64_shard", xxHash64ShardFunc) // for compatible
    dynamicFunctionRegistry.register("clickhouse_shard_xxHash64", xxHash64ShardFunc)
    new CompositeFunctionRegistry(Array(StaticFunctionRegistry, dynamicFunctionRegistry))
  }

  def runTest(sparkFuncName: String, ckFuncName: String, stringVal: String): Unit = {
    val sparkResult = spark.sql(
      s"SELECT $sparkFuncName($stringVal) AS hash_value"
    ).collect
    assert(sparkResult.length == 1)
    val sparkHashVal = sparkResult.head.getAs[Long]("hash_value")

    val clickhouseResultJsonStr = runClickHouseSQL(
      s"SELECT $ckFuncName($stringVal) AS hash_value"
    ).head.getString(0)
    val clickhouseResultJson = om.readTree(clickhouseResultJsonStr)
    val clickhouseHashVal = JLong.parseUnsignedLong(clickhouseResultJson.get("hash_value").asText)
    assert(
      sparkHashVal == clickhouseHashVal,
      s"ck_function: $ckFuncName, spark_function: $sparkFuncName, args: ($stringVal)"
    )
  }

  Seq(
    "clickhouse_xxHash64",
    "clickhouse_murmurHash3_64",
    "clickhouse_murmurHash3_32",
    "clickhouse_murmurHash2_64",
    "clickhouse_murmurHash2_32",
    "clickhouse_cityHash64"
  ).foreach { sparkFuncName =>
    val ckFuncName = dummyRegistry.sparkToClickHouseFunc(sparkFuncName)
    test(s"UDF $sparkFuncName") {
      Seq(
        "spark-clickhouse-connector",
        "Apache Spark",
        "ClickHouse",
        "Yandex",
        "热爱",
        "在传统的行式数据库系统中,数据按如下顺序存储:",
        "🇨🇳"
      ).map("'" + _ + "'").foreach { stringVal =>
        runTest(sparkFuncName, ckFuncName, stringVal)
      }
    }
  }

  Seq(
    "clickhouse_murmurHash3_64",
    "clickhouse_murmurHash3_32",
    "clickhouse_murmurHash2_64",
    "clickhouse_murmurHash2_32",
    "clickhouse_cityHash64"
  ).foreach { sparkFuncName =>
    val ckFuncName = dummyRegistry.sparkToClickHouseFunc(sparkFuncName)
    test(s"UDF $sparkFuncName multiple args") {
      Seq(
        "spark-clickhouse-connector",
        "Apache Spark",
        "ClickHouse",
        "Yandex",
        "热爱",
        "在传统的行式数据库系统中,数据按如下顺序存储:",
        "🇨🇳"
      ).map("'" + _ + "'").combinations(5).foreach { seq =>
        val stringVal = seq.mkString(", ")
        runTest(sparkFuncName, ckFuncName, stringVal)
      }
    }
  }
}
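
A note on the unsigned comparison above: ClickHouse returns these 64-bit hashes as unsigned decimal strings, while Spark's Long is signed, so the test parses the ClickHouse value with JLong.parseUnsignedLong to recover the same bit pattern. A small illustration (values chosen only for demonstration):

    import java.lang.{Long => JLong}

    val signedView: Long = -1L                                  // how Spark surfaces a hash with the top bit set
    val unsignedText = JLong.toUnsignedString(signedView)       // "18446744073709551615", how ClickHouse prints it
    assert(JLong.parseUnsignedLong(unsignedText) == signedView) // same 64-bit pattern on both sides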

spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterUDFSuite.scala

Lines changed: 0 additions & 55 deletions
This file was deleted.
Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package xenon.clickhouse.func

import xenon.clickhouse.hash

// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L694
object CityHash64 extends MultiStringArgsHash {

  override protected def funcName: String = "clickhouse_cityHash64"

  override val ckFuncNames: Array[String] = Array("cityHash64")

  override def applyHash(input: Array[Any]): Long = hash.CityHash64(input)
}

spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala

Lines changed: 39 additions & 2 deletions
@@ -18,30 +18,57 @@ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 
 import scala.collection.mutable
 
-trait FunctionRegistry {
+trait FunctionRegistry extends Serializable {
 
   def list: Array[String]
 
   def load(name: String): Option[UnboundFunction]
+
+  def sparkToClickHouseFunc: Map[String, String]
+
+  def clickHouseToSparkFunc: Map[String, String]
+}
+
+trait ClickhouseEquivFunction {
+  val ckFuncNames: Array[String]
 }
 
 class CompositeFunctionRegistry(registries: Array[FunctionRegistry]) extends FunctionRegistry {
 
   override def list: Array[String] = registries.flatMap(_.list)
 
   override def load(name: String): Option[UnboundFunction] = registries.flatMap(_.load(name)).headOption
+
+  override def sparkToClickHouseFunc: Map[String, String] = registries.flatMap(_.sparkToClickHouseFunc).toMap
+
+  override def clickHouseToSparkFunc: Map[String, String] = registries.flatMap(_.clickHouseToSparkFunc).toMap
 }
 
 object StaticFunctionRegistry extends FunctionRegistry {
 
   private val functions = Map[String, UnboundFunction](
     "ck_xx_hash64" -> ClickHouseXxHash64, // for compatible
-    "clickhouse_xxHash64" -> ClickHouseXxHash64
+    "clickhouse_xxHash64" -> ClickHouseXxHash64,
+    "clickhouse_murmurHash2_32" -> MurmurHash2_32,
+    "clickhouse_murmurHash2_64" -> MurmurHash2_64,
+    "clickhouse_murmurHash3_32" -> MurmurHash3_32,
+    "clickhouse_murmurHash3_64" -> MurmurHash3_64,
+    "clickhouse_cityHash64" -> CityHash64
   )
 
   override def list: Array[String] = functions.keys.toArray
 
   override def load(name: String): Option[UnboundFunction] = functions.get(name)
+
+  override val sparkToClickHouseFunc: Map[String, String] =
+    functions.filter(_._2.isInstanceOf[ClickhouseEquivFunction]).flatMap { case (k, v) =>
+      v.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.map((k, _))
+    }
+
+  override val clickHouseToSparkFunc: Map[String, String] =
+    functions.filter(_._2.isInstanceOf[ClickhouseEquivFunction]).flatMap { case (k, v) =>
+      v.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.map((_, k))
+    }
 }
 
 class DynamicFunctionRegistry extends FunctionRegistry {
@@ -56,4 +83,14 @@ class DynamicFunctionRegistry extends FunctionRegistry {
   override def list: Array[String] = functions.keys.toArray
 
   override def load(name: String): Option[UnboundFunction] = functions.get(name)
+
+  override def sparkToClickHouseFunc: Map[String, String] =
+    functions.filter(_._2.isInstanceOf[ClickhouseEquivFunction]).toMap.flatMap { case (k, v) =>
+      v.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.map((k, _))
+    }
+
+  override def clickHouseToSparkFunc: Map[String, String] =
+    functions.filter(_._2.isInstanceOf[ClickhouseEquivFunction]).toMap.flatMap { case (k, v) =>
+      v.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.map((_, k))
+    }
 }
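
A quick sketch of how the new name-mapping API is intended to be consumed (mirroring the registry built in the test suite above; the returned names follow from the ckFuncNames each function object declares):

    val registry = new CompositeFunctionRegistry(Array(StaticFunctionRegistry, new DynamicFunctionRegistry))
    registry.sparkToClickHouseFunc("clickhouse_cityHash64")   // "cityHash64"
    registry.clickHouseToSparkFunc("murmurHash2_32")          // "clickhouse_murmurHash2_32"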
Lines changed: 63 additions & 0 deletions
@@ -0,0 +1,63 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package xenon.clickhouse.func

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

abstract class MultiStringArgsHash extends UnboundFunction with ClickhouseEquivFunction {

  def applyHash(input: Array[Any]): Long

  protected def funcName: String

  override val ckFuncNames: Array[String]

  override def description: String = s"$name: (value: string, ...) => hash_value: long"

  private def isExceptedType(dt: DataType): Boolean =
    dt.isInstanceOf[StringType]

  final override def name: String = funcName

  final override def bind(inputType: StructType): BoundFunction = {
    val inputDataTypes = inputType.fields.map(_.dataType)
    if (inputDataTypes.forall(isExceptedType)) {
      // need to new a ScalarFunction instance for each bind,
      // because we do not know the number of arguments in advance
      new ScalarFunction[Long] {
        override def inputTypes(): Array[DataType] = inputDataTypes
        override def name: String = funcName
        override def canonicalName: String = s"clickhouse.$name"
        override def resultType: DataType = LongType
        override def toString: String = name
        override def produceResult(input: InternalRow): Long = {
          val inputStrings = new Array[Any](input.numFields)
          var i = 0
          do {
            inputStrings(i) = input.getUTF8String(i).getBytes
            i += 1
          } while (i < input.numFields)
          applyHash(inputStrings)
        }
      }
    } else {
      throw new UnsupportedOperationException(s"Expect multiple STRING argument. $description")
    }
  }

}
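
MultiStringArgsHash is the shared extension point: a concrete UDF only supplies the Spark-side name, the ClickHouse-side names, and the byte-level hash, while argument checking and binding stay in the base class. A hedged sketch of what a new function would look like (hypothetical object and placeholder hash, not part of this commit):

    // Hypothetical example only: the three members a new multi-string hash UDF implements.
    object MyHash64 extends MultiStringArgsHash {

      override protected def funcName: String = "clickhouse_myHash64"

      override val ckFuncNames: Array[String] = Array("myHash64")

      // Each element of `input` is the Array[Byte] of one string argument
      // (see produceResult above); a real implementation would delegate to a hash function.
      override def applyHash(input: Array[Any]): Long =
        input.map { case bytes: Array[Byte] => bytes.length.toLong }.sum // placeholder, not a real hash
    }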
Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package xenon.clickhouse.func

import xenon.clickhouse.hash
import xenon.clickhouse.hash.HashUtils

// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L460
object MurmurHash2_64 extends MultiStringArgsHash {

  override protected def funcName: String = "clickhouse_murmurHash2_64"

  override val ckFuncNames: Array[String] = Array("murmurHash2_64")

  override def applyHash(input: Array[Any]): Long = hash.Murmurhash2_64(input)
}

// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L519
object MurmurHash2_32 extends MultiStringArgsHash {

  override protected def funcName: String = "clickhouse_murmurHash2_32"

  override val ckFuncNames: Array[String] = Array("murmurHash2_32")

  override def applyHash(input: Array[Any]): Long = HashUtils.toUInt32(hash.Murmurhash2_32(input))
}
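
The 32-bit variants above are wrapped in HashUtils.toUInt32 so the result matches ClickHouse's UInt32 output when carried in a Spark Long. Its implementation is not part of this diff; the conventional zero-extension would look like this (an assumption, shown only to explain the wrapping):

    // Assumed behaviour of HashUtils.toUInt32: treat a signed 32-bit hash as unsigned.
    def toUInt32(v: Int): Long = v.toLong & 0xFFFFFFFFL

    // e.g. toUInt32(-1) == 4294967295L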
Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package xenon.clickhouse.func

import xenon.clickhouse.hash
import xenon.clickhouse.hash.HashUtils

// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L543
object MurmurHash3_64 extends MultiStringArgsHash {

  override protected def funcName: String = "clickhouse_murmurHash3_64"

  override val ckFuncNames: Array[String] = Array("murmurHash3_64")

  override def applyHash(input: Array[Any]): Long = hash.Murmurhash3_64(input)
}

// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L519
object MurmurHash3_32 extends MultiStringArgsHash {

  override protected def funcName: String = "clickhouse_murmurHash3_32"

  override val ckFuncNames: Array[String] = Array("murmurHash3_32")

  override def applyHash(input: Array[Any]): Long = HashUtils.toUInt32(hash.Murmurhash3_32(input))
}
