Skip to content

Commit f03189e

Browse files
BugenZhaoStanding-Man
authored andcommitted
feat(parser): support ALTER .. [SET | RESET] CONFIG (risingwavelabs#23742)
Signed-off-by: Bugen Zhao <[email protected]>
1 parent 2a5a0a6 commit f03189e

File tree

6 files changed

+226
-13
lines changed

6 files changed

+226
-13
lines changed

e2e_test/source_inline/kafka/alter/rate_limit_source_kafka_shared.slt.serial

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ alter materialized view rl_mv2 set source_rate_limit = 1000;
122122
db error: ERROR: Failed to run the query
123123

124124
Caused by:
125-
sql parser error: expected SCHEMA/PARALLELISM/BACKFILL_RATE_LIMIT after SET, found: source_rate_limit
125+
sql parser error: expected SCHEMA/PARALLELISM/BACKFILL_RATE_LIMIT/CONFIG after SET, found: source_rate_limit
126126
LINE 1: alter materialized view rl_mv2 set source_rate_limit = 1000;
127127
^
128128

src/frontend/src/handler/mod.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -866,6 +866,12 @@ pub async fn handle(
866866
)
867867
.await
868868
}
869+
AlterTableOperation::SetConfig { .. } => {
870+
bail_not_implemented!("ALTER TABLE SET CONFIG")
871+
}
872+
AlterTableOperation::ResetConfig { .. } => {
873+
bail_not_implemented!("ALTER TABLE RESET CONFIG")
874+
}
869875
AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
870876
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
871877
handler_args,
@@ -915,6 +921,12 @@ pub async fn handle(
915921
)
916922
.await
917923
}
924+
AlterIndexOperation::SetConfig { .. } => {
925+
bail_not_implemented!("ALTER INDEX SET CONFIG")
926+
}
927+
AlterIndexOperation::ResetConfig { .. } => {
928+
bail_not_implemented!("ALTER INDEX RESET CONFIG")
929+
}
918930
},
919931
Statement::AlterView {
920932
materialized,
@@ -1030,6 +1042,18 @@ pub async fn handle(
10301042
}
10311043
alter_mv::handle_alter_mv(handler_args, name, query).await
10321044
}
1045+
AlterViewOperation::SetConfig { .. } => {
1046+
if !materialized {
1047+
bail!("SET CONFIG is only supported for materialized views");
1048+
}
1049+
bail_not_implemented!("ALTER MATERIALIZED VIEW SET CONFIG")
1050+
}
1051+
AlterViewOperation::ResetConfig { .. } => {
1052+
if !materialized {
1053+
bail!("RESET CONFIG is only supported for materialized views");
1054+
}
1055+
bail_not_implemented!("ALTER MATERIALIZED VIEW RESET CONFIG")
1056+
}
10331057
}
10341058
}
10351059

