Skip to content

Commit a8bd2f3

Browse files
authored
feat(iceberg): Introduce FileWithDeletes Compaction (#23660)
1 parent 063c21d commit a8bd2f3

File tree

17 files changed

+870
-177
lines changed

17 files changed

+870
-177
lines changed

Cargo.lock

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,14 +168,14 @@ hashbrown0_14 = { package = "hashbrown", version = "0.14", features = [
168168
] }
169169
hytra = "0.1"
170170
# branch dev_rebase_main_20250325
171-
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "0b5d05b20e8a5277a47585141082a7a638238ae1", features = [
171+
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "1d79e119ffb30bda0fafcb3114a17c77f46dc6fd", features = [
172172
"storage-s3",
173173
"storage-gcs",
174174
"storage-azblob",
175175
"storage-azdls",
176176
] }
177-
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "0b5d05b20e8a5277a47585141082a7a638238ae1" }
178-
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "0b5d05b20e8a5277a47585141082a7a638238ae1" }
177+
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "1d79e119ffb30bda0fafcb3114a17c77f46dc6fd" }
178+
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "1d79e119ffb30bda0fafcb3114a17c77f46dc6fd" }
179179

180180
indexmap = { version = "2.12.0", features = ["serde"] }
181181
itertools = "0.14.0"
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
# Test cases for iceberg sink compaction_type validation
2+
# This file tests validation logic for compaction_type parameter
3+
4+
statement ok
5+
DROP TABLE IF EXISTS t_validation;
6+
7+
statement ok
8+
CREATE TABLE t_validation (
9+
id bigint primary key,
10+
data varchar
11+
);
12+
13+
# Test case 1: Error - unknown compaction_type
14+
statement error Unknown compaction type 'invalid-type'
15+
CREATE SINK s_unknown AS SELECT * FROM t_validation WITH (
16+
connector = 'iceberg',
17+
type = 'upsert',
18+
primary_key = 'id',
19+
database.name = 'demo_db',
20+
table.name = 'test_unknown',
21+
catalog.name = 'demo',
22+
catalog.type = 'storage',
23+
warehouse.path = 's3a://icebergdata/demo',
24+
s3.endpoint = 'http://127.0.0.1:9301',
25+
s3.region = 'us-east-1',
26+
s3.access.key = 'hummockadmin',
27+
s3.secret.key = 'hummockadmin',
28+
create_table_if_not_exists = 'true',
29+
compaction.type = 'invalid_type'
30+
);
31+
32+
# Test case 2: Error - COW mode with non-full compaction_type (small_files)
33+
statement error Copy-on-write mode only supports 'full' compaction type
34+
CREATE SINK s_cow_small_files AS SELECT * FROM t_validation WITH (
35+
connector = 'iceberg',
36+
type = 'upsert',
37+
primary_key = 'id',
38+
database.name = 'demo_db',
39+
table.name = 'test_cow_small',
40+
catalog.name = 'demo',
41+
catalog.type = 'storage',
42+
warehouse.path = 's3a://icebergdata/demo',
43+
s3.endpoint = 'http://127.0.0.1:9301',
44+
s3.region = 'us-east-1',
45+
s3.access.key = 'hummockadmin',
46+
s3.secret.key = 'hummockadmin',
47+
create_table_if_not_exists = 'true',
48+
write_mode = 'copy-on-write',
49+
compaction.type = 'small_files'
50+
);
51+
52+
# Test case 3: Error - COW mode with files_with_delete
53+
statement error Copy-on-write mode only supports 'full' compaction type
54+
CREATE SINK s_cow_files_delete AS SELECT * FROM t_validation WITH (
55+
connector = 'iceberg',
56+
type = 'upsert',
57+
primary_key = 'id',
58+
database.name = 'demo_db',
59+
table.name = 'test_cow_delete',
60+
catalog.name = 'demo',
61+
catalog.type = 'storage',
62+
warehouse.path = 's3a://icebergdata/demo',
63+
s3.endpoint = 'http://127.0.0.1:9301',
64+
s3.region = 'us-east-1',
65+
s3.access.key = 'hummockadmin',
66+
s3.secret.key = 'hummockadmin',
67+
create_table_if_not_exists = 'true',
68+
write_mode = 'copy-on-write',
69+
compaction.type = 'files_with_delete'
70+
);
71+
72+
# Test case 4: Error - small_files with conflicting parameter delete_files_count_threshold
73+
statement error `compaction.delete_files_count_threshold` is not supported for 'small-files' compaction type
74+
CREATE SINK s_small_files_conflict AS SELECT * FROM t_validation WITH (
75+
connector = 'iceberg',
76+
type = 'upsert',
77+
primary_key = 'id',
78+
database.name = 'demo_db',
79+
table.name = 'test_small_conflict',
80+
catalog.name = 'demo',
81+
catalog.type = 'storage',
82+
warehouse.path = 's3a://icebergdata/demo',
83+
s3.endpoint = 'http://127.0.0.1:9301',
84+
s3.region = 'us-east-1',
85+
s3.access.key = 'hummockadmin',
86+
s3.secret.key = 'hummockadmin',
87+
create_table_if_not_exists = 'true',
88+
write_mode = 'merge-on-read',
89+
compaction.type = 'small_files',
90+
compaction.delete_files_count_threshold = '5'
91+
);
92+
93+
# Test case 5: Error - files_with_delete with conflicting parameter small_files_threshold_mb
94+
statement error `compaction.small_files_threshold_mb` must not be set for 'files-with-delete' compaction type
95+
CREATE SINK s_delete_files_conflict AS SELECT * FROM t_validation WITH (
96+
connector = 'iceberg',
97+
type = 'upsert',
98+
primary_key = 'id',
99+
database.name = 'demo_db',
100+
table.name = 'test_delete_conflict',
101+
catalog.name = 'demo',
102+
catalog.type = 'storage',
103+
warehouse.path = 's3a://icebergdata/demo',
104+
s3.endpoint = 'http://127.0.0.1:9301',
105+
s3.region = 'us-east-1',
106+
s3.access.key = 'hummockadmin',
107+
s3.secret.key = 'hummockadmin',
108+
create_table_if_not_exists = 'true',
109+
write_mode = 'merge-on-read',
110+
compaction.type = 'files_with_delete',
111+
compaction.small_files_threshold_mb = '128'
112+
);
113+
114+
# Clean up
115+
statement ok
116+
DROP TABLE t_validation;

