Skip to content

Commit 682af22

Browse files
committed
feat(refresh): implement coordinated refresh progress tracking
- Introduced `RefreshProgressTracker` to manage progress across multiple actors during refresh operations, preventing race conditions.
- Updated data structures to track per-actor progress for the list and load phases.
- Added a new `RefreshProgress` protobuf message for communication.
- Enhanced `BarrierCompleteResult` to include refresh progress data.
- Integrated the tracker into `DatabaseCheckpointControl` and updated related components for compatibility.
- Added a migration for the new refresh job table and related functionality.

Next steps include integrating the tracker with barrier checkpoint control and updating RPC call sites to handle refresh progress.
1 parent 5ece5e6 commit 682af22

File tree

16 files changed

+622
-237
lines changed

16 files changed

+622
-237
lines changed

src/meta/model/migration/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -56,6 +56,7 @@ mod m20250916_120000_add_refresh_fields;
5656
mod m20251005_000000_fragment_splits;
5757
mod m20251016_220528_fragment_parallelism;
5858
mod m20251022_294610_source_refresh_mode;
59+
mod m20251030_120000_refresh_jobs;
5960
mod utils;
6061

6162
pub struct Migrator;
@@ -150,6 +151,7 @@ impl MigratorTrait for Migrator {
150151
Box::new(m20251005_000000_fragment_splits::Migration),
151152
Box::new(m20251016_220528_fragment_parallelism::Migration),
152153
Box::new(m20251022_294610_source_refresh_mode::Migration),
154+
Box::new(m20251030_120000_refresh_jobs::Migration),
153155
]
154156
}
155157
}
Lines changed: 72 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,72 @@
1+
use sea_orm_migration::prelude::*;
2+
3+
#[derive(DeriveMigrationName)]
4+
pub struct Migration;
5+
6+
#[async_trait::async_trait]
7+
impl MigrationTrait for Migration {
8+
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
9+
manager
10+
.create_table(
11+
sea_orm_migration::prelude::Table::create()
12+
.table(RefreshJob::Table)
13+
.col(
14+
ColumnDef::new(RefreshJob::TableId)
15+
.integer()
16+
.not_null()
17+
.primary_key(),
18+
)
19+
.col(
20+
ColumnDef::new(RefreshJob::JobCreateTime)
21+
.date_time()
22+
.not_null()
23+
.default(Expr::current_timestamp()),
24+
)
25+
.col(ColumnDef::new(RefreshJob::LastTriggerTime).date_time())
26+
.col(ColumnDef::new(RefreshJob::TriggerIntervalSecs).big_integer())
27+
.col(
28+
ColumnDef::new(RefreshJob::CurrentStatus)
29+
.string_len(16)
30+
.not_null()
31+
.default("IDLE"),
32+
)
33+
.foreign_key(
34+
ForeignKey::create()
35+
.name("fk_refresh_job_table")
36+
.from(RefreshJob::Table, RefreshJob::TableId)
37+
.to(Table::Table, Table::TableId)
38+
.on_delete(ForeignKeyAction::Cascade),
39+
)
40+
.to_owned(),
41+
)
42+
.await
43+
}
44+
45+
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
46+
manager
47+
.drop_table(
48+
sea_orm_migration::prelude::Table::drop()
49+
.table(RefreshJob::Table)
50+
.to_owned(),
51+
)
52+
.await
53+
}
54+
}
55+
56+
#[derive(DeriveIden)]
57+
enum RefreshJob {
58+
#[sea_orm(iden = "refresh_job")]
59+
Table,
60+
TableId,
61+
JobCreateTime,
62+
LastTriggerTime,
63+
TriggerIntervalSecs,
64+
CurrentStatus,
65+
}
66+
67+
#[derive(DeriveIden)]
68+
enum Table {
69+
#[sea_orm(iden = "table")]
70+
Table,
71+
TableId,
72+
}

src/meta/model/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -53,6 +53,7 @@ pub mod iceberg_tables;
5353
pub mod index;
5454
pub mod object;
5555
pub mod object_dependency;
56+
pub mod refresh_job;
5657
pub mod schema;
5758
pub mod secret;
5859
pub mod serde_seaql_migration;

