Skip to content

Commit 63a3511

Browse files
authored
Merge branch 'main' into fix/3692
2 parents bb67c02 + 5695cbb commit 63a3511

File tree

3 files changed

+103
-13
lines changed

3 files changed

+103
-13
lines changed

common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpExec.scala

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,9 +225,17 @@ class StableMergeSortOpExec(descString: String) extends OperatorExecutor {
225225
/**
226226
* Compare two non-null values using their attribute type.
227227
*
228-
* For DOUBLE:
229-
* - Uses java.lang.Double.compare (orders -Inf < ... < +Inf < NaN).
230-
* - Callers if desired should define how NaN interacts with ASC/DESC and null policy.
228+
* Type semantics:
229+
* - INTEGER, LONG: numeric ascending via Java primitive compares.
230+
* - DOUBLE: java.lang.Double.compare (orders -Inf < ... < +Inf < NaN).
231+
* - BOOLEAN: false < true.
232+
* - TIMESTAMP: java.sql.Timestamp#compareTo.
233+
* - STRING: String#compareTo (UTF-16, lexicographic).
234+
* - BINARY: unsigned lexicographic order over byte arrays:
235+
* - Compare byte-by-byte treating each as 0..255 (mask 0xff).
236+
* - The first differing byte decides the order.
237+
* - If all compared bytes are equal, the shorter array sorts first.
238+
* - Example: [] < [0x00] < [0x00,0x00] < [0x00,0x01] < [0x7F] < [0x80] < [0xFF].
231239
*/
232240
private def compareTypedNonNullValues(
233241
leftValue: Any,
@@ -258,6 +266,11 @@ class StableMergeSortOpExec(descString: String) extends OperatorExecutor {
258266
.compareTo(rightValue.asInstanceOf[java.sql.Timestamp])
259267
case AttributeType.STRING =>
260268
leftValue.asInstanceOf[String].compareTo(rightValue.asInstanceOf[String])
269+
case AttributeType.BINARY =>
270+
java.util.Arrays.compareUnsigned(
271+
leftValue.asInstanceOf[Array[Byte]],
272+
rightValue.asInstanceOf[Array[Byte]]
273+
)
261274
case other =>
262275
throw new IllegalStateException(s"Unsupported attribute type $other in StableMergeSort")
263276
}

common/workflow-operator/src/main/scala/org/apache/amber/operator/visualization/scatterplot/ScatterplotOpDesc.scala

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -123,17 +123,20 @@ class ScatterplotOpDesc extends PythonOperatorDescriptor {
123123

124124
def createPlotlyFigure(): String = {
125125
assert(xColumn.nonEmpty && yColumn.nonEmpty)
126-
val colorColExpr = if (colorColumn.nonEmpty) {
127-
s"color='$colorColumn'"
128-
} else {
129-
""
130-
}
131-
var argDetails = ""
132-
if (xLogScale) argDetails = argDetails + ", log_x=True"
133-
if (yLogScale) argDetails = argDetails + ", log_y=True"
134-
if (hoverName.nonEmpty) argDetails = argDetails + s""", hover_name='$hoverName'"""
126+
127+
val args = scala.collection.mutable.ArrayBuffer[String](
128+
s"x='$xColumn'",
129+
s"y='$yColumn'",
130+
s"opacity=$alpha"
131+
)
132+
if (colorColumn.nonEmpty) args += s"color='$colorColumn'"
133+
if (xLogScale) args += "log_x=True"
134+
if (yLogScale) args += "log_y=True"
135+
if (hoverName.nonEmpty) args += s"hover_name='$hoverName'"
136+
137+
val joined = args.mkString(", ")
135138
s"""
136-
| fig = go.Figure(px.scatter(table, x='$xColumn', y='$yColumn', opacity=$alpha, $colorColExpr $argDetails))
139+
| fig = go.Figure(px.scatter(table, $joined))
137140
| fig.update_layout(margin=dict(l=0, r=0, t=0, b=0))
138141
|""".stripMargin
139142
}

common/workflow-operator/src/test/scala/org/apache/amber/operator/sort/StableMergeSortOpExecSpec.scala

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,30 @@ class StableMergeSortOpExecSpec extends AnyFlatSpec {
247247
assert(desc.map(_.getField[Boolean]("bool")) == List(true, true, false, false))
248248
}
249249

250+
it should "sort BINARY ascending (unsigned lexicographic) incl. empty and high-bit bytes" in {
251+
val schema = schemaOf("bin" -> AttributeType.BINARY)
252+
253+
val bytesEmpty = Array[Byte]() // []
254+
val bytes00 = Array(0x00.toByte) // [00]
255+
val bytes0000 = Array(0x00.toByte, 0x00.toByte) // [00,00]
256+
val bytes0001 = Array(0x00.toByte, 0x01.toByte) // [00,01]
257+
val bytes7F = Array(0x7f.toByte) // [7F]
258+
val bytes80 = Array(0x80.toByte) // [80] (-128)
259+
val bytesFF = Array(0xff.toByte) // [FF] (-1)
260+
261+
val inputTuples = List(bytes80, bytes0000, bytesEmpty, bytesFF, bytes0001, bytes00, bytes7F)
262+
.map(arr => tupleOf(schema, "bin" -> arr))
263+
264+
val sorted = runStableMergeSort(schema, inputTuples) { _.keys = sortKeysBuffer(sortKey("bin")) }
265+
266+
val actualUnsigned = sorted.map(_.getField[Array[Byte]]("bin").toSeq.map(b => b & 0xff))
267+
val expectedUnsigned =
268+
List(bytesEmpty, bytes00, bytes0000, bytes0001, bytes7F, bytes80, bytesFF)
269+
.map(_.toSeq.map(b => b & 0xff))
270+
271+
assert(actualUnsigned == expectedUnsigned)
272+
}
273+
250274
// ===========================================================================
251275
// B. Floating-point & Null/NaN policy
252276
// ===========================================================================
@@ -334,6 +358,30 @@ class StableMergeSortOpExecSpec extends AnyFlatSpec {
334358
)
335359
}
336360

361+
it should "sort BINARY descending with nulls last and preserve stability for equal byte arrays" in {
362+
val schema = schemaOf("bin" -> AttributeType.BINARY, "id" -> AttributeType.STRING)
363+
364+
val key00 = Array(0x00.toByte)
365+
val keyFF = Array(0xff.toByte)
366+
367+
val inputTuples = List(
368+
tupleOf(schema, "bin" -> keyFF, "id" -> "ff-1"),
369+
tupleOf(schema, "bin" -> key00, "id" -> "00-1"),
370+
tupleOf(
371+
schema,
372+
"bin" -> key00,
373+
"id" -> "00-2"
374+
), // equal to previous; stability should keep order
375+
tupleOf(schema, "bin" -> null, "id" -> "null-1")
376+
)
377+
378+
val sorted = runStableMergeSort(schema, inputTuples) {
379+
_.keys = sortKeysBuffer(sortKey("bin", SortPreference.DESC))
380+
}
381+
382+
val idsInOrder = sorted.map(_.getField[String]("id"))
383+
assert(idsInOrder == List("ff-1", "00-1", "00-2", "null-1"))
384+
}
337385
// ===========================================================================
338386
// C. Multi-key semantics (lexicographic)
339387
// ===========================================================================
@@ -475,6 +523,32 @@ class StableMergeSortOpExecSpec extends AnyFlatSpec {
475523
)
476524
}
477525

526+
it should "use INTEGER secondary key to break ties when primary BINARY keys are equal" in {
527+
val schema = schemaOf(
528+
"bin" -> AttributeType.BINARY,
529+
"score" -> AttributeType.INTEGER,
530+
"label" -> AttributeType.STRING
531+
)
532+
533+
val key00 = Array(0x00.toByte)
534+
val key01 = Array(0x01.toByte)
535+
536+
val inputTuples = List(
537+
tupleOf(schema, "bin" -> key01, "score" -> 1, "label" -> "01-score1"),
538+
tupleOf(schema, "bin" -> key00, "score" -> 9, "label" -> "00-score9"),
539+
tupleOf(schema, "bin" -> key01, "score" -> 2, "label" -> "01-score2")
540+
)
541+
542+
val sorted = runStableMergeSort(schema, inputTuples) { desc =>
543+
desc.keys = sortKeysBuffer(
544+
sortKey("bin", SortPreference.ASC), // primary: binary ascending
545+
sortKey("score", SortPreference.DESC) // secondary: integer descending
546+
)
547+
}
548+
549+
val labelsInOrder = sorted.map(_.getField[String]("label"))
550+
assert(labelsInOrder == List("00-score9", "01-score2", "01-score1"))
551+
}
478552
// ===========================================================================
479553
// D. Stability & operational behaviors
480554
// ===========================================================================

0 commit comments

Comments
 (0)