Skip to content

Commit a7af93d

Browse files
tabVersiontab
andcommitted
feat: add periodic refresh table jobs & refactor ProgressTracker (#23737)
Co-authored-by: tab <[email protected]>
1 parent 83dfa89 commit a7af93d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+1260
-465
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ci/workflows/pull-request.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,7 @@ steps:
522522
- docker-compose#v5.5.0: *docker-compose
523523
# Only upload zipped files; otherwise the logs are too large.
524524
- ./ci/plugins/upload-failure-logs-zipped
525-
timeout_in_minutes: 25
525+
timeout_in_minutes: 30
526526
retry: *auto-retry
527527

528528
- label: "end-to-end test (deterministic simulation)"

docker/dashboards/risingwave-dev-dashboard.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
control substitution on
2+
3+
statement ok
4+
create secret IF NOT EXISTS my_secret with (
5+
backend = 'meta'
6+
) as 'hummockadmin';
7+
8+
statement ok
9+
create connection IF NOT EXISTS my_conn
10+
with (
11+
type = 'iceberg',
12+
warehouse.path = 's3://hummock001/iceberg_connection',
13+
s3.access.key = secret my_secret,
14+
s3.secret.key = secret my_secret,
15+
s3.endpoint = 'http://127.0.0.1:9301',
16+
s3.region = 'us-west-2',
17+
catalog.type = 'storage',
18+
);
19+
20+
statement ok
21+
set iceberg_engine_connection = 'public.my_conn';
22+
23+
statement ok
24+
create table t_refresh (id int primary key, name varchar, foo varchar) with (commit_checkpoint_interval = 10) engine = iceberg;
25+
26+
statement ok
27+
insert into t_refresh values (1, 'xxx', 'a') , (2, 'yyy', 'b'), (3, 'zzz', 'c'), (4, 'www', 'd'), (5, 'zzz', 'c');
28+
29+
statement ok
30+
flush;
31+
32+
statement ok
33+
delete from t_refresh where id = 4;
34+
35+
statement ok
36+
flush;
37+
38+
39+
# Negative case: a non-numeric `refresh_interval_sec` ('abcdef') must be rejected at CREATE TABLE time.
statement error
40+
create table iceberg_batch_table ( primary key (id) ) with (
41+
connector = 'iceberg',
42+
warehouse.path = 's3://hummock001/iceberg_connection',
43+
s3.access.key = secret my_secret,
44+
s3.secret.key = secret my_secret,
45+
s3.endpoint = 'http://127.0.0.1:9301',
46+
s3.region = 'us-west-2',
47+
catalog.type = 'storage',
48+
table.name = 't_refresh',
49+
database.name = 'public',
50+
refresh_interval_sec = 'abcdef'
51+
);
52+
----
53+
db error: ERROR: Failed to run the query
54+
55+
Caused by:
56+
Invalid Parameter Value: `refresh_interval_sec` must be a positive integer and larger than 0, but got: abcdef (error: invalid digit found in string)
57+
58+
59+
# Negative case: a negative `refresh_interval_sec` ('-10') parses as a number but must still be rejected.
statement error
60+
create table iceberg_batch_table ( primary key (id) ) with (
61+
connector = 'iceberg',
62+
warehouse.path = 's3://hummock001/iceberg_connection',
63+
s3.access.key = secret my_secret,
64+
s3.secret.key = secret my_secret,
65+
s3.endpoint = 'http://127.0.0.1:9301',
66+
s3.region = 'us-west-2',
67+
catalog.type = 'storage',
68+
table.name = 't_refresh',
69+
database.name = 'public',
70+
refresh_interval_sec = '-10'
71+
);
72+
----
73+
db error: ERROR: Failed to run the query
74+
75+
Caused by:
76+
Invalid Parameter Value: `refresh_interval_sec` must be larger than 0, but got: -10
77+
78+
79+
# Negative case: a valid `refresh_interval_sec` ('15') is rejected when `refresh_mode = 'FULL_RELOAD'` is not specified.
statement error
80+
create table iceberg_batch_table ( primary key (id) ) with (
81+
connector = 'iceberg',
82+
warehouse.path = 's3://hummock001/iceberg_connection',
83+
s3.access.key = secret my_secret,
84+
s3.secret.key = secret my_secret,
85+
s3.endpoint = 'http://127.0.0.1:9301',
86+
s3.region = 'us-west-2',
87+
catalog.type = 'storage',
88+
table.name = 't_refresh',
89+
database.name = 'public',
90+
refresh_interval_sec = '15'
91+
);
92+
----
93+
db error: ERROR: Failed to run the query
94+
95+
Caused by:
96+
Invalid Parameter Value: `refresh_interval_sec` is not allowed when `refresh_mode` is not 'FULL_RELOAD'
97+
98+
99+
statement ok
100+
create table iceberg_batch_table ( primary key (id) ) with (
101+
connector = 'iceberg',
102+
warehouse.path = 's3://hummock001/iceberg_connection',
103+
s3.access.key = secret my_secret,
104+
s3.secret.key = secret my_secret,
105+
s3.endpoint = 'http://127.0.0.1:9301',
106+
s3.region = 'us-west-2',
107+
catalog.type = 'storage',
108+
table.name = 't_refresh',
109+
database.name = 'public',
110+
refresh_mode = 'FULL_RELOAD',
111+
refresh_interval_sec = '5'
112+
);
113+
114+
query ?
115+
select current_status, trigger_interval_secs from rw_refresh_table_state;
116+
----
117+
IDLE 5
118+
119+
120+
# wait for refresh
121+
sleep 20s
122+
123+
124+
query ? retry 3 backoff 1s
125+
select id from t_refresh order by id;
126+
----
127+
1
128+
2
129+
3
130+
5
131+
132+
133+
query ? retry 3 backoff 1s
134+
select id from iceberg_batch_table order by id;
135+
----
136+
1
137+
2
138+
3
139+
5
140+
141+
142+
statement ok
143+
insert into t_refresh values (6, 'aaa', 'a') , (7, 'bbb', 'b') , (8, 'ccc', 'c');
144+
145+
statement ok
146+
flush;
147+
148+
# wait for the next periodic refresh so that rows 6-8 inserted above show up in iceberg_batch_table
sleep 20s
149+
150+
query ? retry 3 backoff 1s
151+
select id from t_refresh order by id;
152+
----
153+
1
154+
2
155+
3
156+
5
157+
6
158+
7
159+
8
160+
161+
162+
query ? retry 3 backoff 1s
163+
select id from iceberg_batch_table order by id;
164+
----
165+
1
166+
2
167+
3
168+
5
169+
6
170+
7
171+
8
172+
173+
174+
# Cleanup
175+
176+
statement ok
177+
drop table iceberg_batch_table;
178+
179+
statement ok
180+
drop table t_refresh;
181+
182+
statement ok
183+
drop connection my_conn;
184+
185+
statement ok
186+
drop secret my_secret;

e2e_test/iceberg/test_case/pure_slt/refresh_iceberg_table.slt

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ flush;
3737

3838
sleep 10s
3939

40-
query ?
40+
query ? retry 5 backoff 1s
4141
select count(*) = 1 from rw_iceberg_files where source_name = '__iceberg_source_t_refresh' and content = 'PositionDeletes';
4242
----
4343
t
@@ -68,13 +68,25 @@ create table iceberg_batch_table ( primary key (id) ) with (
6868
catalog.type = 'storage',
6969
table.name = 't_refresh',
7070
database.name = 'public',
71-
refresh_mode = 'FULL_RECOMPUTE'
71+
refresh_mode = 'FULL_RELOAD'
7272
);
7373

7474

75+
query ?
76+
select current_status from rw_refresh_table_state;
77+
----
78+
IDLE
79+
80+
7581
statement ok
7682
refresh table iceberg_batch_table;
7783

84+
query ?
85+
select current_status from rw_refresh_table_state;
86+
----
87+
REFRESHING
88+
89+
7890
sleep 10s
7991

8092
query I
@@ -112,16 +124,23 @@ t
112124
statement ok
113125
refresh table iceberg_batch_table;
114126

115-
sleep 10s
127+
sleep 15s
116128

117-
query I
129+
query I retry 3 backoff 1s
118130
select id from t_refresh order by id;
119131
----
120132
1
121133
2
122134

123135

124-
query I
136+
# make sure the refresh finished
137+
query ?
138+
select current_status from rw_refresh_table_state;
139+
----
140+
IDLE
141+
142+
143+
query I retry 3 backoff 1s
125144
select id from iceberg_batch_table order by id;
126145
----
127146
1

grafana/dashboard/dev/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ def section(func: Callable[[Panels], list]):
3838
from . import kinesis_metrics as _
3939
from . import system_params as _
4040
from . import vector_search as _
41+
from . import refresh_manager as _
4142

4243
def generate_panels(panels: Panels):
4344
return [x for s in sections for x in s(panels)]
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
from ..common import *
2+
from . import section
3+
4+
@section
def _(outer_panels: Panels):
    """Build the collapsed "Refresh Manager" dashboard row.

    The row contains four panels, each backed by a single query target:
    refresh job duration, refresh job finish count, cron refresh triggers
    and cron refresh misses.

    Args:
        outer_panels: Panel factory for the enclosing dashboard; a sub-panel
            factory derived from it is used for the panels inside the row.

    Returns:
        A one-element list holding the collapsed row.
    """
    panels = outer_panels.sub_panel()
    return [
        outer_panels.row_collapsed(
            "Refresh Manager",
            [
                panels.timeseries_latency(
                    "Refresh Job Duration",
                    "Time taken to complete a refresh job",
                    [
                        panels.target(
                            f"{metric('meta_refresh_job_duration')}",
                            "{{table_id}} - {{status}}"
                        )
                    ]
                ),
                panels.timeseries_count(
                    "Refresh Job Finish Rate",
                    "Number of finished refresh jobs",
                    [
                        panels.target(
                            f"{metric('meta_refresh_job_finish_cnt')}",
                            "{{status}}"
                        )
                    ]
                ),
                panels.timeseries_count(
                    "Refresh Cron Job Triggers",
                    "Number of cron refresh jobs triggered",
                    [
                        panels.target(
                            f"rate({metric('meta_refresh_cron_job_trigger_cnt')}[$__rate_interval])",
                            "{{table_id}}"
                        )
                    ]
                ),
                panels.timeseries_count(
                    "Refresh Cron Job Misses",
                    "Number of skipped cron refresh jobs",
                    [
                        panels.target(
                            f"rate({metric('meta_refresh_cron_job_miss_cnt')}[$__rate_interval])",
                            # Plain string, not an f-string: inside an f-string
                            # `{{`/`}}` collapse to single braces, which would
                            # turn the legend into the literal "{table_id}"
                            # instead of the "{{table_id}}" template used by
                            # the other panels in this row.
                            "{{table_id}}"
                        )
                    ]
                ),
            ]
        )
    ]

grafana/risingwave-dev-dashboard.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

proto/catalog.proto

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -549,8 +549,8 @@ message Table {
549549

550550
optional CdcTableType cdc_table_type = 47;
551551

552-
// Current refresh state of the table for refreshable tables
553-
optional RefreshState refresh_state = 48;
552+
reserved "refresh_state";
553+
reserved 48;
554554

555555
// Per-table catalog version, used by schema change. `None` for internal
556556
// tables and tests. Not to be confused with the global catalog version for

proto/meta.proto

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,18 @@ message ListCdcProgressResponse {
406406
map<uint32, CdcProgress> cdc_progress = 1;
407407
}
408408

409+
message ListRefreshTableStatesRequest {}
410+
411+
message ListRefreshTableStatesResponse {
412+
message RefreshTableState {
413+
uint32 table_id = 1;
414+
string current_status = 2;
415+
optional string last_trigger_time = 3;
416+
optional int64 trigger_interval_secs = 4;
417+
}
418+
repeated RefreshTableState states = 1;
419+
}
420+
409421
service StreamManagerService {
410422
rpc Flush(FlushRequest) returns (FlushResponse);
411423
rpc Pause(PauseRequest) returns (PauseResponse);
@@ -429,6 +441,7 @@ service StreamManagerService {
429441
rpc SetSyncLogStoreAligned(SetSyncLogStoreAlignedRequest) returns (SetSyncLogStoreAlignedResponse);
430442
rpc ListCdcProgress(ListCdcProgressRequest) returns (ListCdcProgressResponse);
431443
rpc ListUnmigratedTables(ListUnmigratedTablesRequest) returns (ListUnmigratedTablesResponse);
444+
rpc ListRefreshTableStates(ListRefreshTableStatesRequest) returns (ListRefreshTableStatesResponse);
432445
}
433446

434447
// Below for cluster service.

0 commit comments

Comments
 (0)