@@ -1072,6 +1096,12 @@ pub async fn handle(
10721096
)
10731097
.await
10741098
}
1099+
AlterSinkOperation::SetConfig { .. } => {
1100+
bail_not_implemented!("ALTER SINK SET CONFIG")
1101+
}
1102+
AlterSinkOperation::ResetConfig { .. } => {
1103+
bail_not_implemented!("ALTER SINK RESET CONFIG")
1104+
}
10751105
AlterSinkOperation::SwapRenameSink { target_sink } => {
10761106
alter_swap_rename::handle_swap_rename(
10771107
handler_args,
@@ -1207,6 +1237,12 @@ pub async fn handle(
12071237
)
12081238
.await
12091239
}
1240+
AlterSourceOperation::SetConfig { .. } => {
1241+
bail_not_implemented!("ALTER SOURCE SET CONFIG")
1242+
}
1243+
AlterSourceOperation::ResetConfig { .. } => {
1244+
bail_not_implemented!("ALTER SOURCE RESET CONFIG")
1245+
}
12101246
},
12111247
Statement::AlterFunction {
12121248
name,

src/sqlparser/src/ast/ddl.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,14 @@ pub enum AlterTableOperation {
9696
parallelism: SetVariableValue,
9797
deferred: bool,
9898
},
99+
/// `SET CONFIG (key = value, ...)`
100+
SetConfig {
101+
entries: Vec<SqlOption>,
102+
},
103+
/// `RESET CONFIG (key, ...)`
104+
ResetConfig {
105+
keys: Vec<ObjectName>,
106+
},
99107
RefreshSchema,
100108
/// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
101109
SetSourceRateLimit {
@@ -132,6 +140,14 @@ pub enum AlterIndexOperation {
132140
parallelism: SetVariableValue,
133141
deferred: bool,
134142
},
143+
/// `SET CONFIG (key = value, ...)`
144+
SetConfig {
145+
entries: Vec<SqlOption>,
146+
},
147+
/// `RESET CONFIG (key, ...)`
148+
ResetConfig {
149+
keys: Vec<ObjectName>,
150+
},
135151
}
136152

137153
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -171,6 +187,14 @@ pub enum AlterViewOperation {
171187
AsQuery {
172188
query: Box<Query>,
173189
},
190+
/// `SET CONFIG ( streaming.some_config_key = <some_config_value>, .. )`
191+
SetConfig {
192+
entries: Vec<SqlOption>,
193+
},
194+
/// `RESET CONFIG ( streaming.some_config_key, .. )`
195+
ResetConfig {
196+
keys: Vec<ObjectName>,
197+
},
174198
}
175199

176200
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -189,6 +213,14 @@ pub enum AlterSinkOperation {
189213
parallelism: SetVariableValue,
190214
deferred: bool,
191215
},
216+
/// `SET CONFIG (key = value, ...)`
217+
SetConfig {
218+
entries: Vec<SqlOption>,
219+
},
220+
/// `RESET CONFIG (key, ...)`
221+
ResetConfig {
222+
keys: Vec<ObjectName>,
223+
},
192224
/// `SWAP WITH <sink_name>`
193225
SwapRenameSink {
194226
target_sink: ObjectName,
@@ -241,6 +273,14 @@ pub enum AlterSourceOperation {
241273
parallelism: SetVariableValue,
242274
deferred: bool,
243275
},
276+
/// `SET CONFIG (key = value, ...)`
277+
SetConfig {
278+
entries: Vec<SqlOption>,
279+
},
280+
/// `RESET CONFIG (key, ...)`
281+
ResetConfig {
282+
keys: Vec<ObjectName>,
283+
},
244284
AlterConnectorProps {
245285
alter_props: Vec<SqlOption>,
246286
},
@@ -366,6 +406,12 @@ impl fmt::Display for AlterTableOperation {
366406
if *deferred { " DEFERRED" } else { "" }
367407
)
368408
}
409+
AlterTableOperation::SetConfig { entries } => {
410+
write!(f, "SET CONFIG ({})", display_comma_separated(entries))
411+
}
412+
AlterTableOperation::ResetConfig { keys } => {
413+
write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
414+
}
369415
AlterTableOperation::RefreshSchema => {
370416
write!(f, "REFRESH SCHEMA")
371417
}
@@ -412,6 +458,12 @@ impl fmt::Display for AlterIndexOperation {
412458
if *deferred { " DEFERRED" } else { "" }
413459
)
414460
}
461+
AlterIndexOperation::SetConfig { entries } => {
462+
write!(f, "SET CONFIG ({})", display_comma_separated(entries))
463+
}
464+
AlterIndexOperation::ResetConfig { keys } => {
465+
write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
466+
}
415467
}
416468
}
417469
}
@@ -463,6 +515,12 @@ impl fmt::Display for AlterViewOperation {
463515
AlterViewOperation::AsQuery { query } => {
464516
write!(f, "AS {}", query)
465517
}
518+
AlterViewOperation::SetConfig { entries } => {
519+
write!(f, "SET CONFIG ({})", display_comma_separated(entries))
520+
}
521+
AlterViewOperation::ResetConfig { keys } => {
522+
write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
523+
}
466524
}
467525
}
468526
}
@@ -490,6 +548,12 @@ impl fmt::Display for AlterSinkOperation {
490548
if *deferred { " DEFERRED" } else { "" }
491549
)
492550
}
551+
AlterSinkOperation::SetConfig { entries } => {
552+
write!(f, "SET CONFIG ({})", display_comma_separated(entries))
553+
}
554+
AlterSinkOperation::ResetConfig { keys } => {
555+
write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
556+
}
493557
AlterSinkOperation::SwapRenameSink { target_sink } => {
494558
write!(f, "SWAP WITH {}", target_sink)
495559
}
@@ -571,6 +635,12 @@ impl fmt::Display for AlterSourceOperation {
571635
if *deferred { " DEFERRED" } else { "" }
572636
)
573637
}
638+
AlterSourceOperation::SetConfig { entries } => {
639+
write!(f, "SET CONFIG ({})", display_comma_separated(entries))
640+
}
641+
AlterSourceOperation::ResetConfig { keys } => {
642+
write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
643+
}
574644
AlterSourceOperation::AlterConnectorProps { alter_props } => {
575645
write!(
576646
f,

src/sqlparser/src/keywords.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ define_keywords!(
138138
COMMITTED,
139139
CONCURRENTLY,
140140
CONDITION,
141+
CONFIG,
141142
CONFLICT,
142143
CONFLUENT,
143144
CONNECT,

src/sqlparser/src/parser.rs

Lines changed: 74 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3270,9 +3270,20 @@ impl Parser<'_> {
32703270
AlterTableOperation::SetBackfillRateLimit { rate_limit }
32713271
} else if let Some(rate_limit) = self.parse_alter_dml_rate_limit()? {
32723272
AlterTableOperation::SetDmlRateLimit { rate_limit }
3273+
} else if self.parse_keyword(Keyword::CONFIG) {
3274+
let entries = self.parse_options()?;
3275+
AlterTableOperation::SetConfig { entries }
32733276
} else {
3274-
return self
3275-
.expected("SCHEMA/PARALLELISM/SOURCE_RATE_LIMIT/DML_RATE_LIMIT after SET");
3277+
return self.expected(
3278+
"SCHEMA/PARALLELISM/SOURCE_RATE_LIMIT/DML_RATE_LIMIT/CONFIG after SET",
3279+
);
3280+
}
3281+
} else if self.parse_keyword(Keyword::RESET) {
3282+
if self.parse_keyword(Keyword::CONFIG) {
3283+
let keys = self.parse_parenthesized_object_name_list()?;
3284+
AlterTableOperation::ResetConfig { keys }
3285+
} else {
3286+
return self.expected("CONFIG after RESET");
32763287
}
32773288
} else if self.parse_keyword(Keyword::DROP) {
32783289
let _ = self.parse_keyword(Keyword::COLUMN);
@@ -3325,7 +3336,7 @@ impl Parser<'_> {
33253336
}
33263337
} else {
33273338
return self.expected(
3328-
"ADD or RENAME or OWNER TO or SET or DROP or SWAP or CONNECTOR after ALTER TABLE",
3339+
"ADD or RENAME or OWNER TO or SET or RESET or DROP or SWAP or CONNECTOR after ALTER TABLE",
33293340
);
33303341
};
33313342
Ok(Statement::AlterTable {
@@ -3426,11 +3437,21 @@ impl Parser<'_> {
34263437
parallelism: value,
34273438
deferred,
34283439
}
3440+
} else if self.parse_keyword(Keyword::CONFIG) {
3441+
let entries = self.parse_options()?;
3442+
AlterIndexOperation::SetConfig { entries }
34293443
} else {
3430-
return self.expected("PARALLELISM after SET");
3444+
return self.expected("PARALLELISM or CONFIG after SET");
3445+
}
3446+
} else if self.parse_keyword(Keyword::RESET) {
3447+
if self.parse_keyword(Keyword::CONFIG) {
3448+
let keys = self.parse_parenthesized_object_name_list()?;
3449+
AlterIndexOperation::ResetConfig { keys }
3450+
} else {
3451+
return self.expected("CONFIG after RESET");
34313452
}
34323453
} else {
3433-
return self.expected("RENAME after ALTER INDEX");
3454+
return self.expected("RENAME, SET, or RESET after ALTER INDEX");
34343455
};
34353456

