
Commit eec6ff6

kbendick authored and rdblue committed
ORC: Fix importing ORC files with float and double columns and test (#3332)
1 parent 4ba2157 commit eec6ff6

3 files changed: +106 -11 lines changed


build.gradle

Lines changed: 2 additions & 0 deletions
@@ -1099,6 +1099,8 @@ project(":iceberg-spark3-extensions") {
       exclude group: 'org.apache.arrow'
     }
 
+    testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
+    testCompile project(path: ':iceberg-orc', configuration: 'testArtifacts')
     testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
     testCompile project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
     testCompile project(path: ':iceberg-spark', configuration: 'testArtifacts')

orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java

Lines changed: 26 additions & 11 deletions
@@ -44,7 +44,6 @@
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMapping;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Conversions;
@@ -196,11 +195,17 @@ private static Optional<ByteBuffer> fromOrcMin(Type type, ColumnStatistics colum
           min = Math.toIntExact((long) min);
         }
       } else if (columnStats instanceof DoubleColumnStatistics) {
-        // since Orc includes NaN for upper/lower bounds of floating point columns, and we don't want this behavior,
-        // we have tracked metrics for such columns ourselves and thus do not need to rely on Orc's column statistics.
-        Preconditions.checkNotNull(fieldMetrics,
-            "[BUG] Float or double type columns should have metrics being tracked by Iceberg Orc writers");
-        min = fieldMetrics.lowerBound();
+        if (fieldMetrics != null) {
+          // since Orc includes NaN for upper/lower bounds of floating point columns, and we don't want this behavior,
+          // we have tracked metrics for such columns ourselves and thus do not need to rely on Orc's column statistics.
+          min = fieldMetrics.lowerBound();
+        } else {
+          // imported files will not have metrics that were tracked by Iceberg, so fall back to the file's metrics.
+          min = replaceNaN(((DoubleColumnStatistics) columnStats).getMinimum(), Double.NEGATIVE_INFINITY);
+          if (type.typeId() == Type.TypeID.FLOAT) {
+            min = ((Double) min).floatValue();
+          }
+        }
       } else if (columnStats instanceof StringColumnStatistics) {
         min = ((StringColumnStatistics) columnStats).getMinimum();
       } else if (columnStats instanceof DecimalColumnStatistics) {
@@ -234,11 +239,17 @@ private static Optional<ByteBuffer> fromOrcMax(Type type, ColumnStatistics colum
           max = Math.toIntExact((long) max);
         }
       } else if (columnStats instanceof DoubleColumnStatistics) {
-        // since Orc includes NaN for upper/lower bounds of floating point columns, and we don't want this behavior,
-        // we have tracked metrics for such columns ourselves and thus do not need to rely on Orc's column statistics.
-        Preconditions.checkNotNull(fieldMetrics,
-            "[BUG] Float or double type columns should have metrics being tracked by Iceberg Orc writers");
-        max = fieldMetrics.upperBound();
+        if (fieldMetrics != null) {
+          // since Orc includes NaN for upper/lower bounds of floating point columns, and we don't want this behavior,
+          // we have tracked metrics for such columns ourselves and thus do not need to rely on Orc's column statistics.
+          max = fieldMetrics.upperBound();
+        } else {
+          // imported files will not have metrics that were tracked by Iceberg, so fall back to the file's metrics.
+          max = replaceNaN(((DoubleColumnStatistics) columnStats).getMaximum(), Double.POSITIVE_INFINITY);
+          if (type.typeId() == Type.TypeID.FLOAT) {
+            max = ((Double) max).floatValue();
+          }
+        }
       } else if (columnStats instanceof StringColumnStatistics) {
         max = ((StringColumnStatistics) columnStats).getMaximum();
       } else if (columnStats instanceof DecimalColumnStatistics) {
@@ -262,6 +273,10 @@ private static Optional<ByteBuffer> fromOrcMax(Type type, ColumnStatistics colum
     return Optional.ofNullable(Conversions.toByteBuffer(type, truncateIfNeeded(Bound.UPPER, type, max, metricsMode)));
   }
 
+  private static Object replaceNaN(double value, double replacement) {
+    return Double.isNaN(value) ? replacement : value;
+  }
+
   private static Object truncateIfNeeded(Bound bound, Type type, Object value, MetricsMode metricsMode) {
     // Out of the two types which could be truncated, string or binary, ORC only supports string bounds.
     // Therefore, truncation will be applied if needed only on string type.
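
For reference, the fallback branch added above boils down to the following standalone sketch (the helper name and parameters are illustrative, not the committed API): when a file was written outside Iceberg and no fieldMetrics were tracked, the ORC statistic is used directly, except that a NaN bound is replaced with the corresponding infinity so NaN never becomes a column bound, and the double statistic is narrowed to float for FLOAT columns.

// Illustrative sketch only; mirrors the imported-file fallback in fromOrcMin/fromOrcMax.
static Object safeLowerBound(double orcMin, boolean isFloatColumn) {
  // Never surface NaN as a bound; fall back to the most permissive value instead.
  double bound = Double.isNaN(orcMin) ? Double.NEGATIVE_INFINITY : orcMin;
  if (isFloatColumn) {
    return (float) bound;  // ORC tracks float columns as doubles, so narrow back to float
  }
  return bound;
}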

spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java

Lines changed: 78 additions & 0 deletions
@@ -33,8 +33,16 @@
 import org.apache.avro.io.DatumWriter;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
@@ -51,6 +59,8 @@
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
 public class TestAddFilesProcedure extends SparkExtensionsTestBase {
 
   private final String sourceTableName = "source_table";
@@ -507,6 +517,42 @@ public void invalidDataImportPartitioned() {
         catalogName, tableName, fileTableDir.getAbsolutePath()));
   }
 
+  @Test
+  public void addOrcFileWithDoubleAndFloatColumns() throws Exception {
+    // Spark Session Catalog cannot load metadata tables
+    // with "The namespace in session catalog must have exactly one name part"
+    Assume.assumeFalse(catalogName.equals("spark_catalog"));
+
+    // Create an ORC file
+    File outputFile = temp.newFile("test.orc");
+    final int numRows = 5;
+    List<Record> expectedRecords = createOrcFile(outputFile, numRows);
+    String createIceberg =
+        "CREATE TABLE %s (x float, y double, z long) USING iceberg";
+    sql(createIceberg, tableName);
+
+    Object result = scalarSql("CALL %s.system.add_files('%s', '`orc`.`%s`')",
+        catalogName, tableName, outputFile.getPath());
+    Assert.assertEquals(1L, result);
+
+    List<Object[]> expected = expectedRecords.stream()
+        .map(record -> new Object[]{record.get(0), record.get(1), record.get(2)})
+        .collect(Collectors.toList());
+
+    // x goes 2.00, 1.99, 1.98, ...
+    assertEquals("Iceberg table contains correct data",
+        expected,
+        sql("SELECT * FROM %s ORDER BY x DESC", tableName));
+
+    List<Object[]> actualRecordCount = sql("select %s from %s.files",
+        DataFile.RECORD_COUNT.name(),
+        tableName);
+    List<Object[]> expectedRecordCount = Lists.newArrayList();
+    expectedRecordCount.add(new Object[]{(long) numRows});
+    assertEquals("Iceberg file metadata should have correct metadata count",
+        expectedRecordCount, actualRecordCount);
+  }
+
   private static final StructField[] struct = {
       new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
       new StructField("name", DataTypes.StringType, false, Metadata.empty()),
@@ -597,4 +643,36 @@ private void createPartitionedHiveTable() {
     partitionedDF.write().insertInto(sourceTableName);
     partitionedDF.write().insertInto(sourceTableName);
   }
+
+  // Update this to not write a file for import using Iceberg's ID numbers
+  public List<Record> createOrcFile(File orcFile, int numRows) throws IOException {
+    // Needs to be deleted but depend on the rule to delete the file for us again at the end.
+    if (orcFile.exists()) {
+      orcFile.delete();
+    }
+    final org.apache.iceberg.Schema icebergSchema = new org.apache.iceberg.Schema(
+        optional(1, "x", Types.FloatType.get()),
+        optional(2, "y", Types.DoubleType.get()),
+        optional(3, "z", Types.LongType.get())
+    );
+
+    List<Record> records = Lists.newArrayListWithExpectedSize(numRows);
+    for (int i = 0; i < numRows; i += 1) {
+      Record record = org.apache.iceberg.data.GenericRecord.create(icebergSchema);
+      record.setField("x", ((float) (100 - i)) / 100F + 1.0F); // 2.0f, 1.99f, 1.98f, ...
+      record.setField("y", ((double) i) / 100.0D + 2.0D); // 2.0d, 2.01d, 2.02d, ...
+      record.setField("z", 5_000_000_000L + i);
+      records.add(record);
+    }
+
+    OutputFile outFile = Files.localOutput(orcFile);
+    try (FileAppender<Record> appender = org.apache.iceberg.orc.ORC.write(outFile)
+        .schema(icebergSchema)
+        .metricsConfig(MetricsConfig.fromProperties(ImmutableMap.of("write.metadata.metrics.default", "none")))
+        .createWriterFunc(GenericOrcWriter::buildWriter)
+        .build()) {
+      appender.addAll(records);
+    }
+    return records;
+  }
 }
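
For context, the procedure exercised by the new test can be invoked the same way outside the test harness. A minimal sketch, assuming a configured Iceberg catalog named my_catalog, a target table db.target, and a plain ORC file at /tmp/data.orc (all placeholder names, not from the commit):

import org.apache.spark.sql.SparkSession;

public class AddOrcFileExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();
    // Target Iceberg table whose schema matches the standalone ORC file.
    spark.sql("CREATE TABLE my_catalog.db.target (x float, y double, z long) USING iceberg");
    // Import the file. With this change, float/double column metrics fall back to the
    // ORC file's own statistics (NaN bounds replaced) instead of hitting a precondition failure.
    spark.sql("CALL my_catalog.system.add_files('db.target', '`orc`.`/tmp/data.orc`')");
  }
}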
