Skip to content

Commit 8c1c36b

Browse files
committed
Implement METADATA FROM syntax
1 parent 2279ac9 commit 8c1c36b

File tree

4 files changed

+63
-2
lines changed

4 files changed

+63
-2
lines changed

src/ast/ddl.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use crate::ast::{
3737
ValueWithSpan,
3838
};
3939
use crate::keywords::Keyword;
40-
use crate::tokenizer::Token;
40+
use crate::tokenizer::{Span, Token};
4141

4242
/// An `ALTER TABLE` (`Statement::AlterTable`) operation
4343
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
@@ -1655,6 +1655,20 @@ pub enum ColumnOption {
16551655
/// false if 'GENERATED ALWAYS' is skipped (option starts with AS)
16561656
generated_keyword: bool,
16571657
},
1658+
/// `METADATA FROM 'key'`
1659+
///
1660+
/// A special type of column that gets its value from metadata
1661+
/// associated with the record.
1662+
///
1663+
/// Example:
1664+
/// ```sql
1665+
/// CREATE TABLE logs (
1666+
/// id TEXT,
1667+
/// kafka_topic STRING METADATA FROM 'topic',
1668+
/// log TEXT
1669+
/// )
1670+
/// ```
1671+
MetadataField(String, Span),
16581672
/// BigQuery specific: Explicit column options in a view [1] or table [2]
16591673
/// Syntax
16601674
/// ```sql
@@ -1745,6 +1759,7 @@ impl fmt::Display for ColumnOption {
17451759
Collation(n) => write!(f, "COLLATE {n}"),
17461760
Comment(v) => write!(f, "COMMENT '{}'", escape_single_quote_string(v)),
17471761
OnUpdate(expr) => write!(f, "ON UPDATE {expr}"),
1762+
MetadataField(key, _) => write!(f, "METADATA FROM '{}'", escape_single_quote_string(key)),
17481763
Generated {
17491764
generated_as,
17501765
sequence_options,

src/ast/spans.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,7 @@ impl Spanned for ColumnOption {
775775
ColumnOption::OnConflict(..) => Span::empty(),
776776
ColumnOption::Policy(..) => Span::empty(),
777777
ColumnOption::Tags(..) => Span::empty(),
778+
ColumnOption::MetadataField(_, span) => *span,
778779
}
779780
}
780781
}

src/parser/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7084,6 +7084,17 @@ impl<'a> Parser<'a> {
70847084
Ok(Some(ColumnOption::Null))
70857085
} else if self.parse_keyword(Keyword::DEFAULT) {
70867086
Ok(Some(ColumnOption::Default(self.parse_expr()?)))
7087+
} else if self.parse_keywords(&[Keyword::METADATA, Keyword::FROM])
7088+
&& dialect_of!(self is ArroyoDialect | GenericDialect)
7089+
{
7090+
// Parse metadata field syntax: METADATA FROM 'key'
7091+
let next_token = self.next_token();
7092+
match next_token.token {
7093+
Token::SingleQuotedString(value, ..) => {
7094+
Ok(Some(ColumnOption::MetadataField(value, next_token.span)))
7095+
}
7096+
_ => self.expected("string literal for metadata key", next_token),
7097+
}
70877098
} else if dialect_of!(self is ClickHouseDialect| GenericDialect)
70887099
&& self.parse_keyword(Keyword::MATERIALIZED)
70897100
{

tests/sqlparser_arroyo.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
#![warn(clippy::all)]
1414

15-
use sqlparser::ast::{BinaryOperator, Expr, Ident, Statement, TableConstraint};
15+
use sqlparser::ast::{BinaryOperator, ColumnOption, Expr, Ident, Statement, TableConstraint};
1616
use sqlparser::dialect::ArroyoDialect;
1717
use sqlparser::parser::Parser;
1818
use sqlparser::test_utils;
@@ -81,3 +81,37 @@ fn test_watermark_without_expr() {
8181
}]
8282
);
8383
}
84+
85+
#[test]
86+
fn test_metadata_field() {
87+
let sql = "CREATE TABLE logs (
88+
id TEXT,
89+
kafka_topic STRING METADATA FROM 'topic',
90+
log TEXT
91+
)";
92+
93+
let parse = Parser::parse_sql(&ArroyoDialect {}, sql).unwrap();
94+
let Statement::CreateTable(ct) = parse.get(0).unwrap() else {
95+
panic!("not create table")
96+
};
97+
98+
assert_eq!(ct.columns.len(), 3);
99+
100+
// Check the middle column with METADATA FROM
101+
let column = &ct.columns[1];
102+
assert_eq!(column.name, Ident::new("kafka_topic"));
103+
104+
// Check for the METADATA FROM option
105+
let mut found_metadata = false;
106+
for option_def in &column.options {
107+
if let ColumnOption::MetadataField(key, _) = &option_def.option {
108+
found_metadata = true;
109+
assert_eq!(key, "topic");
110+
}
111+
}
112+
113+
assert!(
114+
found_metadata,
115+
"Expected METADATA FROM option in column definition"
116+
);
117+
}

0 commit comments

Comments
 (0)