feat: add periodic refresh table jobs & refactor ProgressTracker #23737
Conversation
Pull Request Overview
This PR introduces infrastructure for periodic refresh table jobs and refactors the progress tracking mechanism from a static global singleton to an instance-based approach managed by a new GlobalRefreshManager.
- Added a new `periodic_refresh_jobs` database table to track periodic refresh schedules and status
- Introduced `GlobalRefreshManager` to centralize refresh process management and periodic job scheduling
- Refactored `REFRESH_TABLE_PROGRESS_TRACKER` from a static `LazyLock<Mutex<>>` to an instance-based `Arc<RwLock<>>` owned by `GlobalRefreshManager`
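The static-to-instance refactor described above can be sketched roughly as follows. This is a simplified illustration of the pattern only, not the actual RisingWave types: the tracker contents, field names, and methods here are hypothetical stand-ins.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

type TableId = u32;

// Hypothetical simplified tracker; the real RefreshProgressTracker
// in src/meta/src/stream/refresh_manager.rs carries more state.
#[derive(Default)]
struct RefreshProgressTracker {
    in_progress: HashMap<TableId, u64>, // table -> rows processed (illustrative)
}

// Instead of a process-wide `static TRACKER: LazyLock<Mutex<..>>`,
// each manager instance owns its tracker behind Arc<RwLock<..>>,
// so it can be cloned into barrier workers and services.
struct GlobalRefreshManager {
    progress_tracker: Arc<RwLock<RefreshProgressTracker>>,
}

impl GlobalRefreshManager {
    fn new() -> Self {
        Self {
            progress_tracker: Arc::new(RwLock::new(RefreshProgressTracker::default())),
        }
    }

    fn record_progress(&self, table: TableId, rows: u64) {
        self.progress_tracker.write().unwrap().in_progress.insert(table, rows);
    }

    fn progress_of(&self, table: TableId) -> Option<u64> {
        self.progress_tracker.read().unwrap().in_progress.get(&table).copied()
    }
}

fn main() {
    let mgr = GlobalRefreshManager::new();
    mgr.record_progress(42, 1000);
    assert_eq!(mgr.progress_of(42), Some(1000));
}
```

Owning the lock per instance (rather than in a global) makes the tracker's lifetime explicit and lets tests construct isolated managers.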
Reviewed Changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| src/meta/src/stream/refresh_manager.rs | Core implementation of GlobalRefreshManager with periodic refresh scheduling, job registration, and refactored progress tracking from static global to instance-based |
| src/meta/src/barrier/worker.rs | Updated to accept and pass GlobalRefreshManagerRef to barrier worker context |
| src/meta/src/barrier/manager.rs | Updated to accept and pass GlobalRefreshManagerRef to barrier manager |
| src/meta/src/barrier/context/recovery.rs | Updated to use instance-based progress tracker instead of static global |
| src/meta/src/barrier/context/mod.rs | Added global_refresh_manager field to worker context struct |
| src/meta/src/barrier/context/context_impl.rs | Updated all progress tracker accesses to use instance-based tracker from global_refresh_manager |
| src/meta/service/src/stream_service.rs | Added global_refresh_manager parameter to stream service and passed to RefreshManager |
| src/meta/node/src/server.rs | Initialized GlobalRefreshManager, started periodic refresh loop, and wired it through the service stack (contains duplicate code blocks that need cleanup) |
| src/meta/model/src/periodic_refresh_job.rs | New entity model for periodic refresh jobs table |
| src/meta/model/src/lib.rs | Added periodic_refresh_job module export |
| src/meta/model/migration/src/m20251110_224156_periodic_refresh_jobs.rs | New migration to create periodic_refresh_jobs table |
| src/meta/model/migration/src/lib.rs | Registered new periodic refresh jobs migration |
- Introduced `RefreshProgressTracker` to manage progress across multiple actors during refresh operations, preventing race conditions.
- Updated data structures to track per-actor progress for the list and load phases.
- Added a new `RefreshProgress` protobuf message for communication.
- Enhanced `BarrierCompleteResult` to include refresh progress data.
- Integrated the tracker into `DatabaseCheckpointControl` and updated related components for compatibility.
- Added a migration for the new refresh job table and related functionality.

Next steps include integrating the tracker with barrier checkpoint control and updating RPC call sites to handle refresh progress.
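The per-actor tracking mentioned above might look like the following sketch. The phase names and methods are hypothetical; the point is that completion is decided only when every registered actor reports done, which is what prevents the race where one actor finishing is mistaken for all actors finishing.

```rust
use std::collections::HashMap;

type ActorId = u32;

// Illustrative phases; the real list/load phases live in the
// RefreshProgress protobuf message added by this PR.
#[derive(Clone, Copy, PartialEq, Debug)]
enum RefreshPhase {
    Listing,
    Loading,
    Done,
}

#[derive(Default)]
struct RefreshProgress {
    per_actor: HashMap<ActorId, RefreshPhase>,
}

impl RefreshProgress {
    fn update(&mut self, actor: ActorId, phase: RefreshPhase) {
        self.per_actor.insert(actor, phase);
    }

    // Complete only when every actor has reported Done.
    fn all_done(&self) -> bool {
        !self.per_actor.is_empty()
            && self.per_actor.values().all(|p| *p == RefreshPhase::Done)
    }
}

fn main() {
    let mut p = RefreshProgress::default();
    p.update(1, RefreshPhase::Loading);
    p.update(2, RefreshPhase::Done);
    assert!(!p.all_done());
    p.update(1, RefreshPhase::Done);
    assert!(p.all_done());
}
```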
Pull Request Overview
Copilot reviewed 16 out of 16 changed files in this pull request and generated 4 comments.
- Updated SLT queries to include retry logic with backoff for improved reliability.
- Removed redundant logging in `context_impl.rs` after table refresh completion.
- Enhanced error handling in `alter_op.rs` for refresh job insertion, logging when a job already exists.
- Added logging for table refresh completion in `refresh_manager.rs`.
- Changed the logging level from info to debug in `materialize.rs` for progress tracking.
- Removed the `refresh_state` field from the `Table` message in `catalog.proto` and related code.
- Updated `TableCatalog` and other components to eliminate references to the removed `refresh_state`.
- Refactored the `RefreshState` enum and its usage across the codebase to streamline refresh job management.
- Adjusted migration files to reflect the removal of the `source_refresh_mode` migration.
- Enhanced refresh job status handling in various modules to ensure consistency.
- Introduced `ListRefreshTableStatesRequest` and `ListRefreshTableStatesResponse` messages in `meta.proto` to facilitate querying refresh job states.
- Implemented the `list_refresh_table_states` method in the `FrontendMetaClient` and `StreamManagerService` to handle the new RPC.
- Created the `RwRefreshTableState` struct to represent the state of refresh jobs in the system catalog.
- Updated migration files to accommodate changes in refresh job handling.
- Enhanced the `MetaClient` to support the new RPC call for listing refresh table states.
…tures
- Added a new migration for `source_refresh_mode` to enhance refresh job capabilities.
- Updated the `RefreshJob` and `RwRefreshTableState` structures to remove deprecated fields and accommodate new logic.
- Modified the `StreamManagerService` to handle timestamp conversions for last trigger times.
- Enhanced the `GlobalRefreshManager` to streamline refresh job management and ensure proper state handling.
- Included the `chrono` dependency for improved date and time handling across the codebase.
@tabVersion This is a user-facing feature, so please add a release note to describe this feature and provide an example to illustrate how to use it, thanks.
Please also mention that this is an experimental feature for the doc team's reference.
proto/plan_common.proto
Outdated
message SourceRefreshMode {
  message SourceRefreshModeStreaming {}
  message SourceRefreshModeFullRecompute {}
User-facing question: after this PR, the user specifies refresh_mode = 'FULL_RECOMPUTE' in order to use the refreshable batch source table feature. Personally, I feel FULL_RECOMPUTE can mislead users into thinking there will be a full recomputation on the MV, instead of just on the table to calculate the diff.
I am wondering whether we should call it refresh_mode = 'SNAPSHOT_DIFF' instead. WDYT? @tabVersion @chenzl25
We borrowed the name from here: https://docs.databricks.com/aws/en/optimizations/incremental-refresh#determine-the-refresh-type-of-an-update I think it is fine, because that's how this table is refreshed. A snapshot diff is more like something we generate for the downstream.
We had an offline discussion; the name was finalized as FULL_RELOAD.
…ation
- Added a new `refresh_manager.py` file to define panels for monitoring refresh job metrics in the RisingWave Dev Dashboard.
- Updated the `MetaMetrics` struct to include metrics for refresh job duration, finish count, cron job triggers, and misses.
- Enhanced the `GlobalRefreshManager` to track and report refresh job metrics, including success and failure statuses.
- Modified the `remove_progress_tracker` method to log metrics upon job completion or failure.
- Updated the dashboard JSON files to reflect the new refresh manager panels.
…ELOAD'
- Changed references in multiple files to reflect the updated refresh mode terminology.
- Adjusted error messages and logic to ensure consistency with the new naming convention.
- Updated related tests and utility functions to align with the changes.
✅ Cherry-pick PRs (or issues, if conflicts were encountered) have been created successfully for all target branches.
- Added a new `periodic_refresh_jobs` table.
- Introduced `GlobalRefreshManager` to manage ongoing refresh processes and periodic refresh jobs.

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
following #23527
Key aspects of this change include:
Checklist
Documentation
Release note
PLEASE MARK AS EXPERIMENTAL
With FULL_RELOAD, you can set up data sources (such as Iceberg tables) to be periodically reloaded into RisingWave, ensuring your views and queries stay up to date with the latest batch data, without manual intervention.
How it Works
When creating a refreshable table, you can now specify a FULL_RELOAD mode with an optional refresh_interval_sec property:
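The PR text names `refresh_mode` and `refresh_interval_sec` but does not show concrete syntax, so the following is an illustrative sketch only: the table schema and connector properties are placeholders, and the exact clause placement should be confirmed against the released documentation.

```sql
-- Illustrative only: schema and connector options are assumptions.
CREATE TABLE orders_snapshot (
    order_id BIGINT,
    amount   DOUBLE PRECISION
)
WITH (
    connector = 'iceberg',
    refresh_mode = 'FULL_RELOAD',   -- fully reload the source on each refresh
    refresh_interval_sec = 3600     -- hypothetical: reload every hour
);
```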
- You can still manually refresh the table.
- The refresh status can be queried with SQL.
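As a hedged illustration of the manual refresh and the status query, assuming the system catalog relation follows the `RwRefreshTableState` struct mentioned earlier (the statement form and relation name below are inferences, not confirmed syntax):

```sql
-- Trigger a manual refresh (statement form inferred from the feature description).
REFRESH TABLE orders_snapshot;

-- Inspect refresh job states via the system catalog (relation name inferred
-- from the RwRefreshTableState struct added in this PR).
SELECT * FROM rw_catalog.rw_refresh_table_states;
```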