Add async checkpoint feature #1703
Open
VincentCheungKokomo wants to merge 1 commit into InternLM:main from
Conversation
Force-pushed from 5730aa4 to 7a7136b
Force-pushed from 7a7136b to 302b6ec
HAOCHENYE reviewed on Apr 27, 2026
from xtuner.v1.utils.grad_norm import cal_grad_norm

if BlockingAsyncStager is not None:
Collaborator
In [2]: fw = FileSystemWriter("./")
In [3]: from torch.distributed.checkpoint.staging import AsyncStager, BlockingAsyncStager
In [4]: isinstance(fw, AsyncStager)
Out[4]: True
Given that FileSystemWriter already satisfies the AsyncStager protocol, is _CachingStagingWriter necessary?
options=_set_options,
)

def load_dcp_merged(
Collaborator
The state dict format should be consistent with async_save and save. If merged_state_dict performs better, just replace the current implementation.
Comment on lines +540 to +543
self._async_checkpoint = async_checkpoint
self._pending_staging_futures: list[Future] | None = None
self._pending_upload_futures: list[Future] | None = None
self._pending_checkpoint_finalize: _CheckpointFinalize | None = None
Collaborator
Following dcp.async_save, the async interface should return an awaitable future. We can assume there is at most one in-flight async save future in the trainer at any time, and the trainer will always wait for the previous async save to finish before issuing a new one.
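The single-in-flight contract described above can be sketched in plain Python. This is an illustrative sketch only: the names AsyncCheckpointer and _save_worker are assumptions, not from the PR, and a real implementation would delegate the write to dcp.async_save.

```python
# Sketch of the "at most one in-flight async save" pattern suggested above.
# AsyncCheckpointer and _save_worker are hypothetical names, not XTuner APIs.
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Optional


class AsyncCheckpointer:
    """Keeps at most one in-flight save; waits for it before issuing a new one."""

    def __init__(self) -> None:
        self._executor = ThreadPoolExecutor(max_workers=1)
        self._inflight: Optional[Future] = None

    def async_save(self, state_dict: dict, path: str) -> Future:
        # Block until the previous save (if any) has finished, mirroring the
        # dcp.async_save contract assumed in the review comment.
        if self._inflight is not None:
            self._inflight.result()
        self._inflight = self._executor.submit(self._save_worker, state_dict, path)
        return self._inflight

    @staticmethod
    def _save_worker(state_dict: dict, path: str) -> str:
        # Placeholder for the actual DCP write; returns the checkpoint path.
        return path
```

Returning the Future lets the trainer decide when to block, while the guard at the top of async_save enforces the one-in-flight invariant.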
ckpt_saved = self._maybe_save(is_snapshot=False)
if not ckpt_saved:
    _ = self._maybe_save(is_snapshot=True)
checkpoint_time = time.time() - time_before_checkpoint
Collaborator
Just log the checkpoint time in train_engine
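A minimal sketch of what moving the timing into the engine could look like. The helper name timed_save and the logger name are assumptions for illustration, not the actual XTuner API.

```python
# Hypothetical timing wrapper for a checkpoint save inside the engine.
import logging
import time

logger = logging.getLogger("train_engine")


def timed_save(save_fn, *args, **kwargs):
    """Run a checkpoint save and log its wall-clock duration."""
    start = time.perf_counter()
    result = save_fn(*args, **kwargs)
    elapsed = time.perf_counter() - start
    logger.info("checkpoint saved in %.2fs", elapsed)
    return result
```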
Add async DCP checkpoint support
This change adds async checkpoint saving for XTuner v1 training. The trainer
now supports an async_checkpoint option, starts merged async DCP saves for model
and optimizer state, and defers checkpoint metadata finalization until the
background staging/upload futures complete.
The async path writes model and optimizer state into a merged weights/checkpoint
format, while resume remains compatible with both the new merged format and the
existing model/optimizer DCP format. Checkpoint metadata is only registered
after the async save completes, so failed async saves are never exposed as
resumable checkpoints.
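The deferred-finalization behavior described above amounts to registering a checkpoint as resumable only once its background future resolves successfully. A pure-Python sketch under that assumption (finalize_checkpoint and the registry list are hypothetical names, not the PR's actual code):

```python
# Register a checkpoint only if the background save succeeded, so failed
# async saves never appear resumable. Names here are illustrative.
from concurrent.futures import Future


def finalize_checkpoint(save_future: Future, registry: list, ckpt_path: str) -> bool:
    """Append ckpt_path to the resumable registry only on successful save."""
    try:
        save_future.result()  # re-raises any exception from the save worker
    except Exception:
        return False  # failed saves are not registered as resumable
    registry.append(ckpt_path)
    return True
```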
The training engine now creates a dedicated process group for async checkpoint
work, supports merged async save/load helpers, and cleans up the async process
group at trainer shutdown.
Tests and benchmark configs are added to cover async checkpoint intervals and
provide reproducible verification runs for 8B and 30B models.