Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
682af22
feat(refresh): implement coordinated refresh progress tracking
tabVersion Nov 11, 2025
837a81d
refactor(refresh): enhance query retry logic and logging
Nov 12, 2025
91bb08f
refactor(refresh): remove deprecated refresh state handling
Nov 12, 2025
d06a319
feat(refresh): add ListRefreshTableStates RPC and related structures
Nov 12, 2025
7d71cd9
feat(refresh): introduce source refresh mode and update related struc…
Nov 12, 2025
07bf149
Merge remote-tracking branch 'origin' into tab/refactor-tracker-2
Nov 12, 2025
fcd5247
fix
Nov 12, 2025
94a6a81
fix slt
Nov 12, 2025
b2e223c
fix proto breaking change
Nov 12, 2025
1122be1
remove deprecated refresh_state field from catalog manager
Nov 12, 2025
8910e5d
longer waiting time
Nov 13, 2025
193e047
Merge branch 'main' into tab/refactor-tracker-2
tabVersion Nov 13, 2025
11eba47
Merge remote-tracking branch 'origin' into tab/refactor-tracker-2
Nov 17, 2025
d13841e
chore(ci): increase timeout for pull request workflow from 25 to 30 m…
Nov 17, 2025
d1324e8
feat: support setting `refresh_interval_sec` (#23780)
tabVersion Nov 18, 2025
5082350
Merge remote-tracking branch 'origin' into tab/refactor-tracker-2
Nov 18, 2025
85d8183
feat(refresh): implement refresh manager metrics and dashboard integr…
Nov 18, 2025
fd91964
fix metrics
Nov 18, 2025
3eb063a
fix: update refresh mode terminology from 'FULL_RECOMPUTE' to 'FULL_R…
Nov 18, 2025
a93e705
Merge remote-tracking branch 'origin' into tab/refactor-tracker-2
Nov 18, 2025
e4ed8f7
fix
Nov 18, 2025
419d8ce
fix ci
Nov 18, 2025
f953f62
Merge branch 'main' into tab/refactor-tracker-2
tabVersion Nov 23, 2025
31b7b1d
logging
Nov 24, 2025
80ba362
Merge remote-tracking branch 'origin' into tab/refactor-tracker-2
Nov 24, 2025
0f8b12c
ci settings
Nov 24, 2025
c6b88c4
fix: update stream refresh scheduler interval in CI configuration
Nov 24, 2025
22780c7
Merge remote-tracking branch 'origin' into tab/refactor-tracker-2
Nov 24, 2025
3d884da
fix(iceberg): add retry logic to query in refresh_iceberg_table.slt
Nov 24, 2025
b8026e6
Merge remote-tracking branch 'origin' into tab/refactor-tracker-2
Nov 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ steps:
- docker-compose#v5.5.0: *docker-compose
# Only upload zipped files, otherwise the logs are too large.
- ./ci/plugins/upload-failure-logs-zipped
timeout_in_minutes: 25
timeout_in_minutes: 30
retry: *auto-retry

- label: "end-to-end test (deterministic simulation)"
Expand Down
1 change: 0 additions & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
# Exercises `refresh_interval_sec` validation and the periodic FULL_RELOAD
# refresh path for iceberg batch tables.
control substitution on

# Fixtures: secret + storage-catalog iceberg connection used by every case below.
statement ok
create secret IF NOT EXISTS my_secret with (
backend = 'meta'
) as 'hummockadmin';

statement ok
create connection IF NOT EXISTS my_conn
with (
type = 'iceberg',
warehouse.path = 's3://hummock001/iceberg_connection',
s3.access.key = secret my_secret,
s3.secret.key = secret my_secret,
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-west-2',
catalog.type = 'storage',
);

statement ok
set iceberg_engine_connection = 'public.my_conn';

# Source table on the iceberg engine; seeded with ids 1-5, then id 4 is
# deleted, so refreshed readers should observe ids {1, 2, 3, 5}.
statement ok
create table t_refresh (id int primary key, name varchar, foo varchar) with (commit_checkpoint_interval = 10) engine = iceberg;

statement ok
insert into t_refresh values (1, 'xxx', 'a') , (2, 'yyy', 'b'), (3, 'zzz', 'c'), (4, 'www', 'd'), (5, 'zzz', 'c');

statement ok
flush;

statement ok
delete from t_refresh where id = 4;

statement ok
flush;


# Invalid: non-numeric `refresh_interval_sec` must be rejected at creation.
statement error
create table iceberg_batch_table ( primary key (id) ) with (
connector = 'iceberg',
warehouse.path = 's3://hummock001/iceberg_connection',
s3.access.key = secret my_secret,
s3.secret.key = secret my_secret,
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-west-2',
catalog.type = 'storage',
table.name = 't_refresh',
database.name = 'public',
refresh_interval_sec = 'abcdef'
);
----
db error: ERROR: Failed to run the query

Caused by:
Invalid Parameter Value: `refresh_interval_sec` must be a positive integer and larger than 0, but got: abcdef (error: invalid digit found in string)


# Invalid: negative `refresh_interval_sec` must be rejected.
statement error
create table iceberg_batch_table ( primary key (id) ) with (
connector = 'iceberg',
warehouse.path = 's3://hummock001/iceberg_connection',
s3.access.key = secret my_secret,
s3.secret.key = secret my_secret,
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-west-2',
catalog.type = 'storage',
table.name = 't_refresh',
database.name = 'public',
refresh_interval_sec = '-10'
);
----
db error: ERROR: Failed to run the query

Caused by:
Invalid Parameter Value: `refresh_interval_sec` must be larger than 0, but got: -10


# Invalid: `refresh_interval_sec` is only meaningful with
# refresh_mode = 'FULL_RELOAD'; without it the option must be rejected.
statement error
create table iceberg_batch_table ( primary key (id) ) with (
connector = 'iceberg',
warehouse.path = 's3://hummock001/iceberg_connection',
s3.access.key = secret my_secret,
s3.secret.key = secret my_secret,
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-west-2',
catalog.type = 'storage',
table.name = 't_refresh',
database.name = 'public',
refresh_interval_sec = '15'
);
----
db error: ERROR: Failed to run the query

Caused by:
Invalid Parameter Value: `refresh_interval_sec` is not allowed when `refresh_mode` is not 'FULL_RELOAD'


# Valid: FULL_RELOAD batch table that auto-refreshes every 5 seconds.
statement ok
create table iceberg_batch_table ( primary key (id) ) with (
connector = 'iceberg',
warehouse.path = 's3://hummock001/iceberg_connection',
s3.access.key = secret my_secret,
s3.secret.key = secret my_secret,
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-west-2',
catalog.type = 'storage',
table.name = 't_refresh',
database.name = 'public',
refresh_mode = 'FULL_RELOAD',
refresh_interval_sec = '5'
);

# The refresh tracker should report the table idle with the configured interval.
query ?
select current_status, trigger_interval_secs from rw_refresh_table_state;
----
IDLE 5


# wait for refresh
sleep 20s


query ? retry 3 backoff 1s
select id from t_refresh order by id;
----
1
2
3
5


# After at least one scheduled refresh, the batch table should match the source.
query ? retry 3 backoff 1s
select id from iceberg_batch_table order by id;
----
1
2
3
5


# Append ids 6-8 and verify the next scheduled refresh picks them up.
statement ok
insert into t_refresh values (6, 'aaa', 'a') , (7, 'bbb', 'b') , (8, 'ccc', 'c');

statement ok
flush;

sleep 20s

query ? retry 3 backoff 1s
select id from t_refresh order by id;
----
1
2
3
5
6
7
8


query ? retry 3 backoff 1s
select id from iceberg_batch_table order by id;
----
1
2
3
5
6
7
8


# Cleanup

statement ok
drop table iceberg_batch_table;

statement ok
drop table t_refresh;

statement ok
drop connection my_conn;

statement ok
drop secret my_secret;
29 changes: 24 additions & 5 deletions e2e_test/iceberg/test_case/pure_slt/refresh_iceberg_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ flush;

sleep 10s

query ?
query ? retry 5 backoff 1s
select count(*) = 1 from rw_iceberg_files where source_name = '__iceberg_source_t_refresh' and content = 'PositionDeletes';
----
t
Expand Down Expand Up @@ -68,13 +68,25 @@ create table iceberg_batch_table ( primary key (id) ) with (
catalog.type = 'storage',
table.name = 't_refresh',
database.name = 'public',
refresh_mode = 'FULL_RECOMPUTE'
refresh_mode = 'FULL_RELOAD'
);


query ?
select current_status from rw_refresh_table_state;
----
IDLE


statement ok
refresh table iceberg_batch_table;

query ?
select current_status from rw_refresh_table_state;
----
REFRESHING


sleep 10s

query I
Expand Down Expand Up @@ -112,16 +124,23 @@ t
statement ok
refresh table iceberg_batch_table;

sleep 10s
sleep 15s

query I
query I retry 3 backoff 1s
select id from t_refresh order by id;
----
1
2


query I
# make sure the refresh finished
query ?
select current_status from rw_refresh_table_state;
----
IDLE


query I retry 3 backoff 1s
select id from iceberg_batch_table order by id;
----
1
Expand Down
1 change: 1 addition & 0 deletions grafana/dashboard/dev/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def section(func: Callable[[Panels], list]):
from . import kinesis_metrics as _
from . import system_params as _
from . import vector_search as _
from . import refresh_manager as _

def generate_panels(panels: Panels):
return [x for s in sections for x in s(panels)]
53 changes: 53 additions & 0 deletions grafana/dashboard/dev/refresh_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from ..common import *
from . import section


@section
def _(outer_panels: Panels):
    # Grafana panels for the meta-node refresh manager, grouped under a
    # collapsed "Refresh Manager" row: job durations, finish counts, and
    # cron-trigger / cron-miss rates.
    panels = outer_panels.sub_panel()
    return [
        outer_panels.row_collapsed(
            "Refresh Manager",
            [
                panels.timeseries_latency(
                    "Refresh Job Duration",
                    "Time taken to complete a refresh job",
                    [
                        panels.target(
                            f"{metric('meta_refresh_job_duration')}",
                            "{{table_id}} - {{status}}",
                        )
                    ],
                ),
                panels.timeseries_count(
                    "Refresh Job Finish Rate",
                    "Number of finished refresh jobs",
                    [
                        panels.target(
                            f"{metric('meta_refresh_job_finish_cnt')}",
                            "{{status}}",
                        )
                    ],
                ),
                panels.timeseries_count(
                    "Refresh Cron Job Triggers",
                    "Number of cron refresh jobs triggered",
                    [
                        panels.target(
                            f"rate({metric('meta_refresh_cron_job_trigger_cnt')}[$__rate_interval])",
                            "{{table_id}}",
                        )
                    ],
                ),
                panels.timeseries_count(
                    "Refresh Cron Job Misses",
                    "Number of skipped cron refresh jobs",
                    [
                        panels.target(
                            f"rate({metric('meta_refresh_cron_job_miss_cnt')}[$__rate_interval])",
                            # Must be a plain string, not an f-string: an f-string
                            # would collapse "{{table_id}}" to "{table_id}" and
                            # break the Grafana legend template.
                            "{{table_id}}",
                        )
                    ],
                ),
            ],
        )
    ]
1 change: 0 additions & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,8 @@ message Table {

optional CdcTableType cdc_table_type = 47;

// Current refresh state of the table for refreshable tables
optional RefreshState refresh_state = 48;
reserved "refresh_state";
reserved 48;

// Per-table catalog version, used by schema change. `None` for internal
// tables and tests. Not to be confused with the global catalog version for
Expand Down
13 changes: 13 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,18 @@ message ListCdcProgressResponse {
map<uint32, CdcProgress> cdc_progress = 1;
}

message ListRefreshTableStatesRequest {}

message ListRefreshTableStatesResponse {
message RefreshTableState {
uint32 table_id = 1;
string current_status = 2;
optional string last_trigger_time = 3;
optional int64 trigger_interval_secs = 4;
}
repeated RefreshTableState states = 1;
}

service StreamManagerService {
rpc Flush(FlushRequest) returns (FlushResponse);
rpc Pause(PauseRequest) returns (PauseResponse);
Expand All @@ -429,6 +441,7 @@ service StreamManagerService {
rpc SetSyncLogStoreAligned(SetSyncLogStoreAlignedRequest) returns (SetSyncLogStoreAlignedResponse);
rpc ListCdcProgress(ListCdcProgressRequest) returns (ListCdcProgressResponse);
rpc ListUnmigratedTables(ListUnmigratedTablesRequest) returns (ListUnmigratedTablesResponse);
rpc ListRefreshTableStates(ListRefreshTableStatesRequest) returns (ListRefreshTableStatesResponse);
}

// Below for cluster service.
Expand Down
6 changes: 4 additions & 2 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,12 @@

message SourceRefreshMode {
message SourceRefreshModeStreaming {}
message SourceRefreshModeFullRecompute {}
message SourceRefreshModeFullReload {
optional int64 refresh_interval_sec = 1;
}

oneof refresh_mode {
SourceRefreshModeStreaming streaming = 1;
SourceRefreshModeFullRecompute full_recompute = 2;
SourceRefreshModeFullReload full_reload = 2;

Check failure on line 316 in proto/plan_common.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" on message "SourceRefreshMode" changed name from "full_recompute" to "full_reload".

Check failure on line 316 in proto/plan_common.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" with name "full_reload" on message "SourceRefreshMode" changed type from "plan_common.SourceRefreshMode.SourceRefreshModeFullRecompute" to "plan_common.SourceRefreshMode.SourceRefreshModeFullReload".

Check failure on line 316 in proto/plan_common.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" with name "full_reload" on message "SourceRefreshMode" changed option "json_name" from "fullRecompute" to "fullReload".
}
}
1 change: 1 addition & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ profile:
- use: frontend

ci-iceberg-test:
config-path: src/config/ci.toml
steps:
- use: minio
- use: mysql
Expand Down
Loading
Loading