diff --git a/ballista/core/proto/datafusion.proto b/ballista/core/proto/datafusion.proto index f601d8837..fe81be0d4 100644 --- a/ballista/core/proto/datafusion.proto +++ b/ballista/core/proto/datafusion.proto @@ -278,6 +278,7 @@ message DmlNode{ INSERT_APPEND = 3; INSERT_OVERWRITE = 4; INSERT_REPLACE = 5; + TRUNCATE = 6; } Type dml_type = 1; LogicalPlanNode input = 2; @@ -749,6 +750,8 @@ message PhysicalPlanNode { SortMergeJoinExecNode sort_merge_join = 34; MemoryScanExecNode memory_scan = 35; AsyncFuncExecNode async_func = 36; + BufferExecNode buffer = 37; + ArrowScanExecNode arrow_scan = 38; } } @@ -758,6 +761,16 @@ message PartitionColumn { } +// Determines how file sink output paths are interpreted. +enum FileOutputMode { + // Infer output mode from the URL (extension/trailing `/` heuristic). + FILE_OUTPUT_MODE_AUTOMATIC = 0; + // Write to a single file at the exact output path. + FILE_OUTPUT_MODE_SINGLE_FILE = 1; + // Write to a directory with generated filenames. + FILE_OUTPUT_MODE_DIRECTORY = 2; +} + message FileSinkConfig { reserved 6; // writer_mode reserved 8; // was `overwrite` which has been superseded by `insert_op` @@ -770,6 +783,8 @@ message FileSinkConfig { bool keep_partition_by_columns = 9; InsertOp insert_op = 10; string file_extension = 11; + // Determines how the output path is interpreted. + FileOutputMode file_output_mode = 12; } enum InsertOp { @@ -837,6 +852,14 @@ message PhysicalExprNode { // Was date_time_interval_expr reserved 17; + // Unique identifier for this expression to do deduplication during deserialization. + // When serializing, this is set to a unique identifier for each combination of + // expression, process and serialization run. + // When deserializing, if this ID has been seen before, the cached Arc is returned + // instead of creating a new one, enabling reconstruction of referential integrity + // across serde roundtrips. + optional uint64 expr_id = 30; + oneof ExprType { // column references PhysicalColumn column = 1; @@ -1006,6 +1029,8 @@ message FilterExecNode { PhysicalExprNode expr = 2; uint32 default_filter_selectivity = 3; repeated uint32 projection = 9; + uint32 batch_size = 10; + optional uint32 fetch = 11; } message FileGroup { @@ -1083,6 +1108,10 @@ message AvroScanExecNode { FileScanExecConf base_conf = 1; } +message ArrowScanExecNode { + FileScanExecConf base_conf = 1; +} + message MemoryScanExecNode { repeated bytes partitions = 1; datafusion_common.Schema schema = 2; @@ -1111,6 +1140,7 @@ message HashJoinExecNode { datafusion_common.NullEquality null_equality = 7; JoinFilter filter = 8; repeated uint32 projection = 9; + bool null_aware = 10; } enum StreamPartitionMode { @@ -1190,6 +1220,7 @@ enum AggregateMode { FINAL_PARTITIONED = 2; SINGLE = 3; SINGLE_PARTITIONED = 4; + PARTIAL_REDUCE = 5; } message PartiallySortedInputOrderMode { @@ -1219,6 +1250,8 @@ message MaybePhysicalSortExprs { message AggLimit { // wrap into a message to make it optional uint64 limit = 1; + // Optional ordering direction for TopK aggregation (true = descending, false = ascending) + optional bool descending = 2; } message AggregateExecNode { @@ -1412,3 +1445,8 @@ message AsyncFuncExecNode { repeated PhysicalExprNode async_exprs = 2; repeated string async_expr_names = 3; } + +message BufferExecNode { + PhysicalPlanNode input = 1; + uint64 capacity = 2; +} \ No newline at end of file diff --git a/ballista/core/proto/datafusion_common.proto b/ballista/core/proto/datafusion_common.proto index e88528115..5848455cb 100644 --- a/ballista/core/proto/datafusion_common.proto +++ b/ballista/core/proto/datafusion_common.proto @@ -183,6 +183,11 @@ message Map { bool keys_sorted = 2; } +message RunEndEncoded { + Field run_ends_field = 1; + Field values_field = 2; +} + enum UnionMode{ sparse = 0; dense = 1; @@ -236,6 +241,12 @@ message ScalarDictionaryValue { ScalarValue value = 2; } +message ScalarRunEndEncodedValue { + Field run_ends_field = 1; + Field values_field = 2; + ScalarValue value = 3; +} + message IntervalDayTimeValue { int32 days = 1; int32 milliseconds = 2; @@ -321,6 +332,8 @@ message ScalarValue{ IntervalMonthDayNanoValue interval_month_day_nano = 31; ScalarFixedSizeBinary fixed_size_binary_value = 34; UnionValue union_value = 42; + + ScalarRunEndEncodedValue run_end_encoded_value = 45; } } @@ -351,11 +364,11 @@ message Decimal256{ // Serialized data type message ArrowType{ oneof arrow_type_enum { - EmptyMessage NONE = 1; // arrow::Type::NA - EmptyMessage BOOL = 2; // arrow::Type::BOOL - EmptyMessage UINT8 = 3; // arrow::Type::UINT8 - EmptyMessage INT8 = 4; // arrow::Type::INT8 - EmptyMessage UINT16 = 5; // represents arrow::Type fields in src/arrow/type.h + EmptyMessage NONE = 1; // arrow::Type::NA + EmptyMessage BOOL = 2; // arrow::Type::BOOL + EmptyMessage UINT8 = 3; // arrow::Type::UINT8 + EmptyMessage INT8 = 4; // arrow::Type::INT8 + EmptyMessage UINT16 = 5; // represents arrow::Type fields in src/arrow/type.h EmptyMessage INT16 = 6; EmptyMessage UINT32 = 7; EmptyMessage INT32 = 8; @@ -389,6 +402,7 @@ message ArrowType{ Union UNION = 29; Dictionary DICTIONARY = 30; Map MAP = 33; + RunEndEncoded RUN_END_ENCODED = 42; } } @@ -469,6 +483,7 @@ message JsonOptions { CompressionTypeVariant compression = 1; // Compression type optional uint64 schema_infer_max_rec = 2; // Optional max records for schema inference optional uint32 compression_level = 3; // Optional compression level + optional bool newline_delimited = 4; // Whether to read as newline-delimited JSON (default true). When false, expects JSON array format [{},...] } message TableParquetOptions { @@ -622,4 +637,4 @@ message ColumnStats { Precision null_count = 3; Precision distinct_count = 4; Precision byte_size = 6; -} +} \ No newline at end of file