feat: add async HuggingFace export support #1681

Open
caixianzhang wants to merge 5 commits into InternLM:main from caixianzhang:feature/async-hf

Conversation

@caixianzhang

Summary

This PR adds the async_hf_export runtime implementation in XTuner and the corresponding unit tests.

The goal is to reduce foreground training stalls at HF export points while keeping the final exported HF directory identical in layout to what the existing sync HF path produces.

Main runtime changes are in:

  • xtuner/v1/train/trainer.py
  • xtuner/v1/engine/train_engine.py
  • xtuner/v1/model/base.py

Tests are added in:

  • xtuner/tests/train/test_trainer_async_hf.py

What This PR Adds

1. Async HF runtime path

This PR adds a dedicated async HF export flow controlled by async_hf_export.

At a high level, the flow is (a code sketch follows the list):

  1. Trainer._maybe_async_save_hf() checks whether the current step hits the HF export interval.
  2. The parent process prepares a rank-local CPU snapshot for HF export.
  3. A child process writes .safetensors files in the background.
  4. The parent process continues training immediately.
  5. On the next HF trigger or at the end of training, the parent waits and finalizes the pending export.
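
A minimal sketch of steps 2 through 4, assuming a hypothetical helper `async_hf_export_step` wrapped around the engine methods above (error handling and distributed coordination are simplified):

```python
import os
from pathlib import Path

def async_hf_export_step(engine, step: int, work_dir: Path) -> dict:
    # Stage the export into an .incomplete directory; commit happens later.
    tmp_hf_path = work_dir / f"hf-{step}.incomplete"
    # Foreground: build the rank-local CPU snapshot (blocks briefly).
    engine.prepare_async_hf_snapshot(hf_dir=tmp_hf_path)
    pid = os.fork()
    if pid == 0:
        # Child: strictly CPU-only file I/O, then _exit to skip destructors.
        try:
            engine.write_async_hf_snapshot()
            os._exit(0)
        except Exception:
            os._exit(1)
    # Parent: remember the pending job and return to training immediately.
    return {"pid": pid, "tmp_hf_path": tmp_hf_path}
```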

The async HF path is intentionally separate from async checkpoint:

  • async_checkpoint controls training-state checkpointing
  • async_hf_export controls HF export only

2. Prepare / writer split

This PR adds a prepare/writer split in TrainEngine:

  • prepare_async_hf_snapshot(hf_dir=...)
    • iterates HF save chunks from BaseModel._iter_hf_save_chunks(...)
    • refreshes the long-lived _async_hf_tensor_cache
    • records the rank-local weight_map
  • write_async_hf_snapshot()
    • runs in the forked child process
    • writes rank-local .safetensors
    • writes async-hf-writer-status-rank-xxxxx-of-yyyyy.json

This keeps the expensive file-writing portion off the foreground training path.
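
To make the split concrete, a toy, self-contained version (the real logic lives in TrainEngine/BaseModel; the shard naming and the status-file location here are placeholders):

```python
import json
from collections.abc import Iterable
from pathlib import Path

import torch
from safetensors.torch import save_file


class AsyncHFWriterSketch:
    """Toy prepare/write split; the real implementation lives in TrainEngine."""

    def __init__(self, rank: int, world_size: int) -> None:
        self.rank, self.world_size = rank, world_size
        self._tensor_cache: dict[str, torch.Tensor] = {}  # long-lived CPU cache
        self._weight_map: dict[str, str] = {}
        self._hf_dir = Path(".")

    def prepare(self, hf_dir: Path, chunks: Iterable[tuple[str, torch.Tensor]]) -> None:
        # Foreground: refresh the snapshot cache and the rank-local weight_map.
        self._hf_dir = hf_dir
        self._tensor_cache.clear()
        shard = f"model-rank-{self.rank:05d}.safetensors"  # placeholder naming
        for name, tensor in chunks:  # stands in for _iter_hf_save_chunks(...)
            self._tensor_cache[name] = tensor.detach().to("cpu")
            self._weight_map[name] = shard

    def write(self) -> None:
        # Background child: pure file I/O, no GPU or collective calls.
        self._hf_dir.mkdir(parents=True, exist_ok=True)
        shard = f"model-rank-{self.rank:05d}.safetensors"
        save_file(self._tensor_cache, str(self._hf_dir / shard))
        status = {"rank": self.rank, "ok": True, "weight_map": self._weight_map}
        name = f"async-hf-writer-status-rank-{self.rank:05d}-of-{self.world_size:05d}.json"
        (self._hf_dir / name).write_text(json.dumps(status))
```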

3. Commit semantics

The async HF path uses explicit commit semantics:

  • export first writes into hf-<step>.incomplete
  • parent waits for child completion and gathers all-rank writer status
  • only if all ranks succeed:
    • rank0 writes HF index/config
    • tokenizer files are saved when applicable
    • hf-<step>.incomplete is renamed to hf-<step>
    • hf-latest is updated
    • .xtuner metadata is updated

This avoids exposing half-written HF exports as committed artifacts.
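
A sketch of the finalize/commit step under these semantics; `finalize_export` and the status-dict shape are illustrative, not the PR's actual signatures:

```python
import os
from pathlib import Path

def finalize_export(tmp_hf_path: Path, statuses: list[dict], latest_link: Path) -> None:
    # Gathered per-rank writer statuses decide whether we commit at all.
    if not all(s.get("ok") for s in statuses):
        raise RuntimeError("async HF export failed on at least one rank")
    # rank0 would write the HF index/config and tokenizer files here.
    final_path = tmp_hf_path.with_name(tmp_hf_path.name.removesuffix(".incomplete"))
    os.replace(tmp_hf_path, final_path)      # rename commits the export atomically
    if latest_link.is_symlink() or latest_link.exists():
        latest_link.unlink()
    latest_link.symlink_to(final_path.name)  # hf-latest -> hf-<step>
```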

4. Reuse of latest sync HF infrastructure

The async HF path is built on the latest sync HF helpers in BaseModel, including:

  • _iter_hf_save_chunks(...)
  • _write_hf_save_plan(...)
  • _write_hf_index_and_config(...)

As a result, async HF inherits the current sync HF save semantics, including newer behaviors around:

  • fp32_keys_pattern
  • _get_save_dtype()
  • ignored/FSDP-excluded parameters in HF export
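
As a rough illustration of how the dtype behavior might compose (only fp32_keys_pattern and _get_save_dtype() are named in the PR; resolve_save_dtype below is hypothetical):

```python
import re
import torch

def resolve_save_dtype(param_name: str, default_dtype: torch.dtype,
                       fp32_keys_pattern: str | None) -> torch.dtype:
    # Parameters whose names match the pattern stay in fp32 in the export;
    # everything else uses the default dtype (what _get_save_dtype() returns).
    if fp32_keys_pattern and re.search(fp32_keys_pattern, param_name):
        return torch.float32
    return default_dtype
```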

5. Retention behavior

This PR also wires async HF into the existing retention/update flow:

  • hf_max_keep cleanup works for async HF exports
  • old HF export directories are removed during finalize
  • meta.latest_exp.hf_checkpoint_list stays consistent with on-disk state
  • hf-latest points to the latest committed async HF export
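
A sketch of the retention rule, assuming committed exports are hf-<step> directories under the work dir (the helper name and the step parsing are assumptions):

```python
import shutil
from pathlib import Path

def apply_hf_max_keep(work_dir: Path, hf_max_keep: int) -> list[str]:
    # Only committed exports count: skip hf-latest and *.incomplete staging dirs.
    committed = sorted(
        (p for p in work_dir.glob("hf-*")
         if p.is_dir() and p.name.split("-")[-1].isdigit()),
        key=lambda p: int(p.name.split("-")[-1]),
    )
    removed = []
    for stale in (committed[:-hf_max_keep] if hf_max_keep > 0 else []):
        shutil.rmtree(stale)                 # drop the oldest exports
        removed.append(stale.name)
    return removed
```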

Tests

Added tests:

  • TestTrainerAsyncSaveHF.test_async_save_hf_interval
  • TestTrainerAsyncSaveHF.test_async_save_hf_raises_on_writer_failure

Covered behaviors:

  • async HF export triggers at the expected save steps
  • pending async HF jobs are finalized correctly
  • per-rank writer status is merged into the final HF weight_map
  • rank0 writes final HF index/config artifacts
  • hf_max_keep cleanup behaves as expected
  • hf-latest points to the newest committed export
  • writer failure raises a global consistency error
  • pending async HF state is cleared after failure
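
For a sense of shape, the writer-failure case could look roughly like this (the fixture, fail_next_write, and the internal attribute names are guesses; the real tests are in tests/train/test_trainer_async_hf.py):

```python
import pytest

def test_writer_failure_sketch(fake_trainer):
    # Force the forked writer to exit non-zero on the next export.
    fake_trainer.engine.fail_next_write = True
    fake_trainer.run_until_next_hf_interval()
    # Finalization must surface a global consistency error...
    with pytest.raises(RuntimeError):
        fake_trainer._wait_and_finalize_pending_async_hf()
    # ...and the pending async HF state must be cleared afterwards.
    assert fake_trainer._pending_async_hf is None
```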

Test note:

  • the fake trainer config disables sync checkpoint saving by default:
    • checkpoint_interval=None
    • snapshot_interval=None
  • this keeps the unit test focused on async HF and avoids requiring a fake save_dcp() implementation in the mock engine
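
Roughly, the fake config amounts to (field names taken from this note; the concrete config class is not shown):

```python
fake_cfg = dict(
    checkpoint_interval=None,  # no sync training-state checkpoints
    snapshot_interval=None,    # no snapshots either
    async_hf_export=True,      # exercise only the new async HF path
)
```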

Executed with:

cd /mnt/shared-storage-user/ailab-sys/zhangcaixian/my_xtunner/upload/xtuner
XTUNER_TEST_WORLD_SIZE=2 python3 -m pytest tests/train/test_trainer_async_hf.py -v

Result:

2 passed

Scope

This PR adds:

  • async HF runtime implementation
  • async HF unit tests

This PR does not change:

  • sync HF export behavior
  • checkpoint save/load behavior outside the trainer integration points needed for async HF
  • existing tests/train/test_trainer.py

- support async save_pretrained
- add async trainer pipeline
- add test for async export
- keep backward compatibility
@HAOCHENYE
Collaborator

@claude review

@@ -1,5 +1,6 @@
import json
import os
import hashlib

Claude: Nit: hashlib should be placed before json to maintain alphabetical import ordering (stdlib convention).

Suggested change (reorder imports alphabetically):

import hashlib
import json
import os

Comment on lines +113 to +114
self.fake_hf_model_dir = Path(self.temp_dir.name) / "fake_hf_model"
self.work_dir = Path(self.temp_dir.name) / "work_dir"

Claude: Nit: The fake's _async_hf_writer_status_filename uses underscores and no zero-padding (async_hf_writer_status_rank{rank}_of_{world_size}.json) while the real engine uses dashes and 5-digit zero-padding (async-hf-writer-status-rank-{rank:05d}-of-{world_size:05d}.json). While this is internally consistent within the test (both read and write go through the fake), keeping the format closer to the real implementation would catch accidental format mismatches if the trainer ever parses filenames by pattern.

Comment on lines +362 to +368
        save_dtype=save_dtype,
        safetensors_prefix=safetensors_prefix,
        device=DEVICE,
    ):
        cached_names: list[str] = []
        for name, hf_tensor in zip(name_list, hf_tensor_list):
            cache_key = (("root", "hf"), ("name", name))

Claude: Warning — GPU memory during prepare phase: _iter_hf_save_chunks is called here with device=DEVICE (GPU), which means allgathered tensors reside on GPU before being copied to CPU pinned memory. In the existing sync path (_save_hf), generators default to device="cpu", so allgathered results go directly to host memory.

For large models, holding the gathered shards on GPU during the prepare iteration could cause OOM, since this runs during training while activations and optimizer states are still resident. Consider using device="cpu" here (matching the sync path) at the cost of slower allgather, or at minimum document the increased GPU memory requirement when async_hf_export=True.

Comment thread: xtuner/v1/model/base.py
Comment on lines +1295 to +1298

def _write_hf_save_plan(self, save_plan: Mapping[str, object]) -> list[str]:
    hf_dir = cast(Path, save_plan["hf_dir"])
    save_tasks = cast(list[tuple[str, dict[str, torch.Tensor]]], save_plan["save_tasks"])

Claude: Nit — weakly typed parameter: save_plan: Mapping[str, object] is immediately cast to specific types. Per CLAUDE.md, consider using a TypedDict to make the contract explicit:

class _HFSavePlan(TypedDict):
    hf_dir: Path
    save_tasks: list[tuple[str, dict[str, torch.Tensor]]]

This would eliminate the cast calls and make the expected shape discoverable at the type level.

@@ -1609,6 +1621,148 @@ def _maybe_save_hf(self):
total_epoch=self.total_epoch,
)

Claude: Warning — missing return type annotation: CLAUDE.md requires type hints for all new function signatures. This method should declare -> None.

Same applies to _finalize_pending_async_hf (although _wait_and_finalize_pending_async_hf already has it — nice).

Comment on lines +1644 to +1651

self._engine.prepare_async_hf_snapshot(hf_dir=tmp_hf_path)
pid = os.fork()
if pid == 0:
    try:
        self._engine.write_async_hf_snapshot()
        os._exit(0)
    except Exception as exc:

Claude: Warning — os.fork() + CUDA/NCCL threading risk: os.fork() is used here while NCCL background threads and potential CUDA streams are active in the parent. The forked child inherits these in an undefined state. The os._exit(0) mitigation (avoiding destructors) is the standard pattern used by Megatron/DeepSpeed, but if write_async_hf_snapshot() inadvertently triggers any CUDA or NCCL operation, the child will deadlock silently.

Recommend adding a brief code comment noting this constraint (child must be strictly CPU-only I/O), so future contributors don't accidentally add GPU operations in the writer path.
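
One possible wording for that comment (suggestion only):

```python
pid = os.fork()
if pid == 0:
    # Child inherits CUDA contexts and NCCL threads in an undefined state.
    # Everything from here until os._exit() must be strictly CPU-only file
    # I/O; any GPU or collective call can deadlock silently.
    ...
```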

Comment on lines +1660 to +1663
"tmp_hf_path": tmp_hf_path,
"latest_hf_link": latest_hf_link,
"meta_path": meta_path,
"async_writer_status_path": self.exp_dir

Claude: Warning — status files leak on disk: After _wait_and_finalize_pending_async_hf succeeds, the per-rank async-hf-writer-status-*.json files in exp_dir are never cleaned up. Over many training steps these accumulate. Consider deleting them after a successful finalization (similar to how .incomplete dirs are cleaned up via rename).
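
A minimal version of the suggested cleanup, assuming it runs at the end of a successful finalize (exp_dir as referenced above):

```python
# Drop per-rank writer status files once the export is committed.
for status_file in exp_dir.glob("async-hf-writer-status-*.json"):
    status_file.unlink(missing_ok=True)
```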

@claude

claude Bot commented Apr 21, 2026

Claude: ## Code Review Summary

Well-structured PR that adds async HF export with clean commit separation (commit 1 adds everything, commit 2 removes the out-of-scope async checkpoint code). The prepare/write split, commit semantics (.incomplete → final rename), and per-rank writer status design are solid.

Issues

Warning

  • [xtuner/v1/engine/train_engine.py:362-368] _iter_hf_save_chunks called with device=DEVICE (GPU) during prepare, keeping allgathered tensors on GPU. The sync path defaults to device="cpu". Could cause OOM for large models during training.
  • [xtuner/v1/train/trainer.py:1644-1651] os.fork() with active NCCL threads — standard risk mitigated by os._exit(), but a comment noting the child must stay CPU-only would help future contributors.
  • [xtuner/v1/train/trainer.py:1660-1663] Per-rank async-hf-writer-status-*.json files are never cleaned up after finalization.
  • [xtuner/v1/train/trainer.py:1622] _maybe_async_save_hf missing return type annotation (CLAUDE.md compliance).

Nit

  • [xtuner/v1/engine/train_engine.py:3] import hashlib should be before import json (alphabetical ordering).
  • [xtuner/v1/model/base.py:1295-1298] _write_hf_save_plan takes Mapping[str, object] then casts — a TypedDict would be cleaner.
  • [tests/train/test_trainer_async_hf.py:113-114] Fake status filename format (underscores, no zero-padding) diverges from real engine format (dashes, 5-digit padding).

Verdict

COMMENT — no critical bugs found. The warnings above are quality/safety improvements worth addressing before merge.
