Skip to content

Commit a86b10a

Browse files
committed
fix: Fix derived table breaks with joins
ToGraph builds derived tables (dts) bottom up. On the returning edge of the recursion we add joins or dt postprocessing steps such as group by or limit. When something does not fit the implied processing order of a dt, i.e. joins, group by, having, order by, limit/offset, we wrap the plan so far in another dt. For example, scan, limit, filter, limit would have (scan, limit) in one dt, and the filter and second limit in a dt containing the first one. Now, when a dt as above is on the right side of a join, we must start the right-side dt from scratch, meaning that its set of tables must start empty and must not contain tables from the left side. On the other hand, for a right side that does not introduce a dt break, we must add the tables on the right to the dt where the left side was added. To this effect we set and check allowedInDt correctly also for filters.
1 parent 546fa24 commit a86b10a

File tree

89 files changed

+13537
-167
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

89 files changed

+13537
-167
lines changed

axiom/connectors/hive/LocalHiveConnectorMetadata.cpp

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <folly/json.h>
2121
#include <sys/stat.h>
2222
#include <unistd.h>
23+
#include <fstream>
2324
#include "axiom/optimizer/JsonUtil.h"
2425
#include "velox/connectors/Connector.h"
2526
#include "velox/connectors/hive/HiveConnectorSplit.h"
@@ -155,6 +156,17 @@ void LocalHiveConnectorMetadata::initialize() {
155156
format_ = formatName == "dwrf" ? velox::dwio::common::FileFormat::DWRF
156157
: formatName == "parquet" ? velox::dwio::common::FileFormat::PARQUET
157158
: velox::dwio::common::FileFormat::UNKNOWN;
159+
160+
// Reset existing connector query context and schema pool to avoid duplicate
161+
// memory pool names when reinitialize() is called. This ensures the old
162+
// "schemaReader" memory pool is destroyed before creating a new one.
163+
if (connectorQueryCtx_) {
164+
connectorQueryCtx_.reset();
165+
}
166+
if (schemaPool_) {
167+
schemaPool_.reset();
168+
}
169+
158170
makeQueryCtx();
159171
makeConnectorQueryCtx();
160172
readTables(path);
@@ -1145,4 +1157,255 @@ bool LocalHiveConnectorMetadata::dropTable(
11451157
return tables_.erase(tableName) == 1;
11461158
}
11471159

1160+
void LocalHiveConnectorMetadata::saveColumnStats(const std::string& path) {
1161+
ensureInitialized();
1162+
std::lock_guard<std::mutex> l(mutex_);
1163+
1164+
folly::dynamic root = folly::dynamic::object;
1165+
1166+
for (const auto& [tableName, table] : tables_) {
1167+
folly::dynamic tableData = folly::dynamic::object;
1168+
1169+
// Save table-level row count (total cardinality)
1170+
tableData["numRows"] = table->numRows();
1171+
1172+
// Save layout information if available
1173+
const auto& layouts = table->layouts();
1174+
if (!layouts.empty()) {
1175+
folly::dynamic layoutsData = folly::dynamic::array;
1176+
for (const auto* layout : layouts) {
1177+
folly::dynamic layoutData = folly::dynamic::object;
1178+
layoutData["name"] = layout->name();
1179+
// Note: Currently TableLayout does not have its own cardinality field.
1180+
// It uses the Table's numRows(). If layout-specific cardinality
1181+
// is added in the future, save it here.
1182+
layoutsData.push_back(layoutData);
1183+
}
1184+
tableData["layouts"] = layoutsData;
1185+
}
1186+
1187+
folly::dynamic columns = folly::dynamic::object;
1188+
for (const auto& [columnName, column] : table->columns()) {
1189+
const auto* stats = column->stats();
1190+
if (!stats) {
1191+
continue; // Skip columns without statistics
1192+
}
1193+
1194+
folly::dynamic columnStats = folly::dynamic::object;
1195+
columnStats["nonNull"] = stats->nonNull;
1196+
columnStats["nullPct"] = stats->nullPct;
1197+
columnStats["numValues"] = stats->numValues;
1198+
1199+
// Handle optional min value
1200+
if (stats->min.has_value()) {
1201+
columnStats["min"] = stats->min->toJson(column->type());
1202+
columnStats["minType"] =
1203+
std::string(velox::TypeKindName::toName(column->type()->kind()));
1204+
}
1205+
1206+
// Handle optional max value
1207+
if (stats->max.has_value()) {
1208+
columnStats["max"] = stats->max->toJson(column->type());
1209+
columnStats["maxType"] =
1210+
std::string(velox::TypeKindName::toName(column->type()->kind()));
1211+
}
1212+
1213+
// Handle optional fields
1214+
if (stats->maxLength.has_value()) {
1215+
columnStats["maxLength"] = stats->maxLength.value();
1216+
}
1217+
if (stats->avgLength.has_value()) {
1218+
columnStats["avgLength"] = stats->avgLength.value();
1219+
}
1220+
if (stats->ascendingPct.has_value()) {
1221+
columnStats["ascendingPct"] = stats->ascendingPct.value();
1222+
}
1223+
if (stats->descendingPct.has_value()) {
1224+
columnStats["descendingPct"] = stats->descendingPct.value();
1225+
}
1226+
if (stats->numDistinct.has_value()) {
1227+
columnStats["numDistinct"] = stats->numDistinct.value();
1228+
}
1229+
1230+
columns[columnName] = columnStats;
1231+
}
1232+
1233+
tableData["columns"] = columns;
1234+
root[tableName] = tableData;
1235+
}
1236+
1237+
// Write to file
1238+
std::ofstream outFile(path);
1239+
if (!outFile.is_open()) {
1240+
VELOX_USER_FAIL("Failed to open file for writing: {}", path);
1241+
}
1242+
outFile << folly::toPrettyJson(root);
1243+
outFile.close();
1244+
}
1245+
1246+
void LocalHiveConnectorMetadata::loadColumnStats(const std::string& path) {
1247+
ensureInitialized();
1248+
std::lock_guard<std::mutex> l(mutex_);
1249+
1250+
// Read and parse JSON file
1251+
std::ifstream inFile(path);
1252+
if (!inFile.is_open()) {
1253+
VELOX_USER_FAIL("Failed to open file for reading: {}", path);
1254+
}
1255+
1256+
std::string content(
1257+
(std::istreambuf_iterator<char>(inFile)),
1258+
std::istreambuf_iterator<char>());
1259+
inFile.close();
1260+
1261+
folly::dynamic root;
1262+
try {
1263+
root = folly::parseJson(content);
1264+
} catch (const std::exception& e) {
1265+
VELOX_USER_FAIL("Failed to parse JSON: {}", e.what());
1266+
}
1267+
1268+
// Helper function to parse a variant from JSON based on TypeKind
1269+
auto parseVariant = [](const folly::dynamic& jsonValue,
1270+
velox::TypeKind typeKind) -> velox::Variant {
1271+
switch (typeKind) {
1272+
case velox::TypeKind::TINYINT:
1273+
return velox::Variant(static_cast<int8_t>(jsonValue.asInt()));
1274+
case velox::TypeKind::SMALLINT:
1275+
return velox::Variant(static_cast<int16_t>(jsonValue.asInt()));
1276+
case velox::TypeKind::INTEGER:
1277+
return velox::Variant(static_cast<int32_t>(jsonValue.asInt()));
1278+
case velox::TypeKind::BIGINT:
1279+
return velox::Variant(jsonValue.asInt());
1280+
case velox::TypeKind::REAL:
1281+
return velox::Variant(static_cast<float>(jsonValue.asDouble()));
1282+
case velox::TypeKind::DOUBLE:
1283+
return velox::Variant(jsonValue.asDouble());
1284+
case velox::TypeKind::VARCHAR:
1285+
case velox::TypeKind::VARBINARY:
1286+
return velox::Variant(jsonValue.asString());
1287+
case velox::TypeKind::BOOLEAN:
1288+
return velox::Variant(jsonValue.asBool());
1289+
default:
1290+
VELOX_UNSUPPORTED(
1291+
"Unsupported type kind for variant parsing: {}",
1292+
velox::TypeKindName::toName(typeKind));
1293+
}
1294+
};
1295+
1296+
// Iterate through tables in the JSON
1297+
for (const auto& [tableNameDynamic, tableDataDynamic] : root.items()) {
1298+
const std::string tableName = tableNameDynamic.asString();
1299+
1300+
// Find the table in the current metadata (skip if not found)
1301+
auto tableIt = tables_.find(tableName);
1302+
if (tableIt == tables_.end()) {
1303+
continue; // Table doesn't exist in current metadata, skip it
1304+
}
1305+
1306+
auto& table = tableIt->second;
1307+
const auto& tableData = tableDataDynamic;
1308+
1309+
// Load table-level row count (total cardinality) if present
1310+
if (tableData.count("numRows")) {
1311+
table->setNumRows(tableData["numRows"].asInt());
1312+
}
1313+
1314+
// Load layout information if present
1315+
// Note: Currently TableLayout does not have mutable cardinality fields.
1316+
// If layout-specific cardinality is added in the future and needs to be
1317+
// restored, process it here from tableData["layouts"].
1318+
if (tableData.count("layouts")) {
1319+
// Layouts array exists in the saved data.
1320+
// Currently we don't need to restore anything layout-specific,
1321+
// but this structure is here for future extensibility.
1322+
// Future: if layouts have their own row counts, restore them here:
1323+
// const auto& layoutsData = tableData["layouts"];
1324+
// for each layout in layoutsData, restore layout-specific cardinality
1325+
}
1326+
1327+
// Load column statistics
1328+
if (tableData.count("columns")) {
1329+
const auto& columnsData = tableData["columns"];
1330+
for (const auto& [columnNameDynamic, columnStatsDynamic] :
1331+
columnsData.items()) {
1332+
const std::string columnName = columnNameDynamic.asString();
1333+
1334+
// Find the column in the table (skip if not found)
1335+
auto columnIt = table->columns().find(columnName);
1336+
if (columnIt == table->columns().end()) {
1337+
continue; // Column doesn't exist in current table, skip it
1338+
}
1339+
1340+
auto& column = columnIt->second;
1341+
const auto& columnStatsData = columnStatsDynamic;
1342+
1343+
// Create new ColumnStatistics object
1344+
auto stats = std::make_unique<ColumnStatistics>();
1345+
1346+
// Load basic fields
1347+
if (columnStatsData.count("nonNull")) {
1348+
stats->nonNull = columnStatsData["nonNull"].asBool();
1349+
}
1350+
if (columnStatsData.count("nullPct")) {
1351+
stats->nullPct = columnStatsData["nullPct"].asDouble();
1352+
}
1353+
if (columnStatsData.count("numValues")) {
1354+
stats->numValues = columnStatsData["numValues"].asInt();
1355+
}
1356+
1357+
// Load min value if present
1358+
if (columnStatsData.count("min") && columnStatsData.count("minType")) {
1359+
const auto minJson = columnStatsData["min"];
1360+
const auto minTypeStr = columnStatsData["minType"].asString();
1361+
const auto minTypeKind = velox::TypeKindName::toTypeKind(minTypeStr);
1362+
// Parse the min value based on type
1363+
try {
1364+
stats->min = parseVariant(minJson, minTypeKind);
1365+
} catch (const std::exception& e) {
1366+
// If parsing fails, skip this field
1367+
LOG(WARNING) << "Failed to parse min value for column "
1368+
<< columnName << ": " << e.what();
1369+
}
1370+
}
1371+
1372+
// Load max value if present
1373+
if (columnStatsData.count("max") && columnStatsData.count("maxType")) {
1374+
const auto maxJson = columnStatsData["max"];
1375+
const auto maxTypeStr = columnStatsData["maxType"].asString();
1376+
const auto maxTypeKind = velox::TypeKindName::toTypeKind(maxTypeStr);
1377+
// Parse the max value based on type
1378+
try {
1379+
stats->max = parseVariant(maxJson, maxTypeKind);
1380+
} catch (const std::exception& e) {
1381+
// If parsing fails, skip this field
1382+
LOG(WARNING) << "Failed to parse max value for column "
1383+
<< columnName << ": " << e.what();
1384+
}
1385+
}
1386+
1387+
// Load optional fields
1388+
if (columnStatsData.count("maxLength")) {
1389+
stats->maxLength = columnStatsData["maxLength"].asInt();
1390+
}
1391+
if (columnStatsData.count("avgLength")) {
1392+
stats->avgLength = columnStatsData["avgLength"].asInt();
1393+
}
1394+
if (columnStatsData.count("ascendingPct")) {
1395+
stats->ascendingPct = columnStatsData["ascendingPct"].asDouble();
1396+
}
1397+
if (columnStatsData.count("descendingPct")) {
1398+
stats->descendingPct = columnStatsData["descendingPct"].asDouble();
1399+
}
1400+
if (columnStatsData.count("numDistinct")) {
1401+
stats->numDistinct = columnStatsData["numDistinct"].asInt();
1402+
}
1403+
1404+
// Set the statistics on the column
1405+
column->setStats(std::move(stats));
1406+
}
1407+
}
1408+
}
1409+
}
1410+
11481411
} // namespace facebook::axiom::connector::hive

