2 changes: 0 additions & 2 deletions axiom/optimizer/Optimization.cpp
@@ -1485,8 +1485,6 @@ Distribution somePartition(const RelationOpPtrVector& inputs) {
}

DistributionType distributionType;
- distributionType.numPartitions =
- queryCtx()->optimization()->runnerOptions().numWorkers;
distributionType.locus = firstInput->distribution().distributionType.locus;

return {distributionType, std::move(columns)};
1 change: 0 additions & 1 deletion axiom/optimizer/Schema.cpp
@@ -432,7 +432,6 @@ std::string Distribution::toString() const {
if (!partition.empty()) {
out << "P ";
exprsToString(partition, out);
- out << " " << distributionType.numPartitions << " ways";
}
if (!orderKeys.empty()) {
out << " O ";
7 changes: 3 additions & 4 deletions axiom/optimizer/Schema.h
@@ -124,14 +124,13 @@ enum class ShuffleMode : uint8_t {
kHive,
};

- /// Distribution of data. 'numPartitions' is 1 if the data is not partitioned.
+ /// Distribution of data.
/// There is copartitioning if the DistributionType is the same on both sides
/// and both sides have an equal number of 1:1 type matched partitioning keys.
struct DistributionType {
Contributor

DistributionType is incomplete, but removing numPartitions is not going to fix that. It needs more information about partitioning. It looks like @hdikeman is wrapping up the Connector API changes for Table Write, and I should have bandwidth to work on adding TableWrite support to the optimizer. As part of that work I expect to revisit the DistributionType struct.

Collaborator Author

@MBkkt MBkkt Oct 3, 2025

I think I made it complete in a different PR, where partitioning is a function.
But that PR is quite nontrivial and contains several separate parts, so I decided to start with the simple part.

The number of partitions is not needed, because the partition count is controlled by optimizer options (numDrivers/numWorkers) when we plan a partition.
It is also not needed for broadcast/gather, because they have specific behavior.
The only case where it is needed is table write, but for that case it should be implemented in a more abstract way, with a partition type/function (which internally holds the number of buckets for Hive, for example).
Table scan would need it as well, but it does not implement partitioning right now.

About @hdikeman's work: I have a PR that implements TableWrite in the optimizer, and I plan to rebase it once Henry's PR with the connector API changes is merged.
So that PR will contain the implementation of TableWrite in the optimizer.
It will cover TestConnector and LocalHiveConnectorMetadata, but with copartitioning for Hive disabled, because that requires some changes in the runner.

Does that sound OK to you?
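
To illustrate the "partitioning is a function" idea from the reply above, here is a minimal C++ sketch. It is not the code from either PR: `PartitionFunction`, `HiveBucketFunction`, `SketchDistributionType`, `SketchDistribution` and `copartitioned` are hypothetical names, key types are modeled as plain strings, and treating two Hive bucketings as compatible only when their bucket counts are equal is a simplifying assumption.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

// Hypothetical abstraction (not part of axiom): describes how rows are
// mapped to partitions. Any partition count lives inside the function.
struct PartitionFunction {
  virtual ~PartitionFunction() = default;
  // True if equal keys land in the same partition under both functions.
  virtual bool isCompatibleWith(const PartitionFunction& other) const = 0;
};

// Hypothetical Hive-style bucketing. The bucket count is stored here
// instead of in a 'numPartitions' field on the distribution type.
struct HiveBucketFunction : PartitionFunction {
  explicit HiveBucketFunction(int32_t buckets) : numBuckets(buckets) {}

  bool isCompatibleWith(const PartitionFunction& other) const override {
    const auto* hive = dynamic_cast<const HiveBucketFunction*>(&other);
    // Assumption: only identical bucket counts are treated as compatible.
    return hive != nullptr && hive->numBuckets == numBuckets;
  }

  int32_t numBuckets;
};

// Simplified stand-in for DistributionType without 'numPartitions'.
struct SketchDistributionType {
  bool isGather{false};
  // nullptr means the optimizer plans the partitioning itself, with the
  // width taken from its options (e.g. numWorkers).
  std::shared_ptr<const PartitionFunction> function;
};

// Simplified stand-in for Distribution; keys are reduced to type names.
struct SketchDistribution {
  SketchDistributionType type;
  std::vector<std::string> partitionKeyTypes;
};

// Copartitioning check in the spirit of the Schema.h comment: same
// distribution type and an equal number of 1:1 type-matched keys.
bool copartitioned(const SketchDistribution& a, const SketchDistribution& b) {
  if (a.type.isGather != b.type.isGather) {
    return false;
  }
  if (a.partitionKeyTypes.empty() ||
      a.partitionKeyTypes != b.partitionKeyTypes) {
    return false;
  }
  const auto& fa = a.type.function;
  const auto& fb = b.type.function;
  if (!fa && !fb) {
    return true; // Both sides are planned with the same optimizer options.
  }
  if (!fa || !fb) {
    return false;
  }
  return fa->isCompatibleWith(*fb);
}

// Small usage example: two tables bucketed 16 ways on a BIGINT key.
void demo() {
  SketchDistribution left;
  left.type.function = std::make_shared<HiveBucketFunction>(16);
  left.partitionKeyTypes = {"BIGINT"};
  SketchDistribution right = left;
  assert(copartitioned(left, right));
}
```

In this sketch nothing outside `HiveBucketFunction` needs to know a partition count, which matches the argument that the count is either an optimizer option or an internal detail of a connector-specific function.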

bool operator==(const DistributionType& other) const = default;

LocusCP locus{nullptr};
- int32_t numPartitions{1};
bool isGather{false};
ShuffleMode mode{ShuffleMode::kNone};

@@ -199,7 +198,7 @@ struct Distribution {
DistributionType distributionType;

// Partitioning columns. The values of these columns determine which of
- // 'numPartitions' contains any given row. This does not specify the
+ // paritions contains any given row. This does not specify the
// partition function (e.g. Hive bucket or range partition).
ExprVector partition;

@@ -225,7 +224,7 @@ struct Distribution {
// because lineitem has an average of 4 repeats of orderkey.
float spacing{-1};

- // True if the data is replicated to 'numPartitions'.
+ // True if the data is replicated to all partitions.
bool isBroadcast{false};
};
