Skip to content

Commit 97daa1d

Browse files
committed
Add syntax for DDL for arroyo dialect
1 parent 8c1c36b commit 97daa1d

File tree

5 files changed

+160
-0
lines changed

5 files changed

+160
-0
lines changed

src/ast/dml.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,10 @@ pub struct CreateTable {
209209
/// Snowflake "STORAGE_SERIALIZATION_POLICY" clause for Iceberg tables
210210
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
211211
pub storage_serialization_policy: Option<StorageSerializationPolicy>,
212+
/// Arroyo-specific: Iceberg partition transforms
213+
/// Syntax: PARTITIONED BY (hour(ts), bucket(32, id), truncate(8, color))
214+
/// <https://iceberg.apache.org/spec/#partitioning>
215+
pub arroyo_partitions: Option<Vec<Expr>>,
212216
}
213217

214218
impl Display for CreateTable {
@@ -392,6 +396,9 @@ impl Display for CreateTable {
392396
if let Some(cluster_by) = self.cluster_by.as_ref() {
393397
write!(f, " CLUSTER BY {cluster_by}")?;
394398
}
399+
if let Some(arroyo_partitions) = &self.arroyo_partitions {
400+
write!(f, " PARTITIONED BY ({})", display_comma_separated(arroyo_partitions))?;
401+
}
395402

396403
if let Some(options) = self.options.as_ref() {
397404
write!(

src/ast/helpers/stmt_create_table.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ pub struct CreateTableBuilder {
112112
pub catalog: Option<String>,
113113
pub catalog_sync: Option<String>,
114114
pub storage_serialization_policy: Option<StorageSerializationPolicy>,
115+
pub arroyo_partitions: Option<Vec<Expr>>,
115116
}
116117

117118
impl CreateTableBuilder {
@@ -166,6 +167,7 @@ impl CreateTableBuilder {
166167
catalog: None,
167168
catalog_sync: None,
168169
storage_serialization_policy: None,
170+
arroyo_partitions: None,
169171
}
170172
}
171173
pub fn or_replace(mut self, or_replace: bool) -> Self {
@@ -415,6 +417,11 @@ impl CreateTableBuilder {
415417
self
416418
}
417419

420+
pub fn arroyo_partitions(mut self, arroyo_partitions: Option<Vec<Expr>>) -> Self {
421+
self.arroyo_partitions = arroyo_partitions;
422+
self
423+
}
424+
418425
pub fn build(self) -> Statement {
419426
Statement::CreateTable(CreateTable {
420427
or_replace: self.or_replace,
@@ -466,6 +473,7 @@ impl CreateTableBuilder {
466473
catalog: self.catalog,
467474
catalog_sync: self.catalog_sync,
468475
storage_serialization_policy: self.storage_serialization_policy,
476+
arroyo_partitions: self.arroyo_partitions,
469477
})
470478
}
471479
}
@@ -527,6 +535,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
527535
catalog,
528536
catalog_sync,
529537
storage_serialization_policy,
538+
arroyo_partitions,
530539
}) => Ok(Self {
531540
or_replace,
532541
temporary,
@@ -577,6 +586,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
577586
catalog,
578587
catalog_sync,
579588
storage_serialization_policy,
589+
arroyo_partitions,
580590
}),
581591
_ => Err(ParserError::ParserError(format!(
582592
"Expected create table statement, but received: {stmt}"

src/ast/spans.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,7 @@ impl Spanned for CreateTable {
585585
catalog: _, // todo, Snowflake specific
586586
catalog_sync: _, // todo, Snowflake specific
587587
storage_serialization_policy: _, // todo, Snowflake specific
588+
arroyo_partitions: _, // todo, Arroyo specific
588589
} = self;
589590

590591
union_spans(

src/parser/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6785,6 +6785,18 @@ impl<'a> Parser<'a> {
67856785
None
67866786
};
67876787

6788+
// Parse Arroyo-specific PARTITIONED BY for Iceberg
6789+
let arroyo_partitions = if dialect_of!(self is ArroyoDialect | GenericDialect)
6790+
&& self.parse_keywords(&[Keyword::PARTITIONED, Keyword::BY])
6791+
{
6792+
self.expect_token(&Token::LParen)?;
6793+
let partitions = self.parse_comma_separated(Parser::parse_expr)?;
6794+
self.expect_token(&Token::RParen)?;
6795+
Some(partitions)
6796+
} else {
6797+
None
6798+
};
6799+
67886800
let create_table_config = self.parse_optional_create_table_config()?;
67896801

67906802
let default_charset = if self.parse_keywords(&[Keyword::DEFAULT, Keyword::CHARSET]) {
@@ -6866,6 +6878,7 @@ impl<'a> Parser<'a> {
68666878
.options(create_table_config.options)
68676879
.primary_key(primary_key)
68686880
.strict(strict)
6881+
.arroyo_partitions(arroyo_partitions)
68696882
.build())
68706883
}
68716884

tests/sqlparser_arroyo.rs

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,3 +115,132 @@ fn test_metadata_field() {
115115
"Expected METADATA FROM option in column definition"
116116
);
117117
}
118+
119+
/// Parses a `CREATE TABLE ... WITH (...) PARTITIONED BY (...)` statement carrying
/// three function-style transforms, checks the name and argument count of each,
/// and then confirms that re-parsing the statement's `Display` output yields the
/// same partition list.
#[test]
fn test_iceberg_partitioned_by() {
    let sql = "CREATE TABLE ice (
ts TIMESTAMP NOT NULL,
id INT NOT NULL,
favorite_color TEXT
) WITH (
connector = 'iceberg',
format = 'parquet',
table_name = 'arroyo_test'
) PARTITIONED BY (
hour(ts),
bucket(32, id),
truncate(8, favorite_color)
)";

    let statements = Parser::parse_sql(&ArroyoDialect {}, sql).unwrap();
    let Statement::CreateTable(ct) = statements.first().unwrap() else {
        panic!("not create table")
    };

    // Table name and column count.
    assert_eq!(ct.name.to_string(), "ice");
    assert_eq!(ct.columns.len(), 3);

    // The PARTITIONED BY clause must have been captured, one entry per transform.
    let partitions = ct
        .arroyo_partitions
        .as_ref()
        .expect("Expected arroyo_partitions to be Some");
    assert_eq!(partitions.len(), 3);

    // One row per transform, in source order:
    // (function name, argument count, message when the entry is not a function call).
    let expected = [
        ("hour", 1, "Expected Function for hour(ts)"),
        ("bucket", 2, "Expected Function for bucket(32, id)"),
        (
            "truncate",
            2,
            "Expected Function for truncate(8, favorite_color)",
        ),
    ];
    // The length assertion above guarantees the zip visits all three entries.
    for (partition, (fn_name, arg_count, not_function_msg)) in partitions.iter().zip(expected) {
        let Expr::Function(f) = partition else {
            panic!("{}", not_function_msg)
        };
        assert_eq!(f.name.to_string(), fn_name);
        let sqlparser::ast::FunctionArguments::List(list) = &f.args else {
            panic!("Expected List arguments")
        };
        assert_eq!(list.args.len(), arg_count);
    }

    // Round-trip: the rendered statement must parse back to the same partition list.
    let rendered = ct.to_string();
    let reparsed = Parser::parse_sql(&ArroyoDialect {}, &rendered).unwrap();
    let Statement::CreateTable(ct2) = reparsed.first().unwrap() else {
        panic!("not create table on reparse")
    };

    assert_eq!(ct.arroyo_partitions, ct2.arroyo_partitions);
}
197+
198+
/// Parses a `PARTITIONED BY` clause holding a single `day(event_time)` entry and
/// checks that it is captured as exactly one function-call expression named `day`.
#[test]
fn test_iceberg_partitioned_by_single() {
    let sql = "CREATE TABLE events (
event_time TIMESTAMP
) WITH (
connector = 'iceberg'
) PARTITIONED BY (day(event_time))";

    let statements = Parser::parse_sql(&ArroyoDialect {}, sql).unwrap();
    let Statement::CreateTable(ct) = statements.first().unwrap() else {
        panic!("not create table")
    };

    let partitions = ct
        .arroyo_partitions
        .as_ref()
        .expect("Expected arroyo_partitions");
    assert_eq!(partitions.len(), 1);

    // The only entry must be a function call whose name is `day`.
    let Expr::Function(f) = &partitions[0] else {
        panic!("Expected Function for day(event_time)")
    };
    assert_eq!(f.name.to_string(), "day");
}
221+
222+
/// Parses a `PARTITIONED BY` clause that lists a bare column name (identity
/// transform) and checks that it is captured as a plain identifier expression.
#[test]
fn test_iceberg_partitioned_by_identity() {
    let sql = "CREATE TABLE data (
region TEXT,
value INT
) WITH (
connector = 'iceberg'
) PARTITIONED BY (region)";

    let statements = Parser::parse_sql(&ArroyoDialect {}, sql).unwrap();
    let Statement::CreateTable(ct) = statements.first().unwrap() else {
        panic!("not create table")
    };

    let partitions = ct
        .arroyo_partitions
        .as_ref()
        .expect("Expected arroyo_partitions");
    assert_eq!(partitions.len(), 1);

    // A bare column name must come through as an identifier, not a function call.
    let Expr::Identifier(ident) = &partitions[0] else {
        panic!("Expected Identifier for region")
    };
    assert_eq!(ident.value, "region");
}

0 commit comments

Comments
 (0)