src/meta/model/src/prelude.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,7 @@ pub use super::hummock_version_stats::Entity as HummockVersionStats;
3333
pub use super::index::Entity as Index;
3434
pub use super::object::Entity as Object;
3535
pub use super::object_dependency::Entity as ObjectDependency;
36+
pub use super::refresh_job::Entity as RefreshJob;
3637
pub use super::schema::Entity as Schema;
3738
pub use super::secret::Entity as Secret;
3839
pub use super::session_parameter::Entity as SessionParameter;

src/meta/model/src/refresh_job.rs

Lines changed: 68 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,68 @@
1+
// Copyright 2025 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use sea_orm::entity::prelude::*;
16+
use serde::{Deserialize, Serialize};
17+
18+
use crate::TableId;
19+
20+
#[derive(
21+
Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize, Copy, Hash,
22+
)]
23+
#[sea_orm(rs_type = "String", db_type = "string(None)")]
24+
pub enum RefreshJobStatus {
25+
#[sea_orm(string_value = "IDLE")]
26+
Idle,
27+
#[sea_orm(string_value = "RUNNING")]
28+
Running,
29+
#[sea_orm(string_value = "DISABLED")]
30+
Disabled,
31+
}
32+
33+
impl RefreshJobStatus {
34+
pub fn is_active(self) -> bool {
35+
matches!(self, RefreshJobStatus::Idle | RefreshJobStatus::Running)
36+
}
37+
}
38+
39+
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
40+
#[sea_orm(table_name = "refresh_job")]
41+
pub struct Model {
42+
#[sea_orm(primary_key, auto_increment = false)]
43+
pub table_id: TableId,
44+
pub job_create_time: DateTime,
45+
pub last_trigger_time: Option<DateTime>,
46+
pub trigger_interval_secs: Option<i64>,
47+
pub current_status: RefreshJobStatus,
48+
}
49+
50+
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
51+
pub enum Relation {
52+
#[sea_orm(
53+
belongs_to = "super::table::Entity",
54+
from = "Column::TableId",
55+
to = "super::table::Column::TableId",
56+
on_update = "Cascade",
57+
on_delete = "Cascade"
58+
)]
59+
Table,
60+
}
61+
62+
impl Related<super::table::Entity> for Entity {
63+
fn to() -> RelationDef {
64+
Relation::Table.def()
65+
}
66+
}
67+
68+
impl ActiveModelBehavior for ActiveModel {}

src/meta/node/src/server.rs

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,7 @@ use risingwave_meta::manager::{META_NODE_ID, MetadataManager};
3535
use risingwave_meta::rpc::ElectionClientRef;
3636
use risingwave_meta::rpc::election::dummy::DummyElectionClient;
3737
use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer;
38-
use risingwave_meta::stream::ScaleController;
38+
use risingwave_meta::stream::{GlobalRefreshManager, ScaleController};
3939
use risingwave_meta_service::AddressInfo;
4040
use risingwave_meta_service::backup_service::BackupServiceImpl;
4141
use risingwave_meta_service::cloud_service::CloudServiceImpl;
@@ -506,6 +506,14 @@ pub async fn start_service_as_election_leader(
506506
env.opts.iceberg_gc_interval_sec,
507507
));
508508