34363457
Ok(Statement::AlterIndex {
@@ -3507,8 +3528,11 @@ impl Parser<'_> {
35073528
&& let Some(rate_limit) = self.parse_alter_backfill_rate_limit()?
35083529
{
35093530
AlterViewOperation::SetBackfillRateLimit { rate_limit }
3531+
} else if self.parse_keyword(Keyword::CONFIG) && materialized {
3532+
let entries = self.parse_options()?;
3533+
AlterViewOperation::SetConfig { entries }
35103534
} else {
3511-
return self.expected("SCHEMA/PARALLELISM/BACKFILL_RATE_LIMIT after SET");
3535+
return self.expected("SCHEMA/PARALLELISM/BACKFILL_RATE_LIMIT/CONFIG after SET");
35123536
}
35133537
} else if self.parse_keyword(Keyword::RESET) {
35143538
if self.parse_keyword(Keyword::RESOURCE_GROUP) && materialized {
@@ -3518,8 +3542,11 @@ impl Parser<'_> {
35183542
resource_group: None,
35193543
deferred,
35203544
}
3545+
} else if self.parse_keyword(Keyword::CONFIG) && materialized {
3546+
let keys = self.parse_parenthesized_object_name_list()?;
3547+
AlterViewOperation::ResetConfig { keys }
35213548
} else {
3522-
return self.expected("RESOURCE_GROUP after RESET");
3549+
return self.expected("RESOURCE_GROUP or CONFIG after RESET");
35233550
}
35243551
} else {
35253552
return self.expected(&format!(
@@ -3597,8 +3624,20 @@ impl Parser<'_> {
35973624
}
35983625
} else if let Some(rate_limit) = self.parse_alter_sink_rate_limit()? {
35993626
AlterSinkOperation::SetSinkRateLimit { rate_limit }
3627+
} else if self.parse_keyword(Keyword::CONFIG) {
3628+
let entries = self.parse_options()?;
3629+
AlterSinkOperation::SetConfig { entries }
3630+
} else {
3631+
return self.expected(
3632+
"SCHEMA/PARALLELISM/SINK_RATE_LIMIT/STREAMING_ENABLE_UNALIGNED_JOIN/CONFIG after SET",
3633+
);
3634+
}
3635+
} else if self.parse_keyword(Keyword::RESET) {
3636+
if self.parse_keyword(Keyword::CONFIG) {
3637+
let keys = self.parse_parenthesized_object_name_list()?;
3638+
AlterSinkOperation::ResetConfig { keys }
36003639
} else {
3601-
return self.expected("SCHEMA/PARALLELISM after SET");
3640+
return self.expected("CONFIG after RESET");
36023641
}
36033642
} else if self.parse_keywords(&[Keyword::SWAP, Keyword::WITH]) {
36043643
let target_sink = self.parse_object_name()?;
@@ -3609,7 +3648,8 @@ impl Parser<'_> {
36093648
alter_props: changed_props,
36103649
}
36113650
} else {
3612-
return self.expected("RENAME or OWNER TO or SET or CONNECTOR WITH after ALTER SINK");
3651+
return self
3652+
.expected("RENAME or OWNER TO or SET or RESET or CONNECTOR WITH after ALTER SINK");
36133653
};
36143654

