feat: add async HuggingFace export support #1681

caixianzhang wants to merge 5 commits into InternLM:main
Conversation
- support async save_pretrained
- add async trainer pipeline
- add test for async export
- keep backward compatibility
@claude review
```diff
@@ -1,5 +1,6 @@
 import json
 import os
+import hashlib
```
Claude: Nit: `hashlib` should be placed before `json` to maintain alphabetical import ordering (stdlib convention).
```python
self.fake_hf_model_dir = Path(self.temp_dir.name) / "fake_hf_model"
self.work_dir = Path(self.temp_dir.name) / "work_dir"
```
Claude: Nit: The fake's `_async_hf_writer_status_filename` uses underscores and no zero-padding (`async_hf_writer_status_rank{rank}_of_{world_size}.json`) while the real engine uses dashes and 5-digit zero-padding (`async-hf-writer-status-rank-{rank:05d}-of-{world_size:05d}.json`). While this is internally consistent within the test (both read and write go through the fake), keeping the format closer to the real implementation would catch accidental format mismatches if the trainer ever parses filenames by pattern.
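A minimal sketch of aligning the fake with the real format, assuming the fake exposes the `_async_hf_writer_status_filename` helper named above:

```python
# Sketch: mirror the real engine's dash-separated, zero-padded filename so
# that any future pattern-based parsing in the trainer also exercises the fake.
def _async_hf_writer_status_filename(self, rank: int, world_size: int) -> str:
    return f"async-hf-writer-status-rank-{rank:05d}-of-{world_size:05d}.json"
```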
```python
    save_dtype=save_dtype,
    safetensors_prefix=safetensors_prefix,
    device=DEVICE,
):
    cached_names: list[str] = []
    for name, hf_tensor in zip(name_list, hf_tensor_list):
        cache_key = (("root", "hf"), ("name", name))
```
Claude: Warning — GPU memory during prepare phase: `_iter_hf_save_chunks` is called here with `device=DEVICE` (GPU), which means allgathered tensors reside on GPU before being copied to CPU pinned memory. In the existing sync path (`_save_hf`), generators default to `device="cpu"`, so allgathered results go directly to host memory.

For large models, holding the gathered shards on GPU during the prepare iteration could cause OOM, since this runs during training while activations and optimizer states are still resident. Consider using `device="cpu"` here (matching the sync path) at the cost of a slower allgather, or at minimum document the increased GPU memory requirement when `async_hf_export=True`.
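A hedged sketch of the suggested fix, reusing the call shape from the diff above (the surrounding loop variables are assumptions):

```python
# Sketch: gather directly to host memory, matching the sync _save_hf default.
# The allgather is slower, but full HF shards never sit on GPU mid-training.
for name_list, hf_tensor_list in self._iter_hf_save_chunks(
    save_dtype=save_dtype,
    safetensors_prefix=safetensors_prefix,
    device="cpu",  # was device=DEVICE (GPU) in the async prepare path
):
    ...
```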
```python
def _write_hf_save_plan(self, save_plan: Mapping[str, object]) -> list[str]:
    hf_dir = cast(Path, save_plan["hf_dir"])
    save_tasks = cast(list[tuple[str, dict[str, torch.Tensor]]], save_plan["save_tasks"])
```
Claude: Nit — weakly typed parameter: `save_plan: Mapping[str, object]` is immediately cast to specific types. Per CLAUDE.md, consider using a `TypedDict` to make the contract explicit:

```python
class _HFSavePlan(TypedDict):
    hf_dir: Path
    save_tasks: list[tuple[str, dict[str, torch.Tensor]]]
```

This would eliminate the `cast` calls and make the expected shape discoverable at the type level.
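Applied to the method above, the signature would then read (sketch):

```python
def _write_hf_save_plan(self, save_plan: _HFSavePlan) -> list[str]:
    hf_dir = save_plan["hf_dir"]          # statically typed as Path, no cast
    save_tasks = save_plan["save_tasks"]  # typed shard list, no cast
    ...
```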
```diff
@@ -1609,6 +1621,148 @@ def _maybe_save_hf(self):
             total_epoch=self.total_epoch,
         )
```
Claude: Warning — missing return type annotation: CLAUDE.md requires type hints for all new function signatures. This method should declare `-> None`.

Same applies to `_finalize_pending_async_hf` (although `_wait_and_finalize_pending_async_hf` already has it — nice).
```python
self._engine.prepare_async_hf_snapshot(hf_dir=tmp_hf_path)
pid = os.fork()
if pid == 0:
    try:
        self._engine.write_async_hf_snapshot()
        os._exit(0)
    except Exception as exc:
```
Claude: Warning — `os.fork()` + CUDA/NCCL threading risk: `os.fork()` is used here while NCCL background threads and potential CUDA streams are active in the parent. The forked child inherits these in an undefined state. The `os._exit(0)` mitigation (avoiding destructors) is the standard pattern used by Megatron/DeepSpeed, but if `write_async_hf_snapshot()` inadvertently triggers any CUDA or NCCL operation, the child will deadlock silently.

Recommend adding a brief code comment noting this constraint (child must be strictly CPU-only I/O), so future contributors don't accidentally add GPU operations in the writer path.
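A sketch of such a comment at the fork site (wording illustrative; the child's failure handling is an assumption, since the diff truncates at the `except` clause):

```python
pid = os.fork()
if pid == 0:
    # CHILD: strictly CPU-only file I/O from here on. The child inherits
    # NCCL threads and any CUDA context in an undefined state; a GPU or
    # collective call here can deadlock silently. os._exit() is used instead
    # of sys.exit() to skip destructors for the same reason.
    try:
        self._engine.write_async_hf_snapshot()
        os._exit(0)
    except Exception:
        os._exit(1)
```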
```python
"tmp_hf_path": tmp_hf_path,
"latest_hf_link": latest_hf_link,
"meta_path": meta_path,
"async_writer_status_path": self.exp_dir
```
Claude: Warning — status files leak on disk: After `_wait_and_finalize_pending_async_hf` succeeds, the per-rank `async-hf-writer-status-*.json` files in `exp_dir` are never cleaned up. Over many training steps these accumulate. Consider deleting them after a successful finalization (similar to how `.incomplete` dirs are cleaned up via rename).
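A minimal cleanup sketch, assuming finalization runs where all per-rank status files under `exp_dir` are visible:

```python
# Sketch: after a successful finalize, drop the per-rank writer status files
# so they do not accumulate across training steps.
for status_file in self.exp_dir.glob("async-hf-writer-status-rank-*-of-*.json"):
    status_file.unlink(missing_ok=True)
```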
Claude: ## Code Review Summary

Well-structured PR that adds async HF export with clean commit separation (commit 1 adds everything, commit 2 removes the out-of-scope async checkpoint code). The prepare/write split, commit semantics (…)

### Issues

**Warnings**: GPU memory during the prepare phase, `os.fork()` + CUDA/NCCL risk, status file leak, missing return type annotations (see the inline comments above).

**Nits**: import ordering, status filename format, weakly typed `save_plan` (see the inline comments above).

### Verdict

COMMENT — no critical bugs found. The warnings above are quality/safety improvements worth addressing before merge.
## Summary
This PR adds the `async_hf_export` runtime implementation in XTuner and the corresponding unit tests. The goal is to reduce foreground training stalls at HF export points while keeping the final exported HF directory consumable in the same shape as the existing sync HF path.
Main runtime changes are in:

- `xtuner/v1/train/trainer.py`
- `xtuner/v1/engine/train_engine.py`
- `xtuner/v1/model/base.py`

Tests are added in:

- `xtuner/tests/train/test_trainer_async_hf.py`

## What This PR Adds
### 1. Async HF runtime path
This PR adds a dedicated async HF export flow controlled by `async_hf_export`.

At a high level, the flow is:

- `Trainer._maybe_async_save_hf()` checks whether the current step hits the HF export interval.
- A forked writer process then writes the `.safetensors` files in the background.

The async HF path is intentionally separate from async checkpoint (a minimal illustration follows this list):

- `async_checkpoint` controls training-state checkpointing
- `async_hf_export` controls HF export only
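An illustration of that independence; the flag names come from this PR, but the exact config surface shown here is an assumption:

```python
# Hypothetical trainer construction: the two async toggles are independent.
trainer = Trainer(
    # ... model / dataset / optimizer config elided ...
    async_checkpoint=False,  # training-state checkpointing stays synchronous
    async_hf_export=True,    # HF export goes through the async prepare/write path
)
```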
### 2. Prepare / writer split

This PR adds a prepare/writer split in `TrainEngine`:

- `prepare_async_hf_snapshot(hf_dir=...)` runs in the foreground: it iterates `BaseModel._iter_hf_save_chunks(...)`, fills the `_async_hf_tensor_cache`, and records the `weight_map`.
- `write_async_hf_snapshot()` runs in the background writer: it writes the `.safetensors` shards and a per-rank `async-hf-writer-status-rank-xxxxx-of-yyyyy.json` status file.

This keeps the expensive file-writing portion off the foreground training path; a usage sketch follows.
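A usage sketch of the split as driven from the trainer, following the fork snippet reviewed above (the child's exit codes are assumptions, as the review diff truncates the error handling):

```python
import os

# Foreground: allgather tensors into the pinned-memory cache, record weight_map.
self._engine.prepare_async_hf_snapshot(hf_dir=tmp_hf_path)

# Background: a forked child performs CPU-only I/O, writing the .safetensors
# shards plus a per-rank status JSON, then exits without running destructors.
pid = os.fork()
if pid == 0:
    try:
        self._engine.write_async_hf_snapshot()
        os._exit(0)
    except Exception:
        os._exit(1)
```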
### 3. Commit semantics
The async HF path uses explicit commit semantics:
- The writer first writes into `hf-<step>.incomplete`.
- On success, `hf-<step>.incomplete` is renamed to `hf-<step>`.
- `hf-latest` is updated.
- `.xtuner` metadata is updated.

This avoids exposing half-written HF exports as committed artifacts.
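A minimal sketch of the commit-by-rename pattern described above (directory names per this PR; variable names are assumptions):

```python
from pathlib import Path

tmp_hf_path = exp_dir / f"hf-{step}.incomplete"  # writer output lands here first
final_hf_path = exp_dir / f"hf-{step}"

# ... writer finishes all .safetensors shards and the index into tmp_hf_path ...

# A single rename commits the export; readers never see a half-written dir.
tmp_hf_path.rename(final_hf_path)
```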
### 4. Reuse of latest sync HF infrastructure
The async HF path is built on the latest sync HF helpers in `BaseModel`, including:

- `_iter_hf_save_chunks(...)`
- `_write_hf_save_plan(...)`
- `_write_hf_index_and_config(...)`

As a result, async HF inherits the current sync HF save semantics, including newer behaviors around:
- `fp32_keys_pattern`
- `_get_save_dtype()`

### 5. Retention behavior
This PR also wires async HF into the existing retention/update flow:
- `hf_max_keep` cleanup works for async HF exports
- `meta.latest_exp.hf_checkpoint_list` stays consistent with on-disk state
- `hf-latest` points to the latest committed async HF export
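A hedged sketch of the `hf_max_keep` retention idea (names assumed; the real flow also updates `meta.latest_exp.hf_checkpoint_list`):

```python
import re
import shutil

# Keep only the newest hf_max_keep committed exports. The step-directory
# pattern excludes hf-latest and *.incomplete dirs from deletion.
committed = [p for p in exp_dir.iterdir() if re.fullmatch(r"hf-\d+", p.name)]
committed.sort(key=lambda p: int(p.name.split("-")[1]))
for stale in committed[:-hf_max_keep]:
    shutil.rmtree(stale)
```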
Added tests:
TestTrainerAsyncSaveHF.test_async_save_hf_intervalTestTrainerAsyncSaveHF.test_async_save_hf_raises_on_writer_failureCovered behaviors:
weight_maphf_max_keepcleanup behaves as expectedhf-latestpoints to the newest committed exportTest note:
checkpoint_interval=Nonesnapshot_interval=NoneThis keeps the unit test focused on async HF and avoids requiring a fake
save_dcp()implementation in the mock engine.Executed with:
cd /mnt/shared-storage-user/ailab-sys/zhangcaixian/my_xtunner/upload/xtuner XTUNER_TEST_WORLD_SIZE=2 python3 -m pytest tests/train/test_trainer_async_hf.py -vResult:
Scope
This PR adds:
This PR does not change:
tests/train/test_trainer.py