Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions ballista/core/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ message DmlNode{
INSERT_APPEND = 3;
INSERT_OVERWRITE = 4;
INSERT_REPLACE = 5;
TRUNCATE = 6;
}
Type dml_type = 1;
LogicalPlanNode input = 2;
Expand Down Expand Up @@ -749,6 +750,8 @@ message PhysicalPlanNode {
SortMergeJoinExecNode sort_merge_join = 34;
MemoryScanExecNode memory_scan = 35;
AsyncFuncExecNode async_func = 36;
BufferExecNode buffer = 37;
ArrowScanExecNode arrow_scan = 38;
}
}

Expand All @@ -758,6 +761,16 @@ message PartitionColumn {
}


// Determines how file sink output paths are interpreted.
enum FileOutputMode {
// Infer output mode from the URL (extension/trailing `/` heuristic).
FILE_OUTPUT_MODE_AUTOMATIC = 0;
// Write to a single file at the exact output path.
FILE_OUTPUT_MODE_SINGLE_FILE = 1;
// Write to a directory with generated filenames.
FILE_OUTPUT_MODE_DIRECTORY = 2;
}

message FileSinkConfig {
reserved 6; // writer_mode
reserved 8; // was `overwrite` which has been superseded by `insert_op`
Expand All @@ -770,6 +783,8 @@ message FileSinkConfig {
bool keep_partition_by_columns = 9;
InsertOp insert_op = 10;
string file_extension = 11;
// Determines how the output path is interpreted.
FileOutputMode file_output_mode = 12;
}

enum InsertOp {
Expand Down Expand Up @@ -837,6 +852,14 @@ message PhysicalExprNode {
// Was date_time_interval_expr
reserved 17;

// Unique identifier for this expression to do deduplication during deserialization.
// When serializing, this is set to a unique identifier for each combination of
// expression, process and serialization run.
// When deserializing, if this ID has been seen before, the cached Arc is returned
// instead of creating a new one, enabling reconstruction of referential integrity
// across serde roundtrips.
optional uint64 expr_id = 30;

oneof ExprType {
// column references
PhysicalColumn column = 1;
Expand Down Expand Up @@ -1006,6 +1029,8 @@ message FilterExecNode {
PhysicalExprNode expr = 2;
uint32 default_filter_selectivity = 3;
repeated uint32 projection = 9;
uint32 batch_size = 10;
optional uint32 fetch = 11;
}

message FileGroup {
Expand Down Expand Up @@ -1083,6 +1108,10 @@ message AvroScanExecNode {
FileScanExecConf base_conf = 1;
}

message ArrowScanExecNode {
FileScanExecConf base_conf = 1;
}

message MemoryScanExecNode {
repeated bytes partitions = 1;
datafusion_common.Schema schema = 2;
Expand Down Expand Up @@ -1111,6 +1140,7 @@ message HashJoinExecNode {
datafusion_common.NullEquality null_equality = 7;
JoinFilter filter = 8;
repeated uint32 projection = 9;
bool null_aware = 10;
}

enum StreamPartitionMode {
Expand Down Expand Up @@ -1190,6 +1220,7 @@ enum AggregateMode {
FINAL_PARTITIONED = 2;
SINGLE = 3;
SINGLE_PARTITIONED = 4;
PARTIAL_REDUCE = 5;
}

message PartiallySortedInputOrderMode {
Expand Down Expand Up @@ -1219,6 +1250,8 @@ message MaybePhysicalSortExprs {
message AggLimit {
// wrap into a message to make it optional
uint64 limit = 1;
// Optional ordering direction for TopK aggregation (true = descending, false = ascending)
optional bool descending = 2;
}

message AggregateExecNode {
Expand Down Expand Up @@ -1412,3 +1445,8 @@ message AsyncFuncExecNode {
repeated PhysicalExprNode async_exprs = 2;
repeated string async_expr_names = 3;
}

message BufferExecNode {
PhysicalPlanNode input = 1;
uint64 capacity = 2;
}
27 changes: 21 additions & 6 deletions ballista/core/proto/datafusion_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ message Map {
bool keys_sorted = 2;
}

message RunEndEncoded {
Field run_ends_field = 1;
Field values_field = 2;
}

enum UnionMode{
sparse = 0;
dense = 1;
Expand Down Expand Up @@ -236,6 +241,12 @@ message ScalarDictionaryValue {
ScalarValue value = 2;
}

message ScalarRunEndEncodedValue {
Field run_ends_field = 1;
Field values_field = 2;
ScalarValue value = 3;
}

message IntervalDayTimeValue {
int32 days = 1;
int32 milliseconds = 2;
Expand Down Expand Up @@ -321,6 +332,8 @@ message ScalarValue{
IntervalMonthDayNanoValue interval_month_day_nano = 31;
ScalarFixedSizeBinary fixed_size_binary_value = 34;
UnionValue union_value = 42;

ScalarRunEndEncodedValue run_end_encoded_value = 45;
}
}

Expand Down Expand Up @@ -351,11 +364,11 @@ message Decimal256{
// Serialized data type
message ArrowType{
oneof arrow_type_enum {
EmptyMessage NONE = 1; // arrow::Type::NA
EmptyMessage BOOL = 2; // arrow::Type::BOOL
EmptyMessage UINT8 = 3; // arrow::Type::UINT8
EmptyMessage INT8 = 4; // arrow::Type::INT8
EmptyMessage UINT16 = 5; // represents arrow::Type fields in src/arrow/type.h
EmptyMessage NONE = 1; // arrow::Type::NA
EmptyMessage BOOL = 2; // arrow::Type::BOOL
EmptyMessage UINT8 = 3; // arrow::Type::UINT8
EmptyMessage INT8 = 4; // arrow::Type::INT8
EmptyMessage UINT16 = 5; // represents arrow::Type fields in src/arrow/type.h
EmptyMessage INT16 = 6;
EmptyMessage UINT32 = 7;
EmptyMessage INT32 = 8;
Expand Down Expand Up @@ -389,6 +402,7 @@ message ArrowType{
Union UNION = 29;
Dictionary DICTIONARY = 30;
Map MAP = 33;
RunEndEncoded RUN_END_ENCODED = 42;
}
}

Expand Down Expand Up @@ -469,6 +483,7 @@ message JsonOptions {
CompressionTypeVariant compression = 1; // Compression type
optional uint64 schema_infer_max_rec = 2; // Optional max records for schema inference
optional uint32 compression_level = 3; // Optional compression level
optional bool newline_delimited = 4; // Whether to read as newline-delimited JSON (default true). When false, expects JSON array format [{},...]
}

message TableParquetOptions {
Expand Down Expand Up @@ -622,4 +637,4 @@ message ColumnStats {
Precision null_count = 3;
Precision distinct_count = 4;
Precision byte_size = 6;
}
}
Loading