36153655
Ok(Statement::AlterSink {
@@ -3697,8 +3737,18 @@ impl Parser<'_> {
36973737
parallelism: value,
36983738
deferred,
36993739
}
3740+
} else if self.parse_keyword(Keyword::CONFIG) {
3741+
let entries = self.parse_options()?;
3742+
AlterSourceOperation::SetConfig { entries }
37003743
} else {
3701-
return self.expected("SCHEMA, SOURCE_RATE_LIMIT or PARALLELISM after SET");
3744+
return self.expected("SCHEMA, SOURCE_RATE_LIMIT, PARALLELISM or CONFIG after SET");
3745+
}
3746+
} else if self.parse_keyword(Keyword::RESET) {
3747+
if self.parse_keyword(Keyword::CONFIG) {
3748+
let keys = self.parse_parenthesized_object_name_list()?;
3749+
AlterSourceOperation::ResetConfig { keys }
3750+
} else {
3751+
return self.expected("CONFIG after RESET");
37023752
}
37033753
} else if self.peek_nth_any_of_keywords(0, &[Keyword::FORMAT]) {
37043754
let format_encode = self.parse_schema()?.unwrap();
@@ -3717,8 +3767,9 @@ impl Parser<'_> {
37173767
alter_props: with_options,
37183768
}
37193769
} else {
3720-
return self
3721-
.expected("RENAME, ADD COLUMN, OWNER TO, CONNECTOR or SET after ALTER SOURCE");
3770+
return self.expected(
3771+
"RENAME, ADD COLUMN, OWNER TO, CONNECTOR, SET or RESET after ALTER SOURCE",
3772+
);
37223773
};
37233774

37243775
Ok(Statement::AlterSource {
@@ -4196,6 +4247,17 @@ impl Parser<'_> {
41964247
Ok(ObjectName(idents))
41974248
}
41984249

4250+
/// Parse a parenthesized comma-separated list of object names
4251+
pub fn parse_parenthesized_object_name_list(&mut self) -> ModalResult<Vec<ObjectName>> {
4252+
if self.consume_token(&Token::LParen) {
4253+
let names = self.parse_comma_separated(Parser::parse_object_name)?;
4254+
self.expect_token(&Token::RParen)?;
4255+
Ok(names)
4256+
} else {
4257+
self.expected("a list of object names in parentheses")
4258+
}
4259+
}
4260+
41994261
/// Parse identifiers strictly i.e. don't parse keywords
42004262
pub fn parse_identifiers_non_keywords(&mut self) -> ModalResult<Vec<Ident>> {
42014263
let mut idents = vec![];

0 commit comments

Comments
 (0)