proto/iceberg_compaction.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ message IcebergCompactionTask {
3939

4040
// Small data file compaction task.
4141
SMALL_FILES = 2;
42+
43+
FILES_WITH_DELETE = 3;
4244
}
4345

4446
TaskType task_type = 3;

src/common/src/config/storage.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,6 @@ pub struct StorageConfig {
196196
pub time_travel_version_cache_capacity: u64,
197197

198198
// iceberg compaction
199-
#[serde(default = "default::storage::iceberg_compaction_target_file_size_mb")]
200-
pub iceberg_compaction_target_file_size_mb: u32,
201199
#[serde(default = "default::storage::iceberg_compaction_enable_validate")]
202200
pub iceberg_compaction_enable_validate: bool,
203201
#[serde(default = "default::storage::iceberg_compaction_max_record_batch_rows")]
@@ -224,10 +222,6 @@ pub struct StorageConfig {
224222
/// The smoothing factor for size estimation in iceberg compaction.(default: 0.3)
225223
#[serde(default = "default::storage::iceberg_compaction_size_estimation_smoothing_factor")]
226224
pub iceberg_compaction_size_estimation_smoothing_factor: f64,
227-
// For Small File Compaction
228-
/// The threshold for small file compaction in MB.
229-
#[serde(default = "default::storage::iceberg_compaction_small_file_threshold_mb")]
230-
pub iceberg_compaction_small_file_threshold_mb: u32,
231225
/// Multiplier for pending waiting parallelism budget for iceberg compaction task queue.
232226
/// Effective pending budget = `ceil(max_task_parallelism * multiplier)`. Default 4.0.
233227
/// Set < 1.0 to reduce buffering (may increase `PullTask` RPC frequency); set higher to batch more tasks.
@@ -1033,10 +1027,6 @@ pub mod default {
10331027
10
10341028
}
10351029

1036-
pub fn iceberg_compaction_target_file_size_mb() -> u32 {
1037-
1024
1038-
}
1039-
10401030
pub fn iceberg_compaction_enable_validate() -> bool {
10411031
false
10421032
}
@@ -1077,10 +1067,6 @@ pub mod default {
10771067
0.3
10781068
}
10791069

1080-
pub fn iceberg_compaction_small_file_threshold_mb() -> u32 {
1081-
32
1082-
}
1083-
10841070
pub fn iceberg_compaction_pending_parallelism_budget_multiplier() -> f32 {
10851071
4.0
10861072
}

src/config/docs.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,9 +171,7 @@ This page is automatically generated by `./risedev generate-example-config`
171171
| iceberg_compaction_min_size_per_partition_mb | | 1024 |
172172
| iceberg_compaction_pending_parallelism_budget_multiplier | Multiplier for pending waiting parallelism budget for iceberg compaction task queue. Effective pending budget = `ceil(max_task_parallelism * multiplier)`. Default 4.0. Set < 1.0 to reduce buffering (may increase `PullTask` RPC frequency); set higher to batch more tasks. | 4.0 |
173173
| iceberg_compaction_size_estimation_smoothing_factor | The smoothing factor for size estimation in iceberg compaction.(default: 0.3) | 0.3 |
174-
| iceberg_compaction_small_file_threshold_mb | The threshold for small file compaction in MB. | 32 |
175174
| iceberg_compaction_target_binpack_group_size_mb | | 102400 |
176-
| iceberg_compaction_target_file_size_mb | | 1024 |
177175
| iceberg_compaction_task_parallelism_ratio | The ratio of iceberg compaction max parallelism to the number of CPU cores | 4.0 |
178176
| iceberg_compaction_write_parquet_max_row_group_rows | | 102400 |
179177
| imm_merge_threshold | The threshold for the number of immutable memtables to merge to a new imm. | 0 |

src/config/example.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,6 @@ compactor_max_overlap_sst_count = 64
256256
compactor_max_preload_meta_file_count = 32
257257
vector_file_block_size_kb = 1024
258258
time_travel_version_cache_capacity = 10
259-
iceberg_compaction_target_file_size_mb = 1024
260259
iceberg_compaction_enable_validate = false
261260
iceberg_compaction_max_record_batch_rows = 1024
262261
iceberg_compaction_min_size_per_partition_mb = 1024
@@ -267,7 +266,6 @@ iceberg_compaction_enable_heuristic_output_parallelism = false
267266
iceberg_compaction_max_concurrent_closes = 8
268267
iceberg_compaction_enable_dynamic_size_estimation = true
269268
iceberg_compaction_size_estimation_smoothing_factor = 0.3
270-
iceberg_compaction_small_file_threshold_mb = 32
271269
iceberg_compaction_pending_parallelism_budget_multiplier = 4.0
272270
iceberg_compaction_target_binpack_group_size_mb = 102400
273271

src/connector/src/allow_alter_on_fly_fields.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,12 @@ pub static SINK_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<Stri
178178
"snapshot_expiration_retain_last".to_owned(),
179179
"snapshot_expiration_clear_expired_files".to_owned(),
180180
"snapshot_expiration_clear_expired_meta_data".to_owned(),
181-
"max_snapshots_num_before_compaction".to_owned(),
181+
"compaction.max_snapshots_num".to_owned(),
182+
"compaction.small_files_threshold_mb".to_owned(),
183+
"compaction.delete_files_count_threshold".to_owned(),
184+
"compaction.trigger_snapshot_count".to_owned(),
185+
"compaction.target_file_size_mb".to_owned(),
186+
"compaction.type".to_owned(),
182187
].into_iter().collect(),
183188
).unwrap();
184189
// KafkaConfig

0 commit comments

Comments
 (0)