Skip to content

Commit 1895711

Browse files
PingLiuPingmeta-codesync[bot]
authored andcommitted
feat: Add support for evaluating Iceberg partition transforms (facebookincubator#15477)
Summary: Introduce infrastructure for evaluating Iceberg partition transforms. - TransformExprBuilder converts Iceberg partition specifications into Velox expressions - TransformEvaluator evaluates multiple transform expressions in a single pass using compiled ExprSet. Pull Request resolved: facebookincubator#15477 Reviewed By: mbasmanova Differential Revision: D86893852 Pulled By: Yuhta fbshipit-source-id: 3f839a3a8ea6eeb7e152fbd960e3023d208e122d
1 parent 3cf456f commit 1895711

File tree

12 files changed

+662
-10
lines changed

12 files changed

+662
-10
lines changed

velox/connectors/hive/iceberg/CMakeLists.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,15 @@ velox_add_library(
2020
IcebergSplitReader.cpp
2121
PartitionSpec.cpp
2222
PositionalDeleteFileReader.cpp
23+
TransformEvaluator.cpp
24+
TransformExprBuilder.cpp
25+
)
26+
27+
velox_link_libraries(
28+
velox_hive_iceberg_splitreader
29+
velox_connector
30+
velox_functions_iceberg
31+
Folly::folly
2332
)
2433

2534
velox_link_libraries(velox_hive_iceberg_splitreader velox_connector Folly::folly)

velox/connectors/hive/iceberg/IcebergDataSink.cpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,14 @@
1919

2020
namespace facebook::velox::connector::hive::iceberg {
2121

22+
void registerIcebergInternalFunctions(const std::string_view& prefix) {
23+
static std::once_flag registerFlag;
24+
25+
std::call_once(registerFlag, [prefix]() {
26+
functions::iceberg::registerFunctions(std::string(prefix));
27+
});
28+
}
29+
2230
IcebergInsertTableHandle::IcebergInsertTableHandle(
2331
std::vector<HiveColumnHandlePtr> inputColumns,
2432
LocationHandlePtr locationHandle,
@@ -57,7 +65,11 @@ IcebergDataSink::IcebergDataSink(
5765
commitStrategy,
5866
hiveConfig,
5967
0,
60-
nullptr) {}
68+
nullptr) {
69+
static constexpr std::string_view kDefaultIcebergFunctionPrefix{
70+
"$internal$.iceberg."};
71+
registerIcebergInternalFunctions(kDefaultIcebergFunctionPrefix.data());
72+
}
6173

6274
std::vector<std::string> IcebergDataSink::commitMessage() const {
6375
std::vector<std::string> commitTasks;

velox/connectors/hive/iceberg/IcebergDataSink.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,16 @@
1818

1919
#include "velox/connectors/hive/HiveDataSink.h"
2020
#include "velox/connectors/hive/iceberg/PartitionSpec.h"
21+
#include "velox/functions/iceberg/Register.h"
2122

2223
namespace facebook::velox::connector::hive::iceberg {
2324

25+
/// Registers Iceberg partition transform functions with prefix.
26+
/// NOTE: These functions are registered for internal transform usage only.
27+
/// Upstream engines such as Prestissimo and Gluten should register the same
28+
/// functions with different prefixes to avoid conflicts.
29+
void registerIcebergInternalFunctions(const std::string_view& prefix);
30+
2431
/// Represents a request for Iceberg write.
2532
class IcebergInsertTableHandle final : public HiveInsertTableHandle {
2633
public:

velox/connectors/hive/iceberg/PartitionSpec.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ struct IcebergPartitionSpec {
113113
case TransformType::kTruncate:
114114
return type;
115115
}
116+
VELOX_UNREACHABLE("Unknown transform type");
116117
}
117118
};
118119

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/connectors/hive/iceberg/TransformEvaluator.h"
18+
19+
#include "velox/expression/Expr.h"
20+
21+
namespace facebook::velox::connector::hive::iceberg {
22+
23+
TransformEvaluator::TransformEvaluator(
24+
const std::vector<core::TypedExprPtr>& expressions,
25+
const ConnectorQueryCtx* connectorQueryCtx)
26+
: connectorQueryCtx_(connectorQueryCtx) {
27+
VELOX_CHECK_NOT_NULL(connectorQueryCtx_);
28+
exprSet_ = connectorQueryCtx_->expressionEvaluator()->compile(expressions);
29+
VELOX_CHECK_NOT_NULL(exprSet_);
30+
}
31+
32+
std::vector<VectorPtr> TransformEvaluator::evaluate(
33+
const RowVectorPtr& input) const {
34+
const auto numRows = input->size();
35+
const auto numExpressions = exprSet_->exprs().size();
36+
37+
std::vector<VectorPtr> results(numExpressions);
38+
SelectivityVector rows(numRows);
39+
40+
// Evaluate all expressions in one pass.
41+
connectorQueryCtx_->expressionEvaluator()->evaluate(
42+
exprSet_.get(), rows, *input, results);
43+
44+
return results;
45+
}
46+
47+
} // namespace facebook::velox::connector::hive::iceberg
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include "velox/connectors/Connector.h"
19+
#include "velox/core/QueryCtx.h"
20+
#include "velox/expression/Expr.h"
21+
22+
namespace facebook::velox::connector::hive::iceberg {
23+
24+
/// Evaluates multiple expressions efficiently using batch evaluation.
25+
/// Expressions are compiled once in the constructor and reused across multiple
26+
/// input batches.
27+
class TransformEvaluator {
28+
public:
29+
/// Creates an evaluator with the given expressions and connector query
30+
/// context. Compiles the expressions once for reuse across multiple
31+
/// evaluations.
32+
///
33+
/// @param expressions Vector of typed expressions to evaluate. These are
34+
/// typically built using TransformExprBuilder::toExpressions() for Iceberg
35+
/// partition transforms, but can be any valid Velox expressions. The
36+
/// expressions are compiled once during construction.
37+
/// @param connectorQueryCtx Connector query context providing access to the
38+
/// expression evaluator (for compilation and evaluation) and memory pool.
39+
/// Must remain valid for the lifetime of this TransformEvaluator.
40+
TransformEvaluator(
41+
const std::vector<core::TypedExprPtr>& expressions,
42+
const ConnectorQueryCtx* connectorQueryCtx);
43+
44+
/// Evaluates all expressions on the input data in a single pass.
45+
/// Uses the pre-compiled ExprSet from the constructor for efficiency.
46+
///
47+
/// The input RowType must match the RowType used when building the
48+
/// expressions (passed to TransformExprBuilder::toExpressions). The column
49+
/// positions, names and types must align. Create new TransformEvaluator for
50+
/// input that has different RowType with the one when building the
51+
/// expressions.
52+
///
53+
/// @param input Input row vector containing the source data. Must have the
54+
/// same RowType (column positions, names and types) as used when building the
55+
/// expressions in the constructor.
56+
/// @return Vector of result columns, one for each expression, in the same
57+
/// order as the expressions provided to the constructor.
58+
std::vector<VectorPtr> evaluate(const RowVectorPtr& input) const;
59+
60+
private:
61+
const ConnectorQueryCtx* connectorQueryCtx_;
62+
std::unique_ptr<exec::ExprSet> exprSet_;
63+
};
64+
65+
} // namespace facebook::velox::connector::hive::iceberg
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#include "velox/connectors/hive/iceberg/TransformExprBuilder.h"
17+
#include "velox/core/Expressions.h"
18+
19+
namespace facebook::velox::connector::hive::iceberg {
20+
21+
namespace {
22+
23+
/// Converts a single partition field to a typed expression.
24+
///
25+
/// Builds an expression tree for one partition transform. Identity transforms
26+
/// become FieldAccessTypedExpr, while other transforms (bucket, truncate,
27+
/// year, month, day, hour) become CallTypedExpr with appropriate function
28+
/// names and parameters.
29+
///
30+
/// @param field Partition field containing transform type, source column
31+
/// type, and optional parameter (e.g., bucket count, truncate width).
32+
/// @param inputFieldName Name of the source column in the input RowVector.
33+
/// @param icebergFuncPrefix Prefix of iceberg transform function names.
34+
/// @return Typed expression representing the transform.
35+
core::TypedExprPtr toExpression(
36+
const IcebergPartitionSpec::Field& field,
37+
const std::string& inputFieldName,
38+
const std::string& icebergFuncPrefix) {
39+
// For identity transform, just return a field access expression.
40+
if (field.transformType == TransformType::kIdentity) {
41+
return std::make_shared<core::FieldAccessTypedExpr>(
42+
field.type, inputFieldName);
43+
}
44+
45+
// For other transforms, build a CallTypedExpr with the appropriate function.
46+
std::string functionName;
47+
switch (field.transformType) {
48+
case TransformType::kBucket:
49+
functionName = icebergFuncPrefix + "bucket";
50+
break;
51+
case TransformType::kTruncate:
52+
functionName = icebergFuncPrefix + "truncate";
53+
break;
54+
case TransformType::kYear:
55+
functionName = icebergFuncPrefix + "years";
56+
break;
57+
case TransformType::kMonth:
58+
functionName = icebergFuncPrefix + "months";
59+
break;
60+
case TransformType::kDay:
61+
functionName = icebergFuncPrefix + "days";
62+
break;
63+
case TransformType::kHour:
64+
functionName = icebergFuncPrefix + "hours";
65+
break;
66+
case TransformType::kIdentity:
67+
break;
68+
}
69+
70+
// Build the expression arguments.
71+
std::vector<core::TypedExprPtr> exprArgs;
72+
if (field.parameter.has_value()) {
73+
exprArgs.emplace_back(
74+
std::make_shared<core::ConstantTypedExpr>(
75+
INTEGER(), Variant(field.parameter.value())));
76+
}
77+
exprArgs.emplace_back(
78+
std::make_shared<core::FieldAccessTypedExpr>(field.type, inputFieldName));
79+
80+
return std::make_shared<core::CallTypedExpr>(
81+
field.resultType(), std::move(exprArgs), functionName);
82+
}
83+
84+
} // namespace
85+
86+
std::vector<core::TypedExprPtr> TransformExprBuilder::toExpressions(
87+
const IcebergPartitionSpecPtr& partitionSpec,
88+
const std::vector<column_index_t>& partitionChannels,
89+
const RowTypePtr& inputType,
90+
const std::string& icebergFuncPrefix) {
91+
VELOX_CHECK_EQ(
92+
partitionSpec->fields.size(),
93+
partitionChannels.size(),
94+
"Number of partition fields must match number of partition channels");
95+
96+
const auto numTransforms = partitionChannels.size();
97+
std::vector<core::TypedExprPtr> transformExprs;
98+
transformExprs.reserve(numTransforms);
99+
100+
for (auto i = 0; i < numTransforms; i++) {
101+
const auto channel = partitionChannels[i];
102+
transformExprs.emplace_back(toExpression(
103+
partitionSpec->fields.at(i),
104+
inputType->nameOf(channel),
105+
icebergFuncPrefix));
106+
}
107+
108+
return transformExprs;
109+
}
110+
111+
} // namespace facebook::velox::connector::hive::iceberg
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include "velox/connectors/hive/iceberg/PartitionSpec.h"
20+
#include "velox/expression/Expr.h"
21+
22+
namespace facebook::velox::connector::hive::iceberg {
23+
24+
/// Converts Iceberg partition specification to Velox expressions.
25+
class TransformExprBuilder {
26+
public:
27+
/// Converts partition specification to a list of typed expressions.
28+
///
29+
/// @param partitionSpec Iceberg partition specification containing transform
30+
/// definitions for each partition field.
31+
/// @param partitionChannels Column indices (0-based) in the input RowVector
32+
/// that correspond to each partition field. Must have the same size as
33+
/// partitionSpec->fields. Provides the positional mapping from partition spec
34+
/// fields to input RowVector columns.
35+
/// @param inputType The row type of the input data. This is necessary for
36+
/// building expressions because the column names in partitionSpec reference
37+
/// table schema names, which might not match the column names in inputType
38+
/// (e.g., inputType may use generated names like c0, c1, c2). The
39+
/// FieldAccessTypedExpr must be built using the actual column names from
40+
/// inputType that will be present at runtime. The partitionChannels provide
41+
/// the positional mapping to locate the correct columns.
42+
/// @param icebergFuncPrefix Prefix for Iceberg transform function names.
43+
/// @return Vector of typed expressions, one for each partition field.
44+
static std::vector<core::TypedExprPtr> toExpressions(
45+
const IcebergPartitionSpecPtr& partitionSpec,
46+
const std::vector<column_index_t>& partitionChannels,
47+
const RowTypePtr& inputType,
48+
const std::string& icebergFuncPrefix);
49+
};
50+
51+
} // namespace facebook::velox::connector::hive::iceberg

velox/connectors/hive/iceberg/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ if(NOT VELOX_DISABLE_GOOGLETEST)
6464
IcebergTestBase.cpp
6565
Main.cpp
6666
PartitionSpecTest.cpp
67+
TransformTest.cpp
6768
)
6869

6970
add_test(velox_hive_iceberg_insert_test velox_hive_iceberg_insert_test)

0 commit comments

Comments
 (0)