Skip to content

Commit f841479

Browse files
authored
feat: manage all catalogs of the iceberg table in a transactional manner (#23597)
1 parent ebfe5ec commit f841479

File tree

21 files changed

+537
-319
lines changed

21 files changed

+537
-319
lines changed

e2e_test/iceberg/test_case/pure_slt/iceberg_engine.slt

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -90,34 +90,6 @@ DROP TABLE full_type_t;
9090

9191
# test connector
9292

93-
statement error
94-
create table datagen_t (
95-
id BIGINT,
96-
item_name VARCHAR,
97-
description VARCHAR,
98-
initial_bid BIGINT,
99-
reserve BIGINT,
100-
date_time TIMESTAMP,
101-
expires TIMESTAMP,
102-
seller BIGINT,
103-
category BIGINT,
104-
extra VARCHAR)
105-
with (
106-
connector = 'datagen',
107-
fields.id.kind = 'sequence',
108-
fields.id.start = '1',
109-
commit_checkpoint_interval = 1,
110-
source_rate_limit = 1000
111-
) engine = iceberg;
112-
----
113-
db error: ERROR: Failed to run the query
114-
115-
Caused by:
116-
Not supported: source_rate_limit for iceberg table engine during table creation
117-
HINT: Please remove source_rate_limit from WITH options.
118-
119-
120-
12193
statement ok
12294
create table datagen_t (
12395
id BIGINT,

e2e_test/iceberg/test_case/pure_slt/iceberg_engine_append_only.slt

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -92,35 +92,6 @@ DROP TABLE full_type_t;
9292

9393
# test connector
9494

95-
statement error
96-
create table datagen_t (
97-
id BIGINT,
98-
item_name VARCHAR,
99-
description VARCHAR,
100-
initial_bid BIGINT,
101-
reserve BIGINT,
102-
date_time TIMESTAMP,
103-
expires TIMESTAMP,
104-
seller BIGINT,
105-
category BIGINT,
106-
extra VARCHAR)
107-
APPEND ONLY
108-
with (
109-
connector = 'datagen',
110-
fields.id.kind = 'sequence',
111-
fields.id.start = '1',
112-
commit_checkpoint_interval = 1,
113-
source_rate_limit = 1000
114-
) engine = iceberg;
115-
----
116-
db error: ERROR: Failed to run the query
117-
118-
Caused by:
119-
Not supported: source_rate_limit for iceberg table engine during table creation
120-
HINT: Please remove source_rate_limit from WITH options.
121-
122-
123-
12495
statement ok
12596
create table datagen_t (
12697
id BIGINT,

src/common/src/catalog/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,9 @@ pub const CDC_OFFSET_COLUMN_NAME: &str = "_rw_offset";
137137
pub const CDC_SOURCE_COLUMN_NUM: u32 = 3;
138138
pub const CDC_TABLE_NAME_COLUMN_NAME: &str = "_rw_table_name";
139139

140+
pub const ICEBERG_SOURCE_PREFIX: &str = "__iceberg_source_";
141+
pub const ICEBERG_SINK_PREFIX: &str = "__iceberg_sink_";
142+
140143
/// The local system catalog reader in the frontend node.
141144
pub trait SysCatalogReader: Sync + Send + 'static {
142145
/// Reads the data of the system catalog table.

src/frontend/src/catalog/table_catalog.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ use std::collections::{HashMap, HashSet};
1818
use fixedbitset::FixedBitSet;
1919
use itertools::Itertools;
2020
use risingwave_common::catalog::{
21-
ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, Schema,
22-
StreamJobStatus, TableDesc, TableId, TableVersionId,
21+
ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, ICEBERG_SINK_PREFIX,
22+
ICEBERG_SOURCE_PREFIX, Schema, StreamJobStatus, TableDesc, TableId, TableVersionId,
2323
};
2424
use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
2525
use risingwave_common::id::{JobId, SourceId};
@@ -211,9 +211,6 @@ pub struct TableCatalog {
211211
pub cdc_table_type: Option<ExternalCdcTableType>,
212212
}
213213

214-
pub const ICEBERG_SOURCE_PREFIX: &str = "__iceberg_source_";
215-
pub const ICEBERG_SINK_PREFIX: &str = "__iceberg_sink_";
216-
217214
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
218215
#[cfg_attr(test, derive(Default))]
219216
pub enum TableType {

src/frontend/src/handler/create_sink.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ use pgwire::pg_response::{PgResponse, StatementType};
2424
use risingwave_common::array::arrow::IcebergArrowConvert;
2525
use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
2626
use risingwave_common::bail;
27-
use risingwave_common::catalog::{ColumnCatalog, ConnectionId, ObjectId, Schema, UserId};
27+
use risingwave_common::catalog::{
28+
ColumnCatalog, ConnectionId, ICEBERG_SINK_PREFIX, ObjectId, Schema, UserId,
29+
};
2830
use risingwave_common::license::Feature;
2931
use risingwave_common::secret::LocalSecretManager;
3032
use risingwave_common::system_param::reader::SystemParamsRead;
@@ -577,6 +579,13 @@ pub async fn handle_create_sink(
577579
return Ok(resp);
578580
}
579581

582+
if stmt.sink_name.base_name().starts_with(ICEBERG_SINK_PREFIX) {
583+
return Err(RwError::from(ErrorCode::InvalidInputSyntax(format!(
584+
"Sink name cannot start with reserved prefix '{}'",
585+
ICEBERG_SINK_PREFIX
586+
))));
587+
}
588+
580589
let (mut sink, graph, target_table_catalog, dependencies) = {
581590
let SinkPlanContext {
582591
query,

src/frontend/src/handler/create_source.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ use validate::{SOURCE_ALLOWED_CONNECTION_CONNECTOR, SOURCE_ALLOWED_CONNECTION_SC
116116
mod additional_column;
117117
use additional_column::check_and_add_timestamp_column;
118118
pub use additional_column::handle_addition_columns;
119+
use risingwave_common::catalog::ICEBERG_SOURCE_PREFIX;
119120
use risingwave_common::id::SourceId;
120121

121122
use crate::stream_fragmenter::GraphJobType;
@@ -1073,6 +1074,17 @@ pub async fn handle_create_source(
10731074
return Ok(resp);
10741075
}
10751076

1077+
if stmt
1078+
.source_name
1079+
.base_name()
1080+
.starts_with(ICEBERG_SOURCE_PREFIX)
1081+
{
1082+
return Err(RwError::from(InvalidInputSyntax(format!(
1083+
"Source name cannot start with reserved prefix '{}'",
1084+
ICEBERG_SOURCE_PREFIX
1085+
))));
1086+
}
1087+
10761088
if handler_args.with_options.is_empty() {
10771089
return Err(RwError::from(InvalidInputSyntax(
10781090
"missing WITH clause".to_owned(),

src/frontend/src/handler/create_table.rs

Lines changed: 5 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ use pgwire::pg_response::{PgResponse, StatementType};
2626
use prost::Message as _;
2727
use risingwave_common::catalog::{
2828
CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
29-
ObjectId, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, TableId,
29+
ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, ObjectId, RISINGWAVE_ICEBERG_ROW_ID,
30+
ROW_ID_COLUMN_NAME, TableId,
3031
};
3132
use risingwave_common::config::MetaBackend;
3233
use risingwave_common::global_jvm::Jvm;
@@ -61,12 +62,12 @@ use risingwave_sqlparser::ast::{
6162
use risingwave_sqlparser::parser::{IncludeOption, Parser};
6263
use thiserror_ext::AsReport;
6364

65+
use super::RwPgResponse;
6466
use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
65-
use super::{RwPgResponse, alter_streaming_rate_limit};
6667
use crate::binder::{Clause, SecureCompareContext, bind_data_type};
6768
use crate::catalog::root_catalog::SchemaPath;
6869
use crate::catalog::source_catalog::SourceCatalog;
69-
use crate::catalog::table_catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, TableVersion};
70+
use crate::catalog::table_catalog::TableVersion;
7071
use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
7172
use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
7273
use crate::expr::{Expr, ExprImpl, ExprRewriter};
@@ -99,7 +100,6 @@ use risingwave_connector::sink::iceberg::{
99100
SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE, parse_partition_by_exprs,
100101
};
101102
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
102-
use risingwave_pb::meta::PbThrottleTarget;
103103

104104
use crate::handler::create_sink::{SinkPlanContext, gen_sink_plan};
105105

@@ -1394,7 +1394,7 @@ fn bind_cdc_table_schema(
13941394

13951395
#[allow(clippy::too_many_arguments)]
13961396
pub async fn handle_create_table(
1397-
mut handler_args: HandlerArgs,
1397+
handler_args: HandlerArgs,
13981398
table_name: ObjectName,
13991399
column_defs: Vec<ColumnDef>,
14001400
wildcard_idx: Option<usize>,
@@ -1422,21 +1422,6 @@ pub async fn handle_create_table(
14221422
risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
14231423
risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
14241424
};
1425-
if engine == Engine::Iceberg && handler_args.with_options.get_connector().is_some() {
1426-
// HACK: since we don't have atomic DDL, table with connector may lose data.
1427-
// FIXME: remove this after https://github.com/risingwavelabs/risingwave/issues/21863
1428-
if let Some(_rate_limit) = handler_args.with_options.insert(
1429-
OverwriteOptions::SOURCE_RATE_LIMIT_KEY.to_owned(),
1430-
"0".to_owned(),
1431-
) {
1432-
// prevent user specified rate limit
1433-
return Err(ErrorCode::NotSupported(
1434-
"source_rate_limit for iceberg table engine during table creation".to_owned(),
1435-
"Please remove source_rate_limit from WITH options.".to_owned(),
1436-
)
1437-
.into());
1438-
}
1439-
}
14401425

14411426
if let Either::Right(resp) = session.check_relation_name_duplicated(
14421427
table_name.clone(),
@@ -2174,8 +2159,6 @@ pub async fn create_iceberg_engine_table(
21742159
)
21752160
.await?;
21762161

2177-
let has_connector = source.is_some();
2178-
21792162
// before we create the table, ensure the JVM is initialized as we use jdbc catalog right now.
21802163
// If JVM isn't initialized successfully, current not atomic ddl will result in a partially created iceberg engine table.
21812164
let _ = Jvm::get_or_init()?;
@@ -2211,17 +2194,6 @@ pub async fn create_iceberg_engine_table(
22112194
res?
22122195
}
22132196

2214-
// TODO: remove it together with rate limit rewrite after we support atomic DDL in meta side.
2215-
if has_connector {
2216-
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
2217-
handler_args,
2218-
PbThrottleTarget::TableWithSource,
2219-
table_name,
2220-
-1,
2221-
)
2222-
.await?;
2223-
}
2224-
22252197
Ok(())
22262198
}
22272199

src/frontend/src/handler/drop_sink.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use pgwire::pg_response::{PgResponse, StatementType};
16+
use risingwave_common::catalog::ICEBERG_SINK_PREFIX;
1617
use risingwave_sqlparser::ast::ObjectName;
1718

1819
use super::RwPgResponse;
@@ -56,6 +57,14 @@ pub async fn handle_drop_sink(
5657
sink
5758
};
5859

60+
if sink_name.starts_with(ICEBERG_SINK_PREFIX) {
61+
return Err(crate::error::ErrorCode::NotSupported(
62+
"Dropping Iceberg sinks is not supported".to_owned(),
63+
"Please use DROP TABLE command.".to_owned(),
64+
)
65+
.into());
66+
}
67+
5968
let sink_id = sink.id;
6069

6170
let catalog_writer = session.catalog_writer()?;

src/frontend/src/handler/drop_source.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use pgwire::pg_response::{PgResponse, StatementType};
16+
use risingwave_common::catalog::ICEBERG_SOURCE_PREFIX;
1617
use risingwave_sqlparser::ast::ObjectName;
1718

1819
use super::RwPgResponse;
@@ -68,6 +69,14 @@ pub async fn handle_drop_source(
6869
}
6970
};
7071

72+
if source_name.starts_with(ICEBERG_SOURCE_PREFIX) {
73+
return Err(crate::error::ErrorCode::NotSupported(
74+
"Dropping Iceberg sources is not supported".to_owned(),
75+
"Please use DROP TABLE command.".to_owned(),
76+
)
77+
.into());
78+
}
79+
7180
session.check_privilege_for_drop_alter(schema_name, &*source)?;
7281

7382
let catalog_writer = session.catalog_writer()?;

0 commit comments

Comments
 (0)