509+
let (refresh_manager, refresh_handle, refresh_shutdown) = GlobalRefreshManager::start(
510+
metadata_manager.clone(),
511+
barrier_scheduler.clone(),
512+
env.shared_actor_infos().clone(),
513+
)
514+
.await?;
515+
sub_tasks.push((refresh_handle, refresh_shutdown));
516+
509517
let scale_controller = Arc::new(ScaleController::new(
510518
&metadata_manager,
511519
source_manager.clone(),
@@ -521,6 +529,7 @@ pub async fn start_service_as_election_leader(
521529
sink_manager.clone(),
522530
scale_controller.clone(),
523531
barrier_scheduler.clone(),
532+
refresh_manager.clone(),
524533
)
525534
.await;
526535
tracing::info!("GlobalBarrierManager started");
@@ -579,6 +588,7 @@ pub async fn start_service_as_election_leader(
579588
barrier_manager.clone(),
580589
stream_manager.clone(),
581590
metadata_manager.clone(),
591+
refresh_manager.clone(),
582592
);
583593
let sink_coordination_srv = SinkCoordinationServiceImpl::new(sink_manager);
584594
let hummock_srv = HummockServiceImpl::new(

src/meta/service/src/stream_service.rs

Lines changed: 8 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@ use risingwave_meta::controller::fragment::StreamingJobInfo;
2525
use risingwave_meta::controller::utils::FragmentDesc;
2626
use risingwave_meta::manager::MetadataManager;
2727
use risingwave_meta::model::ActorId;
28-
use risingwave_meta::stream::{SourceManagerRunningInfo, ThrottleConfig};
28+
use risingwave_meta::stream::{GlobalRefreshManagerRef, SourceManagerRunningInfo, ThrottleConfig};
2929
use risingwave_meta::{MetaError, model};
3030
use risingwave_meta_model::{FragmentId, ObjectId, SinkId, SourceId, StreamingParallelism};
3131
use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
@@ -55,6 +55,7 @@ pub struct StreamServiceImpl {
5555
barrier_manager: BarrierManagerRef,
5656
stream_manager: GlobalStreamManagerRef,
5757
metadata_manager: MetadataManager,
58+
refresh_manager: GlobalRefreshManagerRef,
5859
}
5960

6061
impl StreamServiceImpl {
@@ -64,13 +65,15 @@ impl StreamServiceImpl {
6465
barrier_manager: BarrierManagerRef,
6566
stream_manager: GlobalStreamManagerRef,
6667
metadata_manager: MetadataManager,
68+
refresh_manager: GlobalRefreshManagerRef,
6769
) -> Self {
6870
StreamServiceImpl {
6971
env,
7072
barrier_scheduler,
7173
barrier_manager,
7274
stream_manager,
7375
metadata_manager,
76+
refresh_manager,
7477
}
7578
}
7679
}
@@ -486,7 +489,7 @@ impl StreamManagerService for StreamServiceImpl {
486489
.chain(source_fragments.values().flatten().copied())
487490
.collect();
488491

489-
let guard = self.env.shared_actor_info.read_guard();
492+
let guard = self.env.shared_actor_infos().read_guard();
490493
guard
491494
.iter_over_fragments()
492495
.filter(|(frag_id, _)| all_fragment_ids.contains(frag_id))
@@ -588,14 +591,9 @@ impl StreamManagerService for StreamServiceImpl {
588591

589592
tracing::info!("Refreshing table with id: {}", req.table_id);
590593

591-
// Create refresh manager and execute refresh
592-
let refresh_manager = risingwave_meta::stream::RefreshManager::new(
593-
self.metadata_manager.clone(),
594-
self.barrier_scheduler.clone(),
595-
);
596-
597-
let response = refresh_manager
598-
.refresh_table(req, self.env.shared_actor_infos())
594+
let response = self
595+
.refresh_manager
596+
.trigger_manual_refresh(req, self.env.shared_actor_infos())
599597
.await?;
600598

601599
Ok(Response::new(response))

src/meta/src/barrier/context/context_impl.rs

Lines changed: 21 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,7 @@
1515
use std::collections::{HashMap, HashSet};
1616
use std::sync::Arc;
1717

18-
use anyhow::{Context, anyhow};
18+
use anyhow::Context;
1919
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
2020
use risingwave_common::id::JobId;
2121
use risingwave_meta_model::ActorId;
@@ -28,6 +28,7 @@ use risingwave_pb::stream_service::barrier_complete_response::{
2828
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
2929
use risingwave_rpc_client::StreamingControlHandle;
3030

31+
use crate::MetaResult;
3132
use crate::barrier::command::CommandContext;
3233
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
3334
use crate::barrier::progress::TrackingJob;
@@ -39,8 +40,7 @@ use crate::barrier::{
3940
};
4041
use crate::hummock::CommitEpochInfo;
4142
use crate::model::FragmentDownstreamRelation;
42-
use crate::stream::{REFRESH_TABLE_PROGRESS_TRACKER, SourceChange, SplitState};
43-
use crate::{MetaError, MetaResult};
43+
use crate::stream::{SourceChange, SplitState};
4444

4545
impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
4646
#[await_tree::instrument]
@@ -134,17 +134,9 @@ impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
134134
}
135135

136136
for ((table_id, associated_source_id), actors) in list_finished_info {
137-
let allow_yield = {
138-
let mut lock_handle = REFRESH_TABLE_PROGRESS_TRACKER.lock();
139-
let single_task_tracker =
140-
lock_handle.inner.get_mut(&table_id).ok_or_else(|| {
141-
MetaError::from(anyhow!("Table tracker not found for table {}", table_id))
142-
})?;
143-
single_task_tracker.report_list_finished(actors.iter().copied());
144-
let allow_yield = single_task_tracker.is_list_finished()?;
145-
146-
Ok::<_, MetaError>(allow_yield)
147-
}?;
137+
let allow_yield = self
138+
.refresh_manager
139+
.mark_list_stage_finished(table_id, &actors)?;
148140

149141
if !allow_yield {
150142
continue;
@@ -194,17 +186,9 @@ impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
194186
}
195187

196188
for ((table_id, associated_source_id), actors) in load_finished_info {
197-
let allow_yield = {
198-
let mut lock_handle = REFRESH_TABLE_PROGRESS_TRACKER.lock();
199-
let single_task_tracker =
200-
lock_handle.inner.get_mut(&table_id).ok_or_else(|| {
201-
MetaError::from(anyhow!("Table tracker not found for table {}", table_id))
202-
})?;
203-
single_task_tracker.report_load_finished(actors.iter().copied());
204-
let allow_yield = single_task_tracker.is_load_finished()?;
205-
206-
Ok::<_, MetaError>(allow_yield)
207-
}?;
189+
let allow_yield = self
190+
.refresh_manager
191+
.mark_load_stage_finished(table_id, &actors)?;
208192

209193
if !allow_yield {
210194
continue;
@@ -244,29 +228,18 @@ impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
244228
refresh_finished_table_job_ids: Vec<JobId>,
245229
) -> MetaResult<()> {
246230
for job_id in refresh_finished_table_job_ids {
247-
{
248-
let table_id = &job_id.as_mv_table_id();
249-
let mut lock_handle = REFRESH_TABLE_PROGRESS_TRACKER.lock();
250-
let remove_res = lock_handle.inner.remove(table_id);
251-
debug_assert!(remove_res.is_some());
252-
253-
// try remove the table_id from the table_id_by_database_id
254-
lock_handle
255-
.table_id_by_database_id
256-
.values_mut()
257-
.for_each(|table_ids| {
258-
table_ids.remove(table_id);
259-
});
260-
}
231+
let table_id = job_id.as_mv_table_id();
232+
233+
self.refresh_manager.mark_refresh_complete(table_id).await?;
261234

262235
// Update the table's refresh state back to Idle (refresh complete)
263236
self.metadata_manager
264237
.catalog_controller
265-
.set_table_refresh_state(job_id.as_mv_table_id(), RefreshState::Idle)
238+
.set_table_refresh_state(table_id, RefreshState::Idle)
266239
.await
267240
.context("Failed to set table refresh state to Idle")?;
268241

269-
tracing::info!(%job_id, "Table refresh completed, state updated to Idle");
242+
tracing::info!(%job_id, %table_id, "Table refresh completed, state updated to Idle");
270243
}
271244

272245
Ok(())
@@ -309,9 +282,16 @@ impl CommandContext {
309282
}
310283

311284
Command::DropStreamingJobs {
285+
streaming_job_ids,
312286
unregistered_state_table_ids,
313287
..
314288
} => {
289+
for job_id in streaming_job_ids {
290+
barrier_manager_context
291+
.refresh_manager
292+
.remove_progress_tracker(job_id.as_mv_table_id());
293+
}
294+
315295
barrier_manager_context
316296
.hummock_manager
317297
.unregister_table_ids(unregistered_state_table_ids.iter().cloned())

0 commit comments

Comments (0)