axiom/connectors/hive/LocalHiveConnectorMetadata.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,10 @@ class LocalTable : public Table {
178178
numRows_ += n;
179179
}
180180

181+
void setNumRows(int64_t numRows) {
182+
numRows_ = numRows;
183+
}
184+
181185
/// Samples 'samplePct' % rows of the table and sets the num distincts
182186
/// estimate for the columns. uses 'pool' for temporary data.
183187
void sampleNumDistincts(float samplePct, velox::memory::MemoryPool* pool);
@@ -280,6 +284,21 @@ class LocalHiveConnectorMetadata : public HiveConnectorMetadata {
280284
return dropTable(nullptr, tableName, true);
281285
}
282286

287+
/// Serializes column statistics (cardinality, numRows, min/max values, etc.)
288+
/// to JSON format. For each table, saves the total row count (numRows) and
289+
/// layout information. Each column is recorded with its statistics. The
290+
/// serialization is written to 'path' in the file system. For layouts, only
291+
/// the name is saved; layouts currently have no cardinality of their own.
292+
void saveColumnStats(const std::string& path);
293+
294+
/// Reads the serialization made by saveColumnStats() and stores the values
295+
/// into the same members of the right columns of the right tables. Restores
296+
/// table-level row counts (total cardinality) and any layout-specific
297+
/// cardinality if present. Note that the file can have more tables and
298+
/// columns than exist in the connector at the time of loading. The extra
299+
/// will be ignored.
300+
void loadColumnStats(const std::string& path);
301+
283302
private:
284303
void ensureInitialized() const override;
285304
void makeQueryCtx();

0 commit comments

Comments
 (0)